提交 e72010a0 编写于 作者: R Roger Light

Add basic MQTTv5 support to helper functions.

Closes #575. Thanks to briggySmalls.
上级 a155a876
......@@ -19,6 +19,8 @@ v1.6.0 - 2021-xx-xx
- Callbacks can now be updated from within a callback.
- Remove _out_packet_mutex and _current_out_packet_mutex and convert the
_out_packet queue use to thread safe.
- Add basic MQTT v5 support to the subscribe and publish helper functions.
Closes #575.
v1.5.1 - 2020-09-22
......@@ -432,7 +432,7 @@ class Client(object):
broker. To use a callback, define a function and then assign it to the
def on_connect(client, userdata, flags, rc, properties=None):
def on_connect(client, userdata, flags, rc):
print("Connection returned " + str(rc))
client.on_connect = on_connect
......@@ -453,116 +453,12 @@ class Client(object):
If you wish to suppress exceptions within a callback, you should set
`client.suppress_exceptions = True`
The callbacks:
on_connect(client, userdata, flags, rc, properties=None): called when the broker responds to our connection
flags is a dict that contains response flags from the broker:
flags['session present'] - this flag is useful for clients that are
using clean session set to 0 only. If a client with clean
session=0, that reconnects to a broker that it has previously
connected to, this flag indicates whether the broker still has the
session information for the client. If 1, the session still exists.
The value of rc determines success or not:
0: Connection successful
1: Connection refused - incorrect protocol version
2: Connection refused - invalid client identifier
3: Connection refused - server unavailable
4: Connection refused - bad username or password
5: Connection refused - not authorised
6-255: Currently unused.
Decorator: @client.connect_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_connect_fail(client, userdata): called when the connection to the broker fails.
Decorator: @client.connect_fail_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_disconnect(client, userdata, rc): called when the client disconnects from the broker.
The rc parameter indicates the disconnection state:
MQTT_ERR_SUCCESS: the callback was called in response to a disconnect() call.
MQTT_ERR_KEEPALIVE: the client/broker did not communicate within the keepalive interval.
MQTT_ERR_CONN_LOST: the connection was lost.
MQTT_ERR_PROTOCOL: a protocol error occurred when communicating with the broker.
on_disconnect(client, userdata, rc, properties): called when the MQTT V5 client disconnects from the broker.
When using MQTT V5, the broker can send a disconnect message to the client. The
message can contain a reason code and MQTT V5 properties. The properties parameter could be
None if they do not exist in the disconnect message.
Decorator: @client.disconnect_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_message(client, userdata, message): called when a message has been received on a
topic that the client subscribes to. The message variable is a
MQTTMessage that describes all of the message parameters.
Decorator: @client.message_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_publish(client, userdata, mid): called when a message that was to be sent using the
publish() call has completed transmission to the broker. For messages
with QoS levels 1 and 2, this means that the appropriate handshakes have
completed. For QoS 0, this simply means that the message has left the
client. The mid variable matches the mid variable returned from the
corresponding publish() call, to allow outgoing messages to be tracked.
This callback is important because even if the publish() call returns
success, it does not always mean that the message has been sent.
Decorator: @client.publish_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_subscribe(client, userdata, mid, granted_qos, properties=None): called when the broker responds to a
subscribe request. The mid variable matches the mid variable returned
from the corresponding subscribe() call. The granted_qos variable is a
list of integers that give the QoS level the broker has granted for each
of the different subscription requests.
Decorator: @client.subscribe_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_unsubscribe(client, userdata, mid): called when the broker responds to an unsubscribe
request. The mid variable matches the mid variable returned from the
corresponding unsubscribe() call.
Decorator: @client.unsubscribe_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_log(client, userdata, level, buf): called when the client has log information. Define
to allow debugging. The level variable gives the severity of the message
MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf.
Decorator: @client.log_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_socket_open(client, userdata, sock): Called when the socket has been opened. Use this
to register the socket with an external event loop for reading.
Decorator: @client.socket_open_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_socket_close(client, userdata, sock): Called when the socket is about to be closed.
Use this to unregister a socket from an external event loop for reading.
Decorator: @client.socket_close_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_socket_register_write(client, userdata, sock): Called when a write operation to the
socket failed because it would have blocked, e.g. output buffer full. Use this to
register the socket with an external event loop for writing.
Decorator: @client.socket_register_write_callback() (```client``` is the name of the
instance which this callback is being attached to)
on_socket_unregister_write(client, userdata, sock): Called when a write operation to the
socket succeeded after it had previously failed. Use this to unregister the socket
from an external event loop for writing.
Decorator: @client.socket_unregister_write_callback() (```client``` is the name of the
instance which this callback is being attached to)
The callbacks are listed below, documentation for each of them can be found
at the same function name:
on_connect, on_connect_fail, on_disconnect, on_message, on_publish,
on_subscribe, on_unsubscribe, on_log, on_socket_open, on_socket_close,
on_socket_register_write, on_socket_unregister_write
def __init__(self, client_id="", clean_session=None, userdata=None,
......@@ -1893,6 +1789,9 @@ class Client(object):
buf: the message itself
Decorator: @client.log_callback() (```client``` is the name of the
instance which this callback is being attached to)
self._on_log = func
......@@ -1913,7 +1812,7 @@ class Client(object):
""" Define the connect callback implementation.
Expected signature for MQTT v3.1 and v3.1.1 is:
connect_callback(client, userdata, flags, rc, properties=None)
connect_callback(client, userdata, flags, rc)
and for MQTT v5.0:
connect_callback(client, userdata, flags, reasonCode, properties)
......@@ -1944,6 +1843,10 @@ class Client(object):
4: Connection refused - bad username or password
5: Connection refused - not authorised
6-255: Currently unused.
Decorator: @client.connect_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_connect = func
......@@ -1969,6 +1872,10 @@ class Client(object):
client: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
Decorator: @client.connect_fail_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_connect_fail = func
......@@ -1990,7 +1897,7 @@ class Client(object):
""" Define the suscribe callback implementation.
Expected signature for MQTT v3.1.1 and v3.1 is:
subscribe_callback(client, userdata, mid, granted_qos, properties=None)
subscribe_callback(client, userdata, mid, granted_qos)
and for MQTT v5.0:
subscribe_callback(client, userdata, mid, reasonCodes, properties)
......@@ -2005,6 +1912,9 @@ class Client(object):
subscription. A list of ReasonCodes instances.
properties: the MQTT v5.0 properties received from the broker. A
list of Properties class instances.
Decorator: @client.subscribe_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_subscribe = func
......@@ -2036,6 +1946,10 @@ class Client(object):
userdata: the private user data as set in Client() or userdata_set()
message: an instance of MQTTMessage.
This is a class with members topic, payload, qos, retain.
Decorator: @client.message_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_message = func
......@@ -2069,6 +1983,10 @@ class Client(object):
userdata: the private user data as set in Client() or userdata_set()
mid: matches the mid variable returned from the corresponding
publish() call, to allow outgoing messages to be tracked.
Decorator: @client.publish_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_publish = func
......@@ -2103,6 +2021,9 @@ class Client(object):
list of Properties class instances.
reasonCodes: the MQTT v5.0 reason codes received from the broker for each
unsubscribe topic. A list of ReasonCodes instances
Decorator: @client.unsubscribe_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_unsubscribe = func
......@@ -2136,6 +2057,10 @@ class Client(object):
MQTT_ERR_SUCCESS (0), the callback was called in response to
a disconnect() call. If any other value the disconnection
was unexpected, such as might be caused by a network error.
Decorator: @client.disconnect_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_disconnect = func
......@@ -2163,6 +2088,9 @@ class Client(object):
client: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
sock: the socket which was just opened.
Decorator: @client.socket_open_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_socket_open = func
......@@ -2205,6 +2133,9 @@ class Client(object):
client: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
sock: the socket which is about to be closed.
Decorator: @client.socket_close_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_socket_close = func
......@@ -2247,6 +2178,9 @@ class Client(object):
client: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
sock: the socket which should be registered for writing
Decorator: @client.socket_register_write_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_socket_register_write = func
......@@ -2292,6 +2226,9 @@ class Client(object):
client: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
sock: the socket which should be unregistered for writing
Decorator: @client.socket_unregister_write_callback() (```client``` is the name of the
instance which this callback is being attached to)
with self._callback_mutex:
self._on_socket_unregister_write = func
......@@ -54,6 +54,9 @@ def _on_connect(client, userdata, flags, rc):
raise mqtt.MQTTException(paho.connack_string(rc))
def _on_connect_v5(client, userdata, flags, rc, properties):
"""Internal v5 callback"""
_on_connect(client, userdata, flags, rc)
def _on_publish(client, userdata, mid):
"""Internal callback"""
......@@ -133,14 +136,15 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
if not isinstance(msgs, Iterable):
raise TypeError('msgs must be an iterable')
if protocol == mqtt.client.MQTTv5:
raise NotImplementedError('protocol MQTTv5 not supported')
client = paho.Client(client_id=client_id, userdata=collections.deque(msgs),
protocol=protocol, transport=transport)
client.on_publish = _on_publish
client.on_connect = _on_connect
if protocol == mqtt.client.MQTTv5:
client.on_connect = _on_connect_v5
client.on_connect = _on_connect
if proxy_args is not None:
......@@ -23,8 +23,7 @@ from __future__ import absolute_import
from .. import mqtt
from . import client as paho
def _on_connect(client, userdata, flags, rc):
def _on_connect_v5(client, userdata, flags, rc, properties):
"""Internal callback"""
if rc != 0:
raise mqtt.MQTTException(paho.connack_string(rc))
......@@ -35,6 +34,10 @@ def _on_connect(client, userdata, flags, rc):
client.subscribe(userdata['topics'], userdata['qos'])
def _on_connect(client, userdata, flags, rc):
"""Internal v5 callback"""
_on_connect_v5(client, userdata, flags, rc, None)
def _on_message_callback(client, userdata, message):
"""Internal callback"""
......@@ -133,9 +136,6 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
if qos < 0 or qos > 2:
raise ValueError('qos must be in the range 0-2')
if protocol == mqtt.client.MQTTv5:
raise NotImplementedError('protocol MQTTv5 not supported')
callback_userdata = {
......@@ -146,7 +146,10 @@ def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
protocol=protocol, transport=transport,
client.on_message = _on_message_callback
client.on_connect = _on_connect
if protocol == mqtt.client.MQTTv5:
client.on_connect = _on_connect_v5
client.on_connect = _on_connect
if proxy_args is not None:
#!/usr/bin/env python3
# Test whether a client sends a correct PUBLISH to a topic with QoS 0.
# Use paho.mqtt.publish helper for that.
# The client should connect to port 1888 with keepalive=60, clean session set,
# and client id publish-helper-qos0-test
# The test will send a CONNACK message to the client with rc=0. Upon receiving
# the CONNACK and verifying that rc=0, the client should send a PUBLISH message
# to topic "pub/qos0/test" with payload "message" and QoS=0. If rc!=0, the
# client should exit with an error.
# After sending the PUBLISH message, the client should send a
# DISCONNECT message.
import context
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect(
"publish-helper-qos0-test", keepalive=keepalive, proto_ver=5, properties=None
connack_packet = paho_test.gen_connack(rc=0, proto_ver=5)
publish_packet = paho_test.gen_publish(
u"pub/qos0/test", qos=0, payload="message", proto_ver=5
disconnect_packet = paho_test.gen_disconnect()
sock = paho_test.create_server_socket()
client = context.start_client()
(conn, address) = sock.accept()
paho_test.expect_packet(conn, "connect", connect_packet):
paho_test.expect_packet(conn, "publish", publish_packet):
paho_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0
......@@ -26,6 +26,7 @@ test :
$(PYTHON) ./03-publish-c2b-qos1-disconnect.py python/03-publish-c2b-qos1-disconnect.test
$(PYTHON) ./03-publish-c2b-qos2-disconnect.py python/03-publish-c2b-qos2-disconnect.test
$(PYTHON) ./03-publish-helper-qos0.py python/03-publish-helper-qos0.test
$(PYTHON) ./03-publish-helper-qos0-v5.py python/03-publish-helper-qos0-v5.test
$(PYTHON) ./03-publish-helper-qos1-disconnect.py python/03-publish-helper-qos1-disconnect.test
$(PYTHON) ./03-publish-qos0-no-payload.py python/03-publish-qos0-no-payload.test
$(PYTHON) ./03-publish-qos0.py python/03-publish-qos0.test
#!/usr/bin/env python3
import paho.mqtt.publish
import paho.mqtt.client
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册