提交 fd3ea999 编写于 作者: B barrierye

add channel-size config

上级 b56b933a
...@@ -204,7 +204,7 @@ class PipelineServer(object): ...@@ -204,7 +204,7 @@ class PipelineServer(object):
raise Exception("response_op can only have one previous op.") raise Exception("response_op can only have one previous op.")
self._response_op = response_op self._response_op = response_op
def _topo_sort(self, response_op): def _topo_sort(self, response_op, channel_size):
if response_op is None: if response_op is None:
raise Exception("response_op has not been set.") raise Exception("response_op has not been set.")
...@@ -286,9 +286,11 @@ class PipelineServer(object): ...@@ -286,9 +286,11 @@ class PipelineServer(object):
def gen_channel(name_gen): def gen_channel(name_gen):
channel = None channel = None
if self._use_multithread: if self._use_multithread:
channel = ThreadChannel(name=name_gen.next()) channel = ThreadChannel(
name=name_gen.next(), maxsize=channel_size)
else: else:
channel = ProcessChannel(self._manager, name=name_gen.next()) channel = ProcessChannel(
self._manager, name=name_gen.next(), maxsize=channel_size)
return channel return channel
def gen_virtual_op(name_gen): def gen_virtual_op(name_gen):
...@@ -398,6 +400,7 @@ class PipelineServer(object): ...@@ -398,6 +400,7 @@ class PipelineServer(object):
self._client_type = yml_config.get('client_type', 'brpc') self._client_type = yml_config.get('client_type', 'brpc')
self._use_multithread = yml_config.get('use_multithread', True) self._use_multithread = yml_config.get('use_multithread', True)
profile = yml_config.get('profile', False) profile = yml_config.get('profile', False)
channel_size = yml_config.get('channel_size', 0)
if not self._use_multithread: if not self._use_multithread:
self._manager = multiprocessing.Manager() self._manager = multiprocessing.Manager()
...@@ -407,7 +410,7 @@ class PipelineServer(object): ...@@ -407,7 +410,7 @@ class PipelineServer(object):
_profiler.enable(profile) _profiler.enable(profile)
input_channel, output_channel, self._pack_func, self._unpack_func = self._topo_sort( input_channel, output_channel, self._pack_func, self._unpack_func = self._topo_sort(
self._response_op) self._response_op, channel_size)
self._in_channel = input_channel self._in_channel = input_channel
self._out_channel = output_channel self._out_channel = output_channel
for op in self._actual_ops: for op in self._actual_ops:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册