【PyQT实现MQTT上位机】:协程、paho.mqtt库、json文件读写
立即下载
资源介绍:
这份资源提供了一套完整的Python代码示例,用于构建一个基于PyQT图形界面的MQTT协议上位机应用程序。它集成了现代异步编程技术(协程)、流行的paho.mqtt客户端库以及JSON格式的数据处理能力,适用于需要实时监控和控制远程设备的应用场景。
图形用户界面:利用PyQT设计直观易用的操作界面,支持消息订阅/发布、主题管理等功能。
异步通信:通过协程实现非阻塞式的网络通讯和文件读写,确保GUI响应迅速且流畅。
MQTT协议支持:采用paho.mqtt库来连接至MQTT代理服务器,支持消息收发及QoS质量服务级别设定。
配置管理:使用JSON文件存储应用配置信息,实现离线存储,重新运行软件可以获取之前的配置信息。
from PyQt5.QtGui import QIcon, QFont
import data_mqtt_settings
import time_stamp
import mqtt
from PyQt5.QtCore import QSize, Qt, QThread, QObject, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QTextEdit, QGridLayout, QWidget, QLabel, QLineEdit, \
QHBoxLayout, QVBoxLayout, QMessageBox, QSizePolicy
from json_file_processing import GET_JSON, SET_JSON
width = 900
height = 700
mqtt_settings = data_mqtt_settings.mqtt_settings
mqtt_receive_flag = False
mqtt_loop_publish_flag = False
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
# 配置界面组件
self.btn_ensure = None
self.edit_client_id = None
self.edit_topic = None
self.edit_port = None
self.edit_broker = None
# 日志界面组件
self.btn_start = None
self.edit_log = None
# MQTT
self.client = None
# 协程和实体的空初始化定义
self.get_json_worker = None
self.json_thread = None
self.init_ui()
self.create_thread_json(1)
def init_ui(self):
self.setWindowTitle("HBB_MQTT") # 设置窗口名称
self.setGeometry(100, 100, width, height) # 默认大小和位置
# self.setFixedSize(QSize(width, height)) # 调整窗体大小
self.setWindowIcon(QIcon("src/mqtt.png")) # 设置图标
layout_main = QVBoxLayout() # 主布局纵向布局
self.setup_server_section(layout_main)
self.setup_log_section(layout_main)
self.setup_publish_section(layout_main)
widget = QWidget()
widget.setLayout(layout_main)
self.setCentralWidget(widget)
def setup_server_section(self, layout_main):
""" 服务器配置部分 """
layout_server = QGridLayout()
layout_server.setContentsMargins(50, 20, 50, 20) # 设置表格布局四周的margin间隔
''' 设置服务器配置部分界面元素 '''
# 标签
lab_broker = QLabel("服务器地址")
lab_port = QLabel("端口号")
lab_topic = QLabel("订阅主题")
lab_client_id = QLabel("客户端ID")
# 单行文本输入框
self.edit_broker = QLineEdit(self)
self.edit_port = QLineEdit(self)
self.edit_topic = QLineEdit(self)
self.edit_client_id = QLineEdit(self)
# 确认按键
self.btn_ensure = QPushButton("连接")
''' 设置服务器配置部分界面元素 '''
layout_server.addWidget(lab_broker, 0, 0, 1, 1)
layout_server.addWidget(self.edit_broker, 0, 1, 1, 1)
layout_server.addWidget(lab_port, 0, 2, 1, 1)
layout_server.addWidget(self.edit_port, 0, 3, 1, 1)
layout_server.addWidget(lab_topic, 1, 0, 1, 1)
layout_server.addWidget(self.edit_topic, 1, 1, 1, 1)
layout_server.addWidget(lab_client_id, 1, 2, 1, 1)
layout_server.addWidget(self.edit_client_id, 1, 3, 1, 1)
layout_server.addWidget(self.btn_ensure, 2, 3, 1, 1)
''' 绑定按键响应函数 '''
self.btn_ensure.clicked.connect(self.ensure_settings)
layout_main.addLayout(layout_server)
def setup_log_section(self, layout_main):
""" 日志部分 """
layout_log = QGridLayout()
layout_log.setContentsMargins(50, 0, 20, 0) # 设置表格布局四周的margin间隔
''' 设置日志部分界面元素 '''
self.edit_log = QTextEdit() # 富文本框
# self.edit_log.setFixedSize(width - 250, height - 250)
self.edit_log.setReadOnly(True) # 只读(不允许用户输入)
self.btn_start = QPushButton("开始接收")
btn_clear = QPushButton("清空日志") # 按键
''' 添加日志部分界面元素到对应布局 '''
layout_log.addWidget(self.edit_log, 0, 0, 5, 1)
layout_log.addWidget(self.btn_start, 1, 1, 1, 1)
layout_log.addWidget(btn_clear, 2, 1, 1, 1)
''' 绑定日志部分按键响应函数 '''
self.btn_start.clicked.connect(self.msg_receive)
btn_clear.clicked.connect(self.clear_log)
layout_main.addLayout(layout_log)
def setup_publish_section(self, layout_main):
""" MQTT数据发送部分 """
layout_publish_main = QVBoxLayout()
layout_publish = QHBoxLayout() # 横向布局
self.edit_publish_msg = QLineEdit(self) # 数据发送部分包含一个按钮、一个发布消息输入框
btn_publish = QPushButton("发布")
layout_publish.addWidget(self.edit_publish_msg) # 添加数据部分界面元素到对应布局
layout_publish.addWidget(btn_publish)
btn_publish.clicked.connect(self.publish_msg)
layout_loop = QHBoxLayout() # 横向布局
self.edit_loop_publish_msg = QLineEdit(self)
self.btn_loop_publish = QPushButton("循环发布(3s)")
layout_loop.addWidget(self.edit_loop_publish_msg) # 添加数据部分界面元素到对应布局
layout_loop.addWidget(self.btn_loop_publish)
self.btn_loop_publish.clicked.connect(self.loop_publish_msg)
layout_publish_main.addLayout(layout_publish)
layout_publish_main.addLayout(layout_loop)
layout_main.addLayout(layout_publish_main)
def ensure_settings(self):
""" 连接、解锁按键的响应函数 """
if self.edit_port.isEnabled():
# 如果存在输入框为空的情况,跳出弹窗提醒
fields = [self.edit_client_id.text(), self.edit_port.text(), self.edit_broker.text(),
self.edit_topic.text()]
if any(field == "" for field in fields):
QMessageBox.critical(self, "错误", "请将配置信息填写完整")
else:
# 更新全局变量
global mqtt_settings
settings_info = {
'broker': self.edit_broker.text(),
'port': self.edit_port.text(),
'topic': self.edit_topic.text(),
'client_id': self.edit_client_id.text()
}
mqtt_settings.update(settings_info)
# 保存到json文件,保存完成之后调用MQTT函数连接
self.create_thread_json(2)
# 锁定参数界面
self.lock_setting()
else:
self.unlock_setting()
def lock_setting(self):
""" 配置锁定函数,绑定到按键响应上(ensure_settings 调用) """
self.btn_ensure.setText("解锁")
self.edit_port.setDisabled(True)
self.edit_broker.setDisabled(True)
self.edit_topic.setDisabled(True)
self.edit_client_id.setDisabled(True)
def unlock_setting(self):
""" 配置解锁函数,绑定到按键响应上 (ensure_settings 调用) """
self.btn_ensure.setText("连接")
self.edit_port.setDisabled(False)
self.edit_broker.setDisabled(False)
self.edit_topic.setDisabled(False)
self.edit_client_id.setDisabled(False)
def msg_receive(self):
""" 数据接收开始按键 """
global mqtt_receive_flag
if not mqtt_receive_flag:
if self.edit_port.isEnabled():
QMessageBox.critical(self, "错误", "请先连接MQTT服务器")
else:
self.btn_start.setCheckable(True)
self.btn_start.toggle()
self.btn_start.setText("停止接收")
mqtt.subscribe(self.client, mqtt_settings["topic"], callback=self.set_receive_data)
self.client.loop_start()
mqtt_receive_flag = True
else: # 关闭数据发送
self.btn_start.setCheckable(False)