提交 e304057f 编写于 作者: B barrierye

add channel-size config

上级 0f8899a3
......@@ -204,7 +204,7 @@ class PipelineServer(object):
raise Exception("response_op can only have one previous op.")
self._response_op = response_op
def _topo_sort(self, response_op):
def _topo_sort(self, response_op, channel_size):
if response_op is None:
raise Exception("response_op has not been set.")
......@@ -286,9 +286,11 @@ class PipelineServer(object):
def gen_channel(name_gen):
channel = None
if self._use_multithread:
channel = ThreadChannel(name=name_gen.next())
channel = ThreadChannel(
name=name_gen.next(), maxsize=channel_size)
else:
channel = ProcessChannel(self._manager, name=name_gen.next())
channel = ProcessChannel(
self._manager, name=name_gen.next(), maxsize=channel_size)
return channel
def gen_virtual_op(name_gen):
......@@ -398,6 +400,7 @@ class PipelineServer(object):
self._client_type = yml_config.get('client_type', 'brpc')
self._use_multithread = yml_config.get('use_multithread', True)
profile = yml_config.get('profile', False)
channel_size = yml_config.get('channel_size', 0)
if not self._use_multithread:
self._manager = multiprocessing.Manager()
......@@ -407,7 +410,7 @@ class PipelineServer(object):
_profiler.enable(profile)
input_channel, output_channel, self._pack_func, self._unpack_func = self._topo_sort(
self._response_op)
self._response_op, channel_size)
self._in_channel = input_channel
self._out_channel = output_channel
for op in self._actual_ops:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册