提交 2b32ef45 编写于 作者: R Roger A. Light

[443935] Fix reconnecting with lots of inflight messages.

Fix reconnecting after sending more QoS>0 messages than inflight messages is
set to, whilst connecting.  Closes #443935. Thanks to Hiram van Paassen.

Bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=443935
Change-Id: I31ddf2ffcbd6a7efa1e5b51c163d112ce3cf61ae
上级 300fcbdf
...@@ -6,6 +6,8 @@ v1.0.2 ...@@ -6,6 +6,8 @@ v1.0.2
Closes #439277. Closes #439277.
- Don't attempt to encode topic to utf-8 twice. Thanks to Luc Milland. - Don't attempt to encode topic to utf-8 twice. Thanks to Luc Milland.
- Handle "unicode" type payloads on Python 2.7. Thanks to Luc Milland. - Handle "unicode" type payloads on Python 2.7. Thanks to Luc Milland.
- Fix reconnecting after sending more QoS>0 messages than inflight messages is
set to, whilst connecting. Closes #443935. Thanks to Hiram van Paassen.
v1.0.1 v1.0.1
====== ======
......
...@@ -101,10 +101,17 @@ mqtt_cs_connect_async = 3 ...@@ -101,10 +101,17 @@ mqtt_cs_connect_async = 3
# Message state # Message state
mqtt_ms_invalid = 0, mqtt_ms_invalid = 0,
mqtt_ms_wait_puback = 1 mqtt_ms_publish_qos0 = 1
mqtt_ms_wait_pubrec = 2 mqtt_ms_publish_qos1 = 2
mqtt_ms_wait_pubrel = 3 mqtt_ms_wait_for_puback = 3
mqtt_ms_wait_pubcomp = 4 mqtt_ms_publish_qos2 = 4
mqtt_ms_wait_for_pubrec = 5
mqtt_ms_resend_pubrel = 6
mqtt_ms_wait_for_pubrel = 7
mqtt_ms_resend_pubcomp = 8
mqtt_ms_wait_for_pubcomp = 9
mqtt_ms_send_pubrec = 10
mqtt_ms_queued = 11
# Error values # Error values
MQTT_ERR_AGAIN = -1 MQTT_ERR_AGAIN = -1
...@@ -882,15 +889,17 @@ class Client(object): ...@@ -882,15 +889,17 @@ class Client(object):
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
self._inflight_messages = self._inflight_messages+1 self._inflight_messages = self._inflight_messages+1
if qos == 1: if qos == 1:
message.state = mqtt_ms_wait_puback message.state = mqtt_ms_wait_for_puback
elif qos == 2: elif qos == 2:
message.state = mqtt_ms_wait_pubrec message.state = mqtt_ms_wait_for_pubrec
self._out_message_mutex.release() self._out_message_mutex.release()
rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup) rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
return (rc, local_mid) return (rc, local_mid)
self._out_message_mutex.release() else:
return (MQTT_ERR_SUCCESS, local_mid) message.state = mqtt_ms_queued;
self._out_message_mutex.release()
return (MQTT_ERR_SUCCESS, local_mid)
def username_pw_set(self, username, password=None): def username_pw_set(self, username, password=None):
"""Set a username and optionally a password for broker authentication. """Set a username and optionally a password for broker authentication.
...@@ -1792,15 +1801,15 @@ class Client(object): ...@@ -1792,15 +1801,15 @@ class Client(object):
now = time.time() now = time.time()
for m in messages: for m in messages:
if m.timestamp + self._message_retry < now: if m.timestamp + self._message_retry < now:
if m.state == mqtt_ms_wait_puback or m.state == mqtt_ms_wait_pubrec: if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec:
m.timestamp = now m.timestamp = now
m.dup = True m.dup = True
self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
elif m.state == mqtt_ms_wait_pubrel: elif m.state == mqtt_ms_wait_for_pubrel:
m.timestamp = now m.timestamp = now
m.dup = True m.dup = True
self._send_pubrec(m.mid) self._send_pubrec(m.mid)
elif m.state == mqtt_ms_wait_pubcomp: elif m.state == mqtt_ms_wait_for_pubcomp:
m.timestamp = now m.timestamp = now
m.dup = True m.dup = True
self._send_pubrel(m.mid, True) self._send_pubrel(m.mid, True)
...@@ -1812,16 +1821,28 @@ class Client(object): ...@@ -1812,16 +1821,28 @@ class Client(object):
def _messages_reconnect_reset_out(self): def _messages_reconnect_reset_out(self):
self._out_message_mutex.acquire() self._out_message_mutex.acquire()
self._inflight_messages = 0
for m in self._out_messages: for m in self._out_messages:
m.timestamp = 0 m.timestamp = 0
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
if m.qos == 1: if m.qos == 0:
m.state = mqtt_ms_wait_puback m.state = mqtt_ms_publish_qos0
elif m.qos == 1:
#self._inflight_messages = self._inflight_messages + 1
if m.state == mqtt_ms_wait_for_puback:
m.dup = True
m.state = mqtt_ms_publish_qos1
elif m.qos == 2: elif m.qos == 2:
# Preserve current state #self._inflight_messages = self._inflight_messages + 1
pass if m.state == mqtt_ms_wait_for_pubcomp:
m.state = mqtt_ms_resend_pubrel
m.dup = True
else:
if m.state == mqtt_ms_wait_for_pubrec:
m.dup = True
m.state = mqtt_ms_publish_qos2
else: else:
m.state = mqtt_ms_invalid m.state = mqtt_ms_queued
self._out_message_mutex.release() self._out_message_mutex.release()
def _messages_reconnect_reset_in(self): def _messages_reconnect_reset_in(self):
...@@ -1951,7 +1972,44 @@ class Client(object): ...@@ -1951,7 +1972,44 @@ class Client(object):
self._in_callback = False self._in_callback = False
self._callback_mutex.release() self._callback_mutex.release()
if result == 0: if result == 0:
return MQTT_ERR_SUCCESS rc = 0
self._out_message_mutex.acquire()
for m in self._out_messages:
m.timestamp = time.time()
if m.state == mqtt_ms_queued:
self._out_message_mutex.release()
return MQTT_ERR_SUCCESS
if m.qos == 0:
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.qos == 1:
if m.state == mqtt_ms_publish_qos1:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_puback
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.qos == 2:
if m.state == mqtt_ms_publish_qos2:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubrec
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.state == mqtt_ms_resend_pubrel:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubcomp
rc = self._send_pubrel(m.mid, m.dup)
if rc != 0:
self._out_message_mutex.release()
return rc
self._out_message_mutex.release()
return rc
elif result > 0 and result < 6: elif result > 0 and result < 6:
return MQTT_ERR_CONN_REFUSED return MQTT_ERR_CONN_REFUSED
else: else:
...@@ -2016,7 +2074,7 @@ class Client(object): ...@@ -2016,7 +2074,7 @@ class Client(object):
return rc return rc
elif message.qos == 2: elif message.qos == 2:
rc = self._send_pubrec(message.mid) rc = self._send_pubrec(message.mid)
message.state = mqtt_ms_wait_pubrel message.state = mqtt_ms_wait_for_pubrel
self._in_message_mutex.acquire() self._in_message_mutex.acquire()
self._in_messages.append(message) self._in_messages.append(message)
self._in_message_mutex.release() self._in_message_mutex.release()
...@@ -2063,7 +2121,7 @@ class Client(object): ...@@ -2063,7 +2121,7 @@ class Client(object):
# Dont lock message_mutex here # Dont lock message_mutex here
for m in self._out_messages: for m in self._out_messages:
if self._inflight_messages < self._max_inflight_messages: if self._inflight_messages < self._max_inflight_messages:
if m.qos > 0 and m.state == mqtt_ms_invalid: if m.qos > 0 and m.state == mqtt_ms_queued:
self._inflight_messages = self._inflight_messages + 1 self._inflight_messages = self._inflight_messages + 1
if m.qos == 1: if m.qos == 1:
m.state = mqtt_ms_wait_puback m.state = mqtt_ms_wait_puback
...@@ -2088,7 +2146,7 @@ class Client(object): ...@@ -2088,7 +2146,7 @@ class Client(object):
self._out_message_mutex.acquire() self._out_message_mutex.acquire()
for m in self._out_messages: for m in self._out_messages:
if m.mid == mid: if m.mid == mid:
m.state = mqtt_ms_wait_pubcomp m.state = mqtt_ms_wait_for_pubcomp
m.timestamp = time.time() m.timestamp = time.time()
self._out_message_mutex.release() self._out_message_mutex.release()
return self._send_pubrel(mid, False) return self._send_pubrel(mid, False)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册