提交 c103645a 编写于 作者: S superjom

refactor finish reader and writer

上级 dacc87a1
...@@ -32,19 +32,17 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR}) ...@@ -32,19 +32,17 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include) include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage)
#add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python)
add_executable(vl_test add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/test.cc ${PROJECT_SOURCE_DIR}/visualdl/test.cc
${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc ${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc
#${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc
#${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h ${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h
${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h ${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h
) )
target_link_libraries(vl_test storage gtest glog protobuf gflags pthread) target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread)
enable_testing () enable_testing ()
......
#add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc) #add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc)
#add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc) add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc)
#add_dependencies(im storage_proto) add_dependencies(im storage_proto)
#add_dependencies(sdk storage_proto) #add_dependencies(sdk storage_proto)
## pybind ## pybind
......
...@@ -2,79 +2,35 @@ ...@@ -2,79 +2,35 @@
#include <ctime> #include <ctime>
#include "visualdl/logic/im.h" #include "visualdl/logic/im.h"
#include "visualdl/storage/entry.h"
#include "visualdl/storage/storage.h"
#include "visualdl/storage/tablet.h"
namespace visualdl { namespace visualdl {
/* template <typename T>
* @num_samples: number of instances to sample void SimpleWriteSyncGuard<T>::Start() {
* @size: counter of the records. CHECK(data_);
* @returns: id of the instance to replace, if drop this instance, return -1. data_->parent()->meta.Inc();
*/
/*
 * Decide where an incoming record goes in a fixed-size sample.
 *
 * @num_samples: number of instances to keep.
 * @num_records: record counter (IM::AddRecord passes the current total + 1).
 * @returns: `num_records` unchanged while num_records <= num_samples,
 *           otherwise an offset in [0, num_samples) to overwrite, or -1 when
 *           the record should be dropped.
 */
int ReserviorSample(int num_samples, int num_records) {
  if (num_records <= num_samples) {
    return num_records;
  }
  // Seed the generator exactly once. Re-seeding from the wall clock on every
  // call made all calls within the same second produce the same draw, so the
  // keep/drop decision and the replaced slot were identical for each of them.
  static const bool seeded =
      (std::srand(static_cast<unsigned>(std::time(nullptr))), true);
  (void)seeded;

  const float prob = static_cast<float>(std::rand()) / RAND_MAX;
  const float receive_prob = static_cast<float>(num_samples) / num_records;
  if (prob < receive_prob) {
    // Accepted: pick the slot to overwrite.
    return std::rand() % num_samples;
  }
  return -1;
}
// Store `path` as the storage directory. CHECK-fails if a directory has
// already been set on the underlying storage data.
void IM::SetPersistDest(const std::string &path) {
  auto *proto = storage_->mutable_data();
  CHECK(proto->dir().empty()) << "duplicate set storage's path";
  proto->set_dir(path);
}
storage::Tablet *IM::AddTablet(const std::string &tag, int num_samples) {
auto tablet = storage_->NewTablet(tag, num_samples);
return tablet;
} }
void IM::AddRecord(const std::string &tag, const storage::Record &data) { template <typename T>
auto *tablet = storage_->tablet(tag); void SimpleWriteSyncGuard<T>::End() {
CHECK(tablet) << "no tablet called " << tag; CHECK(data_);
if (data_->parent()->meta.ToSync()) {
auto num_records = tablet->total_records(); Sync();
const auto num_samples = tablet->num_samples();
int offset;
// use reservoir sampling or not
if (num_samples > 0) {
offset = ReserviorSample(num_samples, num_records + 1);
if (offset < 0) return;
} else {
offset = num_records;
}
storage::Record *record;
if (offset >= num_records) {
record = tablet->add_records();
} else {
record = tablet->mutable_records(offset);
} }
*record = data;
tablet->set_total_records(num_records + 1);
} }
void IM::Clear() { template <typename T>
auto *data = storage().mutable_data(); void SimpleWriteSyncGuard<T>::Sync() {
data->clear_tablets(); CHECK(data_);
data->clear_dir(); auto* storage = data_->parent();
data->clear_timestamp(); storage->PersistToDisk();
} }
void IM::PersistToDisk() { template class SimpleWriteSyncGuard<Storage>;
CHECK(!storage_->data().dir().empty()) << "path of storage should be set"; template class SimpleWriteSyncGuard<Tablet>;
// TODO make dir first template class SimpleWriteSyncGuard<Record>;
// MakeDir(storage_.data().dir());
storage_->PersistToDisk(storage_->data().dir());
}
} // namespace visualdl } // namespace visualdl
...@@ -6,82 +6,26 @@ ...@@ -6,82 +6,26 @@
#include <mutex> #include <mutex>
#include <string> #include <string>
#include "visualdl/storage/storage.h"
#include "visualdl/utils/concurrency.h" #include "visualdl/utils/concurrency.h"
#include "visualdl/utils/guard.h"
namespace visualdl { namespace visualdl {
/* /*
* IM(Information Maintainer) maintain the Storage singleton in memory, * Simple logic to sync memory to disk.
* pre-compute some the statistical information to help visualizaton.
*
* There should be two processes and each have an IM, one is the web server
* which hold one IM to read the storage, the other is the SDK(python or C++),
* it will get an IM to write latest changes to storage.
*
* An IM have an underlying Storage object, which might be a memory based
* storage or a disk based one, both has the same interfaces those defined by
* class StorageBase.
*
* The SDK's IM will maintain the changes and periodically write to disk, and
* the web server's IM will periodically read latest storage from disk.
*/ */
class IM final { template <typename T>
class SimpleWriteSyncGuard {
public: public:
IM() { storage_.reset(new MemoryStorage(&executor_)); } SimpleWriteSyncGuard(T* x) : data_(x) { Start(); }
// IM(StorageBase::Type type, StorageBase::Mode mode); ~SimpleWriteSyncGuard() { End(); }
~IM() { executor_.Quit(); }
void MaintainRead(const std::string &dir, int msecs) { void Start();
LOG(INFO) << "start maintain read"; void End();
dynamic_cast<MemoryStorage *>(storage_.get()) void Sync();
->StartReadService(dir, msecs, &lock_);
}
void MaintainWrite(const std::string &dir, int msecs) {
dynamic_cast<MemoryStorage *>(storage_.get())
->StartWriteService(dir, msecs, &lock_);
}
/*
* Set the disk path to store the Storage object.
*/
void SetPersistDest(const std::string &path);
storage::Tablet *AddTablet(const std::string &tag, int num_samples);
/*
* @tag: tag of the target Tablet.
* @record: a record
*
* NOTE pass in the serialized protobuf message will trigger copying, but
* simpler to support different Tablet data formats.
*/
void AddRecord(const std::string &tag, const storage::Record &record);
/*
* delete all the information.
*/
void Clear();
/*
* Save the Storage Protobuf to disk.
*/
void PersistToDisk();
StorageBase &storage() { return *storage_; }
cc::PeriodExector &executor() { return executor_; }
std::mutex &handler() { return lock_; }
private: private:
// read write lock for protobuf in memory T* data_{nullptr};
// TODO(ChunweiYan) mutex too heavy here, might change to a message queue to
// reduce the frequency of visiting disk
std::mutex lock_;
std::unique_ptr<StorageBase> storage_;
cc::PeriodExector executor_;
}; };
} // namespace visualdl } // namespace visualdl
......
#include "visualdl/logic/im.h"
#include "gtest/gtest.h"
#include "visualdl/storage/storage.h"
namespace visualdl {
// gtest fixture: every test case gets its own default-constructed IM.
class ImTester : public ::testing::Test {
protected:
// No per-test setup is needed.
void SetUp() override {}
// The IM instance under test.
IM im;
};
// Smoke test: clear the IM, then add one tablet with tag "tag0" and
// num_samples 20. The test contains no assertions.
TEST_F(ImTester, AddTablet) {
im.Clear();
im.AddTablet("tag0", 20);
}
// Adds 100 records, each holding ten int32 values, to tablet "tag0" and checks
// how many records the tablet ends up storing.
// NOTE(review): the tablet is created with num_samples = 20 while the final
// assertion expects 100 stored records -- confirm this agrees with the
// sampling done in IM::AddRecord.
TEST_F(ImTester, AddRecord) {
im.Clear();
im.AddTablet("tag0", 20);
for (int i = 0; i < 100; i++) {
storage::Record rcd;
rcd.set_dtype(storage::DataType::kInt32s);
// Ten consecutive values starting at i * 20.
for (int j = 0; j < 10; j++) {
rcd.add_data()->add_i32s(i * 20 + j);
}
im.AddRecord("tag0", rcd);
}
ASSERT_EQ(im.storage().tablet("tag0")->records_size(), 100UL);
}
} // namespace visualdl
#include "visualdl/logic/sdk.h"
#include <google/protobuf/text_format.h>
namespace visualdl {
// Generates the explicit specialization EntryHelper<ctype__>::method__.
// The generated body stamps the entry with storage::DataType::dtype__ and then
// forwards the value to the protobuf accessor opr__.
//   method__ : member to specialize (Set or Add)
//   ctype__  : C++ value type
//   dtype__  : storage::DataType enumerator written to the entry
//   opr__    : protobuf setter/adder called with the value
#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \
template <> \
void EntryHelper<ctype__>::method__(ctype__ v) { \
entry->set_dtype(storage::DataType::dtype__); \
entry->opr__(v); \
}
// Single-value setters.
IMPL_ENTRY_SET_OR_ADD(Set, int32_t, kInt32, set_i32);
IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64);
IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b);
IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f);
IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d);
// Appenders for the repeated fields.
IMPL_ENTRY_SET_OR_ADD(Add, int32_t, kInt32s, add_i32s);
IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s);
IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs);
IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds);
IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss);
IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs);
// Generates the explicit specialization EntryHelper<T>::Get(), which returns
// the value of the protobuf field accessor fieldname__ on the wrapped entry.
#define IMPL_ENTRY_GET(T, fieldname__) \
template <> \
T EntryHelper<T>::Get() const { \
return entry->fieldname__(); \
}
// One getter per supported single-value type.
IMPL_ENTRY_GET(int32_t, i32);
IMPL_ENTRY_GET(int64_t, i64);
IMPL_ENTRY_GET(float, f);
IMPL_ENTRY_GET(double, d);
IMPL_ENTRY_GET(std::string, s);
IMPL_ENTRY_GET(bool, b);
// Generates the explicit specialization EntryHelper<T>::GetMulti(), which
// copies the repeated protobuf field fieldname__ into a std::vector<T>.
#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \
template <> \
std::vector<T> EntryHelper<T>::GetMulti() const { \
return std::vector<T>(entry->fieldname__().begin(), \
entry->fieldname__().end()); \
}
// One getter per supported repeated type.
IMPL_ENTRY_GET_MULTI(int32_t, i32s);
IMPL_ENTRY_GET_MULTI(int64_t, i64s);
IMPL_ENTRY_GET_MULTI(float, fs);
IMPL_ENTRY_GET_MULTI(double, ds);
IMPL_ENTRY_GET_MULTI(std::string, ss);
IMPL_ENTRY_GET_MULTI(bool, bs);
// Render the wrapped storage message with protobuf's TextFormat printer.
std::string StorageHelper::human_readable_buffer() const {
  std::string text;
  google::protobuf::TextFormat::PrintToString(*data_, &text);
  return text;
}
// Render the wrapped tablet message with protobuf's TextFormat printer.
std::string TabletHelper::human_readable_buffer() const {
  std::string text;
  google::protobuf::TextFormat::PrintToString(*data_, &text);
  return text;
}
// implementations for components
namespace components {
// Append every caption to the tablet. CHECK-fails if the tablet already has
// captions. Holds the handler lock for the duration of the call.
template <typename T>
void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
  ACQUIRE_HANDLER(handler_);
  CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once";
  for (const auto &caption : captions) {
    data_->add_captions(caption);
  }
}
// Append one record to the tablet: a single data entry holding `values`, the
// given `id`, and the current time as timestamp. CHECK-fails unless captions
// were set and `values` has one element per caption.
template <typename T>
void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
  ACQUIRE_HANDLER(handler_);
  CHECK_NOTNULL(data_);
  CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first";
  CHECK_EQ(data_->captions_size(), values.size())
      << "number of values in a record should be compatible with the "
         "captions";

  auto *new_record = data_->add_records();

  // Fill the record's single data entry.
  EntryHelper<T> writer(new_record->add_data());
  for (auto it = values.begin(); it != values.end(); ++it) {
    writer.Add(*it);
  }

  new_record->set_id(id);
  new_record->set_timestamp(time(NULL));
}
// Return, for each record of the tablet, the repeated values stored in the
// record's first data entry.
template <typename T>
std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
  ACQUIRE_HANDLER(handler_);
  std::vector<std::vector<T>> rows;
  EntryHelper<T> reader;
  for (int idx = 0; idx < data_->records_size(); ++idx) {
    // Re-point the helper at this record's first entry.
    reader(data_->mutable_records(idx)->mutable_data(0));
    rows.push_back(reader.GetMulti());
  }
  return rows;
}
// Collect the id of every record in the tablet, in storage order.
template <typename T>
std::vector<int> ScalarHelper<T>::GetIds() const {
  ACQUIRE_HANDLER(handler_);
  CHECK_NOTNULL(data_);
  std::vector<int> ids;
  int idx = 0;
  while (idx < data_->records_size()) {
    ids.push_back(data_->records(idx).id());
    ++idx;
  }
  return ids;
}
// Collect the timestamp of every record in the tablet, in storage order.
// Each timestamp is stored into an `int` element of the result.
template <typename T>
std::vector<int> ScalarHelper<T>::GetTimestamps() const {
  ACQUIRE_HANDLER(handler_);
  CHECK_NOTNULL(data_);
  std::vector<int> stamps;
  int idx = 0;
  while (idx < data_->records_size()) {
    stamps.push_back(data_->records(idx).timestamp());
    ++idx;
  }
  return stamps;
}
// Copy the tablet's captions into a vector.
template <typename T>
std::vector<std::string> ScalarHelper<T>::GetCaptions() const {
  ACQUIRE_HANDLER(handler_);
  const auto &stored = data_->captions();
  return std::vector<std::string>(stored.begin(), stored.end());
}
template class ScalarHelper<int32_t>;
template class ScalarHelper<int64_t>;
template class ScalarHelper<float>;
template class ScalarHelper<double>;
} // namespace components
} // namespace visualdl
#ifndef VISUALDL_LOGIC_SDK_H #ifndef VISUALDL_LOGIC_SDK_H
#define VISUALDL_LOGIC_SDK_H #define VISUALDL_LOGIC_SDK_H
#include <glog/logging.h> #include "visualdl/storage/storage.h"
#include <time.h> #include "visualdl/storage/tablet.h"
#include <map>
#include "visualdl/logic/im.h"
namespace visualdl { namespace visualdl {
/*
 * Utility helper for storage::Entry.
 *
 * Wraps a raw pointer to an entry and exposes typed accessors. The member
 * functions are only declared here; T selects which specialization is used.
 */
template <typename T>
struct EntryHelper {
// use pointer to avoid copy
storage::Entry *entry{nullptr};
// Creates a helper that points at no entry yet.
EntryHelper() {}
// Creates a helper that points at `entry`.
explicit EntryHelper(storage::Entry *entry) : entry(entry) {}
// Re-points the helper at another entry.
void operator()(storage::Entry *entry) { this->entry = entry; }
/*
 * Set a single value.
 */
void Set(T v);
/*
 * Add a value to repeated message field.
 */
void Add(T v);
/*
 * Get a single value.
 */
T Get() const;
/*
 * Get repeated field.
 */
std::vector<T> GetMulti() const;
};
// Thin wrapper around a raw storage::Tablet pointer offering serialization
// and buffer-replacement helpers.
class TabletHelper {
public:
// basic member getter and setter
// Serialized bytes of the record at index `idx`.
std::string record_buffer(int idx) const {
return data_->records(idx).SerializeAsString();
}
// Number of records held by the tablet.
size_t records_size() const { return data_->records_size(); }
// Serialized bytes of the whole tablet.
std::string buffer() const { return data_->SerializeAsString(); }
// Text rendering of the tablet (defined out of line).
std::string human_readable_buffer() const;
// Overwrite the wrapped tablet with a copy of `t`.
void SetBuffer(const storage::Tablet &t) { *data_ = t; }
// Overwrite the wrapped tablet by parsing serialized bytes; the parse result
// is not checked.
void SetBuffer(const std::string &b) { data_->ParseFromString(b); }
// Direct access to the wrapped tablet.
storage::Tablet &data() const { return *data_; }
// Wrap the tablet pointed to by `t`.
TabletHelper(storage::Tablet *t) : data_(t) {}
// Re-point this helper at another tablet so one instance can be reused.
TabletHelper &operator()(storage::Tablet *t) {
data_ = t;
return *this;
}
private:
// Wrapped tablet; stored as a raw pointer.
storage::Tablet *data_;
};
// Thin wrapper around a raw storage::Storage pointer offering accessors and
// serialization helpers.
class StorageHelper {
public:
// Wrap the storage pointed to by `s`.
StorageHelper(storage::Storage *s) : data_(s) {}
// Re-point this helper at another storage object.
StorageHelper &operator()(storage::Storage *s) {
data_ = s;
return *this;
}
// Overwrite the wrapped storage with a copy of `buffer`.
void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; }
// Overwrite the wrapped storage by parsing serialized bytes; the parse result
// is not checked.
void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); }
// Set the storage directory; CHECK-fails when no storage is wrapped.
void SetDir(const std::string &dir) {
CHECK(data_) << "no storage instance hold";
data_->set_dir(dir);
}
// Plain accessors forwarding to the wrapped message.
int64_t timestamp() const { return data_->timestamp(); }
std::string dir() const { return data_->dir(); }
int tablets_size() const { return data_->tablets_size(); }
// Serialized bytes of the whole storage message.
std::string buffer() const { return data_->SerializeAsString(); }
// Text rendering of the storage message (defined out of line).
std::string human_readable_buffer() const;
private:
// Wrapped storage; stored as a raw pointer.
storage::Storage *data_{nullptr};
};
// Facade over an owned IM instance that hands out StorageHelper/TabletHelper
// wrappers and forwards service control calls.
class ImHelper {
public:
// TODO(ChunweiYan) decouple helper with resource.
// Creates and owns a fresh IM.
ImHelper() { im_.reset(new IM); }
// Takes ownership of an existing IM.
ImHelper(std::unique_ptr<IM> im) : im_(std::move(im)) {}
// Helper over the IM's storage data.
StorageHelper storage() {
return StorageHelper(im_->storage().mutable_data());
}
// Helper over the tablet registered under `tag`.
TabletHelper tablet(const std::string &tag) {
return TabletHelper(im_->storage().tablet(tag));
}
// Adds a tablet through the IM and wraps the result.
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(im_->AddTablet(tag, num_samples));
}
// Mutex exposed by the IM.
std::mutex &handler() { return im_->handler(); }
// Removes all tablets from the storage data.
void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); }
// Sets `dir` as persist destination, then starts the IM's read maintenance
// with the given period argument.
void StartReadService(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainRead(dir, msecs);
}
// Sets `dir` as persist destination, then starts the IM's write maintenance
// with the given period argument. (The method name is spelled "Serice".)
void StartWriteSerice(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainWrite(dir, msecs);
}
// Asks the IM's executor to quit.
void StopService() { im_->executor().Quit(); }
// Forwards to IM::PersistToDisk.
void PersistToDisk() const { im_->PersistToDisk(); }
private:
// The owned IM.
std::unique_ptr<IM> im_;
};
namespace components { namespace components {
#define ACQUIRE_HANDLER(handler) std::lock_guard<std::mutex> ____(*handler);
/* /*
* Read and write support for Scalar component. * Read and write support for Scalar component.
*/ */
template <typename T> template <typename T>
class ScalarHelper { class Scalar {
public: public:
ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr) Scalar(Tablet tablet) : tablet_(tablet) { tablet_->SetTag(kScalar); }
: data_(tablet), handler_(handler) {} void SetCaption(const std::string cap) {
ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr) tablet_->SetCaptions(std::vector<std::string>({cap}));
: data_(&tablet.data()), handler_(handler) {} }
void SetCaptions(const std::vector<std::string> &captions);
void AddRecord(int id, const std::vector<T> &values);
std::vector<std::vector<T>> GetRecords() const;
std::vector<int> GetIds() const;
std::vector<int> GetTimestamps() const;
std::vector<std::string> GetCaptions() const;
size_t GetSize() const { return data_->records_size(); }
private: private:
storage::Tablet *data_; Tablet tablet_;
std::mutex *handler_;
}; };
} // namespace components } // namespace components
} // namespace visualdl } // namespace visualdl
#endif // VISUALDL_BACKEND_LOGIC_SDK_H #endif
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
namespace visualdl { namespace visualdl {
struct ScalarTestHelper { struct ScalarTestHelper {
ImHelper rim; ImHelper _rim;
ImHelper wim; ImHelper _wim;
ImHelper rim = _rim.AsMode("train");
ImHelper wim = _wim.AsMode("train");
const std::string dir = "./tmp/sdk_test.test"; const std::string dir = "./tmp/sdk_test.test";
void operator()(std::function<void()> read, std::function<void()> write) { void operator()(std::function<void()> read, std::function<void()> write) {
...@@ -78,4 +80,19 @@ TEST(Scalar, add_records) { ...@@ -78,4 +80,19 @@ TEST(Scalar, add_records) {
helper(read, write); helper(read, write);
} }
// Builds a "train"-mode writer and defines write/read callbacks for it.
// NOTE(review): neither `write` nor `reader` is invoked inside this test body,
// so as written nothing is exercised -- confirm whether a call such as
// helper(reader, write) was intended.
TEST(Scalar, mode) {
ScalarTestHelper helper;
auto train_wim = helper.wim.AsMode("train");
// Writer: add tablet "tag1", set a single caption and add one record.
auto write = [&] {
auto tablet = train_wim.AddTablet("tag1", -1);
components::ScalarHelper<float> scalar(tablet, &train_wim.handler());
scalar.SetCaptions(std::vector<std::string>({"train"}));
scalar.AddRecord(10, std::vector<float>({0.1}));
};
// Reader: intentionally empty.
auto reader = [&] {};
}
} // namespace visualdl } // namespace visualdl
#include "visualdl/storage/entry.h"
namespace visualdl {
// Generates the explicit specialization Entry<ctype__>::method__.
// The generated body stamps the entry with storage::DataType::dtype__ and then
// forwards the value to the protobuf accessor opr__.
//   method__ : member to specialize (Set or Add)
//   ctype__  : C++ value type
//   dtype__  : storage::DataType enumerator written to the entry
//   opr__    : protobuf setter/adder called with the value
#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \
template <> \
void Entry<ctype__>::method__(ctype__ v) { \
entry->set_dtype(storage::DataType::dtype__); \
entry->opr__(v); \
}
// Single-value setters.
IMPL_ENTRY_SET_OR_ADD(Set, int32_t, kInt32, set_i32);
IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64);
IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b);
IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f);
IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d);
// Appenders for the repeated fields.
IMPL_ENTRY_SET_OR_ADD(Add, int32_t, kInt32s, add_i32s);
IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s);
IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs);
IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds);
IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss);
IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs);
// Generates the explicit specialization EntryReader<T>::Get(), which returns
// the value of the protobuf field accessor fieldname__ on the held entry.
// The `return` is required: without it the function flows off the end of a
// non-void function, which is undefined behaviour.
#define IMPL_ENTRY_GET(T, fieldname__) \
  template <>                          \
  T EntryReader<T>::Get() const {      \
    return data_.fieldname__();        \
  }

