From 67960ad86fada70939a408b353bf64ccf28a3e91 Mon Sep 17 00:00:00 2001 From: superjom Date: Fri, 22 Dec 2017 16:35:46 +0800 Subject: [PATCH] refactor finish reader and writer --- CMakeLists.txt | 6 +- visualdl/logic/CMakeLists.txt | 4 +- visualdl/logic/im.cc | 84 ++++------------- visualdl/logic/im.h | 76 ++------------- visualdl/logic/im_test.cc | 37 -------- visualdl/logic/sdk.cc | 150 ------------------------------ visualdl/logic/sdk.h | 154 ++----------------------------- visualdl/logic/sdk_test.cc | 21 ++++- visualdl/storage/entry.cc | 51 ++++++++++ visualdl/storage/entry.h | 55 +++++++++++ visualdl/storage/record.cc | 1 + visualdl/storage/record.h | 95 +++++++++++++++++++ visualdl/storage/storage.cc | 1 - visualdl/storage/storage.h | 108 ++++++++++++++++++++-- visualdl/storage/storage_test.cc | 21 +++-- visualdl/storage/tablet.cc | 0 visualdl/storage/tablet.h | 103 +++++++++++++++++++++ visualdl/utils/filesystem.h | 4 +- visualdl/utils/guard.h | 34 +++++++ visualdl/utils/string.h | 29 ++++++ 20 files changed, 543 insertions(+), 491 deletions(-) delete mode 100644 visualdl/logic/im_test.cc create mode 100644 visualdl/storage/entry.cc create mode 100644 visualdl/storage/entry.h create mode 100644 visualdl/storage/record.cc create mode 100644 visualdl/storage/record.h create mode 100644 visualdl/storage/tablet.cc create mode 100644 visualdl/storage/tablet.h create mode 100644 visualdl/utils/guard.h create mode 100644 visualdl/utils/string.h diff --git a/CMakeLists.txt b/CMakeLists.txt index cf8d61f8..57f68632 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,19 +32,17 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR}) include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage) -#add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic) +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python) add_executable(vl_test ${PROJECT_SOURCE_DIR}/visualdl/test.cc ${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc - #${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc - #${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h ${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h ) -target_link_libraries(vl_test storage gtest glog protobuf gflags pthread) +target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread) enable_testing () diff --git a/visualdl/logic/CMakeLists.txt b/visualdl/logic/CMakeLists.txt index e0d02e51..f1a4250d 100644 --- a/visualdl/logic/CMakeLists.txt +++ b/visualdl/logic/CMakeLists.txt @@ -1,6 +1,6 @@ #add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc) -#add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc) -#add_dependencies(im storage_proto) +add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc) +add_dependencies(im storage_proto) #add_dependencies(sdk storage_proto) ## pybind diff --git a/visualdl/logic/im.cc b/visualdl/logic/im.cc index 624fbfbb..f8fe1924 100644 --- a/visualdl/logic/im.cc +++ b/visualdl/logic/im.cc @@ -2,79 +2,35 @@ #include #include "visualdl/logic/im.h" +#include "visualdl/storage/entry.h" +#include "visualdl/storage/storage.h" +#include "visualdl/storage/tablet.h" namespace visualdl { -/* - * @num_samples: number of instances to sample - * @size: counter of the records. - * @returns: id of the instance to replace, if drop this instance, return -1. - */ -int ReserviorSample(int num_samples, int num_records) { - if (num_records <= num_samples) { - return num_records; - } - - std::srand(std::time(0)); - float prob = static_cast(std::rand()) / RAND_MAX; - float receive_prob = static_cast(num_samples) / num_records; - if (prob < receive_prob) { - int offset2replace = std::rand() % num_samples; - return offset2replace; - } - return -1; -} - -void IM::SetPersistDest(const std::string &path) { - CHECK(storage_->mutable_data()->dir().empty()) - << "duplicate set storage's path"; - storage_->mutable_data()->set_dir(path); -} - -storage::Tablet *IM::AddTablet(const std::string &tag, int num_samples) { - auto tablet = storage_->NewTablet(tag, num_samples); - return tablet; +template +void SimpleWriteSyncGuard::Start() { + CHECK(data_); + data_->parent()->meta.Inc(); } -void IM::AddRecord(const std::string &tag, const storage::Record &data) { - auto *tablet = storage_->tablet(tag); - CHECK(tablet) << "no tablet called " << tag; - - auto num_records = tablet->total_records(); - const auto num_samples = tablet->num_samples(); - - int offset; - // use reservoir sampling or not - if (num_samples > 0) { - offset = ReserviorSample(num_samples, num_records + 1); - if (offset < 0) return; - } else { - offset = num_records; - } - - storage::Record *record; - if (offset >= num_records) { - record = tablet->add_records(); - } else { - record = tablet->mutable_records(offset); +template +void SimpleWriteSyncGuard::End() { + CHECK(data_); + if (data_->parent()->meta.ToSync()) { + Sync(); } - - *record = data; - tablet->set_total_records(num_records + 1); } -void IM::Clear() { - auto *data = storage().mutable_data(); - data->clear_tablets(); - data->clear_dir(); - data->clear_timestamp(); +template +void SimpleWriteSyncGuard::Sync() { + CHECK(data_); + auto* storage = data_->parent(); + storage->PersistToDisk(); } -void IM::PersistToDisk() { - CHECK(!storage_->data().dir().empty()) << "path of storage should be set"; - // TODO make dir first - // MakeDir(storage_.data().dir()); - storage_->PersistToDisk(storage_->data().dir()); -} +template class SimpleWriteSyncGuard; +template class SimpleWriteSyncGuard; +template class SimpleWriteSyncGuard; } // namespace visualdl diff --git a/visualdl/logic/im.h b/visualdl/logic/im.h index abbbc989..d503a58c 100644 --- a/visualdl/logic/im.h +++ b/visualdl/logic/im.h @@ -6,82 +6,26 @@ #include #include -#include "visualdl/storage/storage.h" #include "visualdl/utils/concurrency.h" +#include "visualdl/utils/guard.h" namespace visualdl { /* - * IM(Information Maintainer) maintain the Storage singleton in memory, - * pre-compute some the statistical information to help visualizaton. - * - * There should be two processes and each have an IM, one is the web server - * which hold one IM to read the storage, the other is the SDK(python or C++), - * it will get an IM to write latest changes to storage. - * - * An IM have an underlying Storage object, which might be a memory based - * storage or a disk based one, both has the same interfaces those defined by - * class StorageBase. - * - * The SDK's IM will maintain the changes and periodically write to disk, and - * the web server's IM will periodically read latest storage from disk. + * Simple logic to sync memory to disk. */ -class IM final { +template +class SimpleWriteSyncGuard { public: - IM() { storage_.reset(new MemoryStorage(&executor_)); } - // IM(StorageBase::Type type, StorageBase::Mode mode); - ~IM() { executor_.Quit(); } + SimpleWriteSyncGuard(T* x) : data_(x) { Start(); } + ~SimpleWriteSyncGuard() { End(); } - void MaintainRead(const std::string &dir, int msecs) { - LOG(INFO) << "start maintain read"; - dynamic_cast(storage_.get()) - ->StartReadService(dir, msecs, &lock_); - } - - void MaintainWrite(const std::string &dir, int msecs) { - dynamic_cast(storage_.get()) - ->StartWriteService(dir, msecs, &lock_); - } - - /* - * Set the disk path to store the Storage object. - */ - void SetPersistDest(const std::string &path); - - storage::Tablet *AddTablet(const std::string &tag, int num_samples); - - /* - * @tag: tag of the target Tablet. - * @record: a record - * - * NOTE pass in the serialized protobuf message will trigger copying, but - * simpler to support different Tablet data formats. - */ - void AddRecord(const std::string &tag, const storage::Record &record); - - /* - * delete all the information. - */ - void Clear(); - - /* - * Save the Storage Protobuf to disk. - */ - void PersistToDisk(); - - StorageBase &storage() { return *storage_; } - - cc::PeriodExector &executor() { return executor_; } - - std::mutex &handler() { return lock_; } + void Start(); + void End(); + void Sync(); private: - // read write lock for protobuf in memory - // TODO(ChunweiYan) mutex too heavy here, might change to a message queue to - // reduce the frequency of visiting disk - std::mutex lock_; - std::unique_ptr storage_; - cc::PeriodExector executor_; + T* data_{nullptr}; }; } // namespace visualdl diff --git a/visualdl/logic/im_test.cc b/visualdl/logic/im_test.cc deleted file mode 100644 index 37246c17..00000000 --- a/visualdl/logic/im_test.cc +++ /dev/null @@ -1,37 +0,0 @@ -#include "visualdl/logic/im.h" - -#include "gtest/gtest.h" - -#include "visualdl/storage/storage.h" - -namespace visualdl { - -class ImTester : public ::testing::Test { -protected: - void SetUp() override {} - - IM im; -}; - -TEST_F(ImTester, AddTablet) { - im.Clear(); - im.AddTablet("tag0", 20); -} - -TEST_F(ImTester, AddRecord) { - im.Clear(); - - im.AddTablet("tag0", 20); - for (int i = 0; i < 100; i++) { - storage::Record rcd; - rcd.set_dtype(storage::DataType::kInt32s); - for (int j = 0; j < 10; j++) { - rcd.add_data()->add_i32s(i * 20 + j); - } - im.AddRecord("tag0", rcd); - } - - ASSERT_EQ(im.storage().tablet("tag0")->records_size(), 100UL); -} - -} // namespace visualdl diff --git a/visualdl/logic/sdk.cc b/visualdl/logic/sdk.cc index e3283812..e69de29b 100644 --- a/visualdl/logic/sdk.cc +++ b/visualdl/logic/sdk.cc @@ -1,150 +0,0 @@ -#include "visualdl/logic/sdk.h" - -#include - -namespace visualdl { - -#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \ - template <> \ - void EntryHelper::method__(ctype__ v) { \ - entry->set_dtype(storage::DataType::dtype__); \ - entry->opr__(v); \ - } - -IMPL_ENTRY_SET_OR_ADD(Set, int32_t, kInt32, set_i32); -IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64); -IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b); -IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f); -IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d); -IMPL_ENTRY_SET_OR_ADD(Add, int32_t, kInt32s, add_i32s); -IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s); -IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs); -IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds); -IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss); -IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs); - -#define IMPL_ENTRY_GET(T, fieldname__) \ - template <> \ - T EntryHelper::Get() const { \ - return entry->fieldname__(); \ - } -IMPL_ENTRY_GET(int32_t, i32); -IMPL_ENTRY_GET(int64_t, i64); -IMPL_ENTRY_GET(float, f); -IMPL_ENTRY_GET(double, d); -IMPL_ENTRY_GET(std::string, s); -IMPL_ENTRY_GET(bool, b); - -#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \ - template <> \ - std::vector EntryHelper::GetMulti() const { \ - return std::vector(entry->fieldname__().begin(), \ - entry->fieldname__().end()); \ - } - -IMPL_ENTRY_GET_MULTI(int32_t, i32s); -IMPL_ENTRY_GET_MULTI(int64_t, i64s); -IMPL_ENTRY_GET_MULTI(float, fs); -IMPL_ENTRY_GET_MULTI(double, ds); -IMPL_ENTRY_GET_MULTI(std::string, ss); -IMPL_ENTRY_GET_MULTI(bool, bs); - -std::string StorageHelper::human_readable_buffer() const { - std::string buffer; - google::protobuf::TextFormat::PrintToString(*data_, &buffer); - return buffer; -} - -std::string TabletHelper::human_readable_buffer() const { - std::string buffer; - google::protobuf::TextFormat::PrintToString(*data_, &buffer); - return buffer; -} - -// implementations for components -namespace components { - -template -void ScalarHelper::SetCaptions(const std::vector &captions) { - ACQUIRE_HANDLER(handler_); - - CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once"; - for (int i = 0; i < captions.size(); i++) { - data_->add_captions(captions[i]); - } -} - -template -void ScalarHelper::AddRecord(int id, const std::vector &values) { - ACQUIRE_HANDLER(handler_); - - CHECK_NOTNULL(data_); - CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first"; - CHECK_EQ(data_->captions_size(), values.size()) - << "number of values in a record should be compatible with the " - "captions"; - // add record data - auto *record = data_->add_records(); - auto *data = record->add_data(); - EntryHelper entry_helper(data); - for (auto v : values) { - entry_helper.Add(v); - } - // set record id - record->set_id(id); - // set record timestamp - record->set_timestamp(time(NULL)); -} - -template -std::vector> ScalarHelper::GetRecords() const { - ACQUIRE_HANDLER(handler_); - - std::vector> result; - EntryHelper entry_helper; - for (int i = 0; i < data_->records_size(); i++) { - auto *entry = data_->mutable_records(i)->mutable_data(0); - entry_helper(entry); - auto datas = entry_helper.GetMulti(); - result.push_back(std::move(datas)); - } - return result; -} - -template -std::vector ScalarHelper::GetIds() const { - ACQUIRE_HANDLER(handler_); - CHECK_NOTNULL(data_); - std::vector result; - for (int i = 0; i < data_->records_size(); i++) { - result.push_back(data_->records(i).id()); - } - return result; -} - -template -std::vector ScalarHelper::GetTimestamps() const { - ACQUIRE_HANDLER(handler_); - CHECK_NOTNULL(data_); - std::vector result; - for (int i = 0; i < data_->records_size(); i++) { - result.push_back(data_->records(i).timestamp()); - } - return result; -} - -template -std::vector ScalarHelper::GetCaptions() const { - ACQUIRE_HANDLER(handler_); - return std::vector(data_->captions().begin(), - data_->captions().end()); -} - -template class ScalarHelper; -template class ScalarHelper; -template class ScalarHelper; -template class ScalarHelper; - -} // namespace components - -} // namespace visualdl diff --git a/visualdl/logic/sdk.h b/visualdl/logic/sdk.h index b74e3317..b3c12f91 100644 --- a/visualdl/logic/sdk.h +++ b/visualdl/logic/sdk.h @@ -1,164 +1,28 @@ #ifndef VISUALDL_LOGIC_SDK_H #define VISUALDL_LOGIC_SDK_H -#include -#include -#include - -#include "visualdl/logic/im.h" +#include "visualdl/storage/storage.h" +#include "visualdl/storage/tablet.h" namespace visualdl { - -/* - * Utility helper for storage::Entry. - */ -template -struct EntryHelper { - // use pointer to avoid copy - storage::Entry *entry{nullptr}; - - EntryHelper() {} - explicit EntryHelper(storage::Entry *entry) : entry(entry) {} - void operator()(storage::Entry *entry) { this->entry = entry; } - - /* - * Set a single value. - */ - void Set(T v); - - /* - * Add a value to repeated message field. - */ - void Add(T v); - - /* - * Get a single value. - */ - T Get() const; - - /* - * Get repeated field. - */ - std::vector GetMulti() const; -}; - -class TabletHelper { -public: - // basic member getter and setter - std::string record_buffer(int idx) const { - return data_->records(idx).SerializeAsString(); - } - size_t records_size() const { return data_->records_size(); } - std::string buffer() const { return data_->SerializeAsString(); } - std::string human_readable_buffer() const; - void SetBuffer(const storage::Tablet &t) { *data_ = t; } - void SetBuffer(const std::string &b) { data_->ParseFromString(b); } - storage::Tablet &data() const { return *data_; } - - // constructor that enable concurrency. - TabletHelper(storage::Tablet *t) : data_(t) {} - // data updater that resuage of one instance. - TabletHelper &operator()(storage::Tablet *t) { - data_ = t; - return *this; - } - -private: - storage::Tablet *data_; -}; - -class StorageHelper { -public: - StorageHelper(storage::Storage *s) : data_(s) {} - StorageHelper &operator()(storage::Storage *s) { - data_ = s; - return *this; - } - - void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; } - void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); } - void SetDir(const std::string &dir) { - CHECK(data_) << "no storage instance hold"; - data_->set_dir(dir); - } - - int64_t timestamp() const { return data_->timestamp(); } - std::string dir() const { return data_->dir(); } - int tablets_size() const { return data_->tablets_size(); } - std::string buffer() const { return data_->SerializeAsString(); } - std::string human_readable_buffer() const; - -private: - storage::Storage *data_{nullptr}; -}; - -class ImHelper { -public: - // TODO(ChunweiYan) decouple helper with resource. - ImHelper() { im_.reset(new IM); } - ImHelper(std::unique_ptr im) : im_(std::move(im)) {} - - StorageHelper storage() { - return StorageHelper(im_->storage().mutable_data()); - } - TabletHelper tablet(const std::string &tag) { - return TabletHelper(im_->storage().tablet(tag)); - } - TabletHelper AddTablet(const std::string &tag, int num_samples) { - return TabletHelper(im_->AddTablet(tag, num_samples)); - } - std::mutex &handler() { return im_->handler(); } - void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); } - void StartReadService(const std::string &dir, int msecs) { - im_->SetPersistDest(dir); - im_->MaintainRead(dir, msecs); - } - void StartWriteSerice(const std::string &dir, int msecs) { - im_->SetPersistDest(dir); - im_->MaintainWrite(dir, msecs); - } - void StopService() { im_->executor().Quit(); } - void PersistToDisk() const { im_->PersistToDisk(); } - -private: - std::unique_ptr im_; -}; - namespace components { -#define ACQUIRE_HANDLER(handler) std::lock_guard ____(*handler); - /* * Read and write support for Scalar component. */ template -class ScalarHelper { +class Scalar { public: - ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr) - : data_(tablet), handler_(handler) {} - ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr) - : data_(&tablet.data()), handler_(handler) {} - - void SetCaptions(const std::vector &captions); - - void AddRecord(int id, const std::vector &values); - - std::vector> GetRecords() const; - - std::vector GetIds() const; - - std::vector GetTimestamps() const; - - std::vector GetCaptions() const; - - size_t GetSize() const { return data_->records_size(); } + Scalar(Tablet tablet) : tablet_(tablet) { tablet_->SetTag(kScalar); } + void SetCaption(const std::string cap) { + tablet_->SetCaptions(std::vector({cap})); + } private: - storage::Tablet *data_; - std::mutex *handler_; + Tablet tablet_; }; } // namespace components } // namespace visualdl -#endif // VISUALDL_BACKEND_LOGIC_SDK_H +#endif diff --git a/visualdl/logic/sdk_test.cc b/visualdl/logic/sdk_test.cc index f800f31e..566b3f00 100644 --- a/visualdl/logic/sdk_test.cc +++ b/visualdl/logic/sdk_test.cc @@ -5,8 +5,10 @@ namespace visualdl { struct ScalarTestHelper { - ImHelper rim; - ImHelper wim; + ImHelper _rim; + ImHelper _wim; + ImHelper rim = _rim.AsMode("train"); + ImHelper wim = _wim.AsMode("train"); const std::string dir = "./tmp/sdk_test.test"; void operator()(std::function read, std::function write) { @@ -78,4 +80,19 @@ TEST(Scalar, add_records) { helper(read, write); } +TEST(Scalar, mode) { + ScalarTestHelper helper; + auto train_wim = helper.wim.AsMode("train"); + + auto write = [&] { + auto tablet = train_wim.AddTablet("tag1", -1); + components::ScalarHelper scalar(tablet, &train_wim.handler()); + + scalar.SetCaptions(std::vector({"train"})); + scalar.AddRecord(10, std::vector({0.1})); + }; + + auto reader = [&] {}; +} + } // namespace visualdl diff --git a/visualdl/storage/entry.cc b/visualdl/storage/entry.cc new file mode 100644 index 00000000..78503106 --- /dev/null +++ b/visualdl/storage/entry.cc @@ -0,0 +1,51 @@ +#include "visualdl/storage/entry.h" + +namespace visualdl { + +#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \ + template <> \ + void Entry::method__(ctype__ v) { \ + entry->set_dtype(storage::DataType::dtype__); \ + entry->opr__(v); \ + } + +IMPL_ENTRY_SET_OR_ADD(Set, int32_t, kInt32, set_i32); +IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64); +IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b); +IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f); +IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d); +IMPL_ENTRY_SET_OR_ADD(Add, int32_t, kInt32s, add_i32s); +IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s); +IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs); +IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds); +IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss); +IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs); + +#define IMPL_ENTRY_GET(T, fieldname__) \ + template <> \ + T EntryReader::Get() const { \ + data_.fieldname__(); \ + } + +IMPL_ENTRY_GET(int32_t, i32); +IMPL_ENTRY_GET(int64_t, i64); +IMPL_ENTRY_GET(float, f); +IMPL_ENTRY_GET(double, d); +IMPL_ENTRY_GET(std::string, s); +IMPL_ENTRY_GET(bool, b); + +#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \ + template <> \ + std::vector EntryReader::GetMulti() const { \ + return std::vector(data_.fieldname__().begin(), \ + data_.fieldname__().end()); \ + } + +IMPL_ENTRY_GET_MULTI(int32_t, i32s); +IMPL_ENTRY_GET_MULTI(int64_t, i64s); +IMPL_ENTRY_GET_MULTI(float, fs); +IMPL_ENTRY_GET_MULTI(double, ds); +IMPL_ENTRY_GET_MULTI(std::string, ss); +IMPL_ENTRY_GET_MULTI(bool, bs); + +} // namespace visualdl diff --git a/visualdl/storage/entry.h b/visualdl/storage/entry.h new file mode 100644 index 00000000..d22f32c2 --- /dev/null +++ b/visualdl/storage/entry.h @@ -0,0 +1,55 @@ +#ifndef VISUALDL_STORAGE_ENTRY_H +#define VISUALDL_STORAGE_ENTRY_H + +#include "visualdl/logic/im.h" +#include "visualdl/storage/storage.pb.h" +#include "visualdl/utils/guard.h" + +namespace visualdl { + +struct Storage; + +/* + * Utility helper for storage::Entry. + */ +template +struct Entry { + DECL_GUARD(Entry) + // use pointer to avoid copy + storage::Entry* entry{nullptr}; + + Entry() {} + explicit Entry(storage::Entry* entry, void* parent) + : entry(entry), x_(parent) {} + void operator()(storage::Entry* entry, void* parent) { + this->entry = entry; + x_ = parent; + } + + // Set a single value. + void Set(T v); + + // Add a value to repeated message field. + void Add(T v); + + Storage* parent() { return x_; } + +private: + Storage* x_; +}; + +template +struct EntryReader { + EntryReader(storage::Entry x) : data_(x) {} + // Get a single value. + T Get() const; + // Get repeated field. + std::vector GetMulti() const; + +private: + storage::Entry data_; +}; + +} // namespace visualdl + +#endif diff --git a/visualdl/storage/record.cc b/visualdl/storage/record.cc new file mode 100644 index 00000000..69d22154 --- /dev/null +++ b/visualdl/storage/record.cc @@ -0,0 +1 @@ +#include "visualdl/storage/record.h" \ No newline at end of file diff --git a/visualdl/storage/record.h b/visualdl/storage/record.h new file mode 100644 index 00000000..4e5fc7fd --- /dev/null +++ b/visualdl/storage/record.h @@ -0,0 +1,95 @@ +#ifndef VISUALDL_STORAGE_RECORD_H +#define VISUALDL_STORAGE_RECORD_H + +#include "visualdl/logic/im.h" +#include "visualdl/storage/entry.h" +#include "visualdl/storage/storage.pb.h" + +namespace visualdl { + +/* + * A helper for operations on storage::Record + */ +struct Record { + enum Dtype { + kInt32 = 0, + kInt64 = 1, + kFloat = 2, + kDouble = 3, + kString = 4, + kBool = 5, + // entrys + kInt64s = 6, + kFloats = 7, + kDoubles = 8, + kStrings = 9, + kInt32s = 10, + kBools = 11, + kUnknown = 12 + }; + + DECL_GUARD(Record) + + Record(storage::Record* x, Storage* parent) : data_(x), x_(parent) {} + + // write operations + void SetTimeStamp(int64_t x) { + data_->set_timestamp(x); + WRITE_GUARD + } + + void SetId(int64_t id) { + data_->set_id(id); + WRITE_GUARD + } + + void SetDtype(Dtype type) { + data_->set_dtype(storage::DataType(type)); + WRITE_GUARD + } + + template + Entry MutableMeta() { + return Entry(data_->mutable_meta(), parent()); + } + + template + Entry AddData() { + WRITE_GUARD + return Entry(data_->add_data(), parent()); + } + + Storage* parent() { return x_; } + +private: + storage::Record* data_{nullptr}; + Storage* x_; +}; + +struct RecordReader { + RecordReader(storage::Record x) : data_(x) {} + + // read operations + size_t data_size() const { return data_.data_size(); } + + template + EntryReader data(int i) { + return EntryReader(data_.data(i)); + } + int64_t timestamp() const { return data_.timestamp(); } + int64_t id() const { return data_.id(); } + + Record::Dtype dtype() const { return (Record::Dtype)data_.dtype(); } + + template + Entry meta() const { + return data_.meta(); + } + +private: + storage::Record data_; +}; + +} // namespace visualdl + +#endif diff --git a/visualdl/storage/storage.cc b/visualdl/storage/storage.cc index ab5a4bc5..e69de29b 100644 --- a/visualdl/storage/storage.cc +++ b/visualdl/storage/storage.cc @@ -1 +0,0 @@ -#include "visualdl/storage/storage.h" \ No newline at end of file diff --git a/visualdl/storage/storage.h b/visualdl/storage/storage.h index 49762db1..97a89177 100644 --- a/visualdl/storage/storage.h +++ b/visualdl/storage/storage.h @@ -2,44 +2,132 @@ #define VISUALDL_STORAGE_STORAGE_H #include +#include #include +#include "visualdl/logic/im.h" #include "visualdl/storage/storage.pb.h" #include "visualdl/storage/tablet.h" +#include "visualdl/utils/filesystem.h" namespace visualdl { +static const std::string meta_file_name = "storage.meta"; + +static std::string meta_path(const std::string& dir) { + CHECK(!dir.empty()) << "dir is empty"; + return dir + "/" + meta_file_name; +} +static std::string tablet_path(const std::string& dir, const std::string& tag) { + CHECK(!dir.empty()) << "dir should be set first"; + return dir + "/" + tag; +} + +struct SimpleSyncMeta { + void Inc() { counter++; } + + bool ToSync() { return counter % cycle == 0; } + + size_t counter{0}; + int cycle; +}; + /* * Helper for operations on storage::Storage. */ struct Storage { - Storage() {} - Storage(storage::Storage* x) : data_(x) { + DECL_GUARD(Storage) + + mutable SimpleSyncMeta meta; + + Storage() { data_ = std::make_shared(); } + Storage(const std::shared_ptr& x) : data_(x) { time_t t; time(&t); data_->set_timestamp(t); } - std::vector Modes() { - return std::vector(data_->modes().begin(), - data_->modes().end()); + // write operations + void AddMode(const std::string& x) { + *data_->add_modes() = x; + WRITE_GUARD } - void AddMode(const std::string& x) { *data_->add_modes() = x; } - Tablet AddTablet(const std::string& x) { AddTag(x); CHECK(tablets_.count(x) == 0) << "tablet [" << x << "] has existed"; tablets_[x] = storage::Tablet(); - return Tablet(&tablets_[x]); + WRITE_GUARD + return Tablet(&tablets_[x], this); } + void SetDir(const std::string& dir) { dir_ = dir; } + void PersistToDisk() { PersistToDisk(dir_); } + /* + * Save memory to disk. + */ + void PersistToDisk(const std::string& dir) { + LOG(INFO) << "persist to disk " << dir; + CHECK(!dir.empty()) << "dir should be set."; + fs::TryRecurMkdir(dir); + + fs::SerializeToFile(*data_, meta_path(dir)); + for (auto tag : data_->tags()) { + auto it = tablets_.find(tag); + CHECK(it != tablets_.end()) << "tag " << tag << " not exist."; + fs::SerializeToFile(it->second, tablet_path(dir, tag)); + } + } + + Storage* parent() { return this; } + protected: - void AddTag(const std::string& x) { *data_->add_tags() = x; } + void AddTag(const std::string& x) { + WRITE_GUARD + *data_->add_tags() = x; + } private: + std::string dir_; std::map tablets_; - storage::Storage* data_{nullptr}; + std::shared_ptr data_; +}; + +/* + * Storage reader, each interface will trigger a read. + */ +struct StorageReader { + StorageReader(const std::string& dir) : dir_(dir) {} + + // read operations + std::vector Tags() { + storage::Storage storage; + Reload(storage); + return std::vector(storage.tags().begin(), + storage.tags().end()); + } + std::vector Modes() { + storage::Storage storage; + Reload(storage); + return std::vector(storage.modes().begin(), + storage.modes().end()); + } + + TabletReader tablet(const std::string& tag) const { + auto path = tablet_path(dir_, tag); + storage::Tablet tablet; + fs::DeSerializeFromFile(&tablet, path); + return TabletReader(tablet); + } + +protected: + void Reload(storage::Storage& storage) { + const std::string path = meta_path(dir_); + fs::DeSerializeFromFile(&storage, path); + } + +private: + std::string dir_; }; } // namespace visualdl diff --git a/visualdl/storage/storage_test.cc b/visualdl/storage/storage_test.cc index f805fa39..9eebdc41 100644 --- a/visualdl/storage/storage_test.cc +++ b/visualdl/storage/storage_test.cc @@ -6,20 +6,25 @@ namespace visualdl { class StorageTest : public ::testing::Test { public: - void SetUp() { storage.reset(new Storage(&data_)); } + void SetUp() { + storage.SetDir("./tmp/storage_test"); + storage.meta.cycle = 2; + } - storage::Storage data_; - std::unique_ptr storage; + Storage storage; }; TEST_F(StorageTest, main) { - storage->AddMode("train"); - storage->AddMode("test"); + storage.AddMode("train"); + storage.AddMode("test"); - auto tag0 = storage->AddTablet("tag0"); - auto tag1 = storage->AddTablet("tag1"); + auto tag0 = storage.AddTablet("tag0"); + auto tag1 = storage.AddTablet("tag1"); + + + StorageReader reader("./tmp/storage_test"); + auto modes = reader.Modes(); - auto modes = storage->Modes(); ASSERT_EQ(modes.size(), 2); ASSERT_EQ(modes[0], "train"); ASSERT_EQ(modes[1], "test"); diff --git a/visualdl/storage/tablet.cc b/visualdl/storage/tablet.cc new file mode 100644 index 00000000..e69de29b diff --git a/visualdl/storage/tablet.h b/visualdl/storage/tablet.h new file mode 100644 index 00000000..d2de1251 --- /dev/null +++ b/visualdl/storage/tablet.h @@ -0,0 +1,103 @@ +#ifndef VISUALDL_TABLET_H +#define VISUALDL_TABLET_H + +#include "visualdl/logic/im.h" +#include "visualdl/storage/record.h" +#include "visualdl/storage/storage.pb.h" +#include "visualdl/utils/string.h" + +namespace visualdl { + +/* + * Tablet is a helper for operations on storage::Tablet. + */ +struct Tablet { + enum Type { kScalar = 0, kHistogram = 1, kImage = 2 }; + + DECL_GUARD(Tablet); + + Tablet(storage::Tablet* x, Storage* parent) : data_(x), x_(parent) {} + + // write operations. + void SetNumSamples(int x) { + data_->set_num_samples(x); + WRITE_GUARD + } + + void SetType(Type type) { + data_->set_component(static_cast(type)); + WRITE_GUARD + } + + void SetTag(const std::string& mode, const std::string& tag) { + auto internal_tag = mode + "/" + tag; + string::TagEncode(internal_tag); + data_->set_tag(internal_tag); + WRITE_GUARD + } + + Record AddRecord() { + IncTotalRecords(); + WRITE_GUARD + return Record(data_->add_records(), parent()); + } + + template + Entry MutableMeta() { + Entry x(data_->mutable_meta(), parent()); + } + + void SetCaptions(const std::vector& xs) { + for (const auto& x : xs) { + *data_->add_captions() = x; + } + WRITE_GUARD + } + + void SetDescription(const std::string& x) { + data_->set_description(x); + WRITE_GUARD + } + + void IncTotalRecords() { + data_->set_total_records(data_->total_records() + 1); + WRITE_GUARD + } + + Storage* parent() const { return x_; } + +private: + Storage* x_; + storage::Tablet* data_{nullptr}; +}; + +/* + * Tablet reader, it will hold the protobuf object. + */ +struct TabletReader { + TabletReader(storage::Tablet x) : data_(x) {} + + // read operations. + std::string tag() const { return data_.tag(); } + Tablet::Type type() const { return Tablet::Type(data_.component()); } + int64_t total_records() const { return data_.total_records(); } + int32_t num_samples() const { return data_.num_samples(); } + RecordReader record(int i) const { return RecordReader(data_.records(i)); } + template + EntryReader meta() const { + return EntryReader(data_.meta()); + } + std::vector captions() const { + std::vector x(data_.captions().begin(), + data_.captions().end()); + return x; + } + std::string description() const { return data_.description(); } + +private: + storage::Tablet data_; +}; + +} // namespace visualdl + +#endif diff --git a/visualdl/utils/filesystem.h b/visualdl/utils/filesystem.h index 58438f26..58d86963 100644 --- a/visualdl/utils/filesystem.h +++ b/visualdl/utils/filesystem.h @@ -43,7 +43,7 @@ bool DeSerializeFromFile(T* proto, const std::string& path) { return proto->ParseFromIstream(&file); } -void TryMkdir(const std::string& dir) { +static void TryMkdir(const std::string& dir) { VLOG(1) << "try to mkdir " << dir; struct stat st = {0}; if (stat(dir.c_str(), &st) == -1) { @@ -52,7 +52,7 @@ void TryMkdir(const std::string& dir) { } // Create a path by recursively create directries -void TryRecurMkdir(const std::string& path) { +static void TryRecurMkdir(const std::string& path) { // split path by '/' for (int i = 1; i < path.size() - 1; i++) { if (path[i] == '/') { diff --git a/visualdl/utils/guard.h b/visualdl/utils/guard.h new file mode 100644 index 00000000..961f76b9 --- /dev/null +++ b/visualdl/utils/guard.h @@ -0,0 +1,34 @@ +#ifndef VISUALDL_UTILS_GUARD_H +#define VISUALDL_UTILS_GUARD_H + +namespace visualdl { +namespace guard { + +template +class BasicGuard { +public: + BasicGuard(const T* x) : data_(x) { start(); } + ~BasicGuard() { end(); } + + void start() {} + void end() {} + +private: + const T* data_; +}; + +#define DECL_GUARD(T) \ + using WriteGuard = SimpleWriteSyncGuard; \ + using ReadGuard = guard::BasicGuard; + +// #define DECL_GUARD(T) \ +// using WriteGuard = guard::BasicGuard; \ +// using ReadGuard = guard::BasicGuard; + +#define READ_GUARD ReadGuard _(this); +#define WRITE_GUARD WriteGuard _(this); + +} // namespace guard +} // namespace visualdl + +#endif diff --git a/visualdl/utils/string.h b/visualdl/utils/string.h new file mode 100644 index 00000000..f714d7e3 --- /dev/null +++ b/visualdl/utils/string.h @@ -0,0 +1,29 @@ +#ifndef VISUALDL_UTILS_STRING_H +#define VISUALDL_UTILS_STRING_H + +#include +#include + +namespace visualdl { +namespace string { + +static void TagEncode(std::string& tag) { + for (auto& c : tag) { + if (c == '/') { + c = '%'; + } + } +} + +static void TagDecode(std::string& tag) { + for (auto& c : tag) { + if (c == '%') { + c = '/'; + } + } +} + +} // namespace string +} // namespace visualdl + +#endif -- GitLab