Unverified commit f7af2157, authored by chenjian, committed by GitHub

Add scalars component (#1081)

* backend of add_scalars

* fix function name bug of get_scalars_tags

* address review comments from @zh794390558 and @ShenYuhan (05/08)

* fix a bug

* adapt to the frontend's runs requirement

* fix proto

* fix a bug

* add scalars test demo

* fix

* adapt scalars to scalar

* remove unused code

* fix demo code

* fix proto version

* fix proto version

* upgrade protoc to v3.19.0

* fix a bug in logreader
Co-authored-by: 伽利略 <jialiasus2@163.com>
Co-authored-by: wuzewu <wuzewu@baidu.com>
Parent 59c86e52
# Copyright (c) 2022 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
# coding=utf-8
import math

from visualdl import LogWriter

if __name__ == '__main__':
    value = [i / 1000.0 for i in range(1000)]
    with LogWriter(logdir="./log/scalars_test/") as writer:
        for step in range(1000):
            writer.add_scalars(
                main_tag='math/formula1',
                tag_scalar_dict={
                    'sinx': math.sin(value[step]),
                    'cosx': math.cos(value[step])
                },
                step=step)
            writer.add_scalars(
                main_tag='math/formula2',
                tag_scalar_dict={
                    'sqrtx': math.sqrt(value[step]),
                    'squarex': value[step]**2
                },
                step=step)
    with LogWriter(logdir="./log/scalar_test1/") as writer:
        for step in range(1000):
            writer.add_scalar(
                tag='math/formula2', value=value[step], step=step)
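Note (not part of the commit): after running this demo, the logs can be inspected with the VisualDL board, e.g. "visualdl --logdir ./log/scalars_test --port 8040"; each main tag should render its sub-tag series together in one chart.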
@@ -18,6 +18,29 @@ from PIL import Image
from visualdl.proto.record_pb2 import Record
def scalars(main_tag, tag_scalar_dict, step, walltime=None):
    """Package data into scalars records.

    Args:
        main_tag (string): Data identifier
        tag_scalar_dict (dict): A dict providing multiple values keyed by sub-tag
        step (int): Step of scalar
        walltime (int): Wall time of scalar

    Yields:
        Packages with format of record_pb2.Record, one per sub-tag
    """
    for sub_tag, value in tag_scalar_dict.items():
        value = float(value)
        yield Record(values=[
            Record.Value(
                id=step,
                tag=main_tag,
                timestamp=walltime,
                tag_value=Record.TagValue(tag=sub_tag, value=value))
        ])
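As a quick illustration (a sketch, not part of the commit; it assumes the record_pb2 regenerated by this change is importable), each call yields one Record per sub-tag, all sharing the same step and main tag:

from visualdl.component.base_component import scalars

records = list(
    scalars(main_tag='math/formula1',
            tag_scalar_dict={'sinx': 0.0, 'cosx': 1.0},
            step=0,
            walltime=1651000000000))
assert len(records) == 2  # one Record per sub-tag
assert records[0].values[0].tag == 'math/formula1'
assert records[0].values[0].tag_value.tag == 'sinx'
assert records[1].values[0].tag_value.value == 1.0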
def scalar(tag, value, step, walltime=None):
    """Package data to one scalar.

    Args:
......
@@ -75,6 +75,11 @@ message Record {
    string display_name = 1;
  }

  message TagValue {
    string tag = 1;
    float value = 2;
  }

  message Value {
    int64 id = 1;
    string tag = 2;
@@ -90,6 +95,7 @@ message Record {
      ROC_Curve roc_curve = 11;
      Text text = 12;
      HParam hparam = 13;
      TagValue tag_value = 14;
    }
  }
......
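For illustration (our sketch, assuming the record_pb2 regenerated by this commit): populating the new field selects member 14 of the one_value oneof, which is what LogReader and the reservoir code key on below.

from visualdl.proto.record_pb2 import Record

value = Record.Value(
    id=3,
    tag='math/formula1',
    timestamp=1651000000000,
    tag_value=Record.TagValue(tag='sinx', value=0.5))
# Setting tag_value selects it inside the one_value oneof.
assert value.WhichOneof('one_value') == 'tag_value'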
This diff is collapsed.
@@ -123,6 +123,13 @@ class LogReader(object):
            for item in proto_datas:
                data.append([item.id, item.tag, item.timestamp, item.value])
            return data
        elif 'scalars' == component:
            for item in proto_datas:
                data.append([
                    item.id, item.tag, item.tag_value.tag, item.timestamp,
                    item.tag_value.value
                ])
            return data
        return proto_datas
    def _get_log_tags(self):
@@ -143,6 +150,8 @@ class LogReader(object):
        else:
            file_path = bfile.join(run, self.walks[run])
        reader = self._get_file_reader(file_path=file_path, update=False)
        reader.dir = run
        self.reader = reader
        remain = self.get_remain(reader=reader)
        data = self.read_log_data(
            remain=remain, update=False)[component][tag]
@@ -191,6 +200,8 @@ class LogReader(object):
                component = "text"
            elif "hparam" == value_type:
                component = "hyper_parameters"
            elif "tag_value" == value_type:
                component = "scalars"
            else:
                raise TypeError("Invalid value type `%s`." % value_type)
            self._tags[path] = component
if update: if update:
self.register_reader(file_path) self.register_reader(file_path)
self.reader = self.readers[file_path] self.reader = self.readers[file_path]
self.reader.dir = file_path
return self.reader return self.reader
else: else:
reader = RecordReader(filepath=file_path) reader = RecordReader(filepath=file_path)
@@ -275,6 +285,7 @@ class LogReader(object):
        if update:
            if path not in list(self.readers.keys()):
                reader = RecordReader(filepath=path, dir=dir)
                reader.dir = dir
                self.readers[path] = reader
            else:
                pass
......
@@ -135,6 +135,11 @@ class Api(object):
        return self._get_with_retry('data/plugin/scalars/tags',
                                    lib.get_scalar_tags)

    @result()
    def scalars_tags(self):
        return self._get_with_retry('data/plugin/multiscalars/tags',
                                    lib.get_scalars_tags)
    @result()
    def image_tags(self):
        return self._get_with_retry('data/plugin/images/tags',
@@ -194,11 +199,24 @@ class Api(object):
        key = os.path.join('data/plugin/scalars/scalars', run, tag)
        return self._get_with_retry(key, lib.get_scalar, run, tag)

    @result()
    def scalars_list(self, run, tag, sub_tag):
        key = os.path.join('data/plugin/multiscalars/scalars', run, tag,
                           sub_tag)
        return self._get_with_retry(key, lib.get_scalars, run, tag, sub_tag)

    @result('text/csv')
    def scalar_data(self, run, tag, type='tsv'):
        key = os.path.join('data/plugin/scalars/data', run, tag, type)
        return self._get_with_retry(key, lib.get_scalar_data, run, tag, type)

    @result('text/csv')
    def scalars_data(self, run, tag, sub_tag, type='tsv'):
        key = os.path.join('data/plugin/multiscalars/data', run, tag, sub_tag,
                           type)
        return self._get_with_retry(key, lib.get_scalars_data, run, tag,
                                    sub_tag, type)

    @result()
    def image_list(self, mode, tag):
        key = os.path.join('data/plugin/images/images', mode, tag)
@@ -412,6 +430,7 @@ def create_api_call(logdir, model, cache_timeout):
        'tags': (api.tags, []),
        'logs': (api.logs, []),
        'scalar/tags': (api.scalar_tags, []),
        'scalars/tags': (api.scalars_tags, []),
        'image/tags': (api.image_tags, []),
        'text/tags': (api.text_tags, []),
        'audio/tags': (api.audio_tags, []),
@@ -420,7 +439,9 @@ def create_api_call(logdir, model, cache_timeout):
        'pr-curve/tags': (api.pr_curve_tags, []),
        'roc-curve/tags': (api.roc_curve_tags, []),
        'scalar/list': (api.scalar_list, ['run', 'tag']),
        'scalars/list': (api.scalars_list, ['run', 'tag', 'sub_tag']),
        'scalar/data': (api.scalar_data, ['run', 'tag', 'type']),
        'scalars/data': (api.scalars_data, ['run', 'tag', 'sub_tag', 'type']),
        'image/list': (api.image_list, ['run', 'tag']),
        'image/image': (api.image_image, ['run', 'tag', 'index']),
        'text/list': (api.text_list, ['run', 'tag']),
......
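The new routes mirror the existing scalar endpoints. A hypothetical client call (a sketch only; it assumes a board running on localhost:8040 and the usual /api prefix, with run and tag values made up for illustration):

import requests

base = 'http://localhost:8040/api'
# Tags grouped by run, analogous to /api/scalar/tags.
tags = requests.get(base + '/scalars/tags').json()
# One sub-tag series of a multi-scalar group.
series = requests.get(base + '/scalars/list',
                      params={'run': 'scalars_test',
                              'tag': 'math/formula1',
                              'sub_tag': 'sinx'}).json()
# TSV export of the group.
tsv = requests.get(base + '/scalars/data',
                   params={'run': 'scalars_test',
                           'tag': 'math/formula1',
                           'sub_tag': 'sinx',
                           'type': 'tsv'}).text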
@@ -12,10 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import collections
import random
import threading

DEFAULT_PLUGIN_MAXSIZE = {
    "scalar": 1000,
@@ -31,6 +30,10 @@ DEFAULT_PLUGIN_MAXSIZE = {
}
def add_sub_tag(tag, sub_tag):
    return tag.replace('%', '_') + '_' + sub_tag
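The helper flattens a main tag and a sub-tag into one reservoir bucket key, for example (our illustration, not from the commit):

def add_sub_tag(tag, sub_tag):  # as defined above
    return tag.replace('%', '_') + '_' + sub_tag

assert add_sub_tag('math/formula1', 'sinx') == 'math/formula1_sinx'

Since add_scalars (see writer.py below) rejects '%' in user tags, the replace appears intended to keep generated keys clear of VisualDL's '%'-based tag encoding.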
class Reservoir(object):
    """A map-to-arrays dict, with deterministic Reservoir Sampling.
@@ -53,10 +56,8 @@ class Reservoir(object):
        if max_size < 0 or max_size != round(max_size):
            raise ValueError("Max_size must be a nonnegative integer.")
        self._max_size = max_size
        self._buckets = collections.defaultdict(lambda: _ReservoirBucket(
            max_size=self._max_size, random_instance=random.Random(seed)))
        self._mutex = threading.Lock()

    @property
@@ -174,6 +175,25 @@ class Reservoir(object):
        with self._mutex:
            self._buckets[key].add_scalar_item(item)
    def _add_scalars_item(self, key, item):
        """Add a new scalars item to reservoir buckets with given tag as key.

        If the bucket with this key has not yet reached full size, each item
        will be added.

        If the bucket with this key is full, each item will be added with the
        same probability.

        Adding a new item to buckets is always valid because self._buckets is
        a collections.defaultdict.

        Args:
            key: Tag of one bucket to add new item.
            item: New item to add to bucket.
        """
        with self._mutex:
            self._buckets[key].add_scalars_item(item)
    def add_item(self, run, tag, item):
        """Add a new item to reservoir buckets with given tag as key.
@@ -197,8 +217,14 @@ class Reservoir(object):
            tag: Identity of one record in tablet.
            item: New item to add to bucket.
        """
        if item.WhichOneof("one_value") == "value":
            key = run + "/" + tag
            self._add_scalar_item(key, item)
        elif item.WhichOneof("one_value") == "tag_value":
            key = run + "/" + add_sub_tag(tag, item.tag_value.tag) + "/" + tag
            self._add_scalars_item(key, item)
        else:
            raise ValueError("Not a scalar type: " +
                             item.WhichOneof("one_value"))
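For example, a record written by add_scalars for run 'scalars_test' with main tag 'math/formula1' and sub-tag 'sinx' lands in the bucket keyed 'scalars_test/math/formula1_sinx/math/formula1': each sub-tag series gets its own reservoir, and, judging by get_scalar_data below, the sub-tag path segment lets each series be addressed much like a run of its own.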
    def _cut_tail(self, key):
        with self._mutex:
@@ -247,6 +273,9 @@ class _ReservoirBucket(object):
        self.max_scalar = None
        self.min_scalar = None

        # improves performance when data is monotonic
        self._last_special = False
    def add_item(self, item):
        """ Add an item to bucket, replacing an old item with probability.
@@ -274,24 +303,101 @@ class _ReservoirBucket(object):
        Use reservoir sampling to add a new item to the sampling bucket;
        each item in a stream has the same probability of staying in the
        bucket.

        The _last_special mark improves performance when data is monotonic.

        Args:
            item: The item to add to reservoir bucket.
        """
        with self._mutex:
            # save max and min value
            if not self.max_scalar or self.max_scalar.value < item.value:
                self.max_scalar = item
            if not self.min_scalar or self.min_scalar.value > item.value:
                self.min_scalar = item
            if len(self._items) < self._max_size or self._max_size == 0:
                # capacity is valid, append directly
                self._items.append(item)
            else:
                if self._last_special:
                    if self._items[-1].id == self.min_scalar.id or \
                            self._items[-1].id == self.max_scalar.id:
                        # data is not monotonic, set special to False
                        self._last_special = False
                    else:
                        # data is monotonic, drop the last item by the
                        # reservoir algorithm
                        r = self._random.randint(1, self._num_items_index)
                        if r >= self._max_size:
                            self._items.pop(-1)
                            self._items.append(item)
                        self._num_items_index += 1
                        return
                if item.id == self.min_scalar.id or item.id == self.max_scalar.id:
                    # this item is the max or min, it should be reserved
                    r = self._random.randint(1, self._max_size - 1)
                    self._last_special = True
                else:
                    # drop by the reservoir algorithm
                    r = self._random.randint(1, self._num_items_index)
                    self._last_special = False
                if r < self._max_size:
                    if self._items[r].id == self.min_scalar.id or \
                            self._items[r].id == self.max_scalar.id:
                        # reserve the max and min points
                        if r - 1 > 0:
                            r = r - 1
                        elif r + 1 < self._max_size:
                            r = r + 1
                    self._items.pop(r)
                    self._items.append(item)
            self._num_items_index += 1
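In short, this is reservoir sampling with two twists: the running minimum and maximum are never evicted (the replacement index is nudged off them), and _last_special short-circuits the common monotonic case where the newest point is always the current extreme. A stripped-down sketch of the core idea on plain floats (our names and simplifications, not the commit's code):

import random

def reservoir_add(items, value, n_seen, max_size, rng):
    # Keep a uniform sample of at most max_size items, but nudge the
    # eviction index off the running min/max so extreme points survive.
    if len(items) < max_size:
        items.append(value)
        return
    r = rng.randint(1, n_seen)  # 1-based draw over the stream so far
    if r < max_size:
        if items[r] in (min(items), max(items)):
            # shift the victim so the extreme point is preserved
            r = r - 1 if r - 1 > 0 else r + 1
        items.pop(r)
        items.append(value)

rng = random.Random(0)
sample = []
for n, v in enumerate((x / 100.0 for x in range(10000)), start=1):
    reservoir_add(sample, v, n, max_size=100, rng=rng)
assert len(sample) == 100

The committed version avoids the O(n) min/max scans by caching max_scalar and min_scalar, and uses _last_special so that on monotonic series only the tail slot is re-rolled.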
    def add_scalars_item(self, item):
        """Add a scalars item to bucket, replacing an old item with probability.

        Use reservoir sampling to add a new item to the sampling bucket;
        each item in a stream has the same probability of staying in the
        bucket.

        Args:
            item: The item to add to reservoir bucket.
        """
        with self._mutex:
            # save max and min value
            if not self.max_scalar or \
                    self.max_scalar.tag_value.value < item.tag_value.value:
                self.max_scalar = item
            if not self.min_scalar or \
                    self.min_scalar.tag_value.value > item.tag_value.value:
                self.min_scalar = item
            if len(self._items) < self._max_size or self._max_size == 0:
                # capacity is valid, append directly
                self._items.append(item)
            else:
                if self._last_special:
                    if self._items[-1].id == self.min_scalar.id or \
                            self._items[-1].id == self.max_scalar.id:
                        # data is not monotonic, set special to False
                        self._last_special = False
                    else:
                        # data is monotonic, drop the last item by the
                        # reservoir algorithm
                        r = self._random.randint(1, self._num_items_index)
                        if r >= self._max_size:
                            self._items.pop(-1)
                            self._items.append(item)
                        self._num_items_index += 1
                        return
                if item.id == self.min_scalar.id or item.id == self.max_scalar.id:
                    # this item is the max or min, it should be reserved
                    r = self._random.randint(1, self._max_size - 1)
                    self._last_special = True
                else:
                    # drop by the reservoir algorithm
                    r = self._random.randint(1, self._num_items_index)
                    self._last_special = False
                if r < self._max_size:
                    if self._items[r].id == self.min_scalar.id or \
                            self._items[r].id == self.max_scalar.id:
                        # reserve the max and min points
                        if r - 1 > 0:
                            r = r - 1
                        elif r + 1 < self._max_size:
                            r = r + 1
                    self._items.pop(r)
                    self._items.append(item)
            self._num_items_index += 1
@@ -405,8 +511,8 @@ class DataManager(object):
            item: The item to add to reservoir bucket.
        """
        with self._mutex:
            if 'scalar' == plugin or 'scalars' == plugin:
                # We adapt scalars data to be saved in the scalar reservoir.
                self._reservoirs['scalar'].add_scalar_item(run, tag, item)
            else:
                self._reservoirs[plugin].add_item(run, tag, item)
......
@@ -335,20 +335,37 @@ def get_scalar(log_reader, run, tag):
    log_reader.load_new_data()
    records = log_reader.data_manager.get_reservoir("scalar").get_items(
        run, decode_tag(tag))

    results = [[
        s2ms(item.timestamp), item.id,
        transfer_abnomal_scalar_value(item.value)
    ] if item.WhichOneof("one_value") == "value" else [
        s2ms(item.timestamp), item.id,
        transfer_abnomal_scalar_value(item.tag_value.value)
    ] for item in records]
    return results
def get_scalar_data(log_reader, run, tag, type='tsv'):
    is_scalars = False
    if os.path.basename(run).startswith(decode_tag(tag).replace('%', '_')) and \
            log_reader.tags().get(
                bfile.join(os.path.dirname(run), decode_tag(tag)),
                None) == 'scalars':
        run = os.path.dirname(run)
        is_scalars = True
    run = log_reader.name2tags[run] if run in log_reader.name2tags else run
    log_reader.load_new_data()
    if is_scalars:
        result = log_reader.get_log_data('scalars', run, decode_tag(tag))
    else:
        result = log_reader.get_log_data('scalar', run, decode_tag(tag))
    delimeter = '\t' if 'tsv' == type else ','
    with io.StringIO() as fp:
        csv_writer = csv.writer(fp, delimiter=delimeter)
        if is_scalars:
            csv_writer.writerow(['id', 'tag', 'sub_tag', 'timestamp', 'value'])
        else:
            csv_writer.writerow(['id', 'tag', 'timestamp', 'value'])
        csv_writer.writerows(result)
        result = fp.getvalue()
......
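The exported table for a scalars group gains the sub_tag column; a sketch of the resulting TSV, built the same way as above (the rows are made up):

import csv
import io

rows = [[0, 'math/formula1', 'sinx', 1651000000000, 0.0],
        [0, 'math/formula1', 'cosx', 1651000000000, 1.0]]
with io.StringIO() as fp:
    csv_writer = csv.writer(fp, delimiter='\t')
    csv_writer.writerow(['id', 'tag', 'sub_tag', 'timestamp', 'value'])
    csv_writer.writerows(rows)
    print(fp.getvalue())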
@@ -26,6 +26,7 @@ from visualdl.component.base_component import meta_data
from visualdl.component.base_component import pr_curve
from visualdl.component.base_component import roc_curve
from visualdl.component.base_component import scalar
from visualdl.component.base_component import scalars
from visualdl.component.base_component import text
from visualdl.component.graph import translate_graph
from visualdl.io import bfile
@@ -187,6 +188,30 @@ class LogWriter(object):
        self._get_file_writer().add_record(
            scalar(tag=tag, value=value, step=step, walltime=walltime))
    def add_scalars(self, main_tag, tag_scalar_dict, step, walltime=None):
        """Add a group of scalars to vdl record file.

        Args:
            main_tag (string): Data identifier
            tag_scalar_dict (dict): A dict providing multiple values keyed by sub-tag
            step (int): Step of scalar
            walltime (int): Wall time of scalar

        Example:
            import math
            for index in range(1, 101):
                alpha = index * 2 * math.pi / 100
                tval = {'sin': math.sin(alpha), 'cos': math.cos(alpha)}
                writer.add_scalars(main_tag="sin_and_cos",
                                   tag_scalar_dict=tval,
                                   step=index)
        """
        if '%' in main_tag:
            raise RuntimeError("% can't appear in tag!")
        if not isinstance(tag_scalar_dict, dict):
            raise RuntimeError("tag_scalar_dict must be a dict!")
        walltime = round(time.time() * 1000) if walltime is None else walltime
        for record in scalars(main_tag, tag_scalar_dict, step, walltime):
            self._get_file_writer().add_record(record)
    def add_image(self, tag, img, step, walltime=None, dataformats="HWC"):
        """Add an image to vdl record file.
......