diff --git a/core/general-client/src/general_model.cpp b/core/general-client/src/general_model.cpp
index 5c2f95de8af6c0197a488c70bcb67f2893a122c6..6804a9e80d4140d02a5d46c7f3358f2403ee4fbb 100644
--- a/core/general-client/src/general_model.cpp
+++ b/core/general-client/src/general_model.cpp
@@ -164,7 +164,6 @@ int PredictorClient::batch_predict(
   VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
   Request req;
   req.set_log_id(log_id);
-  VLOG(2) << "(logid=" << req.log_id() << ")";
   for (auto &name : fetch_name) {
     req.add_fetch_var_names(name);
   }
@@ -379,7 +378,6 @@ int PredictorClient::numpy_predict(
   VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
   Request req;
   req.set_log_id(log_id);
-  VLOG(2) << "(logid=" << req.log_id() << ")";
   for (auto &name : fetch_name) {
     req.add_fetch_var_names(name);
   }
diff --git a/core/pdcodegen/src/pdcodegen.cpp b/core/pdcodegen/src/pdcodegen.cpp
index 3c86ce4028bdc8722052525ac24405f6887bb586..c4e86d83eac05cb80152d12756dd7b5a49b357b6 100644
--- a/core/pdcodegen/src/pdcodegen.cpp
+++ b/core/pdcodegen/src/pdcodegen.cpp
@@ -268,7 +268,6 @@ class PdsCodeGenerator : public CodeGenerator {
          "    const $input_name$* request,\n"
          "    $output_name$* response,\n"
          "    google::protobuf::Closure* done) {\n"
-         "  std::cout << \"WTFFFFFFFFFFFFFFFF\";\n"
          "  struct timeval tv;\n"
          "  gettimeofday(&tv, NULL);"
          "  long start = tv.tv_sec * 1000000 + tv.tv_usec;",
diff --git a/python/pipeline/__init__.py b/python/pipeline/__init__.py
index 913ee39f03d480663a79b1b2b1503835100ee176..9f3056708c4394637ea6898fa50911af9871cd9d 100644
--- a/python/pipeline/__init__.py
+++ b/python/pipeline/__init__.py
@@ -16,7 +16,3 @@ from operator import Op, RequestOp, ResponseOp
 from pipeline_server import PipelineServer
 from pipeline_client import PipelineClient
 from analyse import Analyst
-from operator import Op, RequestOp, ResponseOp
-from pipeline_server import PipelineServer
-from pipeline_client import PipelineClient
-from analyse import Analyst
diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py
index 11a68b272e88fc6575d48b43ff0ea400702e45db..424b7e025394467840ae77a696e42cefc5a06eed 100644
--- a/python/pipeline/analyse.py
+++ b/python/pipeline/analyse.py
@@ -17,7 +17,7 @@ import copy
 import re
 import logging
 
-_LOGGER = logging.getLogger("pipeline.analyse")
+_LOGGER = logging.getLogger(__name__)
 
 
 class Analyst(object):
diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index 3c8f8bd47f304f72ba08bead23997aca07c83ac0..4352cfed7a0209c404eb542915f2040a47eb9d51 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -36,7 +36,7 @@ _LOGGER = logging.getLogger(__name__)
 
 
 class DAGExecutor(object):
-    def __init__(self, response_op, server_conf):
+    def __init__(self, response_op, server_conf, worker_idx):
         build_dag_each_worker = server_conf["build_dag_each_worker"]
         server_worker_num = server_conf["worker_num"]
         dag_conf = server_conf["dag"]
@@ -74,8 +74,16 @@
         if self._tracer is not None:
             self._tracer.start()
 
+        # generate id: data_id == request_id == log_id
+        base_counter = 0
+        gen_id_step = 1
+        if build_dag_each_worker:
+            base_counter = worker_idx
+            gen_id_step = server_worker_num
         self._id_generator = ThreadIdGenerator(
-            max_id=1000000000000000000, base_counter=0, step=1)
+            max_id=1000000000000000000,
+            base_counter=base_counter,
+            step=gen_id_step)
 
         self._cv_pool = {}
         self._cv_for_cv_pool = threading.Condition()
diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py
index 70720e9984fd41302e53cc5144b952e293c4ec39..6ad34e343c6891a33e5cbd1728d0a65fd598d748 100644
--- a/python/pipeline/pipeline_server.py
+++ b/python/pipeline/pipeline_server.py
@@ -30,10 +30,10 @@ _LOGGER = logging.getLogger(__name__)
 
 
 class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
-    def __init__(self, response_op, dag_conf):
+    def __init__(self, response_op, dag_conf, worker_idx=-1):
         super(PipelineServicer, self).__init__()
         # init dag executor
-        self._dag_executor = DAGExecutor(response_op, dag_conf)
+        self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx)
         self._dag_executor.start()
         _LOGGER.info("[PipelineServicer] succ init")
 
@@ -107,7 +107,7 @@
             show_info = (i == 0)
             worker = multiprocessing.Process(
                 target=self._run_server_func,
-                args=(bind_address, self._response_op, self._conf))
+                args=(bind_address, self._response_op, self._conf, i))
             worker.start()
             workers.append(worker)
         for worker in workers:
@@ -121,13 +121,13 @@
         server.start()
         server.wait_for_termination()
 
-    def _run_server_func(self, bind_address, response_op, dag_conf):
+    def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
        options = (('grpc.so_reuseport', 1), )
        server = grpc.server(
            futures.ThreadPoolExecutor(
                max_workers=1, ), options=options)
        pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
-           PipelineServicer(response_op, dag_conf), server)
+           PipelineServicer(response_op, dag_conf, worker_idx), server)
        server.add_insecure_port(bind_address)
        server.start()
        server.wait_for_termination()
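
Note on the dag.py / pipeline_server.py changes: when build_dag_each_worker is enabled, each gRPC worker process builds its own DAGExecutor, so the generators that mint data_id == request_id == log_id must not collide across processes. Seeding worker k with base_counter=k and step=server_worker_num makes each worker draw ids from a disjoint residue class; in single-process mode (worker_idx=-1 default), base_counter stays 0 and step stays 1 as before. A minimal standalone sketch of the scheme (hypothetical helper name, not the project's ThreadIdGenerator API):

def id_stream(worker_idx, worker_num, count):
    """Yield the ids worker `worker_idx` would generate: k, k+n, k+2n, ..."""
    cur = worker_idx
    for _ in range(count):
        yield cur
        cur += worker_num

# Three workers (worker_num=3) never overlap:
# worker 0 -> 0, 3, 6, 9   worker 1 -> 1, 4, 7, 10   worker 2 -> 2, 5, 8, 11
print([list(id_stream(k, 3, 4)) for k in range(3)])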