Commit ed9cc505 authored by: Y yuximiao

Add profiler module

Parent 180b3029
......@@ -213,6 +213,7 @@ install(
${CMAKE_SOURCE_DIR}/mindspore/common
${CMAKE_SOURCE_DIR}/mindspore/ops
${CMAKE_SOURCE_DIR}/mindspore/communication
${CMAKE_SOURCE_DIR}/mindspore/profiler
DESTINATION ${INSTALL_PY_DIR}
COMPONENT mindspore
)
......
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Profiler Module Introduction.
This module provides Python APIs to enable profiling of MindSpore neural networks.
Users can import mindspore.profiler.Profiler, initialize a Profiler object to start profiling,
and call Profiler.analyse() to stop profiling and analyse the results.
To visualize the profiling results, users can open MindSpore Web, find the corresponding run,
and click the profile link.
Currently, the Profiler supports AICore operator analysis.
"""
from mindspore.profiler.profiling import Profiler
__all__ = ["Profiler"]
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiler error code and messages."""
from enum import unique, Enum
_GENERAL_MASK = 0b00001 << 7
_PARSER_MASK = 0b00010 << 7
_ANALYSER_MASK = 0b00011 << 7
class ProfilerMgrErrors(Enum):
"""Enum definition for profiler errors"""
@unique
class ProfilerErrors(ProfilerMgrErrors):
"""Profiler error codes."""
# general error code
PARAM_VALUE_ERROR = 0 | _GENERAL_MASK
PATH_ERROR = 1 | _GENERAL_MASK
PARAM_TYPE_ERROR = 2 | _GENERAL_MASK
DIR_NOT_FOUND_ERROR = 3 | _GENERAL_MASK
FILE_NOT_FOUND_ERROR = 4 | _GENERAL_MASK
IO_ERROR = 5 | _GENERAL_MASK
# parser error code
DEVICE_ID_MISMATCH_ERROR = 0 | _PARSER_MASK
RAW_FILE_ERROR = 1 | _PARSER_MASK
STEP_NUM_NOT_SUPPORTED_ERROR = 2 | _PARSER_MASK
JOB_ID_MISMATCH_ERROR = 3 | _PARSER_MASK
# analyser error code
COLUMN_NOT_EXIST_ERROR = 0 | _ANALYSER_MASK
ANALYSER_NOT_EXIST_ERROR = 1 | _ANALYSER_MASK
DEVICE_ID_ERROR = 2 | _ANALYSER_MASK
OP_TYPE_ERROR = 3 | _ANALYSER_MASK
GROUP_CONDITION_ERROR = 4 | _ANALYSER_MASK
SORT_CONDITION_ERROR = 5 | _ANALYSER_MASK
FILTER_CONDITION_ERROR = 6 | _ANALYSER_MASK
COLUMN_NOT_SUPPORT_SORT_ERROR = 7 | _ANALYSER_MASK
PIPELINE_OP_NOT_EXIST_ERROR = 8 | _ANALYSER_MASK
@unique
class ProfilerErrorMsg(Enum):
"""Profiler error messages."""
# general error msg
PARAM_VALUE_ERROR = 'Param value error. {}'
PATH_ERROR = 'Path error. {}'
PARAM_TYPE_ERROR = 'Param type error. {}'
DIR_NOT_FOUND_ERROR = 'The dir <{}> not found.'
FILE_NOT_FOUND_ERROR = 'The file <{}> not found.'
IO_ERROR = 'Failed to read or write the file.'
# parser error msg
DEVICE_ID_MISMATCH_ERROR = 'The device ID mismatch.'
RAW_FILE_ERROR = 'Raw file error. {}'
STEP_NUM_NOT_SUPPORTED_ERROR = 'The step num must be in {}'
JOB_ID_MISMATCH_ERROR = 'The job id in the parameter is not the same as ' \
'in the training trace file. '
# analyser error msg
COLUMN_NOT_EXIST_ERROR = 'The column {} does not exist.'
ANALYSER_NOT_EXIST_ERROR = 'The analyser {} does not exist.'
DEVICE_ID_ERROR = 'The device_id in search_condition error, {}'
FILTER_CONDITION_ERROR = 'The filter_condition in search_condition error, {}'
OP_TYPE_ERROR = 'The op_type in search_condition error, {}'
GROUP_CONDITION_ERROR = 'The group_condition in search_condition error, {}'
SORT_CONDITION_ERROR = 'The sort_condition in search_condition error, {}'
COLUMN_NOT_SUPPORT_SORT_ERROR = 'The column {} does not support to sort.'
PIPELINE_OP_NOT_EXIST_ERROR = 'The minddata pipeline operator {} does not exist.'
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Definition of error code and relative messages in profiler module."""
from mindspore.profiler.common.exceptions.error_code import ProfilerErrors, \
ProfilerErrorMsg
class ProfilerException(Exception):
"""
Base class for Profiler exceptions.
Examples:
>>> raise ProfilerException(ProfilerErrors.PATH_ERROR, 'path does not exist')
"""
RUNTIME = 1
TYPE = 1
LEVEL = 0
SYSID = 42
def __init__(self, error, message, http_code=500):
"""
Initialization of ProfilerException.
Args:
error (Enum): Error value for specified case.
message (str): Description for exception.
http_code (int): Http code for exception. Default is 500.
"""
if isinstance(message, str):
message = ' '.join(message.split())
super(ProfilerException, self).__init__(message)
self.error = error
self.message = message
self.http_code = http_code
@property
def error_code(self):
"""
Transform the exception number into a Profiler error code.
Code composition (4 bytes):
runtime 2 bits, type 2 bits, level 3 bits, sysid 8 bits, modid 5 bits, value 12 bits.
num = ((0xFF & runtime) << 30) \
| ((0xFF & type) << 28) \
| ((0xFF & level) << 25) \
| ((0xFF & sysid) << 17) \
| ((0xFF & modid) << 12) \
| (0x0FFF & value)
Returns:
str, Hex string representing the composed Profiler error code.
"""
num = (((0xFF & self.RUNTIME) << 30)
| ((0xFF & self.TYPE) << 28)
| ((0xFF & self.LEVEL) << 25)
| ((0xFF & self.SYSID) << 17)
| ((0xFF & 6) << 12)
| (0x0FFF & self.error.value))
return hex(num)[2:].zfill(8).upper()
def __str__(self):
return '[{}] code: {}, msg: {}'.format(self.__class__.__name__, self.error_code, self.message)
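# A worked example of the composition above (modid is fixed to 6 in error_code; the other
# constants come from ProfilerException): for ProfilerErrors.PARAM_VALUE_ERROR, whose value
# is 0 | _GENERAL_MASK == 128,
#   ((0xFF & 1) << 30) | ((0xFF & 1) << 28) | ((0xFF & 0) << 25)
#       | ((0xFF & 42) << 17) | ((0xFF & 6) << 12) | (0x0FFF & 128) == 0x50546080,
# so error_code returns '50546080'.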
class ProfilerParamValueErrorException(ProfilerException):
"""The parameter value error in profiler module."""
def __init__(self, msg):
super(ProfilerParamValueErrorException, self).__init__(
error=ProfilerErrors.PARAM_VALUE_ERROR,
message=ProfilerErrorMsg.PARAM_VALUE_ERROR.value.format(msg),
http_code=400
)
class ProfilerPathErrorException(ProfilerException):
"""The path error in profiler module."""
def __init__(self, msg):
super(ProfilerPathErrorException, self).__init__(
error=ProfilerErrors.PATH_ERROR,
message=ProfilerErrorMsg.PATH_ERROR.value.format(msg),
http_code=400
)
class ProfilerParamTypeErrorException(ProfilerException):
"""The parameter type error in profiler module."""
def __init__(self, msg):
super(ProfilerParamTypeErrorException, self).__init__(
error=ProfilerErrors.PARAM_TYPE_ERROR,
message=ProfilerErrorMsg.PARAM_TYPE_ERROR.value.format(msg),
http_code=400
)
class ProfilerDirNotFoundException(ProfilerException):
"""The dir not found exception in profiler module."""
def __init__(self, msg):
super(ProfilerDirNotFoundException, self).__init__(
error=ProfilerErrors.DIR_NOT_FOUND_ERROR,
message=ProfilerErrorMsg.DIR_NOT_FOUND_ERROR.value.format(msg),
http_code=400
)
class ProfilerFileNotFoundException(ProfilerException):
"""The file not found exception in profiler module."""
def __init__(self, msg):
super(ProfilerFileNotFoundException, self).__init__(
error=ProfilerErrors.FILE_NOT_FOUND_ERROR,
message=ProfilerErrorMsg.FILE_NOT_FOUND_ERROR.value.format(msg),
http_code=400
)
class ProfilerIOException(ProfilerException):
"""The IO exception in profiler module."""
def __init__(self):
super(ProfilerIOException, self).__init__(
error=ProfilerErrors.IO_ERROR,
message=ProfilerErrorMsg.IO_ERROR.value,
http_code=400
)
class ProfilerDeviceIdMismatchException(ProfilerException):
"""The device id mismatch exception in profiler module."""
def __init__(self):
super(ProfilerDeviceIdMismatchException, self).__init__(
error=ProfilerErrors.DEVICE_ID_MISMATCH_ERROR,
message=ProfilerErrorMsg.DEVICE_ID_MISMATCH_ERROR.value,
http_code=400
)
class ProfilerRawFileException(ProfilerException):
"""The raw file exception in profiler module."""
def __init__(self, msg):
super(ProfilerRawFileException, self).__init__(
error=ProfilerErrors.RAW_FILE_ERROR,
message=ProfilerErrorMsg.RAW_FILE_ERROR.value.format(msg),
http_code=400
)
class ProfilerColumnNotExistException(ProfilerException):
"""The column does not exist exception in profiler module."""
def __init__(self, msg):
super(ProfilerColumnNotExistException, self).__init__(
error=ProfilerErrors.COLUMN_NOT_EXIST_ERROR,
message=ProfilerErrorMsg.COLUMN_NOT_EXIST_ERROR.value.format(msg),
http_code=400
)
class ProfilerAnalyserNotExistException(ProfilerException):
"""The analyser in profiler module."""
def __init__(self, msg):
super(ProfilerAnalyserNotExistException, self).__init__(
error=ProfilerErrors.ANALYSER_NOT_EXIST_ERROR,
message=ProfilerErrorMsg.ANALYSER_NOT_EXIST_ERROR.value.format(msg),
http_code=400
)
class ProfilerDeviceIdException(ProfilerException):
"""The parameter device_id error in profiler module."""
def __init__(self, msg):
super(ProfilerDeviceIdException, self).__init__(
error=ProfilerErrors.DEVICE_ID_ERROR,
message=ProfilerErrorMsg.DEVICE_ID_ERROR.value.format(msg),
http_code=400
)
class ProfilerOpTypeException(ProfilerException):
"""The parameter op_type error in profiler module."""
def __init__(self, msg):
super(ProfilerOpTypeException, self).__init__(
error=ProfilerErrors.OP_TYPE_ERROR,
message=ProfilerErrorMsg.OP_TYPE_ERROR.value.format(msg),
http_code=400
)
class ProfilerSortConditionException(ProfilerException):
"""The parameter sort_condition error in profiler module."""
def __init__(self, msg):
super(ProfilerSortConditionException, self).__init__(
error=ProfilerErrors.SORT_CONDITION_ERROR,
message=ProfilerErrorMsg.SORT_CONDITION_ERROR.value.format(msg),
http_code=400
)
class ProfilerFilterConditionException(ProfilerException):
"""The parameter filer_condition error in profiler module."""
def __init__(self, msg):
super(ProfilerFilterConditionException, self).__init__(
error=ProfilerErrors.FILTER_CONDITION_ERROR,
message=ProfilerErrorMsg.FILTER_CONDITION_ERROR.value.format(msg),
http_code=400
)
class ProfilerGroupConditionException(ProfilerException):
"""The parameter group_condition error in profiler module."""
def __init__(self, msg):
super(ProfilerGroupConditionException, self).__init__(
error=ProfilerErrors.GROUP_CONDITION_ERROR,
message=ProfilerErrorMsg.GROUP_CONDITION_ERROR.value.format(msg),
http_code=400
)
class ProfilerColumnNotSupportSortException(ProfilerException):
"""The column does not support to sort error in profiler module."""
def __init__(self, msg):
super(ProfilerColumnNotSupportSortException, self).__init__(
error=ProfilerErrors.COLUMN_NOT_SUPPORT_SORT_ERROR,
message=ProfilerErrorMsg.COLUMN_NOT_SUPPORT_SORT_ERROR.value.format(msg),
http_code=400
)
class StepNumNotSupportedException(ProfilerException):
"""The step number error in profiler module."""
def __init__(self, msg):
super(StepNumNotSupportedException, self).__init__(
error=ProfilerErrors.STEP_NUM_NOT_SUPPORTED_ERROR,
message=ProfilerErrorMsg.STEP_NUM_NOT_SUPPORTED_ERROR.value.format(msg),
http_code=400
)
class JobIdMismatchException(ProfilerException):
"""The Job ID mismatch error in profiler module."""
def __init__(self):
super(JobIdMismatchException, self).__init__(
error=ProfilerErrors.JOB_ID_MISMATCH_ERROR,
message=ProfilerErrorMsg.JOB_ID_MISMATCH_ERROR.value,
http_code=400
)
class ProfilerPipelineOpNotExistException(ProfilerException):
"""The minddata pipeline operator does not exist error in profiler module."""
def __init__(self, msg):
super(ProfilerPipelineOpNotExistException, self).__init__(
error=ProfilerErrors.PIPELINE_OP_NOT_EXIST_ERROR,
message=ProfilerErrorMsg.PIPELINE_OP_NOT_EXIST_ERROR.value.format(msg),
http_code=400
)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Profiler util.
This module provides the utils.
"""
import os
import re
# One system count takes 10 ns, so 1 ms equals 100000 system counts.
PER_MS_SYSCNT = 100000
def to_int(param, param_name):
"""
Transform the param to int type.
Args:
param (Any): The param to be transformed.
param_name (str): Param name.
Returns:
int, the transformed value.
"""
try:
param = int(param)
except ValueError:
raise TypeError('Must be Integer: ' + param_name)
return param
def fwrite_format(output_data_path, data_source=None, is_print=False, is_start=False):
"""
Write data to the output file.
Args:
output_data_path (str): The output file path of the data.
data_source (str, list, tuple): The data to write.
is_print (bool): Whether to print the data to stdout.
is_start (bool): Whether this is the first write to the output file; the old file is removed if True.
"""
if is_start is True and os.path.exists(output_data_path):
os.remove(output_data_path)
if isinstance(data_source, str) and data_source.startswith("title:"):
title_label = '=' * 20
data_source = title_label + data_source[6:] + title_label
with open(output_data_path, 'a+') as f:
if isinstance(data_source, (list, tuple)):
for raw_data in data_source:
if isinstance(raw_data, (list, tuple)):
raw_data = map(str, raw_data)
raw_data = " ".join(raw_data)
f.write(raw_data)
f.write("\n")
else:
f.write(data_source)
f.write("\n")
if is_print:
if isinstance(data_source, (list, tuple)):
for raw_data in data_source:
if isinstance(raw_data, (list, tuple)):
raw_data = map(str, raw_data)
raw_data = " ".join(raw_data)
print(raw_data)
else:
print(data_source)
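# An illustrative sketch of fwrite_format (the path and values are hypothetical):
#   fwrite_format('./result.txt', data_source='title:DATA_PREPROCESS AICPU', is_start=True)
#       recreates the file and writes the title wrapped in 20 '=' characters on each side;
#   fwrite_format('./result.txt', data_source=[('MatMul', 1.2), ('ReLU', 0.3)], is_print=True)
#       appends the lines 'MatMul 1.2' and 'ReLU 0.3' and also prints them to stdout.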
def get_log_slice_id(file_name):
"""Get the slice id from a sliced log file name."""
pattern = re.compile(r'(?<=slice_)\d+')
slice_list = pattern.findall(file_name)
index = re.findall(r'\d+', slice_list[0])
return int(index[0])
def get_file_join_name(input_path, file_name):
"""
Search files under the specified path and join all matched files into one file.
Args:
input_path (str): The source path; files will be searched under it.
file_name (str): The target filename pattern, such as 'hwts.log.data.45.dev'.
Returns:
str, the joined file name.
"""
name_list = []
file_join_name = ''
input_path = os.path.realpath(input_path)
if os.path.exists(input_path):
files = os.listdir(input_path)
for f in files:
if file_name in f and not f.endswith('.done') and not f.endswith('.join') \
and not f.endswith('.zip'):
name_list.append(f)
# resort name_list
name_list.sort(key=get_log_slice_id)
if len(name_list) == 1:
file_join_name = os.path.join(input_path, name_list[0])
elif len(name_list) > 1:
file_join_name = os.path.join(input_path, '%s.join' % file_name)
if os.path.exists(file_join_name):
os.remove(file_join_name)
with open(file_join_name, 'ab') as bin_data:
for i in name_list:
file = input_path + os.sep + i
with open(file, 'rb') as txt:
bin_data.write(txt.read())
return file_join_name
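# An illustrative example (file names are hypothetical): if input_path contains
#   hwts.log.data.45.dev_slice_0 and hwts.log.data.45.dev_slice_1,
# then get_file_join_name(input_path, 'hwts.log.data.45.dev') concatenates both slices
# (sorted by slice id) into '<input_path>/hwts.log.data.45.dev.join' and returns that path.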
def get_file_names(input_path, file_name):
"""
Search files under the specified path.
Args:
input_path (str): The source path; files will be searched under it.
file_name (str): The target filename pattern, such as 'host_start_log'.
Returns:
list, file name list.
"""
input_path = os.path.realpath(input_path)
name_list = []
if os.path.exists(input_path):
files = os.listdir(input_path)
for f in files:
if file_name in f and not f.endswith('.done') \
and not f.endswith('.zip'):
name_list.append(f)
break
return name_list
def analyse_device_list_from_profiler_dir(profiler_dir):
"""
Analyse device list from profiler dir.
Args:
profiler_dir (str): The profiler data dir.
Returns:
list, the device_id list.
"""
profiler_file_prefix = ["timeline_display", "output_op_compute_time"]
device_id_list = set()
for _, _, filenames in os.walk(profiler_dir):
for filename in filenames:
if filename.startswith("step_trace_raw"):
items = filename.split("_")
device_num = ""
if len(items) > 3:
device_num = items[3]
else:
items = filename.split("_")
device_num = items[-1].split(".")[0] if items[-1].split(".") else ""
if device_num.isdigit() and '_'.join(items[:-1]) in profiler_file_prefix:
device_id_list.add(device_num)
return sorted(list(device_id_list))
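# An illustrative example (file names are hypothetical): a profiler_dir containing
#   output_op_compute_time_0.txt and timeline_display_1.json
# yields the device id list ['0', '1'].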
def query_latest_trace_time_file(profiler_dir, device_id=0):
"""
Query the latest trace time file.
Args:
profiler_dir (str): The profiler directory.
device_id (int): The id of device.
Returns:
str, the latest trace time file path.
"""
files = os.listdir(profiler_dir)
target_file = f'step_trace_raw_{device_id}_detail_time.csv'
try:
latest_file = max(
filter(
lambda file: file == target_file,
files
),
key=lambda file: os.stat(os.path.join(profiler_dir, file)).st_mtime
)
except ValueError:
return None
return os.path.join(profiler_dir, latest_file)
def query_step_trace_file(profiler_dir):
"""
Query for the step trace file.
Args:
profiler_dir (str): The directory that contains all step trace files.
Returns:
str, the file path of step trace time.
"""
files = os.listdir(profiler_dir)
training_trace_file = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
if training_trace_file:
return os.path.join(profiler_dir, training_trace_file[0])
return None
def get_summary_for_step_trace(average_info, header):
"""The property of summary info."""
if not average_info or not header:
return {}
total_time = get_field_value(average_info, 'total', header)
iteration_interval = get_field_value(average_info, 'iteration_interval',
header)
fp_and_bp = get_field_value(average_info, 'fp_and_bp', header)
tail = get_field_value(average_info, 'tail', header)
summary = {
'total_time': total_time,
'iteration_interval': iteration_interval,
'iteration_interval_percent': calculate_percent(iteration_interval, total_time),
'fp_and_bp': fp_and_bp,
'fp_and_bp_percent': calculate_percent(fp_and_bp, total_time),
'tail': tail,
'tail_percent': calculate_percent(tail, total_time)
}
return summary
def calculate_percent(partial, total):
"""Calculate percent value."""
if total:
percent = round(partial / total * 100, 2)
else:
percent = 0
return f'{percent}%'
def to_millisecond(sys_count, limit=4):
"""Translate system count to millisecond."""
return round(sys_count / PER_MS_SYSCNT, limit)
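# Illustrative values (hypothetical): to_millisecond(250000) returns 2.5, since
# 250000 system counts / PER_MS_SYSCNT (100000) = 2.5 ms, and calculate_percent(2.5, 10.0)
# returns '25.0%'.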
def get_field_value(row_info, field_name, header, time_type='realtime'):
"""
Extract basic info through row_info.
Args:
row_info (list): The list of data info in one row.
field_name (str): The name in header.
header (list[str]): The list of field names.
time_type (str): The type of value, `realtime` or `systime`. Default: `realtime`.
Returns:
int or float, the field value (converted to milliseconds when `time_type` is `realtime`).
"""
field_index = header.index(field_name)
value = row_info[field_index]
value = to_int(value, field_name)
if time_type == 'realtime':
value = to_millisecond(value)
return value
def get_options(options):
"""Return the options dict, or an empty dict if options is None."""
if options is None:
options = {}
return options
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiler check parameters."""
def check_bool(input_param, param_name):
"""Bool type judgment."""
if isinstance(input_param, bool):
return input_param
raise TypeError("Parameter {}: input type must be bool!".format(param_name))
def check_subgraph(subgraph):
"""Check subgraph."""
if subgraph in ("all", "Default", "Gradients"):
return subgraph
raise ValueError("subgraph must be all or Default or Gradients, but got {}.".format(subgraph))
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Validate the profiler parameters."""
import os
import sys
from mindspore.profiler.common.exceptions.exceptions import ProfilerParamTypeErrorException, \
ProfilerDeviceIdException, ProfilerOpTypeException, \
ProfilerSortConditionException, ProfilerFilterConditionException, \
ProfilerGroupConditionException, ProfilerParamValueErrorException
from mindspore import log
from mindspore.profiler.common.util import to_int
AICORE_TYPE_COL = ["op_type", "execution_time", "execution_frequency", "precent"]
AICORE_DETAIL_COL = ["op_name", "op_type", "avg_execution_time", "subgraph", "full_op_name"]
AICPU_COL = ["serial_number", "op_type", "total_time", "dispatch_time", "run_start",
"run_end"]
MINDDATA_PIPELINE_COL = [
'op_id', 'op_type', 'num_workers', 'output_queue_average_size',
'output_queue_length', 'output_queue_usage_rate', 'sample_interval',
'parent_id'
]
def validate_condition(search_condition):
"""
Verify whether the params in search_condition are valid.
Args:
search_condition (dict): The search condition.
Raises:
ProfilerParamTypeErrorException: If the type of the param in search_condition is invalid.
ProfilerDeviceIdException: If the device_id param in search_condition is invalid.
ProfilerOpTypeException: If the op_type param in search_condition is invalid.
ProfilerGroupConditionException: If the group_condition param in search_condition is invalid.
ProfilerSortConditionException: If the sort_condition param in search_condition is invalid.
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
if not isinstance(search_condition, dict):
log.error("Invalid search_condition type, it should be dict.")
raise ProfilerParamTypeErrorException(
"Invalid search_condition type, it should be dict.")
if "device_id" in search_condition:
device_id = search_condition.get("device_id")
if not isinstance(device_id, str):
raise ProfilerDeviceIdException("Invalid device_id type, it should be str.")
if "op_type" in search_condition:
op_type = search_condition.get("op_type")
if op_type == "aicpu":
search_scope = AICPU_COL
elif op_type == "aicore_type":
search_scope = AICORE_TYPE_COL
elif op_type == "aicore_detail":
search_scope = AICORE_DETAIL_COL
else:
raise ProfilerOpTypeException("The op_type must in ['aicpu', 'aicore_type', 'aicore_detail']")
else:
raise ProfilerOpTypeException("The op_type must in ['aicpu', 'aicore_type', 'aicore_detail']")
if "group_condition" in search_condition:
validate_group_condition(search_condition)
if "sort_condition" in search_condition:
validate_sort_condition(search_condition, search_scope)
if "filter_condition" in search_condition:
validate_filter_condition(search_condition)
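# An illustrative search_condition that passes validate_condition (all values are
# hypothetical; the sortable column names come from AICORE_DETAIL_COL above):
#   {
#       "device_id": "0",
#       "op_type": "aicore_detail",
#       "group_condition": {"limit": 10, "offset": 0},
#       "sort_condition": {"name": "avg_execution_time", "type": "descending"},
#       "filter_condition": {"op_type": {"in": ["MatMul"]}}
#   }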
def validate_group_condition(search_condition):
"""
Verify whether the group_condition in search_condition is valid.
Args:
search_condition (dict): The search condition.
Raises:
ProfilerGroupConditionException: If the group_condition param in search_condition is invalid.
"""
group_condition = search_condition.get("group_condition")
if not isinstance(group_condition, dict):
raise ProfilerGroupConditionException("The group condition must be dict.")
if "limit" in group_condition:
limit = group_condition.get("limit", 10)
if isinstance(limit, bool) \
or not isinstance(group_condition.get("limit"), int):
log.error("The limit must be int.")
raise ProfilerGroupConditionException("The limit must be int.")
if limit < 1 or limit > 100:
raise ProfilerGroupConditionException("The limit must in [1, 100].")
if "offset" in group_condition:
offset = group_condition.get("offset", 0)
if isinstance(offset, bool) \
or not isinstance(group_condition.get("offset"), int):
log.error("The offset must be int.")
raise ProfilerGroupConditionException("The offset must be int.")
if offset < 0:
raise ProfilerGroupConditionException("The offset must ge 0.")
if offset > 1000000:
raise ProfilerGroupConditionException("The offset must le 1000000.")
def validate_sort_condition(search_condition, search_scope):
"""
Verify whether the sort_condition in search_condition is valid.
Args:
search_condition (dict): The search condition.
search_scope (list): The search scope.
Raises:
ProfilerSortConditionException: If the sort_condition param in search_condition is invalid.
"""
sort_condition = search_condition.get("sort_condition")
if not isinstance(sort_condition, dict):
raise ProfilerSortConditionException("The sort condition must be dict.")
if "name" in sort_condition:
sorted_name = sort_condition.get("name", "")
err_msg = "The sorted_name must be in {}".format(search_scope)
if not isinstance(sorted_name, str):
log.error("Wrong sorted name type.")
raise ProfilerSortConditionException("Wrong sorted name type.")
if sorted_name not in search_scope:
log.error(err_msg)
raise ProfilerSortConditionException(err_msg)
if "type" in sort_condition:
sorted_type_param = ['ascending', 'descending']
sorted_type = sort_condition.get("type")
if sorted_type and sorted_type not in sorted_type_param:
err_msg = "The sorted type must be ascending or descending."
log.error(err_msg)
raise ProfilerSortConditionException(err_msg)
def validate_op_filter_condition(op_condition, value_type=str, value_type_msg='str'):
"""
Verify whether the op_condition in filter_condition is valid.
Args:
op_condition (dict): The op_condition in search_condition.
value_type (type): The value type. Default: str.
value_type_msg (str): The value type message. Default: 'str'.
Raises:
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
filter_key = ["in", "not_in", "partial_match_str_in"]
if not isinstance(op_condition, dict):
raise ProfilerFilterConditionException("The filter condition value must be dict.")
for key, value in op_condition.items():
if not isinstance(key, str):
raise ProfilerFilterConditionException("The filter key must be str")
if not isinstance(value, list):
raise ProfilerFilterConditionException("The filter value must be list")
if key not in filter_key:
raise ProfilerFilterConditionException("The filter key must in {}.".format(filter_key))
for item in value:
if not isinstance(item, value_type):
raise ProfilerFilterConditionException(
"The item in filter value must be {}.".format(value_type_msg)
)
def validate_filter_condition(search_condition):
"""
Verify whether the filter_condition in search_condition is valid.
Args:
search_condition (dict): The search condition.
Raises:
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
filter_condition = search_condition.get("filter_condition")
if not isinstance(filter_condition, dict):
raise ProfilerFilterConditionException("The filter condition must be dict.")
if filter_condition:
if "op_type" in filter_condition:
op_type_condition = filter_condition.get("op_type")
validate_op_filter_condition(op_type_condition)
if "op_name" in filter_condition:
op_name_condition = filter_condition.get("op_name")
validate_op_filter_condition(op_name_condition)
if "op_type" not in filter_condition and "op_name" not in filter_condition:
raise ProfilerFilterConditionException("The key of filter_condition is not support")
def validate_and_set_job_id_env(job_id_env):
"""
Validate the job id and set it in environment.
Args:
job_id_env (str): The id that to be set in environment parameter `JOB_ID`.
Returns:
int, the valid job id env.
"""
if job_id_env is None:
return job_id_env
# get job_id_env in int type
valid_id = to_int(job_id_env, 'job_id_env')
# check the range of valid_id
if valid_id and 255 < valid_id < sys.maxsize:
os.environ['JOB_ID'] = job_id_env
else:
log.warning("Invalid job_id_env %s. The value should be int and between 255 and %s. Use"
"default job id env instead.",
job_id_env, sys.maxsize)
return valid_id
def validate_ui_proc(proc_name):
"""
Validate proc name in restful request.
Args:
proc_name (str): The proc name to query. Acceptable value is in
[`iteration_interval`, `fp_and_bp`, `tail`].
Raises:
ProfilerParamValueErrorException: If the proc_name is invalid.
"""
accept_names = ['iteration_interval', 'fp_and_bp', 'tail']
if proc_name not in accept_names:
log.error("Invalid proc_name. The proc_name for restful api is in %s", accept_names)
raise ProfilerParamValueErrorException(f'proc_name should be in {accept_names}.')
def validate_minddata_pipeline_condition(condition):
"""
Verify whether the minddata pipeline search condition is valid.
Args:
condition (dict): The minddata pipeline search condition.
Raises:
ProfilerParamTypeErrorException: If the type of the search condition is
invalid.
ProfilerDeviceIdException: If the device_id param in the search
condition is invalid.
ProfilerGroupConditionException: If the group_condition param in the
search condition is invalid.
ProfilerSortConditionException: If the sort_condition param in the
search condition is invalid.
ProfilerFilterConditionException: If the filter_condition param in the
search condition is invalid.
"""
if not isinstance(condition, dict):
log.error("Invalid condition type, it should be dict.")
raise ProfilerParamTypeErrorException(
"Invalid condition type, it should be dict."
)
if "device_id" in condition:
device_id = condition.get("device_id")
if not isinstance(device_id, str):
raise ProfilerDeviceIdException(
"Invalid device_id type, it should be str."
)
if "group_condition" in condition:
validate_group_condition(condition)
if "sort_condition" in condition:
validate_sort_condition(condition, MINDDATA_PIPELINE_COL)
if "filter_condition" in condition:
filter_condition = condition.get('filter_condition')
if not isinstance(filter_condition, dict):
raise ProfilerFilterConditionException(
"The filter condition must be dict."
)
for key, value in filter_condition.items():
if key == 'op_id':
validate_op_filter_condition(
value, value_type=int, value_type_msg='int'
)
elif key == 'op_type':
validate_op_filter_condition(value)
elif key == 'is_display_op_detail':
if not isinstance(value, bool):
raise ProfilerFilterConditionException(
"The condition must be bool."
)
else:
raise ProfilerFilterConditionException(
"The key {} of filter_condition is not support.".format(key)
)
# Copyright 2019 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Validate the input path."""
import os
def validate_and_normalize_path(
path,
check_absolute_path=False,
allow_parent_dir=False,
):
"""
Validates path and returns its normalized form.
If path has a valid scheme, treat path as url, otherwise consider path a
unix local path.
Note:
File scheme (rfc8089) is currently not supported.
Args:
path (str): Path to be normalized.
check_absolute_path (bool): Whether to check that the path is an absolute path.
allow_parent_dir (bool): Whether allow parent dir in path.
Returns:
str, normalized path.
"""
if not path:
raise RuntimeError("The path is invalid!")
path_str = str(path)
if not allow_parent_dir:
path_components = path_str.split("/")
if ".." in path_components:
raise RuntimeError("The path is invalid!")
# The path does not have a valid scheme; treat it as a unix local path.
if check_absolute_path:
if not path_str.startswith("/"):
raise RuntimeError("The path is invalid!")
try:
# realpath resolves symbolic links and normalizes the path.
normalized_path = os.path.realpath(path)
except ValueError:
raise RuntimeError("The path is invalid!")
return normalized_path
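# Illustrative behaviour (paths are hypothetical; assumes no symlinks are involved):
#   validate_and_normalize_path('/tmp/profiler/./run1')           -> '/tmp/profiler/run1'
#   validate_and_normalize_path('../run1')                        -> RuntimeError (parent dir rejected)
#   validate_and_normalize_path('run1', check_absolute_path=True) -> RuntimeError (not an absolute path)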
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
The parser for AI CPU preprocess data.
"""
import os
from mindspore.profiler.common.util import fwrite_format, get_file_join_name
from mindspore import log as logger
class DataPreProcessParser:
"""
The Parser for AI CPU preprocess data.
Args:
input_path(str): The profiling job path.
output_filename(str): The output data path and name.
"""
_source_file_target = 'DATA_PREPROCESS.dev.AICPU.'
_dst_file_title = 'title:DATA_PREPROCESS AICPU'
_dst_file_column_title = ['serial_number', 'node_type_name', 'total_time(ms)',
'dispatch_time(ms)', 'run_start', 'run_end']
_ms_unit = 1000
def __init__(self, input_path, output_filename):
self._input_path = input_path
self._output_filename = output_filename
self._source_file_name = self._get_source_file()
self._ms_kernel_flag = 3
self._other_kernel_flag = 6
self._thread_flag = 7
self._ms_kernel_run_end_index = 2
self._other_kernel_run_end_index = 5
self._result_list = []
self._min_cycle_counter = float('inf')
def _get_source_file(self):
"""Get log file name, which was created by ada service."""
file_name = get_file_join_name(self._input_path, self._source_file_target)
if not file_name:
data_path = os.path.join(self._input_path, "data")
file_name = get_file_join_name(data_path, self._source_file_target)
return file_name
def _get_kernel_result(self, number, node_list, thread_list):
"""Get the profiling data form different aicpu kernel"""
try:
if len(node_list) == self._ms_kernel_flag and len(thread_list) == self._thread_flag:
node_type_name = node_list[0].split(':')[-1]
run_end_index = self._ms_kernel_run_end_index
elif len(node_list) == self._other_kernel_flag and len(thread_list) == self._thread_flag:
node_type_name = node_list[0].split(':')[-1].split('/')[-1].split('-')[0]
run_end_index = self._other_kernel_run_end_index
else:
logger.warning("the data format can't support 'node_list':%s", str(node_list))
return None
run_start = node_list[1].split(':')[-1].split(' ')[0]
run_end = node_list[run_end_index].split(':')[-1].split(' ')[0]
total_time = float(thread_list[-1].split('=')[-1].split()[0]) / self._ms_unit
dispatch_time = float(thread_list[-2].split('=')[-1].split()[0]) / self._ms_unit
return [number, node_type_name, total_time, dispatch_time,
run_start, run_end]
except IndexError as e:
logger.error(e)
return None
def execute(self):
"""Execute the parser, get result data, and write it to the output file."""
if not os.path.exists(self._source_file_name):
logger.info("Did not find the aicpu profiling source file")
return
with open(self._source_file_name, 'rb') as ai_cpu_data:
ai_cpu_str = str(ai_cpu_data.read().replace(b'\n\x00', b' ___ ')
.replace(b'\x00', b' ___ '))[2:-1]
ai_cpu_lines = ai_cpu_str.split(" ___ ")
result_list = list()
ai_cpu_total_time_summary = 0
# Node serial number.
serial_number = 1
for i in range(len(ai_cpu_lines) - 1):
node_line = ai_cpu_lines[i]
thread_line = ai_cpu_lines[i + 1]
if "Node" in node_line and "Thread" in thread_line:
# Get the node data from node_line
node_list = node_line.split(',')
thread_list = thread_line.split(',')
result = self._get_kernel_result(serial_number, node_list, thread_list)
if result is None:
continue
result_list.append(result)
# Calculate the total time.
total_time = result[2]
ai_cpu_total_time_summary += total_time
# Increase node serial number.
serial_number += 1
elif "Node" in node_line and "Thread" not in thread_line:
node_type_name = node_line.split(',')[0].split(':')[-1]
logger.warning("The node type:%s cannot find thread data", node_type_name)
if result_list:
ai_cpu_total_time = format(ai_cpu_total_time_summary, '.6f')
result_list.append(["AI CPU Total Time(ms):", ai_cpu_total_time])
fwrite_format(self._output_filename, " ".join(self._dst_file_column_title), is_start=True, is_print=True)
fwrite_format(self._output_filename, result_list, is_print=True)
# For timeline display.
self._result_list = result_list
def query_aicpu_data(self):
"""
Get execution time of AI CPU operator.
Returns:
a dict, the metadata of AI CPU operator execution time.
"""
stream_id = 0 # Default stream id for AI CPU.
pid = 9000 # Default pid for AI CPU.
factor = 1000 # Convert time unit from 1us to 1ms
total_time = 0
min_cycle_counter = float('inf')
aicpu_info = []
op_count_list = []
for aicpu_item in self._result_list:
if "AI CPU Total Time(ms):" in aicpu_item:
total_time = aicpu_item[-1]
continue
op_name = aicpu_item[1]
start_time = float(aicpu_item[4]) / factor
min_cycle_counter = min(min_cycle_counter, start_time)
end_time = float(aicpu_item[5]) / factor
duration = end_time - start_time
aicpu_info.append([op_name, stream_id, start_time, duration, pid])
# Record the number of operator types.
if op_name not in op_count_list:
op_count_list.append(op_name)
self._min_cycle_counter = min_cycle_counter
aicpu_dict = {
'info': aicpu_info,
'total_time': float(total_time),
'op_exe_times': len(aicpu_info),
'num_of_ops': len(op_count_list),
'num_of_streams': 1
}
return aicpu_dict
@property
def min_cycle_counter(self):
"""Get minimum cycle counter in AI CPU."""
return self._min_cycle_counter
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The container of metadata used in profiler parser."""
class HWTSContainer:
"""
HWTS output container.
Args:
split_list (list): The split list of metadata in HWTS output file.
"""
def __init__(self, split_list):
self._op_name = ''
self._duration = None
self._status = split_list[0]
self._task_id = split_list[6]
self._cycle_counter = float(split_list[7])
self._stream_id = split_list[8]
@property
def status(self):
"""Get the status of the operator, i.e. Start or End."""
return self._status
@property
def task_id(self):
"""Get the task id of the operator."""
return self._task_id
@property
def cycle_counter(self):
"""Get the cycle counter."""
return self._cycle_counter
@property
def stream_id(self):
"""Get the stream id of the operator."""
return self._stream_id
@property
def op_name(self):
"""Get the name of the operator."""
return self._op_name
@op_name.setter
def op_name(self, name):
"""Set the name of the operator."""
self._op_name = name
@property
def duration(self):
"""Get the duration of the operator execution."""
return self._duration
@duration.setter
def duration(self, value):
"""Set the duration of the operator execution."""
self._duration = value
class TimelineContainer:
"""
A container of operator computation metadata.
Args:
split_list (list): The split list of metadata in op_compute output file.
"""
def __init__(self, split_list):
self._op_name = split_list[0]
self._stream_id = int(split_list[1])
self._start_time = float(split_list[2])
self._duration = float(split_list[3])
self._pid = None
if len(split_list) == 5:
self._pid = int(split_list[4])
@property
def op_name(self):
"""Get the name of the operator."""
return self._op_name
@property
def stream_id(self):
"""Get the stream id of the operator."""
return self._stream_id
@property
def start_time(self):
"""Get the execution start time of the operator."""
return self._start_time
@property
def duration(self):
"""Get the duration of the operator execution."""
return self._duration
@property
def pid(self):
"""Get the pid of the operator execution."""
return self._pid
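# An illustrative instantiation (the split values are hypothetical):
#   TimelineContainer(['MatMul-op1', '0', '12.5', '0.3', '9000'])
# yields op_name 'MatMul-op1', stream_id 0, start_time 12.5, duration 0.3 and pid 9000.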
(This diff is collapsed.)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for hwts log file."""
import os
import struct
from mindspore.profiler.common.util import fwrite_format, get_file_join_name
from mindspore import log as logger
class HWTSLogParser:
"""
The Parser for hwts log files.
Args:
input_path (str): The profiling job path. Such as: '/var/log/npu/profiling/JOBAIFGJEJFEDCBAEADIFJAAAAAAAAAA'.
output_filename (str): The output data path and name. Such as: './output_format_data_hwts_0.txt'.
"""
_source_file_target = 'hwts.log.data.45.dev.profiler_default_tag'
_dst_file_title = 'title:45 HWTS data'
_dst_file_column_title = 'Type cnt Core_ID Block_ID Task_ID Cycle_counter Stream_ID'
def __init__(self, input_path, output_filename):
self._input_path = input_path
self._output_filename = output_filename
self._source_file_name = self._get_source_file()
def _get_source_file(self):
"""Get hwts log file name, which was created by ada service."""
file_name = get_file_join_name(self._input_path, self._source_file_target)
if not file_name:
data_path = os.path.join(self._input_path, "data")
file_name = get_file_join_name(data_path, self._source_file_target)
if not file_name:
msg = "Fail to find hwts log file, under profiling directory"
raise RuntimeError(msg)
return file_name
def execute(self):
"""
Execute the parser, get result data, and write it to the output file.
Returns:
bool, whether the hwts log was analysed successfully.
"""
content_format = ['QIIIIIIIIIIII', 'QIIQIIIIIIII', 'IIIIQIIIIIIII']
log_type = ['Start of task', 'End of task', 'Start of block', 'End of block', 'Block PMU']
result_data = ""
with open(self._source_file_name, 'rb') as hwts_data:
while True:
line = hwts_data.read(64)
if line:
if not line.strip():
continue
else:
break
byte_first_four = struct.unpack('BBHHH', line[0:8])
byte_first = bin(byte_first_four[0]).replace('0b', '').zfill(8)
ms_type = byte_first[-3:]
is_warn_res0_ov = byte_first[4]
cnt = int(byte_first[0:4], 2)
core_id = byte_first_four[1]
blk_id, task_id = byte_first_four[3], byte_first_four[4]
if ms_type in ['000', '001', '010']: # log type 0,1,2
result = struct.unpack(content_format[0], line[8:])
syscnt = result[0]
stream_id = result[1]
elif ms_type == '011': # log type 3
result = struct.unpack(content_format[1], line[8:])
syscnt = result[0]
stream_id = result[1]
elif ms_type == '100': # log type 4
result = struct.unpack(content_format[2], line[8:])
stream_id = result[2]
if is_warn_res0_ov == '0':
syscnt = result[4]
else:
syscnt = None
else:
logger.info("Profiling: invalid hwts log record type %s", ms_type)
continue
if int(task_id) < 25000:
task_id = str(stream_id) + "_" + str(task_id)
result_data += ("%-14s %-4s %-8s %-9s %-8s %-15s %s\n" %(log_type[int(ms_type, 2)], cnt, core_id,
blk_id, task_id, syscnt, stream_id))
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=self._dst_file_column_title)
fwrite_format(self._output_filename, data_source=result_data)
return True
(This diff is collapsed.)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Minddata aicpu parser."""
import os
from mindspore.profiler.common.util import get_file_join_name, fwrite_format
from mindspore import log as logger
class MinddataParser:
"""Minddata Aicpu Parser."""
@staticmethod
def parse_minddata_aicpu_data(minddata_aicpu_source_path):
"""
Parse minddata get_next info, which contains queue size and execution time.
Args:
minddata_aicpu_source_path (str): the source file path.
Returns:
list[Union[str, float]], the converted data.
"""
result = list()
try:
with open(minddata_aicpu_source_path) as source_data_file:
source_data = source_data_file.read()
step_data = source_data.split("\x00")
for one_step in step_data:
if one_step:
node_info = one_step.split(", ")
node_name, node_start, node_end, queue_size = "", 0, 0, 0
if node_info:
node_name = node_info[0].replace("Node:", "")
if len(node_info) > 2:
node_start = node_info[1].replace("Run start:", "")
if node_start.isdigit():
node_start = int(node_start)
node_end = node_info[2].replace("Run end:", "")
if node_end.isdigit():
node_end = int(node_end)
if len(node_info) > 3:
queue_size = node_info[3].replace("queue size:", "")
if queue_size.isdigit():
queue_size = int(queue_size)
one_step_list = [node_name, node_start, node_end, queue_size]
result.append(one_step_list)
except OSError:
logger.error("Open get_next profiling file error.")
return result
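# An illustrative record (hypothetical values; the field labels match the replace() calls above):
#   "Node:GetNext, Run start:1596000, Run end:1596200, queue size:2"
# is parsed into ['GetNext', 1596000, 1596200, 2].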
@staticmethod
def execute(source_path, output_path, device_id):
"""
Execute the parser.
Args:
source_path (str): the source file path.
output_path (str): the output file path.
device_id (str): the device id.
"""
col_names = ["node_name", "start_time", "end_time", "queue_size"]
minddata_aicpu_source_path = get_file_join_name(
input_path=source_path, file_name='DATA_PREPROCESS.dev.AICPUMI')
if not minddata_aicpu_source_path:
minddata_aicpu_source_path = get_file_join_name(
input_path=os.path.join(source_path, "data"), file_name='DATA_PREPROCESS.dev.AICPUMI')
if not minddata_aicpu_source_path:
return
minddata_aicpu_output_path = os.path.join(output_path, "minddata_aicpu_" + device_id + ".txt")
minddata_aicpu_data = MinddataParser.parse_minddata_aicpu_data(minddata_aicpu_source_path)
if minddata_aicpu_data:
fwrite_format(minddata_aicpu_output_path, " ".join(col_names), is_start=True)
fwrite_format(minddata_aicpu_output_path, minddata_aicpu_data, is_start=True)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Thr parser for parsing minddata pipeline files."""
import csv
import json
import os
from queue import Queue
from mindspore.profiler.common.exceptions.exceptions import \
ProfilerPathErrorException, ProfilerFileNotFoundException, \
ProfilerDirNotFoundException, ProfilerRawFileException
from mindspore import log as logger
from mindspore.profiler.common.validator.validate_path import \
validate_and_normalize_path
class MinddataPipelineParser:
"""
The parser for parsing minddata pipeline files.
Args:
source_dir (str): The minddata pipeline source dir.
device_id (str): The device ID.
output_path (str): The directory of the parsed file. Default: `./`.
Raises:
ProfilerPathErrorException: If the minddata pipeline file path or
the output path is invalid.
ProfilerFileNotFoundException: If the minddata pipeline file or
the output dir does not exist.
"""
_raw_pipeline_file_name = 'pipeline_profiling_{}.json'
_parsed_pipeline_file_name = 'minddata_pipeline_raw_{}.csv'
_col_names = [
'op_id', 'op_type', 'num_workers', 'output_queue_size',
'output_queue_average_size', 'output_queue_length',
'output_queue_usage_rate', 'sample_interval', 'parent_id', 'children_id'
]
def __init__(self, source_dir, device_id, output_path='./'):
self._device_id = device_id
self._pipeline_path = self._get_pipeline_path(source_dir)
self._save_path = self._get_save_path(output_path)
@property
def save_path(self):
"""
The property of save path.
Returns:
str, the save path.
"""
return self._save_path
def parse(self):
"""
Parse the minddata pipeline files.
Raises:
ProfilerRawFileException: If fails to parse the raw file of
minddata pipeline or the file is empty.
"""
with open(self._pipeline_path, 'r') as file:
try:
pipeline_info = json.load(file)
except (json.JSONDecodeError, TypeError) as err:
logger.warning(err)
raise ProfilerRawFileException(
'Fail to parse minddata pipeline file.'
)
if not pipeline_info:
logger.warning('The minddata pipeline file is empty.')
raise ProfilerRawFileException(
'The minddata pipeline file is empty.'
)
self._parse_and_save(pipeline_info)
def _get_pipeline_path(self, source_dir):
"""
Get the minddata pipeline file path.
Args:
source_dir (str): The minddata pipeline source dir.
Returns:
str, the minddata pipeline file path.
"""
pipeline_path = os.path.join(
source_dir,
self._raw_pipeline_file_name.format(self._device_id)
)
try:
pipeline_path = validate_and_normalize_path(pipeline_path)
except RuntimeError:
logger.warning('Minddata pipeline file is invalid.')
raise ProfilerPathErrorException('Minddata pipeline file is invalid.')
if not os.path.isfile(pipeline_path):
logger.warning(
'The minddata pipeline file <%s> not found.', pipeline_path
)
raise ProfilerFileNotFoundException(pipeline_path)
return pipeline_path
def _get_save_path(self, output_path):
"""
Get the save path.
Args:
output_path (str): The output dir.
Returns:
str, the save path.
"""
try:
output_dir = validate_and_normalize_path(output_path)
except RuntimeError:
logger.warning('Output path is invalid.')
raise ProfilerPathErrorException('Output path is invalid.')
if not os.path.isdir(output_dir):
logger.warning('The output dir <%s> not found.', output_dir)
raise ProfilerDirNotFoundException(output_dir)
return os.path.join(
output_dir, self._parsed_pipeline_file_name.format(self._device_id)
)
def _parse_and_save(self, pipeline_info):
"""
Parse and save the parsed minddata pipeline file.
Args:
pipeline_info (dict): The pipeline info reads from the raw file of
the minddata pipeline.
Raises:
ProfilerRawFileException: If the format of minddata pipeline raw
file is wrong.
"""
sample_interval = pipeline_info.get('sampling_interval')
op_info = pipeline_info.get('op_info')
if sample_interval is None or not op_info:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong.'
)
op_id_info_cache = {}
for item in op_info:
op_id_info_cache[item.get('op_id')] = item
with open(self._save_path, 'w') as save_file:
csv_writer = csv.writer(save_file)
csv_writer.writerow(self._col_names)
self._parse_and_save_op_info(
csv_writer, op_id_info_cache, sample_interval
)
def _parse_and_save_op_info(self, csv_writer, op_id_info_cache,
sample_interval):
"""
Parse and save the minddata pipeline operator information.
Args:
csv_writer (csv.writer): The csv writer.
op_id_info_cache (dict): The operator id and information cache.
sample_interval (int): The sample interval.
Raises:
ProfilerRawFileException: If the operator whose id is 0 does not exist.
"""
queue = Queue()
root_node = op_id_info_cache.get(0)
if not root_node:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong, '
'the operator whose id is 0 does not exist.'
)
root_node['parent_id'] = None
queue.put_nowait(root_node)
while not queue.empty():
node = queue.get_nowait()
self._update_child_node(node, op_id_info_cache)
csv_writer.writerow(self._get_op_info(node, sample_interval))
op_id = node.get('op_id')
children_ids = node.get('children')
if not children_ids:
continue
for child_op_id in children_ids:
sub_node = op_id_info_cache.get(child_op_id)
sub_node['parent_id'] = op_id
queue.put_nowait(sub_node)
def _update_child_node(self, node, op_id_info_cache):
"""
Updates the child node information of the operator.
Args:
node (dict): The node represents an operator.
op_id_info_cache (dict): The operator id and information cache.
"""
child_op_ids = node.get('children')
if not child_op_ids:
return
queue = Queue()
self._cp_list_item_to_queue(child_op_ids, queue)
new_child_op_ids = []
while not queue.empty():
child_op_id = queue.get_nowait()
child_node = op_id_info_cache.get(child_op_id)
if child_node is None:
continue
metrics = child_node.get('metrics')
if not metrics or not metrics.get('output_queue'):
op_ids = child_node.get('children')
if op_ids:
self._cp_list_item_to_queue(op_ids, queue)
else:
new_child_op_ids.append(child_op_id)
node['children'] = new_child_op_ids
def _get_op_info(self, op_node, sample_interval):
"""
Get the operator information.
Args:
op_node (dict): The node represents an operator.
sample_interval (int): The sample interval.
Returns:
list[str, int, float], the operator information.
"""
queue_size = None
queue_average_size = None
queue_length = None
queue_usage_rate = None
metrics = op_node.get('metrics')
if metrics:
output_queue = metrics.get('output_queue')
if output_queue:
queue_size = output_queue.get('size')
queue_average_size = sum(queue_size) / len(queue_size)
queue_length = output_queue.get('length')
queue_usage_rate = queue_average_size / queue_length
children_id = op_node.get('children')
op_info = [
op_node.get('op_id'),
op_node.get('op_type'),
op_node.get('num_workers'),
queue_size,
queue_average_size,
queue_length,
queue_usage_rate,
sample_interval,
op_node.get('parent_id'),
children_id if children_id else None
]
return op_info
def _cp_list_item_to_queue(self, inner_list, queue):
"""
Copy the contents of a list to a queue.
Args:
inner_list (list): The list.
queue (Queue): The target queue.
"""
for item in inner_list:
queue.put_nowait(item)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Op compute time files parser."""
import os
from mindspore.profiler.common.util import fwrite_format
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException
from mindspore import log as logger
from mindspore.profiler.common.validator.validate_path import validate_and_normalize_path
from mindspore.profiler.parser.container import HWTSContainer
TIMELINE_FILE_COLUMN_TITLE = 'op_name, stream_id, start_time(ms), duration(ms)'
class OPComputeTimeParser:
"""
Join hwts info and framework info, get op time info, and output to the result file.
Args:
        hwts_output_file (str): The file path of hwts_output_file. Such as: './output_format_data_hwts_0.txt'.
output_filename (str): The output data file path and name. Such as: './output_op_compute_time_0.txt'.
        op_task_info (dict): The task and op relation info. The format: {task_id: [opname, stream_id, block_dim]}.
"""
_dst_file_title = 'title:op compute time'
_dst_file_column_title = 'op_name compute_time(ms) stream_id'
_dst_file_column_title += '\n------------ --------------- ---------'
def __init__(self, hwts_output_file, output_filename, op_task_info,
output_path, device_id):
hwts_output_file = validate_and_normalize_path(hwts_output_file)
self._hwts_output_file = hwts_output_file
self._output_filename = output_filename
self._op_task_info = op_task_info
self._output_path = output_path
self._device_id = device_id
self._min_cycle_counter = float("inf")
def _get_op_task_id_map(self):
"""
Read hwts data file, get the task time info.
Returns:
list: all hwts task time info.
"""
op_map_result = []
hwts_list = []
if not os.path.exists(self._hwts_output_file):
logger.error('The hwts output file does not exist.')
raise ProfilerFileNotFoundException('hwts output file')
with open(self._hwts_output_file, 'r') as data_file:
lines = data_file.readlines()
for line in lines:
if line.startswith("Start of task") or line.startswith("End of task"):
line_split = line.split()
container = HWTSContainer(line_split)
hwts_list.append(container)
# hwts op map by taskId
for hwts in hwts_list:
if hwts.task_id in self._op_task_info.keys():
hwts.op_name = self._op_task_info[hwts.task_id]
op_map_result.append(hwts)
return op_map_result
def execute(self):
"""Execute the parser, compute all op, get op time, and write it to the output file."""
# Calculate the execution time of operators,
# and update the minimum cycle counter.
tmp_result_data = self._calculate_op_execution_time()
# Convert time units from nanoseconds to milliseconds.
# The unit of the cycle counter is 10 nanoseconds.
op_name_time_dict = {}
op_name_stream_dict = {}
op_name_count_dict = {}
op_name_task_dict = {}
op_name_start_time = {}
self._convert_op_time_unit(
tmp_result_data, op_name_time_dict, op_name_stream_dict,
op_name_count_dict, op_name_task_dict, op_name_start_time
)
result_data = ""
total_time = 0
for op_name, time in op_name_time_dict.items():
if op_name in op_name_stream_dict.keys():
stream_id = op_name_stream_dict[op_name]
avg_time = time / op_name_count_dict[op_name]
total_time += avg_time
result_data += ("%s %s %s\n" %(op_name, str(avg_time), stream_id))
result_data += ("total op %s 0" %(str(total_time)))
timeline_data = []
for op_name, time in op_name_time_dict.items():
if op_name in op_name_stream_dict.keys():
stream_id = op_name_stream_dict[op_name]
start_time_list = op_name_start_time.get(op_name)
for (start_time, duration) in start_time_list:
timeline_data.append([op_name, stream_id, start_time, duration])
# Write the metadata of operators into the file,
# including operator name, average time, and stream id.
self._write_op_time_into_file(result_data)
# Write the timeline data into file,
# including operator name, stream id, start time, and duration.
self._write_timeline_data_into_file(timeline_data)
def _write_op_time_into_file(self, result_data):
"""
Write the metadata of operators into the file, including
op name, average time, and stream id.
Args:
result_data (str): The metadata to be written into the file.
'op_name_1', 'avg_time_1', 'stream_id_1',
'op_name_2', 'avg_time_2', 'stream_id_2',
...
"""
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=self._dst_file_column_title)
fwrite_format(self._output_filename, data_source=result_data)
def _write_timeline_data_into_file(self, timeline_data):
"""
Write the timeline information into the file, including
operator name, stream id, start time and duration.
Args:
timeline_data (list): The metadata to be written into the file.
[
                ['op_name_1', 'stream_id_1', 'start_time_1', 'duration_1'],
                ['op_name_2', 'stream_id_2', 'start_time_2', 'duration_2'],
[...]
]
"""
# sorted by start times
timeline_data.sort(key=lambda x: float(x[2]))
filename = 'output_timeline_data_{}.txt'.format(self._device_id)
file_path = os.path.join(self._output_path, filename)
file_path = validate_and_normalize_path(file_path)
# write to file
try:
with open(file_path, 'w') as f_obj:
f_obj.write(TIMELINE_FILE_COLUMN_TITLE + '\n')
for timeline in timeline_data:
timeline = [str(item) for item in timeline]
f_obj.write(','.join(timeline) + '\n')
except (IOError, OSError) as err:
logger.error('Error occurred when writing intermediate timeline file: %s', err)
raise ProfilerIOException
def _calculate_op_execution_time(self):
"""
Calculate the execution time of each operator.
Returns:
list, including the intermediate data of op execution time.
"""
tmp_result_data = []
op_map_list = self._get_op_task_id_map()
cur_index = 0
length = len(op_map_list)
min_cycle_counter = float("inf")
while cur_index < length:
if cur_index + 1 == length:
break
op_start = op_map_list[cur_index]
op_end = op_map_list[cur_index + 1]
if op_start.status == "Start" and op_end.status == "End" \
and op_start.op_name == op_end.op_name:
op_start.duration = op_end.cycle_counter - op_start.cycle_counter
tmp_result_data.append(op_start)
cur_index += 2
if not op_start.op_name.startswith("assign"):
min_cycle_counter = min(min_cycle_counter, op_start.cycle_counter)
else:
cur_index += 1
# Update the value of minimum cycle counter.
self._min_cycle_counter = min_cycle_counter / 1e5 # Convert the time unit from 10ns to 1ms
return tmp_result_data
def _convert_op_time_unit(self, op_data_list, op_name_time_dict, op_name_stream_dict,
op_name_count_dict, op_name_task_dict, op_name_start_time):
"""
Calculate the execution time of operator and convert it into millisecond.
Args:
op_data_list (list): The list of operator metadata.
op_name_time_dict (dict): The mapping relation of operator name and its execution time.
op_name_stream_dict (dict): The mapping relation of operator name and its stream id.
op_name_count_dict (dict): The mapping relation of operator name and its count.
op_name_task_dict (dict): The mapping relation of operator name and its task id.
op_name_start_time (dict): The mapping relation of operator name and its start time.
"""
factor = 1e5
for item in op_data_list:
op_name = item.op_name
# Unit conversion: converting the cycle counter into ms.
op_start_time_str = str(item.cycle_counter / factor)
op_duration = item.duration / factor
op_duration_str = str(item.duration / factor)
if op_name in op_name_time_dict.keys():
op_name_time_dict[op_name] += op_duration
if item.task_id == op_name_task_dict[op_name]:
op_name_count_dict[op_name] += 1
op_name_start_time[op_name].append(
(op_start_time_str, op_duration_str)
)
else:
op_name_time_dict[op_name] = op_duration
op_name_stream_dict[op_name] = item.stream_id
op_name_task_dict[op_name] = item.task_id
op_name_count_dict[op_name] = 1
op_name_start_time[op_name] = []
op_name_start_time[op_name].append(
(op_start_time_str, op_duration_str)
)
@property
def min_cycle_counter(self):
"""Get minimum cycle counter."""
return self._min_cycle_counter
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for step trace data."""
import csv
import json
import os
import stat
import struct
from collections import namedtuple
from decimal import Decimal
from mindspore.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
JobIdMismatchException, ProfilerIOException
from mindspore import log
from mindspore.profiler.common.util import get_summary_for_step_trace
StepTraceStruct = namedtuple(
'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count']
)
class StepTraceParser:
"""
The parser for step trace data.
Args:
input_dir (str): The directory that contains original step trace data.
output_file_path (str): The output file path.
job_id (int): The job id used to define the start of new step. Default: 0.
        skip_first_step (bool): Whether to skip the first step or not. Default: False.
"""
_event_size = 20
_fp_tag = 1
_bp_tag = 2
_end_tag = 255
def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False):
self._input_dir = input_dir
self._output_path = output_file_path
self._job_id = job_id
self._skip_first_step = skip_first_step
self._result = []
self._header = []
self._step_num = 0
self._tag_map = {}
@property
def output_file(self):
"""The property of step trace header."""
file_name = self._output_path.rsplit('/', 2)
return file_name[-1] if len(file_name) == 3 else ''
def show(self):
"""The property of step trace info."""
summary_info = {}
if self._result:
summary_info = get_summary_for_step_trace(self._result[-1], self._header)
summary_info['total_steps'] = len(self._result) - 1
print('\nStep trace summary info (unit: syscnt):')
print(summary_info)
        print('\nThe step trace parse result is saved under ${summary_dir}/profiler/%s'
% self.output_file)
def parse_and_save(self):
"""Parse step trace files and save the result."""
try:
source_files = self._get_step_trace_files()
self._parse(source_files)
self._save()
except IOError as err:
log.warning(err)
raise ProfilerIOException()
else:
log.info("Finish to save intermediate result for step trace file.")
def record_point_info(self, point_info, output_path):
"""
Record point info into json.
Args:
point_info (dict): The point info about tag id and relative op name.
output_path (str): The output path for saving point info.
Returns:
dict, parsed point info.
"""
points = {
'fp_start': point_info.get(self._fp_tag, ''),
'bp_end': point_info.get(self._bp_tag, '')
}
try:
with open(output_path, 'w') as json_file:
json.dump(points, json_file)
os.chmod(output_path, stat.S_IREAD)
except (IOError, OSError) as err:
log.warning('Failed to save point info. %s', err)
raise ProfilerIOException
return points
def update_tag_op_type_map(self, point_info):
"""
        Update the map from tag id to op type.
Args:
point_info (dict): The point info about tag id and relative op name.
"""
tag_map = {}
for tag, op_name in point_info.items():
op_type = self._get_op_type(tag, op_name)
tag_map[tag] = op_type
log.info("Get tag types for step trace analysis: %s", tag_map)
self._tag_map = tag_map
def _get_op_type(self, tag, name):
"""
Get op type from tag and name.
Args:
tag (int): The tag id.
name (str): The op name.
Returns:
str, the op type.
"""
tag_map = {self._fp_tag: 'fp', self._bp_tag: 'bp', self._end_tag: 'end'}
# get solid tag type
op_type = tag_map.get(tag, '')
if op_type:
return op_type
# check if the tag is step tag.
if tag > self._end_tag or tag == 0:
return 'start'
# analyze the reduce tag
op_type = name.rsplit('/', 1)[-1].split('-')[0]
if not op_type:
log.warning("Unexpected op name:%s", name)
return op_type
def _get_step_trace_files(self):
"""Get step trace files."""
# step trace files may under $profiler_dir or $profiler_dir/data
profiler_dir = self._input_dir
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
# try to find step trace files under $profiler_dir/data
profiler_dir = os.path.join(profiler_dir, 'data')
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
raise ProfilerPathErrorException('Training trace file does not exist.')
return step_trace_files
@staticmethod
def _search_file(input_dir):
"""Search step trace file under specific input directory."""
# validate input_dir
if not os.path.isdir(input_dir):
raise ProfilerPathErrorException(
'{} does not exist or is not a dir'.format(input_dir)
)
# get step trace files
files = os.listdir(input_dir)
step_trace_files = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
# validate result
if len(step_trace_files) > 1:
# the format of file name is like
# `training_trace.46.dev.profiler_default_tag.$id.slice_$number`
# use the $number as the sorted key
try:
step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
except ValueError as err:
log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
step_trace_files = []
file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
log.info("Find %d step trace files.", len(file_paths))
return file_paths
def _parse(self, source_files):
"""Parse source step trace files."""
log.info("Start to parse step trace file.")
event_info = {}
for source_file in source_files:
with open(source_file, 'rb') as handler:
content = handler.read()
for step_trace in self._get_next_step_trace(content, event_info):
if self._skip_first_step:
self._skip_first_step = False
continue
self._record_trace_event(step_trace)
self._record_average_info()
log.info("Finish to parse step trace file.")
def _get_next_step_trace(self, content, event_info):
"""
Get next step trace info.
Args:
content (bytes): The input step trace info.
event_info (dict): The event info.
Returns:
Generator, return the step trace one by one.
"""
        for pos in range(0, len(content), self._event_size):
next_event = self._get_trace_struct(content[pos:pos + self._event_size])
self._construct_event_info(next_event, event_info)
if event_info.get('end'):
yield event_info
def _get_trace_struct(self, bin_info):
"""Translate event info to StepTraceStruct."""
if len(bin_info) == self._event_size:
parsed_info = struct.unpack('=QHHQ', bin_info)
return StepTraceStruct(*parsed_info)
return None
def _construct_event_info(self, next_event, event_info):
"""Construct event info according to next_event."""
min_job_id = 255
        step_flag = lambda tag: tag > min_job_id or tag == 0
        end_flag = lambda tag: tag == min_job_id
        fp_flag = lambda tag: tag == self._fp_tag
        bp_flag = lambda tag: tag == self._bp_tag
def _on_step_event():
"""Handle step event."""
self._validate_tag_id(tag_id)
start_time = event_info.get('end', '-')
event_info.clear()
event_info['start'] = start_time
event_info['reduce'] = {}
def _on_reduce_event(reduce_tag_id):
"""Handle reduce event."""
stream_id = next_event.stream_id
if event_info['reduce'].get(stream_id):
event_info['reduce'][stream_id].append((reduce_tag_id, sys_count))
else:
event_info['reduce'][stream_id] = [(reduce_tag_id, sys_count)]
tag_id = next_event.tag_id
sys_count = next_event.sys_count
if end_flag(tag_id):
event_info['end'] = sys_count
elif step_flag(tag_id):
_on_step_event()
elif fp_flag(tag_id):
event_info['fp'] = sys_count
elif bp_flag(tag_id):
event_info['bp'] = sys_count
else:
_on_reduce_event(tag_id)
def _validate_tag_id(self, job_id):
"""Check the job id in source step trace file is same as user set."""
if not self._job_id:
self._job_id = job_id
elif self._job_id != job_id:
raise JobIdMismatchException()
def _record_trace_event(self, step_trace):
"""Record trace event."""
self._step_num += 1
start_time = step_trace.get('start')
end_time = step_trace.get('end')
fp_time = step_trace.get('fp')
bp_time = step_trace.get('bp')
if not (start_time and end_time and fp_time and bp_time):
log.warning("The step %d lacks basic time.", self._step_num)
return
if start_time == '-':
start_time = fp_time
row_data = {
'step_num': self._step_num,
'start_point': start_time,
'end_point': end_time,
'total': end_time - start_time,
'fp_point': fp_time,
'bp_point': bp_time,
'iteration_interval': fp_time - start_time,
'fp_and_bp': bp_time - fp_time,
'tail': end_time - bp_time
}
# update reduce info
self._update_reduce_info(step_trace, row_data)
# save the row data
if not self._header:
self._header = list(row_data.keys())
row_data_list = [row_data.get(header_name, 0) for header_name in self._header]
self._result.append(row_data_list)
def _update_reduce_info(self, step_trace, row_data):
"""Extract reduce info."""
reduce_time = step_trace.get('reduce', {})
for stream_id, time_points in reduce_time.items():
time_point_num = len(time_points)
if time_point_num % 2:
log.warning("Stream %d has %d reduce time points.", stream_id, time_point_num)
continue
for index, point_id in enumerate(range(0, time_point_num, 2)):
field_name = f'stream_{stream_id}_{index}'
reduce_info = self._get_single_reduce_event_info(
field_name, time_points[point_id], time_points[point_id + 1])
row_data.update(reduce_info)
def _get_single_reduce_event_info(self, field_name, start_point, end_point):
"""
Get single reduce info.
Args:
field_name (str): The field name.
start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).
Returns:
dict, reduce info.
"""
reduce_info = {}
if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
return reduce_info
op_type = self._tag_map.get(start_point[0])
# append field name with op type.
if not op_type:
log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
field_name += '_parallel'
else:
field_name += '_' + op_type
reduce_info[field_name] = end_point[1] - start_point[1]
reduce_info[field_name + '_start_point'] = start_point[1]
reduce_info[field_name + '_end_point'] = end_point[1]
return reduce_info
def _record_average_info(self):
"""Calculate average info."""
result_size = len(self._result)
# calculate average data for each column in result data
average_data = [0] * len(self._header)
if result_size >= 2:
for row_info in self._result[1:]:
average_data = [
Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
]
average_data = [
round((item / (result_size - 1))) for item in average_data
]
# change step num info in average_data to None
step_num_index = self._header.index('step_num')
average_data[step_num_index] = '-'
self._result.append(average_data)
log.info("Finish add average info for step trace.")
def _save(self):
log.info("Start to save step trace file.")
if not self._header:
return
with open(self._output_path, 'w') as file_handle:
csv_writer = csv.writer(file_handle)
csv_writer.writerow(self._header)
for row_data in self._result:
csv_writer.writerow(row_data)
os.chmod(self._output_path, stat.S_IREAD)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiling api file."""
import os
import time
from mindspore import log as logger, context
from mindspore.communication.management import release
from mindspore.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException, ProfilerException
from mindspore.profiler.common.util import get_file_names, fwrite_format
from mindspore.profiler.common.validator.checkparam import \
check_bool, check_subgraph
from mindspore.profiler.common.validator.validate_path import \
validate_and_normalize_path
from mindspore.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindspore.profiler.parser.framework_parser import FrameworkParser
from mindspore.profiler.parser.hwts_log_parser import HWTSLogParser
from mindspore.profiler.parser.integrator import Integrator
from mindspore.profiler.parser.integrator import TimelineAnalyser
from mindspore.profiler.parser.minddata_parser import MinddataParser
from mindspore.profiler.parser.minddata_pipeline_parser import \
MinddataPipelineParser
from mindspore.profiler.parser.optime_parser import OPComputeTimeParser
from mindspore.profiler.parser.step_trace_parser import StepTraceParser
PROFILING_LOG_BASE_PATH = "/var/log/npu/profiling"
INIT_OP_NAME = 'Default/InitDataSetQueue'
class Profiler:
"""
Performance profiling API.
    Enable MindSpore users to profile the performance of neural networks.
Args:
subgraph (str): Define which subgraph to monitor and analyse, can be 'all', 'Default', 'Gradients'.
is_detail (bool): Whether to show profiling data for op_instance level, only show optype level if False.
is_show_op_path (bool): Whether to save the full path for each op instance.
output_path (str): Output data path.
        optypes_to_deal (str): Op type names, the data of which op types should be collected and analysed;
            all op types will be analysed if it is empty. Different op types should be separated by comma.
        optypes_not_deal (str): Op type names, the data of which op types will not be collected and analysed.
            Different op types should be separated by comma.
Examples:
>>> from mindspore.profiler import Profiler
>>> import mindspore.context
>>> context.set_context(mode=context.GRAPH_MODE, device_target="Ascend",
>>> device_id=int(os.environ["DEVICE_ID"]))
>>> profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data')
>>> model = Model()
>>> model.train()
>>> profiler.analyse()
"""
_base_profiling_container_path = "/var/log/npu/profiling/container"
_hwts_output_filename_target = "output_format_data_hwts_"
_opcompute_output_filename_target = "output_op_compute_time_"
_aicpu_op_output_filename_target = "output_data_preprocess_aicpu_"
def __init__(self, subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data',
optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
# get device_id and device_target
self._get_devid_and_devtarget()
self._container_path = os.path.join(self._base_profiling_container_path, self._dev_id)
data_path = os.path.join(self._container_path, "data")
if not os.path.exists(data_path):
os.makedirs(data_path, exist_ok=True)
self._output_path = validate_and_normalize_path(output_path)
self._output_path = os.path.join(self._output_path, "profiler")
if not os.path.exists(self._output_path):
os.makedirs(self._output_path, exist_ok=True)
os.environ['PROFILING_MODE'] = 'true'
os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace'
os.environ['MINDDATA_PROFILING_DIR'] = self._output_path
os.environ['DEVICE_ID'] = self._dev_id
os.environ['AICPU_PROFILING_MODE'] = 'true'
os.environ['PROFILING_DIR'] = str(self._container_path)
        # use context interface to open profiling, for the new MindSpore version (after 2020.5.21)
context.set_context(enable_profiling=True, profiling_options="training_trace:task_trace")
self._subgraph = check_subgraph(subgraph)
self._valid_optype_name = optypes_to_deal.split(",") if optypes_to_deal else []
self._filt_optype_names = optypes_not_deal.split(",") if optypes_not_deal else []
self._detail = check_bool(is_detail, 'is_detail')
self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path')
self._profiling_job_id = job_id
# add job id env through user input later
self._job_id_env = 0
self._start_time = int(time.time() * 10000000)
logger.info("Profiling: profiling start time: %d", self._start_time)
def analyse(self):
"""
Collect and analyse performance data, called after training or during training.
Examples:
>>> from mindspore.profiler import Profiler
>>> import mindspore.context
>>> context.set_context(mode=context.GRAPH_MODE, device_target="Ascend",
>>> device_id=int(os.environ["DEVICE_ID"]))
>>> profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data')
>>> model = Model()
>>> model.train()
>>> profiler.analyse()
"""
release()
job_id = self._get_profiling_job_id()
logger.info("Profiling: job id is %s ", job_id)
source_path = os.path.join(PROFILING_LOG_BASE_PATH, job_id)
# parse hwts.log.data.45.dev file, and get task profiling data
hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt"
hwts_output_filename = os.path.join(self._output_path, hwts_output_filename)
hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename)
result = hwtslog_parser.execute()
if not result:
logger.error("Profiling: fail to parse hwts log file.")
return
# parse Framework file, and get the relation of op and tasks
framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path)
framework_parser.parse()
op_task_dict = framework_parser.to_task_id_full_op_name_dict()
if not op_task_dict:
logger.error("Profiling: fail to parse framework files.")
return
# get op compute time from hwts data and framework data, write output_op_compute_time.txt
opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
optime_parser = OPComputeTimeParser(
hwts_output_filename, opcompute_output_filename,
op_task_dict, self._output_path, self._dev_id
)
optime_parser.execute()
# parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt"
output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser.execute()
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._dev_id)
# parse minddata pipeline operator and queue
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
pipeline_parser.parse()
except ProfilerException as err:
logger.warning(err.message)
# analyse op compute time info
try:
self._analyser_op_info()
except ProfilerException as err:
logger.warning(err.message)
# analyse step trace info
try:
self._analyse_step_trace(source_path, framework_parser)
except ProfilerException as err:
logger.warning(err.message)
# analyse timeline info
try:
self._analyse_timeline(aicpu_data_parser, optime_parser)
except (ProfilerIOException, ProfilerFileNotFoundException, RuntimeError) as err:
logger.warning('Fail to write timeline data: %s', err)
def _analyse_step_trace(self, source_path, framework_parser):
"""
Analyse step trace data and save the result.
Args:
source_path (str): The directory that contains the step trace original data.
framework_parser (FrameworkParser): The framework parse instance.
"""
logger.info("Begin to parse step trace.")
# construct output path
step_trace_intermediate_file_path = os.path.join(
self._output_path,
f'step_trace_raw_{self._dev_id}_detail_time.csv'
)
point_info_file_path = os.path.join(
self._output_path,
'step_trace_point_info.json'
)
# whether keep the first step
skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
point_info = framework_parser.point_info
# parser the step trace files and save the result to disk
parser = StepTraceParser(input_dir=source_path,
output_file_path=step_trace_intermediate_file_path,
job_id=self._job_id_env,
skip_first_step=skip_first_step_flag)
parser.update_tag_op_type_map(point_info)
parser.parse_and_save()
point_info = parser.record_point_info(point_info, point_info_file_path)
# print parser result
parser.show()
logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path)
logger.info("The point info is: %s", point_info)
def _analyse_timeline(self, aicpu_parser, optime_parser):
"""
Analyse and parse timeline info.
Args:
aicpu_parser (DataPreProcessParser): The parser instance for AI CPU operator
execution time calculation.
            optime_parser (OPComputeTimeParser): The parser instance for AI Core
operator execution time calculation.
"""
timeline_analyser = TimelineAnalyser(self._output_path, self._dev_id)
# Get framework info
integrator = Integrator(self._output_path, self._dev_id)
aicore_detail_data = integrator.get_aicore_detail_data()
aicore_detail_data_size = len(aicore_detail_data)
col_names = ['op_name', 'op_type', 'avg_execution_time', 'subgraph',
'full_op_name', 'op_info']
framework_info = {
'col_name': col_names,
'object': aicore_detail_data,
'size': aicore_detail_data_size
}
all_reduce_info = integrator.query_for_all_reduce()
# Get timeline info
logger.info('Start writing timeline info...')
        logger.info('Note: it could take a few minutes if you are training '
'with a complex network or more than 10 steps.')
# Add info into timeline, such as AI CPU, AllReduce, framework info.
aicpu_info = aicpu_parser.query_aicpu_data()
min_cycle_counter = min(aicpu_parser.min_cycle_counter, optime_parser.min_cycle_counter)
timeline_analyser.init_timeline(all_reduce_info, framework_info, aicpu_info, min_cycle_counter)
timeline_analyser.write_timeline()
timeline_analyser.write_timeline_summary()
def __del__(self):
"""Disable the profiling collection service, called after training."""
        os.environ['PROFILING_MODE'] = 'false'
context.set_context(enable_profiling=False)
def _get_profiling_job_id(self):
"""Get profiling job id, which was generated by ada service.
Returns:
            str, the profiling job id.
"""
if self._profiling_job_id:
return self._profiling_job_id
job_id = ""
cmd = "ls -t " + PROFILING_LOG_BASE_PATH + "|grep JOB|awk '{print $1}'"
r = os.popen(cmd)
profiling_job_dirs = r.readlines()
r.close()
for item in profiling_job_dirs:
path = os.path.join(PROFILING_LOG_BASE_PATH, item.strip())
log_file = get_file_names(path, "host_start.log")
if not log_file:
logger.error("Profiling: job path %s, host_start.log not exist.", path)
continue
log_file = os.path.join(path, log_file[0])
item_dict = self._parse_host_start_log(log_file)
if not item_dict:
logger.error("Profiling: job path %s, fail to get job start info.", path)
continue
if self._start_time > int(item_dict["start_time"]):
logger.info("Profiling: job path %s, start_time %s, training start_time %d.",
path, item_dict["start_time"], self._start_time)
break
if self._dev_id != item_dict["device_id"]:
logger.info("Profiling: job path %s, dev id %s, training device id %s.",
path, item_dict["device_id"], self._dev_id)
continue
job_id = item.strip()
break
if not job_id:
msg = "Fail to get profiling job, please check whether job dir was generated"
raise RuntimeError(msg)
return job_id
def _parse_host_start_log(self, input_file):
"""
Parse host start log file, get the device id and start time of the job.
Args:
input_file (str): The file path of the host start log file.
Returns:
dict, job start time and device id.
"""
item_dict = {}
        with open(input_file) as host_log:
            for line in host_log:
                if "Device" in line:
                    item_dict["device_id"] = line[7:len(line)-2]
                elif "clock_realtime" in line:
                    item_dict["start_time"] = line[16:len(line)-3]
return item_dict
def _analyser_op_info(self):
"""Analyse the operator information."""
integrator = Integrator(self._output_path, self._dev_id)
integrator.integrate()
aicore_type_result = self._query_op_type_info()
detail_file_path = os.path.join(
self._output_path,
'output_op_compute_time_detail_{}.txt'.format(self._dev_id)
)
fwrite_format(detail_file_path, data_source='title:op compute time')
display_names = [
'optype_name', 'compute_time(ms, per-step)',
'called_times(per-step)', 'percent'
]
fwrite_format(detail_file_path, data_source=" ".join(display_names), is_print=True)
fwrite_format(detail_file_path, data_source=aicore_type_result, is_print=True)
if self._detail:
op_type_order = [item[0] for item in aicore_type_result]
aicore_detail_result = self._query_op_detail_info(op_type_order)
fwrite_format(detail_file_path, data_source='', is_print=True)
fwrite_format(detail_file_path, data_source='Detail:', is_print=True)
col_names = ['op_name', 'op_type', 'avg_execution_time', 'subgraph',
'full_op_name', 'op_info']
fwrite_format(detail_file_path, data_source=" ".join(col_names), is_print=True)
fwrite_format(detail_file_path, data_source=aicore_detail_result, is_print=True)
def _query_op_type_info(self):
"""
Query AICORE operator type information.
Returns:
list[list], the AICORE operator type and execution time information.
"""
integrator = Integrator(self._output_path, self._dev_id)
return integrator.get_aicore_data()
def _query_op_detail_info(self, op_type_order):
"""
Query AICORE operator detail information.
Args:
op_type_order(list): The name of the op type in order.
Returns:
dict, the AICORE operator detail information.
"""
op_type_condition = {}
if self._valid_optype_name:
op_type_condition['in'] = self._valid_optype_name
if self._filt_optype_names:
op_type_condition['not_in'] = self._filt_optype_names
subgraph_condition = {}
if self._subgraph != 'all':
subgraph_condition['in'] = [self._subgraph]
integrator = Integrator(self._output_path, self._dev_id)
return integrator.get_aicore_detail_data()
def _get_devid_and_devtarget(self):
"""Get device id and target of this training."""
device_target = ""
dev_id = ""
try:
dev_id = str(context.get_context("device_id"))
device_target = context.get_context("device_target")
except ValueError as err:
logger.error("Profiling: fail to get context, %s", err)
if not dev_id or not dev_id.isdigit():
dev_id = os.getenv('DEVICE_ID')
if not dev_id or not dev_id.isdigit():
dev_id = "0"
logger.error("Fail to get DEVICE_ID, use 0 instead.")
if device_target and device_target != "Davinci" \
and device_target != "Ascend":
msg = "Profiling: unsupport backend: %s" % device_target
raise RuntimeError(msg)
self._dev_id = dev_id