提交 de58d900 编写于 作者: B barriery

bug fix

上级 5e571c7b
......@@ -69,7 +69,7 @@ class Analyst(object):
with open(self._profile_file) as f:
for line in f.readlines():
line = line.strip().split("\t")
if line[0] == "PROFILE":
if line[0] == "PROFILE" and len(line) >= 3:
trace_list = self._prase_line(line[1], line[2], counter)
counter += 1
for trace in trace_list:
......
......@@ -112,7 +112,7 @@ class DAGExecutor(object):
if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
"in_channel must be Channel type, but get {}".format(
type(in_channel))))
type(in_channel)))
in_channel.add_producer(self.name)
self._in_channel = in_channel
......@@ -120,7 +120,7 @@ class DAGExecutor(object):
if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
"iout_channel must be Channel type, but get {}".format(
type(out_channel))))
type(out_channel)))
out_channel.add_consumer(self.name)
self._out_channel = out_channel
......@@ -148,7 +148,7 @@ class DAGExecutor(object):
(_, channeldata), = channeldata_dict.items()
if not isinstance(channeldata, ChannelData):
_LOGGER.error('[DAG Executor] data must be ChannelData type, but get {}'
.format(type(channeldata))))
.format(type(channeldata)))
os._exit(-1)
data_id = channeldata.id
......@@ -380,7 +380,8 @@ class DAG(object):
)
if self._build_dag_each_worker:
_LOGGER.info("Because `build_dag_each_worker` mode is used, "
"Auto-batching is set to the default config.")
"Auto-batching is set to the default config: "
"batch_size=1, auto_batching_timeout=None")
for op in used_ops:
op.use_default_auto_batching_config()
......
......@@ -80,7 +80,15 @@ class Op(object):
self._succ_close_op = False
def use_default_auto_batching_config(self):
if self._batch_size != 1:
_LOGGER.warn(
"Op({}) reset batch_size=1 (original: {})"
.format(self.name, self._batch_size))
self._batch_size = 1
if self._auto_batching_timeout != None:
_LOGGER.warn(
"Op({}) reset auto_batching_timeout=1 (original: {})"
.format(self.name, self._auto_batching_timeout))
self._auto_batching_timeout = None
def use_profiler(self, use_profile):
......@@ -459,7 +467,7 @@ class Op(object):
# init op
try:
self._initialize(is_thread_op, client_type)
self._initialize(is_thread_op, client_type, concurrency_idx)
except Exception as e:
_LOGGER.error(log("init op failed: {}".format(e)))
os._exit(-1)
......@@ -564,7 +572,7 @@ class Op(object):
self._finalize(is_thread_op)
break
def _initialize(self, is_thread_op, client_type):
def _initialize(self, is_thread_op, client_type, concurrency_idx):
if is_thread_op:
with self._for_init_op_lock:
if not self._succ_init_op:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册