提交 dc6be190 编写于 作者: B barrierye

move profiler into dag executor

上级 17380468
...@@ -23,7 +23,7 @@ lp_wrapper = lp(client.predict) ...@@ -23,7 +23,7 @@ lp_wrapper = lp(client.predict)
words = 'i am very sad | 0' words = 'i am very sad | 0'
for i in range(1): for i in range(10):
fetch_map = lp_wrapper(feed_dict={"words": words}, fetch=["prediction"]) fetch_map = lp_wrapper(feed_dict={"words": words}, fetch=["prediction"])
print(fetch_map) print(fetch_map)
......
...@@ -26,21 +26,34 @@ import logging ...@@ -26,21 +26,34 @@ import logging
from .operator import Op, RequestOp, ResponseOp, VirtualOp from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import ThreadChannel, ProcessChannel, ChannelData, ChannelDataEcode, ChannelDataType from .channel import ThreadChannel, ProcessChannel, ChannelData, ChannelDataEcode, ChannelDataType
from .profiler import TimeProfiler
from .util import NameGenerator from .util import NameGenerator
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
class DAGExecutor(object): class DAGExecutor(object):
def __init__(self, response_op, profiler, use_multithread, retry, def __init__(self, response_op, yml_config):
client_type, channel_size): self._retry = yml_config.get('retry', 1)
client_type = yml_config.get('client_type', 'brpc')
use_multithread = yml_config.get('use_multithread', True)
use_profile = yml_config.get('profile', False)
channel_size = yml_config.get('channel_size', 0)
if not use_multithread:
if use_profile:
raise Exception(
"profile cannot be used in multiprocess version temporarily")
self.name = "#G" self.name = "#G"
self._retry = min(retry, 1) self._profiler = TimeProfiler()
self._profiler = profiler self._profiler.enable(use_profile)
self._dag = DAG(response_op, profiler, use_multithread, client_type,
channel_size) self._dag = DAG(response_op, self._profiler, use_multithread,
in_channel, out_channel, pack_rpc_func, unpack_rpc_func = self._dag.build( client_type, channel_size)
) (in_channel, out_channel, pack_rpc_func,
unpack_rpc_func) = self._dag.build()
self._dag.start() self._dag.start()
self._set_in_channel(in_channel) self._set_in_channel(in_channel)
...@@ -52,10 +65,10 @@ class DAGExecutor(object): ...@@ -52,10 +65,10 @@ class DAGExecutor(object):
_LOGGER.debug(self._log(out_channel.debug())) _LOGGER.debug(self._log(out_channel.debug()))
self._id_lock = threading.Lock() self._id_lock = threading.Lock()
self._cv = threading.Condition()
self._fetch_buffer = {}
self._id_counter = 0 self._id_counter = 0
self._reset_max_id = 1000000000000000000 self._reset_max_id = 1000000000000000000
self._cv = threading.Condition()
self._fetch_buffer = {}
self._is_run = False self._is_run = False
self._recive_func = None self._recive_func = None
...@@ -136,6 +149,8 @@ class DAGExecutor(object): ...@@ -136,6 +149,8 @@ class DAGExecutor(object):
data_id=data_id), data_id data_id=data_id), data_id
def call(self, rpc_request): def call(self, rpc_request):
self._profiler.record("dag-call_0".format(self.name))
self._profiler.record("{}-prepack_0".format(self.name)) self._profiler.record("{}-prepack_0".format(self.name))
req_channeldata, data_id = self._pack_channeldata(rpc_request) req_channeldata, data_id = self._pack_channeldata(rpc_request)
self._profiler.record("{}-prepack_1".format(self.name)) self._profiler.record("{}-prepack_1".format(self.name))
...@@ -161,6 +176,8 @@ class DAGExecutor(object): ...@@ -161,6 +176,8 @@ class DAGExecutor(object):
self._profiler.record("{}-postpack_0".format(self.name)) self._profiler.record("{}-postpack_0".format(self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata) rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("{}-postpack_1".format(self.name)) self._profiler.record("{}-postpack_1".format(self.name))
self._profiler.record("dag-call_1".format(self.name))
self._profiler.print_profile() self._profiler.print_profile()
return rpc_resp return rpc_resp
......
...@@ -21,17 +21,16 @@ import yaml ...@@ -21,17 +21,16 @@ import yaml
from .proto import pipeline_service_pb2_grpc from .proto import pipeline_service_pb2_grpc
from .operator import ResponseOp from .operator import ResponseOp
from .profiler import TimeProfiler
from .dag import DAGExecutor from .dag import DAGExecutor
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
_profiler = TimeProfiler()
class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer): class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer):
def __init__(self, dag_executor): def __init__(self, dag_executor):
super(PipelineService, self).__init__() super(PipelineService, self).__init__()
self._dag_executor = dag_executor self._dag_executor = dag_executor
self._dag_executor.start()
def inference(self, request, context): def inference(self, request, context):
resp = self._dag_executor.call(request) resp = self._dag_executor.call(request)
...@@ -44,10 +43,6 @@ class PipelineServer(object): ...@@ -44,10 +43,6 @@ class PipelineServer(object):
self._worker_num = None self._worker_num = None
self._response_op = None self._response_op = None
def gen_desc(self):
_LOGGER.info('here will generate desc for PAAS')
pass
def set_response_op(self, response_op): def set_response_op(self, response_op):
if not isinstance(response_op, ResponseOp): if not isinstance(response_op, ResponseOp):
raise Exception("response_op must be ResponseOp type.") raise Exception("response_op must be ResponseOp type.")
...@@ -69,25 +64,8 @@ class PipelineServer(object): ...@@ -69,25 +64,8 @@ class PipelineServer(object):
raise SystemExit("Prot {} is already used".format(self._port)) raise SystemExit("Prot {} is already used".format(self._port))
self._worker_num = yml_config.get('worker_num', 2) self._worker_num = yml_config.get('worker_num', 2)
retry = yml_config.get('retry', 1)
client_type = yml_config.get('client_type', 'brpc')
use_multithread = yml_config.get('use_multithread', True)
use_profile = yml_config.get('profile', False)
channel_size = yml_config.get('channel_size', 0)
if not use_multithread:
if use_profile:
raise Exception(
"profile cannot be used in multiprocess version temporarily")
_profiler.enable(use_profile)
# init dag executor # init dag executor
self._dag_executor = DAGExecutor(self._response_op, _profiler, self._dag_executor = DAGExecutor(self._response_op, yml_config)
use_multithread, retry, client_type,
channel_size)
self._dag_executor.start()
self.gen_desc()
def run_server(self): def run_server(self):
service = PipelineService(self._dag_executor) service = PipelineService(self._dag_executor)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册