Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
03b049b1
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
03b049b1
编写于
8月 19, 2019
作者:
W
wxyu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
MS-383 Modify condition variable usage in scheduler
Former-commit-id: 5be1c8879d1a77d444a5722dbd806a5eb00973e5
上级
842fa507
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
73 addition
and
64 deletion
+73
-64
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+1
-0
cpp/src/scheduler/ResourceMgr.cpp
cpp/src/scheduler/ResourceMgr.cpp
+3
-2
cpp/src/scheduler/Scheduler.cpp
cpp/src/scheduler/Scheduler.cpp
+6
-8
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
+2
-1
cpp/src/scheduler/resource/Resource.cpp
cpp/src/scheduler/resource/Resource.cpp
+12
-5
cpp/unittest/scheduler/normal_test.cpp
cpp/unittest/scheduler/normal_test.cpp
+49
-48
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
03b049b1
...
...
@@ -32,6 +32,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-378 - Debug and Update normal_test in scheduler unittest
-
MS-379 - Add Dump implementation in Resource
-
MS-380 - Update resource loader and executor, work util all finished
-
MS-383 - Modify condition variable usage in scheduler
## New Feature
-
MS-343 - Implement ResourceMgr
...
...
cpp/src/scheduler/ResourceMgr.cpp
浏览文件 @
03b049b1
...
...
@@ -76,7 +76,7 @@ ResourceMgr::Stop() {
void
ResourceMgr
::
PostEvent
(
const
EventPtr
&
event
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
event_mutex_
);
std
::
lock_guard
<
std
::
mutex
>
lock
(
event_mutex_
);
queue_
.
emplace
(
event
);
event_cv_
.
notify_one
();
}
...
...
@@ -100,13 +100,14 @@ ResourceMgr::event_process() {
event_cv_
.
wait
(
lock
,
[
this
]
{
return
!
queue_
.
empty
();
});
auto
event
=
queue_
.
front
();
queue_
.
pop
();
lock
.
unlock
();
if
(
event
==
nullptr
)
{
break
;
}
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
queue_
.
pop
();
if
(
subscriber_
)
{
subscriber_
(
event
);
}
...
...
cpp/src/scheduler/Scheduler.cpp
浏览文件 @
03b049b1
...
...
@@ -41,10 +41,11 @@ Scheduler::Stop() {
void
Scheduler
::
PostEvent
(
const
EventPtr
&
event
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
event_mutex_
);
event_queue_
.
push
(
event
);
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
event_mutex_
);
event_queue_
.
push
(
event
);
}
event_cv_
.
notify_one
();
// SERVER_LOG_DEBUG << "Scheduler post " << *event;
}
std
::
string
...
...
@@ -58,12 +59,11 @@ Scheduler::worker_function() {
std
::
unique_lock
<
std
::
mutex
>
lock
(
event_mutex_
);
event_cv_
.
wait
(
lock
,
[
this
]
{
return
!
event_queue_
.
empty
();
});
auto
event
=
event_queue_
.
front
();
event_queue_
.
pop
();
if
(
event
==
nullptr
)
{
break
;
}
// SERVER_LOG_DEBUG << "Scheduler process " << *event;
event_queue_
.
pop
();
Process
(
event
);
}
}
...
...
@@ -105,16 +105,14 @@ Scheduler::OnStartUp(const EventPtr &event) {
void
Scheduler
::
OnFinishTask
(
const
EventPtr
&
event
)
{
if
(
auto
resource
=
event
->
resource_
.
lock
())
{
resource
->
WakeupExecutor
();
}
}
void
Scheduler
::
OnCopyCompleted
(
const
EventPtr
&
event
)
{
if
(
auto
resource
=
event
->
resource_
.
lock
())
{
resource
->
WakeupLoader
();
resource
->
WakeupExecutor
();
if
(
resource
->
Type
()
==
ResourceType
::
DISK
)
{
if
(
resource
->
Type
()
==
ResourceType
::
DISK
)
{
Action
::
PushTaskToNeighbour
(
event
->
resource_
);
}
}
...
...
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
浏览文件 @
03b049b1
...
...
@@ -4,6 +4,7 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include "Action.h"
...
...
@@ -16,7 +17,7 @@ push_task(const ResourcePtr &self, const ResourcePtr &other) {
auto
&
self_task_table
=
self
->
task_table
();
auto
&
other_task_table
=
other
->
task_table
();
CacheMgr
cache
;
auto
indexes
=
PickToMove
(
self_task_table
,
cache
,
1
);
auto
indexes
=
PickToMove
(
self_task_table
,
cache
,
1
0
);
for
(
auto
index
:
indexes
)
{
if
(
self_task_table
.
Move
(
index
))
{
auto
task
=
self_task_table
.
Get
(
index
)
->
task
;
...
...
cpp/src/scheduler/resource/Resource.cpp
浏览文件 @
03b049b1
...
...
@@ -3,6 +3,7 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include "Resource.h"
...
...
@@ -61,19 +62,23 @@ TaskTable &Resource::task_table() {
}
void
Resource
::
WakeupLoader
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
load_mutex_
);
load_flag_
=
true
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
load_mutex_
);
load_flag_
=
true
;
}
load_cv_
.
notify_one
();
}
void
Resource
::
WakeupExecutor
()
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
exec_mutex_
);
exec_flag_
=
true
;
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
exec_mutex_
);
exec_flag_
=
true
;
}
exec_cv_
.
notify_one
();
}
TaskTableItemPtr
Resource
::
pick_task_load
()
{
auto
indexes
=
PickToLoad
(
task_table_
,
3
);
auto
indexes
=
PickToLoad
(
task_table_
,
10
);
for
(
auto
index
:
indexes
)
{
// try to set one task loading, then return
if
(
task_table_
.
Load
(
index
))
...
...
@@ -99,6 +104,7 @@ void Resource::loader_function() {
std
::
unique_lock
<
std
::
mutex
>
lock
(
load_mutex_
);
load_cv_
.
wait
(
lock
,
[
&
]
{
return
load_flag_
;
});
load_flag_
=
false
;
lock
.
unlock
();
while
(
true
)
{
auto
task_item
=
pick_task_load
();
if
(
task_item
==
nullptr
)
{
...
...
@@ -125,6 +131,7 @@ void Resource::executor_function() {
std
::
unique_lock
<
std
::
mutex
>
lock
(
exec_mutex_
);
exec_cv_
.
wait
(
lock
,
[
&
]
{
return
exec_flag_
;
});
exec_flag_
=
false
;
lock
.
unlock
();
while
(
true
)
{
auto
task_item
=
pick_task_execute
();
if
(
task_item
==
nullptr
)
{
...
...
cpp/unittest/scheduler/normal_test.cpp
浏览文件 @
03b049b1
...
...
@@ -27,59 +27,60 @@ TEST(normal_test, test1) {
auto
scheduler
=
new
Scheduler
(
res_mgr
);
scheduler
->
Start
();
auto
task1
=
std
::
make_shared
<
TestTask
>
();
auto
task2
=
std
::
make_shared
<
TestTask
>
();
auto
task3
=
std
::
make_shared
<
TestTask
>
();
auto
task4
=
std
::
make_shared
<
TestTask
>
();
if
(
auto
observe
=
disk
.
lock
())
{
observe
->
task_table
().
Put
(
task1
);
observe
->
task_table
().
Put
(
task2
);
observe
->
task_table
().
Put
(
task3
);
observe
->
task_table
().
Put
(
task4
);
const
uint64_t
NUM_TASK
=
10
;
std
::
vector
<
std
::
shared_ptr
<
TestTask
>>
tasks
;
for
(
uint64_t
i
=
0
;
i
<
NUM_TASK
;
++
i
)
{
if
(
auto
observe
=
disk
.
lock
())
{
auto
task
=
std
::
make_shared
<
TestTask
>
();
tasks
.
push_back
(
task
);
observe
->
task_table
().
Put
(
task
);
}
}
//
if (auto disk_r = disk.lock()) {
//
if (auto cpu_r = cpu.lock()) {
//
if (auto gpu1_r = gpu1.lock()) {
//
if (auto gpu2_r = gpu2.lock()) {
//
std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl;
//
std::cout << "disk:" << std::endl;
//
std::cout << disk_r->task_table().Dump() << std::endl;
//
std::cout << "cpu:" << std::endl;
//
std::cout << cpu_r->task_table().Dump() << std::endl;
//
std::cout << "gpu1:" << std::endl;
//
std::cout << gpu1_r->task_table().Dump() << std::endl;
//
std::cout << "gpu2:" << std::endl;
//
std::cout << gpu2_r->task_table().Dump() << std::endl;
//
std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl;
//
}
//
}
//
}
//
}
if
(
auto
disk_r
=
disk
.
lock
())
{
if
(
auto
cpu_r
=
cpu
.
lock
())
{
if
(
auto
gpu1_r
=
gpu1
.
lock
())
{
if
(
auto
gpu2_r
=
gpu2
.
lock
())
{
std
::
cout
<<
"<<<<<<<<<<before<<<<<<<<<<"
<<
std
::
endl
;
std
::
cout
<<
"disk:"
<<
std
::
endl
;
std
::
cout
<<
disk_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
"cpu:"
<<
std
::
endl
;
std
::
cout
<<
cpu_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
"gpu1:"
<<
std
::
endl
;
std
::
cout
<<
gpu1_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
"gpu2:"
<<
std
::
endl
;
std
::
cout
<<
gpu2_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
">>>>>>>>>>before>>>>>>>>>>"
<<
std
::
endl
;
}
}
}
}
sleep
(
5
);
sleep
(
1
);
//
if (auto disk_r = disk.lock()) {
//
if (auto cpu_r = cpu.lock()) {
//
if (auto gpu1_r = gpu1.lock()) {
//
if (auto gpu2_r = gpu2.lock()) {
//
std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl;
//
std::cout << "disk:" << std::endl;
//
std::cout << disk_r->task_table().Dump() << std::endl;
//
std::cout << "cpu:" << std::endl;
//
std::cout << cpu_r->task_table().Dump() << std::endl;
//
std::cout << "gpu1:" << std::endl;
//
std::cout << gpu1_r->task_table().Dump() << std::endl;
//
std::cout << "gpu2:" << std::endl;
//
std::cout << gpu2_r->task_table().Dump() << std::endl;
//
std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl;
//
}
//
}
//
}
//
}
if
(
auto
disk_r
=
disk
.
lock
())
{
if
(
auto
cpu_r
=
cpu
.
lock
())
{
if
(
auto
gpu1_r
=
gpu1
.
lock
())
{
if
(
auto
gpu2_r
=
gpu2
.
lock
())
{
std
::
cout
<<
"<<<<<<<<<<after<<<<<<<<<<"
<<
std
::
endl
;
std
::
cout
<<
"disk:"
<<
std
::
endl
;
std
::
cout
<<
disk_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
"cpu:"
<<
std
::
endl
;
std
::
cout
<<
cpu_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
"gpu1:"
<<
std
::
endl
;
std
::
cout
<<
gpu1_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
"gpu2:"
<<
std
::
endl
;
std
::
cout
<<
gpu2_r
->
task_table
().
Dump
()
<<
std
::
endl
;
std
::
cout
<<
">>>>>>>>>>after>>>>>>>>>>"
<<
std
::
endl
;
}
}
}
}
scheduler
->
Stop
();
res_mgr
->
Stop
();
ASSERT_EQ
(
task1
->
load_count_
,
1
);
ASSERT_EQ
(
task1
->
exec_count_
,
1
);
for
(
uint64_t
i
=
0
;
i
<
NUM_TASK
;
++
i
)
{
ASSERT_EQ
(
tasks
[
i
]
->
load_count_
,
1
);
ASSERT_EQ
(
tasks
[
i
]
->
exec_count_
,
1
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录