diff --git a/python/pipeline/analyse.py b/python/pipeline/analyse.py
index 0cb4196c53900e77f0d9ba346a6a16a264ef95de..65194889ae65bcd7a32f934613fe9514f3a1f6c3 100644
--- a/python/pipeline/analyse.py
+++ b/python/pipeline/analyse.py
@@ -69,7 +69,7 @@ class Analyst(object):
             with open(self._profile_file) as f:
                 for line in f.readlines():
                     line = line.strip().split("\t")
-                    if line[0] == "PROFILE":
+                    if line[0] == "PROFILE" and len(line) >= 3:
                         trace_list = self._prase_line(line[1], line[2], counter)
                         counter += 1
                         for trace in trace_list:
diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index c76fca81307639ab5986b4bbaee5231e6ebc42b3..7e303d1fd37392e8e932535b11745ae31e0d6b95 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -112,7 +112,7 @@ class DAGExecutor(object):
         if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
             raise TypeError(
                 "in_channel must be Channel type, but get {}".format(
-                    type(in_channel))))
+                    type(in_channel)))
         in_channel.add_producer(self.name)
         self._in_channel = in_channel
 
@@ -120,7 +120,7 @@ class DAGExecutor(object):
         if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
             raise TypeError(
                 "iout_channel must be Channel type, but get {}".format(
-                    type(out_channel))))
+                    type(out_channel)))
         out_channel.add_consumer(self.name)
         self._out_channel = out_channel
 
@@ -148,7 +148,7 @@ class DAGExecutor(object):
         (_, channeldata), = channeldata_dict.items()
         if not isinstance(channeldata, ChannelData):
             _LOGGER.error('[DAG Executor] data must be ChannelData type, but get {}'
-                          .format(type(channeldata))))
+                          .format(type(channeldata)))
             os._exit(-1)
 
         data_id = channeldata.id
@@ -380,7 +380,8 @@ class DAG(object):
         )
         if self._build_dag_each_worker:
             _LOGGER.info("Because `build_dag_each_worker` mode is used, "
-                         "Auto-batching is set to the default config.")
+                         "Auto-batching is set to the default config: "
+                         "batch_size=1, auto_batching_timeout=None")
             for op in used_ops:
                 op.use_default_auto_batching_config()
 
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index 123df751209d124ca7e744ea35a267102547c39d..984fecab56d1d924ad6c24fe4ae1f183f5ae99e0 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -80,8 +80,16 @@ class Op(object):
         self._succ_close_op = False
 
     def use_default_auto_batching_config(self):
-        self._batch_size = 1
-        self._auto_batching_timeout = None
+        if self._batch_size != 1:
+            _LOGGER.warn(
+                "Op({}) reset batch_size=1 (original: {})"
+                .format(self.name, self._batch_size))
+            self._batch_size = 1
+        if self._auto_batching_timeout != None:
+            _LOGGER.warn(
+                "Op({}) reset auto_batching_timeout=None (original: {})"
+                .format(self.name, self._auto_batching_timeout))
+            self._auto_batching_timeout = None
 
     def use_profiler(self, use_profile):
         self._server_use_profile = use_profile
@@ -459,7 +467,7 @@ class Op(object):
 
         # init op
         try:
-            self._initialize(is_thread_op, client_type)
+            self._initialize(is_thread_op, client_type, concurrency_idx)
         except Exception as e:
             _LOGGER.error(log("init op failed: {}".format(e)))
             os._exit(-1)
@@ -564,7 +572,7 @@ class Op(object):
                 self._finalize(is_thread_op)
                 break
 
-    def _initialize(self, is_thread_op, client_type):
+    def _initialize(self, is_thread_op, client_type, concurrency_idx):
         if is_thread_op:
             with self._for_init_op_lock:
                 if not self._succ_init_op: