diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ab453b7520134448e4e418751541638ea2d06dd..b4e6b67b22386444418d8f17d08311de87096c10 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,5 +31,9 @@ add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/backend/logic) add_executable(vl_test ${PROJECT_SOURCE_DIR}/visualdl/backend/test.cc ${PROJECT_SOURCE_DIR}/visualdl/backend/storage/storage_test.cc - ${PROJECT_SOURCE_DIR}/visualdl/backend/logic/im_test.cc) + ${PROJECT_SOURCE_DIR}/visualdl/backend/utils/test_concurrency.cc + ${PROJECT_SOURCE_DIR}/visualdl/backend/logic/im_test.cc + ${PROJECT_SOURCE_DIR}/visualdl/backend/utils/concurrency.h + ${PROJECT_SOURCE_DIR}/visualdl/backend/utils/filesystem.h + ) target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread) diff --git a/visualdl/backend/logic/im.cc b/visualdl/backend/logic/im.cc index e33ac2768bd5325051f9a71ec6be880389c06dcd..305fb42c73cf4c6992a7d7fbd71ccbb00f7f7a5d 100644 --- a/visualdl/backend/logic/im.cc +++ b/visualdl/backend/logic/im.cc @@ -25,20 +25,39 @@ int ReserviorSample(int num_samples, int num_records) { return -1; } -void InformationMaintainer::SetPersistDest(const std::string &path) { +IM::IM(StorageBase::Type type, StorageBase::Mode mode) { + switch (type) { + case StorageBase::Type::kMemory: { + storage_.reset(new MemoryStorage); + } break; + default: + CHECK(false) << "Unsupported storage kind " << type; + } + + switch (mode) { + case StorageBase::Mode::kRead: + dynamic_cast(storage_.get())->StartReadService(); + break; + case StorageBase::Mode::kWrite: + dynamic_cast(storage_.get())->StartWriteSerice(); + break; + default: + break; + } +} + +void IM::SetPersistDest(const std::string &path) { CHECK(storage_->mutable_data()->dir().empty()) << "duplicate set storage's path"; storage_->mutable_data()->set_dir(path); } -storage::Tablet *InformationMaintainer::AddTablet(const std::string &tag, - int num_samples) { +storage::Tablet *IM::AddTablet(const std::string &tag, int num_samples) { auto tablet = storage_->NewTablet(tag, num_samples); return tablet; } -void InformationMaintainer::AddRecord(const std::string &tag, - const storage::Record &data) { +void IM::AddRecord(const std::string &tag, const storage::Record &data) { auto *tablet = storage_->tablet(tag); CHECK(tablet) << "no tablet called " << tag; @@ -65,14 +84,14 @@ void InformationMaintainer::AddRecord(const std::string &tag, tablet->set_total_records(num_records + 1); } -void InformationMaintainer::Clear() { +void IM::Clear() { auto *data = storage().mutable_data(); data->clear_tablets(); data->clear_dir(); data->clear_timestamp(); } -void InformationMaintainer::PersistToDisk() { +void IM::PersistToDisk() { CHECK(!storage_->data().dir().empty()) << "path of storage should be set"; // TODO make dir first // MakeDir(storage_.data().dir()); diff --git a/visualdl/backend/logic/im.h b/visualdl/backend/logic/im.h index d96aa4d55feb3947708d6dd26a89a8da915341fd..de884b0a890a97025e90a90342909e9b33a6ba18 100644 --- a/visualdl/backend/logic/im.h +++ b/visualdl/backend/logic/im.h @@ -2,6 +2,7 @@ #define VISUALDL_BACKEND_LOGIC_IM_H #include +#include #include #include @@ -10,24 +11,38 @@ namespace visualdl { /* - * Maintain the Storage singleton in memory, pre-compute some the statical - * information to help visualizaton. + * IM(Information Maintainer) maintain the Storage singleton in memory, + * pre-compute some the statistical information to help visualizaton. + * + * There should be two processes and each have an IM, one is the web server + * which hold one IM to read the storage, the other is the SDK(python or C++), + * it will get an IM to write latest changes to storage. + * + * An IM have an underlying Storage object, which might be a memory based + * storage or a disk based one, both has the same interfaces those defined by + * class StorageBase. + * + * The SDK's IM will maintain the changes and periodically write to disk, and + * the web server's IM will periodically read latest storage from disk. */ -class InformationMaintainer final { +class IM final { public: - InformationMaintainer(StorageBase::Type type = StorageBase::Type::kMemory) { - switch (type) { - case StorageBase::Type::kMemory: - storage_.reset(new MemoryStorage); - break; - default: - CHECK(false) << "Unsupported storage kind " << type; - } + IM() { storage_.reset(new MemoryStorage); } + IM(StorageBase::Type type, StorageBase::Mode mode); + ~IM() { cc::PeriodExector::Global().Quit(); } + + static IM &Global() { + static IM x; + return x; + } + + void MaintainRead() { + LOG(INFO) << "start maintain read"; + dynamic_cast(storage_.get())->StartReadService(); } - static InformationMaintainer &Global() { - static InformationMaintainer *x = new InformationMaintainer(); - return *x; + void MaintainWrite() { + dynamic_cast(storage_.get())->StartWriteSerice(); } /* diff --git a/visualdl/backend/logic/im_test.cc b/visualdl/backend/logic/im_test.cc index 183853e501143ffd33714b69c1c02162bf1d6f57..c387565477ee9df0ae8d2872ae6416f79e08ca7e 100644 --- a/visualdl/backend/logic/im_test.cc +++ b/visualdl/backend/logic/im_test.cc @@ -8,7 +8,7 @@ class ImTester : public ::testing::Test { protected: void SetUp() override {} - InformationMaintainer &im = InformationMaintainer::Global(); + IM &im = IM::Global(); }; TEST_F(ImTester, AddTablet) { diff --git a/visualdl/backend/logic/pybind.cc b/visualdl/backend/logic/pybind.cc index b6ff06f1aa1ab0985e475ba3e9973e322f7ac207..9b7f67727afc657b21b4274bf79cb6ffae94f4c7 100644 --- a/visualdl/backend/logic/pybind.cc +++ b/visualdl/backend/logic/pybind.cc @@ -55,7 +55,14 @@ PYBIND11_PLUGIN(core) { .def("persist_to_disk", &vs::ImHelper::PersistToDisk) .def("clear_tablets", &vs::ImHelper::ClearTablets); - m.def("im", &vs::get_im, "global information-maintainer object."); + m.def("start_read_service", + &vs::start_read_service, + "global information-maintainer object."); + m.def("start_write_service", + &vs::start_write_service, + "global information-maintainer object."); + m.def("im", &vs::im); + m.def("stop_threads", &vs::StopThreads); // interfaces for components #define ADD_SCALAR_TYPED_INTERFACE(T, name__) \ diff --git a/visualdl/backend/logic/sdk.cc b/visualdl/backend/logic/sdk.cc index c676c5505ab215ca6e601b8307af443520238f2f..4ce6362fbcf1e9b5f07d987e7a3988d11888d64e 100644 --- a/visualdl/backend/logic/sdk.cc +++ b/visualdl/backend/logic/sdk.cc @@ -60,9 +60,7 @@ std::string TabletHelper::human_readable_buffer() const { return buffer; } -void ImHelper::PersistToDisk() const { - InformationMaintainer::Global().PersistToDisk(); -} +void ImHelper::PersistToDisk() const { IM::Global().PersistToDisk(); } // implementations for components namespace components { diff --git a/visualdl/backend/logic/sdk.h b/visualdl/backend/logic/sdk.h index 5162fee8c0bcae538cabd790d9c68e2361a79dff..e97d4f85d8d7b558bf4e55b611914f8f502dad1b 100644 --- a/visualdl/backend/logic/sdk.h +++ b/visualdl/backend/logic/sdk.h @@ -77,7 +77,10 @@ public: void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; } void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); } - void SetDir(const std::string &dir) { data_->set_dir(dir); } + void SetDir(const std::string &dir) { + CHECK(data_) << "no storage instance hold"; + data_->set_dir(dir); + } int64_t timestamp() const { return data_->timestamp(); } std::string dir() const { return data_->dir(); } @@ -86,64 +89,19 @@ public: std::string human_readable_buffer() const; private: - storage::Storage *data_; + storage::Storage *data_{nullptr}; }; class ImHelper { public: ImHelper() {} - StorageHelper storage() { - return StorageHelper( - InformationMaintainer::Global().storage().mutable_data()); - } - TabletHelper tablet(const std::string &tag) { - return TabletHelper(InformationMaintainer::Global().storage().tablet(tag)); - } - TabletHelper AddTablet(const std::string &tag, int num_samples) { - return TabletHelper( - InformationMaintainer::Global().AddTablet(tag, num_samples)); - } - void ClearTablets() { - InformationMaintainer::Global().storage().mutable_data()->clear_tablets(); - } - - void PersistToDisk() const; -}; - -namespace components { - -/* - * Read and write support for Scalar component. - */ -template -class ScalarHelper { -public: - ScalarHelper(storage::Tablet *tablet) : data_(tablet) {} - - void SetCaptions(const std::vector &captions); - - void AddRecord(int id, const std::vector &values); - - std::vector> GetRecords() const; - - std::vector GetIds() const; - - std::vector GetTimestamps() const; - - std::vector GetCaptions() const; - - size_t GetSize() const { return data_->records_size(); } - -private: - storage::Tablet *data_; -}; - -} // namespace components - -static ImHelper &get_im() { - static ImHelper im; - return im; + /* + * mode: + * 0: read + * 1: write + * 2: none + */ } } // namespace visualdl diff --git a/visualdl/backend/python/summary.py b/visualdl/backend/python/summary.py index 59877aca69df649599e55aa112a4a8cf9d26c107..13b9400c3f358529d72d7057ac3f55ae6488c538 100644 --- a/visualdl/backend/python/summary.py +++ b/visualdl/backend/python/summary.py @@ -4,8 +4,6 @@ __all__ = [ ] import core -im = core.im() - dtypes = ("float", "double", "int32", "int64") @@ -15,7 +13,19 @@ def set_storage(dir): directory of summary to write log. :return: None ''' - im.storage().set_dir(dir) + core.im().storage().set_dir(dir) + + +def set_readable_storage(dir): + core.start_read_service(dir) + + +def set_writable_storage(dir): + core.start_write_service(dir) + + +def stop_service(): + core.stop_threads() class _Scalar(object): @@ -95,7 +105,19 @@ def scalar(tag, dtype='float'): ''' assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % ( dtype, str(dtypes)) - tablet = im.add_tablet(tag, -1) + tablet = core.im().add_tablet(tag, -1) + dtype2obj = { + 'float': tablet.as_float_scalar, + 'double': tablet.as_double_scalar, + 'int32': tablet.as_int32_scalar, + 'int64': tablet.as_int64_scalar, + } + obj = dtype2obj[dtype]() + return _Scalar(obj) + + +def read_scalar(tag, dtype='float'): + tablet = core.im().tablet(tag) dtype2obj = { 'float': tablet.as_float_scalar, 'double': tablet.as_double_scalar, diff --git a/visualdl/backend/python/test_read_service.py b/visualdl/backend/python/test_read_service.py new file mode 100644 index 0000000000000000000000000000000000000000..0831e04b103d899c59a5d3406b0dcea6b99a90b3 --- /dev/null +++ b/visualdl/backend/python/test_read_service.py @@ -0,0 +1,15 @@ +import summary +import numpy as np +import unittest +import time + + +class StorageTester(unittest.TestCase): + def test_storage(self): + summary.set_writable_storage("./tmp_dir") + time.sleep(5) + summary.stop_service() + + +if __name__ == '__main__': + unittest.main() diff --git a/visualdl/backend/python/test_summary.py b/visualdl/backend/python/test_summary.py index 24fec43f5690da88c4d0d8fd2bad697794e4b032..c3caab75052cd04b3ff5b792a7c85988905d2fd8 100644 --- a/visualdl/backend/python/test_summary.py +++ b/visualdl/backend/python/test_summary.py @@ -1,14 +1,14 @@ import summary import numpy as np import unittest - -summary.set_storage("tmp_dir") +import time once_flag = False class ScalarTester(unittest.TestCase): def setUp(self): + summary.set_storage("tmp_dir") global once_flag self.scalar = summary.scalar("scalar0") if not once_flag: diff --git a/visualdl/backend/python/test_write_service.py b/visualdl/backend/python/test_write_service.py new file mode 100644 index 0000000000000000000000000000000000000000..241869c47786a19288b52b6abcd0d9c21cdb788c --- /dev/null +++ b/visualdl/backend/python/test_write_service.py @@ -0,0 +1,17 @@ +import summary +import numpy as np +import unittest +import time + + +class StorageTester(unittest.TestCase): + def test_read_storage(self): + summary.set_readable_storage("./tmp") + time.sleep(1) + scalar = summary.read_scalar('tag01') + time.sleep(5) + summary.stop_service() + + +if __name__ == '__main__': + unittest.main() diff --git a/visualdl/backend/storage/storage.cc b/visualdl/backend/storage/storage.cc index ca45d80142443ba86b4ed208242ed9b44b64c078..9ed2e20e245d72aba003ce4074bb684c69e11a2f 100644 --- a/visualdl/backend/storage/storage.cc +++ b/visualdl/backend/storage/storage.cc @@ -1,4 +1,5 @@ #include +#include #include #include "visualdl/backend/storage/storage.h" @@ -6,12 +7,17 @@ namespace visualdl { -std::string GenPathFromTag(const std::string &dir, const std::string &tag) { - return dir + "/" + tag; -} - const std::string StorageBase::meta_file_name = "storage.meta"; +std::string StorageBase::meta_path() const { + CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; + return storage_.dir() + "/" + meta_file_name; +} +std::string StorageBase::tablet_path(const std::string &tag) const { + CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; + return storage_.dir() + "/" + tag; +} + storage::Tablet *MemoryStorage::NewTablet(const std::string &tag, int num_samples) { auto it = tablets_.find(tag); @@ -32,39 +38,54 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) { return &it->second; } +// TODO add some checksum to avoid unnecessary saving void MemoryStorage::PersistToDisk() const { + CHECK(!storage_.dir().empty()) << "storage's dir should be set first"; VLOG(3) << "persist storage to disk path " << storage_.dir(); // make a directory if not exist fs::TryMkdir(storage_.dir()); // write storage out - CHECK(!storage_.dir().empty()) << "storage's dir should be set first"; - const auto meta_path = storage_.dir() + "/" + meta_file_name; - fs::Write(meta_path, fs::Serialize(storage_)); + fs::SerializeToFile(storage_, meta_path()); // write all the tablets for (auto tag : storage_.tags()) { - auto path = GenPathFromTag(storage_.dir(), tag); auto it = tablets_.find(tag); CHECK(it != tablets_.end()); - fs::Write(path, fs::Serialize(it->second)); + fs::SerializeToFile(it->second, tablet_path(tag)); } } +// TODO add some checksum to avoid unnecessary loading void MemoryStorage::LoadFromDisk(const std::string &dir) { - VLOG(3) << "load storage from disk path " << dir; CHECK(!dir.empty()) << "dir is empty"; + storage_.set_dir(dir); // load storage - const auto meta_path = dir + "/" + meta_file_name; - auto buf = fs::Read(meta_path); - CHECK(fs::DeSerialize(&storage_, buf)) - << "failed to parse protobuf loaded from " << meta_path; + CHECK(fs::DeSerializeFromFile(&storage_, meta_path())) + << "parse from " << meta_path() << " failed"; // load all the tablets for (int i = 0; i < storage_.tags_size(); i++) { - std::string tag = storage_.tags(i); - auto path = GenPathFromTag(storage_.dir(), tag); - CHECK(tablets_[tag].ParseFromString(fs::Read(path))) - << "failed to parse protobuf text loaded from " << path; + auto tag = storage_.tags(i); + CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(tag))); } } +void MemoryStorage::StartReadService() { + cc::PeriodExector::task_t task = [this] { + VLOG(3) << "loading from " << storage_.dir(); + LoadFromDisk(storage_.dir()); + return true; + }; + cc::PeriodExector::Global().Start(); + VLOG(3) << "push read task"; + cc::PeriodExector::Global()(std::move(task), 2512); +} + +void MemoryStorage::StartWriteSerice() { + cc::PeriodExector::Global().Start(); + cc::PeriodExector::task_t task = [this] { + PersistToDisk(); + return true; + }; + cc::PeriodExector::Global()(std::move(task), 2000); +} } // namespace visualdl diff --git a/visualdl/backend/storage/storage.h b/visualdl/backend/storage/storage.h index d9e8ae29a2481a7301b8e6740763abb91c76889c..a827a6dbd8d936ff6f9c8de9c95289cd8cbff0ee 100644 --- a/visualdl/backend/storage/storage.h +++ b/visualdl/backend/storage/storage.h @@ -26,6 +26,8 @@ public: const static std::string meta_file_name; enum Type { kMemory = 0, kDisk = 1 }; + // mode of the sevice, either reading or writing. + enum Mode { kRead = 0, kWrite = 1, kNone = 2 }; void SetStorage(const std::string &dir) { time_t t; @@ -34,6 +36,9 @@ public: storage_.set_dir(dir); } + std::string meta_path() const; + std::string tablet_path(const std::string &tag) const; + /* * Create a new Tablet storage. */ @@ -78,6 +83,18 @@ public: void LoadFromDisk(const std::string &dir) override; + /* + * Create a thread which will keep reading the latest data from the disk to + * memory. + */ + void StartReadService(); + + /* + * Create a thread which will keep writing the latest changes from memory to + * disk. + */ + void StartWriteSerice(); + private: std::map tablets_; }; diff --git a/visualdl/backend/storage/storage_test.cc b/visualdl/backend/storage/storage_test.cc index 6090d71f4b4ac88379f9eacce3a01f01434ecff9..91c2a59d47ed38f8f6a6eb5affc27305171bbf92 100644 --- a/visualdl/backend/storage/storage_test.cc +++ b/visualdl/backend/storage/storage_test.cc @@ -32,6 +32,8 @@ TEST_F(MemoryStorageTest, AddTablet) { } TEST_F(MemoryStorageTest, PersistToDisk) { + storage_.SetStorage("./tmp"); + CHECK(!storage_.data().dir().empty()); string tag = "add%20tag0"; storage_.NewTablet(tag, -1); diff --git a/visualdl/backend/utils/concurrency.h b/visualdl/backend/utils/concurrency.h new file mode 100644 index 0000000000000000000000000000000000000000..a3d46a9a89df99ca73c1075216be71990bbd34b3 --- /dev/null +++ b/visualdl/backend/utils/concurrency.h @@ -0,0 +1,61 @@ +#ifndef VISUALDL_BACKEND_UTILS_CONCURRENCY_H +#define VISUALDL_BACKEND_UTILS_CONCURRENCY_H + +#include +#include +#include +#include +#include + +namespace visualdl { +namespace cc { + +/* + * Run a task every `duration` milliseconds. + * Each evoke will start a thread to do this asynchronously. + */ +struct PeriodExector { + using task_t = std::function; + + static PeriodExector& Global() { + static PeriodExector exec; + return exec; + } + + void Quit() { + // TODO use some conditonal variable to help quit immediately. + quit = true; + } + + void Start() { quit = false; } + + void operator()(task_t&& task, int msec) { + auto task_wrapper = [=] { + while (!quit) { + if (!task()) break; + std::this_thread::sleep_for(std::chrono::milliseconds(msec)); + } + LOG(INFO) << "quit job"; + }; + threads_.emplace_back(std::thread(std::move(task_wrapper))); + msec_ = msec; + } + + ~PeriodExector() { + for (auto& t : threads_) { + if (t.joinable()) { + t.join(); + } + } + } + +private: + bool quit = false; + std::vector threads_; + int msec_; +}; + +} // namespace cc +} // namespace visualdl + +#endif diff --git a/visualdl/backend/utils/filesystem.h b/visualdl/backend/utils/filesystem.h index e30640b97bbf9509b78cc3ef55e7569bfd9bf7c8..9a509407c12816f77a2a97f1d2ed5e2972a7c531 100644 --- a/visualdl/backend/utils/filesystem.h +++ b/visualdl/backend/utils/filesystem.h @@ -30,6 +30,18 @@ bool DeSerialize(T* proto, const std::string buf, bool human_readable = false) { return proto->ParseFromString(buf); } +template +bool SerializeToFile(const T& proto, const std::string& path) { + std::ofstream file(path, std::ios::binary); + return proto.SerializeToOstream(&file); +} + +template +bool DeSerializeFromFile(T* proto, const std::string& path) { + std::ifstream file(path, std::ios::binary); + return proto->ParseFromIstream(&file); +} + void TryMkdir(const std::string& dir) { VLOG(1) << "try to mkdir " << dir; struct stat st = {0}; diff --git a/visualdl/backend/utils/log.h b/visualdl/backend/utils/log.h new file mode 100644 index 0000000000000000000000000000000000000000..9aed0806d905079b09e37734224bbf2ddb3b376c --- /dev/null +++ b/visualdl/backend/utils/log.h @@ -0,0 +1,18 @@ +#ifndef VISUALDL_BACKEND_UTILS_LOG_H +#define VISUALDL_BACKEND_UTILS_LOG_H + +#include +namespace visualdl { + +namespace log { + +class NotImplementedException : public std::logic_error { +public: + NotImplementedException() : std::logic_error{"Function not implemented"} {} +}; + +} // namespace log + +} // namespace visualdl + +#endif diff --git a/visualdl/backend/utils/test_concurrency.cc b/visualdl/backend/utils/test_concurrency.cc new file mode 100644 index 0000000000000000000000000000000000000000..9657e2d31e16453a790cf052703605da389999a2 --- /dev/null +++ b/visualdl/backend/utils/test_concurrency.cc @@ -0,0 +1,19 @@ +#include "visualdl/backend/utils/concurrency.h" + +#include +#include + +namespace visualdl { + +int counter = 0; + +TEST(concurrency, test) { + cc::PeriodExector::task_t task = [&counter]() { + LOG(INFO) << "Hello " << counter++; + if (counter > 5) return false; + return true; + }; + cc::PeriodExector::Global()(std::move(task), 200); +} + +} // namespace visualdl \ No newline at end of file