diff --git a/mindinsight/backend/data_manager/__init__.py b/mindinsight/backend/data_manager/__init__.py index ff63fe91abc8caa89d71a233a2399ec34b6e1100..e05a5b53e8f6a6d8b5f3e1f4b1b9fe12da8e0062 100644 --- a/mindinsight/backend/data_manager/__init__.py +++ b/mindinsight/backend/data_manager/__init__.py @@ -14,9 +14,10 @@ # ============================================================================ """Trigger data manager load.""" -from mindinsight.datavisual.data_transform.data_manager import DATA_MANAGER -from mindinsight.datavisual.common.log import logger from mindinsight.conf import settings +from mindinsight.datavisual.common.log import logger +from mindinsight.datavisual.data_transform.data_manager import DATA_MANAGER +from mindinsight.lineagemgr.cache_item_updater import LineageCacheItemUpdater def init_module(app): @@ -29,6 +30,6 @@ def init_module(app): """ # Just to suppress pylint warning about unused arg. logger.debug("App: %s", type(app)) - # Register cache item updater here, before start load data. 
@BLUEPRINT.route("/lineagemgr/lineages", methods=["PUT"])
def update_lineage():
    """
    Update the user-added lineage information (tag and remark) of a train job.

    The train job is selected by the `train_id` query parameter and the new
    values are taken from the JSON request body, which must be a JSON object.

    Returns:
        str, a JSON response `{"status": "success"}` once the cache is updated.

    Raises:
        ParamValueError: If the request body is missing, is not valid JSON or
            is not a JSON object.

    Examples:
        >>> PUT http://xxxx/v1/mindinsight/lineagemgr/lineages?train_id=./run1
    """
    train_id = get_train_id(request)
    # silent=True turns a missing/malformed JSON body into None, so it is
    # rejected below with the API's own ParamValueError rather than a
    # framework-level error.
    added_info = request.get_json(silent=True)
    if not isinstance(added_info, dict):
        raise ParamValueError("The request body should be a dict.")

    update_lineage_object(DATA_MANAGER, train_id, added_info)

    return jsonify({"status": "success"})
def lock_key(self, key):
    """
    Return the lock that guards `key`, creating it on first use.

    Args:
        key (str): Cache key whose lock is requested.

    Returns:
        threading.Lock, the same lock object for every call with the same key.
    """
    lock = self._key_locks.get(key)
    if lock is None:
        # setdefault does the lookup and the insert in one dict operation, so
        # two threads racing on a new key end up with the same lock. A plain
        # "if key not in dict: dict[key] = Lock()" can hand out two locks.
        lock = self._key_locks.setdefault(key, threading.Lock())
    return lock
DATAVISUAL_PLUGIN_KEY = "tag_mapping" @@ -740,6 +767,11 @@ class DataManager: self._detail_cache = _DetailCacheManager(loader_generators) self._brief_cache = _BriefCacheManager() + @property + def summary_base_dir(self): + """Get summary base dir.""" + return self._summary_base_dir + def start_load_data(self, reload_interval=settings.RELOAD_INTERVAL, max_threads_count=MAX_DATA_LOADER_SIZE): @@ -955,5 +987,13 @@ class DataManager: """Register brief cache item updater for brief cache manager.""" self._brief_cache.register_cache_item_updater(updater) + def get_brief_cache(self): + """Get brief cache.""" + return self._brief_cache + + def get_brief_train_job(self, train_id): + """Get brief train job.""" + return self._brief_cache.get_train_job(train_id) + DATA_MANAGER = DataManager(settings.SUMMARY_BASE_DIR) diff --git a/mindinsight/lineagemgr/api/model.py b/mindinsight/lineagemgr/api/model.py index cf6e108d6a7b58518fc33ab44fa0da9d21875488..5ab05191883ebd46481b65a251e72536c017a708 100644 --- a/mindinsight/lineagemgr/api/model.py +++ b/mindinsight/lineagemgr/api/model.py @@ -16,15 +16,15 @@ import os from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamValueError, \ - LineageFileNotFoundError, LineageQuerySummaryDataError, LineageParamSummaryPathError, \ + LineageQuerySummaryDataError, LineageParamSummaryPathError, \ LineageQuerierParamException, LineageDirNotExistError, LineageSearchConditionParamError, \ LineageParamTypeError, LineageSummaryParseException from mindinsight.lineagemgr.common.log import logger as log -from mindinsight.lineagemgr.common.path_parser import SummaryPathParser +from mindinsight.lineagemgr.common.utils import normalize_summary_dir from mindinsight.lineagemgr.common.validator.model_parameter import SearchModelConditionParameter -from mindinsight.lineagemgr.common.validator.validate import validate_filter_key -from mindinsight.lineagemgr.common.validator.validate import validate_search_model_condition, \ - 
def general_get_summary_lineage(data_manager=None, summary_dir=None, keys=None):
    """
    Get summary lineage from data_manager or parsing from summaries.

    `summary_dir` is always required. If it is an absolute path, the summary
    log under it is parsed directly. Otherwise it is treated as a train id
    (e.g. './run1') and looked up in the brief cache of `data_manager`.

    Args:
        data_manager (DataManager): Data manager defined as
            mindinsight.datavisual.data_transform.data_manager.DataManager
        summary_dir (str): The summary directory. It contains summary logs for
            one training.
        keys (list[str]): The filter keys of lineage information. The acceptable
            keys are `metric`, `user_defined`, `hyper_parameters`, `algorithm`,
            `train_dataset`, `model`, `valid_dataset` and `dataset_graph`.
            If it is `None`, all information will be returned. Default: None.

    Returns:
        dict, the lineage information for one training. An empty dict is
        returned when no lineage object can be obtained for `summary_dir`.

    Raises:
        LineageParamTypeError: If `summary_dir` is not specified.
        LineageParamSummaryPathError: If summary path is invalid.
        LineageQuerySummaryDataError: If querying summary data fails.
        LineageFileNotFoundError: If the summary log file is not found.

    """
    default_result = {}
    if data_manager is None and summary_dir is None:
        raise LineageParamTypeError("One of data_manager or summary_dir needs to be specified.")
    if summary_dir is None:
        # Both lookups below need summary_dir; os.path.isabs(None) would
        # otherwise fail with a bare TypeError.
        raise LineageParamTypeError("The summary_dir needs to be specified.")
    if keys is not None:
        validate_filter_key(keys)

    if data_manager is None:
        # Keep the normalized path: the original discarded the return value,
        # so the validated path was never the one that got parsed.
        summary_dir = normalize_summary_dir(summary_dir)

    super_lineage_obj = None
    if os.path.isabs(summary_dir):
        super_lineage_obj = LineageParser(summary_dir).super_lineage_obj
    elif data_manager is not None:
        validate_train_id(summary_dir)
        super_lineage_obj = LineageOrganizer(data_manager=data_manager).get_super_lineage_obj(summary_dir)

    if super_lineage_obj is None:
        return default_result

    try:
        result = Querier({summary_dir: super_lineage_obj}).get_summary_lineage(summary_dir, keys)
    except (LineageQuerierParamException, LineageParamTypeError) as error:
        log.error(str(error))
        log.exception(error)
        raise LineageQuerySummaryDataError("Get summary lineage failed.")
    return result[0]
