提交 3cca32d5 编写于 作者: B Ben Darnell

asyncio: Manage the selector thread with an async generator

Async generators have a special shutdown protocol which allows
us to detect the end of the event loop and stop our thread.
This lets us clean up the thread reliably when the event loop
is started/stopped via the tornado IOLoop interfaces (which
explicitly know about the selector thread), or when the
latest asyncio interfaces are used (asyncio.run or manually
calling shutdown_asyncgens).

The thread is still leaked when older versions of the asyncio
interfaces are used (loop.close *without* shutdown_asyncgens), but
I've been unable to find a solution that does not print leak warnings
even in the event of a clean shutdown. Use of shutdown_asyncgens is
now effectively required for apps combining asyncio and tornado.
This is unfortunate since leaking a thread is relatively expensive
compared to the usual consequences of failing to call
shutdown_asyncgens, but it seems to be the best we can do.

Fixes #3173
上级 f28b2453
......@@ -64,11 +64,12 @@ def _atexit_callback() -> None:
loop._waker_w.send(b"a")
except BlockingIOError:
pass
# If we don't join our (daemon) thread here, we may get a deadlock
# during interpreter shutdown. I don't really understand why. This
# deadlock happens every time in CI (both travis and appveyor) but
# I've never been able to reproduce locally.
loop._thread.join()
if loop._thread is not None:
# If we don't join our (daemon) thread here, we may get a deadlock
# during interpreter shutdown. I don't really understand why. This
# deadlock happens every time in CI (both travis and appveyor) but
# I've never been able to reproduce locally.
loop._thread.join()
_selector_loops.clear()
......@@ -443,24 +444,25 @@ class SelectorThread:
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
# Create a thread to run the select system call. We manage this thread
# manually so we can trigger a clean shutdown from an atexit hook. Note
# that due to the order of operations at shutdown, only daemon threads
# can be shut down in this way (non-daemon threads would require the
# introduction of a new hook: https://bugs.python.org/issue41962)
self._select_cond = threading.Condition()
self._select_args = (
None
) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
self._closing_selector = False
self._thread = threading.Thread(
name="Tornado selector",
daemon=True,
target=self._run_select,
self._thread = None # type: Optional[threading.Thread]
self._thread_manager_handle = self._thread_manager()
async def thread_manager_anext() -> None:
# the anext builtin wasn't added until 3.10. We just need to iterate
# this generator one step.
await self._thread_manager_handle.__anext__()
# When the loop starts, start the thread. Not too soon because we can't
# clean up if we get to this point but the event loop is closed without
# starting.
self._real_loop.call_soon(
lambda: self._real_loop.create_task(thread_manager_anext())
)
self._thread.start()
# Start the select loop once the loop is started.
self._real_loop.call_soon(self._start_select)
self._readers = {} # type: Dict[_FileDescriptorLike, Callable]
self._writers = {} # type: Dict[_FileDescriptorLike, Callable]
......@@ -473,16 +475,6 @@ class SelectorThread:
_selector_loops.add(self)
self.add_reader(self._waker_r, self._consume_waker)
def __del__(self) -> None:
# If the top-level application code uses asyncio interfaces to
# start and stop the event loop, no objects created in Tornado
# can get a clean shutdown notification. If we're just left to
# be GC'd, we must explicitly close our sockets to avoid
# logging warnings.
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
def close(self) -> None:
if self._closed:
return
......@@ -490,13 +482,42 @@ class SelectorThread:
self._closing_selector = True
self._select_cond.notify()
self._wake_selector()
self._thread.join()
if self._thread is not None:
self._thread.join()
_selector_loops.discard(self)
self.remove_reader(self._waker_r)
self._waker_r.close()
self._waker_w.close()
self._closed = True
async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
# Create a thread to run the select system call. We manage this thread
# manually so we can trigger a clean shutdown from an atexit hook. Note
# that due to the order of operations at shutdown, only daemon threads
# can be shut down in this way (non-daemon threads would require the
# introduction of a new hook: https://bugs.python.org/issue41962)
self._thread = threading.Thread(
name="Tornado selector",
daemon=True,
target=self._run_select,
)
self._thread.start()
self._start_select()
try:
# The presense of this yield statement means that this coroutine
# is actually an asynchronous generator, which has a special
# shutdown protocol. We wait at this yield point until the
# event loop's shutdown_asyncgens method is called, at which point
# we will get a GeneratorExit exception and can shut down the
# selector thread.
yield
except GeneratorExit:
self.close()
raise
def _wake_selector(self) -> None:
if self._closed:
return
try:
self._waker_w.send(b"a")
except BlockingIOError:
......
......@@ -11,6 +11,8 @@
# under the License.
import asyncio
import threading
import time
import unittest
import warnings
......@@ -157,6 +159,55 @@ class LeakTest(unittest.TestCase):
self.assertEqual(new_count, 1)
class SelectorThreadLeakTest(unittest.TestCase):
# These tests are only relevant on windows, but they should pass anywhere.
def setUp(self):
# As a precaution, ensure that we've run an event loop at least once
# so if it spins up any singleton threads they're already there.
asyncio.run(self.dummy_tornado_coroutine())
self.orig_thread_count = threading.active_count()
def assert_no_thread_leak(self):
# For some reason we see transient failures here, but I haven't been able
# to catch it to identify which thread is causing it. Whatever thread it
# is, it appears to quickly clean up on its own, so just retry a few times.
deadline = time.time() + 1
while time.time() < deadline:
threads = list(threading.enumerate())
if len(threads) == self.orig_thread_count:
break
time.sleep(0.1)
self.assertEqual(self.orig_thread_count, len(threads), threads)
async def dummy_tornado_coroutine(self):
# Just access the IOLoop to initialize the selector thread.
IOLoop.current()
def test_asyncio_run(self):
for i in range(10):
# asyncio.run calls shutdown_asyncgens for us.
asyncio.run(self.dummy_tornado_coroutine())
self.assert_no_thread_leak()
def test_asyncio_manual(self):
for i in range(10):
loop = asyncio.new_event_loop()
loop.run_until_complete(self.dummy_tornado_coroutine())
# Without this step, we'd leak the thread.
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
self.assert_no_thread_leak()
def test_tornado(self):
for i in range(10):
# The IOLoop interfaces are aware of the selector thread and
# (synchronously) shut it down.
loop = IOLoop(make_current=False)
loop.run_sync(self.dummy_tornado_coroutine)
loop.close()
self.assert_no_thread_leak()
class AnyThreadEventLoopPolicyTest(unittest.TestCase):
def setUp(self):
self.orig_policy = asyncio.get_event_loop_policy()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册