提交 8dbaac0f 编写于 作者: Y Yan Chunwei 提交者: GitHub

Merge pull request #40 from Superjom/feature/refactor_storage_interface

...@@ -30,5 +30,6 @@ add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/backend/logic) ...@@ -30,5 +30,6 @@ add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/backend/logic)
add_executable(vl_test add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/backend/test.cc ${PROJECT_SOURCE_DIR}/visualdl/backend/test.cc
${PROJECT_SOURCE_DIR}/visualdl/backend/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/backend/logic/im_test.cc) ${PROJECT_SOURCE_DIR}/visualdl/backend/logic/im_test.cc)
target_link_libraries(vl_test storage im gtest glog protobuf gflags) target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread)
...@@ -26,24 +26,21 @@ int ReserviorSample(int num_samples, int num_records) { ...@@ -26,24 +26,21 @@ int ReserviorSample(int num_samples, int num_records) {
} }
void InformationMaintainer::SetPersistDest(const std::string &path) { void InformationMaintainer::SetPersistDest(const std::string &path) {
CHECK(storage_.mutable_data()->dir().empty()) CHECK(storage_->mutable_data()->dir().empty())
<< "duplicate set storage's path"; << "duplicate set storage's path";
storage_.mutable_data()->set_dir(path); storage_->mutable_data()->set_dir(path);
} }
storage::Tablet *InformationMaintainer::AddTablet(const std::string &tag, storage::Tablet *InformationMaintainer::AddTablet(const std::string &tag,
int num_samples) { int num_samples) {
auto *tablet = storage_.Find(tag); auto tablet = storage_->NewTablet(tag, num_samples);
if (!tablet) {
tablet = storage_.Add(tag, num_samples);
}
return tablet; return tablet;
} }
void InformationMaintainer::AddRecord(const std::string &tag, void InformationMaintainer::AddRecord(const std::string &tag,
const storage::Record &data) { const storage::Record &data) {
auto *tablet = storage_.Find(tag); auto *tablet = storage_->tablet(tag);
CHECK(tablet); CHECK(tablet) << "no tablet called " << tag;
auto num_records = tablet->total_records(); auto num_records = tablet->total_records();
const auto num_samples = tablet->num_samples(); const auto num_samples = tablet->num_samples();
...@@ -59,9 +56,9 @@ void InformationMaintainer::AddRecord(const std::string &tag, ...@@ -59,9 +56,9 @@ void InformationMaintainer::AddRecord(const std::string &tag,
storage::Record *record; storage::Record *record;
if (offset >= num_records) { if (offset >= num_records) {
record = storage_.NewRecord(tag); record = tablet->add_records();
} else { } else {
record = storage_.GetRecord(tag, offset); record = tablet->mutable_records(offset);
} }
*record = data; *record = data;
...@@ -76,10 +73,10 @@ void InformationMaintainer::Clear() { ...@@ -76,10 +73,10 @@ void InformationMaintainer::Clear() {
} }
void InformationMaintainer::PersistToDisk() { void InformationMaintainer::PersistToDisk() {
CHECK(!storage_.data().dir().empty()) << "path of storage should be set"; CHECK(!storage_->data().dir().empty()) << "path of storage should be set";
// TODO make dir first // TODO make dir first
// MakeDir(storage_.data().dir()); // MakeDir(storage_.data().dir());
storage_.Save(storage_.data().dir() + "/storage.pb"); storage_->PersistToDisk();
} }
} // namespace visualdl } // namespace visualdl
#ifndef VISUALDL_BACKEND_LOGIC_IM_H #ifndef VISUALDL_BACKEND_LOGIC_IM_H
#define VISUALDL_BACKEND_LOGIC_IM_H #define VISUALDL_BACKEND_LOGIC_IM_H
#include <glog/logging.h>
#include <memory>
#include <string> #include <string>
#include "visualdl/backend/storage/storage.h" #include "visualdl/backend/storage/storage.h"
...@@ -13,7 +15,15 @@ namespace visualdl { ...@@ -13,7 +15,15 @@ namespace visualdl {
*/ */
class InformationMaintainer final { class InformationMaintainer final {
public: public:
InformationMaintainer() {} InformationMaintainer(StorageBase::Type type = StorageBase::Type::kMemory) {
switch (type) {
case StorageBase::Type::kMemory:
storage_.reset(new MemoryStorage);
break;
default:
CHECK(false) << "Unsupported storage kind " << type;
}
}
static InformationMaintainer &Global() { static InformationMaintainer &Global() {
static InformationMaintainer *x = new InformationMaintainer(); static InformationMaintainer *x = new InformationMaintainer();
...@@ -46,10 +56,10 @@ public: ...@@ -46,10 +56,10 @@ public:
*/ */
void PersistToDisk(); void PersistToDisk();
Storage &storage() { return storage_; } StorageBase &storage() { return *storage_; }
private: private:
Storage storage_; std::unique_ptr<StorageBase> storage_;
}; };
} // namespace visualdl } // namespace visualdl
......
...@@ -11,19 +11,25 @@ protected: ...@@ -11,19 +11,25 @@ protected:
InformationMaintainer &im = InformationMaintainer::Global(); InformationMaintainer &im = InformationMaintainer::Global();
}; };
TEST_F(ImTester, AddTablet) { im.AddTablet("tag0", 20); } TEST_F(ImTester, AddTablet) {
im.Clear();
im.AddTablet("tag0", 20);
}
TEST_F(ImTester, AddRecord) { TEST_F(ImTester, AddRecord) {
im.Clear();
im.AddTablet("tag0", 20);
for (int i = 0; i < 100; i++) {
storage::Record rcd; storage::Record rcd;
rcd.set_dtype(storage::DataType::kInt32s); rcd.set_dtype(storage::DataType::kInt32s);
for (int i = 0; i < 100; i++) {
for (int j = 0; j < 10; j++) { for (int j = 0; j < 10; j++) {
rcd.add_data()->add_i32s(i * 20 + j); rcd.add_data()->add_i32s(i * 20 + j);
} }
im.AddRecord("tag0", rcd); im.AddRecord("tag0", rcd);
} }
ASSERT_EQ(im.storage().Find("tag0")->records_size(), 20UL); ASSERT_EQ(im.storage().tablet("tag0")->records_size(), 100UL);
} }
} // namespace visualdl } // namespace visualdl
...@@ -98,7 +98,7 @@ public: ...@@ -98,7 +98,7 @@ public:
InformationMaintainer::Global().storage().mutable_data()); InformationMaintainer::Global().storage().mutable_data());
} }
TabletHelper tablet(const std::string &tag) { TabletHelper tablet(const std::string &tag) {
return TabletHelper(InformationMaintainer::Global().storage().Find(tag)); return TabletHelper(InformationMaintainer::Global().storage().tablet(tag));
} }
TabletHelper AddTablet(const std::string &tag, int num_samples) { TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper( return TabletHelper(
......
...@@ -2,62 +2,69 @@ ...@@ -2,62 +2,69 @@
#include <fstream> #include <fstream>
#include "visualdl/backend/storage/storage.h" #include "visualdl/backend/storage/storage.h"
#include "visualdl/backend/utils/filesystem.h"
namespace visualdl { namespace visualdl {
storage::Tablet *Storage::Add(const std::string &tag, int num_samples) { std::string GenPathFromTag(const std::string &dir, const std::string &tag) {
auto *tablet = &(*proto_.mutable_tablets())[tag]; return dir + "/" + tag;
tablet->set_num_samples(num_samples);
return tablet;
} }
storage::Tablet *Storage::Find(const std::string &tag) { const std::string StorageBase::meta_file_name = "storage.meta";
auto it = proto_.mutable_tablets()->find(tag);
if (it != proto_.tablets().end()) { storage::Tablet *MemoryStorage::NewTablet(const std::string &tag,
int num_samples) {
auto it = tablets_.find(tag);
if (it == tablets_.end()) {
// create new tablet
tablets_[tag] = storage::Tablet();
tablets_[tag].set_tag(tag);
*storage_.add_tags() = tag;
} else {
return &it->second; return &it->second;
} }
return nullptr; return &tablets_[tag];
}
storage::Record *Storage::NewRecord(const std::string &tag) {
auto *tablet = Find(tag);
CHECK(tablet) << "Tablet" << tag << " should be create first";
auto *record = tablet->mutable_records()->Add();
// increase num_records
int num_records = tablet->total_records();
tablet->set_total_records(num_records + 1);
return record;
}
storage::Record *Storage::GetRecord(const std::string &tag, int offset) {
auto *tablet = Find(tag);
CHECK(tablet) << "Tablet" << tag << " should be create first";
auto num_records = tablet->total_records();
CHECK_LT(offset, num_records) << "invalid offset";
return tablet->mutable_records()->Mutable(offset);
} }
void Storage::Save(const std::string &path) const { storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
std::ofstream file(path, file.binary | file.out); auto it = tablets_.find(tag);
CHECK(file.is_open()) << "can't open path " << path; CHECK(it != tablets_.end()) << "tablet tagged as " << tag << " not exists";
auto str = Serialize(); return &it->second;
file.write(str.c_str(), str.size());
} }
void Storage::Load(const std::string &path) { void MemoryStorage::PersistToDisk() const {
std::ifstream file(path, file.binary); VLOG(3) << "persist storage to disk path " << storage_.dir();
CHECK(file.is_open()) << "can't open path " << path; // make a directory if not exist
size_t size = file.tellg(); fs::TryMkdir(storage_.dir());
std::string buffer(size, ' '); // write storage out
file.seekg(0); CHECK(!storage_.dir().empty()) << "storage's dir should be set first";
file.read(&buffer[0], size); const auto meta_path = storage_.dir() + "/" + meta_file_name;
DeSerialize(buffer); fs::Write(meta_path, fs::Serialize(storage_));
// write all the tablets
for (auto tag : storage_.tags()) {
auto path = GenPathFromTag(storage_.dir(), tag);
auto it = tablets_.find(tag);
CHECK(it != tablets_.end());
fs::Write(path, fs::Serialize(it->second));
}
} }
std::string Storage::Serialize() const { return proto_.SerializeAsString(); } void MemoryStorage::LoadFromDisk(const std::string &dir) {
VLOG(3) << "load storage from disk path " << dir;
CHECK(!dir.empty()) << "dir is empty";
// load storage
const auto meta_path = dir + "/" + meta_file_name;
auto buf = fs::Read(meta_path);
CHECK(fs::DeSerialize(&storage_, buf))
<< "failed to parse protobuf loaded from " << meta_path;
void Storage::DeSerialize(const std::string &data) { // load all the tablets
proto_.ParseFromString(data); for (int i = 0; i < storage_.tags_size(); i++) {
std::string tag = storage_.tags(i);
auto path = GenPathFromTag(storage_.dir(), tag);
CHECK(tablets_[tag].ParseFromString(fs::Read(path)))
<< "failed to parse protobuf text loaded from " << path;
}
} }
} // namespace visualdl } // namespace visualdl
...@@ -2,68 +2,84 @@ ...@@ -2,68 +2,84 @@
#define VISUALDL_STORAGE_H #define VISUALDL_STORAGE_H
#include <time.h> #include <time.h>
#include <map>
#include <string> #include <string>
#include "visualdl/backend/storage/storage.pb.h" #include "visualdl/backend/storage/storage.pb.h"
namespace visualdl { namespace visualdl {
class Storage final { /*
* Generate a tablet path in disk from its tag.
*/
inline std::string GenPathFromTag(const std::string &dir,
const std::string &tag);
/*
* Storage Interface. There might be a bunch of implementations, for example, a
* MemoryStorage that keeps a copy of all the tablets in memory and can be
* modified with higher performance; or a DiskStorage that keeps all the data on
* disk, for scenarios where memory consumption should be considered.
*/
class StorageBase {
public: public:
Storage() { const static std::string meta_file_name;
// set time stamp
time_t time0;
time(&time0);
proto_.set_timestamp(time0);
}
/* enum Type { kMemory = 0, kDisk = 1 };
* Add a new tablet named `tag`, the newly added instance will be returned.
*/
storage::Tablet *Add(const std::string &tag, int num_samples);
/* void SetStorage(const std::string &dir) {
* Search the tablet named `tag`, if not exist, return nullptr. time_t t;
*/ time(&t);
storage::Tablet *Find(const std::string &tag); storage_.set_timestamp(t);
storage_.set_dir(dir);
}
/* /*
* Append a new record to the tail of tablet. * Create a new Tablet storage.
*/ */
storage::Record *NewRecord(const std::string &tag); virtual storage::Tablet *NewTablet(const std::string &tag,
int num_samples) = 0;
/* /*
* Get a record at `offset`, if the offset is not valid, yield a failed CHECK. * Get a tablet from memory, this can be viewed as a cache, if the storage is
* in disk, a hash map in memory will first load the corresponding Tablet
* Protobuf from disk and hold all the changes.
*/ */
storage::Record *GetRecord(const std::string &tag, int offset); virtual storage::Tablet *tablet(const std::string &tag) = 0;
/* /*
* Serialize this object to string and save it to a file. * Persist the data from cache to disk. Both the memory storage or disk
* storage should write changes to disk for persistence.
*/ */
void Save(const std::string &path) const; virtual void PersistToDisk() const = 0;
/* /*
* Load the Protobuf message from a file. * Load data from disk.
*/ */
void Load(const std::string &path); virtual void LoadFromDisk(const std::string &dir) = 0;
storage::Storage *mutable_data() { return &proto_; }
const storage::Storage &data() { return proto_; } storage::Storage *mutable_data() { return &storage_; }
const storage::Storage &data() { return storage_; }
protected: protected:
/* storage::Storage storage_;
* Serialize the Storage instance to string. };
*/
std::string Serialize() const;
/* /*
* De-serialize from a string and update this Storage instance. * Storage in Memory, that will support quick edits on data.
*/ */
void DeSerialize(const std::string &data); class MemoryStorage final : public StorageBase {
public:
storage::Tablet *NewTablet(const std::string &tag, int num_samples) override;
storage::Tablet *tablet(const std::string &tag) override;
void PersistToDisk() const override;
void LoadFromDisk(const std::string &dir) override;
private: private:
storage::Storage proto_; std::map<std::string, storage::Tablet> tablets_;
}; };
} // namespace visualdl } // namespace visualdl
......
...@@ -130,7 +130,9 @@ The Storage stores all the records. ...@@ -130,7 +130,9 @@ The Storage stores all the records.
*/ */
message Storage { message Storage {
// tags to Tablet, should be thread safe if fix the keys after initialization. // tags to Tablet, should be thread safe if fix the keys after initialization.
// TODO to delete in the new storage interface.
map<string, Tablet> tablets = 1; map<string, Tablet> tablets = 1;
repeated string tags = 4;
string dir = 2; string dir = 2;
int64 timestamp = 3; int64 timestamp = 3;
} }
#include "visualdl/backend/storage/storage.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
namespace visualdl {
using namespace std;
class MemoryStorageTest : public ::testing::Test {
public:
void SetUp() override { storage_.SetStorage("./tmp"); }
MemoryStorage storage_;
};
// SetStorage() must record the given directory in the storage proto.
TEST_F(MemoryStorageTest, SetStorage) {
  const string expected_dir = "./tmp";
  storage_.SetStorage(expected_dir);
  ASSERT_EQ(storage_.data().dir(), expected_dir);
}
// A tablet created with NewTablet() must be retrievable through tablet() and
// carry the tag it was created with.
TEST_F(MemoryStorageTest, AddTablet) {
  // TODO need to escape tag as name
  const string tablet_tag = "add%20tag0";
  storage_.NewTablet(tablet_tag, -1);

  auto* found = storage_.tablet(tablet_tag);
  ASSERT_TRUE(found != nullptr);
  ASSERT_EQ(found->tag(), tablet_tag);
}
// Round trip: after PersistToDisk(), a second MemoryStorage loaded from the
// same directory must hold identical storage metadata. Only the storage proto
// returned by data() is compared here, not the tablets themselves.
TEST_F(MemoryStorageTest, PersistToDisk) {
  const string tablet_tag = "add%20tag0";
  storage_.NewTablet(tablet_tag, -1);
  storage_.PersistToDisk();

  MemoryStorage reloaded;
  reloaded.LoadFromDisk("./tmp");

  const string reloaded_meta = reloaded.data().SerializeAsString();
  const string original_meta = storage_.data().SerializeAsString();
  ASSERT_EQ(reloaded_meta, original_meta);
}
} // namespace visualdl
#ifndef VISUALDL_BACKEND_UTILS_FILESYSTEM_H
#define VISUALDL_BACKEND_UTILS_FILESYSTEM_H
#include <google/protobuf/text_format.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fstream>
namespace visualdl {
namespace fs {
/*
 * Serialize a protobuf message to a string.
 *
 * proto: the message to serialize.
 * human_readable: when true, emit protobuf text format via TextFormat;
 *   otherwise (the default) emit the binary wire format.
 */
template <typename T>
std::string Serialize(const T& proto, bool human_readable = false) {
  if (!human_readable) {
    return proto.SerializeAsString();
  }
  std::string text;
  google::protobuf::TextFormat::PrintToString(proto, &text);
  return text;
}
/*
 * Parse a serialized protobuf message from `buf` into `*proto`.
 *
 * proto: destination message; must not be null.
 * buf: the serialized bytes (taken by const reference so the payload is not
 *   copied on every call).
 * human_readable: when true, `buf` is parsed as protobuf text format;
 *   otherwise (the default) as the binary wire format.
 *
 * Returns true if parsing succeeded.
 */
template <typename T>
bool DeSerialize(T* proto,
                 const std::string& buf,
                 bool human_readable = false) {
  // NOTE(review): the original author flagged human_readable as "not valid";
  // the text-format path is kept as-is. TODO confirm it works before relying
  // on it.
  if (human_readable) {
    return google::protobuf::TextFormat::ParseFromString(buf, proto);
  }
  return proto->ParseFromString(buf);
}
/*
 * Best-effort creation of the directory `dir` (mode 0700) if the path does not
 * exist yet. Failures are only logged, never fatal.
 *
 * Declared inline because this function is defined in a header: without it,
 * every translation unit including the header emits its own definition and
 * the link fails with a duplicate symbol.
 */
inline void TryMkdir(const std::string& dir) {
  VLOG(1) << "try to mkdir " << dir;
  struct stat st = {};
  if (stat(dir.c_str(), &st) == 0) {
    return;  // the path already exists, nothing to do
  }
  // If mkdir fails, stat again: another process may have created the path in
  // the meantime, which is not an error for a "try" operation.
  if (::mkdir(dir.c_str(), 0700) != 0 && stat(dir.c_str(), &st) != 0) {
    VLOG(1) << "failed to mkdir " << dir;
  }
}
/*
 * Write the whole of `buffer` to the file at `path`.
 *
 * path: destination file.
 * buffer: bytes to write.
 * open_mode: stream open mode, binary by default.
 *
 * Fails a CHECK if the file cannot be opened.
 */
inline void Write(const std::string& path,
                  const std::string& buffer,
                  std::ios::openmode open_mode = std::ios::binary) {
  VLOG(1) << "write to path " << path;
  std::ofstream out(path, open_mode);
  CHECK(out.is_open()) << "failed to open " << path;
  out.write(buffer.data(), buffer.size());
  out.close();
}
/*
 * Read the whole file at `path` and return its content.
 *
 * path: file to read.
 * open_mode: stream open mode, binary by default. std::ios::ate is always
 *   added so the file size can be taken from the initial stream position.
 *
 * Fails a CHECK if the file cannot be opened or its size cannot be determined.
 */
inline std::string Read(const std::string& path,
                        std::ios::openmode open_mode = std::ios::binary) {
  VLOG(1) << "read from path " << path;
  std::ifstream file(path, open_mode | std::ios::ate);
  CHECK(file.is_open()) << "failed to open " << path;

  // tellg() reports failure as -1. Keep the value signed so the failure is
  // detected instead of wrapping around to a huge unsigned size.
  const std::streamoff size = file.tellg();
  CHECK(size >= 0) << "failed to get size of " << path;

  std::string buffer(static_cast<size_t>(size), '\0');
  file.seekg(0);
  if (size > 0) {
    file.read(&buffer[0], size);
    // Keep only the bytes actually read, so a short read does not leave
    // padding bytes at the end of the returned content.
    buffer.resize(static_cast<size_t>(file.gcount()));
  }
  return buffer;
}
} // namespace fs
} // namespace visualdl
#endif // VISUALDL_BACKEND_UTILS_FILESYSTEM_H
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册