Commit cce05461 authored by: Y yelihua

implementation of step trace profiler

Parent e569b96b
......@@ -21,17 +21,17 @@ import json
import os
from flask import Blueprint
from flask import request
from flask import jsonify
from flask import request
from marshmallow import ValidationError
from mindinsight.conf import settings
from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, to_int
from mindinsight.lineagemgr.common.validator.validate_path import validate_and_normalize_path
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir
from mindinsight.profiler.common.validator.validate import validate_condition
from mindinsight.profiler.common.validator.validate import validate_condition, validate_ui_proc
from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_profiler_path
from mindinsight.utils.exceptions import ParamValueError
BLUEPRINT = Blueprint("profile", __name__, url_prefix=settings.URL_PREFIX)
......@@ -109,6 +109,64 @@ def get_profile_device_list():
return jsonify(device_list)
@BLUEPRINT.route("/profile/training-trace/graph", methods=["GET"])
def get_training_trace_graph():
"""
Get training trace info of one step.
Returns:
Response, the training trace info of one step.
Examples:
>>> GET http://xxxx/v1/mindinsight/profile/training-trace/graph
"""
summary_dir = request.args.get("dir")
profiler_dir = validate_and_normalize_profiler_path(summary_dir)
graph_type = request.args.get("type", default='0')
graph_type = to_int(graph_type, 'graph_type')
device_id = request.args.get("device_id", default='0')
_ = to_int(device_id, 'device_id')
analyser = AnalyserFactory.instance().get_analyser(
'step_trace', profiler_dir, device_id)
graph_info = analyser.query({
'filter_condition': {
'mode': 'step',
'step_id': graph_type
}})
return jsonify(graph_info)
@BLUEPRINT.route("/profile/training-trace/target-time-info", methods=["GET"])
def get_target_time_info():
"""
Get all the time information of the specified column.
Returns:
Response, all the time information of the specified column.
Examples:
>>> GET http://xxxx/v1/mindinsight/profile/training-trace/target-time-info
"""
summary_dir = request.args.get("dir")
profiler_dir = validate_and_normalize_profiler_path(summary_dir)
proc_name = request.args.get("type")
validate_ui_proc(proc_name)
device_id = request.args.get("device_id", default='0')
_ = to_int(device_id, 'device_id')
analyser = AnalyserFactory.instance().get_analyser(
'step_trace', profiler_dir, device_id)
target_time_info = analyser.query({
'filter_condition': {
'mode': 'proc',
'proc_name': proc_name
}})
target_time_info['summary'] = analyser.summary
return jsonify(target_time_info)
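For reference, a minimal client-side sketch of the two endpoints above; the host, port, and summary directory are illustrative assumptions only:
import requests

BASE = 'http://127.0.0.1:8080/v1/mindinsight/profile/training-trace'
# step trace graph for device 0, averaged step (type=0)
graph = requests.get(BASE + '/graph',
                     params={'dir': './summary', 'type': 0, 'device_id': 0}).json()
# per-step values of one phase, e.g. the tail time
tail_info = requests.get(BASE + '/target-time-info',
                         params={'dir': './summary', 'type': 'tail', 'device_id': 0}).json()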
def init_module(app):
"""
Init module entry.
......
......@@ -12,3 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Import analyser."""
from .analyser import *
from .step_trace_analyser import StepTraceAnalyser
......@@ -15,7 +15,7 @@
"""The analyser factory."""
import threading
import mindinsight.profiler.analyser.analyser as analyser_module
import mindinsight.profiler.analyser as analyser_module
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerAnalyserNotExistException
......@@ -25,11 +25,12 @@ class AnalyserFactory:
The analyser factory is used to create analyser special instance.
Currently the factory supports creating `AicoreTypeAnalyser`,
`AicoreDetailAnalyser` and `AicpuAnalyser`. The `AicoreTypeAnalyser` is used
to analyze execution time according to AICORE operator type.
`AicoreDetailAnalyser`, `AicpuAnalyser` and `StepTraceAnalyser`.
The `AicoreTypeAnalyser` is used to analyze execution time according to AICORE operator type.
The `AicoreDetailAnalyser` is used to analyze execution time according to
all specific AICORE operator. The `AicpuAnalyser` is used to analyze
execution time according to all specific AICPU operator.
The `StepTraceAnalyser` is used to analyze the execution time of the different phases within each training step.
Examples:
>>> analyser = AnalyserFactory.instance().get_analyser(
......
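A minimal sketch of obtaining the new step trace analyser through the factory; the profiler directory path is an assumption, and the query mirrors the one issued by the restful API above:
analyser = AnalyserFactory.instance().get_analyser(
    'step_trace', '/path/to/summary/profiler', '0')
# query the averaged step graph (step_id 0) and print the summary percentages
graph_info = analyser.query({'filter_condition': {'mode': 'step', 'step_id': 0}})
print(analyser.summary)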
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The StepTraceAnalyser analyser class."""
import csv
from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
from mindinsight.profiler.common.exceptions.exceptions import ProfilerParamValueErrorException, \
ProfilerFileNotFoundException, StepNumNotSupportedException
from mindinsight.profiler.common.log import logger as log
from mindinsight.profiler.common.util import query_latest_trace_time_file, get_field_value, \
get_summary_for_step_trace
class StepTraceAnalyser(BaseAnalyser):
"""The analyser for analyzing training steps."""
__col_names__ = []
_attr_ui_name = 'name'
_attr_ui_start = 'start'
_attr_ui_duration = 'duration'
@property
def summary(self):
"""The property of summary info."""
summary = get_summary_for_step_trace(self._data[-1], self.__column__)
summary['total_steps'] = self._size
return summary
def query(self, condition=None):
"""
Query data according to the condition.
Args:
condition (dict): The search condition, only contains `filter_condition` parameter.
Default: None.
Returns:
dict, the result after filtered, sorted and grouped.
"""
if condition is None:
condition = {}
filter_condition = condition.get('filter_condition', {})
self._validate_filter_condition(filter_condition)
self._result = {'size': self._size}
self._filter(filter_condition)
return self._result
def query_for_all_reduce(self):
"""
Query for all reduce info.
Returns:
list[dict], each item is the reduce info for one step, and the reduce info is formatted like:
{stream_id: List[Tuple(start_point, end_point, duration, field_name)]}.
"""
reduce_infos = []
for row_info in self._data[:-1]:
reduce_info = self._get_reduce_time_in_order(row_info, 'systime')
reduce_infos.append(reduce_info)
return reduce_infos
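An illustrative (made-up) entry of the returned reduce info, with syscnt values:
reduce_info_example = {
    '5': [(1120000, 1320000, 200000, 'stream_5_parallel_0')],   # start, end, duration, field name
}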
def _load(self):
"""Load data according to the parsed AICORE operator types file."""
file_path = query_latest_trace_time_file(self._profiling_dir, self._device_id)
if not file_path:
log.error("Failed to find parsed trace time file.")
raise ProfilerFileNotFoundException('parsed step trace time file')
with open(file_path, 'r') as handle:
csv_reader = csv.reader(handle)
self.__column__ = next(csv_reader)
self._data = list(csv_reader)
self._size = len(self._data) - 1
self._display_col_names = self.__col_names__[:]
def _filter(self, filter_condition):
"""
Filter the profiling data according to the filter condition.
Args:
filter_condition (dict): The filter condition.
- mode (str): The kind of information. `step` returns the info about a specific
step. `proc` returns the info about a specific field in the parsed trace file.
- step_id (int): The selected step_id. If not given, it means all steps are required.
If the value is 0, it means the average info over all steps except the first is
required.
- proc_name (str): The selected field name.
- time_type (str): The value type. `systime` keeps the original value.
`realtime` converts the value to milliseconds. Default: `realtime`.
"""
mode = filter_condition.get('mode', 'step')
if mode == 'step':
self._get_step_details(step_id=filter_condition.get('step_id'),
time_type=filter_condition.get('time_type', 'realtime'))
else:
self._get_proc_details(step_id=filter_condition.get('step_id'),
proc_name=filter_condition.get('proc_name'),
time_type=filter_condition.get('time_type', 'realtime'))
def _construct_time_point(self, name, start, duration):
"""Construct time point."""
point = {
self._attr_ui_name: name,
self._attr_ui_start: round(start, 4),
self._attr_ui_duration: round(duration, 4)
}
return point
def _get_step_details(self, step_id, time_type='realtime'):
"""
Get step trace info for selected step and save the result.
Args:
step_id (int): The selected step_id. If the value is 0, it means average info
for all steps except the first is required.
time_type (str): The value type. `systime` keeps the original value.
`realtime` converts the value to milliseconds. Default: `realtime`.
"""
if step_id is None:
step_id = 0
row_info = self._data[step_id - 1]
start_point = get_field_value(row_info, 'start_point', self.__column__, time_type)
total = get_field_value(row_info, 'total', self.__column__, time_type)
iteration_interval = get_field_value(row_info, 'iteration_interval', self.__column__,
time_type)
fp_point = get_field_value(row_info, 'fp_point', self.__column__, time_type)
fp_and_bp = get_field_value(row_info, 'fp_and_bp', self.__column__, time_type)
bp_point = get_field_value(row_info, 'bp_point', self.__column__, time_type)
tail = get_field_value(row_info, 'tail', self.__column__, time_type)
# first line only contains total time
first_line = [self._construct_time_point('', 0, total)]
# second line contains iteration_interval, fp_and_bp and tail
second_line = [
self._construct_time_point('', 0, iteration_interval),
self._construct_time_point('fp_and_bp', fp_point - start_point, fp_and_bp),
self._construct_time_point('', bp_point - start_point, tail),
]
# construct reduces lines
reduce_lines = self._construct_reduce_lines(row_info, time_type)
graph = [first_line, second_line]
graph.extend(reduce_lines)
self._result['training_trace_graph'] = graph
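For illustration, with a single all-reduce stream the saved `training_trace_graph` has roughly the shape below; all millisecond values are made up:
training_trace_graph_example = [
    [{'name': '', 'start': 0, 'duration': 12.5}],                      # first line: total step time
    [{'name': '', 'start': 0, 'duration': 1.2},                        # iteration_interval
     {'name': 'fp_and_bp', 'start': 1.2, 'duration': 9.8},
     {'name': '', 'start': 11.0, 'duration': 1.5}],                    # tail
    [{'name': '', 'start': 1.2, 'duration': 0.6},                      # gap between fp and the reduce op
     {'name': 'stream_5_parallel_0', 'start': 1.8, 'duration': 2.0},   # the reduce op itself
     {'name': '', 'start': 3.8, 'duration': 8.7}],                     # remainder until the step end
]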
def _get_reduce_time_in_order(self, row_info, time_type):
"""Get reduce time in order."""
reduce_info = {}
reduce_fields = [field_name for field_name in self.__column__
if field_name.startswith('stream_') and not field_name.endswith('point')]
for reduce_field in reduce_fields:
cur_stream_id = reduce_field.split('_', 2)[1]
cur_stream = reduce_info.get(cur_stream_id)
if not cur_stream:
cur_stream = []
reduce_info[cur_stream_id] = cur_stream
reduce_start = get_field_value(
row_info, reduce_field + '_start_point', self.__column__, time_type)
reduce_end = get_field_value(
row_info, reduce_field + '_end_point', self.__column__, time_type)
reduce_duration = get_field_value(
row_info, reduce_field, self.__column__, time_type)
cur_stream.append((reduce_start, reduce_end, reduce_duration, reduce_field))
for _, reduce_events in reduce_info.items():
reduce_events.sort(key=lambda elem: elem[1])
return reduce_info
def _construct_reduce_lines(self, row_info, time_type):
"""Contruct first line in detailed graph."""
reduce_lines = []
start_point = get_field_value(row_info, 'start_point', self.__column__, time_type)
fp_point = get_field_value(row_info, 'fp_point', self.__column__, time_type)
end_point = get_field_value(row_info, 'end_point', self.__column__, time_type)
reduce_info = self._get_reduce_time_in_order(row_info, time_type)
# construct time point for each line
for _, reduce_events in reduce_info.items():
current_line = self._construct_reduce_line(
start_point, end_point, fp_point, reduce_events)
reduce_lines.append(current_line)
return reduce_lines
def _construct_reduce_line(self, start_point, end_point, fp_point, reduce_events):
"""Construct list of time points for reduce line."""
current_line = []
previous_start = fp_point
for start, end, duration, field_name in reduce_events:
current_line.extend([
self._construct_time_point(
'', previous_start - start_point, start - previous_start),
self._construct_time_point(
field_name, start - start_point, duration)
])
previous_start = end
current_line.append(self._construct_time_point(
'', previous_start - start_point, end_point - previous_start))
return current_line
def _get_proc_details(self, proc_name, step_id=None, time_type='realtime'):
"""
Get the time info of the selected field and save the result.
Args:
proc_name (str): The selected field name.
step_id (int): The selected step_id. If not given, it means all steps are required.
If the value is 0, it means the average info over all steps except the first is
required. Default: None.
time_type (str): The value type. `systime` keeps the original value.
`realtime` converts the value to milliseconds. Default: `realtime`.
"""
if step_id is None:
rows_info = self._data[:-1]
else:
rows_info = [self._data[step_id - 1]]
proc_info = [get_field_value(row_info, proc_name, self.__column__, time_type)
for row_info in rows_info]
self._result['info'] = {proc_name: proc_info}
def _validate_filter_condition(self, filter_condition):
"""Validate step trace filter_condition."""
mode = filter_condition.get('mode', 'step')
self._validate_str_param(mode, ['step', 'proc'], 'mode')
step_id = filter_condition.get('step_id', 0)
self._validate_step_id(step_id)
proc_name = filter_condition.get('proc_name')
self._validate_str_param(proc_name, self.__column__, 'proc_name')
time_type = filter_condition.get('time_type', 'realtime')
self._validate_str_param(time_type, ['realtime', 'systime'], 'time_type')
def _validate_step_id(self, step_id):
"""Validate step_id."""
if isinstance(step_id, int) and 0 <= step_id <= self._size:
return
log.error("Invalid step_id in request. step_id should be in [0, %d].", self._size)
raise StepNumNotSupportedException([0, self._size])
@staticmethod
def _validate_str_param(proc_name, accept_param, error_name=''):
"""Validate proc_name."""
if proc_name is None or isinstance(proc_name, str) and proc_name in accept_param:
return
log.error("Invalid param %s in request. Acceptable value is %s.", error_name, accept_param)
raise ProfilerParamValueErrorException("Invalid proc_name.")
......@@ -16,15 +16,16 @@
import os
import re
def fwrite_format(output_data_path, data_source=None, is_print=False, is_start=False):
"""
Write data to the output file.
Args:
output_data_path(str): the output file path of the data.
data_source(list): the data to write.
is_print(bool): whether to print the data to stdout.
is_start(bool): Whether is the first line of the output file, will remove the old file if True."
output_data_path (str): The output file path of the data.
data_source (list): The data to write.
is_print (bool): Whether to print the data to stdout.
is_start (bool): Whether this is the first write to the output file; the old file will be removed if True.
"""
if is_start is True and os.path.exists(output_data_path):
......@@ -48,16 +49,17 @@ def get_log_slice_id(file_name):
index = re.findall(r'\d+', slice_list[0])
return int(index[0])
def get_file_join_name(input_path, file_name):
"""
Search files under the specified path, and join all the matched files into one file.
Args:
input_path(str): the source path, will search files under it.
file_name(str): the target of the filename, such as 'hwts.log.data.45.dev'.
input_path (str): The source path, will search files under it.
file_name (str): The target of the filename, such as 'hwts.log.data.45.dev'.
Returns:
str: the join file name.
str, the join file name.
"""
name_list = []
file_join_name = ''
......@@ -90,11 +92,11 @@ def get_file_names(input_path, file_name):
Search files under the specified path.
Args:
input_path(str): the souce path, will serch files under it.
file_name(str): the target of the filename, such as 'host_start_log'.
input_path (str): The source path, will search files under it.
file_name (str): The target of the filename, such as 'host_start_log'.
Returns:
list: file name list.
list, file name list.
"""
input_path = os.path.realpath(input_path)
......
......@@ -37,6 +37,8 @@ class ProfilerErrors(ProfilerMgrErrors):
# parser error code
DEVICE_ID_MISMATCH_ERROR = 0 | _PARSER_MASK
RAW_FILE_ERROR = 1 | _PARSER_MASK
STEP_NUM_NOT_SUPPORTED_ERROR = 2 | _PARSER_MASK
JOB_ID_MISMATCH_ERROR = 3 | _PARSER_MASK
# analyser error code
COLUMN_NOT_EXIST_ERROR = 0 | _ANALYSER_MASK
......@@ -49,6 +51,8 @@ class ProfilerErrors(ProfilerMgrErrors):
COLUMN_NOT_SUPPORT_SORT_ERROR = 7 | _ANALYSER_MASK
@unique
class ProfilerErrorMsg(Enum):
"""Profiler error messages."""
......@@ -63,6 +67,9 @@ class ProfilerErrorMsg(Enum):
# parser error msg
DEVICE_ID_MISMATCH_ERROR = 'The device ID mismatch.'
RAW_FILE_ERROR = 'Raw file error. {}'
STEP_NUM_NOT_SUPPORTED_ERROR = 'The step num must be in {}'
JOB_ID_MISMATCH_ERROR = 'The job id in the parameter is not the same as ' \
'the one in the training trace file.'
# analyser error msg
COLUMN_NOT_EXIST_ERROR = 'The column {} does not exist.'
......
......@@ -192,3 +192,25 @@ class ProfilerColumnNotSupportSortException(MindInsightException):
message=ProfilerErrorMsg.COLUMN_NOT_SUPPORT_SORT_ERROR.value.format(msg),
http_code=400
)
class StepNumNotSupportedException(MindInsightException):
"""The step number error in profiler module."""
def __init__(self, msg):
super(StepNumNotSupportedException, self).__init__(
error=ProfilerErrors.STEP_NUM_NOT_SUPPORTED_ERROR,
message=ProfilerErrorMsg.STEP_NUM_NOT_SUPPORTED_ERROR.value.format(msg),
http_code=400
)
class JobIdMismatchException(MindInsightException):
"""The Job ID mismatch error in profiler module."""
def __init__(self):
super(JobIdMismatchException, self).__init__(
error=ProfilerErrors.JOB_ID_MISMATCH_ERROR,
message=ProfilerErrorMsg.JOB_ID_MISMATCH_ERROR.value,
http_code=400
)
......@@ -19,6 +19,8 @@ This module provides the utils.
"""
import os
from mindinsight.datavisual.utils.tools import to_int
def analyse_device_list_from_profiler_dir(profiler_dir):
"""
......@@ -40,3 +42,99 @@ def analyse_device_list_from_profiler_dir(profiler_dir):
device_id_list.add(device_num)
return list(device_id_list)
def query_latest_trace_time_file(profiler_dir, device_id=0):
"""
Query the latest trace time file.
Args:
profiler_dir (str): The profiler directory.
device_id (int): The id of device.
Returns:
str, the latest trace time file path.
"""
files = os.listdir(profiler_dir)
target_file = f'step_trace_raw_{device_id}_detail_time.csv'
try:
latest_file = max(
filter(
lambda file: file == target_file,
files
),
key=lambda file: os.stat(os.path.join(profiler_dir, file)).st_mtime
)
except ValueError:
return None
return os.path.join(profiler_dir, latest_file)
def query_step_trace_file(profiler_dir):
"""
Query for the original step trace file.
Args:
profiler_dir (str): The directory that contains all step trace files.
Returns:
str, the file path of step trace time.
"""
files = os.listdir(profiler_dir)
training_trace_file = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
if training_trace_file:
return os.path.join(profiler_dir, training_trace_file[0])
return None
def get_summary_for_step_trace(average_info, header):
"""The property of summary info."""
if not average_info or not header:
return {}
total_time = get_field_value(average_info, 'total', header)
iteration_interval = get_field_value(average_info, 'iteration_interval',
header)
fp_and_bp = get_field_value(average_info, 'fp_and_bp', header)
tail = get_field_value(average_info, 'tail', header)
summary = {
'total_time': total_time,
'iteration_interval': calculate_percent(iteration_interval, total_time),
'fp_and_bp': calculate_percent(fp_and_bp, total_time),
'tail': calculate_percent(tail, total_time)
}
return summary
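A hedged illustration of the dict this helper returns; the numbers are made up but the percentages sum to 100%:
summary_example = {
    'total_time': 12.5,               # ms, via get_field_value with the default `realtime` type
    'iteration_interval': '9.6%',
    'fp_and_bp': '78.4%',
    'tail': '12.0%',
}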
def calculate_percent(partial, total):
"""Calculate percent value."""
percent = round(partial / total * 100, 2)
return f'{percent}%'
def get_field_value(row_info, field_name, header, time_type='realtime'):
"""
Extract the value of the specified field from row_info.
Args:
row_info (list): The list of data info in one row.
header (list[str]): The list of field names.
field_name (str): The name in header.
time_type (str): The type of value, `realtime` or `systime`. Default: `realtime`.
Returns:
int or float, the field value, in milliseconds if `realtime` and in syscnt if `systime`.
"""
# one sys count takes 10 ns, 1 ms has 100000 syscnt
per_ms_syscnt = 100000
field_index = header.index(field_name)
value = row_info[field_index]
value = to_int(value, field_name)
if time_type == 'realtime':
value = value / per_ms_syscnt
return value
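A small sanity check of the syscnt-to-millisecond conversion, assuming the 10 ns tick stated in the comment above:
header = ['step_num', 'total']
row = ['1', '250000']
assert get_field_value(row, 'total', header) == 2.5              # realtime: 250000 / 100000 ms
assert get_field_value(row, 'total', header, 'systime') == 250000  # systime keeps the raw syscnt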
......@@ -13,9 +13,14 @@
# limitations under the License.
# ============================================================================
"""Validate the profiler parameters."""
import os
import sys
from mindinsight.datavisual.utils.tools import to_int
from mindinsight.profiler.common.exceptions.exceptions import ProfilerParamTypeErrorException, \
ProfilerDeviceIdException, ProfilerOpTypeException, \
ProfilerSortConditionException, ProfilerFilterConditionException, ProfilerGroupConditionException
ProfilerSortConditionException, ProfilerFilterConditionException, \
ProfilerGroupConditionException, ProfilerParamValueErrorException
from mindinsight.profiler.common.log import logger as log
AICORE_TYPE_COL = ["op_type", "execution_time", "execution_frequency", "precent"]
......@@ -71,6 +76,7 @@ def validate_condition(search_condition):
if "filter_condition" in search_condition:
validate_filter_condition(search_condition)
def validata_group_condition(search_condition):
"""
Verify the group_condition in search_condition is valid or not.
......@@ -185,3 +191,44 @@ def validate_filter_condition(search_condition):
validate_op_filter_condition(op_name_condition)
if "op_type" not in filter_condition and "op_name" not in filter_condition:
raise ProfilerFilterConditionException("The key of filter_condition is not support")
def validate_and_set_job_id_env(job_id_env):
"""
Validate the job id and set it in environment.
Args:
job_id_env (str): The id to be set in the environment variable `JOB_ID`.
Returns:
int, the valid job id env.
"""
if job_id_env is None:
return job_id_env
# get job_id_env in int type
valid_id = to_int(job_id_env, 'job_id_env')
# check the range of valid_id
if valid_id and 255 < valid_id < sys.maxsize:
os.environ['JOB_ID'] = job_id_env
else:
log.warning("Invalid job_id_env %s. The value should be int and between 255 and %s. Use"
"default job id env instead.",
job_id_env, sys.maxsize)
return valid_id
def validate_ui_proc(proc_name):
"""
Validate proc name in restful request.
Args:
proc_name (str): The proc name to query. Acceptable value is in
[`iteration_interval`, `fp_and_bp`, `tail`].
Raises:
ProfilerParamValueErrorException: If the proc_name is invalid.
"""
accept_names = ['iteration_interval', 'fp_and_bp', 'tail']
if proc_name not in accept_names:
log.error("Invalid proc_name. The proc_name for restful api is in %s", accept_names)
raise ProfilerParamValueErrorException(f'proc_name should be in {accept_names}.')
......@@ -15,8 +15,14 @@
"""Validate the input path."""
import os
from typing import Union, List
from urllib.parse import unquote
from marshmallow import ValidationError
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerParamValueErrorException
from mindinsight.profiler.common.log import logger as log
def safe_normalize_path(
path,
......@@ -117,3 +123,30 @@ def validate_and_normalize_path(
raise ValidationError({raise_key: {"The path is invalid!"}})
return normalized_path
def validate_and_normalize_profiler_path(path):
"""
Validate and normalize profiler path.
Args:
path (str): The path of summary directory.
Returns:
str, normalized path of profiler directory.
"""
if not path:
raise ProfilerParamValueErrorException('The file dir does not exist.')
try:
unquote_path = unquote(path, errors='strict')
except UnicodeDecodeError:
raise ProfilerParamValueErrorException('Unquote error with strict mode')
profiler_dir = os.path.join(unquote_path, 'profiler')
try:
profiler_dir = validate_and_normalize_path(profiler_dir, 'profiler')
except ValidationError:
log.error('profiler dir <%s> is invalid', unquote_path)
raise ProfilerParamValueErrorException('Profiler dir is invalid.')
return profiler_dir
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for step trace data."""
import csv
import os
import stat
import struct
from collections import namedtuple
from decimal import Decimal
from mindinsight.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
JobIdMismatchException
from mindinsight.profiler.common.log import logger as log
from mindinsight.profiler.common.util import get_summary_for_step_trace
from mindinsight.utils.exceptions import MindInsightException
StepTraceStruct = namedtuple(
'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count']
)
class StepTraceParser:
"""
The parser for step trace data.
Args:
input_dir (str): The directory that contains the original step trace data.
output_file_path (str): The output file path.
job_id (int): The job id set by the user; it is checked against the job id recorded in the trace file.
skip_first_step (bool): Whether to skip the first step or not.
"""
_event_size = 20
def __init__(self, input_dir, output_file_path, job_id, skip_first_step=False):
self._input_dir = input_dir
self._output_path = output_file_path
self._job_id = job_id
self._skip_first_step = skip_first_step
self._result = []
self._header = []
self._step_num = 0
@property
def output_file(self):
"""The path of the output file, relative to the summary directory."""
path_parts = self._output_path.rsplit('/', 2)
return '/'.join(path_parts[-2:])
def show(self):
"""The property of step trace info."""
summary_info = {}
if self._result:
summary_info = get_summary_for_step_trace(self._result[-1], self._header)
summary_info['total_steps'] = len(self._result) - 1
print('\nStep trace summary info (unit: syscnt):')
print(summary_info)
print('\nThe step trace parsing result is saved under {summary_dir}/%s' % self.output_file)
def parse_and_save(self):
"""Parse step trace files and save the result."""
try:
source_file = self._get_step_trace_file()
self._parse(source_file)
self._save()
except MindInsightException as err:
log.error("Failed to parse and save step trace files.")
log.exception(err)
log.info("Finish to save intermediate result for step trace file.")
def _get_step_trace_file(self):
"""Get step trace file."""
profiling_path = self._input_dir
# validate input_dir
if not os.path.isdir(profiling_path):
raise ProfilerPathErrorException(
'{} does not exist or is not a dir'.format(profiling_path)
)
# get step trace files
files = os.listdir(profiling_path)
step_trace_files = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
# validate result
if not step_trace_files:
raise ProfilerPathErrorException('training trace file does not exist')
if len(step_trace_files) > 1:
log.warning("Not enable to parse multiple step trace files yet.")
step_trace_file = os.path.join(profiling_path, step_trace_files[0])
return step_trace_file
def _parse(self, source_file):
"""Parse source step trace file."""
log.info("Start to parse step trace file.")
with open(source_file, 'rb') as handler:
content = handler.read()
for step_trace in self._get_next_step_trace(content):
if self._skip_first_step:
self._skip_first_step = False
else:
self._record_trace_event(step_trace)
self._record_average_info()
log.info("Finish to parse step trace file.")
def _get_next_step_trace(self, content):
"""
Get next step trace info.
Args:
content (bytes): The input step trace info.
Returns:
Generator, yields the step trace info one by one.
"""
event_info = {}
for pos in range(0, len(content), self._event_size):
next_event = self._get_trace_struct(content[pos:pos + self._event_size])
self._construct_event_info(next_event, event_info)
if event_info.get('end'):
yield event_info
event_info = {
'start': event_info.get('end'),
'reduce': {}
}
def _get_trace_struct(self, bin_info):
"""Translate event info to StepTraceStruct."""
if len(bin_info) == self._event_size:
parsed_info = struct.unpack('=QHHQ', bin_info)
return StepTraceStruct(*parsed_info)
return None
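For reference, each record is 20 bytes unpacked with '=QHHQ' (tag_id, task_id, stream_id, sys_count in native byte order); a synthetic fp event (tag 1) could be packed like this for testing:
import struct

fp_event = struct.pack('=QHHQ', 1, 0, 0, 123456)    # tag_id=1 marks the fp point
assert len(fp_event) == 20                           # matches StepTraceParser._event_size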
def _construct_event_info(self, next_event, event_info):
"""Construct event info according to next_event."""
min_job_id = 255
step_flag = lambda tag: tag > min_job_id
end_flag = lambda tag: tag == min_job_id
fp_flag = lambda tag: tag == 1
bp_flag = lambda tag: tag == 2
def _on_step_event():
"""Handle step event."""
self._validate_tag_id(tag_id)
if event_info.get('start'):
event_info['end'] = sys_count
else:
event_info['start'] = sys_count
event_info['reduce'] = {}
def _on_reduce_event():
"""Handle reduce event."""
stream_id = next_event.stream_id
if event_info['reduce'].get(stream_id):
event_info['reduce'][stream_id].append(sys_count)
else:
event_info['reduce'][stream_id] = [sys_count]
tag_id = next_event.tag_id
sys_count = next_event.sys_count
if end_flag(tag_id):
event_info['end'] = sys_count
elif step_flag(tag_id):
_on_step_event()
elif fp_flag(tag_id):
event_info['fp'] = sys_count
elif bp_flag(tag_id):
event_info['bp'] = sys_count
else:
_on_reduce_event()
def _validate_tag_id(self, job_id):
"""Check the job id in source step trace file is same os user set."""
if not self._job_id:
self._job_id = job_id
elif self._job_id != job_id:
raise JobIdMismatchException()
def _record_trace_event(self, step_trace):
"""Record trace event."""
self._step_num += 1
start_time = step_trace.get('start')
end_time = step_trace.get('end')
fp_time = step_trace.get('fp')
bp_time = step_trace.get('bp')
if not (start_time and end_time and fp_time and bp_time):
log.warning("The step %d is missing basic time.", self._step_num)
return
row_data = {
'step_num': self._step_num,
'start_point': start_time,
'end_point': end_time,
'total': end_time - start_time,
'fp_point': fp_time,
'bp_point': bp_time,
'iteration_interval': fp_time - start_time,
'fp_and_bp': bp_time - fp_time,
'tail': end_time - bp_time
}
# update reduce info
self._update_reduce_info(step_trace, row_data)
# save the row data
if not self._header:
self._header = list(row_data.keys())
row_data_list = [row_data[header_name] for header_name in self._header]
self._result.append(row_data_list)
@staticmethod
def _update_reduce_info(step_trace, row_data):
"""Extract reduce info."""
reduce_time = step_trace.get('reduce', {})
for stream_id, time_points in reduce_time.items():
time_point_num = len(time_points)
if time_point_num % 2:
log.warning("Stream %d has %d reduce time points.", stream_id, time_point_num)
continue
for index, point_id in enumerate(range(0, time_point_num, 2)):
field_name = f'stream_{stream_id}_parallel_{index}'
row_data[field_name + '_start_point'] = time_points[point_id]
row_data[field_name + '_end_point'] = time_points[point_id + 1]
row_data[field_name] = time_points[point_id + 1] - time_points[point_id]
def _record_average_info(self):
"""Calculate average info."""
result_size = len(self._result)
if result_size < 2:
return
# calculate average data for each column in result data
average_data = [0] * len(self._header)
for row_info in self._result[1:]:
average_data = [
Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
]
average_data = [
round((item / (result_size - 1))) for item in average_data
]
# replace the step num in average_data with a placeholder
step_num_index = self._header.index('step_num')
average_data[step_num_index] = '-'
self._result.append(average_data)
log.info("Finish add average info for step trace.")
def _save(self):
log.info("Start to save step trace file.")
if not self._header:
return
with open(self._output_path, 'w') as file_handle:
csv_writer = csv.writer(file_handle)
csv_writer.writerow(self._header)
for row_data in self._result:
csv_writer.writerow(row_data)
os.chmod(self._output_path, stat.S_IREAD | stat.S_IWRITE)
......@@ -15,22 +15,26 @@
"""Profiling api file."""
import os
import time
from tabulate import tabulate
from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.analyser.integrator import Integrator
from mindinsight.profiler.common._utils import get_file_names, fwrite_format
from mindinsight.profiler.common.validator.validate_path import \
validate_and_normalize_path
from mindinsight.profiler.common.log import logger
from mindinsight.profiler.common.validator.checkparam import \
check_bool, check_subgraph
from mindinsight.profiler.common.log import logger
from mindinsight.profiler.common.validator.validate_path import \
validate_and_normalize_path
from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
from mindinsight.profiler.parser.step_trace_parser import StepTraceParser
from mindinsight.utils.exceptions import MindInsightException
PROFILING_LOG_BASE_PATH = "/var/log/npu/profiling"
INIT_OP_NAME = 'Default/InitDataSetQueue'
class Profiler:
......@@ -87,7 +91,7 @@ class Profiler:
if device_target and device_target != "Davinci" \
and device_target != "Ascend":
msg = ("Profiling: unsupport backend: %s" \
% device_target)
% device_target)
raise RuntimeError(msg)
self._dev_id = dev_id
......@@ -120,6 +124,8 @@ class Profiler:
self._detail = check_bool(is_detail, 'is_detail')
self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path')
self._profiling_job_id = job_id
# add job id env through user input later
self._job_id_env = None
self._start_time = int(time.time() * 10000000)
logger.info("Profiling: profiling start time: %d", self._start_time)
......@@ -193,6 +199,35 @@ class Profiler:
except MindInsightException as err:
logger.error(err.message)
# analyse step trace info
self._analyse_step_trace(source_path, framework_parser)
def _analyse_step_trace(self, source_path, framework_parser):
"""
Analyse step trace data and save the result.
Args:
source_path (str): The directory that contains the step trace original data.
framework_parser (FrameworkParser): The framework parser instance.
"""
logger.info("Begin to parse step trace.")
# construct output path
step_trace_intermediate_file_path = os.path.join(
self._output_path,
f'step_trace_raw_{self._dev_id}_detail_time.csv'
)
# skip the first step when the dataset init op exists in the graph
skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
# parse the step trace files and save the result to disk
parser = StepTraceParser(input_dir=source_path,
output_file_path=step_trace_intermediate_file_path,
job_id=self._job_id_env,
skip_first_step=skip_first_step_flag)
parser.parse_and_save()
# print parser result
parser.show()
logger.info("Finish save the intermediate result %s", step_trace_intermediate_file_path)
def __del__(self):
"""Disable the profiling collection service, called after training."""
......