Commit 169c580a authored by mindspore-ci-bot, committed by Gitee

!130 Support profiling: profiler api and file (hwts log) parser

Merge pull request !130 from WeibiaoYu/master
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiler utils."""
import os
import re
def fwrite_format(output_data_path, data_source=None, is_print=False, is_start=False):
"""
Write data to the output file.
Args:
output_data_path(str): the output file path of the data.
data_source(list): the data to write.
is_print(bool): whether to print the data to stdout.
is_start(bool): Whether is the first line of the output file, will remove the old file if True."
"""
if is_start is True and os.path.exists(output_data_path):
os.remove(output_data_path)
if data_source.startswith("title:"):
title_label = '=' * 20
data_source = title_label + data_source[6:] + title_label
with open(output_data_path, 'a+') as f:
f.write(data_source)
f.write("\n")
if is_print:
print(data_source)
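# Example: a data_source that starts with 'title:' is framed by twenty '='
# characters on each side, so
#     fwrite_format('./out.txt', data_source='title:45 HWTS data', is_start=True)
# recreates './out.txt' (a hypothetical path) whose first line is
# '====================45 HWTS data===================='.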
def get_log_slice_id(file_name):
    """Get the slice id from a sliced log file name; used as a sort key."""
    pattern = re.compile(r'(?<=slice_)\d+')
    slice_list = pattern.findall(file_name)
    return int(slice_list[0])
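# Example (hypothetical file name): for 'hwts.log.data.45.dev.profiler_default_tag_slice_3'
# the lookbehind pattern captures '3', so name_list.sort(key=get_log_slice_id)
# restores the original slice order before the files are joined.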
def get_file_join_name(input_path, file_name):
"""
Search files under the special path, and will join all the files to one file.
Args:
input_path(str): the source path, will search files under it.
file_name(str): the target of the filename, such as 'hwts.log.data.45.dev'.
Returns:
str: the join file name.
"""
name_list = []
file_join_name = ''
input_path = os.path.realpath(input_path)
if os.path.exists(input_path):
files = os.listdir(input_path)
for f in files:
if file_name in f and not f.endswith('.done') and not f.endswith('.join') \
and not f.endswith('.zip'):
name_list.append(f)
        # sort name_list by slice id so the slices are joined in order
name_list.sort(key=get_log_slice_id)
if len(name_list) == 1:
file_join_name = os.path.join(input_path, name_list[0])
elif len(name_list) > 1:
file_join_name = os.path.join(input_path, '%s.join' % file_name)
if os.path.exists(file_join_name):
os.remove(file_join_name)
with open(file_join_name, 'ab') as bin_data:
for i in name_list:
file = input_path + os.sep + i
with open(file, 'rb') as txt:
bin_data.write(txt.read())
return file_join_name
def get_file_names(input_path, file_name):
"""
Search files under the special path.
Args:
input_path(str): the souce path, will serch files under it.
file_name(str): the target of the filename, such as 'host_start_log'.
Returns:
list: file name list.
"""
input_path = os.path.realpath(input_path)
name_list = []
if os.path.exists(input_path):
files = os.listdir(input_path)
for f in files:
if file_name in f and not f.endswith('.done') \
and not f.endswith('.zip'):
name_list.append(f)
break
return name_list
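# Usage sketch for the helpers above (the job directory name is hypothetical):
#     joined = get_file_join_name('/var/log/npu/profiling/JOB_X', 'hwts.log.data.45.dev')
#     logs = get_file_names('/var/log/npu/profiling/JOB_X', 'host_start.log')
# get_file_join_name concatenates every matching slice into '<file_name>.join' and
# returns its path; get_file_names returns at most the first matching file name.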
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Profiler check parameters."""
def check_bool(input_param, param_name):
"""Bool type judgment."""
if isinstance(input_param, bool):
return input_param
raise TypeError("Parameter {}: input type must be bool!".format(param_name))
def check_subgraph(subgraph):
"""Check subgraph."""
if subgraph in ("all", "Default", "Gradients"):
return subgraph
raise ValueError("subgraph must be all or Default or Gradients, but got {}.".format(subgraph))
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for hwts log file."""
import struct
from tabulate import tabulate
from mindinsight.profiler.common._utils import fwrite_format, get_file_join_name
from mindinsight.profiler.common.log import logger
class HWTSLogParser:
"""
The Parser for hwts log files.
Args:
_input_path(str): The profiling job path. Such as: '/var/log/npu/profiling/JOBAIFGJEJFEDCBAEADIFJAAAAAAAAAA".
output_filename(str): The output data path and name. Such as: './output_format_data_hwts_0.txt'.
"""
_source_file_target = 'hwts.log.data.45.dev.profiler_default_tag'
_dst_file_title = 'title:45 HWTS data'
_dst_file_column_title = ['Type', 'cnt', 'Core ID', 'Block ID', 'Task ID',
'Cycle counter', 'Stream ID']
def __init__(self, input_path, output_filename):
self._input_path = input_path
self._output_filename = output_filename
        self._source_file_name = self._get_source_file()
def _get_source_file(self):
"""Get hwts log file name, which was created by ada service."""
file_name = get_file_join_name(self._input_path, self._source_file_target)
if not file_name:
msg = ("Fail to find hwts log file, under directory %s" \
%self._input_path)
raise RuntimeError(msg)
return file_name
def execute(self):
"""
Execute the parser, get result data, and write it to the output file.
Returns:
bool: whether succeed to analyse hwts log.
"""
content_format = ['QIIIIIIIIIIII', 'QIIQIIIIIIII', 'IIIIQIIIIIIII']
log_type = ['Start of task', 'End of task', 'Start of block', 'End of block', 'Block PMU']
result_data = []
        with open(self._source_file_name, 'rb') as hwts_data:
while True:
line = hwts_data.read(64)
if line:
if not line.strip():
continue
else:
break
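                # Each record is 64 bytes; the first 8 unpack as 'BBHHH':
                # byte 0 packs the log type (low 3 bits) and cnt (high 4 bits),
                # byte 1 is the core id, and the three 16-bit fields are a word
                # this parser does not use, the block id, and the task id.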
byte_first_four = struct.unpack('BBHHH', line[0:8])
byte_first = bin(byte_first_four[0]).replace('0b', '').zfill(8)
ms_type = byte_first[-3:]
cnt = int(byte_first[0:4], 2)
core_id = byte_first_four[1]
blk_id, task_id = byte_first_four[3], byte_first_four[4]
if ms_type in ['000', '001', '010']: # log type 0,1,2
result = struct.unpack(content_format[0], line[8:])
syscnt = result[0]
stream_id = result[1]
result_data.append((log_type[int(ms_type, 2)], cnt, core_id, blk_id, task_id, syscnt, stream_id))
elif ms_type == '011': # log type 3
result = struct.unpack(content_format[1], line[8:])
syscnt = result[0]
stream_id = result[1]
result_data.append((log_type[int(ms_type, 2)], cnt, core_id, blk_id, task_id, syscnt, stream_id))
                elif ms_type == '100':  # log type 4
                    result = struct.unpack(content_format[2], line[8:])
                    stream_id = result[2]
                    # total cycle count; taken from the record's single 64-bit
                    # field (index 4 in 'IIIIQIIIIIIII'), by analogy with syscnt above
                    total_cyc = result[4]
                    result_data.append((log_type[int(ms_type, 2)], cnt, core_id, blk_id, task_id, total_cyc, stream_id))
else:
logger.info("Profiling: invalid hwts log record type %s", ms_type)
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=tabulate(result_data,
self._dst_file_column_title,
tablefmt='simple'))
return True
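# Usage sketch (the job directory name is hypothetical):
#     parser = HWTSLogParser('/var/log/npu/profiling/JOB_X', './output_format_data_hwts_0.txt')
#     if parser.execute():
#         print('hwts records written to ./output_format_data_hwts_0.txt')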
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""op compute time files parser"""
from tabulate import tabulate
from mindinsight.profiler.common._utils import fwrite_format
class OPComputeTimeParser:
"""
Join hwts info and framework info, get op time info, and output to the result file.
Args:
hwts_output_file(str): The file path of hwts_output_file. Such as: './output_format_data_hwts_0.txt".
output_filename(str): The output data file path and name. Such as: './output_op_compute_time_0.txt'.
op_task_info(dict): The task and op relation info. format as: {taskid, [opname, streamid, block dim]}.
"""
_dst_file_title = 'title:op compute time'
_dst_file_column_title = ['op_name', 'compute_time(ms)', 'stream_id']
def __init__(self, hwts_output_file, output_filename, op_task_info):
self._hwts_output_file = hwts_output_file
self._output_filename = output_filename
self._op_task_info = op_task_info
def _get_op_task_id_map(self):
"""
Read hwts data file, get the task time info.
Returns:
list: all hwts task time info.
"""
op_map_result = []
hwts_list = []
        with open(self._hwts_output_file, 'r') as data_file:
            lines = data_file.readlines()
            for line in lines:
                if line.startswith("Start of task") or line.startswith("End of task"):
                    line_split = line.split()
                    hwts_list.append([line_split[0], line_split[6], line_split[7], line_split[8]])
# hwts op map by taskId
for hwts in hwts_list:
if hwts[1] in self._op_task_info.keys():
op_map_result.append([self._op_task_info[hwts[1]], hwts[0], hwts[1], hwts[2], hwts[3]])
return op_map_result
def execute(self):
"""Execute the parser, compute all op, get op time, and write it to the output file."""
result_data = []
tmp_result_data = []
op_map_list = self._get_op_task_id_map()
cur_index = 0
length = len(op_map_list)
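        # Scan consecutive records: a 'Start'/'End' pair for the same op yields
        # one sample, its duration being the difference of the two cycle counter
        # values; records that do not pair up are skipped one at a time.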
while cur_index < length:
if cur_index + 1 == length:
break
op_start = op_map_list[cur_index]
op_end = op_map_list[cur_index+1]
if op_start[1] == "Start" and op_end[1] == "End"\
and op_start[0] == op_end[0]:
# op_name, taskId, cycle counter, streamId
tmp_result_data.append([op_start[0], op_start[2], int(op_end[3]) - int(op_start[3]), op_start[4]])
cur_index += 2
else:
cur_index += 1
op_name_time_dict = {}
        op_name_streamid_dict = {}
op_name_count_dict = {}
op_name_task_dict = {}
# compute all op
for item in tmp_result_data:
if item[0] in op_name_time_dict.keys():
                op_name_time_dict[item[0]] += float(item[2]) / 1e5  # convert the cycle counter to ms (divide by 10^5)
if item[1] == op_name_task_dict[item[0]]:
op_name_count_dict[item[0]] += 1
else:
op_name_time_dict[item[0]] = float(item[2])/1e5
                op_name_streamid_dict[item[0]] = item[-1]
op_name_task_dict[item[0]] = item[1]
op_name_count_dict[item[0]] = 1
for op_name, time in op_name_time_dict.items():
            if op_name in op_name_streamid_dict.keys():
                stream_id = op_name_streamid_dict[op_name]
avg_time = time / op_name_count_dict[op_name]
result_data.append([op_name, avg_time, stream_id])
result_data.sort(key=lambda x: x[0])
total_time = 0
for item in result_data:
total_time += item[1]
result_data.append(["total op", total_time, 0])
fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
fwrite_format(self._output_filename, data_source=tabulate(result_data,
self._dst_file_column_title,
tablefmt='simple'))
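# Usage sketch (file names are hypothetical; the task map normally comes from
# FrameworkParser.to_task_id_full_op_name_dict()):
#     parser = OPComputeTimeParser('./output_format_data_hwts_0.txt',
#                                  './output_op_compute_time_0.txt', op_task_dict)
#     parser.execute()   # writes per-op average compute time plus a 'total op' row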
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""profiling api file."""
import os
import time
from tabulate import tabulate
from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.analyser.integrator import Integrator
from mindinsight.profiler.common._utils import get_file_names, fwrite_format
from mindinsight.profiler.common.validator.validate_path import \
validate_and_normalize_path
from mindinsight.profiler.common.validator.checkparam import \
check_bool, check_subgraph
from mindinsight.profiler.common.log import logger
from mindinsight.utils.exceptions import MindInsightException
profiling_log_base_path = "/var/log/npu/profiling"
class Profiler:
"""Performance profiling tool."""
_base_profiling_container_path = "/var/log/npu/profiling/container"
_hwts_output_filename_target = "output_format_data_hwts_"
_opcompute_output_filename_target = "output_op_compute_time_"
_aicpu_op_output_filename_target = "output_data_preprocess_aicpu_"
def __init__(self, subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data',
optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
"""
Init profiling service, called berfore network training.
Args:
subgraph(str): which subgraph to monit and anlayse, can be 'all', 'Default', 'Gradients'.
is_detail(Bool): whether to show profiling data for op_instace level, only show optype level if False.
is_show_op_path(Bool): whether to save the full path for each op instace.
output_path(Bool): output data path.
optypes_to_deal(List): Op type names, the data of which optype should be collected and analysed,
will deal with all op if null.
optypes_not_deal(List): Op type names, the data of which optype will not be collected and analysed.
"""
dev_id = os.getenv('DEVICE_ID')
if not dev_id:
dev_id = "0"
logger.error("Fail to get DEVICE_ID, use 0 instead.")
self._dev_id = dev_id
self._container_path = os.path.join(self._base_profiling_container_path, dev_id)
data_path = os.path.join(self._container_path, "data")
if not os.path.exists(data_path):
os.makedirs(data_path)
self._output_path = validate_and_normalize_path(output_path,
'Profiler output path (' + output_path + ')')
self._output_path = os.path.join(self._output_path, "profiler")
if not os.path.exists(self._output_path):
os.makedirs(self._output_path)
os.environ['PROFILING_MODE'] = 'true'
os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace'
os.environ['AICPU_PROFILING_MODE'] = 'true'
os.environ['PROFILING_DIR'] = str(self._container_path)
self._subgraph = check_subgraph(subgraph)
self._valid_optype_name = optypes_to_deal.split(",") if optypes_to_deal else []
self._filt_optype_names = optypes_not_deal.split(",") if optypes_not_deal else []
self._detail = check_bool(is_detail, 'is_detail')
self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path')
self._profiling_job_id = job_id
self._start_time = int(time.time() * 10000000)
logger.info("Profiling: profiling start time: %d", self._start_time)
def analyse(self):
"""Collect and analyze performance data, called after training or during training."""
try:
from mindspore.communication.management import release
release()
except ImportError:
logger.error("Profiling: fail to import release from mindspore.")
logger.info("begin profiler analyse")
job_id = self._get_profiling_job_id()
if not job_id:
msg = ("Fail to get profiling job, please check whether job dir was generated under path %s"\
%profiling_log_base_path)
raise RuntimeError(msg)
logger.info("Profiling: job id is %s ", job_id)
source_path = os.path.join(profiling_log_base_path, job_id)
# parse hwts.log.data.45.dev file, and get task profiling data
hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt"
hwts_output_filename = os.path.join(self._output_path, hwts_output_filename)
hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename)
result = hwtslog_parser.execute()
if not result:
logger.error("Profiling: fail to parse hwts log file.")
return
# parse Framework file, and get the relation of op and tasks
framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path)
framework_parser.parse()
op_task_dict = framework_parser.to_task_id_full_op_name_dict()
if not op_task_dict:
logger.error("Profiling: fail to parse framework files.")
return
# get op compute time from hwts data and framework data, write output_op_compute_time.txt
opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
optime_parser = OPComputeTimeParser(hwts_output_filename, opcompute_output_filename, op_task_dict)
optime_parser.execute()
# parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt"
output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu)
aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu)
aicpu_data_parser.execute()
# analyse op compute time info
try:
self._analyser_op_info()
except MindInsightException as err:
logger.error(err.message)
def __del__(self):
"""Disable the profiling collection service, called after training."""
os.environ['PROFILING_MODE'] = str("false")
def _get_profiling_job_id(self):
"""Get profiling job id, which was generated by ada service.
Returns:
str: profiling jon id.
"""
if self._profiling_job_id:
return self._profiling_job_id
job_id = ""
cmd = "ls -t " + profiling_log_base_path + "|grep JOB|awk '{print $1}'"
r = os.popen(cmd)
profiling_job_dirs = r.readlines()
r.close()
for item in profiling_job_dirs:
path = os.path.join(profiling_log_base_path, item.strip())
log_file = get_file_names(path, "host_start.log")
if not log_file:
logger.error("Profiling: job path %s, host_start.log not exist.", path)
continue
log_file = os.path.join(path, log_file[0])
item_dict = self._parse_host_start_log(log_file)
if not item_dict:
logger.error("Profiling: job path %s, fail to get job start info.", path)
continue
if self._start_time > int(item_dict["start_time"]):
logger.info("Profiling: job path %s, start_time %s, training start_time %d.",
path, item_dict["start_time"], self._start_time)
break
if self._dev_id != item_dict["device_id"]:
logger.info("Profiling: job path %s, dev id %s, training device id %s.",
path, item_dict["device_id"], self._dev_id)
continue
job_id = item.strip()
break
return job_id
def _parse_host_start_log(self, input_file):
"""
Parse host start log file, get the device id and start time of the job.
Args:
input_file(str): the file path of the host start log file.
Returns:
dict: job start time and device id.
"""
        item_dict = {}
        with open(input_file) as file_handler:
            for line in file_handler:
                if "Device" in line:
                    item_dict["device_id"] = line[7:len(line) - 2]
                elif "clock_realtime" in line:
                    item_dict["start_time"] = line[16:len(line) - 3]
        return item_dict
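    # Note: the slice offsets above (7- and 16-character prefixes with trailing
    # characters stripped) are tied to the exact line layout of host_start.log
    # as written by the ada service; they are not general-purpose parsing.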
def _analyser_op_info(self):
"""Analyser the operator information."""
integrator = Integrator(self._output_path, self._dev_id)
integrator.integrate()
aicore_type_result = self._query_op_type_info()
detail_file_path = os.path.join(
self._output_path,
'output_op_compute_time_detail_{}.txt'.format(self._dev_id)
)
fwrite_format(detail_file_path, data_source='title:op compute time')
display_names = [
'optype_name', 'compute_time(ms, per-step)',
'called_times(per-step)', 'percent'
]
data_source = tabulate(aicore_type_result, display_names)
fwrite_format(detail_file_path, data_source=data_source, is_print=True)
if self._detail:
op_type_order = [item[0] for item in aicore_type_result]
aicore_detail_result = self._query_op_detail_info(op_type_order)
fwrite_format(detail_file_path, data_source='', is_print=True)
fwrite_format(detail_file_path, data_source='Detail:', is_print=True)
data_source = tabulate(
aicore_detail_result.get('object'),
aicore_detail_result.get('col_name')
)
fwrite_format(detail_file_path, data_source=data_source, is_print=True)
def _query_op_type_info(self):
"""
Query AICORE operator type information.
Returns:
list[list], the AICORE operator type and execution time information.
"""
condition = {
'sort_condition': {
'name': 'execution_time',
'type': 'descending'
}
}
analyser = AnalyserFactory.instance().get_analyser(
'aicore_type', self._output_path, self._dev_id
)
result = analyser.query(condition)
return result.get('object')
def _query_op_detail_info(self, op_type_order):
"""
Query AICORE operator detail information.
Args:
op_type_order(list): The name of the op type in order.
Returns:
dict, the AICORE operator detail information.
"""
op_type_condition = {}
if self._valid_optype_name:
op_type_condition['in'] = self._valid_optype_name
if self._filt_optype_names:
op_type_condition['not_in'] = self._filt_optype_names
subgraph_condition = {}
if self._subgraph != 'all':
subgraph_condition['in'] = [self._subgraph]
filter_condition = {
'op_type': op_type_condition,
'subgraph': subgraph_condition,
'is_display_detail': False,
'is_display_full_op_name': self._withfullpath
}
analyser = AnalyserFactory.instance().get_analyser(
'aicore_detail', self._output_path, self._dev_id
)
result = analyser.query_and_sort_by_op_type(
filter_condition, op_type_order
)
return result
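# Usage sketch (a minimal flow for a training script; the network run is elided
# and './data' is a hypothetical output path):
#     profiler = Profiler(subgraph='all', is_detail=True, output_path='./data')
#     ...   # build and run the network; the profiling services collect data
#     profiler.analyse()   # parse hwts/framework/aicpu logs and report op times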