# Copyright (c) 2012-2014 Roger Light <roger@atchoo.org>
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
#    http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
#   http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
#    Roger Light - initial API and implementation

"""
This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging
protocol that is easy to implement and suitable for low powered devices.
"""
import errno
R
Roger Light 已提交
20
import platform
R
Roger Light 已提交
21 22 23
import random
import select
import socket
24 25 26 27 28 29 30 31 32
# Probe for TLS support once at import time. cert_reqs and tls_version are the
# module-level defaults used by the TLS configuration code; both are None when
# the ssl module (or one of the constants read from it) is unavailable.
try:
    import ssl
    cert_reqs = ssl.CERT_REQUIRED
    tls_version = ssl.PROTOCOL_TLSv1
    HAVE_SSL = True
except (ImportError, AttributeError):
    # ImportError: the interpreter has no ssl module.
    # AttributeError: ssl is present but lacks one of the constants above.
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    HAVE_SSL = False
    cert_reqs = None
    tls_version = None
R
Roger Light 已提交
33 34 35
import struct
import sys
import threading
36

R
Roger Light 已提交
37
import time
M
Milan Toth 已提交
38 39
import uuid
import base64
40
import string
M
Milan Toth 已提交
41
import hashlib
42
import logging
43 44 45 46 47
# Use the monotonic clock if the interpreter provides one, otherwise fall back
# to wall-clock time.
time_func = getattr(time, "monotonic", time.time)
M
Milan Toth 已提交
48

J
Jan-Piet Mens 已提交
49 50 51 52 53
# Optional dependency: record whether the dns.resolver module can be imported.
try:
    import dns.resolver
except ImportError:
    HAVE_DNS = False
else:
    HAVE_DNS = True
R
Roger Light 已提交
54

55 56
from .matcher import MQTTMatcher

R
Roger Light 已提交
57 58 59 60 61
# errno value compared against when a non-blocking socket call cannot complete
# immediately; Windows uses its own WSA code for this.
EAGAIN = errno.WSAEWOULDBLOCK if platform.system() == 'Windows' else errno.EAGAIN

R
Roger A. Light 已提交
62
# Library version, also packed into a single integer as
# major * 1000000 + minor * 1000 + revision.
VERSION_MAJOR = 1
VERSION_MINOR = 2
VERSION_REVISION = 0
VERSION_NUMBER = (
    VERSION_MAJOR * 1000000
    + VERSION_MINOR * 1000
    + VERSION_REVISION
)

R
Roger Light 已提交
67 68 69
# Protocol version identifiers accepted by Client(protocol=...)
MQTTv31 = 3
MQTTv311 = 4

# Protocol name string for each protocol version
PROTOCOL_NAMEv31 = "MQIsdp"
PROTOCOL_NAMEv311 = "MQTT"
R
Roger Light 已提交
72

R
Roger Light 已提交
73
# Message types
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
PUBACK = 0x40
PUBREC = 0x50
PUBREL = 0x60
PUBCOMP = 0x70
SUBSCRIBE = 0x80
SUBACK = 0x90
UNSUBSCRIBE = 0xA0
UNSUBACK = 0xB0
PINGREQ = 0xC0
PINGRESP = 0xD0
DISCONNECT = 0xE0

# Log levels (one bit each)
MQTT_LOG_INFO = 0x01
MQTT_LOG_NOTICE = 0x02
MQTT_LOG_WARNING = 0x04
MQTT_LOG_ERR = 0x08
MQTT_LOG_DEBUG = 0x10

# Mapping from the MQTT log levels above to standard logging module levels.
# NOTICE has no direct equivalent, so it shares INFO.
LOGGING_LEVEL = {
    MQTT_LOG_DEBUG: logging.DEBUG,
    MQTT_LOG_INFO: logging.INFO,
    MQTT_LOG_NOTICE: logging.INFO,
    MQTT_LOG_WARNING: logging.WARNING,
    MQTT_LOG_ERR: logging.ERROR,
}
R
Roger Light 已提交
102 103 104 105 106 107 108 109 110 111

# CONNACK codes
CONNACK_ACCEPTED = 0
CONNACK_REFUSED_PROTOCOL_VERSION = 1
CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
CONNACK_REFUSED_NOT_AUTHORIZED = 5

# Connection state
mqtt_cs_new = 0
mqtt_cs_connected = 1
mqtt_cs_disconnecting = 2
mqtt_cs_connect_async = 3

# Message state
mqtt_ms_invalid = 0
mqtt_ms_publish = 1
mqtt_ms_wait_for_puback = 2
mqtt_ms_wait_for_pubrec = 3
mqtt_ms_resend_pubrel = 4
mqtt_ms_wait_for_pubrel = 5
mqtt_ms_resend_pubcomp = 6
mqtt_ms_wait_for_pubcomp = 7
mqtt_ms_send_pubrec = 8
mqtt_ms_queued = 9

# Error values
MQTT_ERR_AGAIN = -1
MQTT_ERR_SUCCESS = 0
MQTT_ERR_NOMEM = 1
MQTT_ERR_PROTOCOL = 2
MQTT_ERR_INVAL = 3
MQTT_ERR_NO_CONN = 4
MQTT_ERR_CONN_REFUSED = 5
MQTT_ERR_NOT_FOUND = 6
MQTT_ERR_CONN_LOST = 7
MQTT_ERR_TLS = 8
MQTT_ERR_PAYLOAD_SIZE = 9
MQTT_ERR_NOT_SUPPORTED = 10
MQTT_ERR_AUTH = 11
MQTT_ERR_ACL_DENIED = 12
MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14
MQTT_ERR_QUEUE_SIZE = 15
R
Roger Light 已提交
147

148 149 150 151
# Single-byte payload for the internal socket pair: a native str on Python 2,
# bytes on Python 3.
sockpair_data = "0" if sys.version_info[0] < 3 else b"0"
R
Roger Light 已提交
152

153 154 155
def error_string(mqtt_errno):
    """Return the human readable description of an MQTT_ERR_* value.

    Any value without an entry in the table below (this includes
    MQTT_ERR_AGAIN and MQTT_ERR_QUEUE_SIZE) yields "Unknown error.".
    """
    descriptions = (
        (MQTT_ERR_SUCCESS, "No error."),
        (MQTT_ERR_NOMEM, "Out of memory."),
        (MQTT_ERR_PROTOCOL, "A network protocol error occurred when communicating with the broker."),
        (MQTT_ERR_INVAL, "Invalid function arguments provided."),
        (MQTT_ERR_NO_CONN, "The client is not currently connected."),
        (MQTT_ERR_CONN_REFUSED, "The connection was refused."),
        (MQTT_ERR_NOT_FOUND, "Message not found (internal error)."),
        (MQTT_ERR_CONN_LOST, "The connection was lost."),
        (MQTT_ERR_TLS, "A TLS error occurred."),
        (MQTT_ERR_PAYLOAD_SIZE, "Payload too large."),
        (MQTT_ERR_NOT_SUPPORTED, "This feature is not supported."),
        (MQTT_ERR_AUTH, "Authorisation failed."),
        (MQTT_ERR_ACL_DENIED, "Access denied by ACL."),
        (MQTT_ERR_UNKNOWN, "Unknown error."),
        (MQTT_ERR_ERRNO, "Error defined by errno."),
    )
    # Linear scan with == so that any argument type is accepted, hashable or not.
    for code, text in descriptions:
        if mqtt_errno == code:
            return text
    return "Unknown error."

R
Roger Light 已提交
188

R
Roger Light 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
def connack_string(connack_code):
    """Return the human readable text for a CONNACK result code.

    Codes 0 to 5 have a dedicated message; every other value is reported as
    a refusal for an unknown reason.
    """
    # Position in the tuple is the CONNACK code it describes.
    messages = (
        "Connection Accepted.",
        "Connection Refused: unacceptable protocol version.",
        "Connection Refused: identifier rejected.",
        "Connection Refused: broker unavailable.",
        "Connection Refused: bad user name or password.",
        "Connection Refused: not authorised.",
    )
    for code, text in enumerate(messages):
        if connack_code == code:
            return text
    return "Connection Refused: unknown reason."

R
Roger Light 已提交
206

207 208 209 210 211 212 213 214 215 216 217
def base62(num, base=string.digits+string.ascii_letters, padding=1):
    """Convert a non-negative integer to its representation in an alphabet.

    num : the non-negative integer to convert.
    base : the alphabet to use, least significant symbol first. Its length is
    the radix; the default is the 62 characters 0-9, a-z, A-Z.
    padding : minimum length of the result. Shorter results are left-padded
    with base[0].

    Returns the encoded string, most significant symbol first.
    Raises ValueError if num is negative or base has fewer than two symbols.
    """
    # Explicit checks rather than assert: asserts are stripped under -O, and a
    # negative num (or a one-symbol alphabet) would then loop forever below.
    if num < 0:
        raise ValueError('num must not be negative.')
    radix = len(base)
    if radix < 2:
        raise ValueError('base must contain at least two symbols.')

    digits = []
    while num:
        # Divide by the alphabet length so that a custom alphabet works too.
        num, rest = divmod(num, radix)
        digits.append(base[rest])
    digits.extend(base[0] for _ in range(len(digits), padding))
    return ''.join(reversed(digits))


218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
def topic_matches_sub(sub, topic):
    """Return True if the topic is matched by the subscription, else False.

    For example:

    foo/bar would match the subscription foo/# or +/bar
    non/matching would not match the subscription non/+/+
    """
    lookup = MQTTMatcher()
    lookup[sub] = True
    # A single hit from the matcher is enough; the sentinel marks "no hit".
    nothing = object()
    return next(lookup.iter_match(topic), nothing) is not nothing


235 236 237 238
def _socketpair_compat():
    """TCP/IP socketpair including Windows support.

    Builds a connected pair of non-blocking TCP sockets over 127.0.0.1 by
    listening on an ephemeral port, connecting to it and accepting that
    connection.

    Returns a tuple (sock1, sock2): sock1 is the connecting end, sock2 the
    accepted end.
    Raises socket.error if any step fails; sockets created up to that point
    are closed before the error propagates.
    """
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
    try:
        listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Port 0 lets the OS pick a free port; read it back for the connect.
        listensock.bind(("127.0.0.1", 0))
        listensock.listen(1)
        port = listensock.getsockname()[1]

        sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
        try:
            sock1.setblocking(0)
            try:
                sock1.connect(("127.0.0.1", port))
            except socket.error as err:
                # A non-blocking connect normally reports "in progress" /
                # "would block"; anything else is a real failure.
                if err.errno not in (errno.EINPROGRESS, errno.EWOULDBLOCK, EAGAIN):
                    raise
            sock2, _ = listensock.accept()
            sock2.setblocking(0)
        except Exception:
            # Do not leak the connecting end if the pair cannot be completed.
            sock1.close()
            raise
    finally:
        # The listening socket is only needed to establish the pair.
        listensock.close()
    return (sock1, sock2)


256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313
class MQTTMessageInfo:
    """This is a class returned from Client.publish() and can be used to find
    out the mid of the message that was published, and to determine whether the
    message has been published, and/or wait until it is published.

    Instances also behave like the two element sequence (rc, mid): they can
    be iterated, unpacked and indexed with 0 or 1.

    Members:

    mid : the message id this object refers to.
    rc : result code, 0 on creation.
    """
    def __init__(self, mid):
        self.mid = mid
        self._published = False
        # Guards _published and wakes up threads in wait_for_publish().
        self._condition = threading.Condition()
        self.rc = 0
        # Position of the iterator protocol below; reset by __iter__().
        self._iterpos = 0

    def __str__(self):
        return str((self.rc, self.mid))

    def __iter__(self):
        # The object is its own iterator, so iteration state is shared.
        self._iterpos = 0
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Return rc, then mid, then raise StopIteration."""
        if self._iterpos == 0:
            self._iterpos = 1
            return self.rc
        elif self._iterpos == 1:
            self._iterpos = 2
            return self.mid
        else:
            raise StopIteration

    def __getitem__(self, index):
        """Return rc for index 0 and mid for index 1, else raise IndexError."""
        if index == 0:
            return self.rc
        elif index == 1:
            return self.mid
        else:
            raise IndexError("index out of range")

    def _set_as_published(self):
        """Mark the message as published and wake every waiting thread."""
        with self._condition:
            self._published = True
            # notify_all(), not notify(): several threads may be blocked in
            # wait_for_publish() and this is the only wake-up they will get.
            self._condition.notify_all()

    def wait_for_publish(self):
        """Block until the message associated with this object is published."""
        with self._condition:
            # Loop to tolerate wake-ups that happen before the flag is set.
            while not self._published:
                self._condition.wait()

    def is_published(self):
        """Returns True if the message associated with this object has been
        published, else returns False."""
        with self._condition:
            return self._published


314
class MQTTMessage:
    """Description of a single incoming or outgoing message. An instance is
    handed to the on_message callback as its message parameter.

    Members:

    topic : String. topic that the message was published on.
    payload : String/bytes the message payload.
    qos : Integer. The message Quality of Service 0, 1 or 2.
    retain : Boolean. If true, the message is a retained message and not fresh.
    mid : Integer. The message id.
    """
    def __init__(self, mid=0, topic=""):
        # Values supplied by the caller.
        self.mid = mid
        self.topic = topic

        # Message content and delivery options, filled in later.
        self.payload = None
        self.qos = 0
        self.retain = False

        # Internal bookkeeping.
        self.timestamp = 0
        self.state = mqtt_ms_invalid
        self.dup = False
        self.info = MQTTMessageInfo(mid)
R
Roger Light 已提交
336

R
Roger Light 已提交
337

R
Roger Light 已提交
338
class Client(object):
339
    """MQTT version 3.1/3.1.1 client class.
R
Roger Light 已提交
340

R
Roger Light 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
    This is the main class for use communicating with an MQTT broker.

    General usage flow:

    * Use connect()/connect_async() to connect to a broker
    * Call loop() frequently to maintain network traffic flow with the broker
    * Or use loop_start() to set a thread running to call loop() for you.
    * Or use loop_forever() to handle calling loop() for you in a blocking
    * function.
    * Use subscribe() to subscribe to a topic and receive messages
    * Use publish() to send messages
    * Use disconnect() to disconnect from the broker

    Data returned from the broker is made available with the use of callback
    functions as described below.

    Callbacks
    =========

    A number of callback functions are available to receive data back from the
    broker. To use a callback, define a function and then assign it to the
    client:
R
Roger Light 已提交
363

364
    def on_connect(client, userdata, flags, rc):
R
Roger Light 已提交
365 366 367 368
        print("Connection returned " + str(rc))

    client.on_connect = on_connect

369 370
    All of the callbacks as described below have a "client" and an "userdata"
    argument. "client" is the Client instance that is calling the callback.
R
Roger Light 已提交
371 372
    "userdata" is user data of any type and can be set when creating a new client
    instance or with user_data_set(userdata).
R
Roger Light 已提交
373

R
Roger Light 已提交
374 375
    The callbacks:

376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391
    on_connect(client, userdata, flags, rc): called when the broker responds to our connection
      request.
      flags is a dict that contains response flags from the broker:
        flags['session present'] - this flag is useful for clients that are
            using clean session set to 0 only. If a client with clean
            session=0, that reconnects to a broker that it has previously
            connected to, this flag indicates whether the broker still has the
            session information for the client. If 1, the session still exists.
      The value of rc determines success or not:
        0: Connection successful
        1: Connection refused - incorrect protocol version
        2: Connection refused - invalid client identifier
        3: Connection refused - server unavailable
        4: Connection refused - bad username or password
        5: Connection refused - not authorised
        6-255: Currently unused.
