提交 2f28388b 编写于 作者: M mindspore-ci-bot 提交者: Gitee

!120 establish a caching mechanism for lineage

Merge pull request !120 from luopengting/lineage_cache_datamgr
......@@ -14,9 +14,10 @@
# ============================================================================
"""Trigger data manager load."""
from mindinsight.datavisual.data_transform.data_manager import DATA_MANAGER
from mindinsight.datavisual.common.log import logger
from mindinsight.conf import settings
from mindinsight.datavisual.common.log import logger
from mindinsight.datavisual.data_transform.data_manager import DATA_MANAGER
from mindinsight.lineagemgr.cache_item_updater import LineageCacheItemUpdater
def init_module(app):
......@@ -29,6 +30,6 @@ def init_module(app):
"""
# Just to suppress pylint warning about unused arg.
logger.debug("App: %s", type(app))
# Register cache item updater here, before start load data.
DATA_MANAGER.register_brief_cache_item_updater(LineageCacheItemUpdater())
DATA_MANAGER.start_load_data(reload_interval=int(settings.RELOAD_INTERVAL),
max_threads_count=int(settings.MAX_THREADS_COUNT))
......@@ -20,9 +20,10 @@ from flask import Blueprint, jsonify, request
from mindinsight.conf import settings
from mindinsight.datavisual.utils.tools import get_train_id
from mindinsight.lineagemgr import filter_summary_lineage, get_summary_lineage
from mindinsight.lineagemgr.common.validator.validate import validate_path
from mindinsight.datavisual.data_transform.data_manager import DATA_MANAGER
from mindinsight.lineagemgr.api.model import general_filter_summary_lineage, general_get_summary_lineage
from mindinsight.utils.exceptions import MindInsightException, ParamValueError
from mindinsight.lineagemgr.cache_item_updater import update_lineage_object
BLUEPRINT = Blueprint("lineage", __name__, url_prefix=settings.URL_PREFIX.rstrip("/"))
......@@ -68,8 +69,10 @@ def _get_lineage_info(search_condition):
"""
summary_base_dir = str(settings.SUMMARY_BASE_DIR)
try:
lineage_info = filter_summary_lineage(
summary_base_dir, search_condition)
lineage_info = general_filter_summary_lineage(
data_manager=DATA_MANAGER,
search_condition=search_condition,
added=True)
lineages = lineage_info['object']
......@@ -91,6 +94,30 @@ def _get_lineage_info(search_condition):
return lineage_info
@BLUEPRINT.route("/lineagemgr/lineages", methods=["PUT"])
def update_lineage():
"""
Get lineage.
Returns:
str, update the lineage information about cache and tag.
Raises:
MindInsightException: If method fails to be called.
Examples:
>>> PUT http://xxxx/v1/mindinsight/lineagemgr/lineages?train_id=./run1
"""
train_id = get_train_id(request)
added_info = request.json
if not isinstance(added_info, dict):
raise ParamValueError("The request body should be a dict.")
update_lineage_object(DATA_MANAGER, train_id, added_info)
return jsonify({"status": "success"})
@BLUEPRINT.route("/datasets/dataset_graph", methods=["GET"])
def get_dataset_graph():
"""
......@@ -109,18 +136,9 @@ def get_dataset_graph():
summary_base_dir = str(settings.SUMMARY_BASE_DIR)
summary_dir = get_train_id(request)
if summary_dir.startswith('/'):
validate_path(summary_dir)
elif summary_dir.startswith('./'):
summary_dir = os.path.join(summary_base_dir, summary_dir[2:])
summary_dir = validate_path(summary_dir)
else:
raise ParamValueError(
"Summary dir should be absolute path or "
"relative path that relate to summary base dir."
)
try:
dataset_graph = get_summary_lineage(
dataset_graph = general_get_summary_lineage(
DATA_MANAGER,
summary_dir=summary_dir,
keys=['dataset_graph']
)
......
......@@ -21,10 +21,10 @@ It can read events data through the DataLoader.
This module also acts as a thread pool manager.
"""
import abc
import datetime
import enum
import threading
import time
import datetime
import os
from typing import Iterable, Optional
......@@ -76,11 +76,21 @@ class _BasicTrainJob:
"""Get summary directory path."""
return self._abs_summary_dir
@property
def summary_base_dir(self):
"""Get summary base directory path."""
return self._abs_summary_base_dir
@property
def train_id(self):
"""Get train id."""
return self._train_id
@property
def update_time(self):
"""Get update time."""
return self._update_time
class CachedTrainJob:
"""
......@@ -99,6 +109,7 @@ class CachedTrainJob:
self._content = {}
self._cache_status = _CacheStatus.NOT_IN_CACHE
self._key_locks = {}
@property
def cache_status(self):
......@@ -124,6 +135,11 @@ class CachedTrainJob:
"""Get summary directory path."""
return self._basic_info.summary_dir
@property
def summary_base_dir(self):
"""Get summary base directory path."""
return self._basic_info.summary_base_dir
def set(self, key, value):
"""Set value to cache."""
self._content[key] = value
......@@ -145,6 +161,12 @@ class CachedTrainJob:
"""Set basic train job info."""
self._basic_info = value
def lock_key(self, key):
"""Threading lock with given key."""
if key not in self._key_locks:
self._key_locks[key] = threading.Lock()
return self._key_locks[key]
class TrainJob:
"""
......@@ -325,6 +347,11 @@ class _BriefCacheManager(_BaseCacheManager):
for cache_item in self._cache_items.values():
updater.update_item(cache_item)
@property
def cache_items(self):
"""Get cache items."""
return self._cache_items
# Key for plugin tags.
DATAVISUAL_PLUGIN_KEY = "tag_mapping"
......@@ -740,6 +767,11 @@ class DataManager:
self._detail_cache = _DetailCacheManager(loader_generators)
self._brief_cache = _BriefCacheManager()
@property
def summary_base_dir(self):
"""Get summary base dir."""
return self._summary_base_dir
def start_load_data(self,
reload_interval=settings.RELOAD_INTERVAL,
max_threads_count=MAX_DATA_LOADER_SIZE):
......@@ -963,5 +995,13 @@ class DataManager:
"""Register brief cache item updater for brief cache manager."""
self._brief_cache.register_cache_item_updater(updater)
def get_brief_cache(self):
"""Get brief cache."""
return self._brief_cache
def get_brief_train_job(self, train_id):
"""Get brief train job."""
return self._brief_cache.get_train_job(train_id)
DATA_MANAGER = DataManager(settings.SUMMARY_BASE_DIR)
......@@ -16,15 +16,15 @@
import os
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamValueError, \
LineageFileNotFoundError, LineageQuerySummaryDataError, LineageParamSummaryPathError, \
LineageQuerySummaryDataError, LineageParamSummaryPathError, \
LineageQuerierParamException, LineageDirNotExistError, LineageSearchConditionParamError, \
LineageParamTypeError, LineageSummaryParseException
from mindinsight.lineagemgr.common.log import logger as log
from mindinsight.lineagemgr.common.path_parser import SummaryPathParser
from mindinsight.lineagemgr.common.utils import normalize_summary_dir
from mindinsight.lineagemgr.common.validator.model_parameter import SearchModelConditionParameter
from mindinsight.lineagemgr.common.validator.validate import validate_filter_key
from mindinsight.lineagemgr.common.validator.validate import validate_search_model_condition, \
validate_condition, validate_path
from mindinsight.lineagemgr.common.validator.validate import validate_filter_key, validate_search_model_condition, \
validate_condition, validate_path, validate_train_id
from mindinsight.lineagemgr.lineage_parser import LineageParser, LineageOrganizer
from mindinsight.lineagemgr.querier.querier import Querier
from mindinsight.utils.exceptions import MindInsightException
......@@ -58,33 +58,61 @@ def get_summary_lineage(summary_dir, keys=None):
>>> summary_lineage_info = get_summary_lineage(summary_dir)
>>> hyper_parameters = get_summary_lineage(summary_dir, keys=["hyper_parameters"])
"""
try:
summary_dir = validate_path(summary_dir)
except MindInsightException as error:
log.error(str(error))
log.exception(error)
raise LineageParamSummaryPathError(str(error.message))
return general_get_summary_lineage(summary_dir=summary_dir, keys=keys)
def general_get_summary_lineage(data_manager=None, summary_dir=None, keys=None):
"""
Get summary lineage from data_manager or parsing from summaries.
One of data_manager or summary_dir needs to be specified. Support getting
super_lineage_obj from data_manager or parsing summaries by summary_dir.
Args:
data_manager (DataManager): Data manager defined as
mindinsight.datavisual.data_transform.data_manager.DataManager
summary_dir (str): The summary directory. It contains summary logs for
one training.
keys (list[str]): The filter keys of lineage information. The acceptable
keys are `metric`, `user_defined`, `hyper_parameters`, `algorithm`,
`train_dataset`, `model`, `valid_dataset` and `dataset_graph`.
If it is `None`, all information will be returned. Default: None.
Returns:
dict, the lineage information for one training.
Raises:
LineageParamSummaryPathError: If summary path is invalid.
LineageQuerySummaryDataError: If querying summary data fails.
LineageFileNotFoundError: If the summary log file is not found.
"""
default_result = {}
if data_manager is None and summary_dir is None:
raise LineageParamTypeError("One of data_manager or summary_dir needs to be specified.")
if keys is not None:
validate_filter_key(keys)
summary_path = SummaryPathParser.get_latest_lineage_summary(summary_dir)
if summary_path is None:
log.error('There is no summary log file under summary_dir.')
raise LineageFileNotFoundError(
'There is no summary log file under summary_dir.'
)
if data_manager is None:
normalize_summary_dir(summary_dir)
super_lineage_obj = None
if os.path.isabs(summary_dir):
super_lineage_obj = LineageParser(summary_dir).super_lineage_obj
elif data_manager is not None:
validate_train_id(summary_dir)
super_lineage_obj = LineageOrganizer(data_manager=data_manager).get_super_lineage_obj(summary_dir)
if super_lineage_obj is None:
return default_result
try:
result = Querier(summary_path).get_summary_lineage(
summary_dir, filter_keys=keys)
except LineageSummaryParseException:
return {}
result = Querier({summary_dir: super_lineage_obj}).get_summary_lineage(summary_dir, keys)
except (LineageQuerierParamException, LineageParamTypeError) as error:
log.error(str(error))
log.exception(error)
raise LineageQuerySummaryDataError("Get summary lineage failed.")
return result[0]
......@@ -209,12 +237,30 @@ def filter_summary_lineage(summary_base_dir, search_condition=None):
>>> summary_lineage = filter_summary_lineage(summary_base_dir)
>>> summary_lineage_filter = filter_summary_lineage(summary_base_dir, search_condition)
"""
try:
summary_base_dir = validate_path(summary_base_dir)
except (LineageParamValueError, LineageDirNotExistError) as error:
log.error(str(error))
log.exception(error)
raise LineageParamSummaryPathError(str(error.message))
return general_filter_summary_lineage(summary_base_dir=summary_base_dir, search_condition=search_condition)
def general_filter_summary_lineage(data_manager=None, summary_base_dir=None, search_condition=None, added=False):
"""
Filter summary lineage from data_manager or parsing from summaries.
One of data_manager or summary_base_dir needs to be specified. Support getting
super_lineage_obj from data_manager or parsing summaries by summary_base_dir.
Args:
data_manager (DataManager): Data manager defined as
mindinsight.datavisual.data_transform.data_manager.DataManager
summary_base_dir (str): The summary base directory. It contains summary
directories generated by training.
search_condition (dict): The search condition.
"""
if data_manager is None and summary_base_dir is None:
raise LineageParamTypeError("One of data_manager or summary_base_dir needs to be specified.")
if data_manager is None:
summary_base_dir = normalize_summary_dir(summary_base_dir)
else:
summary_base_dir = data_manager.summary_base_dir
search_condition = {} if search_condition is None else search_condition
......@@ -233,16 +279,11 @@ def filter_summary_lineage(summary_base_dir, search_condition=None):
log.exception(error)
raise LineageParamSummaryPathError(str(error.message))
summary_path = SummaryPathParser.get_latest_lineage_summaries(summary_base_dir)
if not summary_path:
log.error('There is no summary log file under summary_base_dir.')
raise LineageFileNotFoundError(
'There is no summary log file under summary_base_dir.'
)
try:
result = Querier(summary_path).filter_summary_lineage(
condition=search_condition
lineage_objects = LineageOrganizer(data_manager, summary_base_dir).super_lineage_objs
result = Querier(lineage_objects).filter_summary_lineage(
condition=search_condition,
added=added
)
except LineageSummaryParseException:
result = {'object': [], 'count': 0}
......
......@@ -16,8 +16,26 @@
import os
from mindinsight.datavisual.data_transform.data_manager import BaseCacheItemUpdater, CachedTrainJob
from mindinsight.lineagemgr.querier.query_model import LineageObj
from mindinsight.lineagemgr.summary.lineage_summary_analyzer import LineageSummaryAnalyzer
from mindinsight.lineagemgr.common.log import logger
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageFileNotFoundError
from mindinsight.lineagemgr.common.validator.validate import validate_train_id
from mindinsight.lineagemgr.lineage_parser import LineageParser, LINEAGE
from mindinsight.utils.exceptions import ParamValueError
def update_lineage_object(data_manager, train_id, added_info: dict):
"""Update lineage objects about tag and remark."""
validate_train_id(train_id)
cache_item = data_manager.get_brief_train_job(train_id)
cached_added_info = cache_item.get(key=LINEAGE).added_info
new_added_info = dict(cached_added_info)
for key, value in added_info.items():
if key in ["tag", "remark"]:
new_added_info.update({key: value})
with cache_item.lock_key(LINEAGE):
cache_item.get(key=LINEAGE).added_info = new_added_info
class LineageCacheItemUpdater(BaseCacheItemUpdater):
......@@ -25,15 +43,28 @@ class LineageCacheItemUpdater(BaseCacheItemUpdater):
def update_item(self, cache_item: CachedTrainJob):
"""Update cache item in place."""
log_path = cache_item.summary_dir
log_dir = os.path.dirname(log_path)
lineage_info = LineageSummaryAnalyzer.get_summary_infos(log_path)
user_defined_info = LineageSummaryAnalyzer.get_user_defined_info(log_path)
lineage_obj = LineageObj(
log_dir,
train_lineage=lineage_info.train_lineage,
evaluation_lineage=lineage_info.eval_lineage,
dataset_graph=lineage_info.dataset_graph,
user_defined_info=user_defined_info
)
cache_item.set(key="lineage", value=lineage_obj)
summary_base_dir = cache_item.summary_base_dir
summary_dir = cache_item.summary_dir
update_time = cache_item.basic_info.update_time
# The summary_base_dir and summary_dir have been normalized in data_manager.
if summary_base_dir == summary_dir:
relative_path = "./"
else:
relative_path = f'./{os.path.basename(summary_dir)}'
try:
cached_added_info = cache_item.get(key=LINEAGE).added_info
except ParamValueError:
cached_added_info = None
try:
lineage_parser = LineageParser(summary_dir, update_time, cached_added_info)
super_lineage_obj = lineage_parser.super_lineage_obj
except LineageFileNotFoundError:
super_lineage_obj = None
if super_lineage_obj is None:
logger.warning("There is no lineage to update in tran job %s.", relative_path)
return
with cache_item.lock_key(LINEAGE):
cache_item.set(key=LINEAGE, value=super_lineage_obj)
......@@ -15,8 +15,11 @@
"""Lineage utils."""
from functools import wraps
from mindinsight.lineagemgr.common.log import logger as log
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamRunContextError, \
LineageGetModelFileError, LineageLogError
LineageGetModelFileError, LineageLogError, LineageParamValueError, LineageDirNotExistError, \
LineageParamSummaryPathError
from mindinsight.lineagemgr.common.validator.validate import validate_path
from mindinsight.utils.exceptions import MindInsightException
......@@ -54,3 +57,14 @@ def try_except(logger):
return wrapper
return try_except_decorate
def normalize_summary_dir(summary_dir):
"""Normalize summary dir."""
try:
summary_dir = validate_path(summary_dir)
except (LineageParamValueError, LineageDirNotExistError) as error:
log.error(str(error))
log.exception(error)
raise LineageParamSummaryPathError(str(error.message))
return summary_dir
......@@ -23,7 +23,7 @@ from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamMiss
from mindinsight.lineagemgr.common.log import logger as log
from mindinsight.lineagemgr.common.validator.validate_path import safe_normalize_path
from mindinsight.lineagemgr.querier.query_model import FIELD_MAPPING
from mindinsight.utils.exceptions import MindInsightException
from mindinsight.utils.exceptions import MindInsightException, ParamValueError
try:
from mindspore.nn import Cell
......@@ -437,3 +437,26 @@ def validate_user_defined_info(user_defined_info):
if len(field_map) + len(user_defined_keys) != len(all_keys):
raise LineageParamValueError("There are some keys have defined in lineage.")
def validate_train_id(relative_path):
"""
Check if train_id is valid.
Args:
relative_path (str): Train ID of a summary directory, e.g. './log1'.
Returns:
bool, if train id is valid, return True.
"""
if not relative_path.startswith('./'):
log.warning("The relative_path does not start with './'.")
raise ParamValueError(
"Summary dir should be relative path starting with './'."
)
if len(relative_path.split("/")) > 2:
log.warning("The relative_path contains multiple '/'.")
raise ParamValueError(
"Summary dir should be relative path starting with './'."
)
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""This file is used to parse lineage info."""
import os
from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageSummaryAnalyzeException, \
LineageEventNotExistException, LineageEventFieldNotExistException, LineageFileNotFoundError, \
MindInsightException
from mindinsight.lineagemgr.common.log import logger
from mindinsight.lineagemgr.common.path_parser import SummaryPathParser
from mindinsight.lineagemgr.summary.lineage_summary_analyzer import LineageSummaryAnalyzer
from mindinsight.lineagemgr.querier.query_model import LineageObj
from mindinsight.utils.exceptions import ParamValueError
LINEAGE = "lineage"
class SuperLineageObj:
"""This is an object for LineageObj and its additional info."""
def __init__(self, lineage_obj, update_time, added_info=None):
self._lineage_obj = lineage_obj
self._update_time = update_time
self._added_info = added_info if added_info is not None else dict()
@property
def lineage_obj(self):
"""Get lineage object."""
return self._lineage_obj
@property
def added_info(self):
"""Get added info."""
return self._added_info
@added_info.setter
def added_info(self, added_info):
"""Set added info."""
self._added_info = added_info
@property
def update_time(self):
"""Get update time."""
return self._update_time
class LineageParser:
"""Lineage parser."""
def __init__(self, summary_dir, update_time=None, added_info=None):
self._super_lineage_obj = None
self._summary_dir = summary_dir
self._update_time = update_time
self._added_info = added_info
self._parse_summary_log()
def _parse_summary_log(self):
"""
Parse the single summary log.
Returns:
bool, `True` if parse summary log success, else `False`.
"""
file_path = SummaryPathParser.get_latest_lineage_summary(self._summary_dir)
if file_path is None:
logger.warning('There is no summary log file under summary_dir %s.', self._summary_dir)
raise LineageFileNotFoundError(
'There is no summary log file under summary_dir.'
)
try:
lineage_info = LineageSummaryAnalyzer.get_summary_infos(file_path)
user_defined_info = LineageSummaryAnalyzer.get_user_defined_info(file_path)
lineage_obj = LineageObj(
self._summary_dir,
train_lineage=lineage_info.train_lineage,
evaluation_lineage=lineage_info.eval_lineage,
dataset_graph=lineage_info.dataset_graph,
user_defined_info=user_defined_info
)
self._super_lineage_obj = SuperLineageObj(lineage_obj, self._update_time, self._added_info)
except (LineageSummaryAnalyzeException,
LineageEventNotExistException,
LineageEventFieldNotExistException):
logger.warning("Parse file failed under summary_dir %s.", self._summary_dir)
except MindInsightException as error:
logger.error(str(error))
logger.exception(error)
logger.warning("Parse file failed under summary_dir %s.", self._summary_dir)
@property
def super_lineage_obj(self):
"""Get super lineage object."""
return self._super_lineage_obj
class LineageOrganizer:
"""Lineage organizer."""
def __init__(self, data_manager=None, summary_base_dir=None):
self._data_manager = data_manager
self._summary_base_dir = summary_base_dir
self._check_params()
self._super_lineage_objs = {}
self._organize_from_cache()
self._organize_from_disk()
def _check_params(self):
"""Check params."""
if self._data_manager is not None and self._summary_base_dir is not None:
self._summary_base_dir = None
def _organize_from_disk(self):
"""Organize lineage objs from disk."""
if self._summary_base_dir is None:
return
summary_watcher = SummaryWatcher()
relative_dirs = summary_watcher.list_summary_directories(
summary_base_dir=self._summary_base_dir
)
no_lineage_count = 0
for item in relative_dirs:
relative_dir = item.get('relative_path')
update_time = item.get('update_time')
abs_summary_dir = os.path.realpath(os.path.join(self._summary_base_dir, relative_dir))
try:
lineage_parser = LineageParser(abs_summary_dir, update_time)
super_lineage_obj = lineage_parser.super_lineage_obj
if super_lineage_obj is not None:
self._super_lineage_objs.update({abs_summary_dir: super_lineage_obj})
except LineageFileNotFoundError:
no_lineage_count += 1
if no_lineage_count == len(relative_dirs):
logger.error('There is no summary log file under summary_base_dir.')
raise LineageFileNotFoundError(
'There is no summary log file under summary_base_dir.'
)
def _organize_from_cache(self):
"""Organize lineage objs from cache."""
if self._data_manager is None:
return
brief_cache = self._data_manager.get_brief_cache()
cache_items = brief_cache.cache_items
for relative_dir, cache_train_job in cache_items.items():
try:
super_lineage_obj = cache_train_job.get("lineage")
self._super_lineage_objs.update({relative_dir: super_lineage_obj})
except ParamValueError:
logger.info("This is no lineage info in train job %s.", relative_dir)
@property
def super_lineage_objs(self):
"""Get super lineage objects."""
return self._super_lineage_objs
def get_super_lineage_obj(self, relative_path):
"""Get super lineage object by given relative path."""
return self._super_lineage_objs.get(relative_path)
......@@ -16,17 +16,11 @@
import enum
import functools
import operator
import os
from mindinsight.lineagemgr.common.exceptions.exceptions import \
LineageParamTypeError, LineageSummaryAnalyzeException, \
LineageEventNotExistException, LineageQuerierParamException, \
LineageSummaryParseException, LineageEventFieldNotExistException
from mindinsight.lineagemgr.common.log import logger
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageQuerierParamException, LineageParamTypeError
from mindinsight.lineagemgr.common.utils import enum_to_list
from mindinsight.lineagemgr.querier.query_model import LineageObj, FIELD_MAPPING
from mindinsight.lineagemgr.summary.lineage_summary_analyzer import \
LineageSummaryAnalyzer
from mindinsight.lineagemgr.lineage_parser import SuperLineageObj
from mindinsight.lineagemgr.querier.query_model import FIELD_MAPPING
@enum.unique
......@@ -173,20 +167,24 @@ class Querier:
See the method `filter_summary_lineage` for supported fields.
Args:
summary_path (Union[str, list[str]]): The single summary log path or
a list of summary log path.
super_lineage_objs (dict): A dict of <summary_dir, SuperLineageObject>.
Raises:
LineageParamTypeError: If the input parameter type is invalid.
LineageQuerierParamException: If the input parameter value is invalid.
LineageSummaryParseException: If all summary logs parsing failed.
"""
def __init__(self, summary_path):
self._lineage_objects = []
self._index_map = {}
self._parse_failed_paths = []
self._parse_summary_logs(summary_path)
self._size = len(self._lineage_objects)
def __init__(self, super_lineage_objs):
self._super_lineage_objs = self._check_objs(super_lineage_objs)
def _check_objs(self, super_lineage_objs):
if super_lineage_objs is None:
raise LineageQuerierParamException(
'querier_init_param', 'The querier init param is empty.'
)
if not isinstance(super_lineage_objs, dict):
raise LineageParamTypeError("Init param should be a dict.")
return super_lineage_objs
def get_summary_lineage(self, summary_dir=None, filter_keys=None):
"""
......@@ -209,7 +207,6 @@ class Querier:
Returns:
list[dict], summary lineage information.
"""
self._parse_fail_summary_logs()
if filter_keys is None:
filter_keys = LineageFilterKey.get_key_list()
......@@ -222,20 +219,20 @@ class Querier:
if summary_dir is None:
result = [
item.get_summary_info(filter_keys) for item in self._lineage_objects
item.lineage_obj.get_summary_info(filter_keys) for item in self._super_lineage_objs.values()
]
else:
index = self._index_map.get(summary_dir)
if index is None:
raise LineageQuerierParamException(
'summary_dir',
'Summary dir {} does not exist.'.format(summary_dir)
)
lineage_obj = self._lineage_objects[index]
elif summary_dir in self._super_lineage_objs:
lineage_obj = self._super_lineage_objs[summary_dir].lineage_obj
result = [lineage_obj.get_summary_info(filter_keys)]
else:
raise LineageQuerierParamException(
'summary_dir',
'Summary dir {} does not exist.'.format(summary_dir)
)
return result
def filter_summary_lineage(self, condition=None):
def filter_summary_lineage(self, condition=None, added=False):
"""
Filter and sort lineage information based on the specified condition.
......@@ -253,7 +250,7 @@ class Querier:
Returns:
dict, filtered and sorted model lineage information.
"""
def _filter(lineage_obj: LineageObj):
def _filter(super_lineage_obj: SuperLineageObj):
for condition_key, condition_value in condition.items():
if ConditionParam.is_condition_type(condition_key):
continue
......@@ -263,7 +260,7 @@ class Querier:
'The field {} not supported'.format(condition_key)
)
value = lineage_obj.get_value_by_key(condition_key)
value = super_lineage_obj.lineage_obj.get_value_by_key(condition_key)
for exp_key, exp_value in condition_value.items():
if not ExpressionType.is_valid_exp(exp_key):
raise LineageQuerierParamException(
......@@ -274,9 +271,9 @@ class Querier:
return False
return True
def _cmp(obj1: LineageObj, obj2: LineageObj):
value1 = obj1.get_value_by_key(sorted_name)
value2 = obj2.get_value_by_key(sorted_name)
def _cmp(obj1: SuperLineageObj, obj2: SuperLineageObj):
value1 = obj1.lineage_obj.get_value_by_key(sorted_name)
value2 = obj2.lineage_obj.get_value_by_key(sorted_name)
if value1 is None and value2 is None:
cmp_result = 0
......@@ -293,11 +290,14 @@ class Querier:
cmp_result = (type1 > type2) - (type1 < type2)
return cmp_result
self._parse_fail_summary_logs()
if condition is None:
condition = {}
results = list(filter(_filter, self._lineage_objects))
self._add_dataset_mark()
super_lineage_objs = list(self._super_lineage_objs.values())
super_lineage_objs.sort(key=lambda x: x.update_time, reverse=True)
results = list(filter(_filter, super_lineage_objs))
if ConditionParam.SORTED_NAME.value in condition:
sorted_name = condition.get(ConditionParam.SORTED_NAME.value)
......@@ -323,9 +323,11 @@ class Querier:
for item in offset_results:
lineage_object = dict()
if LineageType.MODEL.value in lineage_types:
lineage_object.update(item.to_model_lineage_dict())
lineage_object.update(item.lineage_obj.to_model_lineage_dict())
if LineageType.DATASET.value in lineage_types:
lineage_object.update(item.to_dataset_lineage_dict())
lineage_object.update(item.lineage_obj.to_dataset_lineage_dict())
if added:
lineage_object.update({"added_info": item.added_info})
object_items.append(lineage_object)
lineage_info = {
......@@ -341,7 +343,7 @@ class Querier:
customized = dict()
for offset_result in offset_results:
for obj_name in ["metric", "user_defined"]:
self._organize_customized_item(customized, offset_result, obj_name)
self._organize_customized_item(customized, offset_result.lineage_obj, obj_name)
# If types contain numbers and string, it will be "mixed".
# If types contain "int" and "float", it will be "float".
......@@ -410,10 +412,10 @@ class Querier:
Args:
condition (dict): Filter and sort condition.
result (list[LineageObj]): Filtered and sorted result.
result (list[SuperLineageObj]): Filtered and sorted result.
Returns:
list[LineageObj], paginated result.
list[SuperLineageObj], paginated result.
"""
offset = 0
limit = 10
......@@ -428,87 +430,12 @@ class Querier:
offset_result = result[offset * limit: limit * (offset + 1)]
return offset_result
def _parse_summary_logs(self, summary_path):
"""
Parse summary logs.
Args:
summary_path (Union[str, list[str]]): The single summary log path or
a list of summary log path.
"""
if not summary_path:
raise LineageQuerierParamException(
'summary_path', 'The summary path is empty.'
)
if isinstance(summary_path, str):
self._parse_summary_log(summary_path, 0)
elif isinstance(summary_path, list):
index = 0
for path in summary_path:
parse_result = self._parse_summary_log(path, index)
if parse_result:
index += 1
else:
raise LineageParamTypeError('Summary path is not str or list.')
if self._parse_failed_paths:
logger.info('Parse failed paths: %s', str(self._parse_failed_paths))
if not self._lineage_objects:
raise LineageSummaryParseException()
def _parse_summary_log(self, log_path, index: int, is_save_fail_path=True):
"""
Parse the single summary log.
Args:
log_path (str): The single summary log path.
index (int): TrainInfo instance index in the train info list.
is_save_fail_path (bool): Set whether to save the failed summary
path. Default: True.
Returns:
bool, `True` if parse summary log success, else `False`.
"""
log_dir = os.path.dirname(log_path)
try:
lineage_info = LineageSummaryAnalyzer.get_summary_infos(log_path)
user_defined_info = LineageSummaryAnalyzer.get_user_defined_info(log_path)
lineage_obj = LineageObj(
log_dir,
train_lineage=lineage_info.train_lineage,
evaluation_lineage=lineage_info.eval_lineage,
dataset_graph=lineage_info.dataset_graph,
user_defined_info=user_defined_info
)
self._lineage_objects.append(lineage_obj)
self._add_dataset_mark()
self._index_map[log_dir] = index
return True
except (LineageSummaryAnalyzeException,
LineageEventNotExistException,
LineageEventFieldNotExistException):
if is_save_fail_path:
self._parse_failed_paths.append(log_path)
return False
def _parse_fail_summary_logs(self):
"""Parse fail summary logs."""
if self._parse_failed_paths:
failed_paths = []
for path in self._parse_failed_paths:
parse_result = self._parse_summary_log(path, self._size, False)
if parse_result:
self._size += 1
else:
failed_paths.append(path)
self._parse_failed_paths = failed_paths
def _add_dataset_mark(self):
"""Add dataset mark into LineageObj."""
# give a dataset mark for each dataset graph in lineage information
marked_dataset_group = {'1': None}
for lineage in self._lineage_objects:
for super_lineage_obj in self._super_lineage_objs.values():
lineage = super_lineage_obj.lineage_obj
dataset_mark = '0'
for dataset_graph_mark, marked_dataset_graph in marked_dataset_group.items():
if marked_dataset_graph == lineage.dataset_graph:
......
......@@ -298,6 +298,7 @@ class LineageObj:
result[self._name_user_defined] = self.user_defined
# add dataset_graph into filtration result
result[self._name_dataset_graph] = getattr(self, self._name_dataset_graph)
return result
def _parse_train_lineage(self, train_lineage):
......
......@@ -356,6 +356,7 @@ class TestModelApi(TestCase):
assert expect_result == res
expect_result = {
'customized': {},
'object': [],
'count': 0
}
......
......@@ -70,7 +70,7 @@ class TestSearchModel(TestCase):
self.url = '/v1/mindinsight/lineagemgr/lineages'
@mock.patch('mindinsight.backend.lineagemgr.lineage_api.settings')
@mock.patch('mindinsight.backend.lineagemgr.lineage_api.filter_summary_lineage')
@mock.patch('mindinsight.backend.lineagemgr.lineage_api.general_filter_summary_lineage')
def test_search_model_success(self, *args):
"""Test the success of model_success."""
base_dir = '/path/to/test_lineage_summary_dir_base'
......@@ -113,7 +113,7 @@ class TestSearchModel(TestCase):
self.assertDictEqual(expect_result, response.get_json())
@mock.patch('mindinsight.backend.lineagemgr.lineage_api.settings')
@mock.patch('mindinsight.backend.lineagemgr.lineage_api.filter_summary_lineage')
@mock.patch('mindinsight.backend.lineagemgr.lineage_api.general_filter_summary_lineage')
def test_search_model_fail(self, *args):
"""Test the function of model_lineage with exception."""
response = self.app_client.post(self.url, data='xxx')
......
......@@ -29,12 +29,12 @@ class TestModel(TestCase):
"""Test the function of get_summary_lineage and filter_summary_lineage."""
@mock.patch('mindinsight.lineagemgr.api.model.Querier')
@mock.patch('mindinsight.lineagemgr.api.model.SummaryPathParser.get_latest_lineage_summary')
@mock.patch('mindinsight.lineagemgr.api.model.LineageParser')
@mock.patch('os.path.isdir')
def test_get_summary_lineage_success(self, isdir_mock, latest_summary_mock, qurier_mock):
def test_get_summary_lineage_success(self, isdir_mock, parser_mock, qurier_mock):
"""Test the function of get_summary_lineage."""
isdir_mock.return_value = True
latest_summary_mock.return_value = '/path/to/summary_dir/a_MS_lineage'
parser_mock.return_value = MagicMock()
mock_querier = MagicMock()
qurier_mock.return_value = mock_querier
......@@ -53,7 +53,7 @@ class TestModel(TestCase):
invalid_path
)
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
@mock.patch('mindinsight.lineagemgr.common.utils.validate_path')
@mock.patch.object(SummaryPathParser, 'get_latest_lineage_summary')
def test_get_summary_lineage_failed2(self, mock_summary, mock_valid):
"""Test get_summary_lineage failed."""
......@@ -66,29 +66,19 @@ class TestModel(TestCase):
'/path/to/summary_dir'
)
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
@mock.patch('mindinsight.lineagemgr.api.model.Querier')
@mock.patch('mindinsight.lineagemgr.lineage_parser.LineageParser._parse_summary_log')
@mock.patch('mindinsight.lineagemgr.common.utils.validate_path')
@mock.patch.object(SummaryPathParser, 'get_latest_lineage_summary')
def test_get_summary_lineage_failed3(self,
mock_summary,
mock_querier,
mock_valid):
mock_valid,
mock_paser):
"""Test get_summary_lineage failed."""
mock_summary.return_value = '/path/to/summary/file'
mock_querier.return_value.get_summary_lineage.side_effect = \
LineageSummaryParseException()
mock_valid.return_value = '/path/to/summary_dir'
res = get_summary_lineage('/path/to/summary_dir')
assert res == {}
mock_querier.side_effect = LineageQuerierParamException(
['keys'], 'key')
self.assertRaisesRegex(
LineageQuerySummaryDataError,
'Get summary lineage failed',
get_summary_lineage,
'/path/to/summary_dir'
)
mock_paser.return_value = None
result = get_summary_lineage('/path/to/summary_dir')
assert {} == result
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
def test_convert_relative_path_to_abspath(self, validate_path_mock):
......@@ -135,13 +125,13 @@ class TestModel(TestCase):
class TestFilterAPI(TestCase):
"""Test the function of filter_summary_lineage."""
@mock.patch('mindinsight.lineagemgr.api.model.LineageOrganizer')
@mock.patch('mindinsight.lineagemgr.api.model.Querier')
@mock.patch('mindinsight.lineagemgr.api.model.SummaryPathParser.get_latest_lineage_summaries')
@mock.patch('mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_latest_lineage_summary')
@mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath')
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
@mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir')
def test_filter_summary_lineage(self, validate_path_mock, convert_path_mock,
latest_summary_mock, qurier_mock):
latest_summary_mock, qurier_mock, organizer_mock):
"""Test the function of filter_summary_lineage."""
convert_path_mock.return_value = {
'summary_dir': {
......@@ -151,6 +141,8 @@ class TestFilterAPI(TestCase):
'gt': 2.0
}
}
organizer_mock = MagicMock()
organizer_mock.super_lineage_objs = None
validate_path_mock.return_value = True
latest_summary_mock.return_value = ['/path/to/summary_base_dir/summary_dir']
mock_querier = MagicMock()
......@@ -172,7 +164,7 @@ class TestFilterAPI(TestCase):
)
@mock.patch('mindinsight.lineagemgr.api.model.validate_condition')
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
@mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir')
def test_invalid_search_condition(self, mock_path, mock_valid):
"""Test filter_summary_lineage with invalid invalid param."""
mock_path.return_value = None
......@@ -188,7 +180,7 @@ class TestFilterAPI(TestCase):
@mock.patch('mindinsight.lineagemgr.api.model.validate_search_model_condition')
@mock.patch('mindinsight.lineagemgr.api.model.validate_condition')
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
@mock.patch('mindinsight.lineagemgr.common.utils.validate_path')
@mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath')
def test_failed_to_convert_path(self, mock_convert, *args):
"""Test filter_summary_lineage with invalid invalid param."""
......@@ -205,23 +197,24 @@ class TestFilterAPI(TestCase):
@mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath')
@mock.patch('mindinsight.lineagemgr.api.model.validate_search_model_condition')
@mock.patch('mindinsight.lineagemgr.api.model.validate_condition')
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
@mock.patch.object(SummaryPathParser, 'get_latest_lineage_summaries')
@mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir')
@mock.patch.object(SummaryPathParser, 'get_latest_lineage_summary')
def test_failed_to_get_summary_filesh(self, mock_parse, *args):
"""Test filter_summary_lineage with invalid invalid param."""
mock_parse.return_value = []
args[0].return_value = None
path = '/path/to/summary/dir'
mock_parse.return_value = None
args[0].return_value = path
self.assertRaisesRegex(
LineageFileNotFoundError,
'There is no summary log file under summary_base_dir.',
filter_summary_lineage,
'/path/to/summary/dir'
path
)
@mock.patch('mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath')
@mock.patch('mindinsight.lineagemgr.api.model.validate_search_model_condition')
@mock.patch('mindinsight.lineagemgr.api.model.validate_condition')
@mock.patch('mindinsight.lineagemgr.api.model.validate_path')
@mock.patch('mindinsight.lineagemgr.api.model.normalize_summary_dir')
@mock.patch.object(SummaryPathParser, 'get_latest_lineage_summaries')
@mock.patch('mindinsight.lineagemgr.api.model.Querier')
def test_failed_to_querier(self, mock_query, mock_parse, *args):
......
......@@ -13,14 +13,15 @@
# limitations under the License.
# ============================================================================
"""Test the querier module."""
import time
from unittest import TestCase, mock
from google.protobuf.json_format import ParseDict
import mindinsight.datavisual.proto_files.mindinsight_lineage_pb2 as summary_pb2
from mindinsight.lineagemgr.common.exceptions.exceptions import (LineageParamTypeError, LineageQuerierParamException,
LineageSummaryAnalyzeException,
LineageSummaryParseException)
from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamTypeError, LineageQuerierParamException
from mindinsight.lineagemgr.lineage_parser import LineageOrganizer
from mindinsight.lineagemgr.querier.querier import Querier
from mindinsight.lineagemgr.summary.lineage_summary_analyzer import LineageInfo
......@@ -246,8 +247,11 @@ LINEAGE_FILTRATION_6 = {
class TestQuerier(TestCase):
"""Test the class of `Querier`."""
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_user_defined_info')
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos')
@mock.patch('mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_latest_lineage_summary')
@mock.patch('mindinsight.lineagemgr.lineage_parser.SummaryWatcher.list_summary_directories')
@mock.patch('mindinsight.lineagemgr.lineage_parser.LineageSummaryAnalyzer.get_user_defined_info')
@mock.patch('mindinsight.lineagemgr.lineage_parser.LineageSummaryAnalyzer.get_summary_infos')
def setUp(self, *args):
"""Initialization before test case execution."""
args[0].return_value = create_lineage_info(
......@@ -256,22 +260,22 @@ class TestQuerier(TestCase):
event_data.EVENT_DATASET_DICT_0
)
args[1].return_value = []
args[3].return_value = 'path'
single_summary_path = '/path/to/summary0/log0'
self.single_querier = Querier(single_summary_path)
args[2].return_value = [{'relative_path': './', 'update_time': 1}]
single_summary_path = '/path/to/summary0'
lineage_objects = LineageOrganizer(summary_base_dir=single_summary_path).super_lineage_objs
self.single_querier = Querier(lineage_objects)
lineage_infos = get_lineage_infos()
args[0].side_effect = lineage_infos
summary_paths = [
'/path/to/summary0/log0',
'/path/to/summary1/log1',
'/path/to/summary2/log2',
'/path/to/summary3/log3',
'/path/to/summary4/log4',
'/path/to/summary5/log5',
'/path/to/summary6/log6'
]
self.multi_querier = Querier(summary_paths)
summary_base_dir = '/path/to'
relative_dirs = []
for i in range(7):
relative_dirs.append(dict(relative_path=f'./summary{i}', update_time=time.time() - i))
args[2].return_value = relative_dirs
lineage_objects = LineageOrganizer(summary_base_dir=summary_base_dir).super_lineage_objs
self.multi_querier = Querier(lineage_objects)
def test_get_summary_lineage_success_1(self):
"""Test the success of get_summary_lineage."""
......@@ -282,9 +286,7 @@ class TestQuerier(TestCase):
def test_get_summary_lineage_success_2(self):
"""Test the success of get_summary_lineage."""
expected_result = [LINEAGE_INFO_0]
result = self.single_querier.get_summary_lineage(
summary_dir='/path/to/summary0'
)
result = self.single_querier.get_summary_lineage()
self.assertListEqual(expected_result, result)
def test_get_summary_lineage_success_3(self):
......@@ -601,60 +603,12 @@ class TestQuerier(TestCase):
condition=condition
)
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos')
def test_init_fail(self, *args):
def test_init_fail(self):
"""Test the function of init with exception."""
summary_path = {'xxx': 1}
obj_dict = 'a'
with self.assertRaises(LineageParamTypeError):
Querier(summary_path)
Querier(obj_dict)
summary_path = None
obj_dict = None
with self.assertRaises(LineageQuerierParamException):
Querier(summary_path)
args[0].side_effect = LineageSummaryAnalyzeException
summary_path = '/path/to/summary0/log0'
with self.assertRaises(LineageSummaryParseException):
Querier(summary_path)
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_user_defined_info')
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos')
def test_parse_fail_summary_logs_1(self, *args):
"""Test the function of parsing fail summary logs."""
lineage_infos = get_lineage_infos()
args[0].side_effect = lineage_infos
args[1].return_value = []
summary_path = ['/path/to/summary0/log0']
querier = Querier(summary_path)
querier._parse_failed_paths.append('/path/to/summary1/log1')
expected_result = [
LINEAGE_INFO_0,
LINEAGE_INFO_1
]
result = querier.get_summary_lineage()
self.assertListEqual(expected_result, result)
self.assertListEqual([], querier._parse_failed_paths)
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_user_defined_info')
@mock.patch('mindinsight.lineagemgr.querier.querier.LineageSummaryAnalyzer.get_summary_infos')
def test_parse_fail_summary_logs_2(self, *args):
"""Test the function of parsing fail summary logs."""
args[0].return_value = create_lineage_info(
event_data.EVENT_TRAIN_DICT_0,
event_data.EVENT_EVAL_DICT_0,
event_data.EVENT_DATASET_DICT_0,
)
args[1].return_value = []
summary_path = ['/path/to/summary0/log0']
querier = Querier(summary_path)
querier._parse_failed_paths.append('/path/to/summary1/log1')
args[0].return_value = create_lineage_info(None, None, None)
expected_result = [LINEAGE_INFO_0]
result = querier.get_summary_lineage()
self.assertListEqual(expected_result, result)
self.assertListEqual(
['/path/to/summary1/log1'], querier._parse_failed_paths
)
Querier(obj_dict)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册