// One getter per supported single-value type.
IMPL_ENTRY_GET(int32_t, i32);
IMPL_ENTRY_GET(int64_t, i64);
IMPL_ENTRY_GET(float, f);
IMPL_ENTRY_GET(double, d);
IMPL_ENTRY_GET(std::string, s);
IMPL_ENTRY_GET(bool, b);
// Generates the explicit specialization EntryReader<T>::GetMulti(), which
// copies the repeated protobuf field fieldname__ into a std::vector<T>.
#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \
template <> \
std::vector<T> EntryReader<T>::GetMulti() const { \
return std::vector<T>(data_.fieldname__().begin(), \
data_.fieldname__().end()); \
}
// One getter per supported repeated type.
IMPL_ENTRY_GET_MULTI(int32_t, i32s);
IMPL_ENTRY_GET_MULTI(int64_t, i64s);
IMPL_ENTRY_GET_MULTI(float, fs);
IMPL_ENTRY_GET_MULTI(double, ds);
IMPL_ENTRY_GET_MULTI(std::string, ss);
IMPL_ENTRY_GET_MULTI(bool, bs);
} // namespace visualdl
#ifndef VISUALDL_STORAGE_ENTRY_H
#define VISUALDL_STORAGE_ENTRY_H
#include "visualdl/logic/im.h"
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/guard.h"
namespace visualdl {
struct Storage;
/*
* Utility helper for storage::Entry.
*/
template <typename T>
struct Entry {
DECL_GUARD(Entry)
// use pointer to avoid copy
storage::Entry* entry{nullptr};
Entry() {}
explicit Entry(storage::Entry* entry, void* parent)
: entry(entry), x_(parent) {}
void operator()(storage::Entry* entry, void* parent) {
this->entry = entry;
x_ = parent;
}
// Set a single value.
void Set(T v);
// Add a value to repeated message field.
void Add(T v);
Storage* parent() { return x_; }
private:
Storage* x_;
};
// Read-side helper that holds its own copy of a storage::Entry message.
template <typename T>
struct EntryReader {
// Copies `x` into the reader.
EntryReader(storage::Entry x) : data_(x) {}
// Get a single value.
T Get() const;
// Get repeated field.
std::vector<T> GetMulti() const;
private:
// The held entry (a copy, not a pointer).
storage::Entry data_;
};
} // namespace visualdl
#endif
#include "visualdl/storage/record.h"
\ No newline at end of file
#ifndef VISUALDL_STORAGE_RECORD_H
#define VISUALDL_STORAGE_RECORD_H
#include "visualdl/logic/im.h"
#include "visualdl/storage/entry.h"
#include "visualdl/storage/storage.pb.h"
namespace visualdl {
/*
 * A helper for operations on storage::Record
 *
 * Wraps a raw pointer to the record plus a pointer to the Storage it belongs
 * to. Write operations declare a WRITE_GUARD, i.e. a scope-bound WriteGuard
 * constructed on this object.
 */
struct Record {
// Value types a record can carry; SetDtype converts these to
// storage::DataType by numeric value.
enum Dtype {
kInt32 = 0,
kInt64 = 1,
kFloat = 2,
kDouble = 3,
kString = 4,
kBool = 5,
// entrys
kInt64s = 6,
kFloats = 7,
kDoubles = 8,
kStrings = 9,
kInt32s = 10,
kBools = 11,
kUnknown = 12
};
DECL_GUARD(Record)
// Wraps record `x`, which belongs to storage `parent`.
Record(storage::Record* x, Storage* parent) : data_(x), x_(parent) {}
// write operations
void SetTimeStamp(int64_t x) {
data_->set_timestamp(x);
WRITE_GUARD
}
void SetId(int64_t id) {
data_->set_id(id);
WRITE_GUARD
}
void SetDtype(Dtype type) {
data_->set_dtype(storage::DataType(type));
WRITE_GUARD
}
// Entry helper over the record's meta field; declares no WRITE_GUARD.
template <typename T>
Entry<T> MutableMeta() {
return Entry<T>(data_->mutable_meta(), parent());
}
// Appends a data entry to the record and returns a helper over it.
template <typename T>
Entry<T> AddData() {
WRITE_GUARD
return Entry<T>(data_->add_data(), parent());
}
// Storage this record belongs to.
Storage* parent() { return x_; }
private:
// Wrapped record; stored as a raw pointer.
storage::Record* data_{nullptr};
// Storage passed in at construction.
Storage* x_;
};
// Read-side helper that holds its own copy of a storage::Record message.
struct RecordReader {
  // Copies `x` into the reader.
  RecordReader(storage::Record x) : data_(x) {}

