Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
VisualDL
提交
d56ed61e
V
VisualDL
项目概览
PaddlePaddle
/
VisualDL
1 年多 前同步成功
通知
88
Star
4655
Fork
642
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
10
列表
看板
标记
里程碑
合并请求
2
Wiki
5
Wiki
分析
仓库
DevOps
项目成员
Pages
V
VisualDL
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
10
Issue
10
列表
看板
标记
里程碑
合并请求
2
合并请求
2
Pages
分析
分析
仓库分析
DevOps
Wiki
5
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d56ed61e
编写于
3月 15, 2020
作者:
走神的阿圆
提交者:
GitHub
3月 15, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add data manager to accumulate sampling rate. (#594)
Fix backend data transfer perf issues
上级
a9980dcc
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
420 addition
and
91 deletion
+420
-91
visualdl/logic/pybind.cc
visualdl/logic/pybind.cc
+11
-8
visualdl/logic/sdk.cc
visualdl/logic/sdk.cc
+24
-8
visualdl/logic/sdk.h
visualdl/logic/sdk.h
+9
-5
visualdl/server/data_manager.py
visualdl/server/data_manager.py
+315
-0
visualdl/server/lib.py
visualdl/server/lib.py
+61
-70
未找到文件。
visualdl/logic/pybind.cc
浏览文件 @
d56ed61e
...
...
@@ -154,11 +154,12 @@ PYBIND11_MODULE(core, m) {
});
//------------------- components --------------------
#define ADD_SCALAR_READER(T) \
py::class_<cp::ScalarReader<T>>(m, "ScalarReader__" #T) \
.def("records", &cp::ScalarReader<T>::records) \
.def("timestamps", &cp::ScalarReader<T>::timestamps) \
.def("ids", &cp::ScalarReader<T>::ids) \
#define ADD_SCALAR_READER(T) \
py::class_<cp::ScalarReader<T>>(m, "ScalarReader__" #T) \
.def("records", &cp::ScalarReader<T>::records, py::arg("start_index") = 0) \
.def("timestamps", &cp::ScalarReader<T>::timestamps, py::arg("start_index") = 0) \
.def("ids", &cp::ScalarReader<T>::ids, py::arg("start_index") = 0) \
.def("size", &cp::ScalarReader<T>::size) \
.def("caption", &cp::ScalarReader<T>::caption);
ADD_SCALAR_READER
(
int
);
ADD_SCALAR_READER
(
float
);
...
...
@@ -390,9 +391,11 @@ PYBIND11_MODULE(core, m) {
ADD_FULL_TYPE_IMPL
(
ADD_HISTOGRAM_RECORD
)
#undef ADD_HISTOGRAM_RECORD
#define ADD_HISTOGRAM_READER(T) \
py::class_<cp::HistogramReader<T>>(m, "HistogramReader__" #T) \
.def("num_records", &cp::HistogramReader<T>::num_records) \
#define ADD_HISTOGRAM_READER(T) \
py::class_<cp::HistogramReader<T>>(m, "HistogramReader__" #T) \
.def("num_records", &cp::HistogramReader<T>::num_records) \
.def("records", &cp::HistogramReader<T>::records, py::arg("start_index") = 0) \
.def("size", &cp::HistogramReader<T>::size) \
.def("record", &cp::HistogramReader<T>::record);
ADD_FULL_TYPE_IMPL
(
ADD_HISTOGRAM_READER
)
#undef ADD_HISTOGRAM_READER
...
...
visualdl/logic/sdk.cc
浏览文件 @
d56ed61e
...
...
@@ -108,27 +108,27 @@ bool LogReader::TagMatchMode(const std::string& tag, const std::string& mode) {
namespace
components
{
template
<
typename
T
>
std
::
vector
<
T
>
ScalarReader
<
T
>::
records
()
const
{
std
::
vector
<
T
>
ScalarReader
<
T
>::
records
(
size_t
start_index
)
const
{
std
::
vector
<
T
>
res
;
for
(
int
i
=
0
;
i
<
total_records
();
i
++
)
{
for
(
size_t
i
=
start_index
;
i
<
total_records
();
++
i
)
{
res
.
push_back
(
reader_
.
record
(
i
).
data
(
0
).
template
Get
<
T
>());
}
return
res
;
}
template
<
typename
T
>
std
::
vector
<
int
>
ScalarReader
<
T
>::
ids
()
const
{
std
::
vector
<
int
>
res
;
for
(
int
i
=
0
;
i
<
reader_
.
total_records
();
i
++
)
{
std
::
vector
<
int
>
ScalarReader
<
T
>::
ids
(
size_t
start_index
)
const
{
std
::
vector
<
int
>
res
;
for
(
size_t
i
=
start_index
;
i
<
reader_
.
total_records
();
++
i
)
{
res
.
push_back
(
reader_
.
record
(
i
).
id
());
}
return
res
;
}
template
<
typename
T
>
std
::
vector
<
time_t
>
ScalarReader
<
T
>::
timestamps
()
const
{
std
::
vector
<
time_t
>
ScalarReader
<
T
>::
timestamps
(
size_t
start_index
)
const
{
std
::
vector
<
time_t
>
res
;
for
(
int
i
=
0
;
i
<
reader_
.
total_records
();
i
++
)
{
for
(
size_t
i
=
start_index
;
i
<
reader_
.
total_records
();
++
i
)
{
res
.
push_back
(
reader_
.
record
(
i
).
timestamp
());
}
return
res
;
...
...
@@ -297,7 +297,7 @@ void Histogram<T>::AddRecord(int step, const std::vector<T>& data) {
}
template
<
typename
T
>
HistogramRecord
<
T
>
HistogramReader
<
T
>::
record
(
int
i
)
{
HistogramRecord
<
T
>
HistogramReader
<
T
>::
record
(
int
i
)
const
{
CHECK_LT
(
i
,
num_records
());
auto
r
=
reader_
.
record
(
i
);
auto
d
=
r
.
data
(
0
);
...
...
@@ -313,6 +313,22 @@ HistogramRecord<T> HistogramReader<T>::record(int i) {
return
HistogramRecord
<
T
>
(
timestamp
,
step
,
left
,
right
,
std
::
move
(
frequency
));
}
template
<
typename
T
>
std
::
vector
<
HistogramRecord
<
T
>>
HistogramReader
<
T
>::
records
(
size_t
start_index
)
const
{
std
::
vector
<
HistogramRecord
<
T
>>
res
;
for
(
size_t
i
=
start_index
;
i
<
reader_
.
total_records
();
++
i
)
{
res
.
push_back
(
record
(
i
));
}
return
res
;
}
template
<
typename
T
>
size_t
HistogramReader
<
T
>::
size
()
const
{
return
reader_
.
total_records
();
}
DECL_BASIC_TYPES_CLASS_IMPL
(
class
,
ScalarReader
)
DECL_BASIC_TYPES_CLASS_IMPL
(
struct
,
Histogram
)
DECL_BASIC_TYPES_CLASS_IMPL
(
struct
,
HistogramReader
)
...
...
visualdl/logic/sdk.h
浏览文件 @
d56ed61e
...
...
@@ -122,9 +122,9 @@ template <typename T>
struct
ScalarReader
{
ScalarReader
(
TabletReader
&&
reader
)
:
reader_
(
reader
)
{}
std
::
vector
<
T
>
records
()
const
;
std
::
vector
<
int
>
ids
()
const
;
std
::
vector
<
time_t
>
timestamps
()
const
;
std
::
vector
<
T
>
records
(
size_t
start_index
=
0
)
const
;
std
::
vector
<
int
>
ids
(
size_t
start_index
=
0
)
const
;
std
::
vector
<
time_t
>
timestamps
(
size_t
start_index
=
0
)
const
;
std
::
string
caption
()
const
;
size_t
total_records
()
const
{
return
reader_
.
total_records
();
}
size_t
size
()
const
;
...
...
@@ -279,9 +279,13 @@ template <typename T>
struct
HistogramReader
{
HistogramReader
(
TabletReader
tablet
)
:
reader_
(
tablet
)
{}
size_t
num_records
()
{
return
reader_
.
total_records
()
-
1
;
}
size_t
num_records
()
const
{
return
reader_
.
total_records
();
}
std
::
vector
<
HistogramRecord
<
T
>>
records
(
size_t
start_index
=
0
)
const
;
HistogramRecord
<
T
>
record
(
int
i
);
HistogramRecord
<
T
>
record
(
int
i
)
const
;
size_t
size
()
const
;
private:
TabletReader
reader_
;
...
...
visualdl/server/data_manager.py
0 → 100644
浏览文件 @
d56ed61e
# Copyright (c) 2020 VisualDL Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
from
__future__
import
absolute_import
import
threading
import
random
import
collections
DEFAULT_PLUGIN_MAXSIZE
=
{
"scalar"
:
300
,
"image"
:
10
,
"histogram"
:
100
}
class
Reservoir
(
object
):
"""A map-to-arrays dict, with deterministic Reservoir Sampling.
Store each reservoir bucket by key, and each bucket is a list sampling
with reservoir algorithm.
"""
def
__init__
(
self
,
max_size
,
seed
=
0
):
"""Creates a new reservoir.
Args:
max_size: The number of values to keep in the reservoir for each tag,
if max_size is zero, all values will be kept in bucket.
seed: The seed to initialize a random.Random().
num_item_index: The index of data to add.
Raises:
ValueError: If max_size is not a nonnegative integer.
"""
if
max_size
<
0
or
max_size
!=
round
(
max_size
):
raise
ValueError
(
"Max_size must be nonnegative integer."
)
self
.
_max_size
=
max_size
self
.
_buckets
=
collections
.
defaultdict
(
lambda
:
_ReservoirBucket
(
max_size
=
self
.
_max_size
,
random_instance
=
random
.
Random
(
seed
))
)
self
.
_mutex
=
threading
.
Lock
()
@
property
def
keys
(
self
):
"""Return all keys in self._buckets.
Returns:
All keys in reservoir buckets.
:return:
"""
with
self
.
_mutex
:
return
list
(
self
.
_buckets
.
keys
())
def
_exist_in_keys
(
self
,
key
):
"""Determine if key exists.
Args:
key: Key to determine if exists.
Returns:
True if key exists in buckets.keys, otherwise False.
"""
return
True
if
key
in
self
.
_buckets
.
keys
()
else
False
def
exist_in_keys
(
self
,
mode
,
tag
):
"""Determine if mode_tag exists.
For usage habits of VisualDL, actually call self._exist_in_keys()
Args:
mode: Identity of one tablet.
tag: Identity of one record in tablet.
Returns:
True if mode_tag exists in buckets.keys, otherwise False.
"""
key
=
mode
+
"_"
+
tag
return
self
.
_exist_in_keys
(
key
)
def
_get_num_items_index
(
self
,
key
):
keys
=
self
.
keys
if
key
not
in
keys
:
raise
KeyError
(
"Key %s not in buckets.keys()"
%
key
)
return
self
.
_buckets
[
key
].
num_items_index
def
get_num_items_index
(
self
,
mode
,
tag
):
key
=
mode
+
"_"
+
tag
return
self
.
_get_num_items_index
(
key
)
def
_get_items
(
self
,
key
):
"""Get items with tag "key"
Args:
key: Key to finding bucket in reservoir buckets.
Returns:
One bucket in reservoir buckets by key.
"""
keys
=
self
.
keys
with
self
.
_mutex
:
if
key
not
in
keys
:
raise
KeyError
(
"Key %s not in buckets.keys()"
%
key
)
return
self
.
_buckets
[
key
].
items
def
get_items
(
self
,
mode
,
tag
):
"""Get items with tag 'mode_tag'
For usage habits of VisualDL, actually call self._get_items()
Args:
mode: Identity of one tablet.
tag: Identity of one record in tablet.
Returns:
One bucket in reservoir buckets by mode and tag.
"""
key
=
mode
+
"_"
+
tag
return
self
.
_get_items
(
key
)
def
_add_item
(
self
,
key
,
item
):
"""Add a new item to reservoir buckets with given tag as key.
If bucket with key has not yet reached full size, each item will be
added.
If bucket with key is full, each item will be added with same
probability.
Add new item to buckets will always valid because self._buckets is a
collection.defaultdict.
Args:
key: Tag of one bucket to add new item.
item: New item to add to bucket.
"""
with
self
.
_mutex
:
self
.
_buckets
[
key
].
add_item
(
item
)
def
add_item
(
self
,
mode
,
tag
,
item
):
"""Add a new item to reservoir buckets with given tag as key.
For usage habits of VisualDL, actually call self._add_items()
Args:
mode: Identity of one tablet.
tag: Identity of one record in tablet.
item: New item to add to bucket.
"""
key
=
mode
+
"_"
+
tag
self
.
_add_item
(
key
,
item
)
class
_ReservoirBucket
(
object
):
"""Data manager for sampling data, use reservoir sampling.
"""
def
__init__
(
self
,
max_size
,
random_instance
=
None
):
"""Create a _ReservoirBucket instance.
Args:
max_size: The maximum size of reservoir bucket. If max_size is
zero, the bucket has unbounded size.
random_instance: The random number generator. If not specified,
default to random.Random(0)
num_item_index: The index of data to add.
Raises:
ValueError: If args max_size is not a nonnegative integer.
"""
if
max_size
<
0
or
max_size
!=
round
(
max_size
):
raise
ValueError
(
"Max_size must be nonnegative integer."
)
self
.
_max_size
=
max_size
self
.
_random
=
random_instance
if
random_instance
is
not
None
else
\
random
.
Random
(
0
)
self
.
_items
=
[]
self
.
_mutex
=
threading
.
Lock
()
self
.
_num_items_index
=
0
def
add_item
(
self
,
item
):
""" Add an item to bucket, replacing an old item with probability.
Use reservoir sampling to add a new item to sampling bucket,
each item in a steam has same probability stay in the bucket.
Args:
item: The item to add to reservoir bucket.
"""
with
self
.
_mutex
:
if
len
(
self
.
_items
)
<
self
.
_max_size
or
self
.
_max_size
==
0
:
self
.
_items
.
append
(
item
)
else
:
r
=
self
.
_random
.
randint
(
0
,
self
.
_num_items_index
)
if
r
<
self
.
_max_size
:
self
.
_items
.
pop
(
r
)
self
.
_items
.
append
(
item
)
self
.
_num_items_index
+=
1
@
property
def
items
(
self
):
"""Get self._items
Returns:
All items.
"""
with
self
.
_mutex
:
return
self
.
_items
@
property
def
num_items_index
(
self
):
with
self
.
_mutex
:
return
self
.
_num_items_index
class
DataManager
(
object
):
"""Data manager for all plugin.
"""
def
__init__
(
self
):
"""Create a data manager for all plugin.
All kinds of plugin has own reservoir, stored in a dict with plugin
name as key.
"""
self
.
_scalar_reservoir
=
Reservoir
(
max_size
=
DEFAULT_PLUGIN_MAXSIZE
[
"scalar"
])
self
.
_histogram_reservoir
=
Reservoir
(
max_size
=
DEFAULT_PLUGIN_MAXSIZE
[
"histogram"
])
self
.
_image_reservoir
=
Reservoir
(
max_size
=
DEFAULT_PLUGIN_MAXSIZE
[
"image"
])
self
.
_reservoirs
=
{
"scalar"
:
self
.
_scalar_reservoir
,
"histogram"
:
self
.
_histogram_reservoir
,
"image"
:
self
.
_image_reservoir
}
self
.
_mutex
=
threading
.
Lock
()
def
get_reservoir
(
self
,
plugin
):
"""Get reservoir by plugin as key.
Args:
plugin: Key to get one reservoir bucket for one specified plugin.
Returns:
Reservoir bucket for plugin.
"""
with
self
.
_mutex
:
if
plugin
not
in
self
.
_reservoirs
.
keys
():
raise
KeyError
(
"Key %s not in reservoirs."
%
plugin
)
return
self
.
_reservoirs
[
plugin
]
def
add_item
(
self
,
plugin
,
mode
,
tag
,
item
):
"""Add item to one plugin reservoir bucket.
Use 'mode', 'tag' for usage habits of VisualDL.
Args:
plugin: Key to get one reservoir bucket.
mode: Each tablet has different 'mode'.
tag: Tag will be used to generate paths of tablets.
item: The item to add to reservoir bucket.
"""
with
self
.
_mutex
:
self
.
_reservoirs
[
plugin
].
add_item
(
mode
,
tag
,
item
)
def
get_keys
(
self
):
"""Get all plugin buckets name.
Returns:
All plugin keys.
"""
with
self
.
_mutex
:
return
self
.
_reservoirs
.
keys
()
default_data_manager
=
DataManager
()
if
__name__
==
'__main__'
:
d
=
DataManager
()
d
.
add_item
(
"scalar"
,
"train"
,
"loss"
,
3
)
d
.
add_item
(
"scalar"
,
"train"
,
"loss"
,
4
)
d
.
add_item
(
"scalar"
,
"train"
,
"loss"
,
5
)
d
.
add_item
(
"scalar"
,
"train"
,
"accu"
,
5
)
d
.
add_item
(
"scalar"
,
"train"
,
"accu"
,
6
)
d
.
add_item
(
"scalar"
,
"train"
,
"accu"
,
7
)
d
.
add_item
(
"scalar"
,
"train"
,
"accu"
,
8
)
a
=
d
.
get_keys
()
print
(
a
)
b
=
d
.
get_reservoir
(
"scalar"
).
get_items
(
"train"
,
"loss"
)
print
(
b
)
c
=
d
.
get_reservoir
(
"scalar"
).
get_num_items_index
(
"train"
,
"loss"
)
print
(
c
)
print
(
"***"
)
b
=
d
.
get_reservoir
(
"scalar"
).
get_items
(
"train"
,
"accu"
)
print
(
b
)
print
(
d
.
get_reservoir
(
"scalar"
).
get_num_items_index
(
"train"
,
"accu"
))
d
.
add_item
(
"scalar"
,
"train"
,
"loss"
,
3
)
d
.
add_item
(
"scalar"
,
"train"
,
"loss"
,
4
)
d
.
add_item
(
"scalar"
,
"train"
,
"loss"
,
5
)
c
=
d
.
get_reservoir
(
"scalar"
).
get_num_items_index
(
"train"
,
"loss"
)
print
(
c
)
print
(
d
.
get_reservoir
(
"scalar"
).
exist_in_keys
(
"train"
,
"loss"
))
print
(
d
.
get_reservoir
(
"scalar"
).
exist_in_keys
(
"train"
,
"loss2"
))
\ No newline at end of file
visualdl/server/lib.py
浏览文件 @
d56ed61e
...
...
@@ -22,6 +22,7 @@ import numpy as np
from
PIL
import
Image
from
.log
import
logger
import
wave
from
.data_manager
import
default_data_manager
try
:
from
urllib.parse
import
urlencode
...
...
@@ -48,38 +49,31 @@ def get_scalar_tags(storage):
def
get_scalar
(
storage
,
mode
,
tag
,
num_records
=
300
):
assert
num_records
>
1
with
storage
.
mode
(
mode
)
as
reader
:
scalar
=
reader
.
scalar
(
tag
)
records
=
scalar
.
records
()
ids
=
scalar
.
ids
()
timestamps
=
scalar
.
timestamps
()
data_reservoir
=
default_data_manager
.
get_reservoir
(
"scalar"
)
if
not
data_reservoir
.
exist_in_keys
(
mode
=
mode
,
tag
=
tag
):
records
=
scalar
.
records
()
ids
=
scalar
.
ids
()
timestamps
=
scalar
.
timestamps
()
data
=
list
(
zip
(
timestamps
,
ids
,
records
))
data_size
=
len
(
data
)
data
=
list
(
zip
(
timestamps
,
ids
,
records
))
if
data_size
<=
num_records
:
return
data
for
index
in
range
(
len
(
data
)):
data_reservoir
.
add_item
(
mode
=
mode
,
tag
=
tag
,
item
=
data
[
index
])
else
:
num_items_index
=
data_reservoir
.
get_num_items_index
(
mode
,
tag
)
if
num_items_index
!=
scalar
.
size
():
records
=
scalar
.
records
(
num_items_index
)
ids
=
scalar
.
ids
(
num_items_index
)
timestamps
=
scalar
.
timestamps
(
num_items_index
)
span
=
float
(
data_size
)
/
(
num_records
-
1
)
span_offset
=
0
data
=
list
(
zip
(
timestamps
,
ids
,
records
))
data_idx
=
int
(
span_offset
*
span
)
sampled_data
=
[]
while
data_idx
<
data_size
:
sampled_data
.
append
(
data
[
data_size
-
data_idx
-
1
])
span_offset
+=
1
data_idx
=
int
(
span_offset
*
span
)
sampled_data
.
append
(
data
[
0
])
res
=
sampled_data
[::
-
1
]
# TODO(Superjomn) some bug here, sometimes there are zero here.
if
res
[
-
1
]
==
0.
:
res
=
res
[:
-
1
]
return
res
for
index
in
range
(
len
(
data
)):
data_reservoir
.
add_item
(
mode
=
mode
,
tag
=
tag
,
item
=
data
[
index
])
return
data_reservoir
.
get_items
(
mode
,
tag
)
def
get_image_tags
(
storage
):
...
...
@@ -111,15 +105,27 @@ def get_image_tag_steps(storage, mode, tag):
image
=
reader
.
image
(
tag
)
res
=
[]
for
step_index
in
range
(
image
.
num_records
()):
record
=
image
.
record
(
step_index
,
sample_index
)
try
:
image_num_records
=
image
.
num_records
()
num_samples
=
10
if
image_num_records
<=
num_samples
:
for
step_index
in
range
(
image_num_records
):
record
=
image
.
record
(
step_index
,
sample_index
)
res
.
append
({
'step'
:
record
.
step_id
(),
'wallTime'
:
image
.
timestamp
(
step_index
),
'wallTime'
:
image
.
timestamp
(
step_index
)
})
else
:
span
=
float
(
image_num_records
)
/
num_samples
for
index
in
range
(
num_samples
):
step_index
=
round
(
span
*
index
)
record
=
image
.
record
(
step_index
,
sample_index
)
res
.
append
({
'step'
:
record
.
step_id
(),
'wallTime'
:
image
.
timestamp
(
step_index
)
})
except
Exception
:
logger
.
error
(
"image sample out of range"
)
return
res
...
...
@@ -291,45 +297,30 @@ def get_embeddings(storage, mode, reduction, dimension=2, num_records=5000):
def
get_histogram
(
storage
,
mode
,
tag
):
with
storage
.
mode
(
mode
)
as
reader
:
histogram
=
reader
.
histogram
(
tag
)
res
=
[]
for
i
in
range
(
histogram
.
num_records
()):
try
:
# some bug with protobuf, some times may overflow
record
=
histogram
.
record
(
i
)
except
Exception
:
continue
res
.
append
([])
py_record
=
res
[
-
1
]
py_record
.
append
(
record
.
timestamp
())
py_record
.
append
(
record
.
step
())
py_record
.
append
([])
data
=
py_record
[
-
1
]
for
j
in
range
(
record
.
num_instances
()):
instance
=
record
.
instance
(
j
)
data
.
append
(
[
instance
.
left
(),
instance
.
right
(),
instance
.
frequency
()])
# num_samples: We will only return 100 samples.
num_samples
=
100
if
len
(
res
)
<
num_samples
:
return
res
# sample some steps
span
=
float
(
len
(
res
))
/
(
num_samples
-
1
)
span_offset
=
0
data_idx
=
0
sampled_data
=
[]
data_size
=
len
(
res
)
while
data_idx
<
data_size
:
sampled_data
.
append
(
res
[
data_size
-
data_idx
-
1
])
span_offset
+=
1
data_idx
=
int
(
span_offset
*
span
)
sampled_data
.
append
(
res
[
0
])
return
sampled_data
[::
-
1
]
data_reservoir
=
default_data_manager
.
get_reservoir
(
"histogram"
)
if
not
data_reservoir
.
exist_in_keys
(
mode
=
mode
,
tag
=
tag
):
records
=
histogram
.
records
()
for
record
in
records
:
data
=
[]
for
j
in
range
(
record
.
num_instances
()):
instance
=
record
.
instance
(
j
)
data
.
append
(
[
instance
.
left
(),
instance
.
right
(),
instance
.
frequency
()])
data_reservoir
.
add_item
(
mode
,
tag
,
[
record
.
timestamp
(),
record
.
step
(),
data
])
else
:
num_items_index
=
data_reservoir
.
get_num_items_index
(
mode
,
tag
)
if
num_items_index
!=
histogram
.
size
():
records
=
histogram
.
records
(
num_items_index
)
for
record
in
records
:
data
=
[]
for
j
in
range
(
record
.
num_instances
()):
instance
=
record
.
instance
(
j
)
data
.
append
(
[
instance
.
left
(),
instance
.
right
(),
instance
.
frequency
()])
data_reservoir
.
add_item
(
mode
,
tag
,
[
record
.
timestamp
(),
record
.
step
(),
data
])
return
data_reservoir
.
get_items
(
mode
,
tag
)
def
retry
(
ntimes
,
function
,
time2sleep
,
*
args
,
**
kwargs
):
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录