Support to limit the number of pending outgoing messages

When there's no limitation, the library itself would queue all messages
with QoS > 0 if the messages can not be delivered. This can make the
queue grow to an unreasonable size and lead an out-of-memory issue.
For example, if the user keep publishing messages when the messages
cannot be sent successfully.
Signed-off-by: NChang Yu-heng <mr.changyuheng@gmail.com>
上级 fdec2a9f
...@@ -200,6 +200,17 @@ Set the maximum number of messages with QoS>0 that can be part way through their ...@@ -200,6 +200,17 @@ Set the maximum number of messages with QoS>0 that can be part way through their
Defaults to 20. Increasing this value will consume more memory but can increase throughput. Defaults to 20. Increasing this value will consume more memory but can increase throughput.
max_queued_messages_set()
'''''''''''''''''''''''''
::
max_queued_messages_set(self, queue_size)
Set the maximum number of outgoing messages with QoS>0 that can be pending in the outgoing message queue.
Defaults to 0. 0 means unlimited. When the queue is full, any further outgoing messages would be dropped.
message_retry_set() message_retry_set()
''''''''''''''''''' '''''''''''''''''''
......
...@@ -128,6 +128,7 @@ MQTT_ERR_AUTH = 11 ...@@ -128,6 +128,7 @@ MQTT_ERR_AUTH = 11
MQTT_ERR_ACL_DENIED = 12 MQTT_ERR_ACL_DENIED = 12
MQTT_ERR_UNKNOWN = 13 MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14 MQTT_ERR_ERRNO = 14
MQTT_ERR_QUEUE_SIZE = 15
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
sockpair_data = "0" sockpair_data = "0"
...@@ -510,6 +511,7 @@ class Client(object): ...@@ -510,6 +511,7 @@ class Client(object):
self._in_messages = [] self._in_messages = []
self._max_inflight_messages = 20 self._max_inflight_messages = 20
self._inflight_messages = 0 self._inflight_messages = 0
self._max_queued_messages = 0
self._will = False self._will = False
self._will_topic = "" self._will_topic = ""
self._will_payload = None self._will_payload = None
...@@ -968,6 +970,11 @@ class Client(object): ...@@ -968,6 +970,11 @@ class Client(object):
message.dup = False message.dup = False
self._out_message_mutex.acquire() self._out_message_mutex.acquire()
if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages:
self._out_message_mutex.release()
return (MQTT_ERR_QUEUE_SIZE, local_mid)
self._out_messages.append(message) self._out_messages.append(message)
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
self._inflight_messages = self._inflight_messages+1 self._inflight_messages = self._inflight_messages+1
...@@ -1230,6 +1237,16 @@ class Client(object): ...@@ -1230,6 +1237,16 @@ class Client(object):
raise ValueError('Invalid inflight.') raise ValueError('Invalid inflight.')
self._max_inflight_messages = inflight self._max_inflight_messages = inflight
def max_queued_messages_set(self, queue_size):
"""Set the maximum number of messages in the outgoing message queue.
0 means unlimited."""
if queue_size < 0:
raise ValueError('Invalid queue size.')
if not isinstance(queue_size, int):
raise ValueError('Invalid type of queue size.')
self._max_queued_messages = queue_size
return self
def message_retry_set(self, retry): def message_retry_set(self, retry):
"""Set the timeout in seconds before a message with QoS>0 is retried. """Set the timeout in seconds before a message with QoS>0 is retried.
20 seconds by default.""" 20 seconds by default."""
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册