提交 d2028c34 编写于 作者: B barrierye

only executes init_op once in multi-thread version

上级 9aeccd3f
...@@ -174,23 +174,30 @@ class Op(object): ...@@ -174,23 +174,30 @@ class Op(object):
p = multiprocessing.Process( p = multiprocessing.Process(
target=self._run, target=self._run,
args=(concurrency_idx, self._get_input_channel(), args=(concurrency_idx, self._get_input_channel(),
self._get_output_channels(), client_type)) self._get_output_channels(), client_type, False))
p.start() p.start()
proces.append(p) proces.append(p)
return proces return proces
def start_with_thread(self, client_type): def start_with_thread(self, client_type):
# load user resources
try:
self.init_op()
except Exception as e:
_LOGGER.error(log(e))
os._exit(-1)
threads = [] threads = []
for concurrency_idx in range(self.concurrency): for concurrency_idx in range(self.concurrency):
t = threading.Thread( t = threading.Thread(
target=self._run, target=self._run,
args=(concurrency_idx, self._get_input_channel(), args=(concurrency_idx, self._get_input_channel(),
self._get_output_channels(), client_type)) self._get_output_channels(), client_type, True))
t.start() t.start()
threads.append(t) threads.append(t)
return threads return threads
def load_user_resources(self): def init_op(self):
pass pass
def _run_preprocess(self, parsed_data, data_id, log_func): def _run_preprocess(self, parsed_data, data_id, log_func):
...@@ -309,8 +316,8 @@ class Op(object): ...@@ -309,8 +316,8 @@ class Op(object):
data_id=data_id) data_id=data_id)
return output_data, error_channeldata return output_data, error_channeldata
def _run(self, concurrency_idx, input_channel, output_channels, def _run(self, concurrency_idx, input_channel, output_channels, client_type,
client_type): use_multithread):
def get_log_func(op_info_prefix): def get_log_func(op_info_prefix):
def log_func(info_str): def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str) return "{} {}".format(op_info_prefix, info_str)
...@@ -329,13 +336,18 @@ class Op(object): ...@@ -329,13 +336,18 @@ class Op(object):
self._server_endpoints, self._fetch_names) self._server_endpoints, self._fetch_names)
if client is not None: if client is not None:
client_predict_handler = client.predict client_predict_handler = client.predict
# load user resources
self.load_user_resources()
except Exception as e: except Exception as e:
_LOGGER.error(log(e)) _LOGGER.error(log(e))
os._exit(-1) os._exit(-1)
if not use_multithread:
# load user resources
try:
self.init_op()
except Exception as e:
_LOGGER.error(log(e))
os._exit(-1)
self._is_run = True self._is_run = True
while self._is_run: while self._is_run:
self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid)) self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid))
...@@ -399,7 +411,7 @@ class RequestOp(Op): ...@@ -399,7 +411,7 @@ class RequestOp(Op):
name="#G", input_ops=[], concurrency=concurrency) name="#G", input_ops=[], concurrency=concurrency)
# load user resources # load user resources
try: try:
self.load_user_resources() self.init_op()
except Exception as e: except Exception as e:
_LOGGER.error(log(e)) _LOGGER.error(log(e))
os._exit(-1) os._exit(-1)
...@@ -424,7 +436,7 @@ class ResponseOp(Op): ...@@ -424,7 +436,7 @@ class ResponseOp(Op):
name="#R", input_ops=input_ops, concurrency=concurrency) name="#R", input_ops=input_ops, concurrency=concurrency)
# load user resources # load user resources
try: try:
self.load_user_resources() self.init_op()
except Exception as e: except Exception as e:
_LOGGER.error(log(e)) _LOGGER.error(log(e))
os._exit(-1) os._exit(-1)
...@@ -490,8 +502,8 @@ class VirtualOp(Op): ...@@ -490,8 +502,8 @@ class VirtualOp(Op):
channel.add_producer(op_name) channel.add_producer(op_name)
self._outputs.append(channel) self._outputs.append(channel)
def _run(self, concurrency_idx, input_channel, output_channels, def _run(self, concurrency_idx, input_channel, output_channels, client_type,
client_type): use_multithread):
def get_log_func(op_info_prefix): def get_log_func(op_info_prefix):
def log_func(info_str): def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str) return "{} {}".format(op_info_prefix, info_str)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册