search_condition=None, added=False): + """ + Filter summary lineage from data_manager or parsing from summaries. + + One of data_manager or summary_base_dir needs to be specified. Support getting + super_lineage_obj from data_manager or parsing summaries by summary_base_dir. + + Args: + data_manager (DataManager): Data manager defined as + mindinsight.datavisual.data_transform.data_manager.DataManager + summary_base_dir (str): The summary base directory. It contains summary + directories generated by training. + search_condition (dict): The search condition. + """ + if data_manager is None and summary_base_dir is None: + raise LineageParamTypeError("One of data_manager or summary_base_dir needs to be specified.") + + if data_manager is None: + summary_base_dir = normalize_summary_dir(summary_base_dir) + else: + summary_base_dir = data_manager.summary_base_dir search_condition = {} if search_condition is None else search_condition @@ -233,16 +279,11 @@ def filter_summary_lineage(summary_base_dir, search_condition=None): log.exception(error) raise LineageParamSummaryPathError(str(error.message)) - summary_path = SummaryPathParser.get_latest_lineage_summaries(summary_base_dir) - if not summary_path: - log.error('There is no summary log file under summary_base_dir.') - raise LineageFileNotFoundError( - 'There is no summary log file under summary_base_dir.' 
def update_lineage_object(data_manager, train_id, added_info: dict):
    """
    Update the tag and remark stored with the cached lineage of a train job.

    Only the keys `tag` and `remark` of `added_info` are applied; other keys
    are ignored.

    Args:
        data_manager (DataManager): Data manager holding the brief cache.
        train_id (str): Train ID of a summary directory, e.g. './log1'.
        added_info (dict): New values for `tag` and/or `remark`.
    """
    validate_train_id(train_id)
    # NOTE(review): assumes get_brief_train_job returns a cache item for every
    # valid train id; confirm what it returns for an unknown id.
    cache_item = data_manager.get_brief_train_job(train_id)
    updates = {key: value for key, value in added_info.items() if key in ("tag", "remark")}

    # Read, merge and write under the same lock. Reading outside the lock lets
    # two concurrent updates overwrite each other's changes.
    with cache_item.lock_key(LINEAGE):
        super_lineage_obj = cache_item.get(key=LINEAGE)
        merged_info = dict(super_lineage_obj.added_info)
        merged_info.update(updates)
        super_lineage_obj.added_info = merged_info


class LineageCacheItemUpdater(BaseCacheItemUpdater):
    """Cache item updater that stores parsed lineage in brief cache items."""

    def update_item(self, cache_item: CachedTrainJob):
        """
        Update cache item in place.

        Parses the lineage summary under the item's summary directory and
        stores the result under the `LINEAGE` key, keeping any added info that
        is already cached for the item.

        Args:
            cache_item (CachedTrainJob): The brief cache item to refresh.
        """
        summary_dir = cache_item.summary_dir
        update_time = cache_item.basic_info.update_time

        try:
            super_lineage_obj = LineageParser(summary_dir, update_time).super_lineage_obj
        except LineageFileNotFoundError:
            super_lineage_obj = None

        if super_lineage_obj is None:
            # The summary_base_dir and summary_dir have been normalized in data_manager.
            if cache_item.summary_base_dir == summary_dir:
                relative_path = "./"
            else:
                relative_path = f'./{os.path.basename(summary_dir)}'
            logger.warning("There is no lineage to update in train job %s.", relative_path)
            return

        # Carry the cached added info over while holding the lock, so that a
        # tag/remark update arriving during parsing is not overwritten.
        with cache_item.lock_key(LINEAGE):
            try:
                super_lineage_obj.added_info = cache_item.get(key=LINEAGE).added_info
            except ParamValueError:
                # No lineage cached yet; keep the parser's empty added info.
                pass
            cache_item.set(key=LINEAGE, value=super_lineage_obj)
def validate_train_id(relative_path):
    """
    Check if train_id is valid.

    A valid train id is a string that starts with './' and contains no
    further '/', e.g. './log1'.

    Args:
        relative_path (str): Train ID of a summary directory, e.g. './log1'.

    Returns:
        bool, if train id is valid, return True.

    Raises:
        ParamValueError: If the train id is not a string, does not start with
            './' or contains more than one '/'.

    """
    # A non-string value (e.g. None) is rejected here as a parameter error
    # instead of failing with AttributeError on startswith().
    if not isinstance(relative_path, str) or not relative_path.startswith('./'):
        log.warning("The relative_path does not start with './'.")
        raise ParamValueError(
            "Summary dir should be relative path starting with './'."
        )
    if relative_path.count("/") > 1:
        log.warning("The relative_path contains multiple '/'.")
        raise ParamValueError(
            "Summary dir should be relative path starting with './'."
        )
    return True
class SuperLineageObj:
    """Pair a LineageObj with its update time and user-added info."""

    def __init__(self, lineage_obj, update_time, added_info=None):
        # Fall back to a fresh dict per instance when no added info is given.
        if added_info is None:
            added_info = {}
        self._added_info = added_info
        self._update_time = update_time
        self._lineage_obj = lineage_obj

    @property
    def update_time(self):
        """Return the update time given at construction."""
        return self._update_time

    @property
    def lineage_obj(self):
        """Return the wrapped lineage object."""
        return self._lineage_obj

    @property
    def added_info(self):
        """Return the added info."""
        return self._added_info

    @added_info.setter
    def added_info(self, added_info):
        """Replace the added info."""
        self._added_info = added_info
class LineageOrganizer:
    """
    Collect super lineage objects from the data manager cache or from disk.

    When `data_manager` is given, objects are read from its brief cache and
    keyed by the cache's own keys. Otherwise, when `summary_base_dir` is given,
    every summary directory under it is parsed and keyed by its absolute path.
    If both are given, `data_manager` takes precedence.

    Args:
        data_manager (DataManager): Data manager holding the brief cache.
        summary_base_dir (str): The summary base directory.

    Raises:
        LineageFileNotFoundError: If parsing from disk and none of the summary
            directories contains a lineage summary log.
    """
    def __init__(self, data_manager=None, summary_base_dir=None):
        self._data_manager = data_manager
        self._summary_base_dir = summary_base_dir
        self._check_params()
        self._super_lineage_objs = {}
        self._organize_from_cache()
        self._organize_from_disk()

    def _check_params(self):
        """Drop summary_base_dir when a data manager is also given."""
        if self._data_manager is not None and self._summary_base_dir is not None:
            self._summary_base_dir = None

    def _organize_from_disk(self):
        """Organize lineage objs from disk."""
        if self._summary_base_dir is None:
            return
        summary_watcher = SummaryWatcher()
        relative_dirs = summary_watcher.list_summary_directories(
            summary_base_dir=self._summary_base_dir
        )

        no_lineage_count = 0
        for item in relative_dirs:
            relative_dir = item.get('relative_path')
            update_time = item.get('update_time')
            abs_summary_dir = os.path.realpath(os.path.join(self._summary_base_dir, relative_dir))

            try:
                super_lineage_obj = LineageParser(abs_summary_dir, update_time).super_lineage_obj
            except LineageFileNotFoundError:
                no_lineage_count += 1
                continue
            # The parser leaves the object as None when the file cannot be parsed.
            if super_lineage_obj is not None:
                self._super_lineage_objs[abs_summary_dir] = super_lineage_obj

        # Also true when no summary directory was found at all (0 == 0).
        if no_lineage_count == len(relative_dirs):
            logger.error('There is no summary log file under summary_base_dir.')
            raise LineageFileNotFoundError(
                'There is no summary log file under summary_base_dir.'
            )

    def _organize_from_cache(self):
        """Organize lineage objs from cache."""
        if self._data_manager is None:
            return
        brief_cache = self._data_manager.get_brief_cache()
        cache_items = brief_cache.cache_items
        for relative_dir, cache_train_job in cache_items.items():
            try:
                # Use the shared LINEAGE key so this lookup cannot drift from
                # the key the cache item updater writes under.
                self._super_lineage_objs[relative_dir] = cache_train_job.get(LINEAGE)
            except ParamValueError:
                logger.info("There is no lineage info in train job %s.", relative_dir)

    @property
    def super_lineage_objs(self):
        """Get super lineage objects."""
        return self._super_lineage_objs

    def get_super_lineage_obj(self, relative_path):
        """Get super lineage object by given relative path, or None if absent."""
        return self._super_lineage_objs.get(relative_path)
LineageEventNotExistException, LineageQuerierParamException, \ - LineageSummaryParseException, LineageEventFieldNotExistException -from mindinsight.lineagemgr.common.log import logger +from mindinsight.lineagemgr.common.exceptions.exceptions import LineageQuerierParamException, LineageParamTypeError from mindinsight.lineagemgr.common.utils import enum_to_list -from mindinsight.lineagemgr.querier.query_model import LineageObj, FIELD_MAPPING -from mindinsight.lineagemgr.summary.lineage_summary_analyzer import \ - LineageSummaryAnalyzer +from mindinsight.lineagemgr.lineage_parser import SuperLineageObj +from mindinsight.lineagemgr.querier.query_model import FIELD_MAPPING @enum.unique @@ -173,20 +167,24 @@ class Querier: See the method `filter_summary_lineage` for supported fields. Args: - summary_path (Union[str, list[str]]): The single summary log path or - a list of summary log path. + super_lineage_objs (dict): A dict of . Raises: LineageParamTypeError: If the input parameter type is invalid. LineageQuerierParamException: If the input parameter value is invalid. LineageSummaryParseException: If all summary logs parsing failed. """ - def __init__(self, summary_path): - self._lineage_objects = [] - self._index_map = {} - self._parse_failed_paths = [] - self._parse_summary_logs(summary_path) - self._size = len(self._lineage_objects) + def __init__(self, super_lineage_objs): + self._super_lineage_objs = self._check_objs(super_lineage_objs) + + def _check_objs(self, super_lineage_objs): + if super_lineage_objs is None: + raise LineageQuerierParamException( + 'querier_init_param', 'The querier init param is empty.' + ) + if not isinstance(super_lineage_objs, dict): + raise LineageParamTypeError("Init param should be a dict.") + return super_lineage_objs def get_summary_lineage(self, summary_dir=None, filter_keys=None): """ @@ -209,7 +207,6 @@ class Querier: Returns: list[dict], summary lineage information. 
""" - self._parse_fail_summary_logs() if filter_keys is None: filter_keys = LineageFilterKey.get_key_list() @@ -222,20 +219,20 @@ class Querier: if summary_dir is None: result = [ - item.get_summary_info(filter_keys) for item in self._lineage_objects + item.lineage_obj.get_summary_info(filter_keys) for item in self._super_lineage_objs.values() ] - else: - index = self._index_map.get(summary_dir) - if index is None: - raise LineageQuerierParamException( - 'summary_dir', - 'Summary dir {} does not exist.'.format(summary_dir) - ) - lineage_obj = self._lineage_objects[index] + elif summary_dir in self._super_lineage_objs: + lineage_obj = self._super_lineage_objs[summary_dir].lineage_obj result = [lineage_obj.get_summary_info(filter_keys)] + else: + raise LineageQuerierParamException( + 'summary_dir', + 'Summary dir {} does not exist.'.format(summary_dir) + ) + return result - def filter_summary_lineage(self, condition=None): + def filter_summary_lineage(self, condition=None, added=False): """ Filter and sort lineage information based on the specified condition. @@ -253,7 +250,7 @@ class Querier: Returns: dict, filtered and sorted model lineage information. 
""" - def _filter(lineage_obj: LineageObj): + def _filter(super_lineage_obj: SuperLineageObj): for condition_key, condition_value in condition.items(): if ConditionParam.is_condition_type(condition_key): continue @@ -263,7 +260,7 @@ class Querier: 'The field {} not supported'.format(condition_key) ) - value = lineage_obj.get_value_by_key(condition_key) + value = super_lineage_obj.lineage_obj.get_value_by_key(condition_key) for exp_key, exp_value in condition_value.items(): if not ExpressionType.is_valid_exp(exp_key): raise LineageQuerierParamException( @@ -274,9 +271,9 @@ class Querier: return False return True - def _cmp(obj1: LineageObj, obj2: LineageObj): - value1 = obj1.get_value_by_key(sorted_name) - value2 = obj2.get_value_by_key(sorted_name) + def _cmp(obj1: SuperLineageObj, obj2: SuperLineageObj): + value1 = obj1.lineage_obj.get_value_by_key(sorted_name) + value2 = obj2.lineage_obj.get_value_by_key(sorted_name) if value1 is None and value2 is None: cmp_result = 0 @@ -293,11 +290,14 @@ class Querier: cmp_result = (type1 > type2) - (type1 < type2) return cmp_result - self._parse_fail_summary_logs() - if condition is None: condition = {} - results = list(filter(_filter, self._lineage_objects)) + + self._add_dataset_mark() + super_lineage_objs = list(self._super_lineage_objs.values()) + super_lineage_objs.sort(key=lambda x: x.update_time, reverse=True) + + results = list(filter(_filter, super_lineage_objs)) if ConditionParam.SORTED_NAME.value in condition: sorted_name = condition.get(ConditionParam.SORTED_NAME.value) @@ -323,9 +323,11 @@ class Querier: for item in offset_results: lineage_object = dict() if LineageType.MODEL.value in lineage_types: - lineage_object.update(item.to_model_lineage_dict()) + lineage_object.update(item.lineage_obj.to_model_lineage_dict()) if LineageType.DATASET.value in lineage_types: - lineage_object.update(item.to_dataset_lineage_dict()) + lineage_object.update(item.lineage_obj.to_dataset_lineage_dict()) + if added: + 
lineage_object.update({"added_info": item.added_info}) object_items.append(lineage_object) lineage_info = { @@ -341,7 +343,7 @@ class Querier: customized = dict() for offset_result in offset_results: for obj_name in ["metric", "user_defined"]: - self._organize_customized_item(customized, offset_result, obj_name) + self._organize_customized_item(customized, offset_result.lineage_obj, obj_name) # If types contain numbers and string, it will be "mixed". # If types contain "int" and "float", it will be "float". @@ -410,10 +412,10 @@ class Querier: Args: condition (dict): Filter and sort condition. - result (list[LineageObj]): Filtered and sorted result. + result (list[SuperLineageObj]): Filtered and sorted result. Returns: - list[LineageObj], paginated result. + list[SuperLineageObj], paginated result. """ offset = 0 limit = 10 @@ -428,87 +430,12 @@ class Querier: offset_result = result[offset * limit: limit * (offset + 1)] return offset_result - def _parse_summary_logs(self, summary_path): - """ - Parse summary logs. - - Args: - summary_path (Union[str, list[str]]): The single summary log path or - a list of summary log path. - """ - if not summary_path: - raise LineageQuerierParamException( - 'summary_path', 'The summary path is empty.' - ) - if isinstance(summary_path, str): - self._parse_summary_log(summary_path, 0) - elif isinstance(summary_path, list): - index = 0 - for path in summary_path: - parse_result = self._parse_summary_log(path, index) - if parse_result: - index += 1 - else: - raise LineageParamTypeError('Summary path is not str or list.') - - if self._parse_failed_paths: - logger.info('Parse failed paths: %s', str(self._parse_failed_paths)) - - if not self._lineage_objects: - raise LineageSummaryParseException() - - def _parse_summary_log(self, log_path, index: int, is_save_fail_path=True): - """ - Parse the single summary log. - - Args: - log_path (str): The single summary log path. - index (int): TrainInfo instance index in the train info list. 
- is_save_fail_path (bool): Set whether to save the failed summary - path. Default: True. - - Returns: - bool, `True` if parse summary log success, else `False`. - """ - log_dir = os.path.dirname(log_path) - try: - lineage_info = LineageSummaryAnalyzer.get_summary_infos(log_path) - user_defined_info = LineageSummaryAnalyzer.get_user_defined_info(log_path) - lineage_obj = LineageObj( - log_dir, - train_lineage=lineage_info.train_lineage, - evaluation_lineage=lineage_info.eval_lineage, - dataset_graph=lineage_info.dataset_graph, - user_defined_info=user_defined_info - ) - self._lineage_objects.append(lineage_obj) - self._add_dataset_mark() - self._index_map[log_dir] = index - return True - except (LineageSummaryAnalyzeException, - LineageEventNotExistException, - LineageEventFieldNotExistException): - if is_save_fail_path: - self._parse_failed_paths.append(log_path) - return False - - def _parse_fail_summary_logs(self): - """Parse fail summary logs.""" - if self._parse_failed_paths: - failed_paths = [] - for path in self._parse_failed_paths: - parse_result = self._parse_summary_log(path, self._size, False) - if parse_result: - self._size += 1 - else: - failed_paths.append(path) - self._parse_failed_paths = failed_paths - def _add_dataset_mark(self): """Add dataset mark into LineageObj.""" # give a dataset mark for each dataset graph in lineage information marked_dataset_group = {'1': None} - for lineage in self._lineage_objects: + for super_lineage_obj in self._super_lineage_objs.values(): + lineage = super_lineage_obj.lineage_obj dataset_mark = '0' for dataset_graph_mark, marked_dataset_graph in marked_dataset_group.items(): if marked_dataset_graph == lineage.dataset_graph: diff --git a/mindinsight/lineagemgr/querier/query_model.py b/mindinsight/lineagemgr/querier/query_model.py index 5c140e3ae823d7a2c3d2a3703265ed668d5d744e..36509bd80937542ef63e907ff7384d2f3aad744b 100644 --- a/mindinsight/lineagemgr/querier/query_model.py +++ 
b/mindinsight/lineagemgr/querier/query_model.py @@ -298,6 +298,7 @@ class LineageObj: result[self._name_user_defined] = self.user_defined # add dataset_graph into filtration result result[self._name_dataset_graph] = getattr(self, self._name_dataset_graph) + return result def _parse_train_lineage(self, train_lineage): diff --git a/tests/st/func/lineagemgr/api/test_model_api.py b/tests/st/func/lineagemgr/api/test_model_api.py index c3942e9c8432560058a3f9016fe95816c6914ba7..516479fc1dfa9f61bf41d5d8f8e1659a4064ad12 100644 --- a/tests/st/func/lineagemgr/api/test_model_api.py +++ b/tests/st/func/lineagemgr/api/test_model_api.py @@ -356,6 +356,7 @@ class TestModelApi(TestCase): assert expect_result == res expect_result = { + 'customized': {}, 'object': [], 'count': 0 } diff --git a/tests/ut/backend/lineagemgr/test_lineage_api.py b/tests/ut/backend/lineagemgr/test_lineage_api.py index 8ff5b222f67fb5cb6fb6c604b7a2459047883530..21b1885035b33706b82f4251789e6871e3489ec5 100644 --- a/tests/ut/backend/lineagemgr/test_lineage_api.py +++ b/tests/ut/backend/lineagemgr/test_lineage_api.py @@ -70,7 +70,7 @@ class TestSearchModel(TestCase): self.url = '/v1/mindinsight/lineagemgr/lineages' @mock.patch('mindinsight.backend.lineagemgr.lineage_api.settings') - @mock.patch('mindinsight.backend.lineagemgr.lineage_api.filter_summary_lineage') + @mock.patch('mindinsight.backend.lineagemgr.lineage_api.general_filter_summary_lineage') def test_search_model_success(self, *args): """Test the success of model_success.""" base_dir = '/path/to/test_lineage_summary_dir_base' @@ -113,7 +113,7 @@ class TestSearchModel(TestCase): self.assertDictEqual(expect_result, response.get_json()) @mock.patch('mindinsight.backend.lineagemgr.lineage_api.settings') - @mock.patch('mindinsight.backend.lineagemgr.lineage_api.filter_summary_lineage') + @mock.patch('mindinsight.backend.lineagemgr.lineage_api.general_filter_summary_lineage') def test_search_model_fail(self, *args): """Test the function of model_lineage 
with exception.""" response = self.app_client.post(self.url, data='xxx') diff --git a/tests/ut/lineagemgr/api/test_model.py b/tests/ut/lineagemgr/api/test_model.py index 7e7b442095cb17c9a5cc941e6f797e88b8789687..34f0e6c7385bb336927a9a73e77ac13c8a06a9db 100644 --- a/tests/ut/lineagemgr/api/test_model.py +++ b/tests/ut/lineagemgr/api/test_model.py @@ -29,12 +29,12 @@ class TestModel(TestCase): """Test the function of get_summary_lineage and filter_summary_lineage.""" @mock.patch('mindinsight.lineagemgr.api.model.Querier') - @mock.patch('mindinsight.lineagemgr.api.model.SummaryPathParser.get_latest_lineage_summary') + @mock.patch('mindinsight.lineagemgr.api.model.LineageParser') @mock.patch('os.path.isdir') - def test_get_summary_lineage_success(self, isdir_mock, latest_summary_mock, qurier_mock): + def test_get_summary_lineage_success(self, isdir_mock, parser_mock, qurier_mock): """Test the function of get_summary_lineage.""" isdir_mock.return_value = True - latest_summary_mock.return_value = '/path/to/summary_dir/a_MS_lineage' + parser_mock.return_value = MagicMock() mock_querier = MagicMock() qurier_mock.return_value = mock_querier @@ -53,7 +53,7 @@ class TestModel(TestCase): invalid_path ) - @mock.patch('mindinsight.lineagemgr.api.model.validate_path') + @mock.patch('mindinsight.lineagemgr.common.utils.validate_path') @mock.patch.object(SummaryPathParser, 'get_latest_lineage_summary') def test_get_summary_lineage_failed2(self, mock_summary, mock_valid): """Test get_summary_lineage failed.""" @@ -66,29 +66,19 @@ class TestModel(TestCase): '/path/to/summary_dir' ) - @mock.patch('mindinsight.lineagemgr.api.model.validate_path') - @mock.patch('mindinsight.lineagemgr.api.model.Querier') + @mock.patch('mindinsight.lineagemgr.lineage_parser.LineageParser._parse_summary_log') + @mock.patch('mindinsight.lineagemgr.common.utils.validate_path') @mock.patch.object(SummaryPathParser, 'get_latest_lineage_summary') def test_get_summary_lineage_failed3(self, mock_summary, - 
mock_querier, - mock_valid): + mock_valid, + mock_paser): """Test get_summary_lineage failed.""" mock_summary.return_value = '/path/to/summary/file' - mock_querier.return_value.get_summary_lineage.side_effect = \ - LineageSummaryParseException() mock_valid.return_value = '/path/to/summary_dir' - res = get_summary_lineage('/path/to/summary_dir') - assert res == {} - - mock_querier.side_effect = LineageQuerierParamException( - ['keys'], 'key') - self.assertRaisesRegex( - LineageQuerySummaryDataError, - 'Get summary lineage failed', - get_summary_lineage, - '/path/to/summary_dir' - ) + mock_paser.return_value = None + result = get_summary_lineage('/path/to/summary_dir') + assert {} == result @mock.patch('mindinsight.lineagemgr.api.model.validate_path') def test_convert_relative_path_to_abspath(self, validate_path_mock): @@ -135,13 +125,13 @@ class TestModel(TestCase): class TestFilterAPI(TestCase): """Test the function of filter_summary_lineage.""" - + @mock.patch('mindinsight.lineagemgr.api.model.LineageOrganizer') @mock.patch('mindinsight.lineagemgr.api.model.Querier') - @mock.patch('mindinsight.lineagemgr.api.model.SummaryPathParser.get_latest_lineage_summaries') + @mock.patch('mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_latest_lineage_summary') @mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath') - @mock.patch('mindinsight.lineagemgr.api.model.validate_path') + @mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir') def test_filter_summary_lineage(self, validate_path_mock, convert_path_mock, - latest_summary_mock, qurier_mock): + latest_summary_mock, qurier_mock, organizer_mock): """Test the function of filter_summary_lineage.""" convert_path_mock.return_value = { 'summary_dir': { @@ -151,6 +141,8 @@ class TestFilterAPI(TestCase): 'gt': 2.0 } } + organizer_mock = MagicMock() + organizer_mock.super_lineage_objs = None validate_path_mock.return_value = True latest_summary_mock.return_value = 
['/path/to/summary_base_dir/summary_dir'] mock_querier = MagicMock() @@ -172,7 +164,7 @@ class TestFilterAPI(TestCase): ) @mock.patch('mindinsight.lineagemgr.api.model.validate_condition') - @mock.patch('mindinsight.lineagemgr.api.model.validate_path') + @mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir') def test_invalid_search_condition(self, mock_path, mock_valid): """Test filter_summary_lineage with invalid invalid param.""" mock_path.return_value = None @@ -188,7 +180,7 @@ class TestFilterAPI(TestCase): @mock.patch('mindinsight.lineagemgr.api.model.validate_search_model_condition') @mock.patch('mindinsight.lineagemgr.api.model.validate_condition') - @mock.patch('mindinsight.lineagemgr.api.model.validate_path') + @mock.patch('mindinsight.lineagemgr.common.utils.validate_path') @mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath') def test_failed_to_convert_path(self, mock_convert, *args): """Test filter_summary_lineage with invalid invalid param.""" @@ -205,23 +197,24 @@ class TestFilterAPI(TestCase): @mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath') @mock.patch('mindinsight.lineagemgr.api.model.validate_search_model_condition') @mock.patch('mindinsight.lineagemgr.api.model.validate_condition') - @mock.patch('mindinsight.lineagemgr.api.model.validate_path') - @mock.patch.object(SummaryPathParser, 'get_latest_lineage_summaries') + @mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir') + @mock.patch.object(SummaryPathParser, 'get_latest_lineage_summary') def test_failed_to_get_summary_filesh(self, mock_parse, *args): """Test filter_summary_lineage with invalid invalid param.""" - mock_parse.return_value = [] - args[0].return_value = None + path = '/path/to/summary/dir' + mock_parse.return_value = None + args[0].return_value = path self.assertRaisesRegex( LineageFileNotFoundError, 'There is no summary log file under summary_base_dir.', filter_summary_lineage, - 
'/path/to/summary/dir' + path ) @mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath') @mock.patch('mindinsight.lineagemgr.api.model.validate_search_model_condition') @mock.patch('mindinsight.lineagemgr.api.model.validate_condition') - @mock.patch('mindinsight.lineagemgr.api.model.validate_path') + @mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir') @mock.patch.object(SummaryPathParser, 'get_latest_lineage_summaries') @mock.patch('mindinsight.lineagemgr.api.model.Querier') def test_failed_to_querier(self, mock_query, mock_parse, *args): diff --git a/tests/ut/lineagemgr/querier/test_querier.py b/tests/ut/lineagemgr/querier/test_querier.py index 5cc176d50c349c267876e73f6c03fd4876df990d..65b9be24572c84185135c07691c5ab0e0e2f6045 100644 --- a/tests/ut/lineagemgr/querier/test_querier.py +++ b/tests/ut/lineagemgr/querier/test_querier.py @@ -13,14 +13,15 @@ # limitations under the License. # ============================================================================ """Test the querier module.""" +import time + from unittest import TestCase, mock from google.protobuf.json_format import ParseDict import mindinsight.datavisual.proto_files.mindinsight_lineage_pb2 as summary_pb2 -from mindinsight.lineagemgr.common.exceptions.exceptions import (LineageParamTypeError, LineageQuerierParamException, - LineageSummaryAnalyzeException, - LineageSummaryParseException) +from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamTypeError, LineageQuerierParamException +from mindinsight.lineagemgr.lineage_parser import LineageOrganizer from mindinsight.lineagemgr.querier.querier import Querier from mindinsight.lineagemgr.summary.lineage_summary_analyzer import LineageInfo @@ -246,8 +247,11 @@ LINEAGE_FILTRATION_6 = { class TestQuerier(TestCase): """Test the class of `Querier`.""" - @mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_user_defined_info') - 
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos') + + @mock.patch('mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_latest_lineage_summary') + @mock.patch('mindinsight.lineagemgr.lineage_parser.SummaryWatcher.list_summary_directories') + @mock.patch('mindinsight.lineagemgr.lineage_parser.LineageSummaryAnalyzer.get_user_defined_info') + @mock.patch('mindinsight.lineagemgr.lineage_parser.LineageSummaryAnalyzer.get_summary_infos') def setUp(self, *args): """Initialization before test case execution.""" args[0].return_value = create_lineage_info( @@ -256,22 +260,22 @@ class TestQuerier(TestCase): event_data.EVENT_DATASET_DICT_0 ) args[1].return_value = [] + args[3].return_value = 'path' - single_summary_path = '/path/to/summary0/log0' - self.single_querier = Querier(single_summary_path) + args[2].return_value = [{'relative_path': './', 'update_time': 1}] + single_summary_path = '/path/to/summary0' + lineage_objects = LineageOrganizer(summary_base_dir=single_summary_path).super_lineage_objs + self.single_querier = Querier(lineage_objects) lineage_infos = get_lineage_infos() args[0].side_effect = lineage_infos - summary_paths = [ - '/path/to/summary0/log0', - '/path/to/summary1/log1', - '/path/to/summary2/log2', - '/path/to/summary3/log3', - '/path/to/summary4/log4', - '/path/to/summary5/log5', - '/path/to/summary6/log6' - ] - self.multi_querier = Querier(summary_paths) + summary_base_dir = '/path/to' + relative_dirs = [] + for i in range(7): + relative_dirs.append(dict(relative_path=f'./summary{i}', update_time=time.time() - i)) + args[2].return_value = relative_dirs + lineage_objects = LineageOrganizer(summary_base_dir=summary_base_dir).super_lineage_objs + self.multi_querier = Querier(lineage_objects) def test_get_summary_lineage_success_1(self): """Test the success of get_summary_lineage.""" @@ -282,9 +286,7 @@ class TestQuerier(TestCase): def test_get_summary_lineage_success_2(self): """Test the success of 
get_summary_lineage.""" expected_result = [LINEAGE_INFO_0] - result = self.single_querier.get_summary_lineage( - summary_dir='/path/to/summary0' - ) + result = self.single_querier.get_summary_lineage() self.assertListEqual(expected_result, result) def test_get_summary_lineage_success_3(self): @@ -601,60 +603,12 @@ class TestQuerier(TestCase): condition=condition ) - @mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos') - def test_init_fail(self, *args): + def test_init_fail(self): """Test the function of init with exception.""" - summary_path = {'xxx': 1} + obj_dict = 'a' with self.assertRaises(LineageParamTypeError): - Querier(summary_path) + Querier(obj_dict) - summary_path = None + obj_dict = None with self.assertRaises(LineageQuerierParamException): - Querier(summary_path) - - args[0].side_effect = LineageSummaryAnalyzeException - summary_path = '/path/to/summary0/log0' - with self.assertRaises(LineageSummaryParseException): - Querier(summary_path) - - @mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_user_defined_info') - @mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos') - def test_parse_fail_summary_logs_1(self, *args): - """Test the function of parsing fail summary logs.""" - lineage_infos = get_lineage_infos() - args[0].side_effect = lineage_infos - args[1].return_value = [] - - summary_path = ['/path/to/summary0/log0'] - querier = Querier(summary_path) - querier._parse_failed_paths.append('/path/to/summary1/log1') - expected_result = [ - LINEAGE_INFO_0, - LINEAGE_INFO_1 - ] - result = querier.get_summary_lineage() - self.assertListEqual(expected_result, result) - self.assertListEqual([], querier._parse_failed_paths) - - @mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_user_defined_info') - @mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos') - def 
test_parse_fail_summary_logs_2(self, *args): - """Test the function of parsing fail summary logs.""" - args[0].return_value = create_lineage_info( - event_data.EVENT_TRAIN_DICT_0, - event_data.EVENT_EVAL_DICT_0, - event_data.EVENT_DATASET_DICT_0, - ) - args[1].return_value = [] - - summary_path = ['/path/to/summary0/log0'] - querier = Querier(summary_path) - querier._parse_failed_paths.append('/path/to/summary1/log1') - - args[0].return_value = create_lineage_info(None, None, None) - expected_result = [LINEAGE_INFO_0] - result = querier.get_summary_lineage() - self.assertListEqual(expected_result, result) - self.assertListEqual( - ['/path/to/summary1/log1'], querier._parse_failed_paths - ) + Querier(obj_dict)