diff --git a/CMakeLists.txt b/CMakeLists.txt index 126529b9b805fe45549ae820be87e958de5fd334..590abdd68547a2612b5d0a6e184618f4ff29dc6a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,16 +38,14 @@ add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python) add_executable(vl_test ${PROJECT_SOURCE_DIR}/visualdl/test.cc ${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc - ${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc - ${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc + ${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h ${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h ) -target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread) +target_link_libraries(vl_test sdk storage entry im gtest glog protobuf gflags pthread) enable_testing () -add_custom_target(test_init COMMAND ./init_test.sh $CMAKE_BINARY_DIR) +add_custom_target(test_init COMMAND $CMAKE_BINARY_DIR) add_test(NAME vstest COMMAND ./vl_test) -set_target_properties(vl_test PROPERTIES DEPENDS test_init) diff --git a/visualdl/logic/CMakeLists.txt b/visualdl/logic/CMakeLists.txt index 6b88d931e4ee3173cef86d7d3b8b67441fe3ec40..f7d8254442ca97189faa2f19c17e9f5bebb22fd6 100644 --- a/visualdl/logic/CMakeLists.txt +++ b/visualdl/logic/CMakeLists.txt @@ -1,10 +1,11 @@ -add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc) +#add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc) add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc) +add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc) add_dependencies(im storage_proto) -add_dependencies(sdk storage_proto) +add_dependencies(sdk entry storage storage_proto) ## pybind add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc) -add_dependencies(core pybind python im storage sdk protobuf glog) -target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog) +add_dependencies(core pybind python im entry storage sdk protobuf glog) +target_link_libraries(core PRIVATE pybind entry python im storage sdk protobuf glog) set_target_properties(core PROPERTIES PREFIX "" SUFFIX ".so") diff --git a/visualdl/logic/im.cc b/visualdl/logic/im.cc index 624fbfbba50fe623cc4226591e336fbb66af9552..a2ec1f22e23b260385e887e3c40a91328ce112b9 100644 --- a/visualdl/logic/im.cc +++ b/visualdl/logic/im.cc @@ -2,79 +2,41 @@ #include #include "visualdl/logic/im.h" +#include "visualdl/storage/entry.h" +#include "visualdl/storage/storage.h" +#include "visualdl/storage/tablet.h" namespace visualdl { -/* - * @num_samples: number of instances to sample - * @size: counter of the records. - * @returns: id of the instance to replace, if drop this instance, return -1. - */ -int ReserviorSample(int num_samples, int num_records) { - if (num_records <= num_samples) { - return num_records; - } - - std::srand(std::time(0)); - float prob = static_cast(std::rand()) / RAND_MAX; - float receive_prob = static_cast(num_samples) / num_records; - if (prob < receive_prob) { - int offset2replace = std::rand() % num_samples; - return offset2replace; - } - return -1; -} - -void IM::SetPersistDest(const std::string &path) { - CHECK(storage_->mutable_data()->dir().empty()) - << "duplicate set storage's path"; - storage_->mutable_data()->set_dir(path); -} - -storage::Tablet *IM::AddTablet(const std::string &tag, int num_samples) { - auto tablet = storage_->NewTablet(tag, num_samples); - return tablet; +template +void SimpleWriteSyncGuard::Start() { + CHECK(data_); + data_->parent()->meta.Inc(); } -void IM::AddRecord(const std::string &tag, const storage::Record &data) { - auto *tablet = storage_->tablet(tag); - CHECK(tablet) << "no tablet called " << tag; - - auto num_records = tablet->total_records(); - const auto num_samples = tablet->num_samples(); - - int offset; - // use reservoir sampling or not - if (num_samples > 0) { - offset = ReserviorSample(num_samples, num_records + 1); - if (offset < 0) return; - } else { - offset = num_records; - } - - storage::Record *record; - if (offset >= num_records) { - record = tablet->add_records(); - } else { - record = tablet->mutable_records(offset); +template +void SimpleWriteSyncGuard::End() { + CHECK(data_); + if (data_->parent()->meta.ToSync()) { + Sync(); } - - *record = data; - tablet->set_total_records(num_records + 1); } -void IM::Clear() { - auto *data = storage().mutable_data(); - data->clear_tablets(); - data->clear_dir(); - data->clear_timestamp(); +template +void SimpleWriteSyncGuard::Sync() { + CHECK(data_); + auto* storage = data_->parent(); + storage->PersistToDisk(); } -void IM::PersistToDisk() { - CHECK(!storage_->data().dir().empty()) << "path of storage should be set"; - // TODO make dir first - // MakeDir(storage_.data().dir()); - storage_->PersistToDisk(storage_->data().dir()); -} +template class SimpleWriteSyncGuard; +template class SimpleWriteSyncGuard; +template class SimpleWriteSyncGuard; +template class SimpleWriteSyncGuard>; +template class SimpleWriteSyncGuard>; +template class SimpleWriteSyncGuard>; +template class SimpleWriteSyncGuard>; +template class SimpleWriteSyncGuard>; +template class SimpleWriteSyncGuard>; } // namespace visualdl diff --git a/visualdl/logic/im.h b/visualdl/logic/im.h index abbbc989fd78164a8f82e9de0c96aaf850293c58..d503a58cbb54ab0ce36e193813d099128a9f0be0 100644 --- a/visualdl/logic/im.h +++ b/visualdl/logic/im.h @@ -6,82 +6,26 @@ #include #include -#include "visualdl/storage/storage.h" #include "visualdl/utils/concurrency.h" +#include "visualdl/utils/guard.h" namespace visualdl { /* - * IM(Information Maintainer) maintain the Storage singleton in memory, - * pre-compute some the statistical information to help visualizaton. - * - * There should be two processes and each have an IM, one is the web server - * which hold one IM to read the storage, the other is the SDK(python or C++), - * it will get an IM to write latest changes to storage. - * - * An IM have an underlying Storage object, which might be a memory based - * storage or a disk based one, both has the same interfaces those defined by - * class StorageBase. - * - * The SDK's IM will maintain the changes and periodically write to disk, and - * the web server's IM will periodically read latest storage from disk. + * Simple logic to sync memory to disk. */ -class IM final { +template +class SimpleWriteSyncGuard { public: - IM() { storage_.reset(new MemoryStorage(&executor_)); } - // IM(StorageBase::Type type, StorageBase::Mode mode); - ~IM() { executor_.Quit(); } + SimpleWriteSyncGuard(T* x) : data_(x) { Start(); } + ~SimpleWriteSyncGuard() { End(); } - void MaintainRead(const std::string &dir, int msecs) { - LOG(INFO) << "start maintain read"; - dynamic_cast(storage_.get()) - ->StartReadService(dir, msecs, &lock_); - } - - void MaintainWrite(const std::string &dir, int msecs) { - dynamic_cast(storage_.get()) - ->StartWriteService(dir, msecs, &lock_); - } - - /* - * Set the disk path to store the Storage object. - */ - void SetPersistDest(const std::string &path); - - storage::Tablet *AddTablet(const std::string &tag, int num_samples); - - /* - * @tag: tag of the target Tablet. - * @record: a record - * - * NOTE pass in the serialized protobuf message will trigger copying, but - * simpler to support different Tablet data formats. - */ - void AddRecord(const std::string &tag, const storage::Record &record); - - /* - * delete all the information. - */ - void Clear(); - - /* - * Save the Storage Protobuf to disk. - */ - void PersistToDisk(); - - StorageBase &storage() { return *storage_; } - - cc::PeriodExector &executor() { return executor_; } - - std::mutex &handler() { return lock_; } + void Start(); + void End(); + void Sync(); private: - // read write lock for protobuf in memory - // TODO(ChunweiYan) mutex too heavy here, might change to a message queue to - // reduce the frequency of visiting disk - std::mutex lock_; - std::unique_ptr storage_; - cc::PeriodExector executor_; + T* data_{nullptr}; }; } // namespace visualdl diff --git a/visualdl/logic/im_test.cc b/visualdl/logic/im_test.cc deleted file mode 100644 index 37246c1763c03071d25debe3b260cd81b054b552..0000000000000000000000000000000000000000 --- a/visualdl/logic/im_test.cc +++ /dev/null @@ -1,37 +0,0 @@ -#include "visualdl/logic/im.h" - -#include "gtest/gtest.h" - -#include "visualdl/storage/storage.h" - -namespace visualdl { - -class ImTester : public ::testing::Test { -protected: - void SetUp() override {} - - IM im; -}; - -TEST_F(ImTester, AddTablet) { - im.Clear(); - im.AddTablet("tag0", 20); -} - -TEST_F(ImTester, AddRecord) { - im.Clear(); - - im.AddTablet("tag0", 20); - for (int i = 0; i < 100; i++) { - storage::Record rcd; - rcd.set_dtype(storage::DataType::kInt32s); - for (int j = 0; j < 10; j++) { - rcd.add_data()->add_i32s(i * 20 + j); - } - im.AddRecord("tag0", rcd); - } - - ASSERT_EQ(im.storage().tablet("tag0")->records_size(), 100UL); -} - -} // namespace visualdl diff --git a/visualdl/logic/pybind.cc b/visualdl/logic/pybind.cc index 69d40ca039e606d9d20a3d327347b665c1010480..de82b415e375924f349e0cf5a869776374491d62 100644 --- a/visualdl/logic/pybind.cc +++ b/visualdl/logic/pybind.cc @@ -6,84 +6,68 @@ namespace py = pybind11; namespace vs = visualdl; +namespace cp = visualdl::components; PYBIND11_PLUGIN(core) { py::module m("core", "C++ core of VisualDL"); - // m.doc() = "visualdl python core API"; - py::class_(m, "Tablet") - // other member setter and getter - .def("record_buffer", &vs::TabletHelper::record_buffer) - .def("records_size", &vs::TabletHelper::records_size) - .def("buffer", &vs::TabletHelper::buffer) - .def("human_readable_buffer", &vs::TabletHelper::human_readable_buffer) - .def("set_buffer", - (void (vs::TabletHelper::*)(const std::string&)) & - vs::TabletHelper::SetBuffer) - // scalar interface - .def("as_int32_scalar", - [](vs::TabletHelper& self, vs::ImHelper& im) { - return vs::components::ScalarHelper(self, &im.handler()); - }) - .def("as_int64_scalar", - [](vs::TabletHelper& self, vs::ImHelper& im) { - return vs::components::ScalarHelper(&self.data(), - &im.handler()); - }) - .def("as_float_scalar", - [](vs::TabletHelper& self, vs::ImHelper& im) { - return vs::components::ScalarHelper(&self.data(), - &im.handler()); - }) - .def("as_double_scalar", [](vs::TabletHelper& self, vs::ImHelper& im) { - return vs::components::ScalarHelper(&self.data(), - &im.handler()); - }); +#define ADD_SCALAR(T) \ + py::class_>(m, "ScalarReader__" #T) \ + .def("records", &cp::ScalarReader::records) \ + .def("timestamps", &cp::ScalarReader::timestamps) \ + .def("ids", &cp::ScalarReader::ids) \ + .def("caption", &cp::ScalarReader::caption); + ADD_SCALAR(int); + ADD_SCALAR(float); + ADD_SCALAR(double); + ADD_SCALAR(int64_t); +#undef ADD_SCALAR - py::class_(m, "Storage") - .def("timestamp", &vs::StorageHelper::timestamp) - .def("dir", &vs::StorageHelper::dir) - .def("set_dir", &vs::StorageHelper::SetDir) - .def("tablets_size", &vs::StorageHelper::tablets_size) - .def("buffer", &vs::StorageHelper::buffer) - .def("human_readable_buffer", &vs::StorageHelper::human_readable_buffer) - .def("set_buffer", - (void (vs::StorageHelper::*)(const std::string&)) & - vs::StorageHelper::SetBuffer); +#define ADD_SCALAR_WRITER(T) \ + py::class_>(m, "ScalarWriter__" #T) \ + .def("set_caption", &cp::Scalar::SetCaption) \ + .def("add_record", &cp::Scalar::AddRecord); + ADD_SCALAR_WRITER(int); + ADD_SCALAR_WRITER(float); + ADD_SCALAR_WRITER(double); +#undef ADD_SCALAR_WRITER - py::class_(m, "Im") - .def("__init__", - [](vs::ImHelper& instance) { new (&instance) vs::ImHelper(); }) - .def("storage", &vs::ImHelper::storage) - .def("tablet", &vs::ImHelper::tablet) - .def("add_tablet", &vs::ImHelper::AddTablet) - .def("persist_to_disk", &vs::ImHelper::PersistToDisk) - .def("clear_tablets", &vs::ImHelper::ClearTablets) - .def("start_read_service", - &vs::ImHelper::StartReadService, - "start a thread to maintain read service") - .def("start_write_service", - &vs::ImHelper::StartWriteSerice, - "start a thread to maintain write service") - .def("stop_service", - &vs::ImHelper::StopService, - "stop the service thread"); +#define ADD_SCALAR(T) \ + .def("get_scalar_" #T, [](vs::Reader& self, const std::string& tag) { \ + auto tablet = self.tablet(tag); \ + return vs::components::ScalarReader(std::move(tablet)); \ + }) + py::class_(m, "Reader") + .def( + "__init__", + [](vs::Reader& instance, + const std::string& mode, + const std::string& dir) { new (&instance) vs::Reader(mode, dir); }) + // clang-format off + ADD_SCALAR(float) + ADD_SCALAR(double) + ADD_SCALAR(int); +// clang-format on +#undef ADD_SCALAR -// interfaces for components begin +#define ADD_SCALAR(T) \ + .def("new_scalar_" #T, [](vs::Writer& self, const std::string& tag) { \ + auto tablet = self.AddTablet(tag); \ + return cp::Scalar(tablet); \ + }) + + py::class_(m, "Writer") + .def("__init__", + [](vs::Writer& instance, const std::string& dir, int sync_cycle) { + new (&instance) vs::Writer(dir); + instance.storage().meta.cycle = sync_cycle; + }) + .def("as_mode", &vs::Writer::AsMode) + // clang-format off + ADD_SCALAR(float) + ADD_SCALAR(double) + ADD_SCALAR(int); +// clang-format on +#undef ADD_SCALAR -// different data type of scalar conponent -#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \ - py::class_>(m, #name__) \ - .def("add_record", &vs::components::ScalarHelper::AddRecord) \ - .def("set_captions", &vs::components::ScalarHelper::SetCaptions) \ - .def("get_records", &vs::components::ScalarHelper::GetRecords) \ - .def("get_captions", &vs::components::ScalarHelper::GetCaptions) \ - .def("get_ids", &vs::components::ScalarHelper::GetIds) \ - .def("get_record_size", &vs::components::ScalarHelper::GetSize) \ - .def("get_timestamps", &vs::components::ScalarHelper::GetTimestamps); - ADD_SCALAR_TYPED_INTERFACE(int32_t, ScalarInt32); - ADD_SCALAR_TYPED_INTERFACE(int64_t, ScalarInt64); - ADD_SCALAR_TYPED_INTERFACE(float, ScalarFloat); - ADD_SCALAR_TYPED_INTERFACE(double, ScalarDouble); -#undef ADD_SCALAR_TYPED_INTERFACE -} +} // end pybind diff --git a/visualdl/logic/sdk.cc b/visualdl/logic/sdk.cc index e3283812569ece8183c96451a0026da6a2683572..62c46ea6d71d34c94e88d9f019bdfd5ef73c3591 100644 --- a/visualdl/logic/sdk.cc +++ b/visualdl/logic/sdk.cc @@ -1,149 +1,51 @@ #include "visualdl/logic/sdk.h" -#include - namespace visualdl { -#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \ - template <> \ - void EntryHelper::method__(ctype__ v) { \ - entry->set_dtype(storage::DataType::dtype__); \ - entry->opr__(v); \ - } - -IMPL_ENTRY_SET_OR_ADD(Set, int32_t, kInt32, set_i32); -IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64); -IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b); -IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f); -IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d); -IMPL_ENTRY_SET_OR_ADD(Add, int32_t, kInt32s, add_i32s); -IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s); -IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs); -IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds); -IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss); -IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs); - -#define IMPL_ENTRY_GET(T, fieldname__) \ - template <> \ - T EntryHelper::Get() const { \ - return entry->fieldname__(); \ - } -IMPL_ENTRY_GET(int32_t, i32); -IMPL_ENTRY_GET(int64_t, i64); -IMPL_ENTRY_GET(float, f); -IMPL_ENTRY_GET(double, d); -IMPL_ENTRY_GET(std::string, s); -IMPL_ENTRY_GET(bool, b); - -#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \ - template <> \ - std::vector EntryHelper::GetMulti() const { \ - return std::vector(entry->fieldname__().begin(), \ - entry->fieldname__().end()); \ - } - -IMPL_ENTRY_GET_MULTI(int32_t, i32s); -IMPL_ENTRY_GET_MULTI(int64_t, i64s); -IMPL_ENTRY_GET_MULTI(float, fs); -IMPL_ENTRY_GET_MULTI(double, ds); -IMPL_ENTRY_GET_MULTI(std::string, ss); -IMPL_ENTRY_GET_MULTI(bool, bs); - -std::string StorageHelper::human_readable_buffer() const { - std::string buffer; - google::protobuf::TextFormat::PrintToString(*data_, &buffer); - return buffer; -} - -std::string TabletHelper::human_readable_buffer() const { - std::string buffer; - google::protobuf::TextFormat::PrintToString(*data_, &buffer); - return buffer; -} - -// implementations for components namespace components { template -void ScalarHelper::SetCaptions(const std::vector &captions) { - ACQUIRE_HANDLER(handler_); - - CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once"; - for (int i = 0; i < captions.size(); i++) { - data_->add_captions(captions[i]); - } -} - -template -void ScalarHelper::AddRecord(int id, const std::vector &values) { - ACQUIRE_HANDLER(handler_); - - CHECK_NOTNULL(data_); - CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first"; - CHECK_EQ(data_->captions_size(), values.size()) - << "number of values in a record should be compatible with the " - "captions"; - // add record data - auto *record = data_->add_records(); - auto *data = record->add_data(); - EntryHelper entry_helper(data); - for (auto v : values) { - entry_helper.Add(v); +std::vector ScalarReader::records() const { + std::vector res; + for (int i = 0; i < reader_.total_records(); i++) { + res.push_back(reader_.record(i).data(0).Get()); } - // set record id - record->set_id(id); - // set record timestamp - record->set_timestamp(time(NULL)); + return res; } template -std::vector> ScalarHelper::GetRecords() const { - ACQUIRE_HANDLER(handler_); - - std::vector> result; - EntryHelper entry_helper; - for (int i = 0; i < data_->records_size(); i++) { - auto *entry = data_->mutable_records(i)->mutable_data(0); - entry_helper(entry); - auto datas = entry_helper.GetMulti(); - result.push_back(std::move(datas)); +std::vector ScalarReader::ids() const { + std::vector res; + for (int i = 0; i < reader_.total_records(); i++) { + res.push_back(reader_.record(i).id()); } - return result; + return res; } template -std::vector ScalarHelper::GetIds() const { - ACQUIRE_HANDLER(handler_); - CHECK_NOTNULL(data_); - std::vector result; - for (int i = 0; i < data_->records_size(); i++) { - result.push_back(data_->records(i).id()); +std::vector ScalarReader::timestamps() const { + std::vector res; + for (int i = 0; i < reader_.total_records(); i++) { + res.push_back(reader_.record(i).timestamp()); } - return result; + return res; } template -std::vector ScalarHelper::GetTimestamps() const { - ACQUIRE_HANDLER(handler_); - CHECK_NOTNULL(data_); - std::vector result; - for (int i = 0; i < data_->records_size(); i++) { - result.push_back(data_->records(i).timestamp()); - } - return result; +std::string ScalarReader::caption() const { + CHECK(!reader_.captions().empty()) << "no caption"; + return reader_.captions().front(); } template -std::vector ScalarHelper::GetCaptions() const { - ACQUIRE_HANDLER(handler_); - return std::vector(data_->captions().begin(), - data_->captions().end()); +size_t ScalarReader::size() const { + return reader_.total_records(); } -template class ScalarHelper; -template class ScalarHelper; -template class ScalarHelper; -template class ScalarHelper; +template class ScalarReader; +template class ScalarReader; +template class ScalarReader; +template class ScalarReader; } // namespace components diff --git a/visualdl/logic/sdk.h b/visualdl/logic/sdk.h index b74e33174291d31baf44d430116eb86d583c02a6..96ee4321623923a2b55668cda89f863514381db0 100644 --- a/visualdl/logic/sdk.h +++ b/visualdl/logic/sdk.h @@ -1,164 +1,98 @@ #ifndef VISUALDL_LOGIC_SDK_H #define VISUALDL_LOGIC_SDK_H -#include -#include -#include - -#include "visualdl/logic/im.h" - +#include "visualdl/storage/storage.h" +#include "visualdl/storage/tablet.h" +#include "visualdl/utils/string.h" namespace visualdl { -/* - * Utility helper for storage::Entry. - */ -template -struct EntryHelper { - // use pointer to avoid copy - storage::Entry *entry{nullptr}; - - EntryHelper() {} - explicit EntryHelper(storage::Entry *entry) : entry(entry) {} - void operator()(storage::Entry *entry) { this->entry = entry; } - - /* - * Set a single value. - */ - void Set(T v); - - /* - * Add a value to repeated message field. - */ - void Add(T v); - - /* - * Get a single value. - */ - T Get() const; - - /* - * Get repeated field. - */ - std::vector GetMulti() const; -}; - -class TabletHelper { +class Writer { public: - // basic member getter and setter - std::string record_buffer(int idx) const { - return data_->records(idx).SerializeAsString(); - } - size_t records_size() const { return data_->records_size(); } - std::string buffer() const { return data_->SerializeAsString(); } - std::string human_readable_buffer() const; - void SetBuffer(const storage::Tablet &t) { *data_ = t; } - void SetBuffer(const std::string &b) { data_->ParseFromString(b); } - storage::Tablet &data() const { return *data_; } - - // constructor that enable concurrency. - TabletHelper(storage::Tablet *t) : data_(t) {} - // data updater that resuage of one instance. - TabletHelper &operator()(storage::Tablet *t) { - data_ = t; - return *this; + Writer(const std::string& dir) { + storage_.SetDir(dir); } -private: - storage::Tablet *data_; -}; - -class StorageHelper { -public: - StorageHelper(storage::Storage *s) : data_(s) {} - StorageHelper &operator()(storage::Storage *s) { - data_ = s; + Writer& AsMode(const std::string& mode) { + mode_ = mode; + storage_.AddMode(mode); return *this; } - void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; } - void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); } - void SetDir(const std::string &dir) { - CHECK(data_) << "no storage instance hold"; - data_->set_dir(dir); + Tablet AddTablet(const std::string& tag) { + // TODO(ChunweiYan) add string check here. + auto tmp = mode_ + "/" + tag; + string::TagEncode(tmp); + auto res = storage_.AddTablet(tmp); + res.SetCaptions(std::vector({mode_})); + return res; } - int64_t timestamp() const { return data_->timestamp(); } - std::string dir() const { return data_->dir(); } - int tablets_size() const { return data_->tablets_size(); } - std::string buffer() const { return data_->SerializeAsString(); } - std::string human_readable_buffer() const; + Storage& storage() { return storage_; } private: - storage::Storage *data_{nullptr}; + Storage storage_; + std::string mode_; }; -class ImHelper { +class Reader { public: - // TODO(ChunweiYan) decouple helper with resource. - ImHelper() { im_.reset(new IM); } - ImHelper(std::unique_ptr im) : im_(std::move(im)) {} + Reader(const std::string& mode, const std::string& dir) + : mode_(mode), reader_(dir) {} - StorageHelper storage() { - return StorageHelper(im_->storage().mutable_data()); - } - TabletHelper tablet(const std::string &tag) { - return TabletHelper(im_->storage().tablet(tag)); + TabletReader tablet(const std::string& tag) { + auto tmp = mode_ + "/" + tag; + string::TagEncode(tmp); + return reader_.tablet(tmp); } - TabletHelper AddTablet(const std::string &tag, int num_samples) { - return TabletHelper(im_->AddTablet(tag, num_samples)); - } - std::mutex &handler() { return im_->handler(); } - void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); } - void StartReadService(const std::string &dir, int msecs) { - im_->SetPersistDest(dir); - im_->MaintainRead(dir, msecs); - } - void StartWriteSerice(const std::string &dir, int msecs) { - im_->SetPersistDest(dir); - im_->MaintainWrite(dir, msecs); - } - void StopService() { im_->executor().Quit(); } - void PersistToDisk() const { im_->PersistToDisk(); } private: - std::unique_ptr im_; + StorageReader reader_; + std::string mode_{"default"}; }; namespace components { -#define ACQUIRE_HANDLER(handler) std::lock_guard ____(*handler); - /* * Read and write support for Scalar component. */ template -class ScalarHelper { -public: - ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr) - : data_(tablet), handler_(handler) {} - ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr) - : data_(&tablet.data()), handler_(handler) {} - - void SetCaptions(const std::vector &captions); - - void AddRecord(int id, const std::vector &values); +struct Scalar { + Scalar(Tablet tablet) : tablet_(tablet) { + tablet_.SetType(Tablet::Type::kScalar); + } - std::vector> GetRecords() const; + void SetCaption(const std::string cap) { + tablet_.SetCaptions(std::vector({cap})); + } - std::vector GetIds() const; + void AddRecord(int id, T value) { + auto record = tablet_.AddRecord(); + record.SetId(id); + auto entry = record.AddData(); + entry.Set(value); + } - std::vector GetTimestamps() const; +private: + Tablet tablet_; +}; - std::vector GetCaptions() const; +template +struct ScalarReader { + ScalarReader(TabletReader&& reader) : reader_(reader) {} - size_t GetSize() const { return data_->records_size(); } + std::vector records() const; + std::vector ids() const; + std::vector timestamps() const; + std::string caption() const; + size_t total_records() {return reader_.total_records();} + size_t size() const; private: - storage::Tablet *data_; - std::mutex *handler_; + TabletReader reader_; }; + } // namespace components } // namespace visualdl -#endif // VISUALDL_BACKEND_LOGIC_SDK_H +#endif diff --git a/visualdl/logic/sdk_test.cc b/visualdl/logic/sdk_test.cc index f800f31ee7368ebbb8eb9ae1fe17e3ec454ddf3e..7922a17a6ca7cc3bb4b9de12a7ca5ceee5e8badc 100644 --- a/visualdl/logic/sdk_test.cc +++ b/visualdl/logic/sdk_test.cc @@ -4,78 +4,29 @@ namespace visualdl { -struct ScalarTestHelper { - ImHelper rim; - ImHelper wim; - const std::string dir = "./tmp/sdk_test.test"; - - void operator()(std::function read, std::function write) { - wim.StartWriteSerice(dir, 200); - write(); - - // should wait for the write service create log's path - std::this_thread::sleep_for(std::chrono::milliseconds(400)); - rim.StartReadService(dir, 100); - // should wait for the read service to load "tag0" tablet into memory - std::this_thread::sleep_for(std::chrono::milliseconds(600)); - read(); - } -}; - -TEST(Scalar, set_caption) { - ScalarTestHelper helper; - - const std::vector captions({"train", "test"}); - - auto write = [&] { - auto tablet = helper.wim.AddTablet("tag0", -1); - components::ScalarHelper scalar(tablet, &helper.wim.handler()); - - scalar.SetCaptions(captions); - }; - - auto read = [&] { - auto mytablet = helper.rim.tablet("tag0"); - components::ScalarHelper myscalar(mytablet, &helper.rim.handler()); - auto mycaptions = myscalar.GetCaptions(); - - ASSERT_EQ(captions, mycaptions); - }; - - helper(read, write); -} - -TEST(Scalar, add_records) { - ScalarTestHelper helper; - - const std::vector captions({"train", "test"}); - - const size_t nsteps = 100; - - auto write = [&] { - auto tablet = helper.wim.AddTablet("tag0", -1); - components::ScalarHelper scalar(tablet, &helper.wim.handler()); - - scalar.SetCaptions(captions); - - for (int i = 0; i < nsteps; i++) { - scalar.AddRecord(i * 10, std::vector({(float)i, (float)i + 1})); - } - }; - - auto read = [&] { - auto mytablet = helper.rim.tablet("tag0"); - components::ScalarHelper myscalar(mytablet, &helper.rim.handler()); - - auto records = myscalar.GetRecords(); - ASSERT_EQ(records.size(), nsteps); - - for (int i = 0; i < nsteps; i++) { - ASSERT_EQ(records[i], std::vector({(float)i, (float)i + 1})); - } - }; - - helper(read, write); +TEST(Scalar, write) { + const auto dir = "./tmp/sdk_test"; + Storage storage; + // write disk every time + storage.meta.cycle = 1; + storage.SetDir(dir); + auto tablet = storage.AddTablet("scalar0"); + components::Scalar scalar(tablet); + scalar.SetCaption("train"); + scalar.AddRecord(0, 12); + storage.PersistToDisk(); + + // read from disk + StorageReader reader(dir); + auto tablet_reader = reader.tablet("scalar0"); + auto scalar_reader = components::ScalarReader(std::move(tablet_reader)); + auto captioin = scalar_reader.caption(); + ASSERT_EQ(captioin, "train"); + ASSERT_EQ(scalar_reader.total_records(), 1); + auto record = scalar_reader.records(); + ASSERT_EQ(record.size(), 1); + // check the first entry of first record + ASSERT_EQ(record.front(), 12); } } // namespace visualdl diff --git a/visualdl/python/CMakeLists.txt b/visualdl/python/CMakeLists.txt index def51de942894def455976795445e078ea6fb2d8..2f37afb567abdd5930eeb1ebb4fe278beff741e9 100644 --- a/visualdl/python/CMakeLists.txt +++ b/visualdl/python/CMakeLists.txt @@ -10,4 +10,4 @@ function(py_test TARGET_NAME) ) endfunction() -py_test(test_summary SRCS test_summary.py) +py_test(test_summary SRCS test_storage.py) diff --git a/visualdl/python/storage.py b/visualdl/python/storage.py new file mode 100644 index 0000000000000000000000000000000000000000..b04d6a3ca26e4e5bfe534d0ca87f126ff570a4df --- /dev/null +++ b/visualdl/python/storage.py @@ -0,0 +1,39 @@ +__all__ = [ + 'StorageReader', + 'StorageWriter', +] +import core + +dtypes = ("float", "double", "int32", "int64") + + +class StorageReader(object): + + def __init__(self, mode, dir): + self.reader = core.Reader(mode, dir) + + def scalar(self, tag, type='float'): + type2scalar = { + 'float': self.reader.get_scalar_float, + 'double': self.reader.get_scalar_double, + 'int': self.reader.get_scalar_int, + } + return type2scalar[type](tag) + + +class StorageWriter(object): + + def __init__(self, dir, sync_cycle): + self.writer = core.Writer(dir, sync_cycle) + + def as_mode(self, mode): + self.writer = self.writer.as_mode(mode) + return self + + def scalar(self, tag, type='float'): + type2scalar = { + 'float': self.writer.new_scalar_float, + 'double': self.writer.new_scalar_double, + 'int': self.writer.new_scalar_int, + } + return type2scalar[type](tag) diff --git a/visualdl/python/summary.py b/visualdl/python/summary.py deleted file mode 100644 index db9cb1d2b5b81f0567b6effcd78e3fa910c74af4..0000000000000000000000000000000000000000 --- a/visualdl/python/summary.py +++ /dev/null @@ -1,105 +0,0 @@ -__all__ = [ - 'set_storage', - 'scalar', -] -import core - -dtypes = ("float", "double", "int32", "int64") - -def IM(dir, mode="read", msecs=500): - im = core.Im() - READ = "read" - WRITE = "write" - if mode == READ: - im.start_read_service(dir, msecs) - else: - im.start_write_service(dir, msecs) - return im - - -class _Scalar(object): - ''' - Python syntax wrapper for the core.ScalarHelper object. - ''' - - def __init__(self, core_object): - self._core_object = core_object - - def add(self, id, vs): - ''' - add a scalar record - :param id: int - id in the x-corrdinate - :param vs: list - values - :return: None - ''' - self._core_object.add_record(id, vs) - - def set_captions(self, cs): - ''' - set the captions, one caption for one line. - :param cs: list of str - :return: None - ''' - self._core_object.set_captions(cs) - - @property - def captions(self): - return self._core_object.get_captions() - - @property - def records(self): - ''' - get all the records, format like - [ - [0.1, 0.2], # first record - [0.2, 0.3], # second record - # ... - ] - :return: list of list - ''' - return self._core_object.get_records() - - @property - def ids(self): - ''' - get all the ids for the records - :return: list of int - ''' - return self._core_object.get_ids() - - @property - def timestamps(self): - ''' - get all the timestamps for the records - :return: list of int - ''' - return self._core_object.get_timestamps() - - @property - def size(self): - return self._core_object.get_record_size() - - -def scalar(im, tag, dtype='float'): - ''' - create a scalar component. - - :param tag: str - name of this component. - :param dtype: string - the data type that will be used in underlying storage. - :return: object of core.Tablet - ''' - assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % ( - dtype, str(dtypes)) - tablet = im.add_tablet(tag, -1) - dtype2obj = { - 'float': tablet.as_float_scalar, - 'double': tablet.as_double_scalar, - 'int32': tablet.as_int32_scalar, - 'int64': tablet.as_int64_scalar, - } - obj = dtype2obj[dtype](im) - return _Scalar(obj) diff --git a/visualdl/python/test_storage.py b/visualdl/python/test_storage.py new file mode 100644 index 0000000000000000000000000000000000000000..ef5914990e5cb3c8ae1ea4d18e2bffa6f39c8b89 --- /dev/null +++ b/visualdl/python/test_storage.py @@ -0,0 +1,32 @@ +import storage +import numpy as np +import unittest +import random +import time + +class StorageTest(unittest.TestCase): + def setUp(self): + self.dir = "./tmp/storage_test" + + def test_read(self): + print 'test write' + self.writer = storage.StorageWriter(self.dir, sync_cycle=1).as_mode("train") + scalar = self.writer.scalar("model/scalar/min") + # scalar.set_caption("model/scalar/min") + for i in range(10): + scalar.add_record(i, float(i)) + + print 'test read' + self.reader = storage.StorageReader("train", self.dir) + scalar = self.reader.scalar("model/scalar/min") + self.assertEqual(scalar.caption(), "train") + records = scalar.records() + ids = scalar.ids() + self.assertTrue(np.equal(records, [float(i) for i in range(10)]).all()) + self.assertTrue(np.equal(ids, [float(i) for i in range(10)]).all()) + print 'records', records + print 'ids', ids + + +if __name__ == '__main__': + unittest.main() diff --git a/visualdl/python/test_summary.py b/visualdl/python/test_summary.py deleted file mode 100644 index d4ce733b68463e54d6adfeaa29501139525b87b5..0000000000000000000000000000000000000000 --- a/visualdl/python/test_summary.py +++ /dev/null @@ -1,101 +0,0 @@ -import summary -import numpy as np -import unittest -import random -import time - -once_flag = False - - -class ScalarTester(unittest.TestCase): - def setUp(self): - self.dir = "tmp/summary.test" - # clean path - try: - os.rmdir(self.dir) - except: - pass - self.im = summary.IM(self.dir, "write", 200) - self.tablet_name = "scalar0" - self.scalar = summary.scalar(self.im, self.tablet_name) - self.py_captions = ["train cost", "test cost"] - self.scalar.set_captions(self.py_captions) - - self.py_records = [] - self.py_ids = [] - # write - for i in range(10): - record = [0.1 * i, 0.2 * i] - id = i * 10 - self.py_records.append(record) - self.py_ids.append(id) - self.scalar.add(id, record) - - def test_records(self): - self.assertEqual(self.scalar.size, len(self.py_records)) - for i, record in enumerate(self.scalar.records): - self.assertTrue(np.isclose(record, self.py_records[i]).all()) - - def test_ids(self): - self.assertEqual(len(self.py_ids), self.scalar.size) - for i, id in enumerate(self.scalar.ids): - self.assertEqual(self.py_ids[i], id) - - def test_captions(self): - self.assertEqual(self.scalar.captions, self.py_captions) - - def test_read_records(self): - time.sleep(1) - im = summary.IM(self.dir, "read", 200) - time.sleep(1) - scalar = summary.scalar(im, self.tablet_name) - records = scalar.records - self.assertEqual(len(self.py_records), scalar.size) - for i, record in enumerate(self.scalar.records): - self.assertTrue(np.isclose(record, records[i]).all()) - - def test_read_ids(self): - time.sleep(0.6) - im = summary.IM(self.dir, "read", msecs=200) - time.sleep(0.6) - scalar = summary.scalar(im, self.tablet_name) - self.assertEqual(len(self.py_ids), scalar.size) - for i, id in enumerate(scalar.ids): - self.assertEqual(self.py_ids[i], id) - - def test_read_captions(self): - time.sleep(0.6) - im = summary.IM(self.dir, "read", msecs=200) - time.sleep(0.6) - scalar = summary.scalar(im, self.tablet_name) - self.assertEqual(scalar.captions, self.py_captions) - - def test_mix_read_write(self): - write_im = summary.IM(self.dir, "write", msecs=200) - time.sleep(0.6) - read_im = summary.IM(self.dir, "read", msecs=200) - - scalar_writer = summary.scalar(write_im, self.tablet_name) - scalar_reader = summary.scalar(read_im, self.tablet_name) - - scalar_writer.set_captions(["train cost", "test cost"]) - for i in range(1000): - scalar_writer.add(i, [random.random(), random.random()]) - - scalar_reader.records - - for i in range(500): - scalar_writer.add(i, [random.random(), random.random()]) - - scalar_reader.records - - for i in range(500): - scalar_writer.add(i, [random.random(), random.random()]) - - for i in range(10): - scalar_reader.records - scalar_reader.captions - - -if __name__ == '__main__': - unittest.main() diff --git a/visualdl/storage/CMakeLists.txt b/visualdl/storage/CMakeLists.txt index 090f162ecec3667f6bf47e47c5a10a8215385142..ec9dfbbf8f464a0d55670452d0a429d720b2e31f 100644 --- a/visualdl/storage/CMakeLists.txt +++ b/visualdl/storage/CMakeLists.txt @@ -4,5 +4,12 @@ add_library(storage_proto ${PROTO_SRCS}) add_dependencies(storage_proto protobuf) ## add storage as target +add_library(entry entry.cc entry.h ${PROTO_SRCS} ${PROTO_HDRS}) +add_library(tablet tablet.cc tablet.h ${PROTO_SRCS} ${PROTO_HDRS}) +add_library(record record.cc record.h ${PROTO_SRCS} ${PROTO_HDRS}) add_library(storage storage.cc storage.h ${PROTO_SRCS} ${PROTO_HDRS}) + +add_dependencies(entry storage_proto im) +add_dependencies(record storage_proto entry) +add_dependencies(tablet storage_proto) add_dependencies(storage storage_proto) diff --git a/visualdl/storage/entry.cc b/visualdl/storage/entry.cc new file mode 100644 index 0000000000000000000000000000000000000000..0d4fc8dd3b0e081ed171e3f5fb9699509fdd5d36 --- /dev/null +++ b/visualdl/storage/entry.cc @@ -0,0 +1,61 @@ +#include "visualdl/storage/entry.h" + +namespace visualdl { + +#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \ + template <> \ + void Entry::method__(ctype__ v) { \ + entry->set_dtype(storage::DataType::dtype__); \ + entry->opr__(v); \ + WRITE_GUARD \ + } + +IMPL_ENTRY_SET_OR_ADD(Set, int, kInt32, set_i32); +IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64); +IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b); +IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f); +IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d); +IMPL_ENTRY_SET_OR_ADD(Add, int, kInt32s, add_i32s); +IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s); +IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs); +IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds); +IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss); +IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs); + +#define IMPL_ENTRY_GET(T, fieldname__) \ + template <> \ + T EntryReader::Get() const { \ + data_.fieldname__(); \ + } + +IMPL_ENTRY_GET(int, i32); +IMPL_ENTRY_GET(int64_t, i64); +IMPL_ENTRY_GET(float, f); +IMPL_ENTRY_GET(double, d); +IMPL_ENTRY_GET(std::string, s); +IMPL_ENTRY_GET(bool, b); + +#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \ + template <> \ + std::vector EntryReader::GetMulti() const { \ + return std::vector(data_.fieldname__().begin(), \ + data_.fieldname__().end()); \ + } + +IMPL_ENTRY_GET_MULTI(int, i32s); +IMPL_ENTRY_GET_MULTI(float, fs); +IMPL_ENTRY_GET_MULTI(double, ds); +IMPL_ENTRY_GET_MULTI(std::string, ss); +IMPL_ENTRY_GET_MULTI(bool, bs); + +template class Entry; +template class Entry; +template class Entry; +template class Entry; + +template class EntryReader; +template class EntryReader; +template class EntryReader; +template class EntryReader; + +} // namespace visualdl diff --git a/visualdl/storage/entry.h b/visualdl/storage/entry.h new file mode 100644 index 0000000000000000000000000000000000000000..060b03827b8bdfd9cfe278c52c31f753e939d28c --- /dev/null +++ b/visualdl/storage/entry.h @@ -0,0 +1,55 @@ +#ifndef VISUALDL_STORAGE_ENTRY_H +#define VISUALDL_STORAGE_ENTRY_H + +#include "visualdl/logic/im.h" +#include "visualdl/storage/storage.pb.h" +#include "visualdl/utils/guard.h" + +namespace visualdl { + +struct Storage; + +/* + * Utility helper for storage::Entry. + */ +template +struct Entry { + DECL_GUARD(Entry) + // use pointer to avoid copy + storage::Entry* entry{nullptr}; + + Entry() {} + explicit Entry(storage::Entry* entry, Storage* parent) + : entry(entry), x_(parent) {} + void operator()(storage::Entry* entry, Storage* parent) { + this->entry = entry; + x_ = parent; + } + + // Set a single value. + void Set(T v); + + // Add a value to repeated message field. + void Add(T v); + + Storage* parent() { return x_; } + +private: + Storage* x_; +}; + +template +struct EntryReader { + EntryReader(storage::Entry x) : data_(x) {} + // Get a single value. + T Get() const; + // Get repeated field. + std::vector GetMulti() const; + +private: + storage::Entry data_; +}; + +} // namespace visualdl + +#endif diff --git a/visualdl/storage/record.cc b/visualdl/storage/record.cc new file mode 100644 index 0000000000000000000000000000000000000000..69d22154c5f8167c71ca7c842652c593292babd6 --- /dev/null +++ b/visualdl/storage/record.cc @@ -0,0 +1 @@ +#include "visualdl/storage/record.h" \ No newline at end of file diff --git a/visualdl/storage/record.h b/visualdl/storage/record.h new file mode 100644 index 0000000000000000000000000000000000000000..4e5fc7fd9c3c9c52f4ad9779517f768710037ef1 --- /dev/null +++ b/visualdl/storage/record.h @@ -0,0 +1,95 @@ +#ifndef VISUALDL_STORAGE_RECORD_H +#define VISUALDL_STORAGE_RECORD_H + +#include "visualdl/logic/im.h" +#include "visualdl/storage/entry.h" +#include "visualdl/storage/storage.pb.h" + +namespace visualdl { + +/* + * A helper for operations on storage::Record + */ +struct Record { + enum Dtype { + kInt32 = 0, + kInt64 = 1, + kFloat = 2, + kDouble = 3, + kString = 4, + kBool = 5, + // entrys + kInt64s = 6, + kFloats = 7, + kDoubles = 8, + kStrings = 9, + kInt32s = 10, + kBools = 11, + kUnknown = 12 + }; + + DECL_GUARD(Record) + + Record(storage::Record* x, Storage* parent) : data_(x), x_(parent) {} + + // write operations + void SetTimeStamp(int64_t x) { + data_->set_timestamp(x); + WRITE_GUARD + } + + void SetId(int64_t id) { + data_->set_id(id); + WRITE_GUARD + } + + void SetDtype(Dtype type) { + data_->set_dtype(storage::DataType(type)); + WRITE_GUARD + } + + template + Entry MutableMeta() { + return Entry(data_->mutable_meta(), parent()); + } + + template + Entry AddData() { + WRITE_GUARD + return Entry(data_->add_data(), parent()); + } + + Storage* parent() { return x_; } + +private: + storage::Record* data_{nullptr}; + Storage* x_; +}; + +struct RecordReader { + RecordReader(storage::Record x) : data_(x) {} + + // read operations + size_t data_size() const { return data_.data_size(); } + + template + EntryReader data(int i) { + return EntryReader(data_.data(i)); + } + int64_t timestamp() const { return data_.timestamp(); } + int64_t id() const { return data_.id(); } + + Record::Dtype dtype() const { return (Record::Dtype)data_.dtype(); } + + template + Entry meta() const { + return data_.meta(); + } + +private: + storage::Record data_; +}; + +} // namespace visualdl + +#endif diff --git a/visualdl/storage/storage.cc b/visualdl/storage/storage.cc index 9d1d393c5a88b87ab5979a4c7e476adf89ed0da7..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 --- a/visualdl/storage/storage.cc +++ b/visualdl/storage/storage.cc @@ -1,113 +0,0 @@ -#include -#include - -#include "visualdl/storage/storage.h" -#include "visualdl/utils/concurrency.h" -#include "visualdl/utils/filesystem.h" - -namespace visualdl { - -const std::string StorageBase::meta_file_name = "storage.meta"; - -std::string StorageBase::meta_path(const std::string &dir) const { - CHECK(!dir.empty()) << "dir is empty"; - return dir + "/" + meta_file_name; -} -std::string StorageBase::tablet_path(const std::string &dir, - const std::string &tag) const { - CHECK(!dir.empty()) << "dir should be set first"; - return dir + "/" + tag; -} - -storage::Tablet *MemoryStorage::NewTablet(const std::string &tag, - int num_samples) { - auto it = tablets_.find(tag); - if (it == tablets_.end()) { - // create new tablet - tablets_[tag] = storage::Tablet(); - tablets_[tag].set_tag(tag); - *storage_.add_tags() = tag; - } else { - return &it->second; - } - return &tablets_[tag]; -} - -storage::Tablet *MemoryStorage::tablet(const std::string &tag) { - auto it = tablets_.find(tag); - CHECK(it != tablets_.end()) << "tablet tagged as " << tag << " not exists"; - return &it->second; -} - -// TODO add some checksum to avoid unnecessary saving -void MemoryStorage::PersistToDisk(const std::string &dir) { - CHECK(!dir.empty()); - storage_.set_dir(dir); - // make a directory if not exist - fs::TryRecurMkdir(dir); - // write storage out - VLOG(2) << "to serize meta to dir " << dir; - fs::SerializeToFile(storage_, meta_path(dir)); - VLOG(2) << "serize meta to dir " << dir; - // write all the tablets - for (auto tag : storage_.tags()) { - auto it = tablets_.find(tag); - CHECK(it != tablets_.end()); - fs::SerializeToFile(it->second, tablet_path(dir, tag)); - } -} - -// TODO add some checksum to avoid unnecessary loading -void MemoryStorage::LoadFromDisk(const std::string &dir) { - CHECK(!dir.empty()) << "dir is empty"; - storage_.set_dir(dir); - // load storage - CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir))) - << "parse from " << meta_path(dir) << " failed"; - // load all the tablets - for (int i = 0; i < storage_.tags_size(); i++) { - auto tag = storage_.tags(i); - CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag))); - } -} - -void MemoryStorage::StartReadService(const std::string &dir, - int msecs, - std::mutex *handler) { - CHECK(executor_ != nullptr); - CHECK(!dir.empty()) << "dir should be set first"; - cc::PeriodExector::task_t task = [dir, this, handler] { - VLOG(1) << "loading from " << dir; - if (handler != nullptr) { - std::lock_guard _(*handler); - LoadFromDisk(dir); - } else { - LoadFromDisk(dir); - } - return true; - }; - // executor_.Start(); - VLOG(1) << "push read task"; - (*executor_)(std::move(task), msecs); -} - -void MemoryStorage::StartWriteService(const std::string &dir, - int msecs, - std::mutex *handler) { - CHECK(executor_ != nullptr); - CHECK(!dir.empty()) << "dir should be set first"; - storage_.set_dir(dir); - // executor_.Start(); - cc::PeriodExector::task_t task = [dir, handler, this] { - VLOG(2) << "persist to disk"; - if (handler != nullptr) { - std::lock_guard _(*handler); - PersistToDisk(dir); - } else { - PersistToDisk(dir); - } - return true; - }; - (*executor_)(std::move(task), msecs); -} -} // namespace visualdl diff --git a/visualdl/storage/storage.h b/visualdl/storage/storage.h index 3b59e5000e24966b7d64f9a7fbc421119d6ef5a3..506a8de87dca0aabedce27ba3db92e89f9baf6f0 100644 --- a/visualdl/storage/storage.h +++ b/visualdl/storage/storage.h @@ -1,120 +1,140 @@ -#ifndef VISUALDL_STORAGE_H -#define VISUALDL_STORAGE_H +#ifndef VISUALDL_STORAGE_STORAGE_H +#define VISUALDL_STORAGE_STORAGE_H -#include -#include -#include -#include -#include +#include +#include +#include +#include "visualdl/logic/im.h" +#include "visualdl/utils/guard.h" #include "visualdl/storage/storage.pb.h" -#include "visualdl/utils/concurrency.h" +#include "visualdl/storage/tablet.h" +#include "visualdl/utils/filesystem.h" namespace visualdl { -/* - * Generate a tablet path in disk from its tag. - */ -inline std::string GenPathFromTag(const std::string &dir, - const std::string &tag); +static const std::string meta_file_name = "storage.meta"; + +static std::string meta_path(const std::string& dir) { + CHECK(!dir.empty()) << "dir is empty"; + return dir + "/" + meta_file_name; +} +static std::string tablet_path(const std::string& dir, const std::string& tag) { + CHECK(!dir.empty()) << "dir should be set first"; + return dir + "/" + tag; +} + +struct SimpleSyncMeta { + void Inc() { counter++; } + + bool ToSync() { return counter % cycle == 0; } + + size_t counter{0}; + int cycle; +}; /* - * Storage Interface. The might be a bunch of implementations, for example, a - * MemStorage that keep a copy of all the taplets in memory, can be changed with - * a higher performance; a DiskStorage that keep all the data in disk, apply to - * the scenerios where memory consumption should be considered. + * Helper for operations on storage::Storage. */ -class StorageBase { -public: - const static std::string meta_file_name; +struct Storage { + DECL_GUARD(Storage) - enum Type { kMemory = 0, kDisk = 1 }; - // mode of the sevice, either reading or writing. - enum Mode { kRead = 0, kWrite = 1, kNone = 2 }; + mutable SimpleSyncMeta meta; - void SetStorage(const std::string &dir) { + Storage() { data_ = std::make_shared(); } + Storage(const std::shared_ptr& x) : data_(x) { time_t t; time(&t); - storage_.set_timestamp(t); - storage_.set_dir(dir); + data_->set_timestamp(t); } - std::string meta_path(const std::string &dir) const; - std::string tablet_path(const std::string &dir, const std::string &tag) const; - - /* - * Create a new Tablet storage. - */ - virtual storage::Tablet *NewTablet(const std::string &tag, - int num_samples) = 0; - - /* - * Get a tablet from memory, this can be viewed as a cache, if the storage is - * in disk, a hash map in memory will first load the corresponding Tablet - * Protobuf from disk and hold all the changes. - */ - virtual storage::Tablet *tablet(const std::string &tag) = 0; + // write operations + void AddMode(const std::string& x) { + // avoid duplicate modes. + if (modes_.count(x) != 0) return; + *data_->add_modes() = x; + modes_.insert(x); + WRITE_GUARD + } - /* - * Persist the data from cache to disk. Both the memory storage or disk - * storage should write changes to disk for persistence. - */ - virtual void PersistToDisk(const std::string &dir) = 0; + Tablet AddTablet(const std::string& x) { + CHECK(tablets_.count(x) == 0) << "tablet [" << x << "] has existed"; + tablets_[x] = storage::Tablet(); + AddTag(x); + LOG(INFO) << "really add tag " << x; + WRITE_GUARD + return Tablet(&tablets_[x], this); + } + void SetDir(const std::string& dir) { dir_ = dir; } + void PersistToDisk() { PersistToDisk(dir_); } /* - * Load data from disk. + * Save memory to disk. */ - virtual void LoadFromDisk(const std::string &dir) = 0; + void PersistToDisk(const std::string& dir) { + // LOG(INFO) << "persist to disk " << dir; + CHECK(!dir.empty()) << "dir should be set."; + fs::TryRecurMkdir(dir); + + fs::SerializeToFile(*data_, meta_path(dir)); + for (auto tag : data_->tags()) { + auto it = tablets_.find(tag); + CHECK(it != tablets_.end()) << "tag " << tag << " not exist."; + fs::SerializeToFile(it->second, tablet_path(dir, tag)); + } + } - storage::Storage *mutable_data() { return &storage_; } - const storage::Storage &data() { return storage_; } + Storage* parent() { return this; } protected: - storage::Storage storage_; + void AddTag(const std::string& x) { + *data_->add_tags() = x; + } + +private: + std::string dir_; + std::map tablets_; + std::shared_ptr data_; + std::set modes_; }; /* - * Storage in Memory, that will support quick edits on data. + * Storage reader, each interface will trigger a read. */ -class MemoryStorage final : public StorageBase { -public: - MemoryStorage() {} - MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {} - ~MemoryStorage() { - if (executor_ != nullptr) executor_->Quit(); +struct StorageReader { + StorageReader(const std::string& dir) : dir_(dir) {} + + // read operations + std::vector Tags() { + storage::Storage storage; + Reload(storage); + return std::vector(storage.tags().begin(), + storage.tags().end()); + } + std::vector Modes() { + storage::Storage storage; + Reload(storage); + return std::vector(storage.modes().begin(), + storage.modes().end()); } - storage::Tablet *NewTablet(const std::string &tag, int num_samples) override; - - storage::Tablet *tablet(const std::string &tag) override; - - void PersistToDisk(const std::string &dir) override; - - void LoadFromDisk(const std::string &dir) override; - /* - * Create a thread which will keep reading the latest data from the disk to - * memory. - * - * msecs: how many millisecond to sync memory and disk. - */ - void StartReadService(const std::string &dir, int msecs, std::mutex *handler); + TabletReader tablet(const std::string& tag) const { + auto path = tablet_path(dir_, tag); + storage::Tablet tablet; + fs::DeSerializeFromFile(&tablet, path); + return TabletReader(tablet); + } - /* - * Create a thread which will keep writing the latest changes from memory to - * disk. - * - * msecs: how many millisecond to sync memory and disk. - */ - void StartWriteService(const std::string &dir, - int msecs, - std::mutex *handler); +protected: + void Reload(storage::Storage& storage) { + const std::string path = meta_path(dir_); + fs::DeSerializeFromFile(&storage, path); + } private: - std::map tablets_; - // TODO(ChunweiYan) remove executor here. - cc::PeriodExector *executor_{nullptr}; + std::string dir_; }; } // namespace visualdl -#endif // VISUALDL_STORAGE_H +#endif diff --git a/visualdl/storage/storage.proto b/visualdl/storage/storage.proto index e587e011327540476cff69535f16357d750b74a5..381ee91108534883cbd60fa4a4272bf5c1000141 100644 --- a/visualdl/storage/storage.proto +++ b/visualdl/storage/storage.proto @@ -91,25 +91,33 @@ message Record { /* A Tablet stores the records of a component which type is `component` and -indidates as `tag`. +indidated as `tag`. The records will be saved in a file which name contains `tag`. During the running period, `num_records` will be accumulated, and `num_samples` indicates the size of -sample set the -reservoir sampling algorithm will collect. +sample set the reservoir sampling algorithm will collect, if `num_samples` +set to -1, no sample will be applied. */ message Tablet { - // the kinds of the components that supported + // the kinds of the components that supported. enum Type { kScalar = 0; kHistogram = 1; - kGraph = 2; + kImage = 2; } + // The unique identification for this `Tablet`. VisualDL will have no the + // concept of FileWriter like TB. It will store all the tablets in a single + // directory, so it has a `mode` concept. `mode` will be stored in `tag` + // as the prefix, so that the same tablet in different modes will have + // different `tag`. for example, a tablet called "layer/grad/min" in "train" + // and "test" mode will have tags like "train/layer/grad/min" and + // "test/layer/grad/min". + string tag = 6; // type of the component, different component should have different storage // format. Type component = 1; - // records the total count of records, each Write operation should increate - // this value. + // Keep a record of the total count of records, each Write operation should + // increate this value. int64 total_records = 2; // indicate the number of instances to sample, this should be a constant // value. @@ -117,22 +125,21 @@ message Tablet { repeated Record records = 4; // store a meta infomation if all the records share. Entry meta = 5; - // the unique identification for this `Tablet`. - string tag = 6; - // one tablet might have multiple captions, for example, a scalar component - // might have - // two plots labeled "train" and "test". + // one tablet might have just one caption, if not set, it should be the value + // of `mode`. repeated string captions = 7; + + string description = 8; } /* The Storage stores all the records. */ message Storage { - // tags to Tablet, should be thread safe if fix the keys after initialization. - // TODO to delete in the new storage interface. - map tablets = 1; - repeated string tags = 4; - string dir = 2; - int64 timestamp = 3; + // VisualDL will not have the concept like TB's FileWriter, just one storage, + // each tablet has different `mode`. + repeated string modes = 1; + // tags will be used to generate paths of tablets. + repeated string tags = 2; + int64 timestamp = 5; } diff --git a/visualdl/storage/storage_test.cc b/visualdl/storage/storage_test.cc index 8dbc7f566735d5b0af7889e5c365285c60df4573..47f13e8d11e6a20caa16b2fa0a4c2d2b37f70ae9 100644 --- a/visualdl/storage/storage_test.cc +++ b/visualdl/storage/storage_test.cc @@ -1,50 +1,35 @@ #include "visualdl/storage/storage.h" -#include #include +#include namespace visualdl { -using namespace std; - -class MemoryStorageTest : public ::testing::Test { +class StorageTest : public ::testing::Test { public: - void SetUp() override { storage_.SetStorage("./tmp"); } + void SetUp() { + storage.SetDir("./tmp/storage_test"); + storage.meta.cycle = 1; + } - MemoryStorage storage_; + Storage storage; }; -TEST_F(MemoryStorageTest, SetStorage) { - string dir = "./tmp"; - storage_.SetStorage(dir); - - ASSERT_EQ(storage_.data().dir(), dir); -} - -TEST_F(MemoryStorageTest, AddTablet) { - // TODO need to escape tag as name - string tag = "add%20tag0"; - storage_.NewTablet(tag, -1); - - auto* tablet = storage_.tablet(tag); - - ASSERT_TRUE(tablet != nullptr); - ASSERT_EQ(tablet->tag(), tag); -} +TEST_F(StorageTest, main) { + storage.AddMode("train"); + storage.AddMode("test"); -TEST_F(MemoryStorageTest, PersistToDisk) { - const std::string dir = "./tmp/201.test"; - storage_.SetStorage(dir); - string tag = "add%20tag0"; - storage_.NewTablet(tag, -1); + auto tag0 = storage.AddTablet("tag0"); + auto tag1 = storage.AddTablet("tag1"); + auto record = tag0.AddRecord(); + auto entry = record.AddData(); + entry.Set(12); - storage_.PersistToDisk(dir); - LOG(INFO) << "persist to disk"; + StorageReader reader("./tmp/storage_test"); + auto modes = reader.Modes(); - MemoryStorage other; - other.LoadFromDisk(dir); - LOG(INFO) << "read from disk"; - ASSERT_EQ(other.data().SerializeAsString(), - storage_.data().SerializeAsString()); + ASSERT_EQ(modes.size(), 2); + ASSERT_EQ(modes[0], "train"); + ASSERT_EQ(modes[1], "test"); } } // namespace visualdl diff --git a/visualdl/storage/tablet.cc b/visualdl/storage/tablet.cc new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/visualdl/storage/tablet.h b/visualdl/storage/tablet.h new file mode 100644 index 0000000000000000000000000000000000000000..4e6a02d13e280ef0d6e945009e541a9ca82bdc29 --- /dev/null +++ b/visualdl/storage/tablet.h @@ -0,0 +1,104 @@ +#ifndef VISUALDL_TABLET_H +#define VISUALDL_TABLET_H + +#include "visualdl/logic/im.h" +#include "visualdl/storage/record.h" +#include "visualdl/storage/storage.pb.h" +#include "visualdl/utils/string.h" + +namespace visualdl { + +/* + * Tablet is a helper for operations on storage::Tablet. + */ +struct Tablet { + enum Type { kScalar = 0, kHistogram = 1, kImage = 2 }; + + DECL_GUARD(Tablet); + + Tablet(storage::Tablet* x, Storage* parent) : data_(x), x_(parent) {} + + // write operations. + void SetNumSamples(int x) { + data_->set_num_samples(x); + WRITE_GUARD + } + + void SetType(Type type) { + data_->set_component(static_cast(type)); + WRITE_GUARD + } + + void SetTag(const std::string& mode, const std::string& tag) { + auto internal_tag = mode + "/" + tag; + string::TagEncode(internal_tag); + data_->set_tag(internal_tag); + WRITE_GUARD + } + + Record AddRecord() { + IncTotalRecords(); + WRITE_GUARD + return Record(data_->add_records(), parent()); + } + + template + Entry MutableMeta() { + Entry x(data_->mutable_meta(), parent()); + } + + void SetCaptions(const std::vector& xs) { + data_->clear_captions(); + for (const auto& x : xs) { + *data_->add_captions() = x; + } + WRITE_GUARD + } + + void SetDescription(const std::string& x) { + data_->set_description(x); + WRITE_GUARD + } + + void IncTotalRecords() { + data_->set_total_records(data_->total_records() + 1); + WRITE_GUARD + } + + Storage* parent() const { return x_; } + +private: + Storage* x_; + storage::Tablet* data_{nullptr}; +}; + +/* + * Tablet reader, it will hold the protobuf object. + */ +struct TabletReader { + TabletReader(storage::Tablet x) : data_(x) {} + + // read operations. + std::string tag() const { return data_.tag(); } + Tablet::Type type() const { return Tablet::Type(data_.component()); } + int64_t total_records() const { return data_.total_records(); } + int32_t num_samples() const { return data_.num_samples(); } + RecordReader record(int i) const { return RecordReader(data_.records(i)); } + template + EntryReader meta() const { + return EntryReader(data_.meta()); + } + std::vector captions() const { + std::vector x(data_.captions().begin(), + data_.captions().end()); + return x; + } + std::string description() const { return data_.description(); } + +private: + storage::Tablet data_; +}; + +} // namespace visualdl + +#endif diff --git a/visualdl/utils/filesystem.h b/visualdl/utils/filesystem.h index 58438f26c4353fc70b4808239e2d6568d4600972..58d86963ef43d525a81fbf4f0f277046f4161416 100644 --- a/visualdl/utils/filesystem.h +++ b/visualdl/utils/filesystem.h @@ -43,7 +43,7 @@ bool DeSerializeFromFile(T* proto, const std::string& path) { return proto->ParseFromIstream(&file); } -void TryMkdir(const std::string& dir) { +static void TryMkdir(const std::string& dir) { VLOG(1) << "try to mkdir " << dir; struct stat st = {0}; if (stat(dir.c_str(), &st) == -1) { @@ -52,7 +52,7 @@ void TryMkdir(const std::string& dir) { } // Create a path by recursively create directries -void TryRecurMkdir(const std::string& path) { +static void TryRecurMkdir(const std::string& path) { // split path by '/' for (int i = 1; i < path.size() - 1; i++) { if (path[i] == '/') { diff --git a/visualdl/utils/guard.h b/visualdl/utils/guard.h new file mode 100644 index 0000000000000000000000000000000000000000..961f76b9900bf2a139ba22a5e05c3ac0e7ca92a2 --- /dev/null +++ b/visualdl/utils/guard.h @@ -0,0 +1,34 @@ +#ifndef VISUALDL_UTILS_GUARD_H +#define VISUALDL_UTILS_GUARD_H + +namespace visualdl { +namespace guard { + +template +class BasicGuard { +public: + BasicGuard(const T* x) : data_(x) { start(); } + ~BasicGuard() { end(); } + + void start() {} + void end() {} + +private: + const T* data_; +}; + +#define DECL_GUARD(T) \ + using WriteGuard = SimpleWriteSyncGuard; \ + using ReadGuard = guard::BasicGuard; + +// #define DECL_GUARD(T) \ +// using WriteGuard = guard::BasicGuard; \ +// using ReadGuard = guard::BasicGuard; + +#define READ_GUARD ReadGuard _(this); +#define WRITE_GUARD WriteGuard _(this); + +} // namespace guard +} // namespace visualdl + +#endif diff --git a/visualdl/utils/string.h b/visualdl/utils/string.h new file mode 100644 index 0000000000000000000000000000000000000000..f714d7e3884cf2b6bc3b0a73ec67baa71f82b9b8 --- /dev/null +++ b/visualdl/utils/string.h @@ -0,0 +1,29 @@ +#ifndef VISUALDL_UTILS_STRING_H +#define VISUALDL_UTILS_STRING_H + +#include +#include + +namespace visualdl { +namespace string { + +static void TagEncode(std::string& tag) { + for (auto& c : tag) { + if (c == '/') { + c = '%'; + } + } +} + +static void TagDecode(std::string& tag) { + for (auto& c : tag) { + if (c == '%') { + c = '/'; + } + } +} + +} // namespace string +} // namespace visualdl + +#endif