未验证 提交 bc44bd64 编写于 作者: Y Yan Chunwei 提交者: GitHub

Merge pull request #52 from ChunweiYan/feature/refactor_im_singleton

......@@ -33,17 +33,21 @@ include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python)
add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/test.cc
${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h
${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h
)
target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread)
target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread)
enable_testing ()
add_test(NAME vstest COMMAND vl_test)
add_custom_target(test_init COMMAND ./init_test.sh $CMAKE_BINARY_DIR)
add_test(NAME vstest COMMAND ./vl_test)
set_target_properties(vl_test PROPERTIES DEPENDS test_init)
......@@ -7,3 +7,4 @@ add_dependencies(sdk storage_proto)
add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc)
add_dependencies(core pybind python im storage sdk protobuf glog)
target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog)
set_target_properties(core PROPERTIES PREFIX "" SUFFIX ".so")
......@@ -25,27 +25,6 @@ int ReserviorSample(int num_samples, int num_records) {
return -1;
}
IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
switch (type) {
case StorageBase::Type::kMemory: {
storage_.reset(new MemoryStorage);
} break;
default:
CHECK(false) << "Unsupported storage kind " << type;
}
switch (mode) {
case StorageBase::Mode::kRead:
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
break;
case StorageBase::Mode::kWrite:
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
break;
default:
break;
}
}
void IM::SetPersistDest(const std::string &path) {
CHECK(storage_->mutable_data()->dir().empty())
<< "duplicate set storage's path";
......@@ -95,7 +74,7 @@ void IM::PersistToDisk() {
CHECK(!storage_->data().dir().empty()) << "path of storage should be set";
// TODO make dir first
// MakeDir(storage_.data().dir());
storage_->PersistToDisk();
storage_->PersistToDisk(storage_->data().dir());
}
} // namespace visualdl
......@@ -3,6 +3,7 @@
#include <glog/logging.h>
#include <memory>
#include <mutex>
#include <string>
#include "visualdl/storage/storage.h"
......@@ -27,22 +28,19 @@ namespace visualdl {
*/
class IM final {
public:
IM() { storage_.reset(new MemoryStorage); }
IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { cc::PeriodExector::Global().Quit(); }
IM() { storage_.reset(new MemoryStorage(&executor_)); }
// IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { executor_.Quit(); }
static IM &Global() {
static IM x;
return x;
}
void MaintainRead() {
void MaintainRead(const std::string &dir, int msecs) {
LOG(INFO) << "start maintain read";
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
dynamic_cast<MemoryStorage *>(storage_.get())
->StartReadService(dir, msecs, &lock_);
}
void MaintainWrite() {
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
void MaintainWrite(const std::string &dir, int msecs) {
dynamic_cast<MemoryStorage *>(storage_.get())
->StartWriteService(dir, msecs, &lock_);
}
/*
......@@ -73,8 +71,17 @@ public:
StorageBase &storage() { return *storage_; }
cc::PeriodExector &executor() { return executor_; }
std::mutex &handler() { return lock_; }
private:
// read write lock for protobuf in memory
// TODO(ChunweiYan) mutex too heavy here, might change to a message queue to
// reduce the frequency of visiting disk
std::mutex lock_;
std::unique_ptr<StorageBase> storage_;
cc::PeriodExector executor_;
};
} // namespace visualdl
......
......@@ -2,13 +2,15 @@
#include "gtest/gtest.h"
#include "visualdl/storage/storage.h"
namespace visualdl {
class ImTester : public ::testing::Test {
protected:
void SetUp() override {}
IM &im = IM::Global();
IM im;
};
TEST_F(ImTester, AddTablet) {
......
......@@ -22,19 +22,22 @@ PYBIND11_PLUGIN(core) {
vs::TabletHelper::SetBuffer)
// scalar interface
.def("as_int32_scalar",
[](const vs::TabletHelper& self) {
return vs::components::ScalarHelper<int32_t>(&self.data());
[](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<int32_t>(self, &im.handler());
})
.def("as_int64_scalar",
[](const vs::TabletHelper& self) {
return vs::components::ScalarHelper<int64_t>(&self.data());
[](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<int64_t>(&self.data(),
&im.handler());
})
.def("as_float_scalar",
[](const vs::TabletHelper& self) {
return vs::components::ScalarHelper<float>(&self.data());
[](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<float>(&self.data(),
&im.handler());
})
.def("as_double_scalar", [](const vs::TabletHelper& self) {
return vs::components::ScalarHelper<double>(&self.data());
.def("as_double_scalar", [](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<double>(&self.data(),
&im.handler());
});
py::class_<vs::StorageHelper>(m, "Storage")
......@@ -49,22 +52,26 @@ PYBIND11_PLUGIN(core) {
vs::StorageHelper::SetBuffer);
py::class_<vs::ImHelper>(m, "Im")
.def("__init__",
[](vs::ImHelper& instance) { new (&instance) vs::ImHelper(); })
.def("storage", &vs::ImHelper::storage)
.def("tablet", &vs::ImHelper::tablet)
.def("add_tablet", &vs::ImHelper::AddTablet)
.def("persist_to_disk", &vs::ImHelper::PersistToDisk)
.def("clear_tablets", &vs::ImHelper::ClearTablets);
.def("clear_tablets", &vs::ImHelper::ClearTablets)
.def("start_read_service",
&vs::ImHelper::StartReadService,
"start a thread to maintain read service")
.def("start_write_service",
&vs::ImHelper::StartWriteSerice,
"start a thread to maintain write service")
.def("stop_service",
&vs::ImHelper::StopService,
"stop the service thread");
m.def("start_read_service",
&vs::start_read_service,
"global information-maintainer object.");
m.def("start_write_service",
&vs::start_write_service,
"global information-maintainer object.");
m.def("im", &vs::im);
m.def("stop_threads", &vs::stop_threads);
// interfaces for components begin
// interfaces for components
// different data type of scalar conponent
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
py::class_<vs::components::ScalarHelper<T>>(m, #name__) \
.def("add_record", &vs::components::ScalarHelper<T>::AddRecord) \
......
......@@ -66,6 +66,8 @@ namespace components {
template <typename T>
void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
ACQUIRE_HANDLER(handler_);
CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once";
for (int i = 0; i < captions.size(); i++) {
data_->add_captions(captions[i]);
......@@ -74,6 +76,8 @@ void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
template <typename T>
void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first";
CHECK_EQ(data_->captions_size(), values.size())
......@@ -94,6 +98,8 @@ void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
template <typename T>
std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
ACQUIRE_HANDLER(handler_);
std::vector<std::vector<T>> result;
EntryHelper<T> entry_helper;
for (int i = 0; i < data_->records_size(); i++) {
......@@ -107,6 +113,7 @@ std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
template <typename T>
std::vector<int> ScalarHelper<T>::GetIds() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) {
......@@ -117,6 +124,7 @@ std::vector<int> ScalarHelper<T>::GetIds() const {
template <typename T>
std::vector<int> ScalarHelper<T>::GetTimestamps() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) {
......@@ -127,6 +135,7 @@ std::vector<int> ScalarHelper<T>::GetTimestamps() const {
template <typename T>
std::vector<std::string> ScalarHelper<T>::GetCaptions() const {
ACQUIRE_HANDLER(handler_);
return std::vector<std::string>(data_->captions().begin(),
data_->captions().end());
}
......
......@@ -94,33 +94,50 @@ private:
class ImHelper {
public:
ImHelper() {}
// TODO(ChunweiYan) decouple helper with resource.
ImHelper() { im_.reset(new IM); }
ImHelper(std::unique_ptr<IM> im) : im_(std::move(im)) {}
StorageHelper storage() {
return StorageHelper(IM::Global().storage().mutable_data());
return StorageHelper(im_->storage().mutable_data());
}
TabletHelper tablet(const std::string &tag) {
return TabletHelper(IM::Global().storage().tablet(tag));
return TabletHelper(im_->storage().tablet(tag));
}
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(IM::Global().AddTablet(tag, num_samples));
return TabletHelper(im_->AddTablet(tag, num_samples));
}
void ClearTablets() {
IM::Global().storage().mutable_data()->clear_tablets();
std::mutex &handler() { return im_->handler(); }
void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); }
void StartReadService(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainRead(dir, msecs);
}
void StartWriteSerice(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainWrite(dir, msecs);
}
void StopService() { im_->executor().Quit(); }
void PersistToDisk() const { im_->PersistToDisk(); }
void PersistToDisk() const { IM::Global().PersistToDisk(); }
private:
std::unique_ptr<IM> im_;
};
namespace components {
#define ACQUIRE_HANDLER(handler) std::lock_guard<std::mutex> ____(*handler);
/*
* Read and write support for Scalar component.
*/
template <typename T>
class ScalarHelper {
public:
ScalarHelper(storage::Tablet *tablet) : data_(tablet) {}
ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr)
: data_(tablet), handler_(handler) {}
ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr)
: data_(&tablet.data()), handler_(handler) {}
void SetCaptions(const std::vector<std::string> &captions);
......@@ -138,27 +155,10 @@ public:
private:
storage::Tablet *data_;
std::mutex *handler_;
};
} // namespace components
static ImHelper &im() {
static ImHelper im;
return im;
}
static void start_read_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainRead();
}
static void start_write_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainWrite();
}
static void stop_threads() { cc::PeriodExector::Global().Quit(); }
} // namespace visualdl
#endif // VISUALDL_BACKEND_LOGIC_SDK_H
#include "visualdl/logic/sdk.h"
#include <gtest/gtest.h>
namespace visualdl {
struct ScalarTestHelper {
ImHelper rim;
ImHelper wim;
const std::string dir = "./tmp/sdk_test.test";
void operator()(std::function<void()> read, std::function<void()> write) {
wim.StartWriteSerice(dir, 200);
write();
// should wait for the write service create log's path
std::this_thread::sleep_for(std::chrono::milliseconds(400));
rim.StartReadService(dir, 100);
// should wait for the read service to load "tag0" tablet into memory
std::this_thread::sleep_for(std::chrono::milliseconds(600));
read();
}
};
TEST(Scalar, set_caption) {
ScalarTestHelper helper;
const std::vector<std::string> captions({"train", "test"});
auto write = [&] {
auto tablet = helper.wim.AddTablet("tag0", -1);
components::ScalarHelper<float> scalar(tablet, &helper.wim.handler());
scalar.SetCaptions(captions);
};
auto read = [&] {
auto mytablet = helper.rim.tablet("tag0");
components::ScalarHelper<float> myscalar(mytablet, &helper.rim.handler());
auto mycaptions = myscalar.GetCaptions();
ASSERT_EQ(captions, mycaptions);
};
helper(read, write);
}
TEST(Scalar, add_records) {
ScalarTestHelper helper;
const std::vector<std::string> captions({"train", "test"});
const size_t nsteps = 100;
auto write = [&] {
auto tablet = helper.wim.AddTablet("tag0", -1);
components::ScalarHelper<float> scalar(tablet, &helper.wim.handler());
scalar.SetCaptions(captions);
for (int i = 0; i < nsteps; i++) {
scalar.AddRecord(i * 10, std::vector<float>({(float)i, (float)i + 1}));
}
};
auto read = [&] {
auto mytablet = helper.rim.tablet("tag0");
components::ScalarHelper<float> myscalar(mytablet, &helper.rim.handler());
auto records = myscalar.GetRecords();
ASSERT_EQ(records.size(), nsteps);
for (int i = 0; i < nsteps; i++) {
ASSERT_EQ(records[i], std::vector<float>({(float)i, (float)i + 1}));
}
};
helper(read, write);
}
} // namespace visualdl
function(py_test TARGET_NAME)
set(options STATIC static SHARED shared)
set(oneValueArgs "")
set(multiValueArgs SRCS DEPS ARGS)
cmake_parse_arguments(py_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
add_test(NAME ${TARGET_NAME}
COMMAND env PYTHONPATH=${CMAKE_BINARY_DIR}/visualdl/logic
${PYTHON_EXECUTABLE} -u ${py_test_SRCS} ${py_test_ARGS}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
)
endfunction()
py_test(test_summary SRCS test_summary.py)
......@@ -6,26 +6,15 @@ import core
dtypes = ("float", "double", "int32", "int64")
def set_storage(dir):
'''
:param dir: str
directory of summary to write log.
:return: None
'''
core.im().storage().set_dir(dir)
def set_readable_storage(dir):
core.start_read_service(dir)
def set_writable_storage(dir):
core.start_write_service(dir)
def stop_service():
core.stop_threads()
def IM(dir, mode="read", msecs=500):
im = core.Im()
READ = "read"
WRITE = "write"
if mode == READ:
im.start_read_service(dir, msecs)
else:
im.start_write_service(dir, msecs)
return im
class _Scalar(object):
......@@ -93,7 +82,7 @@ class _Scalar(object):
return self._core_object.get_record_size()
def scalar(tag, dtype='float'):
def scalar(im, tag, dtype='float'):
'''
create a scalar component.
......@@ -105,24 +94,12 @@ def scalar(tag, dtype='float'):
'''
assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % (
dtype, str(dtypes))
tablet = core.im().add_tablet(tag, -1)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar,
}
obj = dtype2obj[dtype]()
return _Scalar(obj)
def read_scalar(tag, dtype='float'):
tablet = core.im().tablet(tag)
tablet = im.add_tablet(tag, -1)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar,
}
obj = dtype2obj[dtype]()
obj = dtype2obj[dtype](im)
return _Scalar(obj)
import summary
import numpy as np
import unittest
import random
import time
once_flag = False
......@@ -8,23 +9,27 @@ once_flag = False
class ScalarTester(unittest.TestCase):
def setUp(self):
summary.set_storage("tmp_dir")
global once_flag
self.scalar = summary.scalar("scalar0")
if not once_flag:
self.py_captions = ["train cost", "test cost"]
self.scalar.set_captions(self.py_captions)
self.dir = "tmp/summary.test"
# clean path
try:
os.rmdir(self.dir)
except:
pass
self.im = summary.IM(self.dir, "write", 200)
self.tablet_name = "scalar0"
self.scalar = summary.scalar(self.im, self.tablet_name)
self.py_captions = ["train cost", "test cost"]
self.scalar.set_captions(self.py_captions)
self.py_records = []
self.py_ids = []
# write
for i in range(10):
record = [0.1 * i, 0.2 * i]
id = i * 10
self.py_records.append(record)
self.py_ids.append(id)
if not once_flag:
self.scalar.add(id, record)
once_flag = True
self.scalar.add(id, record)
def test_records(self):
self.assertEqual(self.scalar.size, len(self.py_records))
......@@ -39,6 +44,58 @@ class ScalarTester(unittest.TestCase):
def test_captions(self):
self.assertEqual(self.scalar.captions, self.py_captions)
def test_read_records(self):
time.sleep(1)
im = summary.IM(self.dir, "read", 200)
time.sleep(1)
scalar = summary.scalar(im, self.tablet_name)
records = scalar.records
self.assertEqual(len(self.py_records), scalar.size)
for i, record in enumerate(self.scalar.records):
self.assertTrue(np.isclose(record, records[i]).all())
def test_read_ids(self):
time.sleep(0.6)
im = summary.IM(self.dir, "read", msecs=200)
time.sleep(0.6)
scalar = summary.scalar(im, self.tablet_name)
self.assertEqual(len(self.py_ids), scalar.size)
for i, id in enumerate(scalar.ids):
self.assertEqual(self.py_ids[i], id)
def test_read_captions(self):
time.sleep(0.6)
im = summary.IM(self.dir, "read", msecs=200)
time.sleep(0.6)
scalar = summary.scalar(im, self.tablet_name)
self.assertEqual(scalar.captions, self.py_captions)
def test_mix_read_write(self):
write_im = summary.IM(self.dir, "write", msecs=200)
time.sleep(0.6)
read_im = summary.IM(self.dir, "read", msecs=200)
scalar_writer = summary.scalar(write_im, self.tablet_name)
scalar_reader = summary.scalar(read_im, self.tablet_name)
scalar_writer.set_captions(["train cost", "test cost"])
for i in range(1000):
scalar_writer.add(i, [random.random(), random.random()])
scalar_reader.records
for i in range(500):
scalar_writer.add(i, [random.random(), random.random()])
scalar_reader.records
for i in range(500):
scalar_writer.add(i, [random.random(), random.random()])
for i in range(10):
scalar_reader.records
scalar_reader.captions
if __name__ == '__main__':
unittest.main()
......@@ -9,13 +9,14 @@ namespace visualdl {
const std::string StorageBase::meta_file_name = "storage.meta";
std::string StorageBase::meta_path() const {
CHECK(!storage_.dir().empty()) << "storage.dir should be set first";
return storage_.dir() + "/" + meta_file_name;
std::string StorageBase::meta_path(const std::string &dir) const {
CHECK(!dir.empty()) << "dir is empty";
return dir + "/" + meta_file_name;
}
std::string StorageBase::tablet_path(const std::string &tag) const {
CHECK(!storage_.dir().empty()) << "storage.dir should be set first";
return storage_.dir() + "/" + tag;
std::string StorageBase::tablet_path(const std::string &dir,
const std::string &tag) const {
CHECK(!dir.empty()) << "dir should be set first";
return dir + "/" + tag;
}
storage::Tablet *MemoryStorage::NewTablet(const std::string &tag,
......@@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
}
// TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk() const {
CHECK(!storage_.dir().empty()) << "storage's dir should be set first";
VLOG(3) << "persist storage to disk path " << storage_.dir();
void MemoryStorage::PersistToDisk(const std::string &dir) {
CHECK(!dir.empty());
storage_.set_dir(dir);
// make a directory if not exist
fs::TryMkdir(storage_.dir());
fs::TryRecurMkdir(dir);
// write storage out
fs::SerializeToFile(storage_, meta_path());
VLOG(2) << "to serize meta to dir " << dir;
fs::SerializeToFile(storage_, meta_path(dir));
VLOG(2) << "serize meta to dir " << dir;
// write all the tablets
for (auto tag : storage_.tags()) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end());
fs::SerializeToFile(it->second, tablet_path(tag));
fs::SerializeToFile(it->second, tablet_path(dir, tag));
}
}
......@@ -59,33 +62,52 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) {
CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir);
// load storage
CHECK(fs::DeSerializeFromFile(&storage_, meta_path()))
<< "parse from " << meta_path() << " failed";
CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir)))
<< "parse from " << meta_path(dir) << " failed";
// load all the tablets
for (int i = 0; i < storage_.tags_size(); i++) {
auto tag = storage_.tags(i);
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(tag)));
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag)));
}
}
void MemoryStorage::StartReadService() {
cc::PeriodExector::task_t task = [this] {
VLOG(3) << "loading from " << storage_.dir();
LoadFromDisk(storage_.dir());
void MemoryStorage::StartReadService(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
cc::PeriodExector::task_t task = [dir, this, handler] {
VLOG(1) << "loading from " << dir;
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
LoadFromDisk(dir);
} else {
LoadFromDisk(dir);
}
return true;
};
cc::PeriodExector::Global().Start();
VLOG(3) << "push read task";
cc::PeriodExector::Global()(std::move(task), 2512);
// executor_.Start();
VLOG(1) << "push read task";
(*executor_)(std::move(task), msecs);
}
void MemoryStorage::StartWriteSerice() {
cc::PeriodExector::Global().Start();
cc::PeriodExector::task_t task = [this] {
PersistToDisk();
void MemoryStorage::StartWriteService(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
storage_.set_dir(dir);
// executor_.Start();
cc::PeriodExector::task_t task = [dir, handler, this] {
VLOG(2) << "persist to disk";
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
PersistToDisk(dir);
} else {
PersistToDisk(dir);
}
return true;
};
cc::PeriodExector::Global()(std::move(task), 2000);
(*executor_)(std::move(task), msecs);
}
} // namespace visualdl
......@@ -3,9 +3,12 @@
#include <time.h>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/concurrency.h"
namespace visualdl {
......@@ -36,8 +39,8 @@ public:
storage_.set_dir(dir);
}
std::string meta_path() const;
std::string tablet_path(const std::string &tag) const;
std::string meta_path(const std::string &dir) const;
std::string tablet_path(const std::string &dir, const std::string &tag) const;
/*
* Create a new Tablet storage.
......@@ -56,7 +59,7 @@ public:
* Persist the data from cache to disk. Both the memory storage or disk
* storage should write changes to disk for persistence.
*/
virtual void PersistToDisk() const = 0;
virtual void PersistToDisk(const std::string &dir) = 0;
/*
* Load data from disk.
......@@ -75,28 +78,41 @@ protected:
*/
class MemoryStorage final : public StorageBase {
public:
MemoryStorage() {}
MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {}
~MemoryStorage() {
if (executor_ != nullptr) executor_->Quit();
}
storage::Tablet *NewTablet(const std::string &tag, int num_samples) override;
storage::Tablet *tablet(const std::string &tag) override;
void PersistToDisk() const override;
void PersistToDisk(const std::string &dir) override;
void LoadFromDisk(const std::string &dir) override;
/*
* Create a thread which will keep reading the latest data from the disk to
* memory.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartReadService();
void StartReadService(const std::string &dir, int msecs, std::mutex *handler);
/*
* Create a thread which will keep writing the latest changes from memory to
* disk.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartWriteSerice();
void StartWriteService(const std::string &dir,
int msecs,
std::mutex *handler);
private:
std::map<std::string, storage::Tablet> tablets_;
// TODO(ChunweiYan) remove executor here.
cc::PeriodExector *executor_{nullptr};
};
} // namespace visualdl
......
......@@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) {
}
TEST_F(MemoryStorageTest, PersistToDisk) {
storage_.SetStorage("./tmp");
CHECK(!storage_.data().dir().empty());
const std::string dir = "./tmp/201.test";
storage_.SetStorage(dir);
string tag = "add%20tag0";
storage_.NewTablet(tag, -1);
storage_.PersistToDisk();
storage_.PersistToDisk(dir);
LOG(INFO) << "persist to disk";
MemoryStorage other;
other.LoadFromDisk("./tmp");
other.LoadFromDisk(dir);
LOG(INFO) << "read from disk";
ASSERT_EQ(other.data().SerializeAsString(),
storage_.data().SerializeAsString());
}
......
......@@ -17,13 +17,9 @@ namespace cc {
struct PeriodExector {
using task_t = std::function<bool()>;
static PeriodExector& Global() {
static PeriodExector exec;
return exec;
}
void Quit() {
// TODO use some conditonal variable to help quit immediately.
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
quit = true;
}
......@@ -34,6 +30,7 @@ struct PeriodExector {
auto task_wrapper = [=] {
while (!quit) {
// task failed
if (!task()) break;
// if the program is terminated, quit while as soon as possible.
// this is just trick, but should works.
......@@ -57,6 +54,7 @@ struct PeriodExector {
}
~PeriodExector() {
Quit();
for (auto& t : threads_) {
if (t.joinable()) {
t.join();
......
......@@ -39,6 +39,7 @@ bool SerializeToFile(const T& proto, const std::string& path) {
template <typename T>
bool DeSerializeFromFile(T* proto, const std::string& path) {
std::ifstream file(path, std::ios::binary);
CHECK(file.is_open()) << "open " << path << " failed";
return proto->ParseFromIstream(&file);
}
......@@ -50,6 +51,19 @@ void TryMkdir(const std::string& dir) {
}
}
// Create a path by recursively create directries
void TryRecurMkdir(const std::string& path) {
// split path by '/'
for (int i = 1; i < path.size() - 1; i++) {
if (path[i] == '/') {
auto dir = path.substr(0, i + 1);
TryMkdir(dir);
}
}
// the last level
TryMkdir(path);
}
inline void Write(const std::string& path,
const std::string& buffer,
std::ios::openmode open_mode = std::ios::binary) {
......
......@@ -8,12 +8,13 @@ namespace visualdl {
int counter = 0;
TEST(concurrency, test) {
cc::PeriodExector::task_t task = [&counter]() {
cc::PeriodExector executor;
cc::PeriodExector::task_t task = [&]() {
LOG(INFO) << "Hello " << counter++;
if (counter > 5) return false;
return true;
};
cc::PeriodExector::Global()(std::move(task), 200);
executor(std::move(task), 200);
}
} // namespace visualdl
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册