提交 3732f1a2 编写于 作者: Y yuximiao

add minddata profiling

上级 13841a69
...@@ -26,9 +26,11 @@ from flask import request ...@@ -26,9 +26,11 @@ from flask import request
from marshmallow import ValidationError from marshmallow import ValidationError
from mindinsight.conf import settings from mindinsight.conf import settings
from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, to_int from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, unquote_args
from mindinsight.datavisual.utils.tools import to_int
from mindinsight.lineagemgr.common.validator.validate_path import validate_and_normalize_path from mindinsight.lineagemgr.common.validator.validate_path import validate_and_normalize_path
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.analyser.minddata_analyser import MinddataAnalyser
from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir
from mindinsight.profiler.common.validator.validate import validate_condition, validate_ui_proc from mindinsight.profiler.common.validator.validate import validate_condition, validate_ui_proc
from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_profiler_path from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_profiler_path
...@@ -167,6 +169,111 @@ def get_target_time_info(): ...@@ -167,6 +169,111 @@ def get_target_time_info():
return jsonify(target_time_info) return jsonify(target_time_info)
@BLUEPRINT.route("/profile/queue_info", methods=["GET"])
def get_queue_info():
    """
    Get each type queue info.

    Returns:
        Response, the queue info.

    Examples:
        >>> GET http://xxxx/v1/mindinsight/profile/queue_info
    """
    profile_dir = get_profiler_abs_dir(request)
    device_id = unquote_args(request, "device_id")
    queue_type = unquote_args(request, "type")

    analyser = AnalyserFactory.instance().get_analyser(
        'minddata', profile_dir, device_id)
    # Empty payload is returned for an unrecognized queue type.
    queue_info = {}
    if queue_type == "get_next":
        queue_info = analyser.analyse_get_next_info(info_type="queue")[0]
    elif queue_type == "device_queue":
        queue_info = analyser.analyse_device_queue_info(info_type="queue")[0]

    return jsonify(queue_info)
@BLUEPRINT.route("/profile/minddata_op", methods=["GET"])
def get_time_info():
    """
    Get minddata operation info.

    Returns:
        Response, the minddata operation info.

    Examples:
        >>> GET http://xxxx/v1/mindinsight/profile/minddata_op
    """
    profile_dir = get_profiler_abs_dir(request)
    device_id = unquote_args(request, "device_id")
    op_type = unquote_args(request, "type")

    # Default payload, returned unchanged when the op type is unrecognized.
    time_info = {
        'size': 0,
        'info': [],
        "summary": {"time_summary": {}},
        "advise": {}
    }
    analyser = AnalyserFactory.instance().get_analyser(
        'minddata', profile_dir, device_id)
    if op_type == "get_next":
        time_info = analyser.analyse_get_next_info(info_type="time")[1]
    elif op_type == "device_queue":
        time_info = analyser.analyse_device_queue_info(info_type="time")[1]

    return jsonify(time_info)
@BLUEPRINT.route("/profile/process_summary", methods=["GET"])
def get_process_summary():
    """
    Get interval process summary.

    Returns:
        Response, the process summary.

    Examples:
        >>> GET http://xxxx/v1/mindinsight/profile/process_summary
    """
    profile_dir = get_profiler_abs_dir(request)
    device_id = unquote_args(request, "device_id")

    analyser = AnalyserFactory.instance().get_analyser(
        'minddata', profile_dir, device_id)
    # Only the queue portion of each analysis feeds the summary.
    get_next_queue_info = analyser.analyse_get_next_info(info_type="queue")[0]
    device_queue_info = analyser.analyse_device_queue_info(info_type="queue")[0]

    summary = MinddataAnalyser.analyse_queue_summary(get_next_queue_info, device_queue_info)
    return jsonify(summary)