R
Roger Light 已提交
392

393 394
    on_disconnect(client, userdata, rc): called when the client disconnects from the broker.
      The rc parameter indicates the disconnection state. If MQTT_ERR_SUCCESS
R
Roger Light 已提交
395 396 397 398
      (0), the callback was called in response to a disconnect() call. If any
      other value the disconnection was unexpected, such as might be caused by
      a network error.

399
    on_message(client, userdata, message): called when a message has been received on a
R
Roger Light 已提交
400
      topic that the client subscribes to. The message variable is a
401
      MQTTMessage that describes all of the message parameters.
R
Roger Light 已提交
402

403
    on_publish(client, userdata, mid): called when a message that was to be sent using the
R
Roger Light 已提交
404 405 406 407 408 409 410 411
      publish() call has completed transmission to the broker. For messages
      with QoS levels 1 and 2, this means that the appropriate handshakes have
      completed. For QoS 0, this simply means that the message has left the
      client. The mid variable matches the mid variable returned from the
      corresponding publish() call, to allow outgoing messages to be tracked.
      This callback is important because even if the publish() call returns
      success, it does not always mean that the message has been sent.

412
    on_subscribe(client, userdata, mid, granted_qos): called when the broker responds to a
R
Roger Light 已提交
413 414 415 416 417
      subscribe request. The mid variable matches the mid variable returned
      from the corresponding subscribe() call. The granted_qos variable is a
      list of integers that give the QoS level the broker has granted for each
      of the different subscription requests.

418
    on_unsubscribe(client, userdata, mid): called when the broker responds to an unsubscribe
R
Roger Light 已提交
419 420 421
      request. The mid variable matches the mid variable returned from the
      corresponding unsubscribe() call.

422
    on_log(client, userdata, level, buf): called when the client has log information. Define
R
Roger Light 已提交
423
      to allow debugging. The level variable gives the severity of the message
424 425
      and will be one of MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING,
      MQTT_LOG_ERR, and MQTT_LOG_DEBUG. The message itself is in buf.
