03-publish-b2c-qos1.test 1.1 KB
Newer Older
1
#!/usr/bin/env python3
R
Roger Light 已提交
2 3 4

import os
import socket
C
Christian Clauss 已提交
5
import subprocess
R
Roger Light 已提交
6 7 8 9
import sys
import time
from struct import *

R
Roger Light 已提交
10
import paho.mqtt.client as mqtt
R
Roger Light 已提交
11

12 13 14 15 16 17
# Payload literal that on_message compares msg.payload against: a plain
# string literal on Python 2, a bytes literal on Python 3.
expected_payload = "message" if sys.version_info[0] < 3 else b"message"


R
Roger Light 已提交
18
def on_message(mqttc, obj, msg):
    """Validate the received message and terminate the test process.

    Each field of ``msg`` is checked in turn (mid, topic, payload, qos,
    retain).  On the first mismatch the offending value is printed and the
    process exits with status 1; if every field matches, the process exits
    with status 0.

    Args:
        mqttc: Client instance that invoked the callback (unused).
        obj: User data passed to the callback (unused).
        msg: Received message; its ``mid``, ``topic``, ``payload``, ``qos``
            and ``retain`` attributes are inspected.

    Raises:
        SystemExit: Always, carrying 0 on success and 1 on any mismatch.
    """
    # sys.exit() is used rather than the bare exit() helper: the latter is
    # injected by the ``site`` module and is not guaranteed to exist.
    if msg.mid != 123:
        print("Invalid mid: (" + str(msg.mid) + ")")
        sys.exit(1)
    if msg.topic != "pub/qos1/receive":
        print("Invalid topic: (" + str(msg.topic) + ")")
        sys.exit(1)
    if msg.payload != expected_payload:
        print("Invalid payload: (" + str(msg.payload) + ")")
        sys.exit(1)
    if msg.qos != 1:
        print("Invalid qos: (" + str(msg.qos) + ")")
        sys.exit(1)
    if msg.retain != False:
        print("Invalid retain: (" + str(msg.retain) + ")")
        sys.exit(1)
    # Every field matched: report success through the exit status.
    sys.exit(0)

36
def on_connect(mqttc, obj, flags, rc):
    """Abort the test if the connection attempt was not successful.

    Args:
        mqttc: Client instance that invoked the callback (unused).
        obj: User data passed to the callback (unused).
        flags: Connection flags passed to the callback (unused).
        rc: Connection result code; 0 is treated as success.

    Raises:
        SystemExit: With ``rc`` as the exit status when ``rc`` is non-zero.
    """
    if rc != 0:
        print("Connect failed (" + str(rc) + ")")
        # sys.exit() instead of the site-provided exit() helper, which is
        # not guaranteed to be available.
        sys.exit(rc)

R
Roger Light 已提交
41 42 43
# Create the client and register the connect/message callbacks.
mqttc = mqtt.Client("publish-qos1-test")
mqttc.on_connect = on_connect
mqttc.on_message = on_message

# NOTE(review): port 1888 is presumably where the test harness listens.
mqttc.connect("localhost", 1888)

# Keep servicing the network loop for as long as loop() returns 0.
rc = 0
while rc == 0:
    rc = mqttc.loop()

# Reaching this point means the loop stopped with a non-zero code, so the
# run is reported as a failure.  sys.exit() is used instead of the
# site-provided exit() helper, which is not guaranteed to be available.
print("rc: " + str(rc))
sys.exit(1)