def get_profiler_abs_dir(requests):
    """
    Get the absolute profiler directory for the request.

    The directory is joined from the summary base dir, the train id and the
    profiler dir carried by the request, then validated and normalized.

    Args:
        requests (LocalProxy): The requests.

    Returns:
        str, the profiler abs dir.

    Raises:
        ParamValueError: If profiler_dir or train_id is missing, or the
            joined path fails validation.
    """
    profiler_dir = get_profiler_dir(requests)
    train_id = get_train_id(requests)
    if not profiler_dir or not train_id:
        raise ParamValueError("No profiler_dir or train_id.")

    profiler_dir_abs = os.path.join(settings.SUMMARY_BASE_DIR, train_id, profiler_dir)
    try:
        profiler_dir_abs = validate_and_normalize_path(profiler_dir_abs, "profiler")
    except ValidationError as err:
        # Chain the original validation error so the root cause is kept.
        raise ParamValueError("Invalid profiler dir") from err
    return profiler_dir_abs
def init_module(app): def init_module(app):
""" """
Init module entry. Init module entry.
......
...@@ -156,6 +156,26 @@ def get_profiler_dir(request): ...@@ -156,6 +156,26 @@ def get_profiler_dir(request):
return profiler_dir return profiler_dir
def unquote_args(request, arg_name):
    """
    Get args from request query string and unquote content.

    Args:
        request (FlaskRequest): Http request instance.
        arg_name (str): The name of arg.

    Returns:
        str, unquoted arg; an empty string when the arg is absent.

    Raises:
        ParamValueError: If the value cannot be percent-decoded strictly.
    """
    # The default "" guarantees a str, so no separate None check is needed
    # (the previous `is not None` test was dead code).
    arg_value = request.args.get(arg_name, "")
    try:
        arg_value = unquote(arg_value, errors='strict')
    except UnicodeDecodeError as err:
        # Chain the decode error so offset/byte details stay available.
        raise exceptions.ParamValueError('Unquote error with strict mode') from err
    return arg_value
