Commit ee3f2412 authored by: C chenchao99

Add the minddata pipeline stage analysis feature

Parent a01709e7
......@@ -26,14 +26,15 @@ from flask import request
from marshmallow import ValidationError
from mindinsight.conf import settings
from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, unquote_args
from mindinsight.datavisual.utils.tools import to_int
from mindinsight.lineagemgr.common.validator.validate_path import validate_and_normalize_path
from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, \
unquote_args, to_int
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.analyser.minddata_analyser import MinddataAnalyser
from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir
from mindinsight.profiler.common.validator.validate import validate_condition, validate_ui_proc
from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_profiler_path
from mindinsight.profiler.common.validator.validate import validate_condition, \
validate_ui_proc, validate_minddata_pipeline_condition
from mindinsight.profiler.common.validator.validate_path import \
validate_and_normalize_profiler_path, validate_and_normalize_path
from mindinsight.utils.exceptions import ParamValueError
BLUEPRINT = Blueprint("profile", __name__, url_prefix=settings.URL_PREFIX)
......@@ -274,6 +275,92 @@ def get_profiler_abs_dir(requests):
return profiler_dir_abs
@BLUEPRINT.route("/profile/minddata-pipeline/op-queue", methods=["POST"])
def get_minddata_pipeline_op_queue_info():
"""
Get minddata pipeline operator info and queue info.
Returns:
str, the operator information and queue information.
Raises:
ParamValueError: If the search condition contains some errors.
Examples:
>>> POST http://xxxx/v1/mindinsight/profile/minddata-pipeline/op-queue
"""
profiler_dir = get_profiler_dir(request)
train_id = get_train_id(request)
if not profiler_dir or not train_id:
raise ParamValueError("No profiler_dir or train_id.")
profiler_dir_abs = os.path.join(
settings.SUMMARY_BASE_DIR, train_id, profiler_dir
)
try:
profiler_dir_abs = validate_and_normalize_path(
profiler_dir_abs, "profiler"
)
except ValidationError:
raise ParamValueError("Invalid profiler dir.")
condition = request.stream.read()
try:
condition = json.loads(condition) if condition else {}
except Exception:
raise ParamValueError("Json data parse failed.")
validate_minddata_pipeline_condition(condition)
device_id = condition.get("device_id", "0")
analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', profiler_dir_abs, device_id
)
op_info = analyser.query(condition)
return jsonify(op_info)
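For illustration, a minimal client-side sketch of calling this endpoint with the third-party `requests` library; the host, port, train_id and condition values below are hypothetical, and the condition keys follow `validate_minddata_pipeline_condition`:

```python
# Hypothetical host/port and train_id; the JSON body is the search condition
# read from request.stream by get_minddata_pipeline_op_queue_info.
import requests

url = "http://127.0.0.1:8080/v1/mindinsight/profile/minddata-pipeline/op-queue"
params = {"train_id": "run1", "profile": "profiler"}
condition = {
    "device_id": "0",
    "filter_condition": {"op_type": {"in": ["TFReader"]}},
}
response = requests.post(url, params=params, json=condition)
print(response.json())  # contains 'col_name', 'object' and 'size'
```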
@BLUEPRINT.route("/profile/minddata-pipeline/queue", methods=["GET"])
def get_minddata_pipeline_queue_info():
"""
Get the queue info of the specified minddata pipeline operator.
Returns:
str, the queue information.
Raises:
ParamValueError: If the search condition contains some errors.
Examples:
>>> GET http://xxxx/v1/mindinsight/profile/minddata-pipeline/queue
"""
profiler_dir = get_profiler_dir(request)
train_id = get_train_id(request)
if not profiler_dir or not train_id:
raise ParamValueError("No profiler_dir or train_id.")
profiler_dir_abs = os.path.join(
settings.SUMMARY_BASE_DIR, train_id, profiler_dir
)
try:
profiler_dir_abs = validate_and_normalize_path(
profiler_dir_abs, "profiler"
)
except ValidationError:
raise ParamValueError("Invalid profiler dir.")
device_id = request.args.get('device_id', default='0')
op_id = request.args.get('op_id', type=int)
if op_id is None:
raise ParamValueError("Invalid operator id or operator id does not exist.")
analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', profiler_dir_abs, device_id
)
op_queue_info = analyser.get_op_and_parent_op_info(op_id)
return jsonify(op_queue_info)
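A matching sketch for the queue endpoint; `device_id` and `op_id` are passed as query arguments (values hypothetical):

```python
# Hypothetical host/port; op_id selects the pipeline operator to inspect.
import requests

url = "http://127.0.0.1:8080/v1/mindinsight/profile/minddata-pipeline/queue"
params = {"train_id": "run1", "profile": "profiler", "device_id": "0", "op_id": 1}
info = requests.get(url, params=params).json()
print(info["current_op"], info["parent_op"], info["queue_info"])
```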
def init_module(app):
"""
Init module entry.
......
......@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Import analyser."""
from .analyser import *
from .step_trace_analyser import StepTraceAnalyser
from .minddata_analyser import MinddataAnalyser
"""The analyser module."""
from . import analyser, minddata_pipeline_analyser, step_trace_analyser, \
minddata_analyser
......@@ -29,9 +29,11 @@ class AicoreTypeAnalyser(BaseAnalyser):
profiling_dir (str): The directory where the parsed profiling files are
located.
device_id (str): The device ID.
Raises:
ProfilerPathErrorException: If the profiling dir is invalid.
"""
__col_names__ = ['op_type', 'execution_time', 'execution_frequency',
'percent']
_col_names = ['op_type', 'execution_time', 'execution_frequency', 'percent']
_file_name_aicore_type_time = 'aicore_intermediate_{}_type.csv'
def _load(self):
......@@ -82,9 +84,12 @@ class AicoreDetailAnalyser(BaseAnalyser):
profiling_dir (str): The directory where the parsed profiling files are
located.
device_id (str): The device ID.
Raises:
ProfilerPathErrorException: If the profiling dir is invalid.
"""
__col_names__ = ['op_name', 'op_type', 'execution_time', 'subgraph',
'full_op_name', 'op_info']
_col_names = ['op_name', 'op_type', 'execution_time', 'subgraph',
'full_op_name', 'op_info']
_file_name_aicore_detail_time = 'aicore_intermediate_{}_detail.csv'
_file_name_framework_info = 'framework_raw_{}.csv'
......@@ -210,11 +215,11 @@ class AicoreDetailAnalyser(BaseAnalyser):
is_display_full_op_name (bool): Whether to display the operator full
name.
"""
self._display_col_names = self.__col_names__[0:4]
self._display_col_names = self._col_names[0:4]
if is_display_full_op_name:
self._display_col_names.append(self.__col_names__[4])
self._display_col_names.append(self._col_names[4])
if is_display_detail:
self._display_col_names.append(self.__col_names__[5])
self._display_col_names.append(self._col_names[5])
def _convert_framework_field_type(self, row):
"""
......@@ -253,10 +258,13 @@ class AicpuAnalyser(BaseAnalyser):
profiling_dir (str): The directory where the parsed profiling files are
located.
device_id (str): The device ID.
Raises:
ProfilerPathErrorException: If the profiling dir is invalid.
"""
__col_names__ = ['serial_number', 'op_name', 'total_time', 'dispatch_time',
'RunV2_start', 'compute_start', 'memcpy_start',
'memcpy_end', 'RunV2_end']
_col_names = ['serial_number', 'op_name', 'total_time', 'dispatch_time',
'RunV2_start', 'compute_start', 'memcpy_start', 'memcpy_end',
'RunV2_end']
_file_name_aicpu_time = 'aicpu_intermediate_{}.csv'
def _load(self):
......
......@@ -24,13 +24,9 @@ class AnalyserFactory:
"""
The analyser factory is used to create specific analyser instances.
Currently the factory supports creating `AicoreTypeAnalyser`,
`AicoreDetailAnalyser`, `AicpuAnalyser` and `StepTraceAnalyser`.
The `AicoreTypeAnalyser` is used to analyze execution time according to AICORE operator type.
The `AicoreDetailAnalyser` is used to analyze execution time according to
all specific AICORE operator. The `AicpuAnalyser` is used to analyze
execution time according to all specific AICPU operator.
The `StepTraceAnalyser` is used to analyze the execution time according to different process.
Depending on the analyser type, different analysers can be created. Users
can use the created analyser to query and analyse profiling data, such as
operator information, step trace data and so on.
Examples:
>>> analyser = AnalyserFactory.instance().get_analyser(
......@@ -72,6 +68,10 @@ class AnalyserFactory:
analyser_class_name = ''.join([name.capitalize() for name in subnames])
analyser_class_name += 'Analyser'
if not hasattr(analyser_module, analyser_class_name):
raise ProfilerAnalyserNotExistException(analyser_type)
return getattr(analyser_module, analyser_class_name)(*args)
analyser_sub_modules = dir(analyser_module)
for sub_module in analyser_sub_modules:
if sub_module.endswith('analyser') and sub_module != 'base_analyser':
analyser_sub_module = getattr(analyser_module, sub_module)
if hasattr(analyser_sub_module, analyser_class_name):
return getattr(analyser_sub_module, analyser_class_name)(*args)
raise ProfilerAnalyserNotExistException(analyser_type)
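A standalone sketch of the name resolution the factory performs, assuming the analyser type string is split on underscores (as the existing class names suggest): each part is capitalized, the parts are joined, and `Analyser` is appended, giving the class name that is looked up in the analyser submodules.

```python
# Mirrors the class-name mapping used by AnalyserFactory.get_analyser.
def to_class_name(analyser_type):
    return ''.join(part.capitalize() for part in analyser_type.split('_')) + 'Analyser'

assert to_class_name('minddata_pipeline') == 'MinddataPipelineAnalyser'
assert to_class_name('aicore_type') == 'AicoreTypeAnalyser'
```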
......@@ -39,8 +39,11 @@ class BaseAnalyser(ABC):
profiling_dir (str): The directory where the parsed profiling files
are located.
device_id (str): The device ID.
Raises:
ProfilerPathErrorException: If the profiling dir is invalid.
"""
__col_names__ = []
_col_names = []
def __init__(self, profiling_dir, device_id):
self._profiling_dir = self._normalize_profiling_dir(profiling_dir)
......@@ -61,7 +64,7 @@ class BaseAnalyser(ABC):
@property
def col_names(self):
"""The column names in the parsed profiling file."""
return self.__col_names__
return self._col_names
@property
def data(self):
......@@ -87,7 +90,7 @@ class BaseAnalyser(ABC):
group_condition = condition.get('group_condition')
self._result = []
self._display_col_names = self.__col_names__[:]
self._display_col_names = self._col_names[:]
self._filter(filter_condition)
self._size = len(self._result)
if sort_condition:
......@@ -148,7 +151,7 @@ class BaseAnalyser(ABC):
if not sort_name:
return
try:
index = self.__col_names__.index(sort_name)
index = self._col_names.index(sort_name)
except ValueError:
raise ProfilerColumnNotExistException(sort_name)
if self._none_sort_col_names and sort_name in self._none_sort_col_names:
......@@ -186,8 +189,8 @@ class BaseAnalyser(ABC):
for condition_key, condition_value in condition.items():
if condition_key in self._none_filter_condition_key:
continue
if condition_key in self.__col_names__:
index = self.__col_names__.index(condition_key)
if condition_key in self._col_names:
index = self._col_names.index(condition_key)
actual_value = item[index]
for exp_key, exp_value in condition_value.items():
if not self._is_match_condition(
......
......@@ -15,7 +15,7 @@
"""Data process analyser."""
import os
from mindinsight.profiler.analyser import BaseAnalyser
from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
class MinddataAnalyser(BaseAnalyser):
......
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The minddata pipeline analyser class."""
import csv
import json
import os
import sys
from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerPipelineOpNotExistException
from mindinsight.profiler.common.log import logger
class MinddataPipelineAnalyser(BaseAnalyser):
"""
The analyser for analyzing the minddata pipeline operator and queue data.
Args:
profiling_dir (str): The directory where the parsed profiling files are
located.
device_id (str): The device ID.
Raises:
ProfilerPathErrorException: If the profiling dir is invalid.
"""
_col_names = ['op_id', 'op_type', 'num_workers', 'output_queue_size',
'output_queue_average_size', 'output_queue_length',
'output_queue_usage_rate', 'sample_interval', 'parent_id',
'children_id']
_file_name_pipeline = 'minddata_pipeline_raw_{}.csv'
_index_op_id = 0
_index_op_type = 1
_index_num_workers = 2
_index_output_queue_size = 3
_index_output_queue_average_size = 4
_index_output_queue_length = 5
_index_output_queue_usage_rate = 6
_index_sample_interval = 7
_index_parent_id = 8
_index_children_id = 9
def __init__(self, profiling_dir, device_id):
super().__init__(profiling_dir, device_id)
self._none_filter_condition_key = ['threshold', 'is_display_op_detail']
self._none_sort_col_names = ['output_queue_size', 'children_id']
self._op_id_index_map = self._get_op_id_index_map()
def get_op_and_parent_op_info(self, op_id):
"""
Get the operator and parent operator information by `op_id`.
Args:
op_id (int): The minddata pipeline operator ID.
Returns:
dict, the operator and parent operator information.
Raises:
ProfilerPipelineOpNotExistException: If the minddata pipeline
operator does not exist.
"""
index = self._op_id_index_map.get(op_id)
if index is None:
raise ProfilerPipelineOpNotExistException(str(op_id))
op_info = self._data[index]
parent_id = op_info[self._index_parent_id]
parent_index = self._op_id_index_map.get(parent_id)
if parent_index is None:
parent_op = None
queue_info = None
else:
parent_op_info = self._data[parent_index]
parent_op = {
'op_id': parent_op_info[self._index_op_id],
'op_type': parent_op_info[self._index_op_type],
'num_workers': parent_op_info[self._index_num_workers]
}
queue_info = {
'output_queue_size': op_info[self._index_output_queue_size],
'output_queue_average_size':
op_info[self._index_output_queue_average_size],
'output_queue_length': op_info[self._index_output_queue_length],
'output_queue_usage_rate':
op_info[self._index_output_queue_usage_rate],
'sample_interval': op_info[self._index_sample_interval]
}
current_op = {
'op_id': op_info[self._index_op_id],
'op_type': op_info[self._index_op_type],
'num_workers': op_info[self._index_num_workers]
}
return {
'current_op': current_op,
'parent_op': parent_op,
'queue_info': queue_info
}
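A usage sketch, with placeholder paths, of obtaining the analyser through the factory and querying one operator:

```python
# Placeholder profiler directory and device id.
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory

analyser = AnalyserFactory.instance().get_analyser(
    'minddata_pipeline', '/path/to/profiler_dir', '0'
)
info = analyser.get_op_and_parent_op_info(op_id=1)
# info contains 'current_op', 'parent_op' and 'queue_info'; 'parent_op' and
# 'queue_info' are None when the operator has no parent.
```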
def _load(self):
"""Load data according to the parsed minddata pipeline file."""
pipeline_file_path = os.path.join(
self._profiling_dir,
self._file_name_pipeline.format(self._device_id)
)
if not os.path.isfile(pipeline_file_path):
logger.warning('The file <%s> does not exist.', pipeline_file_path)
return
with open(pipeline_file_path, 'r') as file:
csv.field_size_limit(sys.maxsize)
csv_reader = csv.reader(file)
_ = next(csv_reader)
for info in csv_reader:
self._data.append(self._convert_field_type(info))
def _filter(self, filter_condition):
"""
Filter the profiling data according to the filter condition.
Args:
filter_condition (dict): The filter condition.
"""
def _inner_filter(item: list):
return self._default_filter(item, filter_condition)
def _inner_map(item: list):
inner_item = item[0:2]
inner_item.extend(item[4:])
return inner_item
threshold = filter_condition.get('threshold')
is_display_op_detail = filter_condition.get(
'is_display_op_detail', False
)
self._set_display_col_name(is_display_op_detail)
filter_result = list(filter(_inner_filter, self._data))
if threshold:
# the threshold filter is passed as [high_threshold, low_threshold]
low_threshold = threshold[1]
high_threshold = threshold[0]
filter_result = self._filter_outside_threshold(
filter_result, low_threshold, high_threshold
)
if is_display_op_detail:
self._result = filter_result
else:
self._result = list(map(_inner_map, filter_result))
def _filter_outside_threshold(self, data, low_threshold, high_threshold):
"""
Get the data outside the threshold range.
Args:
data (list[list]): The filtered data.
low_threshold (float): The low threshold.
high_threshold (float): The high threshold.
Returns:
list[list], the data outside the threshold range.
"""
root_node = None
leaf_nodes = []
all_below_low_threshold = True
all_higher_high_threshold = True
result = []
for item in data:
parent_id = item[self._index_parent_id]
if parent_id is None:
root_node = item
continue
# current usage rate compared to the threshold
cur_usage_rate = item[self._index_output_queue_usage_rate]
is_low = False
if cur_usage_rate < low_threshold:
is_low = True
else:
all_below_low_threshold = False
if cur_usage_rate < high_threshold:
all_higher_high_threshold = False
# the child node usage rate compared to the threshold
child_ids = item[self._index_children_id]
if not child_ids:
leaf_nodes.append(item)
continue
child_usage_rates = [
self._get_usage_rate_by_op_id(op_id) for op_id in child_ids
]
is_high = True
for usage_rate in child_usage_rates:
if usage_rate < high_threshold:
is_high = False
break
if is_high and is_low:
result.append(item)
if all_below_low_threshold:
result = leaf_nodes
elif all_higher_high_threshold:
result = [root_node]
return result
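In other words, an operator is reported when its own output queue usage rate falls below the low threshold while all of its children's queues are at or above the high threshold; if every queue is below the low threshold only the leaf nodes are returned, and if every queue is at or above the high threshold only the root is returned. A small worked example with the rates from the parsed test pipeline (threshold values chosen purely for illustration):

```python
# op 4 (Shuffle) has usage rate 0.15625; its only child, op 1 (Shuffle),
# has usage rate 0.9375 (values taken from the test fixture).
low_threshold, high_threshold = 0.2, 0.7
op4_rate, child_rates = 0.15625, [0.9375]
is_low = op4_rate < low_threshold                        # its own queue is nearly empty
is_high = all(r >= high_threshold for r in child_rates)  # its children's queues are nearly full
assert is_low and is_high  # op 4 is reported as a potential bottleneck
```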
def _get_usage_rate_by_op_id(self, op_id):
"""
Gets the usage rate of the queue corresponding to the specified operator.
Args:
op_id (int): The pipeline operator ID.
Returns:
float, the usage rate of the queue corresponding to the specified
operator.
"""
index = self._op_id_index_map.get(op_id)
op_info = self._data[index]
return op_info[self._index_output_queue_usage_rate]
def _set_display_col_name(self, is_display_op_detail):
"""
Set the display column name according to the filter condition.
Args:
is_display_op_detail (bool): Whether to display the detailed operator
information.
"""
if not is_display_op_detail:
self._display_col_names = self._col_names[0:2]
self._display_col_names.extend(self._col_names[4:])
def _convert_field_type(self, row):
"""
Convert the field type of minddata pipeline file to the specific type.
Args:
row (list[str]): One row data from parsed data.
Returns:
list[Union[str, int, float]], the converted data.
"""
return [
int(row[self._index_op_id]),
row[self._index_op_type],
int(row[self._index_num_workers]),
json.loads(row[self._index_output_queue_size])
if row[self._index_output_queue_size] else None,
float(row[self._index_output_queue_average_size])
if row[self._index_output_queue_average_size] else None,
int(row[self._index_output_queue_length])
if row[self._index_output_queue_length] else None,
float(row[self._index_output_queue_usage_rate])
if row[self._index_output_queue_usage_rate] else None,
int(row[self._index_sample_interval]),
int(row[self._index_parent_id])
if row[self._index_parent_id] else None,
json.loads(row[self._index_children_id])
if row[self._index_children_id] else None
]
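As an illustration, one row of the parsed test pipeline CSV converts as follows:

```python
# One row taken from the parsed test CSV and its converted form.
row = ['4', 'Shuffle', '1', '[10, 10, 10, 10]', '10.0', '64', '0.15625', '10', '0', '[1]']
# _convert_field_type(row) yields:
# [4, 'Shuffle', 1, [10, 10, 10, 10], 10.0, 64, 0.15625, 10, 0, [1]]
```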
def _get_op_id_index_map(self):
"""
Get the map of the operator id and index in data.
Returns:
dict, the map of the operator id and index in data.
"""
the_map = {}
for index, op_info in enumerate(self._data):
the_map[op_info[self._index_op_id]] = index
return the_map
......@@ -27,7 +27,7 @@ from mindinsight.profiler.common.util import query_latest_trace_time_file, get_f
class StepTraceAnalyser(BaseAnalyser):
"""The analyser for analyzing training steps."""
__col_names__ = []
_col_names = []
_attr_ui_name = 'name'
_attr_ui_start = 'start'
_attr_ui_duration = 'duration'
......@@ -89,7 +89,7 @@ class StepTraceAnalyser(BaseAnalyser):
self.__column__ = next(csv_reader)
self._data = list(csv_reader)
self._size = len(self._data) - 1
self._display_col_names = self.__col_names__[:]
self._display_col_names = self._col_names[:]
def _filter(self, filter_condition):
"""
......
......@@ -49,6 +49,7 @@ class ProfilerErrors(ProfilerMgrErrors):
SORT_CONDITION_ERROR = 5 | _ANALYSER_MASK
FILTER_CONDITION_ERROR = 6 | _ANALYSER_MASK
COLUMN_NOT_SUPPORT_SORT_ERROR = 7 | _ANALYSER_MASK
PIPELINE_OP_NOT_EXIST_ERROR = 8 | _ANALYSER_MASK
......@@ -80,3 +81,4 @@ class ProfilerErrorMsg(Enum):
GROUP_CONDITION_ERROR = 'The group_condition in search_condition error, {}'
SORT_CONDITION_ERROR = 'The sort_condition in search_condition error, {}'
COLUMN_NOT_SUPPORT_SORT_ERROR = 'The column {} does not support to sort.'
PIPELINE_OP_NOT_EXIST_ERROR = 'The minddata pipeline operator {} does not exist.'
......@@ -214,3 +214,14 @@ class JobIdMismatchException(MindInsightException):
message=ProfilerErrorMsg.JOB_ID_MISMATCH_ERROR.value,
http_code=400
)
class ProfilerPipelineOpNotExistException(MindInsightException):
"""The minddata pipeline operator does not exist error in profiler module."""
def __init__(self, msg):
super(ProfilerPipelineOpNotExistException, self).__init__(
error=ProfilerErrors.PIPELINE_OP_NOT_EXIST_ERROR,
message=ProfilerErrorMsg.PIPELINE_OP_NOT_EXIST_ERROR.value.format(msg),
http_code=400
)
......@@ -27,6 +27,11 @@ AICORE_TYPE_COL = ["op_type", "execution_time", "execution_frequency", "precent"
AICORE_DETAIL_COL = ["op_name", "op_type", "execution_time", "subgraph", "full_op_name"]
AICPU_COL = ["serial_number", "op_name", "total_time", "dispatch_time", "RunV2_start",
"compute_start", "memcpy_start", "memcpy_end", "RunV2_end"]
MINDDATA_PIPELINE_COL = [
'op_id', 'op_type', 'num_workers', 'output_queue_average_size',
'output_queue_length', 'output_queue_usage_rate', 'sample_interval',
'parent_id'
]
def validate_condition(search_condition):
......@@ -68,7 +73,7 @@ def validate_condition(search_condition):
raise ProfilerOpTypeException("The op_type must in ['aicpu', 'aicore_type', 'aicore_detail']")
if "group_condition" in search_condition:
validata_group_condition(search_condition)
validate_group_condition(search_condition)
if "sort_condition" in search_condition:
validate_sort_condition(search_condition, search_scope)
......@@ -77,7 +82,7 @@ def validate_condition(search_condition):
validate_filter_condition(search_condition)
def validata_group_condition(search_condition):
def validate_group_condition(search_condition):
"""
Verify the group_condition in search_condition is valid or not.
......@@ -91,7 +96,7 @@ def validata_group_condition(search_condition):
if not isinstance(group_condition, dict):
raise ProfilerGroupConditionException("The group condition must be dict.")
if "limit" in group_condition:
limit = group_condition.get("limit", 0)
limit = group_condition.get("limit", 10)
if isinstance(limit, bool) \
or not isinstance(group_condition.get("limit"), int):
log.error("The limit must be int.")
......@@ -145,6 +150,35 @@ def validate_sort_condition(search_condition, search_scope):
raise ProfilerSortConditionException(err_msg)
def validate_op_filter_condition(op_condition, value_type=str, value_type_msg='str'):
"""
Verify the op_condition in filter_condition is valid or not.
Args:
op_condition (dict): The op_condition in search_condition.
value_type (type): The value type. Default: str.
value_type_msg (str): The value type message. Default: 'str'.
Raises:
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
filter_key = ["in", "not_in", "partial_match_str_in"]
if not isinstance(op_condition, dict):
raise ProfilerFilterConditionException("The filter condition value must be dict.")
for key, value in op_condition.items():
if not isinstance(key, str):
raise ProfilerFilterConditionException("The filter key must be str")
if not isinstance(value, list):
raise ProfilerFilterConditionException("The filter value must be list")
if key not in filter_key:
raise ProfilerFilterConditionException("The filter key must in {}.".format(filter_key))
for item in value:
if not isinstance(item, value_type):
raise ProfilerFilterConditionException(
"The item in filter value must be {}.".format(value_type_msg)
)
def validate_filter_condition(search_condition):
"""
Verify the filter_condition in search_condition is valid or not.
......@@ -155,33 +189,9 @@ def validate_filter_condition(search_condition):
Raises:
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
def validate_op_filter_condition(op_condition):
"""
Verify the op_condition in filter_condition is valid or not.
Args:
op_condition (dict): The op_condition in search_condition.
Raises:
ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid.
"""
if not isinstance(op_condition, dict):
raise ProfilerFilterConditionException("Wrong op_type filter condition.")
for key, value in op_condition.items():
if not isinstance(key, str):
raise ProfilerFilterConditionException("The filter key must be str")
if not isinstance(value, list):
raise ProfilerFilterConditionException("The filter value must be list")
if key not in filter_key:
raise ProfilerFilterConditionException("The filter key must in {}.".format(filter_key))
for item in value:
if not isinstance(item, str):
raise ProfilerFilterConditionException("The item in filter value must be str")
filter_condition = search_condition.get("filter_condition")
if not isinstance(filter_condition, dict):
raise ProfilerFilterConditionException("The filter condition must be dict.")
filter_key = ["in", "not_in", "partial_match_str_in"]
if filter_condition:
if "op_type" in filter_condition:
op_type_condition = filter_condition.get("op_type")
......@@ -232,3 +242,65 @@ def validate_ui_proc(proc_name):
if proc_name not in accept_names:
log.error("Invalid proc_name. The proc_name for restful api is in %s", accept_names)
raise ProfilerParamValueErrorException(f'proc_name should be in {accept_names}.')
def validate_minddata_pipeline_condition(condition):
"""
Verify the minddata pipeline search condition is valid or not.
Args:
condition (dict): The minddata pipeline search condition.
Raises:
ProfilerParamTypeErrorException: If the type of the search condition is
invalid.
ProfilerDeviceIdException: If the device_id param in the search
condition is invalid.
ProfilerGroupConditionException: If the group_condition param in the
search condition is invalid.
ProfilerSortConditionException: If the sort_condition param in the
search condition is invalid.
ProfilerFilterConditionException: If the filter_condition param in the
search condition is invalid.
"""
if not isinstance(condition, dict):
log.error("Invalid condition type, it should be dict.")
raise ProfilerParamTypeErrorException(
"Invalid condition type, it should be dict."
)
if "device_id" in condition:
device_id = condition.get("device_id")
if not isinstance(device_id, str):
raise ProfilerDeviceIdException(
"Invalid device_id type, it should be str."
)
if "group_condition" in condition:
validate_group_condition(condition)
if "sort_condition" in condition:
validate_sort_condition(condition, MINDDATA_PIPELINE_COL)
if "filter_condition" in condition:
filter_condition = condition.get('filter_condition')
if not isinstance(filter_condition, dict):
raise ProfilerFilterConditionException(
"The filter condition must be dict."
)
for key, value in filter_condition.items():
if key == 'op_id':
validate_op_filter_condition(
value, value_type=int, value_type_msg='int'
)
elif key == 'op_type':
validate_op_filter_condition(value)
elif key == 'is_display_op_detail':
if not isinstance(value, bool):
raise ProfilerFilterConditionException(
"The value of is_display_op_detail must be bool."
)
else:
raise ProfilerFilterConditionException(
"The key {} of filter_condition is not supported.".format(key)
)
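For reference, a search condition that passes this validation as written (values are illustrative): `device_id` must be a str, `op_id` filters take int values, `op_type` filters take str values, and `is_display_op_detail` must be a bool.

```python
# Illustrative condition accepted by validate_minddata_pipeline_condition.
condition = {
    "device_id": "0",
    "filter_condition": {
        "op_id": {"in": [1, 2]},
        "op_type": {"partial_match_str_in": ["Shuffle"]},
        "is_display_op_detail": True,
    },
}
validate_minddata_pipeline_condition(condition)
```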
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Thr parser for parsing minddata pipeline files."""
import csv
import json
import os
from queue import Queue
from marshmallow import ValidationError
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerPathErrorException, ProfilerFileNotFoundException, \
ProfilerDirNotFoundException, ProfilerRawFileException
from mindinsight.profiler.common.log import logger
from mindinsight.profiler.common.validator.validate_path import \
validate_and_normalize_path
class MinddataPipelineParser:
"""
The parser for parsing minddata pipeline files.
Args:
source_dir (str): The minddata pipeline source dir.
device_id (str): The device ID.
output_path (str): The directory of the parsed file. Default: `./`.
Raises:
ProfilerPathErrorException: If the minddata pipeline file path or
the output path is invalid.
ProfilerFileNotFoundException: If the minddata pipeline file or
the output dir does not exist.
"""
_raw_pipeline_file_name = 'pipeline_profiling_{}.json'
_parsed_pipeline_file_name = 'minddata_pipeline_raw_{}.csv'
_col_names = [
'op_id', 'op_type', 'num_workers', 'output_queue_size',
'output_queue_average_size', 'output_queue_length',
'output_queue_usage_rate', 'sample_interval', 'parent_id', 'children_id'
]
def __init__(self, source_dir, device_id, output_path='./'):
self._device_id = device_id
self._pipeline_path = self._get_pipeline_path(source_dir)
self._save_path = self._get_save_path(output_path)
@property
def save_path(self):
"""
The property of save path.
Returns:
str, the save path.
"""
return self._save_path
def parse(self):
"""
Parse the minddata pipeline files.
Raises:
ProfilerRawFileException: If fails to parse the raw file of
minddata pipeline or the file is empty.
"""
with open(self._pipeline_path, 'r') as file:
try:
pipeline_info = json.load(file)
except (json.JSONDecodeError, TypeError) as err:
logger.exception(err)
raise ProfilerRawFileException(
'Failed to parse the minddata pipeline file.'
)
if not pipeline_info:
logger.warning('The minddata pipeline file is empty.')
raise ProfilerRawFileException(
'The minddata pipeline file is empty.'
)
self._parse_and_save(pipeline_info)
def _get_pipeline_path(self, source_dir):
"""
Get the minddata pipeline file path.
Args:
source_dir (str): The minddata pipeline source dir.
Returns:
str, the minddata pipeline file path.
"""
pipeline_path = os.path.join(
source_dir,
self._raw_pipeline_file_name.format(self._device_id)
)
try:
pipeline_path = validate_and_normalize_path(pipeline_path, 'profiler')
except ValidationError:
logger.warning('Minddata pipeline file is invalid.')
raise ProfilerPathErrorException('Minddata pipeline file is invalid.')
if not os.path.isfile(pipeline_path):
logger.warning(
'The minddata pipeline file <%s> not found.', pipeline_path
)
raise ProfilerFileNotFoundException(pipeline_path)
return pipeline_path
def _get_save_path(self, output_path):
"""
Get the save path.
Args:
output_path (str): The output dir.
Returns:
str, the save path.
"""
try:
output_dir = validate_and_normalize_path(output_path, 'profiler')
except ValidationError:
logger.warning('Output path is invalid.')
raise ProfilerPathErrorException('Output path is invalid.')
if not os.path.isdir(output_dir):
logger.warning('The output dir <%s> not found.', output_dir)
raise ProfilerDirNotFoundException(output_dir)
return os.path.join(
output_dir, self._parsed_pipeline_file_name.format(self._device_id)
)
def _parse_and_save(self, pipeline_info):
"""
Parse and save the parsed minddata pipeline file.
Args:
pipeline_info (dict): The pipeline info reads from the raw file of
the minddata pipeline.
Raises:
ProfilerRawFileException: If the format of minddata pipeline raw
file is wrong.
"""
sample_interval = pipeline_info.get('sampling_interval')
op_info = pipeline_info.get('op_info')
if sample_interval is None or not op_info:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong.'
)
op_id_info_cache = {}
for item in op_info:
op_id_info_cache[item.get('op_id')] = item
with open(self._save_path, 'w') as save_file:
csv_writer = csv.writer(save_file)
csv_writer.writerow(self._col_names)
self._parse_and_save_op_info(
csv_writer, op_id_info_cache, sample_interval
)
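Based only on the fields this parser reads, the raw `pipeline_profiling_{device_id}.json` is expected to look roughly like the following; this is a hedged reconstruction, not an authoritative format description:

```python
# Inferred from the keys the parser accesses: 'sampling_interval', 'op_info',
# and per-op 'op_id', 'op_type', 'num_workers', 'children',
# 'metrics' -> 'output_queue' -> 'size' / 'length'.
pipeline_info = {
    "sampling_interval": 10,
    "op_info": [
        {"op_id": 0, "op_type": "Batch", "num_workers": 4,
         "metrics": None, "children": [1]},
        {"op_id": 1, "op_type": "Shuffle", "num_workers": 1,
         "metrics": {"output_queue": {"size": [10, 20, 30], "length": 64}},
         "children": None},
    ],
}
```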
def _parse_and_save_op_info(self, csv_writer, op_id_info_cache,
sample_interval):
"""
Parse and save the minddata pipeline operator information.
Args:
csv_writer (csv.writer): The csv writer.
op_id_info_cache (dict): The operator id and information cache.
sample_interval (int): The sample interval.
Raises:
ProfilerRawFileException: If the operator that id is 0 does not exist.
"""
queue = Queue()
root_node = op_id_info_cache.get(0)
if not root_node:
raise ProfilerRawFileException(
'The format of minddata pipeline raw file is wrong, '
'the operator that id is 0 does not exist.'
)
root_node['parent_id'] = None
queue.put_nowait(root_node)
while not queue.empty():
node = queue.get_nowait()
self._update_child_node(node, op_id_info_cache)
csv_writer.writerow(self._get_op_info(node, sample_interval))
op_id = node.get('op_id')
children_ids = node.get('children')
if not children_ids:
continue
for child_op_id in children_ids:
sub_node = op_id_info_cache.get(child_op_id)
sub_node['parent_id'] = op_id
queue.put_nowait(sub_node)
def _update_child_node(self, node, op_id_info_cache):
"""
Updates the child node information of the operator.
Args:
node (dict): The node represents an operator.
op_id_info_cache (dict): The operator id and information cache.
"""
child_op_ids = node.get('children')
if not child_op_ids:
return
queue = Queue()
self._cp_list_item_to_queue(child_op_ids, queue)
new_child_op_ids = []
while not queue.empty():
child_op_id = queue.get_nowait()
child_node = op_id_info_cache.get(child_op_id)
if child_node is None:
continue
metrics = child_node.get('metrics')
if not metrics or not metrics.get('output_queue'):
op_ids = child_node.get('children')
if op_ids:
self._cp_list_item_to_queue(op_ids, queue)
else:
new_child_op_ids.append(child_op_id)
node['children'] = new_child_op_ids
def _get_op_info(self, op_node, sample_interval):
"""
Get the operator information.
Args:
op_node (dict): The node represents an operator.
sample_interval (int): The sample interval.
Returns:
list[str, int, float], the operator information.
"""
queue_size = None
queue_average_size = None
queue_length = None
queue_usage_rate = None
metrics = op_node.get('metrics')
if metrics:
output_queue = metrics.get('output_queue')
if output_queue:
queue_size = output_queue.get('size')
queue_average_size = sum(queue_size) / len(queue_size)
queue_length = output_queue.get('length')
queue_usage_rate = queue_average_size / queue_length
children_id = op_node.get('children')
op_info = [
op_node.get('op_id'),
op_node.get('op_type'),
op_node.get('num_workers'),
queue_size,
queue_average_size,
queue_length,
queue_usage_rate,
sample_interval,
op_node.get('parent_id'),
children_id if children_id else None
]
return op_info
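The derived queue metrics follow directly from the sampled sizes: the usage rate is the average sampled queue size divided by the queue length. With the sample values that appear in the parsed test data:

```python
queue_size = [10, 20, 30, 20]
queue_average_size = sum(queue_size) / len(queue_size)  # 20.0
queue_length = 64
queue_usage_rate = queue_average_size / queue_length     # 0.3125
```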
def _cp_list_item_to_queue(self, inner_list, queue):
"""
Copy the contents of a list to a queue.
Args:
inner_list (list): The list.
queue (Queue): The target queue.
"""
for item in inner_list:
queue.put_nowait(item)
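A usage sketch of the parser with placeholder paths:

```python
# Parse the raw pipeline profiling file of device 0 and write
# minddata_pipeline_raw_0.csv into the output directory.
parser = MinddataPipelineParser('/path/to/source_dir', '0', '/path/to/output_dir')
parser.parse()
print(parser.save_path)
```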
......@@ -30,6 +30,8 @@ from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser
from mindinsight.profiler.parser.framework_parser import FrameworkParser
from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser
from mindinsight.profiler.parser.minddata_parser import MinddataParser
from mindinsight.profiler.parser.minddata_pipeline_parser import \
MinddataPipelineParser
from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser
from mindinsight.profiler.parser.step_trace_parser import StepTraceParser
from mindinsight.utils.exceptions import MindInsightException
......@@ -198,11 +200,18 @@ class Profiler:
# Parsing minddata AICPU profiling
MinddataParser.execute(source_path, self._output_path, self._dev_id)
# parse minddata pipeline operator and queue
try:
pipeline_parser = MinddataPipelineParser(job_id, self._dev_id)
pipeline_parser.parse()
except MindInsightException as err:
logger.warning(err.message)
# analyse op compute time info
try:
self._analyser_op_info()
except MindInsightException as err:
logger.error(err.message)
logger.warning(err.message)
# analyse step trace info
self._analyse_step_trace(source_path, framework_parser)
......
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test profiler restful api of minddata pipeline."""
import json
from unittest import mock
from flask import Response
from marshmallow import ValidationError
from mindinsight.backend.application import APP
class TestMinddataPipelineApi:
"""Test the minddata pipeline restful api of profiler."""
def setup_method(self):
"""Test init."""
APP.response_class = Response
self._app_client = APP.test_client()
self._url_op_queue = '/v1/mindinsight/profile/minddata-pipeline/op-queue'
self._url_queue = '/v1/mindinsight/profile/minddata-pipeline/queue'
@mock.patch('mindinsight.backend.profiler.profile_api.settings')
@mock.patch('mindinsight.profiler.analyser.base_analyser.BaseAnalyser.query')
def test_get_minddata_pipeline_op_queue_info_1(self, *args):
"""Test the function of querying operator and queue information."""
expect_result = {
'col_name': [
'op_id', 'op_type', 'output_queue_average_size',
'output_queue_length', 'output_queue_usage_rate',
'sample_interval', 'parent_id', 'children_id'],
'object': [],
'size': 0
}
args[0].return_value = expect_result
args[1].SUMMARY_BASE_DIR = '/path/to/summary_base_dir'
url = self._url_op_queue + '?train_id=run1&profile=profiler'
body_data = {}
response = self._app_client.post(url, data=json.dumps(body_data))
assert response.status_code == 200
assert expect_result == response.get_json()
def test_get_minddata_pipeline_op_queue_info_2(self):
"""Test the function of querying operator and queue information."""
expect_result = {
'error_code': '50540002',
'error_msg': 'Invalid parameter value. No profiler_dir or train_id.'
}
url = self._url_op_queue + '?train_id=run1'
body_data = {}
response = self._app_client.post(url, data=json.dumps(body_data))
assert response.status_code == 400
assert expect_result == response.get_json()
@mock.patch('mindinsight.backend.profiler.profile_api.validate_and_normalize_path')
@mock.patch('mindinsight.backend.profiler.profile_api.settings')
def test_get_minddata_pipeline_op_queue_info_3(self, *args):
"""Test the function of querying operator and queue information."""
args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir'
args[1].side_effect = ValidationError('xxx')
expect_result = {
'error_code': '50540002',
'error_msg': 'Invalid parameter value. Invalid profiler dir.'
}
url = self._url_op_queue + '?train_id=run1&profile=profiler'
body_data = {}
response = self._app_client.post(url, data=json.dumps(body_data))
assert response.status_code == 400
assert expect_result == response.get_json()
@mock.patch('mindinsight.backend.profiler.profile_api.settings')
def test_get_minddata_pipeline_op_queue_info_4(self, *args):
"""Test the function of querying operator and queue information."""
args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir'
expect_result = {
'error_code': '50540002',
'error_msg': 'Invalid parameter value. Json data parse failed.'
}
url = self._url_op_queue + '?train_id=run1&profile=profiler'
response = self._app_client.post(url, data='xxx')
assert response.status_code == 400
assert expect_result == response.get_json()
@mock.patch('mindinsight.backend.profiler.profile_api.settings')
@mock.patch('mindinsight.profiler.analyser.minddata_pipeline_analyser.'
'MinddataPipelineAnalyser.get_op_and_parent_op_info')
def test_get_minddata_pipeline_queue_info_1(self, *args):
"""Test the function of querying queue information."""
expect_result = {
'current_op': {
'op_id': 1,
'op_type': 'Shuffle',
'num_workers': 1
},
'queue_info': {
'output_queue_size': [10, 20, 30],
'output_queue_average_size': 20.0,
'output_queue_length': 64,
'output_queue_usage_rate': 0.3125,
'sample_interval': 10
},
'parent_op': {
'op_id': 0,
'op_type': 'Batch',
'num_workers': 4
}
}
args[0].return_value = expect_result
args[1].SUMMARY_BASE_DIR = '/path/to/summary_base_dir'
url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0&op_id=1'
response = self._app_client.get(url)
assert response.status_code == 200
assert expect_result == response.get_json()
def test_get_minddata_pipeline_queue_info_2(self):
"""Test the function of querying queue information."""
expect_result = {
'error_code': '50540002',
'error_msg': 'Invalid parameter value. No profiler_dir or train_id.'
}
url = self._url_queue + '?profile=profiler&device_id=0&op_id=1'
response = self._app_client.get(url)
assert response.status_code == 400
assert expect_result == response.get_json()
@mock.patch('mindinsight.backend.profiler.profile_api.validate_and_normalize_path')
@mock.patch('mindinsight.backend.profiler.profile_api.settings')
def test_get_minddata_pipeline_queue_info_3(self, *args):
"""Test the function of querying queue information."""
args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir'
args[1].side_effect = ValidationError('xxx')
expect_result = {
'error_code': '50540002',
'error_msg': 'Invalid parameter value. Invalid profiler dir.'
}
url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0&op_id=1'
response = self._app_client.get(url)
assert response.status_code == 400
assert expect_result == response.get_json()
@mock.patch('mindinsight.backend.profiler.profile_api.settings')
def test_get_minddata_pipeline_queue_info_4(self, *args):
"""Test the function of querying queue information."""
args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir'
expect_result = {
'error_code': '50540002',
'error_msg': 'Invalid parameter value. '
'Invalid operator id or operator id does not exist.'
}
url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0'
response = self._app_client.get(url)
assert response.status_code == 400
assert expect_result == response.get_json()
@mock.patch('mindinsight.backend.profiler.profile_api.settings')
def test_get_minddata_pipeline_queue_info_5(self, *args):
"""Test the function of querying queue information."""
args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir'
expect_result = {
'error_code': '50540002',
'error_msg': 'Invalid parameter value. '
'Invalid operator id or operator id does not exist.'
}
url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0&op_id=xx'
response = self._app_client.get(url)
assert response.status_code == 400
assert expect_result == response.get_json()
......@@ -17,4 +17,5 @@ import os
RAW_DATA_BASE = os.path.realpath(os.path.join(os.path.dirname(__file__), '../../utils/resource'))
RAW_DATA = os.path.realpath(os.path.join(RAW_DATA_BASE, 'JOB1'))
RAW_DATA_JOB2 = os.path.realpath(os.path.join(RAW_DATA_BASE, 'JOB2'))
PROFILER_DIR = os.path.realpath(os.path.join(RAW_DATA_BASE, 'profiler'))
full_op_time,execution_time
Default/AtomicAddrClean-op104,0.00133
Default/AtomicAddrClean-op105,0.000987
Default/AtomicAddrClean-op106,0.001129
Default/Cast-op10,0.00466
Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Cast-op12,0.002366
Gradients/Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/gradConv2D/Cast-op53,0.004879
Default/TransData-op11,0.006366
Gradients/Default/network-WithLossCell/_backbone-LeNet5/gradReshape/TransData-op44,0.006782
Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Conv2D-op13,0.05651
Default/network-WithLossCell/_backbone-LeNet5/fc3-Dense/MatMul-op9,0.370864
op_type,execution_time,execution_frequency,percent
AtomicAddrClean,0.007283,6,0.49
Cast,0.053395,13,3.63
TransData,0.121800,5,8.23
Conv2D,0.063656,2,4.33
MatMul,1.085982,9,73.80
task_id,stream_id,block_dim,full_op_name,op_name,op_type,subgraph,op_info
30290,0,1,Default/AtomicAddrClean-op104,AtomicAddrClean-op104,AtomicAddrClean,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": """"}}"
30295,0,1,Default/AtomicAddrClean-op105,AtomicAddrClean-op105,AtomicAddrClean,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""10""}}"
30300,0,1,Default/AtomicAddrClean-op106,AtomicAddrClean-op106,AtomicAddrClean,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""84""}}"
30268,0,32,Default/Cast-op10,Cast-op10,Cast,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,1,32,32""}, ""output_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32""}}"
30271,0,9,Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Cast-op12,Cast-op12,Cast,Default,"{""input_0"": {""format"": ""FracZ"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""25,1,16,16""}, ""output_0"": {""format"": ""FracZ"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""25,1,16,16""}}"
30320,0,32,Gradients/Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/gradConv2D/Cast-op53,Cast-op53,Cast,Gradients,"{""input_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,1,28,28,16""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,28,28,16""}}"
30269,0,32,Default/TransData-op11,TransData-op11,TransData,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32""}}"
30308,0,32,Gradients/Default/network-WithLossCell/_backbone-LeNet5/gradReshape/TransData-op44,TransData-op44,TransData,Gradients,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,16,5,5""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,5,5,16""}}"
30272,0,32,Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Conv2D-op13,Conv2D-op13,Conv2D,Default,"{""input_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32,16""}, ""input_1"": {""format"": ""FracZ"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""25,1,16,16""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,28,28,16""}}"
30286,0,1,Default/network-WithLossCell/_backbone-LeNet5/fc3-Dense/MatMul-op9,MatMul-op9,MatMul,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,120""}, ""input_1"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""84,120""}, ""input_2"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""84""}, ""output_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,84""}}"
op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id
0,Batch,4,,,,,10,,[4]
4,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,0,[1]
1,Shuffle,1,"[60, 60, 60, 60]",60.0,64,0.9375,10,4,"[2, 3]"
2,TFReader,4,"[10, 20, 30, 20]",20.0,64,0.3125,10,1,
3,TFReader,4,"[10, 20, 30, 20]",20.0,64,0.3125,10,1,
op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id
0,Batch,4,,,,,10,,[4]
4,Shuffle,1,"[10, 20, 30, 20]",20.0,64,0.3125,10,0,[1]
1,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,4,"[2, 3]"
2,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1,
3,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1,
op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id
0,Batch,4,,,,,10,,[4]
4,Shuffle,1,"[60, 60, 60, 60]",60.0,64,0.9375,10,0,[1]
1,Shuffle,1,"[60, 60, 60, 60]",60.0,64,0.9375,10,4,"[2, 3]"
2,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1,
3,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1,
op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id
0,Batch,4,,,,,10,,[4]
4,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,0,[1]
1,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,4,"[2, 3]"
2,TFReader,4,"[10, 10, 10, 10]",10.0,64,0.15625,10,1,
3,TFReader,4,"[10, 10, 10, 10]",10.0,64,0.15625,10,1,
......@@ -18,10 +18,12 @@ import json
import os
from unittest import TestCase
from mindinsight.profiler.analyser.analyser import AicoreDetailAnalyser
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from tests.ut.profiler import PROFILER_DIR
COL_NAMES = ['op_name', 'op_type', 'execution_time', 'subgraph', 'full_op_name',
'op_info']
def get_detail_infos(indexes=None, sort_name=None, sort_type=True):
"""
......@@ -56,7 +58,7 @@ def get_detail_infos(indexes=None, sort_name=None, sort_type=True):
result = cache
if sort_name:
sort_index = AicoreDetailAnalyser.__col_names__.index(sort_name)
sort_index = COL_NAMES.index(sort_name)
result.sort(key=lambda item: item[sort_index], reverse=sort_type)
return result
......@@ -73,7 +75,7 @@ class TestAicoreDetailAnalyser(TestCase):
def test_query_success_1(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_detail_infos(),
'size': 10
}
......@@ -86,7 +88,7 @@ class TestAicoreDetailAnalyser(TestCase):
def test_query_success_2(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_detail_infos(indexes=[9]),
'size': 1
}
......@@ -123,7 +125,7 @@ class TestAicoreDetailAnalyser(TestCase):
def test_query_success_3(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_detail_infos(sort_name='execution_time', sort_type=True),
'size': 10
}
......@@ -137,7 +139,7 @@ class TestAicoreDetailAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_detail_infos(sort_name='op_name', sort_type=False),
'size': 10
}
......@@ -153,7 +155,7 @@ class TestAicoreDetailAnalyser(TestCase):
def test_query_success_4(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_detail_infos(indexes=[2, 3]),
'size': 10
}
......@@ -167,7 +169,7 @@ class TestAicoreDetailAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': [],
'size': 10
}
......@@ -183,7 +185,7 @@ class TestAicoreDetailAnalyser(TestCase):
def test_query_success_5(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_detail_infos(
indexes=[1, 2], sort_name='execution_time', sort_type=True
),
......@@ -207,7 +209,7 @@ class TestAicoreDetailAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_detail_infos(
indexes=[0, 1, 2, 8], sort_name='execution_time', sort_type=True
),
......@@ -234,7 +236,7 @@ class TestAicoreDetailAnalyser(TestCase):
detail_infos = get_detail_infos(indexes=[9])
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__[0:5],
'col_name': COL_NAMES[0:5],
'object': [item[0:5] for item in detail_infos],
'size': 1
}
......@@ -250,7 +252,7 @@ class TestAicoreDetailAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__[0:4],
'col_name': COL_NAMES[0:4],
'object': [item[0:4] for item in detail_infos],
'size': 1
}
......@@ -270,7 +272,7 @@ class TestAicoreDetailAnalyser(TestCase):
"""Test the success of the querying and sorting function by operator type."""
detail_infos = get_detail_infos(indexes=[9, 0, 2, 1, 5, 3, 4])
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__[0:4],
'col_name': COL_NAMES[0:4],
'object': [item[0:4] for item in detail_infos]
}
......@@ -292,7 +294,7 @@ class TestAicoreDetailAnalyser(TestCase):
"""Test the success of the querying and sorting function by operator type."""
detail_infos = get_detail_infos(indexes=[9, 0, 2, 1, 3, 4, 8, 6])
expect_result = {
'col_name': AicoreDetailAnalyser.__col_names__[0:4],
'col_name': COL_NAMES[0:4],
'object': [item[0:4] for item in detail_infos]
}
......@@ -310,14 +312,11 @@ class TestAicoreDetailAnalyser(TestCase):
result = self._analyser.query_and_sort_by_op_type(
filter_condition, op_type_order
)
print(result)
self.assertDictEqual(expect_result, result)
def test_col_names(self):
"""Test the querying column names function."""
self.assertListEqual(
AicoreDetailAnalyser.__col_names__, self._analyser.col_names
)
self.assertListEqual(COL_NAMES, self._analyser.col_names)
def test_data(self):
"""Test the querying data function."""
......
......@@ -17,10 +17,11 @@ import csv
import os
from unittest import TestCase
from mindinsight.profiler.analyser.analyser import AicoreTypeAnalyser
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from tests.ut.profiler import PROFILER_DIR
COL_NAMES = ['op_type', 'execution_time', 'execution_frequency', 'percent']
def get_type_infos(indexes=None, sort_name=None, sort_type=True):
"""
......@@ -38,8 +39,8 @@ def get_type_infos(indexes=None, sort_name=None, sort_type=True):
PROFILER_DIR, 'aicore_intermediate_1_type.csv'
)
with open(aicore_type_path, 'r') as aicore_type_path:
csv_reader = csv.reader(aicore_type_path)
with open(aicore_type_path, 'r') as file:
csv_reader = csv.reader(file)
_ = next(csv_reader)
cache = []
for type_info in csv_reader:
......@@ -54,7 +55,7 @@ def get_type_infos(indexes=None, sort_name=None, sort_type=True):
result = cache
if sort_name:
sort_index = AicoreTypeAnalyser.__col_names__.index(sort_name)
sort_index = COL_NAMES.index(sort_name)
result.sort(key=lambda item: item[sort_index], reverse=sort_type)
return result
......@@ -71,7 +72,7 @@ class TestAicoreTypeAnalyser(TestCase):
def test_query_success_1(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(),
'size': 5
}
......@@ -85,7 +86,7 @@ class TestAicoreTypeAnalyser(TestCase):
def test_query_success_2(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[1]),
'size': 1
}
......@@ -100,7 +101,7 @@ class TestAicoreTypeAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[0, 2, 3, 4]),
'size': 4
}
......@@ -115,7 +116,7 @@ class TestAicoreTypeAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[0, 1, 3]),
'size': 3
}
......@@ -132,7 +133,7 @@ class TestAicoreTypeAnalyser(TestCase):
def test_query_success_3(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[1, 3]),
'size': 2
}
......@@ -147,7 +148,7 @@ class TestAicoreTypeAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[0, 2, 4]),
'size': 3
}
......@@ -162,7 +163,7 @@ class TestAicoreTypeAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[2, 3]),
'size': 2
}
......@@ -179,7 +180,7 @@ class TestAicoreTypeAnalyser(TestCase):
def test_query_success_4(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(sort_name='op_type', sort_type=True),
'size': 5}
condition = {
......@@ -192,7 +193,7 @@ class TestAicoreTypeAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(sort_name='execution_time', sort_type=False),
'size': 5
}
......@@ -208,7 +209,7 @@ class TestAicoreTypeAnalyser(TestCase):
def test_query_success_5(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[0, 1]),
'size': 5
}
......@@ -222,7 +223,7 @@ class TestAicoreTypeAnalyser(TestCase):
self.assertDictEqual(expect_result, result)
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(indexes=[3, 4]),
'size': 5
}
......@@ -238,7 +239,7 @@ class TestAicoreTypeAnalyser(TestCase):
def test_query_success_6(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': AicoreTypeAnalyser.__col_names__,
'col_name': COL_NAMES,
'object': get_type_infos(
indexes=[1, 3], sort_name='execution_time', sort_type=True
),
......@@ -263,9 +264,7 @@ class TestAicoreTypeAnalyser(TestCase):
def test_col_names(self):
"""Test the querying column names function."""
self.assertListEqual(
AicoreTypeAnalyser.__col_names__, self._analyser.col_names
)
self.assertListEqual(COL_NAMES, self._analyser.col_names)
def test_data(self):
"""Test the querying data function."""
......
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test the minddata pipeline analyser module."""
import csv
import json
import os
import pytest
from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerPipelineOpNotExistException
from tests.ut.profiler import PROFILER_DIR
COL_NAMES = ['op_id', 'op_type', 'num_workers', 'output_queue_size',
'output_queue_average_size', 'output_queue_length',
'output_queue_usage_rate', 'sample_interval', 'parent_id',
'children_id']
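# Note: these column names mirror the header row of the parsed
# minddata_pipeline_raw_<device_id>.csv fixtures used throughout these tests.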
def get_pipeline_infos(pipeline_path, indexes=None, sort_name=None,
sort_type=True):
"""
Get minddata pipeline operator and queue information.
Args:
pipeline_path (str): The parsed minddata pipeline file path.
indexes (list[int]): The operator indexes. Default: None.
sort_name (str): The sort name. Default: None.
sort_type (bool): The sort type. If the parameter is `True`, the results
are sorted in descending order; otherwise, they are sorted in ascending
order. Default: True.
Returns:
list[list], the minddata pipeline operator and queue information.
"""
with open(pipeline_path, 'r') as file:
csv_reader = csv.reader(file)
_ = next(csv_reader)
cache = []
for row in csv_reader:
cache.append(
[
int(row[0]),
row[1],
int(row[2]),
json.loads(row[3]) if row[3] else None,
float(row[4]) if row[4] else None,
int(row[5]) if row[5] else None,
float(row[6]) if row[6] else None,
int(row[7]),
int(row[8]) if row[8] else None,
json.loads(row[9]) if row[9] else None
]
)
if indexes:
result = [cache[index] for index in indexes]
else:
result = cache
if sort_name:
sort_index = COL_NAMES.index(sort_name)
result.sort(key=lambda item: item[sort_index], reverse=sort_type)
return result
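# Illustrative example (values taken from the minddata_pipeline_raw_* fixtures):
# a raw CSV row such as
#     1,Shuffle,1,"[10, 20, 30]",20.0,64,0.3125,10,0,"[2, 3]"
# is converted by the loop above into
#     [1, 'Shuffle', 1, [10, 20, 30], 20.0, 64, 0.3125, 10, 0, [2, 3]]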
def get_simple_row_info(row):
"""
Get simple minddata pipeline row information.
Args:
row (list[str, int, float]): The minddata pipeline row information.
Returns:
list[str, int, float], the simple minddata pipeline row information.
"""
simple_info = row[0:2]
simple_info.extend(row[4:])
return simple_info
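# The simplified row keeps op_id and op_type and drops num_workers and
# output_queue_size, matching the column set returned by the default
# (non-detailed) query in the tests below.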
class TestMinddataPipelineAnalyser:
"""Test the class of `MinddataPipelineAnalyser`."""
def setup_method(self):
"""Initialization before test case execution."""
self._analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', PROFILER_DIR, '0'
)
self._pipeline_path = os.path.join(
PROFILER_DIR, 'minddata_pipeline_raw_0.csv'
)
def test_query_success_1(self):
"""Test the success of the querying function."""
detail_infos = get_pipeline_infos(self._pipeline_path)
col_name = get_simple_row_info(COL_NAMES)
expect_result = {
'col_name': col_name,
'object': [get_simple_row_info(item) for item in detail_infos],
'size': 4
}
result = self._analyser.query({})
assert expect_result == result
result = self._analyser.query()
assert expect_result == result
def test_query_success_2(self):
"""Test the success of the querying function."""
detail_infos = get_pipeline_infos(self._pipeline_path, indexes=[0])
col_name = get_simple_row_info(COL_NAMES)
expect_result = {
'col_name': col_name,
'object': [get_simple_row_info(item) for item in detail_infos],
'size': 1
}
condition = {
'filter_condition': {
'op_id': {
'in': [0]
}
}
}
result = self._analyser.query(condition)
assert expect_result == result
detail_infos = get_pipeline_infos(self._pipeline_path, indexes=[0, 1])
expect_result = {
'col_name': col_name,
'object': [get_simple_row_info(item) for item in detail_infos],
'size': 2
}
condition = {
'filter_condition': {
'op_type': {
'not_in': ['TFReader']
}
}
}
result = self._analyser.query(condition)
assert expect_result == result
detail_infos = get_pipeline_infos(self._pipeline_path, indexes=[2, 3])
expect_result = {
'col_name': col_name,
'object': [get_simple_row_info(item) for item in detail_infos],
'size': 2
}
condition = {
'filter_condition': {
'op_type': {
'partial_match_str_in': ['TF']
}
}
}
result = self._analyser.query(condition)
assert expect_result == result
def test_query_success_3(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': COL_NAMES,
'object': get_pipeline_infos(self._pipeline_path),
'size': 4
}
condition = {
'filter_condition': {
'is_display_op_detail': True
}
}
result = self._analyser.query(condition)
assert expect_result == result
def test_query_success_4(self):
"""Test the success of the querying function."""
expect_result = {
'col_name': COL_NAMES,
'object': [],
'size': 0
}
condition = {
'filter_condition': {
'threshold': [0.8, 0.2],
'is_display_op_detail': True
}
}
result = self._analyser.query(condition)
assert expect_result == result
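# In these threshold cases the filter value [0.8, 0.2] appears to be read as
# [highest, lowest] queue usage rate bounds; the docstrings of
# test_query_success_5 to test_query_success_8 describe the scenarios covered.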
def test_query_success_5(self):
"""
Test the success of the querying function.
The upstream queue utilization of the operator is greater than
the highest threshold, and the downstream queue utilization of
the operator is lower than the lowest threshold.
"""
profiling_dir = os.path.join(os.path.dirname(__file__), 'resource')
pipeline_path = os.path.join(
profiling_dir, 'minddata_pipeline_raw_0.csv'
)
analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', profiling_dir, '0'
)
expect_result = {
'col_name': COL_NAMES,
'object': get_pipeline_infos(pipeline_path, [1]),
'size': 1
}
condition = {
'filter_condition': {
'threshold': [0.8, 0.2],
'is_display_op_detail': True
}
}
result = analyser.query(condition)
assert expect_result == result
def test_query_success_6(self):
"""
Test the success of the querying function.
The upstream queue utilization of the operator is greater than
the highest threshold, and the downstream queue utilization of
the operator is lower than the lowest threshold.
"""
profiling_dir = os.path.join(os.path.dirname(__file__), 'resource')
pipeline_path = os.path.join(
profiling_dir, 'minddata_pipeline_raw_1.csv'
)
analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', profiling_dir, '1'
)
expect_result = {
'col_name': COL_NAMES,
'object': get_pipeline_infos(pipeline_path, [2]),
'size': 1
}
condition = {
'filter_condition': {
'threshold': [0.8, 0.2],
'is_display_op_detail': True
}
}
result = analyser.query(condition)
assert expect_result == result
def test_query_success_7(self):
"""
Test the success of the querying function.
The utilization of all queues is greater than the highest threshold.
"""
profiling_dir = os.path.join(os.path.dirname(__file__), 'resource')
pipeline_path = os.path.join(
profiling_dir, 'minddata_pipeline_raw_2.csv'
)
analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', profiling_dir, '2'
)
expect_result = {
'col_name': COL_NAMES,
'object': get_pipeline_infos(pipeline_path, [0]),
'size': 1
}
condition = {
'filter_condition': {
'threshold': [0.8, 0.2],
'is_display_op_detail': True
}
}
result = analyser.query(condition)
assert expect_result == result
def test_query_success_8(self):
"""
Test the success of the querying function.
The utilization of all queues is lower than the lowest threshold.
"""
profiling_dir = os.path.join(os.path.dirname(__file__), 'resource')
pipeline_path = os.path.join(
profiling_dir, 'minddata_pipeline_raw_3.csv'
)
analyser = AnalyserFactory.instance().get_analyser(
'minddata_pipeline', profiling_dir, '3'
)
expect_result = {
'col_name': COL_NAMES,
'object': get_pipeline_infos(pipeline_path, [3, 4]),
'size': 2
}
condition = {
'filter_condition': {
'threshold': [0.8, 0.2],
'is_display_op_detail': True
}
}
result = analyser.query(condition)
assert expect_result == result
def test_get_op_and_parent_op_info_success(self):
"""Test the success of the function of querying operator and parent operator."""
expect_result = {
'current_op': {
'op_id': 0,
'op_type': 'Batch',
'num_workers': 4
},
'parent_op': None,
'queue_info': None
}
result = self._analyser.get_op_and_parent_op_info(0)
assert expect_result == result
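# For op 1 (Shuffle) the expected queue_info below mirrors that op's own
# output queue in the minddata_pipeline_raw_0.csv fixture, i.e. the queue
# consumed by its parent op 0 (Batch).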
expect_result = {
'current_op': {
'op_id': 1,
'op_type': 'Shuffle',
'num_workers': 1
},
'queue_info': {
'output_queue_size': [10, 20, 30],
'output_queue_average_size': 20.0,
'output_queue_length': 64,
'output_queue_usage_rate': 0.3125,
'sample_interval': 10
},
'parent_op': {
'op_id': 0,
'op_type': 'Batch',
'num_workers': 4
}
}
result = self._analyser.get_op_and_parent_op_info(1)
assert expect_result == result
def test_get_op_and_parent_op_info_fail(self):
"""Test the function of fail to query operator and parent operator."""
with pytest.raises(ProfilerPipelineOpNotExistException) as exc_info:
self._analyser.get_op_and_parent_op_info(5)
assert exc_info.value.error_code == '50546188'
assert exc_info.value.message == 'The minddata pipeline operator 5 ' \
'does not exist.'
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test the validator module."""
import pytest
from mindinsight.profiler.common.exceptions.exceptions import \
ProfilerParamTypeErrorException, ProfilerDeviceIdException, \
ProfilerGroupConditionException, ProfilerSortConditionException, \
ProfilerFilterConditionException
from mindinsight.profiler.common.validator.validate import \
validate_minddata_pipeline_condition
class TestMinddataPipelineCondition:
"""Test the method of `validate_minddata_pipeline_condition`."""
def test_validate_minddata_pipeline_condition_1(self):
"""Test the method exceptions."""
with pytest.raises(ProfilerParamTypeErrorException) as exc_info:
validate_minddata_pipeline_condition([])
assert exc_info.value.error_code == '50546082'
assert exc_info.value.message == 'Param type error. Invalid condition ' \
'type, it should be dict.'
def test_validate_minddata_pipeline_condition_2(self):
"""Test the method exceptions."""
condition = {'device_id': 0}
with pytest.raises(ProfilerDeviceIdException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546182'
assert exc_info.value.message == 'The device_id in search_condition error, ' \
'Invalid device_id type, it should be str.'
def test_validate_minddata_pipeline_condition_3(self):
"""Test the method exceptions."""
condition = {'group_condition': 0}
with pytest.raises(ProfilerGroupConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546184'
assert exc_info.value.message == 'The group_condition in search_condition error, ' \
'The group condition must be dict.'
condition = {
'group_condition': {
'limit': '1'
}
}
with pytest.raises(ProfilerGroupConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546184'
assert exc_info.value.message == 'The group_condition in search_condition error, ' \
'The limit must be int.'
condition = {
'group_condition': {
'limit': 0
}
}
with pytest.raises(ProfilerGroupConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546184'
assert exc_info.value.message == 'The group_condition in search_condition error, ' \
'The limit must in [1, 100].'
condition = {
'group_condition': {
'offset': '0'
}
}
with pytest.raises(ProfilerGroupConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546184'
assert exc_info.value.message == 'The group_condition in search_condition error, ' \
'The offset must be int.'
condition = {
'group_condition': {
'offset': 1000001
}
}
with pytest.raises(ProfilerGroupConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546184'
assert exc_info.value.message == 'The group_condition in search_condition error, ' \
'The offset must le 1000000.'
def test_validate_minddata_pipeline_condition_4(self):
"""Test the method exceptions."""
condition = {'sort_condition': 0}
with pytest.raises(ProfilerSortConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546185'
assert exc_info.value.message == 'The sort_condition in search_condition error, ' \
'The sort condition must be dict.'
condition = {
'sort_condition': {
'name': 0
}
}
with pytest.raises(ProfilerSortConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546185'
assert exc_info.value.message == 'The sort_condition in search_condition error, ' \
'Wrong sorted name type.'
condition = {
'sort_condition': {
'name': 'xxx'
}
}
with pytest.raises(ProfilerSortConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546185'
assert exc_info.value.message.startswith(
'The sort_condition in search_condition error, The sorted_name must be in'
)
condition = {
'sort_condition': {
'name': 'output_queue_usage_rate',
'type': 'xxx'
}
}
with pytest.raises(ProfilerSortConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546185'
assert exc_info.value.message == 'The sort_condition in search_condition error, ' \
'The sorted type must be ascending or descending.'
def test_validate_minddata_pipeline_condition_5(self):
"""Test the method exceptions."""
condition = {
'filter_condition': '0'
}
with pytest.raises(ProfilerFilterConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546186'
assert exc_info.value.message == 'The filter_condition in search_condition error, ' \
'The filter condition must be dict.'
condition = {
'filter_condition': {
'xxx': 0
}
}
with pytest.raises(ProfilerFilterConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546186'
assert exc_info.value.message == 'The filter_condition in search_condition error, ' \
'The key xxx of filter_condition is not support.'
condition = {
'filter_condition': {
'is_display_op_detail': 0
}
}
with pytest.raises(ProfilerFilterConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546186'
assert exc_info.value.message == 'The filter_condition in search_condition error, ' \
'The condition must be bool.'
condition = {
'filter_condition': {
'op_id': 0
}
}
with pytest.raises(ProfilerFilterConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546186'
assert exc_info.value.message == 'The filter_condition in search_condition error, ' \
'The filter condition value must be dict.'
condition = {
'filter_condition': {
'op_id': {
'in': ['0']
}
}
}
with pytest.raises(ProfilerFilterConditionException) as exc_info:
validate_minddata_pipeline_condition(condition)
assert exc_info.value.error_code == '50546186'
assert exc_info.value.message == 'The filter_condition in search_condition error, ' \
'The item in filter value must be int.'
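# For contrast, a condition of the following shape should satisfy the checks
# exercised above (an illustrative assumption, not asserted in this suite):
#     {
#         'device_id': '0',
#         'group_condition': {'limit': 10, 'offset': 0},
#         'sort_condition': {'name': 'output_queue_usage_rate',
#                            'type': 'descending'},
#         'filter_condition': {'op_id': {'in': [0, 1]},
#                              'is_display_op_detail': True}
#     }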
......@@ -51,13 +51,11 @@ class TestFrameworkParser:
"""Test the class of `FrameworkParser`."""
def setup_method(self):
"""Initialization before test case execution."""
raw_dir = RAW_DATA_BASE
FrameworkParser._raw_data_dir = raw_dir
FrameworkParser._raw_data_dir = RAW_DATA_BASE
self._output_path_1 = tempfile.mkdtemp(prefix='test_framework_parser_')
self._parser_1 = FrameworkParser('JOB1', '0', self._output_path_1)
os.makedirs(os.path.join(raw_dir, 'JOB2'), exist_ok=True)
self._output_path_2 = tempfile.mkdtemp(prefix='test_framework_parser_')
self._parser_2 = FrameworkParser('JOB2', '0', self._output_path_2)
......@@ -84,8 +82,8 @@ class TestFrameworkParser:
'51522': 'Default/network-WithLossCell/_backbone-ResNet/'
'layer1-SequentialCell/0-ResidualBlock/conv1-Conv2d/Cast-op28'
}
assert self._parser_1.to_task_id_full_op_name_dict(), expect_result
assert self._parser_2.to_task_id_full_op_name_dict(), expect_result
assert expect_result == self._parser_1.to_task_id_full_op_name_dict()
assert expect_result == self._parser_2.to_task_id_full_op_name_dict()
def test_parse(self):
"""Test the parse function."""
......@@ -135,6 +133,5 @@ class TestFrameworkParser:
with pytest.raises(ProfilerFileNotFoundException) as exc_info:
FrameworkParser('JOB1', '0')
print(exc_info.value)
assert exc_info.value.error_code == '50546084'
assert exc_info.value.message == 'The file <Framework> not found.'
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test the minddata pipeline parser module."""
import csv
import os
import shutil
import tempfile
from mindinsight.profiler.parser.minddata_pipeline_parser import \
MinddataPipelineParser
from tests.ut.profiler import PROFILER_DIR, RAW_DATA, RAW_DATA_JOB2
def get_minddata_pipeline_result(file_path):
"""
Get minddata pipeline result from the minddata pipeline file.
Args:
file_path (str): The minddata pipeline file path.
Returns:
list[list], the parsed minddata pipeline information.
"""
result = []
with open(file_path, 'r') as file:
csv_reader = csv.reader(file)
for row in csv_reader:
result.append(row)
return result
class TestMinddataPipelineParser:
"""Test the class of `MinddataPipelineParser`."""
def setup_method(self):
"""Initialization before test case execution."""
self._output_path_1 = tempfile.mkdtemp(
prefix='test_minddata_pipeline_parser_'
)
self._parser_1 = MinddataPipelineParser(
RAW_DATA, '0', self._output_path_1
)
self._output_path_2 = tempfile.mkdtemp(
prefix='test_minddata_pipeline_parser_'
)
self._parser_2 = MinddataPipelineParser(
RAW_DATA_JOB2, '0', self._output_path_2
)
def teardown_method(self) -> None:
"""Clear up after test case execution."""
shutil.rmtree(self._output_path_1)
shutil.rmtree(self._output_path_2)
def test_save_path(self):
"""Test the querying save path function."""
expect_result = os.path.join(
self._output_path_1, 'minddata_pipeline_raw_0.csv'
)
assert expect_result == self._parser_1.save_path
def test_parse(self):
"""Test the parse function."""
expect_pipeline_file = os.path.join(
PROFILER_DIR, 'minddata_pipeline_raw_0.csv'
)
expect_result = get_minddata_pipeline_result(expect_pipeline_file)
self._parser_1.parse()
pipeline_file = os.path.join(
self._output_path_1, 'minddata_pipeline_raw_0.csv'
)
result = get_minddata_pipeline_result(pipeline_file)
assert expect_result == result
self._parser_2.parse()
pipeline_file = os.path.join(
self._output_path_2, 'minddata_pipeline_raw_0.csv'
)
result = get_minddata_pipeline_result(pipeline_file)
assert expect_result == result
{
"sampling_interval": 10,
"op_info": [
{
"op_id": 4,
"op_type": "TFReader",
"num_workers": 4,
"metrics": null,
"children": [3]
},
{
"op_id": 3,
"op_type": "TFReader",
"num_workers": 4,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": null
},
{
"op_id": 2,
"op_type": "TFReader",
"num_workers": 4,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": null
},
{
"op_id": 1,
"op_type": "Shuffle",
"num_workers": 1,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": [2, 4]
},
{
"op_id": 0,
"op_type": "Batch",
"num_workers": 4,
"metrics": null,
"children": [1]
}
]
}
\ No newline at end of file
{
"sampling_interval": 10,
"op_info": [
{
"op_id": 3,
"op_type": "TFReader",
"num_workers": 4,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": null
},
{
"op_id": 2,
"op_type": "TFReader",
"num_workers": 4,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": null
},
{
"op_id": 1,
"op_type": "Shuffle",
"num_workers": 1,
"metrics": {
"output_queue": {
"size": [10, 20, 30],
"length": 64
}
},
"children": [2, 3]
},
{
"op_id": 0,
"op_type": "Batch",
"num_workers": 4,
"metrics": null,
"children": [1]
}
]
}
\ No newline at end of file
op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id
0,Batch,4,,,,,10,,[1]
1,Shuffle,1,"[10, 20, 30]",20.0,64,0.3125,10,0,"[2, 3]"
2,TFReader,4,"[10, 20, 30]",20.0,64,0.3125,10,1,
3,TFReader,4,"[10, 20, 30]",20.0,64,0.3125,10,1,