diff --git a/CMakeLists.txt b/CMakeLists.txt index b0fe4f0276cdeb3540b39a4d545a151e606a9bc7..d3f167501abd2ede1b380bad1a1def8f2fc5c796 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,10 +39,11 @@ add_executable(vl_test ${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc ${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc + ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h ${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h ) -target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread) +target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread) enable_testing () diff --git a/visualdl/logic/im.cc b/visualdl/logic/im.cc index 76d55d225bafe1754d14c351436bfde97041665e..94881244f8d5925e2a4cf3e0ac71169a5fdfdb1f 100644 --- a/visualdl/logic/im.cc +++ b/visualdl/logic/im.cc @@ -25,26 +25,26 @@ int ReserviorSample(int num_samples, int num_records) { return -1; } -IM::IM(StorageBase::Type type, StorageBase::Mode mode) { - switch (type) { - case StorageBase::Type::kMemory: { - storage_.reset(new MemoryStorage); - } break; - default: - CHECK(false) << "Unsupported storage kind " << type; - } - - switch (mode) { - case StorageBase::Mode::kRead: - dynamic_cast(storage_.get())->StartReadService(); - break; - case StorageBase::Mode::kWrite: - dynamic_cast(storage_.get())->StartWriteSerice(); - break; - default: - break; - } -} +// IM::IM(StorageBase::Type type, StorageBase::Mode mode) { +// switch (type) { +// case StorageBase::Type::kMemory: { +// storage_.reset(new MemoryStorage); +// } break; +// default: +// CHECK(false) << "Unsupported storage kind " << type; +// } + +// switch (mode) { +// case StorageBase::Mode::kRead: +// dynamic_cast(storage_.get())->StartReadService(500); +// break; +// case StorageBase::Mode::kWrite: +// dynamic_cast(storage_.get())->StartWriteSerice(500); +// break; +// default: +// break; +// } +// } void IM::SetPersistDest(const std::string &path) { CHECK(storage_->mutable_data()->dir().empty()) @@ -95,7 +95,7 @@ void IM::PersistToDisk() { CHECK(!storage_->data().dir().empty()) << "path of storage should be set"; // TODO make dir first // MakeDir(storage_.data().dir()); - storage_->PersistToDisk(); + storage_->PersistToDisk(storage_->data().dir()); } } // namespace visualdl diff --git a/visualdl/logic/im.h b/visualdl/logic/im.h index 5df59e2994807cdfe4a0a5e746f8bbc264759433..47f280845981f0b735a239ed291f4326e7f8fe41 100644 --- a/visualdl/logic/im.h +++ b/visualdl/logic/im.h @@ -27,22 +27,21 @@ namespace visualdl { */ class IM final { public: - IM() { storage_.reset(new MemoryStorage); } - IM(StorageBase::Type type, StorageBase::Mode mode); - ~IM() { cc::PeriodExector::Global().Quit(); } - - static IM &Global() { - static IM x; - return x; + IM() { storage_.reset(new MemoryStorage(&executor_)); } + // IM(StorageBase::Type type, StorageBase::Mode mode); + ~IM() { executor_.Quit(); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } - void MaintainRead() { + void MaintainRead(const std::string &dir, int msecs) { LOG(INFO) << "start maintain read"; - dynamic_cast(storage_.get())->StartReadService(); + dynamic_cast(storage_.get()) + ->StartReadService(dir, msecs, &lock_); } - void MaintainWrite() { - dynamic_cast(storage_.get())->StartWriteSerice(); + void MaintainWrite(const std::string &dir, int msecs) { + dynamic_cast(storage_.get()) + ->StartWriteSerice(dir, msecs, &lock_); } /* @@ -73,8 +72,15 @@ public: StorageBase &storage() { return *storage_; } + cc::PeriodExector &executor() { return executor_; } + + std::mutex &handler() { return lock_; } + private: + // read write lock for protobuf in memory + std::mutex lock_; std::unique_ptr storage_; + cc::PeriodExector executor_; }; } // namespace visualdl diff --git a/visualdl/logic/im_test.cc b/visualdl/logic/im_test.cc index deaab5e4704063cabb652b1a7940d6321ae93adb..37246c1763c03071d25debe3b260cd81b054b552 100644 --- a/visualdl/logic/im_test.cc +++ b/visualdl/logic/im_test.cc @@ -2,13 +2,15 @@ #include "gtest/gtest.h" +#include "visualdl/storage/storage.h" + namespace visualdl { class ImTester : public ::testing::Test { protected: void SetUp() override {} - IM &im = IM::Global(); + IM im; }; TEST_F(ImTester, AddTablet) { diff --git a/visualdl/logic/pybind.cc b/visualdl/logic/pybind.cc index 7a79429a7dde5f11a9690ab326eb2cf2129610d6..09e29a3b15acf2381aee97f9316523a7351b41b4 100644 --- a/visualdl/logic/pybind.cc +++ b/visualdl/logic/pybind.cc @@ -53,18 +53,20 @@ PYBIND11_PLUGIN(core) { .def("tablet", &vs::ImHelper::tablet) .def("add_tablet", &vs::ImHelper::AddTablet) .def("persist_to_disk", &vs::ImHelper::PersistToDisk) - .def("clear_tablets", &vs::ImHelper::ClearTablets); + .def("clear_tablets", &vs::ImHelper::ClearTablets) + .def("start_read_service", + &vs::ImHelper::StartReadService, + "start a thread to maintain read service") + .def("start_write_service", + &vs::ImHelper::StartWriteSerice, + "start a thread to maintain write service") + .def("stop_service", + &vs::ImHelper::StopService, + "stop the service thread"); - m.def("start_read_service", - &vs::start_read_service, - "global information-maintainer object."); - m.def("start_write_service", - &vs::start_write_service, - "global information-maintainer object."); - m.def("im", &vs::im); - m.def("stop_threads", &vs::stop_threads); +// interfaces for components begin -// interfaces for components +// different data type of scalar conponent #define ADD_SCALAR_TYPED_INTERFACE(T, name__) \ py::class_>(m, #name__) \ .def("add_record", &vs::components::ScalarHelper::AddRecord) \ diff --git a/visualdl/logic/sdk.cc b/visualdl/logic/sdk.cc index 513541511d48adc00e1a95f5bf51974fb0b1988f..e3283812569ece8183c96451a0026da6a2683572 100644 --- a/visualdl/logic/sdk.cc +++ b/visualdl/logic/sdk.cc @@ -66,6 +66,8 @@ namespace components { template void ScalarHelper::SetCaptions(const std::vector &captions) { + ACQUIRE_HANDLER(handler_); + CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once"; for (int i = 0; i < captions.size(); i++) { data_->add_captions(captions[i]); @@ -74,6 +76,8 @@ void ScalarHelper::SetCaptions(const std::vector &captions) { template void ScalarHelper::AddRecord(int id, const std::vector &values) { + ACQUIRE_HANDLER(handler_); + CHECK_NOTNULL(data_); CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first"; CHECK_EQ(data_->captions_size(), values.size()) @@ -94,6 +98,8 @@ void ScalarHelper::AddRecord(int id, const std::vector &values) { template std::vector> ScalarHelper::GetRecords() const { + ACQUIRE_HANDLER(handler_); + std::vector> result; EntryHelper entry_helper; for (int i = 0; i < data_->records_size(); i++) { @@ -107,6 +113,7 @@ std::vector> ScalarHelper::GetRecords() const { template std::vector ScalarHelper::GetIds() const { + ACQUIRE_HANDLER(handler_); CHECK_NOTNULL(data_); std::vector result; for (int i = 0; i < data_->records_size(); i++) { @@ -117,6 +124,7 @@ std::vector ScalarHelper::GetIds() const { template std::vector ScalarHelper::GetTimestamps() const { + ACQUIRE_HANDLER(handler_); CHECK_NOTNULL(data_); std::vector result; for (int i = 0; i < data_->records_size(); i++) { @@ -127,6 +135,7 @@ std::vector ScalarHelper::GetTimestamps() const { template std::vector ScalarHelper::GetCaptions() const { + ACQUIRE_HANDLER(handler_); return std::vector(data_->captions().begin(), data_->captions().end()); } diff --git a/visualdl/logic/sdk.h b/visualdl/logic/sdk.h index 3937ffafbb91ac2792cb88a242a3833fa2bfc3e4..b74e33174291d31baf44d430116eb86d583c02a6 100644 --- a/visualdl/logic/sdk.h +++ b/visualdl/logic/sdk.h @@ -94,33 +94,50 @@ private: class ImHelper { public: - ImHelper() {} + // TODO(ChunweiYan) decouple helper with resource. + ImHelper() { im_.reset(new IM); } + ImHelper(std::unique_ptr im) : im_(std::move(im)) {} StorageHelper storage() { - return StorageHelper(IM::Global().storage().mutable_data()); + return StorageHelper(im_->storage().mutable_data()); } TabletHelper tablet(const std::string &tag) { - return TabletHelper(IM::Global().storage().tablet(tag)); + return TabletHelper(im_->storage().tablet(tag)); } TabletHelper AddTablet(const std::string &tag, int num_samples) { - return TabletHelper(IM::Global().AddTablet(tag, num_samples)); + return TabletHelper(im_->AddTablet(tag, num_samples)); } - void ClearTablets() { - IM::Global().storage().mutable_data()->clear_tablets(); + std::mutex &handler() { return im_->handler(); } + void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); } + void StartReadService(const std::string &dir, int msecs) { + im_->SetPersistDest(dir); + im_->MaintainRead(dir, msecs); } + void StartWriteSerice(const std::string &dir, int msecs) { + im_->SetPersistDest(dir); + im_->MaintainWrite(dir, msecs); + } + void StopService() { im_->executor().Quit(); } + void PersistToDisk() const { im_->PersistToDisk(); } - void PersistToDisk() const { IM::Global().PersistToDisk(); } +private: + std::unique_ptr im_; }; namespace components { +#define ACQUIRE_HANDLER(handler) std::lock_guard ____(*handler); + /* * Read and write support for Scalar component. */ template class ScalarHelper { public: - ScalarHelper(storage::Tablet *tablet) : data_(tablet) {} + ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr) + : data_(tablet), handler_(handler) {} + ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr) + : data_(&tablet.data()), handler_(handler) {} void SetCaptions(const std::vector &captions); @@ -138,27 +155,10 @@ public: private: storage::Tablet *data_; + std::mutex *handler_; }; } // namespace components - -static ImHelper &im() { - static ImHelper im; - return im; -} - -static void start_read_service(const std::string &dir) { - IM::Global().SetPersistDest(dir); - IM::Global().MaintainRead(); -} - -static void start_write_service(const std::string &dir) { - IM::Global().SetPersistDest(dir); - IM::Global().MaintainWrite(); -} - -static void stop_threads() { cc::PeriodExector::Global().Quit(); } - } // namespace visualdl #endif // VISUALDL_BACKEND_LOGIC_SDK_H diff --git a/visualdl/storage/storage.cc b/visualdl/storage/storage.cc index 2b88ad091a9810fb7473c12fa6daa4c2a214017b..814315454a903341ed102135d01b81438f6dccb3 100644 --- a/visualdl/storage/storage.cc +++ b/visualdl/storage/storage.cc @@ -9,13 +9,14 @@ namespace visualdl { const std::string StorageBase::meta_file_name = "storage.meta"; -std::string StorageBase::meta_path() const { - CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; - return storage_.dir() + "/" + meta_file_name; +std::string StorageBase::meta_path(const std::string &dir) const { + CHECK(!dir.empty()) << "dir is empty"; + return dir + "/" + meta_file_name; } -std::string StorageBase::tablet_path(const std::string &tag) const { - CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; - return storage_.dir() + "/" + tag; +std::string StorageBase::tablet_path(const std::string &dir, + const std::string &tag) const { + CHECK(!dir.empty()) << "dir should be set first"; + return dir + "/" + tag; } storage::Tablet *MemoryStorage::NewTablet(const std::string &tag, @@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) { } // TODO add some checksum to avoid unnecessary saving -void MemoryStorage::PersistToDisk() const { - CHECK(!storage_.dir().empty()) << "storage's dir should be set first"; - VLOG(3) << "persist storage to disk path " << storage_.dir(); +void MemoryStorage::PersistToDisk(const std::string &dir) { + CHECK(!dir.empty()); + storage_.set_dir(dir); // make a directory if not exist - fs::TryMkdir(storage_.dir()); + fs::TryMkdir(dir); // write storage out - fs::SerializeToFile(storage_, meta_path()); + LOG(INFO) << "to serize meta to dir " << dir; + fs::SerializeToFile(storage_, meta_path(dir)); + LOG(INFO) << "serize meta to dir " << dir; // write all the tablets for (auto tag : storage_.tags()) { auto it = tablets_.find(tag); CHECK(it != tablets_.end()); - fs::SerializeToFile(it->second, tablet_path(tag)); + fs::SerializeToFile(it->second, tablet_path(dir, tag)); } } @@ -59,33 +62,53 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) { CHECK(!dir.empty()) << "dir is empty"; storage_.set_dir(dir); // load storage - CHECK(fs::DeSerializeFromFile(&storage_, meta_path())) - << "parse from " << meta_path() << " failed"; + CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir))) + << "parse from " << meta_path(dir) << " failed"; // load all the tablets for (int i = 0; i < storage_.tags_size(); i++) { auto tag = storage_.tags(i); - CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(tag))); + CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag))); } } -void MemoryStorage::StartReadService() { - cc::PeriodExector::task_t task = [this] { - VLOG(3) << "loading from " << storage_.dir(); - LoadFromDisk(storage_.dir()); +void MemoryStorage::StartReadService(const std::string &dir, + int msecs, + std::mutex *handler) { + CHECK(executor_ != nullptr); + CHECK(!dir.empty()) << "dir should be set first"; + cc::PeriodExector::task_t task = [dir, this, handler] { + LOG(INFO) << "loading from " << dir; + if (handler != nullptr) { + std::lock_guard _(*handler); + LoadFromDisk(dir); + } else { + LoadFromDisk(dir); + } return true; }; - cc::PeriodExector::Global().Start(); - VLOG(3) << "push read task"; - cc::PeriodExector::Global()(std::move(task), 2512); + // executor_.Start(); + LOG(INFO) << "push read task"; + (*executor_)(std::move(task), msecs); } -void MemoryStorage::StartWriteSerice() { - cc::PeriodExector::Global().Start(); - cc::PeriodExector::task_t task = [this] { - PersistToDisk(); +void MemoryStorage::StartWriteSerice(const std::string &dir, + int msecs, + std::mutex *handler) { + CHECK(executor_ != nullptr); + CHECK(!dir.empty()) << "dir should be set first"; + storage_.set_dir(dir); + // executor_.Start(); + cc::PeriodExector::task_t task = [dir, handler, this] { + LOG(INFO) << "persist to disk"; + if (handler != nullptr) { + std::lock_guard _(*handler); + PersistToDisk(dir); + } else { + PersistToDisk(dir); + } return true; }; - cc::PeriodExector::Global()(std::move(task), 2000); + (*executor_)(std::move(task), msecs); } } // namespace visualdl diff --git a/visualdl/storage/storage.h b/visualdl/storage/storage.h index 2787d9af87a9547921531b7dc509798a70001b71..6c60b9302ddfd1e7409a696fc496dfdae6d3c574 100644 --- a/visualdl/storage/storage.h +++ b/visualdl/storage/storage.h @@ -6,6 +6,7 @@ #include #include "visualdl/storage/storage.pb.h" +#include "visualdl/utils/concurrency.h" namespace visualdl { @@ -36,8 +37,8 @@ public: storage_.set_dir(dir); } - std::string meta_path() const; - std::string tablet_path(const std::string &tag) const; + std::string meta_path(const std::string &dir) const; + std::string tablet_path(const std::string &dir, const std::string &tag) const; /* * Create a new Tablet storage. @@ -56,7 +57,7 @@ public: * Persist the data from cache to disk. Both the memory storage or disk * storage should write changes to disk for persistence. */ - virtual void PersistToDisk() const = 0; + virtual void PersistToDisk(const std::string &dir) = 0; /* * Load data from disk. @@ -75,28 +76,39 @@ protected: */ class MemoryStorage final : public StorageBase { public: + MemoryStorage() {} + MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {} + ~MemoryStorage() { + if (executor_ != nullptr) executor_->Quit(); + } storage::Tablet *NewTablet(const std::string &tag, int num_samples) override; storage::Tablet *tablet(const std::string &tag) override; - void PersistToDisk() const override; + void PersistToDisk(const std::string &dir) override; void LoadFromDisk(const std::string &dir) override; /* * Create a thread which will keep reading the latest data from the disk to * memory. + * + * msecs: how many millisecond to sync memory and disk. */ - void StartReadService(); + void StartReadService(const std::string &dir, int msecs, std::mutex *handler); /* * Create a thread which will keep writing the latest changes from memory to * disk. + * + * msecs: how many millisecond to sync memory and disk. */ - void StartWriteSerice(); + void StartWriteSerice(const std::string &dir, int msecs, std::mutex *handler); private: std::map tablets_; + // TODO(ChunweiYan) remove executor here. + cc::PeriodExector *executor_{nullptr}; }; } // namespace visualdl diff --git a/visualdl/storage/storage_test.cc b/visualdl/storage/storage_test.cc index 9ddf13a36153b8bbdb2b14ceabea7c79c971efbe..8dbc7f566735d5b0af7889e5c365285c60df4573 100644 --- a/visualdl/storage/storage_test.cc +++ b/visualdl/storage/storage_test.cc @@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) { } TEST_F(MemoryStorageTest, PersistToDisk) { - storage_.SetStorage("./tmp"); - CHECK(!storage_.data().dir().empty()); + const std::string dir = "./tmp/201.test"; + storage_.SetStorage(dir); string tag = "add%20tag0"; storage_.NewTablet(tag, -1); - storage_.PersistToDisk(); + storage_.PersistToDisk(dir); + LOG(INFO) << "persist to disk"; MemoryStorage other; - other.LoadFromDisk("./tmp"); + other.LoadFromDisk(dir); + LOG(INFO) << "read from disk"; ASSERT_EQ(other.data().SerializeAsString(), storage_.data().SerializeAsString()); } diff --git a/visualdl/utils/concurrency.h b/visualdl/utils/concurrency.h index 5a8c320dfd4f57a28c1e420d92166504defd5c25..cb73dc2b4d153360c48c2bc24220f9c96779186c 100644 --- a/visualdl/utils/concurrency.h +++ b/visualdl/utils/concurrency.h @@ -17,13 +17,9 @@ namespace cc { struct PeriodExector { using task_t = std::function; - static PeriodExector& Global() { - static PeriodExector exec; - return exec; - } - void Quit() { // TODO use some conditonal variable to help quit immediately. + // std::this_thread::sleep_for(std::chrono::milliseconds(200)); quit = true; } @@ -34,6 +30,7 @@ struct PeriodExector { auto task_wrapper = [=] { while (!quit) { + // task failed if (!task()) break; // if the program is terminated, quit while as soon as possible. // this is just trick, but should works. @@ -57,6 +54,7 @@ struct PeriodExector { } ~PeriodExector() { + Quit(); for (auto& t : threads_) { if (t.joinable()) { t.join(); diff --git a/visualdl/utils/test_concurrency.cc b/visualdl/utils/test_concurrency.cc index 96f18caabcf13a49fc4c98f31e3c0f7ff37cede2..ea4540d33b0cb3921b373f670fee3fd81212a8cd 100644 --- a/visualdl/utils/test_concurrency.cc +++ b/visualdl/utils/test_concurrency.cc @@ -8,12 +8,13 @@ namespace visualdl { int counter = 0; TEST(concurrency, test) { - cc::PeriodExector::task_t task = [&counter]() { + cc::PeriodExector executor; + cc::PeriodExector::task_t task = [&]() { LOG(INFO) << "Hello " << counter++; if (counter > 5) return false; return true; }; - cc::PeriodExector::Global()(std::move(task), 200); + executor(std::move(task), 200); } } // namespace visualdl