提交 91e575b0 编写于 作者: Y Yan Chunwei 提交者: GitHub

Merge pull request #5 from ChunweiYan/feature/add_train_test_mode

......@@ -38,16 +38,14 @@ add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python)
add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/test.cc
${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h
${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h
)
target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread)
target_link_libraries(vl_test sdk storage entry im gtest glog protobuf gflags pthread)
enable_testing ()
add_custom_target(test_init COMMAND ./init_test.sh $CMAKE_BINARY_DIR)
add_custom_target(test_init COMMAND $CMAKE_BINARY_DIR)
add_test(NAME vstest COMMAND ./vl_test)
set_target_properties(vl_test PROPERTIES DEPENDS test_init)
add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc)
#add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc)
add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc)
add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc)
add_dependencies(im storage_proto)
add_dependencies(sdk storage_proto)
add_dependencies(sdk entry storage storage_proto)
## pybind
add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc)
add_dependencies(core pybind python im storage sdk protobuf glog)
target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog)
add_dependencies(core pybind python im entry storage sdk protobuf glog)
target_link_libraries(core PRIVATE pybind entry python im storage sdk protobuf glog)
set_target_properties(core PROPERTIES PREFIX "" SUFFIX ".so")
......@@ -2,79 +2,41 @@
#include <ctime>
#include "visualdl/logic/im.h"
#include "visualdl/storage/entry.h"
#include "visualdl/storage/storage.h"
#include "visualdl/storage/tablet.h"
namespace visualdl {
/*
* @num_samples: number of instances to sample
* @size: counter of the records.
* @returns: id of the instance to replace, if drop this instance, return -1.
*/
int ReserviorSample(int num_samples, int num_records) {
if (num_records <= num_samples) {
return num_records;
}
std::srand(std::time(0));
float prob = static_cast<float>(std::rand()) / RAND_MAX;
float receive_prob = static_cast<float>(num_samples) / num_records;
if (prob < receive_prob) {
int offset2replace = std::rand() % num_samples;
return offset2replace;
}
return -1;
}
void IM::SetPersistDest(const std::string &path) {
CHECK(storage_->mutable_data()->dir().empty())
<< "duplicate set storage's path";
storage_->mutable_data()->set_dir(path);
}
storage::Tablet *IM::AddTablet(const std::string &tag, int num_samples) {
auto tablet = storage_->NewTablet(tag, num_samples);
return tablet;
template <typename T>
void SimpleWriteSyncGuard<T>::Start() {
CHECK(data_);
data_->parent()->meta.Inc();
}
void IM::AddRecord(const std::string &tag, const storage::Record &data) {
auto *tablet = storage_->tablet(tag);
CHECK(tablet) << "no tablet called " << tag;
auto num_records = tablet->total_records();
const auto num_samples = tablet->num_samples();
int offset;
// use reservoir sampling or not
if (num_samples > 0) {
offset = ReserviorSample(num_samples, num_records + 1);
if (offset < 0) return;
} else {
offset = num_records;
}
storage::Record *record;
if (offset >= num_records) {
record = tablet->add_records();
} else {
record = tablet->mutable_records(offset);
template <typename T>
void SimpleWriteSyncGuard<T>::End() {
CHECK(data_);
if (data_->parent()->meta.ToSync()) {
Sync();
}
*record = data;
tablet->set_total_records(num_records + 1);
}
void IM::Clear() {
auto *data = storage().mutable_data();
data->clear_tablets();
data->clear_dir();
data->clear_timestamp();
template <typename T>
void SimpleWriteSyncGuard<T>::Sync() {
CHECK(data_);
auto* storage = data_->parent();
storage->PersistToDisk();
}
void IM::PersistToDisk() {
CHECK(!storage_->data().dir().empty()) << "path of storage should be set";
// TODO make dir first
// MakeDir(storage_.data().dir());
storage_->PersistToDisk(storage_->data().dir());
}
template class SimpleWriteSyncGuard<Storage>;
template class SimpleWriteSyncGuard<Tablet>;
template class SimpleWriteSyncGuard<Record>;
template class SimpleWriteSyncGuard<Entry<float>>;
template class SimpleWriteSyncGuard<Entry<double>>;
template class SimpleWriteSyncGuard<Entry<bool>>;
template class SimpleWriteSyncGuard<Entry<long>>;
template class SimpleWriteSyncGuard<Entry<std::string>>;
template class SimpleWriteSyncGuard<Entry<int>>;
} // namespace visualdl
......@@ -6,82 +6,26 @@
#include <mutex>
#include <string>
#include "visualdl/storage/storage.h"
#include "visualdl/utils/concurrency.h"
#include "visualdl/utils/guard.h"
namespace visualdl {
/*
* IM(Information Maintainer) maintain the Storage singleton in memory,
* pre-compute some the statistical information to help visualizaton.
*
* There should be two processes and each have an IM, one is the web server
* which hold one IM to read the storage, the other is the SDK(python or C++),
* it will get an IM to write latest changes to storage.
*
* An IM have an underlying Storage object, which might be a memory based
* storage or a disk based one, both has the same interfaces those defined by
* class StorageBase.
*
* The SDK's IM will maintain the changes and periodically write to disk, and
* the web server's IM will periodically read latest storage from disk.
* Simple logic to sync memory to disk.
*/
class IM final {
template <typename T>
class SimpleWriteSyncGuard {
public:
IM() { storage_.reset(new MemoryStorage(&executor_)); }
// IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { executor_.Quit(); }
SimpleWriteSyncGuard(T* x) : data_(x) { Start(); }
~SimpleWriteSyncGuard() { End(); }
void MaintainRead(const std::string &dir, int msecs) {
LOG(INFO) << "start maintain read";
dynamic_cast<MemoryStorage *>(storage_.get())
->StartReadService(dir, msecs, &lock_);
}
void MaintainWrite(const std::string &dir, int msecs) {
dynamic_cast<MemoryStorage *>(storage_.get())
->StartWriteService(dir, msecs, &lock_);
}
/*
* Set the disk path to store the Storage object.
*/
void SetPersistDest(const std::string &path);
storage::Tablet *AddTablet(const std::string &tag, int num_samples);
/*
* @tag: tag of the target Tablet.
* @record: a record
*
* NOTE pass in the serialized protobuf message will trigger copying, but
* simpler to support different Tablet data formats.
*/
void AddRecord(const std::string &tag, const storage::Record &record);
/*
* delete all the information.
*/
void Clear();
/*
* Save the Storage Protobuf to disk.
*/
void PersistToDisk();
StorageBase &storage() { return *storage_; }
cc::PeriodExector &executor() { return executor_; }
std::mutex &handler() { return lock_; }
void Start();
void End();
void Sync();
private:
// read write lock for protobuf in memory
// TODO(ChunweiYan) mutex too heavy here, might change to a message queue to
// reduce the frequency of visiting disk
std::mutex lock_;
std::unique_ptr<StorageBase> storage_;
cc::PeriodExector executor_;
T* data_{nullptr};
};
} // namespace visualdl
......
#include "visualdl/logic/im.h"
#include "gtest/gtest.h"
#include "visualdl/storage/storage.h"
namespace visualdl {
class ImTester : public ::testing::Test {
protected:
void SetUp() override {}
IM im;
};
TEST_F(ImTester, AddTablet) {
im.Clear();
im.AddTablet("tag0", 20);
}
TEST_F(ImTester, AddRecord) {
im.Clear();
im.AddTablet("tag0", 20);
for (int i = 0; i < 100; i++) {
storage::Record rcd;
rcd.set_dtype(storage::DataType::kInt32s);
for (int j = 0; j < 10; j++) {
rcd.add_data()->add_i32s(i * 20 + j);
}
im.AddRecord("tag0", rcd);
}
ASSERT_EQ(im.storage().tablet("tag0")->records_size(), 100UL);
}
} // namespace visualdl
......@@ -6,84 +6,68 @@
namespace py = pybind11;
namespace vs = visualdl;
namespace cp = visualdl::components;
PYBIND11_PLUGIN(core) {
py::module m("core", "C++ core of VisualDL");
// m.doc() = "visualdl python core API";
py::class_<vs::TabletHelper>(m, "Tablet")
// other member setter and getter
.def("record_buffer", &vs::TabletHelper::record_buffer)
.def("records_size", &vs::TabletHelper::records_size)
.def("buffer", &vs::TabletHelper::buffer)
.def("human_readable_buffer", &vs::TabletHelper::human_readable_buffer)
.def("set_buffer",
(void (vs::TabletHelper::*)(const std::string&)) &
vs::TabletHelper::SetBuffer)
// scalar interface
.def("as_int32_scalar",
[](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<int32_t>(self, &im.handler());
})
.def("as_int64_scalar",
[](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<int64_t>(&self.data(),
&im.handler());
})
.def("as_float_scalar",
[](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<float>(&self.data(),
&im.handler());
})
.def("as_double_scalar", [](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<double>(&self.data(),
&im.handler());
});
#define ADD_SCALAR(T) \
py::class_<cp::ScalarReader<T>>(m, "ScalarReader__" #T) \
.def("records", &cp::ScalarReader<T>::records) \
.def("timestamps", &cp::ScalarReader<T>::timestamps) \
.def("ids", &cp::ScalarReader<T>::ids) \
.def("caption", &cp::ScalarReader<T>::caption);
ADD_SCALAR(int);
ADD_SCALAR(float);
ADD_SCALAR(double);
ADD_SCALAR(int64_t);
#undef ADD_SCALAR
py::class_<vs::StorageHelper>(m, "Storage")
.def("timestamp", &vs::StorageHelper::timestamp)
.def("dir", &vs::StorageHelper::dir)
.def("set_dir", &vs::StorageHelper::SetDir)
.def("tablets_size", &vs::StorageHelper::tablets_size)
.def("buffer", &vs::StorageHelper::buffer)
.def("human_readable_buffer", &vs::StorageHelper::human_readable_buffer)
.def("set_buffer",
(void (vs::StorageHelper::*)(const std::string&)) &
vs::StorageHelper::SetBuffer);
#define ADD_SCALAR_WRITER(T) \
py::class_<cp::Scalar<T>>(m, "ScalarWriter__" #T) \
.def("set_caption", &cp::Scalar<T>::SetCaption) \
.def("add_record", &cp::Scalar<T>::AddRecord);
ADD_SCALAR_WRITER(int);
ADD_SCALAR_WRITER(float);
ADD_SCALAR_WRITER(double);
#undef ADD_SCALAR_WRITER
py::class_<vs::ImHelper>(m, "Im")
.def("__init__",
[](vs::ImHelper& instance) { new (&instance) vs::ImHelper(); })
.def("storage", &vs::ImHelper::storage)
.def("tablet", &vs::ImHelper::tablet)
.def("add_tablet", &vs::ImHelper::AddTablet)
.def("persist_to_disk", &vs::ImHelper::PersistToDisk)
.def("clear_tablets", &vs::ImHelper::ClearTablets)
.def("start_read_service",
&vs::ImHelper::StartReadService,
"start a thread to maintain read service")
.def("start_write_service",
&vs::ImHelper::StartWriteSerice,
"start a thread to maintain write service")
.def("stop_service",
&vs::ImHelper::StopService,
"stop the service thread");
#define ADD_SCALAR(T) \
.def("get_scalar_" #T, [](vs::Reader& self, const std::string& tag) { \
auto tablet = self.tablet(tag); \
return vs::components::ScalarReader<T>(std::move(tablet)); \
})
py::class_<vs::Reader>(m, "Reader")
.def(
"__init__",
[](vs::Reader& instance,
const std::string& mode,
const std::string& dir) { new (&instance) vs::Reader(mode, dir); })
// clang-format off
ADD_SCALAR(float)
ADD_SCALAR(double)
ADD_SCALAR(int);
// clang-format on
#undef ADD_SCALAR
// interfaces for components begin
#define ADD_SCALAR(T) \
.def("new_scalar_" #T, [](vs::Writer& self, const std::string& tag) { \
auto tablet = self.AddTablet(tag); \
return cp::Scalar<T>(tablet); \
})
py::class_<vs::Writer>(m, "Writer")
.def("__init__",
[](vs::Writer& instance, const std::string& dir, int sync_cycle) {
new (&instance) vs::Writer(dir);
instance.storage().meta.cycle = sync_cycle;
})
.def("as_mode", &vs::Writer::AsMode)
// clang-format off
ADD_SCALAR(float)
ADD_SCALAR(double)
ADD_SCALAR(int);
// clang-format on
#undef ADD_SCALAR
// different data type of scalar conponent
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
py::class_<vs::components::ScalarHelper<T>>(m, #name__) \
.def("add_record", &vs::components::ScalarHelper<T>::AddRecord) \
.def("set_captions", &vs::components::ScalarHelper<T>::SetCaptions) \
.def("get_records", &vs::components::ScalarHelper<T>::GetRecords) \
.def("get_captions", &vs::components::ScalarHelper<T>::GetCaptions) \
.def("get_ids", &vs::components::ScalarHelper<T>::GetIds) \
.def("get_record_size", &vs::components::ScalarHelper<T>::GetSize) \
.def("get_timestamps", &vs::components::ScalarHelper<T>::GetTimestamps);
ADD_SCALAR_TYPED_INTERFACE(int32_t, ScalarInt32);
ADD_SCALAR_TYPED_INTERFACE(int64_t, ScalarInt64);
ADD_SCALAR_TYPED_INTERFACE(float, ScalarFloat);
ADD_SCALAR_TYPED_INTERFACE(double, ScalarDouble);
#undef ADD_SCALAR_TYPED_INTERFACE
}
} // end pybind
#include "visualdl/logic/sdk.h"
#include <google/protobuf/text_format.h>
namespace visualdl {
#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \
template <> \
void EntryHelper<ctype__>::method__(ctype__ v) { \
entry->set_dtype(storage::DataType::dtype__); \
entry->opr__(v); \
}
IMPL_ENTRY_SET_OR_ADD(Set, int32_t, kInt32, set_i32);
IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64);
IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b);
IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f);
IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d);
IMPL_ENTRY_SET_OR_ADD(Add, int32_t, kInt32s, add_i32s);
IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s);
IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs);
IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds);
IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss);
IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs);
#define IMPL_ENTRY_GET(T, fieldname__) \
template <> \
T EntryHelper<T>::Get() const { \
return entry->fieldname__(); \
}
IMPL_ENTRY_GET(int32_t, i32);
IMPL_ENTRY_GET(int64_t, i64);
IMPL_ENTRY_GET(float, f);
IMPL_ENTRY_GET(double, d);
IMPL_ENTRY_GET(std::string, s);
IMPL_ENTRY_GET(bool, b);
#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \
template <> \
std::vector<T> EntryHelper<T>::GetMulti() const { \
return std::vector<T>(entry->fieldname__().begin(), \
entry->fieldname__().end()); \
}
IMPL_ENTRY_GET_MULTI(int32_t, i32s);
IMPL_ENTRY_GET_MULTI(int64_t, i64s);
IMPL_ENTRY_GET_MULTI(float, fs);
IMPL_ENTRY_GET_MULTI(double, ds);
IMPL_ENTRY_GET_MULTI(std::string, ss);
IMPL_ENTRY_GET_MULTI(bool, bs);
std::string StorageHelper::human_readable_buffer() const {
std::string buffer;
google::protobuf::TextFormat::PrintToString(*data_, &buffer);
return buffer;
}
std::string TabletHelper::human_readable_buffer() const {
std::string buffer;
google::protobuf::TextFormat::PrintToString(*data_, &buffer);
return buffer;
}
// implementations for components
namespace components {
template <typename T>
void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
ACQUIRE_HANDLER(handler_);
CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once";
for (int i = 0; i < captions.size(); i++) {
data_->add_captions(captions[i]);
}
}
template <typename T>
void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first";
CHECK_EQ(data_->captions_size(), values.size())
<< "number of values in a record should be compatible with the "
"captions";
// add record data
auto *record = data_->add_records();
auto *data = record->add_data();
EntryHelper<T> entry_helper(data);
for (auto v : values) {
entry_helper.Add(v);
std::vector<T> ScalarReader<T>::records() const {
std::vector<T> res;
for (int i = 0; i < reader_.total_records(); i++) {
res.push_back(reader_.record(i).data<T>(0).Get());
}
// set record id
record->set_id(id);
// set record timestamp
record->set_timestamp(time(NULL));
return res;
}
template <typename T>
std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
ACQUIRE_HANDLER(handler_);
std::vector<std::vector<T>> result;
EntryHelper<T> entry_helper;
for (int i = 0; i < data_->records_size(); i++) {
auto *entry = data_->mutable_records(i)->mutable_data(0);
entry_helper(entry);
auto datas = entry_helper.GetMulti();
result.push_back(std::move(datas));
std::vector<T> ScalarReader<T>::ids() const {
std::vector<T> res;
for (int i = 0; i < reader_.total_records(); i++) {
res.push_back(reader_.record(i).id());
}
return result;
return res;
}
template <typename T>
std::vector<int> ScalarHelper<T>::GetIds() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) {
result.push_back(data_->records(i).id());
std::vector<T> ScalarReader<T>::timestamps() const {
std::vector<T> res;
for (int i = 0; i < reader_.total_records(); i++) {
res.push_back(reader_.record(i).timestamp());
}
return result;
return res;
}
template <typename T>
std::vector<int> ScalarHelper<T>::GetTimestamps() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) {
result.push_back(data_->records(i).timestamp());
}
return result;
std::string ScalarReader<T>::caption() const {
CHECK(!reader_.captions().empty()) << "no caption";
return reader_.captions().front();
}
template <typename T>
std::vector<std::string> ScalarHelper<T>::GetCaptions() const {
ACQUIRE_HANDLER(handler_);
return std::vector<std::string>(data_->captions().begin(),
data_->captions().end());
size_t ScalarReader<T>::size() const {
return reader_.total_records();
}
template class ScalarHelper<int32_t>;
template class ScalarHelper<int64_t>;
template class ScalarHelper<float>;
template class ScalarHelper<double>;
template class ScalarReader<int>;
template class ScalarReader<int64_t>;
template class ScalarReader<float>;
template class ScalarReader<double>;
} // namespace components
......
#ifndef VISUALDL_LOGIC_SDK_H
#define VISUALDL_LOGIC_SDK_H
#include <glog/logging.h>
#include <time.h>
#include <map>
#include "visualdl/logic/im.h"
#include "visualdl/storage/storage.h"
#include "visualdl/storage/tablet.h"
#include "visualdl/utils/string.h"
namespace visualdl {
/*
* Utility helper for storage::Entry.
*/
template <typename T>
struct EntryHelper {
// use pointer to avoid copy
storage::Entry *entry{nullptr};
EntryHelper() {}
explicit EntryHelper(storage::Entry *entry) : entry(entry) {}
void operator()(storage::Entry *entry) { this->entry = entry; }
/*
* Set a single value.
*/
void Set(T v);
/*
* Add a value to repeated message field.
*/
void Add(T v);
/*
* Get a single value.
*/
T Get() const;
/*
* Get repeated field.
*/
std::vector<T> GetMulti() const;
};
class TabletHelper {
class Writer {
public:
// basic member getter and setter
std::string record_buffer(int idx) const {
return data_->records(idx).SerializeAsString();
}
size_t records_size() const { return data_->records_size(); }
std::string buffer() const { return data_->SerializeAsString(); }
std::string human_readable_buffer() const;
void SetBuffer(const storage::Tablet &t) { *data_ = t; }
void SetBuffer(const std::string &b) { data_->ParseFromString(b); }
storage::Tablet &data() const { return *data_; }
// constructor that enable concurrency.
TabletHelper(storage::Tablet *t) : data_(t) {}
// data updater that resuage of one instance.
TabletHelper &operator()(storage::Tablet *t) {
data_ = t;
return *this;
Writer(const std::string& dir) {
storage_.SetDir(dir);
}
private:
storage::Tablet *data_;
};
class StorageHelper {
public:
StorageHelper(storage::Storage *s) : data_(s) {}
StorageHelper &operator()(storage::Storage *s) {
data_ = s;
Writer& AsMode(const std::string& mode) {
mode_ = mode;
storage_.AddMode(mode);
return *this;
}
void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; }
void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); }
void SetDir(const std::string &dir) {
CHECK(data_) << "no storage instance hold";
data_->set_dir(dir);
Tablet AddTablet(const std::string& tag) {
// TODO(ChunweiYan) add string check here.
auto tmp = mode_ + "/" + tag;
string::TagEncode(tmp);
auto res = storage_.AddTablet(tmp);
res.SetCaptions(std::vector<std::string>({mode_}));
return res;
}
int64_t timestamp() const { return data_->timestamp(); }
std::string dir() const { return data_->dir(); }
int tablets_size() const { return data_->tablets_size(); }
std::string buffer() const { return data_->SerializeAsString(); }
std::string human_readable_buffer() const;
Storage& storage() { return storage_; }
private:
storage::Storage *data_{nullptr};
Storage storage_;
std::string mode_;
};
class ImHelper {
class Reader {
public:
// TODO(ChunweiYan) decouple helper with resource.
ImHelper() { im_.reset(new IM); }
ImHelper(std::unique_ptr<IM> im) : im_(std::move(im)) {}
Reader(const std::string& mode, const std::string& dir)
: mode_(mode), reader_(dir) {}
StorageHelper storage() {
return StorageHelper(im_->storage().mutable_data());
}
TabletHelper tablet(const std::string &tag) {
return TabletHelper(im_->storage().tablet(tag));
TabletReader tablet(const std::string& tag) {
auto tmp = mode_ + "/" + tag;
string::TagEncode(tmp);
return reader_.tablet(tmp);
}
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(im_->AddTablet(tag, num_samples));
}
std::mutex &handler() { return im_->handler(); }
void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); }
void StartReadService(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainRead(dir, msecs);
}
void StartWriteSerice(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainWrite(dir, msecs);
}
void StopService() { im_->executor().Quit(); }
void PersistToDisk() const { im_->PersistToDisk(); }
private:
std::unique_ptr<IM> im_;
StorageReader reader_;
std::string mode_{"default"};
};
namespace components {
#define ACQUIRE_HANDLER(handler) std::lock_guard<std::mutex> ____(*handler);
/*
* Read and write support for Scalar component.
*/
template <typename T>
class ScalarHelper {
public:
ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr)
: data_(tablet), handler_(handler) {}
ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr)
: data_(&tablet.data()), handler_(handler) {}
void SetCaptions(const std::vector<std::string> &captions);
void AddRecord(int id, const std::vector<T> &values);
struct Scalar {
Scalar(Tablet tablet) : tablet_(tablet) {
tablet_.SetType(Tablet::Type::kScalar);
}
std::vector<std::vector<T>> GetRecords() const;
void SetCaption(const std::string cap) {
tablet_.SetCaptions(std::vector<std::string>({cap}));
}
std::vector<int> GetIds() const;
void AddRecord(int id, T value) {
auto record = tablet_.AddRecord();
record.SetId(id);
auto entry = record.AddData<T>();
entry.Set(value);
}
std::vector<int> GetTimestamps() const;
private:
Tablet tablet_;
};
std::vector<std::string> GetCaptions() const;
template <typename T>
struct ScalarReader {
ScalarReader(TabletReader&& reader) : reader_(reader) {}
size_t GetSize() const { return data_->records_size(); }
std::vector<T> records() const;
std::vector<T> ids() const;
std::vector<T> timestamps() const;
std::string caption() const;
size_t total_records() {return reader_.total_records();}
size_t size() const;
private:
storage::Tablet *data_;
std::mutex *handler_;
TabletReader reader_;
};
} // namespace components
} // namespace visualdl
#endif // VISUALDL_BACKEND_LOGIC_SDK_H
#endif
......@@ -4,78 +4,29 @@
namespace visualdl {
struct ScalarTestHelper {
ImHelper rim;
ImHelper wim;
const std::string dir = "./tmp/sdk_test.test";
void operator()(std::function<void()> read, std::function<void()> write) {
wim.StartWriteSerice(dir, 200);
write();
// should wait for the write service create log's path
std::this_thread::sleep_for(std::chrono::milliseconds(400));
rim.StartReadService(dir, 100);
// should wait for the read service to load "tag0" tablet into memory
std::this_thread::sleep_for(std::chrono::milliseconds(600));
read();
}
};
TEST(Scalar, set_caption) {
ScalarTestHelper helper;
const std::vector<std::string> captions({"train", "test"});
auto write = [&] {
auto tablet = helper.wim.AddTablet("tag0", -1);
components::ScalarHelper<float> scalar(tablet, &helper.wim.handler());
scalar.SetCaptions(captions);
};
auto read = [&] {
auto mytablet = helper.rim.tablet("tag0");
components::ScalarHelper<float> myscalar(mytablet, &helper.rim.handler());
auto mycaptions = myscalar.GetCaptions();
ASSERT_EQ(captions, mycaptions);
};
helper(read, write);
}
TEST(Scalar, add_records) {
ScalarTestHelper helper;
const std::vector<std::string> captions({"train", "test"});
const size_t nsteps = 100;
auto write = [&] {
auto tablet = helper.wim.AddTablet("tag0", -1);
components::ScalarHelper<float> scalar(tablet, &helper.wim.handler());
scalar.SetCaptions(captions);
for (int i = 0; i < nsteps; i++) {
scalar.AddRecord(i * 10, std::vector<float>({(float)i, (float)i + 1}));
}
};
auto read = [&] {
auto mytablet = helper.rim.tablet("tag0");
components::ScalarHelper<float> myscalar(mytablet, &helper.rim.handler());
auto records = myscalar.GetRecords();
ASSERT_EQ(records.size(), nsteps);
for (int i = 0; i < nsteps; i++) {
ASSERT_EQ(records[i], std::vector<float>({(float)i, (float)i + 1}));
}
};
helper(read, write);
TEST(Scalar, write) {
const auto dir = "./tmp/sdk_test";
Storage storage;
// write disk every time
storage.meta.cycle = 1;
storage.SetDir(dir);
auto tablet = storage.AddTablet("scalar0");
components::Scalar<int> scalar(tablet);
scalar.SetCaption("train");
scalar.AddRecord(0, 12);
storage.PersistToDisk();
// read from disk
StorageReader reader(dir);
auto tablet_reader = reader.tablet("scalar0");
auto scalar_reader = components::ScalarReader<int>(std::move(tablet_reader));
auto captioin = scalar_reader.caption();
ASSERT_EQ(captioin, "train");
ASSERT_EQ(scalar_reader.total_records(), 1);
auto record = scalar_reader.records();
ASSERT_EQ(record.size(), 1);
// check the first entry of first record
ASSERT_EQ(record.front(), 12);
}
} // namespace visualdl
......@@ -10,4 +10,4 @@ function(py_test TARGET_NAME)
)
endfunction()
py_test(test_summary SRCS test_summary.py)
py_test(test_summary SRCS test_storage.py)
__all__ = [
'StorageReader',
'StorageWriter',
]
import core
dtypes = ("float", "double", "int32", "int64")
class StorageReader(object):
def __init__(self, mode, dir):
self.reader = core.Reader(mode, dir)
def scalar(self, tag, type='float'):
type2scalar = {
'float': self.reader.get_scalar_float,
'double': self.reader.get_scalar_double,
'int': self.reader.get_scalar_int,
}
return type2scalar[type](tag)
class StorageWriter(object):
def __init__(self, dir, sync_cycle):
self.writer = core.Writer(dir, sync_cycle)
def as_mode(self, mode):
self.writer = self.writer.as_mode(mode)
return self
def scalar(self, tag, type='float'):
type2scalar = {
'float': self.writer.new_scalar_float,
'double': self.writer.new_scalar_double,
'int': self.writer.new_scalar_int,
}
return type2scalar[type](tag)
__all__ = [
'set_storage',
'scalar',
]
import core
dtypes = ("float", "double", "int32", "int64")
def IM(dir, mode="read", msecs=500):
im = core.Im()
READ = "read"
WRITE = "write"
if mode == READ:
im.start_read_service(dir, msecs)
else:
im.start_write_service(dir, msecs)
return im
class _Scalar(object):
'''
Python syntax wrapper for the core.ScalarHelper object.
'''
def __init__(self, core_object):
self._core_object = core_object
def add(self, id, vs):
'''
add a scalar record
:param id: int
id in the x-corrdinate
:param vs: list
values
:return: None
'''
self._core_object.add_record(id, vs)
def set_captions(self, cs):
'''
set the captions, one caption for one line.
:param cs: list of str
:return: None
'''
self._core_object.set_captions(cs)
@property
def captions(self):
return self._core_object.get_captions()
@property
def records(self):
'''
get all the records, format like
[
[0.1, 0.2], # first record
[0.2, 0.3], # second record
# ...
]
:return: list of list
'''
return self._core_object.get_records()
@property
def ids(self):
'''
get all the ids for the records
:return: list of int
'''
return self._core_object.get_ids()
@property
def timestamps(self):
'''
get all the timestamps for the records
:return: list of int
'''
return self._core_object.get_timestamps()
@property
def size(self):
return self._core_object.get_record_size()
def scalar(im, tag, dtype='float'):
'''
create a scalar component.
:param tag: str
name of this component.
:param dtype: string
the data type that will be used in underlying storage.
:return: object of core.Tablet
'''
assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % (
dtype, str(dtypes))
tablet = im.add_tablet(tag, -1)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar,
}
obj = dtype2obj[dtype](im)
return _Scalar(obj)
import storage
import numpy as np
import unittest
import random
import time
class StorageTest(unittest.TestCase):
def setUp(self):
self.dir = "./tmp/storage_test"
def test_read(self):
print 'test write'
self.writer = storage.StorageWriter(self.dir, sync_cycle=1).as_mode("train")
scalar = self.writer.scalar("model/scalar/min")
# scalar.set_caption("model/scalar/min")
for i in range(10):
scalar.add_record(i, float(i))
print 'test read'
self.reader = storage.StorageReader("train", self.dir)
scalar = self.reader.scalar("model/scalar/min")
self.assertEqual(scalar.caption(), "train")
records = scalar.records()
ids = scalar.ids()
self.assertTrue(np.equal(records, [float(i) for i in range(10)]).all())
self.assertTrue(np.equal(ids, [float(i) for i in range(10)]).all())
print 'records', records
print 'ids', ids
if __name__ == '__main__':
unittest.main()
import summary
import numpy as np
import unittest
import random
import time
once_flag = False
class ScalarTester(unittest.TestCase):
def setUp(self):
self.dir = "tmp/summary.test"
# clean path
try:
os.rmdir(self.dir)
except:
pass
self.im = summary.IM(self.dir, "write", 200)
self.tablet_name = "scalar0"
self.scalar = summary.scalar(self.im, self.tablet_name)
self.py_captions = ["train cost", "test cost"]
self.scalar.set_captions(self.py_captions)
self.py_records = []
self.py_ids = []
# write
for i in range(10):
record = [0.1 * i, 0.2 * i]
id = i * 10
self.py_records.append(record)
self.py_ids.append(id)
self.scalar.add(id, record)
def test_records(self):
self.assertEqual(self.scalar.size, len(self.py_records))
for i, record in enumerate(self.scalar.records):
self.assertTrue(np.isclose(record, self.py_records[i]).all())
def test_ids(self):
self.assertEqual(len(self.py_ids), self.scalar.size)
for i, id in enumerate(self.scalar.ids):
self.assertEqual(self.py_ids[i], id)
def test_captions(self):
self.assertEqual(self.scalar.captions, self.py_captions)
def test_read_records(self):
time.sleep(1)
im = summary.IM(self.dir, "read", 200)
time.sleep(1)
scalar = summary.scalar(im, self.tablet_name)
records = scalar.records
self.assertEqual(len(self.py_records), scalar.size)
for i, record in enumerate(self.scalar.records):
self.assertTrue(np.isclose(record, records[i]).all())
def test_read_ids(self):
time.sleep(0.6)
im = summary.IM(self.dir, "read", msecs=200)
time.sleep(0.6)
scalar = summary.scalar(im, self.tablet_name)
self.assertEqual(len(self.py_ids), scalar.size)
for i, id in enumerate(scalar.ids):
self.assertEqual(self.py_ids[i], id)
def test_read_captions(self):
time.sleep(0.6)
im = summary.IM(self.dir, "read", msecs=200)
time.sleep(0.6)
scalar = summary.scalar(im, self.tablet_name)
self.assertEqual(scalar.captions, self.py_captions)
def test_mix_read_write(self):
write_im = summary.IM(self.dir, "write", msecs=200)
time.sleep(0.6)
read_im = summary.IM(self.dir, "read", msecs=200)
scalar_writer = summary.scalar(write_im, self.tablet_name)
scalar_reader = summary.scalar(read_im, self.tablet_name)
scalar_writer.set_captions(["train cost", "test cost"])
for i in range(1000):
scalar_writer.add(i, [random.random(), random.random()])
scalar_reader.records
for i in range(500):
scalar_writer.add(i, [random.random(), random.random()])
scalar_reader.records
for i in range(500):
scalar_writer.add(i, [random.random(), random.random()])
for i in range(10):
scalar_reader.records
scalar_reader.captions
if __name__ == '__main__':
unittest.main()
......@@ -4,5 +4,12 @@ add_library(storage_proto ${PROTO_SRCS})
add_dependencies(storage_proto protobuf)
## add storage as target
add_library(entry entry.cc entry.h ${PROTO_SRCS} ${PROTO_HDRS})
add_library(tablet tablet.cc tablet.h ${PROTO_SRCS} ${PROTO_HDRS})
add_library(record record.cc record.h ${PROTO_SRCS} ${PROTO_HDRS})
add_library(storage storage.cc storage.h ${PROTO_SRCS} ${PROTO_HDRS})
add_dependencies(entry storage_proto im)
add_dependencies(record storage_proto entry)
add_dependencies(tablet storage_proto)
add_dependencies(storage storage_proto)
#include "visualdl/storage/entry.h"
namespace visualdl {
#define IMPL_ENTRY_SET_OR_ADD(method__, ctype__, dtype__, opr__) \
template <> \
void Entry<ctype__>::method__(ctype__ v) { \
entry->set_dtype(storage::DataType::dtype__); \
entry->opr__(v); \
WRITE_GUARD \
}
IMPL_ENTRY_SET_OR_ADD(Set, int, kInt32, set_i32);
IMPL_ENTRY_SET_OR_ADD(Set, int64_t, kInt64, set_i64);
IMPL_ENTRY_SET_OR_ADD(Set, bool, kBool, set_b);
IMPL_ENTRY_SET_OR_ADD(Set, float, kFloat, set_f);
IMPL_ENTRY_SET_OR_ADD(Set, double, kDouble, set_d);
IMPL_ENTRY_SET_OR_ADD(Add, int, kInt32s, add_i32s);
IMPL_ENTRY_SET_OR_ADD(Add, int64_t, kInt64s, add_i64s);
IMPL_ENTRY_SET_OR_ADD(Add, float, kFloats, add_fs);
IMPL_ENTRY_SET_OR_ADD(Add, double, kDoubles, add_ds);
IMPL_ENTRY_SET_OR_ADD(Add, std::string, kStrings, add_ss);
IMPL_ENTRY_SET_OR_ADD(Add, bool, kBools, add_bs);
#define IMPL_ENTRY_GET(T, fieldname__) \
template <> \
T EntryReader<T>::Get() const { \
data_.fieldname__(); \
}
IMPL_ENTRY_GET(int, i32);
IMPL_ENTRY_GET(int64_t, i64);
IMPL_ENTRY_GET(float, f);
IMPL_ENTRY_GET(double, d);
IMPL_ENTRY_GET(std::string, s);
IMPL_ENTRY_GET(bool, b);
#define IMPL_ENTRY_GET_MULTI(T, fieldname__) \
template <> \
std::vector<T> EntryReader<T>::GetMulti() const { \
return std::vector<T>(data_.fieldname__().begin(), \
data_.fieldname__().end()); \
}
IMPL_ENTRY_GET_MULTI(int, i32s);
IMPL_ENTRY_GET_MULTI(float, fs);
IMPL_ENTRY_GET_MULTI(double, ds);
IMPL_ENTRY_GET_MULTI(std::string, ss);
IMPL_ENTRY_GET_MULTI(bool, bs);
template class Entry<int>;
template class Entry<float>;
template class Entry<double>;
template class Entry<bool>;
template class EntryReader<int>;
template class EntryReader<float>;
template class EntryReader<double>;
template class EntryReader<bool>;
} // namespace visualdl
#ifndef VISUALDL_STORAGE_ENTRY_H
#define VISUALDL_STORAGE_ENTRY_H
#include "visualdl/logic/im.h"
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/guard.h"
namespace visualdl {
struct Storage;
/*
* Utility helper for storage::Entry.
*/
template <typename T>
struct Entry {
DECL_GUARD(Entry<T>)
// use pointer to avoid copy
storage::Entry* entry{nullptr};
Entry() {}
explicit Entry(storage::Entry* entry, Storage* parent)
: entry(entry), x_(parent) {}
void operator()(storage::Entry* entry, Storage* parent) {
this->entry = entry;
x_ = parent;
}
// Set a single value.
void Set(T v);
// Add a value to repeated message field.
void Add(T v);
Storage* parent() { return x_; }
private:
Storage* x_;
};
template <typename T>
struct EntryReader {
EntryReader(storage::Entry x) : data_(x) {}
// Get a single value.
T Get() const;
// Get repeated field.
std::vector<T> GetMulti() const;
private:
storage::Entry data_;
};
} // namespace visualdl
#endif
#include "visualdl/storage/record.h"
\ No newline at end of file
#ifndef VISUALDL_STORAGE_RECORD_H
#define VISUALDL_STORAGE_RECORD_H
#include "visualdl/logic/im.h"
#include "visualdl/storage/entry.h"
#include "visualdl/storage/storage.pb.h"
namespace visualdl {
/*
* A helper for operations on storage::Record
*/
struct Record {
enum Dtype {
kInt32 = 0,
kInt64 = 1,
kFloat = 2,
kDouble = 3,
kString = 4,
kBool = 5,
// entrys
kInt64s = 6,
kFloats = 7,
kDoubles = 8,
kStrings = 9,
kInt32s = 10,
kBools = 11,
kUnknown = 12
};
DECL_GUARD(Record)
Record(storage::Record* x, Storage* parent) : data_(x), x_(parent) {}
// write operations
void SetTimeStamp(int64_t x) {
data_->set_timestamp(x);
WRITE_GUARD
}
void SetId(int64_t id) {
data_->set_id(id);
WRITE_GUARD
}
void SetDtype(Dtype type) {
data_->set_dtype(storage::DataType(type));
WRITE_GUARD
}
template <typename T>
Entry<T> MutableMeta() {
return Entry<T>(data_->mutable_meta(), parent());
}
template <typename T>
Entry<T> AddData() {
WRITE_GUARD
return Entry<T>(data_->add_data(), parent());
}
Storage* parent() { return x_; }
private:
storage::Record* data_{nullptr};
Storage* x_;
};
struct RecordReader {
RecordReader(storage::Record x) : data_(x) {}
// read operations
size_t data_size() const { return data_.data_size(); }
template <typename T>
EntryReader<T> data(int i) {
return EntryReader<T>(data_.data(i));
}
int64_t timestamp() const { return data_.timestamp(); }
int64_t id() const { return data_.id(); }
Record::Dtype dtype() const { return (Record::Dtype)data_.dtype(); }
template <typename T>
Entry<T> meta() const {
return data_.meta();
}
private:
storage::Record data_;
};
} // namespace visualdl
#endif
#include <glog/logging.h>
#include <fstream>
#include "visualdl/storage/storage.h"
#include "visualdl/utils/concurrency.h"
#include "visualdl/utils/filesystem.h"
namespace visualdl {
const std::string StorageBase::meta_file_name = "storage.meta";
std::string StorageBase::meta_path(const std::string &dir) const {
CHECK(!dir.empty()) << "dir is empty";
return dir + "/" + meta_file_name;
}
std::string StorageBase::tablet_path(const std::string &dir,
const std::string &tag) const {
CHECK(!dir.empty()) << "dir should be set first";
return dir + "/" + tag;
}
storage::Tablet *MemoryStorage::NewTablet(const std::string &tag,
int num_samples) {
auto it = tablets_.find(tag);
if (it == tablets_.end()) {
// create new tablet
tablets_[tag] = storage::Tablet();
tablets_[tag].set_tag(tag);
*storage_.add_tags() = tag;
} else {
return &it->second;
}
return &tablets_[tag];
}
storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end()) << "tablet tagged as " << tag << " not exists";
return &it->second;
}
// TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk(const std::string &dir) {
CHECK(!dir.empty());
storage_.set_dir(dir);
// make a directory if not exist
fs::TryRecurMkdir(dir);
// write storage out
VLOG(2) << "to serize meta to dir " << dir;
fs::SerializeToFile(storage_, meta_path(dir));
VLOG(2) << "serize meta to dir " << dir;
// write all the tablets
for (auto tag : storage_.tags()) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end());
fs::SerializeToFile(it->second, tablet_path(dir, tag));
}
}
// TODO add some checksum to avoid unnecessary loading
void MemoryStorage::LoadFromDisk(const std::string &dir) {
CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir);
// load storage
CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir)))
<< "parse from " << meta_path(dir) << " failed";
// load all the tablets
for (int i = 0; i < storage_.tags_size(); i++) {
auto tag = storage_.tags(i);
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag)));
}
}
void MemoryStorage::StartReadService(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
cc::PeriodExector::task_t task = [dir, this, handler] {
VLOG(1) << "loading from " << dir;
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
LoadFromDisk(dir);
} else {
LoadFromDisk(dir);
}
return true;
};
// executor_.Start();
VLOG(1) << "push read task";
(*executor_)(std::move(task), msecs);
}
void MemoryStorage::StartWriteService(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
storage_.set_dir(dir);
// executor_.Start();
cc::PeriodExector::task_t task = [dir, handler, this] {
VLOG(2) << "persist to disk";
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
PersistToDisk(dir);
} else {
PersistToDisk(dir);
}
return true;
};
(*executor_)(std::move(task), msecs);
}
} // namespace visualdl
#ifndef VISUALDL_STORAGE_H
#define VISUALDL_STORAGE_H
#ifndef VISUALDL_STORAGE_STORAGE_H
#define VISUALDL_STORAGE_STORAGE_H
#include <time.h>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <glog/logging.h>
#include <vector>
#include <set>
#include "visualdl/logic/im.h"
#include "visualdl/utils/guard.h"
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/concurrency.h"
#include "visualdl/storage/tablet.h"
#include "visualdl/utils/filesystem.h"
namespace visualdl {
/*
* Generate a tablet path in disk from its tag.
*/
inline std::string GenPathFromTag(const std::string &dir,
const std::string &tag);
static const std::string meta_file_name = "storage.meta";
static std::string meta_path(const std::string& dir) {
CHECK(!dir.empty()) << "dir is empty";
return dir + "/" + meta_file_name;
}
static std::string tablet_path(const std::string& dir, const std::string& tag) {
CHECK(!dir.empty()) << "dir should be set first";
return dir + "/" + tag;
}
struct SimpleSyncMeta {
void Inc() { counter++; }
bool ToSync() { return counter % cycle == 0; }
size_t counter{0};
int cycle;
};
/*
* Storage Interface. The might be a bunch of implementations, for example, a
* MemStorage that keep a copy of all the taplets in memory, can be changed with
* a higher performance; a DiskStorage that keep all the data in disk, apply to
* the scenerios where memory consumption should be considered.
* Helper for operations on storage::Storage.
*/
class StorageBase {
public:
const static std::string meta_file_name;
struct Storage {
DECL_GUARD(Storage)
enum Type { kMemory = 0, kDisk = 1 };
// mode of the sevice, either reading or writing.
enum Mode { kRead = 0, kWrite = 1, kNone = 2 };
mutable SimpleSyncMeta meta;
void SetStorage(const std::string &dir) {
Storage() { data_ = std::make_shared<storage::Storage>(); }
Storage(const std::shared_ptr<storage::Storage>& x) : data_(x) {
time_t t;
time(&t);
storage_.set_timestamp(t);
storage_.set_dir(dir);
data_->set_timestamp(t);
}
std::string meta_path(const std::string &dir) const;
std::string tablet_path(const std::string &dir, const std::string &tag) const;
/*
* Create a new Tablet storage.
*/
virtual storage::Tablet *NewTablet(const std::string &tag,
int num_samples) = 0;
/*
* Get a tablet from memory, this can be viewed as a cache, if the storage is
* in disk, a hash map in memory will first load the corresponding Tablet
* Protobuf from disk and hold all the changes.
*/
virtual storage::Tablet *tablet(const std::string &tag) = 0;
// write operations
void AddMode(const std::string& x) {
// avoid duplicate modes.
if (modes_.count(x) != 0) return;
*data_->add_modes() = x;
modes_.insert(x);
WRITE_GUARD
}
/*
* Persist the data from cache to disk. Both the memory storage or disk
* storage should write changes to disk for persistence.
*/
virtual void PersistToDisk(const std::string &dir) = 0;
Tablet AddTablet(const std::string& x) {
CHECK(tablets_.count(x) == 0) << "tablet [" << x << "] has existed";
tablets_[x] = storage::Tablet();
AddTag(x);
LOG(INFO) << "really add tag " << x;
WRITE_GUARD
return Tablet(&tablets_[x], this);
}
void SetDir(const std::string& dir) { dir_ = dir; }
void PersistToDisk() { PersistToDisk(dir_); }
/*
* Load data from disk.
* Save memory to disk.
*/
virtual void LoadFromDisk(const std::string &dir) = 0;
void PersistToDisk(const std::string& dir) {
// LOG(INFO) << "persist to disk " << dir;
CHECK(!dir.empty()) << "dir should be set.";
fs::TryRecurMkdir(dir);
fs::SerializeToFile(*data_, meta_path(dir));
for (auto tag : data_->tags()) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end()) << "tag " << tag << " not exist.";
fs::SerializeToFile(it->second, tablet_path(dir, tag));
}
}
storage::Storage *mutable_data() { return &storage_; }
const storage::Storage &data() { return storage_; }
Storage* parent() { return this; }
protected:
storage::Storage storage_;
void AddTag(const std::string& x) {
*data_->add_tags() = x;
}
private:
std::string dir_;
std::map<std::string, storage::Tablet> tablets_;
std::shared_ptr<storage::Storage> data_;
std::set<std::string> modes_;
};
/*
* Storage in Memory, that will support quick edits on data.
* Storage reader, each interface will trigger a read.
*/
class MemoryStorage final : public StorageBase {
public:
MemoryStorage() {}
MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {}
~MemoryStorage() {
if (executor_ != nullptr) executor_->Quit();
struct StorageReader {
StorageReader(const std::string& dir) : dir_(dir) {}
// read operations
std::vector<std::string> Tags() {
storage::Storage storage;
Reload(storage);
return std::vector<std::string>(storage.tags().begin(),
storage.tags().end());
}
std::vector<std::string> Modes() {
storage::Storage storage;
Reload(storage);
return std::vector<std::string>(storage.modes().begin(),
storage.modes().end());
}
storage::Tablet *NewTablet(const std::string &tag, int num_samples) override;
storage::Tablet *tablet(const std::string &tag) override;
void PersistToDisk(const std::string &dir) override;
void LoadFromDisk(const std::string &dir) override;
/*
* Create a thread which will keep reading the latest data from the disk to
* memory.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartReadService(const std::string &dir, int msecs, std::mutex *handler);
TabletReader tablet(const std::string& tag) const {
auto path = tablet_path(dir_, tag);
storage::Tablet tablet;
fs::DeSerializeFromFile(&tablet, path);
return TabletReader(tablet);
}
/*
* Create a thread which will keep writing the latest changes from memory to
* disk.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartWriteService(const std::string &dir,
int msecs,
std::mutex *handler);
protected:
void Reload(storage::Storage& storage) {
const std::string path = meta_path(dir_);
fs::DeSerializeFromFile(&storage, path);
}
private:
std::map<std::string, storage::Tablet> tablets_;
// TODO(ChunweiYan) remove executor here.
cc::PeriodExector *executor_{nullptr};
std::string dir_;
};
} // namespace visualdl
#endif // VISUALDL_STORAGE_H
#endif
......@@ -91,25 +91,33 @@ message Record {
/*
A Tablet stores the records of a component which type is `component` and
indidates as `tag`.
indidated as `tag`.
The records will be saved in a file which name contains `tag`. During the
running period,
`num_records` will be accumulated, and `num_samples` indicates the size of
sample set the
reservoir sampling algorithm will collect.
sample set the reservoir sampling algorithm will collect, if `num_samples`
set to -1, no sample will be applied.
*/
message Tablet {
// the kinds of the components that supported
// the kinds of the components that supported.
enum Type {
kScalar = 0;
kHistogram = 1;
kGraph = 2;
kImage = 2;
}
// The unique identification for this `Tablet`. VisualDL will have no the
// concept of FileWriter like TB. It will store all the tablets in a single
// directory, so it has a `mode` concept. `mode` will be stored in `tag`
// as the prefix, so that the same tablet in different modes will have
// different `tag`. for example, a tablet called "layer/grad/min" in "train"
// and "test" mode will have tags like "train/layer/grad/min" and
// "test/layer/grad/min".
string tag = 6;
// type of the component, different component should have different storage
// format.
Type component = 1;
// records the total count of records, each Write operation should increate
// this value.
// Keep a record of the total count of records, each Write operation should
// increate this value.
int64 total_records = 2;
// indicate the number of instances to sample, this should be a constant
// value.
......@@ -117,22 +125,21 @@ message Tablet {
repeated Record records = 4;
// store a meta infomation if all the records share.
Entry meta = 5;
// the unique identification for this `Tablet`.
string tag = 6;
// one tablet might have multiple captions, for example, a scalar component
// might have
// two plots labeled "train" and "test".
// one tablet might have just one caption, if not set, it should be the value
// of `mode`.
repeated string captions = 7;
string description = 8;
}
/*
The Storage stores all the records.
*/
message Storage {
// tags to Tablet, should be thread safe if fix the keys after initialization.
// TODO to delete in the new storage interface.
map<string, Tablet> tablets = 1;
repeated string tags = 4;
string dir = 2;
int64 timestamp = 3;
// VisualDL will not have the concept like TB's FileWriter, just one storage,
// each tablet has different `mode`.
repeated string modes = 1;
// tags will be used to generate paths of tablets.
repeated string tags = 2;
int64 timestamp = 5;
}
#include "visualdl/storage/storage.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <memory>
namespace visualdl {
using namespace std;
class MemoryStorageTest : public ::testing::Test {
class StorageTest : public ::testing::Test {
public:
void SetUp() override { storage_.SetStorage("./tmp"); }
void SetUp() {
storage.SetDir("./tmp/storage_test");
storage.meta.cycle = 1;
}
MemoryStorage storage_;
Storage storage;
};
TEST_F(MemoryStorageTest, SetStorage) {
string dir = "./tmp";
storage_.SetStorage(dir);
ASSERT_EQ(storage_.data().dir(), dir);
}
TEST_F(MemoryStorageTest, AddTablet) {
// TODO need to escape tag as name
string tag = "add%20tag0";
storage_.NewTablet(tag, -1);
auto* tablet = storage_.tablet(tag);
ASSERT_TRUE(tablet != nullptr);
ASSERT_EQ(tablet->tag(), tag);
}
TEST_F(StorageTest, main) {
storage.AddMode("train");
storage.AddMode("test");
TEST_F(MemoryStorageTest, PersistToDisk) {
const std::string dir = "./tmp/201.test";
storage_.SetStorage(dir);
string tag = "add%20tag0";
storage_.NewTablet(tag, -1);
auto tag0 = storage.AddTablet("tag0");
auto tag1 = storage.AddTablet("tag1");
auto record = tag0.AddRecord();
auto entry = record.AddData<int>();
entry.Set(12);
storage_.PersistToDisk(dir);
LOG(INFO) << "persist to disk";
StorageReader reader("./tmp/storage_test");
auto modes = reader.Modes();
MemoryStorage other;
other.LoadFromDisk(dir);
LOG(INFO) << "read from disk";
ASSERT_EQ(other.data().SerializeAsString(),
storage_.data().SerializeAsString());
ASSERT_EQ(modes.size(), 2);
ASSERT_EQ(modes[0], "train");
ASSERT_EQ(modes[1], "test");
}
} // namespace visualdl
#ifndef VISUALDL_TABLET_H
#define VISUALDL_TABLET_H
#include "visualdl/logic/im.h"
#include "visualdl/storage/record.h"
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/string.h"
namespace visualdl {
/*
* Tablet is a helper for operations on storage::Tablet.
*/
struct Tablet {
enum Type { kScalar = 0, kHistogram = 1, kImage = 2 };
DECL_GUARD(Tablet);
Tablet(storage::Tablet* x, Storage* parent) : data_(x), x_(parent) {}
// write operations.
void SetNumSamples(int x) {
data_->set_num_samples(x);
WRITE_GUARD
}
void SetType(Type type) {
data_->set_component(static_cast<storage::Tablet::Type>(type));
WRITE_GUARD
}
void SetTag(const std::string& mode, const std::string& tag) {
auto internal_tag = mode + "/" + tag;
string::TagEncode(internal_tag);
data_->set_tag(internal_tag);
WRITE_GUARD
}
Record AddRecord() {
IncTotalRecords();
WRITE_GUARD
return Record(data_->add_records(), parent());
}
template <typename T>
Entry<T> MutableMeta() {
Entry<T> x(data_->mutable_meta(), parent());
}
void SetCaptions(const std::vector<std::string>& xs) {
data_->clear_captions();
for (const auto& x : xs) {
*data_->add_captions() = x;
}
WRITE_GUARD
}
void SetDescription(const std::string& x) {
data_->set_description(x);
WRITE_GUARD
}
void IncTotalRecords() {
data_->set_total_records(data_->total_records() + 1);
WRITE_GUARD
}
Storage* parent() const { return x_; }
private:
Storage* x_;
storage::Tablet* data_{nullptr};
};
/*
* Tablet reader, it will hold the protobuf object.
*/
struct TabletReader {
TabletReader(storage::Tablet x) : data_(x) {}
// read operations.
std::string tag() const { return data_.tag(); }
Tablet::Type type() const { return Tablet::Type(data_.component()); }
int64_t total_records() const { return data_.total_records(); }
int32_t num_samples() const { return data_.num_samples(); }
RecordReader record(int i) const { return RecordReader(data_.records(i)); }
template <typename T>
EntryReader<T> meta() const {
return EntryReader<T>(data_.meta());
}
std::vector<std::string> captions() const {
std::vector<std::string> x(data_.captions().begin(),
data_.captions().end());
return x;
}
std::string description() const { return data_.description(); }
private:
storage::Tablet data_;
};
} // namespace visualdl
#endif
......@@ -43,7 +43,7 @@ bool DeSerializeFromFile(T* proto, const std::string& path) {
return proto->ParseFromIstream(&file);
}
void TryMkdir(const std::string& dir) {
static void TryMkdir(const std::string& dir) {
VLOG(1) << "try to mkdir " << dir;
struct stat st = {0};
if (stat(dir.c_str(), &st) == -1) {
......@@ -52,7 +52,7 @@ void TryMkdir(const std::string& dir) {
}
// Create a path by recursively create directries
void TryRecurMkdir(const std::string& path) {
static void TryRecurMkdir(const std::string& path) {
// split path by '/'
for (int i = 1; i < path.size() - 1; i++) {
if (path[i] == '/') {
......
#ifndef VISUALDL_UTILS_GUARD_H
#define VISUALDL_UTILS_GUARD_H
namespace visualdl {
namespace guard {
template <typename T>
class BasicGuard {
public:
BasicGuard(const T* x) : data_(x) { start(); }
~BasicGuard() { end(); }
void start() {}
void end() {}
private:
const T* data_;
};
#define DECL_GUARD(T) \
using WriteGuard = SimpleWriteSyncGuard<T>; \
using ReadGuard = guard::BasicGuard<T>;
// #define DECL_GUARD(T) \
// using WriteGuard = guard::BasicGuard<T>; \
// using ReadGuard = guard::BasicGuard<T>;
#define READ_GUARD ReadGuard _(this);
#define WRITE_GUARD WriteGuard _(this);
} // namespace guard
} // namespace visualdl
#endif
#ifndef VISUALDL_UTILS_STRING_H
#define VISUALDL_UTILS_STRING_H
#include <sstream>
#include <string>
namespace visualdl {
namespace string {
static void TagEncode(std::string& tag) {
for (auto& c : tag) {
if (c == '/') {
c = '%';
}
}
}
static void TagDecode(std::string& tag) {
for (auto& c : tag) {
if (c == '%') {
c = '/';
}
}
}
} // namespace string
} // namespace visualdl
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册