失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > Python对阿里云物联网MQTT设备接入端开发

Python对阿里云物联网MQTT设备接入端开发

时间:2022-06-29 15:14:52

相关推荐

Python对阿里云物联网MQTT设备接入端开发

文章目录

前言一、快速搭建Python对阿里云物联网MQTT设备接入端代码实现1. mqtt子应用下view.py 主文件2. wsgi.py二、具体使用介绍1. 安装包2. 设备认证,一机一密型接入(11-22行)3. 回调函数3.1 on_connect,设备连接云端成功后会通过on_connect回调函数通知用户(25-51行)3.2 on_disconnect ,连接成功以后如果连接断开会通过on_disconnect 回调通知用户(54-62行)4. 订阅云端消息(82-93行,155-156行)5. 接收与处理来自云端的消息(65-79行,145-146行)6. 发送消息到云端(106-112行,115-125行,147-148行)7. 取消消息订阅(95-103行,149-150行)总结

前言

MQTT是用于物联网(IoT)的OASIS标准消息传递协议。本文主要记录使用阿里云物联网平台中,网关设备接入

提示:需要理解Python paho-mqtt 模块,本文使用aliyun-iot-linkkit实现,适用于Django环境下

建议先看完我的另一篇文章阿里云物联网平台使用,在进行使用

一、快速搭建Python对阿里云物联网MQTT设备接入端代码实现

先上代码

1. mqtt子应用下view.py 主文件

import jsonimport loggingimport sysimport threadingimport timefrom linkkit import linkkitlogger = logging.getLogger('django')# 来自一机一密的设备options = {"ProductKey": "xxxxxxxxxxx","DeviceName": "device-name","DeviceSecret": "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"}# 示例代码配置设备的设备证书以及连接的公共示例的RegionIDlk = linkkit.LinkKit(host_name="cn-shanghai", # 华东2(上海),根据自己的RegionIDproduct_key=options["ProductKey"],device_name=options["DeviceName"],device_secret=options["DeviceSecret"])def on_connect(session_flag, rc, userdata):"""callback after connect_async:param session_flag: type:int description:is previous connect session,0 new session; 1 previous session:param rc: type:int rc的值决定了连接成功或者不成功::param userdata: type: description:same as LinkKit input parameter user_data"""print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc))if rc == 0:# 连接成功print("Connection successful")elif rc == 1:# 协议版本错误print("Protocol version error")elif rc == 2:# 无效的客户端标识print("Invalid client identity")elif rc == 3:# 服务器无法使用print("server unavailable")elif rc == 4:# 错误的用户名或密码print("Wrong user name or password")elif rc == 5:# 未经授权print("unaccredited")print("Connect with the result code " + str(rc))def on_disconnect(rc, userdata):"""callback after connection disconnect:param rc: type:int description:0 success call for disconnect,1 network error:param userdata: type: description:same as LinkKit input parameter user_data"""print("on_disconnect:rc:%d,userdata:" % rc)if rc != 0:print("Unexpected disconnection %s" % rc)def on_topic_message(topic, payload, qos, userdata):"""callback after subscribe_topic call:param topic: 订阅的主题:param payload: 内容:param qos: 质量服务等级:param userdata:"""print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))json_msg = json.loads(payload.decode('utf-8')) # 一般为json数据try:# 接收到消息后的业务逻辑,同时处理多任务亦可以采用异步、线程池等passexcept Exception as e:print('No this moType', e)def on_subscribe_topic(mid, granted_qos, userdata):"""callback after subscribe_topic call:param mid: type: int description:publish message id:param granted_qos: type:list(int) description: corresponding to subscribe_topic parameter topic,0 represent qos=0,1 represent qos=1,128 represent subscribe error:param userdata: type: description:same as LinkKit input parameter user_data"""print("on_subscribe_topic mid:%d, granted_qos:%s" %(mid, str(','.join('%s' % it for it in granted_qos))))print(granted_qos)if granted_qos == 128:print("订阅失败")def on_unsubscribe_topic(mid, userdata):"""callback after unsubscribe topic:param mid: type: int description:publish message id:param userdata: type: description:same as LinkKit input parameter user_data"""print("on_unsubscribe_topic mid:%d" % mid)passdef on_publish_topic(mid, userdata):"""callback after publish_topic call:param mid: type: int description:publish message id:param userdata: type: description:same as LinkKit input parameter user_data"""print("on_publish_topic mid:%d" % mid)# mqtt发布启动函数def mqtt_publish(sensor_data, topic='defult', qos=0):try:rc, mid = lk.publish_topic(lk.to_full_topic("user/update"), sensor_data)print("mqtt_publish:已启动...", "user/update", sensor_data)returnexcept KeyboardInterrupt:print("EXIT")# 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。lk.on_disconnect()sys.exit(0)# 启动函数def mqtt_run():# 账号密码验证放到最前面# client.username_pw_set('user', 'user')# client = mqtt.Client()# 建立mqtt连接# 注册接收到云端数据的方法lk.on_connect = on_connect# 注册取消接收到云端数据的方法lk.on_disconnect = on_disconnect# 如果产品生产时错误地将一个三元组烧写到了多个设备,多个设备将会被物联网平台认为是同一个设备,# 从而出现一个设备上线将另外一个设备的连接断开的情况。用户可以将自己的接口信息上传到云端,那么云端可以通过接口的信息来进行问题定位。lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031")# 企业实例域名配置的更改lk.config_mqtt(secure="", endpoint="iot-060a085o.mqtt.")# 注册云端订阅的方法lk.on_subscribe_topic = on_subscribe_topic# 注册当接受到云端发送的数据的时候的方法lk.on_topic_message = on_topic_message# 注册向云端发布数据的时候顺便所调用的方法lk.on_publish_topic = on_publish_topic# 注册取消云端订阅的方法lk.on_unsubscribe_topic = on_unsubscribe_topic# 连接阿里云的函数(异步调用)lk.connect_async()# 因为他是他是异步调用需要时间所以如果没有这个延时函数的话,他就会出现not in connected state的错误time.sleep(2)# 订阅这个topic,不需要写prodect_key和device_namerc, mid = lk.subscribe_topic(lk.to_full_topic("user/get"))

