提交 cfef8f5a 编写于 作者: B barrierye

update profile

上级 fffdd846
...@@ -347,12 +347,12 @@ class Op(object): ...@@ -347,12 +347,12 @@ class Op(object):
def stop(self): def stop(self):
self._run = False self._run = False
def start(self): def start(self, concurrency_idx):
self._run = True self._run = True
while self._run: while self._run:
_profiler.record("{}-get_0".format(self._name)) _profiler.record("{}{}-get_0".format(self._name, concurrency_idx))
input_data = self._input.front(self._name) input_data = self._input.front(self._name)
_profiler.record("{}-get_1".format(self._name)) _profiler.record("{}{}-get_1".format(self._name, concurrency_idx))
data_id = None data_id = None
logging.debug(self._log("input_data: {}".format(input_data))) logging.debug(self._log("input_data: {}".format(input_data)))
if isinstance(input_data, dict): if isinstance(input_data, dict):
...@@ -361,18 +361,20 @@ class Op(object): ...@@ -361,18 +361,20 @@ class Op(object):
else: else:
data_id = input_data.id data_id = input_data.id
_profiler.record("{}-prep_0".format(self._name)) _profiler.record("{}{}-prep_0".format(self._name, concurrency_idx))
data = self.preprocess(input_data) data = self.preprocess(input_data)
_profiler.record("{}-prep_1".format(self._name)) _profiler.record("{}{}-prep_1".format(self._name, concurrency_idx))
if self.with_serving(): if self.with_serving():
_profiler.record("{}-midp_0".format(self._name)) _profiler.record("{}{}-midp_0".format(self._name,
concurrency_idx))
data = self.midprocess(data) data = self.midprocess(data)
_profiler.record("{}-midp_1".format(self._name)) _profiler.record("{}{}-midp_1".format(self._name,
concurrency_idx))
_profiler.record("{}-postp_0".format(self._name)) _profiler.record("{}{}-postp_0".format(self._name, concurrency_idx))
output_data = self.postprocess(data) output_data = self.postprocess(data)
_profiler.record("{}-postp_1".format(self._name)) _profiler.record("{}{}-postp_1".format(self._name, concurrency_idx))
if not isinstance(output_data, if not isinstance(output_data,
python_service_channel_pb2.ChannelData): python_service_channel_pb2.ChannelData):
...@@ -382,10 +384,10 @@ class Op(object): ...@@ -382,10 +384,10 @@ class Op(object):
format(type(output_data)))) format(type(output_data))))
output_data.id = data_id output_data.id = data_id
_profiler.record("{}-push_0".format(self._name)) _profiler.record("{}{}-push_0".format(self._name, concurrency_idx))
for channel in self._outputs: for channel in self._outputs:
channel.push(output_data, self._name) channel.push(output_data, self._name)
_profiler.record("{}-push_1".format(self._name)) _profiler.record("{}{}-push_1".format(self._name, concurrency_idx))
def _log(self, info_str): def _log(self, info_str):
return "[{}] {}".format(self._name, info_str) return "[{}] {}".format(self._name, info_str)
...@@ -549,8 +551,8 @@ class PyServer(object): ...@@ -549,8 +551,8 @@ class PyServer(object):
self._out_channel = out_channel.pop() self._out_channel = out_channel.pop()
self.gen_desc() self.gen_desc()
def _op_start_wrapper(self, op): def _op_start_wrapper(self, op, concurrency_idx):
return op.start() return op.start(concurrency_idx)
def _run_ops(self): def _run_ops(self):
for op in self._ops: for op in self._ops:
...@@ -560,7 +562,7 @@ class PyServer(object): ...@@ -560,7 +562,7 @@ class PyServer(object):
for c in range(op_concurrency): for c in range(op_concurrency):
# th = multiprocessing.Process( # th = multiprocessing.Process(
th = threading.Thread( th = threading.Thread(
target=self._op_start_wrapper, args=(op, )) target=self._op_start_wrapper, args=(op, c))
th.start() th.start()
self._op_threads.append(th) self._op_threads.append(th)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册