From dac34b3b281eb6ecca52b5bea1079954dbe3585e Mon Sep 17 00:00:00 2001 From: superjom Date: Wed, 29 Nov 2017 19:15:27 +0800 Subject: [PATCH] add read and write maintain service --- visualdl/backend/logic/im.cc | 4 +- visualdl/backend/logic/im.h | 21 +++++++-- visualdl/backend/logic/pybind.cc | 9 +++- visualdl/backend/logic/sdk.h | 62 +++++-------------------- visualdl/backend/python/summary.py | 30 ++++++++++-- visualdl/backend/python/test_summary.py | 4 +- visualdl/backend/storage/storage.cc | 10 ++-- visualdl/backend/utils/concurrency.h | 5 ++ 8 files changed, 74 insertions(+), 71 deletions(-) diff --git a/visualdl/backend/logic/im.cc b/visualdl/backend/logic/im.cc index 817ea6b1..305fb42c 100644 --- a/visualdl/backend/logic/im.cc +++ b/visualdl/backend/logic/im.cc @@ -27,9 +27,9 @@ int ReserviorSample(int num_samples, int num_records) { IM::IM(StorageBase::Type type, StorageBase::Mode mode) { switch (type) { - case StorageBase::Type::kMemory: + case StorageBase::Type::kMemory: { storage_.reset(new MemoryStorage); - break; + } break; default: CHECK(false) << "Unsupported storage kind " << type; } diff --git a/visualdl/backend/logic/im.h b/visualdl/backend/logic/im.h index 1d8245c8..de884b0a 100644 --- a/visualdl/backend/logic/im.h +++ b/visualdl/backend/logic/im.h @@ -2,6 +2,7 @@ #define VISUALDL_BACKEND_LOGIC_IM_H #include +#include #include #include @@ -10,7 +11,7 @@ namespace visualdl { /* - * IM(IM) maintain the Storage singleton in memory, + * IM(Information Maintainer) maintain the Storage singleton in memory, * pre-compute some the statistical information to help visualizaton. * * There should be two processes and each have an IM, one is the web server @@ -26,12 +27,22 @@ namespace visualdl { */ class IM final { public: - IM(StorageBase::Type type = StorageBase::Type::kMemory, - StorageBase::Mode mode = StorageBase::Mode::kNone); + IM() { storage_.reset(new MemoryStorage); } + IM(StorageBase::Type type, StorageBase::Mode mode); + ~IM() { cc::PeriodExector::Global().Quit(); } static IM &Global() { - static IM *x = new IM(); - return *x; + static IM x; + return x; + } + + void MaintainRead() { + LOG(INFO) << "start maintain read"; + dynamic_cast(storage_.get())->StartReadService(); + } + + void MaintainWrite() { + dynamic_cast(storage_.get())->StartWriteSerice(); } /* diff --git a/visualdl/backend/logic/pybind.cc b/visualdl/backend/logic/pybind.cc index b6ff06f1..9b7f6772 100644 --- a/visualdl/backend/logic/pybind.cc +++ b/visualdl/backend/logic/pybind.cc @@ -55,7 +55,14 @@ PYBIND11_PLUGIN(core) { .def("persist_to_disk", &vs::ImHelper::PersistToDisk) .def("clear_tablets", &vs::ImHelper::ClearTablets); - m.def("im", &vs::get_im, "global information-maintainer object."); + m.def("start_read_service", + &vs::start_read_service, + "global information-maintainer object."); + m.def("start_write_service", + &vs::start_write_service, + "global information-maintainer object."); + m.def("im", &vs::im); + m.def("stop_threads", &vs::StopThreads); // interfaces for components #define ADD_SCALAR_TYPED_INTERFACE(T, name__) \ diff --git a/visualdl/backend/logic/sdk.h b/visualdl/backend/logic/sdk.h index 963834e8..e97d4f85 100644 --- a/visualdl/backend/logic/sdk.h +++ b/visualdl/backend/logic/sdk.h @@ -77,7 +77,10 @@ public: void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; } void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); } - void SetDir(const std::string &dir) { data_->set_dir(dir); } + void SetDir(const std::string &dir) { + CHECK(data_) << "no storage instance hold"; + data_->set_dir(dir); + } int64_t timestamp() const { return data_->timestamp(); } std::string dir() const { return data_->dir(); } @@ -86,62 +89,19 @@ public: std::string human_readable_buffer() const; private: - storage::Storage *data_; + storage::Storage *data_{nullptr}; }; class ImHelper { public: ImHelper() {} - StorageHelper storage() { - return StorageHelper(IM::Global().storage().mutable_data()); - } - TabletHelper tablet(const std::string &tag) { - return TabletHelper(IM::Global().storage().tablet(tag)); - } - TabletHelper AddTablet(const std::string &tag, int num_samples) { - return TabletHelper(IM::Global().AddTablet(tag, num_samples)); - } - void ClearTablets() { - IM::Global().storage().mutable_data()->clear_tablets(); - } - - void PersistToDisk() const; -}; - -namespace components { - -/* - * Read and write support for Scalar component. - */ -template -class ScalarHelper { -public: - ScalarHelper(storage::Tablet *tablet) : data_(tablet) {} - - void SetCaptions(const std::vector &captions); - - void AddRecord(int id, const std::vector &values); - - std::vector> GetRecords() const; - - std::vector GetIds() const; - - std::vector GetTimestamps() const; - - std::vector GetCaptions() const; - - size_t GetSize() const { return data_->records_size(); } - -private: - storage::Tablet *data_; -}; - -} // namespace components - -static ImHelper &get_im() { - static ImHelper im; - return im; + /* + * mode: + * 0: read + * 1: write + * 2: none + */ } } // namespace visualdl diff --git a/visualdl/backend/python/summary.py b/visualdl/backend/python/summary.py index 59877aca..13b9400c 100644 --- a/visualdl/backend/python/summary.py +++ b/visualdl/backend/python/summary.py @@ -4,8 +4,6 @@ __all__ = [ ] import core -im = core.im() - dtypes = ("float", "double", "int32", "int64") @@ -15,7 +13,19 @@ def set_storage(dir): directory of summary to write log. :return: None ''' - im.storage().set_dir(dir) + core.im().storage().set_dir(dir) + + +def set_readable_storage(dir): + core.start_read_service(dir) + + +def set_writable_storage(dir): + core.start_write_service(dir) + + +def stop_service(): + core.stop_threads() class _Scalar(object): @@ -95,7 +105,19 @@ def scalar(tag, dtype='float'): ''' assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % ( dtype, str(dtypes)) - tablet = im.add_tablet(tag, -1) + tablet = core.im().add_tablet(tag, -1) + dtype2obj = { + 'float': tablet.as_float_scalar, + 'double': tablet.as_double_scalar, + 'int32': tablet.as_int32_scalar, + 'int64': tablet.as_int64_scalar, + } + obj = dtype2obj[dtype]() + return _Scalar(obj) + + +def read_scalar(tag, dtype='float'): + tablet = core.im().tablet(tag) dtype2obj = { 'float': tablet.as_float_scalar, 'double': tablet.as_double_scalar, diff --git a/visualdl/backend/python/test_summary.py b/visualdl/backend/python/test_summary.py index 24fec43f..c3caab75 100644 --- a/visualdl/backend/python/test_summary.py +++ b/visualdl/backend/python/test_summary.py @@ -1,14 +1,14 @@ import summary import numpy as np import unittest - -summary.set_storage("tmp_dir") +import time once_flag = False class ScalarTester(unittest.TestCase): def setUp(self): + summary.set_storage("tmp_dir") global once_flag self.scalar = summary.scalar("scalar0") if not once_flag: diff --git a/visualdl/backend/storage/storage.cc b/visualdl/backend/storage/storage.cc index b74840d1..9ed2e20e 100644 --- a/visualdl/backend/storage/storage.cc +++ b/visualdl/backend/storage/storage.cc @@ -40,10 +40,7 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) { // TODO add some checksum to avoid unnecessary saving void MemoryStorage::PersistToDisk() const { - LOG(INFO) << "inside dir " << storage_.dir() << " " - << (!storage_.dir().empty()); CHECK(!storage_.dir().empty()) << "storage's dir should be set first"; - LOG(INFO) << "after check dir"; VLOG(3) << "persist storage to disk path " << storage_.dir(); // make a directory if not exist fs::TryMkdir(storage_.dir()); @@ -59,7 +56,6 @@ void MemoryStorage::PersistToDisk() const { // TODO add some checksum to avoid unnecessary loading void MemoryStorage::LoadFromDisk(const std::string &dir) { - VLOG(3) << "load storage from disk path " << dir; CHECK(!dir.empty()) << "dir is empty"; storage_.set_dir(dir); // load storage @@ -75,16 +71,18 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) { void MemoryStorage::StartReadService() { cc::PeriodExector::task_t task = [this] { - LOG(INFO) << "loading from " << storage_.dir(); + VLOG(3) << "loading from " << storage_.dir(); LoadFromDisk(storage_.dir()); return true; }; + cc::PeriodExector::Global().Start(); + VLOG(3) << "push read task"; cc::PeriodExector::Global()(std::move(task), 2512); } void MemoryStorage::StartWriteSerice() { + cc::PeriodExector::Global().Start(); cc::PeriodExector::task_t task = [this] { - LOG(INFO) << "writing to " << storage_.dir(); PersistToDisk(); return true; }; diff --git a/visualdl/backend/utils/concurrency.h b/visualdl/backend/utils/concurrency.h index f964a9c4..a3d46a9a 100644 --- a/visualdl/backend/utils/concurrency.h +++ b/visualdl/backend/utils/concurrency.h @@ -27,14 +27,18 @@ struct PeriodExector { quit = true; } + void Start() { quit = false; } + void operator()(task_t&& task, int msec) { auto task_wrapper = [=] { while (!quit) { if (!task()) break; std::this_thread::sleep_for(std::chrono::milliseconds(msec)); } + LOG(INFO) << "quit job"; }; threads_.emplace_back(std::thread(std::move(task_wrapper))); + msec_ = msec; } ~PeriodExector() { @@ -48,6 +52,7 @@ struct PeriodExector { private: bool quit = false; std::vector threads_; + int msec_; }; } // namespace cc -- GitLab