diff --git a/CMakeLists.txt b/CMakeLists.txt index b0fe4f0276cdeb3540b39a4d545a151e606a9bc7..126529b9b805fe45549ae820be87e958de5fd334 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,17 +33,21 @@ include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic) +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python) add_executable(vl_test ${PROJECT_SOURCE_DIR}/visualdl/test.cc ${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc ${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc + ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h ${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h ) -target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread) +target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread) enable_testing () -add_test(NAME vstest COMMAND vl_test) +add_custom_target(test_init COMMAND ./init_test.sh $CMAKE_BINARY_DIR) +add_test(NAME vstest COMMAND ./vl_test) +set_target_properties(vl_test PROPERTIES DEPENDS test_init) diff --git a/visualdl/logic/CMakeLists.txt b/visualdl/logic/CMakeLists.txt index 7f021af7015d585a2d713029323b445f6941cea4..6b88d931e4ee3173cef86d7d3b8b67441fe3ec40 100644 --- a/visualdl/logic/CMakeLists.txt +++ b/visualdl/logic/CMakeLists.txt @@ -7,3 +7,4 @@ add_dependencies(sdk storage_proto) add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc) add_dependencies(core pybind python im storage sdk protobuf glog) target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog) +set_target_properties(core PROPERTIES PREFIX "" SUFFIX ".so") diff --git a/visualdl/logic/im.cc b/visualdl/logic/im.cc index 76d55d225bafe1754d14c351436bfde97041665e..624fbfbba50fe623cc4226591e336fbb66af9552 100644 --- a/visualdl/logic/im.cc +++ b/visualdl/logic/im.cc @@ -25,27 +25,6 @@ int ReserviorSample(int num_samples, int num_records) { return -1; } -IM::IM(StorageBase::Type type, StorageBase::Mode mode) { - switch (type) { - case StorageBase::Type::kMemory: { - storage_.reset(new MemoryStorage); - } break; - default: - CHECK(false) << "Unsupported storage kind " << type; - } - - switch (mode) { - case StorageBase::Mode::kRead: - dynamic_cast(storage_.get())->StartReadService(); - break; - case StorageBase::Mode::kWrite: - dynamic_cast(storage_.get())->StartWriteSerice(); - break; - default: - break; - } -} - void IM::SetPersistDest(const std::string &path) { CHECK(storage_->mutable_data()->dir().empty()) << "duplicate set storage's path"; @@ -95,7 +74,7 @@ void IM::PersistToDisk() { CHECK(!storage_->data().dir().empty()) << "path of storage should be set"; // TODO make dir first // MakeDir(storage_.data().dir()); - storage_->PersistToDisk(); + storage_->PersistToDisk(storage_->data().dir()); } } // namespace visualdl diff --git a/visualdl/logic/im.h b/visualdl/logic/im.h index 5df59e2994807cdfe4a0a5e746f8bbc264759433..abbbc989fd78164a8f82e9de0c96aaf850293c58 100644 --- a/visualdl/logic/im.h +++ b/visualdl/logic/im.h @@ -3,6 +3,7 @@ #include #include +#include #include #include "visualdl/storage/storage.h" @@ -27,22 +28,19 @@ namespace visualdl { */ class IM final { public: - IM() { storage_.reset(new MemoryStorage); } - IM(StorageBase::Type type, StorageBase::Mode mode); - ~IM() { cc::PeriodExector::Global().Quit(); } + IM() { storage_.reset(new MemoryStorage(&executor_)); } + // IM(StorageBase::Type type, StorageBase::Mode mode); + ~IM() { executor_.Quit(); } - static IM &Global() { - static IM x; - return x; - } - - void MaintainRead() { + void MaintainRead(const std::string &dir, int msecs) { LOG(INFO) << "start maintain read"; - dynamic_cast(storage_.get())->StartReadService(); + dynamic_cast(storage_.get()) + ->StartReadService(dir, msecs, &lock_); } - void MaintainWrite() { - dynamic_cast(storage_.get())->StartWriteSerice(); + void MaintainWrite(const std::string &dir, int msecs) { + dynamic_cast(storage_.get()) + ->StartWriteService(dir, msecs, &lock_); } /* @@ -73,8 +71,17 @@ public: StorageBase &storage() { return *storage_; } + cc::PeriodExector &executor() { return executor_; } + + std::mutex &handler() { return lock_; } + private: + // read write lock for protobuf in memory + // TODO(ChunweiYan) mutex too heavy here, might change to a message queue to + // reduce the frequency of visiting disk + std::mutex lock_; std::unique_ptr storage_; + cc::PeriodExector executor_; }; } // namespace visualdl diff --git a/visualdl/logic/im_test.cc b/visualdl/logic/im_test.cc index deaab5e4704063cabb652b1a7940d6321ae93adb..37246c1763c03071d25debe3b260cd81b054b552 100644 --- a/visualdl/logic/im_test.cc +++ b/visualdl/logic/im_test.cc @@ -2,13 +2,15 @@ #include "gtest/gtest.h" +#include "visualdl/storage/storage.h" + namespace visualdl { class ImTester : public ::testing::Test { protected: void SetUp() override {} - IM &im = IM::Global(); + IM im; }; TEST_F(ImTester, AddTablet) { diff --git a/visualdl/logic/pybind.cc b/visualdl/logic/pybind.cc index 7a79429a7dde5f11a9690ab326eb2cf2129610d6..69d40ca039e606d9d20a3d327347b665c1010480 100644 --- a/visualdl/logic/pybind.cc +++ b/visualdl/logic/pybind.cc @@ -22,19 +22,22 @@ PYBIND11_PLUGIN(core) { vs::TabletHelper::SetBuffer) // scalar interface .def("as_int32_scalar", - [](const vs::TabletHelper& self) { - return vs::components::ScalarHelper(&self.data()); + [](vs::TabletHelper& self, vs::ImHelper& im) { + return vs::components::ScalarHelper(self, &im.handler()); }) .def("as_int64_scalar", - [](const vs::TabletHelper& self) { - return vs::components::ScalarHelper(&self.data()); + [](vs::TabletHelper& self, vs::ImHelper& im) { + return vs::components::ScalarHelper(&self.data(), + &im.handler()); }) .def("as_float_scalar", - [](const vs::TabletHelper& self) { - return vs::components::ScalarHelper(&self.data()); + [](vs::TabletHelper& self, vs::ImHelper& im) { + return vs::components::ScalarHelper(&self.data(), + &im.handler()); }) - .def("as_double_scalar", [](const vs::TabletHelper& self) { - return vs::components::ScalarHelper(&self.data()); + .def("as_double_scalar", [](vs::TabletHelper& self, vs::ImHelper& im) { + return vs::components::ScalarHelper(&self.data(), + &im.handler()); }); py::class_(m, "Storage") @@ -49,22 +52,26 @@ PYBIND11_PLUGIN(core) { vs::StorageHelper::SetBuffer); py::class_(m, "Im") + .def("__init__", + [](vs::ImHelper& instance) { new (&instance) vs::ImHelper(); }) .def("storage", &vs::ImHelper::storage) .def("tablet", &vs::ImHelper::tablet) .def("add_tablet", &vs::ImHelper::AddTablet) .def("persist_to_disk", &vs::ImHelper::PersistToDisk) - .def("clear_tablets", &vs::ImHelper::ClearTablets); + .def("clear_tablets", &vs::ImHelper::ClearTablets) + .def("start_read_service", + &vs::ImHelper::StartReadService, + "start a thread to maintain read service") + .def("start_write_service", + &vs::ImHelper::StartWriteSerice, + "start a thread to maintain write service") + .def("stop_service", + &vs::ImHelper::StopService, + "stop the service thread"); - m.def("start_read_service", - &vs::start_read_service, - "global information-maintainer object."); - m.def("start_write_service", - &vs::start_write_service, - "global information-maintainer object."); - m.def("im", &vs::im); - m.def("stop_threads", &vs::stop_threads); +// interfaces for components begin -// interfaces for components +// different data type of scalar conponent #define ADD_SCALAR_TYPED_INTERFACE(T, name__) \ py::class_>(m, #name__) \ .def("add_record", &vs::components::ScalarHelper::AddRecord) \ diff --git a/visualdl/logic/sdk.cc b/visualdl/logic/sdk.cc index 513541511d48adc00e1a95f5bf51974fb0b1988f..e3283812569ece8183c96451a0026da6a2683572 100644 --- a/visualdl/logic/sdk.cc +++ b/visualdl/logic/sdk.cc @@ -66,6 +66,8 @@ namespace components { template void ScalarHelper::SetCaptions(const std::vector &captions) { + ACQUIRE_HANDLER(handler_); + CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once"; for (int i = 0; i < captions.size(); i++) { data_->add_captions(captions[i]); @@ -74,6 +76,8 @@ void ScalarHelper::SetCaptions(const std::vector &captions) { template void ScalarHelper::AddRecord(int id, const std::vector &values) { + ACQUIRE_HANDLER(handler_); + CHECK_NOTNULL(data_); CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first"; CHECK_EQ(data_->captions_size(), values.size()) @@ -94,6 +98,8 @@ void ScalarHelper::AddRecord(int id, const std::vector &values) { template std::vector> ScalarHelper::GetRecords() const { + ACQUIRE_HANDLER(handler_); + std::vector> result; EntryHelper entry_helper; for (int i = 0; i < data_->records_size(); i++) { @@ -107,6 +113,7 @@ std::vector> ScalarHelper::GetRecords() const { template std::vector ScalarHelper::GetIds() const { + ACQUIRE_HANDLER(handler_); CHECK_NOTNULL(data_); std::vector result; for (int i = 0; i < data_->records_size(); i++) { @@ -117,6 +124,7 @@ std::vector ScalarHelper::GetIds() const { template std::vector ScalarHelper::GetTimestamps() const { + ACQUIRE_HANDLER(handler_); CHECK_NOTNULL(data_); std::vector result; for (int i = 0; i < data_->records_size(); i++) { @@ -127,6 +135,7 @@ std::vector ScalarHelper::GetTimestamps() const { template std::vector ScalarHelper::GetCaptions() const { + ACQUIRE_HANDLER(handler_); return std::vector(data_->captions().begin(), data_->captions().end()); } diff --git a/visualdl/logic/sdk.h b/visualdl/logic/sdk.h index 3937ffafbb91ac2792cb88a242a3833fa2bfc3e4..b74e33174291d31baf44d430116eb86d583c02a6 100644 --- a/visualdl/logic/sdk.h +++ b/visualdl/logic/sdk.h @@ -94,33 +94,50 @@ private: class ImHelper { public: - ImHelper() {} + // TODO(ChunweiYan) decouple helper with resource. + ImHelper() { im_.reset(new IM); } + ImHelper(std::unique_ptr im) : im_(std::move(im)) {} StorageHelper storage() { - return StorageHelper(IM::Global().storage().mutable_data()); + return StorageHelper(im_->storage().mutable_data()); } TabletHelper tablet(const std::string &tag) { - return TabletHelper(IM::Global().storage().tablet(tag)); + return TabletHelper(im_->storage().tablet(tag)); } TabletHelper AddTablet(const std::string &tag, int num_samples) { - return TabletHelper(IM::Global().AddTablet(tag, num_samples)); + return TabletHelper(im_->AddTablet(tag, num_samples)); } - void ClearTablets() { - IM::Global().storage().mutable_data()->clear_tablets(); + std::mutex &handler() { return im_->handler(); } + void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); } + void StartReadService(const std::string &dir, int msecs) { + im_->SetPersistDest(dir); + im_->MaintainRead(dir, msecs); } + void StartWriteSerice(const std::string &dir, int msecs) { + im_->SetPersistDest(dir); + im_->MaintainWrite(dir, msecs); + } + void StopService() { im_->executor().Quit(); } + void PersistToDisk() const { im_->PersistToDisk(); } - void PersistToDisk() const { IM::Global().PersistToDisk(); } +private: + std::unique_ptr im_; }; namespace components { +#define ACQUIRE_HANDLER(handler) std::lock_guard ____(*handler); + /* * Read and write support for Scalar component. */ template class ScalarHelper { public: - ScalarHelper(storage::Tablet *tablet) : data_(tablet) {} + ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr) + : data_(tablet), handler_(handler) {} + ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr) + : data_(&tablet.data()), handler_(handler) {} void SetCaptions(const std::vector &captions); @@ -138,27 +155,10 @@ public: private: storage::Tablet *data_; + std::mutex *handler_; }; } // namespace components - -static ImHelper &im() { - static ImHelper im; - return im; -} - -static void start_read_service(const std::string &dir) { - IM::Global().SetPersistDest(dir); - IM::Global().MaintainRead(); -} - -static void start_write_service(const std::string &dir) { - IM::Global().SetPersistDest(dir); - IM::Global().MaintainWrite(); -} - -static void stop_threads() { cc::PeriodExector::Global().Quit(); } - } // namespace visualdl #endif // VISUALDL_BACKEND_LOGIC_SDK_H diff --git a/visualdl/logic/sdk_test.cc b/visualdl/logic/sdk_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..f800f31ee7368ebbb8eb9ae1fe17e3ec454ddf3e --- /dev/null +++ b/visualdl/logic/sdk_test.cc @@ -0,0 +1,81 @@ +#include "visualdl/logic/sdk.h" + +#include + +namespace visualdl { + +struct ScalarTestHelper { + ImHelper rim; + ImHelper wim; + const std::string dir = "./tmp/sdk_test.test"; + + void operator()(std::function read, std::function write) { + wim.StartWriteSerice(dir, 200); + write(); + + // should wait for the write service create log's path + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + rim.StartReadService(dir, 100); + // should wait for the read service to load "tag0" tablet into memory + std::this_thread::sleep_for(std::chrono::milliseconds(600)); + read(); + } +}; + +TEST(Scalar, set_caption) { + ScalarTestHelper helper; + + const std::vector captions({"train", "test"}); + + auto write = [&] { + auto tablet = helper.wim.AddTablet("tag0", -1); + components::ScalarHelper scalar(tablet, &helper.wim.handler()); + + scalar.SetCaptions(captions); + }; + + auto read = [&] { + auto mytablet = helper.rim.tablet("tag0"); + components::ScalarHelper myscalar(mytablet, &helper.rim.handler()); + auto mycaptions = myscalar.GetCaptions(); + + ASSERT_EQ(captions, mycaptions); + }; + + helper(read, write); +} + +TEST(Scalar, add_records) { + ScalarTestHelper helper; + + const std::vector captions({"train", "test"}); + + const size_t nsteps = 100; + + auto write = [&] { + auto tablet = helper.wim.AddTablet("tag0", -1); + components::ScalarHelper scalar(tablet, &helper.wim.handler()); + + scalar.SetCaptions(captions); + + for (int i = 0; i < nsteps; i++) { + scalar.AddRecord(i * 10, std::vector({(float)i, (float)i + 1})); + } + }; + + auto read = [&] { + auto mytablet = helper.rim.tablet("tag0"); + components::ScalarHelper myscalar(mytablet, &helper.rim.handler()); + + auto records = myscalar.GetRecords(); + ASSERT_EQ(records.size(), nsteps); + + for (int i = 0; i < nsteps; i++) { + ASSERT_EQ(records[i], std::vector({(float)i, (float)i + 1})); + } + }; + + helper(read, write); +} + +} // namespace visualdl diff --git a/visualdl/python/CMakeLists.txt b/visualdl/python/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..def51de942894def455976795445e078ea6fb2d8 --- /dev/null +++ b/visualdl/python/CMakeLists.txt @@ -0,0 +1,13 @@ +function(py_test TARGET_NAME) + set(options STATIC static SHARED shared) + set(oneValueArgs "") + set(multiValueArgs SRCS DEPS ARGS) + cmake_parse_arguments(py_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + add_test(NAME ${TARGET_NAME} + COMMAND env PYTHONPATH=${CMAKE_BINARY_DIR}/visualdl/logic + ${PYTHON_EXECUTABLE} -u ${py_test_SRCS} ${py_test_ARGS} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + ) +endfunction() + +py_test(test_summary SRCS test_summary.py) diff --git a/visualdl/python/summary.py b/visualdl/python/summary.py index 13b9400c3f358529d72d7057ac3f55ae6488c538..db9cb1d2b5b81f0567b6effcd78e3fa910c74af4 100644 --- a/visualdl/python/summary.py +++ b/visualdl/python/summary.py @@ -6,26 +6,15 @@ import core dtypes = ("float", "double", "int32", "int64") - -def set_storage(dir): - ''' - :param dir: str - directory of summary to write log. - :return: None - ''' - core.im().storage().set_dir(dir) - - -def set_readable_storage(dir): - core.start_read_service(dir) - - -def set_writable_storage(dir): - core.start_write_service(dir) - - -def stop_service(): - core.stop_threads() +def IM(dir, mode="read", msecs=500): + im = core.Im() + READ = "read" + WRITE = "write" + if mode == READ: + im.start_read_service(dir, msecs) + else: + im.start_write_service(dir, msecs) + return im class _Scalar(object): @@ -93,7 +82,7 @@ class _Scalar(object): return self._core_object.get_record_size() -def scalar(tag, dtype='float'): +def scalar(im, tag, dtype='float'): ''' create a scalar component. @@ -105,24 +94,12 @@ def scalar(tag, dtype='float'): ''' assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % ( dtype, str(dtypes)) - tablet = core.im().add_tablet(tag, -1) - dtype2obj = { - 'float': tablet.as_float_scalar, - 'double': tablet.as_double_scalar, - 'int32': tablet.as_int32_scalar, - 'int64': tablet.as_int64_scalar, - } - obj = dtype2obj[dtype]() - return _Scalar(obj) - - -def read_scalar(tag, dtype='float'): - tablet = core.im().tablet(tag) + tablet = im.add_tablet(tag, -1) dtype2obj = { 'float': tablet.as_float_scalar, 'double': tablet.as_double_scalar, 'int32': tablet.as_int32_scalar, 'int64': tablet.as_int64_scalar, } - obj = dtype2obj[dtype]() + obj = dtype2obj[dtype](im) return _Scalar(obj) diff --git a/visualdl/python/test_summary.py b/visualdl/python/test_summary.py index c3caab75052cd04b3ff5b792a7c85988905d2fd8..d4ce733b68463e54d6adfeaa29501139525b87b5 100644 --- a/visualdl/python/test_summary.py +++ b/visualdl/python/test_summary.py @@ -1,6 +1,7 @@ import summary import numpy as np import unittest +import random import time once_flag = False @@ -8,23 +9,27 @@ once_flag = False class ScalarTester(unittest.TestCase): def setUp(self): - summary.set_storage("tmp_dir") - global once_flag - self.scalar = summary.scalar("scalar0") - if not once_flag: - self.py_captions = ["train cost", "test cost"] - self.scalar.set_captions(self.py_captions) + self.dir = "tmp/summary.test" + # clean path + try: + os.rmdir(self.dir) + except: + pass + self.im = summary.IM(self.dir, "write", 200) + self.tablet_name = "scalar0" + self.scalar = summary.scalar(self.im, self.tablet_name) + self.py_captions = ["train cost", "test cost"] + self.scalar.set_captions(self.py_captions) self.py_records = [] self.py_ids = [] + # write for i in range(10): record = [0.1 * i, 0.2 * i] id = i * 10 self.py_records.append(record) self.py_ids.append(id) - if not once_flag: - self.scalar.add(id, record) - once_flag = True + self.scalar.add(id, record) def test_records(self): self.assertEqual(self.scalar.size, len(self.py_records)) @@ -39,6 +44,58 @@ class ScalarTester(unittest.TestCase): def test_captions(self): self.assertEqual(self.scalar.captions, self.py_captions) + def test_read_records(self): + time.sleep(1) + im = summary.IM(self.dir, "read", 200) + time.sleep(1) + scalar = summary.scalar(im, self.tablet_name) + records = scalar.records + self.assertEqual(len(self.py_records), scalar.size) + for i, record in enumerate(self.scalar.records): + self.assertTrue(np.isclose(record, records[i]).all()) + + def test_read_ids(self): + time.sleep(0.6) + im = summary.IM(self.dir, "read", msecs=200) + time.sleep(0.6) + scalar = summary.scalar(im, self.tablet_name) + self.assertEqual(len(self.py_ids), scalar.size) + for i, id in enumerate(scalar.ids): + self.assertEqual(self.py_ids[i], id) + + def test_read_captions(self): + time.sleep(0.6) + im = summary.IM(self.dir, "read", msecs=200) + time.sleep(0.6) + scalar = summary.scalar(im, self.tablet_name) + self.assertEqual(scalar.captions, self.py_captions) + + def test_mix_read_write(self): + write_im = summary.IM(self.dir, "write", msecs=200) + time.sleep(0.6) + read_im = summary.IM(self.dir, "read", msecs=200) + + scalar_writer = summary.scalar(write_im, self.tablet_name) + scalar_reader = summary.scalar(read_im, self.tablet_name) + + scalar_writer.set_captions(["train cost", "test cost"]) + for i in range(1000): + scalar_writer.add(i, [random.random(), random.random()]) + + scalar_reader.records + + for i in range(500): + scalar_writer.add(i, [random.random(), random.random()]) + + scalar_reader.records + + for i in range(500): + scalar_writer.add(i, [random.random(), random.random()]) + + for i in range(10): + scalar_reader.records + scalar_reader.captions + if __name__ == '__main__': unittest.main() diff --git a/visualdl/storage/storage.cc b/visualdl/storage/storage.cc index 2b88ad091a9810fb7473c12fa6daa4c2a214017b..9d1d393c5a88b87ab5979a4c7e476adf89ed0da7 100644 --- a/visualdl/storage/storage.cc +++ b/visualdl/storage/storage.cc @@ -9,13 +9,14 @@ namespace visualdl { const std::string StorageBase::meta_file_name = "storage.meta"; -std::string StorageBase::meta_path() const { - CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; - return storage_.dir() + "/" + meta_file_name; +std::string StorageBase::meta_path(const std::string &dir) const { + CHECK(!dir.empty()) << "dir is empty"; + return dir + "/" + meta_file_name; } -std::string StorageBase::tablet_path(const std::string &tag) const { - CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; - return storage_.dir() + "/" + tag; +std::string StorageBase::tablet_path(const std::string &dir, + const std::string &tag) const { + CHECK(!dir.empty()) << "dir should be set first"; + return dir + "/" + tag; } storage::Tablet *MemoryStorage::NewTablet(const std::string &tag, @@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) { } // TODO add some checksum to avoid unnecessary saving -void MemoryStorage::PersistToDisk() const { - CHECK(!storage_.dir().empty()) << "storage's dir should be set first"; - VLOG(3) << "persist storage to disk path " << storage_.dir(); +void MemoryStorage::PersistToDisk(const std::string &dir) { + CHECK(!dir.empty()); + storage_.set_dir(dir); // make a directory if not exist - fs::TryMkdir(storage_.dir()); + fs::TryRecurMkdir(dir); // write storage out - fs::SerializeToFile(storage_, meta_path()); + VLOG(2) << "to serize meta to dir " << dir; + fs::SerializeToFile(storage_, meta_path(dir)); + VLOG(2) << "serize meta to dir " << dir; // write all the tablets for (auto tag : storage_.tags()) { auto it = tablets_.find(tag); CHECK(it != tablets_.end()); - fs::SerializeToFile(it->second, tablet_path(tag)); + fs::SerializeToFile(it->second, tablet_path(dir, tag)); } } @@ -59,33 +62,52 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) { CHECK(!dir.empty()) << "dir is empty"; storage_.set_dir(dir); // load storage - CHECK(fs::DeSerializeFromFile(&storage_, meta_path())) - << "parse from " << meta_path() << " failed"; - + CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir))) + << "parse from " << meta_path(dir) << " failed"; // load all the tablets for (int i = 0; i < storage_.tags_size(); i++) { auto tag = storage_.tags(i); - CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(tag))); + CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag))); } } -void MemoryStorage::StartReadService() { - cc::PeriodExector::task_t task = [this] { - VLOG(3) << "loading from " << storage_.dir(); - LoadFromDisk(storage_.dir()); +void MemoryStorage::StartReadService(const std::string &dir, + int msecs, + std::mutex *handler) { + CHECK(executor_ != nullptr); + CHECK(!dir.empty()) << "dir should be set first"; + cc::PeriodExector::task_t task = [dir, this, handler] { + VLOG(1) << "loading from " << dir; + if (handler != nullptr) { + std::lock_guard _(*handler); + LoadFromDisk(dir); + } else { + LoadFromDisk(dir); + } return true; }; - cc::PeriodExector::Global().Start(); - VLOG(3) << "push read task"; - cc::PeriodExector::Global()(std::move(task), 2512); + // executor_.Start(); + VLOG(1) << "push read task"; + (*executor_)(std::move(task), msecs); } -void MemoryStorage::StartWriteSerice() { - cc::PeriodExector::Global().Start(); - cc::PeriodExector::task_t task = [this] { - PersistToDisk(); +void MemoryStorage::StartWriteService(const std::string &dir, + int msecs, + std::mutex *handler) { + CHECK(executor_ != nullptr); + CHECK(!dir.empty()) << "dir should be set first"; + storage_.set_dir(dir); + // executor_.Start(); + cc::PeriodExector::task_t task = [dir, handler, this] { + VLOG(2) << "persist to disk"; + if (handler != nullptr) { + std::lock_guard _(*handler); + PersistToDisk(dir); + } else { + PersistToDisk(dir); + } return true; }; - cc::PeriodExector::Global()(std::move(task), 2000); + (*executor_)(std::move(task), msecs); } } // namespace visualdl diff --git a/visualdl/storage/storage.h b/visualdl/storage/storage.h index 2787d9af87a9547921531b7dc509798a70001b71..3b59e5000e24966b7d64f9a7fbc421119d6ef5a3 100644 --- a/visualdl/storage/storage.h +++ b/visualdl/storage/storage.h @@ -3,9 +3,12 @@ #include #include +#include +#include #include #include "visualdl/storage/storage.pb.h" +#include "visualdl/utils/concurrency.h" namespace visualdl { @@ -36,8 +39,8 @@ public: storage_.set_dir(dir); } - std::string meta_path() const; - std::string tablet_path(const std::string &tag) const; + std::string meta_path(const std::string &dir) const; + std::string tablet_path(const std::string &dir, const std::string &tag) const; /* * Create a new Tablet storage. @@ -56,7 +59,7 @@ public: * Persist the data from cache to disk. Both the memory storage or disk * storage should write changes to disk for persistence. */ - virtual void PersistToDisk() const = 0; + virtual void PersistToDisk(const std::string &dir) = 0; /* * Load data from disk. @@ -75,28 +78,41 @@ protected: */ class MemoryStorage final : public StorageBase { public: + MemoryStorage() {} + MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {} + ~MemoryStorage() { + if (executor_ != nullptr) executor_->Quit(); + } storage::Tablet *NewTablet(const std::string &tag, int num_samples) override; storage::Tablet *tablet(const std::string &tag) override; - void PersistToDisk() const override; + void PersistToDisk(const std::string &dir) override; void LoadFromDisk(const std::string &dir) override; /* * Create a thread which will keep reading the latest data from the disk to * memory. + * + * msecs: how many millisecond to sync memory and disk. */ - void StartReadService(); + void StartReadService(const std::string &dir, int msecs, std::mutex *handler); /* * Create a thread which will keep writing the latest changes from memory to * disk. + * + * msecs: how many millisecond to sync memory and disk. */ - void StartWriteSerice(); + void StartWriteService(const std::string &dir, + int msecs, + std::mutex *handler); private: std::map tablets_; + // TODO(ChunweiYan) remove executor here. + cc::PeriodExector *executor_{nullptr}; }; } // namespace visualdl diff --git a/visualdl/storage/storage_test.cc b/visualdl/storage/storage_test.cc index 9ddf13a36153b8bbdb2b14ceabea7c79c971efbe..8dbc7f566735d5b0af7889e5c365285c60df4573 100644 --- a/visualdl/storage/storage_test.cc +++ b/visualdl/storage/storage_test.cc @@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) { } TEST_F(MemoryStorageTest, PersistToDisk) { - storage_.SetStorage("./tmp"); - CHECK(!storage_.data().dir().empty()); + const std::string dir = "./tmp/201.test"; + storage_.SetStorage(dir); string tag = "add%20tag0"; storage_.NewTablet(tag, -1); - storage_.PersistToDisk(); + storage_.PersistToDisk(dir); + LOG(INFO) << "persist to disk"; MemoryStorage other; - other.LoadFromDisk("./tmp"); + other.LoadFromDisk(dir); + LOG(INFO) << "read from disk"; ASSERT_EQ(other.data().SerializeAsString(), storage_.data().SerializeAsString()); } diff --git a/visualdl/utils/concurrency.h b/visualdl/utils/concurrency.h index 5a8c320dfd4f57a28c1e420d92166504defd5c25..cb73dc2b4d153360c48c2bc24220f9c96779186c 100644 --- a/visualdl/utils/concurrency.h +++ b/visualdl/utils/concurrency.h @@ -17,13 +17,9 @@ namespace cc { struct PeriodExector { using task_t = std::function; - static PeriodExector& Global() { - static PeriodExector exec; - return exec; - } - void Quit() { // TODO use some conditonal variable to help quit immediately. + // std::this_thread::sleep_for(std::chrono::milliseconds(200)); quit = true; } @@ -34,6 +30,7 @@ struct PeriodExector { auto task_wrapper = [=] { while (!quit) { + // task failed if (!task()) break; // if the program is terminated, quit while as soon as possible. // this is just trick, but should works. @@ -57,6 +54,7 @@ struct PeriodExector { } ~PeriodExector() { + Quit(); for (auto& t : threads_) { if (t.joinable()) { t.join(); diff --git a/visualdl/utils/filesystem.h b/visualdl/utils/filesystem.h index 593013515ca5f0c3b60c47cd1e0476649dd070c4..58438f26c4353fc70b4808239e2d6568d4600972 100644 --- a/visualdl/utils/filesystem.h +++ b/visualdl/utils/filesystem.h @@ -39,6 +39,7 @@ bool SerializeToFile(const T& proto, const std::string& path) { template bool DeSerializeFromFile(T* proto, const std::string& path) { std::ifstream file(path, std::ios::binary); + CHECK(file.is_open()) << "open " << path << " failed"; return proto->ParseFromIstream(&file); } @@ -50,6 +51,19 @@ void TryMkdir(const std::string& dir) { } } +// Create a path by recursively create directries +void TryRecurMkdir(const std::string& path) { + // split path by '/' + for (int i = 1; i < path.size() - 1; i++) { + if (path[i] == '/') { + auto dir = path.substr(0, i + 1); + TryMkdir(dir); + } + } + // the last level + TryMkdir(path); +} + inline void Write(const std::string& path, const std::string& buffer, std::ios::openmode open_mode = std::ios::binary) { diff --git a/visualdl/utils/test_concurrency.cc b/visualdl/utils/test_concurrency.cc index 96f18caabcf13a49fc4c98f31e3c0f7ff37cede2..ea4540d33b0cb3921b373f670fee3fd81212a8cd 100644 --- a/visualdl/utils/test_concurrency.cc +++ b/visualdl/utils/test_concurrency.cc @@ -8,12 +8,13 @@ namespace visualdl { int counter = 0; TEST(concurrency, test) { - cc::PeriodExector::task_t task = [&counter]() { + cc::PeriodExector executor; + cc::PeriodExector::task_t task = [&]() { LOG(INFO) << "Hello " << counter++; if (counter > 5) return false; return true; }; - cc::PeriodExector::Global()(std::move(task), 200); + executor(std::move(task), 200); } } // namespace visualdl