def if_nan_inf_to_none(name, value): def if_nan_inf_to_none(name, value):
""" """
Transform value to None if it is NaN or Inf. Transform value to None if it is NaN or Inf.
......
...@@ -15,3 +15,4 @@ ...@@ -15,3 +15,4 @@
"""Import analyser.""" """Import analyser."""
from .analyser import * from .analyser import *
from .step_trace_analyser import StepTraceAnalyser from .step_trace_analyser import StepTraceAnalyser
from .minddata_analyser import MinddataAnalyser
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Data process analyser."""
import os
from mindinsight.profiler.analyser import BaseAnalyser
class MinddataAnalyser(BaseAnalyser):
    """The Minddata profiling analyser."""

    def analyse_get_next_info(self, info_type="all"):
        """
        Analyse the get_next operation info.

        Args:
            info_type (str): The info type to return, default return both queue and time info,
                other options are ["queue", "time"].

        Returns:
            dict, queue size info with keys size/info/summary.
            dict, time cost info with keys size/info/summary.
        """
        # Init queue info result.
        queue_info = dict()
        queue_size_list = []
        empty_step_count = 0
        # Init time info result.
        time_info = dict()
        time_list = []
        total_cost = 0

        file_name = "minddata_aicpu_" + self._device_id + ".txt"
        file_path = MinddataAnalyser.find_target_file(self._profiling_dir, file_name)
        if file_path:
            with open(file_path) as data_file:
                for line in data_file.readlines():
                    node_info = line.split()
                    if node_info and node_info[0] == "GetNext_dequeue_wait":
                        # Collect only the requested info type(s).
                        if len(node_info) > 3 and info_type in ["all", "queue"]:
                            queue_size_list.append(int(node_info[3]))
                            if node_info[3] == '0':
                                empty_step_count += 1
                        if len(node_info) > 2 and info_type in ["all", "time"]:
                            # (end - start) / 1e3: convert the step cost from
                            # the raw timestamp unit to the displayed unit.
                            one_step_cost_time = (float(node_info[2]) - float(node_info[1])) / 1e3
                            time_list.append(one_step_cost_time)
                            total_cost += one_step_cost_time

        # Bugfix: the queue summary was guarded by the "time" info type, so
        # info_type="queue" (used by the queue_info endpoint) always got {}.
        if info_type in ["all", "queue"]:
            queue_info["size"] = len(queue_size_list)
            queue_info["info"] = {"queue": queue_size_list}
            queue_info["summary"] = {
                "queue_summary": {
                    "empty_queue": empty_step_count
                }
            }
        # Bugfix: do not test the loop variable node_info here -- it is
        # undefined when the profiling file is missing or empty.
        if info_type in ["all", "time"]:
            time_info["size"] = len(time_list)
            time_info["info"] = {"get_next": time_list}
            time_info["summary"] = {
                "time_summary": {
                    "avg_cost": "0" if not time_list else str(total_cost / len(time_list))
                }
            }

        return queue_info, time_info

    def analyse_device_queue_info(self, info_type="all"):
        """
        Analyse the device_queue operation info.

        Args:
            info_type (str): The info type to return, default return both queue and time info,
                other options are ["queue", "time"].

        Returns:
            dict, queue size info.
            dict, time cost info.
        """
        # Init time info result.
        time_info = dict()
        get_time_list, push_time_list, total_time_list = [], [], []
        total_cost, total_push, total_get = 0, 0, 0
        # Init queue info result.
        queue_info = dict()
        queue_size_list = []
        empty_step, full_step = 0, 0

        # Bugfix: the device queue file name was missing the "_" separator
        # before the device id (compare the feed file name below).
        device_queue_file_name = "device_queue_profiling_" + self._device_id + ".txt"
        device_queue_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, device_queue_file_name)
        feed_file_name = "dataset_iterator_profiling_" + self._device_id + ".txt"
        feed_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, feed_file_name)
        if device_queue_file_path:
            # Bugfix: use the resolved absolute path, not the bare file name.
            file_path = device_queue_file_path
        elif feed_file_path:
            file_path = feed_file_path
        else:
            file_path = ""

        if file_path:
            with open(file_path) as data_file:
                for line in data_file.readlines():
                    op_info = line.split()
                    if op_info and op_info[0] == "0" and info_type in ["all", "time"]:
                        # Time record. op_info[1] sub_type: 0 get time,
                        # 1 push time, 2 total time; op_info[2]: step num;
                        # op_info[3]: cost time.
                        if op_info[1] == "0":
                            get_time_list.append([int(op_info[2]), float(op_info[3])])
                            # Bugfix: get cost was accumulated into total_cost
                            # (and total time into total_get), swapping the
                            # avg_cost/get_cost summary values.
                            total_get += float(op_info[3])
                        elif op_info[1] == "1":
                            push_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_push += float(op_info[3])
                        elif op_info[1] == "2":
                            total_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_cost += float(op_info[3])
                    elif op_info and op_info[0] == "1" and info_type in ["all", "queue"]:
                        # Queue record: op_info[2] step num, op_info[3] size;
                        # op_info[1] is presumably the queue capacity (a step
                        # is "full" when size equals it) -- TODO confirm.
                        queue_size_list.append([int(op_info[2]), int(op_info[3])])
                        if op_info[1] == op_info[3]:
                            full_step += 1
                        if op_info[3] == "0":
                            empty_step += 1

        if info_type in ["all", "time"]:
            total_time_list = MinddataAnalyser.sort_step(total_time_list)
            push_time_list = MinddataAnalyser.sort_step(push_time_list)
            get_time_list = MinddataAnalyser.sort_step(get_time_list)
            time_info["size"] = len(total_time_list)
            time_info["info"] = {"total_cost": total_time_list,
                                 "push_cost": push_time_list,
                                 "get_cost": get_time_list}
            # Bugfix: guard against ZeroDivisionError when no record was found.
            step_count = time_info["size"] if time_info["size"] else 1
            time_info["summary"] = {"time_summary": {"avg_cost": total_cost / step_count}}
            time_info["summary"]["time_summary"]["get_cost"] = total_get / step_count
            time_info["summary"]["time_summary"]["push_cost"] = total_push / step_count
        if info_type in ["all", "queue"]:
            queue_size_list = MinddataAnalyser.sort_step(queue_size_list)
            queue_info["size"] = len(queue_size_list)
            queue_info["info"] = {"queue": queue_size_list}
            queue_info["summary"] = {"queue_summary": {"empty_queue": empty_step}}
            queue_info["summary"]["queue_summary"]["full_queue"] = full_step

        return queue_info, time_info

    @staticmethod
    def analyse_queue_summary(get_next_queue_info, device_queue_info):
        """
        Analyse the queue summary info.

        Args:
            get_next_queue_info (dict): the get_next queue info returned by the analyser.
            device_queue_info (dict): the device queue info returned by the analyser.

        Returns:
            dict, the summary of queue.
        """
        if not get_next_queue_info and not device_queue_info:
            return {}
        # Tolerate a missing device queue dict (e.g. None) below.
        device_queue_info = device_queue_info or {}

        get_next_queue_empty_count = 0
        if get_next_queue_info:
            result = {"data_process": {"status": "normal"},
                      "device_queue_op": {"status": "normal"},
                      "tdt": {"status": "normal"},
                      "get_next": {"status": "normal"}}
            get_next_queue_empty_count = get_next_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
            result["get_next_queue_info"] = {
                "summary": {
                    "empty_batch_count": get_next_queue_empty_count,
                    "total_batch": get_next_queue_info.get("size")
                }
            }
        else:
            # Feed mode: no get_next data, analyse forward/backward instead.
            result = {"data_process": {"status": "normal"},
                      "fpbp": {"status": "normal"}}

        device_queue_empty_count = device_queue_info.get(
            "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
        device_queue_full_count = device_queue_info.get(
            "summary", {}).get("queue_summary", {}).get("full_queue", 0)
        result["device_queue_info"] = {
            "summary": {
                "empty_batch_count": device_queue_empty_count,
                "full_batch_count": device_queue_full_count,
                "total_batch": device_queue_info.get("size")
            }
        }

        if not get_next_queue_info or get_next_queue_empty_count == 0:
            total_batch = device_queue_info.get("size", 0)
            if device_queue_empty_count > total_batch * 0.7:
                # Device queue is empty most of the time: data processing is
                # the likely bottleneck.
                result["data_process"]["status"] = "warning"
            elif "fpbp" in result and device_queue_empty_count < total_batch * 0.9:
                # Bugfix: "fpbp" only exists in feed mode; guard the key to
                # avoid a KeyError when get_next info is present.
                result["fpbp"]["status"] = "warning"
        return result

    @staticmethod
    def sort_step(step_info_list):
        """
        Sort the list by the first item and return the list of second items.

        Args:
            step_info_list (list): the step info, each item is [step_num, info].

        Returns:
            list, the info list sorted by step.
        """
        step_info_list.sort(key=lambda x: x[0])
        return [item[1] for item in step_info_list]

    @staticmethod
    def find_target_file(file_dir, file_name):
        """
        Find the target file in dir, and return the found file's abs path or "".

        Args:
            file_dir (str): The target file dir.
            file_name (str): The target file name.

        Returns:
            str, the abs file path, or "" when the file is not found.
        """
        target_file_path = ""
        for root_path, _, file_names in os.walk(file_dir):
            for item in file_names:
                if item == file_name:
                    target_file_path = os.path.join(root_path, file_name)
        return target_file_path

    def _filter(self, filter_condition):
        """
        Filter the profiling data according to the filter condition.

        Args:
            filter_condition (dict): The filter condition.
        """

    def _load(self):
        """Load data according to the parsed profiling files."""
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Minddata aicpu parser."""
import os
from tabulate import tabulate
from mindinsight.profiler.common._utils import get_file_join_name, fwrite_format
from mindinsight.profiler.common.log import logger
class MinddataParser:
    """Minddata Aicpu Parser."""

    @staticmethod
    def parse_minddata_aicpu_data(minddata_aicpu_source_path):
        """
        Parse minddata get_next info which contains queue size and execute time.

        Args:
            minddata_aicpu_source_path (str): the source file path.

        Returns:
            list[list], one entry per step: [node_name, start, end, queue_size];
                numeric fields stay strings when they are not purely digits.
        """
        def _strip_field(field, label):
            """Drop the textual label; convert to int when purely digits."""
            value = field.replace(label, "")
            return int(value) if value.isdigit() else value

        parsed_steps = []
        try:
            with open(minddata_aicpu_source_path) as source_data_file:
                # Steps are NUL-separated records of comma-separated fields.
                records = source_data_file.read().split("\x00")
            for record in records:
                if not record:
                    continue
                fields = record.split(", ")
                node_name = fields[0].replace("Node:", "")
                node_start, node_end, queue_size = 0, 0, 0
                if len(fields) > 2:
                    node_start = _strip_field(fields[1], "Run start:")
                    node_end = _strip_field(fields[2], "Run end:")
                if len(fields) > 3:
                    queue_size = _strip_field(fields[3], "queue size:")
                parsed_steps.append([node_name, node_start, node_end, queue_size])
        except OSError:
            logger.error("Open get_next profiling file error.")
        return parsed_steps

    @staticmethod
    def execute(source_path, output_path, device_id):
        """
        Execute the parser.

        Args:
            source_path (str): the source file path.
            output_path (str): the output file path.
            device_id (str): the device id.
        """
        col_names = ["node_name", "start_time", "end_time", "queue_size"]
        minddata_aicpu_source_path = get_file_join_name(
            input_path=source_path, file_name='DATA_PREPROCESS.dev.AICPUMI')
        if not minddata_aicpu_source_path:
            return
        minddata_aicpu_data = MinddataParser.parse_minddata_aicpu_data(minddata_aicpu_source_path)
        if not minddata_aicpu_data:
            return
        minddata_aicpu_output_path = os.path.join(output_path, "minddata_aicpu_" + device_id + ".txt")
        fwrite_format(
            minddata_aicpu_output_path,
            tabulate(minddata_aicpu_data, col_names, tablefmt='simple'),
            is_start=True
        )
