提交 a75197a8 编写于 作者: B barriery

fix codestyle

上级 df8f6a63
......@@ -258,7 +258,8 @@ class ProcessChannel(object):
def push(self, channeldata, op_name=None):
_LOGGER.debug(
self._log("{} try to push data[{}]".format(op_name, channeldata.id)))
self._log("{} try to push data[{}]".format(op_name,
channeldata.id)))
if len(self._producers) == 0:
raise Exception(
self._log(
......@@ -275,8 +276,9 @@ class ProcessChannel(object):
if self._stop.value == 1:
raise ChannelStopError()
self._cv.notify_all()
_LOGGER.debug(self._log("{} notify all".format(op_name)))
_LOGGER.debug(self._log("{} push data succ!".format(op_name)))
_LOGGER.debug(
self._log("{} succ push data[{}] into internal queue.".format(
op_name, channeldata.id)))
return True
elif op_name is None:
raise Exception(
......@@ -287,7 +289,6 @@ class ProcessChannel(object):
data_id = channeldata.id
put_data = None
with self._cv:
_LOGGER.debug(self._log("{} get lock".format(op_name)))
if data_id not in self._input_buf:
self._input_buf[data_id] = {
name: None
......@@ -309,14 +310,11 @@ class ProcessChannel(object):
if put_data is None:
_LOGGER.debug(
self._log("{} push data succ, but not push to queue.".
format(op_name)))
self._log("{} succ push data[{}] into input_buffer.".format(
op_name, data_id)))
else:
while self._stop.value == 0:
try:
_LOGGER.debug(
self._log("{} push data succ: {}".format(
op_name, put_data.__str__())))
self._que.put(put_data, timeout=0)
break
except Queue.Empty:
......@@ -325,11 +323,15 @@ class ProcessChannel(object):
raise ChannelStopError()
_LOGGER.debug(
self._log("multi | {} push data succ!".format(op_name)))
self._log("{} succ push data[{}] into internal queue.".
format(op_name, data_id)))
self._cv.notify_all()
return True
def front(self, op_name=None, timeout=None):
_LOGGER.debug(
self._log("{} try to get data[?]; timeout={}".format(op_name,
timeout)))
endtime = None
if timeout is not None:
if timeout <= 0:
......@@ -337,7 +339,6 @@ class ProcessChannel(object):
else:
endtime = _time() + timeout
_LOGGER.debug(self._log("{} try to get data...".format(op_name)))
if len(self._consumer_cursors) == 0:
raise Exception(
self._log(
......@@ -348,21 +349,24 @@ class ProcessChannel(object):
with self._cv:
while self._stop.value == 0 and resp is None:
try:
_LOGGER.debug(
self._log("{} try to get(with channel empty: {})".
format(op_name, self._que.empty())))
resp = self._que.get(timeout=0)
break
except Queue.Empty:
if timeout is not None:
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
self._log("{} get data[?] timeout".format(
op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
self._cv.wait()
if self._stop.value == 1:
raise ChannelStopError()
_LOGGER.debug(
self._log("{} succ get data[{}]".format(op_name,
resp.values()[0].id)))
return resp
elif op_name is None:
raise Exception(
......@@ -384,22 +388,20 @@ class ProcessChannel(object):
# it is necessary to obtain a data from queue and add it to output_buf.
while self._stop.value == 0 and self._consumer_cursors[
op_name] - self._base_cursor.value >= len(self._output_buf):
_LOGGER.debug(
self._log(
"({}) B self._consumer_cursors: {}, self._base_cursor: {}, len(self._output_buf): {}".
format(op_name, self._consumer_cursors,
self._base_cursor.value, len(self._output_buf))))
try:
_LOGGER.debug(
self._log("{} try to get(with channel size: {})".format(
op_name, self._que.qsize())))
channeldata = self._que.get(timeout=0)
self._output_buf.append(channeldata)
_LOGGER.debug(
self._log("pop ready item[{}] into output_buffer".
format(channeldata.values()[0].id)))
break
except Queue.Empty:
if timeout is not None:
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
self._log("{} get data[?] timeout".format(
op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
......@@ -411,7 +413,6 @@ class ProcessChannel(object):
base_cursor = self._base_cursor.value
data_idx = consumer_cursor - base_cursor
resp = self._output_buf[data_idx]
_LOGGER.debug(self._log("{} get data: {}".format(op_name, resp)))
self._cursor_count[consumer_cursor] -= 1
if consumer_cursor == base_cursor and self._cursor_count[
......@@ -423,6 +424,7 @@ class ProcessChannel(object):
self._base_cursor.value += 1
# to avoid cursor overflow
if self._base_cursor.value >= self._reset_max_cursor:
_LOGGER.info(self._log("reset cursor in Channel"))
self._base_cursor.value -= self._reset_max_cursor
for name in self._consumer_cursors.keys():
self._consumer_cursors[name] -= self._reset_max_cursor
......@@ -440,16 +442,12 @@ class ProcessChannel(object):
self._cursor_count[new_consumer_cursor] = 0
self._cursor_count[new_consumer_cursor] += 1
_LOGGER.debug(
self._log(
"({}) A self._consumer_cursors: {}, self._base_cursor: {}, len(self._output_buf): {}".
format(op_name, self._consumer_cursors,
self._base_cursor.value, len(self._output_buf))))
_LOGGER.debug(self._log("{} notify all".format(op_name)))
self._cv.notify_all()
_LOGGER.debug(self._log("multi | {} get data succ!".format(op_name)))
return resp # reference, read only
_LOGGER.debug(
self._log("{} succ get data[{}] from output_buffer".format(
op_name, resp.values()[0].id)))
return resp
def stop(self):
_LOGGER.debug(self._log("stop."))
......@@ -538,7 +536,8 @@ class ThreadChannel(Queue.Queue):
def push(self, channeldata, op_name=None):
_LOGGER.debug(
self._log("{} try to push data[{}]".format(op_name, channeldata.id)))
self._log("{} try to push data[{}]".format(op_name,
channeldata.id)))
if len(self._producers) == 0:
raise Exception(
self._log(
......@@ -556,8 +555,7 @@ class ThreadChannel(Queue.Queue):
raise ChannelStopError()
self._cv.notify_all()
_LOGGER.debug(
self._log(
"{} succ push data[{}] into internal queue.".format(
self._log("{} succ push data[{}] into internal queue.".format(
op_name, channeldata.id)))
return True
elif op_name is None:
......@@ -585,8 +583,7 @@ class ThreadChannel(Queue.Queue):
if put_data is None:
_LOGGER.debug(
self._log(
"{} succ push data[{}] into input_buffer.".format(
self._log("{} succ push data[{}] into input_buffer.".format(
op_name, data_id)))
else:
while self._stop is False:
......@@ -599,17 +596,15 @@ class ThreadChannel(Queue.Queue):
raise ChannelStopError()
_LOGGER.debug(
self._log(
"{} succ push data[{}] into internal queue.".format(
op_name, data_id)))
self._log("{} succ push data[{}] into internal queue.".
format(op_name, data_id)))
self._cv.notify_all()
return True
def front(self, op_name=None, timeout=None):
_LOGGER.debug(
self._log(
"{} try to get data[?]; timeout={}".format(
op_name, timeout)))
self._log("{} try to get data[?]; timeout={}".format(op_name,
timeout)))
endtime = None
if timeout is not None:
if timeout <= 0:
......@@ -634,8 +629,8 @@ class ThreadChannel(Queue.Queue):
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
self._log(
"{} get data[?] timeout".format(op_name)))
self._log("{} get data[?] timeout".format(
op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
......@@ -643,8 +638,8 @@ class ThreadChannel(Queue.Queue):
if self._stop:
raise ChannelStopError()
_LOGGER.debug(
self._log("{} succ get data[{}]".format(
op_name, resp.values()[0].id)))
self._log("{} succ get data[{}]".format(op_name,
resp.values()[0].id)))
return resp
elif op_name is None:
raise Exception(
......@@ -670,17 +665,16 @@ class ThreadChannel(Queue.Queue):
channeldata = self.get(timeout=0)
self._output_buf.append(channeldata)
_LOGGER.debug(
self._log(
"pop ready item[{}] into output_buffer".format(
channeldata.values()[0].id)))
self._log("pop ready item[{}] into output_buffer".
format(channeldata.values()[0].id)))
break
except Queue.Empty:
if timeout is not None:
remaining = endtime - _time()
if remaining <= 0.0:
_LOGGER.debug(
self._log(
"{} get data[?] timeout".format(op_name)))
self._log("{} get data[?] timeout".format(
op_name)))
raise ChannelTimeoutError()
self._cv.wait(remaining)
else:
......@@ -704,8 +698,7 @@ class ThreadChannel(Queue.Queue):
self._base_cursor += 1
# to avoid cursor overflow
if self._base_cursor >= self._reset_max_cursor:
_LOGGER.info(
self._log("reset cursor in Channel"))
_LOGGER.info(self._log("reset cursor in Channel"))
self._base_cursor -= self._reset_max_cursor
for name in self._consumer_cursors:
self._consumer_cursors[name] -= self._reset_max_cursor
......@@ -725,8 +718,7 @@ class ThreadChannel(Queue.Queue):
self._cv.notify_all()
_LOGGER.debug(
self._log(
"{} succ get data[{}] from output_buffer".format(
self._log("{} succ get data[{}] from output_buffer".format(
op_name, resp.values()[0].id)))
return resp
......@@ -736,10 +728,12 @@ class ThreadChannel(Queue.Queue):
with self._cv:
self._cv.notify_all()
class ChannelTimeoutError(RuntimeError):
def __init__(self):
pass
class ChannelStopError(RuntimeError):
def __init__(self):
pass
......@@ -110,17 +110,15 @@ class DAGExecutor(object):
def _set_in_channel(self, in_channel):
if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
"in_channel must be Channel type, but get {}".format(
type(in_channel)))
raise TypeError("in_channel must be Channel type, but get {}".
format(type(in_channel)))
in_channel.add_producer(self.name)
self._in_channel = in_channel
def _set_out_channel(self, out_channel):
if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
raise TypeError(
"iout_channel must be Channel type, but get {}".format(
type(out_channel)))
raise TypeError("iout_channel must be Channel type, but get {}".
format(type(out_channel)))
out_channel.add_consumer(self.name)
self._out_channel = out_channel
......@@ -143,11 +141,13 @@ class DAGExecutor(object):
break
if len(channeldata_dict) != 1:
_LOGGER.error("[DAG Executor] out_channel cannot have multiple input ops")
_LOGGER.error(
"[DAG Executor] out_channel cannot have multiple input ops")
os._exit(-1)
(_, channeldata), = channeldata_dict.items()
if not isinstance(channeldata, ChannelData):
_LOGGER.error('[DAG Executor] data must be ChannelData type, but get {}'
_LOGGER.error(
'[DAG Executor] data must be ChannelData type, but get {}'
.format(type(channeldata)))
os._exit(-1)
......@@ -192,7 +192,8 @@ class DAGExecutor(object):
profile_value = rpc_request.value[idx]
break
client_need_profile = (profile_value == self._client_profile_value)
_LOGGER.debug("request[{}] need profile: {}".format(data_id, client_need_profile))
_LOGGER.debug("request[{}] need profile: {}".format(
data_id, client_need_profile))
return ChannelData(
datatype=ChannelDataType.DICT.value,
dictdata=dictdata,
......@@ -208,7 +209,8 @@ class DAGExecutor(object):
else:
self._profiler.record("call_{}#DAG_0".format(data_id))
_LOGGER.debug("try parse RPC package to channeldata[{}]".format(data_id))
_LOGGER.debug("try parse RPC package to channeldata[{}]".format(
data_id))
self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
req_channeldata = self._pack_channeldata(rpc_request, data_id)
self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))
......@@ -226,11 +228,13 @@ class DAGExecutor(object):
error_info="dag closed.",
data_id=data_id))
_LOGGER.debug("wait for Graph engine for data[{}]...".format(data_id))
_LOGGER.debug("wait for Graph engine for data[{}]...".format(
data_id))
resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id)
if resp_channeldata.ecode == ChannelDataEcode.OK.value:
_LOGGER.debug("Graph engine predict data[{}] succ".format(data_id))
_LOGGER.debug("Graph engine predict data[{}] succ".format(
data_id))
break
else:
_LOGGER.warn("Graph engine predict data[{}] failed: {}"
......@@ -239,10 +243,11 @@ class DAGExecutor(object):
break
if i + 1 < self._retry:
_LOGGER.warn("retry({}/{}) data[{}]".format(
i + 1, self._retry, data_id))
_LOGGER.warn("retry({}/{}) data[{}]".format(i + 1, self._retry,
data_id))
_LOGGER.debug("unpack channeldata[{}] into RPC resp package".format(data_id))
_LOGGER.debug("unpack channeldata[{}] into RPC resp package".format(
data_id))
self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
......
......@@ -81,13 +81,11 @@ class Op(object):
def use_default_auto_batching_config(self):
if self._batch_size != 1:
_LOGGER.warn(
"Op({}) reset batch_size=1 (original: {})"
_LOGGER.warn("Op({}) reset batch_size=1 (original: {})"
.format(self.name, self._batch_size))
self._batch_size = 1
if self._auto_batching_timeout != None:
_LOGGER.warn(
"Op({}) reset auto_batching_timeout=1 (original: {})"
_LOGGER.warn("Op({}) reset auto_batching_timeout=1 (original: {})"
.format(self.name, self._auto_batching_timeout))
self._auto_batching_timeout = None
......@@ -104,10 +102,12 @@ class Op(object):
if self.with_serving == False:
_LOGGER.info("Op({}) no client".format(self.name))
return None
_LOGGER.info("Op({}) service endpoints: {}".format(self.name, server_endpoints))
_LOGGER.info("Op({}) service endpoints: {}".format(self.name,
server_endpoints))
_LOGGER.debug("Op({}) fetch_names: {}".format(self.name, fetch_names))
if client_type == 'brpc':
_LOGGER.debug("Op({}) client_config: {}".format(self.name, client_config))
_LOGGER.debug("Op({}) client_config: {}".format(self.name,
client_config))
client = Client()
client.load_client_config(client_config)
elif client_type == 'grpc':
......@@ -259,8 +259,7 @@ class Op(object):
preped_data = self.preprocess(parsed_data)
except NotImplementedError as e:
# preprocess function not implemented
error_info = log_func(
"preprocess data[{}] failed: {}".format(
error_info = log_func("preprocess data[{}] failed: {}".format(
data_id, e))
error_channeldata = ChannelData(
ecode=ChannelDataEcode.NOT_IMPLEMENTED.value,
......@@ -268,8 +267,7 @@ class Op(object):
data_id=data_id)
except TypeError as e:
# Error type in channeldata.datatype
error_info = log_func(
"preprocess data[{}] failed: {}".format(
error_info = log_func("preprocess data[{}] failed: {}".format(
data_id, e))
_LOGGER.error(error_info)
error_channeldata = ChannelData(
......@@ -277,8 +275,7 @@ class Op(object):
error_info=error_info,
data_id=data_id)
except Exception as e:
error_info = log_func(
"preprocess data[{}] failed: {}".format(
error_info = log_func("preprocess data[{}] failed: {}".format(
data_id, e))
_LOGGER.error(error_info)
error_channeldata = ChannelData(
......@@ -324,7 +321,8 @@ class Op(object):
.format(i + 1, self._retry)))
except Exception as e:
ecode = ChannelDataEcode.UNKNOW.value
error_info = log_func("process batch failed: {}".format(e))
error_info = log_func("process batch failed: {}".format(
e))
_LOGGER.error(error_info)
break
else:
......@@ -332,12 +330,10 @@ class Op(object):
if ecode != ChannelDataEcode.OK.value:
for data_id in data_ids:
err_channeldata_dict[data_id] = ChannelData(
ecode=ecode,
error_info=error_info,
data_id=data_id)
ecode=ecode, error_info=error_info, data_id=data_id)
elif midped_batch is None:
# op client return None
error_info=log_func(
error_info = log_func(
"predict failed. pls check the server side.")
_LOGGER.error(error_info)
for data_id in data_ids:
......@@ -349,7 +345,8 @@ class Op(object):
# transform np format to dict format
for idx, data_id in enumerate(data_ids):
midped_data_dict[data_id] = {
k: v[idx] for k, v in midped_batch.items()
k: v[idx]
for k, v in midped_batch.items()
}
else:
midped_data_dict = preped_data_dict
......@@ -363,8 +360,8 @@ class Op(object):
for data_id, midped_data in midped_data_dict.items():
postped_data, err_channeldata = None, None
try:
postped_data = self.postprocess(
parsed_data_dict[data_id], midped_data)
postped_data = self.postprocess(parsed_data_dict[data_id],
midped_data)
except Exception as e:
error_info = log_func("postprocess data[{}] failed: {}"
.format(data_id, e))
......@@ -404,13 +401,12 @@ class Op(object):
_LOGGER.debug(log_func("succ run postprocess"))
return postped_data_dict, err_channeldata_dict
def _auto_batching_generator(self, input_channel, op_name,
batch_size, timeout, log_func):
def _auto_batching_generator(self, input_channel, op_name, batch_size,
timeout, log_func):
while True:
batch = []
_LOGGER.debug(
log_func(
"Auto-batching expect size: {}; timeout: {}".format(
log_func("Auto-batching expect size: {}; timeout: {}".format(
batch_size, timeout)))
while len(batch) == 0:
endtime = None
......@@ -424,14 +420,16 @@ class Op(object):
if remaining <= 0.0:
_LOGGER.debug(log_func("Auto-batching timeout"))
break
channeldata_dict = input_channel.front(op_name, timeout)
channeldata_dict = input_channel.front(op_name,
timeout)
else:
channeldata_dict = input_channel.front(op_name)
batch.append(channeldata_dict)
except ChannelTimeoutError:
_LOGGER.debug(log_func("Auto-batching timeout"))
break
_LOGGER.debug(log_func("Auto-batching actual size: {}".format(len(batch))))
_LOGGER.debug(
log_func("Auto-batching actual size: {}".format(len(batch))))
yield batch
def _parse_channeldata_batch(self, batch, output_channels):
......@@ -449,16 +447,17 @@ class Op(object):
else:
# error data in predecessor Op
# (error_channeldata with profile info)
self._push_to_output_channels(
error_channeldata, output_channels)
self._push_to_output_channels(error_channeldata,
output_channels)
return parsed_data_dict, need_profile_dict, profile_dict
def _run(self, concurrency_idx, input_channel, output_channels,
client_type, is_thread_op):
def _run(self, concurrency_idx, input_channel, output_channels, client_type,
is_thread_op):
def get_log_func(op_info_prefix):
def log_func(info_str):
return "{} {}".format(op_info_prefix, info_str)
return log_func
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
......@@ -595,8 +594,7 @@ class Op(object):
self._profiler = TimeProfiler()
self._profiler.enable(True)
# init client
self.client = self.init_client(
client_type, self._client_config,
self.client = self.init_client(client_type, self._client_config,
self._server_endpoints,
self._fetch_names)
# user defined
......@@ -625,7 +623,7 @@ class RequestOp(Op):
try:
self.init_op()
except Exception as e:
_LOGGER.error(e)
_LOGGER.error("Op(Request) init op failed: {}".format(e))
os._exit(-1)
def unpack_request_package(self, request):
......@@ -649,7 +647,7 @@ class ResponseOp(Op):
try:
self.init_op()
except Exception as e:
_LOGGER.error(e)
_LOGGER.error("Op(ResponseOp) init op failed: {}".format(e))
os._exit(-1)
def pack_response_package(self, channeldata):
......@@ -730,7 +728,7 @@ class VirtualOp(Op):
try:
channeldata_dict = input_channel.front(self.name)
except ChannelStopError:
_LOGGER.debug(log("stop."))
_LOGGER.debug(log("Channel stop."))
break
try:
......@@ -738,5 +736,5 @@ class VirtualOp(Op):
self._push_to_output_channels(
data, channels=output_channels, name=name)
except ChannelStopError:
_LOGGER.debug(log("stop."))
_LOGGER.debug(log("Channel stop."))
break
......@@ -103,7 +103,8 @@ class PipelineServer(object):
for key in default_config.keys():
_LOGGER.info("{}: {}".format(key, yml_config[key]))
if self._build_dag_each_worker is True:
_LOGGER.info("(Make sure that install grpcio whl with --no-binary flag)")
_LOGGER.info(
"(Make sure that install grpcio whl with --no-binary flag)")
_LOGGER.info("-------------------------------------------")
self._dag_config = yml_config.get("dag", {})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册