From b32458657684d7ac5191dc17f671458abafc2b1e Mon Sep 17 00:00:00 2001
From: barriery
Date: Fri, 31 Jul 2020 06:18:01 +0000
Subject: [PATCH] bug fix

---
 python/pipeline/dag.py | 23 ++++++++++++++---------
 tools/serving_build.sh | 32 +++++++++++++++++---------------
 2 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index 8f36105d..f625c9e6 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -102,11 +102,16 @@ class DAGExecutor(object):
         _LOGGER.info("[DAG Executor] succ stop")
 
     def _get_next_data_id(self):
+        data_id = None
         with self._id_lock:
             if self._id_counter >= self._reset_max_id:
                 self._id_counter -= self._reset_max_id
+            data_id = self._id_counter
             self._id_counter += 1
-        return self._id_counter - 1
+        cond_v = threading.Condition()
+        with self._cv_for_cv_pool:
+            self._cv_pool[data_id] = cond_v
+        return data_id, cond_v
 
     def _set_in_channel(self, in_channel):
         if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
@@ -159,13 +164,10 @@ class DAGExecutor(object):
                 self._fetch_buffer = channeldata
                 cv.notify_all()
 
-    def _get_channeldata_from_fetch_buffer(self, data_id):
+    def _get_channeldata_from_fetch_buffer(self, data_id, cond_v):
         resp = None
-        cv = threading.Condition()
-        with self._cv_for_cv_pool:
-            self._cv_pool[data_id] = cv
-        with cv:
-            cv.wait()
+        with cond_v:
+            cond_v.wait()
         with self._cv_for_cv_pool:
             resp = copy.deepcopy(self._fetch_buffer)
         _LOGGER.debug("resp thread get resp data[{}]".format(data_id))
@@ -201,7 +203,7 @@ class DAGExecutor(object):
             client_need_profile=client_need_profile)
 
     def call(self, rpc_request):
-        data_id = self._get_next_data_id()
+        data_id, cond_v = self._get_next_data_id()
         _LOGGER.debug("generate id: {}".format(data_id))
 
         if not self._is_thread_op:
@@ -222,6 +224,8 @@ class DAGExecutor(object):
             self._in_channel.push(req_channeldata, self.name)
         except ChannelStopError:
             _LOGGER.debug("[DAG Executor] channel stop.")
+            with self._cv_for_cv_pool:
+                self._cv_pool.pop(data_id)
             return self._pack_for_rpc_resp(
                 ChannelData(
                     ecode=ChannelDataEcode.CLOSED_ERROR.value,
@@ -230,7 +234,8 @@ class DAGExecutor(object):
 
         _LOGGER.debug("wait for Graph engine for data[{}]...".format(
             data_id))
-        resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id)
+        resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
+                                                                   cond_v)
 
         if resp_channeldata.ecode == ChannelDataEcode.OK.value:
             _LOGGER.debug("Graph engine predict data[{}] succ".format(
diff --git a/tools/serving_build.sh b/tools/serving_build.sh
index 4bb68d93..24818958 100644
--- a/tools/serving_build.sh
+++ b/tools/serving_build.sh
@@ -779,7 +779,7 @@ function python_test_pipeline(){
     # test: thread servicer & thread op
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
+worker_num: 4
 build_dag_each_worker: false
 dag:
     is_thread_op: true
@@ -796,7 +796,7 @@ EOF
     # test: thread servicer & process op
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
+worker_num: 4
 build_dag_each_worker: false
 dag:
     is_thread_op: false
@@ -810,13 +810,13 @@ EOF
     ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
     kill_process_by_port 18080
 
-    # test: process servicer & thread op
+    # test: process servicer & process op
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
-build_dag_each_worker: true
+worker_num: 4
+build_dag_each_worker: false
 dag:
-    is_thread_op: flase
+    is_thread_op: false
 client_type: brpc
 retry: 1
 use_profile: false
@@ -826,14 +826,20 @@ EOF
     check_cmd "python test_pipeline_client.py"
"pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill kill_process_by_port 18080 + + kill_server_process + kill_process_by_port 9292 + kill_process_by_port 9393 - # test: process servicer & process op + # test: process servicer & thread op + pip uninstall grpcio + pip install grpcio --no-binary=grpcio cat << EOF > config.yml port: 18080 -worker_num: 2 -build_dag_each_worker: false +worker_num: 4 +build_dag_each_worker: true dag: - is_thread_op: false + is_thread_op: flase client_type: brpc retry: 1 use_profile: false @@ -843,10 +849,6 @@ EOF check_cmd "python test_pipeline_client.py" ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill kill_process_by_port 18080 - - kill_server_process - kill_process_by_port 9292 - kill_process_by_port 9393 # start paddle serving service (grpc) python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 --use_multilang --workdir test9292 &> cnn.log & @@ -854,7 +856,7 @@ EOF sleep 5 cat << EOF > config.yml port: 18080 -worker_num: 2 +worker_num: 4 build_dag_each_worker: false dag: is_thread_op: false -- GitLab