diff --git a/mindinsight/backend/profiler/profile_api.py b/mindinsight/backend/profiler/profile_api.py
index 3292e0cd340d5c3589b3d0d3da1d86de44759b69..50dced5569cf6a42da63437672bb81b21169412b 100644
--- a/mindinsight/backend/profiler/profile_api.py
+++ b/mindinsight/backend/profiler/profile_api.py
@@ -26,14 +26,15 @@ from flask import request
 from marshmallow import ValidationError
 
 from mindinsight.conf import settings
-from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, unquote_args
-from mindinsight.datavisual.utils.tools import to_int
-from mindinsight.lineagemgr.common.validator.validate_path import validate_and_normalize_path
+from mindinsight.datavisual.utils.tools import get_train_id, get_profiler_dir, \
+    unquote_args, to_int
 from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
 from mindinsight.profiler.analyser.minddata_analyser import MinddataAnalyser
 from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir
-from mindinsight.profiler.common.validator.validate import validate_condition, validate_ui_proc
-from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_profiler_path
+from mindinsight.profiler.common.validator.validate import validate_condition, \
+    validate_ui_proc, validate_minddata_pipeline_condition
+from mindinsight.profiler.common.validator.validate_path import \
+    validate_and_normalize_profiler_path, validate_and_normalize_path
 from mindinsight.utils.exceptions import ParamValueError
 
 BLUEPRINT = Blueprint("profile", __name__, url_prefix=settings.URL_PREFIX)
@@ -274,6 +275,92 @@ def get_profiler_abs_dir(requests):
     return profiler_dir_abs
 
 
+@BLUEPRINT.route("/profile/minddata-pipeline/op-queue", methods=["POST"])
+def get_minddata_pipeline_op_queue_info():
+    """
+    Get minddata pipeline operator info and queue info.
+
+    Returns:
+        str, the operator information and queue information.
+
+    Raises:
+        ParamValueError: If the search condition contains some errors.
+
+    Examples:
+        >>> POST http://xxxx/v1/mindinsight/profile/minddata-pipeline/op-queue
+    """
+    profiler_dir = get_profiler_dir(request)
+    train_id = get_train_id(request)
+    if not profiler_dir or not train_id:
+        raise ParamValueError("No profiler_dir or train_id.")
+
+    profiler_dir_abs = os.path.join(
+        settings.SUMMARY_BASE_DIR, train_id, profiler_dir
+    )
+    try:
+        profiler_dir_abs = validate_and_normalize_path(
+            profiler_dir_abs, "profiler"
+        )
+    except ValidationError:
+        raise ParamValueError("Invalid profiler dir.")
+
+    condition = request.stream.read()
+    try:
+        condition = json.loads(condition) if condition else {}
+    except Exception:
+        raise ParamValueError("Json data parse failed.")
+    validate_minddata_pipeline_condition(condition)
+
+    device_id = condition.get("device_id", "0")
+    analyser = AnalyserFactory.instance().get_analyser(
+        'minddata_pipeline', profiler_dir_abs, device_id
+    )
+    op_info = analyser.query(condition)
+    return jsonify(op_info)
+
+
+@BLUEPRINT.route("/profile/minddata-pipeline/queue", methods=["GET"])
+def get_minddata_pipeline_queue_info():
+    """
+    Get the specified minddata pipeline queue info.
+
+    Returns:
+        str, the queue information.
+
+    Raises:
+        ParamValueError: If the request params are invalid.
+ + Examples: + >>> GET http://xxxx/v1/mindinsight/profile/minddata-pipeline/queue + """ + profiler_dir = get_profiler_dir(request) + train_id = get_train_id(request) + if not profiler_dir or not train_id: + raise ParamValueError("No profiler_dir or train_id.") + + profiler_dir_abs = os.path.join( + settings.SUMMARY_BASE_DIR, train_id, profiler_dir + ) + try: + profiler_dir_abs = validate_and_normalize_path( + profiler_dir_abs, "profiler" + ) + except ValidationError: + raise ParamValueError("Invalid profiler dir.") + + device_id = request.args.get('device_id', default='0') + op_id = request.args.get('op_id', type=int) + if op_id is None: + raise ParamValueError("Invalid operator id or operator id does not exist.") + + analyser = AnalyserFactory.instance().get_analyser( + 'minddata_pipeline', profiler_dir_abs, device_id + ) + op_queue_info = analyser.get_op_and_parent_op_info(op_id) + return jsonify(op_queue_info) + + def init_module(app): """ Init module entry. diff --git a/mindinsight/profiler/analyser/__init__.py b/mindinsight/profiler/analyser/__init__.py index 6786c9526f8c9a5fd0b701058b6463c71b100b7a..cf01fe5eb77cfbc29d7d8a2c38b1bbc4ddb5f229 100644 --- a/mindinsight/profiler/analyser/__init__.py +++ b/mindinsight/profiler/analyser/__init__.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================ -"""Import analyser.""" -from .analyser import * -from .step_trace_analyser import StepTraceAnalyser -from .minddata_analyser import MinddataAnalyser +"""The analyser module.""" +from . import analyser, minddata_pipeline_analyser, step_trace_analyser, \ + minddata_analyser diff --git a/mindinsight/profiler/analyser/analyser.py b/mindinsight/profiler/analyser/analyser.py index f805e76a5541113d504036c6f17539b65098ce61..deeb36fedd2cc4c4ef5bbe946e676867fa7c14d9 100644 --- a/mindinsight/profiler/analyser/analyser.py +++ b/mindinsight/profiler/analyser/analyser.py @@ -29,9 +29,11 @@ class AicoreTypeAnalyser(BaseAnalyser): profiling_dir (str): The directory where the parsed profiling files are located. device_id (str): The device ID. + + Raises: + ProfilerPathErrorException: If the profiling dir is invalid. """ - __col_names__ = ['op_type', 'execution_time', 'execution_frequency', - 'percent'] + _col_names = ['op_type', 'execution_time', 'execution_frequency', 'percent'] _file_name_aicore_type_time = 'aicore_intermediate_{}_type.csv' def _load(self): @@ -82,9 +84,12 @@ class AicoreDetailAnalyser(BaseAnalyser): profiling_dir (str): The directory where the parsed profiling files are located. device_id (str): The device ID. + + Raises: + ProfilerPathErrorException: If the profiling dir is invalid. """ - __col_names__ = ['op_name', 'op_type', 'execution_time', 'subgraph', - 'full_op_name', 'op_info'] + _col_names = ['op_name', 'op_type', 'execution_time', 'subgraph', + 'full_op_name', 'op_info'] _file_name_aicore_detail_time = 'aicore_intermediate_{}_detail.csv' _file_name_framework_info = 'framework_raw_{}.csv' @@ -210,11 +215,11 @@ class AicoreDetailAnalyser(BaseAnalyser): is_display_full_op_name (bool): Whether to display the operator full name. 
""" - self._display_col_names = self.__col_names__[0:4] + self._display_col_names = self._col_names[0:4] if is_display_full_op_name: - self._display_col_names.append(self.__col_names__[4]) + self._display_col_names.append(self._col_names[4]) if is_display_detail: - self._display_col_names.append(self.__col_names__[5]) + self._display_col_names.append(self._col_names[5]) def _convert_framework_field_type(self, row): """ @@ -253,10 +258,13 @@ class AicpuAnalyser(BaseAnalyser): profiling_dir (str): The directory where the parsed profiling files are located. device_id (str): The device ID. + + Raises: + ProfilerPathErrorException: If the profiling dir is invalid. """ - __col_names__ = ['serial_number', 'op_name', 'total_time', 'dispatch_time', - 'RunV2_start', 'compute_start', 'memcpy_start', - 'memcpy_end', 'RunV2_end'] + _col_names = ['serial_number', 'op_name', 'total_time', 'dispatch_time', + 'RunV2_start', 'compute_start', 'memcpy_start', 'memcpy_end', + 'RunV2_end'] _file_name_aicpu_time = 'aicpu_intermediate_{}.csv' def _load(self): diff --git a/mindinsight/profiler/analyser/analyser_factory.py b/mindinsight/profiler/analyser/analyser_factory.py index ea0148babf30eb844602f97dd9b9d17944de197c..d13ed1aeedcfac5f207781326b2b5e6162345f00 100644 --- a/mindinsight/profiler/analyser/analyser_factory.py +++ b/mindinsight/profiler/analyser/analyser_factory.py @@ -24,13 +24,9 @@ class AnalyserFactory: """ The analyser factory is used to create analyser special instance. - Currently the factory supports creating `AicoreTypeAnalyser`, - `AicoreDetailAnalyser`, `AicpuAnalyser` and `StepTraceAnalyser`. - The `AicoreTypeAnalyser` is used to analyze execution time according to AICORE operator type. - The `AicoreDetailAnalyser` is used to analyze execution time according to - all specific AICORE operator. The `AicpuAnalyser` is used to analyze - execution time according to all specific AICPU operator. - The `StepTraceAnalyser` is used to analyze the execution time according to different process. + Depending on the analyser type, different analyzers can be created. Users + can use the created analyser to query and analyse profiling data, such as + operator information, step trace data and so on. Examples: >>> analyser = AnalyserFactory.instance().get_analyser( @@ -72,6 +68,10 @@ class AnalyserFactory: analyser_class_name = ''.join([name.capitalize() for name in subnames]) analyser_class_name += 'Analyser' - if not hasattr(analyser_module, analyser_class_name): - raise ProfilerAnalyserNotExistException(analyser_type) - return getattr(analyser_module, analyser_class_name)(*args) + analyser_sub_modules = dir(analyser_module) + for sub_module in analyser_sub_modules: + if sub_module.endswith('analyser') and sub_module != 'base_analyser': + analyser_sub_module = getattr(analyser_module, sub_module) + if hasattr(analyser_sub_module, analyser_class_name): + return getattr(analyser_sub_module, analyser_class_name)(*args) + raise ProfilerAnalyserNotExistException(analyser_type) diff --git a/mindinsight/profiler/analyser/base_analyser.py b/mindinsight/profiler/analyser/base_analyser.py index c24b250ae9abeedffac15bd8137f0be247df7f12..8b4d31c885678210f5af89420bda1628867065c5 100644 --- a/mindinsight/profiler/analyser/base_analyser.py +++ b/mindinsight/profiler/analyser/base_analyser.py @@ -39,8 +39,11 @@ class BaseAnalyser(ABC): profiling_dir (str): The directory where the parsed profiling files are located. device_id (str): The device ID. + + Raises: + ProfilerPathErrorException: If the profiling dir is invalid. 
""" - __col_names__ = [] + _col_names = [] def __init__(self, profiling_dir, device_id): self._profiling_dir = self._normalize_profiling_dir(profiling_dir) @@ -61,7 +64,7 @@ class BaseAnalyser(ABC): @property def col_names(self): """The column names in the parsed profiling file.""" - return self.__col_names__ + return self._col_names @property def data(self): @@ -87,7 +90,7 @@ class BaseAnalyser(ABC): group_condition = condition.get('group_condition') self._result = [] - self._display_col_names = self.__col_names__[:] + self._display_col_names = self._col_names[:] self._filter(filter_condition) self._size = len(self._result) if sort_condition: @@ -148,7 +151,7 @@ class BaseAnalyser(ABC): if not sort_name: return try: - index = self.__col_names__.index(sort_name) + index = self._col_names.index(sort_name) except ValueError: raise ProfilerColumnNotExistException(sort_name) if self._none_sort_col_names and sort_name in self._none_sort_col_names: @@ -186,8 +189,8 @@ class BaseAnalyser(ABC): for condition_key, condition_value in condition.items(): if condition_key in self._none_filter_condition_key: continue - if condition_key in self.__col_names__: - index = self.__col_names__.index(condition_key) + if condition_key in self._col_names: + index = self._col_names.index(condition_key) actual_value = item[index] for exp_key, exp_value in condition_value.items(): if not self._is_match_condition( diff --git a/mindinsight/profiler/analyser/minddata_analyser.py b/mindinsight/profiler/analyser/minddata_analyser.py index 29738833c760f9cc0fb13aafcc7e6db4d3c44da2..7f9ea3456b162bdecba87a347618cf6315cb8fba 100644 --- a/mindinsight/profiler/analyser/minddata_analyser.py +++ b/mindinsight/profiler/analyser/minddata_analyser.py @@ -15,7 +15,7 @@ """Data process analyser.""" import os -from mindinsight.profiler.analyser import BaseAnalyser +from mindinsight.profiler.analyser.base_analyser import BaseAnalyser class MinddataAnalyser(BaseAnalyser): diff --git a/mindinsight/profiler/analyser/minddata_pipeline_analyser.py b/mindinsight/profiler/analyser/minddata_pipeline_analyser.py new file mode 100644 index 0000000000000000000000000000000000000000..321ccb352ed10230effa9ad1b0d7cb78063abac6 --- /dev/null +++ b/mindinsight/profiler/analyser/minddata_pipeline_analyser.py @@ -0,0 +1,285 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""The minddata pipeline analyser class.""" +import csv +import json +import os +import sys + +from mindinsight.profiler.analyser.base_analyser import BaseAnalyser +from mindinsight.profiler.common.exceptions.exceptions import \ + ProfilerPipelineOpNotExistException +from mindinsight.profiler.common.log import logger + + +class MinddataPipelineAnalyser(BaseAnalyser): + """ + The analyser for analyzing the minddata pipeline operator and queue data. + + Args: + profiling_dir (str): The directory where the parsed profiling files are + located. 
+ device_id (str): The device ID. + + Raises: + ProfilerPathErrorException: If the profiling dir is invalid. + """ + _col_names = ['op_id', 'op_type', 'num_workers', 'output_queue_size', + 'output_queue_average_size', 'output_queue_length', + 'output_queue_usage_rate', 'sample_interval', 'parent_id', + 'children_id'] + _file_name_pipeline = 'minddata_pipeline_raw_{}.csv' + _index_op_id = 0 + _index_op_type = 1 + _index_num_workers = 2 + _index_output_queue_size = 3 + _index_output_queue_average_size = 4 + _index_output_queue_length = 5 + _index_output_queue_usage_rate = 6 + _index_sample_interval = 7 + _index_parent_id = 8 + _index_children_id = 9 + + def __init__(self, profiling_dir, device_id): + super().__init__(profiling_dir, device_id) + self._none_filter_condition_key = ['threshold', 'is_display_op_detail'] + self._none_sort_col_names = ['output_queue_size', 'children_id'] + self._op_id_index_map = self._get_op_id_index_map() + + def get_op_and_parent_op_info(self, op_id): + """ + Get the operator and parent operator information by `op_id`. + + Args: + op_id (int): The minddata pipeline operator ID. + + Returns: + dict, the operator and parent operator information. + + Raises: + ProfilerPipelineOpNotExistException: If the minddata pipeline + operator does not exist. + """ + index = self._op_id_index_map.get(op_id) + if index is None: + raise ProfilerPipelineOpNotExistException(str(op_id)) + op_info = self._data[index] + parent_id = op_info[self._index_parent_id] + parent_index = self._op_id_index_map.get(parent_id) + if parent_index is None: + parent_op = None + queue_info = None + else: + parent_op_info = self._data[parent_index] + parent_op = { + 'op_id': parent_op_info[self._index_op_id], + 'op_type': parent_op_info[self._index_op_type], + 'num_workers': parent_op_info[self._index_num_workers] + } + queue_info = { + 'output_queue_size': op_info[self._index_output_queue_size], + 'output_queue_average_size': + op_info[self._index_output_queue_average_size], + 'output_queue_length': op_info[self._index_output_queue_length], + 'output_queue_usage_rate': + op_info[self._index_output_queue_usage_rate], + 'sample_interval': op_info[self._index_sample_interval] + } + + current_op = { + 'op_id': op_info[self._index_op_id], + 'op_type': op_info[self._index_op_type], + 'num_workers': op_info[self._index_num_workers] + } + return { + 'current_op': current_op, + 'parent_op': parent_op, + 'queue_info': queue_info + } + + def _load(self): + """Load data according to the parsed minddata pipeline file.""" + pipeline_file_path = os.path.join( + self._profiling_dir, + self._file_name_pipeline.format(self._device_id) + ) + if not os.path.isfile(pipeline_file_path): + logger.warning('The file <%s> does not exist.', pipeline_file_path) + return + + with open(pipeline_file_path, 'r') as file: + csv.field_size_limit(sys.maxsize) + csv_reader = csv.reader(file) + _ = next(csv_reader) + for info in csv_reader: + self._data.append(self._convert_field_type(info)) + + def _filter(self, filter_condition): + """ + Filter the profiling data according to the filter condition. + + Args: + filter_condition (dict): The filter condition. 
+ """ + def _inner_filter(item: list): + return self._default_filter(item, filter_condition) + + def _inner_map(item: list): + inner_item = item[0:2] + inner_item.extend(item[4:]) + return inner_item + + threshold = filter_condition.get('threshold') + is_display_op_detail = filter_condition.get( + 'is_display_op_detail', False + ) + self._set_display_col_name(is_display_op_detail) + + filter_result = list(filter(_inner_filter, self._data)) + if threshold: + low_threshold = threshold[1] + high_threshold = threshold[0] + filter_result = self._filter_outside_threshold( + filter_result, low_threshold, high_threshold + ) + + if is_display_op_detail: + self._result = filter_result + else: + self._result = list(map(_inner_map, filter_result)) + + def _filter_outside_threshold(self, data, low_threshold, high_threshold): + """ + Get the data outside the threshold range. + + Args: + data (list[list]): The filtered data. + low_threshold (float): The low threshold. + high_threshold (float): The high threshold. + + Returns: + list[list], the data outside the threshold range. + """ + root_node = None + leaf_nodes = [] + all_below_low_threshold = True + all_higher_high_threshold = True + result = [] + for item in data: + parent_id = item[self._index_parent_id] + if parent_id is None: + root_node = item + continue + + # current usage rate compared to the threshold + cur_usage_rate = item[self._index_output_queue_usage_rate] + is_low = False + if cur_usage_rate < low_threshold: + is_low = True + else: + all_below_low_threshold = False + if cur_usage_rate < high_threshold: + all_higher_high_threshold = False + + # the child node usage rate compared to the threshold + child_ids = item[self._index_children_id] + if not child_ids: + leaf_nodes.append(item) + continue + child_usage_rates = [ + self._get_usage_rate_by_op_id(op_id) for op_id in child_ids + ] + is_high = True + for usage_rate in child_usage_rates: + if usage_rate < high_threshold: + is_high = False + break + + if is_high and is_low: + result.append(item) + + if all_below_low_threshold: + result = leaf_nodes + elif all_higher_high_threshold: + result = [root_node] + return result + + def _get_usage_rate_by_op_id(self, op_id): + """ + Gets the usage rate of the queue corresponding to the specified operator. + + Args: + op_id (int): The pipeline operator ID. + + Returns: + float, the usage rate of the queue corresponding to the specified + operator. + """ + index = self._op_id_index_map.get(op_id) + op_info = self._data[index] + return op_info[self._index_output_queue_usage_rate] + + def _set_display_col_name(self, is_display_op_detail): + """ + Set the display column name according to the filter condition. + + Args: + is_display_op_detail (bool): Whether to display the detailed operator + information. + """ + if not is_display_op_detail: + self._display_col_names = self._col_names[0:2] + self._display_col_names.extend(self._col_names[4:]) + + def _convert_field_type(self, row): + """ + Convert the field type of minddata pipeline file to the specific type. + + Args: + row (list[str]): One row data from parsed data. + + Returns: + list[Union[str, int, float]], the converted data. 
+ """ + return [ + int(row[self._index_op_id]), + row[self._index_op_type], + int(row[self._index_num_workers]), + json.loads(row[self._index_output_queue_size]) + if row[self._index_output_queue_size] else None, + float(row[self._index_output_queue_average_size]) + if row[self._index_output_queue_average_size] else None, + int(row[self._index_output_queue_length]) + if row[self._index_output_queue_length] else None, + float(row[self._index_output_queue_usage_rate]) + if row[self._index_output_queue_usage_rate] else None, + int(row[self._index_sample_interval]), + int(row[self._index_parent_id]) + if row[self._index_parent_id] else None, + json.loads(row[self._index_children_id]) + if row[self._index_children_id] else None + ] + + def _get_op_id_index_map(self): + """ + Get the map of the operator id and index in data. + + Returns: + dict, the map of the operator id and index in data. + """ + the_map = {} + for index, op_info in enumerate(self._data): + the_map[op_info[self._index_op_id]] = index + return the_map diff --git a/mindinsight/profiler/analyser/step_trace_analyser.py b/mindinsight/profiler/analyser/step_trace_analyser.py index e611e3697c56a55613501948abf809bc605b019c..d0d079745a4402294e5428021e808da88265e334 100644 --- a/mindinsight/profiler/analyser/step_trace_analyser.py +++ b/mindinsight/profiler/analyser/step_trace_analyser.py @@ -27,7 +27,7 @@ from mindinsight.profiler.common.util import query_latest_trace_time_file, get_f class StepTraceAnalyser(BaseAnalyser): """The analyser for analyzing training steps.""" - __col_names__ = [] + _col_names = [] _attr_ui_name = 'name' _attr_ui_start = 'start' _attr_ui_duration = 'duration' @@ -89,7 +89,7 @@ class StepTraceAnalyser(BaseAnalyser): self.__column__ = next(csv_reader) self._data = list(csv_reader) self._size = len(self._data) - 1 - self._display_col_names = self.__col_names__[:] + self._display_col_names = self._col_names[:] def _filter(self, filter_condition): """ diff --git a/mindinsight/profiler/common/exceptions/error_code.py b/mindinsight/profiler/common/exceptions/error_code.py index 9e350a48d551fc446cf3f64af6ef5ce2a0fc16be..a592e1925249e34b7574f4d448021d75aa1bf5d9 100644 --- a/mindinsight/profiler/common/exceptions/error_code.py +++ b/mindinsight/profiler/common/exceptions/error_code.py @@ -49,6 +49,7 @@ class ProfilerErrors(ProfilerMgrErrors): SORT_CONDITION_ERROR = 5 | _ANALYSER_MASK FILTER_CONDITION_ERROR = 6 | _ANALYSER_MASK COLUMN_NOT_SUPPORT_SORT_ERROR = 7 | _ANALYSER_MASK + PIPELINE_OP_NOT_EXIST_ERROR = 8 | _ANALYSER_MASK @@ -80,3 +81,4 @@ class ProfilerErrorMsg(Enum): GROUP_CONDITION_ERROR = 'The group_condition in search_condition error, {}' SORT_CONDITION_ERROR = 'The sort_condition in search_condition error, {}' COLUMN_NOT_SUPPORT_SORT_ERROR = 'The column {} does not support to sort.' + PIPELINE_OP_NOT_EXIST_ERROR = 'The minddata pipeline operator {} does not exist.' 
diff --git a/mindinsight/profiler/common/exceptions/exceptions.py b/mindinsight/profiler/common/exceptions/exceptions.py index ec4a9f14ee38e27a172844099d33f4fb22892867..18fb41f8fefdce220213d32dcd7bc2511d988bbf 100644 --- a/mindinsight/profiler/common/exceptions/exceptions.py +++ b/mindinsight/profiler/common/exceptions/exceptions.py @@ -214,3 +214,14 @@ class JobIdMismatchException(MindInsightException): message=ProfilerErrorMsg.JOB_ID_MISMATCH_ERROR.value, http_code=400 ) + + +class ProfilerPipelineOpNotExistException(MindInsightException): + """The minddata pipeline operator does not exist error in profiler module.""" + + def __init__(self, msg): + super(ProfilerPipelineOpNotExistException, self).__init__( + error=ProfilerErrors.PIPELINE_OP_NOT_EXIST_ERROR, + message=ProfilerErrorMsg.PIPELINE_OP_NOT_EXIST_ERROR.value.format(msg), + http_code=400 + ) diff --git a/mindinsight/profiler/common/validator/validate.py b/mindinsight/profiler/common/validator/validate.py index e70855b5059caf08c5e424ab7acd5ed2b97edbaf..24aba79a4da9ce6c03b899f506398a3f3029cfbc 100644 --- a/mindinsight/profiler/common/validator/validate.py +++ b/mindinsight/profiler/common/validator/validate.py @@ -27,6 +27,11 @@ AICORE_TYPE_COL = ["op_type", "execution_time", "execution_frequency", "precent" AICORE_DETAIL_COL = ["op_name", "op_type", "execution_time", "subgraph", "full_op_name"] AICPU_COL = ["serial_number", "op_name", "total_time", "dispatch_time", "RunV2_start", "compute_start", "memcpy_start", "memcpy_end", "RunV2_end"] +MINDDATA_PIPELINE_COL = [ + 'op_id', 'op_type', 'num_workers', 'output_queue_average_size', + 'output_queue_length', 'output_queue_usage_rate', 'sample_interval', + 'parent_id' +] def validate_condition(search_condition): @@ -68,7 +73,7 @@ def validate_condition(search_condition): raise ProfilerOpTypeException("The op_type must in ['aicpu', 'aicore_type', 'aicore_detail']") if "group_condition" in search_condition: - validata_group_condition(search_condition) + validate_group_condition(search_condition) if "sort_condition" in search_condition: validate_sort_condition(search_condition, search_scope) @@ -77,7 +82,7 @@ def validate_condition(search_condition): validate_filter_condition(search_condition) -def validata_group_condition(search_condition): +def validate_group_condition(search_condition): """ Verify the group_condition in search_condition is valid or not. @@ -91,7 +96,7 @@ def validata_group_condition(search_condition): if not isinstance(group_condition, dict): raise ProfilerGroupConditionException("The group condition must be dict.") if "limit" in group_condition: - limit = group_condition.get("limit", 0) + limit = group_condition.get("limit", 10) if isinstance(limit, bool) \ or not isinstance(group_condition.get("limit"), int): log.error("The limit must be int.") @@ -145,6 +150,35 @@ def validate_sort_condition(search_condition, search_scope): raise ProfilerSortConditionException(err_msg) +def validate_op_filter_condition(op_condition, value_type=str, value_type_msg='str'): + """ + Verify the op_condition in filter_condition is valid or not. + + Args: + op_condition (dict): The op_condition in search_condition. + value_type (type): The value type. Default: str. + value_type_msg (str): The value type message. Default: 'str'. + + Raises: + ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid. 
+ """ + filter_key = ["in", "not_in", "partial_match_str_in"] + if not isinstance(op_condition, dict): + raise ProfilerFilterConditionException("The filter condition value must be dict.") + for key, value in op_condition.items(): + if not isinstance(key, str): + raise ProfilerFilterConditionException("The filter key must be str") + if not isinstance(value, list): + raise ProfilerFilterConditionException("The filter value must be list") + if key not in filter_key: + raise ProfilerFilterConditionException("The filter key must in {}.".format(filter_key)) + for item in value: + if not isinstance(item, value_type): + raise ProfilerFilterConditionException( + "The item in filter value must be {}.".format(value_type_msg) + ) + + def validate_filter_condition(search_condition): """ Verify the filter_condition in search_condition is valid or not. @@ -155,33 +189,9 @@ def validate_filter_condition(search_condition): Raises: ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid. """ - def validate_op_filter_condition(op_condition): - """ - Verify the op_condition in filter_condition is valid or not. - - Args: - op_condition (dict): The op_condition in search_condition. - - Raises: - ProfilerFilterConditionException: If the filter_condition param in search_condition is invalid. - """ - if not isinstance(op_condition, dict): - raise ProfilerFilterConditionException("Wrong op_type filter condition.") - for key, value in op_condition.items(): - if not isinstance(key, str): - raise ProfilerFilterConditionException("The filter key must be str") - if not isinstance(value, list): - raise ProfilerFilterConditionException("The filter value must be list") - if key not in filter_key: - raise ProfilerFilterConditionException("The filter key must in {}.".format(filter_key)) - for item in value: - if not isinstance(item, str): - raise ProfilerFilterConditionException("The item in filter value must be str") - filter_condition = search_condition.get("filter_condition") if not isinstance(filter_condition, dict): raise ProfilerFilterConditionException("The filter condition must be dict.") - filter_key = ["in", "not_in", "partial_match_str_in"] if filter_condition: if "op_type" in filter_condition: op_type_condition = filter_condition.get("op_type") @@ -232,3 +242,65 @@ def validate_ui_proc(proc_name): if proc_name not in accept_names: log.error("Invalid proc_name. The proc_name for restful api is in %s", accept_names) raise ProfilerParamValueErrorException(f'proc_name should be in {accept_names}.') + + +def validate_minddata_pipeline_condition(condition): + """ + Verify the minddata pipeline search condition is valid or not. + + Args: + condition (dict): The minddata pipeline search condition. + + Raises: + ProfilerParamTypeErrorException: If the type of the search condition is + invalid. + ProfilerDeviceIdException: If the device_id param in the search + condition is invalid. + ProfilerGroupConditionException: If the group_condition param in the + search condition is invalid. + ProfilerSortConditionException: If the sort_condition param in the + search condition is invalid. + ProfilerFilterConditionException: If the filter_condition param in the + search condition is invalid. + """ + if not isinstance(condition, dict): + log.error("Invalid condition type, it should be dict.") + raise ProfilerParamTypeErrorException( + "Invalid condition type, it should be dict." 
+ ) + + if "device_id" in condition: + device_id = condition.get("device_id") + if not isinstance(device_id, str): + raise ProfilerDeviceIdException( + "Invalid device_id type, it should be str." + ) + + if "group_condition" in condition: + validate_group_condition(condition) + + if "sort_condition" in condition: + validate_sort_condition(condition, MINDDATA_PIPELINE_COL) + + if "filter_condition" in condition: + filter_condition = condition.get('filter_condition') + if not isinstance(filter_condition, dict): + raise ProfilerFilterConditionException( + "The filter condition must be dict." + ) + for key, value in filter_condition.items(): + if key == 'op_id': + validate_op_filter_condition( + value, value_type=int, value_type_msg='int' + ) + elif key == 'op_type': + validate_op_filter_condition(value) + elif key == 'is_display_op_detail': + if not isinstance(key, bool): + raise ProfilerFilterConditionException( + "The condition must be bool." + ) + else: + raise ProfilerFilterConditionException( + "The key {} of filter_condition is not support.".format(key) + ) diff --git a/mindinsight/profiler/parser/minddata_pipeline_parser.py b/mindinsight/profiler/parser/minddata_pipeline_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..75f9a68a70c304cfdc5bd765aaa84715e1f54817 --- /dev/null +++ b/mindinsight/profiler/parser/minddata_pipeline_parser.py @@ -0,0 +1,289 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Thr parser for parsing minddata pipeline files.""" +import csv +import json +import os +from queue import Queue + +from marshmallow import ValidationError + +from mindinsight.profiler.common.exceptions.exceptions import \ + ProfilerPathErrorException, ProfilerFileNotFoundException, \ + ProfilerDirNotFoundException, ProfilerRawFileException +from mindinsight.profiler.common.log import logger +from mindinsight.profiler.common.validator.validate_path import \ + validate_and_normalize_path + + +class MinddataPipelineParser: + """ + Thr parser for parsing minddata pipeline files. + + Args: + source_dir (str): The minddata pipeline source dir. + device_id (str): The device ID. + output_path (str): The directory of the parsed file. Default: `./`. + + Raises: + ProfilerPathErrorException: If the minddata pipeline file path or + the output path is invalid. + ProfilerFileNotFoundException: If the minddata pipeline file or + the output dir does not exist. 
+ """ + _raw_pipeline_file_name = 'pipeline_profiling_{}.json' + _parsed_pipeline_file_name = 'minddata_pipeline_raw_{}.csv' + _col_names = [ + 'op_id', 'op_type', 'num_workers', 'output_queue_size', + 'output_queue_average_size', 'output_queue_length', + 'output_queue_usage_rate', 'sample_interval', 'parent_id', 'children_id' + ] + + def __init__(self, source_dir, device_id, output_path='./'): + self._device_id = device_id + self._pipeline_path = self._get_pipeline_path(source_dir) + self._save_path = self._get_save_path(output_path) + + @property + def save_path(self): + """ + The property of save path. + + Returns: + str, the save path. + """ + return self._save_path + + def parse(self): + """ + Parse the minddata pipeline files. + + Raises: + ProfilerRawFileException: If fails to parse the raw file of + minddata pipeline or the file is empty. + """ + with open(self._pipeline_path, 'r') as file: + try: + pipeline_info = json.load(file) + except (json.JSONDecodeError, TypeError) as err: + logger.exception(err) + raise ProfilerRawFileException( + 'Fail to parse minddata pipeline file.' + ) + if not pipeline_info: + logger.warning('The minddata pipeline file is empty.') + raise ProfilerRawFileException( + 'The minddata pipeline file is empty.' + ) + + self._parse_and_save(pipeline_info) + + def _get_pipeline_path(self, source_dir): + """ + Get the minddata pipeline file path. + + Args: + source_dir (str): The minddata pipeline source dir. + + Returns: + str, the minddata pipeline file path. + """ + pipeline_path = os.path.join( + source_dir, + self._raw_pipeline_file_name.format(self._device_id) + ) + + try: + pipeline_path = validate_and_normalize_path(pipeline_path, 'profiler') + except ValidationError: + logger.warning('Minddata pipeline file is invalid.') + raise ProfilerPathErrorException('Minddata pipeline file is invalid.') + if not os.path.isfile(pipeline_path): + logger.warning( + 'The minddata pipeline file <%s> not found.', pipeline_path + ) + raise ProfilerFileNotFoundException(pipeline_path) + + return pipeline_path + + def _get_save_path(self, output_path): + """ + Get the save path. + + Args: + output_path (str): The output dir. + + Returns: + str, the save path. + """ + try: + output_dir = validate_and_normalize_path(output_path, 'profiler') + except ValidationError: + logger.warning('Output path is invalid.') + raise ProfilerPathErrorException('Output path is invalid.') + if not os.path.isdir(output_dir): + logger.warning('The output dir <%s> not found.', output_dir) + raise ProfilerDirNotFoundException(output_dir) + return os.path.join( + output_dir, self._parsed_pipeline_file_name.format(self._device_id) + ) + + def _parse_and_save(self, pipeline_info): + """ + Parse and save the parsed minddata pipeline file. + + Args: + pipeline_info (dict): The pipeline info reads from the raw file of + the minddata pipeline. + + Raises: + ProfilerRawFileException: If the format of minddata pipeline raw + file is wrong. + """ + sample_interval = pipeline_info.get('sampling_interval') + op_info = pipeline_info.get('op_info') + if sample_interval is None or not op_info: + raise ProfilerRawFileException( + 'The format of minddata pipeline raw file is wrong.' 
+ ) + + op_id_info_cache = {} + for item in op_info: + op_id_info_cache[item.get('op_id')] = item + + with open(self._save_path, 'w') as save_file: + csv_writer = csv.writer(save_file) + csv_writer.writerow(self._col_names) + self._parse_and_save_op_info( + csv_writer, op_id_info_cache, sample_interval + ) + + def _parse_and_save_op_info(self, csv_writer, op_id_info_cache, + sample_interval): + """ + Parse and save the minddata pipeline operator information. + + Args: + csv_writer (csv.writer): The csv writer. + op_id_info_cache (dict): The operator id and information cache. + sample_interval (int): The sample interval. + + Raises: + ProfilerRawFileException: If the operator that id is 0 does not exist. + """ + queue = Queue() + root_node = op_id_info_cache.get(0) + if not root_node: + raise ProfilerRawFileException( + 'The format of minddata pipeline raw file is wrong, ' + 'the operator that id is 0 does not exist.' + ) + root_node['parent_id'] = None + queue.put_nowait(root_node) + + while not queue.empty(): + node = queue.get_nowait() + self._update_child_node(node, op_id_info_cache) + csv_writer.writerow(self._get_op_info(node, sample_interval)) + + op_id = node.get('op_id') + children_ids = node.get('children') + if not children_ids: + continue + for child_op_id in children_ids: + sub_node = op_id_info_cache.get(child_op_id) + sub_node['parent_id'] = op_id + queue.put_nowait(sub_node) + + def _update_child_node(self, node, op_id_info_cache): + """ + Updates the child node information of the operator. + + Args: + node (dict): The node represents an operator. + op_id_info_cache (dict): The operator id and information cache. + """ + child_op_ids = node.get('children') + if not child_op_ids: + return + + queue = Queue() + self._cp_list_item_to_queue(child_op_ids, queue) + + new_child_op_ids = [] + while not queue.empty(): + child_op_id = queue.get_nowait() + child_node = op_id_info_cache.get(child_op_id) + if child_node is None: + continue + metrics = child_node.get('metrics') + if not metrics or not metrics.get('output_queue'): + op_ids = child_node.get('children') + if op_ids: + self._cp_list_item_to_queue(op_ids, queue) + else: + new_child_op_ids.append(child_op_id) + + node['children'] = new_child_op_ids + + def _get_op_info(self, op_node, sample_interval): + """ + Get the operator information. + + Args: + op_node (dict): The node represents an operator. + sample_interval (int): The sample interval. + + Returns: + list[str, int, float], the operator information. + """ + queue_size = None + queue_average_size = None + queue_length = None + queue_usage_rate = None + metrics = op_node.get('metrics') + if metrics: + output_queue = metrics.get('output_queue') + if output_queue: + queue_size = output_queue.get('size') + queue_average_size = sum(queue_size) / len(queue_size) + queue_length = output_queue.get('length') + queue_usage_rate = queue_average_size / queue_length + + children_id = op_node.get('children') + op_info = [ + op_node.get('op_id'), + op_node.get('op_type'), + op_node.get('num_workers'), + queue_size, + queue_average_size, + queue_length, + queue_usage_rate, + sample_interval, + op_node.get('parent_id'), + children_id if children_id else None + ] + return op_info + + def _cp_list_item_to_queue(self, inner_list, queue): + """ + Copy the contents of a list to a queue. + + Args: + inner_list (list): The list. + queue (Queue): The target queue. 
+ """ + for item in inner_list: + queue.put_nowait(item) diff --git a/mindinsight/profiler/profiling.py b/mindinsight/profiler/profiling.py index d7426d508dc875c4b27f226294dde967f923214c..07d5d2852b9249c7bab0587c46a80af750a3f1cb 100644 --- a/mindinsight/profiler/profiling.py +++ b/mindinsight/profiler/profiling.py @@ -30,6 +30,8 @@ from mindinsight.profiler.parser.aicpu_data_parser import DataPreProcessParser from mindinsight.profiler.parser.framework_parser import FrameworkParser from mindinsight.profiler.parser.hwts_log_parser import HWTSLogParser from mindinsight.profiler.parser.minddata_parser import MinddataParser +from mindinsight.profiler.parser.minddata_pipeline_parser import \ + MinddataPipelineParser from mindinsight.profiler.parser.optime_parser import OPComputeTimeParser from mindinsight.profiler.parser.step_trace_parser import StepTraceParser from mindinsight.utils.exceptions import MindInsightException @@ -198,11 +200,18 @@ class Profiler: # Parsing minddata AICPU profiling MinddataParser.execute(source_path, self._output_path, self._dev_id) + # parse minddata pipeline operator and queue + try: + pipeline_parser = MinddataPipelineParser(job_id, self._dev_id) + pipeline_parser.parse() + except MindInsightException as err: + logger.warning(err.message) + # analyse op compute time info try: self._analyser_op_info() except MindInsightException as err: - logger.error(err.message) + logger.warning(err.message) # analyse step trace info self._analyse_step_trace(source_path, framework_parser) diff --git a/tests/ut/backend/profiler/test_profiler_api_minddata_pipeline.py b/tests/ut/backend/profiler/test_profiler_api_minddata_pipeline.py new file mode 100644 index 0000000000000000000000000000000000000000..af720bfb4f400eddd174832b8dc578e2f86cdc2d --- /dev/null +++ b/tests/ut/backend/profiler/test_profiler_api_minddata_pipeline.py @@ -0,0 +1,191 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================ +"""Test profiler restful api of minddata pipeline.""" +import json +from unittest import mock + +from flask import Response +from marshmallow import ValidationError + +from mindinsight.backend.application import APP + + +class TestMinddataPipelineApi: + """Test the minddata pipeline restful api of profiler.""" + def setup_method(self): + """Test init.""" + APP.response_class = Response + self._app_client = APP.test_client() + self._url_op_queue = '/v1/mindinsight/profile/minddata-pipeline/op-queue' + self._url_queue = '/v1/mindinsight/profile/minddata-pipeline/queue' + + @mock.patch('mindinsight.backend.profiler.profile_api.settings') + @mock.patch('mindinsight.profiler.analyser.base_analyser.BaseAnalyser.query') + def test_get_minddata_pipeline_op_queue_info_1(self, *args): + """Test the function of querying operator and queue information.""" + expect_result = { + 'col_name': [ + 'op_id', 'op_type', 'output_queue_average_size', + 'output_queue_length', 'output_queue_usage_rate', + 'sample_interval', 'parent_id', 'children_id'], + 'object': [], + 'size': 0 + } + args[0].return_value = expect_result + args[1].SUMMARY_BASE_DIR = '/path/to/summary_base_dir' + + url = self._url_op_queue + '?train_id=run1&profile=profiler' + body_data = {} + response = self._app_client.post(url, data=json.dumps(body_data)) + assert response.status_code == 200 + assert expect_result == response.get_json() + + def test_get_minddata_pipeline_op_queue_info_2(self): + """Test the function of querying operator and queue information.""" + expect_result = { + 'error_code': '50540002', + 'error_msg': 'Invalid parameter value. No profiler_dir or train_id.' + } + + url = self._url_op_queue + '?train_id=run1' + body_data = {} + response = self._app_client.post(url, data=json.dumps(body_data)) + assert response.status_code == 400 + assert expect_result == response.get_json() + + @mock.patch('mindinsight.backend.profiler.profile_api.validate_and_normalize_path') + @mock.patch('mindinsight.backend.profiler.profile_api.settings') + def test_get_minddata_pipeline_op_queue_info_3(self, *args): + """Test the function of querying operator and queue information.""" + args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir' + args[1].side_effect = ValidationError('xxx') + + expect_result = { + 'error_code': '50540002', + 'error_msg': 'Invalid parameter value. Invalid profiler dir.' + } + + url = self._url_op_queue + '?train_id=run1&profile=profiler' + body_data = {} + response = self._app_client.post(url, data=json.dumps(body_data)) + assert response.status_code == 400 + assert expect_result == response.get_json() + + @mock.patch('mindinsight.backend.profiler.profile_api.settings') + def test_get_minddata_pipeline_op_queue_info_4(self, *args): + """Test the function of querying operator and queue information.""" + args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir' + + expect_result = { + 'error_code': '50540002', + 'error_msg': 'Invalid parameter value. Json data parse failed.' + } + + url = self._url_op_queue + '?train_id=run1&profile=profiler' + response = self._app_client.post(url, data='xxx') + assert response.status_code == 400 + assert expect_result == response.get_json() + + @mock.patch('mindinsight.backend.profiler.profile_api.settings') + @mock.patch('mindinsight.profiler.analyser.minddata_pipeline_analyser.' 
+ 'MinddataPipelineAnalyser.get_op_and_parent_op_info') + def test_get_minddata_pipeline_queue_info_1(self, *args): + """Test the function of querying queue information.""" + expect_result = { + 'current_op': { + 'op_id': 1, + 'op_type': 'Shuffle', + 'num_workers': 1 + }, + 'queue_info': { + 'output_queue_size': [10, 20, 30], + 'output_queue_average_size': 20.0, + 'output_queue_length': 64, + 'output_queue_usage_rate': 0.3125, + 'sample_interval': 10 + }, + 'parent_op': { + 'op_id': 0, + 'op_type': 'Batch', + 'num_workers': 4 + } + } + args[0].return_value = expect_result + args[1].SUMMARY_BASE_DIR = '/path/to/summary_base_dir' + url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0&op_id=1' + response = self._app_client.get(url) + assert response.status_code == 200 + assert expect_result == response.get_json() + + def test_get_minddata_pipeline_queue_info_2(self): + """Test the function of querying queue information.""" + expect_result = { + 'error_code': '50540002', + 'error_msg': 'Invalid parameter value. No profiler_dir or train_id.' + } + + url = self._url_queue + '?profile=profiler&device_id=0&op_id=1' + response = self._app_client.get(url) + assert response.status_code == 400 + assert expect_result == response.get_json() + + @mock.patch('mindinsight.backend.profiler.profile_api.validate_and_normalize_path') + @mock.patch('mindinsight.backend.profiler.profile_api.settings') + def test_get_minddata_pipeline_queue_info_3(self, *args): + """Test the function of querying queue information.""" + args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir' + args[1].side_effect = ValidationError('xxx') + + expect_result = { + 'error_code': '50540002', + 'error_msg': 'Invalid parameter value. Invalid profiler dir.' + } + + url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0&op_id=1' + response = self._app_client.get(url) + assert response.status_code == 400 + assert expect_result == response.get_json() + + @mock.patch('mindinsight.backend.profiler.profile_api.settings') + def test_get_minddata_pipeline_queue_info_4(self, *args): + """Test the function of querying queue information.""" + args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir' + + expect_result = { + 'error_code': '50540002', + 'error_msg': 'Invalid parameter value. ' + 'Invalid operator id or operator id does not exist.' + } + + url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0' + response = self._app_client.get(url) + assert response.status_code == 400 + assert expect_result == response.get_json() + + @mock.patch('mindinsight.backend.profiler.profile_api.settings') + def test_get_minddata_pipeline_queue_info_5(self, *args): + """Test the function of querying queue information.""" + args[0].SUMMARY_BASE_DIR = '/path/to/summary_base_dir' + + expect_result = { + 'error_code': '50540002', + 'error_msg': 'Invalid parameter value. ' + 'Invalid operator id or operator id does not exist.' 
+ } + + url = self._url_queue + '?train_id=run1&profile=profiler&device_id=0&op_id=xx' + response = self._app_client.get(url) + assert response.status_code == 400 + assert expect_result == response.get_json() diff --git a/tests/ut/profiler/__init__.py b/tests/ut/profiler/__init__.py index 3c5df639e7877589e2e50bf7dcee45890d9a97df..b85f25fd33bd4a039f1e8cc20e05f233ae9eb5ee 100644 --- a/tests/ut/profiler/__init__.py +++ b/tests/ut/profiler/__init__.py @@ -17,4 +17,5 @@ import os RAW_DATA_BASE = os.path.realpath(os.path.join(os.path.dirname(__file__), '../../utils/resource')) RAW_DATA = os.path.realpath(os.path.join(RAW_DATA_BASE, 'JOB1')) +RAW_DATA_JOB2 = os.path.realpath(os.path.join(RAW_DATA_BASE, 'JOB2')) PROFILER_DIR = os.path.realpath(os.path.join(RAW_DATA_BASE, 'profiler')) diff --git a/tests/ut/profiler/analyser/resource/aicore_intermediate_0_detail.csv b/tests/ut/profiler/analyser/resource/aicore_intermediate_0_detail.csv deleted file mode 100644 index ec117c5346b175d0715a19e5249ef82b5e5fa3a4..0000000000000000000000000000000000000000 --- a/tests/ut/profiler/analyser/resource/aicore_intermediate_0_detail.csv +++ /dev/null @@ -1,11 +0,0 @@ -full_op_time,execution_time -Default/AtomicAddrClean-op104,0.00133 -Default/AtomicAddrClean-op105,0.000987 -Default/AtomicAddrClean-op106,0.001129 -Default/Cast-op10,0.00466 -Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Cast-op12,0.002366 -Gradients/Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/gradConv2D/Cast-op53,0.004879 -Default/TransData-op11,0.006366 -Gradients/Default/network-WithLossCell/_backbone-LeNet5/gradReshape/TransData-op44,0.006782 -Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Conv2D-op13,0.05651 -Default/network-WithLossCell/_backbone-LeNet5/fc3-Dense/MatMul-op9,0.370864 diff --git a/tests/ut/profiler/analyser/resource/aicore_intermediate_0_type.csv b/tests/ut/profiler/analyser/resource/aicore_intermediate_0_type.csv deleted file mode 100644 index 56bf368a6c2c6e9844992ff26cdcede84711a7c4..0000000000000000000000000000000000000000 --- a/tests/ut/profiler/analyser/resource/aicore_intermediate_0_type.csv +++ /dev/null @@ -1,6 +0,0 @@ -op_type,execution_time,execution_frequency,percent -AtomicAddrClean,0.007283,6,0.49 -Cast,0.053395,13,3.63 -TransData,0.121800,5,8.23 -Conv2D,0.063656,2,4.33 -MatMul,1.085982,9,73.80 diff --git a/tests/ut/profiler/analyser/resource/framework_raw_0.csv b/tests/ut/profiler/analyser/resource/framework_raw_0.csv deleted file mode 100644 index 5b60ecef49dc854a140531fa64c67a3df0521e34..0000000000000000000000000000000000000000 --- a/tests/ut/profiler/analyser/resource/framework_raw_0.csv +++ /dev/null @@ -1,11 +0,0 @@ -task_id,stream_id,block_dim,full_op_name,op_name,op_type,subgraph,op_info -30290,0,1,Default/AtomicAddrClean-op104,AtomicAddrClean-op104,AtomicAddrClean,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": """"}}" -30295,0,1,Default/AtomicAddrClean-op105,AtomicAddrClean-op105,AtomicAddrClean,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""10""}}" -30300,0,1,Default/AtomicAddrClean-op106,AtomicAddrClean-op106,AtomicAddrClean,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""84""}}" -30268,0,32,Default/Cast-op10,Cast-op10,Cast,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,1,32,32""}, ""output_0"": {""format"": ""DefaultFormat"", 
""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32""}}" -30271,0,9,Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Cast-op12,Cast-op12,Cast,Default,"{""input_0"": {""format"": ""FracZ"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""25,1,16,16""}, ""output_0"": {""format"": ""FracZ"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""25,1,16,16""}}" -30320,0,32,Gradients/Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/gradConv2D/Cast-op53,Cast-op53,Cast,Gradients,"{""input_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,1,28,28,16""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,28,28,16""}}" -30269,0,32,Default/TransData-op11,TransData-op11,TransData,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32""}}" -30308,0,32,Gradients/Default/network-WithLossCell/_backbone-LeNet5/gradReshape/TransData-op44,TransData-op44,TransData,Gradients,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,16,5,5""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,5,5,16""}}" -30272,0,32,Default/network-WithLossCell/_backbone-LeNet5/conv1-Conv2d/Conv2D-op13,Conv2D-op13,Conv2D,Default,"{""input_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,32,32,16""}, ""input_1"": {""format"": ""FracZ"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""25,1,16,16""}, ""output_0"": {""format"": ""NC1HWC0"", ""data_type"": ""NUMBER_TYPE_FLOAT16"", ""shape"": ""32,1,28,28,16""}}" -30286,0,1,Default/network-WithLossCell/_backbone-LeNet5/fc3-Dense/MatMul-op9,MatMul-op9,MatMul,Default,"{""input_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,120""}, ""input_1"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""84,120""}, ""input_2"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""84""}, ""output_0"": {""format"": ""DefaultFormat"", ""data_type"": ""NUMBER_TYPE_FLOAT32"", ""shape"": ""32,84""}}" diff --git a/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_0.csv b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_0.csv new file mode 100644 index 0000000000000000000000000000000000000000..6a0538ea73d543e3b9f60b273b628cc2ea6cc1e6 --- /dev/null +++ b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_0.csv @@ -0,0 +1,6 @@ +op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id +0,Batch,4,,,,,10,,[4] +4,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,0,[1] +1,Shuffle,1,"[60, 60, 60, 60]",60.0,64,0.9375,10,4,"[2, 3]" +2,TFReader,4,"[10, 20, 30, 20]",20.0,64,0.3125,10,1, +3,TFReader,4,"[10, 20, 30, 20]",20.0,64,0.3125,10,1, diff --git a/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_1.csv b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_1.csv new file mode 100644 index 0000000000000000000000000000000000000000..169576787dfdb852a3d7f9950f495cd6d79c1df2 --- /dev/null +++ b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_1.csv @@ -0,0 +1,6 @@ 
+op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id +0,Batch,4,,,,,10,,[4] +4,Shuffle,1,"[10, 20, 30, 20]",20.0,64,0.3125,10,0,[1] +1,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,4,"[2, 3]" +2,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1, +3,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1, diff --git a/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_2.csv b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_2.csv new file mode 100644 index 0000000000000000000000000000000000000000..a2efca5e067069b17115362623e14a02df17d08b --- /dev/null +++ b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_2.csv @@ -0,0 +1,6 @@ +op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id +0,Batch,4,,,,,10,,[4] +4,Shuffle,1,"[60, 60, 60, 60]",60.0,64,0.9375,10,0,[1] +1,Shuffle,1,"[60, 60, 60, 60]",60.0,64,0.9375,10,4,"[2, 3]" +2,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1, +3,TFReader,4,"[60, 60, 60, 60]",60.0,64,0.9375,10,1, diff --git a/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_3.csv b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_3.csv new file mode 100644 index 0000000000000000000000000000000000000000..57654587639bbc7dbf236f8a5ba5fdaf2ef3f1df --- /dev/null +++ b/tests/ut/profiler/analyser/resource/minddata_pipeline_raw_3.csv @@ -0,0 +1,6 @@ +op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id +0,Batch,4,,,,,10,,[4] +4,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,0,[1] +1,Shuffle,1,"[10, 10, 10, 10]",10.0,64,0.15625,10,4,"[2, 3]" +2,TFReader,4,"[10, 10, 10, 10]",10.0,64,0.15625,10,1, +3,TFReader,4,"[10, 10, 10, 10]",10.0,64,0.15625,10,1, diff --git a/tests/ut/profiler/analyser/test_analyser_aicore_detail.py b/tests/ut/profiler/analyser/test_analyser_aicore_detail.py index 5b385fdc1f5299886f1deae87ec155a1ee905a3e..6b50fd5ce103f7228486e53694453dbc75098b88 100644 --- a/tests/ut/profiler/analyser/test_analyser_aicore_detail.py +++ b/tests/ut/profiler/analyser/test_analyser_aicore_detail.py @@ -18,10 +18,12 @@ import json import os from unittest import TestCase -from mindinsight.profiler.analyser.analyser import AicoreDetailAnalyser from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory from tests.ut.profiler import PROFILER_DIR +COL_NAMES = ['op_name', 'op_type', 'execution_time', 'subgraph', 'full_op_name', + 'op_info'] + def get_detail_infos(indexes=None, sort_name=None, sort_type=True): """ @@ -56,7 +58,7 @@ def get_detail_infos(indexes=None, sort_name=None, sort_type=True): result = cache if sort_name: - sort_index = AicoreDetailAnalyser.__col_names__.index(sort_name) + sort_index = COL_NAMES.index(sort_name) result.sort(key=lambda item: item[sort_index], reverse=sort_type) return result @@ -73,7 +75,7 @@ class TestAicoreDetailAnalyser(TestCase): def test_query_success_1(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_detail_infos(), 'size': 10 } @@ -86,7 +88,7 @@ class TestAicoreDetailAnalyser(TestCase): def test_query_success_2(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': 
get_detail_infos(indexes=[9]), 'size': 1 } @@ -123,7 +125,7 @@ class TestAicoreDetailAnalyser(TestCase): def test_query_success_3(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_detail_infos(sort_name='execution_time', sort_type=True), 'size': 10 } @@ -137,7 +139,7 @@ class TestAicoreDetailAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_detail_infos(sort_name='op_name', sort_type=False), 'size': 10 } @@ -153,7 +155,7 @@ class TestAicoreDetailAnalyser(TestCase): def test_query_success_4(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_detail_infos(indexes=[2, 3]), 'size': 10 } @@ -167,7 +169,7 @@ class TestAicoreDetailAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': [], 'size': 10 } @@ -183,7 +185,7 @@ class TestAicoreDetailAnalyser(TestCase): def test_query_success_5(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_detail_infos( indexes=[1, 2], sort_name='execution_time', sort_type=True ), @@ -207,7 +209,7 @@ class TestAicoreDetailAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_detail_infos( indexes=[0, 1, 2, 8], sort_name='execution_time', sort_type=True ), @@ -234,7 +236,7 @@ class TestAicoreDetailAnalyser(TestCase): detail_infos = get_detail_infos(indexes=[9]) expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__[0:5], + 'col_name': COL_NAMES[0:5], 'object': [item[0:5] for item in detail_infos], 'size': 1 } @@ -250,7 +252,7 @@ class TestAicoreDetailAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__[0:4], + 'col_name': COL_NAMES[0:4], 'object': [item[0:4] for item in detail_infos], 'size': 1 } @@ -270,7 +272,7 @@ class TestAicoreDetailAnalyser(TestCase): """Test the success of the querying and sorting function by operator type.""" detail_infos = get_detail_infos(indexes=[9, 0, 2, 1, 5, 3, 4]) expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__[0:4], + 'col_name': COL_NAMES[0:4], 'object': [item[0:4] for item in detail_infos] } @@ -292,7 +294,7 @@ class TestAicoreDetailAnalyser(TestCase): """Test the success of the querying and sorting function by operator type.""" detail_infos = get_detail_infos(indexes=[9, 0, 2, 1, 3, 4, 8, 6]) expect_result = { - 'col_name': AicoreDetailAnalyser.__col_names__[0:4], + 'col_name': COL_NAMES[0:4], 'object': [item[0:4] for item in detail_infos] } @@ -310,14 +312,11 @@ class TestAicoreDetailAnalyser(TestCase): result = self._analyser.query_and_sort_by_op_type( filter_condition, op_type_order ) - print(result) self.assertDictEqual(expect_result, result) def test_col_names(self): """Test the querying column names function.""" - self.assertListEqual( - AicoreDetailAnalyser.__col_names__, self._analyser.col_names - ) + self.assertListEqual(COL_NAMES, self._analyser.col_names) def test_data(self): """Test the querying data function.""" diff --git 
a/tests/ut/profiler/analyser/test_analyser_aicore_type.py b/tests/ut/profiler/analyser/test_analyser_aicore_type.py index f0764c8f6ef77aa3626cbce302f6fada82befb65..76caa12e5d7f582663c0dedcefa84e9a716dfc7a 100644 --- a/tests/ut/profiler/analyser/test_analyser_aicore_type.py +++ b/tests/ut/profiler/analyser/test_analyser_aicore_type.py @@ -17,10 +17,11 @@ import csv import os from unittest import TestCase -from mindinsight.profiler.analyser.analyser import AicoreTypeAnalyser from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory from tests.ut.profiler import PROFILER_DIR +COL_NAMES = ['op_type', 'execution_time', 'execution_frequency', 'percent'] + def get_type_infos(indexes=None, sort_name=None, sort_type=True): """ @@ -38,8 +39,8 @@ def get_type_infos(indexes=None, sort_name=None, sort_type=True): PROFILER_DIR, 'aicore_intermediate_1_type.csv' ) - with open(aicore_type_path, 'r') as aicore_type_path: - csv_reader = csv.reader(aicore_type_path) + with open(aicore_type_path, 'r') as file: + csv_reader = csv.reader(file) _ = next(csv_reader) cache = [] for type_info in csv_reader: @@ -54,7 +55,7 @@ def get_type_infos(indexes=None, sort_name=None, sort_type=True): result = cache if sort_name: - sort_index = AicoreTypeAnalyser.__col_names__.index(sort_name) + sort_index = COL_NAMES.index(sort_name) result.sort(key=lambda item: item[sort_index], reverse=sort_type) return result @@ -71,7 +72,7 @@ class TestAicoreTypeAnalyser(TestCase): def test_query_success_1(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(), 'size': 5 } @@ -85,7 +86,7 @@ class TestAicoreTypeAnalyser(TestCase): def test_query_success_2(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[1]), 'size': 1 } @@ -100,7 +101,7 @@ class TestAicoreTypeAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[0, 2, 3, 4]), 'size': 4 } @@ -115,7 +116,7 @@ class TestAicoreTypeAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[0, 1, 3]), 'size': 3 } @@ -132,7 +133,7 @@ class TestAicoreTypeAnalyser(TestCase): def test_query_success_3(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[1, 3]), 'size': 2 } @@ -147,7 +148,7 @@ class TestAicoreTypeAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[0, 2, 4]), 'size': 3 } @@ -162,7 +163,7 @@ class TestAicoreTypeAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[2, 3]), 'size': 2 } @@ -179,7 +180,7 @@ class TestAicoreTypeAnalyser(TestCase): def test_query_success_4(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(sort_name='op_type', sort_type=True), 
'size': 5} condition = { @@ -192,7 +193,7 @@ class TestAicoreTypeAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(sort_name='execution_time', sort_type=False), 'size': 5 } @@ -208,7 +209,7 @@ class TestAicoreTypeAnalyser(TestCase): def test_query_success_5(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[0, 1]), 'size': 5 } @@ -222,7 +223,7 @@ class TestAicoreTypeAnalyser(TestCase): self.assertDictEqual(expect_result, result) expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos(indexes=[3, 4]), 'size': 5 } @@ -238,7 +239,7 @@ class TestAicoreTypeAnalyser(TestCase): def test_query_success_6(self): """Test the success of the querying function.""" expect_result = { - 'col_name': AicoreTypeAnalyser.__col_names__, + 'col_name': COL_NAMES, 'object': get_type_infos( indexes=[1, 3], sort_name='execution_time', sort_type=True ), @@ -263,9 +264,7 @@ class TestAicoreTypeAnalyser(TestCase): def test_col_names(self): """Test the querying column names function.""" - self.assertListEqual( - AicoreTypeAnalyser.__col_names__, self._analyser.col_names - ) + self.assertListEqual(COL_NAMES, self._analyser.col_names) def test_data(self): """Test the querying data function.""" diff --git a/tests/ut/profiler/analyser/test_minddata_pipeline_analyser.py b/tests/ut/profiler/analyser/test_minddata_pipeline_analyser.py new file mode 100644 index 0000000000000000000000000000000000000000..5f022acdbbe6edf9fc9aec8a148069461d38af81 --- /dev/null +++ b/tests/ut/profiler/analyser/test_minddata_pipeline_analyser.py @@ -0,0 +1,360 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Test the minddata pipeline analyser module.""" +import csv +import json +import os + +import pytest + +from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory +from mindinsight.profiler.common.exceptions.exceptions import \ + ProfilerPipelineOpNotExistException +from tests.ut.profiler import PROFILER_DIR + +COL_NAMES = ['op_id', 'op_type', 'num_workers', 'output_queue_size', + 'output_queue_average_size', 'output_queue_length', + 'output_queue_usage_rate', 'sample_interval', 'parent_id', + 'children_id'] + + +def get_pipeline_infos(pipeline_path, indexes=None, sort_name=None, + sort_type=True): + """ + Get minddata pipeline operator and queue information. + + Args: + pipeline_path (str): The parsed minddata pipeline file path. + indexes (list[int]): The operator indexes. Default: None. + sort_name (str): The sort name. Default: None. + sort_type (bool): The sort type. If the parameter is `True`, the results + are sorted in descending order; otherwise, in ascending order. Default: True.
+ Returns: + list[list], the minddata pipeline operator and queue information. + """ + with open(pipeline_path, 'r') as file: + csv_reader = csv.reader(file) + _ = next(csv_reader) + cache = [] + for row in csv_reader: + cache.append( + [ + int(row[0]), + row[1], + int(row[2]), + json.loads(row[3]) if row[3] else None, + float(row[4]) if row[4] else None, + int(row[5]) if row[5] else None, + float(row[6]) if row[6] else None, + int(row[7]), + int(row[8]) if row[8] else None, + json.loads(row[9]) if row[9] else None + ] + ) + + if indexes: + result = [cache[index] for index in indexes] + else: + result = cache + + if sort_name: + sort_index = COL_NAMES.index(sort_name) + result.sort(key=lambda item: item[sort_index], reverse=sort_type) + + return result + + + def get_simple_row_info(row): + """ + Get simple minddata pipeline row information. + + Args: + row (list[str, int, float]): The minddata pipeline row information. + + Returns: + list[str, int, float], the simple minddata pipeline row information. + """ + simple_info = row[0:2] + simple_info.extend(row[4:]) + return simple_info + + + class TestMinddataPipelineAnalyser: + """Test the class of `MinddataPipelineAnalyser`.""" + def setup_method(self): + """Initialization before test case execution.""" + self._analyser = AnalyserFactory.instance().get_analyser( + 'minddata_pipeline', PROFILER_DIR, '0' + ) + self._pipeline_path = os.path.join( + PROFILER_DIR, 'minddata_pipeline_raw_0.csv' + ) + + def test_query_success_1(self): + """Test the success of the querying function.""" + detail_infos = get_pipeline_infos(self._pipeline_path) + col_name = get_simple_row_info(COL_NAMES) + expect_result = { + 'col_name': col_name, + 'object': [get_simple_row_info(item) for item in detail_infos], + 'size': 4 + } + result = self._analyser.query({}) + assert expect_result == result + + result = self._analyser.query() + assert expect_result == result + + def test_query_success_2(self): + """Test the success of the querying function.""" + detail_infos = get_pipeline_infos(self._pipeline_path, indexes=[0]) + col_name = get_simple_row_info(COL_NAMES) + expect_result = { + 'col_name': col_name, + 'object': [get_simple_row_info(item) for item in detail_infos], + 'size': 1 + } + condition = { + 'filter_condition': { + 'op_id': { + 'in': [0] + } + } + } + result = self._analyser.query(condition) + assert expect_result == result + + detail_infos = get_pipeline_infos(self._pipeline_path, indexes=[0, 1]) + expect_result = { + 'col_name': col_name, + 'object': [get_simple_row_info(item) for item in detail_infos], + 'size': 2 + } + condition = { + 'filter_condition': { + 'op_type': { + 'not_in': ['TFReader'] + } + } + } + result = self._analyser.query(condition) + assert expect_result == result + + detail_infos = get_pipeline_infos(self._pipeline_path, indexes=[2, 3]) + expect_result = { + 'col_name': col_name, + 'object': [get_simple_row_info(item) for item in detail_infos], + 'size': 2 + } + condition = { + 'filter_condition': { + 'op_type': { + 'partial_match_str_in': ['TF'] + } + } + } + result = self._analyser.query(condition) + assert expect_result == result + + def test_query_success_3(self): + """Test the success of the querying function.""" + expect_result = { + 'col_name': COL_NAMES, + 'object': get_pipeline_infos(self._pipeline_path), + 'size': 4 + } + condition = { + 'filter_condition': { + 'is_display_op_detail': True + } + } + result = self._analyser.query(condition) + assert expect_result == result + + def test_query_success_4(self): + """Test the success of
the querying function.""" + expect_result = { + 'col_name': COL_NAMES, + 'object': [], + 'size': 0 + } + condition = { + 'filter_condition': { + 'threshold': [0.8, 0.2], + 'is_display_op_detail': True + } + } + result = self._analyser.query(condition) + assert expect_result == result + + def test_query_success_5(self): + """ + Test the success of the querying function. + + The upstream queue utilization of the operator is greater than + the highest threshold, and the downstream queue utilization of + the operator is lower than the lowest threshold. + """ + profiling_dir = os.path.join(os.path.dirname(__file__), 'resource') + pipeline_path = os.path.join( + profiling_dir, 'minddata_pipeline_raw_0.csv' + ) + analyser = AnalyserFactory.instance().get_analyser( + 'minddata_pipeline', profiling_dir, '0' + ) + + expect_result = { + 'col_name': COL_NAMES, + 'object': get_pipeline_infos(pipeline_path, [1]), + 'size': 1 + } + condition = { + 'filter_condition': { + 'threshold': [0.8, 0.2], + 'is_display_op_detail': True + } + } + result = analyser.query(condition) + assert expect_result == result + + def test_query_success_6(self): + """ + Test the success of the querying function. + + The upstream queue utilization of the operator is greater than + the highest threshold, and the downstream queue utilization of + the operator is lower than the lowest threshold. + """ + profiling_dir = os.path.join(os.path.dirname(__file__), 'resource') + pipeline_path = os.path.join( + profiling_dir, 'minddata_pipeline_raw_1.csv' + ) + analyser = AnalyserFactory.instance().get_analyser( + 'minddata_pipeline', profiling_dir, '1' + ) + + expect_result = { + 'col_name': COL_NAMES, + 'object': get_pipeline_infos(pipeline_path, [2]), + 'size': 1 + } + condition = { + 'filter_condition': { + 'threshold': [0.8, 0.2], + 'is_display_op_detail': True + } + } + result = analyser.query(condition) + assert expect_result == result + + def test_query_success_7(self): + """ + Test the success of the querying function. + + The utilization of all queues is greater than the highest threshold. + """ + profiling_dir = os.path.join(os.path.dirname(__file__), 'resource') + pipeline_path = os.path.join( + profiling_dir, 'minddata_pipeline_raw_2.csv' + ) + analyser = AnalyserFactory.instance().get_analyser( + 'minddata_pipeline', profiling_dir, '2' + ) + + expect_result = { + 'col_name': COL_NAMES, + 'object': get_pipeline_infos(pipeline_path, [0]), + 'size': 1 + } + condition = { + 'filter_condition': { + 'threshold': [0.8, 0.2], + 'is_display_op_detail': True + } + } + result = analyser.query(condition) + assert expect_result == result + + def test_query_success_8(self): + """ + Test the success of the querying function. + + The utilization of all queues is lower than the lowest threshold.
+ """ + profiling_dir = os.path.join(os.path.dirname(__file__), 'resource') + pipeline_path = os.path.join( + profiling_dir, 'minddata_pipeline_raw_3.csv' + ) + analyser = AnalyserFactory.instance().get_analyser( + 'minddata_pipeline', profiling_dir, '3' + ) + + expect_result = { + 'col_name': COL_NAMES, + 'object': get_pipeline_infos(pipeline_path, [3, 4]), + 'size': 2 + } + condition = { + 'filter_condition': { + 'threshold': [0.8, 0.2], + 'is_display_op_detail': True + } + } + result = analyser.query(condition) + assert expect_result == result + + def test_get_op_and_parent_op_info_success(self): + """Test the success of the function of querying operator and parent operator.""" + expect_result = { + 'current_op': { + 'op_id': 0, + 'op_type': 'Batch', + 'num_workers': 4 + }, + 'parent_op': None, + 'queue_info': None + } + result = self._analyser.get_op_and_parent_op_info(0) + assert expect_result == result + + expect_result = { + 'current_op': { + 'op_id': 1, + 'op_type': 'Shuffle', + 'num_workers': 1 + }, + 'queue_info': { + 'output_queue_size': [10, 20, 30], + 'output_queue_average_size': 20.0, + 'output_queue_length': 64, + 'output_queue_usage_rate': 0.3125, + 'sample_interval': 10 + }, + 'parent_op': { + 'op_id': 0, + 'op_type': 'Batch', + 'num_workers': 4 + } + } + result = self._analyser.get_op_and_parent_op_info(1) + assert expect_result == result + + def test_get_op_and_parent_op_info_fail(self): + """Test the function of fail to query operator and parent operator.""" + with pytest.raises(ProfilerPipelineOpNotExistException) as exc_info: + self._analyser.get_op_and_parent_op_info(5) + assert exc_info.value.error_code == '50546188' + assert exc_info.value.message == 'The minddata pipeline operator 5 ' \ + 'does not exist.' diff --git a/tests/ut/profiler/common/__init__.py b/tests/ut/profiler/common/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e30774307ca2107b3a81c071ad33c042ef924790 --- /dev/null +++ b/tests/ut/profiler/common/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ diff --git a/tests/ut/profiler/common/validator/__init__.py b/tests/ut/profiler/common/validator/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e30774307ca2107b3a81c071ad33c042ef924790 --- /dev/null +++ b/tests/ut/profiler/common/validator/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ diff --git a/tests/ut/profiler/common/validator/test_validator.py b/tests/ut/profiler/common/validator/test_validator.py new file mode 100644 index 0000000000000000000000000000000000000000..571a0a1b676d45b6587d078daa5c4264bbf19aa6 --- /dev/null +++ b/tests/ut/profiler/common/validator/test_validator.py @@ -0,0 +1,208 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Test the validator module.""" +import pytest + +from mindinsight.profiler.common.exceptions.exceptions import \ + ProfilerParamTypeErrorException, ProfilerDeviceIdException, \ + ProfilerGroupConditionException, ProfilerSortConditionException, \ + ProfilerFilterConditionException +from mindinsight.profiler.common.validator.validate import \ + validate_minddata_pipeline_condition + + +class TestMinddataPipelineCondition: + """Test the method of `validate_minddata_pipeline_condition`.""" + def test_validate_minddata_pipeline_condition_1(self): + """Test the method exceptions.""" + with pytest.raises(ProfilerParamTypeErrorException) as exc_info: + validate_minddata_pipeline_condition([]) + assert exc_info.value.error_code == '50546082' + assert exc_info.value.message == 'Param type error. Invalid condition ' \ + 'type, it should be dict.' + + def test_validate_minddata_pipeline_condition_2(self): + """Test the method exceptions.""" + condition = {'device_id': 0} + with pytest.raises(ProfilerDeviceIdException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546182' + assert exc_info.value.message == 'The device_id in search_condition error, ' \ + 'Invalid device_id type, it should be str.' + + def test_validate_minddata_pipeline_condition_3(self): + """Test the method exceptions.""" + condition = {'group_condition': 0} + with pytest.raises(ProfilerGroupConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546184' + assert exc_info.value.message == 'The group_condition in search_condition error, ' \ + 'The group condition must be dict.' + + condition = { + 'group_condition': { + 'limit': '1' + } + } + with pytest.raises(ProfilerGroupConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546184' + assert exc_info.value.message == 'The group_condition in search_condition error, ' \ + 'The limit must be int.' 
+ + condition = { + 'group_condition': { + 'limit': 0 + } + } + with pytest.raises(ProfilerGroupConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546184' + assert exc_info.value.message == 'The group_condition in search_condition error, ' \ + 'The limit must in [1, 100].' + + condition = { + 'group_condition': { + 'offset': '0' + } + } + with pytest.raises(ProfilerGroupConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546184' + assert exc_info.value.message == 'The group_condition in search_condition error, ' \ + 'The offset must be int.' + + condition = { + 'group_condition': { + 'offset': 1000001 + } + } + with pytest.raises(ProfilerGroupConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546184' + assert exc_info.value.message == 'The group_condition in search_condition error, ' \ + 'The offset must le 1000000.' + + def test_validate_minddata_pipeline_condition_4(self): + """Test the method exceptions.""" + condition = {'sort_condition': 0} + with pytest.raises(ProfilerSortConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546185' + assert exc_info.value.message == 'The sort_condition in search_condition error, ' \ + 'The sort condition must be dict.' + + condition = { + 'sort_condition': { + 'name': 0 + } + } + with pytest.raises(ProfilerSortConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546185' + assert exc_info.value.message == 'The sort_condition in search_condition error, ' \ + 'Wrong sorted name type.' + + condition = { + 'sort_condition': { + 'name': 'xxx' + } + } + with pytest.raises(ProfilerSortConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546185' + assert exc_info.value.message.startswith( + 'The sort_condition in search_condition error, The sorted_name must be in' + ) + + condition = { + 'sort_condition': { + 'name': 'output_queue_usage_rate', + 'type': 'xxx' + } + } + with pytest.raises(ProfilerSortConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546185' + assert exc_info.value.message == 'The sort_condition in search_condition error, ' \ + 'The sorted type must be ascending or descending.' + + def test_validate_minddata_pipeline_condition_5(self): + """Test the method exceptions.""" + condition = { + 'filter_condition': '0' + } + with pytest.raises(ProfilerFilterConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546186' + assert exc_info.value.message == 'The filter_condition in search_condition error, ' \ + 'The filter condition must be dict.'
+ + condition = { + 'filter_condition': { + 'xxx': 0 + } + } + with pytest.raises(ProfilerFilterConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546186' + assert exc_info.value.message == 'The filter_condition in search_condition error, ' \ + 'The key xxx of filter_condition is not support.' + + condition = { + 'filter_condition': { + 'is_display_op_detail': 0 + } + } + with pytest.raises(ProfilerFilterConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546186' + assert exc_info.value.message == 'The filter_condition in search_condition error, ' \ + 'The condition must be bool.' + + condition = { + 'filter_condition': { + 'op_id': 0 + } + } + with pytest.raises(ProfilerFilterConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546186' + assert exc_info.value.message == 'The filter_condition in search_condition error, ' \ + 'The filter condition value must be dict.' + + condition = { + 'filter_condition': { + 'op_id': { + 'in': ['0'] + } + } + } + with pytest.raises(ProfilerFilterConditionException) as exc_info: + validate_minddata_pipeline_condition(condition) + assert exc_info.value.error_code == '50546186' + assert exc_info.value.message == 'The filter_condition in search_condition error, ' \ + 'The item in filter value must be int.' diff --git a/tests/ut/profiler/parser/test_framework_parser.py b/tests/ut/profiler/parser/test_framework_parser.py index f276d0f1d7578fb21974b4ef9c61828b52090944..32d8ea6860e22f6aa3ad66f78a5e9308d5144adb 100644 --- a/tests/ut/profiler/parser/test_framework_parser.py +++ b/tests/ut/profiler/parser/test_framework_parser.py @@ -51,13 +51,11 @@ class TestFrameworkParser: """Test the class of `FrameworkParser`.""" def setup_method(self): """Initialization before test case execution.""" - raw_dir = RAW_DATA_BASE - FrameworkParser._raw_data_dir = raw_dir + FrameworkParser._raw_data_dir = RAW_DATA_BASE self._output_path_1 = tempfile.mkdtemp(prefix='test_framework_parser_') self._parser_1 = FrameworkParser('JOB1', '0', self._output_path_1) - os.makedirs(os.path.join(raw_dir, 'JOB2'), exist_ok=True) self._output_path_2 = tempfile.mkdtemp(prefix='test_framework_parser_') self._parser_2 = FrameworkParser('JOB2', '0', self._output_path_2) @@ -84,8 +82,8 @@ class TestFrameworkParser: '51522': 'Default/network-WithLossCell/_backbone-ResNet/' 'layer1-SequentialCell/0-ResidualBlock/conv1-Conv2d/Cast-op28' } - assert self._parser_1.to_task_id_full_op_name_dict(), expect_result - assert self._parser_2.to_task_id_full_op_name_dict(), expect_result + assert expect_result == self._parser_1.to_task_id_full_op_name_dict() + assert expect_result == self._parser_2.to_task_id_full_op_name_dict() def test_parse(self): """Test the parse function.""" @@ -135,6 +133,5 @@ class TestFrameworkParser: with pytest.raises(ProfilerFileNotFoundException) as exc_info: FrameworkParser('JOB1', '0') - print(exc_info.value) assert exc_info.value.error_code == '50546084' assert exc_info.value.message == 'The file not found.' 
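Taken together, the analyser and validator tests above pin down the shape of a minddata pipeline search condition. The sketch below shows a condition that should pass validation, assuming only the key names, value types, and bounds those tests exercise; the profiler directory path is hypothetical.

from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
from mindinsight.profiler.common.validator.validate import \
    validate_minddata_pipeline_condition

# A search condition built from the constraints checked in test_validator.py.
condition = {
    'device_id': '0',                                 # must be a str, not an int
    'group_condition': {'limit': 10, 'offset': 0},    # limit in [1, 100]; offset <= 1000000
    'sort_condition': {
        'name': 'output_queue_usage_rate',            # must be a known column name
        'type': 'descending'                          # 'ascending' or 'descending'
    },
    'filter_condition': {
        'op_type': {'partial_match_str_in': ['TF']},  # substring match on op_type
        'threshold': [0.8, 0.2],                      # [highest, lowest] queue usage rates
        'is_display_op_detail': True                  # return all columns, not the summary
    }
}

validate_minddata_pipeline_condition(condition)       # raises a Profiler*Exception if malformed
analyser = AnalyserFactory.instance().get_analyser(
    'minddata_pipeline', '/path/to/profiler_dir', '0' # hypothetical parsed-profiling directory
)
result = analyser.query(condition)                    # {'col_name': [...], 'object': [...], 'size': n}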
diff --git a/tests/ut/profiler/parser/test_minddata_pipeline_parser.py b/tests/ut/profiler/parser/test_minddata_pipeline_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..337b6cfe1f540c8596ef52036c8d15f80afa4efd --- /dev/null +++ b/tests/ut/profiler/parser/test_minddata_pipeline_parser.py @@ -0,0 +1,93 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""Test the minddata pipeline parser module.""" +import csv +import os +import shutil +import tempfile + +from mindinsight.profiler.parser.minddata_pipeline_parser import \ + MinddataPipelineParser +from tests.ut.profiler import PROFILER_DIR, RAW_DATA, RAW_DATA_JOB2 + + +def get_minddata_pipeline_result(file_path): + """ + Get minddata pipeline result from the minddata pipeline file. + + Args: + file_path (str): The minddata pipeline file path. + + Returns: + list[list], the parsed minddata pipeline information. + """ + result = [] + with open(file_path, 'r') as file: + csv_reader = csv.reader(file) + for row in csv_reader: + result.append(row) + return result + + +class TestMinddataPipelineParser: + """Test the class of `MinddataPipelineParser`.""" + def setup_method(self): + """Initialization before test case execution.""" + self._output_path_1 = tempfile.mkdtemp( + prefix='test_minddata_pipeline_parser_' + ) + self._parser_1 = MinddataPipelineParser( + RAW_DATA, '0', self._output_path_1 + ) + + self._output_path_2 = tempfile.mkdtemp( + prefix='test_minddata_pipeline_parser_' + ) + self._parser_2 = MinddataPipelineParser( + RAW_DATA_JOB2, '0', self._output_path_2 + ) + + def teardown_method(self) -> None: + """Clear up after test case execution.""" + shutil.rmtree(self._output_path_1) + shutil.rmtree(self._output_path_2) + + def test_save_path(self): + """Test the querying save path function.""" + expect_result = os.path.join( + self._output_path_1, 'minddata_pipeline_raw_0.csv' + ) + assert expect_result == self._parser_1.save_path + + def test_parse(self): + """Test the parse function.""" + expect_pipeline_file = os.path.join( + PROFILER_DIR, 'minddata_pipeline_raw_0.csv' + ) + expect_result = get_minddata_pipeline_result(expect_pipeline_file) + + self._parser_1.parse() + pipeline_file = os.path.join( + self._output_path_1, 'minddata_pipeline_raw_0.csv' + ) + result = get_minddata_pipeline_result(pipeline_file) + assert expect_result == result + + self._parser_2.parse() + pipeline_file = os.path.join( + self._output_path_2, 'minddata_pipeline_raw_0.csv' + ) + result = get_minddata_pipeline_result(pipeline_file) + assert expect_result == result diff --git a/tests/utils/resource/JOB1/pipeline_profiling_0.json b/tests/utils/resource/JOB1/pipeline_profiling_0.json new file mode 100644 index 0000000000000000000000000000000000000000..2f5fdca724c94b2b6d23fd0492292419bc2671db --- /dev/null +++ b/tests/utils/resource/JOB1/pipeline_profiling_0.json @@ -0,0 +1,55 @@ +{ + "sampling_interval": 10, + 
"op_info": [ + { + "op_id": 4, + "op_type": "TFReader", + "num_workers": 4, + "metrics": null, + "children": [3] + }, + { + "op_id": 3, + "op_type": "TFReader", + "num_workers": 4, + "metrics": { + "output_queue": { + "size": [10, 20, 30], + "length": 64 + } + }, + "children": null + }, + { + "op_id": 2, + "op_type": "TFReader", + "num_workers": 4, + "metrics": { + "output_queue": { + "size": [10, 20, 30], + "length": 64 + } + }, + "children": null + }, + { + "op_id": 1, + "op_type": "Shuffle", + "num_workers": 1, + "metrics": { + "output_queue": { + "size": [10, 20, 30], + "length": 64 + } + }, + "children": [2, 4] + }, + { + "op_id": 0, + "op_type": "Batch", + "num_workers": 4, + "metrics": null, + "children": [1] + } + ] +} \ No newline at end of file diff --git a/tests/utils/resource/JOB2/pipeline_profiling_0.json b/tests/utils/resource/JOB2/pipeline_profiling_0.json new file mode 100644 index 0000000000000000000000000000000000000000..1357a35c83da88754aaf147042036b282299cd7a --- /dev/null +++ b/tests/utils/resource/JOB2/pipeline_profiling_0.json @@ -0,0 +1,48 @@ +{ + "sampling_interval": 10, + "op_info": [ + { + "op_id": 3, + "op_type": "TFReader", + "num_workers": 4, + "metrics": { + "output_queue": { + "size": [10, 20, 30], + "length": 64 + } + }, + "children": null + }, + { + "op_id": 2, + "op_type": "TFReader", + "num_workers": 4, + "metrics": { + "output_queue": { + "size": [10, 20, 30], + "length": 64 + } + }, + "children": null + }, + { + "op_id": 1, + "op_type": "Shuffle", + "num_workers": 1, + "metrics": { + "output_queue": { + "size": [10, 20, 30], + "length": 64 + } + }, + "children": [2, 3] + }, + { + "op_id": 0, + "op_type": "Batch", + "num_workers": 4, + "metrics": null, + "children": [1] + } + ] +} \ No newline at end of file diff --git a/tests/utils/resource/profiler/minddata_pipeline_raw_0.csv b/tests/utils/resource/profiler/minddata_pipeline_raw_0.csv new file mode 100644 index 0000000000000000000000000000000000000000..57f8ddaf6875af2f3c275d576b805983d8ee2b23 --- /dev/null +++ b/tests/utils/resource/profiler/minddata_pipeline_raw_0.csv @@ -0,0 +1,5 @@ +op_id,op_type,num_workers,output_queue_size,output_queue_average_size,output_queue_length,output_queue_usage_rate,sample_interval,parent_id,children_id +0,Batch,4,,,,,10,,[1] +1,Shuffle,1,"[10, 20, 30]",20.0,64,0.3125,10,0,"[2, 3]" +2,TFReader,4,"[10, 20, 30]",20.0,64,0.3125,10,1, +3,TFReader,4,"[10, 20, 30]",20.0,64,0.3125,10,1,