Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
VisualDL
提交
6eaf6a75
V
VisualDL
项目概览
PaddlePaddle
/
VisualDL
1 年多 前同步成功
通知
88
Star
4655
Fork
642
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
10
列表
看板
标记
里程碑
合并请求
2
Wiki
5
Wiki
分析
仓库
DevOps
项目成员
Pages
V
VisualDL
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
10
Issue
10
列表
看板
标记
里程碑
合并请求
2
合并请求
2
Pages
分析
分析
仓库分析
DevOps
Wiki
5
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6eaf6a75
编写于
12月 16, 2017
作者:
S
Superjom
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
remove singleton design
上级
93c38d53
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
170 addition
and
114 deletion
+170
-114
CMakeLists.txt
CMakeLists.txt
+2
-1
visualdl/logic/im.cc
visualdl/logic/im.cc
+21
-21
visualdl/logic/im.h
visualdl/logic/im.h
+17
-11
visualdl/logic/im_test.cc
visualdl/logic/im_test.cc
+3
-1
visualdl/logic/pybind.cc
visualdl/logic/pybind.cc
+12
-10
visualdl/logic/sdk.cc
visualdl/logic/sdk.cc
+9
-0
visualdl/logic/sdk.h
visualdl/logic/sdk.h
+26
-26
visualdl/storage/storage.cc
visualdl/storage/storage.cc
+50
-27
visualdl/storage/storage.h
visualdl/storage/storage.h
+18
-6
visualdl/storage/storage_test.cc
visualdl/storage/storage_test.cc
+6
-4
visualdl/utils/concurrency.h
visualdl/utils/concurrency.h
+3
-5
visualdl/utils/test_concurrency.cc
visualdl/utils/test_concurrency.cc
+3
-2
未找到文件。
CMakeLists.txt
浏览文件 @
6eaf6a75
...
@@ -39,10 +39,11 @@ add_executable(vl_test
...
@@ -39,10 +39,11 @@ add_executable(vl_test
${
PROJECT_SOURCE_DIR
}
/visualdl/storage/storage_test.cc
${
PROJECT_SOURCE_DIR
}
/visualdl/storage/storage_test.cc
${
PROJECT_SOURCE_DIR
}
/visualdl/utils/test_concurrency.cc
${
PROJECT_SOURCE_DIR
}
/visualdl/utils/test_concurrency.cc
${
PROJECT_SOURCE_DIR
}
/visualdl/logic/im_test.cc
${
PROJECT_SOURCE_DIR
}
/visualdl/logic/im_test.cc
${
PROJECT_SOURCE_DIR
}
/visualdl/logic/sdk_test.cc
${
PROJECT_SOURCE_DIR
}
/visualdl/utils/concurrency.h
${
PROJECT_SOURCE_DIR
}
/visualdl/utils/concurrency.h
${
PROJECT_SOURCE_DIR
}
/visualdl/utils/filesystem.h
${
PROJECT_SOURCE_DIR
}
/visualdl/utils/filesystem.h
)
)
target_link_libraries
(
vl_test storage im gtest glog protobuf gflags pthread
)
target_link_libraries
(
vl_test storage
sdk
im gtest glog protobuf gflags pthread
)
enable_testing
()
enable_testing
()
...
...
visualdl/logic/im.cc
浏览文件 @
6eaf6a75
...
@@ -25,26 +25,26 @@ int ReserviorSample(int num_samples, int num_records) {
...
@@ -25,26 +25,26 @@ int ReserviorSample(int num_samples, int num_records) {
return
-
1
;
return
-
1
;
}
}
IM
::
IM
(
StorageBase
::
Type
type
,
StorageBase
::
Mode
mode
)
{
//
IM::IM(StorageBase::Type type, StorageBase::Mode mode) {
switch
(
type
)
{
//
switch (type) {
case
StorageBase
::
Type
::
kMemory
:
{
//
case StorageBase::Type::kMemory: {
storage_
.
reset
(
new
MemoryStorage
);
//
storage_.reset(new MemoryStorage);
}
break
;
//
} break;
default:
//
default:
CHECK
(
false
)
<<
"Unsupported storage kind "
<<
type
;
//
CHECK(false) << "Unsupported storage kind " << type;
}
//
}
switch
(
mode
)
{
//
switch (mode) {
case
StorageBase
::
Mode
::
kRead
:
//
case StorageBase::Mode::kRead:
dynamic_cast
<
MemoryStorage
*>
(
storage_
.
get
())
->
StartReadService
(
);
// dynamic_cast<MemoryStorage *>(storage_.get())->StartReadService(500
);
break
;
//
break;
case
StorageBase
::
Mode
::
kWrite
:
//
case StorageBase::Mode::kWrite:
dynamic_cast
<
MemoryStorage
*>
(
storage_
.
get
())
->
StartWriteSerice
(
);
// dynamic_cast<MemoryStorage *>(storage_.get())->StartWriteSerice(500
);
break
;
//
break;
default:
//
default:
break
;
//
break;
}
//
}
}
//
}
void
IM
::
SetPersistDest
(
const
std
::
string
&
path
)
{
void
IM
::
SetPersistDest
(
const
std
::
string
&
path
)
{
CHECK
(
storage_
->
mutable_data
()
->
dir
().
empty
())
CHECK
(
storage_
->
mutable_data
()
->
dir
().
empty
())
...
@@ -95,7 +95,7 @@ void IM::PersistToDisk() {
...
@@ -95,7 +95,7 @@ void IM::PersistToDisk() {
CHECK
(
!
storage_
->
data
().
dir
().
empty
())
<<
"path of storage should be set"
;
CHECK
(
!
storage_
->
data
().
dir
().
empty
())
<<
"path of storage should be set"
;
// TODO make dir first
// TODO make dir first
// MakeDir(storage_.data().dir());
// MakeDir(storage_.data().dir());
storage_
->
PersistToDisk
();
storage_
->
PersistToDisk
(
storage_
->
data
().
dir
()
);
}
}
}
// namespace visualdl
}
// namespace visualdl
visualdl/logic/im.h
浏览文件 @
6eaf6a75
...
@@ -27,22 +27,21 @@ namespace visualdl {
...
@@ -27,22 +27,21 @@ namespace visualdl {
*/
*/
class
IM
final
{
class
IM
final
{
public:
public:
IM
()
{
storage_
.
reset
(
new
MemoryStorage
);
}
IM
()
{
storage_
.
reset
(
new
MemoryStorage
(
&
executor_
));
}
IM
(
StorageBase
::
Type
type
,
StorageBase
::
Mode
mode
);
// IM(StorageBase::Type type, StorageBase::Mode mode);
~
IM
()
{
cc
::
PeriodExector
::
Global
().
Quit
();
}
~
IM
()
{
executor_
.
Quit
();
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
1000
));
static
IM
&
Global
()
{
static
IM
x
;
return
x
;
}
}
void
MaintainRead
()
{
void
MaintainRead
(
const
std
::
string
&
dir
,
int
msecs
)
{
LOG
(
INFO
)
<<
"start maintain read"
;
LOG
(
INFO
)
<<
"start maintain read"
;
dynamic_cast
<
MemoryStorage
*>
(
storage_
.
get
())
->
StartReadService
();
dynamic_cast
<
MemoryStorage
*>
(
storage_
.
get
())
->
StartReadService
(
dir
,
msecs
,
&
lock_
);
}
}
void
MaintainWrite
()
{
void
MaintainWrite
(
const
std
::
string
&
dir
,
int
msecs
)
{
dynamic_cast
<
MemoryStorage
*>
(
storage_
.
get
())
->
StartWriteSerice
();
dynamic_cast
<
MemoryStorage
*>
(
storage_
.
get
())
->
StartWriteSerice
(
dir
,
msecs
,
&
lock_
);
}
}
/*
/*
...
@@ -73,8 +72,15 @@ public:
...
@@ -73,8 +72,15 @@ public:
StorageBase
&
storage
()
{
return
*
storage_
;
}
StorageBase
&
storage
()
{
return
*
storage_
;
}
cc
::
PeriodExector
&
executor
()
{
return
executor_
;
}
std
::
mutex
&
handler
()
{
return
lock_
;
}
private:
private:
// read write lock for protobuf in memory
std
::
mutex
lock_
;
std
::
unique_ptr
<
StorageBase
>
storage_
;
std
::
unique_ptr
<
StorageBase
>
storage_
;
cc
::
PeriodExector
executor_
;
};
};
}
// namespace visualdl
}
// namespace visualdl
...
...
visualdl/logic/im_test.cc
浏览文件 @
6eaf6a75
...
@@ -2,13 +2,15 @@
...
@@ -2,13 +2,15 @@
#include "gtest/gtest.h"
#include "gtest/gtest.h"
#include "visualdl/storage/storage.h"
namespace
visualdl
{
namespace
visualdl
{
class
ImTester
:
public
::
testing
::
Test
{
class
ImTester
:
public
::
testing
::
Test
{
protected:
protected:
void
SetUp
()
override
{}
void
SetUp
()
override
{}
IM
&
im
=
IM
::
Global
()
;
IM
im
;
};
};
TEST_F
(
ImTester
,
AddTablet
)
{
TEST_F
(
ImTester
,
AddTablet
)
{
...
...
visualdl/logic/pybind.cc
浏览文件 @
6eaf6a75
...
@@ -53,18 +53,20 @@ PYBIND11_PLUGIN(core) {
...
@@ -53,18 +53,20 @@ PYBIND11_PLUGIN(core) {
.
def
(
"tablet"
,
&
vs
::
ImHelper
::
tablet
)
.
def
(
"tablet"
,
&
vs
::
ImHelper
::
tablet
)
.
def
(
"add_tablet"
,
&
vs
::
ImHelper
::
AddTablet
)
.
def
(
"add_tablet"
,
&
vs
::
ImHelper
::
AddTablet
)
.
def
(
"persist_to_disk"
,
&
vs
::
ImHelper
::
PersistToDisk
)
.
def
(
"persist_to_disk"
,
&
vs
::
ImHelper
::
PersistToDisk
)
.
def
(
"clear_tablets"
,
&
vs
::
ImHelper
::
ClearTablets
);
.
def
(
"clear_tablets"
,
&
vs
::
ImHelper
::
ClearTablets
)
.
def
(
"start_read_service"
,
&
vs
::
ImHelper
::
StartReadService
,
"start a thread to maintain read service"
)
.
def
(
"start_write_service"
,
&
vs
::
ImHelper
::
StartWriteSerice
,
"start a thread to maintain write service"
)
.
def
(
"stop_service"
,
&
vs
::
ImHelper
::
StopService
,
"stop the service thread"
);
m
.
def
(
"start_read_service"
,
// interfaces for components begin
&
vs
::
start_read_service
,
"global information-maintainer object."
);
m
.
def
(
"start_write_service"
,
&
vs
::
start_write_service
,
"global information-maintainer object."
);
m
.
def
(
"im"
,
&
vs
::
im
);
m
.
def
(
"stop_threads"
,
&
vs
::
stop_threads
);
//
interfaces for components
//
different data type of scalar conponent
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
py::class_<vs::components::ScalarHelper<T>>(m, #name__) \
py::class_<vs::components::ScalarHelper<T>>(m, #name__) \
.def("add_record", &vs::components::ScalarHelper<T>::AddRecord) \
.def("add_record", &vs::components::ScalarHelper<T>::AddRecord) \
...
...
visualdl/logic/sdk.cc
浏览文件 @
6eaf6a75
...
@@ -66,6 +66,8 @@ namespace components {
...
@@ -66,6 +66,8 @@ namespace components {
template
<
typename
T
>
template
<
typename
T
>
void
ScalarHelper
<
T
>::
SetCaptions
(
const
std
::
vector
<
std
::
string
>
&
captions
)
{
void
ScalarHelper
<
T
>::
SetCaptions
(
const
std
::
vector
<
std
::
string
>
&
captions
)
{
ACQUIRE_HANDLER
(
handler_
);
CHECK_EQ
(
data_
->
captions_size
(),
0UL
)
<<
"the captions can set only once"
;
CHECK_EQ
(
data_
->
captions_size
(),
0UL
)
<<
"the captions can set only once"
;
for
(
int
i
=
0
;
i
<
captions
.
size
();
i
++
)
{
for
(
int
i
=
0
;
i
<
captions
.
size
();
i
++
)
{
data_
->
add_captions
(
captions
[
i
]);
data_
->
add_captions
(
captions
[
i
]);
...
@@ -74,6 +76,8 @@ void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
...
@@ -74,6 +76,8 @@ void ScalarHelper<T>::SetCaptions(const std::vector<std::string> &captions) {
template
<
typename
T
>
template
<
typename
T
>
void
ScalarHelper
<
T
>::
AddRecord
(
int
id
,
const
std
::
vector
<
T
>
&
values
)
{
void
ScalarHelper
<
T
>::
AddRecord
(
int
id
,
const
std
::
vector
<
T
>
&
values
)
{
ACQUIRE_HANDLER
(
handler_
);
CHECK_NOTNULL
(
data_
);
CHECK_NOTNULL
(
data_
);
CHECK_GT
(
data_
->
captions_size
(),
0UL
)
<<
"captions should be set first"
;
CHECK_GT
(
data_
->
captions_size
(),
0UL
)
<<
"captions should be set first"
;
CHECK_EQ
(
data_
->
captions_size
(),
values
.
size
())
CHECK_EQ
(
data_
->
captions_size
(),
values
.
size
())
...
@@ -94,6 +98,8 @@ void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
...
@@ -94,6 +98,8 @@ void ScalarHelper<T>::AddRecord(int id, const std::vector<T> &values) {
template
<
typename
T
>
template
<
typename
T
>
std
::
vector
<
std
::
vector
<
T
>>
ScalarHelper
<
T
>::
GetRecords
()
const
{
std
::
vector
<
std
::
vector
<
T
>>
ScalarHelper
<
T
>::
GetRecords
()
const
{
ACQUIRE_HANDLER
(
handler_
);
std
::
vector
<
std
::
vector
<
T
>>
result
;
std
::
vector
<
std
::
vector
<
T
>>
result
;
EntryHelper
<
T
>
entry_helper
;
EntryHelper
<
T
>
entry_helper
;
for
(
int
i
=
0
;
i
<
data_
->
records_size
();
i
++
)
{
for
(
int
i
=
0
;
i
<
data_
->
records_size
();
i
++
)
{
...
@@ -107,6 +113,7 @@ std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
...
@@ -107,6 +113,7 @@ std::vector<std::vector<T>> ScalarHelper<T>::GetRecords() const {
template
<
typename
T
>
template
<
typename
T
>
std
::
vector
<
int
>
ScalarHelper
<
T
>::
GetIds
()
const
{
std
::
vector
<
int
>
ScalarHelper
<
T
>::
GetIds
()
const
{
ACQUIRE_HANDLER
(
handler_
);
CHECK_NOTNULL
(
data_
);
CHECK_NOTNULL
(
data_
);
std
::
vector
<
int
>
result
;
std
::
vector
<
int
>
result
;
for
(
int
i
=
0
;
i
<
data_
->
records_size
();
i
++
)
{
for
(
int
i
=
0
;
i
<
data_
->
records_size
();
i
++
)
{
...
@@ -117,6 +124,7 @@ std::vector<int> ScalarHelper<T>::GetIds() const {
...
@@ -117,6 +124,7 @@ std::vector<int> ScalarHelper<T>::GetIds() const {
template
<
typename
T
>
template
<
typename
T
>
std
::
vector
<
int
>
ScalarHelper
<
T
>::
GetTimestamps
()
const
{
std
::
vector
<
int
>
ScalarHelper
<
T
>::
GetTimestamps
()
const
{
ACQUIRE_HANDLER
(
handler_
);
CHECK_NOTNULL
(
data_
);
CHECK_NOTNULL
(
data_
);
std
::
vector
<
int
>
result
;
std
::
vector
<
int
>
result
;
for
(
int
i
=
0
;
i
<
data_
->
records_size
();
i
++
)
{
for
(
int
i
=
0
;
i
<
data_
->
records_size
();
i
++
)
{
...
@@ -127,6 +135,7 @@ std::vector<int> ScalarHelper<T>::GetTimestamps() const {
...
@@ -127,6 +135,7 @@ std::vector<int> ScalarHelper<T>::GetTimestamps() const {
template
<
typename
T
>
template
<
typename
T
>
std
::
vector
<
std
::
string
>
ScalarHelper
<
T
>::
GetCaptions
()
const
{
std
::
vector
<
std
::
string
>
ScalarHelper
<
T
>::
GetCaptions
()
const
{
ACQUIRE_HANDLER
(
handler_
);
return
std
::
vector
<
std
::
string
>
(
data_
->
captions
().
begin
(),
return
std
::
vector
<
std
::
string
>
(
data_
->
captions
().
begin
(),
data_
->
captions
().
end
());
data_
->
captions
().
end
());
}
}
...
...
visualdl/logic/sdk.h
浏览文件 @
6eaf6a75
...
@@ -94,33 +94,50 @@ private:
...
@@ -94,33 +94,50 @@ private:
class
ImHelper
{
class
ImHelper
{
public:
public:
ImHelper
()
{}
// TODO(ChunweiYan) decouple helper with resource.
ImHelper
()
{
im_
.
reset
(
new
IM
);
}
ImHelper
(
std
::
unique_ptr
<
IM
>
im
)
:
im_
(
std
::
move
(
im
))
{}
StorageHelper
storage
()
{
StorageHelper
storage
()
{
return
StorageHelper
(
IM
::
Global
().
storage
().
mutable_data
());
return
StorageHelper
(
im_
->
storage
().
mutable_data
());
}
}
TabletHelper
tablet
(
const
std
::
string
&
tag
)
{
TabletHelper
tablet
(
const
std
::
string
&
tag
)
{
return
TabletHelper
(
IM
::
Global
().
storage
().
tablet
(
tag
));
return
TabletHelper
(
im_
->
storage
().
tablet
(
tag
));
}
}
TabletHelper
AddTablet
(
const
std
::
string
&
tag
,
int
num_samples
)
{
TabletHelper
AddTablet
(
const
std
::
string
&
tag
,
int
num_samples
)
{
return
TabletHelper
(
IM
::
Global
().
AddTablet
(
tag
,
num_samples
));
return
TabletHelper
(
im_
->
AddTablet
(
tag
,
num_samples
));
}
}
void
ClearTablets
()
{
std
::
mutex
&
handler
()
{
return
im_
->
handler
();
}
IM
::
Global
().
storage
().
mutable_data
()
->
clear_tablets
();
void
ClearTablets
()
{
im_
->
storage
().
mutable_data
()
->
clear_tablets
();
}
void
StartReadService
(
const
std
::
string
&
dir
,
int
msecs
)
{
im_
->
SetPersistDest
(
dir
);
im_
->
MaintainRead
(
dir
,
msecs
);
}
}
void
StartWriteSerice
(
const
std
::
string
&
dir
,
int
msecs
)
{
im_
->
SetPersistDest
(
dir
);
im_
->
MaintainWrite
(
dir
,
msecs
);
}
void
StopService
()
{
im_
->
executor
().
Quit
();
}
void
PersistToDisk
()
const
{
im_
->
PersistToDisk
();
}
void
PersistToDisk
()
const
{
IM
::
Global
().
PersistToDisk
();
}
private:
std
::
unique_ptr
<
IM
>
im_
;
};
};
namespace
components
{
namespace
components
{
#define ACQUIRE_HANDLER(handler) std::lock_guard<std::mutex> ____(*handler);
/*
/*
* Read and write support for Scalar component.
* Read and write support for Scalar component.
*/
*/
template
<
typename
T
>
template
<
typename
T
>
class
ScalarHelper
{
class
ScalarHelper
{
public:
public:
ScalarHelper
(
storage
::
Tablet
*
tablet
)
:
data_
(
tablet
)
{}
ScalarHelper
(
storage
::
Tablet
*
tablet
,
std
::
mutex
*
handler
=
nullptr
)
:
data_
(
tablet
),
handler_
(
handler
)
{}
ScalarHelper
(
TabletHelper
&
tablet
,
std
::
mutex
*
handler
=
nullptr
)
:
data_
(
&
tablet
.
data
()),
handler_
(
handler
)
{}
void
SetCaptions
(
const
std
::
vector
<
std
::
string
>
&
captions
);
void
SetCaptions
(
const
std
::
vector
<
std
::
string
>
&
captions
);
...
@@ -138,27 +155,10 @@ public:
...
@@ -138,27 +155,10 @@ public:
private:
private:
storage
::
Tablet
*
data_
;
storage
::
Tablet
*
data_
;
std
::
mutex
*
handler_
;
};
};
}
// namespace components
}
// namespace components
static
ImHelper
&
im
()
{
static
ImHelper
im
;
return
im
;
}
static
void
start_read_service
(
const
std
::
string
&
dir
)
{
IM
::
Global
().
SetPersistDest
(
dir
);
IM
::
Global
().
MaintainRead
();
}
static
void
start_write_service
(
const
std
::
string
&
dir
)
{
IM
::
Global
().
SetPersistDest
(
dir
);
IM
::
Global
().
MaintainWrite
();
}
static
void
stop_threads
()
{
cc
::
PeriodExector
::
Global
().
Quit
();
}
}
// namespace visualdl
}
// namespace visualdl
#endif // VISUALDL_BACKEND_LOGIC_SDK_H
#endif // VISUALDL_BACKEND_LOGIC_SDK_H
visualdl/storage/storage.cc
浏览文件 @
6eaf6a75
...
@@ -9,13 +9,14 @@ namespace visualdl {
...
@@ -9,13 +9,14 @@ namespace visualdl {
const
std
::
string
StorageBase
::
meta_file_name
=
"storage.meta"
;
const
std
::
string
StorageBase
::
meta_file_name
=
"storage.meta"
;
std
::
string
StorageBase
::
meta_path
()
const
{
std
::
string
StorageBase
::
meta_path
(
const
std
::
string
&
dir
)
const
{
CHECK
(
!
storage_
.
dir
().
empty
())
<<
"storage.dir should be set first
"
;
CHECK
(
!
dir
.
empty
())
<<
"dir is empty
"
;
return
storage_
.
dir
()
+
"/"
+
meta_file_name
;
return
dir
+
"/"
+
meta_file_name
;
}
}
std
::
string
StorageBase
::
tablet_path
(
const
std
::
string
&
tag
)
const
{
std
::
string
StorageBase
::
tablet_path
(
const
std
::
string
&
dir
,
CHECK
(
!
storage_
.
dir
().
empty
())
<<
"storage.dir should be set first"
;
const
std
::
string
&
tag
)
const
{
return
storage_
.
dir
()
+
"/"
+
tag
;
CHECK
(
!
dir
.
empty
())
<<
"dir should be set first"
;
return
dir
+
"/"
+
tag
;
}
}
storage
::
Tablet
*
MemoryStorage
::
NewTablet
(
const
std
::
string
&
tag
,
storage
::
Tablet
*
MemoryStorage
::
NewTablet
(
const
std
::
string
&
tag
,
...
@@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
...
@@ -39,18 +40,20 @@ storage::Tablet *MemoryStorage::tablet(const std::string &tag) {
}
}
// TODO add some checksum to avoid unnecessary saving
// TODO add some checksum to avoid unnecessary saving
void
MemoryStorage
::
PersistToDisk
(
)
const
{
void
MemoryStorage
::
PersistToDisk
(
const
std
::
string
&
dir
)
{
CHECK
(
!
storage_
.
dir
().
empty
())
<<
"storage's dir should be set first"
;
CHECK
(
!
dir
.
empty
())
;
VLOG
(
3
)
<<
"persist storage to disk path "
<<
storage_
.
dir
(
);
storage_
.
set_dir
(
dir
);
// make a directory if not exist
// make a directory if not exist
fs
::
TryMkdir
(
storage_
.
dir
()
);
fs
::
TryMkdir
(
dir
);
// write storage out
// write storage out
fs
::
SerializeToFile
(
storage_
,
meta_path
());
LOG
(
INFO
)
<<
"to serize meta to dir "
<<
dir
;
fs
::
SerializeToFile
(
storage_
,
meta_path
(
dir
));
LOG
(
INFO
)
<<
"serize meta to dir "
<<
dir
;
// write all the tablets
// write all the tablets
for
(
auto
tag
:
storage_
.
tags
())
{
for
(
auto
tag
:
storage_
.
tags
())
{
auto
it
=
tablets_
.
find
(
tag
);
auto
it
=
tablets_
.
find
(
tag
);
CHECK
(
it
!=
tablets_
.
end
());
CHECK
(
it
!=
tablets_
.
end
());
fs
::
SerializeToFile
(
it
->
second
,
tablet_path
(
tag
));
fs
::
SerializeToFile
(
it
->
second
,
tablet_path
(
dir
,
tag
));
}
}
}
}
...
@@ -59,33 +62,53 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) {
...
@@ -59,33 +62,53 @@ void MemoryStorage::LoadFromDisk(const std::string &dir) {
CHECK
(
!
dir
.
empty
())
<<
"dir is empty"
;
CHECK
(
!
dir
.
empty
())
<<
"dir is empty"
;
storage_
.
set_dir
(
dir
);
storage_
.
set_dir
(
dir
);
// load storage
// load storage
CHECK
(
fs
::
DeSerializeFromFile
(
&
storage_
,
meta_path
()))
CHECK
(
fs
::
DeSerializeFromFile
(
&
storage_
,
meta_path
(
dir
)))
<<
"parse from "
<<
meta_path
()
<<
" failed"
;
<<
"parse from "
<<
meta_path
(
dir
)
<<
" failed"
;
// load all the tablets
// load all the tablets
for
(
int
i
=
0
;
i
<
storage_
.
tags_size
();
i
++
)
{
for
(
int
i
=
0
;
i
<
storage_
.
tags_size
();
i
++
)
{
auto
tag
=
storage_
.
tags
(
i
);
auto
tag
=
storage_
.
tags
(
i
);
CHECK
(
fs
::
DeSerializeFromFile
(
&
tablets_
[
tag
],
tablet_path
(
tag
)));
CHECK
(
fs
::
DeSerializeFromFile
(
&
tablets_
[
tag
],
tablet_path
(
dir
,
tag
)));
}
}
}
}
void
MemoryStorage
::
StartReadService
()
{
void
MemoryStorage
::
StartReadService
(
const
std
::
string
&
dir
,
cc
::
PeriodExector
::
task_t
task
=
[
this
]
{
int
msecs
,
VLOG
(
3
)
<<
"loading from "
<<
storage_
.
dir
();
std
::
mutex
*
handler
)
{
LoadFromDisk
(
storage_
.
dir
());
CHECK
(
executor_
!=
nullptr
);
CHECK
(
!
dir
.
empty
())
<<
"dir should be set first"
;
cc
::
PeriodExector
::
task_t
task
=
[
dir
,
this
,
handler
]
{
LOG
(
INFO
)
<<
"loading from "
<<
dir
;
if
(
handler
!=
nullptr
)
{
std
::
lock_guard
<
std
::
mutex
>
_
(
*
handler
);
LoadFromDisk
(
dir
);
}
else
{
LoadFromDisk
(
dir
);
}
return
true
;
return
true
;
};
};
cc
::
PeriodExector
::
Global
()
.
Start
();
// executor_
.Start();
VLOG
(
3
)
<<
"push read task"
;
LOG
(
INFO
)
<<
"push read task"
;
cc
::
PeriodExector
::
Global
()(
std
::
move
(
task
),
2512
);
(
*
executor_
)(
std
::
move
(
task
),
msecs
);
}
}
void
MemoryStorage
::
StartWriteSerice
()
{
void
MemoryStorage
::
StartWriteSerice
(
const
std
::
string
&
dir
,
cc
::
PeriodExector
::
Global
().
Start
();
int
msecs
,
cc
::
PeriodExector
::
task_t
task
=
[
this
]
{
std
::
mutex
*
handler
)
{
PersistToDisk
();
CHECK
(
executor_
!=
nullptr
);
CHECK
(
!
dir
.
empty
())
<<
"dir should be set first"
;
storage_
.
set_dir
(
dir
);
// executor_.Start();
cc
::
PeriodExector
::
task_t
task
=
[
dir
,
handler
,
this
]
{
LOG
(
INFO
)
<<
"persist to disk"
;
if
(
handler
!=
nullptr
)
{
std
::
lock_guard
<
std
::
mutex
>
_
(
*
handler
);
PersistToDisk
(
dir
);
}
else
{
PersistToDisk
(
dir
);
}
return
true
;
return
true
;
};
};
cc
::
PeriodExector
::
Global
()(
std
::
move
(
task
),
2000
);
(
*
executor_
)(
std
::
move
(
task
),
msecs
);
}
}
}
// namespace visualdl
}
// namespace visualdl
visualdl/storage/storage.h
浏览文件 @
6eaf6a75
...
@@ -6,6 +6,7 @@
...
@@ -6,6 +6,7 @@
#include <string>
#include <string>
#include "visualdl/storage/storage.pb.h"
#include "visualdl/storage/storage.pb.h"
#include "visualdl/utils/concurrency.h"
namespace
visualdl
{
namespace
visualdl
{
...
@@ -36,8 +37,8 @@ public:
...
@@ -36,8 +37,8 @@ public:
storage_
.
set_dir
(
dir
);
storage_
.
set_dir
(
dir
);
}
}
std
::
string
meta_path
()
const
;
std
::
string
meta_path
(
const
std
::
string
&
dir
)
const
;
std
::
string
tablet_path
(
const
std
::
string
&
tag
)
const
;
std
::
string
tablet_path
(
const
std
::
string
&
dir
,
const
std
::
string
&
tag
)
const
;
/*
/*
* Create a new Tablet storage.
* Create a new Tablet storage.
...
@@ -56,7 +57,7 @@ public:
...
@@ -56,7 +57,7 @@ public:
* Persist the data from cache to disk. Both the memory storage or disk
* Persist the data from cache to disk. Both the memory storage or disk
* storage should write changes to disk for persistence.
* storage should write changes to disk for persistence.
*/
*/
virtual
void
PersistToDisk
(
)
const
=
0
;
virtual
void
PersistToDisk
(
const
std
::
string
&
dir
)
=
0
;
/*
/*
* Load data from disk.
* Load data from disk.
...
@@ -75,28 +76,39 @@ protected:
...
@@ -75,28 +76,39 @@ protected:
*/
*/
class
MemoryStorage
final
:
public
StorageBase
{
class
MemoryStorage
final
:
public
StorageBase
{
public:
public:
MemoryStorage
()
{}
MemoryStorage
(
cc
::
PeriodExector
*
executor
)
:
executor_
(
executor
)
{}
~
MemoryStorage
()
{
if
(
executor_
!=
nullptr
)
executor_
->
Quit
();
}
storage
::
Tablet
*
NewTablet
(
const
std
::
string
&
tag
,
int
num_samples
)
override
;
storage
::
Tablet
*
NewTablet
(
const
std
::
string
&
tag
,
int
num_samples
)
override
;
storage
::
Tablet
*
tablet
(
const
std
::
string
&
tag
)
override
;
storage
::
Tablet
*
tablet
(
const
std
::
string
&
tag
)
override
;
void
PersistToDisk
(
)
const
override
;
void
PersistToDisk
(
const
std
::
string
&
dir
)
override
;
void
LoadFromDisk
(
const
std
::
string
&
dir
)
override
;
void
LoadFromDisk
(
const
std
::
string
&
dir
)
override
;
/*
/*
* Create a thread which will keep reading the latest data from the disk to
* Create a thread which will keep reading the latest data from the disk to
* memory.
* memory.
*
* msecs: how many millisecond to sync memory and disk.
*/
*/
void
StartReadService
();
void
StartReadService
(
const
std
::
string
&
dir
,
int
msecs
,
std
::
mutex
*
handler
);
/*
/*
* Create a thread which will keep writing the latest changes from memory to
* Create a thread which will keep writing the latest changes from memory to
* disk.
* disk.
*
* msecs: how many millisecond to sync memory and disk.
*/
*/
void
StartWriteSerice
();
void
StartWriteSerice
(
const
std
::
string
&
dir
,
int
msecs
,
std
::
mutex
*
handler
);
private:
private:
std
::
map
<
std
::
string
,
storage
::
Tablet
>
tablets_
;
std
::
map
<
std
::
string
,
storage
::
Tablet
>
tablets_
;
// TODO(ChunweiYan) remove executor here.
cc
::
PeriodExector
*
executor_
{
nullptr
};
};
};
}
// namespace visualdl
}
// namespace visualdl
...
...
visualdl/storage/storage_test.cc
浏览文件 @
6eaf6a75
...
@@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) {
...
@@ -32,15 +32,17 @@ TEST_F(MemoryStorageTest, AddTablet) {
}
}
TEST_F
(
MemoryStorageTest
,
PersistToDisk
)
{
TEST_F
(
MemoryStorageTest
,
PersistToDisk
)
{
storage_
.
SetStorage
(
"./tmp"
)
;
const
std
::
string
dir
=
"./tmp/201.test"
;
CHECK
(
!
storage_
.
data
().
dir
().
empty
()
);
storage_
.
SetStorage
(
dir
);
string
tag
=
"add%20tag0"
;
string
tag
=
"add%20tag0"
;
storage_
.
NewTablet
(
tag
,
-
1
);
storage_
.
NewTablet
(
tag
,
-
1
);
storage_
.
PersistToDisk
();
storage_
.
PersistToDisk
(
dir
);
LOG
(
INFO
)
<<
"persist to disk"
;
MemoryStorage
other
;
MemoryStorage
other
;
other
.
LoadFromDisk
(
"./tmp"
);
other
.
LoadFromDisk
(
dir
);
LOG
(
INFO
)
<<
"read from disk"
;
ASSERT_EQ
(
other
.
data
().
SerializeAsString
(),
ASSERT_EQ
(
other
.
data
().
SerializeAsString
(),
storage_
.
data
().
SerializeAsString
());
storage_
.
data
().
SerializeAsString
());
}
}
...
...
visualdl/utils/concurrency.h
浏览文件 @
6eaf6a75
...
@@ -17,13 +17,9 @@ namespace cc {
...
@@ -17,13 +17,9 @@ namespace cc {
struct
PeriodExector
{
struct
PeriodExector
{
using
task_t
=
std
::
function
<
bool
()
>
;
using
task_t
=
std
::
function
<
bool
()
>
;
static
PeriodExector
&
Global
()
{
static
PeriodExector
exec
;
return
exec
;
}
void
Quit
()
{
void
Quit
()
{
// TODO use some conditonal variable to help quit immediately.
// TODO use some conditonal variable to help quit immediately.
// std::this_thread::sleep_for(std::chrono::milliseconds(200));
quit
=
true
;
quit
=
true
;
}
}
...
@@ -34,6 +30,7 @@ struct PeriodExector {
...
@@ -34,6 +30,7 @@ struct PeriodExector {
auto
task_wrapper
=
[
=
]
{
auto
task_wrapper
=
[
=
]
{
while
(
!
quit
)
{
while
(
!
quit
)
{
// task failed
if
(
!
task
())
break
;
if
(
!
task
())
break
;
// if the program is terminated, quit while as soon as possible.
// if the program is terminated, quit while as soon as possible.
// this is just trick, but should works.
// this is just trick, but should works.
...
@@ -57,6 +54,7 @@ struct PeriodExector {
...
@@ -57,6 +54,7 @@ struct PeriodExector {
}
}
~
PeriodExector
()
{
~
PeriodExector
()
{
Quit
();
for
(
auto
&
t
:
threads_
)
{
for
(
auto
&
t
:
threads_
)
{
if
(
t
.
joinable
())
{
if
(
t
.
joinable
())
{
t
.
join
();
t
.
join
();
...
...
visualdl/utils/test_concurrency.cc
浏览文件 @
6eaf6a75
...
@@ -8,12 +8,13 @@ namespace visualdl {
...
@@ -8,12 +8,13 @@ namespace visualdl {
int
counter
=
0
;
int
counter
=
0
;
TEST
(
concurrency
,
test
)
{
TEST
(
concurrency
,
test
)
{
cc
::
PeriodExector
::
task_t
task
=
[
&
counter
]()
{
cc
::
PeriodExector
executor
;
cc
::
PeriodExector
::
task_t
task
=
[
&
]()
{
LOG
(
INFO
)
<<
"Hello "
<<
counter
++
;
LOG
(
INFO
)
<<
"Hello "
<<
counter
++
;
if
(
counter
>
5
)
return
false
;
if
(
counter
>
5
)
return
false
;
return
true
;
return
true
;
};
};
cc
::
PeriodExector
::
Global
()
(
std
::
move
(
task
),
200
);
executor
(
std
::
move
(
task
),
200
);
}
}
}
// namespace visualdl
}
// namespace visualdl
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录