diff --git a/mindinsight/profiler/common/_utils.py b/mindinsight/profiler/common/_utils.py
new file mode 100755
index 0000000000000000000000000000000000000000..25a24b651b82bbdacf1f742a837176721f4a2555
--- /dev/null
+++ b/mindinsight/profiler/common/_utils.py
@@ -0,0 +1,110 @@
+# Copyright 2020 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""Profiler utils."""
+import os
+import re
+
+def fwrite_format(output_data_path, data_source=None, is_print=False, is_start=False):
+    """
+    Write data to the output file.
+
+    Args:
+        output_data_path(str): the output file path of the data.
+        data_source(str): the data to write.
+        is_print(bool): whether to print the data to stdout.
+        is_start(bool): whether this is the first write to the output file; the old file is removed if True.
+    """
+
+    if is_start is True and os.path.exists(output_data_path):
+        os.remove(output_data_path)
+
+    if data_source.startswith("title:"):
+        title_label = '=' * 20
+        data_source = title_label + data_source[6:] + title_label
+
+    with open(output_data_path, 'a+') as f:
+        f.write(data_source)
+        f.write("\n")
+
+    if is_print:
+        print(data_source)
+
+
+def get_log_slice_id(file_name):
+    pattern = re.compile(r'(?<=slice_)\d+')
+    slice_list = pattern.findall(file_name)
+    index = re.findall(r'\d+', slice_list[0])
+    return int(index[0])
+
+def get_file_join_name(input_path, file_name):
+    """
+    Search files under the specified path, and join all matched files into one file.
+
+    Args:
+        input_path(str): the source path; files will be searched under it.
+        file_name(str): the target filename pattern, such as 'hwts.log.data.45.dev'.
+
+    Returns:
+        str: the joined file name.
+    """
+    name_list = []
+    file_join_name = ''
+    input_path = os.path.realpath(input_path)
+    if os.path.exists(input_path):
+        files = os.listdir(input_path)
+        for f in files:
+            if file_name in f and not f.endswith('.done') and not f.endswith('.join') \
+                    and not f.endswith('.zip'):
+                name_list.append(f)
+
+        # sort the slices by their slice id before joining them
+        name_list.sort(key=get_log_slice_id)
+
+    if len(name_list) == 1:
+        file_join_name = os.path.join(input_path, name_list[0])
+    elif len(name_list) > 1:
+        file_join_name = os.path.join(input_path, '%s.join' % file_name)
+        if os.path.exists(file_join_name):
+            os.remove(file_join_name)
+        with open(file_join_name, 'ab') as bin_data:
+            for i in name_list:
+                file = input_path + os.sep + i
+                with open(file, 'rb') as txt:
+                    bin_data.write(txt.read())
+    return file_join_name
+
+def get_file_names(input_path, file_name):
+    """
+    Search files under the specified path.
+
+    Args:
+        input_path(str): the source path; files will be searched under it.
+        file_name(str): the target filename pattern, such as 'host_start_log'.
+
+    Returns:
+        list: file name list.
+ """ + + input_path = os.path.realpath(input_path) + name_list = [] + if os.path.exists(input_path): + files = os.listdir(input_path) + for f in files: + if file_name in f and not f.endswith('.done') \ + and not f.endswith('.zip'): + name_list.append(f) + break + + return name_list diff --git a/mindinsight/profiler/common/validator/checkparam.py b/mindinsight/profiler/common/validator/checkparam.py new file mode 100755 index 0000000000000000000000000000000000000000..ebe8cc167324b7d885d308a315eb6340270959d0 --- /dev/null +++ b/mindinsight/profiler/common/validator/checkparam.py @@ -0,0 +1,26 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Profiler check parameters.""" +def check_bool(input_param, param_name): + """Bool type judgment.""" + if isinstance(input_param, bool): + return input_param + raise TypeError("Parameter {}: input type must be bool!".format(param_name)) + +def check_subgraph(subgraph): + """Check subgraph.""" + if subgraph in ("all", "Default", "Gradients"): + return subgraph + raise ValueError("subgraph must be all or Default or Gradients, but got {}.".format(subgraph)) diff --git a/mindinsight/profiler/parser/hwts_log_parser.py b/mindinsight/profiler/parser/hwts_log_parser.py new file mode 100755 index 0000000000000000000000000000000000000000..1b1d14ee52310c6cf4c801006740e8ef7404ed97 --- /dev/null +++ b/mindinsight/profiler/parser/hwts_log_parser.py @@ -0,0 +1,100 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""The parser for hwts log file.""" +import struct +from tabulate import tabulate +from mindinsight.profiler.common._utils import fwrite_format, get_file_join_name +from mindinsight.profiler.common.log import logger + +class HWTSLogParser: + """ + The Parser for hwts log files. + + Args: + _input_path(str): The profiling job path. Such as: '/var/log/npu/profiling/JOBAIFGJEJFEDCBAEADIFJAAAAAAAAAA". + output_filename(str): The output data path and name. Such as: './output_format_data_hwts_0.txt'. 
+ """ + + _source_file_target = 'hwts.log.data.45.dev.profiler_default_tag' + _dst_file_title = 'title:45 HWTS data' + _dst_file_column_title = ['Type', 'cnt', 'Core ID', 'Block ID', 'Task ID', + 'Cycle counter', 'Stream ID'] + + def __init__(self, input_path, output_filename): + self._input_path = input_path + self._output_filename = output_filename + self._source_flie_name = self._get_source_file() + + def _get_source_file(self): + """Get hwts log file name, which was created by ada service.""" + + file_name = get_file_join_name(self._input_path, self._source_file_target) + if not file_name: + msg = ("Fail to find hwts log file, under directory %s" \ + %self._input_path) + raise RuntimeError(msg) + + return file_name + + def execute(self): + """ + Execute the parser, get result data, and write it to the output file. + + Returns: + bool: whether succeed to analyse hwts log. + """ + + content_format = ['QIIIIIIIIIIII', 'QIIQIIIIIIII', 'IIIIQIIIIIIII'] + log_type = ['Start of task', 'End of task', 'Start of block', 'End of block', 'Block PMU'] + + result_data = [] + + with open(self._source_flie_name, 'rb') as hwts_data: + while True: + line = hwts_data.read(64) + if line: + if not line.strip(): + continue + else: + break + byte_first_four = struct.unpack('BBHHH', line[0:8]) + byte_first = bin(byte_first_four[0]).replace('0b', '').zfill(8) + ms_type = byte_first[-3:] + cnt = int(byte_first[0:4], 2) + core_id = byte_first_four[1] + blk_id, task_id = byte_first_four[3], byte_first_four[4] + if ms_type in ['000', '001', '010']: # log type 0,1,2 + result = struct.unpack(content_format[0], line[8:]) + syscnt = result[0] + stream_id = result[1] + result_data.append((log_type[int(ms_type, 2)], cnt, core_id, blk_id, task_id, syscnt, stream_id)) + + elif ms_type == '011': # log type 3 + result = struct.unpack(content_format[1], line[8:]) + syscnt = result[0] + stream_id = result[1] + result_data.append((log_type[int(ms_type, 2)], cnt, core_id, blk_id, task_id, syscnt, stream_id)) + elif ms_type == '100': # log type 4 + result = struct.unpack(content_format[2], line[8:]) + stream_id = result[2] + result_data.append((log_type[int(ms_type, 2)], cnt, core_id, blk_id, task_id, total_cyc, stream_id)) + else: + logger.info("Profiling: invalid hwts log record type %s", ms_type) + + fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True) + fwrite_format(self._output_filename, data_source=tabulate(result_data, + self._dst_file_column_title, + tablefmt='simple')) + return True diff --git a/mindinsight/profiler/parser/optime_parser.py b/mindinsight/profiler/parser/optime_parser.py new file mode 100755 index 0000000000000000000000000000000000000000..fe1830528d5fdc9a8ab18afa8525b73f49fa2f9d --- /dev/null +++ b/mindinsight/profiler/parser/optime_parser.py @@ -0,0 +1,122 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================
+"""Op compute time files parser."""
+from tabulate import tabulate
+from mindinsight.profiler.common._utils import fwrite_format
+
+class OPComputeTimeParser:
+    """
+    Join hwts info and framework info, get op time info, and write it to the result file.
+
+    Args:
+        hwts_output_file(str): The file path of the hwts output file, such as './output_format_data_hwts_0.txt'.
+        output_filename(str): The output data file path and name, such as './output_op_compute_time_0.txt'.
+        op_task_info(dict): The task and op relation info, in the format {task_id: [op_name, stream_id, block_dim]}.
+    """
+
+    _dst_file_title = 'title:op compute time'
+    _dst_file_column_title = ['op_name', 'compute_time(ms)', 'stream_id']
+
+    def __init__(self, hwts_output_file, output_filename, op_task_info):
+        self._hwts_output_file = hwts_output_file
+        self._output_filename = output_filename
+        self._op_task_info = op_task_info
+
+    def _get_op_task_id_map(self):
+        """
+        Read the hwts data file and get the task time info.
+
+        Returns:
+            list: all hwts task time info.
+        """
+
+        op_map_result = []
+        hwts_list = []
+        with open(self._hwts_output_file, 'r') as data_file:
+            lines = data_file.readlines()
+            for line in lines:
+                if line.startswith("Start of task"):
+                    line_split = line.split()
+                    hwts_list.append([line_split[0], line_split[6], line_split[7], line_split[8]])
+                if line.startswith('End of task'):
+                    line_split = line.split()
+                    hwts_list.append([line_split[0], line_split[6], line_split[7], line_split[8]])
+
+        # map hwts records to ops by task id
+        for hwts in hwts_list:
+            if hwts[1] in self._op_task_info.keys():
+                op_map_result.append([self._op_task_info[hwts[1]], hwts[0], hwts[1], hwts[2], hwts[3]])
+
+        return op_map_result
+
+    def execute(self):
+        """Execute the parser, compute the time of every op, and write it to the output file."""
+
+        result_data = []
+        tmp_result_data = []
+        op_map_list = self._get_op_task_id_map()
+
+        cur_index = 0
+        length = len(op_map_list)
+
+        while cur_index < length:
+            if cur_index + 1 == length:
+                break
+            op_start = op_map_list[cur_index]
+            op_end = op_map_list[cur_index+1]
+
+            if op_start[1] == "Start" and op_end[1] == "End" \
+                    and op_start[0] == op_end[0]:
+                # op_name, task_id, cycle counter, stream_id
+                tmp_result_data.append([op_start[0], op_start[2], int(op_end[3]) - int(op_start[3]), op_start[4]])
+                cur_index += 2
+            else:
+                cur_index += 1
+
+        op_name_time_dict = {}
+        op_name_stream_id_dict = {}
+        op_name_count_dict = {}
+        op_name_task_dict = {}
+
+        # accumulate compute time per op
+        for item in tmp_result_data:
+            if item[0] in op_name_time_dict.keys():
+                op_name_time_dict[item[0]] += float(item[2])/1e5  # cycle counter / 10^5 -> ms
+                if item[1] == op_name_task_dict[item[0]]:
+                    op_name_count_dict[item[0]] += 1
+
+            else:
+                op_name_time_dict[item[0]] = float(item[2])/1e5
+                op_name_stream_id_dict[item[0]] = item[-1]
+                op_name_task_dict[item[0]] = item[1]
+                op_name_count_dict[item[0]] = 1
+
+        for op_name, time in op_name_time_dict.items():
+            if op_name in op_name_stream_id_dict.keys():
+                stream_id = op_name_stream_id_dict[op_name]
+                avg_time = time / op_name_count_dict[op_name]
+                result_data.append([op_name, avg_time, stream_id])
+
+        result_data.sort(key=lambda x: x[0])
+        total_time = 0
+        for item in result_data:
+            total_time += item[1]
+        result_data.append(["total op", total_time, 0])
+
+        fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
+        fwrite_format(self._output_filename, data_source=tabulate(result_data,
+                                                                  self._dst_file_column_title,
+                                                                  tablefmt='simple'))
diff --git a/mindinsight/profiler/profiling.py b/mindinsight/profiler/profiling.py
new file mode 100644
index 0000000000000000000000000000000000000000..3d0fd99db181c504b61b3fd93c46cc30f37a609e
--- /dev/null
+++ b/mindinsight/profiler/profiling.py
@@ -0,0 +1,290 @@
+# Copyright 2020 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""Profiling API file."""
+import os
+import time
+from tabulate import tabulate
+from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
+from mindinsight.profiler.parser.framework_parser import FrameworkParser
+from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
+from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
+from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
+from mindinsight.profiler.analyser.integrator import Integrator
+from mindinsight.profiler.common._utils import get_file_names, fwrite_format
+from mindinsight.profiler.common.validator.validate_path import \
+    validate_and_normalize_path
+from mindinsight.profiler.common.validator.checkparam import \
+    check_bool, check_subgraph
+from mindinsight.profiler.common.log import logger
+from mindinsight.utils.exceptions import MindInsightException
+
+profiling_log_base_path = "/var/log/npu/profiling"
+
+class Profiler:
+    """Performance profiling tool."""
+    _base_profiling_container_path = "/var/log/npu/profiling/container"
+    _hwts_output_filename_target = "output_format_data_hwts_"
+    _opcompute_output_filename_target = "output_op_compute_time_"
+    _aicpu_op_output_filename_target = "output_data_preprocess_aicpu_"
+
+    def __init__(self, subgraph='all', is_detail=True, is_show_op_path=False, output_path='./data',
+                 optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
+        """
+        Init the profiling service, called before network training.
+
+        Args:
+            subgraph(str): which subgraph to monitor and analyse, can be 'all', 'Default' or 'Gradients'.
+            is_detail(bool): whether to show profiling data at op instance level; only op type level is shown if False.
+            is_show_op_path(bool): whether to save the full graph path for each op instance.
+            output_path(str): output data path.
+            optypes_to_deal(str): comma-separated op type names to collect and analyse; all op types if empty.
+            optypes_not_deal(str): comma-separated op type names to exclude from collection and analysis.
+            job_id(str): the profiling job id to analyse; it is detected automatically if empty.
+ """ + + dev_id = os.getenv('DEVICE_ID') + if not dev_id: + dev_id = "0" + logger.error("Fail to get DEVICE_ID, use 0 instead.") + self._dev_id = dev_id + self._container_path = os.path.join(self._base_profiling_container_path, dev_id) + data_path = os.path.join(self._container_path, "data") + if not os.path.exists(data_path): + os.makedirs(data_path) + self._output_path = validate_and_normalize_path(output_path, + 'Profiler output path (' + output_path + ')') + self._output_path = os.path.join(self._output_path, "profiler") + if not os.path.exists(self._output_path): + os.makedirs(self._output_path) + + os.environ['PROFILING_MODE'] = 'true' + os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace' + os.environ['AICPU_PROFILING_MODE'] = 'true' + os.environ['PROFILING_DIR'] = str(self._container_path) + self._subgraph = check_subgraph(subgraph) + self._valid_optype_name = optypes_to_deal.split(",") if optypes_to_deal else [] + self._filt_optype_names = optypes_not_deal.split(",") if optypes_not_deal else [] + self._detail = check_bool(is_detail, 'is_detail') + self._withfullpath = check_bool(is_show_op_path, 'is_show_op_path') + self._profiling_job_id = job_id + self._start_time = int(time.time() * 10000000) + logger.info("Profiling: profiling start time: %d", self._start_time) + + + def analyse(self): + """Collect and analyze performance data, called after training or during training.""" + + try: + from mindspore.communication.management import release + release() + except ImportError: + logger.error("Profiling: fail to import release from mindspore.") + + logger.info("begin profiler analyse") + + job_id = self._get_profiling_job_id() + if not job_id: + msg = ("Fail to get profiling job, please check whether job dir was generated under path %s"\ + %profiling_log_base_path) + raise RuntimeError(msg) + + logger.info("Profiling: job id is %s ", job_id) + + source_path = os.path.join(profiling_log_base_path, job_id) + # parse hwts.log.data.45.dev file, and get task profiling data + hwts_output_filename = self._hwts_output_filename_target + self._dev_id + ".txt" + hwts_output_filename = os.path.join(self._output_path, hwts_output_filename) + hwtslog_parser = HWTSLogParser(source_path, hwts_output_filename) + result = hwtslog_parser.execute() + if not result: + logger.error("Profiling: fail to parse hwts log file.") + return + + # parse Framework file, and get the relation of op and tasks + framework_parser = FrameworkParser(job_id, self._dev_id, self._output_path) + framework_parser.parse() + op_task_dict = framework_parser.to_task_id_full_op_name_dict() + if not op_task_dict: + logger.error("Profiling: fail to parse framework files.") + return + + # get op compute time from hwts data and framework data, write output_op_compute_time.txt + opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt" + opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename) + optime_parser = OPComputeTimeParser(hwts_output_filename, opcompute_output_filename, op_task_dict) + optime_parser.execute() + + # parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt + output_data_preprocess_aicpu = self._aicpu_op_output_filename_target + self._dev_id + ".txt" + output_data_preprocess_aicpu = os.path.join(self._output_path, output_data_preprocess_aicpu) + aicpu_data_parser = DataPreProcessParser(source_path, output_data_preprocess_aicpu) + aicpu_data_parser.execute() + + # analyse op compute time info + try: + 
+            self._analyser_op_info()
+        except MindInsightException as err:
+            logger.error(err.message)
+
+    def __del__(self):
+        """Disable the profiling collection service, called after training."""
+
+        os.environ['PROFILING_MODE'] = str("false")
+
+    def _get_profiling_job_id(self):
+        """Get the profiling job id, which was generated by the ada service.
+
+        Returns:
+            str: profiling job id.
+        """
+
+        if self._profiling_job_id:
+            return self._profiling_job_id
+
+        job_id = ""
+        cmd = "ls -t " + profiling_log_base_path + "|grep JOB|awk '{print $1}'"
+        r = os.popen(cmd)
+        profiling_job_dirs = r.readlines()
+        r.close()
+        for item in profiling_job_dirs:
+            path = os.path.join(profiling_log_base_path, item.strip())
+            log_file = get_file_names(path, "host_start.log")
+            if not log_file:
+                logger.error("Profiling: job path %s, host_start.log does not exist.", path)
+                continue
+
+            log_file = os.path.join(path, log_file[0])
+            item_dict = self._parse_host_start_log(log_file)
+
+            if not item_dict:
+                logger.error("Profiling: job path %s, failed to get job start info.", path)
+                continue
+            if self._start_time > int(item_dict["start_time"]):
+                logger.info("Profiling: job path %s, start_time %s, training start_time %d.",
+                            path, item_dict["start_time"], self._start_time)
+                break
+
+            if self._dev_id != item_dict["device_id"]:
+                logger.info("Profiling: job path %s, dev id %s, training device id %s.",
+                            path, item_dict["device_id"], self._dev_id)
+                continue
+
+            job_id = item.strip()
+            break
+
+        return job_id
+
+    def _parse_host_start_log(self, input_file):
+        """
+        Parse the host start log file, get the device id and start time of the job.
+
+        Args:
+            input_file(str): the file path of the host start log file.
+
+        Returns:
+            dict: job start time and device id.
+        """
+
+        item_dict = {}
+        for line in open(input_file):
+            if "Device" in line:
+                item_dict["device_id"] = line[7:len(line)-2]
+            elif "clock_realtime" in line:
+                item_dict["start_time"] = line[16:len(line)-3]
+
+        return item_dict
+
+    def _analyser_op_info(self):
+        """Analyse the operator information."""
+        integrator = Integrator(self._output_path, self._dev_id)
+        integrator.integrate()
+
+        aicore_type_result = self._query_op_type_info()
+        detail_file_path = os.path.join(
+            self._output_path,
+            'output_op_compute_time_detail_{}.txt'.format(self._dev_id)
+        )
+        fwrite_format(detail_file_path, data_source='title:op compute time')
+        display_names = [
+            'optype_name', 'compute_time(ms, per-step)',
+            'called_times(per-step)', 'percent'
+        ]
+        data_source = tabulate(aicore_type_result, display_names)
+        fwrite_format(detail_file_path, data_source=data_source, is_print=True)
+
+        if self._detail:
+            op_type_order = [item[0] for item in aicore_type_result]
+            aicore_detail_result = self._query_op_detail_info(op_type_order)
+            fwrite_format(detail_file_path, data_source='', is_print=True)
+            fwrite_format(detail_file_path, data_source='Detail:', is_print=True)
+            data_source = tabulate(
+                aicore_detail_result.get('object'),
+                aicore_detail_result.get('col_name')
+            )
+            fwrite_format(detail_file_path, data_source=data_source, is_print=True)
+
+    def _query_op_type_info(self):
+        """
+        Query AICORE operator type information.
+
+        Returns:
+            list[list], the AICORE operator type and execution time information.
+ """ + condition = { + 'sort_condition': { + 'name': 'execution_time', + 'type': 'descending' + } + } + analyser = AnalyserFactory.instance().get_analyser( + 'aicore_type', self._output_path, self._dev_id + ) + result = analyser.query(condition) + return result.get('object') + + def _query_op_detail_info(self, op_type_order): + """ + Query AICORE operator detail information. + + Args: + op_type_order(list): The name of the op type in order. + + Returns: + dict, the AICORE operator detail information. + """ + + op_type_condition = {} + if self._valid_optype_name: + op_type_condition['in'] = self._valid_optype_name + if self._filt_optype_names: + op_type_condition['not_in'] = self._filt_optype_names + + subgraph_condition = {} + if self._subgraph != 'all': + subgraph_condition['in'] = [self._subgraph] + + filter_condition = { + 'op_type': op_type_condition, + 'subgraph': subgraph_condition, + 'is_display_detail': False, + 'is_display_full_op_name': self._withfullpath + } + analyser = AnalyserFactory.instance().get_analyser( + 'aicore_detail', self._output_path, self._dev_id + ) + result = analyser.query_and_sort_by_op_type( + filter_condition, op_type_order + ) + return result