From c8cc0b51a2a541c4f477fef02a0c9b4e911a9cd4 Mon Sep 17 00:00:00 2001 From: barriery Date: Fri, 7 Aug 2020 07:57:55 +0000 Subject: [PATCH] update generate logid in build_dag_each_worker mode --- core/general-client/src/general_model.cpp | 2 -- core/pdcodegen/src/pdcodegen.cpp | 1 - python/pipeline/__init__.py | 4 ---- python/pipeline/analyse.py | 2 +- python/pipeline/dag.py | 12 ++++++++++-- python/pipeline/pipeline_server.py | 10 +++++----- 6 files changed, 16 insertions(+), 15 deletions(-) diff --git a/core/general-client/src/general_model.cpp b/core/general-client/src/general_model.cpp index 5c2f95de..6804a9e8 100644 --- a/core/general-client/src/general_model.cpp +++ b/core/general-client/src/general_model.cpp @@ -164,7 +164,6 @@ int PredictorClient::batch_predict( VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size; Request req; req.set_log_id(log_id); - VLOG(2) << "(logid=" << req.log_id() << ")"; for (auto &name : fetch_name) { req.add_fetch_var_names(name); } @@ -379,7 +378,6 @@ int PredictorClient::numpy_predict( VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size; Request req; req.set_log_id(log_id); - VLOG(2) << "(logid=" << req.log_id() << ")"; for (auto &name : fetch_name) { req.add_fetch_var_names(name); } diff --git a/core/pdcodegen/src/pdcodegen.cpp b/core/pdcodegen/src/pdcodegen.cpp index 3c86ce40..c4e86d83 100644 --- a/core/pdcodegen/src/pdcodegen.cpp +++ b/core/pdcodegen/src/pdcodegen.cpp @@ -268,7 +268,6 @@ class PdsCodeGenerator : public CodeGenerator { " const $input_name$* request,\n" " $output_name$* response,\n" " google::protobuf::Closure* done) {\n" - " std::cout << \"WTFFFFFFFFFFFFFFFF\";\n" " struct timeval tv;\n" " gettimeofday(&tv, NULL);" " long start = tv.tv_sec * 1000000 + tv.tv_usec;", diff --git a/python/pipeline/__init__.py b/python/pipeline/__init__.py index 913ee39f..9f305670 100644 --- a/python/pipeline/__init__.py +++ b/python/pipeline/__init__.py @@ -16,7 +16,3 @@ from operator import Op, RequestOp, ResponseOp from pipeline_server import PipelineServer from pipeline_client import PipelineClient from analyse import Analyst -from operator import Op, RequestOp, ResponseOp -from pipeline_server import PipelineServer -from pipeline_client import PipelineClient -from analyse import Analyst diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py index 11a68b27..424b7e02 100644 --- a/python/pipeline/analyse.py +++ b/python/pipeline/analyse.py @@ -17,7 +17,7 @@ import copy import re import logging -_LOGGER = logging.getLogger("pipeline.analyse") +_LOGGER = logging.getLogger(__name__) class Analyst(object): diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 3c8f8bd4..4352cfed 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -36,7 +36,7 @@ _LOGGER = logging.getLogger(__name__) class DAGExecutor(object): - def __init__(self, response_op, server_conf): + def __init__(self, response_op, server_conf, worker_idx): build_dag_each_worker = server_conf["build_dag_each_worker"] server_worker_num = server_conf["worker_num"] dag_conf = server_conf["dag"] @@ -74,8 +74,16 @@ class DAGExecutor(object): if self._tracer is not None: self._tracer.start() + # generate id: data_id == request_id == log_id + base_counter = 0 + gen_id_step = 1 + if build_dag_each_worker: + base_counter = worker_idx + gen_id_step = server_worker_num self._id_generator = ThreadIdGenerator( - max_id=1000000000000000000, base_counter=0, step=1) + max_id=1000000000000000000, + base_counter=base_counter, + step=gen_id_step) self._cv_pool = {} self._cv_for_cv_pool = threading.Condition() diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index 70720e99..6ad34e34 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -30,10 +30,10 @@ _LOGGER = logging.getLogger(__name__) class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): - def __init__(self, response_op, dag_conf): + def __init__(self, response_op, dag_conf, worker_idx=-1): super(PipelineServicer, self).__init__() # init dag executor - self._dag_executor = DAGExecutor(response_op, dag_conf) + self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx) self._dag_executor.start() _LOGGER.info("[PipelineServicer] succ init") @@ -107,7 +107,7 @@ class PipelineServer(object): show_info = (i == 0) worker = multiprocessing.Process( target=self._run_server_func, - args=(bind_address, self._response_op, self._conf)) + args=(bind_address, self._response_op, self._conf, i)) worker.start() workers.append(worker) for worker in workers: @@ -121,13 +121,13 @@ class PipelineServer(object): server.start() server.wait_for_termination() - def _run_server_func(self, bind_address, response_op, dag_conf): + def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx): options = (('grpc.so_reuseport', 1), ) server = grpc.server( futures.ThreadPoolExecutor( max_workers=1, ), options=options) pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( - PipelineServicer(response_op, dag_conf), server) + PipelineServicer(response_op, dag_conf, worker_idx), server) server.add_insecure_port(bind_address) server.start() server.wait_for_termination() -- GitLab