未验证 提交 c3b8ee8d 编写于 作者: B Ben Darnell 提交者: GitHub

Merge pull request #3272 from bdarnell/asyncio-canary

asyncio: Use a canary task to detect end of event loop
......@@ -64,11 +64,12 @@ def _atexit_callback() -> None:
loop._waker_w.send(b"a")
except BlockingIOError:
pass
# If we don't join our (daemon) thread here, we may get a deadlock
# during interpreter shutdown. I don't really understand why. This
# deadlock happens every time in CI (both travis and appveyor) but
# I've never been able to reproduce locally.
loop._thread.join()
if loop._thread is not None:
# If we don't join our (daemon) thread here, we may get a deadlock
# during interpreter shutdown. I don't really understand why. This
# deadlock happens every time in CI (both travis and appveyor) but
# I've never been able to reproduce locally.
loop._thread.join()
_selector_loops.clear()
......@@ -443,24 +444,25 @@ class SelectorThread:
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
# Create a thread to run the select system call. We manage this thread
# manually so we can trigger a clean shutdown from an atexit hook. Note
# that due to the order of operations at shutdown, only daemon threads
# can be shut down in this way (non-daemon threads would require the
# introduction of a new hook: https://bugs.python.org/issue41962)
self._select_cond = threading.Condition()
self._select_args = (
None
) # type: Optional[Tuple[List[_FileDescriptorLike], List[_FileDescriptorLike]]]
self._closing_selector = False
self._thread = threading.Thread(
name="Tornado selector",
daemon=True,
target=self._run_select,
self._thread = None # type: Optional[threading.Thread]
self._thread_manager_handle = self._thread_manager()
async def thread_manager_anext() -> None:
# the anext builtin wasn't added until 3.10. We just need to iterate
# this generator one step.
await self._thread_manager_handle.__anext__()
# When the loop starts, start the thread. Not too soon because we can't
# clean up if we get to this point but the event loop is closed without
# starting.
self._real_loop.call_soon(
lambda: self._real_loop.create_task(thread_manager_anext())
)
self._thread.start()
# Start the select loop once the loop is started.
self._real_loop.call_soon(self._start_select)
self._readers = {} # type: Dict[_FileDescriptorLike, Callable]
self._writers = {} # type: Dict[_FileDescriptorLike, Callable]
......@@ -473,16 +475,6 @@ class SelectorThread:
_selector_loops.add(self)
self.add_reader(self._waker_r, self._consume_waker)
def __del__(self) -> None:
# If the top-level application code uses asyncio interfaces to
# start and stop the event loop, no objects created in Tornado
# can get a clean shutdown notification. If we're just left to
# be GC'd, we must explicitly close our sockets to avoid
# logging warnings.
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
def close(self) -> None:
if self._closed:
return
......@@ -490,13 +482,42 @@ class SelectorThread:
self._closing_selector = True
self._select_cond.notify()
self._wake_selector()
self._thread.join()
if self._thread is not None:
self._thread.join()
_selector_loops.discard(self)
self.remove_reader(self._waker_r)
self._waker_r.close()
self._waker_w.close()
self._closed = True
async def _thread_manager(self) -> typing.AsyncGenerator[None, None]:
# Create a thread to run the select system call. We manage this thread
# manually so we can trigger a clean shutdown from an atexit hook. Note
# that due to the order of operations at shutdown, only daemon threads
# can be shut down in this way (non-daemon threads would require the
# introduction of a new hook: https://bugs.python.org/issue41962)
self._thread = threading.Thread(
name="Tornado selector",
daemon=True,
target=self._run_select,
)
self._thread.start()
self._start_select()
try:
# The presense of this yield statement means that this coroutine
# is actually an asynchronous generator, which has a special
# shutdown protocol. We wait at this yield point until the
# event loop's shutdown_asyncgens method is called, at which point
# we will get a GeneratorExit exception and can shut down the
# selector thread.
yield
except GeneratorExit:
self.close()
raise
def _wake_selector(self) -> None:
if self._closed:
return
try:
self._waker_w.send(b"a")
except BlockingIOError:
......
......@@ -11,6 +11,8 @@
# under the License.
import asyncio
import threading
import time
import unittest
import warnings
......@@ -157,6 +159,55 @@ class LeakTest(unittest.TestCase):
self.assertEqual(new_count, 1)
class SelectorThreadLeakTest(unittest.TestCase):
# These tests are only relevant on windows, but they should pass anywhere.
def setUp(self):
# As a precaution, ensure that we've run an event loop at least once
# so if it spins up any singleton threads they're already there.
asyncio.run(self.dummy_tornado_coroutine())
self.orig_thread_count = threading.active_count()
def assert_no_thread_leak(self):
# For some reason we see transient failures here, but I haven't been able
# to catch it to identify which thread is causing it. Whatever thread it
# is, it appears to quickly clean up on its own, so just retry a few times.
deadline = time.time() + 1
while time.time() < deadline:
threads = list(threading.enumerate())
if len(threads) == self.orig_thread_count:
break
time.sleep(0.1)
self.assertEqual(self.orig_thread_count, len(threads), threads)
async def dummy_tornado_coroutine(self):
# Just access the IOLoop to initialize the selector thread.
IOLoop.current()
def test_asyncio_run(self):
for i in range(10):
# asyncio.run calls shutdown_asyncgens for us.
asyncio.run(self.dummy_tornado_coroutine())
self.assert_no_thread_leak()
def test_asyncio_manual(self):
for i in range(10):
loop = asyncio.new_event_loop()
loop.run_until_complete(self.dummy_tornado_coroutine())
# Without this step, we'd leak the thread.
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
self.assert_no_thread_leak()
def test_tornado(self):
for i in range(10):
# The IOLoop interfaces are aware of the selector thread and
# (synchronously) shut it down.
loop = IOLoop(make_current=False)
loop.run_sync(self.dummy_tornado_coroutine)
loop.close()
self.assert_no_thread_leak()
class AnyThreadEventLoopPolicyTest(unittest.TestCase):
def setUp(self):
self.orig_policy = asyncio.get_event_loop_policy()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册