提交 8aab958a 编写于 作者: J James Myatt

Merge branch 'fixes' into develop

Signed-off-by: NJames Myatt <james@jamesmyatt.co.uk>

# Conflicts:
#	ChangeLog.txt
#	src/paho/mqtt/client.py
#	src/paho/mqtt/publish.py
#	test/lib/Makefile
#	test/lib/context.py
#	test/lib/python/01-zero-length-clientid.test
#	test/lib/python/08-ssl-connect-cert-auth.test
#	test/lib/python/08-ssl-connect-no-auth.test
#	test/lib/python/08-ssl-fake-cacert.test
#	test/paho_test.py
Contributing to Paho
====================
Thanks for your interest in this project.
Project description:
--------------------
The Paho project has been created to provide scalable open-source implementations of open and standard messaging protocols aimed at new, existing, and emerging applications for Machine-to-Machine (M2M) and Internet of Things (IoT).
Paho reflects the inherent physical and cost constraints of device connectivity. Its objectives include effective levels of decoupling between devices and applications, designed to keep markets open and encourage the rapid growth of scalable Web and Enterprise middleware and applications. Paho is being kicked off with MQTT publish/subscribe client implementations for use on embedded platforms, along with corresponding server support as determined by the community.
- https://projects.eclipse.org/projects/technology.paho
Source
------
The Paho Python code is stored in a git repository. The URLs to access it are:
ssh://<username>@git.eclipse.org:29418/paho/org.eclipse.paho.mqtt.python
https://<username>@git.eclipse.org/r/paho/org.eclipse.paho.mqtt.python
A web browsable repository is available at
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.python.git
Contributing a patch
--------------------
The Paho repositories are accessed through Gerrit, the code review
project, which makes it possible for anybody to clone the repository, make
changes and push them back for review and eventual acceptance into the project.
To do this, you must follow a few steps. The first of these are described at
- https://wiki.eclipse.org/Development_Resources/Contributing_via_Git
* Sign the Eclipse CLA
* Use a valid commit record, including a signed-off-by entry.
There are further details at
- https://wiki.eclipse.org/Development_Resources/Handling_Git_Contributions
Once the patch is pushed back to Gerrit, the project committers will be
informed and they will undertake a review of the code. The patch may need
modifying for some reason. In order to make amending commits more
straightforward, the steps at
https://git.eclipse.org/r/Documentation/cmd-hook-commit-msg.html should be
followed. This automatically inserts a "Change-Id" entry to your commit message
which allows you to amend commits and have Gerrit track them as the same
change.
What happens next depends on the content of the patch. If it is 100% authored
by the contributor and is less than 250 lines (and meets the needs of the
project), then it can be committed to the main repository. If not, more steps
are required. These are detailed in the legal process poster:
- http://www.eclipse.org/legal/EclipseLegalProcessPoster.pdf
Developer resources:
--------------------
Information regarding source code management, builds, coding standards, and more.
- https://projects.eclipse.org/projects/technology.paho/developer
Contributor License Agreement:
------------------------------
Before your contribution can be accepted by the project, you need to create and electronically sign the Eclipse Foundation Contributor License Agreement (CLA).
- http://www.eclipse.org/legal/CLA.php
Contact:
--------
Contact the project developers via the project's "dev" list.
- https://dev.eclipse.org/mailman/listinfo/paho-dev
Search for bugs:
----------------
This project uses Bugzilla to track ongoing development and issues.
- https://bugs.eclipse.org/bugs/buglist.cgi?product=Paho
Create a new bug:
-----------------
Be sure to search for existing bugs before you create another one. Remember that contributions are always welcome!
- https://bugs.eclipse.org/bugs/enter_bug.cgi?product=Paho
# Contributing to Paho
Thanks for your interest in this project!
You can contribute bugfixes and new features by sending pull requests through GitHub.
## Legal
In order for your contribution to be accepted, it must comply with the Eclipse Foundation IP policy.
Please read the [Eclipse Foundation policy on accepting contributions via Git](http://wiki.eclipse.org/Development_Resources/Contributing_via_Git).
1. Sign the [Eclipse CLA](http://www.eclipse.org/legal/CLA.php)
1. Register for an Eclipse Foundation User ID. You can register [here](https://dev.eclipse.org/site_login/createaccount.php).
2. Log into the [Projects Portal](https://projects.eclipse.org/), and click on the '[Eclipse CLA](https://projects.eclipse.org/user/sign/cla)' link.
2. Go to your [account settings](https://dev.eclipse.org/site_login/myaccount.php#open_tab_accountsettings) and add your GitHub username to your account.
3. Make sure that you _sign-off_ your Git commits in the following format:
``` Signed-off-by: John Smith <johnsmith@nowhere.com> ``` This is usually at the bottom of the commit message. You can automate this by adding the '-s' flag when you make the commits. e.g. ```git commit -s -m "Adding a cool feature"```
4. Ensure that the email address that you make your commits with is the same one you used to sign up to the Eclipse Foundation website with.
## Contributing a change
1. [Fork the repository on GitHub](https://github.com/eclipse/paho.mqtt.python/fork)
2. Clone the forked repository onto your computer: ``` git clone
https://github.com/<your username>/paho.mqtt.python.git ```
3. If you are adding a new feature, then create a new branch from the latest
```develop``` branch with ```git checkout -b YOUR_BRANCH_NAME
origin/develop```
4. If you are fixing a bug, then create a new branch from the latest
```fixes``` branch with ```git checkout -b YOUR_BRANCH_NAME origin/fixes```
5. Make your changes
6. Ensure that all new and existing tests pass.
7. Commit the changes into the branch: ``` git commit -s ``` Make sure that
your commit message is meaningful and describes your changes correctly.
8. If you have a lot of commits for the change, squash them into a single / few
commits.
9. Push the changes in your branch to your forked repository.
10. Finally, go to
[https://github.com/eclipse/paho.mqtt.python](https://github.com/eclipse/paho.mqtt.python)
and create a pull request from your "YOUR_BRANCH_NAME" branch to the
```develop``` or ```fixes``` branch as appropriate to request review and
merge of the commits in your pushed branch.
What happens next depends on the content of the patch. If it is 100% authored
by the contributor and is less than 1000 lines (and meets the needs of the
project), then it can be pulled into the main repository. If not, more steps
are required. These are detailed in the
[legal process poster](http://www.eclipse.org/legal/EclipseLegalProcessPoster.pdf).
## Developer resources:
Information regarding source code management, builds, coding standards, and
more.
- [https://projects.eclipse.org/projects/iot.paho/developer](https://projects.eclipse.org/projects/iot.paho/developer)
Contact:
--------
Contact the project developers via the project's development
[mailing list](https://dev.eclipse.org/mailman/listinfo/paho-dev).
Search for bugs:
----------------
This project uses [Github](https://github.com/eclipse/paho.mqtt.python/issues)
to track ongoing development and issues.
Create a new bug:
-----------------
Be sure to search for existing bugs before you create another one. Remember
that contributions are always welcome!
- [Create new Paho bug](https://github.com/eclipse/paho.mqtt.python/issues)
v1.x - 2016-xx-xx
v2.x - 2017-xx-xx
=================
- Allow username and password to be zero length (as opposed to not being
......@@ -9,6 +9,22 @@ v1.x - 2016-xx-xx
Closes #115.
v1.2.1 - 2017-04-03
===================
- Handle unicode username and passwords correctly. Closes #79.
- Fix handling of invalid UTF-8 topics on incoming messages - the library now
does not attempt to decode the topic - this will happen when the user
accesses msg.topic in the on_message callback. If the topic is not valid
UTF-8, an exception will be raised. Closes #75.
- Fix issue with WebSocket connection in case of network issue (timeout or
connection broken). Closes #105.
- Fix issue with SSL connection, where latest incoming message may be delayed
or never processed. Closes #131.
- Fix possible message lost with publish.single and publish.multiple. Closes
#119.
v1.2 - 2016-06-03
=================
......
Eclipse Paho MQTT Python Client
===============================
Eclipse Paho MQTT Python Client
================================
This document describes the source code for the `Eclipse Paho <http://eclipse.org/paho/>`_ MQTT Python client library, which implements versions 3.1 and 3.1.1 of the MQTT protocol.
......@@ -32,7 +32,7 @@ Contents
* `Multiple`_
* `Subscribe`_
* `Simple`_
* `Calback`_
* `Using Callback`_
* `Reporting bugs`_
* `More information`_
......@@ -58,14 +58,14 @@ To obtain the full code, including examples and tests, you can clone the git rep
::
git clone git://git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.python.git
git clone https://github.com/eclipse/paho.mqtt.python
Once you have the code, it can be installed from your repository as well:
::
cd org.eclipse.paho.mqtt.python
cd paho.mqtt.python
python setup.py install
Usage and API
......@@ -163,8 +163,8 @@ transport
"tcp" to use raw TCP.
Example
.......
Constructor Example
...................
::
......@@ -182,8 +182,8 @@ reinitialise()
The ``reinitialise()`` function resets the client to its starting state as if it had just been created. It takes the same arguments as the ``Client()`` constructor.
Example
.......
Reinitialise Example
....................
::
......@@ -220,6 +220,7 @@ message_retry_set()
'''''''''''''''''''
::
message_retry_set(retry)
Set the time in seconds before a message with QoS>0 is retried, if the broker does not respond.
......@@ -329,6 +330,7 @@ user_data_set()
'''''''''''''''
::
user_data_set(userdata)
Set the private user data that will be passed to callbacks when events are generated. Use this for your own purpose to support your application.
......@@ -337,6 +339,7 @@ will_set()
''''''''''
::
will_set(topic, payload=None, qos=0, retain=False)
Set a Will to be sent to the broker. If the client disconnects without calling
......@@ -398,8 +401,8 @@ Callback
When the client receives a CONNACK message from the broker in response to the
connect it generates an ``on_connect()`` callback.
Example
.......
Connect Example
...............
::
......@@ -415,8 +418,8 @@ connect_async()
Use in conjunction with ``loop_start()`` to connect in a non-blocking manner.
The connection will not complete until ``loop_start()`` is called.
Callback
........
Callback (connect)
..................
When the client receives a CONNACK message from the broker in response to the
connect it generates an ``on_connect()`` callback.
......@@ -438,14 +441,14 @@ domain
See ``connect()`` for a description of the ``keepalive`` and ``bind_address``
arguments.
Callback
........
Callback (connect_srv)
......................
When the client receives a CONNACK message from the broker in response to the
connect it generates an ``on_connect()`` callback.
Example
.......
SRV Connect Example
...................
::
......@@ -461,8 +464,8 @@ reconnect()
Reconnect to a broker using the previously provided details. You must have
called ``connect*()`` before calling this function.
Callback
........
Callback (reconnect)
....................
When the client receives a CONNACK message from the broker in response to the
connect it generates an ``on_connect()`` callback.
......@@ -477,8 +480,8 @@ disconnect()
Disconnect from the broker cleanly. Using ``disconnect()`` will not result in a
will message being sent by the broker.
Callback
........
Callback (disconnect)
.....................
When the client has sent the disconnect message it generates an
``on_disconnect()`` callback.
......@@ -507,8 +510,8 @@ your client will be regularly disconnected by the broker.
The ``max_packets`` argument is obsolete and should be left unset.
Example
.......
Loop Example
............
::
......@@ -531,8 +534,8 @@ other work that may be blocking. This call also handles reconnecting to the
broker. Call ``loop_stop()`` to stop the background thread. The ``force``
argument is currently ignored.
Example
.......
Loop Start/Stop Example
.......................
::
......@@ -603,8 +606,8 @@ A ``ValueError`` will be raised if topic is ``None``, has zero length or is
invalid (contains a wildcard), if ``qos`` is not one of 0, 1 or 2, or if the
length of the payload is greater than 268435455 bytes.
Callback
........
Callback (publish)
..................
When the message has been sent to the broker an ``on_publish()`` callback will
be generated.
......@@ -670,8 +673,8 @@ against the mid argument in the ``on_subscribe()`` callback if it is defined.
Raises a ``ValueError`` if ``qos`` is not 0, 1 or 2, or if topic is ``None`` or
has zero string length, or if ``topic`` is not a string, tuple or list.
Callback
........
Callback (subscribe)
....................
When the broker has acknowledged the subscription, an ``on_subscribe()``
callback will be generated.
......@@ -698,8 +701,8 @@ mid argument in the ``on_unsubscribe()`` callback if it is defined.
Raises a ``ValueError`` if ``topic`` is ``None`` or has zero string length, or
is not a string or list.
Callback
........
Callback (unsubscribe)
......................
When the broker has acknowledged the unsubscribe, an ``on_unsubscribe()``
callback will be generated.
......@@ -745,8 +748,8 @@ The value of rc indicates success or not:
5: Connection refused - not authorised
6-255: Currently unused.
Example
.......
On Connect Example
..................
::
......@@ -779,8 +782,8 @@ The rc parameter indicates the disconnection state. If ``MQTT_ERR_SUCCESS``
other value the disconnection was unexpected, such as might be caused by a
network error.
Example
.......
On Disconnect Example
.....................
::
......@@ -812,8 +815,8 @@ userdata
message
an instance of MQTTMessage. This is a class with members ``topic``, ``payload``, ``qos``, ``retain``.
Example
.......
On Message Example
..................
::
......@@ -1012,8 +1015,8 @@ Publish a single message to a broker, then disconnect cleanly.
protocol=mqtt.MQTTv311, transport="tcp")
Function arguments
''''''''''''''''''
Publish Single Function arguments
'''''''''''''''''''''''''''''''''
topic
the only required argument must be the topic string to which the payload
......@@ -1077,8 +1080,9 @@ protocol
transport
set to "websockets" to send MQTT over WebSockets. Leave at the default of
"tcp" to use raw TCP.
Example
'''''''
Publish Single Example
''''''''''''''''''''''
::
......@@ -1096,8 +1100,8 @@ Publish multiple messages to a broker, then disconnect cleanly.
multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")
Function arguments
''''''''''''''''''
Publish Multiple Function arguments
'''''''''''''''''''''''''''''''''''
msgs
a list of messages to publish. Each message is either a dict or a tuple.
......@@ -1116,8 +1120,8 @@ msgs
See ``single()`` for the description of ``hostname``, ``port``, ``client_id``, ``keepalive``, ``will``, ``auth``, ``tls``, ``protocol``, ``transport``.
Example
'''''''
Publish Multiple Example
''''''''''''''''''''''''
::
......@@ -1149,8 +1153,8 @@ blocking function.
protocol=mqtt.MQTTv311)
Function arguments
''''''''''''''''''
Simple Subscribe Function arguments
'''''''''''''''''''''''''''''''''''
topics
the only required argument is the topic string to which the client will
......@@ -1217,8 +1221,8 @@ protocol
choose the version of the MQTT protocol to use. Use either ``MQTTv31`` or ``MQTTv311``.
Example
'''''''
Simple Example
''''''''''''''
::
......@@ -1227,8 +1231,8 @@ Example
msg = subscribe.simple("paho/test/simple", hostname="iot.eclipse.org")
print("%s %s" % (msg.topic, msg.payload))
Callback
````````
Using Callback
``````````````
Subscribe to a set of topics and process the messages received using a user
provided callback.
......@@ -1239,8 +1243,8 @@ provided callback.
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311)
Function arguments
''''''''''''''''''
Callback Subscribe Function arguments
'''''''''''''''''''''''''''''''''''''
callback
an "on_message" callback that will be used for each message received, and
......@@ -1256,13 +1260,13 @@ qos
the qos to use when subscribing, defaults to 0.
userdata
a user provided obkect that will be passed to the on_message callback when
a user provided object that will be passed to the on_message callback when
a message is received.
See ``simple()`` for the description of ``hostname``, ``port``, ``client_id``, ``keepalive``, ``will``, ``auth``, ``tls``, ``protocol``.
Example
'''''''
Callback Example
''''''''''''''''
::
......
__version__ = "1.2"
__version__ = "1.2.1"
class MQTTException(Exception):
def __init__(self, *args, **kwargs):
......
......@@ -22,6 +22,7 @@ import platform
import random
import select
import socket
try:
import ssl
except ImportError:
......@@ -37,6 +38,7 @@ import base64
import string
import hashlib
import logging
try:
# Use monotonic clock if available
time_func = time.monotonic
......@@ -57,10 +59,10 @@ if platform.system() == 'Windows':
else:
EAGAIN = errno.EAGAIN
VERSION_MAJOR=1
VERSION_MINOR=2
VERSION_REVISION=0
VERSION_NUMBER=(VERSION_MAJOR*1000000+VERSION_MINOR*1000+VERSION_REVISION)
VERSION_MAJOR = 1
VERSION_MINOR = 2
VERSION_REVISION = 1
VERSION_NUMBER = (VERSION_MAJOR * 1000000 + VERSION_MINOR * 1000 + VERSION_REVISION)
MQTTv31 = 3
MQTTv311 = 4
......@@ -68,13 +70,11 @@ MQTTv311 = 4
PROTOCOL_NAMEv31 = "MQIsdp"
PROTOCOL_NAMEv311 = "MQTT"
if sys.version_info[0] >= 3:
# define some alias for python2 compatibility
unicode = str
basestring = str
# Message types
CONNECT = 0x10
CONNACK = 0x20
......@@ -121,7 +121,7 @@ mqtt_cs_connect_async = 3
# Message state
mqtt_ms_invalid = 0
mqtt_ms_publish= 1
mqtt_ms_publish = 1
mqtt_ms_wait_for_puback = 2
mqtt_ms_wait_for_pubrec = 3
mqtt_ms_resend_pubrel = 4
......@@ -155,6 +155,11 @@ if sys.version_info[0] < 3:
else:
sockpair_data = b"0"
class WebsocketConnectionError(ValueError):
pass
def error_string(mqtt_errno):
"""Return the error string associated with an mqtt error number."""
if mqtt_errno == MQTT_ERR_SUCCESS:
......@@ -209,7 +214,7 @@ def connack_string(connack_code):
return "Connection Refused: unknown reason."
def base62(num, base=string.digits+string.ascii_letters, padding=1):
def base62(num, base=string.digits + string.ascii_letters, padding=1):
"""Convert a number to base-62 representation."""
assert num >= 0
digits = []
......@@ -325,26 +330,39 @@ class MQTTMessage:
Members:
topic : String. topic that the message was published on.
topic : String/bytes. topic that the message was published on.
payload : String/bytes the message payload.
qos : Integer. The message Quality of Service 0, 1 or 2.
retain : Boolean. If true, the message is a retained message and not fresh.
mid : Integer. The message id.
On Python 3, topic must be bytes.
"""
__slots__ = 'timestamp', 'state', 'dup', 'mid', 'topic', 'payload', 'qos', 'retain', 'info'
__slots__ = 'timestamp', 'state', 'dup', 'mid', '_topic', 'payload', 'qos', 'retain', 'info'
def __init__(self, mid=0, topic=b""):
self.timestamp = 0
self.state = mqtt_ms_invalid
self.dup = False
self.mid = mid
self.topic = topic
self._topic = topic
self.payload = b""
self.qos = 0
self.retain = False
self.info = MQTTMessageInfo(mid)
@property
def topic(self):
if sys.version_info[0] >= 3:
return self._topic.decode('utf-8')
else:
return self._topic
@topic.setter
def topic(self, value):
self._topic = value
class Client(object):
"""MQTT version 3.1/3.1.1 client class.
......@@ -774,7 +792,8 @@ class Client(object):
if keepalive < 0:
raise ValueError('Keepalive must be >=0.')
if bind_address != "" and bind_address is not None:
if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (
sys.version_info[0] == 3 and sys.version_info[1] < 2):
raise ValueError('bind_address requires Python 2.7 or 3.2.')
self._host = host
......@@ -830,7 +849,8 @@ class Client(object):
self._messages_reconnect_reset()
try:
if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (
sys.version_info[0] == 3 and sys.version_info[1] < 2):
sock = socket.create_connection((self._host, self._port))
else:
sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0))
......@@ -861,6 +881,7 @@ class Client(object):
ssl.match_hostname(sock.getpeercert(), self._host)
if self._transport == "websockets":
sock.settimeout(self._keepalive)
sock = WebsocketWrapper(sock, self._host, self._port, self._ssl)
self._sock = sock
......@@ -903,6 +924,15 @@ class Client(object):
self._out_packet_mutex.release()
self._current_out_packet_mutex.release()
# used to check if there are any bytes left in the (SSL) socket
pending_bytes = 0
if hasattr(self._sock, 'pending'):
pending_bytes = self._sock.pending()
# if bytes are pending do not wait in select
if pending_bytes > 0:
timeout = 0.0
# sockpairR is used to break out of select() before the timeout, on a
# call to publish() etc.
rlist = [self._sock, self._sockpairR]
......@@ -921,7 +951,7 @@ class Client(object):
except:
return MQTT_ERR_UNKNOWN
if self._sock in socklist[0]:
if self._sock in socklist[0] or pending_bytes > 0:
rc = self.loop_read(max_packets)
if rc or self._sock is None:
return rc
......@@ -1033,7 +1063,8 @@ class Client(object):
message.state = mqtt_ms_wait_for_pubrec
self._out_message_mutex.release()
rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)
rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain,
message.dup)
# remove from inflight messages so it will be send after a connection is made
if rc is MQTT_ERR_NO_CONN:
......@@ -1058,7 +1089,7 @@ class Client(object):
username: The username to authenticate with. Need have no relationship to the client id.
password: The password to authenticate with. Optional, set to None if not required.
"""
self._username = username.encode('utf-8')
self._username = username
self._password = password
if isinstance(self._password, unicode):
self._password = self._password.encode('utf-8')
......@@ -1388,7 +1419,7 @@ class Client(object):
if self._state == mqtt_cs_connect_async:
try:
self.reconnect()
except socket.error:
except (socket.error, WebsocketConnectionError):
if not retry_first_connection:
raise
self._easy_log(MQTT_LOG_DEBUG, "Connection failed, retrying")
......@@ -1409,7 +1440,6 @@ class Client(object):
and self._current_out_packet is None
and len(self._out_packet) == 0
and len(self._out_messages) == 0):
rc = 1
run = False
......@@ -1429,7 +1459,7 @@ class Client(object):
self._state_mutex.release()
try:
self.reconnect()
except socket.error as err:
except (socket.error, WebsocketConnectionError) as err:
pass
return rc
......@@ -1734,7 +1764,10 @@ class Client(object):
print(err)
return 1
else:
byte, = struct.unpack("!B", byte)
if len(byte) == 0:
return 1
byte = struct.unpack("!B", byte)
byte = byte[0]
self._in_packet['remaining_count'].append(byte)
# Max 4 bytes length for remaining length as defined by protocol.
# Anything more likely means a broken/malicious client.
......@@ -1761,8 +1794,10 @@ class Client(object):
print(err)
return 1
else:
self._in_packet['to_process'] -= len(data)
self._in_packet['packet'] += data
if len(data) == 0:
return 1
self._in_packet['to_process'] = self._in_packet['to_process'] - len(data)
self._in_packet['packet'] = self._in_packet['packet'] + data
# All data for this packet is read.
self._in_packet['pos'] = 0
......@@ -2033,7 +2068,6 @@ class Client(object):
protocol = PROTOCOL_NAMEv311
proto_ver = 4
protocol = protocol.encode('utf-8')
remaining_length = 2 + len(protocol) + 1 + 1 + 2 + 2 + len(self._client_id)
connect_flags = 0
if clean_session:
......@@ -2043,19 +2077,22 @@ class Client(object):
remaining_length += 2 + len(self._will_topic) + 2 + len(self._will_payload)
connect_flags |= 0x04 | ((self._will_qos & 0x03) << 3) | ((self._will_retain & 0x01) << 5)
if self._username is not None:
remaining_length += 2 + len(self._username)
connect_flags |= 0x80
if self._password is not None:
connect_flags |= 0x40
remaining_length += 2 + len(self._password)
if self._username:
uusername = self._username.encode('utf-8')
remaining_length = remaining_length + 2 + len(uusername)
connect_flags = connect_flags | 0x80
if self._password:
connect_flags = connect_flags | 0x40
upassword = self._password # already encoded
remaining_length = remaining_length + 2 + len(upassword)
command = CONNECT
packet = bytearray()
packet.append(command)
self._pack_remaining_length(packet, remaining_length)
packet.extend(struct.pack("!H"+str(len(protocol))+"sBBH", len(protocol), protocol, proto_ver, connect_flags, keepalive))
packet.extend(struct.pack("!H" + str(len(protocol)) + "sBBH", len(protocol), protocol, proto_ver, connect_flags,
keepalive))
self._pack_str16(packet, self._client_id)
......@@ -2384,13 +2421,24 @@ class Client(object):
pack_format = "!H" + str(len(self._in_packet['packet']) - 2) + 's'
(slen, packet) = struct.unpack(pack_format, self._in_packet['packet'])
pack_format = '!' + str(slen) + 's' + str(len(packet) - slen) + 's'
(message.topic, packet) = struct.unpack(pack_format, packet)
(topic, packet) = struct.unpack(pack_format, packet)
if len(message.topic) == 0:
if len(topic) == 0:
return MQTT_ERR_PROTOCOL
# Handle topics with invalid UTF-8
# This replaces an invalid topic with a message and the hex
# representation of the topic for logging. When the user attempts to
# access message.topic in the callback, an exception will be raised.
if sys.version_info[0] >= 3:
message.topic = message.topic.decode('utf-8')
try:
print_topic = topic.decode('utf-8')
except UnicodeDecodeError:
print_topic = "TOPIC WITH INVALID UTF-8: " + str(topic)
else:
print_topic = topic
message.topic = topic
if message.qos > 0:
pack_format = "!H" + str(len(packet) - 2) + 's'
......@@ -2617,7 +2665,8 @@ class WebsocketWrapper:
b"Upgrade: websocket\r\n" + \
b"Connection: Upgrade\r\n" + \
b"Host: " + str(self._host).encode('utf-8') + b":" + str(self._port).encode('utf-8') + b"\r\n" + \
b"Origin: http://" + str(self._host).encode('utf-8') + b":" + str(self._port).encode('utf-8') + b"\r\n" +\
b"Origin: http://" + str(self._host).encode('utf-8') + b":" + str(self._port).encode(
'utf-8') + b"\r\n" + \
b"Sec-WebSocket-Key: " + sec_websocket_key + b"\r\n" + \
b"Sec-WebSocket-Version: 13\r\n" + \
b"Sec-WebSocket-Protocol: mqtt\r\n\r\n"
......@@ -2638,7 +2687,7 @@ class WebsocketWrapper:
# check upgrade
if b"connection" in str(self._readbuffer).lower().encode('utf-8'):
if b"upgrade" not in str(self._readbuffer).lower().encode('utf-8'):
raise ValueError("WebSocket handshake error, connection not upgraded")
raise WebsocketConnectionError("WebSocket handshake error, connection not upgraded")
else:
has_upgrade = True
......@@ -2654,7 +2703,7 @@ class WebsocketWrapper:
client_hash = base64.b64encode(client_hash.digest())
if server_hash != client_hash:
raise ValueError("WebSocket handshake error, invalid secret key")
raise WebsocketConnectionError("WebSocket handshake error, invalid secret key")
else:
has_secret = True
else:
......@@ -2666,10 +2715,10 @@ class WebsocketWrapper:
# connection reset
elif not byte:
raise ValueError("WebSocket handshake error")
raise WebsocketConnectionError("WebSocket handshake error")
if not has_upgrade or not has_secret:
raise ValueError("WebSocket handshake error")
raise WebsocketConnectionError("WebSocket handshake error")
self._readbuffer = bytearray()
self.connected = True
......@@ -2678,7 +2727,8 @@ class WebsocketWrapper:
header = bytearray()
length = len(data)
mask_key = bytearray([random.randint(0, 255), random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)])
mask_key = bytearray(
[random.randint(0, 255), random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)])
mask_flag = do_masking
# 1 << 7 is the final flag, we don't send continuated data
......@@ -2758,7 +2808,6 @@ class WebsocketWrapper:
# read mask
if maskbit:
mask_key = self._buffered_read(4)
# if frame payload is shorter than the requested data, read only the possible part
......@@ -2767,7 +2816,6 @@ class WebsocketWrapper:
readindex = payload_length
if readindex > 0:
# get payload chunk
payload = self._buffered_read(readindex)
......@@ -2778,6 +2826,8 @@ class WebsocketWrapper:
result = payload[chunk_startindex:readindex]
self._payload_head = readindex
else:
payload = bytearray()
# check if full frame arrived and reset readbuffer and payloadhead if needed
if readindex == payload_length:
......@@ -2811,7 +2861,6 @@ class WebsocketWrapper:
# if previous frame was sent successfully
if len(self._sendbuffer) == 0:
# create websocket frame
frame = self._create_frame(WebsocketWrapper.OPCODE_BINARY, bytearray(data))
self._sendbuffer.extend(frame)
......
......@@ -40,9 +40,7 @@ def _on_connect(client, userdata, flags, rc):
"""Internal callback"""
#pylint: disable=invalid-name, unused-argument
if rc == 0:
_do_publish(client)
else:
if rc != 0:
raise mqtt.MQTTException(paho.connack_string(rc))
......@@ -143,6 +141,7 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
client.tls_set_context(tls)
client.connect(hostname, port, keepalive)
_do_publish(client)
client.loop_forever()
......
......@@ -10,7 +10,10 @@ import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-unpwd-set", keepalive=keepalive, username="uname", password=";'[08gn=#")
username = "uname"
password = ";'[08gn=#"
connect_packet = paho_test.gen_connect(
"01-unpwd-set", keepalive=keepalive, username=username, password=password)
sock = paho_test.create_server_socket()
......@@ -30,4 +33,3 @@ finally:
sock.close()
exit(rc)
#!/usr/bin/env python
# Test whether a client produces a correct connect with a unicode username and password.
# The client should connect to port 1888 with keepalive=60, clean session set,
# client id 01-unpwd-unicode-set, username and password from corresponding variables
from __future__ import unicode_literals
import context
import paho_test
rc = 1
keepalive = 60
username = "\u00fas\u00e9rn\u00e1m\u00e9-h\u00e9ll\u00f3"
password = "h\u00e9ll\u00f3"
connect_packet = paho_test.gen_connect(
"01-unpwd-unicode-set", keepalive=keepalive, username=username, password=password)
sock = paho_test.create_server_socket()
client = context.start_client()
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
rc = 0
conn.close()
finally:
client.terminate()
client.wait()
sock.close()
exit(rc)
......@@ -8,6 +8,7 @@ test :
python ./01-con-discon-success-mqtt311.py python/01-con-discon-success-mqtt311.test
python ./01-will-set.py python/01-will-set.test
python ./01-unpwd-set.py python/01-unpwd-set.test
python ./01-unpwd-unicode-set.py python/01-unpwd-unicode-set.test
python ./01-will-unpwd-set.py python/01-will-unpwd-set.test
python ./01-zero-length-clientid.py python/01-zero-length-clientid.test
python ./01-no-clean-session.py python/01-no-clean-session.test
......
#!/usr/bin/env python
from __future__ import unicode_literals
import paho.mqtt.client as mqtt
mqttc = mqtt.Client("01-unpwd-unicode-set")
run = -1
username = "\u00fas\u00e9rn\u00e1m\u00e9-h\u00e9ll\u00f3"
password = "h\u00e9ll\u00f3"
mqttc.username_pw_set(username, password)
mqttc.connect("localhost", 1888)
while run == -1:
mqttc.loop()
exit(run)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册