提交 29259816 编写于 作者: Y Yan Chunwei 提交者: GitHub

Merge pull request #44 from ChunweiYan/feature/support_web_server_storage_read

......@@ -31,5 +31,9 @@ add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/backend/logic)
add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/backend/test.cc
${PROJECT_SOURCE_DIR}/visualdl/backend/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/backend/logic/im_test.cc)
${PROJECT_SOURCE_DIR}/visualdl/backend/utils/test_concurrency.cc
${PROJECT_SOURCE_DIR}/visualdl/backend/logic/im_test.cc
${PROJECT_SOURCE_DIR}/visualdl/backend/utils/concurrency.h
${PROJECT_SOURCE_DIR}/visualdl/backend/utils/filesystem.h
)
target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread)
......@@ -25,20 +25,39 @@ int ReserviorSample(int num_samples, int num_records) {
return -1;
}
void InformationMaintainer::SetPersistDest(const std::string &path) {
IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
switch (type) {
case StorageBase::Type::kMemory: {
storage_.reset(new MemoryStorage);
} break;
default:
CHECK(false) << "Unsupported storage kind " << type;
}
switch (mode) {
case StorageBase::Mode::kRead:
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
break;
case StorageBase::Mode::kWrite:
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
break;
default:
break;
}
}
void IM::SetPersistDest(const std::string &path) {
CHECK(storage_->mutable_data()->dir().empty())
<< "duplicate set storage's path";
storage_->mutable_data()->set_dir(path);
}
storage::Tablet *InformationMaintainer::AddTablet(const std::string &tag,
int num_samples) {
storage::Tablet *IM::AddTablet(const std::string &tag, int num_samples) {
auto tablet = storage_->NewTablet(tag, num_samples);
return tablet;
}
void InformationMaintainer::AddRecord(const std::string &tag,
const storage::Record &data) {
void IM::AddRecord(const std::string &tag, const storage::Record &data) {
auto *tablet = storage_->tablet(tag);
CHECK(tablet) << "no tablet called " << tag;
......@@ -65,14 +84,14 @@ void InformationMaintainer::AddRecord(const std::string &tag,
tablet->set_total_records(num_records + 1);
}
void InformationMaintainer::Clear() {
void IM::Clear() {
auto *data = storage().mutable_data();
data->clear_tablets();
data->clear_dir();
data->clear_timestamp();
}
void InformationMaintainer::PersistToDisk() {
void IM::PersistToDisk() {
CHECK(!storage_->data().dir().empty()) << "path of storage should be set";
// TODO make dir first
// MakeDir(storage_.data().dir());
......
......@@ -2,6 +2,7 @@
#define VISUALDL_BACKEND_LOGIC_IM_H
#include <glog/logging.h>
#include <visualdl/backend/utils/concurrency.h>
#include <memory>
#include <string>
......@@ -10,24 +11,38 @@
namespace visualdl {
/*
* Maintain the Storage singleton in memory, pre-compute some the statical
* information to help visualizaton.
* IM(Information Maintainer) maintain the Storage singleton in memory,
* pre-compute some the statistical information to help visualizaton.
*
* There should be two processes and each have an IM, one is the web server
* which hold one IM to read the storage, the other is the SDK(python or C++),
* it will get an IM to write latest changes to storage.
*
* An IM have an underlying Storage object, which might be a memory based
* storage or a disk based one, both has the same interfaces those defined by
* class StorageBase.
*
* The SDK's IM will maintain the changes and periodically write to disk, and
* the web server's IM will periodically read latest storage from disk.
*/
class InformationMaintainer final {
class IM final {
public:
InformationMaintainer(StorageBase::Type type = StorageBase::Type::kMemory) {
switch (type) {
case StorageBase::Type::kMemory:
storage_.reset(new MemoryStorage);
break;
default:
CHECK(false) << "Unsupported storage kind " << type;
}
IM() { storage_.reset(new MemoryStorage); }
IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { cc::PeriodExector::Global().Quit(); }
static IM &Global() {
static IM x;
return x;
}
void MaintainRead() {
LOG(INFO) << "start maintain read";
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
}
static InformationMaintainer &Global() {
static InformationMaintainer *x = new InformationMaintainer();
return *x;
void MaintainWrite() {
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
}
/*
......
......@@ -8,7 +8,7 @@ class ImTester : public ::testing::Test {
protected:
void SetUp() override {}
InformationMaintainer &im = InformationMaintainer::Global();
IM &im = IM::Global();
};
TEST_F(ImTester, AddTablet) {
......
......@@ -55,7 +55,14 @@ PYBIND11_PLUGIN(core) {
.def("persist_to_disk", &vs::ImHelper::PersistToDisk)
.def("clear_tablets", &vs::ImHelper::ClearTablets);
m.def("im", &vs::get_im, "global information-maintainer object.");
m.def("start_read_service",
&vs::start_read_service,
"global information-maintainer object.");
m.def("start_write_service",
&vs::start_write_service,
"global information-maintainer object.");
m.def("im", &vs::im);
m.def("stop_threads", &vs::StopThreads);
// interfaces for components
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
......
......@@ -60,9 +60,7 @@ std::string TabletHelper::human_readable_buffer() const {
return buffer;
}
void ImHelper::PersistToDisk() const {
InformationMaintainer::Global().PersistToDisk();
}
void ImHelper::PersistToDisk() const { IM::Global().PersistToDisk(); }
// implementations for components
namespace components {
......
......@@ -77,7 +77,10 @@ public:
void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; }
void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); }
void SetDir(const std::string &dir) { data_->set_dir(dir); }
void SetDir(const std::string &dir) {
CHECK(data_) << "no storage instance hold";
data_->set_dir(dir);
}
int64_t timestamp() const { return data_->timestamp(); }
std::string dir() const { return data_->dir(); }
......@@ -86,64 +89,19 @@ public:
std::string human_readable_buffer() const;
private:
storage::Storage *data_;
storage::Storage *data_{nullptr};
};
class ImHelper {
public:
ImHelper() {}
StorageHelper storage() {
return StorageHelper(
InformationMaintainer::Global().storage().mutable_data());
}
TabletHelper tablet(const std::string &tag) {
return TabletHelper(InformationMaintainer::Global().storage().tablet(tag));
}
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(
InformationMaintainer::Global().AddTablet(tag, num_samples));
}
void ClearTablets() {
InformationMaintainer::Global().storage().mutable_data()->clear_tablets();
}
void PersistToDisk() const;
};
namespace components {
/*
* Read and write support for Scalar component.
*/
template <typename T>
class ScalarHelper {
public:
ScalarHelper(storage::Tablet *tablet) : data_(tablet) {}
void SetCaptions(const std::vector<std::string> &captions);
void AddRecord(int id, const std::vector<T> &values);
std::vector<std::vector<T>> GetRecords() const;
std::vector<int> GetIds() const;
std::vector<int> GetTimestamps() const;
std::vector<std::string> GetCaptions() const;
size_t GetSize() const { return data_->records_size(); }
private:
storage::Tablet *data_;
};
} // namespace components
static ImHelper &get_im() {
static ImHelper im;
return im;
/*
* mode:
* 0: read
* 1: write
* 2: none
*/
}
} // namespace visualdl
......
......@@ -4,8 +4,6 @@ __all__ = [
]
import core
im = core.im()
dtypes = ("float", "double", "int32", "int64")
......@@ -15,7 +13,19 @@ def set_storage(dir):
directory of summary to write log.
:return: None
'''
im.storage().set_dir(dir)
core.im().storage().set_dir(dir)
def set_readable_storage(dir):
core.start_read_service(dir)
def set_writable_storage(dir):
core.start_write_service(dir)
def stop_service():
core.stop_threads()
class _Scalar(object):
......@@ -95,7 +105,19 @@ def scalar(tag, dtype='float'):
'''
assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % (
dtype, str(dtypes))
tablet = im.add_tablet(tag, -1)
tablet = core.im().add_tablet(tag, -1)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar,
}
obj = dtype2obj[dtype]()
return _Scalar(obj)
def read_scalar(tag, dtype='float'):
tablet = core.im().tablet(tag)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
......
import summary
import numpy as np
import unittest
import time
class StorageTester(unittest.TestCase):
def test_storage(self):
summary.set_writable_storage("./tmp_dir")
time.sleep(5)
summary.stop_service()
if __name__ == '__main__':
unittest.main()
import summary
import numpy as np
import unittest
summary.set_storage("tmp_dir")
import time
once_flag = False
class ScalarTester(unittest.TestCase):
def setUp(self):
summary.set_storage("tmp_dir")
global once_flag
self.scalar = summary.scalar("scalar0")
if not once_flag:
......
import summary
import numpy as np
import unittest
import time
class StorageTester(unittest.TestCase):
def test_read_storage(self):
summary.set_readable_storage("./tmp")
time.sleep(1)
scalar = summary.read_scalar('tag01')
time.sleep(5)
summary.stop_service()
if __name__ == '__main__':
unittest.main()
#include <glog/logging.h>
#include <visualdl/backend/utils/concurrency.h>
#include <fstream>
#include "visualdl/backend/storage/storage.h"
......@@ -6,12 +7,17 @@
namespace visualdl {
std::string GenPathFromTag(const std::string &dir, const std::string &tag) {
return dir + "/" + tag;
}
const std::string StorageBase::meta_file_name = "storage.meta";
std::string StorageBase::meta_path() const {
CHECK(!storage_.dir().empty()) << "storage.dir should be set first";
return storage_.dir() + "/" + meta_file_name;
}
std::string StorageBase::tablet_path(const std::string &tag) const {
CHECK(!storage_.dir().empty()) << "storage.dir should be set first";
return storage_.dir() + "/" + tag;
}
storage::Tablet *MemoryStorage::NewTablet(const std::string &tag,
int num_samples) {
auto it = tablets_.find(tag);
......@@ -32,39 +38,54 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
return &it->second;
}
// TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk() const {
CHECK(!storage_.dir().empty()) << "storage's dir should be set first";
VLOG(3) << "persist storage to disk path " << storage_.dir();
// make a directory if not exist
fs::TryMkdir(storage_.dir());
// write storage out
CHECK(!storage_.dir().empty()) << "storage's dir should be set first";
const auto meta_path = storage_.dir() + "/" + meta_file_name;
fs::Write(meta_path, fs::Serialize(storage_));
fs::SerializeToFile(storage_, meta_path());
// write all the tablets
for (auto tag : storage_.tags()) {
auto path = GenPathFromTag(storage_.dir(), tag);
auto it = tablets_.find(tag);
CHECK(it != tablets_.end());
fs::Write(path, fs::Serialize(it->second));
fs::SerializeToFile(it->second, tablet_path(tag));
}
}
// TODO add some checksum to avoid unnecessary loading
void MemoryStorage::LoadFromDisk(const std::string &dir) {
VLOG(3) << "load storage from disk path " << dir;
CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir);
// load storage
const auto meta_path = dir + "/" + meta_file_name;
auto buf = fs::Read(meta_path);
CHECK(fs::DeSerialize(&storage_, buf))
<< "failed to parse protobuf loaded from " << meta_path;
CHECK(fs::DeSerializeFromFile(&storage_, meta_path()))
<< "parse from " << meta_path() << " failed";
// load all the tablets
for (int i = 0; i < storage_.tags_size(); i++) {
std::string tag = storage_.tags(i);
auto path = GenPathFromTag(storage_.dir(), tag);
CHECK(tablets_[tag].ParseFromString(fs::Read(path)))
<< "failed to parse protobuf text loaded from " << path;
auto tag = storage_.tags(i);
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(tag)));
}
}
void MemoryStorage::StartReadService() {
cc::PeriodExector::task_t task = [this] {
VLOG(3) << "loading from " << storage_.dir();
LoadFromDisk(storage_.dir());
return true;
};
cc::PeriodExector::Global().Start();
VLOG(3) << "push read task";
cc::PeriodExector::Global()(std::move(task), 2512);
}
void MemoryStorage::StartWriteSerice() {
cc::PeriodExector::Global().Start();
cc::PeriodExector::task_t task = [this] {
PersistToDisk();
return true;
};
cc::PeriodExector::Global()(std::move(task), 2000);
}
} // namespace visualdl
......@@ -26,6 +26,8 @@ public:
const static std::string meta_file_name;
enum Type { kMemory = 0, kDisk = 1 };
// mode of the sevice, either reading or writing.
enum Mode { kRead = 0, kWrite = 1, kNone = 2 };
void SetStorage(const std::string &dir) {
time_t t;
......@@ -34,6 +36,9 @@ public:
storage_.set_dir(dir);
}
std::string meta_path() const;
std::string tablet_path(const std::string &tag) const;
/*
* Create a new Tablet storage.
*/
......@@ -78,6 +83,18 @@ public:
void LoadFromDisk(const std::string &dir) override;
/*
* Create a thread which will keep reading the latest data from the disk to
* memory.
*/
void StartReadService();
/*
* Create a thread which will keep writing the latest changes from memory to
* disk.
*/
void StartWriteSerice();
private:
std::map<std::string, storage::Tablet> tablets_;
};
......
......@@ -32,6 +32,8 @@ TEST_F(MemoryStorageTest, AddTablet) {
}
TEST_F(MemoryStorageTest, PersistToDisk) {
storage_.SetStorage("./tmp");
CHECK(!storage_.data().dir().empty());
string tag = "add%20tag0";
storage_.NewTablet(tag, -1);
......
#ifndef VISUALDL_BACKEND_UTILS_CONCURRENCY_H
#define VISUALDL_BACKEND_UTILS_CONCURRENCY_H
#include <glog/logging.h>
#include <chrono>
#include <memory>
#include <thread>
#include <vector>
namespace visualdl {
namespace cc {
/*
* Run a task every `duration` milliseconds.
* Each evoke will start a thread to do this asynchronously.
*/
struct PeriodExector {
using task_t = std::function<bool()>;
static PeriodExector& Global() {
static PeriodExector exec;
return exec;
}
void Quit() {
// TODO use some conditonal variable to help quit immediately.
quit = true;
}
void Start() { quit = false; }
void operator()(task_t&& task, int msec) {
auto task_wrapper = [=] {
while (!quit) {
if (!task()) break;
std::this_thread::sleep_for(std::chrono::milliseconds(msec));
}
LOG(INFO) << "quit job";
};
threads_.emplace_back(std::thread(std::move(task_wrapper)));
msec_ = msec;
}
~PeriodExector() {
for (auto& t : threads_) {
if (t.joinable()) {
t.join();
}
}
}
private:
bool quit = false;
std::vector<std::thread> threads_;
int msec_;
};
} // namespace cc
} // namespace visualdl
#endif
......@@ -30,6 +30,18 @@ bool DeSerialize(T* proto, const std::string buf, bool human_readable = false) {
return proto->ParseFromString(buf);
}
template <typename T>
bool SerializeToFile(const T& proto, const std::string& path) {
std::ofstream file(path, std::ios::binary);
return proto.SerializeToOstream(&file);
}
template <typename T>
bool DeSerializeFromFile(T* proto, const std::string& path) {
std::ifstream file(path, std::ios::binary);
return proto->ParseFromIstream(&file);
}
void TryMkdir(const std::string& dir) {
VLOG(1) << "try to mkdir " << dir;
struct stat st = {0};
......
#ifndef VISUALDL_BACKEND_UTILS_LOG_H
#define VISUALDL_BACKEND_UTILS_LOG_H
#include <stdexcept>
namespace visualdl {
namespace log {
class NotImplementedException : public std::logic_error {
public:
NotImplementedException() : std::logic_error{"Function not implemented"} {}
};
} // namespace log
} // namespace visualdl
#endif
#include "visualdl/backend/utils/concurrency.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
namespace visualdl {
int counter = 0;
TEST(concurrency, test) {
cc::PeriodExector::task_t task = [&counter]() {
LOG(INFO) << "Hello " << counter++;
if (counter > 5) return false;
return true;
};
cc::PeriodExector::Global()(std::move(task), 200);
}
} // namespace visualdl
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册