提交 ce452e3e 编写于 作者: Y Yan Chunwei 提交者: GitHub

Merge pull request #52 from ChunweiYan/feature/refactor_im_singleton

...@@ -33,17 +33,21 @@ include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include) ...@@ -33,17 +33,21 @@ include_directories(${PROJECT_SOURCE_DIR}/thirdparty/local/include)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/storage)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic) add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/logic)
add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/visualdl/python)
add_executable(vl_test add_executable(vl_test
${PROJECT_SOURCE_DIR}/visualdl/test.cc ${PROJECT_SOURCE_DIR}/visualdl/test.cc
${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc ${PROJECT_SOURCE_DIR}/visualdl/storage/storage_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc ${PROJECT_SOURCE_DIR}/visualdl/utils/test_concurrency.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc ${PROJECT_SOURCE_DIR}/visualdl/logic/im_test.cc
${PROJECT_SOURCE_DIR}/visualdl/logic/sdk_test.cc
${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h ${PROJECT_SOURCE_DIR}/visualdl/utils/concurrency.h
${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h ${PROJECT_SOURCE_DIR}/visualdl/utils/filesystem.h
) )
target_link_libraries(vl_test storage im gtest glog protobuf gflags pthread) target_link_libraries(vl_test storage sdk im gtest glog protobuf gflags pthread)
enable_testing () enable_testing ()
add_test(NAME vstest COMMAND vl_test) add_custom_target(test_init COMMAND ./init_test.sh $CMAKE_BINARY_DIR)
add_test(NAME vstest COMMAND ./vl_test)
set_target_properties(vl_test PROPERTIES DEPENDS test_init)
...@@ -7,3 +7,4 @@ add_dependencies(sdk storage_proto) ...@@ -7,3 +7,4 @@ add_dependencies(sdk storage_proto)
add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc) add_library(core SHARED ${PROJECT_SOURCE_DIR}/visualdl/logic/pybind.cc)
add_dependencies(core pybind python im storage sdk protobuf glog) add_dependencies(core pybind python im storage sdk protobuf glog)
target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog) target_link_libraries(core PRIVATE pybind python im storage sdk protobuf glog)
set_target_properties(core PROPERTIES PREFIX "" SUFFIX ".so")
...@@ -25,27 +25,6 @@ int ReserviorSample(int num_samples, int num_records) { ...@@ -25,27 +25,6 @@ int ReserviorSample(int num_samples, int num_records) {
return -1; return -1;
} }
IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
switch (type) {
case StorageBase::Type::kMemory: {
storage_.reset(new MemoryStorage);
} break;
default:
CHECK(false) << "Unsupported storage kind " << type;
}
switch (mode) {
case StorageBase::Mode::kRead:
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService();
break;
case StorageBase::Mode::kWrite:
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice();
break;
default:
break;
}
}
void IM::SetPersistDest(const std::string &path) { void IM::SetPersistDest(const std::string &path) {
CHECK(storage_->mutable_data()->dir().empty()) CHECK(storage_->mutable_data()->dir().empty())
<< "duplicate set storage's path"; << "duplicate set storage's path";
...@@ -95,7 +74,7 @@ void IM::PersistToDisk() { ...@@ -95,7 +74,7 @@ void IM::PersistToDisk() {
CHECK(!storage_->data().dir().empty()) << "path of storage should be set"; CHECK(!storage_->data().dir().empty()) << "path of storage should be set";
// TODO make dir first // TODO make dir first
// MakeDir(storage_.data().dir()); // MakeDir(storage_.data().dir());
storage_->PersistToDisk(); storage_->PersistToDisk(storage_->data().dir());
} }
} // namespace visualdl } // namespace visualdl
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <glog/logging.h> #include <glog/logging.h>
#include <memory> #include <memory>
#include <mutex>
#include <string> #include <string>
#include "visualdl/storage/storage.h" #include "visualdl/storage/storage.h"
...@@ -27,22 +28,19 @@ namespace visualdl { ...@@ -27,22 +28,19 @@ namespace visualdl {
*/ */
class IM final { class IM final {
public: public:
IM() { storage_.reset(new MemoryStorage); } IM() { storage_.reset(new MemoryStorage(&executor_)); }
IM(StorageBase::Type type, StorageBase::Mode mode); // IM(StorageBase::Type type, StorageBase::Mode mode);
~IM() { cc::PeriodExector::Global().Quit(); } ~IM() { executor_.Quit(); }
static IM &Global() { void MaintainRead(const std::string &dir, int msecs) {
static IM x;
return x;
}
void MaintainRead() {
LOG(INFO) << "start maintain read"; LOG(INFO) << "start maintain read";
dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService(); dynamic_cast<MemoryStorage *>(storage_.get())
->StartReadService(dir, msecs, &lock_);
} }
void MaintainWrite() { void MaintainWrite(const std::string &dir, int msecs) {
dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice(); dynamic_cast<MemoryStorage *>(storage_.get())
->StartWriteService(dir, msecs, &lock_);
} }
/* /*
...@@ -73,8 +71,17 @@ public: ...@@ -73,8 +71,17 @@ public:
StorageBase &storage() { return *storage_; } StorageBase &storage() { return *storage_; }
cc::PeriodExector &executor() { return executor_; }
std::mutex &handler() { return lock_; }
private: private:
// read write lock for protobuf in memory
// TODO(ChunweiYan) mutex too heavy here, might change to a message queue to
// reduce the frequency of visiting disk
std::mutex lock_;
std::unique_ptr<StorageBase> storage_; std::unique_ptr<StorageBase> storage_;
cc::PeriodExector executor_;
}; };
} // namespace visualdl } // namespace visualdl
......
...@@ -2,13 +2,15 @@ ...@@ -2,13 +2,15 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "visualdl/storage/storage.h"
namespace visualdl { namespace visualdl {
class ImTester : public ::testing::Test { class ImTester : public ::testing::Test {
protected: protected:
void SetUp() override {} void SetUp() override {}
IM &im = IM::Global(); IM im;
}; };
TEST_F(ImTester, AddTablet) { TEST_F(ImTester, AddTablet) {
......
...@@ -22,19 +22,22 @@ PYBIND11_PLUGIN(core) { ...@@ -22,19 +22,22 @@ PYBIND11_PLUGIN(core) {
vs::TabletHelper::SetBuffer) vs::TabletHelper::SetBuffer)
// scalar interface // scalar interface
.def("as_int32_scalar", .def("as_int32_scalar",
[](const vs::TabletHelper& self) { [](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<int32_t>(&self.data()); return vs::components::ScalarHelper<int32_t>(self, &im.handler());
}) })
.def("as_int64_scalar", .def("as_int64_scalar",
[](const vs::TabletHelper& self) { [](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<int64_t>(&self.data()); return vs::components::ScalarHelper<int64_t>(&self.data(),
&im.handler());
}) })
.def("as_float_scalar", .def("as_float_scalar",
[](const vs::TabletHelper& self) { [](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<float>(&self.data()); return vs::components::ScalarHelper<float>(&self.data(),
&im.handler());
}) })
.def("as_double_scalar", [](const vs::TabletHelper& self) { .def("as_double_scalar", [](vs::TabletHelper& self, vs::ImHelper& im) {
return vs::components::ScalarHelper<double>(&self.data()); return vs::components::ScalarHelper<double>(&self.data(),
&im.handler());
}); });
py::class_<vs::StorageHelper>(m, "Storage") py::class_<vs::StorageHelper>(m, "Storage")
...@@ -49,22 +52,26 @@ PYBIND11_PLUGIN(core) { ...@@ -49,22 +52,26 @@ PYBIND11_PLUGIN(core) {
vs::StorageHelper::SetBuffer); vs::StorageHelper::SetBuffer);
py::class_<vs::ImHelper>(m, "Im") py::class_<vs::ImHelper>(m, "Im")
.def("__init__",
[](vs::ImHelper& instance) { new (&instance) vs::ImHelper(); })
.def("storage", &vs::ImHelper::storage) .def("storage", &vs::ImHelper::storage)
.def("tablet", &vs::ImHelper::tablet) .def("tablet", &vs::ImHelper::tablet)
.def("add_tablet", &vs::ImHelper::AddTablet) .def("add_tablet", &vs::ImHelper::AddTablet)
.def("persist_to_disk", &vs::ImHelper::PersistToDisk) .def("persist_to_disk", &vs::ImHelper::PersistToDisk)
.def("clear_tablets", &vs::ImHelper::ClearTablets); .def("clear_tablets", &vs::ImHelper::ClearTablets)
.def("start_read_service",
&vs::ImHelper::StartReadService,
"start a thread to maintain read service")
.def("start_write_service",
&vs::ImHelper::StartWriteSerice,
"start a thread to maintain write service")
.def("stop_service",
&vs::ImHelper::StopService,
"stop the service thread");
m.def("start_read_service", // interfaces for components begin
&vs::start_read_service,
"global information-maintainer object.");
m.def("start_write_service",
&vs::start_write_service,
"global information-maintainer object.");
m.def("im", &vs::im);
m.def("stop_threads", &vs::stop_threads);
// interfaces for components // different data type of scalar conponent
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \ #define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
py::class_<vs::components::ScalarHelper<T>>(m, #name__) \ py::class_<vs::components::ScalarHelper<T>>(m, #name__) \
.def("add_record", &vs::components::ScalarHelper<T>::AddRecord) \ .def("add_record", &vs::components::ScalarHelper<T>::AddRecord) \
......
...@@ -66,6 +66,8 @@ namespace components { ...@@ -66,6 +66,8 @@ namespace components {
template <typename T> template <typename T>
void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) { void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
ACQUIRE_HANDLER(handler_);
CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once"; CHECK_EQ(data_->captions_size(), 0UL) << "the captions can set only once";
for (int i = 0; i < captions.size(); i++) { for (int i = 0; i < captions.size(); i++) {
data_->add_captions(captions[i]); data_->add_captions(captions[i]);
...@@ -74,6 +76,8 @@ void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) { ...@@ -74,6 +76,8 @@ void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
template <typename T> template <typename T>
void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) { void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_); CHECK_NOTNULL(data_);
CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first"; CHECK_GT(data_->captions_size(), 0UL) << "captions should be set first";
CHECK_EQ(data_->captions_size(), values.size()) CHECK_EQ(data_->captions_size(), values.size())
...@@ -94,6 +98,8 @@ void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) { ...@@ -94,6 +98,8 @@ void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
template <typename T> template <typename T>
std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const { std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
ACQUIRE_HANDLER(handler_);
std::vector<std::vector<T>> result; std::vector<std::vector<T>> result;
EntryHelper<T> entry_helper; EntryHelper<T> entry_helper;
for (int i = 0; i < data_->records_size(); i++) { for (int i = 0; i < data_->records_size(); i++) {
...@@ -107,6 +113,7 @@ std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const { ...@@ -107,6 +113,7 @@ std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
template <typename T> template <typename T>
std::vector<int> ScalarHelper<T>::GetIds() const { std::vector<int> ScalarHelper<T>::GetIds() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_); CHECK_NOTNULL(data_);
std::vector<int> result; std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) { for (int i = 0; i < data_->records_size(); i++) {
...@@ -117,6 +124,7 @@ std::vector<int> ScalarHelper<T>::GetIds() const { ...@@ -117,6 +124,7 @@ std::vector<int> ScalarHelper<T>::GetIds() const {
template <typename T> template <typename T>
std::vector<int> ScalarHelper<T>::GetTimestamps() const { std::vector<int> ScalarHelper<T>::GetTimestamps() const {
ACQUIRE_HANDLER(handler_);
CHECK_NOTNULL(data_); CHECK_NOTNULL(data_);
std::vector<int> result; std::vector<int> result;
for (int i = 0; i < data_->records_size(); i++) { for (int i = 0; i < data_->records_size(); i++) {
...@@ -127,6 +135,7 @@ std::vector<int> ScalarHelper<T>::GetTimestamps() const { ...@@ -127,6 +135,7 @@ std::vector<int> ScalarHelper<T>::GetTimestamps() const {
template <typename T> template <typename T>
std::vector<std::string> ScalarHelper<T>::GetCaptions() const { std::vector<std::string> ScalarHelper<T>::GetCaptions() const {
ACQUIRE_HANDLER(handler_);
return std::vector<std::string>(data_->captions().begin(), return std::vector<std::string>(data_->captions().begin(),
data_->captions().end()); data_->captions().end());
} }
......
...@@ -94,33 +94,50 @@ private: ...@@ -94,33 +94,50 @@ private:
class ImHelper { class ImHelper {
public: public:
ImHelper() {} // TODO(ChunweiYan) decouple helper with resource.
ImHelper() { im_.reset(new IM); }
ImHelper(std::unique_ptr<IM> im) : im_(std::move(im)) {}
StorageHelper storage() { StorageHelper storage() {
return StorageHelper(IM::Global().storage().mutable_data()); return StorageHelper(im_->storage().mutable_data());
} }
TabletHelper tablet(const std::string &tag) { TabletHelper tablet(const std::string &tag) {
return TabletHelper(IM::Global().storage().tablet(tag)); return TabletHelper(im_->storage().tablet(tag));
} }
TabletHelper AddTablet(const std::string &tag, int num_samples) { TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(IM::Global().AddTablet(tag, num_samples)); return TabletHelper(im_->AddTablet(tag, num_samples));
} }
void ClearTablets() { std::mutex &handler() { return im_->handler(); }
IM::Global().storage().mutable_data()->clear_tablets(); void ClearTablets() { im_->storage().mutable_data()->clear_tablets(); }
void StartReadService(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainRead(dir, msecs);
} }
void StartWriteSerice(const std::string &dir, int msecs) {
im_->SetPersistDest(dir);
im_->MaintainWrite(dir, msecs);
}
void StopService() { im_->executor().Quit(); }
void PersistToDisk() const { im_->PersistToDisk(); }
void PersistToDisk() const { IM::Global().PersistToDisk(); } private:
std::unique_ptr<IM> im_;
}; };
namespace components { namespace components {
#define ACQUIRE_HANDLER(handler) std::lock_guard<std::mutex> ____(*handler);
/* /*
* Read and write support for Scalar component. * Read and write support for Scalar component.
*/ */
template <typename T> template <typename T>
class ScalarHelper { class ScalarHelper {
public: public:
ScalarHelper(storage::Tablet *tablet) : data_(tablet) {} ScalarHelper(storage::Tablet *tablet, std::mutex *handler = nullptr)
: data_(tablet), handler_(handler) {}
ScalarHelper(TabletHelper &tablet, std::mutex *handler = nullptr)
: data_(&tablet.data()), handler_(handler) {}
void SetCaptions(const std::vector<std::string> &captions); void SetCaptions(const std::vector<std::string> &captions);
...@@ -138,27 +155,10 @@ public: ...@@ -138,27 +155,10 @@ public:
private: private:
storage::Tablet *data_; storage::Tablet *data_;
std::mutex *handler_;
}; };
} // namespace components } // namespace components
static ImHelper &im() {
static ImHelper im;
return im;
}
static void start_read_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainRead();
}
static void start_write_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainWrite();
}
static void stop_threads() { cc::PeriodExector::Global().Quit(); }
} // namespace visualdl } // namespace visualdl
#endif // VISUALDL_BACKEND_LOGIC_SDK_H #endif // VISUALDL_BACKEND_LOGIC_SDK_H
#include "visualdl/logic/sdk.h"
#include <gtest/gtest.h>
namespace visualdl {
struct ScalarTestHelper {
ImHelper rim;
ImHelper wim;
const std::string dir = "./tmp/sdk_test.test";
void operator()(std::function<void()> read, std::function<void()> write) {
wim.StartWriteSerice(dir, 200);
write();
// should wait for the write service create log's path
std::this_thread::sleep_for(std::chrono::milliseconds(400));
rim.StartReadService(dir, 100);
// should wait for the read service to load "tag0" tablet into memory
std::this_thread::sleep_for(std::chrono::milliseconds(600));
read();
}
};
TEST(Scalar, set_caption) {
ScalarTestHelper helper;
const std::vector<std::string> captions({"train", "test"});
auto write = [&] {
auto tablet = helper.wim.AddTablet("tag0", -1);
components::ScalarHelper<float> scalar(tablet, &helper.wim.handler());
scalar.SetCaptions(captions);
};
auto read = [&] {
auto mytablet = helper.rim.tablet("tag0");
components::ScalarHelper<float> myscalar(mytablet, &helper.rim.handler());
auto mycaptions = myscalar.GetCaptions();
ASSERT_EQ(captions, mycaptions);
};
helper(read, write);
}
TEST(Scalar, add_records) {
ScalarTestHelper helper;
const std::vector<std::string> captions({"train", "test"});
const size_t nsteps = 100;
auto write = [&] {
auto tablet = helper.wim.AddTablet("tag0", -1);
components::ScalarHelper<float> scalar(tablet, &helper.wim.handler());
scalar.SetCaptions(captions);
for (int i = 0; i < nsteps; i++) {
scalar.AddRecord(i * 10, std::vector<float>({(float)i, (float)i + 1}));
}
};
auto read = [&] {
auto mytablet = helper.rim.tablet("tag0");
components::ScalarHelper<float> myscalar(mytablet, &helper.rim.handler());
auto records = myscalar.GetRecords();
ASSERT_EQ(records.size(), nsteps);
for (int i = 0; i < nsteps; i++) {
ASSERT_EQ(records[i], std::vector<float>({(float)i, (float)i + 1}));
}
};
helper(read, write);
}
} // namespace visualdl
function(py_test TARGET_NAME)
set(options STATIC static SHARED shared)
set(oneValueArgs "")
set(multiValueArgs SRCS DEPS ARGS)
cmake_parse_arguments(py_test "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
add_test(NAME ${TARGET_NAME}
COMMAND env PYTHONPATH=${CMAKE_BINARY_DIR}/visualdl/logic
${PYTHON_EXECUTABLE} -u ${py_test_SRCS} ${py_test_ARGS}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
)
endfunction()
py_test(test_summary SRCS test_summary.py)
...@@ -6,26 +6,15 @@ import core ...@@ -6,26 +6,15 @@ import core
dtypes = ("float", "double", "int32", "int64") dtypes = ("float", "double", "int32", "int64")
def IM(dir, mode="read", msecs=500):
def set_storage(dir): im = core.Im()
''' READ = "read"
:param dir: str WRITE = "write"
directory of summary to write log. if mode == READ:
:return: None im.start_read_service(dir, msecs)
''' else:
core.im().storage().set_dir(dir) im.start_write_service(dir, msecs)
return im
def set_readable_storage(dir):
core.start_read_service(dir)
def set_writable_storage(dir):
core.start_write_service(dir)
def stop_service():
core.stop_threads()
class _Scalar(object): class _Scalar(object):
...@@ -93,7 +82,7 @@ class _Scalar(object): ...@@ -93,7 +82,7 @@ class _Scalar(object):
return self._core_object.get_record_size() return self._core_object.get_record_size()
def scalar(tag, dtype='float'): def scalar(im, tag, dtype='float'):
''' '''
create a scalar component. create a scalar component.
...@@ -105,24 +94,12 @@ def scalar(tag, dtype='float'): ...@@ -105,24 +94,12 @@ def scalar(tag, dtype='float'):
''' '''
assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % ( assert dtype in dtypes, "invalid dtype(%s), should be one of %s" % (
dtype, str(dtypes)) dtype, str(dtypes))
tablet = core.im().add_tablet(tag, -1) tablet = im.add_tablet(tag, -1)
dtype2obj = {
'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar,
}
obj = dtype2obj[dtype]()
return _Scalar(obj)
def read_scalar(tag, dtype='float'):
tablet = core.im().tablet(tag)
dtype2obj = { dtype2obj = {
'float': tablet.as_float_scalar, 'float': tablet.as_float_scalar,
'double': tablet.as_double_scalar, 'double': tablet.as_double_scalar,
'int32': tablet.as_int32_scalar, 'int32': tablet.as_int32_scalar,
'int64': tablet.as_int64_scalar, 'int64': tablet.as_int64_scalar,
} }
obj = dtype2obj[dtype]() obj = dtype2obj[dtype](im)
return _Scalar(obj) return _Scalar(obj)
import summary import summary
import numpy as np import numpy as np
import unittest import unittest
import random
import time import time
once_flag = False once_flag = False
...@@ -8,23 +9,27 @@ once_flag = False ...@@ -8,23 +9,27 @@ once_flag = False
class ScalarTester(unittest.TestCase): class ScalarTester(unittest.TestCase):
def setUp(self): def setUp(self):
summary.set_storage("tmp_dir") self.dir = "tmp/summary.test"
global once_flag # clean path
self.scalar = summary.scalar("scalar0") try:
if not once_flag: os.rmdir(self.dir)
self.py_captions = ["train cost", "test cost"] except:
self.scalar.set_captions(self.py_captions) pass
self.im = summary.IM(self.dir, "write", 200)
self.tablet_name = "scalar0"
self.scalar = summary.scalar(self.im, self.tablet_name)
self.py_captions = ["train cost", "test cost"]
self.scalar.set_captions(self.py_captions)
self.py_records = [] self.py_records = []
self.py_ids = [] self.py_ids = []
# write
for i in range(10): for i in range(10):
record = [0.1 * i, 0.2 * i] record = [0.1 * i, 0.2 * i]
id = i * 10 id = i * 10
self.py_records.append(record) self.py_records.append(record)
self.py_ids.append(id) self.py_ids.append(id)
if not once_flag: self.scalar.add(id, record)
self.scalar.add(id, record)
once_flag = True
def test_records(self): def test_records(self):
self.assertEqual(self.scalar.size, len(self.py_records)) self.assertEqual(self.scalar.size, len(self.py_records))
...@@ -39,6 +44,58 @@ class ScalarTester(unittest.TestCase): ...@@ -39,6 +44,58 @@ class ScalarTester(unittest.TestCase):
def test_captions(self): def test_captions(self):
self.assertEqual(self.scalar.captions, self.py_captions) self.assertEqual(self.scalar.captions, self.py_captions)
def test_read_records(self):
time.sleep(1)
im = summary.IM(self.dir, "read", 200)
time.sleep(1)
scalar = summary.scalar(im, self.tablet_name)
records = scalar.records
self.assertEqual(len(self.py_records), scalar.size)
for i, record in enumerate(self.scalar.records):
self.assertTrue(np.isclose(record, records[i]).all())
def test_read_ids(self):
time.sleep(0.6)
im = summary.IM(self.dir, "read", msecs=200)
time.sleep(0.6)
scalar = summary.scalar(im, self.tablet_name)
self.assertEqual(len(self.py_ids), scalar.size)
for i, id in enumerate(scalar.ids):
self.assertEqual(self.py_ids[i], id)
def test_read_captions(self):
time.sleep(0.6)
im = summary.IM(self.dir, "read", msecs=200)
time.sleep(0.6)
scalar = summary.scalar(im, self.tablet_name)
self.assertEqual(scalar.captions, self.py_captions)
def test_mix_read_write(self):
write_im = summary.IM(self.dir, "write", msecs=200)
time.sleep(0.6)
read_im = summary.IM(self.dir, "read", msecs=200)
scalar_writer = summary.scalar(write_im, self.tablet_name)
scalar_reader = summary.scalar(read_im, self.tablet_name)
scalar_writer.set_captions(["train cost", "test cost"])
for i in range(1000):
scalar_writer.add(i, [random.random(), random.random()])
scalar_reader.records
for i in range(500):
scalar_writer.add(i, [random.random(), random.random()])
scalar_reader.records
for i in range(500):
scalar_writer.add(i, [random.random(), random.random()])
for i in range(10):
scalar_reader.records
scalar_reader.captions
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -9,13 +9,14 @@ namespace visualdl { ...@@ -9,13 +9,14 @@ namespace visualdl {
const std::string StorageBase::meta_file_name = "storage.meta"; const std::string StorageBase::meta_file_name = "storage.meta";
std::string StorageBase::meta_path() const { std::string StorageBase::meta_path(const std::string &dir) const {
CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; CHECK(!dir.empty()) << "dir is empty";
return storage_.dir() + "/" + meta_file_name; return dir + "/" + meta_file_name;
} }
std::string StorageBase::tablet_path(const std::string &tag) const { std::string StorageBase::tablet_path(const std::string &dir,
CHECK(!storage_.dir().empty()) << "storage.dir should be set first"; const std::string &tag) const {
return storage_.dir() + "/" + tag; CHECK(!dir.empty()) << "dir should be set first";
return dir + "/" + tag;
} }
storage::Tablet *MemoryStorage::NewTablet(const std::string &tag, storage::Tablet *MemoryStorage::NewTablet(const std::string &tag,
...@@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) { ...@@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
} }
// TODO add some checksum to avoid unnecessary saving // TODO add some checksum to avoid unnecessary saving
void MemoryStorage::PersistToDisk() const { void MemoryStorage::PersistToDisk(const std::string &dir) {
CHECK(!storage_.dir().empty()) << "storage's dir should be set first"; CHECK(!dir.empty());
VLOG(3) << "persist storage to disk path " << storage_.dir(); storage_.set_dir(dir);
// make a directory if not exist // make a directory if not exist
fs::TryMkdir(storage_.dir()); fs::TryRecurMkdir(dir);
// write storage out // write storage out
fs::SerializeToFile(storage_, meta_path()); VLOG(2) << "to serize meta to dir " << dir;
fs::SerializeToFile(storage_, meta_path(dir));
VLOG(2) << "serize meta to dir " << dir;
// write all the tablets // write all the tablets
for (auto tag : storage_.tags()) { for (auto tag : storage_.tags()) {
auto it = tablets_.find(tag); auto it = tablets_.find(tag);
CHECK(it != tablets_.end()); CHECK(it != tablets_.end());
fs::SerializeToFile(it->second, tablet_path(tag)); fs::SerializeToFile(it->second, tablet_path(dir, tag));
} }
} }
...@@ -59,33 +62,52 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) { ...@@ -59,33 +62,52 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) {
CHECK(!dir.empty()) << "dir is empty"; CHECK(!dir.empty()) << "dir is empty";
storage_.set_dir(dir); storage_.set_dir(dir);
// load storage // load storage
CHECK(fs::DeSerializeFromFile(&storage_, meta_path())) CHECK(fs::DeSerializeFromFile(&storage_, meta_path(dir)))
<< "parse from " << meta_path() << " failed"; << "parse from " << meta_path(dir) << " failed";
// load all the tablets // load all the tablets
for (int i = 0; i < storage_.tags_size(); i++) { for (int i = 0; i < storage_.tags_size(); i++) {
auto tag = storage_.tags(i); auto tag = storage_.tags(i);
CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(tag))); CHECK(fs::DeSerializeFromFile(&tablets_[tag], tablet_path(dir, tag)));
} }
} }
void MemoryStorage::StartReadService() { void MemoryStorage::StartReadService(const std::string &dir,
cc::PeriodExector::task_t task = [this] { int msecs,
VLOG(3) << "loading from " << storage_.dir(); std::mutex *handler) {
LoadFromDisk(storage_.dir()); CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
cc::PeriodExector::task_t task = [dir, this, handler] {
VLOG(1) << "loading from " << dir;
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
LoadFromDisk(dir);
} else {
LoadFromDisk(dir);
}
return true; return true;
}; };
cc::PeriodExector::Global().Start(); // executor_.Start();
VLOG(3) << "push read task"; VLOG(1) << "push read task";
cc::PeriodExector::Global()(std::move(task), 2512); (*executor_)(std::move(task), msecs);
} }
void MemoryStorage::StartWriteSerice() { void MemoryStorage::StartWriteService(const std::string &dir,
cc::PeriodExector::Global().Start(); int msecs,
cc::PeriodExector::task_t task = [this] { std::mutex *handler) {
PersistToDisk(); CHECK(executor_ != nullptr);
CHECK(!dir.empty()) << "dir should be set first";
storage_.set_dir(dir);
// executor_.Start();
cc::PeriodExector::task_t task = [dir, handler, this] {
VLOG(2) << "persist to disk";
if (handler != nullptr) {
std::lock_guard<std::mutex> _(*handler);
PersistToDisk(dir);
} else {
PersistToDisk(dir);
}
return true; return true;
}; };
cc::PeriodExector::Global()(std::move(task), 2000); (*executor_)(std::move(task), msecs);
} }
} // namespace visualdl } // namespace visualdl
...@@ -3,9 +3,12 @@ ...@@ -3,9 +3,12 @@
#include <time.h> #include <time.h>
#include <map> #include <map>
#include <memory>
#include <mutex>
#include <string> #include <string>
#include "visualdl/storage/storage.pb.h" #include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/concurrency.h"
namespace visualdl { namespace visualdl {
...@@ -36,8 +39,8 @@ public: ...@@ -36,8 +39,8 @@ public:
storage_.set_dir(dir); storage_.set_dir(dir);
} }
std::string meta_path() const; std::string meta_path(const std::string &dir) const;
std::string tablet_path(const std::string &tag) const; std::string tablet_path(const std::string &dir, const std::string &tag) const;
/* /*
* Create a new Tablet storage. * Create a new Tablet storage.
...@@ -56,7 +59,7 @@ public: ...@@ -56,7 +59,7 @@ public:
* Persist the data from cache to disk. Both the memory storage or disk * Persist the data from cache to disk. Both the memory storage or disk
* storage should write changes to disk for persistence. * storage should write changes to disk for persistence.
*/ */
virtual void PersistToDisk() const = 0; virtual void PersistToDisk(const std::string &dir) = 0;
/* /*
* Load data from disk. * Load data from disk.
...@@ -75,28 +78,41 @@ protected: ...@@ -75,28 +78,41 @@ protected:
*/ */
class MemoryStorage final : public StorageBase { class MemoryStorage final : public StorageBase {
public: public:
MemoryStorage() {}
MemoryStorage(cc::PeriodExector *executor) : executor_(executor) {}
~MemoryStorage() {
if (executor_ != nullptr) executor_->Quit();
}
storage::Tablet *NewTablet(const std::string &tag, int num_samples) override; storage::Tablet *NewTablet(const std::string &tag, int num_samples) override;
storage::Tablet *tablet(const std::string &tag) override; storage::Tablet *tablet(const std::string &tag) override;
void PersistToDisk() const override; void PersistToDisk(const std::string &dir) override;
void LoadFromDisk(const std::string &dir) override; void LoadFromDisk(const std::string &dir) override;
/* /*
* Create a thread which will keep reading the latest data from the disk to * Create a thread which will keep reading the latest data from the disk to
* memory. * memory.
*
* msecs: how many millisecond to sync memory and disk.
*/ */
void StartReadService(); void StartReadService(const std::string &dir, int msecs, std::mutex *handler);
/* /*
* Create a thread which will keep writing the latest changes from memory to * Create a thread which will keep writing the latest changes from memory to
* disk. * disk.
*
* msecs: how many millisecond to sync memory and disk.
*/ */
void StartWriteSerice(); void StartWriteService(const std::string &dir,
int msecs,
std::mutex *handler);
private: private:
std::map<std::string, storage::Tablet> tablets_; std::map<std::string, storage::Tablet> tablets_;
// TODO(ChunweiYan) remove executor here.
cc::PeriodExector *executor_{nullptr};
}; };
} // namespace visualdl } // namespace visualdl
......
...@@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) { ...@@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) {
} }
TEST_F(MemoryStorageTest, PersistToDisk) { TEST_F(MemoryStorageTest, PersistToDisk) {
storage_.SetStorage("./tmp"); const std::string dir = "./tmp/201.test";
CHECK(!storage_.data().dir().empty()); storage_.SetStorage(dir);
string tag = "add%20tag0"; string tag = "add%20tag0";
storage_.NewTablet(tag, -1); storage_.NewTablet(tag, -1);
storage_.PersistToDisk(); storage_.PersistToDisk(dir);
LOG(INFO) << "persist to disk";
MemoryStorage other; MemoryStorage other;
other.LoadFromDisk("./tmp"); other.LoadFromDisk(dir);
LOG(INFO) << "read from disk";
ASSERT_EQ(other.data().SerializeAsString(), ASSERT_EQ(other.data().SerializeAsString(),
storage_.data().SerializeAsString()); storage_.data().SerializeAsString());
} }
......
...@@ -17,13 +17,9 @@ namespace cc { ...@@ -17,13 +17,9 @@ namespace cc {
struct PeriodExector { struct PeriodExector {
using task_t = std::function<bool()>; using task_t = std::function<bool()>;
static PeriodExector& Global() {
static PeriodExector exec;
return exec;
}
void Quit() { void Quit() {
// TODO use some conditonal variable to help quit immediately. // TODO use some conditonal variable to help quit immediately.
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
quit = true; quit = true;
} }
...@@ -34,6 +30,7 @@ struct PeriodExector { ...@@ -34,6 +30,7 @@ struct PeriodExector {
auto task_wrapper = [=] { auto task_wrapper = [=] {
while (!quit) { while (!quit) {
// task failed
if (!task()) break; if (!task()) break;
// if the program is terminated, quit while as soon as possible. // if the program is terminated, quit while as soon as possible.
// this is just trick, but should works. // this is just trick, but should works.
...@@ -57,6 +54,7 @@ struct PeriodExector { ...@@ -57,6 +54,7 @@ struct PeriodExector {
} }
~PeriodExector() { ~PeriodExector() {
Quit();
for (auto& t : threads_) { for (auto& t : threads_) {
if (t.joinable()) { if (t.joinable()) {
t.join(); t.join();
......
...@@ -39,6 +39,7 @@ bool SerializeToFile(const T& proto, const std::string& path) { ...@@ -39,6 +39,7 @@ bool SerializeToFile(const T& proto, const std::string& path) {
template <typename T> template <typename T>
bool DeSerializeFromFile(T* proto, const std::string& path) { bool DeSerializeFromFile(T* proto, const std::string& path) {
std::ifstream file(path, std::ios::binary); std::ifstream file(path, std::ios::binary);
CHECK(file.is_open()) << "open " << path << " failed";
return proto->ParseFromIstream(&file); return proto->ParseFromIstream(&file);
} }
...@@ -50,6 +51,19 @@ void TryMkdir(const std::string& dir) { ...@@ -50,6 +51,19 @@ void TryMkdir(const std::string& dir) {
} }
} }
// Create a path by recursively create directries
void TryRecurMkdir(const std::string& path) {
// split path by '/'
for (int i = 1; i < path.size() - 1; i++) {
if (path[i] == '/') {
auto dir = path.substr(0, i + 1);
TryMkdir(dir);
}
}
// the last level
TryMkdir(path);
}
inline void Write(const std::string& path, inline void Write(const std::string& path,
const std::string& buffer, const std::string& buffer,
std::ios::openmode open_mode = std::ios::binary) { std::ios::openmode open_mode = std::ios::binary) {
......
...@@ -8,12 +8,13 @@ namespace visualdl { ...@@ -8,12 +8,13 @@ namespace visualdl {
int counter = 0; int counter = 0;
TEST(concurrency, test) { TEST(concurrency, test) {
cc::PeriodExector::task_t task = [&counter]() { cc::PeriodExector executor;
cc::PeriodExector::task_t task = [&]() {
LOG(INFO) << "Hello " << counter++; LOG(INFO) << "Hello " << counter++;
if (counter > 5) return false; if (counter > 5) return false;
return true; return true;
}; };
cc::PeriodExector::Global()(std::move(task), 200); executor(std::move(task), 200);
} }
} // namespace visualdl } // namespace visualdl
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册