R
Roger Light 已提交
426 427

    """
R
Roger A. Light 已提交
428
    def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp"):
        """Create a client instance; no network connection is made here.

        client_id is the unique client id string used when connecting to the
        broker. If client_id is zero length or None, then the behaviour is
        defined by which protocol version is in use. If using MQTT v3.1.1, then
        a zero length client id will be sent to the broker and the broker will
        generate a random id for the client. If using MQTT v3.1 then an id will
        be randomly generated. In both cases, clean_session must be True. If
        this is not the case a ValueError will be raised.

        clean_session is a boolean that determines the client type. If True,
        the broker will remove all information about this client when it
        disconnects. If False, the client is a persistent client and
        subscription information and queued messages will be retained when the
        client disconnects.
        Note that a client will never discard its own outgoing messages on
        disconnect. Calling connect() or reconnect() will cause the messages to
        be resent.  Use reinitialise() to reset a client to its original state.

        userdata is user defined data of any type that is passed as the
        "userdata" parameter to callbacks. It may be updated at a later point
        with the user_data_set() function.

        The protocol argument allows explicit setting of the MQTT version to
        use for this client. Can be paho.mqtt.client.MQTTv311 (v3.1.1) or
        paho.mqtt.client.MQTTv31 (v3.1), with the default being v3.1.1. If the
        broker reports that the client connected with an invalid protocol
        version, the client will automatically attempt to reconnect using v3.1
        instead.

        Set transport to "websockets" to use WebSockets as the transport
        mechanism. Set to "tcp" to use raw TCP, which is the default.
        """
        no_client_id = client_id == "" or client_id is None
        if no_client_id and not clean_session:
            raise ValueError('A client id must be provided if clean session is False.')

        # Constructor arguments and basic connection settings.
        self._transport = transport
        self._protocol = protocol
        self._userdata = userdata
        self._sock = None
        # Local socket pair owned by this instance; closed in reinitialise().
        self._sockpairR, self._sockpairW = _socketpair_compat()
        self._keepalive = 60
        self._message_retry = 20
        self._last_retry_check = 0
        self._clean_session = clean_session

        # Only MQTT v3.1 needs a locally generated id when none was supplied.
        if not no_client_id:
            self._client_id = client_id
        elif protocol == MQTTv31:
            self._client_id = base62(uuid.uuid4().int, padding=22)
        else:
            self._client_id = ""

        self._username = None
        self._password = None

        # Incoming and outgoing packet buffers.
        self._in_packet = dict(
            command=0,
            have_remaining=0,
            remaining_count=[],
            remaining_mult=1,
            remaining_length=0,
            packet=b"",
            to_process=0,
            pos=0,
        )
        self._out_packet = []
        self._current_out_packet = None
        self._last_msg_in = time_func()
        self._last_msg_out = time_func()
        self._ping_t = 0
        self._last_mid = 0
        self._state = mqtt_cs_new

        # Message queues and their limits.
        self._out_messages = []
        self._in_messages = []
        self._max_inflight_messages = 20
        self._inflight_messages = 0
        self._max_queued_messages = 0

        # Will (last testament) settings; no will by default.
        self._will = False
        self._will_topic = ""
        self._will_payload = None
        self._will_qos = 0
        self._will_retain = False

        self._on_message_filtered = MQTTMatcher()
        self._host = ""
        self._port = 1883
        self._bind_address = ""
        self._in_callback = False
        self._strict_protocol = False

        # Locks; only the callback lock is re-entrant.
        self._callback_mutex = threading.RLock()
        self._state_mutex = threading.Lock()
        self._out_packet_mutex = threading.Lock()
        self._current_out_packet_mutex = threading.Lock()
        self._msgtime_mutex = threading.Lock()
        self._out_message_mutex = threading.Lock()
        self._in_message_mutex = threading.Lock()
        self._thread = None
        self._thread_terminate = False

        # TLS is off until tls_set() is called.
        self._ssl = False
        self._tls_certfile = None
        self._tls_keyfile = None
        self._tls_ca_certs = None
        self._tls_cert_reqs = None
        self._tls_ciphers = None
        self._tls_version = tls_version
        self._tls_insecure = False

        self._logger = None

        # No default callbacks
        self._on_log = None
        self._on_connect = None
        self._on_subscribe = None
        self._on_message = None
        self._on_publish = None
        self._on_unsubscribe = None
        self._on_disconnect = None
R
Roger Light 已提交
541 542 543 544 545

    def __del__(self):
        # No-op finaliser: performs no cleanup, so sockets held by the instance
        # are not closed here.
        pass

    def reinitialise(self, client_id="", clean_session=True, userdata=None):
        """Close every socket held by the client and run __init__() again.

        client_id, clean_session and userdata are passed on to __init__()
        and have the same meaning as there.
        """
        # Same order as the sockets were checked before: broker socket first,
        # then the read and write ends of the internal socket pair.
        for attr in ("_sock", "_sockpairR", "_sockpairW"):
            handle = getattr(self, attr)
            if handle:
                handle.close()
                setattr(self, attr, None)

        # NOTE(review): protocol and transport are not forwarded, so they fall
        # back to the __init__() defaults - confirm this is intended.
        self.__init__(client_id, clean_session, userdata)

558
    def tls_set(self, ca_certs, certfile=None, keyfile=None, cert_reqs=cert_reqs, tls_version=tls_version, ciphers=None):
        """Configure network encryption and authentication options. Enables SSL/TLS support.

        ca_certs : a string path to the Certificate Authority certificate files
        that are to be treated as trusted by this client. If this is the only
        option given then the client will operate in a similar manner to a web
        browser. That is to say it will require the broker to have a
        certificate signed by the Certificate Authorities in ca_certs and will
        communicate using TLS v1, but will not attempt any form of
        authentication. This provides basic network encryption but may not be
        sufficient depending on how the broker is configured.

        certfile and keyfile are strings pointing to the PEM encoded client
        certificate and private keys respectively. If these arguments are not
        None then they will be used as client information for TLS based
        authentication.  Support for this feature is broker dependent. Note
        that if either of these files is encrypted and needs a password to
        decrypt it, Python will ask for the password at the command line. It is
        not currently possible to define a callback to provide the password.

        cert_reqs allows the certificate requirements that the client imposes
        on the broker to be changed. By default this is ssl.CERT_REQUIRED,
        which means that the broker must provide a certificate. See the ssl
        pydoc for more information on this parameter.

        tls_version allows the version of the SSL/TLS protocol used to be
        specified. By default TLS v1 is used. Previous versions (all versions
        beginning with SSL) are possible but not recommended due to possible
        security problems.

        ciphers is a string specifying which encryption ciphers are allowable
        for this connection, or None to use the defaults. See the ssl pydoc for
        more information.

        Raises ValueError if the platform has no SSL/TLS support, if Python is
        older than 2.7 or if ca_certs is None. Raises IOError if one of the
        given files cannot be opened for reading.

        Must be called before connect() or connect_async()."""
        if HAVE_SSL is False:
            raise ValueError('This platform has no SSL/TLS.')

        # Compare the version tuple: the sys.version string sorts
        # lexicographically and is not a reliable version check.
        if sys.version_info < (2, 7):
            raise ValueError('Python 2.7 is the minimum supported version for TLS.')

        if ca_certs is None:
            raise ValueError('ca_certs must not be None.')

        # Fail early, with the offending path in the message, if any of the
        # supplied files cannot be read. certfile and keyfile are optional.
        for path in (ca_certs, certfile, keyfile):
            if path is None:
                continue
            try:
                f = open(path, "r")
            except IOError as err:
                # strerror can be None; fall back to the exception text.
                raise IOError("%s: %s" % (path, err.strerror or err))
            else:
                f.close()

        self._ssl = True
        self._tls_ca_certs = ca_certs
        self._tls_certfile = certfile
        self._tls_keyfile = keyfile
        self._tls_cert_reqs = cert_reqs
        self._tls_version = tls_version
        self._tls_ciphers = ciphers

631 632 633 634 635 636 637 638 639 640 641
    def tls_insecure_set(self, value):
        """Switch verification of the server hostname in the server
        certificate on or off.

        value : if true, it is impossible to guarantee that the host you are
        connecting to is not impersonating your server. This can be useful in
        initial server testing, but makes it possible for a malicious third
        party to impersonate your server through DNS spoofing, for example.

        Do not use this function in a real system. Setting value to true means
        there is no point using encryption.

        Raises ValueError if the platform has no SSL/TLS support.

        Must be called before connect()."""
        if HAVE_SSL is False:
            raise ValueError('This platform has no SSL/TLS.')

        # Only recorded here; the flag is consumed elsewhere in the client.
        self._tls_insecure = value

649 650 651 652 653 654 655 656 657 658 659
    def enable_logger(self, logger=None):
        """Attach a logging.Logger to this client.

        logger : the logger to use. If omitted (or falsy), a logger that was
        attached earlier is kept; if there is none, the logger named after
        this module is used.
        """
        if logger:
            self._logger = logger
        elif not self._logger:
            # Nothing supplied and nothing attached yet: use the module logger.
            self._logger = logging.getLogger(__name__)

    def disable_logger(self):
        """Detach the logger set by enable_logger() by resetting it to None."""
        self._logger = None

660
    def connect(self, host, port=1883, keepalive=60, bind_address=""):
        """Connect to a remote broker.

        host is the hostname or IP address of the remote broker.
        port is the network port of the server host to connect to. Defaults to
        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
        are using tls_set() the port may need providing.
        keepalive: Maximum period in seconds between communications with the
        broker. If no other messages are being exchanged, this controls the
        rate at which the client will send ping messages to the broker.
        bind_address is stored together with the other settings by
        connect_async().

        Returns whatever reconnect() returns.
        """
        # Validate and store the settings first, then connect using them.
        self.connect_async(host, port, keepalive, bind_address)
        outcome = self.reconnect()
        return outcome

J
Jan-Piet Mens 已提交
674 675 676 677 678 679 680 681
    def connect_srv(self, domain=None, keepalive=60, bind_address=""):
        """Connect to a remote broker located through DNS SRV records.

        domain is the DNS domain to search for SRV records; if None, the
        domain part of socket.getfqdn() is used.
        keepalive and bind_address are as for connect()

        The record looked up is _mqtt._tcp.<domain>, or
        _secure-mqtt._tcp.<domain> once tls_set() has been called. Targets are
        tried in ascending SRV priority order and the result of the first
        connect() call that does not raise is returned.

        Raises ValueError if no DNS resolver library is available, if the
        lookup gives no usable answer, or if none of the targets could be
        connected to.
        """

        if HAVE_DNS is False:
            raise ValueError('No DNS resolver library found, try "pip install dnspython" or "pip3 install dnspython3".')

        if domain is None:
            domain = socket.getfqdn()
            domain = domain[domain.find('.') + 1:]

        try:
            rr = '_mqtt._tcp.%s' % domain
            if self._ssl:
                # IANA specifies secure-mqtt (not mqtts) for port 8883
                rr = '_secure-mqtt._tcp.%s' % domain
            answers = []
            for answer in dns.resolver.query(rr, dns.rdatatype.SRV):
                # Drop the trailing dot of the fully qualified target name.
                addr = answer.target.to_text()[:-1]
                answers.append((addr, answer.port, answer.priority, answer.weight))
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer, dns.resolver.NoNameservers):
            raise ValueError("No answer/NXDOMAIN for SRV in %s" % (domain))

        # Lowest priority value is tried first. The sort is stable, so targets
        # with equal priority keep the order the resolver returned them in.
        # FIXME: doesn't account for weight
        answers.sort(key=lambda entry: entry[2])

        for host, port, _priority, _weight in answers:
            try:
                return self.connect(host, port, keepalive, bind_address)
            except Exception:
                # Best effort: move on to the next target. KeyboardInterrupt
                # and SystemExit are not caught and still propagate.
                continue

        raise ValueError("No SRV hosts responded")

712
    def connect_async(self, host, port=1883, keepalive=60, bind_address=""):
        """Record the settings for a connection without opening it. This is a
        non-blocking connect call that can be used with loop_start() to
        provide very quick start.

        host is the hostname or IP address of the remote broker.
        port is the network port of the server host to connect to. Defaults to
        1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
        are using tls_set() the port may need providing.
        keepalive: Maximum period in seconds between communications with the
        broker. If no other messages are being exchanged, this controls the
        rate at which the client will send ping messages to the broker.
        bind_address, when not empty, requires Python 2.7 or 3.2.

        Raises ValueError if any argument is invalid; nothing is stored in
        that case.
        """
        if host is None or len(host) == 0:
            raise ValueError('Invalid host.')
        if port <= 0:
            raise ValueError('Invalid port number.')
        if keepalive < 0:
            raise ValueError('Keepalive must be >=0.')

        if bind_address != "" and bind_address is not None:
            major, minor = sys.version_info[0], sys.version_info[1]
            too_old = (major == 2 and minor < 7) or (major == 3 and minor < 2)
            if too_old:
                raise ValueError('bind_address requires Python 2.7 or 3.2.')

        self._host = host
        self._port = port
        self._keepalive = keepalive
        self._bind_address = bind_address

        # Flag the client as waiting for an asynchronous connect.
        with self._state_mutex:
            self._state = mqtt_cs_connect_async

    def reconnect(self):
        """Reconnect the client to the broker using the stored parameters.

        Can only be called after connect()/connect_async(), which store the
        host, port, keepalive and bind address used here.

        Any partially received packet and all pending outgoing packets are
        dropped, an existing socket is closed, messages in progress are put
        back into a valid state, and a new socket is opened. The socket is
        wrapped for TLS when CA certificates are configured and for
        websockets when that transport is selected, then switched to
        non-blocking mode before the CONNECT packet is sent.

        Returns the result of sending the CONNECT packet.

        Raises a ValueError if the stored host is empty or the stored port is
        not positive. Raises socket.error if the connection cannot be
        established.
        """
        if len(self._host) == 0:
            raise ValueError('Invalid host.')
        if self._port <= 0:
            raise ValueError('Invalid port number.')

        # Forget any partially received packet from the previous connection.
        self._in_packet = {
            "command": 0,
            "have_remaining": 0,
            "remaining_count": [],
            "remaining_mult": 1,
            "remaining_length": 0,
            "packet": b"",
            "to_process": 0,
            "pos": 0}

        with self._out_packet_mutex:
            self._out_packet = []

        with self._current_out_packet_mutex:
            self._current_out_packet = None

        with self._msgtime_mutex:
            self._last_msg_in = time_func()
            self._last_msg_out = time_func()

        self._ping_t = 0
        with self._state_mutex:
            self._state = mqtt_cs_new

        if self._sock:
            self._sock.close()
            self._sock = None

        # Put messages in progress in a valid state.
        self._messages_reconnect_reset()

        # create_connection() either returns a connected socket or raises.
        # Errors are left to propagate: swallowing any of them would leave
        # `sock` unbound and fail a few lines later with an unrelated
        # UnboundLocalError instead of the socket.error callers expect.
        if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
            # source_address is not supported on these interpreter versions.
            sock = socket.create_connection((self._host, self._port))
        else:
            # connect_async() accepts bind_address=None as "no binding";
            # normalise it to "" because a None host cannot be bound.
            sock = socket.create_connection(
                (self._host, self._port),
                source_address=(self._bind_address or "", 0))

        use_tls = self._tls_ca_certs is not None
        if use_tls:
            sock = ssl.wrap_socket(
                sock,
                certfile=self._tls_certfile,
                keyfile=self._tls_keyfile,
                ca_certs=self._tls_ca_certs,
                cert_reqs=self._tls_cert_reqs,
                ssl_version=self._tls_version,
                ciphers=self._tls_ciphers)

            if self._tls_insecure is False:
                # Older interpreters lack ssl.match_hostname(), so fall back
                # to the client's own hostname check there.
                if sys.version_info < (2,7,9) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
                    self._tls_match_hostname(sock)
                else:
                    ssl.match_hostname(sock.getpeercert(), self._host)

        if self._transport == "websockets":
            sock = WebsocketWrapper(sock, self._host, self._port, use_tls)

        self._sock = sock
        self._sock.setblocking(0)

        return self._send_connect(self._keepalive, self._clean_session)

    def loop(self, timeout=1.0, max_packets=1):
        """Process network events.

        This function must be called regularly to ensure communication with the
        broker is carried out. It calls select() on the network socket to wait
        for network events. If incoming data is present it will then be
        processed. Outgoing commands, from e.g. publish(), are normally sent
        immediately that their function is called, but this is not always
        possible. loop() will also attempt to send any remaining outgoing
        messages, which also includes commands that are part of the flow for
        messages with QoS>0.

        timeout: The time in seconds to wait for incoming/outgoing network
          traffic before timing out and returning.
        max_packets: Not currently used.

        Returns MQTT_ERR_SUCCESS on success.
        Returns >0 on error.

        A ValueError will be raised if timeout < 0"""
        if timeout < 0.0:
            raise ValueError('Invalid timeout.')

        # Promote the next queued packet, if any, and only ask select() for
        # writability when there is something to write.
        with self._current_out_packet_mutex:
            with self._out_packet_mutex:
                if self._current_out_packet is None and len(self._out_packet) > 0:
                    self._current_out_packet = self._out_packet.pop(0)

                if self._current_out_packet:
                    wlist = [self._sock]
                else:
                    wlist = []

        # sockpairR is used to break out of select() before the timeout, on a
        # call to publish() etc.
        rlist = [self._sock, self._sockpairR]
        try:
            socklist = select.select(rlist, wlist, [], timeout)
        except TypeError:
            # Socket isn't correct type, in likelihood connection is lost
            return MQTT_ERR_CONN_LOST
        except ValueError:
            # Can occur if we just reconnected but rlist/wlist contain a -1 for
            # some reason.
            return MQTT_ERR_CONN_LOST
        except Exception:
            # Deliberately not a bare except: KeyboardInterrupt (^C) and
            # SystemExit must propagate instead of being reported as an
            # unknown error.
            return MQTT_ERR_UNKNOWN

        if self._sock in socklist[0]:
            rc = self.loop_read(max_packets)
            if rc or self._sock is None:
                return rc

        if self._sockpairR in socklist[0]:
            # Stimulate output write even though we didn't ask for it, because
            # at that point the publish or other command wasn't present.
            socklist[1].insert(0, self._sock)
            # Clear sockpairR - only ever a single byte written.
            try:
                self._sockpairR.recv(1)
            except socket.error as err:
                if err.errno != EAGAIN:
                    raise

        if self._sock in socklist[1]:
            rc = self.loop_write(max_packets)
            if rc or self._sock is None:
                return rc

        return self.loop_misc()

    def publish(self, topic, payload=None, qos=0, retain=False):
        """Publish a message on a topic.

        This causes a message to be sent to the broker and subsequently from
        the broker to any clients subscribing to matching topics.

        topic: The topic that the message should be published on.
        payload: The actual message to send. If not given, or set to None a
        zero length message will be used. Passing an int or float will result
        in the payload being converted to a string representing that number. If
        you wish to send a true int/float, use struct.pack() to create the
        payload you require.
        qos: The quality of service level to use.
        retain: If set to true, the message will be set as the "last known
        good"/retained message for the topic.

        Returns a MQTTMessageInfo class, which can be used to determine whether
        the message has been delivered (using info.is_published()) or to block
        waiting for the message to be delivered (info.wait_for_publish()). The
        message ID and return code of the publish() call can be found at
        info.mid and info.rc.

        For backwards compatibility, the MQTTMessageInfo class is iterable so
        the old construct of (rc, mid) = client.publish(...) is still valid.

        rc is MQTT_ERR_SUCCESS to indicate success, MQTT_ERR_NO_CONN if the
        client is not currently connected, or MQTT_ERR_QUEUE_SIZE if the
        outgoing message queue is full (see max_queued_messages_set()). mid is
        the message ID for the publish request. The mid value can be used to
        track the publish request by checking against the mid argument in the
        on_publish() callback if it is defined.

        A ValueError will be raised if topic is None, has zero length or is
        invalid (contains a wildcard), if qos is not one of 0, 1 or 2, or if
        the length of the payload is greater than 268435455 bytes.
        A TypeError will be raised if payload is of an unsupported type."""
        if topic is None or len(topic) == 0:
            raise ValueError('Invalid topic.')
        if qos<0 or qos>2:
            raise ValueError('Invalid QoS level.')

        if isinstance(payload, (str, bytearray)):
            local_payload = payload
        elif sys.version_info[0] == 3 and isinstance(payload, bytes):
            local_payload = bytearray(payload)
        elif sys.version_info[0] < 3 and isinstance(payload, unicode):
            local_payload = payload
        elif isinstance(payload, (int, float)):
            local_payload = str(payload)
        elif payload is None:
            local_payload = None
        else:
            raise TypeError('payload must be a string, bytearray, int, float or None.')

        if local_payload is not None and len(local_payload) > 268435455:
            raise ValueError('Payload too large.')

        if self._topic_wildcard_len_check(topic) != MQTT_ERR_SUCCESS:
            raise ValueError('Publish topic cannot contain wildcards.')

        local_mid = self._mid_generate()

        if qos == 0:
            # QoS 0 messages are not tracked: send and report immediately.
            info = MQTTMessageInfo(local_mid)
            rc = self._send_publish(local_mid, topic, local_payload, qos, retain, False, info)
            info.rc = rc
            return info

        message = MQTTMessage(local_mid, topic)
        message.timestamp = time_func()

        if local_payload is None or len(local_payload) == 0:
            message.payload = None
        else:
            message.payload = local_payload

        message.qos = qos
        message.retain = retain
        message.dup = False

        with self._out_message_mutex:
            if self._max_queued_messages > 0 and len(self._out_messages) >= self._max_queued_messages:
                # Report the failure through the same MQTTMessageInfo object
                # as every other path; it still unpacks as (rc, mid).
                message.info.rc = MQTT_ERR_QUEUE_SIZE
                return message.info

            self._out_messages.append(message)
            send_now = (self._max_inflight_messages == 0
                        or self._inflight_messages < self._max_inflight_messages)
            if send_now:
                self._inflight_messages = self._inflight_messages+1
                if qos == 1:
                    message.state = mqtt_ms_wait_for_puback
                elif qos == 2:
                    message.state = mqtt_ms_wait_for_pubrec
            else:
                message.state = mqtt_ms_queued

        if not send_now:
            # Inflight window is full; the message stays queued.
            message.info.rc = MQTT_ERR_SUCCESS
            return message.info

        # The lock is released before touching the network.
        rc = self._send_publish(message.mid, message.topic, message.payload, message.qos, message.retain, message.dup)

        # remove from inflight messages so it will be send after a connection is made
        if rc == MQTT_ERR_NO_CONN:
            with self._out_message_mutex:
                self._inflight_messages -= 1
                message.state = mqtt_ms_publish

        message.info.rc = rc
        return message.info
R
Roger Light 已提交
1005 1006 1007 1008 1009 1010 1011 1012 1013 1014

    def username_pw_set(self, username, password=None):
        """Set a username and optionally a password for broker authentication.

        Must be called before connect() to have any effect.
        Requires a broker that supports MQTT v3.1.

        username: The username to authenticate with. Need have no relationship
        to the client id. It is stored UTF-8 encoded. Passing None clears the
        stored username instead of failing.
        password: The password to authenticate with. Optional, set to None if
        not required. It is stored exactly as given.
        """
        if username is None:
            # None has no encode(); treat it as "no username".
            self._username = None
        else:
            self._username = username.encode('utf-8')
        self._password = password

    def disconnect(self):
        """Disconnect a connected client from the broker.

        The client is always marked as disconnecting, even when there is no
        socket.

        Returns MQTT_ERR_NO_CONN if there is no socket, otherwise the result
        of sending the DISCONNECT packet.
        """
        with self._state_mutex:
            self._state = mqtt_cs_disconnecting

        if self._sock is not None:
            return self._send_disconnect()
        return MQTT_ERR_NO_CONN

    def subscribe(self, topic, qos=0):
        """Subscribe the client to one or more topics.

        This function may be called in three different ways:

        Simple string and integer
        -------------------------
        e.g. subscribe("my/topic", 2)

        topic: A string specifying the subscription topic to subscribe to.
        qos: The desired quality of service level for the subscription.
             Defaults to 0.

        String and integer tuple
        ------------------------
        e.g. subscribe(("my/topic", 1))

        topic: A tuple of (topic, qos). Both topic and qos must be present in
               the tuple.
        qos: Not used.

        List of string and integer tuples
        ---------------------------------
        e.g. subscribe([("my/topic", 0), ("another/topic", 2)])

        This allows multiple topic subscriptions in a single SUBSCRIPTION
        command, which is more efficient than using multiple calls to
        subscribe().

        topic: A list of tuple of format (topic, qos). Both topic and qos must
               be present in all of the tuples.
        qos: Not used.

        The function returns a tuple (result, mid), where result is
        MQTT_ERR_SUCCESS to indicate success or (MQTT_ERR_NO_CONN, None) if the
        client is not currently connected.  mid is the message ID for the
        subscribe request. The mid value can be used to track the subscribe
        request by checking against the mid argument in the on_subscribe()
        callback if it is defined.

        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
        zero string length, or if topic is not a string, tuple or list.
        """
        # All three call forms accept the same text types (unicode as well as
        # str on Python 2).
        if sys.version_info[0] == 2:
            text_types = (str, unicode)
        else:
            text_types = (str,)

        # Normalise the three call forms to one sequence of (topic, qos).
        if isinstance(topic, text_types):
            requests = [(topic, qos)]
        elif isinstance(topic, tuple):
            requests = [topic]
        elif isinstance(topic, list):
            requests = topic
        else:
            raise ValueError("No topic specified, or incorrect topic type.")

        topic_qos_list = []
        for request in requests:
            level = request[1]
            if level<0 or level>2:
                raise ValueError('Invalid QoS level.')
            name = request[0]
            # Type check first so None and other non-strings give the
            # documented ValueError rather than a TypeError from len().
            if not isinstance(name, text_types) or len(name) == 0:
                raise ValueError('Invalid topic.')
            topic_qos_list.append((name.encode('utf-8'), level))

        if any(self._filter_wildcard_len_check(sub) != MQTT_ERR_SUCCESS for sub, _ in topic_qos_list):
            raise ValueError('Invalid subscription filter.')

        if self._sock is None:
            return (MQTT_ERR_NO_CONN, None)

        return self._send_subscribe(False, topic_qos_list)
R
Roger Light 已提交
1104 1105

    def unsubscribe(self, topic):
        """Unsubscribe the client from one or more topics.

        topic: A single string, or list of strings that are the subscription
               topics to unsubscribe from.

        Returns a tuple (result, mid), where result is MQTT_ERR_SUCCESS
        to indicate success or (MQTT_ERR_NO_CONN, None) if the client is not
        currently connected.
        mid is the message ID for the unsubscribe request. The mid value can be
        used to track the unsubscribe request by checking against the mid
        argument in the on_unsubscribe() callback if it is defined.

        Raises a ValueError if topic is None or has zero string length, or is
        not a string or list.
        """
        if topic is None:
            raise ValueError('Invalid topic.')

        # Accept unicode as well as str on Python 2, as subscribe() does.
        if sys.version_info[0] == 2:
            text_types = (str, unicode)
        else:
            text_types = (str,)

        if isinstance(topic, text_types):
            names = [topic]
        elif isinstance(topic, list):
            names = topic
        else:
            raise ValueError("No topic specified, or incorrect topic type.")

        topic_list = []
        for name in names:
            # Type check first so non-strings give the documented ValueError
            # rather than a TypeError from len().
            if not isinstance(name, text_types) or len(name) == 0:
                raise ValueError('Invalid topic.')
            topic_list.append(name.encode('utf-8'))

        if self._sock is None:
            return (MQTT_ERR_NO_CONN, None)

        return self._send_unsubscribe(False, topic_list)
R
Roger Light 已提交
1142 1143 1144 1145 1146 1147 1148

    def loop_read(self, max_packets=1):
        """Process read network events. Use in place of calling loop() if you
        wish to handle your client reads as part of your own application.

        Use socket() to obtain the client socket to call select() or equivalent
        on.

        max_packets: ignored. The number of read attempts is taken from the
        number of tracked outgoing plus incoming messages, with a minimum of
        one.

        Returns MQTT_ERR_NO_CONN if there is no socket, the handled error code
        if reading a packet fails, and MQTT_ERR_SUCCESS otherwise.

        Do not use if you are using the threaded interface loop_start()."""
        if self._sock is None:
            return MQTT_ERR_NO_CONN

        attempts = max(len(self._out_messages) + len(self._in_messages), 1)

        for _ in range(attempts):
            rc = self._packet_read()
            if rc > 0:
                return self._loop_rc_handle(rc)
            if rc == MQTT_ERR_AGAIN:
                # Nothing more to read right now.
                break
        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
1165 1166 1167 1168

    def loop_write(self, max_packets=1):
        """Process write network events. Use in place of calling loop() if you
        wish to handle your client writes as part of your own application.

        Use socket() to obtain the client socket to call select() or equivalent
        on.

        Use want_write() to determine if there is data waiting to be written.

        max_packets: ignored. The number of write attempts is the number of
        queued outgoing packets plus one.

        Returns MQTT_ERR_NO_CONN if there is no socket, the handled error code
        if writing a packet fails, and MQTT_ERR_SUCCESS otherwise.

        Do not use if you are using the threaded interface loop_start()."""
        if self._sock is None:
            return MQTT_ERR_NO_CONN

        # Always at least one attempt, so no lower clamp is needed.
        attempts = len(self._out_packet) + 1

        for _ in range(attempts):
            rc = self._packet_write()
            if rc > 0:
                return self._loop_rc_handle(rc)
            if rc == MQTT_ERR_AGAIN:
                # The socket cannot take more data right now.
                break
        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204

    def want_write(self):
        """Report whether outgoing network data is waiting to be written.

        Useful if you are calling select() yourself rather than using loop().

        Returns True if a packet is currently being sent or at least one
        packet is queued, otherwise False.
        """
        has_queued = len(self._out_packet) > 0
        return bool(self._current_out_packet) or has_queued

    def loop_misc(self):
        """Process miscellaneous network events. Use in place of calling loop()
        if you wish to call select() or equivalent yourself.

        Runs the keepalive check, retries outstanding messages at most once a
        second, and gives up on the connection when a ping response has not
        arrived within the keepalive period. In that case the socket is closed
        and on_disconnect is called, if set.

        Returns MQTT_ERR_NO_CONN if there is no socket, MQTT_ERR_CONN_LOST if
        the connection was dropped because of a missing ping response, and
        MQTT_ERR_SUCCESS otherwise.

        Do not use if you are using the threaded interface loop_start()."""
        if self._sock is None:
            return MQTT_ERR_NO_CONN

        now = time_func()
        self._check_keepalive()
        if self._last_retry_check+1 < now:
            # Only check once a second at most
            self._message_retry_check()
            self._last_retry_check = now

        if self._ping_t > 0 and now - self._ping_t >= self._keepalive:
            # client->ping_t != 0 means we are waiting for a pingresp.
            # This hasn't happened in the keepalive time so we should disconnect.
            if self._sock:
                self._sock.close()
                self._sock = None

            # `with` and try/finally make sure the callback lock and the
            # in-callback flag are restored even if the user's on_disconnect
            # handler raises; otherwise a later callback would block forever.
            with self._callback_mutex:
                if self._state == mqtt_cs_disconnecting:
                    rc = MQTT_ERR_SUCCESS
                else:
                    rc = 1
                if self.on_disconnect:
                    self._in_callback = True
                    try:
                        self.on_disconnect(self, self._userdata, rc)
                    finally:
                        self._in_callback = False
            return MQTT_ERR_CONN_LOST

        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
1235

1236 1237 1238 1239 1240 1241 1242
    def max_inflight_messages_set(self, inflight):
        """Set the maximum number of messages with QoS>0 that can be part way
        through their network flow at once. Defaults to 20.

        inflight: the new limit. publish() treats 0 as "no limit".

        Raises a ValueError if inflight is negative.
        """
        if inflight >= 0:
            self._max_inflight_messages = inflight
            return
        raise ValueError('Invalid inflight.')

1243 1244 1245 1246 1247 1248 1249 1250 1251 1252
    def max_queued_messages_set(self, queue_size):
        """Set the maximum number of messages in the outgoing message queue.
        0 means unlimited.

        queue_size: the new limit, a non-negative int.

        Returns the client itself.

        Raises a ValueError if queue_size is not an int or is negative.
        """
        # Check the type before comparing, so a non-int value gives the
        # documented ValueError rather than a TypeError from the comparison.
        if not isinstance(queue_size, int):
            raise ValueError('Invalid type of queue size.')
        if queue_size < 0:
            raise ValueError('Invalid queue size.')
        self._max_queued_messages = queue_size
        return self

R
Roger Light 已提交
1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282
    def message_retry_set(self, retry):
        """Set the timeout in seconds before a message with QoS>0 is retried.
        20 seconds by default.

        Raises a ValueError if retry is negative.
        """
        if retry >= 0:
            self._message_retry = retry
            return
        raise ValueError('Invalid retry.')

    def user_data_set(self, userdata):
        """Set the user data variable passed to callbacks.

        userdata: may be any data type; it is stored as given and replaces
        any previously stored value.
        """
        self._userdata = userdata

    def will_set(self, topic, payload=None, qos=0, retain=False):
        """Set a Will to be sent by the broker in case the client disconnects unexpectedly.

        This must be called before connect() to have any effect.

        topic: The topic that the will message should be published on.
        payload: The message to send as a will. If not given, or set to None a
        zero length message will be used as the will. Text is stored UTF-8
        encoded; bytes and bytearray values are stored unchanged. Passing an
        int or float will result in the payload being converted to a string
        representing that number. If you wish to send a true int/float, use
        struct.pack() to create the payload you require.
        qos: The quality of service level to use for the will.
        retain: If set to true, the will message will be set as the "last known
        good"/retained message for the topic.

        Raises a ValueError if qos is not 0, 1 or 2, or if topic is None or has
        zero string length. Raises a TypeError if payload is of an unsupported
        type.
        """
        if topic is None or len(topic) == 0:
            raise ValueError('Invalid topic.')
        if qos<0 or qos>2:
            raise ValueError('Invalid QoS level.')

        # Accept the same payload types as publish(): unicode on Python 2 and
        # bytes on Python 3 are valid there, so they are valid for a will too.
        if isinstance(payload, str) or (sys.version_info[0] == 2 and isinstance(payload, unicode)):
            will_payload = payload.encode('utf-8')
        elif isinstance(payload, (bytes, bytearray)):
            will_payload = payload
        elif isinstance(payload, (int, float)):
            will_payload = str(payload)
        elif payload is None:
            will_payload = None
        else:
            raise TypeError('payload must be a string, bytearray, int, float or None.')

        self._will_payload = will_payload
        self._will = True
        self._will_topic = topic.encode('utf-8')
        self._will_qos = qos
        self._will_retain = retain

    def will_clear(self):
        """Remove a will that was previously configured with will_set().

        All will settings are reset: no will, empty topic, no payload, QoS 0
        and no retain flag.

        Must be called before connect() to have any effect."""
        self._will = False
        self._will_topic, self._will_payload = "", None
        self._will_qos, self._will_retain = 0, False

    def socket(self):
        """Return the socket or ssl object for this client.

        The value is None while the client has no socket.
        """
        current = self._sock
        return current
R
Roger Light 已提交
1316

1317
    def loop_forever(self, timeout=1.0, max_packets=1, retry_first_connection=False):
        """Call loop() for you in an infinite blocking loop.

        This is useful for the case where you only want to run the MQTT client
        loop in your program.

        loop_forever() will handle reconnecting for you. If you call
        disconnect() in a callback it will return.

        timeout: The time in seconds to wait for incoming/outgoing network
          traffic before timing out and returning.
        max_packets: Not currently used.
        retry_first_connection: Should the first connection attempt be retried on failure.

        Returns the last return code produced by the inner loop.

        Raises socket.error on first connection failures unless retry_first_connection=True
        """

        def stop_requested(still_running):
            # Evaluated under the state lock: stop on an explicit disconnect,
            # when the inner loop already decided to stop, or when thread
            # termination was requested.
            with self._state_mutex:
                return (self._state == mqtt_cs_disconnecting
                        or still_running is False
                        or self._thread_terminate is True)

        keep_running = True

        # Phase 1: complete a connection requested via connect_async().
        while keep_running:
            if self._thread_terminate is True:
                break
            if self._state != mqtt_cs_connect_async:
                break
            try:
                self.reconnect()
            except socket.error:
                if not retry_first_connection:
                    raise
                self._easy_log(MQTT_LOG_DEBUG, "Connection failed, retrying")
                time.sleep(1)

        # Phase 2: pump the network loop, reconnecting after failures.
        while keep_running:
            rc = MQTT_ERR_SUCCESS
            while rc == MQTT_ERR_SUCCESS:
                rc = self.loop(timeout, max_packets)
                # We don't need to worry about locking here, because we've
                # either called loop_forever() when in single threaded mode, or
                # in multi threaded mode when loop_stop() has been called and
                # so no other threads can access _current_out_packet,
                # _out_packet or _messages.
                nothing_pending = (self._thread_terminate is True
                                   and self._current_out_packet is None
                                   and len(self._out_packet) == 0
                                   and len(self._out_messages) == 0)
                if nothing_pending:
                    rc = 1
                    keep_running = False

            if stop_requested(keep_running):
                break

            # Pause before reconnecting, then check again in case a stop was
            # requested in the meantime.
            time.sleep(1)
            if stop_requested(keep_running):
                break

            try:
                self.reconnect()
            except socket.error:
                # Ignored: the outer loop comes back here and tries again.
                pass

        return rc

    def loop_start(self):
        """This is part of the threaded client interface. Call this once to
        start a new daemon thread to process network traffic. This provides an
        alternative to repeatedly calling loop() yourself.

        Returns MQTT_ERR_INVAL if a network thread already exists; otherwise
        nothing is returned.
        """
        if self._thread is not None:
            return MQTT_ERR_INVAL

        self._thread_terminate = False
        worker = threading.Thread(target=self._thread_main)
        # The thread is stored on the client before it is started.
        self._thread = worker
        worker.daemon = True
        worker.start()

    def loop_stop(self, force=False):
        """This is part of the threaded client interface. Call this once to
        stop the network thread previously created with loop_start(). This call
        will block until the network thread finishes.

        When called from the network thread itself, termination is only
        requested; the thread is not joined.

        The force parameter is currently ignored.

        Returns MQTT_ERR_INVAL if there is no network thread; otherwise
        nothing is returned.
        """
        worker = self._thread
        if worker is None:
            return MQTT_ERR_INVAL

        self._thread_terminate = True
        if threading.current_thread() != worker:
            # Joining is only possible from a different thread.
            worker.join()
            self._thread = None
R
Roger Light 已提交
1416

1417 1418
    @property
    def on_log(self):
        """If implemented, called when the client has log information.
        Defined to allow debugging."""
        return self._on_log

    @on_log.setter
    def on_log(self, func):
        """Define the logging callback implementation.

        Expected signature is:
            log_callback(client, userdata, level, buf)

        client:     the client instance for this callback
        userdata:   the private user data as set in Client() or userdata_set()
        level:      gives the severity of the message and will be one of
                    MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING,
                    MQTT_LOG_ERR, and MQTT_LOG_DEBUG.
        buf:        the message itself, already formatted
        """
        self._on_log = func

    @property
    def on_connect(self):
        """If implemented, called when the broker responds to our connection
        request."""
        return self._on_connect

    @on_connect.setter
    def on_connect(self, func):
        """Define the connect callback implementation.

        Expected signature is:
            connect_callback(client, userdata, flags, rc)

        client:     the client instance for this callback
        userdata:   the private user data as set in Client() or userdata_set()
        flags:      response flags sent by the broker
        rc:         the connection result

        flags is a dict that contains response flags from the broker:
            flags['session present'] - this flag is only useful for clients
                that use clean session set to 0. When such a client
                reconnects to a broker it has previously connected to, the
                flag indicates whether the broker still has the session
                information for the client. If 1, the session still exists.

        The value of rc indicates success or not:
            0: Connection successful
            1: Connection refused - incorrect protocol version
            2: Connection refused - invalid client identifier
            3: Connection refused - server unavailable
            4: Connection refused - bad username or password
            5: Connection refused - not authorised
            6-255: Currently unused.
        """
        self._on_connect = func

    @property
    def on_subscribe(self):
        """If implemented, called when the broker responds to a subscribe
        request."""
        return self._on_subscribe

    @on_subscribe.setter
    def on_subscribe(self, func):
        """Define the subscribe callback implementation.

        Expected signature is:
            subscribe_callback(client, userdata, mid, granted_qos)

        client:         the client instance for this callback
        userdata:       the private user data as set in Client() or userdata_set()
        mid:            matches the mid variable returned from the corresponding
                        subscribe() call.
        granted_qos:    list of integers that give the QoS level the broker has
                        granted for each of the different subscription requests.
        """
        self._on_subscribe = func
1496

1497 1498
    @property
    def on_message(self):
        """If implemented, called when a message has been received on a topic
        that the client subscribes to.

        This callback will be called for every message received. Use
        message_callback_add() to define multiple callbacks that will be called
        for specific topic filters."""
        return self._on_message

    @on_message.setter
    def on_message(self, func):
        """Define the message received callback implementation.

        Expected signature is:
            on_message_callback(client, userdata, message)

        client:     the client instance for this callback
        userdata:   the private user data as set in Client() or userdata_set()
        message:    an instance of MQTTMessage.
                    This is a class with members topic, payload, qos, retain.
        """
        self._on_message = func
1520

1521 1522
    @property
    def on_publish(self):
        """If implemented, called when a message that was to be sent using the
        publish() call has completed transmission to the broker.

        For messages with QoS levels 1 and 2, this means that the appropriate
        handshakes have completed. For QoS 0, this simply means that the message
        has left the client.
        This callback is important because even if the publish() call returns
        success, it does not always mean that the message has been sent."""
        return self._on_publish

    @on_publish.setter
    def on_publish(self, func):
        """Define the published message callback implementation.

        Expected signature is:
            on_publish_callback(client, userdata, mid)

        client:     the client instance for this callback
        userdata:   the private user data as set in Client() or userdata_set()
        mid:        matches the mid variable returned from the corresponding
                    publish() call, to allow outgoing messages to be tracked.
        """
        self._on_publish = func
1546

1547 1548
    @property
    def on_unsubscribe(self):
        """If implemented, called when the broker responds to an unsubscribe
        request."""
        return self._on_unsubscribe

    @on_unsubscribe.setter
    def on_unsubscribe(self, func):
        """Define the unsubscribe callback implementation.

        Expected signature is:
            unsubscribe_callback(client, userdata, mid)

        client:     the client instance for this callback
        userdata:   the private user data as set in Client() or userdata_set()
        mid:        matches the mid variable returned from the corresponding
                    unsubscribe() call.
        """
        self._on_unsubscribe = func
1566

A
Alexis BRENON 已提交
1567 1568 1569
    @property
    def on_disconnect(self):
        """If implemented, called when the client disconnects from the broker.
        """
        return self._on_disconnect

    @on_disconnect.setter
    def on_disconnect(self, func):
        """Define the disconnect callback implementation.

        Expected signature is:
            disconnect_callback(client, userdata, rc)

        client:     the client instance for this callback
        userdata:   the private user data as set in Client() or userdata_set()
        rc:         the disconnection result
                    The rc parameter indicates the disconnection state. If
                    MQTT_ERR_SUCCESS (0), the callback was called in response to
                    a disconnect() call. If any other value the disconnection
                    was unexpected, such as might be caused by a network error.
        """
        self._on_disconnect = func
1589

1590 1591 1592 1593 1594
    def message_callback_add(self, sub, callback):
        """Register a message callback for a specific topic.

        Messages that match 'sub' will be passed to 'callback'. Any
        non-matching messages will be passed to the default on_message
        callback.

        Call multiple times with different 'sub' to define multiple topic
        specific callbacks.

        Topic specific callbacks may be removed with
        message_callback_remove().

        Raises ValueError if either sub or callback is None.
        """
        if callback is None or sub is None:
            raise ValueError("sub and callback must both be defined.")

        # The context manager releases the mutex even if storing the
        # callback raises, so a bad 'sub' cannot leave the lock held.
        with self._callback_mutex:
            self._on_message_filtered[sub] = callback

    def message_callback_remove(self, sub):
        """Remove a message callback previously registered with
        message_callback_add().

        Removing a 'sub' that was never registered is silently ignored.

        Raises ValueError if sub is None.
        """
        if sub is None:
            raise ValueError("sub must defined.")

        # The context manager releases the mutex on every exit path,
        # including exceptions other than KeyError.
        with self._callback_mutex:
            try:
                del self._on_message_filtered[sub]
            except KeyError:  # no such subscription
                pass

R
Roger Light 已提交
1621 1622 1623 1624 1625 1626
    # ============================================================
    # Private functions
    # ============================================================

    def _loop_rc_handle(self, rc):
        if rc:
Y
yoch 已提交
1627
            if self._sock:
R
Roger Light 已提交
1628 1629 1630 1631
                self._sock.close()
                self._sock = None

            self._state_mutex.acquire()
1632 1633
            if self._state == mqtt_cs_disconnecting:
                rc = MQTT_ERR_SUCCESS
R
Roger Light 已提交
1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655
            self._state_mutex.release()
            self._callback_mutex.acquire()
            if self.on_disconnect:
                self._in_callback = True
                self.on_disconnect(self, self._userdata, rc)
                self._in_callback = False

            self._callback_mutex.release()
        return rc

    def _packet_read(self):
        # This gets called if pselect() indicates that there is network data
        # available - ie. at least one byte.  What we do depends on what data we
        # already have.
        # If we've not got a command, attempt to read one and save it. This should
        # always work because it's only a single byte.
        # Then try to read the remaining length. This may fail because it is may
        # be more than one byte - will need to save data pending next read if it
        # does fail.
        # Then try to read the remaining payload, where 'payload' here means the
        # combined variable header and actual payload. This is the most likely to
        # fail due to longer length, so save current data and current position.
1656
        # After all data is read, send to _mqtt_handle_packet() to deal with.
R
Roger Light 已提交
1657
        # Finally, free the memory and reset everything to starting conditions.
R
Roger Light 已提交
1658
        if self._in_packet['command'] == 0:
R
Roger Light 已提交
1659
            try:
Y
yoch 已提交
1660
                command = self._sock.recv(1)
R
Roger Light 已提交
1661
            except socket.error as err:
1662
                if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
1663
                    return MQTT_ERR_AGAIN
1664
                if err.errno == EAGAIN:
1665
                    return MQTT_ERR_AGAIN
1666
                print(err)
R
Roger Light 已提交
1667
                return 1
R
Roger Light 已提交
1668 1669 1670 1671
            else:
                if len(command) == 0:
                    return 1
                command = struct.unpack("!B", command)
R
Roger Light 已提交
1672
                self._in_packet['command'] = command[0]
R
Roger Light 已提交
1673

R
Roger Light 已提交
1674
        if self._in_packet['have_remaining'] == 0:
R
Roger Light 已提交
1675 1676 1677 1678 1679
            # Read remaining
            # Algorithm for decoding taken from pseudo code at
            # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
            while True:
                try:
Y
yoch 已提交
1680
                    byte = self._sock.recv(1)
R
Roger Light 已提交
1681
                except socket.error as err:
1682
                    if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
1683
                        return MQTT_ERR_AGAIN
1684
                    if err.errno == EAGAIN:
1685
                        return MQTT_ERR_AGAIN
1686
                    print(err)
R
Roger Light 已提交
1687
                    return 1
R
Roger Light 已提交
1688 1689 1690
                else:
                    byte = struct.unpack("!B", byte)
                    byte = byte[0]
R
Roger Light 已提交
1691
                    self._in_packet['remaining_count'].append(byte)
R
Roger Light 已提交
1692 1693
                    # Max 4 bytes length for remaining length as defined by protocol.
                     # Anything more likely means a broken/malicious client.
R
Roger Light 已提交
1694
                    if len(self._in_packet['remaining_count']) > 4:
1695
                        return MQTT_ERR_PROTOCOL
R
Roger Light 已提交
1696

R
Roger Light 已提交
1697 1698
                    self._in_packet['remaining_length'] = self._in_packet['remaining_length'] + (byte & 127)*self._in_packet['remaining_mult']
                    self._in_packet['remaining_mult'] = self._in_packet['remaining_mult'] * 128
R
Roger Light 已提交
1699 1700 1701 1702

                if (byte & 128) == 0:
                    break

R
Roger Light 已提交
1703 1704
            self._in_packet['have_remaining'] = 1
            self._in_packet['to_process'] = self._in_packet['remaining_length']
R
Roger Light 已提交
1705

R
Roger Light 已提交
1706
        while self._in_packet['to_process'] > 0:
R
Roger Light 已提交
1707
            try:
Y
yoch 已提交
1708
                data = self._sock.recv(self._in_packet['to_process'])
R
Roger Light 已提交
1709
            except socket.error as err:
1710
                if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
1711
                    return MQTT_ERR_AGAIN
1712
                if err.errno == EAGAIN:
1713
                    return MQTT_ERR_AGAIN
1714
                print(err)
R
Roger Light 已提交
1715
                return 1
R
Roger Light 已提交
1716
            else:
R
Roger Light 已提交
1717 1718
                self._in_packet['to_process'] = self._in_packet['to_process'] - len(data)
                self._in_packet['packet'] = self._in_packet['packet'] + data
R
Roger Light 已提交
1719 1720

        # All data for this packet is read.
R
Roger Light 已提交
1721
        self._in_packet['pos'] = 0
R
Roger Light 已提交
1722 1723
        rc = self._packet_handle()

R
Roger Light 已提交
1724
        # Free data and reset values
R
Roger Light 已提交
1725 1726 1727 1728 1729 1730 1731 1732 1733
        self._in_packet = dict(
            command=0,
            have_remaining=0,
            remaining_count=[],
            remaining_mult=1,
            remaining_length=0,
            packet=b"",
            to_process=0,
            pos=0)
R
Roger Light 已提交
1734 1735

        self._msgtime_mutex.acquire()
1736
        self._last_msg_in = time_func()
R
Roger Light 已提交
1737 1738 1739 1740 1741 1742 1743 1744 1745 1746
        self._msgtime_mutex.release()
        return rc

    def _packet_write(self):
        """Write as much of the queued outgoing data as the socket accepts.

        Sends self._current_out_packet and, once a packet is fully written,
        moves on to the next entry of self._out_packet. When a QoS 0 PUBLISH
        has been written completely, on_publish is called and the packet's
        info object is marked as published. When a DISCONNECT has been
        written completely, on_disconnect is called with rc 0 and the socket
        is closed.

        Returns MQTT_ERR_AGAIN when the socket would block, 1 on any other
        socket error (which is logged), and MQTT_ERR_SUCCESS otherwise.
        """
        # Acquired and released by hand because the DISCONNECT branch has to
        # drop this mutex before it runs the user callback.
        self._current_out_packet_mutex.acquire()

        while self._current_out_packet:
            packet = self._current_out_packet

            try:
                write_length = self._sock.send(packet['packet'][packet['pos']:])
            except (AttributeError, ValueError):
                # AttributeError: self._sock is None. ValueError is presumably
                # raised by an already-closed socket (TODO confirm). Either
                # way there is nothing to write to.
                self._current_out_packet_mutex.release()
                return MQTT_ERR_SUCCESS
            except socket.error as err:
                self._current_out_packet_mutex.release()
                if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
                    return MQTT_ERR_AGAIN
                if err.errno == EAGAIN:
                    return MQTT_ERR_AGAIN
                # Report through the client's logging hooks instead of stdout.
                self._easy_log(MQTT_LOG_ERR, 'failed to send on socket: %s', err)
                return 1

            if write_length <= 0:
                break

            packet['to_process'] = packet['to_process'] - write_length
            packet['pos'] = packet['pos'] + write_length

            if packet['to_process'] == 0:
                if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
                    # QoS 0 has no handshake: a complete write is completion.
                    with self._callback_mutex:
                        if self.on_publish:
                            self._in_callback = True
                            try:
                                self.on_publish(self, self._userdata, packet['mid'])
                            finally:
                                self._in_callback = False

                    packet['info']._set_as_published()

                if (packet['command'] & 0xF0) == DISCONNECT:
                    self._current_out_packet_mutex.release()

                    with self._msgtime_mutex:
                        self._last_msg_out = time_func()

                    with self._callback_mutex:
                        if self.on_disconnect:
                            self._in_callback = True
                            try:
                                self.on_disconnect(self, self._userdata, 0)
                            finally:
                                self._in_callback = False

                    if self._sock:
                        self._sock.close()
                        self._sock = None
                    return MQTT_ERR_SUCCESS

                # Current packet is done; promote the next queued one, if any.
                with self._out_packet_mutex:
                    if len(self._out_packet) > 0:
                        self._current_out_packet = self._out_packet.pop(0)
                    else:
                        self._current_out_packet = None

        self._current_out_packet_mutex.release()

        with self._msgtime_mutex:
            self._last_msg_out = time_func()

        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
1810

Y
yoch 已提交
1811
    def _easy_log(self, level, fmt, *args):
R
Roger Light 已提交
1812
        if self.on_log:
Y
yoch 已提交
1813
            buf = fmt % args
R
Roger Light 已提交
1814
            self.on_log(self, self._userdata, level, buf)
1815 1816 1817
        if self._logger:
            level_std = LOGGING_LEVEL[level]
            self._logger.log(level_std, fmt, *args)
R
Roger Light 已提交
1818 1819

    def _check_keepalive(self):
        """Send a PINGREQ or drop the connection when the keepalive expires.

        Does nothing when keepalive is disabled (self._keepalive == 0), when
        there is no socket, or when a message was both sent and received
        within the keepalive interval.

        Otherwise, if the client is connected and no ping is outstanding
        (self._ping_t == 0), a PINGREQ is sent and both message timestamps
        are reset. In every other case the socket is closed and on_disconnect
        is called, with rc MQTT_ERR_SUCCESS if the client was disconnecting
        and 1 if not.

        Returns MQTT_ERR_SUCCESS when keepalive is disabled and None
        otherwise.
        """
        if self._keepalive == 0:
            return MQTT_ERR_SUCCESS

        now = time_func()
        with self._msgtime_mutex:
            last_msg_out = self._last_msg_out
            last_msg_in = self._last_msg_in

        if self._sock is not None and (now - last_msg_out >= self._keepalive or now - last_msg_in >= self._keepalive):
            if self._state == mqtt_cs_connected and self._ping_t == 0:
                self._send_pingreq()
                with self._msgtime_mutex:
                    self._last_msg_out = now
                    self._last_msg_in = now
            else:
                if self._sock:
                    self._sock.close()
                    self._sock = None

                if self._state == mqtt_cs_disconnecting:
                    rc = MQTT_ERR_SUCCESS
                else:
                    rc = 1

                with self._callback_mutex:
                    if self.on_disconnect:
                        self._in_callback = True
                        try:
                            self.on_disconnect(self, self._userdata, rc)
                        finally:
                            # Reset the flag even if the user callback raises;
                            # the 'with' releases the mutex in that case too.
                            self._in_callback = False

    def _mid_generate(self):
        self._last_mid = self._last_mid + 1
        if self._last_mid == 65536:
            self._last_mid = 1
        return self._last_mid

Y
yoch 已提交
1857 1858
    @staticmethod
    def _topic_wildcard_len_check(topic):
        """Validate a topic that must not contain wildcards.

        Returns MQTT_ERR_INVAL if the topic contains '+' or '#', is empty,
        or is longer than 65535; returns MQTT_ERR_SUCCESS otherwise.
        """
        if '+' in topic or '#' in topic:
            return MQTT_ERR_INVAL
        if not 0 < len(topic) <= 65535:
            return MQTT_ERR_INVAL
        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
1866

Y
yoch 已提交
1867 1868 1869 1870 1871 1872 1873 1874 1875
    @staticmethod
    def _filter_wildcard_len_check(sub):
        """Validate a subscription filter given as a bytes-like value.

        Returns MQTT_ERR_INVAL when the filter is empty, longer than 65535,
        has a '/'-separated level of more than one character that contains
        '+' or '#', or contains '#/' (a '#' that is not the final level).
        Returns MQTT_ERR_SUCCESS otherwise.
        """
        if not 0 < len(sub) <= 65535:
            return MQTT_ERR_INVAL

        # A wildcard is only legal when it is a whole level by itself.
        for level in sub.split(b'/'):
            if len(level) > 1 and (b'+' in level or b'#' in level):
                return MQTT_ERR_INVAL

        if b'#/' in sub:
            return MQTT_ERR_INVAL

        return MQTT_ERR_SUCCESS

R
Roger Light 已提交
1876
    def _send_pingreq(self):
        """Log and queue a PINGREQ packet.

        On success the time of the ping is recorded in self._ping_t.

        Returns the result of _send_simple_command().
        """
        self._easy_log(MQTT_LOG_DEBUG, "Sending PINGREQ")
        result = self._send_simple_command(PINGREQ)
        if result != MQTT_ERR_SUCCESS:
            return result

        self._ping_t = time_func()
        return result

    def _send_pingresp(self):
        """Log and queue a PINGRESP packet.

        Returns the result of _send_simple_command().
        """
        self._easy_log(MQTT_LOG_DEBUG, "Sending PINGRESP")
        return self._send_simple_command(PINGRESP)

    def _send_puback(self, mid):
        """Log and queue a PUBACK for message id 'mid' (dup flag not set).

        Returns the result of _send_command_with_mid().
        """
        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBACK (Mid: %d)", mid)
        return self._send_command_with_mid(PUBACK, mid, False)

    def _send_pubcomp(self, mid):
        """Log and queue a PUBCOMP for message id 'mid' (dup flag not set).

        Returns the result of _send_command_with_mid().
        """
        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBCOMP (Mid: %d)", mid)
        return self._send_command_with_mid(PUBCOMP, mid, False)

    def _pack_remaining_length(self, packet, remaining_length):
        remaining_bytes = []
        while True:
            byte = remaining_length % 128
            remaining_length = remaining_length // 128
            # If there are more digits to encode, set the top bit of this digit
            if remaining_length > 0:
                byte = byte | 0x80

            remaining_bytes.append(byte)
            packet.extend(struct.pack("!B", byte))
            if remaining_length == 0:
                # FIXME - this doesn't deal with incorrectly large payloads
                return packet

    def _pack_str16(self, packet, data):
        if sys.version_info[0] < 3:
            if isinstance(data, bytearray):
                packet.extend(struct.pack("!H", len(data)))
                packet.extend(data)
            elif isinstance(data, str):
1916 1917 1918
                udata = data.encode('utf-8')
                pack_format = "!H" + str(len(udata)) + "s"
                packet.extend(struct.pack(pack_format, len(udata), udata))
R
Roger Light 已提交
1919 1920 1921 1922 1923 1924 1925
            elif isinstance(data, unicode):
                udata = data.encode('utf-8')
                pack_format = "!H" + str(len(udata)) + "s"
                packet.extend(struct.pack(pack_format, len(udata), udata))
            else:
                raise TypeError
        else:
1926
            if isinstance(data, bytearray) or isinstance(data, bytes):
R
Roger Light 已提交
1927 1928 1929 1930 1931 1932 1933 1934 1935
                packet.extend(struct.pack("!H", len(data)))
                packet.extend(data)
            elif isinstance(data, str):
                udata = data.encode('utf-8')
                pack_format = "!H" + str(len(udata)) + "s"
                packet.extend(struct.pack(pack_format, len(udata), udata))
            else:
                raise TypeError

1936
    def _send_publish(self, mid, topic, payload=None, qos=0, retain=False, dup=False, info=None):
        """Build a PUBLISH packet and queue it for sending.

        mid:     the message id; only written into the packet when qos > 0
        topic:   the topic as text
        payload: None for an empty message, text (UTF-8 encoded before
                 sending) or binary data (bytearray, or bytes on Python 3)
        qos:     the quality of service level
        retain:  the retain flag
        dup:     the duplicate flag
        info:    passed through to _packet_queue()

        Returns MQTT_ERR_NO_CONN if there is no socket, otherwise the result
        of _packet_queue().

        Raises TypeError if payload has an unsupported type.
        """
        if self._sock is None:
            return MQTT_ERR_NO_CONN

        # Normalise the payload to raw bytes once, up front, so that an
        # unsupported type is reported as TypeError rather than failing
        # later on an unbound local or on the Python 2-only name 'unicode'.
        if payload is None:
            raw_payload = None
        elif isinstance(payload, str):
            raw_payload = payload.encode('utf-8')
        elif isinstance(payload, (bytearray, bytes)):
            raw_payload = payload
        elif sys.version_info[0] < 3 and isinstance(payload, unicode):
            raw_payload = payload.encode('utf-8')
        else:
            raise TypeError('payload must be a string, unicode or a bytearray.')

        utopic = topic.encode('utf-8')
        command = PUBLISH | ((dup&0x1)<<3) | (qos<<1) | retain
        packet = bytearray()
        packet.extend(struct.pack("!B", command))

        # 2 bytes of length prefix plus the encoded topic.
        remaining_length = 2+len(utopic)
        if raw_payload is None:
            self._easy_log(
                MQTT_LOG_DEBUG,
                "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s' (NULL payload)",
                dup, qos, retain, mid, topic
            )
        else:
            payloadlen = len(raw_payload)
            remaining_length = remaining_length + payloadlen
            self._easy_log(
                MQTT_LOG_DEBUG,
                "Sending PUBLISH (d%d, q%d, r%d, m%d), '%s', ... (%d bytes)",
                dup, qos, retain, mid, topic, payloadlen
            )

        if qos > 0:
            # For message id
            remaining_length = remaining_length + 2

        self._pack_remaining_length(packet, remaining_length)
        self._pack_str16(packet, topic)

        if qos > 0:
            # For message id
            packet.extend(struct.pack("!H", mid))

        if raw_payload is not None:
            packet.extend(raw_payload)

        return self._packet_queue(PUBLISH, packet, mid, qos, info)
R
Roger Light 已提交
1992 1993

    def _send_pubrec(self, mid):
        """Log and queue a PUBREC for message id 'mid' (dup flag not set).

        Returns the result of _send_command_with_mid().
        """
        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREC (Mid: %d)", mid)
        return self._send_command_with_mid(PUBREC, mid, False)

    def _send_pubrel(self, mid, dup=False):
        """Log and queue a PUBREL for message id 'mid'.

        mid: the message id being released
        dup: passed on to _send_command_with_mid() as the duplicate flag

        Returns the result of _send_command_with_mid().
        """
        self._easy_log(MQTT_LOG_DEBUG, "Sending PUBREL (Mid: %d)", mid)
        # Bit 1 of the command byte is always set for PUBREL.
        return self._send_command_with_mid(PUBREL|2, mid, dup)

    def _send_command_with_mid(self, command, mid, dup):
        # For PUBACK, PUBCOMP, PUBREC, and PUBREL
        if dup:
            command = command | 8

        remaining_length = 2
        packet = struct.pack('!BBH', command, remaining_length, mid)
        return self._packet_queue(command, packet, mid, 1)

    def _send_simple_command(self, command):
        # For DISCONNECT, PINGREQ and PINGRESP
        remaining_length = 0
        packet = struct.pack('!BB', command, remaining_length)
        return self._packet_queue(command, packet, 0, 0)

    def _send_connect(self, keepalive, clean_session):
        """Build a CONNECT packet from the client settings and queue it.

        keepalive:     the keepalive interval written into the packet; it is
                       also stored in self._keepalive
        clean_session: when true, the clean session flag (0x02) is set

        The will, username and password fields are included when configured.
        The password is only sent when a username is set as well.

        Returns the result of _packet_queue().
        """
        if self._protocol == MQTTv31:
            protocol = PROTOCOL_NAMEv31
            proto_ver = 3
        else:
            protocol = PROTOCOL_NAMEv311
            proto_ver = 4
        protocol = protocol.encode('utf-8')

        connect_flags = 0
        if clean_session:
            connect_flags = connect_flags | 0x02

        if self._will:
            connect_flags = connect_flags | 0x04 | ((self._will_qos&0x03) << 3) | ((self._will_retain&0x01) << 5)

        if self._username is not None:
            connect_flags = connect_flags | 0x80
            if self._password is not None:
                connect_flags = connect_flags | 0x40

        # Assemble everything that follows the fixed header first. The
        # remaining length is then the real size of this body, which stays
        # correct even when a field grows on UTF-8 encoding.
        body = bytearray()
        body.extend(struct.pack("!H"+str(len(protocol))+"sBBH", len(protocol), protocol, proto_ver, connect_flags, keepalive))

        self._pack_str16(body, self._client_id)

        if self._will:
            self._pack_str16(body, self._will_topic)
            if self._will_payload is None or len(self._will_payload) == 0:
                # Empty will message: a zero length prefix and no data.
                body.extend(struct.pack("!H", 0))
            else:
                self._pack_str16(body, self._will_payload)

        if self._username is not None:
            self._pack_str16(body, self._username)

            if self._password is not None:
                self._pack_str16(body, self._password)

        command = CONNECT
        packet = bytearray()
        packet.extend(struct.pack("!B", command))
        self._pack_remaining_length(packet, len(body))
        packet.extend(body)

        self._keepalive = keepalive
        self._easy_log(
            MQTT_LOG_DEBUG,
            "Sending CONNECT (u%d, p%d, wr%d, wq%d, wf%d, c%d, k%d) client_id=%s",
            (connect_flags & 0x80) >> 7,
            (connect_flags & 0x40) >> 6,
            (connect_flags & 0x20) >> 5,
            (connect_flags & 0x18) >> 3,
            (connect_flags & 0x4) >> 2,
            (connect_flags & 0x2) >> 1,
            keepalive,
            self._client_id
        )
        return self._packet_queue(command, packet, 0, 0)

    def _send_disconnect(self):
        """Log and send a DISCONNECT packet.

        Returns whatever self._send_simple_command() returns for DISCONNECT.
        """
        self._easy_log(MQTT_LOG_DEBUG, "Sending DISCONNECT")
        return self._send_simple_command(DISCONNECT)

2085 2086 2087 2088 2089
    def _send_subscribe(self, dup, topics):
        """Build and queue a SUBSCRIBE packet.

        dup: duplicate flag, placed in bit 3 of the fixed header byte.
        topics: sequence of entries where entry[0] is the topic (written as a
            16-bit length-prefixed string) and entry[1] is the requested QoS
            (written as a single byte).

        Returns a tuple (result of self._packet_queue(), message id).
        """
        # 2 bytes for the message id, then for every subscription a 2-byte
        # length prefix, the topic itself and one QoS byte.
        remaining_length = 2 + sum(2 + len(entry[0]) + 1 for entry in topics)

        command = SUBSCRIBE | (dup<<3) | (1<<1)
        packet = bytearray(struct.pack("!B", command))
        self._pack_remaining_length(packet, remaining_length)

        mid = self._mid_generate()
        packet.extend(struct.pack("!H", mid))
        for entry in topics:
            self._pack_str16(packet, entry[0])
            packet.extend(struct.pack("B", entry[1]))

        self._easy_log(MQTT_LOG_DEBUG, "Sending SUBSCRIBE (d%d, m%d) %s", dup, mid, topics)

        queued = self._packet_queue(command, packet, mid, 1)
        return (queued, mid)

2105 2106 2107 2108 2109
    def _send_unsubscribe(self, dup, topics):
        """Build and queue an UNSUBSCRIBE packet.

        dup: duplicate flag, placed in bit 3 of the fixed header byte.
        topics: sequence of topics, each written as a 16-bit length-prefixed
            string.

        Returns a tuple (result of self._packet_queue(), message id).
        """
        # 2 bytes for the message id, then a 2-byte length prefix plus the
        # topic itself for every entry.
        remaining_length = 2 + sum(2 + len(topic) for topic in topics)

        command = UNSUBSCRIBE | (dup<<3) | (1<<1)
        packet = bytearray(struct.pack("!B", command))
        self._pack_remaining_length(packet, remaining_length)

        mid = self._mid_generate()
        packet.extend(struct.pack("!H", mid))
        for topic in topics:
            self._pack_str16(packet, topic)

        self._easy_log(MQTT_LOG_DEBUG, "Sending UNSUBSCRIBE (d%d) %s", dup, topics)

        queued = self._packet_queue(command, packet, mid, 1)
        return (queued, mid)

R
Roger Light 已提交
2123 2124
    def _message_retry_check_actual(self, messages, mutex):
        """Resend every message in `messages` whose retry interval has expired.

        messages: iterable of message objects exposing timestamp, state, dup,
            mid, topic, payload, qos and retain.
        mutex: lock guarding `messages`; held for the whole scan.

        A message is retried when timestamp + self._message_retry lies in the
        past. Which packet is resent depends on the state the message is
        waiting in; messages in any other state are left untouched.
        """
        # Use the lock as a context manager so it is released even when one of
        # the _send_*() calls raises; a leaked lock would block every later
        # retry check and message handler.
        with mutex:
            now = time_func()
            for m in messages:
                if m.timestamp + self._message_retry < now:
                    if m.state in (mqtt_ms_wait_for_puback, mqtt_ms_wait_for_pubrec):
                        m.timestamp = now
                        m.dup = True
                        self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    elif m.state == mqtt_ms_wait_for_pubrel:
                        m.timestamp = now
                        m.dup = True
                        self._send_pubrec(m.mid)
                    elif m.state == mqtt_ms_wait_for_pubcomp:
                        m.timestamp = now
                        m.dup = True
                        self._send_pubrel(m.mid, True)
R
Roger Light 已提交
2141

R
Roger Light 已提交
2142 2143 2144 2145 2146 2147
    def _message_retry_check(self):
        """Run the retry check over the outgoing and then the incoming messages.

        Each message list is handed to _message_retry_check_actual() together
        with the mutex that guards it.
        """
        self._message_retry_check_actual(self._out_messages, self._out_message_mutex)
        self._message_retry_check_actual(self._in_messages, self._in_message_mutex)

    def _messages_reconnect_reset_out(self):
        """Reset the state of all outgoing messages.

        The in-flight counter is zeroed and every message gets timestamp 0.
        While the in-flight limit allows it (a limit of 0 means unlimited),
        messages are put back into the publish state, or the resend-PUBREL
        state for QoS 2 messages that were waiting for PUBCOMP. Messages that
        were already waiting for an acknowledgement are flagged as duplicates.
        Anything beyond the limit is marked as queued.
        """
        self._out_message_mutex.acquire()
        self._inflight_messages = 0
        for msg in self._out_messages:
            msg.timestamp = 0

            has_capacity = (self._max_inflight_messages == 0
                            or self._inflight_messages < self._max_inflight_messages)
            if not has_capacity:
                msg.state = mqtt_ms_queued
                continue

            if msg.qos == 0:
                msg.state = mqtt_ms_publish
            elif msg.qos == 1:
                if msg.state == mqtt_ms_wait_for_puback:
                    msg.dup = True
                msg.state = mqtt_ms_publish
            elif msg.qos == 2:
                if msg.state == mqtt_ms_wait_for_pubcomp:
                    # The PUBLISH half of the exchange already completed, so
                    # only the PUBREL has to be sent again.
                    msg.state = mqtt_ms_resend_pubrel
                    msg.dup = True
                else:
                    if msg.state == mqtt_ms_wait_for_pubrec:
                        msg.dup = True
                    msg.state = mqtt_ms_publish
        self._out_message_mutex.release()

    def _messages_reconnect_reset_in(self):
        self._in_message_mutex.acquire()
        for m in self._in_messages:
            m.timestamp = 0
            if m.qos != 2:
                self._in_messages.pop(self._in_messages.index(m))
            else:
                # Preserve current state
                pass
        self._in_message_mutex.release()

    def _messages_reconnect_reset(self):
        """Reset the outgoing and then the incoming message lists.

        Delegates to _messages_reconnect_reset_out() and
        _messages_reconnect_reset_in().
        """
        self._messages_reconnect_reset_out()
        self._messages_reconnect_reset_in()
R
Roger Light 已提交
2186

2187
    def _packet_queue(self, command, packet, mid, qos, info=None):
        """Append a packet to the outgoing queue and trigger a write.

        command: fixed header byte of the packet.
        packet: complete encoded packet.
        mid: message id associated with the packet.
        qos: QoS level associated with the packet.
        info: optional object stored alongside the packet.

        Returns the result of self.loop_write() when called outside a callback
        with no network thread set, otherwise MQTT_ERR_SUCCESS.
        """
        entry = {
            'command': command,
            'mid': mid,
            'qos': qos,
            'pos': 0,
            'to_process': len(packet),
            'packet': packet,
            'info': info,
        }

        self._out_packet_mutex.acquire()
        self._out_packet.append(entry)
        # Non-blocking attempt: if the current-packet lock is busy, the
        # packet simply stays in the queue.
        got_current = self._current_out_packet_mutex.acquire(False)
        if got_current:
            if self._current_out_packet is None and len(self._out_packet) > 0:
                self._current_out_packet = self._out_packet.pop(0)
            self._current_out_packet_mutex.release()
        self._out_packet_mutex.release()

        # Write a single byte to sockpairW (connected to sockpairR) to break
        # out of select() if in threaded mode.
        try:
            self._sockpairW.send(sockpair_data)
        except socket.error as err:
            if err.errno != EAGAIN:
                raise

        if self._in_callback or self._thread is not None:
            return MQTT_ERR_SUCCESS
        return self.loop_write()
R
Roger Light 已提交
2217 2218

    def _packet_handle(self):
        """Dispatch the packet in self._in_packet to its handler.

        The command is the upper four bits of the fixed header byte. Returns
        the handler's result, or MQTT_ERR_PROTOCOL for an unrecognised
        command.
        """
        cmd = self._in_packet['command']&0xF0

        # PUBACK and PUBCOMP share one handler that takes the name to log.
        if cmd == PUBACK:
            return self._handle_pubackcomp("PUBACK")
        if cmd == PUBCOMP:
            return self._handle_pubackcomp("PUBCOMP")

        handlers = {
            PINGREQ: self._handle_pingreq,
            PINGRESP: self._handle_pingresp,
            PUBLISH: self._handle_publish,
            PUBREC: self._handle_pubrec,
            PUBREL: self._handle_pubrel,
            CONNACK: self._handle_connack,
            SUBACK: self._handle_suback,
            UNSUBACK: self._handle_unsuback,
        }
        handler = handlers.get(cmd)
        if handler is not None:
            return handler()

        # If we don't recognise the command, return an error straight away.
        self._easy_log(MQTT_LOG_ERR, "Error: Unrecognised command %s", cmd)
        return MQTT_ERR_PROTOCOL
R
Roger Light 已提交
2244 2245 2246

    def _handle_pingreq(self):
        """Answer a PINGREQ packet with a PINGRESP.

        In strict protocol mode a non-zero remaining length is rejected with
        MQTT_ERR_PROTOCOL. Otherwise returns the result of
        self._send_pingresp().
        """
        if self._strict_protocol and self._in_packet['remaining_length'] != 0:
            return MQTT_ERR_PROTOCOL

        self._easy_log(MQTT_LOG_DEBUG, "Received PINGREQ")
        return self._send_pingresp()

    def _handle_pingresp(self):
        """Record that a PINGRESP packet has arrived.

        In strict protocol mode a non-zero remaining length is rejected with
        MQTT_ERR_PROTOCOL. Otherwise self._ping_t is reset to 0 and
        MQTT_ERR_SUCCESS is returned.
        """
        if self._strict_protocol and self._in_packet['remaining_length'] != 0:
            return MQTT_ERR_PROTOCOL

        # No longer waiting for a PINGRESP.
        self._ping_t = 0
        self._easy_log(MQTT_LOG_DEBUG, "Received PINGRESP")
        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
2262 2263 2264

    def _handle_connack(self):
        """Process a CONNACK packet.

        Steps:
          * validate that the payload is exactly two bytes (flags, result);
          * if the protocol version was refused while using MQTT v3.1.1,
            downgrade to MQTT v3.1 and return the result of self.reconnect();
          * on result 0 mark the client as connected;
          * invoke on_connect (three-argument callbacks get no flags dict);
          * on result 0, (re)send the messages in self._out_messages.

        Returns MQTT_ERR_PROTOCOL for a malformed packet or an unknown result
        code, MQTT_ERR_CONN_REFUSED for result codes 1-5, and for result 0
        either the first non-zero send result or 0 / MQTT_ERR_SUCCESS.
        """
        if self._strict_protocol:
            if self._in_packet['remaining_length'] != 2:
                return MQTT_ERR_PROTOCOL

        if len(self._in_packet['packet']) != 2:
            return MQTT_ERR_PROTOCOL

        (flags, result) = struct.unpack("!BB", self._in_packet['packet'])
        if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
            self._easy_log(
                MQTT_LOG_DEBUG,
                "Received CONNACK (%s, %s), attempting downgrade to MQTT v3.1.",
                flags, result
            )
            # Downgrade to MQTT v3.1
            self._protocol = MQTTv31
            return self.reconnect()

        if result == 0:
            self._state = mqtt_cs_connected

        self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK (%s, %s)", flags, result)

        # The context manager and try/finally make sure the callback lock is
        # released and _in_callback is cleared even if on_connect raises.
        with self._callback_mutex:
            if self.on_connect:
                self._in_callback = True
                try:
                    if sys.version_info[0] < 3:
                        argcount = self.on_connect.func_code.co_argcount
                    else:
                        argcount = self.on_connect.__code__.co_argcount

                    if argcount == 3:
                        self.on_connect(self, self._userdata, result)
                    else:
                        flags_dict = dict()
                        flags_dict['session present'] = flags & 0x01
                        self.on_connect(self, self._userdata, flags_dict, result)
                finally:
                    self._in_callback = False

        if result > 0 and result < 6:
            return MQTT_ERR_CONN_REFUSED
        if result != 0:
            return MQTT_ERR_PROTOCOL

        rc = 0
        # Every return below leaves the `with` block, which releases the lock
        # on all paths, including exceptions raised by a send.
        with self._out_message_mutex:
            for m in self._out_messages:
                m.timestamp = time_func()
                if m.state == mqtt_ms_queued:
                    self.loop_write() # Process outgoing messages that have just been queued up
                    return MQTT_ERR_SUCCESS

                # Decide what, if anything, has to be sent for this message.
                resend = None
                if m.qos == 0:
                    resend = "publish"
                elif m.qos == 1:
                    if m.state == mqtt_ms_publish:
                        self._inflight_messages = self._inflight_messages + 1
                        m.state = mqtt_ms_wait_for_puback
                        resend = "publish"
                elif m.qos == 2:
                    if m.state == mqtt_ms_publish:
                        self._inflight_messages = self._inflight_messages + 1
                        m.state = mqtt_ms_wait_for_pubrec
                        resend = "publish"
                    elif m.state == mqtt_ms_resend_pubrel:
                        self._inflight_messages = self._inflight_messages + 1
                        m.state = mqtt_ms_wait_for_pubcomp
                        resend = "pubrel"

                if resend is not None:
                    # Pretend to be in a callback so that the send does not
                    # call loop_write() itself.
                    self._in_callback = True
                    try:
                        if resend == "publish":
                            rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                        else:
                            rc = self._send_pubrel(m.mid, m.dup)
                    finally:
                        self._in_callback = False
                    if rc != 0:
                        return rc

                self.loop_write() # Process outgoing messages that have just been queued up
            return rc
R
Roger Light 已提交
2356 2357

    def _handle_suback(self):
        """Process a SUBACK packet and invoke on_subscribe.

        The payload is a 2-byte message id followed by one granted-QoS byte
        per subscription. on_subscribe receives the message id and a tuple of
        the granted QoS values.

        Returns MQTT_ERR_PROTOCOL if the payload is too short to contain a
        message id, otherwise MQTT_ERR_SUCCESS.
        """
        self._easy_log(MQTT_LOG_DEBUG, "Received SUBACK")

        payload = self._in_packet['packet']
        if len(payload) < 2:
            # Without this check the struct format below would get a negative
            # repeat count and raise struct.error.
            return MQTT_ERR_PROTOCOL

        pack_format = "!H" + str(len(payload)-2) + 's'
        (mid, packet) = struct.unpack(pack_format, payload)
        pack_format = "!" + "B"*len(packet)
        granted_qos = struct.unpack(pack_format, packet)

        # Release the callback lock and clear _in_callback even when the user
        # callback raises.
        with self._callback_mutex:
            if self.on_subscribe:
                self._in_callback = True
                try:
                    self.on_subscribe(self, self._userdata, mid, granted_qos)
                finally:
                    self._in_callback = False

        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
2372 2373 2374 2375

    def _handle_publish(self):
        """Process an incoming PUBLISH packet.

        The payload is parsed as: 2-byte topic length, topic, a 2-byte message
        id (only when QoS > 0), then the application payload.

        QoS 0: the message is delivered to the message callbacks.
        QoS 1: a PUBACK is sent, then the message is delivered.
        QoS 2: a PUBREC is sent and the message is stored in
            self._in_messages in the wait-for-PUBREL state.

        Returns MQTT_ERR_PROTOCOL for a malformed packet, an empty topic or
        an invalid QoS; otherwise MQTT_ERR_SUCCESS (QoS 0) or the result of
        sending the acknowledgement.
        """
        header = self._in_packet['command']
        message = MQTTMessage()
        message.dup = (header & 0x08)>>3
        message.qos = (header & 0x06)>>1
        message.retain = (header & 0x01)

        packet = self._in_packet['packet']
        # Each length is validated before it is used to build a struct
        # format: a negative repeat count would raise struct.error instead of
        # reporting a protocol error.
        if len(packet) < 2:
            return MQTT_ERR_PROTOCOL
        pack_format = "!H" + str(len(packet)-2) + 's'
        (slen, packet) = struct.unpack(pack_format, packet)

        if slen > len(packet):
            # Declared topic length runs past the end of the packet.
            return MQTT_ERR_PROTOCOL
        pack_format = '!' + str(slen) + 's' + str(len(packet)-slen) + 's'
        (message.topic, packet) = struct.unpack(pack_format, packet)

        if len(message.topic) == 0:
            return MQTT_ERR_PROTOCOL

        if sys.version_info[0] >= 3:
            message.topic = message.topic.decode('utf-8')

        if message.qos > 0:
            if len(packet) < 2:
                # QoS 1 and 2 packets must carry a 2-byte message id.
                return MQTT_ERR_PROTOCOL
            pack_format = "!H" + str(len(packet)-2) + 's'
            (message.mid, packet) = struct.unpack(pack_format, packet)

        message.payload = packet

        self._easy_log(
            MQTT_LOG_DEBUG,
            "Received PUBLISH (d%d, q%d, r%d, m%d), '%s', ...  (%d bytes)",
            message.dup, message.qos, message.retain, message.mid,
            message.topic, len(message.payload)
        )

        message.timestamp = time_func()
        if message.qos == 0:
            self._handle_on_message(message)
            return MQTT_ERR_SUCCESS
        elif message.qos == 1:
            rc = self._send_puback(message.mid)
            self._handle_on_message(message)
            return rc
        elif message.qos == 2:
            rc = self._send_pubrec(message.mid)
            message.state = mqtt_ms_wait_for_pubrel
            with self._in_message_mutex:
                self._in_messages.append(message)
            return rc
        else:
            return MQTT_ERR_PROTOCOL
R
Roger Light 已提交
2423 2424 2425

    def _handle_pubrel(self):
        """Process a PUBREL packet.

        If a stored incoming message has the given message id, it is passed
        to the message callbacks, removed from self._in_messages, the
        in-flight counter is decremented and, when an in-flight limit is set,
        queued outgoing messages are given the chance to be sent. A PUBCOMP
        is then sent.

        Returns MQTT_ERR_PROTOCOL for a malformed packet, the error from
        self._update_inflight() if it fails, the result of sending PUBCOMP
        when the message was found, and MQTT_ERR_SUCCESS when no stored
        message matches.
        """
        if self._strict_protocol:
            if self._in_packet['remaining_length'] != 2:
                return MQTT_ERR_PROTOCOL

        if len(self._in_packet['packet']) != 2:
            return MQTT_ERR_PROTOCOL

        (mid,) = struct.unpack("!H", self._in_packet['packet'])
        self._easy_log(MQTT_LOG_DEBUG, "Received PUBREL (Mid: %d)", mid)

        # The context managers release the locks on every path, including a
        # user callback that raises inside _handle_on_message().
        with self._in_message_mutex:
            for i, stored in enumerate(self._in_messages):
                if stored.mid != mid:
                    continue

                # Only pass the message on if we have removed it from the queue - this
                # prevents multiple callbacks for the same message.
                self._handle_on_message(stored)
                self._in_messages.pop(i)
                self._inflight_messages = self._inflight_messages - 1
                if self._max_inflight_messages > 0:
                    with self._out_message_mutex:
                        rc = self._update_inflight()
                    if rc != MQTT_ERR_SUCCESS:
                        return rc
                break
            else:
                # No stored message with this id: nothing to complete.
                return MQTT_ERR_SUCCESS

        # Sent after the in-message lock has been released.
        return self._send_pubcomp(mid)

    def _update_inflight(self):
        """Send queued QoS 1/2 messages while the in-flight limit allows it.

        Walks self._out_messages in order. Each queued message with QoS > 0
        is counted as in flight, moved to the matching wait state and
        published. The walk stops as soon as the in-flight count reaches
        self._max_inflight_messages.

        Returns the first non-zero result of self._send_publish(), otherwise
        MQTT_ERR_SUCCESS.
        """
        # Don't lock the message mutex here.
        for msg in self._out_messages:
            if not (self._inflight_messages < self._max_inflight_messages):
                break
            if not (msg.qos > 0 and msg.state == mqtt_ms_queued):
                continue

            self._inflight_messages = self._inflight_messages + 1
            if msg.qos == 1:
                msg.state = mqtt_ms_wait_for_puback
            elif msg.qos == 2:
                msg.state = mqtt_ms_wait_for_pubrec

            rc = self._send_publish(msg.mid, msg.topic, msg.payload, msg.qos, msg.retain, msg.dup)
            if rc != 0:
                return rc
        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
2475 2476 2477

    def _handle_pubrec(self):
        """Process a PUBREC packet.

        The first outgoing message with the given message id is moved to the
        wait-for-PUBCOMP state, its timestamp is refreshed and a PUBREL is
        sent.

        Returns MQTT_ERR_PROTOCOL for a malformed packet, the result of
        sending PUBREL when the message was found, and MQTT_ERR_SUCCESS when
        no outgoing message matches.
        """
        if self._strict_protocol:
            if self._in_packet['remaining_length'] != 2:
                return MQTT_ERR_PROTOCOL

        # Same guard as in _handle_pubrel(): without it a payload that is not
        # exactly two bytes makes struct.unpack() raise struct.error.
        if len(self._in_packet['packet']) != 2:
            return MQTT_ERR_PROTOCOL

        (mid,) = struct.unpack("!H", self._in_packet['packet'])
        self._easy_log(MQTT_LOG_DEBUG, "Received PUBREC (Mid: %d)", mid)

        with self._out_message_mutex:
            for m in self._out_messages:
                if m.mid == mid:
                    m.state = mqtt_ms_wait_for_pubcomp
                    m.timestamp = time_func()
                    break
            else:
                return MQTT_ERR_SUCCESS

        # Sent after the out-message lock has been released.
        return self._send_pubrel(mid, False)
R
Roger Light 已提交
2495 2496 2497

    def _handle_unsuback(self):
        """Process an UNSUBACK packet and invoke on_unsubscribe.

        Returns MQTT_ERR_PROTOCOL if the payload is not exactly the 2-byte
        message id, otherwise MQTT_ERR_SUCCESS.
        """
        if self._strict_protocol:
            if self._in_packet['remaining_length'] != 2:
                return MQTT_ERR_PROTOCOL

        # Same guard as in _handle_pubrel(): without it a payload that is not
        # exactly two bytes makes struct.unpack() raise struct.error.
        if len(self._in_packet['packet']) != 2:
            return MQTT_ERR_PROTOCOL

        (mid,) = struct.unpack("!H", self._in_packet['packet'])
        self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: %d)", mid)

        # Release the callback lock and clear _in_callback even when the user
        # callback raises.
        with self._callback_mutex:
            if self.on_unsubscribe:
                self._in_callback = True
                try:
                    self.on_unsubscribe(self, self._userdata, mid)
                finally:
                    self._in_callback = False
        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
2511

2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530
    def _do_on_publish(self, idx, mid):
        """Finish an outgoing message: run on_publish and drop it from the queue.

        idx: index of the message in self._out_messages.
        mid: message id handed to the on_publish callback.

        The out-message mutex is released while the user callback runs and
        re-acquired afterwards, so the caller must hold it on entry.

        Returns the error from self._update_inflight() if it fails, otherwise
        MQTT_ERR_SUCCESS.
        """
        with self._callback_mutex:
            if self.on_publish:
                self._out_message_mutex.release()
                self._in_callback = True
                self.on_publish(self, self._userdata, mid)
                self._in_callback = False
                self._out_message_mutex.acquire()

        msg = self._out_messages.pop(idx)
        # Mark the message as published before trying to send further queued
        # messages. The message has been acknowledged regardless of whether
        # that follow-up send succeeds, and returning early used to skip the
        # notification altogether.
        msg.info._set_as_published()
        if msg.qos > 0:
            self._inflight_messages = self._inflight_messages - 1
            if self._max_inflight_messages > 0:
                rc = self._update_inflight()
                if rc != MQTT_ERR_SUCCESS:
                    return rc
        return MQTT_ERR_SUCCESS

R
Roger Light 已提交
2531 2532
    def _handle_pubackcomp(self, cmd):
        """Process a PUBACK or PUBCOMP packet.

        cmd: packet name used in the log message ("PUBACK" or "PUBCOMP").

        The first outgoing message with the given message id is completed via
        self._do_on_publish().

        Returns MQTT_ERR_PROTOCOL for a malformed packet, the result of
        self._do_on_publish() when the message was found, and
        MQTT_ERR_SUCCESS when no outgoing message matches.
        """
        if self._strict_protocol:
            if self._in_packet['remaining_length'] != 2:
                return MQTT_ERR_PROTOCOL

        # Same guard as in _handle_pubrel(): without it a payload that is not
        # exactly two bytes makes struct.unpack() raise struct.error.
        if len(self._in_packet['packet']) != 2:
            return MQTT_ERR_PROTOCOL

        (mid,) = struct.unpack("!H", self._in_packet['packet'])
        self._easy_log(MQTT_LOG_DEBUG, "Received %s (Mid: %d)", cmd, mid)

        # The lock is handled manually rather than with `with` because
        # _do_on_publish() releases and re-acquires it around the user
        # callback.
        self._out_message_mutex.acquire()
        for i in range(len(self._out_messages)):
            try:
                if self._out_messages[i].mid == mid:
                    # Only inform the client the message has been sent once.
                    rc = self._do_on_publish(i, mid)
                    self._out_message_mutex.release()
                    return rc
            except IndexError:
                # Have removed item so i>count.
                # Not really an error.
                pass

        self._out_message_mutex.release()
        return MQTT_ERR_SUCCESS
R
Roger Light 已提交
2555

R
Roger Light 已提交
2556 2557
    def _handle_on_message(self, message):
        self._callback_mutex.acquire()
2558
        matched = False
2559 2560 2561 2562 2563
        for callback in self._on_message_filtered.iter_match(message.topic):
            self._in_callback = True
            callback(self, self._userdata, message)
            self._in_callback = False
            matched = True
2564 2565

        if matched == False and self.on_message:
R
Roger Light 已提交
2566 2567 2568 2569 2570 2571
            self._in_callback = True
            self.on_message(self, self._userdata, message)
            self._in_callback = False

        self._callback_mutex.release()

R
Roger Light 已提交
2572
    def _thread_main(self):
        """Run loop_forever() with retry_first_connection enabled.

        NOTE(review): presumably the entry point of the background network
        thread - confirm against the code that starts the thread.
        """
        self.loop_forever(retry_first_connection=True)
R
Roger Light 已提交
2574

2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591
    def _host_matches_cert(self, host, cert_host):
        if cert_host[0:2] == "*.":
            if cert_host.count("*") != 1:
                return False

            host_match = host.split(".", 1)[1]
            cert_match = cert_host.split(".", 1)[1]
            if host_match == cert_match:
                return True
            else:
                return False
        else:
            if host == cert_host:
                return True
            else:
                return False

Y
yoch 已提交
2592
    def _tls_match_hostname(self, sock):
2593
        try:
Y
yoch 已提交
2594
            cert = sock.getpeercert()
2595 2596 2597 2598 2599
        except AttributeError:
            # the getpeercert can throw Attribute error: object has no attribute 'peer_certificate'
            # Don't let that crash the whole client. See also: http://bugs.python.org/issue13721
            raise ssl.SSLError('Not connected')

2600 2601 2602
        san = cert.get('subjectAltName')
        if san:
            have_san_dns = False
2603
            for (key, value) in san:
2604 2605
                if key == 'DNS':
                    have_san_dns = True
2606
                    if self._host_matches_cert(self._host.lower(), value.lower()) == True:
2607 2608 2609 2610
                        return
                if key == 'IP Address':
                    have_san_dns = True
                    if value.lower() == self._host.lower():
2611 2612 2613 2614 2615 2616 2617
                        return

            if have_san_dns:
                # Only check subject if subjectAltName dns not found.
                raise ssl.SSLError('Certificate subject does not match remote hostname.')
        subject = cert.get('subject')
        if subject:
R
Roger Light 已提交
2618
            for ((key, value),) in subject:
2619
                if key == 'commonName':
2620
                    if self._host_matches_cert(self._host.lower(), value.lower()) == True:
2621 2622 2623 2624
                        return

        raise ssl.SSLError('Certificate subject does not match remote hostname.')

2625

2626
# Compatibility class for easy porting from mosquitto.py.
2627 2628 2629
class Mosquitto(Client):
    """Compatibility class for easy porting from mosquitto.py.

    Behaves like Client; the constructor forwards client_id, clean_session
    and userdata unchanged.
    """
    def __init__(self, client_id="", clean_session=True, userdata=None):
        super(Mosquitto, self).__init__(client_id, clean_session, userdata)
M
Milan Toth 已提交
2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673

class WebsocketWrapper:
    """Socket-like adapter that carries a byte stream inside WebSocket frames.

    The wrapper performs the HTTP upgrade handshake on construction and then
    exposes recv/read/send/write/close/fileno/setblocking so it can be used
    in place of the wrapped socket.  Outgoing data is sent as masked binary
    frames; incoming binary frames are unwrapped, and PING / CLOSE control
    frames are answered as they are read.
    """

    # WebSocket frame opcodes (low nibble of the first header byte).
    OPCODE_CONTINUATION = 0x0
    OPCODE_TEXT = 0x1
    OPCODE_BINARY = 0x2
    OPCODE_CONNCLOSE = 0x8
    OPCODE_PING = 0x9
    OPCODE_PONG = 0xa

    # Fixed GUID appended to the client key when computing Sec-WebSocket-Accept.
    _HANDSHAKE_GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

    def __init__(self, socket, host, port, is_ssl):
        """Store the connection details and run the opening handshake.

        socket -- connected socket-like object providing send() and recv()
        host, port -- used for the Host and Origin request headers
        is_ssl -- stored on the instance as _ssl

        Raises ValueError if the handshake fails.
        """
        self.connected = False

        self._ssl = is_ssl
        self._host = host
        self._port = port
        self._socket = socket

        # Unsent remainder of the current outgoing frame.
        self._sendbuffer = bytearray()
        # Raw bytes of the incoming frame currently being parsed.
        self._readbuffer = bytearray()

        # Payload size of the frame held in _sendbuffer.
        self._requested_size = 0
        # Number of payload bytes of the current frame already handed out.
        self._payload_head = 0
        # Parse position inside _readbuffer during one _recv_impl() call.
        self._readbuffer_head = 0

        self._do_handshake()

    def __del__(self):
        """Drop the references to both buffers."""
        self._sendbuffer = None
        self._readbuffer = None

    def _do_handshake(self):
        """Send the HTTP upgrade request and validate the server's reply.

        Sets self.connected to True on success.  Raises ValueError when the
        peer closes the connection, does not upgrade the connection, or
        returns a wrong or missing Sec-WebSocket-Accept value.
        """
        sec_websocket_key = base64.b64encode(uuid.uuid4().bytes)
        host_port = str(self._host).encode('utf-8') + b":" + str(self._port).encode('utf-8')

        header = b"".join([
            b"GET /mqtt HTTP/1.1\r\n",
            b"Upgrade: websocket\r\n",
            b"Connection: Upgrade\r\n",
            b"Host: ", host_port, b"\r\n",
            b"Origin: http://", host_port, b"\r\n",
            b"Sec-WebSocket-Key: ", sec_websocket_key, b"\r\n",
            b"Sec-WebSocket-Version: 13\r\n",
            b"Sec-WebSocket-Protocol: mqtt\r\n\r\n",
        ])
        self._socket.send(header)

        # Value the server must echo back: base64(sha1(key + GUID)).
        expected_accept = base64.b64encode(
            hashlib.sha1(sec_websocket_key + self._HANDSHAKE_GUID).digest())

        has_secret = False
        has_upgrade = False

        # Read the response one byte at a time so nothing beyond the blank
        # line that ends the HTTP headers is consumed from the socket.
        while True:
            byte = self._socket.recv(1)
            if not byte:
                # connection reset
                raise ValueError("WebSocket handshake error")

            self._readbuffer.extend(byte)
            if byte != b"\n":
                continue

            line = bytes(self._readbuffer)
            self._readbuffer = bytearray()

            if len(line) <= 2:
                # a bare line break ends the header section
                break

            # Compare the header *name* exactly (case-insensitively) rather
            # than searching for it anywhere in the line, and tolerate any
            # amount of whitespace around the value.
            name, _, value = line.partition(b":")
            name = name.strip().lower()
            value = value.strip()

            if name == b"connection":
                if b"upgrade" not in value.lower():
                    raise ValueError("WebSocket handshake error, connection not upgraded")
                has_upgrade = True
            elif name == b"sec-websocket-accept":
                if value != expected_accept:
                    raise ValueError("WebSocket handshake error, invalid secret key")
                has_secret = True

        if not has_upgrade or not has_secret:
            raise ValueError("WebSocket handshake error")

        self._readbuffer = bytearray()
        self.connected = True

    def _create_frame(self, opcode, data, do_masking=1):
        """Return a single final WebSocket frame as a bytearray.

        opcode -- one of the OPCODE_* constants
        data -- payload (bytes or bytearray); it is copied, never modified
        do_masking -- 1 to mask the payload with a random 4 byte key, 0 to
                      send it unmasked

        Raises ValueError if the payload does not fit in a 63 bit length.
        """
        # Work on a copy so masking never alters the caller's buffer.
        payload = bytearray(data)
        length = len(payload)
        mask_flag = do_masking

        header = bytearray()
        # 1 << 7 is the final flag, we don't send continued data
        header.append(1 << 7 | opcode)

        if length < 126:
            header.append(mask_flag << 7 | length)
        elif length < 65536:
            # Anything that fits in 16 bits must use the 16 bit form: the
            # shortest possible length encoding is mandatory.
            header.append(mask_flag << 7 | 126)
            header += struct.pack("!H", length)
        elif length <= 0x7FFFFFFFFFFFFFFF:
            # The most significant bit of the 64 bit length must be 0.
            header.append(mask_flag << 7 | 127)
            header += struct.pack("!Q", length)
        else:
            raise ValueError("Maximum payload size is 2^63")

        if mask_flag == 1:
            mask_key = bytearray(random.randint(0, 255) for _ in range(4))
            for index in range(length):
                payload[index] ^= mask_key[index % 4]
            payload = mask_key + payload

        return header + payload

    def _buffered_read(self, length):
        """Return the next `length` bytes of the frame being parsed.

        Bytes not yet present in _readbuffer are fetched from the socket and
        kept there, so parsing can restart from the beginning of the frame on
        a later call.

        Raises socket.error(ECONNABORTED) when the peer closed the connection
        and socket.error(EAGAIN) when fewer bytes than needed were available.
        """
        missing = self._readbuffer_head + length - len(self._readbuffer)
        if missing > 0:
            data = self._socket.recv(missing)
            if not data:
                raise socket.error(errno.ECONNABORTED, 0)

            self._readbuffer.extend(data)

            if len(data) < missing:
                # Short read: what arrived stays buffered, but a truncated
                # slice must not reach the frame parser.
                raise socket.error(errno.EAGAIN, 0)

        start = self._readbuffer_head
        self._readbuffer_head += length
        return self._readbuffer[start:self._readbuffer_head]

    def _recv_impl(self, length):
        """Return up to `length` payload bytes of the current binary frame.

        Every call re-parses the frame header from _readbuffer and continues
        at _payload_head, so one frame can be consumed over several calls.
        Once a frame has been read completely the buffer is cleared and
        PING / CLOSE frames are answered with a PONG / CLOSE frame.

        Returns b'' (and clears self.connected) when the peer closed the
        connection.  Raises socket.error(EAGAIN) when there is nothing to
        return yet: incomplete data, a non-binary frame, or an empty frame.
        """
        try:
            self._readbuffer_head = 0

            result = None
            # Stays empty for zero-length frames so that control frame
            # replies below always have a payload to echo.
            payload = bytearray()

            chunk_startindex = self._payload_head
            chunk_endindex = self._payload_head + length

            header1 = self._buffered_read(1)
            header2 = self._buffered_read(1)

            opcode = (header1[0] & 0x0f)
            maskbit = (header2[0] & 0x80) == 0x80
            lengthbits = (header2[0] & 0x7f)
            payload_length = lengthbits
            mask_key = None

            # extended payload length: 16 bit or 64 bit form
            if lengthbits == 0x7e:
                payload_length = struct.unpack("!H", self._buffered_read(2))[0]
            elif lengthbits == 0x7f:
                payload_length = struct.unpack("!Q", self._buffered_read(8))[0]

            if maskbit:
                mask_key = self._buffered_read(4)

            # if the frame payload is shorter than the requested data, read
            # only the part that exists
            readindex = min(chunk_endindex, payload_length)

            if readindex > 0:
                # payload from its first byte up to the end of this chunk
                payload = self._buffered_read(readindex)

                # unmask only the part that is returned by this call
                if maskbit:
                    for index in range(chunk_startindex, readindex):
                        payload[index] ^= mask_key[index % 4]

                result = payload[chunk_startindex:readindex]
                self._payload_head = readindex

            # full frame consumed: reset the read buffer and payload head
            if readindex == payload_length:
                self._readbuffer = bytearray()
                self._payload_head = 0

                # Answer control frames.  Frames sent by a client must be
                # masked, so the default masking of _create_frame is used.
                if opcode == WebsocketWrapper.OPCODE_CONNCLOSE:
                    frame = self._create_frame(WebsocketWrapper.OPCODE_CONNCLOSE, payload)
                    self._socket.send(frame)

                if opcode == WebsocketWrapper.OPCODE_PING:
                    frame = self._create_frame(WebsocketWrapper.OPCODE_PONG, payload)
                    self._socket.send(frame)

            if opcode == WebsocketWrapper.OPCODE_BINARY and result is not None:
                return result

            # Nothing to hand to the caller (control frame or empty frame).
            # NOTE(review): the module defines a platform specific EAGAIN
            # constant; confirm whether it should be used here instead.
            raise socket.error(errno.EAGAIN, 0)

        except socket.error as err:

            if err.errno == errno.ECONNABORTED:
                self.connected = False
                return b''
            else:
                # no more data
                raise

    def _send_impl(self, data):
        """Send `data` as one masked binary frame.

        Returns len(data) once the whole frame has been written to the
        socket.  Returns 0 while part of the frame is still pending; the
        caller is expected to call again with the same data, which only
        flushes the pending remainder.
        """
        # Build a new frame only if the previous one was sent completely.
        if not self._sendbuffer:
            frame = self._create_frame(WebsocketWrapper.OPCODE_BINARY, data)
            self._sendbuffer.extend(frame)
            self._requested_size = len(data)

        # try to write out as much as possible
        length = self._socket.send(self._sendbuffer)
        self._sendbuffer = self._sendbuffer[length:]

        if self._sendbuffer:
            # couldn't send the whole frame, report 0 bytes as sent
            return 0

        # frame sent out completely, report the payload's size
        return self._requested_size

    def recv(self, length):
        """Socket-style alias of _recv_impl()."""
        return self._recv_impl(length)

    def read(self, length):
        """File-style alias of _recv_impl()."""
        return self._recv_impl(length)

    def send(self, data):
        """Socket-style alias of _send_impl()."""
        return self._send_impl(data)

    def write(self, data):
        """File-style alias of _send_impl()."""
        return self._send_impl(data)

    def close(self):
        """Close the wrapped socket."""
        self._socket.close()

    def fileno(self):
        """Return the file descriptor of the wrapped socket."""
        return self._socket.fileno()

    def setblocking(self,flag):
        """Set the blocking mode of the wrapped socket."""
        self._socket.setblocking(flag)