diff --git a/visualdl/logic/pybind.cc b/visualdl/logic/pybind.cc index aff4aa6c99331f8751f4500016b58a958124deb3..1da91f6e5c2cdf39e6e21788a1886f2a5bfe4d27 100644 --- a/visualdl/logic/pybind.cc +++ b/visualdl/logic/pybind.cc @@ -154,11 +154,12 @@ PYBIND11_MODULE(core, m) { }); //------------------- components -------------------- -#define ADD_SCALAR_READER(T) \ - py::class_>(m, "ScalarReader__" #T) \ - .def("records", &cp::ScalarReader::records) \ - .def("timestamps", &cp::ScalarReader::timestamps) \ - .def("ids", &cp::ScalarReader::ids) \ +#define ADD_SCALAR_READER(T) \ + py::class_>(m, "ScalarReader__" #T) \ + .def("records", &cp::ScalarReader::records, py::arg("start_index") = 0) \ + .def("timestamps", &cp::ScalarReader::timestamps, py::arg("start_index") = 0) \ + .def("ids", &cp::ScalarReader::ids, py::arg("start_index") = 0) \ + .def("size", &cp::ScalarReader::size) \ .def("caption", &cp::ScalarReader::caption); ADD_SCALAR_READER(int); ADD_SCALAR_READER(float); @@ -390,9 +391,11 @@ PYBIND11_MODULE(core, m) { ADD_FULL_TYPE_IMPL(ADD_HISTOGRAM_RECORD) #undef ADD_HISTOGRAM_RECORD -#define ADD_HISTOGRAM_READER(T) \ - py::class_>(m, "HistogramReader__" #T) \ - .def("num_records", &cp::HistogramReader::num_records) \ +#define ADD_HISTOGRAM_READER(T) \ + py::class_>(m, "HistogramReader__" #T) \ + .def("num_records", &cp::HistogramReader::num_records) \ + .def("records", &cp::HistogramReader::records, py::arg("start_index") = 0) \ + .def("size", &cp::HistogramReader::size) \ .def("record", &cp::HistogramReader::record); ADD_FULL_TYPE_IMPL(ADD_HISTOGRAM_READER) #undef ADD_HISTOGRAM_READER diff --git a/visualdl/logic/sdk.cc b/visualdl/logic/sdk.cc index 0e73c5df0a41b1bdb683e3c315b09da7d0c96031..1064dcb70a643d380b59bc2e58ed5e9508b7198d 100644 --- a/visualdl/logic/sdk.cc +++ b/visualdl/logic/sdk.cc @@ -108,27 +108,27 @@ bool LogReader::TagMatchMode(const std::string& tag, const std::string& mode) { namespace components { template -std::vector ScalarReader::records() const { 
+std::vector ScalarReader::records(size_t start_index) const { std::vector res; - for (int i = 0; i < total_records(); i++) { + for (size_t i = start_index; i < total_records(); ++i) { res.push_back(reader_.record(i).data(0).template Get()); } return res; } template -std::vector ScalarReader::ids() const { - std::vector res; - for (int i = 0; i < reader_.total_records(); i++) { +std::vector ScalarReader::ids(size_t start_index) const { + std::vector res; + for (size_t i = start_index; i < reader_.total_records(); ++i) { res.push_back(reader_.record(i).id()); } return res; } template -std::vector ScalarReader::timestamps() const { +std::vector ScalarReader::timestamps(size_t start_index) const { std::vector res; - for (int i = 0; i < reader_.total_records(); i++) { + for (size_t i = start_index; i < reader_.total_records(); ++i) { res.push_back(reader_.record(i).timestamp()); } return res; @@ -297,7 +297,7 @@ void Histogram::AddRecord(int step, const std::vector& data) { } template -HistogramRecord HistogramReader::record(int i) { +HistogramRecord HistogramReader::record(int i) const { CHECK_LT(i, num_records()); auto r = reader_.record(i); auto d = r.data(0); @@ -313,6 +313,22 @@ HistogramRecord HistogramReader::record(int i) { return HistogramRecord(timestamp, step, left, right, std::move(frequency)); } +template +std::vector> HistogramReader::records(size_t start_index) const { + std::vector> res; + + for (size_t i = start_index; i < reader_.total_records(); ++i) { + res.push_back(record(i)); + } + + return res; +} + +template +size_t HistogramReader::size() const { + return reader_.total_records(); +} + DECL_BASIC_TYPES_CLASS_IMPL(class, ScalarReader) DECL_BASIC_TYPES_CLASS_IMPL(struct, Histogram) DECL_BASIC_TYPES_CLASS_IMPL(struct, HistogramReader) diff --git a/visualdl/logic/sdk.h b/visualdl/logic/sdk.h index 65e6a47d17309e43eb280f461afddd4f878b9210..1d0a2b9399d75b9ab9309810ec07ac0678832bdf 100644 --- a/visualdl/logic/sdk.h +++ b/visualdl/logic/sdk.h @@ -122,9 
+122,9 @@ template struct ScalarReader { ScalarReader(TabletReader&& reader) : reader_(reader) {} - std::vector records() const; - std::vector ids() const; - std::vector timestamps() const; + std::vector records(size_t start_index = 0) const; + std::vector ids(size_t start_index = 0) const; + std::vector timestamps(size_t start_index = 0) const; std::string caption() const; size_t total_records() const { return reader_.total_records(); } size_t size() const; @@ -279,9 +279,13 @@ template struct HistogramReader { HistogramReader(TabletReader tablet) : reader_(tablet) {} - size_t num_records() { return reader_.total_records() - 1; } + size_t num_records() const { return reader_.total_records(); } + + std::vector> records(size_t start_index = 0) const; - HistogramRecord record(int i); + HistogramRecord record(int i) const; + + size_t size() const; private: TabletReader reader_; diff --git a/visualdl/server/data_manager.py b/visualdl/server/data_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..426b250679b1f6e904fb7051bcea09e009166892 --- /dev/null +++ b/visualdl/server/data_manager.py @@ -0,0 +1,315 @@ +# Copyright (c) 2020 VisualDL Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ======================================================================= + +from __future__ import absolute_import +import threading +import random +import collections + +DEFAULT_PLUGIN_MAXSIZE = { + "scalar": 300, + "image": 10, + "histogram": 100 +} + + +class Reservoir(object): + """A map-to-arrays dict, with deterministic Reservoir Sampling. + + Store each reservoir bucket by key, and each bucket is a list sampled + with the reservoir algorithm. + """ + + def __init__(self, max_size, seed=0): + """Creates a new reservoir. + + Args: + max_size: The number of values to keep in the reservoir for each tag; + if max_size is zero, all values will be kept in the bucket. + seed: The seed to initialize a random.Random(). + + Raises: + ValueError: If max_size is not a nonnegative integer. + """ + if max_size < 0 or max_size != round(max_size): + raise ValueError("Max_size must be nonnegative integer.") + self._max_size = max_size + self._buckets = collections.defaultdict( + lambda: _ReservoirBucket(max_size=self._max_size, + random_instance=random.Random(seed)) + ) + self._mutex = threading.Lock() + + @property + def keys(self): + """Return all keys in self._buckets. + + Returns: + All keys in reservoir buckets. + """ + with self._mutex: + return list(self._buckets.keys()) + + def _exist_in_keys(self, key): + """Determine if key exists. + + Args: + key: Key to determine if exists. + + Returns: + True if key exists in buckets.keys, otherwise False. + """ + return True if key in self._buckets.keys() else False + + def exist_in_keys(self, mode, tag): + """Determine if mode_tag exists. + + For usage habits of VisualDL, this actually calls self._exist_in_keys() + + Args: + mode: Identity of one tablet. + tag: Identity of one record in tablet. + + Returns: + True if mode_tag exists in buckets.keys, otherwise False. 
+ """ + key = mode + "_" + tag + return self._exist_in_keys(key) + + def _get_num_items_index(self, key): + keys = self.keys + if key not in keys: + raise KeyError("Key %s not in buckets.keys()" % key) + return self._buckets[key].num_items_index + + def get_num_items_index(self, mode, tag): + key = mode + "_" + tag + return self._get_num_items_index(key) + + def _get_items(self, key): + """Get items with tag "key" + + Args: + key: Key to find the bucket in reservoir buckets. + + Returns: + One bucket in reservoir buckets by key. + """ + keys = self.keys + with self._mutex: + if key not in keys: + raise KeyError("Key %s not in buckets.keys()" % key) + return self._buckets[key].items + + def get_items(self, mode, tag): + """Get items with tag 'mode_tag' + + For usage habits of VisualDL, this actually calls self._get_items() + + Args: + mode: Identity of one tablet. + tag: Identity of one record in tablet. + + Returns: + One bucket in reservoir buckets by mode and tag. + """ + key = mode + "_" + tag + return self._get_items(key) + + def _add_item(self, key, item): + """Add a new item to reservoir buckets with given tag as key. + + If bucket with key has not yet reached full size, each item will be + added. + + If bucket with key is full, each item will be added with the same + probability. + + Adding a new item to buckets is always valid because self._buckets is + a collections.defaultdict. + + Args: + key: Tag of one bucket to add new item. + item: New item to add to bucket. + """ + with self._mutex: + self._buckets[key].add_item(item) + + def add_item(self, mode, tag, item): + """Add a new item to reservoir buckets with given tag as key. + + For usage habits of VisualDL, this actually calls self._add_item() + + Args: + mode: Identity of one tablet. + tag: Identity of one record in tablet. + item: New item to add to bucket. + """ + key = mode + "_" + tag + self._add_item(key, item) + + +class _ReservoirBucket(object): + """A bucket of sampled data, managed with reservoir sampling. 
+ """ + def __init__(self, max_size, random_instance=None): + """Create a _ReservoirBucket instance. + + Args: + max_size: The maximum size of reservoir bucket. If max_size is + zero, the bucket has unbounded size. + random_instance: The random number generator. If not specified, + default to random.Random(0) + + Raises: + ValueError: If max_size is not a nonnegative integer. + """ + if max_size < 0 or max_size != round(max_size): + raise ValueError("Max_size must be nonnegative integer.") + self._max_size = max_size + self._random = random_instance if random_instance is not None else \ + random.Random(0) + self._items = [] + self._mutex = threading.Lock() + self._num_items_index = 0 + + def add_item(self, item): + """ Add an item to the bucket, replacing an old item with some probability. + + Use reservoir sampling to add a new item to the sampling bucket; + each item in a stream has the same probability to stay in the bucket. + + Args: + item: The item to add to reservoir bucket. + """ + with self._mutex: + if len(self._items) < self._max_size or self._max_size == 0: + self._items.append(item) + else: + r = self._random.randint(0, self._num_items_index) + if r < self._max_size: + self._items.pop(r) + self._items.append(item) + self._num_items_index += 1 + + @property + def items(self): + """Get self._items + + Returns: + All items. + """ + with self._mutex: + return self._items + + @property + def num_items_index(self): + with self._mutex: + return self._num_items_index + + +class DataManager(object): + """Data manager for all plugins. + """ + def __init__(self): + """Create a data manager for all plugins. + + Each kind of plugin has its own reservoir, stored in a dict with plugin + name as key. 
+ + """ + self._scalar_reservoir = Reservoir(max_size=DEFAULT_PLUGIN_MAXSIZE["scalar"]) + self._histogram_reservoir = Reservoir(max_size=DEFAULT_PLUGIN_MAXSIZE["histogram"]) + self._image_reservoir = Reservoir(max_size=DEFAULT_PLUGIN_MAXSIZE["image"]) + + self._reservoirs = {"scalar": self._scalar_reservoir, + "histogram": self._histogram_reservoir, + "image": self._image_reservoir} + self._mutex = threading.Lock() + + def get_reservoir(self, plugin): + """Get reservoir by plugin as key. + + Args: + plugin: Key to get one reservoir bucket for one specified plugin. + + Returns: + Reservoir bucket for plugin. + """ + with self._mutex: + if plugin not in self._reservoirs.keys(): + raise KeyError("Key %s not in reservoirs." % plugin) + return self._reservoirs[plugin] + + def add_item(self, plugin, mode, tag, item): + """Add item to one plugin reservoir bucket. + + Use 'mode', 'tag' for usage habits of VisualDL. + + Args: + plugin: Key to get one reservoir bucket. + mode: Each tablet has different 'mode'. + tag: Tag will be used to generate paths of tablets. + item: The item to add to reservoir bucket. + """ + with self._mutex: + self._reservoirs[plugin].add_item(mode, tag, item) + + def get_keys(self): + """Get all plugin buckets name. + + Returns: + All plugin keys. 
+ """ + with self._mutex: + return self._reservoirs.keys() + + +default_data_manager = DataManager() + + +if __name__ == '__main__': + d = DataManager() + d.add_item("scalar", "train", "loss", 3) + d.add_item("scalar", "train", "loss", 4) + d.add_item("scalar", "train", "loss", 5) + + d.add_item("scalar", "train", "accu", 5) + d.add_item("scalar", "train", "accu", 6) + d.add_item("scalar", "train", "accu", 7) + d.add_item("scalar", "train", "accu", 8) + a = d.get_keys() + print(a) + b = d.get_reservoir("scalar").get_items("train", "loss") + print(b) + c = d.get_reservoir("scalar").get_num_items_index("train", "loss") + print(c) + print("***") + b = d.get_reservoir("scalar").get_items("train", "accu") + print(b) + print(d.get_reservoir("scalar").get_num_items_index("train", "accu")) + + d.add_item("scalar", "train", "loss", 3) + d.add_item("scalar", "train", "loss", 4) + d.add_item("scalar", "train", "loss", 5) + + c = d.get_reservoir("scalar").get_num_items_index("train", "loss") + print(c) + + print(d.get_reservoir("scalar").exist_in_keys("train", "loss")) + print(d.get_reservoir("scalar").exist_in_keys("train", "loss2")) \ No newline at end of file diff --git a/visualdl/server/lib.py b/visualdl/server/lib.py index 8fe4f4115348c04fd6e76f48848e574b115fde40..de18641827072e92f9c22d842da5d1f06c1fff63 100644 --- a/visualdl/server/lib.py +++ b/visualdl/server/lib.py @@ -22,6 +22,7 @@ import numpy as np from PIL import Image from .log import logger import wave +from .data_manager import default_data_manager try: from urllib.parse import urlencode @@ -48,38 +49,31 @@ def get_scalar_tags(storage): def get_scalar(storage, mode, tag, num_records=300): - assert num_records > 1 - with storage.mode(mode) as reader: scalar = reader.scalar(tag) - records = scalar.records() - ids = scalar.ids() - timestamps = scalar.timestamps() + data_reservoir = default_data_manager.get_reservoir("scalar") + if not data_reservoir.exist_in_keys(mode=mode, tag=tag): + records = scalar.records() + 
ids = scalar.ids() + timestamps = scalar.timestamps() - data = list(zip(timestamps, ids, records)) - data_size = len(data) + data = list(zip(timestamps, ids, records)) - if data_size <= num_records: - return data + for index in range(len(data)): + data_reservoir.add_item(mode=mode, tag=tag, item=data[index]) + else: + num_items_index = data_reservoir.get_num_items_index(mode, tag) + if num_items_index != scalar.size(): + records = scalar.records(num_items_index) + ids = scalar.ids(num_items_index) + timestamps = scalar.timestamps(num_items_index) - span = float(data_size) / (num_records - 1) - span_offset = 0 + data = list(zip(timestamps, ids, records)) - data_idx = int(span_offset * span) - sampled_data = [] - - while data_idx < data_size: - sampled_data.append(data[data_size - data_idx - 1]) - span_offset += 1 - data_idx = int(span_offset * span) - - sampled_data.append(data[0]) - res = sampled_data[::-1] - # TODO(Superjomn) some bug here, sometimes there are zero here. - if res[-1] == 0.: - res = res[:-1] - return res + for index in range(len(data)): + data_reservoir.add_item(mode=mode, tag=tag, item=data[index]) + return data_reservoir.get_items(mode, tag) def get_image_tags(storage): @@ -111,15 +105,27 @@ def get_image_tag_steps(storage, mode, tag): image = reader.image(tag) res = [] - for step_index in range(image.num_records()): - record = image.record(step_index, sample_index) - try: + image_num_records = image.num_records() + num_samples = 10 + if image_num_records <= num_samples: + for step_index in range(image_num_records): + record = image.record(step_index, sample_index) + res.append({ 'step': record.step_id(), - 'wallTime': image.timestamp(step_index), + 'wallTime': image.timestamp(step_index) + }) + + else: + span = float(image_num_records) / num_samples + for index in range(num_samples): + step_index = round(span * index) + record = image.record(step_index, sample_index) + + res.append({ + 'step': record.step_id(), + 'wallTime': 
image.timestamp(step_index) }) - except Exception: - logger.error("image sample out of range") return res @@ -291,45 +297,30 @@ def get_embeddings(storage, mode, reduction, dimension=2, num_records=5000): def get_histogram(storage, mode, tag): with storage.mode(mode) as reader: histogram = reader.histogram(tag) - res = [] - - for i in range(histogram.num_records()): - try: - # some bug with protobuf, some times may overflow - record = histogram.record(i) - except Exception: - continue - - res.append([]) - py_record = res[-1] - py_record.append(record.timestamp()) - py_record.append(record.step()) - py_record.append([]) - - data = py_record[-1] - for j in range(record.num_instances()): - instance = record.instance(j) - data.append( - [instance.left(), instance.right(), instance.frequency()]) - - # num_samples: We will only return 100 samples. - num_samples = 100 - if len(res) < num_samples: - return res - - # sample some steps - span = float(len(res)) / (num_samples - 1) - span_offset = 0 - data_idx = 0 - - sampled_data = [] - data_size = len(res) - while data_idx < data_size: - sampled_data.append(res[data_size - data_idx - 1]) - span_offset += 1 - data_idx = int(span_offset * span) - sampled_data.append(res[0]) - return sampled_data[::-1] + data_reservoir = default_data_manager.get_reservoir("histogram") + if not data_reservoir.exist_in_keys(mode=mode, tag=tag): + records = histogram.records() + + for record in records: + data = [] + for j in range(record.num_instances()): + instance = record.instance(j) + data.append( + [instance.left(), instance.right(), instance.frequency()]) + data_reservoir.add_item(mode, tag, [record.timestamp(), record.step(), data]) + else: + num_items_index = data_reservoir.get_num_items_index(mode, tag) + if num_items_index != histogram.size(): + records = histogram.records(num_items_index) + + for record in records: + data = [] + for j in range(record.num_instances()): + instance = record.instance(j) + data.append( + [instance.left(), 
instance.right(), instance.frequency()]) + data_reservoir.add_item(mode, tag, [record.timestamp(), record.step(), data]) + return data_reservoir.get_items(mode, tag) def retry(ntimes, function, time2sleep, *args, **kwargs):