0
点赞
收藏
分享

微信扫一扫

使用 AT命令,4G拨号,python代码


使用 AT命令,4G拨号,python代码


文章目录

  • 使用 AT命令,4G拨号,python代码
  • 前置条件
  • 一、打开AT环境
  • 二、测试
  • 三、python代码
  • 1.at.py 用于发送指令


本文将介绍如何使用 AT命令对 4G 模块进行拨号,以及一些常用的 AT命令。以下演示环境为:

  • 移远 EC200S 4G 模块
  • ARMv7l 架构
  • BusyBox

完整代码git地址 https://github.com/mtl940610/python-athttps://github.com/mtl940610/python-at

前置条件

在进行 4G 拨号前,需要确保以下条件已满足:

  1. 模块能够正常工作,可通过发送 at 命令检测;
  2. SIM 卡 PIN 码已正确验证;
  3. 设备信号强度符合要求,可通过发送 at+csq 命令检测;
  4. 设备已开启 GPRS 服务,可通过发送 at+cfun=1 命令开启;

下表列出了常用的 AT 命令及其作用:

AT命令

返回值

作用

at

OKERROR

检测模块是否能够正常工作

at+cpin?

+CPIN: READY

验证 SIM 卡 PIN 码是否正确

at+csq

CSQ: 31,99

检测设备信号强度是否符合要求

at+cfun=id

+CFUN: 1

开启/关闭设备

at+cimi

IMSI 号码

返回设备 IMSI 号码

at+cgdcont?

+CGDCONT: 1,"IPV4V6","","0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0",0,0,0,0

返回设备当前 APN 配置

at+cgdcont=1,"IPV4V6","3gnet"

配置 APN 为 3gnet

at+cgact?

+CGACT: 1,1

激活 PDP 上下文

atd99**1#

CONNECT 150000000

连接成功,连接速度为 150M

at+creg?

+CREG: 0,1+CREG: 0,5

查询设备是否已经注册到网络

+CREG: 0,1 - 设备已注册到本地网络,但还未获得移动网络服务

+CREG: 0,5 - 设备已注册并获得移动网络服务

AT+CGMI

制造商名称

返回制造商的名称

AT+CGMM

设备型号

返回设备型号

AT+CGSN

设备IMEI号

返回设备的IMEI号

AT+COPS

网络运营商名称

查询并选择网络运营商

ATH

