未验证 提交 22e859e8 编写于 作者: C chenjian 提交者: GitHub

[Bug] Robust code for profiler server (#1221)

* robust code for profiler

* robust code
上级 de6bac41
...@@ -131,6 +131,8 @@ class ProfilerData: ...@@ -131,6 +131,8 @@ class ProfilerData:
return views return views
def get_device_infos(self): def get_device_infos(self):
if not self.overview_parser:
return
if not self.overview_parser.has_device: if not self.overview_parser.has_device:
device_type = 'CPU' device_type = 'CPU'
return { return {
...@@ -219,6 +221,8 @@ class ProfilerData: ...@@ -219,6 +221,8 @@ class ProfilerData:
''' '''
Get total cpu and gpu statistics for model perspective of each profiler step. Get total cpu and gpu statistics for model perspective of each profiler step.
''' '''
if not self.overview_parser:
return
data = OrderedDict() data = OrderedDict()
data['column_name'] = [ data['column_name'] = [
"name", "calls", "total_time", "avg_time", "max_time", "min_time", "name", "calls", "total_time", "avg_time", "max_time", "min_time",
...@@ -281,6 +285,8 @@ class ProfilerData: ...@@ -281,6 +285,8 @@ class ProfilerData:
return data return data
def get_model_perspective_perstep(self, device_type, time_unit): def get_model_perspective_perstep(self, device_type, time_unit):
if not self.overview_parser:
return
try: try:
data = OrderedDict() data = OrderedDict()
data['order'] = [] data['order'] = []
...@@ -329,6 +335,8 @@ class ProfilerData: ...@@ -329,6 +335,8 @@ class ProfilerData:
return new_data return new_data
def get_event_type_perspective(self, device_type, time_unit): def get_event_type_perspective(self, device_type, time_unit):
if not self.overview_parser:
return
data = OrderedDict() data = OrderedDict()
data['order'] = [] data['order'] = []
if device_type == 'cpu': if device_type == 'cpu':
...@@ -416,6 +424,8 @@ class ProfilerData: ...@@ -416,6 +424,8 @@ class ProfilerData:
return data return data
def get_event_type_model_perspective(self, time_unit): # noqa: C901 def get_event_type_model_perspective(self, time_unit): # noqa: C901
if not self.overview_parser:
return
data = OrderedDict() data = OrderedDict()
data['order'] = [] data['order'] = []
data['phase_type'] = [] data['phase_type'] = []
...@@ -470,6 +480,8 @@ class ProfilerData: ...@@ -470,6 +480,8 @@ class ProfilerData:
return newdata return newdata
def get_userdefined_perspective(self, time_unit): def get_userdefined_perspective(self, time_unit):
if not self.overview_parser:
return
data = OrderedDict() data = OrderedDict()
if self.overview_parser.has_device: if self.overview_parser.has_device:
data['column_name'] = [ data['column_name'] = [
...@@ -542,6 +554,8 @@ class ProfilerData: ...@@ -542,6 +554,8 @@ class ProfilerData:
return data return data
def get_operator_pie(self, topk, time_unit='ms'): def get_operator_pie(self, topk, time_unit='ms'):
if not self.operator_parser:
return
data = OrderedDict() data = OrderedDict()
data['column_name'] = [ data['column_name'] = [
"name", "calls", "total_time", "avg_time", "max_time", "min_time", "name", "calls", "total_time", "avg_time", "max_time", "min_time",
...@@ -611,6 +625,8 @@ class ProfilerData: ...@@ -611,6 +625,8 @@ class ProfilerData:
def get_operator_pie_expand( # noqa: C901 def get_operator_pie_expand( # noqa: C901
self, topk, device_type, time_unit): self, topk, device_type, time_unit):
if not self.operator_parser:
return
data = OrderedDict() data = OrderedDict()
data['order'] = [] data['order'] = []
data['phase_type'] = [] data['phase_type'] = []
...@@ -713,6 +729,9 @@ class ProfilerData: ...@@ -713,6 +729,9 @@ class ProfilerData:
group_by='op_name', group_by='op_name',
search_name=None, search_name=None,
time_unit='ms'): time_unit='ms'):
if not self.operator_parser:
return
def get_children_data(event): def get_children_data(event):
datas = [] datas = []
for innerop_name, item in event.operator_inners.items(): for innerop_name, item in event.operator_inners.items():
...@@ -1359,6 +1378,8 @@ class ProfilerData: ...@@ -1359,6 +1378,8 @@ class ProfilerData:
return data return data
def get_kernel_pie(self, topk, time_unit='ms'): def get_kernel_pie(self, topk, time_unit='ms'):
if not self.kernel_parser:
return
data = OrderedDict() data = OrderedDict()
data['column_name'] = [ data['column_name'] = [
"name", "calls", "total_time", "avg_time", "max_time", "min_time", "name", "calls", "total_time", "avg_time", "max_time", "min_time",
...@@ -1405,6 +1426,8 @@ class ProfilerData: ...@@ -1405,6 +1426,8 @@ class ProfilerData:
return data return data
def get_kernel_table(self, group_by='', search_name=None, time_unit='ms'): def get_kernel_table(self, group_by='', search_name=None, time_unit='ms'):
if not self.kernel_parser:
return
data = OrderedDict() data = OrderedDict()
data['events'] = [] data['events'] = []
total_gpu_time = 0 total_gpu_time = 0
...@@ -1561,6 +1584,8 @@ class ProfilerData: ...@@ -1561,6 +1584,8 @@ class ProfilerData:
return data return data
def get_kernel_tc_pie(self, topk, time_unit='ms'): def get_kernel_tc_pie(self, topk, time_unit='ms'):
if not self.kernel_parser:
return
data = OrderedDict() data = OrderedDict()
data['column_name'] = ["name", "calls", "ratio"] data['column_name'] = ["name", "calls", "ratio"]
...@@ -1602,9 +1627,13 @@ class ProfilerData: ...@@ -1602,9 +1627,13 @@ class ProfilerData:
return data return data
def get_trace_data(self):
    """Return the content held by the trace parser.

    Returns:
        ``self.trace_parser.content`` when a trace parser is set,
        otherwise ``None``.
    """
    if not self.trace_parser:
        # No trace parser is available for this profile; report "no data"
        # explicitly instead of relying on a bare return.
        return None
    return self.trace_parser.content
def get_memory_devices(self): def get_memory_devices(self):
if not self.memory_parser:
return
data = [] data = []
for device in self.memory_curve.keys(): for device in self.memory_curve.keys():
data.append({ data.append({
...@@ -1620,6 +1649,8 @@ class ProfilerData: ...@@ -1620,6 +1649,8 @@ class ProfilerData:
return data return data
def get_memory_curve(self, device_type, time_unit='ms'): def get_memory_curve(self, device_type, time_unit='ms'):
if not self.memory_parser:
return
curves = self.memory_curve[device_type] curves = self.memory_curve[device_type]
data = {} data = {}
data['name'] = { data['name'] = {
...@@ -1647,6 +1678,8 @@ class ProfilerData: ...@@ -1647,6 +1678,8 @@ class ProfilerData:
max_size=float('inf'), max_size=float('inf'),
search_name=None, search_name=None,
time_unit='ms'): time_unit='ms'):
if not self.memory_parser:
return
data = {} data = {}
data['column_name'] = [ data['column_name'] = [
'MemoryAddr', 'MemoryType', 'AllocatedEvent', 'AllocatedTimestamp', 'MemoryAddr', 'MemoryType', 'AllocatedEvent', 'AllocatedTimestamp',
...@@ -1705,6 +1738,8 @@ class ProfilerData: ...@@ -1705,6 +1738,8 @@ class ProfilerData:
return data return data
def get_op_memory_events(self, device_type, search_name=None): def get_op_memory_events(self, device_type, search_name=None):
if not self.memory_parser:
return
data = {} data = {}
data['column_name'] = [ data['column_name'] = [
'EventName', 'MemoryType', 'AllocationCount', 'FreeCount', 'EventName', 'MemoryType', 'AllocationCount', 'FreeCount',
......
...@@ -39,11 +39,15 @@ class ProfilerApi(object): ...@@ -39,11 +39,15 @@ class ProfilerApi(object):
if view == 'Distributed': if view == 'Distributed':
return ['All'] return ['All']
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
return run_manager.get_workers(view) return run_manager.get_workers(view)
@result() @result()
def spans(self, run, worker):
    """List the spans available for a worker of a run.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Worker name; the literal ``'All'`` selects the
            distributed spans instead of a single worker's spans.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; otherwise
        the result of ``get_distributed_spans()`` (for ``'All'``) or
        ``get_spans(worker)``.
    """
    manager = self._reader.get_run_manager(run)
    if manager is None:
        return []
    wants_all_workers = worker == 'All'
    return (manager.get_distributed_spans()
            if wants_all_workers else manager.get_spans(worker))
...@@ -70,18 +74,24 @@ class ProfilerApi(object): ...@@ -70,18 +74,24 @@ class ProfilerApi(object):
@result() @result()
def overview_environment(self, run, worker, span):
    """Return device information for a span, plus the worker count.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Converted with ``str()`` before being forwarded to
            ``run_manager.get_profiler_data``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data or no device information is available;
        otherwise the mapping from ``get_device_infos()`` with an extra
        ``'num_workers'`` entry.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, str(span))
    if not profiler_data:
        return None
    result = profiler_data.get_device_infos()
    if result is None:
        # get_device_infos() returns None when the overview parser is
        # missing; writing 'num_workers' into None would raise TypeError.
        return None
    result['num_workers'] = len(run_manager.get_workers('Overview'))
    return result
@result() @result()
def model_perspective(self, run, worker, span, time_unit='ms'):
    """Return the model-perspective data of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        time_unit: Forwarded to ``get_model_perspective``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_model_perspective(time_unit)``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    if not profiler_data:
        return None
    return profiler_data.get_model_perspective(time_unit)
@result() @result()
def model_perspective_perstep(self, def model_perspective_perstep(self,
...@@ -92,9 +102,12 @@ class ProfilerApi(object): ...@@ -92,9 +102,12 @@ class ProfilerApi(object):
time_unit='ms'): time_unit='ms'):
device_type = device_type.lower() device_type = device_type.lower()
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
profiler_data = run_manager.get_profiler_data(worker, span) profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_model_perspective_perstep( if profiler_data:
device_type, time_unit) return profiler_data.get_model_perspective_perstep(
device_type, time_unit)
@result() @result()
def event_type_perspective(self, def event_type_perspective(self,
...@@ -105,38 +118,54 @@ class ProfilerApi(object): ...@@ -105,38 +118,54 @@ class ProfilerApi(object):
time_unit='ms'): time_unit='ms'):
device_type = device_type.lower() device_type = device_type.lower()
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
profiler_data = run_manager.get_profiler_data(worker, span) profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_event_type_perspective(device_type, time_unit) if profiler_data:
return profiler_data.get_event_type_perspective(
device_type, time_unit)
@result() @result()
def event_type_model_perspective(self, run, worker, span, time_unit='ms'):
    """Return the event-type/model-perspective data of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        time_unit: Forwarded to ``get_event_type_model_perspective``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_event_type_model_perspective(time_unit)``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    if not profiler_data:
        return None
    return profiler_data.get_event_type_model_perspective(time_unit)
@result() @result()
def userdefined_perspective(self, run, worker, span, time_unit='ms'):
    """Return the user-defined-perspective data of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        time_unit: Forwarded to ``get_userdefined_perspective``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_userdefined_perspective(time_unit)``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    if not profiler_data:
        return None
    return profiler_data.get_userdefined_perspective(time_unit)
@result() @result()
def operator_pie(self, run, worker, span, topk, time_unit='ms'):
    """Return the operator pie-chart data of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        topk: Converted with ``int()`` and forwarded to
            ``get_operator_pie``.
        time_unit: Forwarded to ``get_operator_pie``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_operator_pie(topk, time_unit)``.

    Raises:
        ValueError: If ``topk`` cannot be converted by ``int()``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    # Converted before the data check, so an invalid topk is rejected even
    # when there is nothing to return.
    topk = int(topk)
    if not profiler_data:
        return None
    return profiler_data.get_operator_pie(topk, time_unit)
@result() @result()
def operator_pie_expand(self, run, worker, span, topk, device_type,
                        time_unit):
    """Return the expanded operator pie-chart data of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        topk: Converted with ``int()`` and forwarded to
            ``get_operator_pie_expand``.
        device_type: Lower-cased before being forwarded.
        time_unit: Forwarded to ``get_operator_pie_expand``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_operator_pie_expand(topk, device_type,
        time_unit)``.

    Raises:
        ValueError: If ``topk`` cannot be converted by ``int()``.
    """
    device_type = device_type.lower()
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    # Converted before the data check, so an invalid topk is rejected even
    # when there is nothing to return.
    topk = int(topk)
    if not profiler_data:
        return None
    return profiler_data.get_operator_pie_expand(topk, device_type,
                                                 time_unit)
@result() @result()
def operator_table(self, def operator_table(self,
...@@ -147,9 +176,12 @@ class ProfilerApi(object): ...@@ -147,9 +176,12 @@ class ProfilerApi(object):
search_name, search_name,
time_unit='ms'): time_unit='ms'):
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
profiler_data = run_manager.get_profiler_data(worker, span) profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_operator_table(group_by, search_name, if profiler_data:
time_unit) return profiler_data.get_operator_table(group_by, search_name,
time_unit)
@result() @result()
def operator_stack_table(self, def operator_stack_table(self,
...@@ -165,9 +197,12 @@ class ProfilerApi(object): ...@@ -165,9 +197,12 @@ class ProfilerApi(object):
@result() @result()
def kernel_pie(self, run, worker, span, topk, time_unit='ms'):
    """Return the kernel pie-chart data of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        topk: Converted with ``int()`` and forwarded to
            ``get_kernel_pie``.
        time_unit: Forwarded to ``get_kernel_pie``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_kernel_pie(topk, time_unit)``.

    Raises:
        ValueError: If ``topk`` cannot be converted by ``int()``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    # Converted before the data check, so an invalid topk is rejected even
    # when there is nothing to return.
    topk = int(topk)
    if not profiler_data:
        return None
    return profiler_data.get_kernel_pie(topk, time_unit)
@result() @result()
def kernel_table(self, def kernel_table(self,
...@@ -178,19 +213,28 @@ class ProfilerApi(object): ...@@ -178,19 +213,28 @@ class ProfilerApi(object):
search_name, search_name,
time_unit='ms'): time_unit='ms'):
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
profiler_data = run_manager.get_profiler_data(worker, span) profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_kernel_table(group_by, search_name, time_unit) if profiler_data:
return profiler_data.get_kernel_table(group_by, search_name,
time_unit)
@result() @result()
def kernel_tc_pie(self, run, worker, span, topk, time_unit='ms'):
    """Return the data behind ``get_kernel_tc_pie`` for one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        topk: Converted with ``int()`` and forwarded to
            ``get_kernel_tc_pie``; only converted once profiler data
            is known to exist.
        time_unit: Forwarded to ``get_kernel_tc_pie``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_kernel_tc_pie(topk, time_unit)``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    if not profiler_data:
        return None
    return profiler_data.get_kernel_tc_pie(int(topk), time_unit)
@result() @result()
def distributed_info(self, run, worker, span): def distributed_info(self, run, worker, span):
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
distributed_profiler_data = run_manager.get_distributed_profiler_data( distributed_profiler_data = run_manager.get_distributed_profiler_data(
span) span)
if distributed_profiler_data is None: if distributed_profiler_data is None:
...@@ -200,6 +244,8 @@ class ProfilerApi(object): ...@@ -200,6 +244,8 @@ class ProfilerApi(object):
@result() @result()
def distributed_steps(self, run, worker, span): def distributed_steps(self, run, worker, span):
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
distributed_profiler_data = run_manager.get_distributed_profiler_data( distributed_profiler_data = run_manager.get_distributed_profiler_data(
span) span)
if distributed_profiler_data is None: if distributed_profiler_data is None:
...@@ -209,6 +255,8 @@ class ProfilerApi(object): ...@@ -209,6 +255,8 @@ class ProfilerApi(object):
@result() @result()
def distributed_histogram(self, run, worker, span, step, time_unit='ms'): def distributed_histogram(self, run, worker, span, step, time_unit='ms'):
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
distributed_profiler_data = run_manager.get_distributed_profiler_data( distributed_profiler_data = run_manager.get_distributed_profiler_data(
span) span)
if distributed_profiler_data is None: if distributed_profiler_data is None:
...@@ -219,22 +267,31 @@ class ProfilerApi(object): ...@@ -219,22 +267,31 @@ class ProfilerApi(object):
@result(headers={'content-encoding': 'gzip'}) @result(headers={'content-encoding': 'gzip'})
def trace(self, run, worker, span):
    """Return the trace data of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_trace_data()``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    if not profiler_data:
        return None
    return profiler_data.get_trace_data()
@result() @result()
def memory_devices(self, run, worker, span):
    """Return the memory-device listing of one span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.

    Returns:
        ``[]`` when the reader has no run manager for ``run``; ``None``
        when no profiler data is available; otherwise the result of
        ``profiler_data.get_memory_devices()``.
    """
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    if not profiler_data:
        return None
    return profiler_data.get_memory_devices()
@result(headers={'content-encoding': 'gzip'}) @result(headers={'content-encoding': 'gzip'})
def memory_curve(self, run, worker, span, device_type, time_unit='ms'):
    """Return the memory-curve data of one device in a span.

    Args:
        run: Value handed to ``self._reader.get_run_manager``.
        worker: Forwarded to ``run_manager.get_profiler_data``.
        span: Forwarded to ``run_manager.get_profiler_data``.
        device_type: Forwarded to ``get_memory_curve``; the literal
            string ``'undefined'`` short-circuits the call.
        time_unit: Forwarded to ``get_memory_curve``.

    Returns:
        ``None`` when ``device_type`` is ``'undefined'`` or no profiler
        data is available; ``[]`` when the reader has no run manager for
        ``run``; otherwise the result of
        ``profiler_data.get_memory_curve(device_type, time_unit)``.
    """
    if device_type == 'undefined':
        # Checked first, so the reader is never consulted for this value.
        return None
    run_manager = self._reader.get_run_manager(run)
    if run_manager is None:
        return []
    profiler_data = run_manager.get_profiler_data(worker, span)
    if not profiler_data:
        return None
    return profiler_data.get_memory_curve(device_type, time_unit)
@result(headers={'content-encoding': 'gzip'}) @result(headers={'content-encoding': 'gzip'})
def memory_events(self, def memory_events(self,
...@@ -259,9 +316,12 @@ class ProfilerApi(object): ...@@ -259,9 +316,12 @@ class ProfilerApi(object):
if search_name == 'undefined' or not search_name: if search_name == 'undefined' or not search_name:
search_name = None search_name = None
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
profiler_data = run_manager.get_profiler_data(worker, span) profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_memory_events(device_type, min_size, max_size, if profiler_data:
search_name, time_unit) return profiler_data.get_memory_events(
device_type, min_size, max_size, search_name, time_unit)
@result(headers={'content-encoding': 'gzip'}) @result(headers={'content-encoding': 'gzip'})
def op_memory_events(self, def op_memory_events(self,
...@@ -275,8 +335,11 @@ class ProfilerApi(object): ...@@ -275,8 +335,11 @@ class ProfilerApi(object):
if device_type == 'undefined': if device_type == 'undefined':
return return
run_manager = self._reader.get_run_manager(run) run_manager = self._reader.get_run_manager(run)
if run_manager is None:
return []
profiler_data = run_manager.get_profiler_data(worker, span) profiler_data = run_manager.get_profiler_data(worker, span)
return profiler_data.get_op_memory_events(device_type, search_name) if profiler_data:
return profiler_data.get_op_memory_events(device_type, search_name)
@result() @result()
def comparison_phase(self, base_run, base_worker, base_span, exp_run, def comparison_phase(self, base_run, base_worker, base_span, exp_run,
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
# ======================================================================= # =======================================================================
import collections import collections
import os # noqa: F401
from functools import partial # noqa: F401 from functools import partial # noqa: F401
from visualdl.component import components from visualdl.component import components
...@@ -150,8 +151,6 @@ class LogReader(object): ...@@ -150,8 +151,6 @@ class LogReader(object):
else: else:
file_path = bfile.join(run, self.walks[run]) file_path = bfile.join(run, self.walks[run])
reader = self._get_file_reader(file_path=file_path, update=False) reader = self._get_file_reader(file_path=file_path, update=False)
reader.dir = run
self.reader = reader
remain = self.get_remain(reader=reader) remain = self.get_remain(reader=reader)
data = self.read_log_data( data = self.read_log_data(
remain=remain, update=False)[component][tag] remain=remain, update=False)[component][tag]
...@@ -276,6 +275,7 @@ class LogReader(object): ...@@ -276,6 +275,7 @@ class LogReader(object):
if update: if update:
self.register_reader(file_path) self.register_reader(file_path)
self.reader = self.readers[file_path] self.reader = self.readers[file_path]
self.reader.dir = file_path
return self.reader return self.reader
else: else:
reader = RecordReader(filepath=file_path) reader = RecordReader(filepath=file_path)
...@@ -285,7 +285,6 @@ class LogReader(object): ...@@ -285,7 +285,6 @@ class LogReader(object):
if update: if update:
if path not in list(self.readers.keys()): if path not in list(self.readers.keys()):
reader = RecordReader(filepath=path, dir=dir) reader = RecordReader(filepath=path, dir=dir)
reader.dir = dir
self.readers[path] = reader self.readers[path] = reader
else: else:
pass pass
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册