提交 d4fa9e6f 编写于 作者: S superjom

add read and write maintain service

上级 c27317da
...@@ -27,9 +27,9 @@ int ReserviorSample(int num_samples, int num_records) { ...@@ -27,9 +27,9 @@ int ReserviorSample(int num_samples, int num_records) {
IM::IM(StorageBase::Type type, StorageBase::Mode mode) { IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
switch (type) { switch (type) {
case StorageBase::Type::kMemory: case StorageBase::Type::kMemory: {
storage_.reset(new MemoryStorage); storage_.reset(new MemoryStorage);
break; } break;
default: default:
CHECK(false) << "Unsupported storage kind " << type; CHECK(false) << "Unsupported storage kind " << type;
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define VISUALDL_BACKEND_LOGIC_IM_H #define VISUALDL_BACKEND_LOGIC_IM_H
#include <glog/logging.h> #include <glog/logging.h>
#include <visualdl/backend/utils/concurrency.h>
#include <memory> #include <memory>
#include <string> #include <string>
...@@ -10,7 +11,7 @@ ...@@ -10,7 +11,7 @@
namespace visualdl { namespace visualdl {
/* /*
* IM(IM) maintain the Storage singleton in memory, * IM(Information Maintainer) maintain the Storage singleton in memory,
* pre-compute some the statistical information to help visualizaton. * pre-compute some the statistical information to help visualizaton.
* *
* There should be two processes and each have an IM, one is the web server * There should be two processes and each have an IM, one is the web server
...@@ -26,12 +27,22 @@ namespace visualdl { ...@@ -26,12 +27,22 @@ namespace visualdl {
*/ */
class IM final { class IM final {
public: public:
IM(StorageBase::Type type = StorageBase::Type::kMemory, IM() { storage_.reset(new MemoryStorage); }
StorageBase::Mode mode = StorageBase::Mode::kNone); IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { cc::PeriodExector::Global().Quit(); }
static IM &Global() { static IM &Global() {
static IM *x = new IM(); static IM x;
return *x; return x;
}
void MaintainRead() {
LOG(INFO) << "start maintain read";
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
}
void MaintainWrite() {
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
} }
/* /*
......
...@@ -55,7 +55,14 @@ PYBIND11_PLUGIN(core) { ...@@ -55,7 +55,14 @@ PYBIND11_PLUGIN(core) {
.def("persist_to_disk", &vs::ImHelper::PersistToDisk) .def("persist_to_disk", &vs::ImHelper::PersistToDisk)
.def("clear_tablets", &vs::ImHelper::ClearTablets); .def("clear_tablets", &vs::ImHelper::ClearTablets);
m.def("im", &vs::get_im, "global information-maintainer object."); m.def("start_read_service",
&vs::start_read_service,
"global information-maintainer object.");
m.def("start_write_service",
&vs::start_write_service,
"global information-maintainer object.");
m.def("im", &vs::im);
m.def("stop_threads", &vs::StopThreads);
// interfaces for components // interfaces for components
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \ #define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
......
...@@ -77,7 +77,10 @@ public: ...@@ -77,7 +77,10 @@ public:
void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; } void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; }
void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); } void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); }
void SetDir(const std::string &dir) { data_->set_dir(dir); } void SetDir(const std::string &dir) {
CHECK(data_) << "no storage instance hold";
data_->set_dir(dir);
}
int64_t timestamp() const { return data_->timestamp(); } int64_t timestamp() const { return data_->timestamp(); }
std::string dir() const { return data_->dir(); } std::string dir() const { return data_->dir(); }
...@@ -86,62 +89,19 @@ public: ...@@ -86,62 +89,19 @@ public:
std::string human_readable_buffer() const; std::string human_readable_buffer() const;
private: private:
storage::Storage *data_; storage::Storage *data_{nullptr};
}; };
class ImHelper { class ImHelper {
public: public:
ImHelper() {} ImHelper() {}
StorageHelper storage() { /*
return StorageHelper(IM::Global().storage().mutable_data()); * mode:
} * 0: read
TabletHelper tablet(const std::string &tag) { * 1: write
return TabletHelper(IM::Global().storage().tablet(tag)); * 2: none
} */
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(IM::Global().AddTablet(tag, num_samples));
}
void ClearTablets() {
IM::Global().storage().mutable_data()->clear_tablets();
}
void PersistToDisk() const;
};
namespace components {
/*
* Read and write support for Scalar component.
*/
template <typename T>
class ScalarHelper {
public:
ScalarHelper(storage::Tablet *tablet) : data_(tablet) {}
void SetCaptions(const std::vector<std::string> &captions);
void AddRecord(int id, const std::vector<T> &values);
std::vector<std::vector<T>> GetRecords() const;
std::vector<int> GetIds() const;
std::vector<int> GetTimestamps() const;
std::vector<std::string> GetCaptions() const;
size_t GetSize() const { return data_->records_size(); }
private:
storage::Tablet *data_;
};
} // namespace components
static ImHelper &get_im() {
static ImHelper im;
return im;
} }
} // namespace visualdl } // namespace visualdl
......
...@@ -4,8 +4,6 @@ __all__ = [ ...@@ -4,8 +4,6 @@ __all__ = [
] ]
import core import core
im = core.im()
dtypes = ("float", "double", "int32", "int64") dtypes = ("float", "double", "int32", "int64")
...@@ -15,7 +13,19 @@ def set_storage(dir): ...@@ -15,7 +13,19 @@ def set_storage(dir):
directory of summary to write log. directory of summary to write log.
:return: None :return: None
''' '''
im.storage().set_dir(dir) core.im().storage().set_dir(dir)
def set_readable_storage(dir):
core.start_read_service(dir)
def set_writable_storage(dir):
core.start_write_service(dir)
def stop_service():
core.stop_threads()
class _Scalar(object): class _Scalar(object):
...@@ -95,7 +105,19 @@ def scalar(tag, dtype='float'): ...@@ -95,7 +105,19 @@ def scalar(tag, dtype='float'):
''' '''
assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % ( assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % (
dtype, str(dtypes)) dtype, str(dtypes))
tablet = im.add_tablet(tag, -1) tablet = core.im().add_tablet(tag, -1)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar,
}
obj = dtype2obj[dtype]()
return _Scalar(obj)
def read_scalar(tag, dtype='float'):
tablet = core.im().tablet(tag)
dtype2obj = { dtype2obj = {
'float': tablet.as_float_scalar, 'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar, 'double': tablet.as_double_scalar,
......
import summary import summary
import numpy as np import numpy as np
import unittest import unittest
import time
summary.set_storage("tmp_dir")
once_flag = False once_flag = False
class ScalarTester(unittest.TestCase): class ScalarTester(unittest.TestCase):
def setUp(self): def setUp(self):
summary.set_storage("tmp_dir")
global once_flag global once_flag
self.scalar = summary.scalar("scalar0") self.scalar = summary.scalar("scalar0")
if not once_flag: if not once_flag:
......
...@@ -40,10 +40,7 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) { ...@@ -40,10 +40,7 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
// TODO add some checksum to avoid unnecessary saving // TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk() const { void MemoryStorage::PersistToDisk() const {
LOG(INFO) << "inside dir " << storage_.dir() << " "
<< (!storage_.dir().empty());
CHECK(!storage_.dir().empty()) << "storage's dir should be set first"; CHECK(!storage_.dir().empty()) << "storage's dir should be set first";
LOG(INFO) << "after check dir";
VLOG(3) << "persist storage to disk path " << storage_.dir(); VLOG(3) << "persist storage to disk path " << storage_.dir();
// make a directory if not exist // make a directory if not exist
fs::TryMkdir(storage_.dir()); fs::TryMkdir(storage_.dir());
...@@ -59,7 +56,6 @@ void MemoryStorage::PersistToDisk() const { ...@@ -59,7 +56,6 @@ void MemoryStorage::PersistToDisk() const {
// TODO add some checksum to avoid unnecessary loading // TODO add some checksum to avoid unnecessary loading
void MemoryStorage::LoadFromDisk(const std::string &dir) { void MemoryStorage::LoadFromDisk(const std::string &dir) {
VLOG(3) << "load storage from disk path " << dir;
CHECK(!dir.empty()) << "dir is empty"; CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir); storage_.set_dir(dir);
// load storage // load storage
...@@ -75,16 +71,18 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) { ...@@ -75,16 +71,18 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) {
void MemoryStorage::StartReadService() { void MemoryStorage::StartReadService() {
cc::PeriodExector::task_t task = [this] { cc::PeriodExector::task_t task = [this] {
LOG(INFO) << "loading from " << storage_.dir(); VLOG(3) << "loading from " << storage_.dir();
LoadFromDisk(storage_.dir()); LoadFromDisk(storage_.dir());
return true; return true;
}; };
cc::PeriodExector::Global().Start();
VLOG(3) << "push read task";
cc::PeriodExector::Global()(std::move(task), 2512); cc::PeriodExector::Global()(std::move(task), 2512);
} }
void MemoryStorage::StartWriteSerice() { void MemoryStorage::StartWriteSerice() {
cc::PeriodExector::Global().Start();
cc::PeriodExector::task_t task = [this] { cc::PeriodExector::task_t task = [this] {
LOG(INFO) << "writing to " << storage_.dir();
PersistToDisk(); PersistToDisk();
return true; return true;
}; };
......
...@@ -27,14 +27,18 @@ struct PeriodExector { ...@@ -27,14 +27,18 @@ struct PeriodExector {
quit = true; quit = true;
} }
void Start() { quit = false; }
void operator()(task_t&& task, int msec) { void operator()(task_t&& task, int msec) {
auto task_wrapper = [=] { auto task_wrapper = [=] {
while (!quit) { while (!quit) {
if (!task()) break; if (!task()) break;
std::this_thread::sleep_for(std::chrono::milliseconds(msec)); std::this_thread::sleep_for(std::chrono::milliseconds(msec));
} }
LOG(INFO) << "quit job";
}; };
threads_.emplace_back(std::thread(std::move(task_wrapper))); threads_.emplace_back(std::thread(std::move(task_wrapper)));
msec_ = msec;
} }
~PeriodExector() { ~PeriodExector() {
...@@ -48,6 +52,7 @@ struct PeriodExector { ...@@ -48,6 +52,7 @@ struct PeriodExector {
private: private:
bool quit = false; bool quit = false;
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
int msec_;
}; };
} // namespace cc } // namespace cc
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册