  // read operations
  // Number of data entries in the record.
  size_t data_size() const { return data_.data_size(); }

  // Reader over a copy of the i-th data entry.
  template <typename T>
  EntryReader<T> data(int i) {
    return EntryReader<T>(data_.data(i));
  }

  int64_t timestamp() const { return data_.timestamp(); }
  int64_t id() const { return data_.id(); }

  // Stored data type, converted to Record::Dtype by numeric value.
  Record::Dtype dtype() const {
    return static_cast<Record::Dtype>(data_.dtype());
  }

  // Reader over a copy of the record's meta entry. Returns an EntryReader,
  // matching TabletReader::meta(): the write-side Entry<T> cannot be built
  // from a const storage::Entry, so the previous declaration could not be
  // instantiated.
  template <typename T>
  EntryReader<T> meta() const {
    return EntryReader<T>(data_.meta());
  }

 private:
  storage::Record data_;
};
} // namespace visualdl
#endif
#include "visualdl/storage/storage.h"
\ No newline at end of file
...@@ -2,44 +2,132 @@ ...@@ -2,44 +2,132 @@
#define VISUALDL_STORAGE_STORAGE_H #define VISUALDL_STORAGE_STORAGE_H
#include <glog/logging.h> #include <glog/logging.h>
#include <visualdl/utils/guard.h>
#include <vector> #include <vector>
#include "visualdl/logic/im.h"
#include "visualdl/storage/storage.pb.h" #include "visualdl/storage/storage.pb.h"
#include "visualdl/storage/tablet.h" #include "visualdl/storage/tablet.h"
#include "visualdl/utils/filesystem.h"
namespace visualdl { namespace visualdl {
static const std::string meta_file_name = "storage.meta";
// Path of the storage meta file inside `dir`: dir + "/" + meta_file_name.
// CHECK-fails when `dir` is empty.
static std::string meta_path(const std::string& dir) {
  CHECK(!dir.empty()) << "dir is empty";
  std::string path(dir);
  path += "/";
  path += meta_file_name;
  return path;
}
// Path of the file for tablet `tag` inside `dir`: dir + "/" + tag.
// CHECK-fails when `dir` is empty.
static std::string tablet_path(const std::string& dir, const std::string& tag) {
  CHECK(!dir.empty()) << "dir should be set first";
  std::string path(dir);
  path += "/";
  path += tag;
  return path;
}
// Bookkeeping that decides when in-memory data should be synced: a write
// counter plus the number of writes between two syncs.
struct SimpleSyncMeta {
  // Count one more write.
  void Inc() { counter++; }

