diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 74cc97766248cfb8247f1d03a40f4d3fa9926c65..e33a7237b62afdf911276650e4627a8cfd39f100 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -180,10 +180,6 @@ class WebsocketConnectionError(ValueError): pass -class WouldBlockError(Exception): - pass - - def error_string(mqtt_errno): """Return the error string associated with an mqtt error number.""" if mqtt_errno == MQTT_ERR_SUCCESS: @@ -284,9 +280,8 @@ def _socketpair_compat(): sock1.setblocking(0) try: sock1.connect(("127.0.0.1", port)) - except socket.error as err: - if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN: - raise + except BlockingIOError: + pass sock2, address = listensock.accept() sock2.setblocking(0) listensock.close() @@ -623,29 +618,23 @@ class Client(object): def _sock_recv(self, bufsize): try: return self._sock.recv(bufsize) - except socket.error as err: - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_READ: - raise WouldBlockError() - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_WRITE: - self._call_socket_register_write() - raise WouldBlockError() - if err.errno == EAGAIN: - raise WouldBlockError() - raise + except ssl.SSLWantReadError: + raise BlockingIOError + except ssl.SSLWantWriteError: + self._call_socket_register_write() + raise BlockingIOError def _sock_send(self, buf): try: return self._sock.send(buf) - except socket.error as err: - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_READ: - raise WouldBlockError() - if self._ssl and err.errno == ssl.SSL_ERROR_WANT_WRITE: - self._call_socket_register_write() - raise WouldBlockError() - if err.errno == EAGAIN: - self._call_socket_register_write() - raise WouldBlockError() - raise + except ssl.SSLWantReadError: + raise BlockingIOError + except ssl.SSLWantWriteError: + self._call_socket_register_write() + raise BlockingIOError + except BlockingIOError: + self._call_socket_register_write() + raise BlockingIOError def _sock_close(self): """Close the connection to the server.""" @@ -1145,9 +1134,8 @@ class Client(object): # Clear sockpairR - only ever a single byte written. try: self._sockpairR.recv(1) - except socket.error as err: - if err.errno != EAGAIN: - raise + except BlockingIOError: + pass if self._sock in socklist[1]: rc = self.loop_write(max_packets) @@ -1700,7 +1688,7 @@ class Client(object): retry_first_connection: Should the first connection attempt be retried on failure. This is independent of the reconnect_on_failure setting. - Raises socket.error on first connection failures unless retry_first_connection=True + Raises OSError/WebsocketConnectionError on first connection failures unless retry_first_connection=True """ run = True @@ -1712,7 +1700,7 @@ class Client(object): if self._state == mqtt_cs_connect_async: try: self.reconnect() - except (socket.error, OSError, WebsocketConnectionError): + except (OSError, WebsocketConnectionError): self._handle_on_connect_fail() if not retry_first_connection: raise @@ -1749,7 +1737,7 @@ class Client(object): else: try: self.reconnect() - except (socket.error, OSError, WebsocketConnectionError): + except (OSError, WebsocketConnectionError): self._handle_on_connect_fail() self._easy_log( MQTT_LOG_DEBUG, "Connection failed, retrying") @@ -2339,9 +2327,9 @@ class Client(object): if self._in_packet['command'] == 0: try: command = self._sock_recv(1) - except WouldBlockError: + except BlockingIOError: return MQTT_ERR_AGAIN - except socket.error as err: + except ConnectionError as err: self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) return MQTT_ERR_CONN_LOST @@ -2358,9 +2346,9 @@ class Client(object): while True: try: byte = self._sock_recv(1) - except WouldBlockError: + except BlockingIOError: return MQTT_ERR_AGAIN - except socket.error as err: + except ConnectionError as err: self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) return MQTT_ERR_CONN_LOST @@ -2388,9 +2376,9 @@ class Client(object): while self._in_packet['to_process'] > 0: try: data = self._sock_recv(self._in_packet['to_process']) - except WouldBlockError: + except BlockingIOError: return MQTT_ERR_AGAIN - except socket.error as err: + except ConnectionError as err: self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) return MQTT_ERR_CONN_LOST @@ -2437,10 +2425,10 @@ class Client(object): except (AttributeError, ValueError): self._out_packet.appendleft(packet) return MQTT_ERR_SUCCESS - except WouldBlockError: + except BlockingIOError: self._out_packet.appendleft(packet) return MQTT_ERR_AGAIN - except socket.error as err: + except ConnectionError as err: self._out_packet.appendleft(packet) self._easy_log( MQTT_LOG_ERR, 'failed to receive on socket: %s', err) @@ -2975,9 +2963,8 @@ class Client(object): # out of select() if in threaded mode. try: self._sockpairW.send(sockpair_data) - except socket.error as err: - if err.errno != EAGAIN: - raise + except BlockingIOError: + pass # If we have an external event loop registered, use that instead # of calling loop_write() directly. @@ -3807,19 +3794,19 @@ class WebsocketWrapper(object): def _buffered_read(self, length): - # try to recv and strore needed bytes + # try to recv and store needed bytes wanted_bytes = length - (len(self._readbuffer) - self._readbuffer_head) if wanted_bytes > 0: data = self._socket.recv(wanted_bytes) if not data: - raise socket.error(errno.ECONNABORTED, 0) + raise ConnectionAbortedError else: self._readbuffer.extend(data) if len(data) < wanted_bytes: - raise socket.error(EAGAIN, 0) + raise BlockingIOError self._readbuffer_head += length return self._readbuffer[self._readbuffer_head - length:self._readbuffer_head] @@ -3901,16 +3888,11 @@ class WebsocketWrapper(object): and payload_length > 0: return result else: - raise socket.error(EAGAIN, 0) + raise BlockingIOError - except socket.error as err: - - if err.errno == errno.ECONNABORTED: - self.connected = False - return b'' - else: - # no more data - raise + except ConnectionError: + self.connected = False + return b'' def _send_impl(self, data):