Commit b3245865 authored by barriery

bug fix

Parent c9cd9e02
@@ -102,11 +102,16 @@ class DAGExecutor(object):
         _LOGGER.info("[DAG Executor] succ stop")

     def _get_next_data_id(self):
+        data_id = None
         with self._id_lock:
             if self._id_counter >= self._reset_max_id:
                 self._id_counter -= self._reset_max_id
+            data_id = self._id_counter
             self._id_counter += 1
-        return self._id_counter - 1
+        cond_v = threading.Condition()
+        with self._cv_for_cv_pool:
+            self._cv_pool[data_id] = cond_v
+        return data_id, cond_v

     def _set_in_channel(self, in_channel):
         if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
@@ -159,13 +164,10 @@ class DAGExecutor(object):
                 self._fetch_buffer = channeldata
                 cv.notify_all()

-    def _get_channeldata_from_fetch_buffer(self, data_id):
+    def _get_channeldata_from_fetch_buffer(self, data_id, cond_v):
         resp = None
-        cv = threading.Condition()
-        with self._cv_for_cv_pool:
-            self._cv_pool[data_id] = cv
-        with cv:
-            cv.wait()
+        with cond_v:
+            cond_v.wait()
         with self._cv_for_cv_pool:
             resp = copy.deepcopy(self._fetch_buffer)
         _LOGGER.debug("resp thread get resp data[{}]".format(data_id))
@@ -201,7 +203,7 @@ class DAGExecutor(object):
             client_need_profile=client_need_profile)

     def call(self, rpc_request):
-        data_id = self._get_next_data_id()
+        data_id, cond_v = self._get_next_data_id()
         _LOGGER.debug("generate id: {}".format(data_id))

         if not self._is_thread_op:
@@ -222,6 +224,8 @@ class DAGExecutor(object):
                 self._in_channel.push(req_channeldata, self.name)
             except ChannelStopError:
                 _LOGGER.debug("[DAG Executor] channel stop.")
+                with self._cv_for_cv_pool:
+                    self._cv_pool.pop(data_id)
                 return self._pack_for_rpc_resp(
                     ChannelData(
                         ecode=ChannelDataEcode.CLOSED_ERROR.value,
@@ -230,7 +234,8 @@ class DAGExecutor(object):
         _LOGGER.debug("wait for Graph engine for data[{}]...".format(
             data_id))
-        resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id)
+        resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
+                                                                   cond_v)

         if resp_channeldata.ecode == ChannelDataEcode.OK.value:
             _LOGGER.debug("Graph engine predict data[{}] succ".format(
...
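For readers outside the diff context: the change above makes _get_next_data_id return a (data_id, cond_v) pair, so every in-flight request owns a condition variable that is registered in self._cv_pool before the request is pushed into the DAG, and that entry is popped again if the channel has already stopped. Below is a minimal standalone sketch of that per-request waiter pattern; it is not the Serving code, and names such as WaiterPool, register, deliver, and the predicate-based wait are illustrative assumptions added for the sketch.

import threading

_UNSET = object()  # sentinel so that even a None result can be delivered

class WaiterPool(object):
    """Toy stand-in for the _cv_pool / _cv_for_cv_pool pair in DAGExecutor."""

    def __init__(self):
        self._pool_lock = threading.Lock()   # plays the role of _cv_for_cv_pool
        self._pool = {}                      # data_id -> [Condition, result]

    def register(self, data_id):
        # Register the per-request Condition BEFORE the request is submitted,
        # so the responder can never notify a condition that is not in the pool.
        cond_v = threading.Condition()
        with self._pool_lock:
            self._pool[data_id] = [cond_v, _UNSET]
        return cond_v

    def deliver(self, data_id, result):
        # Called from the response-collector side.
        with self._pool_lock:
            entry = self._pool.get(data_id)
        if entry is None:  # request was discarded (e.g. channel stopped)
            return
        cond_v = entry[0]
        with cond_v:
            entry[1] = result
            cond_v.notify_all()

    def wait(self, data_id, cond_v):
        with self._pool_lock:
            entry = self._pool[data_id]
        with cond_v:
            # Predicate-based wait; the sketch adds this so a deliver() that
            # runs before the caller starts waiting is not a lost wakeup.
            cond_v.wait_for(lambda: entry[1] is not _UNSET)
        with self._pool_lock:
            self._pool.pop(data_id, None)
        return entry[1]

    def discard(self, data_id):
        # Mirrors `self._cv_pool.pop(data_id)` on ChannelStopError.
        with self._pool_lock:
            self._pool.pop(data_id, None)

if __name__ == "__main__":
    pool = WaiterPool()
    cond_v = pool.register(0)    # register before handing the work to a worker
    threading.Thread(target=pool.deliver, args=(0, "resp for data[0]")).start()
    print(pool.wait(0, cond_v))  # -> "resp for data[0]"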
@@ -779,7 +779,7 @@ function python_test_pipeline(){
     # test: thread servicer & thread op
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
+worker_num: 4
 build_dag_each_worker: false
 dag:
     is_thread_op: true
@@ -796,7 +796,7 @@ EOF
     # test: thread servicer & process op
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
+worker_num: 4
 build_dag_each_worker: false
 dag:
     is_thread_op: false
@@ -810,13 +810,13 @@ EOF
     ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
     kill_process_by_port 18080

-    # test: process servicer & thread op
+    # test: process servicer & process op
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
-build_dag_each_worker: true
+worker_num: 4
+build_dag_each_worker: false
 dag:
-    is_thread_op: flase
+    is_thread_op: false
     client_type: brpc
     retry: 1
     use_profile: false
@@ -826,14 +826,20 @@ EOF
     check_cmd "python test_pipeline_client.py"
     ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
     kill_process_by_port 18080
+    kill_server_process
+    kill_process_by_port 9292
+    kill_process_by_port 9393

-    # test: process servicer & process op
+    # test: process servicer & thread op
+    pip uninstall grpcio
+    pip install grpcio --no-binary=grpcio
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
-build_dag_each_worker: false
+worker_num: 4
+build_dag_each_worker: true
 dag:
-    is_thread_op: false
+    is_thread_op: flase
     client_type: brpc
     retry: 1
     use_profile: false
@@ -843,10 +849,6 @@ EOF
     check_cmd "python test_pipeline_client.py"
     ps -ef | grep "pipeline_server" | grep -v grep | awk '{print $2}' | xargs kill
     kill_process_by_port 18080
-    kill_server_process
-    kill_process_by_port 9292
-    kill_process_by_port 9393

     # start paddle serving service (grpc)
     python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 --use_multilang --workdir test9292 &> cnn.log &
@@ -854,7 +856,7 @@ EOF
     sleep 5
     cat << EOF > config.yml
 port: 18080
-worker_num: 2
+worker_num: 4
 build_dag_each_worker: false
 dag:
     is_thread_op: false
...
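The shell hunks adjust the pipeline CI matrix: worker_num goes from 2 to 4 in every generated config.yml, the test comments are re-paired with the configurations they actually describe, the extra server cleanup (kill_server_process, ports 9292/9393) moves earlier, and grpcio is reinstalled from source before the build_dag_each_worker: true case runs. As a rough guide to the knobs being toggled, the sketch below regenerates configs of this shape; the comment explanations of build_dag_each_worker and is_thread_op reflect my reading of the pipeline options and are not stated in this commit.

import itertools

TEMPLATE = """port: 18080
worker_num: 4
build_dag_each_worker: {build_dag_each_worker}
dag:
    is_thread_op: {is_thread_op}
    client_type: brpc
    retry: 1
    use_profile: false
"""

def render(build_dag_each_worker, is_thread_op):
    # build_dag_each_worker: true  -> each gRPC worker process builds its own
    #                                 DAG (the "process servicer" cases)
    # is_thread_op:          true  -> Ops run as threads inside the servicer
    #                        false -> Ops run as separate processes
    return TEMPLATE.format(
        build_dag_each_worker=str(build_dag_each_worker).lower(),
        is_thread_op=str(is_thread_op).lower())

if __name__ == "__main__":
    for build_dag_each_worker, is_thread_op in itertools.product(
            (False, True), (True, False)):
        print("# servicer={} / op={}".format(
            "process" if build_dag_each_worker else "thread",
            "thread" if is_thread_op else "process"))
        print(render(build_dag_each_worker, is_thread_op))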