提交 73b85d24 编写于 作者: R Roger A. Light

Merge branch '1.1'

Conflicts:
	ChangeLog.txt

Change-Id: Ib629fe5533b132117e56c1624befd19a7909a575
v1.0.3 - 2014-11-XX v1.1 - 2015-01-30
=================== =================
- Add support for wildcard certificates. Closes #440547.
- Default connection behaviour has been reverted to MQTT v3.1 instead of
v3.1.1. There is as yet insufficient support for v3.1.1 to rely on, and
current v3.1 implementations do not return the correct CONNACK code to allow
detection of the fault. Closes #451735.
- Fix incorrect handling of queued messages after reconnecting. Closes
#452672.
- Fix possible race condition if the connection in loop_start() does not - Fix possible race condition if the connection in loop_start() does not
complete before loop_stop() is called, meaning the network thread never complete before loop_stop() is called, meaning the network thread never
ends. Closes #448428. Thanks to Kees Bakker. ends. Closes #448428. Thanks to Kees Bakker.
v1.0.2 - 2014-09-13 v1.0.2 - 2014-09-13
=================== ===================
......
...@@ -434,10 +434,12 @@ loop_forever() ...@@ -434,10 +434,12 @@ loop_forever()
:: ::
loop_forever(timeout=1.0, max_packets=1) loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)
This is a blocking form of the network loop and will not return until the client calls ``disconnect()``. It automatically handles reconnecting. This is a blocking form of the network loop and will not return until the client calls ``disconnect()``. It automatically handles reconnecting.
Except for the first connection attempt when using connect_async, use ``retry_first_connection=True`` to make it retry the first connection. Warning: This might lead to situations where the client keeps connecting to an non existing host without failing.
The ``timeout`` and ``max_packets`` arguments are obsolete and should be left unset. The ``timeout`` and ``max_packets`` arguments are obsolete and should be left unset.
Publishing Publishing
......
...@@ -94,7 +94,8 @@ def main(argv): ...@@ -94,7 +94,8 @@ def main(argv):
verbose = True verbose = True
if topic == None: if topic == None:
print("You must provide a topic to clear.") print("You must provide a topic to clear.\n")
print_usage()
sys.exit(2) sys.exit(2)
mqttc = mqtt.Client(client_id) mqttc = mqtt.Client(client_id)
......
__version__ = "1.0.2" __version__ = "1.1"
...@@ -387,7 +387,7 @@ class Client(object): ...@@ -387,7 +387,7 @@ class Client(object):
MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf. MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf.
""" """
def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv311): def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv31):
"""client_id is the unique client id string used when connecting to the """client_id is the unique client id string used when connecting to the
broker. If client_id is zero length or None, then one will be randomly broker. If client_id is zero length or None, then one will be randomly
generated. In this case, clean_session must be True. If this is not the generated. In this case, clean_session must be True. If this is not the
...@@ -408,7 +408,7 @@ class Client(object): ...@@ -408,7 +408,7 @@ class Client(object):
The protocol argument allows explicit setting of the MQTT version to The protocol argument allows explicit setting of the MQTT version to
use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1) or use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1) or
paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1.1. If the paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1. If the
broker reports that the client connected with an invalid protocol broker reports that the client connected with an invalid protocol
version, the client will automatically attempt to reconnect using v3.1 version, the client will automatically attempt to reconnect using v3.1
instead. instead.
...@@ -1090,7 +1090,7 @@ class Client(object): ...@@ -1090,7 +1090,7 @@ class Client(object):
if self._sock is None and self._ssl is None: if self._sock is None and self._ssl is None:
return MQTT_ERR_NO_CONN return MQTT_ERR_NO_CONN
max_packets = len(self._out_messages) + len(self._in_messages) max_packets = len(self._out_packet) + 1
if max_packets < 1: if max_packets < 1:
max_packets = 1 max_packets = 1
...@@ -1224,17 +1224,36 @@ class Client(object): ...@@ -1224,17 +1224,36 @@ class Client(object):
else: else:
return self._sock return self._sock
def loop_forever(self, timeout=1.0, max_packets=1): def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False):
"""This function call loop() for you in an infinite blocking loop. It """This function call loop() for you in an infinite blocking loop. It
is useful for the case where you only want to run the MQTT client loop is useful for the case where you only want to run the MQTT client loop
in your program. in your program.
loop_forever() will handle reconnecting for you. If you call loop_forever() will handle reconnecting for you. If you call
disconnect() in a callback it will return.""" disconnect() in a callback it will return.
timeout: The time in seconds to wait for incoming/outgoing network
traffic before timing out and returning.
max_packets: Not currently used.
retry_first_connection: Should the first connection attempt be retried on failure.
Raises socket.error on first connection failures unless retry_first_connection=True
"""
run = True run = True
if self._state == mqtt_cs_connect_async:
self.reconnect() while run:
if self._state == mqtt_cs_connect_async:
try:
self.reconnect()
except socket.error:
if not retry_first_connection:
raise
self._easy_log(MQTT_LOG_DEBUG, "Connection failed, retrying")
time.sleep(1)
else:
break
while run: while run:
rc = MQTT_ERR_SUCCESS rc = MQTT_ERR_SUCCESS
...@@ -1997,11 +2016,14 @@ class Client(object): ...@@ -1997,11 +2016,14 @@ class Client(object):
for m in self._out_messages: for m in self._out_messages:
m.timestamp = time.time() m.timestamp = time.time()
if m.state == mqtt_ms_queued: if m.state == mqtt_ms_queued:
self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release() self._out_message_mutex.release()
return MQTT_ERR_SUCCESS return MQTT_ERR_SUCCESS
if m.qos == 0: if m.qos == 0:
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0: if rc != 0:
self._out_message_mutex.release() self._out_message_mutex.release()
return rc return rc
...@@ -2009,7 +2031,9 @@ class Client(object): ...@@ -2009,7 +2031,9 @@ class Client(object):
if m.state == mqtt_ms_publish: if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1 self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_puback m.state = mqtt_ms_wait_for_puback
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0: if rc != 0:
self._out_message_mutex.release() self._out_message_mutex.release()
return rc return rc
...@@ -2017,17 +2041,22 @@ class Client(object): ...@@ -2017,17 +2041,22 @@ class Client(object):
if m.state == mqtt_ms_publish: if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1 self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubrec m.state = mqtt_ms_wait_for_pubrec
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup) rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0: if rc != 0:
self._out_message_mutex.release() self._out_message_mutex.release()
return rc return rc
elif m.state == mqtt_ms_resend_pubrel: elif m.state == mqtt_ms_resend_pubrel:
self._inflight_messages = self._inflight_messages + 1 self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubcomp m.state = mqtt_ms_wait_for_pubcomp
self._in_callback = True # Don't call loop_write after _send_pubrel()
rc = self._send_pubrel(m.mid, m.dup) rc = self._send_pubrel(m.mid, m.dup)
self._in_callback = False
if rc != 0: if rc != 0:
self._out_message_mutex.release() self._out_message_mutex.release()
return rc return rc
self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release() self._out_message_mutex.release()
return rc return rc
elif result > 0 and result < 6: elif result > 0 and result < 6:
...@@ -2257,6 +2286,23 @@ class Client(object): ...@@ -2257,6 +2286,23 @@ class Client(object):
self.loop_forever() self.loop_forever()
def _host_matches_cert(self, host, cert_host):
if cert_host[0:2] == "*.":
if cert_host.count("*") != 1:
return False
host_match = host.split(".", 1)[1]
cert_match = cert_host.split(".", 1)[1]
if host_match == cert_match:
return True
else:
return False
else:
if host == cert_host:
return True
else:
return False
def _tls_match_hostname(self): def _tls_match_hostname(self):
cert = self._ssl.getpeercert() cert = self._ssl.getpeercert()
san = cert.get('subjectAltName') san = cert.get('subjectAltName')
...@@ -2265,7 +2311,7 @@ class Client(object): ...@@ -2265,7 +2311,7 @@ class Client(object):
for (key, value) in san: for (key, value) in san:
if key == 'DNS': if key == 'DNS':
have_san_dns = True have_san_dns = True
if value.lower() == self._host.lower(): if self._host_matches_cert(self._host.lower(), value.lower()) == True:
return return
if key == 'IP Address': if key == 'IP Address':
have_san_dns = True have_san_dns = True
...@@ -2279,7 +2325,7 @@ class Client(object): ...@@ -2279,7 +2325,7 @@ class Client(object):
if subject: if subject:
for ((key, value),) in subject: for ((key, value),) in subject:
if key == 'commonName': if key == 'commonName':
if value.lower() == self._host.lower(): if self._host_matches_cert(self._host.lower(), value.lower()) == True:
return return
raise ssl.SSLError('Certificate subject does not match remote hostname.') raise ssl.SSLError('Certificate subject does not match remote hostname.')
......
...@@ -62,7 +62,7 @@ def _on_publish(c, userdata, mid): ...@@ -62,7 +62,7 @@ def _on_publish(c, userdata, mid):
def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=mqtt.MQTTv311): will=None, auth=None, tls=None, protocol=mqtt.MQTTv31):
"""Publish multiple messages to a broker, then disconnect cleanly. """Publish multiple messages to a broker, then disconnect cleanly.
This function creates an MQTT client, connects to a broker and publishes a This function creates an MQTT client, connects to a broker and publishes a
...@@ -172,7 +172,7 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60, ...@@ -172,7 +172,7 @@ def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
def single(topic, payload=None, qos=0, retain=False, hostname="localhost", def single(topic, payload=None, qos=0, retain=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=mqtt.MQTTv311): tls=None, protocol=mqtt.MQTTv31):
"""Publish a single message to a broker, then disconnect cleanly. """Publish a single message to a broker, then disconnect cleanly.
This function creates an MQTT client, connects to a broker and publishes a This function creates an MQTT client, connects to a broker and publishes a
......
#!/usr/bin/env python
# Test whether a client produces a correct connect and subsequent disconnect.
# The client should connect to port 1888 with keepalive=60, clean session set,
# and client id 01-con-discon-success
# The test will send a CONNACK message to the client with rc=0. Upon receiving
# the CONNACK and verifying that rc=0, the client should send a DISCONNECT
# message. If rc!=0, the client should exit with an error.
import inspect
import os
import subprocess
import socket
import sys
import time
# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"..")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
import paho_test
rc = 1
keepalive = 60
connect_packet = paho_test.gen_connect("01-con-discon-success", keepalive=keepalive, proto_name="MQTT", proto_ver=4)
connack_packet = paho_test.gen_connack(rc=0)
disconnect_packet = paho_test.gen_disconnect()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(10)
sock.bind(('', 1888))
sock.listen(5)
client_args = sys.argv[1:]
env = dict(os.environ)
try:
pp = env['PYTHONPATH']
except KeyError:
pp = ''
env['PYTHONPATH'] = '../../src:'+pp
client = subprocess.Popen(client_args, env=env)
try:
(conn, address) = sock.accept()
conn.settimeout(10)
if paho_test.expect_packet(conn, "connect", connect_packet):
conn.send(connack_packet)
if paho_test.expect_packet(conn, "disconnect", disconnect_packet):
rc = 0
conn.close()
finally:
client.terminate()
client.wait()
sock.close()
exit(rc)
...@@ -7,6 +7,7 @@ test : python python3 ...@@ -7,6 +7,7 @@ test : python python3
python python3 : python python3 :
./01-con-discon-success.py $@/01-con-discon-success.test ./01-con-discon-success.py $@/01-con-discon-success.test
./01-con-discon-success-mqtt311.py $@/01-con-discon-success-mqtt311.test
./01-will-set.py $@/01-will-set.test ./01-will-set.py $@/01-will-set.test
./01-unpwd-set.py $@/01-unpwd-set.test ./01-unpwd-set.py $@/01-unpwd-set.test
./01-will-unpwd-set.py $@/01-will-unpwd-set.test ./01-will-unpwd-set.py $@/01-will-unpwd-set.test
......
#!/usr/bin/env python
import os
import subprocess
import socket
import sys
import time
from struct import *
import paho.mqtt.client as mqtt
def on_connect(mqttc, obj, flags, rc):
if rc != 0:
exit(rc)
else:
mqttc.disconnect()
def on_disconnect(mqttc, obj, rc):
mqttc.loop()
obj = rc
run = -1
mqttc = mqtt.Client("01-con-discon-success", run, protocol=mqtt.MQTTv311)
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.connect("localhost", 1888)
while run == -1:
mqttc.loop()
exit(run)
#!/usr/bin/env python3
import os
import subprocess
import socket
import sys
import time
from struct import *
import paho.mqtt.client as mqtt
def on_connect(mqttc, obj, flags, rc):
if rc != 0:
exit(rc)
else:
mqttc.disconnect()
def on_disconnect(mqttc, obj, rc):
mqttc.loop()
obj = rc
run = -1
mqttc = mqtt.Client("01-con-discon-success", run, protocol=mqtt.MQTTv311)
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.connect("localhost", 1888)
while run == -1:
mqttc.loop()
exit(run)
...@@ -201,7 +201,7 @@ def to_string(packet): ...@@ -201,7 +201,7 @@ def to_string(packet):
# Reserved # Reserved
return "0xF0" return "0xF0"
def gen_connect(client_id, clean_session=True, keepalive=60, username=None, password=None, will_topic=None, will_qos=0, will_retain=False, will_payload="", proto_name="MQTT", proto_ver=4): def gen_connect(client_id, clean_session=True, keepalive=60, username=None, password=None, will_topic=None, will_qos=0, will_retain=False, will_payload="", proto_name="MQIsdp", proto_ver=3):
if client_id == None: if client_id == None:
remaining_length = 12 remaining_length = 12
else: else:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册