2. wsgi.py

为mqtt服务创建一个线程

import threadingfrom apps.mqtt import views# 启用多线程 运行脱机主控模块thread_mqtt_run = threading.Thread(target=views.mqtt_run)thread_mqtt_run.start()

二、具体使用介绍

1. 安装包

安装Python对接mqtt协议库,paho-mqtt

pip install paho-mqtt

安装阿里云物联网封装paho-mqtt后的库,aliyun-iot-linkkit

pip install aliyun-iot-linkkit

2. 设备认证,一机一密型接入(11-22行)

另一篇文章阿里云物联网平台使用有介绍。

从创建好的设备中,找一个设备证书,一键复制传入

注意:host_name是阿里云上你买的服务地址,“地域和可用区中查看对应的RegionID,公共实例和企业实例还有区别。

如果需要改变MQTT连接的一些默认参数,可以通过config_mqtt 指定端口等连接参数,如下所示:

config_mqtt(self, port=1883, protocol="MQTTv311", transport="TCP",secure="TLS", keep_alive=60, clean_session=True,max_inflight_message=20, max_queued_message=0,auto_reconnect_min_sec=1,auto_reconnect_max_sec=60,cadata=None):

3. 回调函数

用户可以在回调中加入自己的业务处理逻辑。

如果不够清晰,可自行查看Python Link SDK中API列表信息。

3.1 on_connect,设备连接云端成功后会通过on_connect回调函数通知用户(25-51行)

135行,为注册该回调函数

3.2 on_disconnect ,连接成功以后如果连接断开会通过on_disconnect 回调通知用户(54-62行)

137行,为注册该回调函数

4. 订阅云端消息(82-93行,155-156行)

订阅结果通过on_subscribe_topic通知用户:

5. 接收与处理来自云端的消息(65-79行,145-146行)

通过on_topic_message()回调告知用户

6. 发送消息到云端(106-112行,115-125行,147-148行)

发布消息结果通知

消息发送后,云端是否成功接收通过on_publish_topic回调通知用户:

发送消息

通过调用publish_topic()实现将消息发送到云端:

核心为:rc, mid = lk.publish_topic(lk.to_full_topic(“user/pub”), “123”)

此处为个人封装,上送时随时调取函数

7. 取消消息订阅(95-103行,149-150行)

通过调用unsubscribe_topic()取消对指定topic消息的订阅:

本文暂时没用用到,如有需要,按照如下写即可

rc, mid = lk.unsubscribe_topic(lk.to_full_topic(“user/test”))

取消订阅的结果通过on_unsubscribe_topic回调通知用户:

总结

希望大家多多交流,一起进步。

您的点赞是我坚持的动力,

如果觉得《Python对阿里云物联网MQTT设备接入端开发》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。