提交 ef228651 编写于 作者: Z zhangyunshu

profiler: add AI CPU into timeline

上级 46c03af9
...@@ -69,7 +69,7 @@ class StepTraceAnalyser(BaseAnalyser): ...@@ -69,7 +69,7 @@ class StepTraceAnalyser(BaseAnalyser):
return self._result return self._result
def query_for_all_reduce(self, min_cycle_counter): def query_for_all_reduce(self):
""" """
Query for all reduce info. Query for all reduce info.
...@@ -81,7 +81,7 @@ class StepTraceAnalyser(BaseAnalyser): ...@@ -81,7 +81,7 @@ class StepTraceAnalyser(BaseAnalyser):
reduce_infos = [] reduce_infos = []
for row_info in self._data[:-1]: for row_info in self._data[:-1]:
row_info_dict = self._get_info_dict_from_row_data(row_info, 'systime') row_info_dict = self._get_info_dict_from_row_data(row_info, 'systime')
reduce_info = self._sort_reduce_by_time(row_info_dict, min_cycle_counter) reduce_info = self._sort_reduce_by_time(row_info_dict)
if reduce_info: if reduce_info:
reduce_infos.append(reduce_info) reduce_infos.append(reduce_info)
...@@ -252,13 +252,12 @@ class StepTraceAnalyser(BaseAnalyser): ...@@ -252,13 +252,12 @@ class StepTraceAnalyser(BaseAnalyser):
reduce_events.sort(key=lambda elem: elem[1]) reduce_events.sort(key=lambda elem: elem[1])
return reduce_info return reduce_info
def _sort_reduce_by_time(self, row_info_dict, min_cycle_counter): def _sort_reduce_by_time(self, row_info_dict):
""" """
Sort reduce info by time. Sort reduce info by time.
Args: Args:
row_info_dict (dict): Step trace information. row_info_dict (dict): Step trace information.
min_cycle_counter (int): The minimum cycle counter.
Returns: Returns:
list, including the all reduce info sorted by start time only. list, including the all reduce info sorted by start time only.
...@@ -275,7 +274,7 @@ class StepTraceAnalyser(BaseAnalyser): ...@@ -275,7 +274,7 @@ class StepTraceAnalyser(BaseAnalyser):
if field_name.startswith('stream_') and not field_name.endswith('point')] if field_name.startswith('stream_') and not field_name.endswith('point')]
for reduce_field in reduce_fields: for reduce_field in reduce_fields:
reduce_start = row_info_dict.get(reduce_field + '_start_point') reduce_start = row_info_dict.get(reduce_field + '_start_point')
reduce_start = (reduce_start - min_cycle_counter) / factor \ reduce_start = reduce_start / factor \
if reduce_start else 0 if reduce_start else 0
reduce_duration = row_info_dict.get(reduce_field) reduce_duration = row_info_dict.get(reduce_field)
reduce_duration = reduce_duration / factor if reduce_duration else 0 reduce_duration = reduce_duration / factor if reduce_duration else 0
......
...@@ -189,7 +189,7 @@ class TimelineAnalyser(BaseAnalyser): ...@@ -189,7 +189,7 @@ class TimelineAnalyser(BaseAnalyser):
return timeline_list return timeline_list
def _parse_timeline_data(self, timeline): def _parse_timeline_data(self, timeline, min_cycle_counter):
"""Parse timeline data.""" """Parse timeline data."""
# factor to convert the time unit from 1ms to 1us for timeline display # factor to convert the time unit from 1ms to 1us for timeline display
factor = 1000 factor = 1000
...@@ -198,15 +198,15 @@ class TimelineAnalyser(BaseAnalyser): ...@@ -198,15 +198,15 @@ class TimelineAnalyser(BaseAnalyser):
timeline_dict['name'] = op_meta.op_name timeline_dict['name'] = op_meta.op_name
timeline_dict['ph'] = 'X' timeline_dict['ph'] = 'X'
timeline_dict['tid'] = op_meta.stream_id timeline_dict['tid'] = op_meta.stream_id
timeline_dict['ts'] = op_meta.start_time * factor timeline_dict['ts'] = (op_meta.start_time - min_cycle_counter) * factor
dur = op_meta.duration * factor dur = op_meta.duration * factor
timeline_dict['dur'] = dur timeline_dict['dur'] = dur
if op_meta.pid == 10000: # AllReduce PID if op_meta.pid is None:
timeline_dict['pid'] = 10000
else:
timeline_dict['pid'] = int(self._device_id) timeline_dict['pid'] = int(self._device_id)
# Update total time of operator execution. # Update total time of operator execution.
self._timeline_summary['total_time'] += dur self._timeline_summary['total_time'] += dur
else: # AllReduce and AI CPU pid
timeline_dict['pid'] = op_meta.pid
self._timeline_meta.append(timeline_dict) self._timeline_meta.append(timeline_dict)
@staticmethod @staticmethod
...@@ -249,14 +249,19 @@ class TimelineAnalyser(BaseAnalyser): ...@@ -249,14 +249,19 @@ class TimelineAnalyser(BaseAnalyser):
return min_cycle_counter return min_cycle_counter
def init_timeline(self, all_reduce_info, framework_info): def init_timeline(self, all_reduce_info, framework_info, aicpu_info, min_cycle_counter):
""" """
Init timeline metadata, adding all collected info. Init timeline metadata, adding all collected info.
Args: Args:
all_reduce_info (list[list]): The metadata of AllReduce operator. all_reduce_info (list[list]): The metadata of AllReduce operator.
framework_info (dict): The framework metadata. framework_info (dict): The framework metadata.
aicpu_info (dict): The metadata of AI CPU operator.
min_cycle_counter (float): The minimum cycle counter of the timeline.
""" """
if min_cycle_counter == float('inf'):
min_cycle_counter = 0
logger.info('Initiating timeline...') logger.info('Initiating timeline...')
timeline_list = self._load_timeline_data() timeline_list = self._load_timeline_data()
self._timeline_summary['op_exe_times'] = len(timeline_list) self._timeline_summary['op_exe_times'] = len(timeline_list)
...@@ -267,10 +272,20 @@ class TimelineAnalyser(BaseAnalyser): ...@@ -267,10 +272,20 @@ class TimelineAnalyser(BaseAnalyser):
timeline_list.extend(all_reduce_info) timeline_list.extend(all_reduce_info)
timeline_list.sort(key=lambda x: float(x[2])) timeline_list.sort(key=lambda x: float(x[2]))
# Add AI CPU data into timeline temp list and sort by start time.
aicpu_data = aicpu_info.get('info')
if aicpu_data:
timeline_list.extend(aicpu_data)
timeline_list.sort(key=lambda x: float(x[2]))
self._timeline_summary['op_exe_times'] += aicpu_info.get('op_exe_times', 0)
self._timeline_summary['num_of_streams'] += aicpu_info.get('num_of_streams', 0)
self._timeline_summary['num_of_ops'] += aicpu_info.get('num_of_ops', 0)
self._timeline_summary['total_time'] += aicpu_info.get('total_time', 0)
# Init a dict for counting the num of streams. # Init a dict for counting the num of streams.
stream_count_dict = {} stream_count_dict = {}
for timeline in timeline_list: for timeline in timeline_list:
self._parse_timeline_data(timeline) self._parse_timeline_data(timeline, min_cycle_counter)
# Updating the collection of streams. # Updating the collection of streams.
if len(timeline) == 4: if len(timeline) == 4:
self._update_num_of_streams(timeline, stream_count_dict) self._update_num_of_streams(timeline, stream_count_dict)
...@@ -278,12 +293,12 @@ class TimelineAnalyser(BaseAnalyser): ...@@ -278,12 +293,12 @@ class TimelineAnalyser(BaseAnalyser):
# Get framework metadata. # Get framework metadata.
framework_obj_list = framework_info.get('object') framework_obj_list = framework_info.get('object')
# The length of list is the number of operators. # The length of list is the number of operators.
self._timeline_summary['num_of_ops'] = len(framework_obj_list) self._timeline_summary['num_of_ops'] += len(framework_obj_list)
self._add_framework_info(framework_obj_list) self._add_framework_info(framework_obj_list)
logger.info('Finished adding info into timeline...') logger.info('Finished adding info into timeline...')
# Update timeline summary info # Update timeline summary info
self._timeline_summary['num_of_streams'] = len(stream_count_dict.keys()) self._timeline_summary['num_of_streams'] += len(stream_count_dict.keys())
def _add_framework_info(self, framework_obj_list): def _add_framework_info(self, framework_obj_list):
""" """
......
...@@ -48,6 +48,8 @@ class DataPreProcessParser: ...@@ -48,6 +48,8 @@ class DataPreProcessParser:
self._thread_flag = 7 self._thread_flag = 7
self._ms_kernel_run_end_index = 2 self._ms_kernel_run_end_index = 2
self._other_kernel_run_end_index = 5 self._other_kernel_run_end_index = 5
self._result_list = []
self._min_cycle_counter = float('inf')
def _get_source_file(self): def _get_source_file(self):
"""Get log file name, which was created by ada service.""" """Get log file name, which was created by ada service."""
...@@ -129,3 +131,52 @@ class DataPreProcessParser: ...@@ -129,3 +131,52 @@ class DataPreProcessParser:
data_source=tabulate(result_list, self._dst_file_column_title, data_source=tabulate(result_list, self._dst_file_column_title,
tablefmt='simple'), tablefmt='simple'),
is_start=True, is_print=True) is_start=True, is_print=True)
# For timeline display.
self._result_list = result_list
def query_aicpu_data(self):
"""
Get execution time of AI CPU operator.
Returns:
a dict, the metadata of AI CPU operator execution time.
"""
stream_id = 0 # Default stream id for AI CPU.
pid = 9000 # Default pid for AI CPU.
factor = 1000 # Convert time unit from 1us to 1ms
total_time = 0
min_cycle_counter = float('inf')
aicpu_info = []
op_count_list = []
for aicpu_item in self._result_list:
if "AI CPU Total Time(ms):" in aicpu_item:
total_time = aicpu_item[-1]
continue
op_name = aicpu_item[1]
start_time = float(aicpu_item[4]) / factor
min_cycle_counter = min(min_cycle_counter, start_time)
end_time = float(aicpu_item[5]) / factor
duration = end_time - start_time
aicpu_info.append([op_name, stream_id, start_time, duration, pid])
# Record the number of operator types.
if op_name not in op_count_list:
op_count_list.append(op_name)
self._min_cycle_counter = min_cycle_counter
aicpu_dict = {
'info': aicpu_info,
'total_time': float(total_time),
'op_exe_times': len(aicpu_info),
'num_of_ops': len(op_count_list),
'num_of_streams': 1
}
return aicpu_dict
@property
def min_cycle_counter(self):
"""Get minimum cycle counter in AI CPU."""
return self._min_cycle_counter
...@@ -122,8 +122,6 @@ class OPComputeTimeParser: ...@@ -122,8 +122,6 @@ class OPComputeTimeParser:
# Write the timeline data into file, # Write the timeline data into file,
# including operator name, stream id, start time, and duration. # including operator name, stream id, start time, and duration.
self._write_timeline_data_into_file(timeline_data) self._write_timeline_data_into_file(timeline_data)
# Write the minimum cycle counter into the file.
self.write_min_cycle_counter_to_file()
def _write_op_time_into_file(self, result_data): def _write_op_time_into_file(self, result_data):
""" """
...@@ -201,7 +199,7 @@ class OPComputeTimeParser: ...@@ -201,7 +199,7 @@ class OPComputeTimeParser:
cur_index += 1 cur_index += 1
# Update the value of minimum cycle counter. # Update the value of minimum cycle counter.
self._min_cycle_counter = min_cycle_counter self._min_cycle_counter = min_cycle_counter / 1e5 # Convert the time unit from 10ns to 1ms
return tmp_result_data return tmp_result_data
...@@ -222,7 +220,7 @@ class OPComputeTimeParser: ...@@ -222,7 +220,7 @@ class OPComputeTimeParser:
for item in op_data_list: for item in op_data_list:
op_name = item.op_name op_name = item.op_name
# Unit conversion: converting the cycle counter into ms. # Unit conversion: converting the cycle counter into ms.
op_start_time_str = str((item.cycle_counter - self._min_cycle_counter) / factor) op_start_time_str = str(item.cycle_counter / factor)
op_duration = item.duration / factor op_duration = item.duration / factor
op_duration_str = str(item.duration / factor) op_duration_str = str(item.duration / factor)
if op_name in op_name_time_dict.keys(): if op_name in op_name_time_dict.keys():
...@@ -243,13 +241,7 @@ class OPComputeTimeParser: ...@@ -243,13 +241,7 @@ class OPComputeTimeParser:
(op_start_time_str, op_duration_str) (op_start_time_str, op_duration_str)
) )
def write_min_cycle_counter_to_file(self): @property
"""Write minimum cycle counter into a txt file.""" def min_cycle_counter(self):
min_cycle_counter = self._min_cycle_counter """Get minimum cycle counter."""
file_name = 'min_cycle_counter_' + self._device_id + '.txt' return self._min_cycle_counter
file_path = os.path.join(self._output_path, file_name)
file_path = validate_and_normalize_path(
file_path, raise_key='Invalid min cycle counter file path.'
)
with open(file_path, 'w') as file:
file.write(str(min_cycle_counter))
...@@ -196,7 +196,7 @@ class Profiler: ...@@ -196,7 +196,7 @@ class Profiler:
# analyse timeline info # analyse timeline info
try: try:
self._analyse_timeline() self._analyse_timeline(aicpu_data_parser, optime_parser)
except (ProfilerIOException, ProfilerFileNotFoundException, ValidationError) as err: except (ProfilerIOException, ProfilerFileNotFoundException, ValidationError) as err:
logger.warning('Fail to write timeline data: %s', err) logger.warning('Fail to write timeline data: %s', err)
...@@ -233,14 +233,13 @@ class Profiler: ...@@ -233,14 +233,13 @@ class Profiler:
logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path) logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path)
logger.info("The point info is: %s", point_info) logger.info("The point info is: %s", point_info)
def _analyse_timeline(self): def _analyse_timeline(self, aicpu_parser, optime_parser):
""" """
Analyse and parse timeline info. Analyse and parse timeline info.
""" """
timeline_analyser = AnalyserFactory.instance().get_analyser( timeline_analyser = AnalyserFactory.instance().get_analyser(
'timeline', self._output_path, self._dev_id 'timeline', self._output_path, self._dev_id
) )
min_cycle_counter = timeline_analyser.get_min_cycle_counter()
# Get framework info # Get framework info
aicoredetail_analyser = AnalyserFactory.instance().get_analyser( aicoredetail_analyser = AnalyserFactory.instance().get_analyser(
...@@ -252,14 +251,16 @@ class Profiler: ...@@ -252,14 +251,16 @@ class Profiler:
step_trace_analyser = AnalyserFactory.instance().get_analyser( step_trace_analyser = AnalyserFactory.instance().get_analyser(
'step_trace', self._output_path, self._dev_id 'step_trace', self._output_path, self._dev_id
) )
all_reduce_info = step_trace_analyser.query_for_all_reduce(min_cycle_counter) all_reduce_info = step_trace_analyser.query_for_all_reduce()
# Get timeline info # Get timeline info
logger.info('Start writing timeline info...') logger.info('Start writing timeline info...')
logger.info('Warm Prompt: It could take a few minutes if you are training ' logger.info('Warm Prompt: It could take a few minutes if you are training '
'with a complex network or more than 10 steps.') 'with a complex network or more than 10 steps.')
# Add AllReduce and framework info into timeline # Add info into timeline, such as AI CPU, AllReduce, framework info.
timeline_analyser.init_timeline(all_reduce_info, framework_info) aicpu_info = aicpu_parser.query_aicpu_data()
min_cycle_counter = min(aicpu_parser.min_cycle_counter, optime_parser.min_cycle_counter)
timeline_analyser.init_timeline(all_reduce_info, framework_info, aicpu_info, min_cycle_counter)
timeline_analyser.write_timeline() timeline_analyser.write_timeline()
timeline_analyser.write_timeline_summary() timeline_analyser.write_timeline_summary()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册