提交 8c562d3e 编写于 作者: A Alexis BRENON 提交者: Alexis BRENON

Update Client() class with stub callbacks. Closes #53

Signed-off-by: NAlexisBRENON <brenon.alexis@gmail.com>
上级 984ae86d
......@@ -5,6 +5,7 @@ v1.1.1 - 2015-xx-xx
unresolvable.
- Fix for keepalive=0 causing an infinite disconnect/reconnect loop. Closes
#42.
- Add stub callbacks to Client for easy inheritance. Closes #53.
v1.1 - 2015-01-30
......
......@@ -456,14 +456,7 @@ class Client(object):
self._will_payload = None
self._will_qos = 0
self._will_retain = False
self.on_disconnect = None
self.on_connect = None
self.on_publish = None
self.on_message = None
self.on_message_filtered = []
self.on_subscribe = None
self.on_unsubscribe = None
self.on_log = None
self._host = ""
self._port = 1883
self._bind_address = ""
......@@ -1141,10 +1134,9 @@ class Client(object):
rc = MQTT_ERR_SUCCESS
else:
rc = 1
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
self._callback_mutex.release()
return MQTT_ERR_CONN_LOST
......@@ -1320,6 +1312,113 @@ class Client(object):
self._thread.join()
self._thread = None
def on_log(self, client, userdata, level, buf):
"""Called when the client has log information.
This function may be overridden by sub-classes.
Define to allow debugging.
level: gives the severity of the message and will be one of
MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, MQTT_LOG_ERR, and
MQTT_LOG_DEBUG.
buf: the message itself
"""
pass
def on_connect(self, client, userdata, flag, rc):
"""Called when the broker responds to our connection request.
This function may be overridden by sub-classes.
self: the client instance for this callback
userdata: the private user data as set in ``Client()`` or ``userdata_set()``
flags: response flags sent by the broker
rc: the connection result
flags is a dict that contains response flags from the broker:
flags['session present'] - this flag is useful for clients that are
using clean session set to 0 only. If a client with clean
session=0, that reconnects to a broker that it has previously
connected to, this flag indicates whether the broker still has the
session information for the client. If 1, the session still exists.
The value of rc indicates success or not:
0: Connection successful
1: Connection refused - incorrect protocol version
2: Connection refused - invalid client identifier
3: Connection refused - server unavailable
4: Connection refused - bad username or password
5: Connection refused - not authorised
6-255: Currently unused.
"""
pass
def on_subscribe(self, client, userdata, mid, granted_qos):
"""Called when the broker responds to a subscribe request.
This function may be overridden by sub-classes.
mid: matches the mid variable returned from the corresponding
subscribe() call.
granted_qos: list of integers that give the QoS level the broker has
granted for each of the different subscription requests.
"""
pass
def on_message(self, client, userdata, message):
"""Called when a message has been received on a topic that the client
subscribes to.
This function may be overridden by sub-classes.
This callback will be called for every message received. Use
``message_callback_add()`` to define multiple callbacks that will be called for
specific topic filters.
self: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
message: an instance of MQTTMessage.
This is a class with members topic, payload, qos, retain.
"""
pass
def on_publish(self, client, userdata, mid):
"""Called when a message that was to be sent using the publish() call
has completed transmission to the broker.
This function may be overridden by sub-classes.
For messages with QoS levels 1 and 2, this means that the appropriate
handshakes have completed. For QoS 0, this simply means that the message
has left the client.
This callback is important because even if the publish() call returns
success, it does not always mean that the message has been sent.
mid: matches the mid variable returned from the corresponding
publish() call, to allow outgoing messages to be tracked.
"""
pass
def on_unsubscribe(self, client, userdata, mid):
"""Called when the broker responds to an unsubscribe request.
This function may be overridden by sub-classes.
mid: matches the mid variable returned from the corresponding
unsubscribe() call.
"""
pass
def on_disconnect(self, client, userdata, rc):
"""Called when the client disconnects from the broker.
This function may be overridden by sub-classes.
self: the client instance for this callback
userdata: the private user data as set in Client() or userdata_set()
rc: the disconnection result
The rc parameter indicates the disconnection state. If MQTT_ERR_SUCCESS
(0), the callback was called in response to a disconnect() call. If any
other value the disconnection was unexpected, such as might be caused by
a network error.
"""
pass
def message_callback_add(self, sub, callback):
"""Register a message callback for a specific topic.
Messages that match 'sub' will be passed to 'callback'. Any
......@@ -1377,10 +1476,9 @@ class Client(object):
rc = MQTT_ERR_SUCCESS
self._state_mutex.release()
self._callback_mutex.acquire()
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
self._callback_mutex.release()
return rc
......@@ -1520,10 +1618,9 @@ class Client(object):
if packet['to_process'] == 0:
if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
self._callback_mutex.acquire()
if self.on_publish:
self._in_callback = True
self.on_publish(self, self._userdata, packet['mid'])
self._in_callback = False
self._in_callback = True
self.on_publish(self, self._userdata, packet['mid'])
self._in_callback = False
self._callback_mutex.release()
......@@ -1535,10 +1632,9 @@ class Client(object):
self._msgtime_mutex.release()
self._callback_mutex.acquire()
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, 0)
self._in_callback = False
self._in_callback = True
self.on_disconnect(self, self._userdata, 0)
self._in_callback = False
self._callback_mutex.release()
if self._ssl:
......@@ -1567,8 +1663,7 @@ class Client(object):
return MQTT_ERR_SUCCESS
def _easy_log(self, level, buf):
if self.on_log:
self.on_log(self, self._userdata, level, buf)
self.on_log(self, self._userdata, level, buf)
def _check_keepalive(self):
if self._keepalive == 0:
......@@ -1599,10 +1694,9 @@ class Client(object):
else:
rc = 1
self._callback_mutex.acquire()
if self.on_disconnect:
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
self._in_callback = True
self.on_disconnect(self, self._userdata, rc)
self._in_callback = False
self._callback_mutex.release()
def _mid_generate(self):
......@@ -1997,21 +2091,20 @@ class Client(object):
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
self._callback_mutex.acquire()
if self.on_connect:
self._in_callback = True
self._in_callback = True
if sys.version_info[0] < 3:
argcount = self.on_connect.func_code.co_argcount
else:
argcount = self.on_connect.__code__.co_argcount
if sys.version_info[0] < 3:
argcount = self.on_connect.func_code.co_argcount
else:
argcount = self.on_connect.__code__.co_argcount
if argcount == 3:
self.on_connect(self, self._userdata, result)
else:
flags_dict = dict()
flags_dict['session present'] = flags & 0x01
self.on_connect(self, self._userdata, flags_dict, result)
self._in_callback = False
if argcount == 3:
self.on_connect(self, self._userdata, result)
else:
flags_dict = dict()
flags_dict['session present'] = flags & 0x01
self.on_connect(self, self._userdata, flags_dict, result)
self._in_callback = False
self._callback_mutex.release()
if result == 0:
rc = 0
......@@ -2075,10 +2168,9 @@ class Client(object):
granted_qos = struct.unpack(pack_format, packet)
self._callback_mutex.acquire()
if self.on_subscribe:
self._in_callback = True
self.on_subscribe(self, self._userdata, mid, granted_qos)
self._in_callback = False
self._in_callback = True
self.on_subscribe(self, self._userdata, mid, granted_qos)
self._in_callback = False
self._callback_mutex.release()
return MQTT_ERR_SUCCESS
......@@ -2215,10 +2307,9 @@ class Client(object):
mid = mid[0]
self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: "+str(mid)+")")
self._callback_mutex.acquire()
if self.on_unsubscribe:
self._in_callback = True
self.on_unsubscribe(self, self._userdata, mid)
self._in_callback = False
self._in_callback = True
self.on_unsubscribe(self, self._userdata, mid)
self._in_callback = False
self._callback_mutex.release()
return MQTT_ERR_SUCCESS
......@@ -2237,12 +2328,11 @@ class Client(object):
if self._out_messages[i].mid == mid:
# Only inform the client the message has been sent once.
self._callback_mutex.acquire()
if self.on_publish:
self._out_message_mutex.release()
self._in_callback = True
self.on_publish(self, self._userdata, mid)
self._in_callback = False
self._out_message_mutex.acquire()
self._out_message_mutex.release()
self._in_callback = True
self.on_publish(self, self._userdata, mid)
self._in_callback = False
self._out_message_mutex.acquire()
self._callback_mutex.release()
self._out_messages.pop(i)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册