未验证 提交 62e43f6b 编写于 作者: B Ben Darnell 提交者: GitHub

Merge pull request #2307 from bdarnell/work

concurrent,httputil,websocket: small bug fixes
......@@ -421,6 +421,10 @@ def run_on_executor(*args, **kwargs):
.. versionchanged:: 5.0
Always uses the current IOLoop instead of ``self.io_loop``.
.. versionchanged:: 5.1
Returns a `.Future` compatible with ``await`` instead of a
`concurrent.futures.Future`.
"""
def run_on_executor_decorator(fn):
executor = kwargs.get("executor", "executor")
......@@ -428,12 +432,14 @@ def run_on_executor(*args, **kwargs):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback", None)
future = getattr(self, executor).submit(fn, self, *args, **kwargs)
async_future = Future()
conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
chain_future(conc_future, async_future)
if callback:
from tornado.ioloop import IOLoop
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
return future
async_future, lambda future: callback(future.result()))
return async_future
return wrapper
if args and kwargs:
raise ValueError("cannot combine positional and keyword args")
......
......@@ -519,12 +519,7 @@ class HTTP1Connection(httputil.HTTPConnection):
# RFC 7230 section allows for both CRLF and bare LF.
eol = data.find("\n")
start_line = data[:eol].rstrip("\r")
try:
headers = httputil.HTTPHeaders.parse(data[eol:])
except ValueError:
# probably form split() if there was no ':' in the line
raise httputil.HTTPInputError("Malformed HTTP headers: %r" %
data[eol:100])
headers = httputil.HTTPHeaders.parse(data[eol:])
return start_line, headers
def _read_body(self, code, headers, delegate):
......
......@@ -183,11 +183,16 @@ class HTTPHeaders(collections.MutableMapping):
"""
if line[0].isspace():
# continuation of a multi-line header
if self._last_key is None:
raise HTTPInputError("first header line cannot start with whitespace")
new_part = ' ' + line.lstrip()
self._as_list[self._last_key][-1] += new_part
self._dict[self._last_key] += new_part
else:
name, value = line.split(":", 1)
try:
name, value = line.split(":", 1)
except ValueError:
raise HTTPInputError("no colon in header line")
self.add(name, value.strip())
@classmethod
......@@ -197,6 +202,12 @@ class HTTPHeaders(collections.MutableMapping):
>>> h = HTTPHeaders.parse("Content-Type: text/html\\r\\nContent-Length: 42\\r\\n")
>>> sorted(h.items())
[('Content-Length', '42'), ('Content-Type', 'text/html')]
.. versionchanged:: 5.1
Raises `HTTPInputError` on malformed headers instead of a
mix of `KeyError`, and `ValueError`.
"""
h = cls()
for line in _CRLF_RE.split(headers):
......
......@@ -31,7 +31,7 @@ from tornado.log import app_log
from tornado import stack_context
from tornado.tcpserver import TCPServer
from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
from tornado.test.util import unittest
from tornado.test.util import unittest, skipBefore35, exec_test
try:
......@@ -429,6 +429,26 @@ class RunOnExecutorTest(AsyncTestCase):
answer = yield o.f()
self.assertEqual(answer, 42)
@skipBefore35
@gen_test
def test_async_await(self):
class Object(object):
def __init__(self):
self.executor = futures.thread.ThreadPoolExecutor(1)
@run_on_executor()
def f(self):
return 42
o = Object()
namespace = exec_test(globals(), locals(), """
async def f():
answer = await o.f()
return answer
""")
result = yield namespace['f']()
self.assertEqual(result, 42)
if __name__ == '__main__':
unittest.main()
......@@ -425,7 +425,7 @@ class HTTPServerRawTest(AsyncHTTPTestCase):
self.wait()
def test_malformed_headers(self):
with ExpectLog(gen_log, '.*Malformed HTTP headers'):
with ExpectLog(gen_log, '.*Malformed HTTP message.*no colon in header line'):
self.stream.write(b'GET / HTTP/1.0\r\nasdf\r\n\r\n')
self.io_loop.add_timeout(datetime.timedelta(seconds=0.05),
self.stop)
......
......@@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function
from tornado.httputil import (
url_concat, parse_multipart_form_data, HTTPHeaders, format_timestamp,
HTTPServerRequest, parse_request_start_line, parse_cookie, qs_to_qsl,
HTTPInputError,
)
from tornado.escape import utf8, native_str
from tornado.util import PY3
......@@ -283,6 +284,13 @@ Foo: even
("Foo", "bar baz"),
("Foo", "even more lines")])
def test_malformed_continuation(self):
# If the first line starts with whitespace, it's a
# continuation line with nothing to continue, so reject it
# (with a proper error).
data = " Foo: bar"
self.assertRaises(HTTPInputError, HTTPHeaders.parse, data)
def test_unicode_newlines(self):
# Ensure that only \r\n is recognized as a header separator, and not
# the other newline-like unicode characters.
......
......@@ -621,6 +621,34 @@ class ClientPeriodicPingTest(WebSocketBaseTestCase):
# TODO: test that the connection gets closed if ping responses stop.
class ManualPingTest(WebSocketBaseTestCase):
def get_app(self):
class PingHandler(TestWebSocketHandler):
def on_ping(self, data):
self.write_message(data, binary=isinstance(data, bytes))
self.close_future = Future()
return Application([
('/', PingHandler, dict(close_future=self.close_future)),
])
@gen_test
def test_manual_ping(self):
ws = yield self.ws_connect('/')
self.assertRaises(ValueError, ws.ping, 'a' * 126)
ws.ping('hello')
resp = yield ws.read_message()
# on_ping always sees bytes.
self.assertEqual(resp, b'hello')
ws.ping(b'binary hello')
resp = yield ws.read_message()
self.assertEqual(resp, b'binary hello')
yield self.close(ws)
class MaxMessageSizeTest(WebSocketBaseTestCase):
def get_app(self):
self.close_future = Future()
......
......@@ -312,8 +312,23 @@ class WebSocketHandler(tornado.web.RequestHandler):
"""
raise NotImplementedError
def ping(self, data):
"""Send ping frame to the remote end."""
def ping(self, data=b''):
"""Send ping frame to the remote end.
The data argument allows a small amount of data (up to 125
bytes) to be sent as a part of the ping message. Note that not
all websocket implementations expose this data to
applications.
Consider using the ``websocket_ping_interval`` application
setting instead of sending pings manually.
.. versionchanged:: 5.1
The data argument is now optional.
"""
data = utf8(data)
if self.ws_connection is None:
raise WebSocketClosedError()
self.ws_connection.write_ping(data)
......@@ -755,12 +770,19 @@ class WebSocketProtocol13(WebSocketProtocol):
**self._get_compressor_options(other_side, agreed_parameters, compression_options))
def _write_frame(self, fin, opcode, data, flags=0):
data_len = len(data)
if opcode & 0x8:
# All control frames MUST have a payload length of 125
# bytes or less and MUST NOT be fragmented.
if not fin:
raise ValueError("control frames may not be fragmented")
if data_len > 125:
raise ValueError("control frame payloads may not exceed 125 bytes")
if fin:
finbit = self.FIN
else:
finbit = 0
frame = struct.pack("B", finbit | opcode | flags)
data_len = len(data)
if self.mask_outgoing:
mask_bit = 0x80
else:
......@@ -1204,6 +1226,25 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
else:
self.read_queue.append(message)
def ping(self, data=b''):
"""Send ping frame to the remote end.
The data argument allows a small amount of data (up to 125
bytes) to be sent as a part of the ping message. Note that not
all websocket implementations expose this data to
applications.
Consider using the ``ping_interval`` argument to
`websocket_connect` instead of sending pings manually.
.. versionadded:: 5.1
"""
data = utf8(data)
if self.protocol is None:
raise WebSocketClosedError()
self.protocol.write_ping(data)
def on_pong(self, data):
pass
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册