Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
MindSpore
mindinsight
提交
6f599aa1
M
mindinsight
项目概览
MindSpore
/
mindinsight
通知
7
Star
3
Fork
2
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
M
mindinsight
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6f599aa1
编写于
5月 20, 2020
作者:
M
mindspore-ci-bot
提交者:
Gitee
5月 20, 2020
浏览文件
操作
浏览文件
下载
差异文件
!134 modify lineage parsing for multi lineage files
Merge pull request !134 from luopengting/lineage_parsing
上级
1f17c699
665f2d68
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
263 addition
and
264 deletion
+263
-264
mindinsight/datavisual/data_transform/data_manager.py
mindinsight/datavisual/data_transform/data_manager.py
+28
-9
mindinsight/lineagemgr/api/model.py
mindinsight/lineagemgr/api/model.py
+1
-4
mindinsight/lineagemgr/cache_item_updater.py
mindinsight/lineagemgr/cache_item_updater.py
+32
-15
mindinsight/lineagemgr/common/path_parser.py
mindinsight/lineagemgr/common/path_parser.py
+27
-111
mindinsight/lineagemgr/common/utils.py
mindinsight/lineagemgr/common/utils.py
+8
-0
mindinsight/lineagemgr/lineage_parser.py
mindinsight/lineagemgr/lineage_parser.py
+76
-20
mindinsight/lineagemgr/querier/query_model.py
mindinsight/lineagemgr/querier/query_model.py
+28
-9
mindinsight/lineagemgr/summary/lineage_summary_analyzer.py
mindinsight/lineagemgr/summary/lineage_summary_analyzer.py
+0
-2
tests/st/func/lineagemgr/api/test_model_api.py
tests/st/func/lineagemgr/api/test_model_api.py
+15
-14
tests/st/func/lineagemgr/collection/model/test_model_lineage.py
...st/func/lineagemgr/collection/model/test_model_lineage.py
+1
-4
tests/ut/lineagemgr/api/test_model.py
tests/ut/lineagemgr/api/test_model.py
+14
-10
tests/ut/lineagemgr/common/test_path_parser.py
tests/ut/lineagemgr/common/test_path_parser.py
+25
-63
tests/ut/lineagemgr/querier/test_querier.py
tests/ut/lineagemgr/querier/test_querier.py
+8
-3
未找到文件。
mindinsight/datavisual/data_transform/data_manager.py
浏览文件 @
6f599aa1
...
...
@@ -72,7 +72,7 @@ class _BasicTrainJob:
self
.
_update_time
=
update_time
@
property
def
summary_dir
(
self
):
def
abs_
summary_dir
(
self
):
"""Get summary directory path."""
return
self
.
_abs_summary_dir
...
...
@@ -131,9 +131,9 @@ class CachedTrainJob:
return
self
.
_last_access_time
@
property
def
summary_dir
(
self
):
def
abs_
summary_dir
(
self
):
"""Get summary directory path."""
return
self
.
_basic_info
.
summary_dir
return
self
.
_basic_info
.
abs_
summary_dir
@
property
def
summary_base_dir
(
self
):
...
...
@@ -144,12 +144,33 @@ class CachedTrainJob:
"""Set value to cache."""
self
.
_content
[
key
]
=
value
def
get
(
self
,
key
):
"""Get value from cache."""
def
delete
(
self
,
key
):
"""Delete key in cache."""
if
key
in
self
.
_content
:
self
.
_content
.
pop
(
key
)
def
get
(
self
,
key
,
raise_exception
=
True
):
"""
Get value from cache.
Args:
key (str): Key of content.
raise_exception (bool): If the key does not exist and
raise_exception is True, it will raise an Exception.
Returns:
Union[Object, None], Return value if key in content,
return False else if raise_exception is False.
Raises:
ParamValueError, if the key does not exist and raise_exception is True.
"""
try
:
return
self
.
_content
[
key
]
except
KeyError
:
raise
ParamValueError
(
"Invalid cache key({})."
.
format
(
key
))
if
raise_exception
:
raise
ParamValueError
(
"Invalid cache key({})."
.
format
(
key
))
return
None
@
property
def
basic_info
(
self
):
...
...
@@ -163,9 +184,7 @@ class CachedTrainJob:
def
lock_key
(
self
,
key
):
"""Threading lock with given key."""
if
key
not
in
self
.
_key_locks
:
self
.
_key_locks
[
key
]
=
threading
.
Lock
()
return
self
.
_key_locks
[
key
]
return
self
.
_key_locks
.
setdefault
(
key
,
threading
.
Lock
())
class
TrainJob
:
...
...
mindinsight/lineagemgr/api/model.py
浏览文件 @
6f599aa1
...
...
@@ -96,11 +96,8 @@ def general_get_summary_lineage(data_manager=None, summary_dir=None, keys=None):
if
data_manager
is
None
:
normalize_summary_dir
(
summary_dir
)
super_lineage_obj
=
None
if
os
.
path
.
isabs
(
summary_dir
):
super_lineage_obj
=
LineageParser
(
summary_dir
).
super_lineage_obj
el
if
data_manager
is
not
Non
e
:
el
s
e
:
validate_train_id
(
summary_dir
)
super_lineage_obj
=
LineageOrganizer
(
data_manager
=
data_manager
).
get_super_lineage_obj
(
summary_dir
)
...
...
mindinsight/lineagemgr/cache_item_updater.py
浏览文件 @
6f599aa1
...
...
@@ -27,15 +27,19 @@ def update_lineage_object(data_manager, train_id, added_info: dict):
"""Update lineage objects about tag and remark."""
validate_train_id
(
train_id
)
cache_item
=
data_manager
.
get_brief_train_job
(
train_id
)
cached_added_info
=
cache_item
.
get
(
key
=
LINEAGE
).
added_info
lineage_item
=
cache_item
.
get
(
key
=
LINEAGE
,
raise_exception
=
False
)
if
lineage_item
is
None
:
logger
.
warning
(
"Cannot update the lineage for tran job %s, because it does not exist."
,
train_id
)
raise
ParamValueError
(
"Cannot update the lineage for tran job %s, because it does not exist."
%
train_id
)
cached_added_info
=
lineage_item
.
super_lineage_obj
.
added_info
new_added_info
=
dict
(
cached_added_info
)
for
key
,
value
in
added_info
.
items
():
if
key
in
[
"tag"
,
"remark"
]:
new_added_info
.
update
({
key
:
value
})
new_added_info
.
update
({
key
:
value
})
with
cache_item
.
lock_key
(
LINEAGE
):
cache_item
.
get
(
key
=
LINEAGE
).
added_info
=
new_added_info
cache_item
.
get
(
key
=
LINEAGE
).
super_lineage_obj
.
added_info
=
new_added_info
class
LineageCacheItemUpdater
(
BaseCacheItemUpdater
):
...
...
@@ -44,8 +48,7 @@ class LineageCacheItemUpdater(BaseCacheItemUpdater):
def
update_item
(
self
,
cache_item
:
CachedTrainJob
):
"""Update cache item in place."""
summary_base_dir
=
cache_item
.
summary_base_dir
summary_dir
=
cache_item
.
summary_dir
update_time
=
cache_item
.
basic_info
.
update_time
summary_dir
=
cache_item
.
abs_summary_dir
# The summary_base_dir and summary_dir have been normalized in data_manager.
if
summary_base_dir
==
summary_dir
:
...
...
@@ -54,17 +57,31 @@ class LineageCacheItemUpdater(BaseCacheItemUpdater):
relative_path
=
f
'./
{
os
.
path
.
basename
(
summary_dir
)
}
'
try
:
cached_added_info
=
cache_item
.
get
(
key
=
LINEAGE
).
added_info
except
ParamValueError
:
cached_added_info
=
None
try
:
lineage_parser
=
LineageParser
(
summary_dir
,
update_time
,
cached_added_info
)
super_lineage_obj
=
lineage_parser
.
super_lineage_obj
lineage_parser
=
self
.
_lineage_parsing
(
cache_item
)
except
LineageFileNotFoundError
:
super_lineage_obj
=
None
with
cache_item
.
lock_key
(
LINEAGE
):
cache_item
.
delete
(
key
=
LINEAGE
)
return
super_lineage_obj
=
lineage_parser
.
super_lineage_obj
if
super_lineage_obj
is
None
:
logger
.
warning
(
"There is no lineage to update in tran job %s."
,
relative_path
)
return
with
cache_item
.
lock_key
(
LINEAGE
):
cache_item
.
set
(
key
=
LINEAGE
,
value
=
super_lineage_obj
)
cache_item
.
set
(
key
=
LINEAGE
,
value
=
lineage_parser
)
def
_lineage_parsing
(
self
,
cache_item
):
"""Parse summaries and return lineage parser."""
summary_dir
=
cache_item
.
abs_summary_dir
update_time
=
cache_item
.
basic_info
.
update_time
cached_lineage_item
=
cache_item
.
get
(
key
=
LINEAGE
,
raise_exception
=
False
)
if
cached_lineage_item
is
None
:
lineage_parser
=
LineageParser
(
summary_dir
,
update_time
)
else
:
lineage_parser
=
cached_lineage_item
with
cache_item
.
lock_key
(
LINEAGE
):
lineage_parser
.
update_time
=
update_time
lineage_parser
.
load
()
return
lineage_parser
mindinsight/lineagemgr/common/path_parser.py
浏览文件 @
6f599aa1
...
...
@@ -13,9 +13,10 @@
# limitations under the License.
# ============================================================================
"""This file provides path resolution."""
import
os
from
mindinsight.datavisual.data_transform.summary_watcher
import
SummaryWatcher
from
mindinsight.lineagemgr.common.log
import
logger
from
mindinsight.lineagemgr.common.utils
import
get_timestamp
from
mindinsight.utils.exceptions
import
MindInsightException
class
SummaryPathParser
:
...
...
@@ -29,121 +30,36 @@ class SummaryPathParser:
_LINEAGE_SUMMARY_SUFFIX_LEN
=
len
(
LINEAGE_SUMMARY_SUFFIX
)
@
staticmethod
def
get_summary_dirs
(
summary_base_dir
):
"""
Get summary dirs according to summary base dir.
Args:
summary_base_dir (str): Summary base dir.
Returns:
list[str], all summary dirs in summary base dir. The summary dir is
absolute path.
"""
summary_watcher
=
SummaryWatcher
()
relative_dirs
=
summary_watcher
.
list_summary_directories
(
summary_base_dir
=
summary_base_dir
)
summary_dirs
=
list
(
map
(
lambda
item
:
os
.
path
.
realpath
(
os
.
path
.
join
(
summary_base_dir
,
item
.
get
(
'relative_path'
))
),
relative_dirs
)
)
return
summary_dirs
@
staticmethod
def
get_latest_lineage_summary
(
summary_dir
):
def
get_lineage_summaries
(
summary_dir
,
is_sorted
=
False
,
reverse
=
True
):
"""
Get l
atest lineage summary log path
according to summary dir.
Get l
ineage summaries
according to summary dir.
Args:
summary_dir (str): Summary dir.
is_sorted (bool): If it is True, files will be sorted.
reverse (bool): If it is True, sort by timestamp increments and filename decrement.
Returns:
Union[str, None], if the lineage summary log exist, return the path,
else return None. The lineage summary log path is absolute path.
"""
summary_watcher
=
SummaryWatcher
()
summaries
=
summary_watcher
.
list_summaries
(
summary_base_dir
=
summary_dir
)
latest_file_name
=
SummaryPathParser
.
_get_latest_lineage_file
(
summaries
)
return
os
.
path
.
join
(
summary_dir
,
latest_file_name
)
\
if
latest_file_name
is
not
None
else
None
@
staticmethod
def
get_latest_lineage_summaries
(
summary_base_dir
):
"""
Get all latest lineage summary logs in summary base dir.
Args:
summary_base_dir (str): Summary base dir.
Returns:
list[str], all latest lineage summary logs in summary base dir. The
lineage summary log is absolute path.
"""
summary_watcher
=
SummaryWatcher
()
relative_dirs
=
summary_watcher
.
list_summary_directories
(
summary_base_dir
=
summary_base_dir
)
latest_summaries
=
[]
for
item
in
relative_dirs
:
relative_dir
=
item
.
get
(
'relative_path'
)
summaries
=
summary_watcher
.
list_summaries
(
summary_base_dir
=
summary_base_dir
,
relative_path
=
relative_dir
)
latest_file_name
=
SummaryPathParser
.
_get_latest_lineage_file
(
summaries
)
if
latest_file_name
is
None
:
continue
latest_file
=
os
.
path
.
realpath
(
os
.
path
.
join
(
summary_base_dir
,
relative_dir
,
latest_file_name
)
)
latest_summaries
.
append
(
latest_file
)
return
latest_summaries
@
staticmethod
def
_get_latest_lineage_file
(
summaries
):
"""
Get latest lineage summary file.
If there is a file with the suffix `LINEAGE_SUMMARY_SUFFIX`, check
whether there is a file with the same name that does not include the
suffix `LINEAGE_SUMMARY_SUFFIX`. When both exist, the file is considered
to be a lineage summary log.
Args:
summaries (list[dict]): All summary logs info in summary dir.
Returns:
str, the latest lineage summary file name.
list, if the lineage summary log exist, return the file names, else return [].
"""
try
:
latest_summary
=
max
(
summaries
,
key
=
lambda
summary
:
summary
.
get
(
'create_time'
)
)
except
ValueError
:
return
None
max_create_time
=
latest_summary
.
get
(
'create_time'
)
summary_file_names
=
[]
for
summary
in
summaries
:
if
summary
.
get
(
'create_time'
)
==
max_create_time
:
summary_file_names
.
append
(
summary
.
get
(
'file_name'
))
summary_watcher
=
SummaryWatcher
()
summaries
=
summary_watcher
.
list_summaries
(
summary_base_dir
=
summary_dir
)
except
MindInsightException
as
err
:
logger
.
warning
(
str
(
err
))
return
[]
summary_files
=
[
summary
.
get
(
'file_name'
)
for
summary
in
summaries
]
lineage_files_name
=
list
(
filter
(
lambda
filename
:
(
filename
.
endswith
(
SummaryPathParser
.
LINEAGE_SUMMARY_SUFFIX
)),
summary_files
))
if
is_sorted
:
lineage_files_name
=
SummaryPathParser
.
_sorted_summary_files
(
lineage_files_name
,
reverse
)
return
lineage_files_name
latest_lineage_name
=
None
for
name
in
summary_file_names
:
if
not
name
.
endswith
(
SummaryPathParser
.
LINEAGE_SUMMARY_SUFFIX
):
continue
ms_name
=
name
[:
-
SummaryPathParser
.
_LINEAGE_SUMMARY_SUFFIX_LEN
]
if
ms_name
in
summary_file_names
:
latest_lineage_name
=
name
return
latest_lineage_name
@
staticmethod
def
_sorted_summary_files
(
summary_files
,
reverse
):
"""Sort by timestamp increments and filename decrement."""
sorted_files
=
sorted
(
summary_files
,
key
=
lambda
filename
:
(
-
get_timestamp
(
filename
),
filename
),
reverse
=
reverse
)
return
sorted_files
mindinsight/lineagemgr/common/utils.py
浏览文件 @
6f599aa1
...
...
@@ -14,7 +14,9 @@
# ============================================================================
"""Lineage utils."""
from
functools
import
wraps
import
re
from
mindinsight.datavisual.data_transform.summary_watcher
import
SummaryWatcher
from
mindinsight.lineagemgr.common.log
import
logger
as
log
from
mindinsight.lineagemgr.common.exceptions.exceptions
import
LineageParamRunContextError
,
\
LineageGetModelFileError
,
LineageLogError
,
LineageParamValueError
,
LineageDirNotExistError
,
\
...
...
@@ -68,3 +70,9 @@ def normalize_summary_dir(summary_dir):
log
.
exception
(
error
)
raise
LineageParamSummaryPathError
(
str
(
error
.
message
))
return
summary_dir
def
get_timestamp
(
filename
):
"""Get timestamp from filename."""
timestamp
=
int
(
re
.
search
(
SummaryWatcher
().
SUMMARY_FILENAME_REGEX
,
filename
)[
1
])
return
timestamp
mindinsight/lineagemgr/lineage_parser.py
浏览文件 @
6f599aa1
...
...
@@ -21,6 +21,7 @@ from mindinsight.lineagemgr.common.exceptions.exceptions import LineageSummaryAn
MindInsightException
from
mindinsight.lineagemgr.common.log
import
logger
from
mindinsight.lineagemgr.common.path_parser
import
SummaryPathParser
from
mindinsight.lineagemgr.summary.file_handler
import
FileHandler
from
mindinsight.lineagemgr.summary.lineage_summary_analyzer
import
LineageSummaryAnalyzer
from
mindinsight.lineagemgr.querier.query_model
import
LineageObj
from
mindinsight.utils.exceptions
import
ParamValueError
...
...
@@ -30,7 +31,7 @@ LINEAGE = "lineage"
class
SuperLineageObj
:
"""This is an object for LineageObj and its additional info."""
def
__init__
(
self
,
lineage_obj
,
update_time
,
added_info
=
None
):
def
__init__
(
self
,
lineage_obj
:
LineageObj
,
update_time
,
added_info
=
None
):
self
.
_lineage_obj
=
lineage_obj
self
.
_update_time
=
update_time
self
.
_added_info
=
added_info
if
added_info
is
not
None
else
dict
()
...
...
@@ -59,11 +60,60 @@ class SuperLineageObj:
class
LineageParser
:
"""Lineage parser."""
def
__init__
(
self
,
summary_dir
,
update_time
=
None
,
added_info
=
None
):
self
.
_super_lineage_obj
=
None
self
.
_summary_dir
=
summary_dir
self
.
_
update_time
=
update_time
self
.
update_time
=
update_time
self
.
_added_info
=
added_info
self
.
_parse_summary_log
()
self
.
_init_variables
()
self
.
load
()
def
_init_variables
(
self
):
"""Init variables."""
self
.
_super_lineage_obj
=
None
self
.
_latest_filename
=
None
self
.
_latest_file_size
=
None
self
.
_cached_file_list
=
None
def
load
(
self
):
"""Find and load summaries."""
# get sorted lineage files
lineage_files
=
SummaryPathParser
.
get_lineage_summaries
(
self
.
_summary_dir
,
is_sorted
=
True
)
if
not
lineage_files
:
logger
.
warning
(
'There is no summary log file under summary_dir %s.'
,
self
.
_summary_dir
)
raise
LineageFileNotFoundError
(
'There is no summary log file under summary_dir.'
)
self
.
_init_if_files_deleted
(
lineage_files
)
index
=
0
if
self
.
_latest_filename
is
not
None
:
index
=
lineage_files
.
index
(
self
.
_latest_filename
)
for
filename
in
lineage_files
[
index
:]:
if
filename
!=
self
.
_latest_filename
:
self
.
_latest_filename
=
filename
self
.
_latest_file_size
=
0
file_path
=
os
.
path
.
join
(
self
.
_summary_dir
,
filename
)
new_size
=
FileHandler
(
file_path
).
size
if
new_size
==
self
.
_latest_file_size
:
continue
self
.
_latest_file_size
=
new_size
self
.
_parse_summary_log
()
def
_init_if_files_deleted
(
self
,
file_list
):
"""Init variables if files deleted."""
cached_file_list
=
self
.
_cached_file_list
self
.
_cached_file_list
=
file_list
if
cached_file_list
is
None
:
return
deleted_files
=
set
(
cached_file_list
)
-
set
(
file_list
)
if
deleted_files
:
logger
.
warning
(
"There are some files has been deleted, "
"all files will be reloaded in path %s."
,
self
.
_summary_dir
)
self
.
_init_variables
()
def
_parse_summary_log
(
self
):
"""
...
...
@@ -72,15 +122,22 @@ class LineageParser:
Returns:
bool, `True` if parse summary log success, else `False`.
"""
file_path
=
SummaryPathParser
.
get_latest_lineage_summary
(
self
.
_summary_dir
)
if
file_path
is
None
:
logger
.
warning
(
'There is no summary log file under summary_dir %s.'
,
self
.
_summary_dir
)
raise
LineageFileNotFoundError
(
'There is no summary log file under summary_dir.'
)
file_path
=
os
.
path
.
realpath
(
os
.
path
.
join
(
self
.
_summary_dir
,
self
.
_latest_filename
))
try
:
lineage_info
=
LineageSummaryAnalyzer
.
get_summary_infos
(
file_path
)
user_defined_info
=
LineageSummaryAnalyzer
.
get_user_defined_info
(
file_path
)
self
.
_update_lineage_obj
(
lineage_info
,
user_defined_info
)
except
LineageSummaryAnalyzeException
:
logger
.
warning
(
"Parse file failed under summary_dir %s."
,
file_path
)
except
(
LineageEventNotExistException
,
LineageEventFieldNotExistException
)
as
error
:
logger
.
warning
(
"Parse file failed under summary_dir %s. Detail: %s."
,
file_path
,
str
(
error
))
except
MindInsightException
as
error
:
logger
.
exception
(
error
)
logger
.
warning
(
"Parse file failed under summary_dir %s."
,
file_path
)
def
_update_lineage_obj
(
self
,
lineage_info
,
user_defined_info
):
"""Update lineage object."""
if
self
.
_super_lineage_obj
is
None
:
lineage_obj
=
LineageObj
(
self
.
_summary_dir
,
train_lineage
=
lineage_info
.
train_lineage
,
...
...
@@ -88,15 +145,14 @@ class LineageParser:
dataset_graph
=
lineage_info
.
dataset_graph
,
user_defined_info
=
user_defined_info
)
self
.
_super_lineage_obj
=
SuperLineageObj
(
lineage_obj
,
self
.
_update_time
,
self
.
_added_info
)
except
(
LineageSummaryAnalyzeException
,
LineageEventNotExistException
,
LineageEventFieldNotExistException
):
logger
.
warning
(
"Parse file failed under summary_dir %s."
,
self
.
_summary_dir
)
except
MindInsightException
as
error
:
logger
.
error
(
str
(
error
))
logger
.
exception
(
error
)
logger
.
warning
(
"Parse file failed under summary_dir %s."
,
self
.
_summary_dir
)
self
.
_super_lineage_obj
=
SuperLineageObj
(
lineage_obj
,
self
.
update_time
,
self
.
_added_info
)
else
:
self
.
_super_lineage_obj
.
lineage_obj
.
parse_and_update_lineage
(
train_lineage
=
lineage_info
.
train_lineage
,
evaluation_lineage
=
lineage_info
.
eval_lineage
,
dataset_graph
=
lineage_info
.
dataset_graph
,
user_defined_info
=
user_defined_info
)
@
property
def
super_lineage_obj
(
self
):
...
...
@@ -156,7 +212,7 @@ class LineageOrganizer:
cache_items
=
brief_cache
.
cache_items
for
relative_dir
,
cache_train_job
in
cache_items
.
items
():
try
:
super_lineage_obj
=
cache_train_job
.
get
(
"lineage"
)
super_lineage_obj
=
cache_train_job
.
get
(
"lineage"
)
.
super_lineage_obj
self
.
_super_lineage_objs
.
update
({
relative_dir
:
super_lineage_obj
})
except
ParamValueError
:
logger
.
info
(
"This is no lineage info in train job %s."
,
relative_dir
)
...
...
mindinsight/lineagemgr/querier/query_model.py
浏览文件 @
6f599aa1
...
...
@@ -82,6 +82,30 @@ class LineageObj:
self
.
_lineage_info
=
{
self
.
_name_summary_dir
:
summary_dir
}
self
.
_filtration_result
=
None
self
.
_init_lineage
()
self
.
parse_and_update_lineage
(
**
kwargs
)
def
_init_lineage
(
self
):
"""Init lineage info."""
# train
self
.
_lineage_info
[
self
.
_name_model
]
=
{}
self
.
_lineage_info
[
self
.
_name_algorithm
]
=
{}
self
.
_lineage_info
[
self
.
_name_hyper_parameters
]
=
{}
self
.
_lineage_info
[
self
.
_name_train_dataset
]
=
{}
# eval
self
.
_lineage_info
[
self
.
_name_metric
]
=
{}
self
.
_lineage_info
[
self
.
_name_valid_dataset
]
=
{}
# dataset graph
self
.
_lineage_info
[
self
.
_name_dataset_graph
]
=
{}
# user defined
self
.
_lineage_info
[
self
.
_name_user_defined
]
=
{}
def
parse_and_update_lineage
(
self
,
**
kwargs
):
"""Parse and update lineage."""
user_defined_info_list
=
kwargs
.
get
(
'user_defined_info'
,
[])
train_lineage
=
kwargs
.
get
(
'train_lineage'
)
evaluation_lineage
=
kwargs
.
get
(
'evaluation_lineage'
)
...
...
@@ -92,6 +116,7 @@ class LineageObj:
self
.
_parse_train_lineage
(
train_lineage
)
self
.
_parse_evaluation_lineage
(
evaluation_lineage
)
self
.
_parse_dataset_graph
(
dataset_graph
)
self
.
_filtration_result
=
self
.
_organize_filtration_result
()
@
property
...
...
@@ -309,10 +334,6 @@ class LineageObj:
train_lineage (Event): Train lineage.
"""
if
train_lineage
is
None
:
self
.
_lineage_info
[
self
.
_name_model
]
=
{}
self
.
_lineage_info
[
self
.
_name_algorithm
]
=
{}
self
.
_lineage_info
[
self
.
_name_hyper_parameters
]
=
{}
self
.
_lineage_info
[
self
.
_name_train_dataset
]
=
{}
return
event_dict
=
MessageToDict
(
...
...
@@ -341,8 +362,6 @@ class LineageObj:
evaluation_lineage (Event): Evaluation lineage.
"""
if
evaluation_lineage
is
None
:
self
.
_lineage_info
[
self
.
_name_metric
]
=
{}
self
.
_lineage_info
[
self
.
_name_valid_dataset
]
=
{}
return
event_dict
=
MessageToDict
(
...
...
@@ -364,9 +383,7 @@ class LineageObj:
Args:
dataset_graph (Event): Dataset graph.
"""
if
dataset_graph
is
None
:
self
.
_lineage_info
[
self
.
_name_dataset_graph
]
=
{}
else
:
if
dataset_graph
is
not
None
:
# convert message to dict
event_dict
=
organize_graph
(
dataset_graph
.
dataset_graph
)
if
event_dict
is
None
:
...
...
@@ -380,6 +397,8 @@ class LineageObj:
Args:
user_defined_info_list (list): user defined info list.
"""
if
not
user_defined_info_list
:
return
user_defined_infos
=
dict
()
for
user_defined_info
in
user_defined_info_list
:
user_defined_infos
.
update
(
user_defined_info
)
...
...
mindinsight/lineagemgr/summary/lineage_summary_analyzer.py
浏览文件 @
6f599aa1
...
...
@@ -204,11 +204,9 @@ class LineageSummaryAnalyzer(SummaryAnalyzer):
try
:
lineage_info
=
analyzer
.
get_latest_info
()
except
(
MindInsightException
,
IOError
,
DecodeError
)
as
err
:
log
.
error
(
"Failed to get lineage information."
)
log
.
exception
(
err
)
raise
LineageSummaryAnalyzeException
()
except
Exception
as
err
:
log
.
error
(
"Failed to get lineage information."
)
log
.
exception
(
err
)
raise
LineageSummaryAnalyzeException
()
...
...
tests/st/func/lineagemgr/api/test_model_api.py
浏览文件 @
6f599aa1
...
...
@@ -111,26 +111,26 @@ LINEAGE_FILTRATION_RUN1 = {
LINEAGE_FILTRATION_RUN2
=
{
'summary_dir'
:
os
.
path
.
join
(
BASE_SUMMARY_DIR
,
'run2'
),
'model_lineage'
:
{
'loss_function'
:
None
,
'loss_function'
:
"SoftmaxCrossEntropyWithLogits"
,
'train_dataset_path'
:
None
,
'train_dataset_count'
:
None
,
'train_dataset_count'
:
1024
,
'test_dataset_path'
:
None
,
'test_dataset_count'
:
10240
,
'user_defined'
:
{},
'network'
:
None
,
'optimizer'
:
None
,
'learning_rate'
:
None
,
'epoch'
:
None
,
'batch_size'
:
None
,
'device_num'
:
None
,
'loss'
:
None
,
'model_size'
:
None
,
'network'
:
"ResNet"
,
'optimizer'
:
"Momentum"
,
'learning_rate'
:
0.11999999731779099
,
'epoch'
:
10
,
'batch_size'
:
32
,
'device_num'
:
2
,
'loss'
:
0.029999999329447746
,
'model_size'
:
10
,
'metric'
:
{
'accuracy'
:
2.7800000000000002
},
'dataset_mark'
:
3
},
'dataset_graph'
:
{}
'dataset_graph'
:
DATASET_GRAPH
}
...
...
@@ -460,9 +460,10 @@ class TestModelApi(TestCase):
'customized'
:
event_data
.
CUSTOMIZED__0
,
'object'
:
[
LINEAGE_FILTRATION_EXCEPT_RUN
,
LINEAGE_FILTRATION_RUN1
LINEAGE_FILTRATION_RUN1
,
LINEAGE_FILTRATION_RUN2
],
'count'
:
2
'count'
:
3
}
partial_res1
=
filter_summary_lineage
(
BASE_SUMMARY_DIR
,
search_condition1
)
expect_objects
=
expect_result
.
get
(
'object'
)
...
...
@@ -746,7 +747,7 @@ class TestModelApi(TestCase):
expect_result
=
{
'customized'
:
{},
'object'
:
[],
'count'
:
1
'count'
:
2
}
partial_res2
=
filter_summary_lineage
(
BASE_SUMMARY_DIR
,
search_condition2
)
assert
expect_result
==
partial_res2
...
...
tests/st/func/lineagemgr/collection/model/test_model_lineage.py
浏览文件 @
6f599aa1
...
...
@@ -32,8 +32,7 @@ from mindinsight.lineagemgr import get_summary_lineage
from
mindinsight.lineagemgr.collection.model.model_lineage
import
TrainLineage
,
EvalLineage
,
\
AnalyzeObject
from
mindinsight.lineagemgr.common.exceptions.error_code
import
LineageErrors
from
mindinsight.lineagemgr.common.exceptions.exceptions
import
LineageParamRunContextError
,
\
LineageFileNotFoundError
from
mindinsight.lineagemgr.common.exceptions.exceptions
import
LineageParamRunContextError
from
mindinsight.utils.exceptions
import
MindInsightException
from
mindspore.application.model_zoo.resnet
import
ResNet
from
mindspore.common.tensor
import
Tensor
...
...
@@ -343,5 +342,3 @@ class TestModelLineage(TestCase):
full_file_name
=
summary_record
.
full_file_name
assert
full_file_name
.
endswith
(
'_lineage'
)
assert
os
.
path
.
isfile
(
full_file_name
)
with
self
.
assertRaisesRegex
(
LineageFileNotFoundError
,
'no summary log file'
):
get_summary_lineage
(
summary_dir
)
tests/ut/lineagemgr/api/test_model.py
浏览文件 @
6f599aa1
...
...
@@ -54,10 +54,10 @@ class TestModel(TestCase):
)
@
mock
.
patch
(
'mindinsight.lineagemgr.common.utils.validate_path'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_l
atest_lineage_summary
'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_l
ineage_summaries
'
)
def
test_get_summary_lineage_failed2
(
self
,
mock_summary
,
mock_valid
):
"""Test get_summary_lineage failed."""
mock_summary
.
return_value
=
None
mock_summary
.
return_value
=
[]
mock_valid
.
return_value
=
'/path/to/summary/dir'
self
.
assertRaisesRegex
(
LineageFileNotFoundError
,
...
...
@@ -66,17 +66,21 @@ class TestModel(TestCase):
'/path/to/summary_dir'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.FileHandler'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.LineageParser._parse_summary_log'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.common.utils.validate_path'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_l
atest_lineage_summary
'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_l
ineage_summaries
'
)
def
test_get_summary_lineage_failed3
(
self
,
mock_summary
,
mock_valid
,
mock_paser
):
mock_parser
,
mock_file_handler
):
"""Test get_summary_lineage failed."""
mock_summary
.
return_value
=
'/path/to/summary/file'
mock_summary
.
return_value
=
[
'/path/to/summary/file'
]
mock_valid
.
return_value
=
'/path/to/summary_dir'
mock_paser
.
return_value
=
None
mock_parser
.
return_value
=
None
mock_file_handler
=
MagicMock
()
mock_file_handler
.
size
=
1
result
=
get_summary_lineage
(
'/path/to/summary_dir'
)
assert
{}
==
result
...
...
@@ -127,7 +131,7 @@ class TestFilterAPI(TestCase):
"""Test the function of filter_summary_lineage."""
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.LineageOrganizer'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.Querier'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_l
atest_lineage_summary
'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_l
ineage_summaries
'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model._convert_relative_path_to_abspath'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.normalize_summary_dir'
)
def
test_filter_summary_lineage
(
self
,
validate_path_mock
,
convert_path_mock
,
...
...
@@ -198,11 +202,11 @@ class TestFilterAPI(TestCase):
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.validate_search_model_condition'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.validate_condition'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.normalize_summary_dir'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_l
atest_lineage_summary
'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_l
ineage_summaries
'
)
def
test_failed_to_get_summary_filesh
(
self
,
mock_parse
,
*
args
):
"""Test filter_summary_lineage with invalid invalid param."""
path
=
'/path/to/summary/dir'
mock_parse
.
return_value
=
None
mock_parse
.
return_value
=
[]
args
[
0
].
return_value
=
path
self
.
assertRaisesRegex
(
LineageFileNotFoundError
,
...
...
@@ -215,7 +219,7 @@ class TestFilterAPI(TestCase):
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.validate_search_model_condition'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.validate_condition'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.normalize_summary_dir'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_l
atest_l
ineage_summaries'
)
@
mock
.
patch
.
object
(
SummaryPathParser
,
'get_lineage_summaries'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.api.model.Querier'
)
def
test_failed_to_querier
(
self
,
mock_query
,
mock_parse
,
*
args
):
"""Test filter_summary_lineage with invalid invalid param."""
...
...
tests/ut/lineagemgr/common/test_path_parser.py
浏览文件 @
6f599aa1
...
...
@@ -33,19 +33,19 @@ MOCK_SUMMARY_DIRS = [
]
MOCK_SUMMARIES
=
[
{
'file_name'
:
'file0'
,
'file_name'
:
'file0
.summary.1
'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031970
)
},
{
'file_name'
:
'file0_lineage'
,
'file_name'
:
'file0
.summary.1
_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031970
)
},
{
'file_name'
:
'file1'
,
'file_name'
:
'file1
.summary.2
'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031971
)
},
{
'file_name'
:
'file1_lineage'
,
'file_name'
:
'file1
.summary.2
_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031971
)
}
]
...
...
@@ -54,92 +54,54 @@ MOCK_SUMMARIES = [
class
TestSummaryPathParser
(
TestCase
):
"""Test the class of SummaryPathParser."""
@
mock
.
patch
.
object
(
SummaryWatcher
,
'list_summary_directories'
)
def
test_get_summary_dirs
(
self
,
*
args
):
"""Test the function of get_summary_dirs."""
args
[
0
].
return_value
=
MOCK_SUMMARY_DIRS
expected_result
=
[
'/path/to/base/relative_path0'
,
'/path/to/base'
,
'/path/to/base/relative_path1'
]
base_dir
=
'/path/to/base'
result
=
SummaryPathParser
.
get_summary_dirs
(
base_dir
)
self
.
assertListEqual
(
expected_result
,
result
)
args
[
0
].
return_value
=
[]
result
=
SummaryPathParser
.
get_summary_dirs
(
base_dir
)
self
.
assertListEqual
([],
result
)
@
mock
.
patch
.
object
(
SummaryWatcher
,
'list_summaries'
)
def
test_get_l
atest_lineage_summary
(
self
,
*
args
):
"""Test the function of get_l
atest_lineage_summary
."""
def
test_get_l
ineage_summaries
(
self
,
*
args
):
"""Test the function of get_l
ineage_summaries
."""
args
[
0
].
return_value
=
MOCK_SUMMARIES
exp_result
=
[
'file0.summary.1_lineage'
,
'file1.summary.2_lineage'
]
summary_dir
=
'/path/to/summary_dir'
result
=
SummaryPathParser
.
get_l
atest_lineage_summary
(
summary_dir
)
self
.
assertEqual
(
'/path/to/summary_dir/file1_lineage'
,
result
)
result
=
SummaryPathParser
.
get_l
ineage_summaries
(
summary_dir
)
self
.
assertEqual
(
exp_result
,
result
)
args
[
0
].
return_value
=
[
{
'file_name'
:
'file0'
,
'file_name'
:
'file0
.summary.1
'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031970
)
}
]
result
=
SummaryPathParser
.
get_l
atest_lineage_summary
(
summary_dir
)
self
.
assertEqual
(
None
,
result
)
result
=
SummaryPathParser
.
get_l
ineage_summaries
(
summary_dir
)
self
.
assertEqual
(
[]
,
result
)
args
[
0
].
return_value
=
[
{
'file_name'
:
'file0_lineage'
,
'file_name'
:
'file0
.summary.1
_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031970
)
}
]
result
=
SummaryPathParser
.
get_l
atest_lineage_summary
(
summary_dir
)
self
.
assertEqual
(
None
,
result
)
result
=
SummaryPathParser
.
get_l
ineage_summaries
(
summary_dir
)
self
.
assertEqual
(
[
'file0.summary.1_lineage'
]
,
result
)
args
[
0
].
return_value
=
[
{
'file_name'
:
'file0_lineage'
,
'file_name'
:
'file0
.summary.3
_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031970
)
},
{
'file_name'
:
'file0_lineage_lineage'
,
'file_name'
:
'file0
.summary.2
_lineage_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031970
)
},
{
'file_name'
:
'file1_lineage'
,
'file_name'
:
'file1
.summary.1
_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031971
)
},
{
'file_name'
:
'file1_lineage_lineage'
,
'file_name'
:
'file1
.summary.7
_lineage_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031971
)
}
]
result
=
SummaryPathParser
.
get_latest_lineage_summary
(
summary_dir
)
self
.
assertEqual
(
'/path/to/summary_dir/file1_lineage_lineage'
,
result
)
@
mock
.
patch
.
object
(
SummaryWatcher
,
'list_summaries'
)
@
mock
.
patch
.
object
(
SummaryWatcher
,
'list_summary_directories'
)
def
test_get_latest_lineage_summaries
(
self
,
*
args
):
"""Test the function of get_latest_lineage_summaries."""
args
[
0
].
return_value
=
MOCK_SUMMARY_DIRS
args
[
1
].
return_value
=
MOCK_SUMMARIES
expected_result
=
[
'/path/to/base/relative_path0/file1_lineage'
,
'/path/to/base/file1_lineage'
,
'/path/to/base/relative_path1/file1_lineage'
]
base_dir
=
'/path/to/base'
result
=
SummaryPathParser
.
get_latest_lineage_summaries
(
base_dir
)
self
.
assertListEqual
(
expected_result
,
result
)
args
[
1
].
return_value
=
[
{
'file_name'
:
'file0_lineage'
,
'create_time'
:
datetime
.
fromtimestamp
(
1582031970
)
}
]
result
=
SummaryPathParser
.
get_latest_lineage_summaries
(
base_dir
)
self
.
assertListEqual
([],
result
)
exp_result
=
[
'file1.summary.1_lineage'
,
'file0.summary.2_lineage_lineage'
,
'file0.summary.3_lineage'
,
'file1.summary.7_lineage_lineage'
]
result
=
SummaryPathParser
.
get_lineage_summaries
(
summary_dir
,
is_sorted
=
True
)
self
.
assertEqual
(
exp_result
,
result
)
tests/ut/lineagemgr/querier/test_querier.py
浏览文件 @
6f599aa1
...
...
@@ -16,6 +16,7 @@
import
time
from
unittest
import
TestCase
,
mock
from
unittest.mock
import
MagicMock
from
google.protobuf.json_format
import
ParseDict
...
...
@@ -248,11 +249,12 @@ LINEAGE_FILTRATION_6 = {
class
TestQuerier
(
TestCase
):
"""Test the class of `Querier`."""
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_l
atest_lineage_summary
'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.SummaryPathParser.get_l
ineage_summaries
'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.SummaryWatcher.list_summary_directories'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.LineageSummaryAnalyzer.get_user_defined_info'
)
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.LineageSummaryAnalyzer.get_summary_infos'
)
def
setUp
(
self
,
*
args
):
@
mock
.
patch
(
'mindinsight.lineagemgr.lineage_parser.FileHandler'
)
def
setUp
(
self
,
mock_file_handler
,
*
args
):
"""Initialization before test case execution."""
args
[
0
].
return_value
=
create_lineage_info
(
event_data
.
EVENT_TRAIN_DICT_0
,
...
...
@@ -260,7 +262,10 @@ class TestQuerier(TestCase):
event_data
.
EVENT_DATASET_DICT_0
)
args
[
1
].
return_value
=
[]
args
[
3
].
return_value
=
'path'
args
[
3
].
return_value
=
[
'path'
]
mock_file_handler
=
MagicMock
()
mock_file_handler
.
size
=
1
args
[
2
].
return_value
=
[{
'relative_path'
:
'./'
,
'update_time'
:
1
}]
single_summary_path
=
'/path/to/summary0'
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录