From 07ce3428417da056a68d32eab91336596eda1921 Mon Sep 17 00:00:00 2001 From: barrierye Date: Fri, 10 Jul 2020 13:43:40 +0800 Subject: [PATCH] support build process version op in each request --- .../pipeline/imdb_model_ensemble/config.yml | 4 +-- python/pipeline/channel.py | 20 +++++++-------- python/pipeline/dag.py | 3 +++ python/pipeline/operator.py | 25 +++++++++---------- 4 files changed, 27 insertions(+), 25 deletions(-) diff --git a/python/examples/pipeline/imdb_model_ensemble/config.yml b/python/examples/pipeline/imdb_model_ensemble/config.yml index c9a00f27..0303be7f 100644 --- a/python/examples/pipeline/imdb_model_ensemble/config.yml +++ b/python/examples/pipeline/imdb_model_ensemble/config.yml @@ -1,8 +1,8 @@ port: 18080 worker_num: 1 -build_dag_each_request: true +build_dag_each_request: false dag: - is_thread_op: false + is_thread_op: true client_type: brpc retry: 1 use_profile: false diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py index 071b0014..90265750 100644 --- a/python/pipeline/channel.py +++ b/python/pipeline/channel.py @@ -197,7 +197,7 @@ class ProcessChannel(object): self._maxsize = maxsize self._timeout = timeout self.name = name - self._stop = False + self._stop = manager.Value('i', 0) self._cv = multiprocessing.Condition() @@ -253,13 +253,13 @@ class ProcessChannel(object): )) elif len(self._producers) == 1: with self._cv: - while self._stop is False: + while self._stop.value == 0: try: self._que.put({op_name: channeldata}, timeout=0) break except Queue.Full: self._cv.wait() - if self._stop: + if self._stop.value == 1: raise ChannelStopError() _LOGGER.debug( self._log("{} channel size: {}".format(op_name, @@ -302,7 +302,7 @@ class ProcessChannel(object): self._log("{} push data succ, but not push to queue.". format(op_name))) else: - while self._stop is False: + while self._stop.value == 0: try: _LOGGER.debug( self._log("{} push data succ: {}".format( @@ -311,7 +311,7 @@ class ProcessChannel(object): break except Queue.Empty: self._cv.wait() - if self._stop: + if self._stop.value == 1: raise ChannelStopError() _LOGGER.debug( @@ -329,7 +329,7 @@ class ProcessChannel(object): elif len(self._consumer_cursors) == 1: resp = None with self._cv: - while self._stop is False and resp is None: + while self._stop.value == 0 and resp is None: try: _LOGGER.debug( self._log("{} try to get(with channel empty: {})". @@ -342,7 +342,7 @@ class ProcessChannel(object): "{} wait for empty queue(with channel empty: {})". format(op_name, self._que.empty()))) self._cv.wait() - if self._stop: + if self._stop.value == 1: raise ChannelStopError() _LOGGER.debug( self._log("{} get data succ: {}".format(op_name, resp.__str__( @@ -366,7 +366,7 @@ class ProcessChannel(object): with self._cv: # When the data required by the current Op is not in output_buf, # it is necessary to obtain a data from queue and add it to output_buf. - while self._stop is False and self._consumer_cursors[ + while self._stop.value == 0 and self._consumer_cursors[ op_name] - self._base_cursor.value >= len(self._output_buf): _LOGGER.debug( self._log( @@ -386,7 +386,7 @@ class ProcessChannel(object): "{} wait for empty queue(with channel size: {})". format(op_name, self._que.qsize()))) self._cv.wait() - if self._stop: + if self._stop.value == 1: raise ChannelStopError() consumer_cursor = self._consumer_cursors[op_name] @@ -435,7 +435,7 @@ class ProcessChannel(object): def stop(self): _LOGGER.info(self._log("stop.")) - self._stop = True + self._stop.value = 1 with self._cv: self._cv.notify_all() diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 100bda6f..a8bb4c25 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -470,3 +470,6 @@ class DAG(object): def stop(self): for chl in self._channels: chl.stop() + for op in self._actual_ops: + op.clean_input_channel() + op.clean_output_channels() diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index bc9a2eda..d3b879c4 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -117,7 +117,7 @@ class Op(object): channel.add_consumer(self.name) self._input = channel - def _clean_input_channel(self): + def clean_input_channel(self): self._input = None def _get_input_channel(self): @@ -131,7 +131,7 @@ class Op(object): channel.add_producer(self.name) self._outputs.append(channel) - def _clean_output_channels(self): + def clean_output_channels(self): self._outputs = [] def _get_output_channels(self): @@ -316,7 +316,7 @@ class Op(object): return output_data, error_channeldata def _run(self, concurrency_idx, input_channel, output_channels, client_type, - use_multithread): + is_thread_op): def get_log_func(op_info_prefix): def log_func(info_str): return "{} {}".format(op_info_prefix, info_str) @@ -330,7 +330,7 @@ class Op(object): # init op self.concurrency_idx = concurrency_idx try: - if use_multithread: + if is_thread_op: with self._for_init_op_lock: if not self._succ_init_op: # init profiler @@ -364,14 +364,13 @@ class Op(object): channeldata_dict = input_channel.front(self.name) except ChannelStopError: _LOGGER.info(log("stop.")) - with self._for_close_op_lock: - if not self._succ_close_op: - self._clean_input_channel() - self._clean_output_channels() - self._profiler = None - self.client = None - self._succ_init_op = False - self._succ_close_op = True + if is_thread_op: + with self._for_close_op_lock: + if not self._succ_close_op: + self._profiler = None + self.client = None + self._succ_init_op = False + self._succ_close_op = True break #self._profiler_record("get#{}_1".format(op_info_prefix)) _LOGGER.debug(log("input_data: {}".format(channeldata_dict))) @@ -547,7 +546,7 @@ class VirtualOp(Op): self._outputs.append(channel) def _run(self, concurrency_idx, input_channel, output_channels, client_type, - use_multithread): + is_thread_op): def get_log_func(op_info_prefix): def log_func(info_str): return "{} {}".format(op_info_prefix, info_str) -- GitLab