diff --git a/python/examples/pipeline/imdb_model_ensemble/config.yml b/python/examples/pipeline/imdb_model_ensemble/config.yml
index c9a00f2700cf0003824fa08867687695acec548d..0303be7f6097df09e130eab0c8ebe6305911c80b 100644
--- a/python/examples/pipeline/imdb_model_ensemble/config.yml
+++ b/python/examples/pipeline/imdb_model_ensemble/config.yml
@@ -1,8 +1,8 @@
 port: 18080
 worker_num: 1
-build_dag_each_request: true
+build_dag_each_request: false
 dag:
-    is_thread_op: false
+    is_thread_op: true
     client_type: brpc
     retry: 1
     use_profile: false
diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py
index 071b0014edae38b6b6dfec376a54a0ada4eabbc5..9026575071fe2677d7df7fc80178c962dfc56c45 100644
--- a/python/pipeline/channel.py
+++ b/python/pipeline/channel.py
@@ -197,7 +197,7 @@ class ProcessChannel(object):
         self._maxsize = maxsize
         self._timeout = timeout
         self.name = name
-        self._stop = False
+        self._stop = manager.Value('i', 0)
 
         self._cv = multiprocessing.Condition()
 
@@ -253,13 +253,13 @@ class ProcessChannel(object):
                 ))
         elif len(self._producers) == 1:
             with self._cv:
-                while self._stop is False:
+                while self._stop.value == 0:
                     try:
                         self._que.put({op_name: channeldata}, timeout=0)
                         break
                     except Queue.Full:
                         self._cv.wait()
-                if self._stop:
+                if self._stop.value == 1:
                     raise ChannelStopError()
                 _LOGGER.debug(
                     self._log("{} channel size: {}".format(op_name,
@@ -302,7 +302,7 @@ class ProcessChannel(object):
                         self._log("{} push data succ, but not push to queue.".
                                   format(op_name)))
                 else:
-                    while self._stop is False:
+                    while self._stop.value == 0:
                         try:
                             _LOGGER.debug(
                                 self._log("{} push data succ: {}".format(
@@ -311,7 +311,7 @@ class ProcessChannel(object):
                             break
                         except Queue.Empty:
                             self._cv.wait()
-                    if self._stop:
+                    if self._stop.value == 1:
                         raise ChannelStopError()
 
             _LOGGER.debug(
@@ -329,7 +329,7 @@ class ProcessChannel(object):
         elif len(self._consumer_cursors) == 1:
             resp = None
             with self._cv:
-                while self._stop is False and resp is None:
+                while self._stop.value == 0 and resp is None:
                     try:
                         _LOGGER.debug(
                             self._log("{} try to get(with channel empty: {})".
@@ -342,7 +342,7 @@ class ProcessChannel(object):
                                 "{} wait for empty queue(with channel empty: {})".
                                 format(op_name, self._que.empty())))
                         self._cv.wait()
-                if self._stop:
+                if self._stop.value == 1:
                     raise ChannelStopError()
             _LOGGER.debug(
                 self._log("{} get data succ: {}".format(op_name, resp.__str__(
@@ -366,7 +366,7 @@ class ProcessChannel(object):
         with self._cv:
             # When the data required by the current Op is not in output_buf,
             # it is necessary to obtain a data from queue and add it to output_buf.
-            while self._stop is False and self._consumer_cursors[
+            while self._stop.value == 0 and self._consumer_cursors[
                     op_name] - self._base_cursor.value >= len(self._output_buf):
                 _LOGGER.debug(
                     self._log(
@@ -386,7 +386,7 @@ class ProcessChannel(object):
                             "{} wait for empty queue(with channel size: {})".
                             format(op_name, self._que.qsize())))
                     self._cv.wait()
-            if self._stop:
+            if self._stop.value == 1:
                 raise ChannelStopError()
 
             consumer_cursor = self._consumer_cursors[op_name]
@@ -435,7 +435,7 @@ class ProcessChannel(object):
 
     def stop(self):
         _LOGGER.info(self._log("stop."))
-        self._stop = True
+        self._stop.value = 1
         with self._cv:
             self._cv.notify_all()
 
diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index 100bda6f0cd28e84eb37e231b6f765c4df6aa86f..a8bb4c254bd03787c6b1383ccc8e9417b8f3592f 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -470,3 +470,6 @@ class DAG(object):
     def stop(self):
         for chl in self._channels:
             chl.stop()
+        for op in self._actual_ops:
+            op.clean_input_channel()
+            op.clean_output_channels()
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index bc9a2edaa25a5a7d5f463553bec3e37bae73967f..d3b879c4081d02ee1ce41271028e21c1996aab22 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -117,7 +117,7 @@ class Op(object):
             channel.add_consumer(self.name)
         self._input = channel
 
-    def _clean_input_channel(self):
+    def clean_input_channel(self):
         self._input = None
 
     def _get_input_channel(self):
@@ -131,7 +131,7 @@ class Op(object):
             channel.add_producer(self.name)
         self._outputs.append(channel)
 
-    def _clean_output_channels(self):
+    def clean_output_channels(self):
         self._outputs = []
 
     def _get_output_channels(self):
@@ -316,7 +316,7 @@ class Op(object):
         return output_data, error_channeldata
 
     def _run(self, concurrency_idx, input_channel, output_channels, client_type,
-             use_multithread):
+             is_thread_op):
         def get_log_func(op_info_prefix):
             def log_func(info_str):
                 return "{} {}".format(op_info_prefix, info_str)
@@ -330,7 +330,7 @@ class Op(object):
         # init op
         self.concurrency_idx = concurrency_idx
         try:
-            if use_multithread:
+            if is_thread_op:
                 with self._for_init_op_lock:
                     if not self._succ_init_op:
                         # init profiler
@@ -364,14 +364,13 @@ class Op(object):
                 channeldata_dict = input_channel.front(self.name)
             except ChannelStopError:
                 _LOGGER.info(log("stop."))
-                with self._for_close_op_lock:
-                    if not self._succ_close_op:
-                        self._clean_input_channel()
-                        self._clean_output_channels()
-                        self._profiler = None
-                        self.client = None
-                        self._succ_init_op = False
-                        self._succ_close_op = True
+                if is_thread_op:
+                    with self._for_close_op_lock:
+                        if not self._succ_close_op:
+                            self._profiler = None
+                            self.client = None
+                            self._succ_init_op = False
+                            self._succ_close_op = True
                 break
             #self._profiler_record("get#{}_1".format(op_info_prefix))
             _LOGGER.debug(log("input_data: {}".format(channeldata_dict)))
@@ -547,7 +546,7 @@ class VirtualOp(Op):
         self._outputs.append(channel)
 
     def _run(self, concurrency_idx, input_channel, output_channels, client_type,
-             use_multithread):
+             is_thread_op):
         def get_log_func(op_info_prefix):
             def log_func(info_str):
                 return "{} {}".format(op_info_prefix, info_str)
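
Note on the core change in channel.py above: ProcessChannel is used by Ops running in separate processes, so a plain Python bool assigned in stop() only changes in the calling process while the other processes keep spinning on their stale copy. Backing the flag with manager.Value('i', 0) stores it in the multiprocessing manager, so every process observes the same state; since the proxy holds an int rather than a bool, the old `is False` / truthiness checks are rewritten as explicit `.value == 0` / `.value == 1` comparisons throughout the file. The standalone sketch below (illustrative only, not part of the patch; names and the 0.5s timing are arbitrary) contrasts the two behaviours:

# stop_flag_sketch.py -- illustrative only, not from this patch.
import multiprocessing
import time


def watch_plain(holder):
    # `holder` was copied when the child started; later writes made in the
    # parent process are not visible here.
    time.sleep(0.5)
    print("plain flag seen by child:", holder["stop"])


def watch_shared(stop):
    # `stop` is a Manager proxy; the parent's write to stop.value is visible.
    time.sleep(0.5)
    print("manager.Value seen by child:", stop.value)


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    plain = {"stop": False}
    shared = manager.Value('i', 0)  # same construct as the patched ProcessChannel

    p1 = multiprocessing.Process(target=watch_plain, args=(plain,))
    p2 = multiprocessing.Process(target=watch_shared, args=(shared,))
    p1.start()
    p2.start()

    # "stop" from the parent, the way ProcessChannel.stop() now does.
    plain["stop"] = True
    shared.value = 1

    p1.join()
    p2.join()
    # Typically prints:
    #   plain flag seen by child: False
    #   manager.Value seen by child: 1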