OK 或 `ERROR``

用于挂断电话

AT+QIOPEN

TCP/UDP连接结果信息或错误信息

用于打开一个TCP/UDP连接

AT+QISEND

发送结果信息或错误信息

用于发送数据到服务器

AT+QICLOSE

关闭结果信息或错误信息

用于关闭TCP/UDP连接

一、打开AT环境

# -s 是波特率
microcom -s 115200 /dev/ttyUSB2

二、测试

可以使用Quectel-CM 工具

Quectel-CM -l &

使用 AT命令,4G拨号,python代码_TCP_02

也可以使用 AT原始命令

in: at
out: OK
 in: at+csq
out: OK
 in: at+csq
out: +CSQ: 31,99
out: 
out: OK
 in: at+creg?
# 返回0,1 或者0,5 标识网络已准备好
out: +CREG: 0,1
out: 
out: OK
 in: at+qping=1,"www.baidu.com"
# 网络已通
out: OK
out: +QPING: 0,"220.181.38.150",32,49,255
out: +QPING: 0,"220.181.38.150",32,48,255
out: +QPING: 0,"220.181.38.150",32,53,255
out: +QPING: 0,"220.181.38.150",32,47,255
out: 0,4,4,0,47,53,48

in: at+qiopen=1,0,"TCP","IP",PORT,0,0
# 打开tcp socket链接
out: OK
out: at+qiopen=1,0,"TCP","27.188.73.44",19196,0,0
out: OK
out: +QIOPEN: 0,0

 in: at+qistate?
# 查看当前tcp socket链接
out: +QISTATE: 0,"TCP","27.188.73.44",19196,30497,2,1,0,0,"usbat"
out: 
out: OK

 in: at+qisend=0,4
# 发送数据 at+qisend 第0个通道,发送4个字节
out: > 123
out: SEND OK
out: +QIURC: "recv",0

 in: at+qird=0,4
# 接收 socket 服务器端发送的数据,第0个通道,接收4个字节
out: +QIRD: 4
out: 接收

in: at+close=0
# 关闭socket通道

三、python代码

1.at.py 用于发送指令

完整代码git地址 https://github.com/mtl940610/python-athttps://github.com/mtl940610/python-at

# -*- coding: utf-8 -*-
# !/usr/bin/env python
# @Time    : 2023/3/2 15:46 
# @Author  : mtl
# @Desc    : ***
# @File    : at.py
# @Software: PyCharm
import functools
import threading
import time

import serial
from config import Config
from myLogging import DtuLogger

logger = DtuLogger(name='atlogger', file='at.log')
output = DtuLogger(name='output', file='output.log')

command = {
    "pdd": "AT+CGACT=1,1",
    "qiopen": f'AT+QIOPEN=1,0,"TCP","{Config.host}",{Config.port},0,0',
    "qistate": "AT+QISTATE=?",
    "qisend": "AT+QISEND=0,%s",
    "qiclose": "AT+QICLOSE=0",
    "qird": "at+qird=0,%s"
}


class AT:
    _instance_lock = threading.Lock()

    def __new__(cls):
        if not hasattr(cls, "_instance"):
            with cls._instance_lock:
                if not hasattr(cls, "_instance"):
                    cls._instance = super().__new__(cls)
                    cls._instance.__initialized = False
        return cls._instance

    def __init__(self):
        if self.__initialized:
            return
        self.__initialized = True
        self.conn = None
        self.serial = None
        self.heart_data = Config.heartbeat_data
        self.heart_data_length = len(self.heart_data)

    def connect(self):
        if self.serial and not self.serial.is_open:
            logger.error("连接关闭,重新连接!")
            self.serial = None
        if not self.serial:
            try:
                self.serial = serial.Serial(Config.serial_device, Config.serial_bandrate, timeout=Config.serial_timeout)
                logger.info("连接成功")
                self.__send_command(command["pdd"])
            except Exception as e:
                logger.error(e)
        while True:
            r = self.__send_command("AT+CPIN?\r\n")
            logger.debug(f"检查SIM卡状态->{r}")
            if "+CPIN: READY" in r:
                break
            logger.error("未检测到SIM卡或SIM卡未就绪,请检查!")
            time.sleep(5)

        while True:
            r = self.__send_command("AT+CREG?\r\n")
            logger.debug(f"检查网络状态->{r}")
            if "+CREG: 0,1" in r or "+CREG: 0,5" in r:
                break
            logger.error("未检测到网络,请检查SIM卡和网络状态!")
            time.sleep(5)

        self.__connect_socket()

    def __connect_socket(self):
        r = self.__send_command(command["qistate"])
        logger.debug(f"qistate->{r}")
        if "OK" == r:
            return
        self.__send_command(command["qiopen"])

    @staticmethod
    @functools.lru_cache()
    def __get_instance():
        return AT()

    def __send_command2(self, command, ln=True, retry_max=3):
        with self.__get_instance()._instance_lock:
            if not command:
                return "ERROR"
            if ln:
                command += "\r\n"
            self.serial.flushInput()  # 清空缓冲区
            self.serial.write(str(command).encode('utf-8'))
            start_time = time.time()
            response = ""
            data = "1"
            while not response and data and time.time() - start_time < 5:
                logger.debug(f"发送命令-> {str(command).encode('utf-8')},等待回复中!")
                data = self.serial.readline().decode('utf-8', errors='ignore').strip()
                response += data
                if response:
                    logger.debug(f"发送命令-> {str(command).encode('utf-8')},回复-> {response}")
            if "ERROR" in response:
                logger.debug(f"发送命令-> {str(command).encode('utf-8')},ERROR!")
                self.connect()
            output.info(f"指令->{command},输出->{response}")
            return response

    def __send_command(self, command, ln=True, retry_max=3):
        with self.__get_instance()._instance_lock:
            if not command:
                return "ERROR"
            if ln:
                command += "\r\n"
            retry_count = 0
            response = []
            while retry_count < retry_max and not response:
                self.serial.flushInput()  # 清空缓冲
                self.serial.write(command.encode())
                while True:
                    line = self.serial.readline().decode().strip()
                    if line:
                        response.append(line)
                    else:
                        break
                retry_count += 1
                time.sleep(Config.retry_interval)
            result = '\n'.join(response)
            logger.info(f"{command.strip()} -> {result}")
            output.info(f"{command.strip()} -> {result}")
            return result

    def send_heart(self):
        result = self.__send_command(command["qisend"] % self.heart_data_length)
        if "ERROR" in result:
            self.connect()
        self.__send_command(self.heart_data, ln=False)

    def send_data(self, data=""):
        self.__send_command(command["qisend"] % len(data))
        if "ERROR" in result:
            self.connect()
        return self.__send_command(data, ln=False)

    def get_data(self, length=100):
        return self.__send_command(command["qird"] % length)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        with self.__get_instance()._instance_lock:
            if self.serial:
                self.serial.close()
                self.serial = None

    def __del__(self):
        with self.__get_instance()._instance_lock:
            if self.serial:
                self.serial.close()
                self.serial = None


def send_data(data):
    at = get_at()
    return at.send_data(data)


def send_heart():
    at = get_at()
    return at.send_heart()


def get_data():
    at = get_at()
    return at.get_data()


@functools.lru_cache()
def get_at():
    at_inst = AT()
    at_inst.connect()
    return at_inst


举报

相关推荐

0 条评论