提交 5549d161 编写于 作者: B barrierye

update tracer

上级 4fee0cee
...@@ -24,6 +24,7 @@ else: ...@@ -24,6 +24,7 @@ else:
raise Exception("Error Python version") raise Exception("Error Python version")
import os import os
import logging import logging
import collections
from .operator import Op, RequestOp, ResponseOp, VirtualOp from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData, from .channel import (ThreadChannel, ProcessChannel, ChannelData,
...@@ -284,12 +285,14 @@ class DAGExecutor(object): ...@@ -284,12 +285,14 @@ class DAGExecutor(object):
end_call = self._profiler.record("call_{}#DAG_1".format(data_id)) end_call = self._profiler.record("call_{}#DAG_1".format(data_id))
if self._tracer is not None: if self._tracer is not None:
if resp_channeldata.ecode == ChannelDataEcode.OK.value: trace_buffer.put_nowait({
trace_buffer.put(("DAG", "call_{}".format(data_id), True, "name": "DAG",
end_call - start_call)) "id": data_id,
else: "succ": resp_channeldata.ecode == ChannelDataEcode.OK.value,
trace_buffer.put(("DAG", "call_{}".format(data_id), False, "actions": {
end_call - start_call)) "call_{}".format(data_id): end_call - start_call,
},
})
profile_str = self._profiler.gen_profile_str() profile_str = self._profiler.gen_profile_str()
if self._server_use_profile: if self._server_use_profile:
......
...@@ -25,6 +25,12 @@ import sys ...@@ -25,6 +25,12 @@ import sys
import collections import collections
import numpy as np import numpy as np
from numpy import * from numpy import *
if sys.version_info.major == 2:
import Queue
elif sys.version_info.major == 3:
import queue as Queue
else:
raise Exception("Error Python version")
from .proto import pipeline_service_pb2 from .proto import pipeline_service_pb2
from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
...@@ -532,6 +538,7 @@ class Op(object): ...@@ -532,6 +538,7 @@ class Op(object):
op_info_prefix=op_info_prefix) op_info_prefix=op_info_prefix)
start, end = None, None start, end = None, None
trace_que = collections.deque()
while True: while True:
start = int(round(_time() * 1000000)) start = int(round(_time() * 1000000))
try: try:
...@@ -541,8 +548,7 @@ class Op(object): ...@@ -541,8 +548,7 @@ class Op(object):
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
end = int(round(_time() * 1000000)) end = int(round(_time() * 1000000))
if trace_buffer is not None: in_time = end - start
trace_buffer.put((self.name, "in", True, end - start))
# parse channeldata batch # parse channeldata batch
try: try:
...@@ -562,8 +568,7 @@ class Op(object): ...@@ -562,8 +568,7 @@ class Op(object):
preped_data_dict, err_channeldata_dict \ preped_data_dict, err_channeldata_dict \
= self._run_preprocess(parsed_data_dict, op_info_prefix) = self._run_preprocess(parsed_data_dict, op_info_prefix)
end = profiler.record("prep#{}_1".format(op_info_prefix)) end = profiler.record("prep#{}_1".format(op_info_prefix))
if trace_buffer is not None: prep_time = end - start
trace_buffer.put((self.name, "prep", True, end - start))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -583,8 +588,7 @@ class Op(object): ...@@ -583,8 +588,7 @@ class Op(object):
midped_data_dict, err_channeldata_dict \ midped_data_dict, err_channeldata_dict \
= self._run_process(preped_data_dict, op_info_prefix) = self._run_process(preped_data_dict, op_info_prefix)
end = profiler.record("midp#{}_1".format(op_info_prefix)) end = profiler.record("midp#{}_1".format(op_info_prefix))
if trace_buffer is not None: midp_time = end - start
trace_buffer.put((self.name, "midp", True, end - start))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -605,8 +609,7 @@ class Op(object): ...@@ -605,8 +609,7 @@ class Op(object):
= self._run_postprocess( = self._run_postprocess(
parsed_data_dict, midped_data_dict, op_info_prefix) parsed_data_dict, midped_data_dict, op_info_prefix)
end = profiler.record("postp#{}_1".format(op_info_prefix)) end = profiler.record("postp#{}_1".format(op_info_prefix))
if trace_buffer is not None: postp_time = end - start
trace_buffer.put((self.name, "postp", True, end - start))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -639,8 +642,25 @@ class Op(object): ...@@ -639,8 +642,25 @@ class Op(object):
self._finalize(is_thread_op) self._finalize(is_thread_op)
break break
end = int(round(_time() * 1000000)) end = int(round(_time() * 1000000))
out_time = end - start
if trace_buffer is not None: if trace_buffer is not None:
trace_buffer.put((self.name, "out", True, end - start)) trace_que.append({
"name": self.name,
"actions": {
"in": in_time,
"prep": prep_time,
"midp": midp_time,
"postp": postp_time,
"out": out_time,
}
})
while trace_que:
info = trace_que[0]
try:
trace_buffer.put_nowait(info)
trace_que.popleft()
except Queue.Full:
break
def _initialize(self, is_thread_op, client_type, concurrency_idx): def _initialize(self, is_thread_op, client_type, concurrency_idx):
if is_thread_op: if is_thread_op:
......
...@@ -68,26 +68,35 @@ class PerformanceTracer(object): ...@@ -68,26 +68,35 @@ class PerformanceTracer(object):
self._channels = channels self._channels = channels
def _trace_func(self, channels): def _trace_func(self, channels):
actions = ["in", "prep", "midp", "postp", "out"] all_actions = ["in", "prep", "midp", "postp", "out"]
calcu_actions = ["prep", "midp", "postp"] calcu_actions = ["prep", "midp", "postp"]
while True: while True:
op_cost = {} op_cost = {}
err_request = []
err_count = 0 err_count = 0
_LOGGER.info("==================== TRACER ======================") _LOGGER.info("==================== TRACER ======================")
# op # op
while True: while True:
try: try:
name, action, stage, cost = self._data_buffer.get_nowait() item = self._data_buffer.get_nowait()
if stage == False: name = item["name"]
# only for name == DAG actions = item["actions"]
assert name == "DAG"
err_count += 1 if name == "DAG":
succ = item["succ"]
req_id = item["id"]
if not succ:
err_count += 1
err_request.append(req_id)
if name not in op_cost: if name not in op_cost:
op_cost[name] = {} op_cost[name] = {}
if action not in op_cost[name]:
op_cost[name][action] = [] for action, cost in actions.items():
op_cost[name][action].append(cost) if action not in op_cost[name]:
op_cost[name][action] = []
op_cost[name][action].append(cost)
except Queue.Empty: except Queue.Empty:
break break
...@@ -100,7 +109,7 @@ class PerformanceTracer(object): ...@@ -100,7 +109,7 @@ class PerformanceTracer(object):
if name != "DAG": if name != "DAG":
_LOGGER.info("Op({}):".format(name)) _LOGGER.info("Op({}):".format(name))
for action in actions: for action in all_actions:
if action in op_cost[name]: if action in op_cost[name]:
_LOGGER.info("\t{}[{} ms]".format( _LOGGER.info("\t{}[{} ms]".format(
action, op_cost[name][action])) action, op_cost[name][action]))
...@@ -118,10 +127,11 @@ class PerformanceTracer(object): ...@@ -118,10 +127,11 @@ class PerformanceTracer(object):
ave_cost = sum(calls) / tot ave_cost = sum(calls) / tot
latencys = [50, 60, 70, 80, 90, 95, 99] latencys = [50, 60, 70, 80, 90, 95, 99]
_LOGGER.info("DAGExecutor:") _LOGGER.info("DAGExecutor:")
_LOGGER.info("\tquery count[{}]".format(tot)) _LOGGER.info("\tQuery count[{}]".format(tot))
_LOGGER.info("\tqps[{} q/s]".format(qps)) _LOGGER.info("\tQPS[{} q/s]".format(qps))
_LOGGER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot)) _LOGGER.info("\tSucc[{}]".format(1 - 1.0 * err_count / tot))
_LOGGER.info("\tlatency:") _LOGGER.info("\tError req[{}]".format([", ".join([str(x) for x in err_request])]))
_LOGGER.info("\tLatency:")
_LOGGER.info("\t\tave[{} ms]".format(ave_cost)) _LOGGER.info("\t\tave[{} ms]".format(ave_cost))
for latency in latencys: for latency in latencys:
_LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int( _LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册