Commit d3cc7a89 authored by: Y yuximiao

remove profiler user interface.

Parent f674ae3e
@@ -12,16 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Profiler Module Introduction.
This module provides Python APIs to enable the profiling of MindSpore neural networks.
Users can import the mindinsight.profiler.Profiler, initialize the Profiler object to start profiling,
and use Profiler.analyse() to stop profiling and analyse the results.
To visualize the profiling results, users can open MindInsight Web, find the corresponding run
and click the profile link.
Now, Profiler supports the AICore operator analysis.
"""
from mindinsight.profiler.profiling import Profiler
__all__ = ["Profiler"]
"""Profiler Module Introduction."""
@@ -15,7 +15,7 @@
"""The analyser factory."""
import threading
import mindinsight.profiler.analyser as analyser_module
from mindinsight.profiler import analyser as analyser_module
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerAnalyserNotExistException
......
@@ -17,7 +17,6 @@ import json
import os
from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
from mindinsight.profiler.parser.container import TimelineContainer
from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException
from mindinsight.profiler.common.log import logger
@@ -27,6 +26,48 @@ from mindinsight.profiler.common.validator.validate_path import validate_and_nor
SIZE_LIMIT = 20 * 1024 * 1024 # 20MB
class TimelineContainer:
"""
A container of operator computation metadata.
Args:
split_list (list): The split list of metadata in op_compute output file.
"""
def __init__(self, split_list):
self._op_name = split_list[0]
self._stream_id = int(split_list[1])
self._start_time = float(split_list[2])
self._duration = float(split_list[3])
self._pid = None
if len(split_list) == 5:
self._pid = int(split_list[4])
@property
def op_name(self):
"""Get the name of the operator."""
return self._op_name
@property
def stream_id(self):
"""Get the stream id of the operator."""
return self._stream_id
@property
def start_time(self):
"""Get the execution start time of the operator."""
return self._start_time
@property
def duration(self):
"""Get the duration of the operator execution."""
return self._duration
@property
def pid(self):
"""Get the pid of the operator execution."""
return self._pid
class TimelineAnalyser(BaseAnalyser):
"""
Analyse timeline data from file.
......
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
The parser for AI CPU preprocess data.
"""
import os
from tabulate import tabulate
from mindinsight.profiler.common._utils import fwrite_format, get_file_join_name
from mindinsight.profiler.common.log import logger
class DataPreProcessParser:
"""
The Parser for AI CPU preprocess data.
Args:
input_path(str): The profiling job path.
output_filename(str): The output data path and name.
"""
_source_file_target = 'DATA_PREPROCESS.dev.AICPU.'
_dst_file_title = 'title:DATA_PREPROCESS AICPU'
_dst_file_column_title = ['serial_number', 'node_type_name', 'total_time(ms)',
'dispatch_time(ms)', 'run_start', 'run_end']
_ms_unit = 1000
def __init__(self, input_path, output_filename):
self._input_path = input_path
self._output_filename = output_filename
self._source_file_name = self._get_source_file()
self._ms_kernel_flag = 3
self._other_kernel_flag = 6
self._thread_flag = 7
self._ms_kernel_run_end_index = 2
self._other_kernel_run_end_index = 5
self._result_list = []
self._min_cycle_counter = float('inf')
def _get_source_file(self):
"""Get log file name, which was created by ada service."""
file_name = get_file_join_name(self._input_path, self._source_file_target)
if not file_name:
data_path = os.path.join(self._input_path, "data")
file_name = get_file_join_name(data_path, self._source_file_target)
return file_name
def _get_kernel_result(self, number, node_list, thread_list):
"""Get the profiling data form different aicpu kernel"""
try:
if len(node_list) == self._ms_kernel_flag and len(thread_list) == self._thread_flag:
node_type_name = node_list[0].split(':')[-1]
run_end_index = self._ms_kernel_run_end_index
elif len(node_list) == self._other_kernel_flag and len(thread_list) == self._thread_flag:
node_type_name = node_list[0].split(':')[-1].split('/')[-1].split('-')[0]
run_end_index = self._other_kernel_run_end_index
else:
logger.warning("the data format can't support 'node_list':%s", str(node_list))
return None
run_start = node_list[1].split(':')[-1].split(' ')[0]
run_end = node_list[run_end_index].split(':')[-1].split(' ')[0]
total_time = float(thread_list[-1].split('=')[-1].split()[0]) / self._ms_unit
dispatch_time = float(thread_list[-2].split('=')[-1].split()[0]) / self._ms_unit
return [number, node_type_name, total_time, dispatch_time,
run_start, run_end]
except IndexError as e:
logger.exception(e)
return None
def execute(self):
"""Execute the parser, get result data, and write it to the output file."""
if not os.path.exists(self._source_file_name):
logger.info("Did not find the aicpu profiling source file")
return
with open(self._source_file_name, 'rb') as ai_cpu_data:
ai_cpu_str = str(ai_cpu_data.read().replace(b'\n\x00', b' ___ ')
.replace(b'\x00', b' ___ '))[2:-1]
ai_cpu_lines = ai_cpu_str.split(" ___ ")
result_list = list()
ai_cpu_total_time_summary = 0
# Node serial number.
serial_number = 1
for i in range(len(ai_cpu_lines) - 1):
node_line = ai_cpu_lines[i]
thread_line = ai_cpu_lines[i + 1]
result = []
if "Node" in node_line and "Thread" in thread_line:
# Get the node data from node_line
node_list = node_line.split(',')
thread_list = thread_line.split(',')
result = self._get_kernel_result(serial_number, node_list, thread_list)
if result is None:
continue
result_list.append(result)
# Calculate the total time.
total_time = result[2]
ai_cpu_total_time_summary += total_time
# Increase node serial number.
serial_number += 1
elif "Node" in node_line and "Thread" not in thread_line:
node_type_name = node_line.split(',')[0].split(':')[-1]
logger.warning("The node type:%s cannot find thread data", node_type_name)
if result_list:
ai_cpu_total_time = format(ai_cpu_total_time_summary, '.6f')
result_list.append(["AI CPU Total Time(ms):", ai_cpu_total_time])
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_print=True,
is_start=True)
fwrite_format(self._output_filename,
data_source=tabulate(result_list, self._dst_file_column_title,
tablefmt='simple'),
is_start=True, is_print=True)
# For timeline display.
self._result_list = result_list
def query_aicpu_data(self):
"""
Get execution time of AI CPU operator.
Returns:
            dict, the metadata of AI CPU operator execution time.
"""
stream_id = 0 # Default stream id for AI CPU.
pid = 9000 # Default pid for AI CPU.
factor = 1000 # Convert time unit from 1us to 1ms
total_time = 0
min_cycle_counter = float('inf')
aicpu_info = []
op_count_list = []
for aicpu_item in self._result_list:
if "AI CPU Total Time(ms):" in aicpu_item:
total_time = aicpu_item[-1]
continue
op_name = aicpu_item[1]
start_time = float(aicpu_item[4]) / factor
min_cycle_counter = min(min_cycle_counter, start_time)
end_time = float(aicpu_item[5]) / factor
duration = end_time - start_time
aicpu_info.append([op_name, stream_id, start_time, duration, pid])
# Record the number of operator types.
if op_name not in op_count_list:
op_count_list.append(op_name)
self._min_cycle_counter = min_cycle_counter
aicpu_dict = {
'info': aicpu_info,
'total_time': float(total_time),
'op_exe_times': len(aicpu_info),
'num_of_ops': len(op_count_list),
'num_of_streams': 1
}
return aicpu_dict
@property
def min_cycle_counter(self):
"""Get minimum cycle counter in AI CPU."""
return self._min_cycle_counter
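A minimal usage sketch of this parser follows; the job directory and output file name are illustrative assumptions, not values from this commit:

    parser = DataPreProcessParser('/var/log/npu/profiling/JOB_DIR', './output_data_preprocess_aicpu_0.txt')
    parser.execute()
    aicpu_dict = parser.query_aicpu_data()
    print(aicpu_dict['op_exe_times'], aicpu_dict['total_time'])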
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The container of metadata used in profiler parser."""
class HWTSContainer:
"""
HWTS output container.
Args:
split_list (list): The split list of metadata in HWTS output file.
"""
def __init__(self, split_list):
self._op_name = ''
self._duration = None
self._status = split_list[0]
self._task_id = split_list[6]
self._cycle_counter = float(split_list[7])
self._stream_id = split_list[8]
@property
def status(self):
"""Get the status of the operator, i.e. Start or End."""
return self._status
@property
def task_id(self):
"""Get the task id of the operator."""
return self._task_id
@property
def cycle_counter(self):
"""Get the cycle counter."""
return self._cycle_counter
@property
def stream_id(self):
"""Get the stream id of the operator."""
return self._stream_id
@property
def op_name(self):
"""Get the name of the operator."""
return self._op_name
@op_name.setter
def op_name(self, name):
"""Set the name of the operator."""
self._op_name = name
@property
def duration(self):
"""Get the duration of the operator execution."""
return self._duration
@duration.setter
def duration(self, value):
"""Set the duration of the operator execution."""
self._duration = value
class TimelineContainer:
"""
A container of operator computation metadata.
Args:
split_list (list): The split list of metadata in op_compute output file.
"""
def __init__(self, split_list):
self._op_name = split_list[0]
self._stream_id = int(split_list[1])
self._start_time = float(split_list[2])
self._duration = float(split_list[3])
self._pid = None
if len(split_list) == 5:
self._pid = int(split_list[4])
@property
def op_name(self):
"""Get the name of the operator."""
return self._op_name
@property
def stream_id(self):
"""Get the stream id of the operator."""
return self._stream_id
@property
def start_time(self):
"""Get the execution start time of the operator."""
return self._start_time
@property
def duration(self):
"""Get the duration of the operator execution."""
return self._duration
@property
def pid(self):
"""Get the pid of the operator execution."""
return self._pid
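A usage sketch for TimelineContainer: split_list holds one whitespace-split line of the op_compute output file, and the values below are fabricated for illustration:

    container = TimelineContainer(['Default/Conv2D-op1', '0', '123.456', '0.789', '9000'])
    print(container.op_name, container.stream_id, container.start_time, container.duration, container.pid)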
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for hwts log file."""
import os
import struct
from mindinsight.profiler.common._utils import fwrite_format, get_file_join_name
from mindinsight.profiler.common.log import logger
class HWTSLogParser:
"""
The Parser for hwts log files.
Args:
        input_path (str): The profiling job path, such as: '/var/log/npu/profiling/JOBAIFGJEJFEDCBAEADIFJAAAAAAAAAA'.
        output_filename (str): The output data path and name, such as: './output_format_data_hwts_0.txt'.
"""
_source_file_target = 'hwts.log.data.45.dev.profiler_default_tag'
_dst_file_title = 'title:45 HWTS data'
_dst_file_column_title = 'Type cnt Core_ID Block_ID Task_ID Cycle_counter Stream_ID'
def __init__(self, input_path, output_filename):
self._input_path = input_path
self._output_filename = output_filename
        self._source_file_name = self._get_source_file()
def _get_source_file(self):
"""Get hwts log file name, which was created by ada service."""
file_name = get_file_join_name(self._input_path, self._source_file_target)
if not file_name:
data_path = os.path.join(self._input_path, "data")
file_name = get_file_join_name(data_path, self._source_file_target)
if not file_name:
msg = ("Fail to find hwts log file, under profiling directory")
raise RuntimeError(msg)
return file_name
def execute(self):
"""
Execute the parser, get result data, and write it to the output file.
Returns:
            bool, whether the hwts log was analysed successfully.
"""
content_format = ['QIIIIIIIIIIII', 'QIIQIIIIIIII', 'IIIIQIIIIIIII']
log_type = ['Start of task', 'End of task', 'Start of block', 'End of block', 'Block PMU']
result_data = ""
        with open(self._source_file_name, 'rb') as hwts_data:
            while True:
                line = hwts_data.read(64)
                if not line:
                    break
                if not line.strip():
                    continue
byte_first_four = struct.unpack('BBHHH', line[0:8])
byte_first = bin(byte_first_four[0]).replace('0b', '').zfill(8)
ms_type = byte_first[-3:]
is_warn_res0_ov = byte_first[4]
cnt = int(byte_first[0:4], 2)
core_id = byte_first_four[1]
blk_id, task_id = byte_first_four[3], byte_first_four[4]
if ms_type in ['000', '001', '010']: # log type 0,1,2
result = struct.unpack(content_format[0], line[8:])
syscnt = result[0]
stream_id = result[1]
elif ms_type == '011': # log type 3
result = struct.unpack(content_format[1], line[8:])
syscnt = result[0]
stream_id = result[1]
elif ms_type == '100': # log type 4
result = struct.unpack(content_format[2], line[8:])
stream_id = result[2]
if is_warn_res0_ov == '0':
syscnt = result[4]
else:
syscnt = None
else:
logger.info("Profiling: invalid hwts log record type %s", ms_type)
continue
if int(task_id) < 25000:
task_id = str(stream_id) + "_" + str(task_id)
result_data += ("%-14s %-4s %-8s %-9s %-8s %-15s %s\n" %(log_type[int(ms_type, 2)], cnt, core_id,
blk_id, task_id, syscnt, stream_id))
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=self._dst_file_column_title)
fwrite_format(self._output_filename, data_source=result_data)
return True
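To illustrate the header decoding above, here is a sketch on a fabricated 8-byte record header (not real profiler output); the first byte packs cnt in its high 4 bits, the warn/res0/ov flag in bit 4, and the 3-bit log type in its low bits:

    import struct
    header = struct.pack('BBHHH', 0b00010001, 2, 0, 7, 1234)
    first_four = struct.unpack('BBHHH', header)
    bits = bin(first_four[0]).replace('0b', '').zfill(8)
    assert bits[-3:] == '001'      # log type 1: End of task
    assert int(bits[0:4], 2) == 1  # cnt
    assert bits[4] == '0'          # is_warn_res0_ov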
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Minddata aicpu parser."""
import os
from tabulate import tabulate
from mindinsight.profiler.common._utils import get_file_join_name, fwrite_format
from mindinsight.profiler.common.log import logger
class MinddataParser:
"""Minddata Aicpu Parser."""
@staticmethod
def parse_minddata_aicpu_data(minddata_aicpu_source_path):
"""
Parse minddata get_next info which contains queue size and execute time.
Args:
minddata_aicpu_source_path (str): the source file path.
Returns:
list[Union[str, float]], the converted data.
"""
result = list()
try:
with open(minddata_aicpu_source_path) as source_data_file:
source_data = source_data_file.read()
step_data = source_data.split("\x00")
for one_step in step_data:
if one_step:
node_info = one_step.split(", ")
node_name, node_start, node_end, queue_size = "", 0, 0, 0
if node_info:
node_name = node_info[0].replace("Node:", "")
if len(node_info) > 2:
node_start = node_info[1].replace("Run start:", "")
if node_start.isdigit():
node_start = int(node_start)
node_end = node_info[2].replace("Run end:", "")
if node_end.isdigit():
node_end = int(node_end)
if len(node_info) > 3:
queue_size = node_info[3].replace("queue size:", "")
if queue_size.isdigit():
queue_size = int(queue_size)
one_step_list = [node_name, node_start, node_end, queue_size]
result.append(one_step_list)
except OSError:
logger.error("Open get_next profiling file error.")
return result
@staticmethod
def execute(source_path, output_path, device_id):
"""
Execute the parser.
Args:
source_path (str): the source file path.
output_path (str): the output file path.
device_id (str): the device id.
"""
col_names = ["node_name", "start_time", "end_time", "queue_size"]
minddata_aicpu_source_path = get_file_join_name(
input_path=source_path, file_name='DATA_PREPROCESS.dev.AICPUMI')
if not minddata_aicpu_source_path:
minddata_aicpu_source_path = get_file_join_name(
input_path=os.path.join(source_path, "data"), file_name='DATA_PREPROCESS.dev.AICPUMI')
if not minddata_aicpu_source_path:
return
minddata_aicpu_output_path = os.path.join(output_path, "minddata_aicpu_" + device_id + ".txt")
minddata_aicpu_data = MinddataParser.parse_minddata_aicpu_data(minddata_aicpu_source_path)
if minddata_aicpu_data:
fwrite_format(
minddata_aicpu_output_path,
tabulate(minddata_aicpu_data, col_names, tablefmt='simple'),
is_start=True
)
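A sketch of the record format this parser expects; the sample record below is fabricated for illustration:

    one_step = 'Node:GetNext, Run start:1596049123456, Run end:1596049123789, queue size:4'
    node_info = one_step.split(', ')
    # yields node_name 'GetNext', node_start 1596049123456, node_end 1596049123789, queue_size 4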
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Thr parser for parsing minddata pipeline files."""
import csv
import json
import os
from queue import Queue
from marshmallow import ValidationError
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerPathErrorException, ProfilerFileNotFoundException, \
ProfilerDirNotFoundException, ProfilerRawFileException
from mindinsight.profiler.common.log import logger
from mindinsight.profiler.common.validator.validate_path import \
validate_and_normalize_path
class MinddataPipelineParser:
"""
    The parser for parsing minddata pipeline files.
Args:
source_dir (str): The minddata pipeline source dir.
device_id (str): The device ID.
output_path (str): The directory of the parsed file. Default: `./`.
Raises:
ProfilerPathErrorException: If the minddata pipeline file path or
the output path is invalid.
ProfilerFileNotFoundException: If the minddata pipeline file or
the output dir does not exist.
"""
_raw_pipeline_file_name = 'pipeline_profiling_{}.json'
_parsed_pipeline_file_name = 'minddata_pipeline_raw_{}.csv'
_col_names = [
'op_id', 'op_type', 'num_workers', 'output_queue_size',
'output_queue_average_size', 'output_queue_length',
'output_queue_usage_rate', 'sample_interval', 'parent_id', 'children_id'
]
def __init__(self, source_dir, device_id, output_path='./'):
self._device_id = device_id
self._pipeline_path = self._get_pipeline_path(source_dir)
self._save_path = self._get_save_path(output_path)
@property
def save_path(self):
"""
The property of save path.
Returns:
str, the save path.
"""
return self._save_path
def parse(self):
"""
Parse the minddata pipeline files.
Raises:
            ProfilerRawFileException: If the raw file of minddata pipeline
                fails to be parsed or the file is empty.
"""
with open(self._pipeline_path, 'r') as file:
try:
pipeline_info = json.load(file)
except (json.JSONDecodeError, TypeError) as err:
logger.exception(err)
raise ProfilerRawFileException(
'Fail to parse minddata pipeline file.'
)
if not pipeline_info:
logger.warning('The minddata pipeline file is empty.')
raise ProfilerRawFileException(
'The minddata pipeline file is empty.'
)
self._parse_and_save(pipeline_info)
def _get_pipeline_path(self, source_dir):
"""
Get the minddata pipeline file path.
Args:
source_dir (str): The minddata pipeline source dir.
Returns:
str, the minddata pipeline file path.
"""
pipeline_path = os.path.join(
source_dir,
self._raw_pipeline_file_name.format(self._device_id)
)
try:
pipeline_path = validate_and_normalize_path(pipeline_path, 'profiler')
except ValidationError:
logger.warning('Minddata pipeline file is invalid.')
raise ProfilerPathErrorException('Minddata pipeline file is invalid.')
if not os.path.isfile(pipeline_path):
logger.warning(
'The minddata pipeline file <%s> not found.', pipeline_path
)
raise ProfilerFileNotFoundException(pipeline_path)
return pipeline_path
def _get_save_path(self, output_path):
"""
Get the save path.
Args:
output_path (str): The output dir.
Returns:
str, the save path.
"""
try:
output_dir = validate_and_normalize_path(output_path, 'profiler')
except ValidationError:
logger.warning('Output path is invalid.')
raise ProfilerPathErrorException('Output path is invalid.')
if not os.path.isdir(output_dir):
logger.warning('The output dir <%s> not found.', output_dir)
raise ProfilerDirNotFoundException(output_dir)
return os.path.join(
output_dir, self._parsed_pipeline_file_name.format(self._device_id)
)
def _parse_and_save(self, pipeline_info):
"""
Parse and save the parsed minddata pipeline file.
Args:
pipeline_info (dict): The pipeline info reads from the raw file of
the minddata pipeline.
Raises:
ProfilerRawFileException: If the format of minddata pipeline raw
file is wrong.
"""
sample_interval = pipeline_info.get('sampling_interval')
op_info = pipeline_info.get('op_info')
if sample_interval is None or not op_info:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong.'
)
op_id_info_cache = {}
for item in op_info:
op_id_info_cache[item.get('op_id')] = item
with open(self._save_path, 'w') as save_file:
csv_writer = csv.writer(save_file)
csv_writer.writerow(self._col_names)
self._parse_and_save_op_info(
csv_writer, op_id_info_cache, sample_interval
)
def _parse_and_save_op_info(self, csv_writer, op_id_info_cache,
sample_interval):
"""
Parse and save the minddata pipeline operator information.
Args:
csv_writer (csv.writer): The csv writer.
op_id_info_cache (dict): The operator id and information cache.
sample_interval (int): The sample interval.
Raises:
            ProfilerRawFileException: If the operator whose id is 0 does not exist.
"""
queue = Queue()
root_node = op_id_info_cache.get(0)
if not root_node:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong, '
                'the operator whose id is 0 does not exist.'
)
root_node['parent_id'] = None
queue.put_nowait(root_node)
while not queue.empty():
node = queue.get_nowait()
self._update_child_node(node, op_id_info_cache)
csv_writer.writerow(self._get_op_info(node, sample_interval))
op_id = node.get('op_id')
children_ids = node.get('children')
if not children_ids:
continue
for child_op_id in children_ids:
sub_node = op_id_info_cache.get(child_op_id)
sub_node['parent_id'] = op_id
queue.put_nowait(sub_node)
def _update_child_node(self, node, op_id_info_cache):
"""
        Update the child node information of the operator.
Args:
node (dict): The node represents an operator.
op_id_info_cache (dict): The operator id and information cache.
"""
child_op_ids = node.get('children')
if not child_op_ids:
return
queue = Queue()
self._cp_list_item_to_queue(child_op_ids, queue)
new_child_op_ids = []
while not queue.empty():
child_op_id = queue.get_nowait()
child_node = op_id_info_cache.get(child_op_id)
if child_node is None:
continue
metrics = child_node.get('metrics')
if not metrics or not metrics.get('output_queue'):
op_ids = child_node.get('children')
if op_ids:
self._cp_list_item_to_queue(op_ids, queue)
else:
new_child_op_ids.append(child_op_id)
node['children'] = new_child_op_ids
def _get_op_info(self, op_node, sample_interval):
"""
Get the operator information.
Args:
op_node (dict): The node represents an operator.
sample_interval (int): The sample interval.
Returns:
list[str, int, float], the operator information.
"""
queue_size = None
queue_average_size = None
queue_length = None
queue_usage_rate = None
metrics = op_node.get('metrics')
if metrics:
output_queue = metrics.get('output_queue')
if output_queue:
queue_size = output_queue.get('size')
queue_average_size = sum(queue_size) / len(queue_size)
queue_length = output_queue.get('length')
queue_usage_rate = queue_average_size / queue_length
children_id = op_node.get('children')
op_info = [
op_node.get('op_id'),
op_node.get('op_type'),
op_node.get('num_workers'),
queue_size,
queue_average_size,
queue_length,
queue_usage_rate,
sample_interval,
op_node.get('parent_id'),
children_id if children_id else None
]
return op_info
def _cp_list_item_to_queue(self, inner_list, queue):
"""
Copy the contents of a list to a queue.
Args:
inner_list (list): The list.
queue (Queue): The target queue.
"""
for item in inner_list:
queue.put_nowait(item)
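A minimal usage sketch with a hypothetical directory layout: the parser reads pipeline_profiling_<device_id>.json from source_dir and writes minddata_pipeline_raw_<device_id>.csv under output_path:

    pipeline_parser = MinddataPipelineParser('./profiler', '0', './profiler')
    pipeline_parser.parse()
    print(pipeline_parser.save_path)  # ./profiler/minddata_pipeline_raw_0.csv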
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Op compute time files parser."""
import os
from mindinsight.profiler.common._utils import fwrite_format
from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException
from mindinsight.profiler.common.log import logger
from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path
from mindinsight.profiler.parser.container import HWTSContainer
TIMELINE_FILE_COLUMN_TITLE = 'op_name, stream_id, start_time(ms), duration(ms)'
class OPComputeTimeParser:
"""
Join hwts info and framework info, get op time info, and output to the result file.
Args:
        hwts_output_file (str): The file path of the hwts output file, such as: './output_format_data_hwts_0.txt'.
        output_filename (str): The output data file path and name, such as: './output_op_compute_time_0.txt'.
        op_task_info (dict): The task and op relation info, in the format: {task_id: [op_name, stream_id, block_dim]}.
        output_path (str): The directory of the parsed files.
        device_id (str): The device ID.
"""
_dst_file_title = 'title:op compute time'
_dst_file_column_title = 'op_name compute_time(ms) stream_id'
_dst_file_column_title += '\n------------ --------------- ---------'
def __init__(self, hwts_output_file, output_filename, op_task_info,
output_path, device_id):
hwts_output_file = validate_and_normalize_path(
hwts_output_file, raise_key='Invalid hwts output file path.'
)
self._hwts_output_file = hwts_output_file
self._output_filename = output_filename
self._op_task_info = op_task_info
self._output_path = output_path
self._device_id = device_id
self._min_cycle_counter = float("inf")
def _get_op_task_id_map(self):
"""
Read hwts data file, get the task time info.
Returns:
list: all hwts task time info.
"""
op_map_result = []
hwts_list = []
if not os.path.exists(self._hwts_output_file):
logger.error('The hwts output file does not exist.')
raise ProfilerFileNotFoundException('hwts output file')
with open(self._hwts_output_file, 'r') as data_file:
lines = data_file.readlines()
for line in lines:
if line.startswith("Start of task") or line.startswith("End of task"):
line_split = line.split()
container = HWTSContainer(line_split)
hwts_list.append(container)
# hwts op map by taskId
for hwts in hwts_list:
if hwts.task_id in self._op_task_info.keys():
hwts.op_name = self._op_task_info[hwts.task_id]
op_map_result.append(hwts)
return op_map_result
def execute(self):
"""Execute the parser, compute all op, get op time, and write it to the output file."""
# Calculate the execution time of operators,
# and update the minimum cycle counter.
tmp_result_data = self._calculate_op_execution_time()
# Convert time units from nanoseconds to milliseconds.
# The unit of the cycle counter is 10 nanoseconds.
op_name_time_dict = {}
op_name_stream_dict = {}
op_name_count_dict = {}
op_name_task_dict = {}
op_name_start_time = {}
self._convert_op_time_unit(
tmp_result_data, op_name_time_dict, op_name_stream_dict,
op_name_count_dict, op_name_task_dict, op_name_start_time
)
result_data = ""
total_time = 0
for op_name, time in op_name_time_dict.items():
if op_name in op_name_stream_dict.keys():
stream_id = op_name_stream_dict[op_name]
avg_time = time / op_name_count_dict[op_name]
total_time += avg_time
result_data += ("%s %s %s\n" %(op_name, str(avg_time), stream_id))
result_data += ("total op %s 0" %(str(total_time)))
timeline_data = []
for op_name, time in op_name_time_dict.items():
if op_name in op_name_stream_dict.keys():
stream_id = op_name_stream_dict[op_name]
start_time_list = op_name_start_time.get(op_name)
for (start_time, duration) in start_time_list:
timeline_data.append([op_name, stream_id, start_time, duration])
# Write the metadata of operators into the file,
# including operator name, average time, and stream id.
self._write_op_time_into_file(result_data)
# Write the timeline data into file,
# including operator name, stream id, start time, and duration.
self._write_timeline_data_into_file(timeline_data)
def _write_op_time_into_file(self, result_data):
"""
Write the metadata of operators into the file, including
op name, average time, and stream id.
Args:
result_data (str): The metadata to be written into the file.
'op_name_1', 'avg_time_1', 'stream_id_1',
'op_name_2', 'avg_time_2', 'stream_id_2',
...
"""
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=self._dst_file_column_title)
fwrite_format(self._output_filename, data_source=result_data)
def _write_timeline_data_into_file(self, timeline_data):
"""
Write the timeline information into the file, including
operator name, stream id, start time and duration.
Args:
timeline_data (list): The metadata to be written into the file.
[
                ['op_name_1', 'stream_id_1', 'start_time_1', 'duration_1'],
                ['op_name_2', 'stream_id_2', 'start_time_2', 'duration_2'],
[...]
]
"""
# sorted by start times
timeline_data.sort(key=lambda x: float(x[2]))
filename = 'output_timeline_data_{}.txt'.format(self._device_id)
file_path = os.path.join(self._output_path, filename)
file_path = validate_and_normalize_path(file_path, raise_key='Invalid file path of timeline data.')
# write to file
try:
with open(file_path, 'w') as f_obj:
f_obj.write(TIMELINE_FILE_COLUMN_TITLE + '\n')
for timeline in timeline_data:
timeline = [str(item) for item in timeline]
f_obj.write(','.join(timeline) + '\n')
except (IOError, OSError) as err:
logger.error('Error occurred when writing intermediate timeline file: %s', err)
raise ProfilerIOException
def _calculate_op_execution_time(self):
"""
Calculate the execution time of each operator.
Returns:
list, including the intermediate data of op execution time.
"""
tmp_result_data = []
op_map_list = self._get_op_task_id_map()
cur_index = 0
length = len(op_map_list)
min_cycle_counter = float("inf")
while cur_index < length:
if cur_index + 1 == length:
break
op_start = op_map_list[cur_index]
op_end = op_map_list[cur_index + 1]
if op_start.status == "Start" and op_end.status == "End" \
and op_start.op_name == op_end.op_name:
op_start.duration = op_end.cycle_counter - op_start.cycle_counter
tmp_result_data.append(op_start)
cur_index += 2
if not op_start.op_name.startswith("assign"):
min_cycle_counter = min(min_cycle_counter, op_start.cycle_counter)
else:
cur_index += 1
# Update the value of minimum cycle counter.
self._min_cycle_counter = min_cycle_counter / 1e5 # Convert the time unit from 10ns to 1ms
return tmp_result_data
def _convert_op_time_unit(self, op_data_list, op_name_time_dict, op_name_stream_dict,
op_name_count_dict, op_name_task_dict, op_name_start_time):
"""
        Calculate the execution time of each operator and convert it into milliseconds.
Args:
op_data_list (list): The list of operator metadata.
op_name_time_dict (dict): The mapping relation of operator name and its execution time.
op_name_stream_dict (dict): The mapping relation of operator name and its stream id.
op_name_count_dict (dict): The mapping relation of operator name and its count.
op_name_task_dict (dict): The mapping relation of operator name and its task id.
op_name_start_time (dict): The mapping relation of operator name and its start time.
"""
factor = 1e5
for item in op_data_list:
op_name = item.op_name
# Unit conversion: converting the cycle counter into ms.
op_start_time_str = str(item.cycle_counter / factor)
op_duration = item.duration / factor
op_duration_str = str(item.duration / factor)
if op_name in op_name_time_dict.keys():
op_name_time_dict[op_name] += op_duration
if item.task_id == op_name_task_dict[op_name]:
op_name_count_dict[op_name] += 1
op_name_start_time[op_name].append(
(op_start_time_str, op_duration_str)
)
else:
op_name_time_dict[op_name] = op_duration
op_name_stream_dict[op_name] = item.stream_id
op_name_task_dict[op_name] = item.task_id
op_name_count_dict[op_name] = 1
op_name_start_time[op_name] = []
op_name_start_time[op_name].append(
(op_start_time_str, op_duration_str)
)
@property
def min_cycle_counter(self):
"""Get minimum cycle counter."""
return self._min_cycle_counter
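A worked example of the unit conversion used throughout this parser: the cycle counter ticks in units of 10 ns, so dividing by 1e5 converts it to milliseconds:

    cycle_counter = 250000  # 250000 ticks * 10 ns = 2500000 ns = 2.5 ms
    assert cycle_counter / 1e5 == 2.5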
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for step trace data."""
import csv
import json
import os
import stat
import struct
from collections import namedtuple
from decimal import Decimal
from mindinsight.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
JobIdMismatchException, ProfilerIOException
from mindinsight.profiler.common.log import logger as log
from mindinsight.profiler.common.util import get_summary_for_step_trace
StepTraceStruct = namedtuple(
'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count']
)
class StepTraceParser:
"""
The parser for step trace data.
Args:
input_dir (str): The directory that contains original step trace data.
output_file_path (str): The output file path.
job_id (int): The job id used to define the start of new step. Default: 0.
skip_first_step (bool): Whether skip the first step or not.
"""
_event_size = 20
_fp_tag = 1
_bp_tag = 2
_end_tag = 255
def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False):
self._input_dir = input_dir
self._output_path = output_file_path
self._job_id = job_id
self._skip_first_step = skip_first_step
self._result = []
self._header = []
self._step_num = 0
self._tag_map = {}
@property
def output_file(self):
"""The property of step trace header."""
file_name = self._output_path.rsplit('/', 2)
return file_name[-1] if len(file_name) == 3 else ''
def show(self):
"""The property of step trace info."""
summary_info = {}
if self._result:
summary_info = get_summary_for_step_trace(self._result[-1], self._header)
summary_info['total_steps'] = len(self._result) - 1
print('\nStep trace summary info (unit: syscnt):')
print(summary_info)
        print('\nThe step trace parse result is saved under ${summary_dir}/profiler/%s'
              % self.output_file)
def parse_and_save(self):
"""Parse step trace files and save the result."""
try:
source_files = self._get_step_trace_files()
self._parse(source_files)
self._save()
except IOError as err:
log.exception(err)
raise ProfilerIOException()
else:
log.info("Finish to save intermediate result for step trace file.")
def record_point_info(self, point_info, output_path):
"""
Record point info into json.
Args:
point_info (dict): The point info about tag id and relative op name.
output_path (str): The output path for saving point info.
Returns:
dict, parsed point info.
"""
points = {
'fp_start': point_info.get(self._fp_tag, ''),
'bp_end': point_info.get(self._bp_tag, '')
}
try:
with open(output_path, 'w') as json_file:
json.dump(points, json_file)
os.chmod(output_path, stat.S_IREAD)
except (IOError, OSError) as err:
log.warning('Failed to save point info. %s', err)
raise ProfilerIOException
return points
def update_tag_op_type_map(self, point_info):
"""
        Update the map from tag id to op type.
Args:
point_info (dict): The point info about tag id and relative op name.
"""
tag_map = {}
for tag, op_name in point_info.items():
op_type = self._get_op_type(tag, op_name)
tag_map[tag] = op_type
log.info("Get tag types for step trace analysis: %s", tag_map)
self._tag_map = tag_map
def _get_op_type(self, tag, name):
"""
Get op type from tag and name.
Args:
tag (int): The tag id.
name (str): The op name.
Returns:
str, the op type.
"""
tag_map = {self._fp_tag: 'fp', self._bp_tag: 'bp', self._end_tag: 'end'}
# get solid tag type
op_type = tag_map.get(tag, '')
if op_type:
return op_type
# check if the tag is step tag.
if tag > self._end_tag or tag == 0:
return 'start'
# analyze the reduce tag
op_type = name.rsplit('/', 1)[-1].split('-')[0]
if not op_type:
log.warning("Unexpected op name:%s", name)
return op_type
def _get_step_trace_files(self):
"""Get step trace files."""
        # step trace files may be under $profiler_dir or $profiler_dir/data
profiler_dir = self._input_dir
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
# try to find step trace files under $profiler_dir/data
profiler_dir = os.path.join(profiler_dir, 'data')
step_trace_files = self._search_file(profiler_dir)
if not step_trace_files:
raise ProfilerPathErrorException('Training trace file does not exist.')
return step_trace_files
@staticmethod
def _search_file(input_dir):
"""Search step trace file under specific input directory."""
# validate input_dir
if not os.path.isdir(input_dir):
raise ProfilerPathErrorException(
'{} does not exist or is not a dir'.format(input_dir)
)
# get step trace files
files = os.listdir(input_dir)
step_trace_files = list(
filter(
lambda file: file.startswith('training_trace') and not file.endswith('.done'),
files
)
)
# validate result
if len(step_trace_files) > 1:
# the format of file name is like
# `training_trace.46.dev.profiler_default_tag.$id.slice_$number`
# use the $number as the sorted key
try:
step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
except ValueError as err:
log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
step_trace_files = []
file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
log.info("Find %d step trace files.", len(file_paths))
return file_paths
def _parse(self, source_files):
"""Parse source step trace files."""
log.info("Start to parse step trace file.")
event_info = {}
for source_file in source_files:
with open(source_file, 'rb') as handler:
content = handler.read()
for step_trace in self._get_next_step_trace(content, event_info):
if self._skip_first_step:
self._skip_first_step = False
continue
self._record_trace_event(step_trace)
self._record_average_info()
log.info("Finish to parse step trace file.")
def _get_next_step_trace(self, content, event_info):
"""
Get next step trace info.
Args:
content (bytes): The input step trace info.
event_info (dict): The event info.
Returns:
Generator, return the step trace one by one.
"""
        for pos in range(0, len(content), self._event_size):
next_event = self._get_trace_struct(content[pos:pos + self._event_size])
self._construct_event_info(next_event, event_info)
if event_info.get('end'):
yield event_info
def _get_trace_struct(self, bin_info):
"""Translate event info to StepTraceStruct."""
if len(bin_info) == self._event_size:
parsed_info = struct.unpack('=QHHQ', bin_info)
return StepTraceStruct(*parsed_info)
return None
def _construct_event_info(self, next_event, event_info):
"""Construct event info according to next_event."""
min_job_id = 255
step_flag: bool = lambda tag: tag > min_job_id or tag == 0
end_flag: bool = lambda tag: tag == min_job_id
fp_flag: bool = lambda tag: tag == self._fp_tag
bp_flag: bool = lambda tag: tag == self._bp_tag
def _on_step_event():
"""Handle step event."""
self._validate_tag_id(tag_id)
start_time = event_info.get('end', '-')
event_info.clear()
event_info['start'] = start_time
event_info['reduce'] = {}
def _on_reduce_event(reduce_tag_id):
"""Handle reduce event."""
stream_id = next_event.stream_id
if event_info['reduce'].get(stream_id):
event_info['reduce'][stream_id].append((reduce_tag_id, sys_count))
else:
event_info['reduce'][stream_id] = [(reduce_tag_id, sys_count)]
tag_id = next_event.tag_id
sys_count = next_event.sys_count
if end_flag(tag_id):
event_info['end'] = sys_count
elif step_flag(tag_id):
_on_step_event()
elif fp_flag(tag_id):
event_info['fp'] = sys_count
elif bp_flag(tag_id):
event_info['bp'] = sys_count
else:
_on_reduce_event(tag_id)
def _validate_tag_id(self, job_id):
"""Check the job id in source step trace file is same as user set."""
if not self._job_id:
self._job_id = job_id
elif self._job_id != job_id:
raise JobIdMismatchException()
def _record_trace_event(self, step_trace):
"""Record trace event."""
self._step_num += 1
start_time = step_trace.get('start')
end_time = step_trace.get('end')
fp_time = step_trace.get('fp')
bp_time = step_trace.get('bp')
if not (start_time and end_time and fp_time and bp_time):
log.warning("The step %d lacks basic time.", self._step_num)
return
if start_time == '-':
start_time = fp_time
row_data = {
'step_num': self._step_num,
'start_point': start_time,
'end_point': end_time,
'total': end_time - start_time,
'fp_point': fp_time,
'bp_point': bp_time,
'iteration_interval': fp_time - start_time,
'fp_and_bp': bp_time - fp_time,
'tail': end_time - bp_time
}
# update reduce info
self._update_reduce_info(step_trace, row_data)
# save the row data
if not self._header:
self._header = list(row_data.keys())
row_data_list = [row_data.get(header_name, 0) for header_name in self._header]
self._result.append(row_data_list)
def _update_reduce_info(self, step_trace, row_data):
"""Extract reduce info."""
reduce_time = step_trace.get('reduce', {})
for stream_id, time_points in reduce_time.items():
time_point_num = len(time_points)
if time_point_num % 2:
log.warning("Stream %d has %d reduce time points.", stream_id, time_point_num)
continue
for index, point_id in enumerate(range(0, time_point_num, 2)):
field_name = f'stream_{stream_id}_{index}'
reduce_info = self._get_single_reduce_event_info(
field_name, time_points[point_id], time_points[point_id + 1])
row_data.update(reduce_info)
def _get_single_reduce_event_info(self, field_name, start_point, end_point):
"""
Get single reduce info.
Args:
field_name (str): The field name.
start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).
Returns:
dict, reduce info.
"""
reduce_info = {}
if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
return reduce_info
op_type = self._tag_map.get(start_point[0])
# append field name with op type.
if not op_type:
log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
field_name += '_parallel'
else:
field_name += '_' + op_type
reduce_info[field_name] = end_point[1] - start_point[1]
reduce_info[field_name + '_start_point'] = start_point[1]
reduce_info[field_name + '_end_point'] = end_point[1]
return reduce_info
def _record_average_info(self):
"""Calculate average info."""
result_size = len(self._result)
# calculate average data for each column in result data
average_data = [0] * len(self._header)
if result_size >= 2:
for row_info in self._result[1:]:
average_data = [
Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
]
average_data = [
round((item / (result_size - 1))) for item in average_data
]
            # change step num info in average_data to '-'
step_num_index = self._header.index('step_num')
average_data[step_num_index] = '-'
self._result.append(average_data)
log.info("Finish add average info for step trace.")
def _save(self):
log.info("Start to save step trace file.")
if not self._header:
return
with open(self._output_path, 'w') as file_handle:
csv_writer = csv.writer(file_handle)
csv_writer.writerow(self._header)
for row_data in self._result:
csv_writer.writerow(row_data)
os.chmod(self._output_path, stat.S_IREAD)
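For reference, a sketch of the 20-byte event layout consumed by _get_trace_struct; the field values below are fabricated:

    import struct
    event = struct.pack('=QHHQ', 255, 1, 0, 123456789)  # tag_id 255 marks the end of a step
    tag_id, task_id, stream_id, sys_count = struct.unpack('=QHHQ', event)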
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiling api file."""
import os
import time
from marshmallow import ValidationError
from tabulate import tabulate
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.analyser.integrator import Integrator
from mindinsight.profiler.common._utils import get_file_names, fwrite_format
from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerIOException
from mindinsight.profiler.common.log import logger
from mindinsight.profiler.common.validator.checkparam import \
check_bool, check_subgraph
from mindinsight.profiler.common.validator.validate_path import \
validate_and_normalize_path
from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
from mindinsight.profiler.parser.minddata_parser import MinddataParser
from mindinsight.profiler.parser.minddata_pipeline_parser import \
MinddataPipelineParser
from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
from mindinsight.profiler.parser.step_trace_parser import StepTraceParser
from mindinsight.utils.exceptions import MindInsightException
PROFILING_LOG_BASE_PATH = "/var/log/npu/profiling"
INIT_OP_NAME = 'Default/InitDataSetQueue'
class Profiler:
"""
Performance profiling API.
    Enable MindSpore users to profile the performance of neural networks.
Args:
subgraph (str): Define which subgraph to monitor and analyse, can be 'all', 'Default', 'Gradients'.
is_detail (bool): Whether to show profiling data for op_instance level, only show optype level if False.
is_show_op_path (bool): Whether to save the full path for each op instance.
output_path (str): Output data path.
        optypes_to_deal (str): Op type names whose data should be collected and analysed;
            all op types will be dealt with if it is empty. Different op types should be separated by comma.
        optypes_not_deal (str): Op type names whose data will not be collected and analysed;
            different op types should be separated by comma.
Examples:
>>> from mindinsight.profiler import Profiler
>>> context.set_context(mode=context.GRAPH_MODE, device_target="Ascend",
>>> device_id=int(os.environ["DEVICE_ID"]))
>>> profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data')
>>> model = Model(train_network)
>>> dataset = get_dataset()
>>> model.train(2, dataset)
>>> profiler.analyse()
"""
_base_profiling_container_path = "/var/log/npu/profiling/container"
_hwts_output_filename_target = "output_format_data_hwts_"
_opcompute_output_filename_target = "output_op_compute_time_"
_aicpu_op_output_filename_target = "output_data_preprocess_aicpu_"
def __init__(self, subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data',
optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
# get device_id and device_target
self._get_devid_and_devtarget()
self._container_path = os.path.join(self._base_profiling_container_path, self._dev_id)
data_path = os.path.join(self._container_path, "data")
if not os.path.exists(data_path):
os.makedirs(data_path, exist_ok=True)
self._output_path = validate_and_normalize_path(output_path,
'Profiler output path (' + output_path + ')')
self._output_path = os.path.join(self._output_path, "profiler")
if not os.path.exists(self._output_path):
os.makedirs(self._output_path, exist_ok=True)
os.environ['PROFILING_MODE'] = 'true'
os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace'
os.environ['MINDDATA_PROFILING_DIR'] = self._output_path
os.environ['DEVICE_ID'] = self._dev_id
        # use the context interface to enable profiling, for the new mindspore version (after 2020.5.21)
try:
import mindspore.context as context
context.set_context(enable_profiling=True, profiling_options="training_trace:task_trace")
except ImportError:
logger.error("Profiling: fail to import context from mindspore.")
except ValueError:
logger.error("Profiling: fail to set context enable_profiling")
os.environ['AICPU_PROFILING_MODE'] = 'true'
os.environ['PROFILING_DIR'] = str(self._container_path)
self._subgraph = check_subgraph(subgraph)
self._valid_optype_name = optypes_to_deal.split(",") if optypes_to_deal else []
self._filt_optype_names = optypes_not_deal.split(",") if optypes_not_deal else []
self._detail = check_bool(is_detail, 'is_detail')
self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path')
self._profiling_job_id = job_id
# add job id env through user input later
self._job_id_env = 0
self._start_time = int(time.time() * 10000000)
logger.info("Profiling: profiling start time: %d", self._start_time)
def analyse(self):
"""
Collect and analyse performance data, called after training or during training.
Examples:
>>> from mindinsight.profiler import Profiler
>>> context.set_context(mode=context.GRAPH_MODE, device_target="Ascend",
>>> device_id=int(os.environ["DEVICE_ID"]))
>>> profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data')
>>> model = Model(train_network)
>>> dataset = get_dataset()
>>> model.train(2, dataset)
>>> profiler.analyse()
"""
try:
from mindspore.communication.management import release
release()
except ImportError:
logger.error("Profiling: fail to import release from mindspore.")
job_id = self._get_profiling_job_id()
logger.info("Profiling: job id is %s ", job_id)
source_path = os.path.join(PROFILING_LOG_BASE_PATH, job_id)
# parse hwts.log.data.45.dev file, and get task profiling data
hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt"
hwts_output_filename = os.path.join(self._output_path, hwts_output_filename)
hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename)
result = hwtslog_parser.execute()
if not result:
logger.error("Profiling: fail to parse hwts log file.")
return
# parse Framework file, and get the relation of op and tasks
framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path)
framework_parser.parse()
op_task_dict = framework_parser.to_task_id_full_op_name_dict()
if not op_task_dict:
logger.error("Profiling: fail to parse framework files.")
return
# get op compute time from hwts data and framework data, write output_op_compute_time.txt
opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
optime_parser = OPComputeTimeParser(
hwts_output_filename, opcompute_output_filename,
op_task_dict, self._output_path, self._dev_id
)
optime_parser.execute()
# parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt"
output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser.execute()
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._dev_id)
# parse minddata pipeline operator and queue
try:
pipeline_parser = MinddataPipelineParser(self._output_path, self._dev_id, self._output_path)
pipeline_parser.parse()
except MindInsightException as err:
logger.warning(err.message)
# analyse op compute time info
try:
self._analyser_op_info()
except MindInsightException as err:
logger.warning(err.message)
# analyse step trace info
try:
self._analyse_step_trace(source_path, framework_parser)
except MindInsightException as err:
logger.warning(err.message)
# analyse timeline info
try:
self._analyse_timeline(aicpu_data_parser, optime_parser)
except (ProfilerIOException, ProfilerFileNotFoundException, ValidationError) as err:
logger.warning('Fail to write timeline data: %s', err)
def _analyse_step_trace(self, source_path, framework_parser):
"""
Analyse step trace data and save the result.
Args:
source_path (str): The directory that contains the step trace original data.
framework_parser (FrameworkParser): The framework parse instance.
"""
logger.info("Begin to parse step trace.")
# construct output path
step_trace_intermediate_file_path = os.path.join(
self._output_path,
f'step_trace_raw_{self._dev_id}_detail_time.csv'
)
point_info_file_path = os.path.join(
self._output_path,
'step_trace_point_info.json'
)
# whether keep the first step
skip_first_step_flag = framework_parser.check_op_name(INIT_OP_NAME)
point_info = framework_parser.point_info
# parser the step trace files and save the result to disk
parser = StepTraceParser(input_dir=source_path,
output_file_path=step_trace_intermediate_file_path,
job_id=self._job_id_env,
skip_first_step=skip_first_step_flag)
parser.update_tag_op_type_map(point_info)
parser.parse_and_save()
point_info = parser.record_point_info(point_info, point_info_file_path)
# print parser result
parser.show()
logger.info("Finish saving the intermediate result: %s", step_trace_intermediate_file_path)
logger.info("The point info is: %s", point_info)
def _analyse_timeline(self, aicpu_parser, optime_parser):
"""
Analyse and parse timeline info.
Args:
aicpu_parser (DataPreProcessParser): The parser instance for AI CPU operator
execution time calculation.
            optime_parser (OPComputeTimeParser): The parser instance for AI Core
operator execution time calculation.
"""
timeline_analyser = AnalyserFactory.instance().get_analyser(
'timeline', self._output_path, self._dev_id
)
# Get framework info
aicoredetail_analyser = AnalyserFactory.instance().get_analyser(
'aicore_detail', self._output_path, self._dev_id
)
framework_info = aicoredetail_analyser.query()
# Get all reduce info
step_trace_analyser = AnalyserFactory.instance().get_analyser(
'step_trace', self._output_path, self._dev_id
)
all_reduce_info = step_trace_analyser.query_for_all_reduce()
# Get timeline info
logger.info('Start writing timeline info...')
        logger.info('Note: this could take a few minutes if the network is '
                    'complex or trains for more than 10 steps.')
# Add info into timeline, such as AI CPU, AllReduce, framework info.
aicpu_info = aicpu_parser.query_aicpu_data()
min_cycle_counter = min(aicpu_parser.min_cycle_counter, optime_parser.min_cycle_counter)
timeline_analyser.init_timeline(all_reduce_info, framework_info, aicpu_info, min_cycle_counter)
timeline_analyser.write_timeline()
timeline_analyser.write_timeline_summary()
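    # Minimal usage sketch of the analyser factory pattern used above (assuming a
    # populated profiler output directory './profiler' and device id '0'):
    #
    #     analyser = AnalyserFactory.instance().get_analyser('timeline', './profiler', '0')
    #     analyser.write_timeline_summary()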
def __del__(self):
"""Disable the profiling collection service, called after training."""
        os.environ['PROFILING_MODE'] = "false"
try:
import mindspore.context as context
context.set_context(enable_profiling=False)
except ImportError:
pass
def _get_profiling_job_id(self):
"""Get profiling job id, which was generated by ada service.
Returns:
            str: the profiling job id.
"""
if self._profiling_job_id:
return self._profiling_job_id
job_id = ""
cmd = "ls -t " + PROFILING_LOG_BASE_PATH + "|grep JOB|awk '{print $1}'"
r = os.popen(cmd)
profiling_job_dirs = r.readlines()
r.close()
for item in profiling_job_dirs:
path = os.path.join(PROFILING_LOG_BASE_PATH, item.strip())
log_file = get_file_names(path, "host_start.log")
if not log_file:
logger.error("Profiling: job path %s, host_start.log not exist.", path)
continue
log_file = os.path.join(path, log_file[0])
item_dict = self._parse_host_start_log(log_file)
if not item_dict:
logger.error("Profiling: job path %s, fail to get job start info.", path)
continue
if self._start_time > int(item_dict["start_time"]):
logger.info("Profiling: job path %s, start_time %s, training start_time %d.",
path, item_dict["start_time"], self._start_time)
break
if self._dev_id != item_dict["device_id"]:
logger.info("Profiling: job path %s, dev id %s, training device id %s.",
path, item_dict["device_id"], self._dev_id)
continue
job_id = item.strip()
break
if not job_id:
msg = ("Fail to get profiling job, please check whether job dir was generated")
raise RuntimeError(msg)
return job_id
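    # Note: the shell pipeline above can be sketched in pure Python (illustrative
    # only; sorting by mtime stands in for `ls -t`):
    #
    #     dirs = [d for d in os.listdir(PROFILING_LOG_BASE_PATH) if 'JOB' in d]
    #     dirs.sort(key=lambda d: os.path.getmtime(os.path.join(PROFILING_LOG_BASE_PATH, d)),
    #               reverse=True)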
    def _parse_host_start_log(self, input_file):
        """
        Parse the host start log file to get the device id and start time of the job.
        Args:
            input_file (str): The file path of the host start log file.
        Returns:
            dict, the job start time and device id.
        """
        item_dict = {}
        with open(input_file) as log_file:
            for line in log_file:
                if "Device" in line:
                    # Keep the device id, dropping the "Device" tag and trailing characters.
                    item_dict["device_id"] = line[7:len(line)-2]
                elif "clock_realtime" in line:
                    # Keep the timestamp, dropping the "clock_realtime" tag and trailing characters.
                    item_dict["start_time"] = line[16:len(line)-3]
        return item_dict
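    # Example of the returned dict (values hypothetical; "start_time" is compared
    # as an integer and "device_id" as a string in _get_profiling_job_id above):
    #
    #     {"device_id": "0", "start_time": "1596000000"}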
def _analyser_op_info(self):
"""Analyse the operator information."""
integrator = Integrator(self._output_path, self._dev_id)
integrator.integrate()
aicore_type_result = self._query_op_type_info()
detail_file_path = os.path.join(
self._output_path,
'output_op_compute_time_detail_{}.txt'.format(self._dev_id)
)
fwrite_format(detail_file_path, data_source='title:op compute time')
display_names = [
'optype_name', 'compute_time(ms, per-step)',
'called_times(per-step)', 'percent'
]
data_source = tabulate(aicore_type_result, display_names)
fwrite_format(detail_file_path, data_source=data_source, is_print=True)
if self._detail:
op_type_order = [item[0] for item in aicore_type_result]
aicore_detail_result = self._query_op_detail_info(op_type_order)
fwrite_format(detail_file_path, data_source='', is_print=True)
fwrite_format(detail_file_path, data_source='Detail:', is_print=True)
data_source = tabulate(
aicore_detail_result.get('object'),
aicore_detail_result.get('col_name')
)
fwrite_format(detail_file_path, data_source=data_source, is_print=True)
def _query_op_type_info(self):
"""
Query AICORE operator type information.
Returns:
list[list], the AICORE operator type and execution time information.
"""
condition = {
'sort_condition': {
'name': 'execution_time',
'type': 'descending'
}
}
analyser = AnalyserFactory.instance().get_analyser(
'aicore_type', self._output_path, self._dev_id
)
result = analyser.query(condition)
return result.get('object')
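    # Each row of the returned 'object' list describes one operator type; rows in
    # the sample op-type output file later in this diff (presumably produced from
    # this query) look like:
    #
    #     ['GatherV2', 43.1554..., 2, 34.13]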
def _query_op_detail_info(self, op_type_order):
"""
Query AICORE operator detail information.
Args:
            op_type_order (list): The operator type names in display order.
Returns:
dict, the AICORE operator detail information.
"""
op_type_condition = {}
if self._valid_optype_name:
op_type_condition['in'] = self._valid_optype_name
if self._filt_optype_names:
op_type_condition['not_in'] = self._filt_optype_names
subgraph_condition = {}
if self._subgraph != 'all':
subgraph_condition['in'] = [self._subgraph]
filter_condition = {
'op_type': op_type_condition,
'subgraph': subgraph_condition,
'is_display_detail': False,
'is_display_full_op_name': self._withfullpath
}
analyser = AnalyserFactory.instance().get_analyser(
'aicore_detail', self._output_path, self._dev_id
)
result = analyser.query_and_sort_by_op_type(
filter_condition, op_type_order
)
return result
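    # Illustrative filter_condition (hypothetical values): keeping only MatMul ops
    # in the 'Default' subgraph would produce
    #
    #     {'op_type': {'in': ['MatMul']}, 'subgraph': {'in': ['Default']},
    #      'is_display_detail': False, 'is_display_full_op_name': False}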
def _get_devid_and_devtarget(self):
"""Get device id and target of this training."""
device_target = ""
dev_id = ""
try:
import mindspore.context as context
dev_id = str(context.get_context("device_id"))
device_target = context.get_context("device_target")
except ImportError:
logger.error("Profiling: fail to import context from mindspore.")
except ValueError as err:
logger.error("Profiling: fail to get context, %s", err)
if not dev_id or not dev_id.isdigit():
dev_id = os.getenv('DEVICE_ID')
if not dev_id or not dev_id.isdigit():
dev_id = "0"
logger.error("Fail to get DEVICE_ID, use 0 instead.")
        if device_target and device_target not in ("Davinci", "Ascend"):
            msg = "Profiling: unsupported backend: %s" % device_target
            raise RuntimeError(msg)
self._dev_id = dev_id
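    # Outcome sketch for the checks above: "Ascend", "Davinci" and an unset target
    # pass; any other value, e.g. "GPU", raises
    # RuntimeError("Profiling: unsupported backend: GPU").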
......@@ -15,31 +15,11 @@
"""The st config."""
import os
import shutil
import sys
import tempfile
import pytest
from tests.st.func.profiler import RAW_DATA_BASE
from tests.utils import mindspore
sys.modules['mindspore'] = mindspore
BASE_SUMMARY_DIR = tempfile.mkdtemp(prefix='test_profiler_summary_dir_base_')
@pytest.fixture(scope="session")
def create_summary_dir():
"""Create summary directory for profiler module."""
try:
if os.path.exists(BASE_SUMMARY_DIR):
shutil.rmtree(BASE_SUMMARY_DIR)
permissions = os.R_OK | os.W_OK | os.X_OK
mode = permissions << 6
if not os.path.exists(BASE_SUMMARY_DIR):
os.mkdir(BASE_SUMMARY_DIR, mode=mode)
yield
finally:
if os.path.exists(BASE_SUMMARY_DIR):
shutil.rmtree(BASE_SUMMARY_DIR)
BASE_SUMMARY_DIR = os.path.realpath(os.path.join(RAW_DATA_BASE, "run_1"))
......@@ -21,19 +21,16 @@ Usage:
"""
import os
from unittest import mock, TestCase
from unittest import TestCase
import pytest
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.common.exceptions.exceptions import StepNumNotSupportedException, \
ProfilerParamValueErrorException
from mindinsight.profiler.profiling import Profiler, FrameworkParser
from tests.st.func.profiler import RAW_DATA_BASE
from tests.st.func.profiler.conftest import BASE_SUMMARY_DIR
@pytest.mark.usefixtures('create_summary_dir')
class TestProfilerAnalyse(TestCase):
"""Test Converter module."""
JOB_ID = 'JOB3'
......@@ -42,26 +39,14 @@ class TestProfilerAnalyse(TestCase):
def setup_class(cls):
"""Generate parsed files."""
cls.step_trace_file = 'step_trace_raw_1_detail_time.csv'
cls.generate_parsed_files()
cls.summary_dir = os.path.join(BASE_SUMMARY_DIR, 'normal_run')
cls.profiler = os.path.join(cls.summary_dir, 'profiler')
def setUp(self):
"""Setup before each test."""
self.step_trace_analyser = AnalyserFactory.instance().get_analyser(
'step_trace', self.profiler, '1')
@classmethod
def generate_parsed_files(cls):
"""Test parse raw info about profiler."""
cls.summary_dir = os.path.join(BASE_SUMMARY_DIR, 'normal_run')
cls.profiler = os.path.join(cls.summary_dir, 'profiler')
FrameworkParser._raw_data_dir = RAW_DATA_BASE
if not os.path.exists(cls.summary_dir):
os.makedirs(cls.summary_dir)
Profiler._base_profiling_container_path = os.path.join(RAW_DATA_BASE, 'container')
with mock.patch('mindinsight.profiler.profiling.PROFILING_LOG_BASE_PATH', RAW_DATA_BASE):
profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False,
output_path=cls.summary_dir, job_id=cls.JOB_ID)
profiler.analyse()
@pytest.mark.level0
@pytest.mark.env_single
......@@ -108,7 +93,7 @@ class TestProfilerAnalyse(TestCase):
assert len(res['training_trace_graph']) == 13
assert res['training_trace_graph'][-1] == [
{'name': '', 'start': 0.2038, 'duration': 118.1667},
{'name': 'stream_540_0_parallel', 'start': 118.3705, 'duration': 49.281},
{'name': 'stream_540_parallel_0', 'start': 118.3705, 'duration': 49.281},
{'name': '', 'start': 167.6515, 'duration': 37.7294}]
@pytest.mark.level0
......
......@@ -19,19 +19,13 @@ Usage:
pytest tests/st/func/profiler
"""
import os
import shutil
from unittest import mock
import pytest
from mindinsight.profiler import Profiler
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from tests.st.func.profiler.conftest import BASE_SUMMARY_DIR
from tests.ut.profiler import RAW_DATA_BASE
@pytest.mark.usefixtures('create_summary_dir')
class TestMinddataPipelineAnalyser:
"""Test minddata pipeline analyser module."""
JOB_ID = 'JOB3'
......@@ -39,29 +33,14 @@ class TestMinddataPipelineAnalyser:
@classmethod
def setup_class(cls):
"""Generate parsed files."""
cls.generate_parsed_files()
cls.summary_dir = os.path.join(BASE_SUMMARY_DIR, 'normal_run')
cls.profiler = os.path.join(cls.summary_dir, 'profiler')
def setup_method(self):
"""Create analyser."""
self._analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', self.profiler, '1')
@classmethod
def generate_parsed_files(cls):
"""Test parse raw info about profiler."""
cls.summary_dir = os.path.join(BASE_SUMMARY_DIR, 'normal_run')
cls.profiler = os.path.join(cls.summary_dir, 'profiler')
FrameworkParser._raw_data_dir = RAW_DATA_BASE
if not os.path.exists(cls.summary_dir):
os.makedirs(cls.summary_dir)
os.makedirs(cls.profiler, exist_ok=True)
pipeline_path = os.path.join(RAW_DATA_BASE, 'profiler', 'pipeline_profiling_1.json')
shutil.copy(pipeline_path, cls.profiler)
Profiler._base_profiling_container_path = os.path.join(RAW_DATA_BASE, 'container')
with mock.patch('mindinsight.profiler.profiling.PROFILING_LOG_BASE_PATH', RAW_DATA_BASE):
profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False,
output_path=cls.summary_dir, job_id=cls.JOB_ID)
profiler.analyse()
@pytest.mark.level0
@pytest.mark.env_single
......
......@@ -19,16 +19,11 @@ Usage:
pytest tests/st/func/profiler
"""
import os
from unittest import mock
import pytest
from mindinsight.profiler import Profiler
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from tests.st.func.profiler.conftest import BASE_SUMMARY_DIR
from tests.ut.profiler import RAW_DATA_BASE
OP_GATHER_V2_INFO = {
'col_name': [
......@@ -84,7 +79,6 @@ OP_GATHER_V2_INFO = {
}
@pytest.mark.usefixtures('create_summary_dir')
class TestOpAnalyser:
"""Test AICORE and AICPU analyser module."""
JOB_ID = 'JOB3'
......@@ -92,7 +86,8 @@ class TestOpAnalyser:
@classmethod
def setup_class(cls):
"""Generate parsed files."""
cls.generate_parsed_files()
cls.summary_dir = os.path.join(BASE_SUMMARY_DIR, 'normal_run')
cls.profiler = os.path.join(cls.summary_dir, 'profiler')
def setup_method(self):
"""Create analyser."""
......@@ -101,20 +96,6 @@ class TestOpAnalyser:
self._analyser_aicore_detail = AnalyserFactory.instance().get_analyser(
'aicore_detail', self.profiler, '1')
@classmethod
def generate_parsed_files(cls):
"""Test parse raw info about profiler."""
cls.summary_dir = os.path.join(BASE_SUMMARY_DIR, 'normal_run')
cls.profiler = os.path.join(cls.summary_dir, 'profiler')
FrameworkParser._raw_data_dir = RAW_DATA_BASE
if not os.path.exists(cls.summary_dir):
os.makedirs(cls.summary_dir)
Profiler._base_profiling_container_path = os.path.join(RAW_DATA_BASE, 'container')
with mock.patch('mindinsight.profiler.profiling.PROFILING_LOG_BASE_PATH', RAW_DATA_BASE):
profiler = Profiler(subgraph='all', is_detail=True, is_show_op_path=False,
output_path=cls.summary_dir, job_id=cls.JOB_ID)
profiler.analyse()
@pytest.mark.level0
@pytest.mark.env_single
@pytest.mark.platform_x86_cpu
......
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test the aicpu parser."""
import os
import tempfile
import shutil
from unittest import TestCase
from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
def get_result(file_path):
    """
    Get the result from the aicpu output file.
    Args:
        file_path (str): The aicpu file path.
    Returns:
        list[str], the full content of the aicpu file as a single element.
    """
    # Read the whole file as one element so callers can compare files verbatim.
    result = []
    with open(file_path, 'r') as file:
        result.append(file.read())
    return result
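# Usage sketch (as in the tests below): parse a job directory, then compare the
# generated file with the expected file verbatim:
#
#     assert get_result(expect_file) == get_result(output_file)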
class TestAicpuParser(TestCase):
"""Test the class of Aicpu Parser."""
def setUp(self) -> None:
"""Initialization before test case execution."""
self.profiling_dir = os.path.realpath(os.path.join(os.path.dirname(__file__),
'../../../utils/resource/'
'JOB_AICPU/data'))
self.expect_dir = os.path.realpath(os.path.join(os.path.dirname(__file__),
'../../../utils/resource/'
'JOB_AICPU/expect'))
self.output_path = tempfile.mkdtemp(prefix='output_data_preprocess_aicpu_')
self.output_file = os.path.join(self.output_path, 'output_data_preprocess_aicpu_0.txt')
self.expect_file = os.path.join(self.expect_dir, 'output_data_preprocess_aicpu_0.txt')
def test_aicpu_parser(self):
"""Test the class of Aicpu Parser."""
data = DataPreProcessParser(self.profiling_dir, self.output_file)
data.execute()
expect_result = get_result(self.expect_file)
result = get_result(self.output_file)
shutil.rmtree(self.output_path)
assert expect_result == result
def test_aicpu_parser_file_not_exist(self):
"""Test the class of Aicpu Parser."""
profiling_dir = os.path.realpath(os.path.join(self.profiling_dir, 'data'))
data = DataPreProcessParser(profiling_dir, self.output_file)
data.execute()
shutil.rmtree(self.output_path)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test the framework parser module."""
import csv
import os
import shutil
import tempfile
from unittest import mock
import pytest
from marshmallow import ValidationError
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerPathErrorException, ProfilerDirNotFoundException, \
ProfilerFileNotFoundException
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from tests.ut.profiler import PROFILER_DIR, RAW_DATA_BASE
def get_framework_result(file_path):
"""
Get framework result from the framework file.
Args:
file_path (str): The framework file path.
Returns:
list[list], the parsed framework information.
"""
result = []
with open(file_path, 'r') as file:
csv_reader = csv.reader(file)
for row in csv_reader:
result.append(row)
return result
class TestFrameworkParser:
"""Test the class of `FrameworkParser`."""
def setup_method(self):
"""Initialization before test case execution."""
FrameworkParser._raw_data_dir = RAW_DATA_BASE
self._output_path_1 = tempfile.mkdtemp(prefix='test_framework_parser_')
self._parser_1 = FrameworkParser('JOB1', '0', self._output_path_1)
self._output_path_2 = tempfile.mkdtemp(prefix='test_framework_parser_')
self._parser_2 = FrameworkParser('JOB2', '0', self._output_path_2)
self._output_path_4 = tempfile.mkdtemp(prefix='test_framework_parser_')
self._parser_4 = FrameworkParser('JOB4', '0', self._output_path_4)
def teardown_method(self) -> None:
"""Clear up after test case execution."""
shutil.rmtree(self._output_path_1)
shutil.rmtree(self._output_path_2)
shutil.rmtree(self._output_path_4)
FrameworkParser._raw_data_dir = '/var/log/npu/profiling'
def test_save_path(self):
"""Test the querying save path function."""
expect_result = os.path.join(self._output_path_1, 'framework_raw_0.csv')
assert expect_result == self._parser_1.save_path
expect_result = os.path.join(self._output_path_2, 'framework_raw_0.csv')
assert expect_result == self._parser_2.save_path
def test_point_info(self):
"""Test the querying point info function."""
expect_result = {
1: 'Default/Cast-op6',
2: 'Default/TransData-op7'
}
assert expect_result == self._parser_4.point_info
def test_to_task_id_full_op_name_dict(self):
"""Test the querying task id and full operator name dict function."""
expect_result = {
'51517': 'Default/Cast-op6',
'51518': 'Default/TransData-op7',
'51519': 'Default/network-WithLossCell/_backbone-ResNet/conv1-Conv2d/Cast-op5',
'51522': 'Default/network-WithLossCell/_backbone-ResNet/'
'layer1-SequentialCell/0-ResidualBlock/conv1-Conv2d/Cast-op28'
}
assert expect_result == self._parser_1.to_task_id_full_op_name_dict()
assert expect_result == self._parser_2.to_task_id_full_op_name_dict()
expect_result = {
'0_1': 'Default/Cast-op6',
'0_2': 'Default/TransData-op7',
'0_3': 'Default/network-WithLossCell/_backbone-ResNet/conv1-Conv2d/Cast-op5',
'0_4': 'Default/network-WithLossCell/_backbone-ResNet/layer1-SequentialCell/'
'0-ResidualBlock/conv1-Conv2d/Cast-op28'
}
assert expect_result == self._parser_4.to_task_id_full_op_name_dict()
def test_parse(self):
"""Test the parse function."""
expect_framework_file = os.path.join(PROFILER_DIR, 'framework_raw_0.csv')
expect_framework_file = os.path.realpath(expect_framework_file)
expect_result = get_framework_result(expect_framework_file)
self._parser_1.parse()
framework_file = os.path.join(self._output_path_1, 'framework_raw_0.csv')
result = get_framework_result(framework_file)
assert expect_result == result
self._parser_2.parse()
framework_file = os.path.join(self._output_path_2, 'framework_raw_0.csv')
result = get_framework_result(framework_file)
assert expect_result == result
@mock.patch('mindinsight.profiler.parser.framework_parser.validate_and_normalize_path')
def test_create_framework_parser_fail_1(self, *args):
"""Test the function of fail to create framework parser."""
args[0].side_effect = ValidationError({'profiler': {"The path is invalid!"}})
with pytest.raises(ProfilerPathErrorException) as exc_info:
FrameworkParser('JOB1', '0')
assert exc_info.value.error_code == '50546081'
assert exc_info.value.message == 'Path error. Profiling path is invalid.'
@mock.patch('os.path.isdir')
def test_create_framework_parser_fail_2(self, *args):
"""Test the function of fail to create framework parser."""
args[0].return_value = False
FrameworkParser._raw_data_dir = '/var/log/npu/profiling'
with pytest.raises(ProfilerDirNotFoundException) as exc_info:
FrameworkParser('JOB1', '0')
assert exc_info.value.error_code == '50546083'
assert exc_info.value.message == \
'The dir </var/log/npu/profiling/JOB1> not found.'
@mock.patch('os.listdir')
@mock.patch('os.path.isdir')
def test_create_framework_parser_fail_3(self, *args):
"""Test the function of fail to create framework parser."""
args[0].return_value = True
args[1].return_value = []
FrameworkParser._raw_data_dir = '/var/log/npu/profiling'
with pytest.raises(ProfilerFileNotFoundException) as exc_info:
FrameworkParser('JOB1', '0')
assert exc_info.value.error_code == '50546084'
assert exc_info.value.message == 'The file <Framework> not found.'
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test the minddata pipeline parser module."""
import csv
import os
import shutil
import tempfile
from mindinsight.profiler.parser.minddata_pipeline_parser import \
MinddataPipelineParser
from tests.ut.profiler import PROFILER_DIR, RAW_DATA, RAW_DATA_JOB2
def get_minddata_pipeline_result(file_path):
"""
Get minddata pipeline result from the minddata pipeline file.
Args:
file_path (str): The minddata pipeline file path.
Returns:
list[list], the parsed minddata pipeline information.
"""
result = []
with open(file_path, 'r') as file:
csv_reader = csv.reader(file)
for row in csv_reader:
result.append(row)
return result
class TestMinddataPipelineParser:
"""Test the class of `MinddataPipelineParser`."""
def setup_method(self):
"""Initialization before test case execution."""
self._output_path_1 = tempfile.mkdtemp(
prefix='test_minddata_pipeline_parser_'
)
self._parser_1 = MinddataPipelineParser(
RAW_DATA, '0', self._output_path_1
)
self._output_path_2 = tempfile.mkdtemp(
prefix='test_minddata_pipeline_parser_'
)
self._parser_2 = MinddataPipelineParser(
RAW_DATA_JOB2, '0', self._output_path_2
)
def teardown_method(self) -> None:
"""Clear up after test case execution."""
shutil.rmtree(self._output_path_1)
shutil.rmtree(self._output_path_2)
def test_save_path(self):
"""Test the querying save path function."""
expect_result = os.path.join(
self._output_path_1, 'minddata_pipeline_raw_0.csv'
)
assert expect_result == self._parser_1.save_path
def test_parse(self):
"""Test the parse function."""
expect_pipeline_file = os.path.join(
PROFILER_DIR, 'minddata_pipeline_raw_0.csv'
)
expect_result = get_minddata_pipeline_result(expect_pipeline_file)
self._parser_1.parse()
pipeline_file = os.path.join(
self._output_path_1, 'minddata_pipeline_raw_0.csv'
)
result = get_minddata_pipeline_result(pipeline_file)
assert expect_result == result
self._parser_2.parse()
pipeline_file = os.path.join(
self._output_path_2, 'minddata_pipeline_raw_0.csv'
)
result = get_minddata_pipeline_result(pipeline_file)
assert expect_result == result
op_type,execution_time,execution_frequency,percent
AssignAdd,0.001688,1,0.00
Mul,1.9029486666666665347,32,1.51
Assign,0.0024766666666666667,2,0.00
GatherV2,43.1554414761904692,2,34.13
ReduceSum,0.0307648571428571411,5,0.02
TensorAdd,0.0092183809523809521,3,0.01
Cast,0.4846848571428571735,15,0.38
TransData,1.1151575238095237340,30,0.88
Square,0.009799333333333334,1,0.01
Split,2.71427747619047635,2,2.15
Concat,5.808453809523809946,4,4.59
StridedSlice,0.345568761904761835,2,0.27
AtomicAddrClean,0.0193686666666666662,8,0.02
RealDiv,0.4228071904761904831,15,0.33
Tile,0.044158333333333339,4,0.03
StridedSliceGrad,1.50683428571428578,2,1.19
Slice,20.3763149999999997,16,16.12
ReLU,0.483282142857142759,5,0.38
MatMul,1.936681428571428733,15,1.53
BiasAdd,0.189662857142857130,5,0.15
SigmoidCrossEntropyWithLogits,0.004808571428571428,1,0.00
SigmoidCrossEntropyWithLogitsGrad,0.009582142857142858,2,0.01
ReduceMean,0.004534999999999999,1,0.00
BiasAddGrad,0.0716814285714285667,5,0.06
UnsortedSegmentSum,44.607826428571423,2,35.28
ReluGrad,0.85406857142857138,5,0.68
AddN,0.012836428571428572,1,0.01
ApplyFtrl,0.0254535714285714273,2,0.02
Adam,0.2859357142857142737,11,0.23
op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id
0,Batch,4,,,,,10,,[1]
1,Shuffle,1,"[10, 20, 30]",20.0,64,0.3125,10,0,"[2, 3]"
2,TFReader,4,"[10, 20, 30]",20.0,64,0.3125,10,1,
3,TFReader,4,"[10, 20, 30]",20.0,64,0.3125,10,1,
{
"sampling_interval": 10,
"op_info": [
{
"op_id": 4,
"op_type": "TFReader",
"num_workers": 4,
"metrics": null,
"children": [3]
},
{
"op_id": 3,
"op_type": "TFReader",
"num_workers": 4,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": null
},
{
"op_id": 2,
"op_type": "TFReader",
"num_workers": 4,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": null
},
{
"op_id": 1,
"op_type": "Shuffle",
"num_workers": 1,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": [2, 4]
},
{
"op_id": 0,
"op_type": "Batch",
"num_workers": 4,
"metrics": null,
"children": [1]
}
]
}
\ No newline at end of file
{"fp_start": "Default/Cast-op6", "bp_end": "Default/TransData-op7"}
\ No newline at end of file
{"total_time": 1771382.849999995, "num_of_streams": 11, "num_of_ops": 199, "op_exe_times": 2817}
\ No newline at end of file