提交 dac34b3b 编写于 作者: S superjom

add read and write maintain service

上级 b904480c
......@@ -27,9 +27,9 @@ int ReserviorSample(int num_samples, int num_records) {
IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
switch (type) {
case StorageBase::Type::kMemory:
case StorageBase::Type::kMemory: {
storage_.reset(new MemoryStorage);
break;
} break;
default:
CHECK(false) << "Unsupported storage kind " << type;
}
......
......@@ -2,6 +2,7 @@
#define VISUALDL_BACKEND_LOGIC_IM_H
#include <glog/logging.h>
#include <visualdl/backend/utils/concurrency.h>
#include <memory>
#include <string>
......@@ -10,7 +11,7 @@
namespace visualdl {
/*
* IM(IM) maintain the Storage singleton in memory,
* IM(Information Maintainer) maintain the Storage singleton in memory,
* pre-compute some the statistical information to help visualizaton.
*
* There should be two processes and each have an IM, one is the web server
......@@ -26,12 +27,22 @@ namespace visualdl {
*/
class IM final {
public:
IM(StorageBase::Type type = StorageBase::Type::kMemory,
StorageBase::Mode mode = StorageBase::Mode::kNone);
IM() { storage_.reset(new MemoryStorage); }
IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { cc::PeriodExector::Global().Quit(); }
static IM &Global() {
static IM *x = new IM();
return *x;
static IM x;
return x;
}
void MaintainRead() {
LOG(INFO) << "start maintain read";
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
}
void MaintainWrite() {
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
}
/*
......
......@@ -55,7 +55,14 @@ PYBIND11_PLUGIN(core) {
.def("persist_to_disk", &vs::ImHelper::PersistToDisk)
.def("clear_tablets", &vs::ImHelper::ClearTablets);
m.def("im", &vs::get_im, "global information-maintainer object.");
m.def("start_read_service",
&vs::start_read_service,
"global information-maintainer object.");
m.def("start_write_service",
&vs::start_write_service,
"global information-maintainer object.");
m.def("im", &vs::im);
m.def("stop_threads", &vs::StopThreads);
// interfaces for components
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
......
......@@ -77,7 +77,10 @@ public:
void SetBuffer(const storage::Storage &buffer) { *data_ = buffer; }
void SetBuffer(const std::string &buffer) { data_->ParseFromString(buffer); }
void SetDir(const std::string &dir) { data_->set_dir(dir); }
void SetDir(const std::string &dir) {
CHECK(data_) << "no storage instance hold";
data_->set_dir(dir);
}
int64_t timestamp() const { return data_->timestamp(); }
std::string dir() const { return data_->dir(); }
......@@ -86,62 +89,19 @@ public:
std::string human_readable_buffer() const;
private:
storage::Storage *data_;
storage::Storage *data_{nullptr};
};
class ImHelper {
public:
ImHelper() {}
StorageHelper storage() {
return StorageHelper(IM::Global().storage().mutable_data());
}
TabletHelper tablet(const std::string &tag) {
return TabletHelper(IM::Global().storage().tablet(tag));
}
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(IM::Global().AddTablet(tag, num_samples));
}
void ClearTablets() {
IM::Global().storage().mutable_data()->clear_tablets();
}
void PersistToDisk() const;
};
namespace components {
/*
* Read and write support for Scalar component.
*/
template <typename T>
class ScalarHelper {
public:
ScalarHelper(storage::Tablet *tablet) : data_(tablet) {}
void SetCaptions(const std::vector<std::string> &captions);
void AddRecord(int id, const std::vector<T> &values);
std::vector<std::vector<T>> GetRecords() const;
std::vector<int> GetIds() const;
std::vector<int> GetTimestamps() const;
std::vector<std::string> GetCaptions() const;
size_t GetSize() const { return data_->records_size(); }
private:
storage::Tablet *data_;
};
} // namespace components
static ImHelper &get_im() {
static ImHelper im;
return im;
/*
* mode:
* 0: read
* 1: write
* 2: none
*/
}
} // namespace visualdl
......
......@@ -4,8 +4,6 @@ __all__ = [
]
import core
im = core.im()
dtypes = ("float", "double", "int32", "int64")
......@@ -15,7 +13,19 @@ def set_storage(dir):
directory of summary to write log.
:return: None
'''
im.storage().set_dir(dir)
core.im().storage().set_dir(dir)
def set_readable_storage(dir):
core.start_read_service(dir)
def set_writable_storage(dir):
core.start_write_service(dir)
def stop_service():
core.stop_threads()
class _Scalar(object):
......@@ -95,7 +105,19 @@ def scalar(tag, dtype='float'):
'''
assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % (
dtype, str(dtypes))
tablet = im.add_tablet(tag, -1)
tablet = core.im().add_tablet(tag, -1)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar,
}
obj = dtype2obj[dtype]()
return _Scalar(obj)
def read_scalar(tag, dtype='float'):
tablet = core.im().tablet(tag)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
......
import summary
import numpy as np
import unittest
summary.set_storage("tmp_dir")
import time
once_flag = False
class ScalarTester(unittest.TestCase):
def setUp(self):
summary.set_storage("tmp_dir")
global once_flag
self.scalar = summary.scalar("scalar0")
if not once_flag:
......
......@@ -40,10 +40,7 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
// TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk() const {
LOG(INFO) << "inside dir " << storage_.dir() << " "
<< (!storage_.dir().empty());
CHECK(!storage_.dir().empty()) << "storage's dir should be set first";
LOG(INFO) << "after check dir";
VLOG(3) << "persist storage to disk path " << storage_.dir();
// make a directory if not exist
fs::TryMkdir(storage_.dir());
......@@ -59,7 +56,6 @@ void MemoryStorage::PersistToDisk() const {
// TODO add some checksum to avoid unnecessary loading
void MemoryStorage::LoadFromDisk(const std::string &dir) {
VLOG(3) << "load storage from disk path " << dir;
CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir);
// load storage
......@@ -75,16 +71,18 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) {
void MemoryStorage::StartReadService() {
cc::PeriodExector::task_t task = [this] {
LOG(INFO) << "loading from " << storage_.dir();
VLOG(3) << "loading from " << storage_.dir();
LoadFromDisk(storage_.dir());
return true;
};
cc::PeriodExector::Global().Start();
VLOG(3) << "push read task";
cc::PeriodExector::Global()(std::move(task), 2512);
}
void MemoryStorage::StartWriteSerice() {
cc::PeriodExector::Global().Start();
cc::PeriodExector::task_t task = [this] {
LOG(INFO) << "writing to " << storage_.dir();
PersistToDisk();
return true;
};
......
......@@ -27,14 +27,18 @@ struct PeriodExector {
quit = true;
}
void Start() { quit = false; }
void operator()(task_t&& task, int msec) {
auto task_wrapper = [=] {
while (!quit) {
if (!task()) break;
std::this_thread::sleep_for(std::chrono::milliseconds(msec));
}
LOG(INFO) << "quit job";
};
threads_.emplace_back(std::thread(std::move(task_wrapper)));
msec_ = msec;
}
~PeriodExector() {
......@@ -48,6 +52,7 @@ struct PeriodExector {
private:
bool quit = false;
std::vector<std::thread> threads_;
int msec_;
};
} // namespace cc
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册