03-publish-qos0-no-payload.test 653 字节
Newer Older
1
#!/usr/bin/env python3
R
Roger Light 已提交
2 3 4

import os
import socket
C
Christian Clauss 已提交
5
import subprocess
R
Roger Light 已提交
6 7 8 9
import sys
import time
from struct import *

R
Roger Light 已提交
10
import paho.mqtt.client as mqtt
R
Roger Light 已提交
11 12 13

sent_mid = -1

14
def on_connect(mqttc, obj, flags, rc):
    """Publish the payload-less test message once the connection is accepted.

    Callback assigned to ``mqttc.on_connect``.

    Args:
        mqttc: The client object; its ``publish`` method is called here.
        obj: Unused.
        flags: Unused.
        rc: Connection result code; any non-zero value aborts the script.

    Raises:
        SystemExit: With ``rc`` as the exit status when ``rc`` is non-zero.
    """
    global sent_mid
    if rc != 0:
        # Use sys.exit rather than the site-provided exit() helper, which is
        # intended for interactive sessions and is not always available.
        sys.exit(rc)

    # No payload argument is passed. The returned message id is remembered so
    # that on_publish can recognise the completion of this exact message.
    _result, sent_mid = mqttc.publish("pub/qos0/no-payload/test")
R
Roger Light 已提交
20

R
Roger Light 已提交
21
def on_publish(mqttc, obj, mid):
    """Disconnect and flag success once the tracked message is published.

    Callback assigned to ``mqttc.on_publish``.

    Args:
        mqttc: The client object; its ``disconnect`` method is called here.
        obj: Unused.
        mid: Message id of the publish that has just completed.
    """
    # sent_mid is only read here, so it needs no global declaration.
    global run
    if mid != sent_mid:
        # Not the message recorded in sent_mid; keep waiting.
        return

    mqttc.disconnect()
    # Leaving -1 ends the module-level wait loop; 0 becomes the exit status.
    run = 0

# Script exit status. It stays at -1 until on_publish sets it to 0.
run = -1

# NOTE(review): `run` (-1) is passed as the second positional argument of
# mqtt.Client. Which parameter that maps to depends on the installed client
# library version - confirm this is intentional.
mqttc = mqtt.Client("publish-qos0-test-np", run)
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish

mqttc.connect("localhost", 1888)

# Drive the client loop by hand until a callback changes `run`.
while run == -1:
    mqttc.loop()

# sys.exit instead of the site-provided exit() helper.
sys.exit(run)