diff --git a/README.rst b/README.rst index 7fe3d4356e4ce86213c14ef92a61ba3f8ea9a7b0..be0725149400ea1f45e3d0a026a2d80cda9f4a81 100644 --- a/README.rst +++ b/README.rst @@ -200,6 +200,17 @@ Set the maximum number of messages with QoS>0 that can be part way through their Defaults to 20. Increasing this value will consume more memory but can increase throughput. +max_queued_messages_set() +''''''''''''''''''''''''' + +:: + + max_queued_messages_set(self, queue_size) + +Set the maximum number of outgoing messages with QoS>0 that can be pending in the outgoing message queue. + +Defaults to 0. 0 means unlimited. When the queue is full, any further outgoing messages would be dropped. + message_retry_set() ''''''''''''''''''' diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 1319f5c72845e9acd4bc91a0b12367409f567a3b..acc9c7cf3468d840126870ab0c5427b4735e44cf 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -128,6 +128,7 @@ MQTT_ERR_AUTH = 11 MQTT_ERR_ACL_DENIED = 12 MQTT_ERR_UNKNOWN = 13 MQTT_ERR_ERRNO = 14 +MQTT_ERR_QUEUE_SIZE = 15 if sys.version_info[0] < 3: sockpair_data = "0" @@ -510,6 +511,7 @@ class Client(object): self._in_messages = [] self._max_inflight_messages = 20 self._inflight_messages = 0 + self._max_queued_messages = 0 self._will = False self._will_topic = "" self._will_payload = None @@ -968,6 +970,11 @@ class Client(object): message.dup = False self._out_message_mutex.acquire() + + if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages: + self._out_message_mutex.release() + return (MQTT_ERR_QUEUE_SIZE, local_mid) + self._out_messages.append(message) if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: self._inflight_messages = self._inflight_messages+1 @@ -1230,6 +1237,16 @@ class Client(object): raise ValueError('Invalid inflight.') self._max_inflight_messages = inflight + def max_queued_messages_set(self, queue_size): + """Set the maximum number of messages in the outgoing message queue. + 0 means unlimited.""" + if queue_size < 0: + raise ValueError('Invalid queue size.') + if not isinstance(queue_size, int): + raise ValueError('Invalid type of queue size.') + self._max_queued_messages = queue_size + return self + def message_retry_set(self, retry): """Set the timeout in seconds before a message with QoS>0 is retried. 20 seconds by default."""