diff --git a/ChangeLog.txt b/ChangeLog.txt index dde81e8f0c43cdf86bbaf0079bac8b83f8a06f51..16a524277ceb6b63a7994dbd06451fd52ed8c9c5 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,8 +1,34 @@ -v1.1.1 - 2015-xx-xx -=================== +v1.2 - 2016-06-03 +================= +- Client.publish() now returns an MQTTMessageInfo object. The MQTTMessageInfo + object behaves like a tuple of (rc, mid) for backwards compatibility but + also provides two functions: is_published() and wait_for_published(). + This allows a client to determine whether any given message has been + published without need for a callback, and also allows the client to block + waiting until the message has been sent. - Further fix for Client constructor for the case where "localhost" is unresolvable. +- Add paho.mqtt.subscribe module, with simple() and callback() helper + functions. +- Allow ^C to interrupt client loop. +- Fix for keepalive=0 causing an infinite disconnect/reconnect loop. Closes + #42. +- Modify callbacks definition/structure to allow classical inheritence. Closes + #53, #54. +- Add websockets support. +- Default MQTT version is again changed to v3.1.1. +- Client.subscribe() now accepts unicode type topic inputs on Python 2. Closes + #16. +- paho.mqtt.publish() now raises an MQTTException on a CONNECT failure, rather + than blindly continuing. Closes #6. +- Don't block on TLS sockets on Python 3. Closes #2. +- Client.publish() now accepts bytes() payloads on Python 3. Closes #1. +- Don't attempt to join() own thread. Closes #14. +- Allow the use of Client.message_callback_add() from inside callbacks. Closes + #12. +- Use a monotonic time source for keeping track of time, if available. Closes + #56. v1.1 - 2015-01-30 diff --git a/README.rst b/README.rst index 1903090e5db0006199cb57df928019df990a4ec1..791dff31e92856870d5dbff014cc947f7785e5c5 100644 --- a/README.rst +++ b/README.rst @@ -30,6 +30,9 @@ Contents * `Publish`_ * `Single`_ * `Multiple`_ + * `Subscribe`_ + * `Simple`_ + * `Calback`_ * `Reporting bugs`_ * `More information`_ @@ -127,24 +130,39 @@ Client() :: - Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311) + Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp") The ``Client()`` constructor takes the following arguments: client_id - the unique client id string used when connecting to the broker. If ``client_id`` is zero length or ``None``, then one will be randomly generated. In this case the ``clean_session`` parameter must be ``True``. + the unique client id string used when connecting to the broker. If + ``client_id`` is zero length or ``None``, then one will be randomly + generated. In this case the ``clean_session`` parameter must be ``True``. clean_session - a boolean that determines the client type. If ``True``, the broker will remove all information about this client when it disconnects. If ``False``, the client is a durable client and subscription information and queued messages will be retained when the client disconnects. + a boolean that determines the client type. If ``True``, the broker will + remove all information about this client when it disconnects. If ``False``, + the client is a durable client and subscription information and queued + messages will be retained when the client disconnects. - Note that a client will never discard its own outgoing messages on disconnect. Calling connect() or reconnect() will cause the messages to be resent. Use reinitialise() to reset a client to its original state. + Note that a client will never discard its own outgoing messages on + disconnect. Calling connect() or reconnect() will cause the messages to be + resent. Use reinitialise() to reset a client to its original state. userdata - user defined data of any type that is passed as the ``userdata`` parameter to callbacks. It may be updated at a later point with the ``user_data_set()`` function. + user defined data of any type that is passed as the ``userdata`` parameter + to callbacks. It may be updated at a later point with the + ``user_data_set()`` function. protocol - the version of the MQTT protocol to use for this client. Can be either ``MQTTv31`` or ``MQTTv311`` - + the version of the MQTT protocol to use for this client. Can be either + ``MQTTv31`` or ``MQTTv311`` + +transport + set to "websockets" to send MQTT over WebSockets. Leave at the default of + "tcp" to use raw TCP. + + Example ....... @@ -187,6 +205,17 @@ Set the maximum number of messages with QoS>0 that can be part way through their Defaults to 20. Increasing this value will consume more memory but can increase throughput. +max_queued_messages_set() +''''''''''''''''''''''''' + +:: + + max_queued_messages_set(self, queue_size) + +Set the maximum number of outgoing messages with QoS>0 that can be pending in the outgoing message queue. + +Defaults to 0. 0 means unlimited. When the queue is full, any further outgoing messages would be dropped. + message_retry_set() ''''''''''''''''''' @@ -262,21 +291,28 @@ will_set() :: will_set(topic, payload=None, qos=0, retain=False) -Set a Will to be sent to the broker. If the client disconnects without calling ``disconnect()``, the broker will publish the message on its behalf. +Set a Will to be sent to the broker. If the client disconnects without calling +``disconnect()``, the broker will publish the message on its behalf. topic the topic that the will message should be published on. payload - the message to send as a will. If not given, or set to ``None`` a zero length message will be used as the will. Passing an int or float will result in the payload being converted to a string representing that number. If you wish to send a true int/float, use ``struct.pack()`` to create the payload you require. + the message to send as a will. If not given, or set to ``None`` a zero + length message will be used as the will. Passing an int or float will + result in the payload being converted to a string representing that number. + If you wish to send a true int/float, use ``struct.pack()`` to create the + payload you require. qos the quality of service level to use for the will. retain - if set to ``True``, the will message will be set as the "last known good"/retained message for the topic. + if set to ``True``, the will message will be set as the "last known + good"/retained message for the topic. -Raises a ``ValueError`` if ``qos`` is not 0, 1 or 2, or if ``topic`` is ``None`` or has zero string length. +Raises a ``ValueError`` if ``qos`` is not 0, 1 or 2, or if ``topic`` is +``None`` or has zero string length. Connect / reconnect / disconnect ```````````````````````````````` @@ -288,24 +324,31 @@ connect() connect(host, port=1883, keepalive=60, bind_address="") -The ``connect()`` function connects the client to a broker. This is a blocking function. It takes the following arguments: +The ``connect()`` function connects the client to a broker. This is a blocking +function. It takes the following arguments: host the hostname or IP address of the remote broker port - the network port of the server host to connect to. Defaults to 1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you are using ``tls_set()`` the port may need providing manually + the network port of the server host to connect to. Defaults to 1883. Note + that the default port for MQTT over SSL/TLS is 8883 so if you are using + ``tls_set()`` the port may need providing manually keepalive - maximum period in seconds allowed between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker + maximum period in seconds allowed between communications with the broker. + If no other messages are being exchanged, this controls the rate at which + the client will send ping messages to the broker bind_address - the IP address of a local network interface to bind this client to, assuming multiple interfaces exist + the IP address of a local network interface to bind this client to, + assuming multiple interfaces exist Callback ........ -When the client receives a CONNACK message from the broker in response to the connect it generates an ``on_connect()`` callback. +When the client receives a CONNACK message from the broker in response to the +connect it generates an ``on_connect()`` callback. Example ....... @@ -321,12 +364,14 @@ connect_async() connect_async(host, port=1883, keepalive=60, bind_address="") -Identical to ``connect()``, but non-blocking. The connection will not complete until one of the ``loop*()`` functions is called. +Use in conjunction with ``loop_start()`` to connect in a non-blocking manner. +The connection will not complete until ``loop_start()`` is called. Callback ........ -When the client receives a CONNACK message from the broker in response to the connect it generates an ``on_connect()`` callback. +When the client receives a CONNACK message from the broker in response to the +connect it generates an ``on_connect()`` callback. connect_srv() ''''''''''''' @@ -335,17 +380,21 @@ connect_srv() connect_srv(domain, keepalive=60, bind_address="") -Connect to a broker using an SRV DNS lookup to obtain the broker address. Takes the following arguments: +Connect to a broker using an SRV DNS lookup to obtain the broker address. Takes +the following arguments: domain - the DNS domain to search for SRV records. If ``None``, try to determine the local domain name. + the DNS domain to search for SRV records. If ``None``, try to determine the + local domain name. -See ``connect()`` for a description of the ``keepalive`` and ``bind_address`` arguments. +See ``connect()`` for a description of the ``keepalive`` and ``bind_address`` +arguments. Callback ........ -When the client receives a CONNACK message from the broker in response to the connect it generates an ``on_connect()`` callback. +When the client receives a CONNACK message from the broker in response to the +connect it generates an ``on_connect()`` callback. Example ....... @@ -361,12 +410,14 @@ reconnect() reconnect() -Reconnect to a broker using the previously provided details. You must have called ``connect*()`` before calling this function. +Reconnect to a broker using the previously provided details. You must have +called ``connect*()`` before calling this function. Callback ........ -When the client receives a CONNACK message from the broker in response to the connect it generates an ``on_connect()`` callback. +When the client receives a CONNACK message from the broker in response to the +connect it generates an ``on_connect()`` callback. disconnect() '''''''''''' @@ -375,17 +426,23 @@ disconnect() disconnect() -Disconnect from the broker cleanly. Using ``disconnect()`` will not result in a will message being sent by the broker. +Disconnect from the broker cleanly. Using ``disconnect()`` will not result in a +will message being sent by the broker. Callback ........ -When the client has sent the disconnect message it generates an ``on_disconnect()`` callback. +When the client has sent the disconnect message it generates an +``on_disconnect()`` callback. Network loop ```````````` -These functions are the driving force behind the client. If they are not called, incoming network data will not be processed and outgoing network data may not be sent in a timely fashion. There are four options for managing the network loop. Three are described here, the fourth in "External event loop support" below. Do not mix the different loop functions. +These functions are the driving force behind the client. If they are not +called, incoming network data will not be processed and outgoing network data +may not be sent in a timely fashion. There are four options for managing the +network loop. Three are described here, the fourth in "External event loop +support" below. Do not mix the different loop functions. loop() '''''' @@ -394,7 +451,11 @@ loop() loop(timeout=1.0, max_packets=1) -Call regularly to process network events. This call waits in ``select()`` until the network socket is available for reading or writing, if appropriate, then handles the incoming/outgoing data. This function blocks for up to ``timeout`` seconds. ``timeout`` must not exceed the ``keepalive`` value for the client or your client will be regularly disconnected by the broker. +Call regularly to process network events. This call waits in ``select()`` until +the network socket is available for reading or writing, if appropriate, then +handles the incoming/outgoing data. This function blocks for up to ``timeout`` +seconds. ``timeout`` must not exceed the ``keepalive`` value for the client or +your client will be regularly disconnected by the broker. The ``max_packets`` argument is obsolete and should be left unset. @@ -415,7 +476,12 @@ loop_start() / loop_stop() loop_start() loop_stop(force=False) -These functions implement a threaded interface to the network loop. Calling ``loop_start()`` once, before or after ``connect*()``, runs a thread in the background to call ``loop()`` automatically. This frees up the main thread for other work that may be blocking. This call also handles reconnecting to the broker. Call ``loop_stop()`` to stop the background thread. The ``force`` argument is currently ignored. +These functions implement a threaded interface to the network loop. Calling +``loop_start()`` once, before or after ``connect*()``, runs a thread in the +background to call ``loop()`` automatically. This frees up the main thread for +other work that may be blocking. This call also handles reconnecting to the +broker. Call ``loop_stop()`` to stop the background thread. The ``force`` +argument is currently ignored. Example ....... @@ -436,11 +502,16 @@ loop_forever() loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False) -This is a blocking form of the network loop and will not return until the client calls ``disconnect()``. It automatically handles reconnecting. +This is a blocking form of the network loop and will not return until the +client calls ``disconnect()``. It automatically handles reconnecting. -Except for the first connection attempt when using connect_async, use ``retry_first_connection=True`` to make it retry the first connection. Warning: This might lead to situations where the client keeps connecting to an non existing host without failing. +Except for the first connection attempt when using connect_async, use +``retry_first_connection=True`` to make it retry the first connection. +Warning: This might lead to situations where the client keeps connecting to an +non existing host without failing. -The ``timeout`` and ``max_packets`` arguments are obsolete and should be left unset. +The ``timeout`` and ``max_packets`` arguments are obsolete and should be left +unset. Publishing `````````` @@ -454,28 +525,41 @@ publish() publish(topic, payload=None, qos=0, retain=False) -This causes a message to be sent to the broker and subsequently from the broker to any clients subscribing to matching topics. It takes the following arguments: +This causes a message to be sent to the broker and subsequently from the broker +to any clients subscribing to matching topics. It takes the following +arguments: topic the topic that the message should be published on payload - the actual message to send. If not given, or set to ``None`` a zero length message will be used. Passing an int or float will result in the payload being converted to a string representing that number. If you wish to send a true int/float, use ``struct.pack()`` to create the payload you require + the actual message to send. If not given, or set to ``None`` a zero length + message will be used. Passing an int or float will result in the payload + being converted to a string representing that number. If you wish to send a + true int/float, use ``struct.pack()`` to create the payload you require qos the quality of service level to use retain - if set to ``True``, the message will be set as the "last known good"/retained message for the topic. + if set to ``True``, the message will be set as the "last known + good"/retained message for the topic. -Returns a tuple ``(result, mid)``, where result is ``MQTT_ERR_SUCCESS`` to indicate success or ``MQTT_ERR_NO_CONN`` if the client is not currently connected. ``mid`` is the message ID for the publish request. The mid value can be used to track the publish request by checking against the mid argument in the ``on_publish()`` callback if it is defined. +Returns a tuple ``(result, mid)``, where result is ``MQTT_ERR_SUCCESS`` to +indicate success or ``MQTT_ERR_NO_CONN`` if the client is not currently +connected. ``mid`` is the message ID for the publish request. The mid value can +be used to track the publish request by checking against the mid argument in +the ``on_publish()`` callback if it is defined. -A ``ValueError`` will be raised if topic is ``None``, has zero length or is invalid (contains a wildcard), if ``qos`` is not one of 0, 1 or 2, or if the length of the payload is greater than 268435455 bytes. +A ``ValueError`` will be raised if topic is ``None``, has zero length or is +invalid (contains a wildcard), if ``qos`` is not one of 0, 1 or 2, or if the +length of the payload is greater than 268435455 bytes. Callback ........ -When the message has been sent to the broker an ``on_publish()`` callback will be generated. +When the message has been sent to the broker an ``on_publish()`` callback will +be generated. Subscribe / Unsubscribe @@ -519,22 +603,30 @@ List of string and integer tuples e.g. ``subscribe([("my/topic", 0), ("another/topic", 2)])`` -This allows multiple topic subscriptions in a single SUBSCRIPTION command, which is more efficient than using multiple calls to ``subscribe()``. +This allows multiple topic subscriptions in a single SUBSCRIPTION command, +which is more efficient than using multiple calls to ``subscribe()``. topic - a list of tuple of format ``(topic, qos)``. Both topic and qos must be present in all of the tuples. + a list of tuple of format ``(topic, qos)``. Both topic and qos must be + present in all of the tuples. qos not used. -The function returns a tuple ``(result, mid)``, where ``result`` is ``MQTT_ERR_SUCCESS`` to indicate success or ``(MQTT_ERR_NO_CONN, None)`` if the client is not currently connected. ``mid`` is the message ID for the subscribe request. The mid value can be used to track the subscribe request by checking against the mid argument in the ``on_subscribe()`` callback if it is defined. +The function returns a tuple ``(result, mid)``, where ``result`` is +``MQTT_ERR_SUCCESS`` to indicate success or ``(MQTT_ERR_NO_CONN, None)`` if the +client is not currently connected. ``mid`` is the message ID for the subscribe +request. The mid value can be used to track the subscribe request by checking +against the mid argument in the ``on_subscribe()`` callback if it is defined. -Raises a ``ValueError`` if ``qos`` is not 0, 1 or 2, or if topic is ``None`` or has zero string length, or if ``topic`` is not a string, tuple or list. +Raises a ``ValueError`` if ``qos`` is not 0, 1 or 2, or if topic is ``None`` or +has zero string length, or if ``topic`` is not a string, tuple or list. Callback ........ -When the broker has acknowledged the subscription, an ``on_subscribe()`` callback will be generated. +When the broker has acknowledged the subscription, an ``on_subscribe()`` +callback will be generated. unsubscribe() ''''''''''''' @@ -546,19 +638,23 @@ unsubscribe() Unsubscribe the client from one or more topics. topic - a single string, or list of strings that are the subscription topics to unsubscribe from. + a single string, or list of strings that are the subscription topics to + unsubscribe from. -Returns a tuple ``(result, mid)``, where ``result`` is ``MQTT_ERR_SUCCESS`` -to indicate success, or ``(MQTT_ERR_NO_CONN, None)`` if the client is not -currently connected. ``mid`` is the message ID for the unsubscribe request. The mid value can be used to track the unsubscribe request by checking against the mid -argument in the ``on_unsubscribe()`` callback if it is defined. +Returns a tuple ``(result, mid)``, where ``result`` is ``MQTT_ERR_SUCCESS`` to +indicate success, or ``(MQTT_ERR_NO_CONN, None)`` if the client is not +currently connected. ``mid`` is the message ID for the unsubscribe request. The +mid value can be used to track the unsubscribe request by checking against the +mid argument in the ``on_unsubscribe()`` callback if it is defined. -Raises a ``ValueError`` if ``topic`` is ``None`` or has zero string length, or is not a string or list. +Raises a ``ValueError`` if ``topic`` is ``None`` or has zero string length, or +is not a string or list. Callback ........ -When the broker has acknowledged the unsubscribe, an ``on_unsubscribe()`` callback will be generated. +When the broker has acknowledged the unsubscribe, an ``on_unsubscribe()`` +callback will be generated. Callbacks ````````` @@ -630,7 +726,10 @@ userdata rc the disconnection result -The rc parameter indicates the disconnection state. If ``MQTT_ERR_SUCCESS`` (0), the callback was called in response to a ``disconnect()`` call. If any other value the disconnection was unexpected, such as might be caused by a network error. +The rc parameter indicates the disconnection state. If ``MQTT_ERR_SUCCESS`` +(0), the callback was called in response to a ``disconnect()`` call. If any +other value the disconnection was unexpected, such as might be caused by a +network error. Example ....... @@ -680,24 +779,32 @@ Example message_callback_add() '''''''''''''''''''''' -This function allows you to define callbacks that handle incoming messages for specific subscription filters, including with wildcards. This lets you, for example, subscribe to ``sensors/#`` and have one callback to handle ``sensors/temperature`` and another to handle ``sensors/humidity``. +This function allows you to define callbacks that handle incoming messages for +specific subscription filters, including with wildcards. This lets you, for +example, subscribe to ``sensors/#`` and have one callback to handle +``sensors/temperature`` and another to handle ``sensors/humidity``. :: message_callback_add(sub, callback) sub - the subscription filter to match against for this callback. Only one callback may be defined per literal sub string + the subscription filter to match against for this callback. Only one + callback may be defined per literal sub string callback - the callback to be used. Takes the same form as the ``on_message`` callback. + the callback to be used. Takes the same form as the ``on_message`` + callback. -If using ``message_callback_add()`` and ``on_message``, only messages that do not match a subscription specific filter will be passed to the ``on_message`` callback. +If using ``message_callback_add()`` and ``on_message``, only messages that do +not match a subscription specific filter will be passed to the ``on_message`` +callback. message_callback_remove() ''''''''''''''''''''''''' -Remove a topic/subscription specific callback previously registered using ``message_callback_add()``. +Remove a topic/subscription specific callback previously registered using +``message_callback_add()``. :: @@ -713,9 +820,15 @@ on_publish() on_publish(client, userdata, mid) -Called when a message that was to be sent using the ``publish()`` call has completed transmission to the broker. For messages with QoS levels 1 and 2, this means that the appropriate handshakes have completed. For QoS 0, this simply means that the message has left the client. The ``mid`` variable matches the mid variable returned from the corresponding ``publish()`` call, to allow outgoing messages to be tracked. +Called when a message that was to be sent using the ``publish()`` call has +completed transmission to the broker. For messages with QoS levels 1 and 2, +this means that the appropriate handshakes have completed. For QoS 0, this +simply means that the message has left the client. The ``mid`` variable matches +the mid variable returned from the corresponding ``publish()`` call, to allow +outgoing messages to be tracked. -This callback is important because even if the publish() call returns success, it does not always mean that the message has been sent. +This callback is important because even if the publish() call returns success, +it does not always mean that the message has been sent. on_subscribe() '''''''''''''' @@ -724,7 +837,10 @@ on_subscribe() on_subscribe(client, userdata, mid, granted_qos) -Called when the broker responds to a subscribe request. The ``mid`` variable matches the mid variable returned from the corresponding ``subscribe()`` call. The ``granted_qos`` variable is a list of integers that give the QoS level the broker has granted for each of the different subscription requests. +Called when the broker responds to a subscribe request. The ``mid`` variable +matches the mid variable returned from the corresponding ``subscribe()`` call. +The ``granted_qos`` variable is a list of integers that give the QoS level the +broker has granted for each of the different subscription requests. on_unsubscribe() '''''''''''''''' @@ -733,7 +849,9 @@ on_unsubscribe() on_unsubscribe(client, userdata, mid) -Called when the broker responds to an unsubscribe request. The ``mid`` variable matches the mid variable returned from the corresponding ``unsubscribe()`` call. +Called when the broker responds to an unsubscribe request. The ``mid`` variable +matches the mid variable returned from the corresponding ``unsubscribe()`` +call. on_log() '''''''' @@ -742,7 +860,10 @@ on_log() on_log(client, userdata, level, buf) -Called when the client has log information. Define to allow debugging. The ``level`` variable gives the severity of the message and will be one of ``MQTT_LOG_INFO``, ``MQTT_LOG_NOTICE``, ``MQTT_LOG_WARNING``, ``MQTT_LOG_ERR``, and ``MQTT_LOG_DEBUG``. The message itself is in ``buf``. +Called when the client has log information. Define to allow debugging. The +``level`` variable gives the severity of the message and will be one of +``MQTT_LOG_INFO``, ``MQTT_LOG_NOTICE``, ``MQTT_LOG_WARNING``, ``MQTT_LOG_ERR``, +and ``MQTT_LOG_DEBUG``. The message itself is in ``buf``. External event loop support ``````````````````````````` @@ -754,7 +875,8 @@ loop_read() loop_read(max_packets=1) -Call when the socket is ready for reading. ``max_packets`` is obsolete and should be left unset. +Call when the socket is ready for reading. ``max_packets`` is obsolete and +should be left unset. loop_write() '''''''''''' @@ -763,7 +885,8 @@ loop_write() loop_write(max_packets=1) -Call when the socket is ready for writing. ``max_packets`` is obsolete and should be left unset. +Call when the socket is ready for writing. ``max_packets`` is obsolete and +should be left unset. loop_misc() ''''''''''' @@ -781,7 +904,8 @@ socket() socket() -Returns the socket object in use in the client to allow interfacing with other event loops. +Returns the socket object in use in the client to allow interfacing with other +event loops. want_write() '''''''''''' @@ -790,14 +914,16 @@ want_write() want_write() -Returns true if there is data waiting to be written, to allow interfacing the client with other event loops. +Returns true if there is data waiting to be written, to allow interfacing the +client with other event loops. Global helper functions ``````````````````````` The client module also offers some global helper functions. -``topic_matches_sub(sub, topic)`` can be used to check whether a ``topic`` matches a ``subscription``. +``topic_matches_sub(sub, topic)`` can be used to check whether a ``topic`` +matches a ``subscription``. For example: @@ -806,15 +932,20 @@ For example: the topic ``non/matching`` would not match the subscription ``non/+/+`` -``connack_string(connack_code)`` returns the error string associated with a CONNACK result. +``connack_string(connack_code)`` returns the error string associated with a +CONNACK result. -``error_string(mqtt_errno)`` returns the error string associated with a Paho MQTT error number. +``error_string(mqtt_errno)`` returns the error string associated with a Paho +MQTT error number. Publish ******* -This module provides some helper functions to allow straightforward publishing of messages in a one-shot manner. In other words, they are useful for the situation where you have a single/multiple messages you want to publish to a broker, then disconnect with nothing else required. +This module provides some helper functions to allow straightforward publishing +of messages in a one-shot manner. In other words, they are useful for the +situation where you have a single/multiple messages you want to publish to a +broker, then disconnect with nothing else required. The two functions provided are ``single()`` and ``multiple()``. @@ -827,17 +958,19 @@ Publish a single message to a broker, then disconnect cleanly. single(topic, payload=None, qos=0, retain=False, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, - protocol=mqtt.MQTTv311) + protocol=mqtt.MQTTv311, transport="tcp") Function arguments '''''''''''''''''' topic - the only required argument must be the topic string to which the payload will be published. + the only required argument must be the topic string to which the payload + will be published. payload - the payload to be published. If "" or None, a zero length payload will be published. + the payload to be published. If "" or None, a zero length payload will be + published. qos the qos to use when publishing, default to 0. @@ -846,13 +979,15 @@ retain set the message to be retained (True) or not (False). hostname - a string containing the address of the broker to connect to. Defaults to localhost. + a string containing the address of the broker to connect to. Defaults to + localhost. port the port to connect to the broker on. Defaults to 1883. client_id - the MQTT client id to use. If "" or None, the Paho library will generate a client id automatically. + the MQTT client id to use. If "" or None, the Paho library will + generate a client id automatically. keepalive the keepalive timeout value for the client. Defaults to 60 seconds. @@ -862,7 +997,8 @@ will will = {'topic': "", 'payload':", 'qos':, 'retain':}. - Topic is required, all other parameters are optional and will default to None, 0 and False respectively. + Topic is required, all other parameters are optional and will default to + None, 0 and False respectively. Defaults to None, which indicates no will should be used. @@ -886,7 +1022,10 @@ tls protocol choose the version of the MQTT protocol to use. Use either ``MQTTv31`` or ``MQTTv311``. - + +transport + set to "websockets" to send MQTT over WebSockets. Leave at the default of + "tcp" to use raw TCP. Example ''''''' @@ -904,7 +1043,7 @@ Publish multiple messages to a broker, then disconnect cleanly. :: multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, - will=None, auth=None, tls=None, protocol=mqtt.MQTTv311) + will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp") Function arguments '''''''''''''''''' @@ -924,7 +1063,7 @@ msgs ("", "", qos, retain) -See ``single()`` for the description of ``hostname``, ``port``, ``client_id``, ``keepalive``, ``will``, ``auth``, ``tls``, ``protocol``. +See ``single()`` for the description of ``hostname``, ``port``, ``client_id``, ``keepalive``, ``will``, ``auth``, ``tls``, ``protocol``, ``transport``. Example ''''''' @@ -938,6 +1077,152 @@ Example publish.multiple(msgs, hostname="iot.eclipse.org") +Subscribe +********* + +This module provides some helper functions to allow straightforward subscribing +and processing of messages. + +The two functions provided are ``simple()`` and ``callback()``. + +Simple +`````` + +Subscribe to a set of topics and return the messages received. This is a +blocking function. + +:: + + simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost", + port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, + protocol=mqtt.MQTTv311) + + +Function arguments +'''''''''''''''''' + +topics + the only required argument is the topic string to which the client will + subscribe. This can either be a string or a list of strings if multiple + topics should be subscribed to. + +qos + the qos to use when subscribing, defaults to 0. + +msg_count + the number of messages to retrieve from the broker. Defaults to 1. If 1, a + single MQTTMessage object will be returned. If >1, a list of MQTTMessages + will be returned. + +retained + set to True to consider retained messages, set to False to ignore messages + with the retained flag set. + +hostname + a string containing the address of the broker to connect to. Defaults to localhost. + +port + the port to connect to the broker on. Defaults to 1883. + +client_id + the MQTT client id to use. If "" or None, the Paho library will + generate a client id automatically. + +keepalive + the keepalive timeout value for the client. Defaults to 60 seconds. + +will + a dict containing will parameters for the client: + + will = {'topic': "", 'payload':", 'qos':, 'retain':}. + + Topic is required, all other parameters are optional and will default to + None, 0 and False respectively. + + Defaults to None, which indicates no will should be used. + +auth + a dict containing authentication parameters for the client: + + auth = {'username':"", 'password':""} + + Username is required, password is optional and will default to None if not + provided. + + Defaults to None, which indicates no authentication is to be used. + +tls + a dict containing TLS configuration parameters for the client: + + dict = {'ca_certs':"", 'certfile':"", 'keyfile':"", 'tls_version':"", 'ciphers':"} + + ca_certs is required, all other parameters are optional and will default to + None if not provided, which results in the client using the default + behaviour - see the paho.mqtt.client documentation. + + Defaults to None, which indicates that TLS should not be used. + +protocol + choose the version of the MQTT protocol to use. Use either ``MQTTv31`` or ``MQTTv311``. + + +Example +''''''' + +:: + + import paho.mqtt.subscribe as subscribe + + msg = subscribe.simple("paho/test/simple", hostname="iot.eclipse.org") + print("%s %s" % (msg.topic, msg.payload)) + +Callback +```````` + +Subscribe to a set of topics and process the messages received using a user +provided callback. + +:: + + callback(callback, topics, qos=0, userdata=None, hostname="localhost", + port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, + protocol=mqtt.MQTTv311) + +Function arguments +'''''''''''''''''' + +callback + an "on_message" callback that will be used for each message received, and + of the form + + def on_message(client, userdata, message) + +topics + the topic string to which the client will subscribe. This can either be a + string or a list of strings if multiple topics should be subscribed to. + +qos + the qos to use when subscribing, defaults to 0. + +userdata + a user provided obkect that will be passed to the on_message callback when + a message is received. + +See ``simple()`` for the description of ``hostname``, ``port``, ``client_id``, ``keepalive``, ``will``, ``auth``, ``tls``, ``protocol``. + +Example +''''''' + +:: + + import paho.mqtt.subscribe as subscribe + + def on_message_print(client, userdata, message): + print("%s %s" % (message.topic, message.payload)) + + subscribe.callback(on_message_print, "paho/test/callback", hostname="iot.eclipse.org") + + Reporting bugs -------------- diff --git a/examples/pub-wait.py b/examples/pub-wait.py new file mode 100755 index 0000000000000000000000000000000000000000..e10214f79fd5f8c4019fe9ca1ea1de6d9f6dec2f --- /dev/null +++ b/examples/pub-wait.py @@ -0,0 +1,71 @@ +#!/usr/bin/python3.5 + +# Copyright (c) 2010-2013 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Distribution License v1.0 +# which accompanies this distribution. +# +# The Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial implementation +# Copyright (c) 2010,2011 Roger Light +# All rights reserved. + +# This shows a simple example of an MQTT subscriber. + +import sys +import time +try: + import paho.mqtt.client as mqtt +except ImportError: + # This part is only required to run the example from within the examples + # directory when the module itself is not installed. + # + # If you have the module installed, just use "import paho.mqtt.client" + import os + import inspect + cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0], "../src"))) + if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + import paho.mqtt.client as mqtt + +def on_connect(mqttc, obj, flags, rc): + print("rc: "+str(rc)) + +def on_message(mqttc, obj, msg): + print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) + +def on_publish(mqttc, obj, mid): + print("mid: "+str(mid)) + pass + +def on_subscribe(mqttc, obj, mid, granted_qos): + print("Subscribed: "+str(mid)+" "+str(granted_qos)) + +def on_log(mqttc, obj, level, string): + print(string) + +# If you want to use a specific client id, use +# mqttc = mqtt.Client("client-id") +# but note that the client id must be unique on the broker. Leaving the client +# id parameter empty will generate a random id for you. +mqttc = mqtt.Client() +mqttc.on_message = on_message +mqttc.on_connect = on_connect +mqttc.on_publish = on_publish +mqttc.on_subscribe = on_subscribe +# Uncomment to enable debug messages +#mqttc.on_log = on_log +mqttc.connect("localhost", 1883, 60) + +mqttc.loop_start() + +print("tuple") +(rc, mid) = mqttc.publish("tuple", "bar", qos=2) +print("class") +infot = mqttc.publish("class", "bar", qos=2) + +infot.wait_for_publish() diff --git a/examples/sub-callback.py b/examples/sub-callback.py new file mode 100755 index 0000000000000000000000000000000000000000..728da88ba416b96c7818f153666ce6d2683c071e --- /dev/null +++ b/examples/sub-callback.py @@ -0,0 +1,36 @@ +#!/usr/bin/python + +# Copyright (c) 2016 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Distribution License v1.0 +# which accompanies this distribution. +# +# The Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial implementation + +# This shows an example of using the subscribe.callback helper function. + +import sys +try: + import paho.mqtt.subscribe as subscribe +except ImportError: + # This part is only required to run the example from within the examples + # directory when the module itself is not installed. + # + # If you have the module installed, just use "import paho.mqtt.subscribe" + import os + import inspect + cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"../src"))) + if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + import paho.mqtt.subscribe as subscribe + import paho.mqtt.client + +def print_msg(client, userdata, message): + print("%s : %s" % (message.topic, message.payload)) + +subscribe.callback(print_msg, "#", hostname="iot.eclipse.org") diff --git a/examples/sub-class.py b/examples/sub-class.py index 1bc9b439e1313bd10de4955c5b9e9fb90847a5eb..cb908db36941815849378be7a77a0d82fc400855 100755 --- a/examples/sub-class.py +++ b/examples/sub-class.py @@ -4,9 +4,9 @@ # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Eclipse Distribution License v1.0 -# which accompanies this distribution. +# which accompanies this distribution. # -# The Eclipse Distribution License is available at +# The Eclipse Distribution License is available at # http://www.eclipse.org/org/documents/edl-v10.php. # # Contributors: @@ -29,36 +29,30 @@ except ImportError: sys.path.insert(0, cmd_subfolder) import paho.mqtt.client as mqtt -class MyMQTTClass: - def __init__(self, clientid=None): - self._mqttc = mqtt.Client(clientid) - self._mqttc.on_message = self.mqtt_on_message - self._mqttc.on_connect = self.mqtt_on_connect - self._mqttc.on_publish = self.mqtt_on_publish - self._mqttc.on_subscribe = self.mqtt_on_subscribe +class MyMQTTClass(mqtt.Client): - def mqtt_on_connect(self, mqttc, obj, flags, rc): + def on_connect(self, mqttc, obj, flags, rc): print("rc: "+str(rc)) - def mqtt_on_message(self, mqttc, obj, msg): + def on_message(self, mqttc, obj, msg): print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) - def mqtt_on_publish(self, mqttc, obj, mid): + def on_publish(self, mqttc, obj, mid): print("mid: "+str(mid)) - def mqtt_on_subscribe(self, mqttc, obj, mid, granted_qos): + def on_subscribe(self, mqttc, obj, mid, granted_qos): print("Subscribed: "+str(mid)+" "+str(granted_qos)) - def mqtt_on_log(self, mqttc, obj, level, string): + def on_log(self, mqttc, obj, level, string): print(string) def run(self): - self._mqttc.connect("m2m.eclipse.org", 1883, 60) - self._mqttc.subscribe("$SYS/#", 0) + self.connect("m2m.eclipse.org", 1883, 60) + self.subscribe("$SYS/#", 0) rc = 0 while rc == 0: - rc = self._mqttc.loop() + rc = self.loop() return rc diff --git a/examples/sub-simple.py b/examples/sub-simple.py new file mode 100755 index 0000000000000000000000000000000000000000..5e31d1b6bdd1f5de2a3d7638428646ebd42d07f0 --- /dev/null +++ b/examples/sub-simple.py @@ -0,0 +1,38 @@ +#!/usr/bin/python + +# Copyright (c) 2016 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Distribution License v1.0 +# which accompanies this distribution. +# +# The Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial implementation + +# This shows an example of using the subscribe.simple helper function. + +import sys +try: + import paho.mqtt.subscribe as subscribe +except ImportError: + # This part is only required to run the example from within the examples + # directory when the module itself is not installed. + # + # If you have the module installed, just use "import paho.mqtt.subscribe" + import os + import inspect + cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"../src"))) + if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + import paho.mqtt.subscribe as subscribe + import paho.mqtt.client + +topics = ['#'] + +m = subscribe.simple(topics, hostname="iot.eclipse.org", retained=False, msg_count=2) +for a in m: + print(a.topic) + print(a.payload) diff --git a/examples/sub-ws.py b/examples/sub-ws.py new file mode 100755 index 0000000000000000000000000000000000000000..8c1487bc6e1c0ad80c9e037dbf5c94e48a634a48 --- /dev/null +++ b/examples/sub-ws.py @@ -0,0 +1,66 @@ +#!/usr/bin/python3 + +# Copyright (c) 2010-2013 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Distribution License v1.0 +# which accompanies this distribution. +# +# The Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial implementation +# Copyright (c) 2010,2011 Roger Light +# All rights reserved. + +# This shows a simple example of an MQTT subscriber. + +import sys + +try: + import paho.mqtt.client as mqtt +except ImportError: + # This part is only required to run the example from within the examples + # directory when the module itself is not installed. + # + # If you have the module installed, just use "import paho.mqtt.client" + import os + import inspect + cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"../src"))) + if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + import paho.mqtt.client as mqtt + +def on_connect(mqttc, obj, flags, rc): + print("rc: "+str(rc)) + +def on_message(mqttc, obj, msg): + print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload)) + +def on_publish(mqttc, obj, mid): + print("mid: "+str(mid)) + +def on_subscribe(mqttc, obj, mid, granted_qos): + print("Subscribed: "+str(mid)+" "+str(granted_qos)) + +def on_log(mqttc, obj, level, string): + print(string) + +# If you want to use a specific client id, use +# mqttc = mqtt.Client("client-id") +# but note that the client id must be unique on the broker. Leaving the client +# id parameter empty will generate a random id for you. +mqttc = mqtt.Client(transport="websockets") +mqttc.on_message = on_message +mqttc.on_connect = on_connect +mqttc.on_publish = on_publish +mqttc.on_subscribe = on_subscribe +# Uncomment to enable debug messages +mqttc.on_log = on_log +mqttc.connect("test.mosquitto.org", 8080, 60) +mqttc.subscribe("$SYS/broker/version", 0) + + +mqttc.loop_forever() + diff --git a/setup.py b/setup.py index ea7d847420801167ccf7f4ce7da6cf24a39761bc..08ce61937da0944080d41d4203aacb9c6bbc6d0d 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from paho.mqtt import __version__ from distutils.core import setup setup(name='paho-mqtt', version=__version__, - description='MQTT version 3.1/3.1.1 client class', + description='MQTT version 3.1.1 client class', author='Roger Light', author_email='roger@atchoo.org', url='http://eclipse.org/paho', diff --git a/src/paho/mqtt/__init__.py b/src/paho/mqtt/__init__.py index f90140865080afb52d77821d801aa0ec07fc6cf8..4c0b4b2f924cab2719741dda763c3532d4da4b63 100644 --- a/src/paho/mqtt/__init__.py +++ b/src/paho/mqtt/__init__.py @@ -1 +1,5 @@ -__version__ = "1.1" +__version__ = "1.2" + +class MQTTException(Exception): + def __init__(self, *args, **kwargs): + Exception.__init__(self, *args, **kwargs) diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index bc2025c7238c22bfe39d1ca934928e30001dc7a6..8017ccfaaf9d5b8aeeadf4913b3df5170ca136bb 100755 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -33,7 +33,17 @@ except: import struct import sys import threading + import time +import uuid +import base64 +import hashlib +try: + # Use monotionic clock if available + time_func = time.monotonic +except AttributeError: + time_func = time.time + HAVE_DNS = True try: import dns.resolver @@ -46,7 +56,7 @@ else: EAGAIN = errno.EAGAIN VERSION_MAJOR=1 -VERSION_MINOR=0 +VERSION_MINOR=2 VERSION_REVISION=0 VERSION_NUMBER=(VERSION_MAJOR*1000000+VERSION_MINOR*1000+VERSION_REVISION) @@ -60,8 +70,6 @@ else: PROTOCOL_NAMEv31 = b"MQIsdp" PROTOCOL_NAMEv311 = b"MQTT" -PROTOCOL_VERSION = 3 - # Message types CONNECT = 0x10 CONNACK = 0x20 @@ -128,6 +136,7 @@ MQTT_ERR_AUTH = 11 MQTT_ERR_ACL_DENIED = 12 MQTT_ERR_UNKNOWN = 13 MQTT_ERR_ERRNO = 14 +MQTT_ERR_QUEUE_SIZE = 15 if sys.version_info[0] < 3: sockpair_data = "0" @@ -274,9 +283,67 @@ def _socketpair_compat(): return (sock1, sock2) +class MQTTMessageInfo: + """This is a class returned from Client.publish() and can be used to find + out the mid of the message that was published, and to determine whether the + message has been published, and/or wait until it is published. + """ + def __init__(self, mid): + self.mid = mid + self._published = False + self._condition = threading.Condition() + self.rc = 0 + self._iterpos = 0 + + def __str__(self): + return str((self.rc, self.mid)) + + def __iter__(self): + self._iterpos = 0 + return self + + def __next__(self): + return self.next() + + def next(self): + if self._iterpos == 0: + self._iterpos = 1 + return self.rc + elif self._iterpos == 1: + self._iterpos = 2 + return self.mid + else: + raise StopIteration + + def __getitem__(self, index): + if index == 0: + return self.rc + elif index == 1: + return self.mid + else: + raise IndexError("index out of range") + + def _set_as_published(self): + with self._condition: + self._published = True + self._condition.notify() + + def wait_for_publish(self): + """Block until the message associated with this object is published.""" + with self._condition: + while self._published == False: + self._condition.wait() + + def is_published(self): + """Returns True if the message associated with this object has been + published, else returns False.""" + with self._condition: + return self._published + + class MQTTMessage: - """ This is a class that describes an incoming message. It is passed to the - on_message callback as the message parameter. + """ This is a class that describes an incoming or outgoing message. It is + passed to the on_message callback as the message parameter. Members: @@ -286,15 +353,16 @@ class MQTTMessage: retain : Boolean. If true, the message is a retained message and not fresh. mid : Integer. The message id. """ - def __init__(self): + def __init__(self, mid=0, topic=""): self.timestamp = 0 self.state = mqtt_ms_invalid self.dup = False - self.mid = 0 - self.topic = "" + self.mid = mid + self.topic = topic self.payload = None self.qos = 0 self.retain = False + self.info = MQTTMessageInfo(mid) class Client(object): @@ -387,7 +455,7 @@ class Client(object): MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf. """ - def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv31): + def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp"): """client_id is the unique client id string used when connecting to the broker. If client_id is zero length or None, then one will be randomly generated. In this case, clean_session must be True. If this is not the @@ -408,14 +476,18 @@ class Client(object): The protocol argument allows explicit setting of the MQTT version to use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1) or - paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1. If the + paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1.1 If the broker reports that the client connected with an invalid protocol version, the client will automatically attempt to reconnect using v3.1 instead. + + Set transport to "websockets" to use WebSockets as the transport + mechanism. Set to "tcp" to use raw TCP, which is the default. """ if not clean_session and (client_id == "" or client_id is None): raise ValueError('A client id must be provided if clean session is False.') + self._transport = transport self._protocol = protocol self._userdata = userdata self._sock = None @@ -442,8 +514,8 @@ class Client(object): "pos": 0} self._out_packet = [] self._current_out_packet = None - self._last_msg_in = time.time() - self._last_msg_out = time.time() + self._last_msg_in = time_func() + self._last_msg_out = time_func() self._ping_t = 0 self._last_mid = 0 self._state = mqtt_cs_new @@ -451,25 +523,19 @@ class Client(object): self._in_messages = [] self._max_inflight_messages = 20 self._inflight_messages = 0 + self._max_queued_messages = 0 self._will = False self._will_topic = "" self._will_payload = None self._will_qos = 0 self._will_retain = False - self.on_disconnect = None - self.on_connect = None - self.on_publish = None - self.on_message = None self.on_message_filtered = [] - self.on_subscribe = None - self.on_unsubscribe = None - self.on_log = None self._host = "" self._port = 1883 self._bind_address = "" self._in_callback = False self._strict_protocol = False - self._callback_mutex = threading.Lock() + self._callback_mutex = threading.RLock() self._state_mutex = threading.Lock() self._out_packet_mutex = threading.Lock() self._current_out_packet_mutex = threading.Lock() @@ -486,6 +552,14 @@ class Client(object): self._tls_ciphers = None self._tls_version = tls_version self._tls_insecure = False + # No default callbacks + self._on_log = None + self._on_connect = None + self._on_subscribe = None + self._on_message = None + self._on_publish = None + self._on_unsubscribe = None + self._on_disconnect = None def __del__(self): pass @@ -620,7 +694,7 @@ class Client(object): """ if HAVE_DNS is False: - raise ValueError('No DNS resolver library found.') + raise ValueError('No DNS resolver library found, try "pip install dnspython" or "pip3 install dnspython3".') if domain is None: domain = socket.getfqdn() @@ -708,8 +782,8 @@ class Client(object): self._current_out_packet_mutex.release() self._msgtime_mutex.acquire() - self._last_msg_in = time.time() - self._last_msg_out = time.time() + self._last_msg_in = time_func() + self._last_msg_out = time_func() self._msgtime_mutex.release() self._ping_t = 0 @@ -752,8 +826,17 @@ class Client(object): else: ssl.match_hostname(self._ssl.getpeercert(), self._host) + if self._transport == "websockets": + if self._tls_ca_certs is not None: + self._ssl = WebsocketWrapper(self._ssl, self._host, self._port, True) + else: + sock = WebsocketWrapper(sock, self._host, self._port, False) + self._sock = sock - self._sock.setblocking(0) + if self._ssl: + self._ssl.setblocking(0) + else: + self._sock.setblocking(0) return self._send_connect(self._keepalive, self._clean_session) @@ -804,6 +887,9 @@ class Client(object): # Can occur if we just reconnected but rlist/wlist contain a -1 for # some reason. return MQTT_ERR_CONN_LOST + except KeyboardInterrupt: + # Allow ^C to interrupt + raise except: return MQTT_ERR_UNKNOWN @@ -846,11 +932,20 @@ class Client(object): retain: If set to true, the message will be set as the "last known good"/retained message for the topic. - Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS to - indicate success or MQTT_ERR_NO_CONN if the client is not currently - connected. mid is the message ID for the publish request. The mid - value can be used to track the publish request by checking against the - mid argument in the on_publish() callback if it is defined. + Returns a MQTTMessageInfo class, which can be used to determine whether + the message has been delivered (using info.is_published()) or to block + waiting for the message to be delivered (info.wait_for_publish()). The + message ID and return code of the publish() call can be found at + info.mid and info.rc. + + For backwards compatibility, the MQTTMessageInfo class is iterable so + the old construct of (rc, mid) = client.publish(...) is still valid. + + rc is MQTT_ERR_SUCCESS to indicate success or MQTT_ERR_NO_CONN if the + client is not currently connected. mid is the message ID for the + publish request. The mid value can be used to track the publish request + by checking against the mid argument in the on_publish() callback if it + is defined. A ValueError will be raised if topic is None, has zero length or is invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if @@ -861,6 +956,8 @@ class Client(object): raise ValueError('Invalid QoS level.') if isinstance(payload, str) or isinstance(payload, bytearray): local_payload = payload + elif sys.version_info[0] == 3 and isinstance(payload, bytes): + local_payload = bytearray(payload) elif sys.version_info[0] < 3 and isinstance(payload, unicode): local_payload = payload elif isinstance(payload, int) or isinstance(payload, float): @@ -879,14 +976,14 @@ class Client(object): local_mid = self._mid_generate() if qos == 0: - rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False) - return (rc, local_mid) + info = MQTTMessageInfo(local_mid) + rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False, info) + info.rc = rc + return info else: - message = MQTTMessage() - message.timestamp = time.time() + message = MQTTMessage(local_mid, topic) + message.timestamp = time_func() - message.mid = local_mid - message.topic = topic if local_payload is None or len(local_payload) == 0: message.payload = None else: @@ -897,6 +994,11 @@ class Client(object): message.dup = False self._out_message_mutex.acquire() + + if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages: + self._out_message_mutex.release() + return (MQTT_ERR_QUEUE_SIZE, local_mid) + self._out_messages.append(message) if self._max_inflight_messages == 0 or self._inflight_messages < self._max_inflight_messages: self._inflight_messages = self._inflight_messages+1 @@ -914,11 +1016,13 @@ class Client(object): self._inflight_messages -= 1 message.state = mqtt_ms_publish - return (rc, local_mid) + message.info.rc = rc + return message.info else: message.state = mqtt_ms_queued; self._out_message_mutex.release() - return (MQTT_ERR_SUCCESS, local_mid) + message.info.rc = MQTT_ERR_SUCCESS + return message.info def username_pw_set(self, username, password=None): """Set a username and optionally a password for broker authentication. @@ -987,7 +1091,7 @@ class Client(object): zero string length, or if topic is not a string, tuple or list. """ topic_qos_list = None - if isinstance(topic, str): + if isinstance(topic, str) or (sys.version_info[0] == 2 and isinstance(topic, unicode)): if qos<0 or qos>2: raise ValueError('Invalid QoS level.') if topic is None or len(topic) == 0: @@ -1119,7 +1223,7 @@ class Client(object): if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN - now = time.time() + now = time_func() self._check_keepalive() if self._last_retry_check+1 < now: # Only check once a second at most @@ -1157,6 +1261,16 @@ class Client(object): raise ValueError('Invalid inflight.') self._max_inflight_messages = inflight + def max_queued_messages_set(self, queue_size): + """Set the maximum number of messages in the outgoing message queue. + 0 means unlimited.""" + if queue_size < 0: + raise ValueError('Invalid queue size.') + if not isinstance(queue_size, int): + raise ValueError('Invalid type of queue size.') + self._max_queued_messages = queue_size + return self + def message_retry_set(self, retry): """Set the timeout in seconds before a message with QoS>0 is retried. 20 seconds by default.""" @@ -1244,6 +1358,9 @@ class Client(object): run = True while run: + if self._thread_terminate is True: + break + if self._state == mqtt_cs_connect_async: try: self.reconnect() @@ -1317,18 +1434,192 @@ class Client(object): return MQTT_ERR_INVAL self._thread_terminate = True - self._thread.join() - self._thread = None + if threading.current_thread() != self._thread: + self._thread.join() + self._thread = None + + @property + def on_log(self): + """If implemented, called when the client has log information. + Defined to allow debugging.""" + return self._on_log + + @on_log.setter + def on_log(self, func): + """ Define the logging callback implementation. + + Expected signature is: + log_callback(client, userdata, level, buf) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + level: gives the severity of the message and will be one of + MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, + MQTT_LOG_ERR, and MQTT_LOG_DEBUG. + buf: the message itself + """ + self._on_log = func + + @property + def on_connect(self): + """If implemented, called when the broker responds to our connection + request.""" + return self._on_connect + + @on_connect.setter + def on_connect(self, func): + """ Define the connect callback implementation. + + Expected signature is: + connect_callback(client, userdata, flags, rc) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + flags: response flags sent by the broker + rc: the connection result + + flags is a dict that contains response flags from the broker: + flags['session present'] - this flag is useful for clients that are + using clean session set to 0 only. If a client with clean + session=0, that reconnects to a broker that it has previously + connected to, this flag indicates whether the broker still has the + session information for the client. If 1, the session still exists. + + The value of rc indicates success or not: + 0: Connection successful + 1: Connection refused - incorrect protocol version + 2: Connection refused - invalid client identifier + 3: Connection refused - server unavailable + 4: Connection refused - bad username or password + 5: Connection refused - not authorised + 6-255: Currently unused. + """ + self._on_connect = func + + @property + def on_subscribe(self): + """If implemented, called when the broker responds to a subscribe + request.""" + return self._on_subscribe + + @on_subscribe.setter + def on_subscribe(self, func): + """ Define the suscribe callback implementation. + + Expected signature is: + subscribe_callback(client, userdata, mid, granted_qos) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + mid: matches the mid variable returned from the corresponding + subscribe() call. + granted_qos: list of integers that give the QoS level the broker has + granted for each of the different subscription requests. + """ + self._on_subscribe = func + + @property + def on_message(self): + """If implemented, called when a message has been received on a topic + that the client subscribes to. + + This callback will be called for every message received. Use + message_callback_add() to define multiple callbacks that will be called + for specific topic filters.""" + return self._on_message + + @on_message.setter + def on_message(self, func): + """ Define the message received callback implementation. + + Expected signature is: + on_message_callback(client, userdata, message) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + message: an instance of MQTTMessage. + This is a class with members topic, payload, qos, retain. + """ + self._on_message = func + + @property + def on_publish(self): + """If implemented, called when a message that was to be sent using the + publish() call has completed transmission to the broker. + + For messages with QoS levels 1 and 2, this means that the appropriate + handshakes have completed. For QoS 0, this simply means that the message + has left the client. + This callback is important because even if the publish() call returns + success, it does not always mean that the message has been sent.""" + return self._on_publish + + @on_publish.setter + def on_publish(self, func): + """ Define the published message callback implementation. + + Expected signature is: + on_publish_callback(client, userdata, mid) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + mid: matches the mid variable returned from the corresponding + publish() call, to allow outgoing messages to be tracked. + """ + self._on_publish = func + + @property + def on_unsubscribe(self): + """If implemented, called when the broker responds to an unsubscribe + request.""" + return self._on_unsubscribe + + @on_unsubscribe.setter + def on_unsubscribe(self, func): + """ Define the unsubscribe callback implementation. + + Expected signature is: + unsubscribe_callback(client, userdata, mid) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + mid: matches the mid variable returned from the corresponding + unsubscribe() call. + """ + self._on_unsubscribe = func + + @property + def on_disconnect(self): + """If implemented, called when the client disconnects from the broker. + """ + return self._on_disconnect + + @on_disconnect.setter + def on_disconnect(self, func): + """ Define the disconnect callback implementation. + + Expected signature is: + disconnect_callback(client, userdata, self) + + client: the client instance for this callback + userdata: the private user data as set in Client() or userdata_set() + rc: the disconnection result + The rc parameter indicates the disconnection state. If + MQTT_ERR_SUCCESS (0), the callback was called in response to + a disconnect() call. If any other value the disconnection + was unexpected, such as might be caused by a network error. + """ + self._on_disconnect = func def message_callback_add(self, sub, callback): """Register a message callback for a specific topic. Messages that match 'sub' will be passed to 'callback'. Any non-matching messages will be passed to the default on_message callback. - + Call multiple times with different 'sub' to define multiple topic specific callbacks. - + Topic specific callbacks may be removed with message_callback_remove().""" if callback is None or sub is None: @@ -1486,7 +1777,7 @@ class Client(object): pos=0) self._msgtime_mutex.acquire() - self._last_msg_in = time.time() + self._last_msg_in = time_func() self._msgtime_mutex.release() return rc @@ -1501,7 +1792,7 @@ class Client(object): write_length = self._ssl.write(packet['packet'][packet['pos']:]) else: write_length = self._sock.send(packet['packet'][packet['pos']:]) - except AttributeError: + except (AttributeError, ValueError): self._current_out_packet_mutex.release() return MQTT_ERR_SUCCESS except socket.error as err: @@ -1524,14 +1815,15 @@ class Client(object): self._in_callback = True self.on_publish(self, self._userdata, packet['mid']) self._in_callback = False - self._callback_mutex.release() + packet['info']._set_as_published() + if (packet['command'] & 0xF0) == DISCONNECT: self._current_out_packet_mutex.release() self._msgtime_mutex.acquire() - self._last_msg_out = time.time() + self._last_msg_out = time_func() self._msgtime_mutex.release() self._callback_mutex.acquire() @@ -1561,7 +1853,7 @@ class Client(object): self._current_out_packet_mutex.release() self._msgtime_mutex.acquire() - self._last_msg_out = time.time() + self._last_msg_out = time_func() self._msgtime_mutex.release() return MQTT_ERR_SUCCESS @@ -1571,7 +1863,10 @@ class Client(object): self.on_log(self, self._userdata, level, buf) def _check_keepalive(self): - now = time.time() + if self._keepalive == 0: + return MQTT_ERR_SUCCESS + + now = time_func() self._msgtime_mutex.acquire() last_msg_out = self._last_msg_out last_msg_in = self._last_msg_in @@ -1621,7 +1916,7 @@ class Client(object): self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ") rc = self._send_simple_command(PINGREQ) if rc == MQTT_ERR_SUCCESS: - self._ping_t = time.time() + self._ping_t = time_func() return rc def _send_pingresp(self): @@ -1677,7 +1972,7 @@ class Client(object): else: raise TypeError - def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False): + def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False, info=None): if self._sock is None and self._ssl is None: return MQTT_ERR_NO_CONN @@ -1724,7 +2019,7 @@ class Client(object): else: raise TypeError('payload must be a string, unicode or a bytearray.') - return self._packet_queue(PUBLISH, packet, mid, qos) + return self._packet_queue(PUBLISH, packet, mid, qos, info) def _send_pubrec(self, mid): self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: "+str(mid)+")") @@ -1837,7 +2132,7 @@ class Client(object): def _message_retry_check_actual(self, messages, mutex): mutex.acquire() - now = time.time() + now = time_func() for m in messages: if m.timestamp + self._message_retry < now: if m.state == mqtt_ms_wait_for_puback or m.state == mqtt_ms_wait_for_pubrec: @@ -1899,14 +2194,15 @@ class Client(object): self._messages_reconnect_reset_out() self._messages_reconnect_reset_in() - def _packet_queue(self, command, packet, mid, qos): + def _packet_queue(self, command, packet, mid, qos, info=None): mpkt = dict( command = command, mid = mid, qos = qos, pos = 0, to_process = len(packet), - packet = packet) + packet = packet, + info = info) self._out_packet_mutex.acquire() self._out_packet.append(mpkt) @@ -2014,7 +2310,7 @@ class Client(object): rc = 0 self._out_message_mutex.acquire() for m in self._out_messages: - m.timestamp = time.time() + m.timestamp = time_func() if m.state == mqtt_ms_queued: self.loop_write() # Process outgoing messages that have just been queued up self._out_message_mutex.release() @@ -2113,7 +2409,7 @@ class Client(object): ", m"+str(message.mid)+", '"+message.topic+ "', ... ("+str(len(message.payload))+" bytes)") - message.timestamp = time.time() + message.timestamp = time_func() if message.qos == 0: self._handle_on_message(message) return MQTT_ERR_SUCCESS @@ -2196,7 +2492,7 @@ class Client(object): for m in self._out_messages: if m.mid == mid: m.state = mqtt_ms_wait_for_pubcomp - m.timestamp = time.time() + m.timestamp = time_func() self._out_message_mutex.release() return self._send_pubrel(mid, False) @@ -2219,6 +2515,25 @@ class Client(object): self._callback_mutex.release() return MQTT_ERR_SUCCESS + def _do_on_publish(self, idx, mid): + with self._callback_mutex: + if self.on_publish: + self._out_message_mutex.release() + self._in_callback = True + self.on_publish(self, self._userdata, mid) + self._in_callback = False + self._out_message_mutex.acquire() + + msg = self._out_messages.pop(idx) + if msg.qos > 0: + self._inflight_messages = self._inflight_messages - 1 + if self._max_inflight_messages > 0: + rc = self._update_inflight() + if rc != MQTT_ERR_SUCCESS: + return rc + msg.info._set_as_published() + return MQTT_ERR_SUCCESS + def _handle_pubackcomp(self, cmd): if self._strict_protocol: if self._in_packet['remaining_length'] != 2: @@ -2233,24 +2548,9 @@ class Client(object): try: if self._out_messages[i].mid == mid: # Only inform the client the message has been sent once. - self._callback_mutex.acquire() - if self.on_publish: - self._out_message_mutex.release() - self._in_callback = True - self.on_publish(self, self._userdata, mid) - self._in_callback = False - self._out_message_mutex.acquire() - - self._callback_mutex.release() - self._out_messages.pop(i) - self._inflight_messages = self._inflight_messages - 1 - if self._max_inflight_messages > 0: - rc = self._update_inflight() - if rc != MQTT_ERR_SUCCESS: - self._out_message_mutex.release() - return rc + rc = self._do_on_publish(i, mid) self._out_message_mutex.release() - return MQTT_ERR_SUCCESS + return rc except IndexError: # Have removed item so i>count. # Not really an error. @@ -2277,14 +2577,7 @@ class Client(object): self._callback_mutex.release() def _thread_main(self): - self._state_mutex.acquire() - if self._state == mqtt_cs_connect_async: - self._state_mutex.release() - self.reconnect() - else: - self._state_mutex.release() - - self.loop_forever() + self.loop_forever(retry_first_connection=True) def _host_matches_cert(self, host, cert_host): if cert_host[0:2] == "*.": @@ -2337,7 +2630,295 @@ class Client(object): raise ssl.SSLError('Certificate subject does not match remote hostname.') -# Compatibility class for easy porting from mosquitto.py. +# Compatibility class for easy porting from mosquitto.py. class Mosquitto(Client): def __init__(self, client_id="", clean_session=True, userdata=None): super(Mosquitto, self).__init__(client_id, clean_session, userdata) + +class WebsocketWrapper: + + OPCODE_CONTINUATION = 0x0 + OPCODE_TEXT = 0x1 + OPCODE_BINARY = 0x2 + OPCODE_CONNCLOSE = 0x8 + OPCODE_PING = 0x9 + OPCODE_PONG = 0xa + + def __init__(self, socket, host, port, is_ssl): + + self.connected = False + + self._ssl = is_ssl + self._host = host + self._port = port + self._socket = socket + + self._sendbuffer = bytearray() + self._readbuffer = bytearray() + + self._requested_size = 0 + self._payload_head = 0 + self._readbuffer_head = 0 + + self._do_handshake() + + def __del__(self): + + self._sendbuffer = None + self._readbuffer = None + + def _do_handshake(self): + + sec_websocket_key = uuid.uuid4().bytes + sec_websocket_key = base64.b64encode(sec_websocket_key) + + header = b"GET /mqtt HTTP/1.1\r\n" +\ + b"Upgrade: websocket\r\n" +\ + b"Connection: Upgrade\r\n" +\ + b"Host: " + str(self._host).encode('utf-8') + b":" + str(self._port).encode('utf-8') + b"\r\n" +\ + b"Origin: http://" + str(self._host).encode('utf-8') + b":" + str(self._port).encode('utf-8') + b"\r\n" +\ + b"Sec-WebSocket-Key: " + sec_websocket_key + b"\r\n" +\ + b"Sec-WebSocket-Version: 13\r\n" +\ + b"Sec-WebSocket-Protocol: mqtt\r\n\r\n" + + if self._ssl: + self._socket.write(header) + else: + self._socket.send(header) + + has_secret = False + has_upgrade = False + + while True: + # read HTTP response header as lines + if self._ssl: + byte = self._socket.read(1) + else: + byte = self._socket.recv(1) + + self._readbuffer.extend(byte) + # line end + if byte == b"\n": + if len(self._readbuffer) > 2: + # check upgrade + if b"connection" in str(self._readbuffer).lower().encode('utf-8'): + if b"upgrade" not in str(self._readbuffer).lower().encode('utf-8'): + raise ValueError("WebSocket handshake error, connection not upgraded") + else: + has_upgrade = True + + # check key hash + if b"sec-websocket-accept" in str(self._readbuffer).lower().encode('utf-8'): + GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + + server_hash = self._readbuffer.decode('utf-8').split(": ", 1)[1] + server_hash = server_hash.strip().encode('utf-8') + + client_hash = sec_websocket_key.decode('utf-8') + GUID + client_hash = hashlib.sha1(client_hash.encode('utf-8')) + client_hash = base64.b64encode(client_hash.digest()) + + if server_hash != client_hash: + raise ValueError("WebSocket handshake error, invalid secret key") + else: + has_secret = True + else: + # ending linebreak + break + + # reset linebuffer + self._readbuffer = bytearray() + + # connection reset + elif not byte: + raise ValueError("WebSocket handshake error") + + if not has_upgrade or not has_secret: + raise ValueError("WebSocket handshake error") + + self._readbuffer = bytearray() + self.connected = True + + def _create_frame(self, opcode, data, do_masking=1): + + header = bytearray() + length = len(data) + mask_key = bytearray([random.randint(0, 255), random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)]) + mask_flag = do_masking + + # 1 << 7 is the final flag, we don't send continuated data + header.append(1 << 7 | opcode) + + if length < 126: + header.append(mask_flag << 7 | length) + + elif length < 32768: + header.append(mask_flag << 7 | 126) + header += struct.pack("!H", length) + + elif length < 0x8000000000000001: + header.append(mask_flag << 7 | 127) + header += struct.pack("!Q", length) + + else: + raise ValueError("Maximum payload size is 2^63") + + if mask_flag == 1: + for index in range(length): + data[index] ^= mask_key[index % 4] + data = mask_key + data + + return header + data + + def _buffered_read(self, length): + + # try to recv and strore needed bytes + if self._readbuffer_head + length > len(self._readbuffer): + + if self._ssl: + data = self._socket.read(self._readbuffer_head + length - len(self._readbuffer)) + else: + data = self._socket.recv(self._readbuffer_head + length - len(self._readbuffer)) + + if not data: + raise socket.error(errno.ECONNABORTED, 0) + else: + self._readbuffer.extend(data) + + self._readbuffer_head += length + return self._readbuffer[self._readbuffer_head-length:self._readbuffer_head] + + def _recv_impl(self, length): + + # try to decode websocket payload part from data + try: + + self._readbuffer_head = 0 + + result = None + + chunk_startindex = self._payload_head + chunk_endindex = self._payload_head + length + + header1 = self._buffered_read(1) + header2 = self._buffered_read(1) + + opcode = (header1[0] & 0x0f) + maskbit = (header2[0] & 0x80) == 0x80 + lengthbits = (header2[0] & 0x7f) + payload_length = lengthbits + mask_key = None + + # read length + if lengthbits == 0x7e: + + value = self._buffered_read(2) + payload_length = struct.unpack("!H", value)[0] + + elif lengthbits == 0x7f: + + value = self._buffered_read(8) + payload_length = struct.unpack("!Q", value)[0] + + # read mask + if maskbit: + + mask_key = self._buffered_read(4) + + # if frame payload is shorter than the requested data, read only the possible part + readindex = chunk_endindex + if payload_length < readindex: + readindex = payload_length + + if readindex > 0: + + # get payload chunk + payload = self._buffered_read(readindex) + + # unmask only the needed part + if maskbit: + for index in range(chunk_startindex, readindex): + payload[index] ^= mask_key[index % 4] + + result = payload[chunk_startindex:readindex] + self._payload_head = readindex + + # check if full frame arrived and reset readbuffer and payloadhead if needed + if readindex == payload_length: + self._readbuffer = bytearray() + self._payload_head = 0 + + # respond to non-binary opcodes, their arrival is not guaranteed beacause of non-blocking sockets + if opcode == WebsocketWrapper.OPCODE_CONNCLOSE: + frame = self._create_frame(WebsocketWrapper.OPCODE_CONNCLOSE, payload, 0) + if self._ssl: + self._socket.write(frame) + else: + self._socket.send(frame) + + if opcode == WebsocketWrapper.OPCODE_PING: + frame = self._create_frame(WebsocketWrapper.OPCODE_PONG, payload, 0) + if self._ssl: + self._socket.write(frame) + else: + self._socket.send(frame) + + if opcode == WebsocketWrapper.OPCODE_BINARY: + return result + else: + raise socket.error(errno.EAGAIN, 0) + + except socket.error as err: + + if err.errno == errno.ECONNABORTED: + self.connected = False + return b'' + else: + # no more data + raise + + def _send_impl(self, data): + + # if previous frame was sent successfully + if len(self._sendbuffer) == 0: + + # create websocket frame + frame = self._create_frame(WebsocketWrapper.OPCODE_BINARY, bytearray(data)) + self._sendbuffer.extend(frame) + self._requested_size = len(data) + + # try to write out as much as possible + if self._ssl: + length = self._socket.write(self._sendbuffer) + else: + length = self._socket.send(self._sendbuffer) + + self._sendbuffer = self._sendbuffer[length:] + + if len(self._sendbuffer) == 0: + # buffer sent out completely, return with payload's size + return self._requested_size + else: + # couldn't send whole data, request the same data again with 0 as sent length + return 0 + + def recv(self, length): + return self._recv_impl(length) + + def read(self, length): + return self._recv_impl(length) + + def send(self, data): + return self._send_impl(data) + + def write(self, data): + return self._send_impl(data) + + def close(self): + self._socket.close() + + def fileno(self): + return self._socket.fileno() + + def setblocking(self,flag): + self._socket.setblocking(flag) diff --git a/src/paho/mqtt/publish.py b/src/paho/mqtt/publish.py index b468dda82f15f005492a9dd75437c4dca7cb5917..029cf40a5d20dd5c946d27b847fdd523165ec68d 100644 --- a/src/paho/mqtt/publish.py +++ b/src/paho/mqtt/publish.py @@ -19,13 +19,13 @@ situation where you have a single/multiple messages you want to publish to a broker, then disconnect and nothing else is required. """ -import paho.mqtt.client as mqtt +import paho.mqtt.client as paho +import paho.mqtt as mqtt def _do_publish(c): """Internal function""" - m = c._userdata[0] - c._userdata = c._userdata[1:] + m = c._userdata.pop() if type(m) is dict: topic = m['topic'] try: @@ -50,7 +50,10 @@ def _do_publish(c): def _on_connect(c, userdata, flags, rc): """Internal callback""" - _do_publish(c) + if rc == 0: + _do_publish(c) + else: + raise mqtt.MQTTException(paho.connack_string(rc)) def _on_publish(c, userdata, mid): @@ -62,7 +65,7 @@ def _on_publish(c, userdata, mid): def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, - will=None, auth=None, tls=None, protocol=mqtt.MQTTv31): + will=None, auth=None, tls=None, protocol=paho.MQTTv311, transport="tcp"): """Publish multiple messages to a broker, then disconnect cleanly. This function creates an MQTT client, connects to a broker and publishes a @@ -110,13 +113,15 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, default to None if not provided, which results in the client using the default behaviour - see the paho.mqtt.client documentation. Defaults to None, which indicates that TLS should not be used. + transport : set to "tcp" to use the default setting of transport which is + raw TCP. Set to "websockets" to use WebSockets as the transport. """ if type(msgs) is not list: raise ValueError('msgs must be a list') - client = mqtt.Client(client_id=client_id, - userdata=msgs, protocol=protocol) + client = paho.Client(client_id=client_id, + userdata=msgs, protocol=protocol, transport=transport) client.on_publish = _on_publish client.on_connect = _on_connect @@ -172,7 +177,7 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, def single(topic, payload=None, qos=0, retain=False, hostname="localhost", port=1883, client_id="", keepalive=60, will=None, auth=None, - tls=None, protocol=mqtt.MQTTv31): + tls=None, protocol=paho.MQTTv311, transport="tcp"): """Publish a single message to a broker, then disconnect cleanly. This function creates an MQTT client, connects to a broker and publishes a @@ -210,8 +215,10 @@ def single(topic, payload=None, qos=0, retain=False, hostname="localhost", default to None if not provided, which results in the client using the default behaviour - see the paho.mqtt.client documentation. Defaults to None, which indicates that TLS should not be used. + transport : set to "tcp" to use the default setting of transport which is + raw TCP. Set to "websockets" to use WebSockets as the transport. """ msg = {'topic':topic, 'payload':payload, 'qos':qos, 'retain':retain} - multiple([msg], hostname, port, client_id, keepalive, will, auth, tls, protocol) + multiple([msg], hostname, port, client_id, keepalive, will, auth, tls, protocol, transport) diff --git a/src/paho/mqtt/subscribe.py b/src/paho/mqtt/subscribe.py new file mode 100644 index 0000000000000000000000000000000000000000..539b84ee2616f61a9bf370a8a3b1b21465720328 --- /dev/null +++ b/src/paho/mqtt/subscribe.py @@ -0,0 +1,259 @@ +# Copyright (c) 2016 Roger Light +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# and Eclipse Distribution License v1.0 which accompany this distribution. +# +# The Eclipse Public License is available at +# http://www.eclipse.org/legal/epl-v10.html +# and the Eclipse Distribution License is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# Contributors: +# Roger Light - initial API and implementation + +""" +This module provides some helper functions to allow straightforward subscribing +to topics and retrieving messages. The two functions are simple(), which +returns one or messages matching a set of topics, and callback() which allows +you to pass a callback for processing of messages. +""" + +import paho.mqtt.client as paho +import paho.mqtt as mqtt +import ssl + + +def _on_connect(c, userdata, flags, rc): + """Internal callback""" + if rc != 0: + raise mqtt.MQTTException(paho.connack_string(rc)) + + if type(userdata['topics']) is list: + for t in userdata['topics']: + c.subscribe(t, userdata['qos']) + else: + c.subscribe(userdata['topics'], userdata['qos']) + + +def _on_message_callback(c, userdata, message): + """Internal callback""" + userdata['callback'](c, userdata['userdata'], message) + + +def _on_message_simple(c, userdata, message): + """Internal callback""" + + if userdata['msg_count'] == 0: + return + + # Don't process stale retained messages if 'retained' was false + if userdata['retained'] == False and message.retain == True: + return + + userdata['msg_count'] = userdata['msg_count'] - 1 + + if userdata['messages'] is None and userdata['msg_count'] == 0: + userdata['messages'] = message + c.disconnect() + return + + userdata['messages'].append(message) + if userdata['msg_count'] == 0: + c.disconnect() + + +def callback(callback, topics, qos=0, userdata=None, hostname="localhost", + port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None, + protocol=paho.MQTTv311, transport="tcp"): + """Subscribe to a list of topics and process them in a callback function. + + This function creates an MQTT client, connects to a broker and subscribes + to a list of topics. Incoming messages are processed by the user provided + callback. This is a blocking function and will never return. + + callback : function of the form "on_message(client, userdata, message)" for + processing the messages received. + + topics : either a string containing a single topic to subscribe to, or a + list of topics to subscribe to. + + qos : the qos to use when subscribing. This is applied to all topics. + + userdata : passed to the callback + + hostname : a string containing the address of the broker to connect to. + Defaults to localhost. + + port : the port to connect to the broker on. Defaults to 1883. + + client_id : the MQTT client id to use. If "" or None, the Paho library will + generate a client id automatically. + + keepalive : the keepalive timeout value for the client. Defaults to 60 + seconds. + + will : a dict containing will parameters for the client: will = {'topic': + "", 'payload':", 'qos':, 'retain':}. + Topic is required, all other parameters are optional and will + default to None, 0 and False respectively. + Defaults to None, which indicates no will should be used. + + auth : a dict containing authentication parameters for the client: + auth = {'username':"", 'password':""} + Username is required, password is optional and will default to None + if not provided. + Defaults to None, which indicates no authentication is to be used. + + tls : a dict containing TLS configuration parameters for the client: + dict = {'ca_certs':"", 'certfile':"", + 'keyfile':"", 'tls_version':"", + 'ciphers':"} + ca_certs is required, all other parameters are optional and will + default to None if not provided, which results in the client using + the default behaviour - see the paho.mqtt.client documentation. + Defaults to None, which indicates that TLS should not be used. + transport : set to "tcp" to use the default setting of transport which is + raw TCP. Set to "websockets" to use WebSockets as the transport. + """ + + if qos < 0 or qos > 2: + raise ValueError('qos must be in the range 0-2') + + callback_userdata = { + 'callback':callback, + 'topics':topics, + 'qos':qos, + 'userdata':userdata} + + client = paho.Client(client_id=client_id, + userdata=callback_userdata, protocol=protocol, transport=transport) + client.on_message = _on_message_callback + client.on_connect = _on_connect + + if auth is not None: + username = auth['username'] + try: + password = auth['password'] + except KeyError: + password = None + client.username_pw_set(username, password) + + if will is not None: + will_topic = will['topic'] + try: + will_payload = will['payload'] + except KeyError: + will_payload = None + try: + will_qos = will['qos'] + except KeyError: + will_qos = 0 + try: + will_retain = will['retain'] + except KeyError: + will_retain = False + + client.will_set(will_topic, will_payload, will_qos, will_retain) + + if tls is not None: + ca_certs = tls['ca_certs'] + try: + certfile = tls['certfile'] + except KeyError: + certfile = None + try: + keyfile = tls['keyfile'] + except KeyError: + keyfile = None + try: + tls_version = tls['tls_version'] + except KeyError: + tls_version = ssl.PROTOCOL_SSLv23; + try: + ciphers = tls['ciphers'] + except KeyError: + ciphers = None + client.tls_set(ca_certs, certfile, keyfile, tls_version=tls_version, + ciphers=ciphers) + + client.connect(hostname, port, keepalive) + client.loop_forever() + + +def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost", port=1883, + client_id="", keepalive=60, will=None, auth=None, tls=None, + protocol=paho.MQTTv311, transport="tcp"): + """Subscribe to a list of topics and return msg_count messages. + + This function creates an MQTT client, connects to a broker and subscribes + to a list of topics. Once "msg_count" messages have been received, it + disconnects cleanly from the broker and returns the messages. + + topics : either a string containing a single topic to subscribe to, or a + list of topics to subscribe to. + + qos : the qos to use when subscribing. This is applied to all topics. + + msg_count : the number of messages to retrieve from the broker. + if msg_count == 1 then a single MQTTMessage will be returned. + if msg_count > 1 then a list of MQTTMessages will be returned. + + retained : If set to True, retained messages will be processed the same as + non-retained messages. If set to False, retained messages will + be ignored. This means that with retained=False and msg_count=1, + the function will return the first message received that does + not have the retained flag set. + + hostname : a string containing the address of the broker to connect to. + Defaults to localhost. + + port : the port to connect to the broker on. Defaults to 1883. + + client_id : the MQTT client id to use. If "" or None, the Paho library will + generate a client id automatically. + + keepalive : the keepalive timeout value for the client. Defaults to 60 + seconds. + + will : a dict containing will parameters for the client: will = {'topic': + "", 'payload':", 'qos':, 'retain':}. + Topic is required, all other parameters are optional and will + default to None, 0 and False respectively. + Defaults to None, which indicates no will should be used. + + auth : a dict containing authentication parameters for the client: + auth = {'username':"", 'password':""} + Username is required, password is optional and will default to None + if not provided. + Defaults to None, which indicates no authentication is to be used. + + tls : a dict containing TLS configuration parameters for the client: + dict = {'ca_certs':"", 'certfile':"", + 'keyfile':"", 'tls_version':"", + 'ciphers':"} + ca_certs is required, all other parameters are optional and will + default to None if not provided, which results in the client using + the default behaviour - see the paho.mqtt.client documentation. + Defaults to None, which indicates that TLS should not be used. + transport : set to "tcp" to use the default setting of transport which is + raw TCP. Set to "websockets" to use WebSockets as the transport. + """ + + if msg_count < 1: + raise ValueError('msg_count must be > 0') + + # Set ourselves up to return a single message if msg_count == 1, or a list + # if > 1. + if msg_count == 1: + messages = None + else: + messages = [] + + userdata = {'retained':retained, 'msg_count':msg_count, 'messages':messages} + + callback(_on_message_simple, topics, qos, userdata, hostname, port, + client_id, keepalive, will, auth, tls, protocol, transport) + + return userdata['messages'] + diff --git a/test/lib/01-con-discon-success-mqtt311.py b/test/lib/01-con-discon-success-mqtt311.py index 5b61c4c7a5355351053a3338c36bb02332efeb61..077b5a010602decce8170b1ca29dda18bf8b36ff 100755 --- a/test/lib/01-con-discon-success-mqtt311.py +++ b/test/lib/01-con-discon-success-mqtt311.py @@ -13,7 +13,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/01-con-discon-success.py b/test/lib/01-con-discon-success.py index d5ad1750bdea2c620ff3bfa6fdbc473dd9bb064d..6921bb05506df0903fb8560c097c3d2fd3df3c65 100755 --- a/test/lib/01-con-discon-success.py +++ b/test/lib/01-con-discon-success.py @@ -13,7 +13,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) @@ -24,7 +23,7 @@ import paho_test rc = 1 keepalive = 60 -connect_packet = paho_test.gen_connect("01-con-discon-success", keepalive=keepalive) +connect_packet = paho_test.gen_connect("01-con-discon-success", keepalive=keepalive, proto_name="MQIsdp", proto_ver=3) connack_packet = paho_test.gen_connack(rc=0) disconnect_packet = paho_test.gen_disconnect() diff --git a/test/lib/01-no-clean-session.py b/test/lib/01-no-clean-session.py index 7afbf3ac3a0031243197d631c2a62b4cfe86e67d..ff387638acddc1378a54ba29a175b257137abe93 100755 --- a/test/lib/01-no-clean-session.py +++ b/test/lib/01-no-clean-session.py @@ -10,7 +10,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/01-unpwd-set.py b/test/lib/01-unpwd-set.py index 1756d173971daac018682d602c0862b42cb89741..978ed918cf8d1ac88fab5efb69676cf378527a6a 100755 --- a/test/lib/01-unpwd-set.py +++ b/test/lib/01-unpwd-set.py @@ -10,7 +10,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/01-will-set.py b/test/lib/01-will-set.py index aad12c2324588e95102e8204250a1aec646886f0..44ae37b9418a09034406314fa13b0c284b964a04 100755 --- a/test/lib/01-will-set.py +++ b/test/lib/01-will-set.py @@ -12,7 +12,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/01-will-unpwd-set.py b/test/lib/01-will-unpwd-set.py index 0ff92943a35afa06e7955861c09363d4d58001ef..42cf19678211bfdc989b0c82413b2f165995a71d 100755 --- a/test/lib/01-will-unpwd-set.py +++ b/test/lib/01-will-unpwd-set.py @@ -12,7 +12,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/02-subscribe-qos0.py b/test/lib/02-subscribe-qos0.py index 80998e409a81e9c5818897ccd8248f1145ba1268..8eb6a85d4971668e7906210002823e99b113a6fc 100755 --- a/test/lib/02-subscribe-qos0.py +++ b/test/lib/02-subscribe-qos0.py @@ -17,7 +17,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/02-subscribe-qos1.py b/test/lib/02-subscribe-qos1.py index efcaf3172bf7c07cf931d0c1a4ae73ce12d9eeff..4b5c05f8574b655721a3142780d27c8d1eb5e851 100755 --- a/test/lib/02-subscribe-qos1.py +++ b/test/lib/02-subscribe-qos1.py @@ -17,7 +17,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/02-subscribe-qos2.py b/test/lib/02-subscribe-qos2.py index fcbf2dd5fcd1612ce2d9a4f0c7ec5972f6ec3cdc..d8c706f3096f8bd59cfd9e77ed97f2509567ea19 100755 --- a/test/lib/02-subscribe-qos2.py +++ b/test/lib/02-subscribe-qos2.py @@ -17,7 +17,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/02-unsubscribe.py b/test/lib/02-unsubscribe.py index 33b6102bb75df85a6b16b2d3f17fa73d6813cde6..b5effd5239d7ae5a9c4f7553605c51cc5c5a7205 100755 --- a/test/lib/02-unsubscribe.py +++ b/test/lib/02-unsubscribe.py @@ -7,7 +7,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/03-publish-c2b-qos1-disconnect.py b/test/lib/03-publish-c2b-qos1-disconnect.py index 27f7c4f73ea9b600c6bd6754edb378ff9c3b68fc..ecfdd1d1c3f62e8dc59115199085df09ec59fb75 100755 --- a/test/lib/03-publish-c2b-qos1-disconnect.py +++ b/test/lib/03-publish-c2b-qos1-disconnect.py @@ -7,7 +7,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/03-publish-c2b-qos1-timeout.py b/test/lib/03-publish-c2b-qos1-timeout.py index 43541d47c8a74b677290f86bd5ec93589d11d875..439ab440cbddb2ff1e8e0e217a6f02f91af5ea98 100755 --- a/test/lib/03-publish-c2b-qos1-timeout.py +++ b/test/lib/03-publish-c2b-qos1-timeout.py @@ -21,7 +21,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/03-publish-c2b-qos2-disconnect.py b/test/lib/03-publish-c2b-qos2-disconnect.py index 42b06d0255b7332cc6042242d525a5a8450a68f0..cc285a8ff2fcf4554214f11b6b7f3ee1c07e2133 100755 --- a/test/lib/03-publish-c2b-qos2-disconnect.py +++ b/test/lib/03-publish-c2b-qos2-disconnect.py @@ -7,7 +7,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/03-publish-c2b-qos2-timeout.py b/test/lib/03-publish-c2b-qos2-timeout.py index f8553c04dbb98b2ccb4af723f6c6a4b08a520786..cb63d1c9ca609eec68b8934b4dc516ee635e859b 100755 --- a/test/lib/03-publish-c2b-qos2-timeout.py +++ b/test/lib/03-publish-c2b-qos2-timeout.py @@ -25,7 +25,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/03-publish-qos0-no-payload.py b/test/lib/03-publish-qos0-no-payload.py index fd02caad05e77b1e0854c291149fdd40c1697706..ea79cdfca47bdc2cf19bb28550d8a27f6f286586 100755 --- a/test/lib/03-publish-qos0-no-payload.py +++ b/test/lib/03-publish-qos0-no-payload.py @@ -15,7 +15,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/03-publish-qos0.py b/test/lib/03-publish-qos0.py index 987e29afe821920c81ea8bf25c7eae0feab7a875..55c39aafe2db9ce722ce306ce2ca9eda3e77e484 100755 --- a/test/lib/03-publish-qos0.py +++ b/test/lib/03-publish-qos0.py @@ -15,7 +15,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/04-retain-qos0.py b/test/lib/04-retain-qos0.py index 35dff0a320ffef485995c78688e33c91e27d341b..b6ebf7b345a4454b6f4cd23056200a8dc94efffb 100755 --- a/test/lib/04-retain-qos0.py +++ b/test/lib/04-retain-qos0.py @@ -7,7 +7,6 @@ import os import subprocess import socket import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/08-ssl-bad-cacert.py b/test/lib/08-ssl-bad-cacert.py index c74ba80c86d793282c4cd4e95136b7d0940a6726..e04caae9cacc91781ee26bd65be589c6cec07d92 100755 --- a/test/lib/08-ssl-bad-cacert.py +++ b/test/lib/08-ssl-bad-cacert.py @@ -3,18 +3,13 @@ import inspect import os import subprocess -import socket -import ssl import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) if cmd_subfolder not in sys.path: sys.path.insert(0, cmd_subfolder) -import paho_test - if sys.version < '2.7': print("WARNING: SSL not supported on Python 2.6") exit(0) diff --git a/test/lib/08-ssl-connect-cert-auth.py b/test/lib/08-ssl-connect-cert-auth.py index 74b8ba024433c08bc3c617eddc02cbe090330b6b..3f83d17c942e3d67b8a9ae7d804fd3162cf45281 100755 --- a/test/lib/08-ssl-connect-cert-auth.py +++ b/test/lib/08-ssl-connect-cert-auth.py @@ -16,7 +16,6 @@ import subprocess import socket import ssl import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/08-ssl-connect-no-auth.py b/test/lib/08-ssl-connect-no-auth.py index ddff6b059019f8bbbc8b478c2ce9b05f738ce072..cd4efb52cf37a63dbef3285c276f75b86c3692fe 100755 --- a/test/lib/08-ssl-connect-no-auth.py +++ b/test/lib/08-ssl-connect-no-auth.py @@ -15,7 +15,6 @@ import subprocess import socket import ssl import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/08-ssl-fake-cacert.py b/test/lib/08-ssl-fake-cacert.py index 70c5cf98d2cf9cc16153127836d2081921c11ded..109845879ab55cfd4a20317522fef9f3f7430112 100755 --- a/test/lib/08-ssl-fake-cacert.py +++ b/test/lib/08-ssl-fake-cacert.py @@ -13,8 +13,6 @@ cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(insp if cmd_subfolder not in sys.path: sys.path.insert(0, cmd_subfolder) -import paho_test - if sys.version < '2.7': print("WARNING: SSL not supported on Python 2.6") exit(0) diff --git a/test/lib/09-util-topic-matching.py b/test/lib/09-util-topic-matching.py index 3221642d154dc23a2410ffffa91c13f9dcf7de9c..56db472a2e17f5a8d9484a7354cc0ce8465c1836 100755 --- a/test/lib/09-util-topic-matching.py +++ b/test/lib/09-util-topic-matching.py @@ -4,7 +4,6 @@ import inspect import os import subprocess import sys -import time # From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) diff --git a/test/lib/python/01-con-discon-success.test b/test/lib/python/01-con-discon-success.test index b150f9c0390eb4375adfbd177b08d23ab6f1c424..0a8fc4ce0013a8c01a0aa8e0c08affe8f073bf76 100755 --- a/test/lib/python/01-con-discon-success.test +++ b/test/lib/python/01-con-discon-success.test @@ -22,7 +22,7 @@ def on_disconnect(mqttc, obj, rc): run = -1 -mqttc = mqtt.Client("01-con-discon-success", run) +mqttc = mqtt.Client("01-con-discon-success", run, protocol=mqtt.MQTTv31) mqttc.on_connect = on_connect mqttc.on_disconnect = on_disconnect diff --git a/test/lib/python3/01-con-discon-success.test b/test/lib/python3/01-con-discon-success.test index 65eacb9c45063b5f5c5d99ce087ffb9b3a2ddc46..04fbad3ac0b117697b4f6b629e2878dcc807379a 100755 --- a/test/lib/python3/01-con-discon-success.test +++ b/test/lib/python3/01-con-discon-success.test @@ -22,7 +22,7 @@ def on_disconnect(mqttc, obj, rc): run = -1 -mqttc = mqtt.Client("01-con-discon-success", run) +mqttc = mqtt.Client("01-con-discon-success", run, protocol=mqtt.MQTTv31) mqttc.on_connect = on_connect mqttc.on_disconnect = on_disconnect diff --git a/test/paho_test.py b/test/paho_test.py index 9cc6b0c7fe15f40a5697ea496e93c25a97b2624d..a16c5d4a310053fab76716e75925f20bdb88a2b9 100644 --- a/test/paho_test.py +++ b/test/paho_test.py @@ -201,7 +201,7 @@ def to_string(packet): # Reserved return "0xF0" -def gen_connect(client_id, clean_session=True, keepalive=60, username=None, password=None, will_topic=None, will_qos=0, will_retain=False, will_payload="", proto_name="MQIsdp", proto_ver=3): +def gen_connect(client_id, clean_session=True, keepalive=60, username=None, password=None, will_topic=None, will_qos=0, will_retain=False, will_payload="", proto_name="MQTT", proto_ver=4): if client_id == None: remaining_length = 12 else: