提交 c8cc0b51 编写于 作者: B barriery

Update log-id generation in build_dag_each_worker mode

上级 a4b29717
...@@ -164,7 +164,6 @@ int PredictorClient::batch_predict( ...@@ -164,7 +164,6 @@ int PredictorClient::batch_predict(
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size; VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req; Request req;
req.set_log_id(log_id); req.set_log_id(log_id);
VLOG(2) << "(logid=" << req.log_id() << ")";
for (auto &name : fetch_name) { for (auto &name : fetch_name) {
req.add_fetch_var_names(name); req.add_fetch_var_names(name);
} }
...@@ -379,7 +378,6 @@ int PredictorClient::numpy_predict( ...@@ -379,7 +378,6 @@ int PredictorClient::numpy_predict(
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size; VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req; Request req;
req.set_log_id(log_id); req.set_log_id(log_id);
VLOG(2) << "(logid=" << req.log_id() << ")";
for (auto &name : fetch_name) { for (auto &name : fetch_name) {
req.add_fetch_var_names(name); req.add_fetch_var_names(name);
} }
......
...@@ -268,7 +268,6 @@ class PdsCodeGenerator : public CodeGenerator { ...@@ -268,7 +268,6 @@ class PdsCodeGenerator : public CodeGenerator {
" const $input_name$* request,\n" " const $input_name$* request,\n"
" $output_name$* response,\n" " $output_name$* response,\n"
" google::protobuf::Closure* done) {\n" " google::protobuf::Closure* done) {\n"
" std::cout << \"WTFFFFFFFFFFFFFFFF\";\n"
" struct timeval tv;\n" " struct timeval tv;\n"
" gettimeofday(&tv, NULL);" " gettimeofday(&tv, NULL);"
" long start = tv.tv_sec * 1000000 + tv.tv_usec;", " long start = tv.tv_sec * 1000000 + tv.tv_usec;",
......
...@@ -16,7 +16,3 @@ from operator import Op, RequestOp, ResponseOp ...@@ -16,7 +16,3 @@ from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer from pipeline_server import PipelineServer
from pipeline_client import PipelineClient from pipeline_client import PipelineClient
from analyse import Analyst from analyse import Analyst
from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
from analyse import Analyst
...@@ -17,7 +17,7 @@ import copy ...@@ -17,7 +17,7 @@ import copy
import re import re
import logging import logging
_LOGGER = logging.getLogger("pipeline.analyse") _LOGGER = logging.getLogger(__name__)
class Analyst(object): class Analyst(object):
......
...@@ -36,7 +36,7 @@ _LOGGER = logging.getLogger(__name__) ...@@ -36,7 +36,7 @@ _LOGGER = logging.getLogger(__name__)
class DAGExecutor(object): class DAGExecutor(object):
def __init__(self, response_op, server_conf): def __init__(self, response_op, server_conf, worker_idx):
build_dag_each_worker = server_conf["build_dag_each_worker"] build_dag_each_worker = server_conf["build_dag_each_worker"]
server_worker_num = server_conf["worker_num"] server_worker_num = server_conf["worker_num"]
dag_conf = server_conf["dag"] dag_conf = server_conf["dag"]
...@@ -74,8 +74,16 @@ class DAGExecutor(object): ...@@ -74,8 +74,16 @@ class DAGExecutor(object):
if self._tracer is not None: if self._tracer is not None:
self._tracer.start() self._tracer.start()
# generate id: data_id == request_id == log_id
base_counter = 0
gen_id_step = 1
if build_dag_each_worker:
base_counter = worker_idx
gen_id_step = server_worker_num
self._id_generator = ThreadIdGenerator( self._id_generator = ThreadIdGenerator(
max_id=1000000000000000000, base_counter=0, step=1) max_id=1000000000000000000,
base_counter=base_counter,
step=gen_id_step)
self._cv_pool = {} self._cv_pool = {}
self._cv_for_cv_pool = threading.Condition() self._cv_for_cv_pool = threading.Condition()
......
...@@ -30,10 +30,10 @@ _LOGGER = logging.getLogger(__name__) ...@@ -30,10 +30,10 @@ _LOGGER = logging.getLogger(__name__)
class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer): class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
def __init__(self, response_op, dag_conf): def __init__(self, response_op, dag_conf, worker_idx=-1):
super(PipelineServicer, self).__init__() super(PipelineServicer, self).__init__()
# init dag executor # init dag executor
self._dag_executor = DAGExecutor(response_op, dag_conf) self._dag_executor = DAGExecutor(response_op, dag_conf, worker_idx)
self._dag_executor.start() self._dag_executor.start()
_LOGGER.info("[PipelineServicer] succ init") _LOGGER.info("[PipelineServicer] succ init")
...@@ -107,7 +107,7 @@ class PipelineServer(object): ...@@ -107,7 +107,7 @@ class PipelineServer(object):
show_info = (i == 0) show_info = (i == 0)
worker = multiprocessing.Process( worker = multiprocessing.Process(
target=self._run_server_func, target=self._run_server_func,
args=(bind_address, self._response_op, self._conf)) args=(bind_address, self._response_op, self._conf, i))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
for worker in workers: for worker in workers:
...@@ -121,13 +121,13 @@ class PipelineServer(object): ...@@ -121,13 +121,13 @@ class PipelineServer(object):
server.start() server.start()
server.wait_for_termination() server.wait_for_termination()
def _run_server_func(self, bind_address, response_op, dag_conf): def _run_server_func(self, bind_address, response_op, dag_conf, worker_idx):
options = (('grpc.so_reuseport', 1), ) options = (('grpc.so_reuseport', 1), )
server = grpc.server( server = grpc.server(
futures.ThreadPoolExecutor( futures.ThreadPoolExecutor(
max_workers=1, ), options=options) max_workers=1, ), options=options)
pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server( pipeline_service_pb2_grpc.add_PipelineServiceServicer_to_server(
PipelineServicer(response_op, dag_conf), server) PipelineServicer(response_op, dag_conf, worker_idx), server)
server.add_insecure_port(bind_address) server.add_insecure_port(bind_address)
server.start() server.start()
server.wait_for_termination() server.wait_for_termination()
......
Markdown is supported
0% uploaded.
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册