diff --git a/mindinsight/profiler/analyser/step_trace_analyser.py b/mindinsight/profiler/analyser/step_trace_analyser.py
index 825cc8792624063d1ac3e7f0b94ce1909a50d415..2dade026fedabd4185cc530415dddefbad742b72 100644
--- a/mindinsight/profiler/analyser/step_trace_analyser.py
+++ b/mindinsight/profiler/analyser/step_trace_analyser.py
@@ -69,7 +69,7 @@ class StepTraceAnalyser(BaseAnalyser):
 
         return self._result
 
-    def query_for_all_reduce(self):
+    def query_for_all_reduce(self, min_cycle_counter):
         """
         Query for all reduce info.
 
@@ -81,8 +81,9 @@ class StepTraceAnalyser(BaseAnalyser):
         reduce_infos = []
         for row_info in self._data[:-1]:
             row_info_dict = self._get_info_dict_from_row_data(row_info, 'systime')
-            reduce_info = self._get_reduce_time_in_order(row_info_dict)
-            reduce_infos.append(reduce_info)
+            reduce_info = self._sort_reduce_by_time(row_info_dict, min_cycle_counter)
+            if reduce_info:
+                reduce_infos.extend(reduce_info)
 
         return reduce_infos
 
@@ -251,6 +252,39 @@ class StepTraceAnalyser(BaseAnalyser):
         reduce_events.sort(key=lambda elem: elem[1])
         return reduce_info
 
+    def _sort_reduce_by_time(self, row_info_dict, min_cycle_counter):
+        """
+        Sort reduce info by time.
+
+        Args:
+            row_info_dict (dict): Step trace information.
+            min_cycle_counter (int): The minimum cycle counter.
+
+        Returns:
+            list, all reduce info sorted by start time. Each item is
+            [reduce_field, stream_id, reduce_start, reduce_duration, reduce_pid].
+        """
+        factor = 1e5  # convert time unit from 10ns to 1ms
+        reduce_pid = 10000
+        reduce_info = []
+        reduce_fields = [field_name for field_name in self.__column__
+                         if field_name.startswith('stream_') and not field_name.endswith('point')]
+        for reduce_field in reduce_fields:
+            reduce_start = row_info_dict.get(reduce_field + '_start_point')
+            reduce_start = (reduce_start - min_cycle_counter) / factor \
+                if reduce_start else 0
+            reduce_duration = row_info_dict.get(reduce_field)
+            reduce_duration = reduce_duration / factor if reduce_duration else 0
+            if not (reduce_start and reduce_duration):
+                log.info("Reduce event missing value.")
+                continue
+            cur_stream_id = reduce_field.split('_', 2)[1]
+            reduce_info.append([reduce_field, int(cur_stream_id), reduce_start,
+                                reduce_duration, reduce_pid])
+        reduce_info.sort(key=lambda elem: elem[2])
+
+        return reduce_info
+
     def _construct_reduce_lines(self, row_info_dict):
         """
         Construct first line in detailed graph.
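
Note on the unit conversion in `_sort_reduce_by_time`: step trace start points are raw cycle counters in 10 ns units, so subtracting `min_cycle_counter` rebases every event to the start of profiling, and dividing by `1e5` converts to milliseconds (1e5 * 10 ns = 1 ms). A minimal standalone sketch of that arithmetic — the field names and counter values are made up for illustration:

```python
FACTOR = 1e5  # 10 ns units -> ms, since 1e5 * 10 ns = 1 ms

# Hypothetical step trace row; real keys come from self.__column__.
row_info_dict = {
    'stream_9_0_start_point': 12_345_678,  # raw cycle counter, 10 ns units
    'stream_9_0': 2_000_000,               # duration, 10 ns units
}
min_cycle_counter = 12_000_000

reduce_start = (row_info_dict['stream_9_0_start_point'] - min_cycle_counter) / FACTOR
reduce_duration = row_info_dict['stream_9_0'] / FACTOR
stream_id = 'stream_9_0'.split('_', 2)[1]

print(reduce_start, reduce_duration, stream_id)  # 3.45678 20.0 9
```
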
diff --git a/mindinsight/profiler/analyser/timeline_analyser.py b/mindinsight/profiler/analyser/timeline_analyser.py
index dbbaf57deb636c6b1486808603bbd996a58a351f..01bf6bc2a8278a938d044e5b2c18ab17cce1f734 100644
--- a/mindinsight/profiler/analyser/timeline_analyser.py
+++ b/mindinsight/profiler/analyser/timeline_analyser.py
@@ -47,8 +47,6 @@ class TimelineAnalyser(BaseAnalyser):
 
     def _load(self):
         """Load data according to the parsed profiling files."""
-        self.load_timeline_data()
-        self._timeline_summary['op_exe_times'] = len(self._timeline_meta)
 
     def _filter(self, filter_condition):
         """
@@ -122,6 +120,7 @@ class TimelineAnalyser(BaseAnalyser):
     def write_timeline(self):
         """Write timeline data to file."""
         # Write timeline to file.
+        logger.info('Writing timeline file...')
         file_size = self.write_timeline_to_json()
 
         # If the file size is larger than 20MB, open a new file and
@@ -131,6 +130,8 @@ class TimelineAnalyser(BaseAnalyser):
             # write to json file for display
             self.write_timeline_to_json_by_limitation()
 
+        logger.info('Finished writing timeline file.')
+
     def write_timeline_to_json(self):
         """Write timeline to json."""
         timeline_filename = self._timeline_filename.format(self._device_id)
@@ -197,7 +198,7 @@ class TimelineAnalyser(BaseAnalyser):
             logger.error('Error occurred when writing timeline summary file: %s', err)
             raise ProfilerIOException
 
-    def load_timeline_data(self):
+    def _load_timeline_data(self):
         """Load timeline data from file."""
         file_path = os.path.join(
             self._profiling_dir,
@@ -210,34 +211,37 @@ class TimelineAnalyser(BaseAnalyser):
             logger.error("Failed to find parsed timeline file.")
             raise ProfilerFileNotFoundException('parsed timeline file')
 
-        stream_count_dict = {}
+        timeline_list = []
         try:
             with open(file_path, 'r') as f_obj:
                 for line in f_obj:
                     if not line.startswith('op_name'):
                         line_list = line.strip('\n').split(',')
-                        self._parse_timeline_data(line_list)
-                        self._update_num_of_streams(line_list, stream_count_dict)
+                        timeline_list.append(line_list)
         except (IOError, OSError) as err:
             logger.error('Error occurred when reading timeline intermediate file: %s', err)
             raise ProfilerIOException
 
-        # Update timeline summary info
-        self._timeline_summary['num_of_streams'] = len(stream_count_dict.keys())
+        return timeline_list
 
     def _parse_timeline_data(self, line_list):
         """Parse timeline data."""
+        # Factor to convert the time unit from 1ms to 1us for timeline display.
         factor = 1000
         op_meta = TimelineContainer(line_list)
         timeline_dict = {}
         timeline_dict['name'] = op_meta.op_name
         timeline_dict['ph'] = 'X'
-        timeline_dict['pid'] = int(self._device_id)
         timeline_dict['tid'] = op_meta.stream_id
         timeline_dict['ts'] = op_meta.start_time * factor
         dur = op_meta.duration * factor
         timeline_dict['dur'] = dur
+        if op_meta.pid == 10000:  # AllReduce PID
+            timeline_dict['pid'] = 10000
+        else:
+            timeline_dict['pid'] = int(self._device_id)
+        # Update total time of operator execution.
+        self._timeline_summary['total_time'] += dur
         self._timeline_meta.append(timeline_dict)
 
     @staticmethod
@@ -249,7 +253,7 @@ class TimelineAnalyser(BaseAnalyser):
         else:
             stream_count_dict[stream_id] += 1
 
-    def get_min_cycle_counter_from_file(self):
+    def get_min_cycle_counter(self):
         """
         Get minimum cycle counter.
 
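
Note on the dicts built in `_parse_timeline_data`: they follow the Chrome trace event format, which is why the keys are `ph`, `pid`, `tid`, `ts`, and `dur`. `ts` and `dur` are expected in microseconds, hence the factor of 1000 applied to the parsed millisecond values, and pid 10000 is simply a sentinel that groups AllReduce events into their own lane. A sketch of one such 'complete' event with made-up values:

```python
import json

# One Chrome trace 'complete' event ('ph': 'X'); values are illustrative.
event = {
    'name': 'stream_9_0',  # op name, or reduce field for AllReduce events
    'ph': 'X',             # complete event: start timestamp plus duration
    'pid': 10000,          # sentinel pid grouping AllReduce separately
    'tid': 9,              # stream id; each stream renders as its own row
    'ts': 3456.78,         # start time in us (3.45678 ms * 1000)
    'dur': 20000.0,        # duration in us (20.0 ms * 1000)
}

# A list of such events is what chrome://tracing loads as traceEvents.
print(json.dumps([event]))
```
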
""" - logger.info('Adding AllReduce info...') - factor = 100 - min_cycle_counter = self.get_min_cycle_counter_from_file() - for step_meta in all_reduce_info: - for stream_id, time_info_list in step_meta.items(): - for time_info in time_info_list: - start, _, dur, name = time_info - all_reduce_dict = {} - all_reduce_dict['name'] = name - all_reduce_dict['ph'] = 'X' - # Using 10000 to represent AllReduce - all_reduce_dict['pid'] = 10000 - all_reduce_dict['tid'] = int(stream_id) - all_reduce_dict['ts'] = (start - min_cycle_counter) / factor - all_reduce_dict['dur'] = dur / factor - self._timeline_meta.append(all_reduce_dict) - self._timeline_summary['total_time'] += all_reduce_dict['dur'] - - def add_framework_info(self, framework_info): + logger.info('Initiating timeline...') + timeline_list = self._load_timeline_data() + self._timeline_summary['op_exe_times'] = len(timeline_list) + + # Add AllReduce info to timeline temp list and sort by start time. + if all_reduce_info: + logger.debug('AllReduce info found. Start adding info into timeline...') + timeline_list.extend(all_reduce_info) + timeline_list.sort(key=lambda x: float(x[2])) + + # Init a dict for counting the num of streams. + stream_count_dict = {} + for timeline in timeline_list: + self._parse_timeline_data(timeline) + # Updating the collection of streams. + if len(timeline) == 4: + self._update_num_of_streams(timeline, stream_count_dict) + + # Get framework metadata. + framework_obj_list = framework_info.get('object') + # The length of list is the number of operators. + self._timeline_summary['num_of_ops'] = len(framework_obj_list) + self._add_framework_info(framework_obj_list) + logger.info('Finished adding info into timeline...') + + # Update timeline summary info + self._timeline_summary['num_of_streams'] = len(stream_count_dict.keys()) + + def _add_framework_info(self, framework_obj_list): """ Add framework info into timeline metadata. Args: - framework_info (dict): The framework metadata. + framework_obj_list (list): The framework metadata. 
""" - logger.info('Adding framework info...') - framework_obj_list = framework_info.get('object') - self._timeline_summary['num_of_ops'] = len(framework_obj_list) + logger.debug('Start adding framework info into timeline...') for framework_obj in framework_obj_list: op_name = framework_obj[0] op_type = framework_obj[1] @@ -335,3 +341,5 @@ class TimelineAnalyser(BaseAnalyser): 'fullname': op_full_name } timeline_obj['args'].update(op_info) + + logger.debug('Finished adding framework info into timeline...') diff --git a/mindinsight/profiler/parser/container.py b/mindinsight/profiler/parser/container.py index 2d6d80e1a92103f9055d9bd1bcaee1d4ea66ee35..c39ca2db0c021863d98ee0c31be09216efb5bdac 100644 --- a/mindinsight/profiler/parser/container.py +++ b/mindinsight/profiler/parser/container.py @@ -69,6 +69,9 @@ class TimelineContainer: self._stream_id = int(split_list[1]) self._start_time = float(split_list[2]) self._duration = float(split_list[3]) + self._pid = None + if len(split_list) == 5: + self._pid = int(split_list[4]) @property def op_name(self): @@ -89,3 +92,8 @@ class TimelineContainer: def duration(self): """Get the duration of the operator execution.""" return self._duration + + @property + def pid(self): + """Get the pid of the operator execution.""" + return self._pid diff --git a/mindinsight/profiler/profiling.py b/mindinsight/profiler/profiling.py index 5a28eba4faaf90e44b6563720f70e2440d1ba4fd..529eef92384037c1214f8b08448c99cb4ebb2d26 100644 --- a/mindinsight/profiler/profiling.py +++ b/mindinsight/profiler/profiling.py @@ -194,7 +194,10 @@ class Profiler: logger.warning(err.message) # analyse timeline info - self._analyse_timeline() + try: + self._analyse_timeline() + except (ProfilerIOException, ProfilerFileNotFoundException, ValidationError) as err: + logger.warning('Fail to write timeline data: %s', err) def _analyse_step_trace(self, source_path, framework_parser): """ @@ -233,6 +236,11 @@ class Profiler: """ Analyse and parse timeline info. """ + timeline_analyser = AnalyserFactory.instance().get_analyser( + 'timeline', self._output_path, self._dev_id + ) + min_cycle_counter = timeline_analyser.get_min_cycle_counter() + # Get framework info aicoredetail_analyser = AnalyserFactory.instance().get_analyser( 'aicore_detail', self._output_path, self._dev_id @@ -243,19 +251,16 @@ class Profiler: step_trace_analyser = AnalyserFactory.instance().get_analyser( 'step_trace', self._output_path, self._dev_id ) - all_reduce_info = step_trace_analyser.query_for_all_reduce() + all_reduce_info = step_trace_analyser.query_for_all_reduce(min_cycle_counter) # Get timeline info - timeline_analyser = AnalyserFactory.instance().get_analyser( - 'timeline', self._output_path, self._dev_id - ) - timeline_analyser.add_framework_info(framework_info) - timeline_analyser.add_all_reduce_info(all_reduce_info) - try: - timeline_analyser.write_timeline() - timeline_analyser.write_timeline_summary() - except (ProfilerIOException, ProfilerFileNotFoundException, ValidationError) as err: - logger.warning('Fail to write timeline data: %s', err) + logger.info('Start writing timeline info...') + logger.info('Warm Prompt: It could take a few minutes if you are training ' + 'with a complex network or more than 10 steps.') + # Add AllReduce and framework info into timeline + timeline_analyser.init_timeline(all_reduce_info, framework_info) + timeline_analyser.write_timeline() + timeline_analyser.write_timeline_summary() def __del__(self): """Disable the profiling collection service, called after training."""