提交 264453c4 编写于 作者: R Roger A. Light

Add test for reconnect-on-failure

上级 6781c8ed
#!/usr/bin/env python3
# Test the reconnect_on_failure = False mode
import context
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-reconnect-on-failure", keepalive=keepalive)
connack_packet_ok = paho_test.gen_connack(rc=0)
connack_packet_failure = paho_test.gen_connack(rc=1) # CONNACK_REFUSED_PROTOCOL_VERSION
publish_packet = paho_test.gen_publish(
u"reconnect/test", qos=0, payload="message".encode('utf-8'))
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet_ok)
# Connection is a success, so we expect a publish
if paho_test.expect_packet(conn, "publish", publish_packet):
conn.close()
# Expect the client to quit here due to socket being closed
client.wait(1)
if client.returncode == 42:
# Repeat the test, but with a bad connack code
client = context.start_client()
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet_failure)
# Expect the client to quit here due to socket being closed
client.wait(1)
if client.returncode == 42:
rc = 0
conn.close()
finally:
client.terminate()
client.wait()
sock.close()
exit(rc)
......@@ -6,28 +6,29 @@ PYTHON?=python3
all :
test :
$(PYTHON) ./01-will-set.py python/01-will-set.test
$(PYTHON) ./01-unpwd-set.py python/01-unpwd-set.test
$(PYTHON) ./01-unpwd-unicode-set.py python/01-unpwd-unicode-set.test
$(PYTHON) ./01-keepalive-pingreq.py python/01-keepalive-pingreq.test
$(PYTHON) ./01-no-clean-session.py python/01-no-clean-session.test
$(PYTHON) ./01-reconnect-on-failure.py python/01-reconnect-on-failure.test
$(PYTHON) ./01-unpwd-empty-password-set.py python/01-unpwd-empty-password-set.test
$(PYTHON) ./01-unpwd-empty-set.py python/01-unpwd-empty-set.test
$(PYTHON) ./01-unpwd-set.py python/01-unpwd-set.test
$(PYTHON) ./01-unpwd-unicode-set.py python/01-unpwd-unicode-set.test
$(PYTHON) ./01-will-set.py python/01-will-set.test
$(PYTHON) ./01-will-unpwd-set.py python/01-will-unpwd-set.test
$(PYTHON) ./01-zero-length-clientid.py python/01-zero-length-clientid.test
$(PYTHON) ./01-no-clean-session.py python/01-no-clean-session.test
$(PYTHON) ./01-keepalive-pingreq.py python/01-keepalive-pingreq.test
$(PYTHON) ./02-subscribe-qos0.py python/02-subscribe-qos0.test
$(PYTHON) ./02-subscribe-qos1.py python/02-subscribe-qos1.test
$(PYTHON) ./02-subscribe-qos2.py python/02-subscribe-qos2.test
$(PYTHON) ./02-unsubscribe.py python/02-unsubscribe.test
$(PYTHON) ./03-publish-qos0.py python/03-publish-qos0.test
$(PYTHON) ./03-publish-qos0-no-payload.py python/03-publish-qos0-no-payload.test
$(PYTHON) ./03-publish-b2c-qos1.py python/03-publish-b2c-qos1.test
$(PYTHON) ./03-publish-c2b-qos1-disconnect.py python/03-publish-c2b-qos1-disconnect.test
$(PYTHON) ./03-publish-c2b-qos2-disconnect.py python/03-publish-c2b-qos2-disconnect.test
$(PYTHON) ./03-publish-b2c-qos1.py python/03-publish-b2c-qos1.test
$(PYTHON) ./03-publish-helper-qos0.py python/03-publish-helper-qos0.test
$(PYTHON) ./03-publish-helper-qos1-disconnect.py python/03-publish-helper-qos1-disconnect.test
$(PYTHON) ./03-publish-qos0-no-payload.py python/03-publish-qos0-no-payload.test
$(PYTHON) ./03-publish-qos0.py python/03-publish-qos0.test
$(PYTHON) ./04-retain-qos0.py python/04-retain-qos0.test
$(PYTHON) ./08-ssl-connect-no-auth.py python/08-ssl-connect-no-auth.test
$(PYTHON) ./08-ssl-connect-cert-auth.py python/08-ssl-connect-cert-auth.test
$(PYTHON) ./08-ssl-connect-cert-auth-pw.py python/08-ssl-connect-cert-auth-pw.test
$(PYTHON) ./08-ssl-bad-cacert.py python/08-ssl-bad-cacert.test
$(PYTHON) ./08-ssl-connect-cert-auth-pw.py python/08-ssl-connect-cert-auth-pw.test
$(PYTHON) ./08-ssl-connect-cert-auth.py python/08-ssl-connect-cert-auth.test
$(PYTHON) ./08-ssl-connect-no-auth.py python/08-ssl-connect-no-auth.test
#!/usr/bin/env python3
import os
import subprocess
import socket
import sys
import time
from struct import *
import paho.mqtt.client as mqtt
def on_connect(mqttc, obj, flags, rc):
mqttc.publish("reconnect/test", "message")
mqttc = mqtt.Client("01-reconnect-on-failure", reconnect_on_failure=False)
mqttc.on_connect = on_connect
mqttc.connect("localhost", 1888)
mqttc.loop_forever()
exit(42)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册