12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import threading
- import paho.mqtt.client as mqtt
- from paho.mqtt.properties import Properties
- from paho.mqtt.packettypes import PacketTypes
- import logging
- import time
- logging.basicConfig(format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
- level=logging.DEBUG,
- filename='/var/log/aligenie.log',
- filemode='a')
- class MQTTHelper():
- client = mqtt.Client(client_id="aligenie", protocol=5)
- logger = logging.getLogger()
- connectProperties = Properties(PacketTypes.CONNECT)
- publishProperties = Properties(PacketTypes.PUBLISH)
- def __init__(self):
- self.logger.info("连接MQTT服务:127.0.0.1")
- self.client.on_disconnect = self.on_disconnect
- # 定义最大主题别名数
- self.connectProperties.TopicAliasMaximum = 65532
- t1 = threading.Thread(target=self.connectHost, name='connect-daemon-thread')
- t1.start()
- def connectHost(self):
- self.client.connect("127.0.0.1",1883,0,properties=self.connectProperties)
- self.client.loop_start()
- while True:
- time.sleep(5)
- result = self.client.publish("heartbeat","alive")
- status = result[0]
- if status == 4:
- self.logger.info("服务器断线,尝试重新连接")
- self.client.loop_stop()
- self.client = mqtt.Client(client_id="aligenie", protocol=5)
- self.client.connect("127.0.0.1",1883,0,properties=self.connectProperties)
- self.logger.info("重连成功")
- self.client.loop_start()
- def send(self,content):
- self.logger.info("使用主题别名发送!")
- try:
- # 定义主题别名为双字节的1
- self.publishProperties.TopicAlias = 1
- self.client.publish("pccontrol",payload=content, properties=self.publishProperties)
- except Exception as e:
- self.logger.info(str(e))
- def on_disconnect(self, client, userdata, rc):
- self.logger.info("连接断开,5秒后尝试重连")
- time.sleep(5)
- self.client.reconnect()
- my_helper = MQTTHelper()
|