提交 2a29fa06 编写于 作者: S Superjom

remove singleton design

上级 0127fec6
......@@ -39,10 +39,11 @@ add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h
${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h
)
target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread)
target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread)
enable_testing ()
......
......@@ -25,26 +25,26 @@ int ReserviorSample(int num_samples, int num_records) {
return -1;
}
IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
switch (type) {
case StorageBase::Type::kMemory: {
storage_.reset(new MemoryStorage);
} break;
default:
CHECK(false) << "Unsupported storage kind " << type;
}
switch (mode) {
case StorageBase::Mode::kRead:
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
break;
case StorageBase::Mode::kWrite:
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
break;
default:
break;
}
}
// IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
// switch (type) {
// case StorageBase::Type::kMemory: {
// storage_.reset(new MemoryStorage);
// } break;
// default:
// CHECK(false) << "Unsupported storage kind " << type;
// }
// switch (mode) {
// case StorageBase::Mode::kRead:
// dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService(500);
// break;
// case StorageBase::Mode::kWrite:
// dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice(500);
// break;
// default:
// break;
// }
// }
void IM::SetPersistDest(const std::string &path) {
CHECK(storage_->mutable_data()->dir().empty())
......@@ -95,7 +95,7 @@ void IM::PersistToDisk() {
CHECK(!storage_->data().dir().empty()) << "path of storage should be set";
// TODO make dir first
// MakeDir(storage_.data().dir());
storage_->PersistToDisk();
storage_->PersistToDisk(storage_->data().dir());
}
} // namespace visualdl
......@@ -27,22 +27,21 @@ namespace visualdl {
*/
class IM final {
public:
IM() { storage_.reset(new MemoryStorage); }
IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { cc::PeriodExector::Global().Quit(); }
static IM &Global() {
static IM x;
return x;
IM() { storage_.reset(new MemoryStorage(&executor_)); }
// IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { executor_.Quit();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
void MaintainRead() {
void MaintainRead(const std::string &dir, int msecs) {
LOG(INFO) << "start maintain read";
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
dynamic_cast<MemoryStorage *>(storage_.get())
->StartReadService(dir, msecs, &lock_);
}
void MaintainWrite() {
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
void MaintainWrite(const std::string &dir, int msecs) {
dynamic_cast<MemoryStorage *>(storage_.get())
->StartWriteSerice(dir, msecs, &lock_);
}
/*
......@@ -73,8 +72,15 @@ public:
StorageBase &storage() { return *storage_; }
cc::PeriodExector &executor() { return executor_; }
std::mutex &handler() { return lock_; }
private:
// read write lock for protobuf in memory
std::mutex lock_;
std::unique_ptr<StorageBase> storage_;
cc::PeriodExector executor_;
};
} // namespace visualdl
......
......@@ -2,13 +2,15 @@
#include "gtest/gtest.h"
#include "visualdl/storage/storage.h"
namespace visualdl {
class ImTester : public ::testing::Test {
protected:
void SetUp() override {}
IM &im = IM::Global();
IM im;
};
TEST_F(ImTester, AddTablet) {
......
......@@ -53,18 +53,20 @@ PYBIND11_PLUGIN(core) {
.def("tablet", &vs::ImHelper::tablet)
.def("add_tablet", &vs::ImHelper::AddTablet)
.def("persist_to_disk", &vs::ImHelper::PersistToDisk)
.def("clear_tablets", &vs::ImHelper::ClearTablets);
.def("clear_tablets", &vs::ImHelper::ClearTablets)
.def("start_read_service",
&vs::ImHelper::StartReadService,
"start a thread to maintain read service")
.def("start_write_service",
&vs::ImHelper::StartWriteSerice,
"start a thread to maintain write service")
.def("stop_service",
&vs::ImHelper::StopService,
"stop the service thread");
m.def("start_read_service",
&vs::start_read_service,
"global information-maintainer object.");
m.def("start_write_service",
&vs::start_write_service,
"global information-maintainer object.");
m.def("im", &vs::im);
m.def("stop_threads", &vs::stop_threads);
// interfaces for components begin
// interfaces for components
// different data type of scalar conponent
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
py::class_<vs::components::ScalarHelper<T>>(m, #name__) \
.def("add_record", &vs::components::ScalarHelper<T>::AddRecord) \
......
......@@ -66,6 +66,8 @@ namespace components {
template <typename T>
void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
ACQUIRE_HANDLER(handler_);
CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once";
for (int i = 0; i < captions.size(); i++) {
data_->add_captions(captions[i]);
......@@ -74,6 +76,8 @@ void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
template <typename T>
void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first";
CHECK_EQ(data_->captions_size(), values.size())
......@@ -94,6 +98,8 @@ void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
template <typename T>
std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
ACQUIRE_HANDLER(handler_);
std::vector<std::vector<T>> result;
EntryHelper<T> entry_helper;
for (int i = 0; i < data_->records_size(); i++) {
......@@ -107,6 +113,7 @@ std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
template <typename T>
std::vector<int> ScalarHelper<T>::GetIds() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) {
......@@ -117,6 +124,7 @@ std::vector<int> ScalarHelper<T>::GetIds() const {
template <typename T>
std::vector<int> ScalarHelper<T>::GetTimestamps() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_);
std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) {
......@@ -127,6 +135,7 @@ std::vector<int> ScalarHelper<T>::GetTimestamps() const {
template <typename T>
std::vector<std::string> ScalarHelper<T>::GetCaptions() const {
ACQUIRE_HANDLER(handler_);
return std::vector<std::string>(data_->captions().begin(),
data_->captions().end());
}
......
......@@ -94,33 +94,50 @@ private:
class ImHelper {
public:
ImHelper() {}
// TODO(ChunweiYan) decouple helper with resource.
ImHelper() { im_.reset(new IM); }
ImHelper(std::unique_ptr<IM> im) : im_(std::move(im)) {}
StorageHelper storage() {
return StorageHelper(IM::Global().storage().mutable_data());
return StorageHelper(im_->storage().mutable_data());
}
TabletHelper tablet(const std::string &tag) {
return TabletHelper(IM::Global().storage().tablet(tag));
return TabletHelper(im_->storage().tablet(tag));
}
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(IM::Global().AddTablet(tag, num_samples));
return TabletHelper(im_->AddTablet(tag, num_samples));
}
void ClearTablets() {
IM::Global().storage().mutable_data()->clear_tablets();
std::mutex &handler() { return im_->handler(); }
void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); }
void StartReadService(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainRead(dir, msecs);
}
void StartWriteSerice(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainWrite(dir, msecs);
}
void StopService() { im_->executor().Quit(); }
void PersistToDisk() const { im_->PersistToDisk(); }
void PersistToDisk() const { IM::Global().PersistToDisk(); }
private:
std::unique_ptr<IM> im_;
};
namespace components {
#define ACQUIRE_HANDLER(handler) std::lock_guard<std::mutex> ____(*handler);
/*
* Read and write support for Scalar component.
*/
template <typename T>
class ScalarHelper {
public:
ScalarHelper(storage::Tablet *tablet) : data_(tablet) {}
ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr)
: data_(tablet), handler_(handler) {}
ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr)
: data_(&tablet.data()), handler_(handler) {}
void SetCaptions(const std::vector<std::string> &captions);
......@@ -138,27 +155,10 @@ public:
private:
storage::Tablet *data_;
std::mutex *handler_;
};
} // namespace components
static ImHelper &im() {
static ImHelper im;
return im;
}
static void start_read_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainRead();
}
static void start_write_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainWrite();
}
static void stop_threads() { cc::PeriodExector::Global().Quit(); }
} // namespace visualdl
#endif // VISUALDL_BACKEND_LOGIC_SDK_H
......@@ -9,13 +9,14 @@ namespace visualdl {
const std::string StorageBase::meta_file_name = "storage.meta";
std::string StorageBase::meta_path() const {
CHECK(!storage_.dir().empty()) << "storage.dir should be set first";
return storage_.dir() + "/" + meta_file_name;
std::string StorageBase::meta_path(const std::string &dir) const {
CHECK(!dir.empty()) << "dir is empty";
return dir + "/" + meta_file_name;
}
std::string StorageBase::tablet_path(const std::string &tag) const {
CHECK(!storage_.dir().empty()) << "storage.dir should be set first";
return storage_.dir() + "/" + tag;
std::string StorageBase::tablet_path(const std::string &dir,
const std::string &tag) const {
CHECK(!dir.empty()) << "dir should be set first";
return dir + "/" + tag;
}
storage::Tablet *MemoryStorage::NewTablet(const std::string &tag,
......@@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
}
// TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk() const {
CHECK(!storage_.dir().empty()) << "storage's dir should be set first";
VLOG(3) << "persist storage to disk path " << storage_.dir();
void MemoryStorage::PersistToDisk(const std::string &dir) {
CHECK(!dir.empty());
storage_.set_dir(dir);
// make a directory if not exist
fs::TryMkdir(storage_.dir());
fs::TryMkdir(dir);
// write storage out
fs::SerializeToFile(storage_, meta_path());
LOG(INFO) << "to serize meta to dir " << dir;
fs::SerializeToFile(storage_, meta_path(dir));
LOG(INFO) << "serize meta to dir " << dir;
// write all the tablets
for (auto tag : storage_.tags()) {
auto it = tablets_.find(tag);
CHECK(it != tablets_.end());
fs::SerializeToFile(it->second, tablet_path(tag));
fs::SerializeToFile(it->second, tablet_path(dir, tag));
}
}
......@@ -59,33 +62,53 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) {
CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir);
// load storage
CHECK(fs::DeSerializeFromFile(&storage_, meta_path()))
<< "parse from " << meta_path() << " failed";
CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir)))
<< "parse from " << meta_path(dir) << " failed";
// load all the tablets
for (int i = 0; i < storage_.tags_size(); i++) {
auto tag = storage_.tags(i);
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(tag)));
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag)));
}
}
void MemoryStorage::StartReadService() {
cc::PeriodExector::task_t task = [this] {
VLOG(3) << "loading from " << storage_.dir();
LoadFromDisk(storage_.dir());
void MemoryStorage::StartReadService(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
cc::PeriodExector::task_t task = [dir, this, handler] {
LOG(INFO) << "loading from " << dir;
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
LoadFromDisk(dir);
} else {
LoadFromDisk(dir);
}
return true;
};
cc::PeriodExector::Global().Start();
VLOG(3) << "push read task";
cc::PeriodExector::Global()(std::move(task), 2512);
// executor_.Start();
LOG(INFO) << "push read task";
(*executor_)(std::move(task), msecs);
}
void MemoryStorage::StartWriteSerice() {
cc::PeriodExector::Global().Start();
cc::PeriodExector::task_t task = [this] {
PersistToDisk();
void MemoryStorage::StartWriteSerice(const std::string &dir,
int msecs,
std::mutex *handler) {
CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
storage_.set_dir(dir);
// executor_.Start();
cc::PeriodExector::task_t task = [dir, handler, this] {
LOG(INFO) << "persist to disk";
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
PersistToDisk(dir);
} else {
PersistToDisk(dir);
}
return true;
};
cc::PeriodExector::Global()(std::move(task), 2000);
(*executor_)(std::move(task), msecs);
}
} // namespace visualdl
......@@ -6,6 +6,7 @@
#include <string>
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/concurrency.h"
namespace visualdl {
......@@ -36,8 +37,8 @@ public:
storage_.set_dir(dir);
}
std::string meta_path() const;
std::string tablet_path(const std::string &tag) const;
std::string meta_path(const std::string &dir) const;
std::string tablet_path(const std::string &dir, const std::string &tag) const;
/*
* Create a new Tablet storage.
......@@ -56,7 +57,7 @@ public:
* Persist the data from cache to disk. Both the memory storage or disk
* storage should write changes to disk for persistence.
*/
virtual void PersistToDisk() const = 0;
virtual void PersistToDisk(const std::string &dir) = 0;
/*
* Load data from disk.
......@@ -75,28 +76,39 @@ protected:
*/
class MemoryStorage final : public StorageBase {
public:
MemoryStorage() {}
MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {}
~MemoryStorage() {
if (executor_ != nullptr) executor_->Quit();
}
storage::Tablet *NewTablet(const std::string &tag, int num_samples) override;
storage::Tablet *tablet(const std::string &tag) override;
void PersistToDisk() const override;
void PersistToDisk(const std::string &dir) override;
void LoadFromDisk(const std::string &dir) override;
/*
* Create a thread which will keep reading the latest data from the disk to
* memory.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartReadService();
void StartReadService(const std::string &dir, int msecs, std::mutex *handler);
/*
* Create a thread which will keep writing the latest changes from memory to
* disk.
*
* msecs: how many millisecond to sync memory and disk.
*/
void StartWriteSerice();
void StartWriteSerice(const std::string &dir, int msecs, std::mutex *handler);
private:
std::map<std::string, storage::Tablet> tablets_;
// TODO(ChunweiYan) remove executor here.
cc::PeriodExector *executor_{nullptr};
};
} // namespace visualdl
......
......@@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) {
}
TEST_F(MemoryStorageTest, PersistToDisk) {
storage_.SetStorage("./tmp");
CHECK(!storage_.data().dir().empty());
const std::string dir = "./tmp/201.test";
storage_.SetStorage(dir);
string tag = "add%20tag0";
storage_.NewTablet(tag, -1);
storage_.PersistToDisk();
storage_.PersistToDisk(dir);
LOG(INFO) << "persist to disk";
MemoryStorage other;
other.LoadFromDisk("./tmp");
other.LoadFromDisk(dir);
LOG(INFO) << "read from disk";
ASSERT_EQ(other.data().SerializeAsString(),
storage_.data().SerializeAsString());
}
......
......@@ -17,13 +17,9 @@ namespace cc {
struct PeriodExector {
using task_t = std::function<bool()>;
static PeriodExector& Global() {
static PeriodExector exec;
return exec;
}
void Quit() {
// TODO use some conditonal variable to help quit immediately.
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
quit = true;
}
......@@ -34,6 +30,7 @@ struct PeriodExector {
auto task_wrapper = [=] {
while (!quit) {
// task failed
if (!task()) break;
// if the program is terminated, quit while as soon as possible.
// this is just trick, but should works.
......@@ -57,6 +54,7 @@ struct PeriodExector {
}
~PeriodExector() {
Quit();
for (auto& t : threads_) {
if (t.joinable()) {
t.join();
......
......@@ -8,12 +8,13 @@ namespace visualdl {
int counter = 0;
TEST(concurrency, test) {
cc::PeriodExector::task_t task = [&counter]() {
cc::PeriodExector executor;
cc::PeriodExector::task_t task = [&]() {
LOG(INFO) << "Hello " << counter++;
if (counter > 5) return false;
return true;
};
cc::PeriodExector::Global()(std::move(task), 200);
executor(std::move(task), 200);
}
} // namespace visualdl
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册