  // True when the write counter is a multiple of the sync period.
  // A `cycle` below 1 is treated as 1 (sync on every write) so the modulo can
  // never divide by zero.
  bool ToSync() {
    const size_t period = cycle > 0 ? static_cast<size_t>(cycle) : 1;
    return counter % period == 0;
  }

  // Number of writes seen so far.
  size_t counter{0};
  // Writes between two syncs. Defaults to 1 so a meta object whose cycle was
  // never assigned has a defined value.
  int cycle{1};
};
/* /*
* Helper for operations on storage::Storage. * Helper for operations on storage::Storage.
*/ */
struct Storage { struct Storage {
Storage() {} DECL_GUARD(Storage)
Storage(storage::Storage* x) : data_(x) {
mutable SimpleSyncMeta meta;
Storage() { data_ = std::make_shared<storage::Storage>(); }
Storage(const std::shared_ptr<storage::Storage>& x) : data_(x) {
time_t t; time_t t;
time(&t); time(&t);
data_->set_timestamp(t); data_->set_timestamp(t);
} }
std::vector<std::string> Modes() { // write operations
return std::vector<std::string>(data_->modes().begin(), void AddMode(const std::string& x) {
data_->modes().end()); *data_->add_modes() = x;
WRITE_GUARD
} }
void AddMode(const std::string& x) { *data_->add_modes() = x; }
Tablet AddTablet(const std::string& x) { Tablet AddTablet(const std::string& x) {
AddTag(x); AddTag(x);
CHECK(tablets_.count(x) == 0) << "tablet [" << x << "] has existed"; CHECK(tablets_.count(x) == 0) << "tablet [" << x << "] has existed";
tablets_[x] = storage::Tablet(); tablets_[x] = storage::Tablet();
return Tablet(&tablets_[x]); WRITE_GUARD
return Tablet(&tablets_[x], this);
} }
void SetDir(const std::string& dir) { dir_ = dir; }
void PersistToDisk() { PersistToDisk(dir_); }
/*
* Save memory to disk.
*/
void PersistToDisk(const std::string& dir) {
LOG(INFO) << "persist to disk " << dir;
CHECK(!dir.empty()) << "dir should be set.";
fs::TryRecurMkdir(dir);
fs::SerializeToFile(*data_, meta_path(dir));
for (auto tag : data_->tags()) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end()) << "tag " << tag << " not exist.";
fs::SerializeToFile(it->second, tablet_path(dir, tag));
}
}
Storage* parent() { return this; }
protected: protected:
void AddTag(const std::string& x) { *data_->add_tags() = x; } void AddTag(const std::string& x) {
WRITE_GUARD
*data_->add_tags() = x;
}
private: private:
std::string dir_;
std::map<std::string, storage::Tablet> tablets_; std::map<std::string, storage::Tablet> tablets_;
storage::Storage* data_{nullptr}; std::shared_ptr<storage::Storage> data_;
};
/*
* Storage reader, each interface will trigger a read.
*/
struct StorageReader {
StorageReader(const std::string& dir) : dir_(dir) {}
// read operations
std::vector<std::string> Tags() {
storage::Storage storage;
Reload(storage);
return std::vector<std::string>(storage.tags().begin(),
storage.tags().end());
}
std::vector<std::string> Modes() {
storage::Storage storage;
Reload(storage);
return std::vector<std::string>(storage.modes().begin(),
storage.modes().end());
}
TabletReader tablet(const std::string& tag) const {
auto path = tablet_path(dir_, tag);
storage::Tablet tablet;
fs::DeSerializeFromFile(&tablet, path);
return TabletReader(tablet);
}
protected:
void Reload(storage::Storage& storage) {
const std::string path = meta_path(dir_);
fs::DeSerializeFromFile(&storage, path);
}
private:
std::string dir_;
}; };
} // namespace visualdl } // namespace visualdl
......
...@@ -6,20 +6,25 @@ ...@@ -6,20 +6,25 @@
namespace visualdl { namespace visualdl {
class StorageTest : public ::testing::Test { class StorageTest : public ::testing::Test {
public: public:
void SetUp() { storage.reset(new Storage(&data_)); } void SetUp() {
storage.SetDir("./tmp/storage_test");
storage.meta.cycle = 2;
}
storage::Storage data_; Storage storage;
std::unique_ptr<Storage> storage;
}; };
TEST_F(StorageTest, main) { TEST_F(StorageTest, main) {
storage->AddMode("train"); storage.AddMode("train");
storage->AddMode("test"); storage.AddMode("test");
auto tag0 = storage->AddTablet("tag0"); auto tag0 = storage.AddTablet("tag0");
auto tag1 = storage->AddTablet("tag1"); auto tag1 = storage.AddTablet("tag1");
StorageReader reader("./tmp/storage_test");
auto modes = reader.Modes();
auto modes = storage->Modes();
ASSERT_EQ(modes.size(), 2); ASSERT_EQ(modes.size(), 2);
ASSERT_EQ(modes[0], "train"); ASSERT_EQ(modes[0], "train");
ASSERT_EQ(modes[1], "test"); ASSERT_EQ(modes[1], "test");
......
#ifndef VISUALDL_TABLET_H
#define VISUALDL_TABLET_H
#include "visualdl/logic/im.h"
#include "visualdl/storage/record.h"
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/string.h"
namespace visualdl {
/*
* Tablet is a helper for operations on storage::Tablet.
*/
struct Tablet {
enum Type { kScalar = 0, kHistogram = 1, kImage = 2 };
DECL_GUARD(Tablet);
Tablet(storage::Tablet* x, Storage* parent) : data_(x), x_(parent) {}
// write operations.
void SetNumSamples(int x) {
data_->set_num_samples(x);
WRITE_GUARD
}
void SetType(Type type) {
data_->set_component(static_cast<storage::Tablet::Type>(type));
WRITE_GUARD
}
void SetTag(const std::string& mode, const std::string& tag) {
auto internal_tag = mode + "/" + tag;
string::TagEncode(internal_tag);
data_->set_tag(internal_tag);
WRITE_GUARD
}
Record AddRecord() {
IncTotalRecords();
WRITE_GUARD
return Record(data_->add_records(), parent());
}
template <typename T>
Entry<T> MutableMeta() {
Entry<T> x(data_->mutable_meta(), parent());
}
void SetCaptions(const std::vector<std::string>& xs) {
for (const auto& x : xs) {
*data_->add_captions() = x;
}
WRITE_GUARD
}
void SetDescription(const std::string& x) {
data_->set_description(x);
WRITE_GUARD
}
void IncTotalRecords() {
data_->set_total_records(data_->total_records() + 1);
WRITE_GUARD
}
Storage* parent() const { return x_; }
private:
Storage* x_;
storage::Tablet* data_{nullptr};
};
/*
 * Read-side helper for a tablet. It keeps its own copy of the protobuf
 * message and exposes read-only accessors over it.
 */
struct TabletReader {
  TabletReader(storage::Tablet x) : data_(x) {}

  // read operations.
  std::string tag() const { return data_.tag(); }

  Tablet::Type type() const {
    return static_cast<Tablet::Type>(data_.component());
  }

  int64_t total_records() const { return data_.total_records(); }

  int32_t num_samples() const { return data_.num_samples(); }

  // Reader over a copy of the i-th record.
  RecordReader record(int i) const { return RecordReader(data_.records(i)); }

  // Reader over a copy of the tablet's meta entry.
  template <typename T>
  EntryReader<T> meta() const {
    return EntryReader<T>(data_.meta());
  }

  // Copy of the tablet's captions.
  std::vector<std::string> captions() const {
    const auto& stored = data_.captions();
    return std::vector<std::string>(stored.begin(), stored.end());
  }

  std::string description() const { return data_.description(); }

 private:
  storage::Tablet data_;
};
} // namespace visualdl
#endif
...@@ -43,7 +43,7 @@ bool DeSerializeFromFile(T* proto, const std::string& path) { ...@@ -43,7 +43,7 @@ bool DeSerializeFromFile(T* proto, const std::string& path) {
return proto->ParseFromIstream(&file); return proto->ParseFromIstream(&file);
} }
void TryMkdir(const std::string& dir) { static void TryMkdir(const std::string& dir) {
VLOG(1) << "try to mkdir " << dir; VLOG(1) << "try to mkdir " << dir;
struct stat st = {0}; struct stat st = {0};
if (stat(dir.c_str(), &st) == -1) { if (stat(dir.c_str(), &st) == -1) {
...@@ -52,7 +52,7 @@ void TryMkdir(const std::string& dir) { ...@@ -52,7 +52,7 @@ void TryMkdir(const std::string& dir) {
} }
// Create a path by recursively create directries // Create a path by recursively create directries
void TryRecurMkdir(const std::string& path) { static void TryRecurMkdir(const std::string& path) {
// split path by '/' // split path by '/'
for (int i = 1; i < path.size() - 1; i++) { for (int i = 1; i < path.size() - 1; i++) {
if (path[i] == '/') { if (path[i] == '/') {
......
#ifndef VISUALDL_UTILS_GUARD_H
#define VISUALDL_UTILS_GUARD_H
namespace visualdl {
namespace guard {
// Minimal scope guard: calls start() when constructed and end() when
// destroyed. Both hooks are empty here.
template <typename T>
class BasicGuard {
 public:
  BasicGuard(const T* x) : data_{x} { start(); }

  ~BasicGuard() { end(); }

  // Hook run on construction; does nothing.
  void start() {}

  // Hook run on destruction; does nothing.
  void end() {}

 private:
  // Object the guard was created for.
  const T* data_;
};
// Declares the guard aliases used by READ_GUARD / WRITE_GUARD inside class T:
// WriteGuard is SimpleWriteSyncGuard<T>, ReadGuard is guard::BasicGuard<T>.
// SimpleWriteSyncGuard is named unqualified, so it must be visible where the
// macro is expanded.
#define DECL_GUARD(T) \
using WriteGuard = SimpleWriteSyncGuard<T>; \
using ReadGuard = guard::BasicGuard<T>;
// Alternative definition that makes both guards BasicGuard; kept disabled.
// #define DECL_GUARD(T) \
// using WriteGuard = guard::BasicGuard<T>; \
// using ReadGuard = guard::BasicGuard<T>;
// Declare a guard named `_` on `this`; it lives until the end of the
// enclosing scope.
#define READ_GUARD ReadGuard _(this);
#define WRITE_GUARD WriteGuard _(this);
} // namespace guard
} // namespace visualdl
#endif
#ifndef VISUALDL_UTILS_STRING_H
#define VISUALDL_UTILS_STRING_H
#include <sstream>
#include <string>
namespace visualdl {
namespace string {
// Replace every '/' in `tag` with '%', in place.
static void TagEncode(std::string& tag) {
  for (std::string::size_type pos = tag.find('/'); pos != std::string::npos;
       pos = tag.find('/', pos + 1)) {
    tag[pos] = '%';
  }
}
// Replace every '%' in `tag` with '/', in place.
static void TagDecode(std::string& tag) {
  for (std::string::size_type pos = tag.find('%'); pos != std::string::npos;
       pos = tag.find('%', pos + 1)) {
    tag[pos] = '/';
  }
}
} // namespace string
} // namespace visualdl
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册