...@@ -29,6 +29,7 @@ from mindinsight.profiler.common.validator.validate_path import \ ...@@ -29,6 +29,7 @@ from mindinsight.profiler.common.validator.validate_path import \
from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindinsight.profiler.parser.framework_parser import FrameworkParser from mindinsight.profiler.parser.framework_parser import FrameworkParser
from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
from mindinsight.profiler.parser.minddata_parser import MinddataParser
from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
from mindinsight.profiler.parser.step_trace_parser import StepTraceParser from mindinsight.profiler.parser.step_trace_parser import StepTraceParser
from mindinsight.utils.exceptions import MindInsightException from mindinsight.utils.exceptions import MindInsightException
...@@ -107,6 +108,7 @@ class Profiler: ...@@ -107,6 +108,7 @@ class Profiler:
os.environ['PROFILING_MODE'] = 'true' os.environ['PROFILING_MODE'] = 'true'
os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace' os.environ['PROFILING_OPTIONS'] = 'training_trace:task_trace'
os.environ['MINDDATA_PROFILING_DIR'] = self._output_path
# use context interface to open profiling, for the new mindspore version(after 2020.5.21) # use context interface to open profiling, for the new mindspore version(after 2020.5.21)
try: try:
import mindspore.context as context import mindspore.context as context
...@@ -193,6 +195,9 @@ class Profiler: ...@@ -193,6 +195,9 @@ class Profiler:
except FileNotFoundError as err: except FileNotFoundError as err:
logger.exception(err) logger.exception(err)
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._dev_id)
# analyse op compute time info # analyse op compute time info
try: try:
self._analyser_op_info() self._analyser_op_info()
...@@ -208,7 +213,7 @@ class Profiler: ...@@ -208,7 +213,7 @@ class Profiler:
Args: Args:
source_path (str): The directory that contains the step trace original data. source_path (str): The directory that contains the step trace original data.
framework_parser (str): The framework parse instance. framework_parser (FrameworkParser): The framework parse instance.
""" """
logger.info("Begin to parse step trace.") logger.info("Begin to parse step trace.")
# construct output path # construct output path
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册