From 3732f1a2a838df57acd38fed2440304071c59204 Mon Sep 17 00:00:00 2001 From: yuximiao Date: Sat, 13 Jun 2020 18:39:08 +0800 Subject: [PATCH] add minddata profiling --- mindinsight/backend/profiler/profile_api.py | 109 +++++++- mindinsight/datavisual/utils/tools.py | 20 ++ mindinsight/profiler/analyser/__init__.py | 1 + .../profiler/analyser/minddata_analyser.py | 260 ++++++++++++++++++ .../profiler/parser/minddata_parser.py | 88 ++++++ mindinsight/profiler/profiling.py | 7 +- 6 files changed, 483 insertions(+), 2 deletions(-) create mode 100644 mindinsight/profiler/analyser/minddata_analyser.py create mode 100644 mindinsight/profiler/parser/minddata_parser.py diff --git a/mindinsight/backend/profiler/profile_api.py b/mindinsight/backend/profiler/profile_api.py index a20e351..5c83843 100644 --- a/mindinsight/backend/profiler/profile_api.py +++ b/mindinsight/backend/profiler/profile_api.py @@ -26,9 +26,11 @@ from flask import request from marshmallow import ValidationError from mindinsight.conf import settings -from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, to_int +from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, unquote_args +from mindinsight.datavisual.utils.tools import to_int from mindinsight.lineagemgr.common.validator.validate_path import validate_and_normalize_path from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory +from mindinsight.profiler.analyser.minddata_analyser import MinddataAnalyser from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir from mindinsight.profiler.common.validator.validate import validate_condition, validate_ui_proc from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_profiler_path @@ -167,6 +169,111 @@ def get_target_time_info(): return jsonify(target_time_info) +@BLUEPRINT.route("/profile/queue_info", methods=["GET"]) +def get_queue_info(): + """ + Get each type queue info. 
+ + Returns: + Response, the queue info. + + Examples: + >>> GET http://xxxx/v1/mindinsight/profile/queue_info + """ + profile_dir = get_profiler_abs_dir(request) + + device_id = unquote_args(request, "device_id") + queue_type = unquote_args(request, "type") + queue_info = {} + + minddata_analyser = AnalyserFactory.instance().get_analyser( + 'minddata', profile_dir, device_id) + if queue_type == "get_next": + queue_info, _ = minddata_analyser.analyse_get_next_info(info_type="queue") + elif queue_type == "device_queue": + queue_info, _ = minddata_analyser.analyse_device_queue_info(info_type="queue") + + return jsonify(queue_info) + + +@BLUEPRINT.route("/profile/minddata_op", methods=["GET"]) +def get_time_info(): + """ + Get minddata operation info. + + Returns: + Response, the minddata operation info. + + Examples: + >>> GET http://xxxx/v1/mindinsight/profile/minddata_op + """ + profile_dir = get_profiler_abs_dir(request) + device_id = unquote_args(request, "device_id") + op_type = unquote_args(request, "type") + + time_info = { + 'size': 0, + 'info': [], + "summary": {"time_summary": {}}, + "advise": {} + } + minddata_analyser = AnalyserFactory.instance().get_analyser( + 'minddata', profile_dir, device_id) + if op_type == "get_next": + _, time_info = minddata_analyser.analyse_get_next_info(info_type="time") + elif op_type == "device_queue": + _, time_info = minddata_analyser.analyse_device_queue_info(info_type="time") + + return jsonify(time_info) + + +@BLUEPRINT.route("/profile/process_summary", methods=["GET"]) +def get_process_summary(): + """ + Get interval process summary. + + Returns: + Response, the process summary. 
+ + Examples: + >>> GET http://xxxx/v1/mindinsight/profile/process_summary + """ + profile_dir = get_profiler_abs_dir(request) + device_id = unquote_args(request, "device_id") + + minddata_analyser = AnalyserFactory.instance().get_analyser( + 'minddata', profile_dir, device_id) + get_next_queue_info, _ = minddata_analyser.analyse_get_next_info(info_type="queue") + device_queue_info, _ = minddata_analyser.analyse_device_queue_info(info_type="queue") + + result = MinddataAnalyser.analyse_queue_summary(get_next_queue_info, device_queue_info) + + return jsonify(result) + + +def get_profiler_abs_dir(requests): + """ + Get interval process summary. + + Args: + requests (LocalProxy): The requests. + + Returns: + str, the profiler abs dir. + """ + profiler_dir = get_profiler_dir(requests) + train_id = get_train_id(requests) + if not profiler_dir or not train_id: + raise ParamValueError("No profiler_dir or train_id.") + + profiler_dir_abs = os.path.join(settings.SUMMARY_BASE_DIR, train_id, profiler_dir) + try: + profiler_dir_abs = validate_and_normalize_path(profiler_dir_abs, "profiler") + except ValidationError: + raise ParamValueError("Invalid profiler dir") + + return profiler_dir_abs + def init_module(app): """ Init module entry. diff --git a/mindinsight/datavisual/utils/tools.py b/mindinsight/datavisual/utils/tools.py index 4a9f527..813bc68 100644 --- a/mindinsight/datavisual/utils/tools.py +++ b/mindinsight/datavisual/utils/tools.py @@ -156,6 +156,26 @@ def get_profiler_dir(request): return profiler_dir +def unquote_args(request, arg_name): + """ + Get args from requst query string and unquote content. + + Args: + request (FlaskRequest): Http request instance. + arg_name (str): The name of arg. + + Returns: + str, unquoted arg. 
+ """ + arg_value = request.args.get(arg_name, "") + if arg_value is not None: + try: + arg_value = unquote(arg_value, errors='strict') + except UnicodeDecodeError: + raise exceptions.ParamValueError('Unquote error with strict mode') + return arg_value + + def if_nan_inf_to_none(name, value): """ Transform value to None if it is NaN or Inf. diff --git a/mindinsight/profiler/analyser/__init__.py b/mindinsight/profiler/analyser/__init__.py index 384a027..6786c95 100644 --- a/mindinsight/profiler/analyser/__init__.py +++ b/mindinsight/profiler/analyser/__init__.py @@ -15,3 +15,4 @@ """Import analyser.""" from .analyser import * from .step_trace_analyser import StepTraceAnalyser +from .minddata_analyser import MinddataAnalyser diff --git a/mindinsight/profiler/analyser/minddata_analyser.py b/mindinsight/profiler/analyser/minddata_analyser.py new file mode 100644 index 0000000..2973883 --- /dev/null +++ b/mindinsight/profiler/analyser/minddata_analyser.py @@ -0,0 +1,260 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Data process analyser.""" +import os + +from mindinsight.profiler.analyser import BaseAnalyser + + +class MinddataAnalyser(BaseAnalyser): + """The Minddata profiling analyser.""" + + def analyse_get_next_info(self, info_type="all"): + """ + Analyse the get_next operation info. 
+ + Args: + info_type (str): The info type to return, default return both queue and time info, + other options are ["queue", "time"]. + + Returns: + list[list], all get_next operation info, each info contains node_name, start, end, queue_size. + """ + # init queue info result + queue_info = dict() + queue_size_list = [] + empty_step_count = 0 + + # init time info result + time_info = dict() + time_list = [] + total_cost = 0 + + file_name = "minddata_aicpu_" + self._device_id + ".txt" + file_path = MinddataAnalyser.find_target_file(self._profiling_dir, file_name) + + if file_path: + with open(file_path) as data_file: + for line in data_file.readlines(): + node_info = line.split() + if node_info and node_info[0] == "GetNext_dequeue_wait": + # analyse target info type + if len(node_info) > 3 and info_type in ["all", "queue"]: + queue_size_list.append(int(node_info[3])) + if node_info[3] == '0': + empty_step_count += 1 + if len(node_info) > 2 and info_type in ["all", "time"]: + one_step_cost_time = (float(node_info[2]) - float(node_info[1]))/1e3 + time_list.append(one_step_cost_time) + total_cost += one_step_cost_time + if info_type in ["all", "time"]: + queue_info["size"] = len(queue_size_list) + queue_info["info"] = {"queue": queue_size_list} + queue_info["summary"] = { + "queue_summary": { + "empty_queue": empty_step_count + } + } + if len(node_info) > 2 and info_type in ["all", "time"]: + time_info["size"] = len(time_list) + time_info["info"] = {"get_next": time_list} + time_info["summary"] = { + "time_summary": { + "avg_cost": "0" if not time_list else str(total_cost / len(time_list)) + } + } + + return queue_info, time_info + + def analyse_device_queue_info(self, info_type="all"): + """ + Analyse the device_queue operation info. + + Args: + info_type (str): The info type to return, default return both queue and time info, + other options are ["queue", "time"]. + + Returns: + dict, queue size info. + dict, time cost info. 
+ """ + # init queue info result + queue_info = dict() + get_time_list, push_time_list, total_time_list = [], [], [] + total_cost, total_push, total_get = 0, 0, 0 + + # init time info result + time_info = dict() + queue_size_list = [] + empty_step, full_step = 0, 0 + + device_queue_file_name = "device_queue_profiling" + self._device_id + ".txt" + device_queue_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, device_queue_file_name) + feed_file_name = "dataset_iterator_profiling_" + self._device_id + ".txt" + feed_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, feed_file_name) + if device_queue_file_path: + file_path = device_queue_file_name + elif not device_queue_file_path and feed_file_path: + file_path = feed_file_path + else: + file_path = "" + + if file_path: + with open(file_path) as data_file: + for line in data_file.readlines(): + op_info = line.split() + # time info + if op_info and op_info[0] == "0" and info_type in ["all", "time"]: + # sub_type: 0 get_time, 1 push time, 2 total time + # op_info: 2: step num 3: cost time + if op_info[1] == "0": + get_time_list.append([int(op_info[2]), float(op_info[3])]) + total_cost += float(op_info[3]) + elif op_info[1] == "1": + push_time_list.append([int(op_info[2]), float(op_info[3])]) + total_push += float(op_info[3]) + elif op_info[1] == "2": + total_time_list.append([int(op_info[2]), float(op_info[3])]) + total_get += float(op_info[3]) + elif op_info and op_info[0] == "1" and info_type in ["all", "queue"]: + queue_size_list.append([int(op_info[2]), int(op_info[3])]) + if op_info[1] == op_info[3]: + full_step += 1 + if op_info[3] == "0": + empty_step += 1 + if info_type in ["all", "time"]: + total_time_list = MinddataAnalyser.sort_step(total_time_list) + push_time_list = MinddataAnalyser.sort_step(push_time_list) + get_time_list = MinddataAnalyser.sort_step(get_time_list) + + time_info["size"] = len(total_time_list) + time_info["info"] = {"total_cost": total_time_list, + 
"push_cost": push_time_list, + "get_cost": get_time_list} + time_info["summary"] = {"time_summary": {"avg_cost": total_cost/time_info["size"]}} + time_info["summary"]["time_summary"]["get_cost"] = total_get/time_info["size"] + time_info["summary"]["time_summary"]["push_cost"] = total_push/time_info["size"] + + if info_type in ["all", "queue"]: + queue_size_list = MinddataAnalyser.sort_step(queue_size_list) + + queue_info["size"] = len(queue_size_list) + queue_info["info"] = {"queue": queue_size_list} + queue_info["summary"] = {"queue_summary": {"empty_queue": empty_step}} + queue_info["summary"]["queue_summary"]["full_queue"] = full_step + + return queue_info, time_info + + @staticmethod + def analyse_queue_summary(get_next_queue_info, device_queue_info): + """ + Analyse the queue summary info. + + Args: + get_next_queue_info (dict): the get_next queue info return by ananlyser. + device_queue_info (dict): the device queue info return by ananlyser. + + Returns: + dict, the summary of queue. 
+ """ + if not get_next_queue_info and not device_queue_info: + return {} + + get_next_queue_empty_count = 0 + if get_next_queue_info: + result = {"data_process": {"status": "normal"}, + "device_queue_op": {"status": "normal"}, + "tdt": {"status": "normal"}, + "get_next": {"status": "normal"}} + get_next_queue_empty_count = get_next_queue_info.get( + "summary", {}).get("queue_summary", {}).get("empty_queue", 0) + result["get_next_queue_info"] = { + "summary": { + "empty_batch_count": get_next_queue_empty_count, + "total_batch": get_next_queue_info.get("size") + } + } + else: + result = {"data_process": {"status": "normal"}, + "fpbp": {"status": "normal"}} + + device_queue_empty_count = device_queue_info.get( + "summary", {}).get("queue_summary", {}).get("empty_queue", 0) + device_queue_full_count = device_queue_info.get( + "summary", {}).get("queue_summary", {}).get("full_queue", 0) + result["device_queue_info"] = { + "summary": { + "empty_batch_count": device_queue_empty_count, + "full_batch_count": device_queue_full_count, + "total_batch": device_queue_info.get("size") + } + } + + if not get_next_queue_info or (get_next_queue_info and get_next_queue_empty_count == 0): + if device_queue_empty_count > device_queue_info.get("size", 0)*0.7: + result["data_process"]["status"] = "warning" + elif device_queue_empty_count < device_queue_info.get("size", 0)*0.9: + result["fpbp"]["status"] = "warning" + + return result + + + @staticmethod + def sort_step(step_info_list): + """ + Sorting the list by the first item and return the list of second item. + + Args: + step_info_list (list): the step info, contains [step_num, info]. + + Returns: + list, the info list sorted by step. + """ + step_info_list.sort(key=lambda x: x[0]) + result = [] + for item in step_info_list: + result.append(item[1]) + return result + + @staticmethod + def find_target_file(file_dir, file_name): + """ + Find the target file in dir, and return the find file's abs path or "". 
+ + Args: + file_dir (str): The target file dir. + file_name (str): The target file name. + + Returns: + str, the abs file path. + """ + target_file_path = "" + for root_path, _, file_names in os.walk(file_dir): + for item in file_names: + if item == file_name: + target_file_path = os.path.join(root_path, file_name) + + return target_file_path + + def _filter(self, filter_condition): + """ + Filter the profiling data according to the filter condition. + + Args: + filter_condition (dict): The filter condition. + """ + + def _load(self): + """Load data according to the parsed profiling files.""" diff --git a/mindinsight/profiler/parser/minddata_parser.py b/mindinsight/profiler/parser/minddata_parser.py new file mode 100644 index 0000000..c58becb --- /dev/null +++ b/mindinsight/profiler/parser/minddata_parser.py @@ -0,0 +1,88 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Minddata aicpu parser.""" +import os + +from tabulate import tabulate + +from mindinsight.profiler.common._utils import get_file_join_name, fwrite_format +from mindinsight.profiler.common.log import logger + + +class MinddataParser: + """Minddata Aicpu Parser.""" + @staticmethod + def parse_minddata_aicpu_data(minddata_aicpu_source_path): + """ + Parse minddata get_next info which contains queue size and execute time. 
+ + Args: + minddata_aicpu_source_path (str): the source file path. + + Returns: + list[Union[str, float]], the converted data. + """ + result = list() + try: + with open(minddata_aicpu_source_path) as source_data_file: + source_data = source_data_file.read() + step_data = source_data.split("\x00") + for one_step in step_data: + if one_step: + node_info = one_step.split(", ") + node_name, node_start, node_end, queue_size = "", 0, 0, 0 + if node_info: + node_name = node_info[0].replace("Node:", "") + if len(node_info) > 2: + node_start = node_info[1].replace("Run start:", "") + if node_start.isdigit(): + node_start = int(node_start) + node_end = node_info[2].replace("Run end:", "") + if node_end.isdigit(): + node_end = int(node_end) + if len(node_info) > 3: + queue_size = node_info[3].replace("queue size:", "") + if queue_size.isdigit(): + queue_size = int(queue_size) + + one_step_list = [node_name, node_start, node_end, queue_size] + result.append(one_step_list) + except OSError: + logger.error("Open get_next profiling file error.") + + return result + + @staticmethod + def execute(source_path, output_path, device_id): + """ + Execute the parser. + + Args: + source_path (str): the source file path. + output_path (str): the output file path. + device_id (str): the device id. 
+ """ + col_names = ["node_name", "start_time", "end_time", "queue_size"] + minddata_aicpu_source_path = get_file_join_name( + input_path=source_path, file_name='DATA_PREPROCESS.dev.AICPUMI') + minddata_aicpu_output_path = os.path.join(output_path, "minddata_aicpu_" + device_id + ".txt") + if minddata_aicpu_source_path: + minddata_aicpu_data = MinddataParser.parse_minddata_aicpu_data(minddata_aicpu_source_path) + if minddata_aicpu_data: + fwrite_format( + minddata_aicpu_output_path, + tabulate(minddata_aicpu_data, col_names, tablefmt='simple'), + is_start=True + ) diff --git a/mindinsight/profiler/profiling.py b/mindinsight/profiler/profiling.py index dde54d8..79e3769 100644 --- a/mindinsight/profiler/profiling.py +++ b/mindinsight/profiler/profiling.py @@ -29,6 +29,7 @@ from mindinsight.profiler.common.validator.validate_path import \ from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser from mindinsight.profiler.parser.framework_parser import FrameworkParser from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser +from mindinsight.profiler.parser.minddata_parser import MinddataParser from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser from mindinsight.profiler.parser.step_trace_parser import StepTraceParser from mindinsight.utils.exceptions import MindInsightException @@ -107,6 +108,7 @@ class Profiler: os.environ['PROFILING_MODE'] = 'true' os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace' + os.environ['MINDDATA_PROFILING_DIR'] = self._output_path # use context interface to open profiling, for the new mindspore version(after 2020.5.21) try: import mindspore.context as context @@ -193,6 +195,9 @@ class Profiler: except FileNotFoundError as err: logger.exception(err) + # Parsing minddata AICPU profiling + MinddataParser.execute(source_path, self._output_path, self._dev_id) + # analyse op compute time info try: self._analyser_op_info() @@ -208,7 +213,7 @@ class Profiler: Args: source_path 
(str): The directory that contains the step trace original data. - framework_parser (str): The framework parse instance. + framework_parser (FrameworkParser): The framework parser instance. """ logger.info("Begin to parse step trace.") # construct output path -- GitLab