From cce05461ecc31856e7e1a2d4cb9ae223cdf71d72 Mon Sep 17 00:00:00 2001
From: yelihua
Date: Thu, 4 Jun 2020 17:18:01 +0800
Subject: [PATCH] implementation of step trace profiler

---
 mindinsight/backend/profiler/profile_api.py   |  68 ++++-
 mindinsight/profiler/analyser/__init__.py     |   3 +
 .../profiler/analyser/analyser_factory.py     |   7 +-
 .../profiler/analyser/step_trace_analyser.py  | 266 ++++++++++++++++++
 mindinsight/profiler/common/_utils.py         |  22 +-
 .../profiler/common/exceptions/error_code.py  |   7 +
 .../profiler/common/exceptions/exceptions.py  |  22 ++
 mindinsight/profiler/common/util.py           |  98 +++++++
 .../profiler/common/validator/validate.py     |  49 +++-
 .../common/validator/validate_path.py         |  33 +++
 .../profiler/parser/step_trace_parser.py      | 264 +++++++++++++++++
 mindinsight/profiler/profiling.py             |  51 +++-
 12 files changed, 863 insertions(+), 27 deletions(-)
 create mode 100644 mindinsight/profiler/analyser/step_trace_analyser.py
 create mode 100644 mindinsight/profiler/parser/step_trace_parser.py

diff --git a/mindinsight/backend/profiler/profile_api.py b/mindinsight/backend/profiler/profile_api.py
index f94f44e..19c2e5e 100644
--- a/mindinsight/backend/profiler/profile_api.py
+++ b/mindinsight/backend/profiler/profile_api.py
@@ -21,17 +21,17 @@ import json
 import os
 
 from flask import Blueprint
-from flask import request
 from flask import jsonify
+from flask import request
 from marshmallow import ValidationError
 
 from mindinsight.conf import settings
-from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir
-from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
-
+from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, to_int
 from mindinsight.lineagemgr.common.validator.validate_path import validate_and_normalize_path
+from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
 from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir
-from mindinsight.profiler.common.validator.validate import validate_condition
+from mindinsight.profiler.common.validator.validate import validate_condition, validate_ui_proc
+from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_profiler_path
 from mindinsight.utils.exceptions import ParamValueError
 
 BLUEPRINT = Blueprint("profile", __name__, url_prefix=settings.URL_PREFIX)
@@ -109,6 +109,64 @@ def get_profile_device_list():
     return jsonify(device_list)
 
+
+@BLUEPRINT.route("/profile/training-trace/graph", methods=["GET"])
+def get_training_trace_graph():
+    """
+    Get training trace info of one step.
+
+    Returns:
+        Response, the training trace info of one step.
+
+    Examples:
+        >>> GET http://xxxx/v1/mindinsight/profile/training-trace/graph
+    """
+    summary_dir = request.args.get("dir")
+    profiler_dir = validate_and_normalize_profiler_path(summary_dir)
+    graph_type = request.args.get("type", default='0')
+    graph_type = to_int(graph_type, 'graph_type')
+    device_id = request.args.get("device_id", default='0')
+    _ = to_int(device_id, 'device_id')
+
+    analyser = AnalyserFactory.instance().get_analyser(
+        'step_trace', profiler_dir, device_id)
+    graph_info = analyser.query({
+        'filter_condition': {
+            'mode': 'step',
+            'step_id': graph_type
+        }})
+
+    return jsonify(graph_info)
+
+
+@BLUEPRINT.route("/profile/training-trace/target-time-info", methods=["GET"])
+def get_target_time_info():
+    """
+    Get all the time information of the specified column.
+
+    Returns:
+        Response, all the time information of the specified column.
+
+    Examples:
+        >>> GET http://xxxx/v1/mindinsight/profile/training-trace/target-time-info
+    """
+    summary_dir = request.args.get("dir")
+    profiler_dir = validate_and_normalize_profiler_path(summary_dir)
+    proc_name = request.args.get("type")
+    validate_ui_proc(proc_name)
+    device_id = request.args.get("device_id", default='0')
+    _ = to_int(device_id, 'device_id')
+
+    analyser = AnalyserFactory.instance().get_analyser(
+        'step_trace', profiler_dir, device_id)
+    target_time_info = analyser.query({
+        'filter_condition': {
+            'mode': 'proc',
+            'proc_name': proc_name
+        }})
+    target_time_info['summary'] = analyser.summary
+    return jsonify(target_time_info)
+
+
 def init_module(app):
     """
     Init module entry.
diff --git a/mindinsight/profiler/analyser/__init__.py b/mindinsight/profiler/analyser/__init__.py
index e307743..384a027 100644
--- a/mindinsight/profiler/analyser/__init__.py
+++ b/mindinsight/profiler/analyser/__init__.py
@@ -12,3 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # ============================================================================
+"""Import analyser."""
+from .analyser import *
+from .step_trace_analyser import StepTraceAnalyser
diff --git a/mindinsight/profiler/analyser/analyser_factory.py b/mindinsight/profiler/analyser/analyser_factory.py
index bd7aaeb..ea0148b 100644
--- a/mindinsight/profiler/analyser/analyser_factory.py
+++ b/mindinsight/profiler/analyser/analyser_factory.py
@@ -15,7 +15,7 @@
 """The analyser factory."""
 import threading
 
-import mindinsight.profiler.analyser.analyser as analyser_module
+import mindinsight.profiler.analyser as analyser_module
 from mindinsight.profiler.common.exceptions.exceptions import \
     ProfilerAnalyserNotExistException
 
@@ -25,11 +25,12 @@ class AnalyserFactory:
     The analyser factory is used to create analyser special instance.
 
     Currently the factory supports creating `AicoreTypeAnalyser`,
-    `AicoreDetailAnalyser` and `AicpuAnalyser`. The `AicoreTypeAnalyser` is used
-    to analyze execution time according to AICORE operator type.
+    `AicoreDetailAnalyser`, `AicpuAnalyser` and `StepTraceAnalyser`.
+    The `AicoreTypeAnalyser` is used to analyze execution time according to AICORE operator type.
     The `AicoreDetailAnalyser` is used to analyze execution time according to
     all specific AICORE operator. The `AicpuAnalyser` is used to analyze
     execution time according to all specific AICPU operator.
+    The `StepTraceAnalyser` is used to analyze the execution time of the different phases in each step.
 
     Examples:
         >>> analyser = AnalyserFactory.instance().get_analyser(
diff --git a/mindinsight/profiler/analyser/step_trace_analyser.py b/mindinsight/profiler/analyser/step_trace_analyser.py
new file mode 100644
index 0000000..72433e1
--- /dev/null
+++ b/mindinsight/profiler/analyser/step_trace_analyser.py
@@ -0,0 +1,266 @@
+# Copyright 2020 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""The StepTraceAnalyser analyser class."""
+import csv
+
+from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerParamValueErrorException, \
+    ProfilerFileNotFoundException, StepNumNotSupportedException
+from mindinsight.profiler.common.log import logger as log
+from mindinsight.profiler.common.util import query_latest_trace_time_file, get_field_value, \
+    get_summary_for_step_trace
+
+
+class StepTraceAnalyser(BaseAnalyser):
+    """The analyser for analyzing training steps."""
+
+    __col_names__ = []
+    _attr_ui_name = 'name'
+    _attr_ui_start = 'start'
+    _attr_ui_duration = 'duration'
+
+    @property
+    def summary(self):
+        """The property of summary info."""
+        summary = get_summary_for_step_trace(self._data[-1], self.__column__)
+        summary['total_steps'] = self._size
+        return summary
+
+    def query(self, condition=None):
+        """
+        Query data according to the condition.
+
+        Args:
+            condition (dict): The search condition, which only contains the
+                `filter_condition` parameter. Default: None.
+
+        Returns:
+            dict, the result after being filtered, sorted and grouped.
+        """
+        if condition is None:
+            condition = {}
+        filter_condition = condition.get('filter_condition', {})
+        self._validate_filter_condition(filter_condition)
+        self._result = {'size': self._size}
+        self._filter(filter_condition)
+
+        return self._result
+
+    def query_for_all_reduce(self):
+        """
+        Query for all reduce info.
+
+        Returns:
+            list[dict], each item is the reduce info for one step, and the reduce info
+                is formatted like:
+                {stream_id: List[Tuple(start_point, end_point, duration, field_name)]}.
+        """
+        reduce_infos = []
+        for row_info in self._data[:-1]:
+            reduce_info = self._get_reduce_time_in_order(row_info, 'systime')
+            reduce_infos.append(reduce_info)
+
+        return reduce_infos
+
+    def _load(self):
+        """Load data from the parsed step trace time file."""
+        file_path = query_latest_trace_time_file(self._profiling_dir, self._device_id)
+        if not file_path:
+            log.error("Failed to find parsed trace time file.")
+            raise ProfilerFileNotFoundException('parsed step trace time file')
+        with open(file_path, 'r') as handle:
+            csv_reader = csv.reader(handle)
+            self.__column__ = next(csv_reader)
+            self._data = list(csv_reader)
+        self._size = len(self._data) - 1
+        self._display_col_names = self.__col_names__[:]
+
+    def _filter(self, filter_condition):
+        """
+        Filter the profiling data according to the filter condition.
+
+        Args:
+            filter_condition (dict): The filter condition.
+
+                - mode (str): The kind of information. `step` returns the info about a
+                    specific step. `proc` returns the info about a specific field in the
+                    parsed trace file.
+
+                - step_id (int): The selected step_id. If not given, it means info for
+                    all steps is required. If the value is 0, it means average info for
+                    all steps except the first is required.
+
+                - proc_name (str): The selected field name.
+
+                - time_type (str): The value type. `systime` keeps the original value.
+                    `realtime` transforms the value into milliseconds. Default: `realtime`.
+ """ + mode = filter_condition.get('mode', 'step') + if mode == 'step': + self._get_step_details(step_id=filter_condition.get('step_id'), + time_type=filter_condition.get('time_type', 'realtime')) + else: + self._get_proc_details(step_id=filter_condition.get('step_id'), + proc_name=filter_condition.get('proc_name'), + time_type=filter_condition.get('time_type', 'realtime')) + + def _construct_time_point(self, name, start, duration): + """Construct time point.""" + point = { + self._attr_ui_name: name, + self._attr_ui_start: round(start, 4), + self._attr_ui_duration: round(duration, 4) + } + return point + + def _get_step_details(self, step_id, time_type='realtime'): + """ + Get step trace info for selected step and save the result. + + Args: + step_id (int): The selected step_id. If the value is 0, it means average info + for all steps except the first is required. + time_type (str): The value type. `systime` keeps the original value. + `realtime` transforms the value in millisecond. Default: `realtime`. + """ + if step_id is None: + step_id = 0 + row_info = self._data[step_id - 1] + + start_point = get_field_value(row_info, 'start_point', self.__column__, time_type) + total = get_field_value(row_info, 'total', self.__column__, time_type) + iteration_interval = get_field_value(row_info, 'iteration_interval', self.__column__, + time_type) + fp_point = get_field_value(row_info, 'fp_point', self.__column__, time_type) + fp_and_bp = get_field_value(row_info, 'fp_and_bp', self.__column__, time_type) + bp_point = get_field_value(row_info, 'bp_point', self.__column__, time_type) + tail = get_field_value(row_info, 'tail', self.__column__, time_type) + # first line only contains total time + first_line = [self._construct_time_point('', 0, total)] + # second line contains iteration_interval, fp_and_bp and tail + second_line = [ + self._construct_time_point('', 0, iteration_interval), + self._construct_time_point('fp_and_bp', fp_point - start_point, fp_and_bp), + self._construct_time_point('', bp_point - start_point, tail), + ] + # construct reduces lines + reduce_lines = self._construct_reduce_lines(row_info, time_type) + + graph = [first_line, second_line] + graph.extend(reduce_lines) + self._result['training_trace_graph'] = graph + + def _get_reduce_time_in_order(self, row_info, time_type): + """Get reduce time in order.""" + reduce_info = {} + reduce_fields = [field_name for field_name in self.__column__ + if field_name.startswith('stream_') and not field_name.endswith('point')] + for reduce_field in reduce_fields: + cur_stream_id = reduce_field.split('_', 2)[1] + cur_stream = reduce_info.get(cur_stream_id) + if not cur_stream: + cur_stream = [] + reduce_info[cur_stream_id] = cur_stream + reduce_start = get_field_value( + row_info, reduce_field + '_start_point', self.__column__, time_type) + reduce_end = get_field_value( + row_info, reduce_field + '_end_point', self.__column__, time_type) + reduce_duration = get_field_value( + row_info, reduce_field, self.__column__, time_type) + cur_stream.append((reduce_start, reduce_end, reduce_duration, reduce_field)) + for _, reduce_events in reduce_info.items(): + reduce_events.sort(key=lambda elem: elem[1]) + return reduce_info + + def _construct_reduce_lines(self, row_info, time_type): + """Contruct first line in detailed graph.""" + reduce_lines = [] + start_point = get_field_value(row_info, 'start_point', self.__column__, time_type) + fp_point = get_field_value(row_info, 'fp_point', self.__column__, time_type) + end_point = get_field_value(row_info, 
+        reduce_info = self._get_reduce_time_in_order(row_info, time_type)
+        # construct time points for each line
+        for _, reduce_events in reduce_info.items():
+            current_line = self._construct_reduce_line(
+                start_point, end_point, fp_point, reduce_events)
+            reduce_lines.append(current_line)
+
+        return reduce_lines
+
+    def _construct_reduce_line(self, start_point, end_point, fp_point, reduce_events):
+        """Construct the list of time points for one reduce line."""
+        current_line = []
+        previous_start = fp_point
+        for start, end, duration, field_name in reduce_events:
+            current_line.extend([
+                self._construct_time_point(
+                    '', previous_start - start_point, start - previous_start),
+                self._construct_time_point(
+                    field_name, start - start_point, duration)
+            ])
+            previous_start = end
+        current_line.append(self._construct_time_point(
+            '', previous_start - start_point, end_point - previous_start))
+        return current_line
+
+    def _get_proc_details(self, proc_name, step_id=None, time_type='realtime'):
+        """
+        Get step trace info for the selected field and save the result.
+
+        Args:
+            proc_name (str): The selected field name.
+            step_id (int): The selected step_id. If not given, it means info for all
+                steps is required. If the value is 0, it means average info for all
+                steps except the first is required. Default: None.
+            time_type (str): The value type. `systime` keeps the original value.
+                `realtime` transforms the value into milliseconds. Default: `realtime`.
+        """
+        if step_id is None:
+            rows_info = self._data[:-1]
+        else:
+            rows_info = [self._data[step_id - 1]]
+
+        proc_info = [get_field_value(row_info, proc_name, self.__column__, time_type)
+                     for row_info in rows_info]
+        self._result['info'] = {proc_name: proc_info}
+
+    def _validate_filter_condition(self, filter_condition):
+        """Validate step trace filter_condition."""
+        mode = filter_condition.get('mode', 'step')
+        self._validate_str_param(mode, ['step', 'proc'], 'mode')
+
+        step_id = filter_condition.get('step_id', 0)
+        self._validate_step_id(step_id)
+
+        proc_name = filter_condition.get('proc_name')
+        self._validate_str_param(proc_name, self.__column__, 'proc_name')
+
+        time_type = filter_condition.get('time_type', 'realtime')
+        self._validate_str_param(time_type, ['realtime', 'systime'], 'time_type')
+
+    def _validate_step_id(self, step_id):
+        """Validate step_id."""
+        if isinstance(step_id, int) and 0 <= step_id <= self._size:
+            return
+        log.error("Invalid step_id in request. step_id should be in [0, %d].", self._size)
+        raise StepNumNotSupportedException([0, self._size])
+
+    @staticmethod
+    def _validate_str_param(param, accept_param, error_name=''):
+        """Validate a string parameter against its acceptable values."""
+        if param is None or isinstance(param, str) and param in accept_param:
+            return
+        log.error("Invalid param %s in request. Acceptable value is %s.", error_name, accept_param)
+        raise ProfilerParamValueErrorException(f"Invalid {error_name}.")
diff --git a/mindinsight/profiler/common/_utils.py b/mindinsight/profiler/common/_utils.py
index 25a24b6..2c059ce 100755
--- a/mindinsight/profiler/common/_utils.py
+++ b/mindinsight/profiler/common/_utils.py
@@ -16,15 +16,16 @@
 import os
 import re
 
+
 def fwrite_format(output_data_path, data_source=None, is_print=False, is_start=False):
     """
     Write data to the output file.
 
     Args:
-        output_data_path(str): the output file path of the data.
-        data_source(list): the data to write.
-        is_print(bool): whether to print the data to stdout.
-        is_start(bool): Whether is the first line of the output file, will remove the old file if True."
+        output_data_path (str): The output file path of the data.
+        data_source (list): The data to write.
+        is_print (bool): Whether to print the data to stdout.
+        is_start (bool): Whether this is the first line of the output file; the old file will be removed if True.
     """
 
     if is_start is True and os.path.exists(output_data_path):
@@ -48,16 +49,17 @@
     index = re.findall(r'\d+', slice_list[0])
     return int(index[0])
 
+
 def get_file_join_name(input_path, file_name):
     """
     Search files under the special path, and will join all the files to one file.
 
     Args:
-        input_path(str): the source path, will search files under it.
-        file_name(str): the target of the filename, such as 'hwts.log.data.45.dev'.
+        input_path (str): The source path, will search files under it.
+        file_name (str): The target of the filename, such as 'hwts.log.data.45.dev'.
 
     Returns:
-        str: the join file name.
+        str, the join file name.
     """
     name_list = []
     file_join_name = ''
@@ -90,11 +92,11 @@
     Search files under the special path.
 
     Args:
-        input_path(str): the souce path, will serch files under it.
-        file_name(str): the target of the filename, such as 'host_start_log'.
+        input_path (str): The source path, will search files under it.
+        file_name (str): The target of the filename, such as 'host_start_log'.
 
     Returns:
-        list: file name list.
+        list, file name list.
     """
 
     input_path = os.path.realpath(input_path)
diff --git a/mindinsight/profiler/common/exceptions/error_code.py b/mindinsight/profiler/common/exceptions/error_code.py
index 64360c2..9e350a4 100644
--- a/mindinsight/profiler/common/exceptions/error_code.py
+++ b/mindinsight/profiler/common/exceptions/error_code.py
@@ -37,6 +37,8 @@ class ProfilerErrors(ProfilerMgrErrors):
     # parser error code
     DEVICE_ID_MISMATCH_ERROR = 0 | _PARSER_MASK
     RAW_FILE_ERROR = 1 | _PARSER_MASK
+    STEP_NUM_NOT_SUPPORTED_ERROR = 2 | _PARSER_MASK
+    JOB_ID_MISMATCH_ERROR = 3 | _PARSER_MASK
 
     # analyser error code
     COLUMN_NOT_EXIST_ERROR = 0 | _ANALYSER_MASK
@@ -49,6 +51,8 @@ class ProfilerErrors(ProfilerMgrErrors):
     COLUMN_NOT_SUPPORT_SORT_ERROR = 7 | _ANALYSER_MASK
 
 
+
+
 @unique
 class ProfilerErrorMsg(Enum):
     """Profiler error messages."""
@@ -63,6 +67,9 @@ class ProfilerErrorMsg(Enum):
     # parser error msg
     DEVICE_ID_MISMATCH_ERROR = 'The device ID mismatch.'
     RAW_FILE_ERROR = 'Raw file error. {}'
+    STEP_NUM_NOT_SUPPORTED_ERROR = 'The step num must be in {}.'
+    JOB_ID_MISMATCH_ERROR = 'The job id in the parameter is not the same as ' \
+                            'the one in the training trace file.'
 
     # analyser error msg
     COLUMN_NOT_EXIST_ERROR = 'The column {} does not exist.'
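A note on the error-code scheme extended above: every profiler error code ORs a module-specific mask into a small sequence number, so the new parser codes (for example, `STEP_NUM_NOT_SUPPORTED_ERROR = 2 | _PARSER_MASK`) can never collide with analyser codes such as `COLUMN_NOT_EXIST_ERROR = 0 | _ANALYSER_MASK`. A minimal, self-contained sketch of the idea; the mask values below are illustrative, since the real ones live in `ProfilerMgrErrors`, which this patch does not touch:

    from enum import Enum, unique

    # Illustrative masks only; the actual values are defined in ProfilerMgrErrors.
    _PARSER_MASK = 1 << 8
    _ANALYSER_MASK = 2 << 8

    @unique
    class DemoErrors(Enum):
        """Low bits carry the sequence number, high bits identify the module."""
        RAW_FILE_ERROR = 1 | _PARSER_MASK                # 257
        STEP_NUM_NOT_SUPPORTED_ERROR = 2 | _PARSER_MASK  # 258
        COLUMN_NOT_EXIST_ERROR = 0 | _ANALYSER_MASK      # 512

    # @unique raises ValueError if two members resolve to the same code.
    assert DemoErrors.STEP_NUM_NOT_SUPPORTED_ERROR.value == 258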
diff --git a/mindinsight/profiler/common/exceptions/exceptions.py b/mindinsight/profiler/common/exceptions/exceptions.py
index 8502442..ec4a9f1 100644
--- a/mindinsight/profiler/common/exceptions/exceptions.py
+++ b/mindinsight/profiler/common/exceptions/exceptions.py
@@ -192,3 +192,25 @@ class ProfilerColumnNotSupportSortException(MindInsightException):
             message=ProfilerErrorMsg.COLUMN_NOT_SUPPORT_SORT_ERROR.value.format(msg),
             http_code=400
         )
+
+
+class StepNumNotSupportedException(MindInsightException):
+    """The step number error in profiler module."""
+
+    def __init__(self, msg):
+        super(StepNumNotSupportedException, self).__init__(
+            error=ProfilerErrors.STEP_NUM_NOT_SUPPORTED_ERROR,
+            message=ProfilerErrorMsg.STEP_NUM_NOT_SUPPORTED_ERROR.value.format(msg),
+            http_code=400
+        )
+
+
+class JobIdMismatchException(MindInsightException):
+    """The job ID mismatch error in profiler module."""
+
+    def __init__(self):
+        super(JobIdMismatchException, self).__init__(
+            error=ProfilerErrors.JOB_ID_MISMATCH_ERROR,
+            message=ProfilerErrorMsg.JOB_ID_MISMATCH_ERROR.value,
+            http_code=400
+        )
diff --git a/mindinsight/profiler/common/util.py b/mindinsight/profiler/common/util.py
index 6702aea..14c0642 100644
--- a/mindinsight/profiler/common/util.py
+++ b/mindinsight/profiler/common/util.py
@@ -19,6 +19,8 @@ This module provides the utils.
 """
 import os
 
+from mindinsight.datavisual.utils.tools import to_int
+
 
 def analyse_device_list_from_profiler_dir(profiler_dir):
     """
@@ -40,3 +42,99 @@ def analyse_device_list_from_profiler_dir(profiler_dir):
         device_id_list.add(device_num)
 
     return list(device_id_list)
+
+
+def query_latest_trace_time_file(profiler_dir, device_id=0):
+    """
+    Query the latest trace time file.
+
+    Args:
+        profiler_dir (str): The profiler directory.
+        device_id (int): The device ID.
+
+    Returns:
+        str, the latest trace time file path.
+    """
+    files = os.listdir(profiler_dir)
+    target_file = f'step_trace_raw_{device_id}_detail_time.csv'
+    try:
+        latest_file = max(
+            filter(
+                lambda file: file == target_file,
+                files
+            ),
+            key=lambda file: os.stat(os.path.join(profiler_dir, file)).st_mtime
+        )
+    except ValueError:
+        return None
+    return os.path.join(profiler_dir, latest_file)
+
+
+def query_step_trace_file(profiler_dir):
+    """
+    Query for the step trace file.
+
+    Args:
+        profiler_dir (str): The directory that contains all step trace files.
+
+    Returns:
+        str, the file path of the step trace file.
+ """ + files = os.listdir(profiler_dir) + training_trace_file = list( + filter( + lambda file: file.startswith('training_trace') and not file.endswith('.done'), + files + ) + ) + if training_trace_file: + return os.path.join(profiler_dir, training_trace_file[0]) + return None + + +def get_summary_for_step_trace(average_info, header): + """The property of summary info.""" + if not average_info or not header: + return {} + total_time = get_field_value(average_info, 'total', header) + iteration_interval = get_field_value(average_info, 'iteration_interval', + header) + fp_and_bp = get_field_value(average_info, 'fp_and_bp', header) + tail = get_field_value(average_info, 'tail', header) + summary = { + 'total_time': total_time, + 'iteration_interval': calculate_percent(iteration_interval, total_time), + 'fp_and_bp': calculate_percent(fp_and_bp, total_time), + 'tail': calculate_percent(tail, total_time) + } + return summary + + +def calculate_percent(partial, total): + """Calculate percent value.""" + percent = round(partial / total * 100, 2) + return f'{percent}%' + + +def get_field_value(row_info, field_name, header, time_type='realtime'): + """ + Extract basic info through row_info. + + Args: + row_info (list): The list of data info in one row. + header (list[str]): The list of field names. + field_name (str): The name in header. + time_type (str): The type of value, `realtime` or `systime`. Default: `realtime`. + + Returns: + dict, step trace info in dict format. + """ + # one sys count takes 10 ns, 1 ms has 100000 syscnt + per_ms_syscnt = 100000 + field_index = header.index(field_name) + value = row_info[field_index] + value = to_int(value, field_name) + if time_type == 'realtime': + value = value / per_ms_syscnt + + return value diff --git a/mindinsight/profiler/common/validator/validate.py b/mindinsight/profiler/common/validator/validate.py index 1da0276..e70855b 100644 --- a/mindinsight/profiler/common/validator/validate.py +++ b/mindinsight/profiler/common/validator/validate.py @@ -13,9 +13,14 @@ # limitations under the License. # ============================================================================ """Validate the profiler parameters.""" +import os +import sys + +from mindinsight.datavisual.utils.tools import to_int from mindinsight.profiler.common.exceptions.exceptions import ProfilerParamTypeErrorException, \ ProfilerDeviceIdException, ProfilerOpTypeException, \ - ProfilerSortConditionException, ProfilerFilterConditionException, ProfilerGroupConditionException + ProfilerSortConditionException, ProfilerFilterConditionException, \ + ProfilerGroupConditionException, ProfilerParamValueErrorException from mindinsight.profiler.common.log import logger as log AICORE_TYPE_COL = ["op_type", "execution_time", "execution_frequency", "precent"] @@ -71,6 +76,7 @@ def validate_condition(search_condition): if "filter_condition" in search_condition: validate_filter_condition(search_condition) + def validata_group_condition(search_condition): """ Verify the group_condition in search_condition is valid or not. @@ -185,3 +191,44 @@ def validate_filter_condition(search_condition): validate_op_filter_condition(op_name_condition) if "op_type" not in filter_condition and "op_name" not in filter_condition: raise ProfilerFilterConditionException("The key of filter_condition is not support") + + +def validate_and_set_job_id_env(job_id_env): + """ + Validate the job id and set it in environment. + + Args: + job_id_env (str): The id that to be set in environment parameter `JOB_ID`. 
+
+    Returns:
+        int, the valid job id env.
+    """
+    if job_id_env is None:
+        return job_id_env
+    # get job_id_env in int type
+    valid_id = to_int(job_id_env, 'job_id_env')
+    # check the range of valid_id
+    if valid_id and 255 < valid_id < sys.maxsize:
+        os.environ['JOB_ID'] = job_id_env
+    else:
+        log.warning("Invalid job_id_env %s. The value should be an int between 255 and %s. "
+                    "Use default job id env instead.",
+                    job_id_env, sys.maxsize)
+    return valid_id
+
+
+def validate_ui_proc(proc_name):
+    """
+    Validate proc name in restful request.
+
+    Args:
+        proc_name (str): The proc name to query. Acceptable value is in
+            [`iteration_interval`, `fp_and_bp`, `tail`].
+
+    Raises:
+        ProfilerParamValueErrorException: If the proc_name is invalid.
+    """
+    accept_names = ['iteration_interval', 'fp_and_bp', 'tail']
+    if proc_name not in accept_names:
+        log.error("Invalid proc_name. The proc_name for restful api is in %s", accept_names)
+        raise ProfilerParamValueErrorException(f'proc_name should be in {accept_names}.')
diff --git a/mindinsight/profiler/common/validator/validate_path.py b/mindinsight/profiler/common/validator/validate_path.py
index db85480..0643ff5 100644
--- a/mindinsight/profiler/common/validator/validate_path.py
+++ b/mindinsight/profiler/common/validator/validate_path.py
@@ -15,8 +15,14 @@
 """Validate the input path."""
 import os
 from typing import Union, List
+from urllib.parse import unquote
+
 from marshmallow import ValidationError
 
+from mindinsight.profiler.common.exceptions.exceptions import \
+    ProfilerParamValueErrorException
+from mindinsight.profiler.common.log import logger as log
+
 
 def safe_normalize_path(
         path,
@@ -117,3 +123,30 @@ def validate_and_normalize_path(
         raise ValidationError({raise_key: {"The path is invalid!"}})
 
     return normalized_path
+
+
+def validate_and_normalize_profiler_path(path):
+    """
+    Validate and normalize profiler path.
+
+    Args:
+        path (str): The path of summary directory.
+
+    Returns:
+        str, normalized path of profiler directory.
+    """
+    if not path:
+        raise ProfilerParamValueErrorException('The file dir does not exist.')
+    try:
+        unquote_path = unquote(path, errors='strict')
+    except UnicodeDecodeError:
+        raise ProfilerParamValueErrorException('Unquote error with strict mode')
+
+    profiler_dir = os.path.join(unquote_path, 'profiler')
+    try:
+        profiler_dir = validate_and_normalize_path(profiler_dir, 'profiler')
+    except ValidationError:
+        log.error('profiler dir <%s> is invalid', unquote_path)
+        raise ProfilerParamValueErrorException('Profiler dir is invalid.')
+
+    return profiler_dir
diff --git a/mindinsight/profiler/parser/step_trace_parser.py b/mindinsight/profiler/parser/step_trace_parser.py
new file mode 100644
index 0000000..4b51326
--- /dev/null
+++ b/mindinsight/profiler/parser/step_trace_parser.py
@@ -0,0 +1,264 @@
+# Copyright 2020 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""The parser for step trace data."""
+import csv
+import os
+import stat
+import struct
+from collections import namedtuple
+from decimal import Decimal
+
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
+    JobIdMismatchException
+from mindinsight.profiler.common.log import logger as log
+from mindinsight.profiler.common.util import get_summary_for_step_trace
+from mindinsight.utils.exceptions import MindInsightException
+
+StepTraceStruct = namedtuple(
+    'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count']
+)
+
+
+class StepTraceParser:
+    """
+    The parser for step trace data.
+
+    Args:
+        input_dir (str): The directory that contains original step trace data.
+        output_file_path (str): The output file path.
+        job_id (int): The job id of the training job, used to check the raw step trace data.
+        skip_first_step (bool): Whether to skip the first step or not.
+    """
+    _event_size = 20
+
+    def __init__(self, input_dir, output_file_path, job_id, skip_first_step=False):
+        self._input_dir = input_dir
+        self._output_path = output_file_path
+        self._job_id = job_id
+        self._skip_first_step = skip_first_step
+        self._result = []
+        self._header = []
+        self._step_num = 0
+
+    @property
+    def output_file(self):
+        """The property of the output file name."""
+        file_name = '/'.join(self._output_path.rsplit('/', 2)[-2:])
+        return file_name
+
+    def show(self):
+        """Show the summary info of the parsed step trace."""
+        summary_info = {}
+        if self._result:
+            summary_info = get_summary_for_step_trace(self._result[-1], self._header)
+            summary_info['total_steps'] = len(self._result) - 1
+        print('\nStep trace summary info (unit: syscnt):')
+        print(summary_info)
+        print('\nThe step trace parse result is saved under {summary_dir}/%s' % self.output_file)
+
+    def parse_and_save(self):
+        """Parse step trace files and save the result."""
+        try:
+            source_file = self._get_step_trace_file()
+            self._parse(source_file)
+            self._save()
+        except MindInsightException as err:
+            log.error("Failed to parse and save step trace files.")
+            log.exception(err)
+        else:
+            log.info("Finish to save intermediate result for step trace file.")
+
+    def _get_step_trace_file(self):
+        """Get step trace file."""
+        profiling_path = self._input_dir
+        # validate input_dir
+        if not os.path.isdir(profiling_path):
+            raise ProfilerPathErrorException(
+                '{} does not exist or is not a dir'.format(profiling_path)
+            )
+        # get step trace files
+        files = os.listdir(profiling_path)
+        step_trace_files = list(
+            filter(
+                lambda file: file.startswith('training_trace') and not file.endswith('.done'),
+                files
+            )
+        )
+        # validate result
+        if not step_trace_files:
+            raise ProfilerPathErrorException('training trace file does not exist')
+        if len(step_trace_files) > 1:
+            log.warning("Unable to parse multiple step trace files yet.")
+        step_trace_file = os.path.join(profiling_path, step_trace_files[0])
+        return step_trace_file
+
+    def _parse(self, source_file):
+        """Parse source step trace file."""
+        log.info("Start to parse step trace file.")
+        with open(source_file, 'rb') as handler:
+            content = handler.read()
+            for step_trace in self._get_next_step_trace(content):
+                if self._skip_first_step:
+                    self._skip_first_step = False
+                else:
+                    self._record_trace_event(step_trace)
+        self._record_average_info()
+        log.info("Finish to parse step trace file.")
+
+    def _get_next_step_trace(self, content):
+        """
+        Get next step trace info.
+
+        Args:
+            content (bytes): The input step trace info.
+
+        Returns:
+            Generator, yields the step trace info one by one.
+ """ + event_info = {} + for pos in range(0, len(content), 20): + next_event = self._get_trace_struct(content[pos:pos + self._event_size]) + self._construct_event_info(next_event, event_info) + if event_info.get('end'): + yield event_info + event_info = { + 'start': event_info.get('end'), + 'reduce': {} + } + + def _get_trace_struct(self, bin_info): + """Translate event info to StepTraceStruct.""" + if len(bin_info) == self._event_size: + parsed_info = struct.unpack('=QHHQ', bin_info) + return StepTraceStruct(*parsed_info) + return None + + def _construct_event_info(self, next_event, event_info): + """Construct event info according to next_event.""" + min_job_id = 255 + step_flag: bool = lambda tag: tag > min_job_id + end_flag: bool = lambda tag: tag == min_job_id + fp_flag: bool = lambda tag: tag == 1 + bp_flag: bool = lambda tag: tag == 2 + + def _on_step_event(): + """Handle step event.""" + self._validate_tag_id(tag_id) + if event_info.get('start'): + event_info['end'] = sys_count + else: + event_info['start'] = sys_count + event_info['reduce'] = {} + + def _on_reduce_event(): + """Handle reduce event.""" + stream_id = next_event.stream_id + if event_info['reduce'].get(stream_id): + event_info['reduce'][stream_id].append(sys_count) + else: + event_info['reduce'][stream_id] = [sys_count] + + tag_id = next_event.tag_id + sys_count = next_event.sys_count + if end_flag(tag_id): + event_info['end'] = sys_count + elif step_flag(tag_id): + _on_step_event() + elif fp_flag(tag_id): + event_info['fp'] = sys_count + elif bp_flag(tag_id): + event_info['bp'] = sys_count + else: + _on_reduce_event() + + def _validate_tag_id(self, job_id): + """Check the job id in source step trace file is same os user set.""" + if not self._job_id: + self._job_id = job_id + elif self._job_id != job_id: + raise JobIdMismatchException() + + def _record_trace_event(self, step_trace): + """Record trace event.""" + self._step_num += 1 + start_time = step_trace.get('start') + end_time = step_trace.get('end') + fp_time = step_trace.get('fp') + bp_time = step_trace.get('bp') + if not (start_time and end_time and fp_time and bp_time): + log.warning("The step %d is missing basic time.", self._step_num) + return + + row_data = { + 'step_num': self._step_num, + 'start_point': start_time, + 'end_point': end_time, + 'total': end_time - start_time, + 'fp_point': fp_time, + 'bp_point': bp_time, + 'iteration_interval': fp_time - start_time, + 'fp_and_bp': bp_time - fp_time, + 'tail': end_time - bp_time + } + # update reduce info + self._update_reduce_info(step_trace, row_data) + # save the row data + if not self._header: + self._header = list(row_data.keys()) + row_data_list = [row_data[header_name] for header_name in self._header] + self._result.append(row_data_list) + + @staticmethod + def _update_reduce_info(step_trace, row_data): + """Extract reduce info.""" + reduce_time = step_trace.get('reduce', {}) + for stream_id, time_points in reduce_time.items(): + time_point_num = len(time_points) + if time_point_num % 2: + log.warning("Stream %d has %d reduce time points.", stream_id, time_point_num) + continue + for index, point_id in enumerate(range(0, time_point_num, 2)): + field_name = f'stream_{stream_id}_parallel_{index}' + row_data[field_name + '_start_point'] = time_points[point_id] + row_data[field_name + '_end_point'] = time_points[point_id + 1] + row_data[field_name] = time_points[point_id + 1] - time_points[point_id] + + def _record_average_info(self): + """Calculate average info.""" + result_size = len(self._result) + if 
+            return
+        # calculate average data for each column in result data
+        average_data = [0] * len(self._header)
+        for row_info in self._result[1:]:
+            average_data = [
+                Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
+            ]
+        average_data = [
+            round(item / (result_size - 1)) for item in average_data
+        ]
+        # replace the step num in average_data with '-'
+        step_num_index = self._header.index('step_num')
+        average_data[step_num_index] = '-'
+        self._result.append(average_data)
+        log.info("Finish adding average info for step trace.")
+
+    def _save(self):
+        """Save the parsed step trace result to a csv file."""
+        log.info("Start to save step trace file.")
+        if not self._header:
+            return
+        with open(self._output_path, 'w') as file_handle:
+            csv_writer = csv.writer(file_handle)
+            csv_writer.writerow(self._header)
+            for row_data in self._result:
+                csv_writer.writerow(row_data)
+        os.chmod(self._output_path, stat.S_IREAD | stat.S_IWRITE)
diff --git a/mindinsight/profiler/profiling.py b/mindinsight/profiler/profiling.py
index 2a1f3c0..dde54d8 100644
--- a/mindinsight/profiler/profiling.py
+++ b/mindinsight/profiler/profiling.py
@@ -15,22 +15,26 @@
 """Profiling api file."""
 import os
 import time
+
 from tabulate import tabulate
-from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
-from mindinsight.profiler.parser.framework_parser import FrameworkParser
-from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
-from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
+
 from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
 from mindinsight.profiler.analyser.integrator import Integrator
 from mindinsight.profiler.common._utils import get_file_names, fwrite_format
-from mindinsight.profiler.common.validator.validate_path import \
-    validate_and_normalize_path
+from mindinsight.profiler.common.log import logger
 from mindinsight.profiler.common.validator.checkparam import \
     check_bool, check_subgraph
-from mindinsight.profiler.common.log import logger
+from mindinsight.profiler.common.validator.validate_path import \
+    validate_and_normalize_path
+from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
+from mindinsight.profiler.parser.framework_parser import FrameworkParser
+from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
+from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
+from mindinsight.profiler.parser.step_trace_parser import StepTraceParser
 from mindinsight.utils.exceptions import MindInsightException
 
 PROFILING_LOG_BASE_PATH = "/var/log/npu/profiling"
+INIT_OP_NAME = 'Default/InitDataSetQueue'
 
 
 class Profiler:
@@ -87,7 +91,7 @@ class Profiler:
         if device_target and device_target != "Davinci" \
                 and device_target != "Ascend":
             msg = ("Profiling: unsupport backend: %s" \
-                  % device_target)
+                   % device_target)
             raise RuntimeError(msg)
 
         self._dev_id = dev_id
@@ -120,6 +124,8 @@
         self._detail = check_bool(is_detail, 'is_detail')
         self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path')
         self._profiling_job_id = job_id
+        # the job id env will be set through user input later
+        self._job_id_env = None
         self._start_time = int(time.time() * 10000000)
         logger.info("Profiling: profiling start time: %d", self._start_time)
 
@@ -193,6 +199,35 @@ class Profiler:
         except MindInsightException as err:
             logger.error(err.message)
+
+        # analyse step trace info
+        self._analyse_step_trace(source_path, framework_parser)
+
+    def _analyse_step_trace(self, source_path, framework_parser):
+        """
+        Analyse step trace data and save the result.
+
+        Args:
+            source_path (str): The directory that contains the step trace original data.
+            framework_parser (FrameworkParser): The framework parser instance.
+        """
+        logger.info("Begin to parse step trace.")
+        # construct output path
+        step_trace_intermediate_file_path = os.path.join(
+            self._output_path,
+            f'step_trace_raw_{self._dev_id}_detail_time.csv'
+        )
+        # decide whether to keep the first step
+        skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
+        # parse the step trace files and save the result to disk
+        parser = StepTraceParser(input_dir=source_path,
+                                 output_file_path=step_trace_intermediate_file_path,
+                                 job_id=self._job_id_env,
+                                 skip_first_step=skip_first_step_flag)
+        parser.parse_and_save()
+        # print the parsed result
+        parser.show()
+        logger.info("Finish saving the intermediate result to %s", step_trace_intermediate_file_path)
+
     def __del__(self):
         """Disable the profiling collection service, called after training."""
-- 
GitLab
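A note on the binary format consumed by the new StepTraceParser: `_get_trace_struct` unpacks each 20-byte event with `struct.unpack('=QHHQ', ...)` into `(tag_id, task_id, stream_id, sys_count)`, and `get_field_value` later converts syscnt to milliseconds at 100000 syscnt per ms (one sys count is 10 ns). A small round-trip sketch with invented sample values (only the record layout follows the patch):

    import struct
    from collections import namedtuple

    StepTraceStruct = namedtuple(
        'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count'])

    # Pack one synthetic event: Q (8) + H (2) + H (2) + Q (8) = 20 bytes,
    # matching StepTraceParser._event_size.
    event = struct.pack('=QHHQ', 1, 7, 3, 250000)  # tag 1 marks the fp point
    assert len(event) == 20

    record = StepTraceStruct(*struct.unpack('=QHHQ', event))
    print(record.tag_id, record.sys_count / 100000, 'ms')  # -> 1 2.5 ms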