提交 3515d85f 编写于 作者: R Roger Light

Merge pull request #50 from FloTechnologies/pr-pending-queue-size

Support to limit the number of pending outgoing messages
...@@ -200,6 +200,17 @@ Set the maximum number of messages with QoS>0 that can be part way through their ...@@ -200,6 +200,17 @@ Set the maximum number of messages with QoS>0 that can be part way through their
Defaults to 20. Increasing this value will consume more memory but can increase throughput. Defaults to 20. Increasing this value will consume more memory but can increase throughput.
max_queued_messages_set()
'''''''''''''''''''''''''
::
max_queued_messages_set(self, queue_size)
Set the maximum number of outgoing messages with QoS>0 that can be pending in the outgoing message queue.
Defaults to 0. 0 means unlimited. When the queue is full, any further outgoing messages would be dropped.
message_retry_set() message_retry_set()
''''''''''''''''''' '''''''''''''''''''
......
...@@ -128,6 +128,7 @@ MQTT_ERR_AUTH = 11 ...@@ -128,6 +128,7 @@ MQTT_ERR_AUTH = 11
MQTT_ERR_ACL_DENIED = 12 MQTT_ERR_ACL_DENIED = 12
MQTT_ERR_UNKNOWN = 13 MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14 MQTT_ERR_ERRNO = 14
MQTT_ERR_QUEUE_SIZE = 15
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
sockpair_data = "0" sockpair_data = "0"
...@@ -510,6 +511,7 @@ class Client(object): ...@@ -510,6 +511,7 @@ class Client(object):
self._in_messages = [] self._in_messages = []
self._max_inflight_messages = 20 self._max_inflight_messages = 20
self._inflight_messages = 0 self._inflight_messages = 0
self._max_queued_messages = 0
self._will = False self._will = False
self._will_topic = "" self._will_topic = ""
self._will_payload = None self._will_payload = None
...@@ -968,6 +970,11 @@ class Client(object): ...@@ -968,6 +970,11 @@ class Client(object):
message.dup = False message.dup = False
self._out_message_mutex.acquire() self._out_message_mutex.acquire()
if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages:
self._out_message_mutex.release()
return (MQTT_ERR_QUEUE_SIZE, local_mid)
self._out_messages.append(message) self._out_messages.append(message)
if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages:
self._inflight_messages = self._inflight_messages+1 self._inflight_messages = self._inflight_messages+1
...@@ -1230,6 +1237,16 @@ class Client(object): ...@@ -1230,6 +1237,16 @@ class Client(object):
raise ValueError('Invalid inflight.') raise ValueError('Invalid inflight.')
self._max_inflight_messages = inflight self._max_inflight_messages = inflight
def max_queued_messages_set(self, queue_size):
"""Set the maximum number of messages in the outgoing message queue.
0 means unlimited."""
if queue_size < 0:
raise ValueError('Invalid queue size.')
if not isinstance(queue_size, int):
raise ValueError('Invalid type of queue size.')
self._max_queued_messages = queue_size
return self
def message_retry_set(self, retry): def message_retry_set(self, retry):
"""Set the timeout in seconds before a message with QoS>0 is retried. """Set the timeout in seconds before a message with QoS>0 is retried.
20 seconds by default.""" 20 seconds by default."""
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册