提交 97bfad04 编写于 作者: R Roger Light

Remove _out_packet_mutex and _current_out_packet_mutex.

And convert the _out_packet queue use to thread safe.
上级 7bf4aba1
......@@ -17,6 +17,8 @@ v1.6.0 - 2021-xx-xx
- Raise exceptions when attempting to set MQTT v5 properties to forbidden
values. Closes #586.
- Callbacks can now be updated from within a callback.
- Remove _out_packet_mutex and _current_out_packet_mutex and convert the
_out_packet queue use to thread safe.
v1.5.1 - 2020-09-22
......
......@@ -646,7 +646,6 @@ class Client(object):
"to_process": 0,
"pos": 0}
self._out_packet = collections.deque()
self._current_out_packet = None
self._last_msg_in = time_func()
self._last_msg_out = time_func()
self._reconnect_min_delay = 1
......@@ -676,8 +675,6 @@ class Client(object):
self._proxy = {}
self._in_callback_mutex = threading.Lock()
self._callback_mutex = threading.RLock()
self._out_packet_mutex = threading.Lock()
self._current_out_packet_mutex = threading.RLock()
self._msgtime_mutex = threading.Lock()
self._out_message_mutex = threading.RLock()
self._in_message_mutex = threading.Lock()
......@@ -1110,11 +1107,7 @@ class Client(object):
"to_process": 0,
"pos": 0}
with self._out_packet_mutex:
self._out_packet = collections.deque()
with self._current_out_packet_mutex:
self._current_out_packet = None
self._out_packet = collections.deque()
with self._msgtime_mutex:
self._last_msg_in = time_func()
......@@ -1197,15 +1190,12 @@ class Client(object):
if timeout < 0.0:
raise ValueError('Invalid timeout.')
with self._current_out_packet_mutex:
with self._out_packet_mutex:
if self._current_out_packet is None and len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.popleft()
if self._current_out_packet:
wlist = [self._sock]
else:
wlist = []
try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
wlist = [self._sock]
except IndexError:
wlist = []
# used to check if there are any bytes left in the (SSL) socket
pending_bytes = 0
......@@ -1645,18 +1635,14 @@ class Client(object):
if self._sock is None:
return MQTT_ERR_NO_CONN
max_packets = len(self._out_packet) + 1
if max_packets < 1:
max_packets = 1
try:
for _ in range(0, max_packets):
rc = self._packet_write()
if rc > 0:
return self._loop_rc_handle(rc)
elif rc == MQTT_ERR_AGAIN:
return MQTT_ERR_SUCCESS
return MQTT_ERR_SUCCESS
rc = self._packet_write()
if rc == MQTT_ERR_AGAIN:
return MQTT_ERR_SUCCESS
elif rc > 0:
return self._loop_rc_handle(rc)
else:
return MQTT_ERR_SUCCESS
finally:
if self.want_write():
self._call_socket_register_write()
......@@ -1667,9 +1653,11 @@ class Client(object):
"""Call to determine if there is network data waiting to be written.
Useful if you are calling select() yourself rather than using loop().
"""
if self._current_out_packet or len(self._out_packet) > 0:
try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
return True
else:
except IndexError:
return False
def loop_misc(self):
......@@ -1831,10 +1819,8 @@ class Client(object):
# We don't need to worry about locking here, because we've
# either called loop_forever() when in single threaded mode, or
# in multi threaded mode when loop_stop() has been called and
# so no other threads can access _current_out_packet,
# _out_packet or _messages.
# so no other threads can access _out_packet or _messages.
if (self._thread_terminate is True
and self._current_out_packet is None
and len(self._out_packet) == 0
and len(self._out_messages) == 0):
rc = 1
......@@ -2482,22 +2468,23 @@ class Client(object):
return rc
def _packet_write(self):
self._current_out_packet_mutex.acquire()
while self._current_out_packet:
packet = self._current_out_packet
while True:
try:
packet = self._out_packet.popleft()
except IndexError:
return MQTT_ERR_SUCCESS
try:
write_length = self._sock_send(
packet['packet'][packet['pos']:])
except (AttributeError, ValueError):
self._current_out_packet_mutex.release()
self._out_packet.appendleft(packet)
return MQTT_ERR_SUCCESS
except WouldBlockError:
self._current_out_packet_mutex.release()
self._out_packet.appendleft(packet)
return MQTT_ERR_AGAIN
except socket.error as err:
self._current_out_packet_mutex.release()
self._out_packet.appendleft(packet)
self._easy_log(
MQTT_LOG_ERR, 'failed to receive on socket: %s', err)
return MQTT_ERR_CONN_LOST
......@@ -2525,26 +2512,19 @@ class Client(object):
packet['info']._set_as_published()
if (packet['command'] & 0xF0) == DISCONNECT:
self._current_out_packet_mutex.release()
with self._msgtime_mutex:
self._last_msg_out = time_func()
self._do_on_disconnect(MQTT_ERR_SUCCESS)
self._sock_close()
return MQTT_ERR_SUCCESS
with self._out_packet_mutex:
if len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.popleft()
else:
self._current_out_packet = None
else:
# We haven't finished with this packet
self._out_packet.appendleft(packet)
else:
break
self._current_out_packet_mutex.release()
with self._msgtime_mutex:
self._last_msg_out = time_func()
......@@ -3032,12 +3012,7 @@ class Client(object):
'packet': packet,
'info': info}
with self._out_packet_mutex:
self._out_packet.append(mpkt)
if self._current_out_packet_mutex.acquire(False):
if self._current_out_packet is None and len(self._out_packet) > 0:
self._current_out_packet = self._out_packet.popleft()
self._current_out_packet_mutex.release()
self._out_packet.append(mpkt)
# Write a single byte to sockpairW (connected to sockpairR) to break
# out of select() if in threaded mode.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册