admin管理员组

文章数量:1516870

MQTT自动化测试脚本

基于Python的MQTT自动化测试客户端

简介

现在很多产品都支持MQTT,且很多通信协议都是HEX形式,有些MQTT Client工具不支持HEX发送,有的支持HEX发送但是发送的数据不正确,这样就造成了测试MQTT这个功能不方便的难题。为了解决以上难题,本文档记录了一个基于Python的MQTT自动化测试客户端的开发过程,可以发送自己指定的HEX数据,希望能够对开发人员和测试人员的工作有所帮助。

废话不多说直接上代码~

#!/usr/bin/python3 import paho.mqtt.client as mqtt 
import time ipaddr = '192.168.20.239'				# 服务器IP地址 
port = 1883								# 服务器端口号 
username = 'usr'						# 用户名 
password = 'pwd'						# 密码 
pubtopic = '/abcde/9CA5253B1114/up'		# 推送的主题 
subtopic = '/abcde/9CA5253B1114/down'	# 订阅的主题 
keepalive = 60 							# keepalive时间间隔 
subinterval = 20 						# 推送时间间隔# 推送的数据 
usrdatas = [[0x01,0x02,0x03,0x04,0x05,0x06,0x07,0x08,0x09],	# 数据1 [0x11,0x12,0x13,0x14,0x15,0x16,0x17,0x18,0x19],	# 数据2 [0x21,0x22,0x23,0x24,0x25,0x26,0x27,0x28,0x29]	# 数据3]def pub_info_print(topic):print('publish ' + topic + ' ' + ' '.join(l)) def pub_print_hex(topic, bytes): l = ['%02X' % i for i in bytes] print('publish ' + topic + ' ' + ' '.join(l)) def msg_print_hex(topic, payload): l = ['%02X' % i for i in payload] print('message ' + topic + ' ' + ' '.join(l)) def on_connect(client, userdata, flags, rc): print('Connected with result code: ' + str(rc)) client.subscribe(subtopic, qos=0) def on_message(client, userdata, msg): msg_print_hex(msg.topic, msg.payload) client = mqtt.Client() 
client.username_pw_set(username, password) 
client.on_connect = on_connect 
client.on_message = on_message 
client.connect(ipaddr, port, keepalive) 
client.loop_start() # 启用一个线程保持连接while 1:for usrdata in usrdatas:time.sleep(subinterval)pubdata = bytes(usrdata)client.publish(pubtopic, payload=pubdata, qos=0)pub_print_hex(pubtopic, usrdata)client.loop_stop() # 停止后台线程

感觉好用的点个赞哈~ 栓Q啦

本文标签: MQTT自动化测试脚本