提交 dacc87a1 编写于 作者: S superjom

refactor storage

上级 7f375258
......@@ -32,19 +32,19 @@ include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic)
#add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python)
add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/test.cc
${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc
#${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc
#${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h
${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h
)
target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread)
target_link_libraries(vl_test storage gtest glog protobuf gflags pthread)
enable_testing ()
......
add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc)
add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc)
add_dependencies(im storage_proto)
add_dependencies(sdk storage_proto)
#add_library(sdk ${PROJECT_SOURCE_DIR}/visualdl/logic/sdk.cc)
#add_library(im ${PROJECT_SOURCE_DIR}/visualdl/logic/im.cc)
#add_dependencies(im storage_proto)
#add_dependencies(sdk storage_proto)
## pybind
add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc)
add_dependencies(core pybind python im storage sdk protobuf glog)
target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog)
set_target_properties(core PROPERTIES PREFIX "" SUFFIX ".so")
#add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc)
#add_dependencies(core pybind python im storage sdk protobuf glog)
#target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog)
#set_target_properties(core PROPERTIES PREFIX "" SUFFIX ".so")
......@@ -4,5 +4,14 @@ add_library(storage_proto ${PROTO_SRCS})
add_dependencies(storage_proto protobuf)
## add storage as target
#add_library(storage storage.cc storage.h ${PROTO_SRCS} ${PROTO_HDRS})
add_library(entry entry.cc entry.h ${PROTO_SRCS} ${PROTO_HDRS})
add_library(tablet tablet.cc tablet.h ${PROTO_SRCS} ${PROTO_HDRS})
add_library(record record.cc record.h ${PROTO_SRCS} ${PROTO_HDRS})
add_library(storage storage.cc storage.h ${PROTO_SRCS} ${PROTO_HDRS})
add_dependencies(entry storage_proto)
add_dependencies(tablet storage_proto)
add_dependencies(record storage_proto)
add_dependencies(storage storage_proto)
#add_dependencies(storage storage_proto)
#include <glog/logging.h>
#include <fstream>
#include "visualdl/storage/storage.h"
#include "visualdl/utils/concurrency.h"
#include "visualdl/utils/filesystem.h"
namespace visualdl {
const std::string StorageBase::meta_file_name = "storage.meta";
std::string StorageBase::meta_path(const std::string &dir) const {
CHECK(!dir.empty()) << "dir is empty";
return dir + "/" + meta_file_name;
}
std::string StorageBase::tablet_path(const std::string &dir,
const std::string &tag) const {
CHECK(!dir.empty()) << "dir should be set first";
return dir + "/" + tag;
}
storage::Tablet *MemoryStorage::NewTablet(const std::string &tag,
int num_samples) {
auto it = tablets_.find(tag);
if (it == tablets_.end()) {
// create new tablet
tablets_[tag] = storage::Tablet();
tablets_[tag].set_tag(tag);
*storage_.add_tags() = tag;
} else {
return &it->second;
}
return &tablets_[tag];
}
storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end()) << "tablet tagged as " << tag << " not exists";
return &it->second;
}
// TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk(const std::string &dir) {
CHECK(!dir.empty());
storage_.set_dir(dir);
// make a directory if not exist
fs::TryRecurMkdir(dir);
// write storage out
VLOG(2) << "to serize meta to dir " << dir;
fs::SerializeToFile(storage_, meta_path(dir));
VLOG(2) << "serize meta to dir " << dir;
// write all the tablets
for (auto tag : storage_.tags()) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end());
fs::SerializeToFile(it->second, tablet_path(dir, tag));
}
}
// TODO add some checksum to avoid unnecessary loading
void MemoryStorage::LoadFromDisk(const std::string &dir) {
CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir);
// load storage
CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir)))
<< "parse from " << meta_path(dir) << " failed";
// load all the tablets
for (int i = 0; i < storage_.tags_size(); i++) {
auto tag = storage_.tags(i);
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag)));
}
}
void MemoryStorage::StartReadService(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
cc::PeriodExector::task_t task = [dir, this, handler] {
VLOG(1) << "loading from " << dir;
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
LoadFromDisk(dir);
} else {
LoadFromDisk(dir);
}
return true;
};
// executor_.Start();
VLOG(1) << "push read task";
(*executor_)(std::move(task), msecs);
}
void MemoryStorage::StartWriteService(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
storage_.set_dir(dir);
// executor_.Start();
cc::PeriodExector::task_t task = [dir, handler, this] {
VLOG(2) << "persist to disk";
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
PersistToDisk(dir);
} else {
PersistToDisk(dir);
}
return true;
};
(*executor_)(std::move(task), msecs);
}
} // namespace visualdl
#include "visualdl/storage/storage.h"
\ No newline at end of file
#ifndef VISUALDL_STORAGE_H
#define VISUALDL_STORAGE_H
#ifndef VISUALDL_STORAGE_STORAGE_H
#define VISUALDL_STORAGE_STORAGE_H
#include <time.h>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <glog/logging.h>
#include <vector>
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/concurrency.h"
#include "visualdl/storage/tablet.h"
namespace visualdl {
/*
* Generate a tablet path in disk from its tag.
* Helper for operations on storage::Storage.
*/
inline std::string GenPathFromTag(const std::string &dir,
const std::string &tag);
/*
* Storage Interface. The might be a bunch of implementations, for example, a
* MemStorage that keep a copy of all the taplets in memory, can be changed with
* a higher performance; a DiskStorage that keep all the data in disk, apply to
* the scenerios where memory consumption should be considered.
*/
class StorageBase {
public:
const static std::string meta_file_name;
enum Type { kMemory = 0, kDisk = 1 };
// mode of the sevice, either reading or writing.
enum Mode { kRead = 0, kWrite = 1, kNone = 2 };
void SetStorage(const std::string &dir) {
struct Storage {
Storage() {}
Storage(storage::Storage* x) : data_(x) {
time_t t;
time(&t);
storage_.set_timestamp(t);
storage_.set_dir(dir);
data_->set_timestamp(t);
}
std::string meta_path(const std::string &dir) const;
std::string tablet_path(const std::string &dir, const std::string &tag) const;
/*
* Create a new Tablet storage.
*/
virtual storage::Tablet *NewTablet(const std::string &tag,
int num_samples) = 0;
/*
* Get a tablet from memory, this can be viewed as a cache, if the storage is
* in disk, a hash map in memory will first load the corresponding Tablet
* Protobuf from disk and hold all the changes.
*/
virtual storage::Tablet *tablet(const std::string &tag) = 0;
/*
* Persist the data from cache to disk. Both the memory storage or disk
* storage should write changes to disk for persistence.
*/
virtual void PersistToDisk(const std::string &dir) = 0;
/*
* Load data from disk.
*/
virtual void LoadFromDisk(const std::string &dir) = 0;
storage::Storage *mutable_data() { return &storage_; }
const storage::Storage &data() { return storage_; }
protected:
storage::Storage storage_;
};
/*
* Storage in Memory, that will support quick edits on data.
*/
class MemoryStorage final : public StorageBase {
public:
MemoryStorage() {}
MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {}
~MemoryStorage() {
if (executor_ != nullptr) executor_->Quit();
std::vector<std::string> Modes() {
return std::vector<std::string>(data_->modes().begin(),
data_->modes().end());
}
storage::Tablet *NewTablet(const std::string &tag, int num_samples) override;
storage::Tablet *tablet(const std::string &tag) override;
void PersistToDisk(const std::string &dir) override;
void LoadFromDisk(const std::string &dir) override;
void AddMode(const std::string& x) { *data_->add_modes() = x; }
/*
* Create a thread which will keep reading the latest data from the disk to
* memory.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartReadService(const std::string &dir, int msecs, std::mutex *handler);
Tablet AddTablet(const std::string& x) {
AddTag(x);
CHECK(tablets_.count(x) == 0) << "tablet [" << x << "] has existed";
tablets_[x] = storage::Tablet();
return Tablet(&tablets_[x]);
}
/*
* Create a thread which will keep writing the latest changes from memory to
* disk.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartWriteService(const std::string &dir,
int msecs,
std::mutex *handler);
protected:
void AddTag(const std::string& x) { *data_->add_tags() = x; }
private:
std::map<std::string, storage::Tablet> tablets_;
// TODO(ChunweiYan) remove executor here.
cc::PeriodExector *executor_{nullptr};
storage::Storage* data_{nullptr};
};
} // namespace visualdl
#endif // VISUALDL_STORAGE_H
#endif
......@@ -91,25 +91,33 @@ message Record {
/*
A Tablet stores the records of a component which type is `component` and
indidates as `tag`.
indidated as `tag`.
The records will be saved in a file which name contains `tag`. During the
running period,
`num_records` will be accumulated, and `num_samples` indicates the size of
sample set the
reservoir sampling algorithm will collect.
sample set the reservoir sampling algorithm will collect, if `num_samples`
set to -1, no sample will be applied.
*/
message Tablet {
// the kinds of the components that supported
// the kinds of the components that supported.
enum Type {
kScalar = 0;
kHistogram = 1;
kGraph = 2;
kImage = 2;
}
// The unique identification for this `Tablet`. VisualDL will have no the
// concept of FileWriter like TB. It will store all the tablets in a single
// directory, so it has a `mode` concept. `mode` will be stored in `tag`
// as the prefix, so that the same tablet in different modes will have
// different `tag`. for example, a tablet called "layer/grad/min" in "train"
// and "test" mode will have tags like "train/layer/grad/min" and
// "test/layer/grad/min".
string tag = 6;
// type of the component, different component should have different storage
// format.
Type component = 1;
// records the total count of records, each Write operation should increate
// this value.
// Keep a record of the total count of records, each Write operation should
// increate this value.
int64 total_records = 2;
// indicate the number of instances to sample, this should be a constant
// value.
......@@ -117,22 +125,21 @@ message Tablet {
repeated Record records = 4;
// store a meta infomation if all the records share.
Entry meta = 5;
// the unique identification for this `Tablet`.
string tag = 6;
// one tablet might have multiple captions, for example, a scalar component
// might have
// two plots labeled "train" and "test".
// one tablet might have just one caption, if not set, it should be the value
// of `mode`.
repeated string captions = 7;
string description = 8;
}
/*
The Storage stores all the records.
*/
message Storage {
// tags to Tablet, should be thread safe if fix the keys after initialization.
// TODO to delete in the new storage interface.
map<string, Tablet> tablets = 1;
repeated string tags = 4;
string dir = 2;
int64 timestamp = 3;
// VisualDL will not have the concept like TB's FileWriter, just one storage,
// each tablet has different `mode`.
repeated string modes = 1;
// tags will be used to generate paths of tablets.
repeated string tags = 2;
int64 timestamp = 5;
}
#include "visualdl/storage/storage.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <memory>
namespace visualdl {
using namespace std;
class MemoryStorageTest : public ::testing::Test {
class StorageTest : public ::testing::Test {
public:
void SetUp() override { storage_.SetStorage("./tmp"); }
void SetUp() { storage.reset(new Storage(&data_)); }
MemoryStorage storage_;
storage::Storage data_;
std::unique_ptr<Storage> storage;
};
TEST_F(MemoryStorageTest, SetStorage) {
string dir = "./tmp";
storage_.SetStorage(dir);
ASSERT_EQ(storage_.data().dir(), dir);
}
TEST_F(MemoryStorageTest, AddTablet) {
// TODO need to escape tag as name
string tag = "add%20tag0";
storage_.NewTablet(tag, -1);
auto* tablet = storage_.tablet(tag);
ASSERT_TRUE(tablet != nullptr);
ASSERT_EQ(tablet->tag(), tag);
}
TEST_F(MemoryStorageTest, PersistToDisk) {
const std::string dir = "./tmp/201.test";
storage_.SetStorage(dir);
string tag = "add%20tag0";
storage_.NewTablet(tag, -1);
TEST_F(StorageTest, main) {
storage->AddMode("train");
storage->AddMode("test");
storage_.PersistToDisk(dir);
LOG(INFO) << "persist to disk";
auto tag0 = storage->AddTablet("tag0");
auto tag1 = storage->AddTablet("tag1");
MemoryStorage other;
other.LoadFromDisk(dir);
LOG(INFO) << "read from disk";
ASSERT_EQ(other.data().SerializeAsString(),
storage_.data().SerializeAsString());
auto modes = storage->Modes();
ASSERT_EQ(modes.size(), 2);
ASSERT_EQ(modes[0], "train");
ASSERT_EQ(modes[1], "test");
}
} // namespace visualdl
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册