diff --git a/mindinsight/backend/profiler/profile_api.py b/mindinsight/backend/profiler/profile_api.py
index dfaded8eca631ecd0fd4283012ad89da5170a682..54a8bcf7b7dea3c2f32591808593a82fd56138b2 100644
--- a/mindinsight/backend/profiler/profile_api.py
+++ b/mindinsight/backend/profiler/profile_api.py
@@ -401,6 +401,52 @@ def get_minddata_pipeline_queue_info():
     return jsonify(op_queue_info)
 
 
+@BLUEPRINT.route("/profile/timeline-summary", methods=["GET"])
+def get_timeline_summary():
+    """
+    Get timeline summary info.
+
+    Returns:
+        Response, the timeline summary info.
+
+    Examples:
+        >>> GET http://xxxx/v1/mindinsight/profile/timeline-summary
+    """
+    summary_dir = request.args.get("dir")
+    profiler_dir = validate_and_normalize_profiler_path(summary_dir, settings.SUMMARY_BASE_DIR)
+    device_id = request.args.get("device_id", default='0')
+    _ = to_int(device_id, 'device_id')
+
+    analyser = AnalyserFactory.instance().get_analyser(
+        'timeline', profiler_dir, device_id)
+    summary = analyser.get_timeline_summary()
+
+    return jsonify(summary)
+
+
+@BLUEPRINT.route("/profile/timeline", methods=["GET"])
+def get_timeline_detail():
+    """
+    Get timeline detail.
+
+    Returns:
+        Response, the detail information of timeline.
+
+    Examples:
+        >>> GET http://xxxx/v1/mindinsight/profile/timeline
+    """
+    summary_dir = request.args.get("dir")
+    profiler_dir = validate_and_normalize_profiler_path(summary_dir, settings.SUMMARY_BASE_DIR)
+    device_id = request.args.get("device_id", default='0')
+    _ = to_int(device_id, 'device_id')
+
+    analyser = AnalyserFactory.instance().get_analyser(
+        'timeline', profiler_dir, device_id)
+    timeline = analyser.get_display_timeline()
+
+    return jsonify(timeline)
+
+
 def init_module(app):
     """
     Init module entry.
diff --git a/mindinsight/profiler/analyser/__init__.py b/mindinsight/profiler/analyser/__init__.py
index cf01fe5eb77cfbc29d7d8a2c38b1bbc4ddb5f229..bc2cf2374d1823a1123b6691413d324f3477fda8 100644
--- a/mindinsight/profiler/analyser/__init__.py
+++ b/mindinsight/profiler/analyser/__init__.py
@@ -14,4 +14,4 @@
 # ============================================================================
 """The analyser module."""
 from . import analyser, minddata_pipeline_analyser, step_trace_analyser, \
-    minddata_analyser
+    minddata_analyser, timeline_analyser
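As a quick orientation for reviewers, the two endpoints added above can be exercised with a client sketch along these lines (host, port and summary directory are placeholders, not values taken from this patch):

    import requests

    BASE = 'http://localhost:8080/v1/mindinsight'            # placeholder host/port
    params = {'dir': 'profiler_summary', 'device_id': '0'}   # placeholder dir

    summary = requests.get(BASE + '/profile/timeline-summary', params=params).json()
    timeline = requests.get(BASE + '/profile/timeline', params=params).json()
    print(summary.get('total_time'), len(timeline))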
diff --git a/mindinsight/profiler/analyser/timeline_analyser.py b/mindinsight/profiler/analyser/timeline_analyser.py
new file mode 100644
index 0000000000000000000000000000000000000000..b4e94ba4b29cdaee74a8333a171e758b29fdf984
--- /dev/null
+++ b/mindinsight/profiler/analyser/timeline_analyser.py
@@ -0,0 +1,340 @@
+# Copyright 2020 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""The Timeline Analyser."""
+import json
+import os
+
+from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
+from mindinsight.profiler.parser.container import TimelineContainer
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
+    ProfilerIOException
+from mindinsight.profiler.common.log import logger
+from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path
+
+
+SIZE_LIMIT = 20 * 1024 * 1024  # 20MB
+
+
+class TimelineAnalyser(BaseAnalyser):
+    """
+    Analyse timeline data from file.
+    """
+    __col_names__ = ['op_name', 'stream_id', 'start_time', 'duration']
+    _output_timeline_data_file_path = 'output_timeline_data_{}.txt'
+    _min_cycle_counter_file_path = 'min_cycle_counter_{}.txt'
+    _timeline_filename = 'timeline_detail_{}.json'
+    _display_filename = 'timeline_display_{}.json'
+    _timeline_summary_filename = 'timeline_summary_{}.json'
+    _timeline_meta = []
+    _timeline_summary = {
+        'total_time': 0,
+        'num_of_streams': 0,
+        'num_of_ops': 0,
+        'op_exe_times': 0
+    }
+
+    def _load(self):
+        """Load data according to the parsed profiling files."""
+        self.load_timeline_data()
+        self._timeline_summary['op_exe_times'] = len(self._timeline_meta)
+
+    def _filter(self, filter_condition):
+        """
+        Filter the profiling data according to the filter condition.
+
+        Args:
+            filter_condition (dict): The filter condition.
+        """
+
+    def get_display_timeline(self):
+        """
+        Get timeline data for UI display.
+
+        Returns:
+            json, the content of timeline data.
+        """
+        # Search timeline json file under profiling dir.
+        file_path = None
+        for filename in os.listdir(self._profiling_dir):
+            if filename.startswith('timeline_display') and filename.endswith('.json'):
+                file_path = os.path.join(self._profiling_dir, filename)
+                logger.debug('Display file found.')
+                break
+            elif filename.startswith('timeline_detail') and filename.endswith('.json'):
+                file_path = os.path.join(self._profiling_dir, filename)
+                logger.debug('Original file found.')
+                break
+
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid timeline json path.'
+        )
+
+        timeline = []
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f_obj:
+                    timeline = json.load(f_obj)
+            except (IOError, OSError) as err:
+                logger.error('Error occurred when reading timeline display file: %s', err)
+                raise ProfilerIOException
+        else:
+            logger.info('No timeline file. Please check the output path.')
+
+        return timeline
+
+    def get_timeline_summary(self):
+        """
+        Get timeline summary information for UI display.
+
+        Returns:
+            json, the content of timeline summary information.
+        """
+        file_path = None
+        summary_file_name = 'timeline_summary_{}.json'.format(self._device_id)
+        if summary_file_name in os.listdir(self._profiling_dir):
+            file_path = os.path.join(self._profiling_dir, summary_file_name)
+
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid timeline summary path.'
+        )
+
+        timeline_summary = {}
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f_obj:
+                    timeline_summary = json.load(f_obj)
+            except (IOError, OSError) as err:
+                logger.error('Error occurred when reading timeline summary file: %s', err)
+                raise ProfilerIOException
+
+        return timeline_summary
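Each record the analyser collects follows the Chrome trace event format (see _parse_timeline_data further down), so the display file loads directly into chrome://tracing. An illustrative event, with invented values:

    event = {
        'name': 'Default/network/Conv2D-op101',  # operator name
        'ph': 'X',     # complete event: carries both start and duration
        'pid': 0,      # device id
        'tid': 3,      # stream id
        'ts': 1024.5,  # start timestamp
        'dur': 0.75,   # duration
    }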
+    def write_timeline(self):
+        """Write timeline data to json files."""
+        # Write timeline to file.
+        file_size = self.write_timeline_to_json()
+
+        # If the file size is larger than 20MB, open a new file and
+        # write the first 20MB content into it.
+        if file_size > SIZE_LIMIT:
+            logger.debug('File size is larger than 20MB, will be resized...')
+            # write to json file for display
+            self.write_timeline_to_json_by_limitation()
+
+    def write_timeline_to_json(self):
+        """Write timeline to json."""
+        timeline_filename = self._timeline_filename.format(self._device_id)
+        timeline_file_path = os.path.join(
+            self._profiling_dir,
+            timeline_filename
+        )
+
+        timeline_file_path = validate_and_normalize_path(
+            timeline_file_path, raise_key='Invalid timeline json path.'
+        )
+
+        try:
+            with open(timeline_file_path, 'w') as json_file:
+                json.dump(self._timeline_meta, json_file)
+            file_size = os.path.getsize(timeline_file_path)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when writing timeline full details: %s', err)
+            raise ProfilerIOException
+
+        return file_size
+
+    def write_timeline_to_json_by_limitation(self):
+        """Write timeline to json by limitation."""
+        display_filename = self._display_filename.format(self._device_id)
+        display_file_path = os.path.join(
+            self._profiling_dir,
+            display_filename
+        )
+
+        display_file_path = validate_and_normalize_path(
+            display_file_path, raise_key='Invalid timeline display json path.'
+        )
+
+        try:
+            with open(display_file_path, 'w') as json_file:
+                json_file.write('[')
+                for index, item in enumerate(self._timeline_meta):
+                    # Write the separator before each item (instead of after it)
+                    # so the file stays valid JSON even when the loop breaks early.
+                    if index:
+                        json_file.write(',')
+                    json.dump(item, json_file)
+                    file_size = os.path.getsize(display_file_path)
+                    if file_size > SIZE_LIMIT:
+                        break
+                json_file.write(']')
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when writing timeline display file: %s', err)
+            raise ProfilerIOException
+
+    def write_timeline_summary(self):
+        """Write timeline summary to json."""
+        timeline_summary_file_path = os.path.join(
+            self._profiling_dir,
+            self._timeline_summary_filename.format(self._device_id)
+        )
+
+        timeline_summary_file_path = validate_and_normalize_path(
+            timeline_summary_file_path, raise_key='Invalid timeline summary path.'
+        )
+
+        try:
+            with open(timeline_summary_file_path, 'w') as json_file:
+                json.dump(self._timeline_summary, json_file)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when writing timeline summary file: %s', err)
+            raise ProfilerIOException
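The summary file written by write_timeline_summary mirrors the _timeline_summary dict; a hypothetical timeline_summary_0.json (values invented):

    summary = {
        'total_time': 523.4,   # accumulated op duration in ms
        'num_of_streams': 2,
        'num_of_ops': 156,
        'op_exe_times': 1872,
    }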
+    def load_timeline_data(self):
+        """Load timeline data from file."""
+        file_path = os.path.join(
+            self._profiling_dir,
+            self._output_timeline_data_file_path.format(self._device_id)
+        )
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid timeline txt file path.'
+        )
+        if not os.path.exists(file_path):
+            logger.error("Failed to find parsed timeline file.")
+            raise ProfilerFileNotFoundException('parsed timeline file')
+
+        stream_count_dict = {}
+        try:
+            with open(file_path, 'r') as f_obj:
+                for line in f_obj:
+                    if not line.startswith('=') and not line.startswith('op_name') and \
+                            not line.startswith('-'):
+                        line_list = line.split()
+                        self._parse_timeline_data(line_list)
+                        self._update_num_of_streams(line_list, stream_count_dict)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when reading timeline intermediate file: %s', err)
+            raise ProfilerIOException
+
+        # Update timeline summary info
+        self._timeline_summary['num_of_streams'] = len(stream_count_dict.keys())
+
+    def _parse_timeline_data(self, line_list):
+        """Parse timeline data."""
+        factor = 1000
+        op_meta = TimelineContainer(line_list)
+        timeline_dict = {}
+        timeline_dict['name'] = op_meta.op_name
+        timeline_dict['ph'] = 'X'
+        timeline_dict['pid'] = int(self._device_id)
+        timeline_dict['tid'] = op_meta.stream_id
+        timeline_dict['ts'] = op_meta.start_time * factor
+        dur = op_meta.duration * factor
+        timeline_dict['dur'] = dur
+        self._timeline_summary['total_time'] += dur
+        self._timeline_meta.append(timeline_dict)
+
+    @staticmethod
+    def _update_num_of_streams(line_list, stream_count_dict):
+        """Update number of streams."""
+        stream_id = line_list[1]
+        if stream_id not in stream_count_dict.keys():
+            stream_count_dict[stream_id] = 1
+        else:
+            stream_count_dict[stream_id] += 1
+
+    def get_min_cycle_counter_from_file(self):
+        """
+        Get minimum cycle counter.
+
+        Returns:
+            float, the minimum value of the cycle counter.
+        """
+        file_path = os.path.join(
+            self._profiling_dir,
+            self._min_cycle_counter_file_path.format(self._device_id)
+        )
+
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid min cycle counter file path.'
+        )
+
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f_obj:
+                    min_cycle_counter = f_obj.read()
+                    min_cycle_counter = float(min_cycle_counter) \
+                        if not min_cycle_counter == 'inf' else 0
+            except (IOError, OSError) as err:
+                logger.error('Error occurred when reading the minimum cycle counter: %s', err)
+                raise ProfilerIOException
+        else:
+            min_cycle_counter = 0
+            logger.info("No min cycle counter recorded.")
+
+        return min_cycle_counter
+
+    def add_all_reduce_info(self, all_reduce_info):
+        """
+        Add all reduce info into timeline metadata.
+
+        Args:
+            all_reduce_info (list): The metadata of AllReduce operator.
+                [
+                    {
+                        'stream_id_1': [(start_time, end_time, duration, field_name)],
+                        ...
+                    },
+                    {...}
+                ]
+        """
+        logger.info('Adding AllReduce info...')
+        factor = 100
+        min_cycle_counter = self.get_min_cycle_counter_from_file()
+        for step_meta in all_reduce_info:
+            for stream_id, time_info_list in step_meta.items():
+                for time_info in time_info_list:
+                    start, _, dur, name = time_info
+                    all_reduce_dict = {}
+                    all_reduce_dict['name'] = name
+                    all_reduce_dict['ph'] = 'X'
+                    # Using 10000 to represent AllReduce
+                    all_reduce_dict['pid'] = 10000
+                    all_reduce_dict['tid'] = int(stream_id)
+                    all_reduce_dict['ts'] = (start - min_cycle_counter) / factor
+                    all_reduce_dict['dur'] = dur / factor
+                    self._timeline_meta.append(all_reduce_dict)
+                    self._timeline_summary['total_time'] += all_reduce_dict['dur']
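Since chrome://tracing groups events into rows by pid, the fixed pid of 10000 used above places AllReduce events in their own lane, apart from compute ops whose pid is the device id. A hypothetical resulting event:

    all_reduce_event = {
        'name': 'stream_8_parallel_group',  # field name from step trace data
        'ph': 'X',
        'pid': 10000,   # dedicated AllReduce lane
        'tid': 8,       # stream id
        'ts': 15.2,     # invented values
        'dur': 3.1,
    }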
+    def add_framework_info(self, framework_info):
+        """
+        Add framework info into timeline metadata.
+
+        Args:
+            framework_info (dict): The framework metadata.
+        """
+        logger.info('Adding framework info...')
+        framework_obj_list = framework_info.get('object')
+        self._timeline_summary['num_of_ops'] = len(framework_obj_list)
+        for framework_obj in framework_obj_list:
+            op_name = framework_obj[0]
+            op_type = framework_obj[1]
+            op_full_name = framework_obj[4]
+            op_info = framework_obj[5]
+            for timeline_obj in self._timeline_meta:
+                if op_full_name == timeline_obj.get('name'):
+                    timeline_obj['name'] = op_name
+                    timeline_obj['args'] = {
+                        'type': op_type,
+                        'fullname': op_full_name
+                    }
+                    timeline_obj['args'].update(op_info)
diff --git a/mindinsight/profiler/parser/container.py b/mindinsight/profiler/parser/container.py
new file mode 100644
index 0000000000000000000000000000000000000000..2d6d80e1a92103f9055d9bd1bcaee1d4ea66ee35
--- /dev/null
+++ b/mindinsight/profiler/parser/container.py
@@ -0,0 +1,91 @@
+"""The container of metadata used in profiler parser."""
+
+
+class HWTSContainer:
+    """
+    HWTS output container.
+
+    Args:
+        split_list (list): The split list of metadata in HWTS output file.
+    """
+    def __init__(self, split_list):
+        self._op_name = ''
+        self._duration = None
+        self._status = split_list[0]
+        self._task_id = split_list[6]
+        self._cycle_counter = float(split_list[7])
+        self._stream_id = split_list[8]
+
+    @property
+    def status(self):
+        """Get the status of the operator, i.e. Start or End."""
+        return self._status
+
+    @property
+    def task_id(self):
+        """Get the task id of the operator."""
+        return self._task_id
+
+    @property
+    def cycle_counter(self):
+        """Get the cycle counter."""
+        return self._cycle_counter
+
+    @property
+    def stream_id(self):
+        """Get the stream id of the operator."""
+        return self._stream_id
+
+    @property
+    def op_name(self):
+        """Get the name of the operator."""
+        return self._op_name
+
+    @op_name.setter
+    def op_name(self, name):
+        """Set the name of the operator."""
+        self._op_name = name
+
+    @property
+    def duration(self):
+        """Get the duration of the operator execution."""
+        return self._duration
+
+    @duration.setter
+    def duration(self, value):
+        """Set the duration of the operator execution."""
+        self._duration = value
+
+
+class TimelineContainer:
+    """
+    A container of operator computation metadata.
+
+    Args:
+        split_list (list): The split list of metadata in op_compute output file.
+    """
+    def __init__(self, split_list):
+        self._op_name = split_list[0]
+        self._stream_id = int(split_list[1])
+        self._start_time = float(split_list[2])
+        self._duration = float(split_list[3])
+
+    @property
+    def op_name(self):
+        """Get the name of the operator."""
+        return self._op_name
+
+    @property
+    def stream_id(self):
+        """Get the stream id of the operator."""
+        return self._stream_id
+
+    @property
+    def start_time(self):
+        """Get the execution start time of the operator."""
+        return self._start_time
+
+    @property
+    def duration(self):
+        """Get the duration of the operator execution."""
+        return self._duration
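A small sanity sketch of the new container classes; the input row is invented but follows the four-column layout (op_name, stream_id, start_time, duration) that the timeline intermediate file uses:

    from mindinsight.profiler.parser.container import TimelineContainer

    # One data row of output_timeline_data_{device_id}.txt, split on whitespace.
    fields = 'Default/network/Conv2D-op101 3 1024.5 0.75'.split()
    op = TimelineContainer(fields)
    assert op.op_name == 'Default/network/Conv2D-op101'
    assert op.stream_id == 3 and op.start_time == 1024.5 and op.duration == 0.75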
diff --git a/mindinsight/profiler/parser/optime_parser.py b/mindinsight/profiler/parser/optime_parser.py
index 6b2436b423a437d2c8567128ec546db477ed271b..4ec9301d6d9299367c6dfbc7864d8e3c6658de14 100755
--- a/mindinsight/profiler/parser/optime_parser.py
+++ b/mindinsight/profiler/parser/optime_parser.py
@@ -13,8 +13,15 @@
 # limitations under the License.
 # ============================================================================
 """Op compute time files parser."""
+import os
+
 from tabulate import tabulate
 from mindinsight.profiler.common._utils import fwrite_format
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException
+from mindinsight.profiler.common.log import logger
+from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path
+from mindinsight.profiler.parser.container import HWTSContainer
+
 
 class OPComputeTimeParser:
     """
@@ -28,11 +35,20 @@ class OPComputeTimeParser:
 
     _dst_file_title = 'title:op compute time'
     _dst_file_column_title = ['op_name', 'compute_time(ms)', 'stream_id']
+    _timeline_file_title = 'title:timeline info'
+    _timeline_file_column_title = ['op_name', 'stream_id', 'start_time', 'duration']
 
-    def __init__(self, hwts_output_file, output_filename, op_task_info):
+    def __init__(self, hwts_output_file, output_filename, op_task_info,
+                 output_path, device_id):
+        hwts_output_file = validate_and_normalize_path(
+            hwts_output_file, raise_key='Invalid hwts output file path.'
+        )
         self._hwts_output_file = hwts_output_file
         self._output_filename = output_filename
         self._op_task_info = op_task_info
+        self._output_path = output_path
+        self._device_id = device_id
+        self._min_cycle_counter = float("inf")
 
     def _get_op_task_id_map(self):
         """
@@ -44,78 +60,197 @@ class OPComputeTimeParser:
 
         op_map_result = []
         hwts_list = []
-        with(open(self._hwts_output_file, 'r')) as data_file:
+
+        if not os.path.exists(self._hwts_output_file):
+            logger.error('The hwts output file does not exist.')
+            raise ProfilerFileNotFoundException('hwts output file')
+
+        with open(self._hwts_output_file, 'r') as data_file:
             lines = data_file.readlines()
 
         for line in lines:
-            if line.startswith("Start of task"):
+            if line.startswith("Start of task") or line.startswith("End of task"):
                 line_split = line.split()
-                hwts_list.append([line_split[0], line_split[6], line_split[7], line_split[8]])
-            if line.startswith('End of task'):
-                line_split = line.split()
-                hwts_list.append([line_split[0], line_split[6], line_split[7], line_split[8]])
+                container = HWTSContainer(line_split)
+                hwts_list.append(container)
 
         # hwts op map by taskId
         for hwts in hwts_list:
-            if hwts[1] in self._op_task_info.keys():
-                op_map_result.append([self._op_task_info[hwts[1]], hwts[0], hwts[1], hwts[2], hwts[3]])
+            if hwts.task_id in self._op_task_info.keys():
+                hwts.op_name = self._op_task_info[hwts.task_id]
+                op_map_result.append(hwts)
 
         return op_map_result
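To make the pairing logic in _calculate_op_execution_time (further down) concrete, here is a toy illustration of how two adjacent Start/End records become one measured interval; the record layout and numbers are invented:

    # (status, op_name, cycle_counter) stand-ins for HWTSContainer records.
    records = [('Start', 'Conv2D-op101', 105000123),
               ('End', 'Conv2D-op101', 105075123)]
    start, end = records
    if start[0] == 'Start' and end[0] == 'End' and start[1] == end[1]:
        duration = end[2] - start[2]   # 75000 cycle-counter ticks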
     def execute(self):
         """Execute the parser, compute all op, get op time, and write it to the output file."""
+        # Calculate the execution time of operators,
+        # and update the minimum cycle counter.
+        tmp_result_data = self._calculate_op_execution_time()
+
+        # Convert time units from nanoseconds to milliseconds.
+        # The unit of the cycle counter is 10 nanoseconds.
+        op_name_time_dict = {}
+        op_name_stream_dict = {}
+        op_name_count_dict = {}
+        op_name_task_dict = {}
+        op_name_start_time = {}
+        self._convert_op_time_unit(
+            tmp_result_data, op_name_time_dict, op_name_stream_dict,
+            op_name_count_dict, op_name_task_dict, op_name_start_time
+        )
 
         result_data = []
+        for op_name, time in op_name_time_dict.items():
+            if op_name in op_name_stream_dict.keys():
+                stream_id = op_name_stream_dict[op_name]
+                avg_time = time / op_name_count_dict[op_name]
+                result_data.append([op_name, avg_time, stream_id])
+
+        timeline_data = []
+        for op_name, time in op_name_time_dict.items():
+            if op_name in op_name_stream_dict.keys():
+                stream_id = op_name_stream_dict[op_name]
+                start_time_list = op_name_start_time.get(op_name)
+                for (start_time, duration) in start_time_list:
+                    timeline_data.append([op_name, stream_id, start_time, duration])
+
+        # Write the metadata of operators into the file,
+        # including operator name, average time, and stream id.
+        self._write_op_time_into_file(result_data)
+        # Write the timeline data into file,
+        # including operator name, stream id, start time, and duration.
+        self._write_timeline_data_into_file(timeline_data)
+        # Write the minimum cycle counter into the file.
+        self.write_min_cycle_counter_to_file()
+
+    def _write_op_time_into_file(self, result_data):
+        """
+        Write the metadata of operators into the file, including
+        op name, average time, and stream id.
+
+        Args:
+            result_data (list): The metadata to be written into the file.
+                [
+                    ['op_name_1', 'avg_time_1', 'stream_id_1'],
+                    ['op_name_2', 'avg_time_2', 'stream_id_2'],
+                    [...]
+                ]
+        """
+        result_data.sort(key=lambda x: x[0])
+        total_time = 0
+        for item in result_data:
+            total_time += item[1]
+        result_data.append(["total op", total_time, 0])
+
+        fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
+        fwrite_format(self._output_filename, data_source=tabulate(result_data,
+                                                                  self._dst_file_column_title,
+                                                                  tablefmt='simple'))
+
+    def _write_timeline_data_into_file(self, timeline_data):
+        """
+        Write the timeline information into the file, including
+        operator name, stream id, start time and duration.
+
+        Args:
+            timeline_data (list): The metadata to be written into the file.
+                [
+                    ['op_name_1', 'stream_id_1', 'start_time_1', 'duration_1'],
+                    ['op_name_2', 'stream_id_2', 'start_time_2', 'duration_2'],
+                    [...]
+                ]
+        """
+        # Sort the timeline data by start time.
+        timeline_data.sort(key=lambda x: float(x[2]))
+        filename = 'output_timeline_data_{}.txt'.format(self._device_id)
+        file_path = os.path.join(self._output_path, filename)
+        file_path = validate_and_normalize_path(file_path, raise_key='Invalid file path of timeline data.')
+
+        # Write to file.
+        fwrite_format(file_path, data_source=self._timeline_file_title, is_start=True)
+        fwrite_format(file_path, data_source=tabulate(
+            timeline_data, self._timeline_file_column_title, tablefmt='simple'
+        ))
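For reference, the intermediate file written here is a tabulate 'simple' table; an invented output_timeline_data_0.txt would look roughly like:

    title:timeline info
    op_name                        stream_id    start_time    duration
    ---------------------------  -----------  ------------  ----------
    Default/network/Conv2D-op101           3        1024.5        0.75
    Default/network/ReLU-op102             3        1025.4        0.12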
+    def _calculate_op_execution_time(self):
+        """
+        Calculate the execution time of each operator.
+
+        Returns:
+            list, including the intermediate data of op execution time.
+        """
         tmp_result_data = []
         op_map_list = self._get_op_task_id_map()
 
         cur_index = 0
         length = len(op_map_list)
-
+        min_cycle_counter = float("inf")
         while cur_index < length:
             if cur_index + 1 == length:
                 break
 
-            op_start = op_map_list[cur_index]
-            op_end = op_map_list[cur_index+1]
-            if op_start[1] == "Start" and op_end[1] == "End"\
-                    and op_start[0] == op_end[0]:
-                # op_name, task_id, cycle counter, stream_id
-                tmp_result_data.append([op_start[0], op_start[2], int(op_end[3]) - int(op_start[3]), op_start[4]])
+            op_start = op_map_list[cur_index]
+            op_end = op_map_list[cur_index + 1]
+            if op_start.status == "Start" and op_end.status == "End" \
+                    and op_start.op_name == op_end.op_name:
+                op_start.duration = op_end.cycle_counter - op_start.cycle_counter
+                tmp_result_data.append(op_start)
                 cur_index += 2
+                if not op_start.op_name.startswith("assign"):
+                    min_cycle_counter = min(min_cycle_counter, op_start.cycle_counter)
             else:
                 cur_index += 1
 
-        op_name_time_dict = {}
-        op_name_steamid_dict = {}
-        op_name_count_dict = {}
-        op_name_task_dict = {}
+        # Update the value of minimum cycle counter.
+        self._min_cycle_counter = min_cycle_counter
 
-        # compute all op
-        for item in tmp_result_data:
-            if item[0] in op_name_time_dict.keys():
-                op_name_time_dict[item[0]] += float(item[2])/1e5  # cycle counter/1*10^5 ms
-                if item[1] == op_name_task_dict[item[0]]:
-                    op_name_count_dict[item[0]] += 1
+        return tmp_result_data
 
-            else:
-                op_name_time_dict[item[0]] = float(item[2])/1e5
-                op_name_steamid_dict[item[0]] = item[-1]
-                op_name_task_dict[item[0]] = item[1]
-                op_name_count_dict[item[0]] = 1
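The factor of 1e5 used in _convert_op_time_unit below follows from the 10 ns cycle-counter tick noted in execute(): ticks x 10 ns = ticks / 1e5 ms. A quick check with the toy numbers from earlier:

    duration_ticks = 75000
    duration_ms = duration_ticks / 1e5   # 75000 * 10 ns = 750 us = 0.75 ms
    assert duration_ms == 0.75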
+    def _convert_op_time_unit(self, op_data_list, op_name_time_dict, op_name_stream_dict,
+                              op_name_count_dict, op_name_task_dict, op_name_start_time):
+        """
+        Calculate the execution time of each operator and convert it into milliseconds.
 
-        for op_name, time in op_name_time_dict.items():
-            if op_name in op_name_steamid_dict.keys():
-                stream_id = op_name_steamid_dict[op_name]
-                avg_time = time / op_name_count_dict[op_name]
-                result_data.append([op_name, avg_time, stream_id])
+        Args:
+            op_data_list (list): The list of operator metadata.
+            op_name_time_dict (dict): The mapping relation of operator name and its execution time.
+            op_name_stream_dict (dict): The mapping relation of operator name and its stream id.
+            op_name_count_dict (dict): The mapping relation of operator name and its count.
+            op_name_task_dict (dict): The mapping relation of operator name and its task id.
+            op_name_start_time (dict): The mapping relation of operator name and its start time.
+        """
+        factor = 1e5
+        for item in op_data_list:
+            op_name = item.op_name
+            # Unit conversion: converting the cycle counter into ms.
+            op_start_time_str = str((item.cycle_counter - self._min_cycle_counter) / factor)
+            op_duration = item.duration / factor
+            op_duration_str = str(item.duration / factor)
+            if op_name in op_name_time_dict.keys():
+                op_name_time_dict[op_name] += op_duration
+                if item.task_id == op_name_task_dict[op_name]:
+                    op_name_count_dict[op_name] += 1
+                op_name_start_time[op_name].append(
+                    (op_start_time_str, op_duration_str)
+                )
 
-        result_data.sort(key=lambda x: x[0])
-        total_time = 0
-        for item in result_data:
-            total_time += item[1]
-        result_data.append(["total op", total_time, 0])
+            else:
+                op_name_time_dict[op_name] = op_duration
+                op_name_stream_dict[op_name] = item.stream_id
+                op_name_task_dict[op_name] = item.task_id
+                op_name_count_dict[op_name] = 1
+                op_name_start_time[op_name] = []
+                op_name_start_time[op_name].append(
+                    (op_start_time_str, op_duration_str)
+                )
 
-        fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
-        fwrite_format(self._output_filename, data_source=tabulate(result_data,
-                                                                  self._dst_file_column_title,
-                                                                  tablefmt='simple'))
+    def write_min_cycle_counter_to_file(self):
+        """Write minimum cycle counter into a txt file."""
+        min_cycle_counter = self._min_cycle_counter
+        file_name = 'min_cycle_counter_' + self._device_id + '.txt'
+        file_path = os.path.join(self._output_path, file_name)
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid min cycle counter file path.'
+        )
+        with open(file_path, 'w') as file:
+            file.write(str(min_cycle_counter))
diff --git a/mindinsight/profiler/profiling.py b/mindinsight/profiler/profiling.py
index 07d5d2852b9249c7bab0587c46a80af750a3f1cb..24af157379933430d6bf07de1e76bf7679103a4f 100644
--- a/mindinsight/profiler/profiling.py
+++ b/mindinsight/profiler/profiling.py
@@ -16,11 +16,14 @@
 import os
 import time
 
+from marshmallow import ValidationError
 from tabulate import tabulate
 
 from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
 from mindinsight.profiler.analyser.integrator import Integrator
 from mindinsight.profiler.common._utils import get_file_names, fwrite_format
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
+    ProfilerIOException
 from mindinsight.profiler.common.log import logger
 from mindinsight.profiler.common.validator.checkparam import \
     check_bool, check_subgraph
@@ -76,6 +79,7 @@ class Profiler:
                  optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
         # get device_id and device_target
         device_target = ""
+        dev_id = ""
         try:
             import mindspore.context as context
             dev_id = str(context.get_context("device_id"))
@@ -83,7 +87,7 @@ class Profiler:
         except ImportError:
             logger.error("Profiling: fail to import context from mindspore.")
         except ValueError as err:
-            logger.error("Profiling: fail to get context, %s", err.message)
+            logger.error("Profiling: fail to get context, %s", err)
 
         if not dev_id:
             dev_id = os.getenv('DEVICE_ID')
@@ -185,7 +189,10 @@ class Profiler:
         # get op compute time from hwts data and framework data, write output_op_compute_time.txt
         opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
         opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
-        optime_parser = OPComputeTimeParser(hwts_output_filename, opcompute_output_filename, op_task_dict)
+        optime_parser = OPComputeTimeParser(
+            hwts_output_filename, opcompute_output_filename,
+            op_task_dict, self._output_path, self._dev_id
+        )
         optime_parser.execute()
 
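A hedged sketch of the updated construction call, mirroring the hunk above; the file names and ids are placeholders, and execute() assumes the parsed HWTS file already exists:

    from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser

    op_task_dict = {'305': 'Default/network/Conv2D-op101'}  # task_id -> op name (invented)
    parser = OPComputeTimeParser(
        'hwts_output_0.txt',             # parsed HWTS log (placeholder)
        'output_op_compute_time_0.txt',  # destination file (placeholder)
        op_task_dict,
        './profiler',                    # output_path
        '0'                              # device_id
    )
    parser.execute()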
         # parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
@@ -216,6 +223,9 @@ class Profiler:
 
         # analyse step trace info
         self._analyse_step_trace(source_path, framework_parser)
 
+        # analyse timeline info
+        self._analyse_timeline()
+
     def _analyse_step_trace(self, source_path, framework_parser):
         """
         Analyse step trace data and save the result.
@@ -240,7 +250,34 @@ class Profiler:
         parser.parse_and_save()
         # print parser result
         parser.show()
-        logger.info("Finish save the intermediate result %s", step_trace_intermediate_file_path)
+
+    def _analyse_timeline(self):
+        """
+        Analyse and parse timeline info.
+        """
+        # Get framework info
+        aicoredetail_analyser = AnalyserFactory.instance().get_analyser(
+            'aicore_detail', self._output_path, self._dev_id
+        )
+        framework_info = aicoredetail_analyser.query()
+
+        # Get all reduce info
+        step_trace_analyser = AnalyserFactory.instance().get_analyser(
+            'step_trace', self._output_path, self._dev_id
+        )
+        all_reduce_info = step_trace_analyser.query_for_all_reduce()
+
+        # Get timeline info
+        timeline_analyser = AnalyserFactory.instance().get_analyser(
+            'timeline', self._output_path, self._dev_id
+        )
+        timeline_analyser.add_framework_info(framework_info)
+        timeline_analyser.add_all_reduce_info(all_reduce_info)
+        try:
+            timeline_analyser.write_timeline()
+            timeline_analyser.write_timeline_summary()
+        except (ProfilerIOException, ProfilerFileNotFoundException, ValidationError) as err:
+            logger.warning('Failed to write timeline data: %s', err)
 
     def __del__(self):
         """Disable the profiling collection service, called after training."""
diff --git a/tests/st/func/profiler/test_analyse.py b/tests/st/func/profiler/test_analyse.py
index 96867842b2eafa6ee58d625de5f03a600e55c4da..61a29dbf08ef26eb64392b2e4a98fe9b78f419f2 100644
--- a/tests/st/func/profiler/test_analyse.py
+++ b/tests/st/func/profiler/test_analyse.py
@@ -72,7 +72,6 @@ class TestProfilerAnalyse(TestCase):
     def test_step_trace_file_exist(self):
         """Test the step trace file has been generated"""
         output_files = os.listdir(self.profiler)
-        assert len(output_files) == 9
         assert self.step_trace_file in output_files
 
     @pytest.mark.level0
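Putting the pieces together: once Profiler.analyse() has produced the timeline artifacts, they can be queried the same way the new REST endpoints do; the profiling path and device id below are placeholders:

    from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory

    analyser = AnalyserFactory.instance().get_analyser('timeline', './profiler', '0')
    print(analyser.get_timeline_summary())       # contents of timeline_summary_0.json
    print(len(analyser.get_display_timeline()))  # number of trace events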