未验证 提交 f28b2453 编写于 作者: B Ben Darnell 提交者: GitHub

Merge pull request #3029 from minrk/reuse-selector

separate SelectorThread into its own object
......@@ -52,7 +52,7 @@ _T = TypeVar("_T")
# Collection of selector thread event loops to shut down on exit.
_selector_loops = set() # type: Set[AddThreadSelectorEventLoop]
_selector_loops = set() # type: Set[SelectorThread]
def _atexit_callback() -> None:
......@@ -427,53 +427,18 @@ class AnyThreadEventLoopPolicy(_BasePolicy): # type: ignore
return loop
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
class SelectorThread:
"""Define ``add_reader`` methods to be called in a background select thread.
Instances of this class start a second thread to run a selector.
This thread is completely hidden from the user; all callbacks are
run on the wrapped event loop's thread.
This class is used automatically by Tornado; applications should not need
to refer to it directly.
It is safe to wrap any event loop with this class, although it only makes sense
for event loops that do not implement the ``add_reader`` family of methods
themselves (i.e. ``WindowsProactorEventLoop``)
Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
This thread is completely hidden from the user;
all callbacks are run on the wrapped event loop's thread.
Typically used via ``AddThreadSelectorEventLoop``,
but can be attached to a running asyncio loop.
"""
# This class is a __getattribute__-based proxy. All attributes other than those
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"_consume_waker",
"_select_cond",
"_select_args",
"_closing_selector",
"_thread",
"_handle_event",
"_readers",
"_real_loop",
"_start_select",
"_run_select",
"_handle_select",
"_wake_selector",
"_waker_r",
"_waker_w",
"_writers",
"add_reader",
"add_writer",
"close",
"remove_reader",
"remove_writer",
}
def __getattribute__(self, name: str) -> Any:
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
return super().__getattribute__(name)
return getattr(self._real_loop, name)
_closed = False
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
......@@ -519,6 +484,8 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
self._waker_w.close()
def close(self) -> None:
if self._closed:
return
with self._select_cond:
self._closing_selector = True
self._select_cond.notify()
......@@ -527,7 +494,7 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
_selector_loops.discard(self)
self._waker_r.close()
self._waker_w.close()
self._real_loop.close()
self._closed = True
def _wake_selector(self) -> None:
try:
......@@ -661,3 +628,63 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
return False
self._wake_selector()
return True
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
Instances of this class start a second thread to run a selector.
This thread is completely hidden from the user; all callbacks are
run on the wrapped event loop's thread.
This class is used automatically by Tornado; applications should not need
to refer to it directly.
It is safe to wrap any event loop with this class, although it only makes sense
for event loops that do not implement the ``add_reader`` family of methods
themselves (i.e. ``WindowsProactorEventLoop``)
Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
"""
# This class is a __getattribute__-based proxy. All attributes other than those
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"_real_loop",
"_selector",
"add_reader",
"add_writer",
"close",
"remove_reader",
"remove_writer",
}
def __getattribute__(self, name: str) -> Any:
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
return super().__getattribute__(name)
return getattr(self._real_loop, name)
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop
self._selector = SelectorThread(real_loop)
def close(self) -> None:
self._selector.close()
self._real_loop.close()
def add_reader(
self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
) -> None:
return self._selector.add_reader(fd, callback, *args)
def add_writer(
self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
) -> None:
return self._selector.add_writer(fd, callback, *args)
def remove_reader(self, fd: "_FileDescriptorLike") -> bool:
return self._selector.remove_reader(fd)
def remove_writer(self, fd: "_FileDescriptorLike") -> bool:
return self._selector.remove_writer(fd)
......@@ -21,6 +21,7 @@ from tornado.platform.asyncio import (
AsyncIOLoop,
to_asyncio_future,
AnyThreadEventLoopPolicy,
AddThreadSelectorEventLoop,
)
from tornado.testing import AsyncTestCase, gen_test
......@@ -105,6 +106,11 @@ class AsyncIOLoopTest(AsyncTestCase):
42,
)
def test_add_thread_close_idempotent(self):
loop = AddThreadSelectorEventLoop(asyncio.get_event_loop()) # type: ignore
loop.close()
loop.close()
class LeakTest(unittest.TestCase):
def setUp(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册