Unverified commit 871358d4, authored by Ben Darnell, committed by GitHub

Merge pull request #2251 from bdarnell/read_into

iostream: Add read_into
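
For orientation, a rough usage sketch of the new API (not part of this diff; the stream and function names are illustrative). `read_into` fills a caller-supplied writable buffer in place and resolves with the number of bytes read, either through the returned Future or through the optional callback; with `partial=True` it resolves as soon as any data has arrived.

    from tornado import gen

    @gen.coroutine
    def read_exactly(stream, nbytes):
        # Allocate the destination once; read_into() fills it in place and
        # resolves with the byte count (== nbytes unless partial=True is used).
        buf = bytearray(nbytes)
        n = yield stream.read_into(buf)
        raise gen.Return(bytes(buf[:n]))

    # Callback style, as exercised by the tests below:
    # stream.read_into(buf, callback=on_read_done, partial=True)
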
......@@ -254,6 +254,8 @@ class BaseIOStream(object):
self._read_buffer = bytearray()
self._read_buffer_pos = 0
self._read_buffer_size = 0
self._user_read_buffer = False
self._after_user_read_buffer = None
self._write_buffer = _StreamBuffer()
self._total_write_index = 0
self._total_write_done_index = 0
......@@ -300,13 +302,18 @@ class BaseIOStream(object):
"""
raise NotImplementedError()
def read_from_fd(self):
def read_from_fd(self, buf):
"""Attempts to read from the underlying file.
Returns ``None`` if there was nothing to read (the socket
returned `~errno.EWOULDBLOCK` or equivalent), otherwise
returns the data. When possible, should return no more than
``self.read_chunk_size`` bytes at a time.
Reads up to ``len(buf)`` bytes, storing them in the buffer.
Returns the number of bytes read. Returns None if there was
nothing to read (the socket returned `~errno.EWOULDBLOCK` or
equivalent), and zero on EOF.
.. versionchanged:: 5.0
Interface redesigned to take a buffer and return a number
of bytes instead of a freshly-allocated object.
"""
raise NotImplementedError()
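
For subclass authors, a condensed sketch of the new contract (essentially the plain-socket implementation that appears further down in this diff): read directly into the supplied buffer, return ``None`` when the read would block, ``0`` on EOF, and otherwise the number of bytes stored.

    def read_from_fd(self, buf):
        # Condensed sketch; the full IOStream version below is equivalent.
        try:
            return self.socket.recv_into(buf)  # 0 here means EOF for the caller
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                return None  # nothing available right now
            raise
        finally:
            buf = None  # drop the reference to the caller's buffer
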
......@@ -415,6 +422,50 @@ class BaseIOStream(object):
raise
return future
def read_into(self, buf, callback=None, partial=False):
"""Asynchronously read a number of bytes.
``buf`` must be a writable buffer into which data will be read.
If a callback is given, it will be run with the number of read
bytes as an argument; if not, this method returns a `.Future`.
If ``partial`` is true, the callback is run as soon as any bytes
have been read. Otherwise, it is run when the ``buf`` has been
entirely filled with read data.
.. versionadded:: 5.0
"""
future = self._set_read_callback(callback)
# First copy data already in read buffer
available_bytes = self._read_buffer_size
n = len(buf)
if available_bytes >= n:
end = self._read_buffer_pos + n
buf[:] = memoryview(self._read_buffer)[self._read_buffer_pos:end]
del self._read_buffer[:end]
self._after_user_read_buffer = self._read_buffer
elif available_bytes > 0:
buf[:available_bytes] = memoryview(self._read_buffer)[self._read_buffer_pos:]
# Set up the supplied buffer as our temporary read buffer.
# The original (if it had any data remaining) has been
# saved for later.
self._user_read_buffer = True
self._read_buffer = buf
self._read_buffer_pos = 0
self._read_buffer_size = available_bytes
self._read_bytes = n
self._read_partial = partial
try:
self._try_inline_read()
except:
if future is not None:
future.add_done_callback(lambda f: f.exception())
raise
return future
def read_until_close(self, callback=None, streaming_callback=None):
"""Asynchronously reads all data from the socket until it is closed.
......@@ -762,6 +813,15 @@ class BaseIOStream(object):
return self._read_future
def _run_read_callback(self, size, streaming):
if self._user_read_buffer:
self._read_buffer = self._after_user_read_buffer or bytearray()
self._after_user_read_buffer = None
self._read_buffer_pos = 0
self._read_buffer_size = len(self._read_buffer)
self._user_read_buffer = False
result = size
else:
result = self._consume(size)
if streaming:
callback = self._streaming_callback
else:
......@@ -771,10 +831,11 @@ class BaseIOStream(object):
assert callback is None
future = self._read_future
self._read_future = None
future.set_result(self._consume(size))
future.set_result(result)
if callback is not None:
assert (self._read_future is None) or streaming
self._run_callback(callback, self._consume(size))
self._run_callback(callback, result)
else:
# If we scheduled a callback, we will add the error listener
# afterwards. If we didn't, we have to do it now.
......@@ -820,31 +881,44 @@ class BaseIOStream(object):
to read (i.e. the read returns EWOULDBLOCK or equivalent). On
error closes the socket and raises an exception.
"""
while True:
try:
chunk = self.read_from_fd()
except (socket.error, IOError, OSError) as e:
if errno_from_exception(e) == errno.EINTR:
continue
# ssl.SSLError is a subclass of socket.error
if self._is_connreset(e):
# Treat ECONNRESET as a connection close rather than
# an error to minimize log spam (the exception will
# be available on self.error for apps that care).
try:
while True:
try:
if self._user_read_buffer:
buf = memoryview(self._read_buffer)[self._read_buffer_size:]
else:
buf = bytearray(self.read_chunk_size)
bytes_read = self.read_from_fd(buf)
except (socket.error, IOError, OSError) as e:
if errno_from_exception(e) == errno.EINTR:
continue
# ssl.SSLError is a subclass of socket.error
if self._is_connreset(e):
# Treat ECONNRESET as a connection close rather than
# an error to minimize log spam (the exception will
# be available on self.error for apps that care).
self.close(exc_info=e)
return
self.close(exc_info=e)
return
self.close(exc_info=e)
raise
break
if chunk is None:
return 0
self._read_buffer += chunk
self._read_buffer_size += len(chunk)
raise
break
if bytes_read is None:
return 0
elif bytes_read == 0:
self.close()
return 0
if not self._user_read_buffer:
self._read_buffer += memoryview(buf)[:bytes_read]
self._read_buffer_size += bytes_read
finally:
# Break the reference to buf so we don't waste a chunk's worth of
# memory in case an exception hangs on to our stack frame.
buf = None
if self._read_buffer_size > self.max_buffer_size:
gen_log.error("Reached maximum read buffer size")
self.close()
raise StreamBufferFullError("Reached maximum read buffer size")
return len(chunk)
return bytes_read
def _run_streaming_callback(self):
if self._streaming_callback is not None and self._read_buffer_size:
......@@ -1106,18 +1180,16 @@ class IOStream(BaseIOStream):
socket.SO_ERROR)
return socket.error(errno, os.strerror(errno))
def read_from_fd(self):
def read_from_fd(self, buf):
try:
chunk = self.socket.recv(self.read_chunk_size)
return self.socket.recv_into(buf)
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
return None
else:
raise
if not chunk:
self.close()
return None
return chunk
finally:
buf = None
def write_to_fd(self, data):
try:
......@@ -1528,35 +1600,29 @@ class SSLIOStream(IOStream):
# See https://github.com/tornadoweb/tornado/pull/2008
del data
def read_from_fd(self):
if self._ssl_accepting:
# If the handshake hasn't finished yet, there can't be anything
# to read (attempting to read may or may not raise an exception
# depending on the SSL version)
return None
def read_from_fd(self, buf):
try:
# SSLSocket objects have both a read() and recv() method,
# while regular sockets only have recv().
# The recv() method blocks (at least in python 2.6) if it is
# called when there is nothing to read, so we have to use
# read() instead.
chunk = self.socket.read(self.read_chunk_size)
except ssl.SSLError as e:
# SSLError is a subclass of socket.error, so this except
# block must come first.
if e.args[0] == ssl.SSL_ERROR_WANT_READ:
return None
else:
raise
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
if self._ssl_accepting:
# If the handshake hasn't finished yet, there can't be anything
# to read (attempting to read may or may not raise an exception
# depending on the SSL version)
return None
else:
raise
if not chunk:
self.close()
return None
return chunk
try:
return self.socket.recv_into(buf)
except ssl.SSLError as e:
# SSLError is a subclass of socket.error, so this except
# block must come first.
if e.args[0] == ssl.SSL_ERROR_WANT_READ:
return None
else:
raise
except socket.error as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
return None
else:
raise
finally:
buf = None
def _is_connreset(self, e):
if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
......@@ -1592,9 +1658,9 @@ class PipeIOStream(BaseIOStream):
# See https://github.com/tornadoweb/tornado/pull/2008
del data
def read_from_fd(self):
def read_from_fd(self, buf):
try:
chunk = self._fio.read(self.read_chunk_size)
return self._fio.readinto(buf)
except (IOError, OSError) as e:
if errno_from_exception(e) == errno.EBADF:
# If the writing half of a pipe is closed, select will
......@@ -1603,13 +1669,8 @@ class PipeIOStream(BaseIOStream):
return None
else:
raise
if chunk is None:
# Read would block
return None
if not chunk:
self.close()
return None
return chunk
finally:
buf = None
def doctests():
......
......@@ -12,6 +12,7 @@ from tornado.testing import AsyncHTTPTestCase, AsyncHTTPSTestCase, AsyncTestCase
from tornado.test.util import unittest, skipIfNonUnix, refusing_port, skipPypy3V58
from tornado.web import RequestHandler, Application
import errno
import hashlib
import logging
import os
import platform
......@@ -674,6 +675,143 @@ class TestReadWriteMixin(object):
rs.close()
ws.close()
def test_read_into(self):
rs, ws = self.make_iostream_pair()
def sleep_some():
self.io_loop.run_sync(lambda: gen.sleep(0.05))
try:
buf = bytearray(10)
rs.read_into(buf, callback=self.stop)
ws.write(b"hello")
sleep_some()
self.assertTrue(rs.reading())
ws.write(b"world!!")
data = self.wait()
self.assertFalse(rs.reading())
self.assertEqual(data, 10)
self.assertEqual(bytes(buf), b"helloworld")
# Existing buffer is fed into user buffer
rs.read_into(buf, callback=self.stop)
sleep_some()
self.assertTrue(rs.reading())
ws.write(b"1234567890")
data = self.wait()
self.assertFalse(rs.reading())
self.assertEqual(data, 10)
self.assertEqual(bytes(buf), b"!!12345678")
# Existing buffer can satisfy read immediately
buf = bytearray(4)
ws.write(b"abcdefghi")
rs.read_into(buf, callback=self.stop)
data = self.wait()
self.assertEqual(data, 4)
self.assertEqual(bytes(buf), b"90ab")
rs.read_bytes(7, self.stop)
data = self.wait()
self.assertEqual(data, b"cdefghi")
finally:
ws.close()
rs.close()
def test_read_into_partial(self):
rs, ws = self.make_iostream_pair()
def sleep_some():
self.io_loop.run_sync(lambda: gen.sleep(0.05))
try:
# Partial read
buf = bytearray(10)
rs.read_into(buf, callback=self.stop, partial=True)
ws.write(b"hello")
data = self.wait()
self.assertFalse(rs.reading())
self.assertEqual(data, 5)
self.assertEqual(bytes(buf), b"hello\0\0\0\0\0")
# Full read despite partial=True
ws.write(b"world!1234567890")
rs.read_into(buf, callback=self.stop, partial=True)
data = self.wait()
self.assertEqual(data, 10)
self.assertEqual(bytes(buf), b"world!1234")
# Existing buffer can satisfy read immediately
rs.read_into(buf, callback=self.stop, partial=True)
data = self.wait()
self.assertEqual(data, 6)
self.assertEqual(bytes(buf), b"5678901234")
finally:
ws.close()
rs.close()
def test_read_into_zero_bytes(self):
rs, ws = self.make_iostream_pair()
try:
buf = bytearray()
fut = rs.read_into(buf)
self.assertEqual(fut.result(), 0)
finally:
ws.close()
rs.close()
def test_many_mixed_reads(self):
# Stress buffer handling when going back and forth between
# read_bytes() (using an internal buffer) and read_into()
# (using a user-allocated buffer).
r = random.Random(42)
nbytes = 1000000
rs, ws = self.make_iostream_pair()
produce_hash = hashlib.sha1()
consume_hash = hashlib.sha1()
@gen.coroutine
def produce():
remaining = nbytes
while remaining > 0:
size = r.randint(1, min(1000, remaining))
data = os.urandom(size)
produce_hash.update(data)
yield ws.write(data)
remaining -= size
assert remaining == 0
@gen.coroutine
def consume():
remaining = nbytes
while remaining > 0:
if r.random() > 0.5:
# read_bytes()
size = r.randint(1, min(1000, remaining))
data = yield rs.read_bytes(size)
consume_hash.update(data)
remaining -= size
else:
# read_into()
size = r.randint(1, min(1000, remaining))
buf = bytearray(size)
n = yield rs.read_into(buf)
assert n == size
consume_hash.update(buf)
remaining -= size
assert remaining == 0
@gen.coroutine
def main():
yield [produce(), consume()]
assert produce_hash.hexdigest() == consume_hash.hexdigest()
try:
self.io_loop.run_sync(main)
finally:
ws.close()
rs.close()
class TestIOStreamMixin(TestReadWriteMixin):
def _make_server_iostream(self, connection, **kwargs):
......
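
To recap the mechanism this change introduces: ``read_into`` first drains whatever is already sitting in the internal read buffer into the caller's buffer, then temporarily installs the caller's buffer as ``self._read_buffer`` so that ``read_from_fd`` writes into it directly; once the read completes, ``_run_read_callback`` restores the saved internal buffer. A minimal standalone sketch of that hand-off (the helper below is illustrative, not Tornado API):

    def drain_into(internal, user_buf):
        # Copy as much buffered data as fits into user_buf; return the number
        # of bytes copied and whatever internal data is left over.
        want, available = len(user_buf), len(internal)
        if available >= want:
            user_buf[:] = internal[:want]   # buffered data alone satisfies the read
            return want, internal[want:]
        user_buf[:available] = internal     # partial fill; user_buf then becomes
        return available, bytearray()       # the active read target

    internal = bytearray(b"helloworld!!")
    buf = bytearray(10)
    copied, leftover = drain_into(internal, buf)
    assert copied == 10 and bytes(buf) == b"helloworld" and bytes(leftover) == b"!!"
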