提交 a70aa65b 编写于 作者: B barriery

update log

上级 6c00a00f
...@@ -11,7 +11,11 @@ ...@@ -11,7 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logger # this module must be the first to import
from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer
from pipeline_client import PipelineClient
from analyse import Analyst
from operator import Op, RequestOp, ResponseOp from operator import Op, RequestOp, ResponseOp
from pipeline_server import PipelineServer from pipeline_server import PipelineServer
from pipeline_client import PipelineClient from pipeline_client import PipelineClient
......
...@@ -164,7 +164,7 @@ class OpAnalyst(object): ...@@ -164,7 +164,7 @@ class OpAnalyst(object):
def add(self, name_str, ts_list): def add(self, name_str, ts_list):
if self._close: if self._close:
_LOGGER.error("OpAnalyst is closed.") _LOGGER.error("Failed to add item: OpAnalyst is closed.")
return return
op_name, curr_idx, step = self._parse(name_str) op_name, curr_idx, step = self._parse(name_str)
if op_name not in self.op_time_list_dict: if op_name not in self.op_time_list_dict:
......
此差异已折叠。
...@@ -30,6 +30,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelData, ...@@ -30,6 +30,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataEcode, ChannelDataType, ChannelStopError) ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler from .profiler import TimeProfiler
from .util import NameGenerator from .util import NameGenerator
from .proto import pipeline_service_pb2
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
...@@ -74,17 +75,18 @@ class DAGExecutor(object): ...@@ -74,17 +75,18 @@ class DAGExecutor(object):
self._recive_func = threading.Thread( self._recive_func = threading.Thread(
target=DAGExecutor._recive_out_channel_func, args=(self, )) target=DAGExecutor._recive_out_channel_func, args=(self, ))
self._recive_func.start() self._recive_func.start()
_LOGGER.debug("[DAG Executor] start recive thread") _LOGGER.debug("[DAG Executor] Start recive thread")
def stop(self): def stop(self):
self._dag.stop() self._dag.stop()
self._dag.join() self._dag.join()
_LOGGER.info("[DAG Executor] succ stop") _LOGGER.info("[DAG Executor] Stop")
def _get_next_data_id(self): def _get_next_data_id(self):
data_id = None data_id = None
with self._id_lock: with self._id_lock:
if self._id_counter >= self._reset_max_id: if self._id_counter >= self._reset_max_id:
_LOGGER.info("[DAG Executor] Reset request id")
self._id_counter -= self._reset_max_id self._id_counter -= self._reset_max_id
data_id = self._id_counter data_id = self._id_counter
self._id_counter += 1 self._id_counter += 1
...@@ -96,16 +98,18 @@ class DAGExecutor(object): ...@@ -96,16 +98,18 @@ class DAGExecutor(object):
def _set_in_channel(self, in_channel): def _set_in_channel(self, in_channel):
if not isinstance(in_channel, (ThreadChannel, ProcessChannel)): if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical("[DAG Executor] in_channel must be Channel" _LOGGER.critical("[DAG Executor] Failed to set in_channel: "
" type, but get {}".format(type(in_channel))) "in_channel must be Channel type, but get {}".
format(type(in_channel)))
os._exit(-1) os._exit(-1)
in_channel.add_producer(self.name) in_channel.add_producer(self.name)
self._in_channel = in_channel self._in_channel = in_channel
def _set_out_channel(self, out_channel): def _set_out_channel(self, out_channel):
if not isinstance(out_channel, (ThreadChannel, ProcessChannel)): if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
_LOGGER.critical("[DAG Executor]iout_channel must be Channel" _LOGGER.critical("[DAG Executor] Failed to set out_channel: "
" type, but get {}".format(type(out_channel))) "must be Channel type, but get {}".format(
type(out_channel)))
os._exit(-1) os._exit(-1)
out_channel.add_consumer(self.name) out_channel.add_consumer(self.name)
self._out_channel = out_channel self._out_channel = out_channel
...@@ -116,7 +120,7 @@ class DAGExecutor(object): ...@@ -116,7 +120,7 @@ class DAGExecutor(object):
try: try:
channeldata_dict = self._out_channel.front(self.name) channeldata_dict = self._out_channel.front(self.name)
except ChannelStopError: except ChannelStopError:
_LOGGER.info("[DAG Executor] channel stop.") _LOGGER.info("[DAG Executor] Stop.")
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
for data_id, cv in self._cv_pool.items(): for data_id, cv in self._cv_pool.items():
closed_errror_data = ChannelData( closed_errror_data = ChannelData(
...@@ -130,17 +134,20 @@ class DAGExecutor(object): ...@@ -130,17 +134,20 @@ class DAGExecutor(object):
if len(channeldata_dict) != 1: if len(channeldata_dict) != 1:
_LOGGER.critical( _LOGGER.critical(
"[DAG Executor] out_channel cannot have multiple input ops") "[DAG Executor] Failed to fetch result: out_channel "
"cannot have multiple input ops")
os._exit(-1) os._exit(-1)
(_, channeldata), = channeldata_dict.items() (_, channeldata), = channeldata_dict.items()
if not isinstance(channeldata, ChannelData): if not isinstance(channeldata, ChannelData):
_LOGGER.critical( _LOGGER.critical(
'[DAG Executor] data must be ChannelData type, but get {}' '[DAG Executor] Failed to fetch result: data in out_channel" \
" must be ChannelData type, but get {}'
.format(type(channeldata))) .format(type(channeldata)))
os._exit(-1) os._exit(-1)
data_id = channeldata.id data_id = channeldata.id
_LOGGER.debug("recive thread fetch data[{}]".format(data_id)) _LOGGER.debug("(logid={}) [recive thread] Fetched data".format(
data_id))
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
cond_v = self._cv_pool[data_id] cond_v = self._cv_pool[data_id]
with cond_v: with cond_v:
...@@ -164,7 +171,7 @@ class DAGExecutor(object): ...@@ -164,7 +171,7 @@ class DAGExecutor(object):
ready_data = self._fetch_buffer[data_id] ready_data = self._fetch_buffer[data_id]
self._cv_pool.pop(data_id) self._cv_pool.pop(data_id)
self._fetch_buffer.pop(data_id) self._fetch_buffer.pop(data_id)
_LOGGER.debug("resp thread get resp data[{}]".format(data_id)) _LOGGER.debug("(logid={}) [resp thread] Got data".format(data_id))
return ready_data return ready_data
def _pack_channeldata(self, rpc_request, data_id): def _pack_channeldata(self, rpc_request, data_id):
...@@ -172,8 +179,10 @@ class DAGExecutor(object): ...@@ -172,8 +179,10 @@ class DAGExecutor(object):
try: try:
dictdata = self._unpack_rpc_func(rpc_request) dictdata = self._unpack_rpc_func(rpc_request)
except Exception as e: except Exception as e:
_LOGGER.error("parse RPC package to data[{}] Error: {}" _LOGGER.error(
.format(data_id, e)) "(logid={}) Failed to parse RPC request package: {}"
.format(data_id, e),
exc_info=True)
return ChannelData( return ChannelData(
ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value, ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value,
error_info="rpc package error: {}".format(e), error_info="rpc package error: {}".format(e),
...@@ -187,7 +196,7 @@ class DAGExecutor(object): ...@@ -187,7 +196,7 @@ class DAGExecutor(object):
profile_value = rpc_request.value[idx] profile_value = rpc_request.value[idx]
break break
client_need_profile = (profile_value == self._client_profile_value) client_need_profile = (profile_value == self._client_profile_value)
_LOGGER.debug("request[{}] need profile: {}".format( _LOGGER.debug("(logid={}) Need profile in client: {}".format(
data_id, client_need_profile)) data_id, client_need_profile))
return ChannelData( return ChannelData(
datatype=ChannelDataType.DICT.value, datatype=ChannelDataType.DICT.value,
...@@ -197,26 +206,28 @@ class DAGExecutor(object): ...@@ -197,26 +206,28 @@ class DAGExecutor(object):
def call(self, rpc_request): def call(self, rpc_request):
data_id, cond_v = self._get_next_data_id() data_id, cond_v = self._get_next_data_id()
_LOGGER.debug("generate Request id: {}".format(data_id)) _LOGGER.info("(logid={}) Succ generate id".format(data_id))
start_call, end_call = None, None
if not self._is_thread_op: if not self._is_thread_op:
self._profiler.record("call_{}#DAG-{}_0".format(data_id, data_id)) start_call = self._profiler.record("call_{}#DAG-{}_0".format(
data_id, data_id))
else: else:
self._profiler.record("call_{}#DAG_0".format(data_id)) start_call = self._profiler.record("call_{}#DAG_0".format(data_id))
_LOGGER.debug("try parse RPC request to channeldata[{}]".format( _LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id))
data_id))
self._profiler.record("prepack_{}#{}_0".format(data_id, self.name)) self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
req_channeldata = self._pack_channeldata(rpc_request, data_id) req_channeldata = self._pack_channeldata(rpc_request, data_id)
self._profiler.record("prepack_{}#{}_1".format(data_id, self.name)) self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))
resp_channeldata = None resp_channeldata = None
for i in range(self._retry): for i in range(self._retry):
_LOGGER.debug("push data[{}] into Graph engine".format(data_id)) _LOGGER.debug("(logid={}) Pushing data into Graph engine".format(
data_id))
try: try:
self._in_channel.push(req_channeldata, self.name) self._in_channel.push(req_channeldata, self.name)
except ChannelStopError: except ChannelStopError:
_LOGGER.debug("[DAG Executor] channel stop.") _LOGGER.debug("[DAG Executor] Stop")
with self._cv_for_cv_pool: with self._cv_for_cv_pool:
self._cv_pool.pop(data_id) self._cv_pool.pop(data_id)
return self._pack_for_rpc_resp( return self._pack_for_rpc_resp(
...@@ -225,32 +236,35 @@ class DAGExecutor(object): ...@@ -225,32 +236,35 @@ class DAGExecutor(object):
error_info="dag closed.", error_info="dag closed.",
data_id=data_id)) data_id=data_id))
_LOGGER.debug("wait Graph engine for data[{}]...".format(data_id)) _LOGGER.debug("(logid={}) Wait for Graph engine...".format(data_id))
resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id, resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
cond_v) cond_v)
if resp_channeldata.ecode == ChannelDataEcode.OK.value: if resp_channeldata.ecode == ChannelDataEcode.OK.value:
_LOGGER.debug("request[{}] succ predict".format(data_id)) _LOGGER.debug("(logid={}) Succ predict".format(data_id))
break break
else: else:
_LOGGER.warning("request[{}] predict failed: {}" _LOGGER.error("(logid={}) Failed to predict: {}"
.format(data_id, resp_channeldata.error_info)) .format(data_id, resp_channeldata.error_info))
if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value: if resp_channeldata.ecode != ChannelDataEcode.TIMEOUT.value:
break break
if i + 1 < self._retry: if i + 1 < self._retry:
_LOGGER.warning("retry({}/{}) data[{}]".format( _LOGGER.warning("(logid={}) DAGExecutor retry({}/{})".format(
i + 1, self._retry, data_id)) data_id, i + 1, self._retry))
_LOGGER.debug("unpack channeldata[{}] into RPC response".format( _LOGGER.debug("(logid={}) Packing RPC response package".format(data_id))
data_id))
self._profiler.record("postpack_{}#{}_0".format(data_id, self.name)) self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata) rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("postpack_{}#{}_1".format(data_id, self.name)) self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
if not self._is_thread_op: if not self._is_thread_op:
self._profiler.record("call_{}#DAG-{}_1".format(data_id, data_id)) end_call = self._profiler.record("call_{}#DAG-{}_1".format(data_id,
data_id))
else: else:
self._profiler.record("call_{}#DAG_1".format(data_id)) end_call = self._profiler.record("call_{}#DAG_1".format(data_id))
_LOGGER.log(level=1,
msg="(logid={}) call[{} ms]".format(
data_id, (end_call - start_call) / 1e3))
profile_str = self._profiler.gen_profile_str() profile_str = self._profiler.gen_profile_str()
if self._server_use_profile: if self._server_use_profile:
...@@ -268,7 +282,17 @@ class DAGExecutor(object): ...@@ -268,7 +282,17 @@ class DAGExecutor(object):
return rpc_resp return rpc_resp
def _pack_for_rpc_resp(self, channeldata): def _pack_for_rpc_resp(self, channeldata):
try:
return self._pack_rpc_func(channeldata) return self._pack_rpc_func(channeldata)
except Exception as e:
_LOGGER.error(
"(logid={}) Failed to pack RPC response package: {}"
.format(channeldata.id, e),
exc_info=True)
resp = pipeline_service_pb2.Response()
resp.ecode = ChannelDataEcode.RPC_PACKAGE_ERROR.value
resp.error_info = "rpc package error: {}".format(e)
return resp
class DAG(object): class DAG(object):
...@@ -283,7 +307,7 @@ class DAG(object): ...@@ -283,7 +307,7 @@ class DAG(object):
self._build_dag_each_worker = build_dag_each_worker self._build_dag_each_worker = build_dag_each_worker
if not self._is_thread_op: if not self._is_thread_op:
self._manager = multiprocessing.Manager() self._manager = multiprocessing.Manager()
_LOGGER.info("[DAG] succ init") _LOGGER.info("[DAG] Succ init")
def get_use_ops(self, response_op): def get_use_ops(self, response_op):
unique_names = set() unique_names = set()
...@@ -303,7 +327,8 @@ class DAG(object): ...@@ -303,7 +327,8 @@ class DAG(object):
used_ops.add(pred_op) used_ops.add(pred_op)
# check the name of op is globally unique # check the name of op is globally unique
if pred_op.name in unique_names: if pred_op.name in unique_names:
_LOGGER.critical("the name of Op must be unique: {}". _LOGGER.critical("Failed to get used Ops: the"
" name of Op must be unique: {}".
format(pred_op.name)) format(pred_op.name))
os._exit(-1) os._exit(-1)
unique_names.add(pred_op.name) unique_names.add(pred_op.name)
...@@ -317,12 +342,12 @@ class DAG(object): ...@@ -317,12 +342,12 @@ class DAG(object):
else: else:
channel = ProcessChannel( channel = ProcessChannel(
self._manager, name=name_gen.next(), maxsize=self._channel_size) self._manager, name=name_gen.next(), maxsize=self._channel_size)
_LOGGER.debug("[DAG] gen Channel: {}".format(channel.name)) _LOGGER.debug("[DAG] Generate channel: {}".format(channel.name))
return channel return channel
def _gen_virtual_op(self, name_gen): def _gen_virtual_op(self, name_gen):
vir_op = VirtualOp(name=name_gen.next()) vir_op = VirtualOp(name=name_gen.next())
_LOGGER.debug("[DAG] gen VirtualOp: {}".format(vir_op.name)) _LOGGER.debug("[DAG] Generate virtual_op: {}".format(vir_op.name))
return vir_op return vir_op
def _topo_sort(self, used_ops, response_op, out_degree_ops): def _topo_sort(self, used_ops, response_op, out_degree_ops):
...@@ -337,7 +362,8 @@ class DAG(object): ...@@ -337,7 +362,8 @@ class DAG(object):
if len(op.get_input_ops()) == 0: if len(op.get_input_ops()) == 0:
zero_indegree_num += 1 zero_indegree_num += 1
if zero_indegree_num != 1: if zero_indegree_num != 1:
_LOGGER.critical("DAG contains multiple RequestOps") _LOGGER.critical("Failed to topo sort: DAG contains "
"multiple RequestOps")
os._exit(-1) os._exit(-1)
last_op = response_op.get_input_ops()[0] last_op = response_op.get_input_ops()[0]
ques[que_idx].put(last_op) ques[que_idx].put(last_op)
...@@ -362,14 +388,15 @@ class DAG(object): ...@@ -362,14 +388,15 @@ class DAG(object):
break break
que_idx = (que_idx + 1) % 2 que_idx = (que_idx + 1) % 2
if sorted_op_num < len(used_ops): if sorted_op_num < len(used_ops):
_LOGGER.critical("not legal DAG") _LOGGER.critical("Failed to topo sort: not legal DAG")
os._exit(-1) os._exit(-1)
return dag_views, last_op return dag_views, last_op
def _build_dag(self, response_op): def _build_dag(self, response_op):
if response_op is None: if response_op is None:
_LOGGER.critical("ResponseOp has not been set.") _LOGGER.critical("Failed to build DAG: ResponseOp"
" has not been set.")
os._exit(-1) os._exit(-1)
used_ops, out_degree_ops = self.get_use_ops(response_op) used_ops, out_degree_ops = self.get_use_ops(response_op)
if not self._build_dag_each_worker: if not self._build_dag_each_worker:
...@@ -380,8 +407,8 @@ class DAG(object): ...@@ -380,8 +407,8 @@ class DAG(object):
_LOGGER.info("-------------------------------------------") _LOGGER.info("-------------------------------------------")
if len(used_ops) <= 1: if len(used_ops) <= 1:
_LOGGER.critical( _LOGGER.critical(
"Besides RequestOp and ResponseOp, there should be at least one Op in DAG." "Failed to build DAG: besides RequestOp and ResponseOp, "
) "there should be at least one Op in DAG.")
os._exit(-1) os._exit(-1)
if self._build_dag_each_worker: if self._build_dag_each_worker:
_LOGGER.info("Because `build_dag_each_worker` mode is used, " _LOGGER.info("Because `build_dag_each_worker` mode is used, "
...@@ -443,8 +470,6 @@ class DAG(object): ...@@ -443,8 +470,6 @@ class DAG(object):
continue continue
channel = self._gen_channel(channel_name_gen) channel = self._gen_channel(channel_name_gen)
channels.append(channel) channels.append(channel)
_LOGGER.debug("[DAG] Channel({}) => Op({})"
.format(channel.name, op.name))
op.add_input_channel(channel) op.add_input_channel(channel)
pred_ops = pred_op_of_next_view_op[op.name] pred_ops = pred_op_of_next_view_op[op.name]
if v_idx == 0: if v_idx == 0:
...@@ -452,8 +477,6 @@ class DAG(object): ...@@ -452,8 +477,6 @@ class DAG(object):
else: else:
# if pred_op is virtual op, it will use ancestors as producers to channel # if pred_op is virtual op, it will use ancestors as producers to channel
for pred_op in pred_ops: for pred_op in pred_ops:
_LOGGER.debug("[DAG] Op({}) => Channel({})"
.format(pred_op.name, channel.name))
pred_op.add_output_channel(channel) pred_op.add_output_channel(channel)
processed_op.add(op.name) processed_op.add(op.name)
# find same input op to combine channel # find same input op to combine channel
...@@ -469,8 +492,6 @@ class DAG(object): ...@@ -469,8 +492,6 @@ class DAG(object):
same_flag = False same_flag = False
break break
if same_flag: if same_flag:
_LOGGER.debug("[DAG] Channel({}) => Op({})"
.format(channel.name, other_op.name))
other_op.add_input_channel(channel) other_op.add_input_channel(channel)
processed_op.add(other_op.name) processed_op.add(other_op.name)
output_channel = self._gen_channel(channel_name_gen) output_channel = self._gen_channel(channel_name_gen)
...@@ -488,7 +509,7 @@ class DAG(object): ...@@ -488,7 +509,7 @@ class DAG(object):
actual_ops.append(op) actual_ops.append(op)
for c in channels: for c in channels:
_LOGGER.debug("Channel({}):\n\t-producers: {}\n\t-consumers: {}" _LOGGER.debug("Channel({}):\n\t- producers: {}\n\t- consumers: {}"
.format(c.name, c.get_producers(), c.get_consumers())) .format(c.name, c.get_producers(), c.get_consumers()))
return (actual_ops, channels, input_channel, output_channel, pack_func, return (actual_ops, channels, input_channel, output_channel, pack_func,
...@@ -497,7 +518,7 @@ class DAG(object): ...@@ -497,7 +518,7 @@ class DAG(object):
def build(self): def build(self):
(actual_ops, channels, input_channel, output_channel, pack_func, (actual_ops, channels, input_channel, output_channel, pack_func,
unpack_func) = self._build_dag(self._response_op) unpack_func) = self._build_dag(self._response_op)
_LOGGER.info("[DAG] succ build dag") _LOGGER.info("[DAG] Succ build DAG")
self._actual_ops = actual_ops self._actual_ops = actual_ops
self._channels = channels self._channels = channels
......
此差异已折叠。
...@@ -67,9 +67,11 @@ class PipelineServer(object): ...@@ -67,9 +67,11 @@ class PipelineServer(object):
def set_response_op(self, response_op): def set_response_op(self, response_op):
if not isinstance(response_op, ResponseOp): if not isinstance(response_op, ResponseOp):
raise Exception("response_op must be ResponseOp type.") raise Exception("Failed to set response_op: response_op "
"must be ResponseOp type.")
if len(response_op.get_input_ops()) != 1: if len(response_op.get_input_ops()) != 1:
raise Exception("response_op can only have one previous op.") raise Exception("Failed to set response_op: response_op "
"can only have one previous op.")
self._response_op = response_op self._response_op = response_op
def _port_is_available(self, port): def _port_is_available(self, port):
...@@ -83,7 +85,8 @@ class PipelineServer(object): ...@@ -83,7 +85,8 @@ class PipelineServer(object):
self._port = conf["port"] self._port = conf["port"]
if not self._port_is_available(self._port): if not self._port_is_available(self._port):
raise SystemExit("Prot {} is already used".format(self._port)) raise SystemExit("Failed to prepare_server: prot {} "
"is already used".format(self._port))
self._worker_num = conf["worker_num"] self._worker_num = conf["worker_num"]
self._build_dag_each_worker = conf["build_dag_each_worker"] self._build_dag_each_worker = conf["build_dag_each_worker"]
......
...@@ -43,8 +43,9 @@ class UnsafeTimeProfiler(object): ...@@ -43,8 +43,9 @@ class UnsafeTimeProfiler(object):
def record(self, name): def record(self, name):
if self._enable is False: if self._enable is False:
return return
self.time_record.append('{}:{} '.format(name, timestamp = int(round(_time() * 1000000))
int(round(_time() * 1000000)))) self.time_record.append('{}:{} '.format(name, timestamp))
return timestamp
def print_profile(self): def print_profile(self):
if self._enable is False: if self._enable is False:
...@@ -80,6 +81,7 @@ class TimeProfiler(object): ...@@ -80,6 +81,7 @@ class TimeProfiler(object):
name = '_'.join(name_with_tag[:-1]) name = '_'.join(name_with_tag[:-1])
with self._lock: with self._lock:
self._time_record.put((name, tag, timestamp)) self._time_record.put((name, tag, timestamp))
return timestamp
def print_profile(self): def print_profile(self):
if self._enable is False: if self._enable is False:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册