Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
8a3ca627
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8a3ca627
编写于
9月 20, 2019
作者:
Y
Yu Kun
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
MS-576 Scheduler refactor
Former-commit-id: 0e87ca2ed7807e90b514f735548cb3f7349c7ab6
上级
2284b729
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
118 addition
and
106 deletion
+118
-106
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+1
-0
cpp/src/scheduler/Scheduler.cpp
cpp/src/scheduler/Scheduler.cpp
+24
-105
cpp/src/scheduler/Scheduler.h
cpp/src/scheduler/Scheduler.h
+2
-0
cpp/src/scheduler/action/Action.h
cpp/src/scheduler/action/Action.h
+10
-0
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
+80
-0
cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp
cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp
+1
-1
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
8a3ca627
...
...
@@ -147,6 +147,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-539 - Remove old task code
-
MS-546 - Add simple mode resource_config
-
MS-570 - Add prometheus docker-compose file
-
MS-576 - Scheduler refactor
## New Feature
-
MS-343 - Implement ResourceMgr
...
...
cpp/src/scheduler/Scheduler.cpp
浏览文件 @
8a3ca627
...
...
@@ -16,7 +16,7 @@
// under the License.
#include
<src/cache/GpuCacheMgr.h>
#include
"src/cache/GpuCacheMgr.h"
#include "event/LoadCompletedEvent.h"
#include "Scheduler.h"
#include "action/Action.h"
...
...
@@ -33,6 +33,14 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
if
(
auto
mgr
=
res_mgr_
.
lock
())
{
mgr
->
RegisterSubscriber
(
std
::
bind
(
&
Scheduler
::
PostEvent
,
this
,
std
::
placeholders
::
_1
));
}
event_register_
.
insert
(
std
::
make_pair
(
static_cast
<
uint64_t
>
(
EventType
::
START_UP
),
std
::
bind
(
&
Scheduler
::
OnStartUp
,
this
,
std
::
placeholders
::
_1
)));
event_register_
.
insert
(
std
::
make_pair
(
static_cast
<
uint64_t
>
(
EventType
::
LOAD_COMPLETED
),
std
::
bind
(
&
Scheduler
::
OnLoadCompleted
,
this
,
std
::
placeholders
::
_1
)));
event_register_
.
insert
(
std
::
make_pair
(
static_cast
<
uint64_t
>
(
EventType
::
TASK_TABLE_UPDATED
),
std
::
bind
(
&
Scheduler
::
OnTaskTableUpdated
,
this
,
std
::
placeholders
::
_1
)));
event_register_
.
insert
(
std
::
make_pair
(
static_cast
<
uint64_t
>
(
EventType
::
FINISH_TASK
),
std
::
bind
(
&
Scheduler
::
OnFinishTask
,
this
,
std
::
placeholders
::
_1
)));
}
...
...
@@ -84,40 +92,8 @@ Scheduler::worker_function() {
void
Scheduler
::
Process
(
const
EventPtr
&
event
)
{
switch
(
event
->
Type
())
{
case
EventType
::
START_UP
:
{
OnStartUp
(
event
);
break
;
}
case
EventType
::
LOAD_COMPLETED
:
{
OnLoadCompleted
(
event
);
break
;
}
case
EventType
::
FINISH_TASK
:
{
OnFinishTask
(
event
);
break
;
}
case
EventType
::
TASK_TABLE_UPDATED
:
{
OnTaskTableUpdated
(
event
);
break
;
}
default:
{
// TODO: logging
break
;
}
}
}
void
Scheduler
::
OnStartUp
(
const
EventPtr
&
event
)
{
if
(
auto
resource
=
event
->
resource_
.
lock
())
{
resource
->
WakeupLoader
();
}
}
void
Scheduler
::
OnFinishTask
(
const
EventPtr
&
event
)
{
auto
process_event
=
event_register_
.
at
(
static_cast
<
int
>
(
event
->
Type
()));
process_event
(
event
);
}
// TODO: refactor the function
...
...
@@ -130,79 +106,11 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
auto
task_table_type
=
load_completed_event
->
task_table_item_
->
task
->
label
()
->
Type
();
switch
(
task_table_type
)
{
case
TaskLabelType
::
DEFAULT
:
{
if
(
not
resource
->
HasExecutor
()
&&
load_completed_event
->
task_table_item_
->
Move
())
{
auto
task
=
load_completed_event
->
task_table_item_
->
task
;
auto
search_task
=
std
::
static_pointer_cast
<
XSearchTask
>
(
task
);
bool
moved
=
false
;
// to support test task, REFACTOR
if
(
auto
index_engine
=
search_task
->
index_engine_
)
{
auto
location
=
index_engine
->
GetLocation
();
for
(
auto
i
=
0
;
i
<
res_mgr_
.
lock
()
->
GetNumGpuResource
();
++
i
)
{
auto
index
=
zilliz
::
milvus
::
cache
::
GpuCacheMgr
::
GetInstance
(
i
)
->
GetIndex
(
location
);
if
(
index
!=
nullptr
)
{
moved
=
true
;
auto
dest_resource
=
res_mgr_
.
lock
()
->
GetResource
(
ResourceType
::
GPU
,
i
);
Action
::
PushTaskToResource
(
load_completed_event
->
task_table_item_
->
task
,
dest_resource
);
break
;
}
}
}
if
(
not
moved
)
{
Action
::
PushTaskToNeighbourRandomly
(
task
,
resource
);
}
}
Action
::
DefaultLabelTaskScheduler
(
res_mgr_
,
resource
,
load_completed_event
);
break
;
}
case
TaskLabelType
::
SPECIFIED_RESOURCE
:
{
// support next version
// auto self = event->resource_.lock();
// auto task = load_completed_event->task_table_item_->task;
//
// // if this resource is disk, assign it to smallest cost resource
// if (self->type() == ResourceType::DISK) {
// // step 1: calculate shortest path per resource, from disk to compute resource
// auto compute_resources = res_mgr_.lock()->GetComputeResources();
// std::vector<std::vector<std::string>> paths;
// std::vector<uint64_t> transport_costs;
// for (auto &res : compute_resources) {
// std::vector<std::string> path;
// uint64_t transport_cost = ShortestPath(self, res, res_mgr_.lock(), path);
// transport_costs.push_back(transport_cost);
// paths.emplace_back(path);
// }
//
// // step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
// uint64_t min_cost = std::numeric_limits<uint64_t>::max();
// uint64_t min_cost_idx = 0;
// for (uint64_t i = 0; i < compute_resources.size(); ++i) {
// if (compute_resources[i]->TotalTasks() == 0) {
// min_cost_idx = i;
// break;
// }
// uint64_t cost = compute_resources[i]->TaskAvgCost() * compute_resources[i]->NumOfTaskToExec()
// + transport_costs[i];
// if (min_cost > cost) {
// min_cost = cost;
// min_cost_idx = i;
// }
// }
//
// // step 3: set path in task
// Path task_path(paths[min_cost_idx], paths[min_cost_idx].size() - 1);
// task->path() = task_path;
// }
//
// if (self->name() == task->path().Last()) {
// self->WakeupLoader();
// } else {
// auto next_res_name = task->path().Next();
// auto next_res = res_mgr_.lock()->GetResource(next_res_name);
// load_completed_event->task_table_item_->Move();
// next_res->task_table().Put(task);
// }
Action
::
SpecifiedResourceLabelTaskScheduler
(
res_mgr_
,
resource
,
load_completed_event
);
break
;
}
case
TaskLabelType
::
BROADCAST
:
{
...
...
@@ -216,6 +124,17 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
}
}
void
Scheduler
::
OnStartUp
(
const
EventPtr
&
event
)
{
if
(
auto
resource
=
event
->
resource_
.
lock
())
{
resource
->
WakeupLoader
();
}
}
void
Scheduler
::
OnFinishTask
(
const
EventPtr
&
event
)
{
}
void
Scheduler
::
OnTaskTableUpdated
(
const
EventPtr
&
event
)
{
if
(
auto
resource
=
event
->
resource_
.
lock
())
{
...
...
cpp/src/scheduler/Scheduler.h
浏览文件 @
8a3ca627
...
...
@@ -122,6 +122,8 @@ private:
private:
bool
running_
;
std
::
unordered_map
<
uint64_t
,
std
::
function
<
void
(
EventPtr
)
>>
event_register_
;
ResourceMgrWPtr
res_mgr_
;
std
::
queue
<
EventPtr
>
event_queue_
;
std
::
thread
worker_thread_
;
...
...
cpp/src/scheduler/action/Action.h
浏览文件 @
8a3ca627
...
...
@@ -18,6 +18,7 @@
#pragma once
#include "../resource/Resource.h"
#include "../ResourceMgr.h"
namespace
zilliz
{
...
...
@@ -34,6 +35,15 @@ public:
static
void
PushTaskToResource
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
dest
);
static
void
DefaultLabelTaskScheduler
(
ResourceMgrWPtr
res_mgr
,
ResourcePtr
resource
,
std
::
shared_ptr
<
LoadCompletedEvent
>
event
);
static
void
SpecifiedResourceLabelTaskScheduler
(
ResourceMgrWPtr
res_mgr
,
ResourcePtr
resource
,
std
::
shared_ptr
<
LoadCompletedEvent
>
event
);
};
...
...
cpp/src/scheduler/action/PushTaskToNeighbour.cpp
浏览文件 @
8a3ca627
...
...
@@ -18,6 +18,8 @@
#include <list>
#include <random>
#include "../Algorithm.h"
#include "src/cache/GpuCacheMgr.h"
#include "Action.h"
...
...
@@ -101,6 +103,84 @@ Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) {
dest
->
task_table
().
Put
(
task
);
}
void
Action
::
DefaultLabelTaskScheduler
(
ResourceMgrWPtr
res_mgr
,
ResourcePtr
resource
,
std
::
shared_ptr
<
LoadCompletedEvent
>
event
)
{
if
(
not
resource
->
HasExecutor
()
&&
event
->
task_table_item_
->
Move
())
{
auto
task
=
event
->
task_table_item_
->
task
;
auto
search_task
=
std
::
static_pointer_cast
<
XSearchTask
>
(
task
);
bool
moved
=
false
;
//to support test task, REFACTOR
if
(
auto
index_engine
=
search_task
->
index_engine_
)
{
auto
location
=
index_engine
->
GetLocation
();
for
(
auto
i
=
0
;
i
<
res_mgr
.
lock
()
->
GetNumGpuResource
();
++
i
)
{
auto
index
=
zilliz
::
milvus
::
cache
::
GpuCacheMgr
::
GetInstance
(
i
)
->
GetIndex
(
location
);
if
(
index
!=
nullptr
)
{
moved
=
true
;
auto
dest_resource
=
res_mgr
.
lock
()
->
GetResource
(
ResourceType
::
GPU
,
i
);
PushTaskToResource
(
event
->
task_table_item_
->
task
,
dest_resource
);
break
;
}
}
}
if
(
not
moved
)
{
PushTaskToNeighbourRandomly
(
task
,
resource
);
}
}
}
void
Action
::
SpecifiedResourceLabelTaskScheduler
(
ResourceMgrWPtr
res_mgr
,
ResourcePtr
resource
,
std
::
shared_ptr
<
LoadCompletedEvent
>
event
)
{
auto
task
=
event
->
task_table_item_
->
task
;
if
(
resource
->
type
()
==
ResourceType
::
DISK
)
{
// step 1: calculate shortest path per resource, from disk to compute resource
auto
compute_resources
=
res_mgr
.
lock
()
->
GetComputeResources
();
std
::
vector
<
std
::
vector
<
std
::
string
>>
paths
;
std
::
vector
<
uint64_t
>
transport_costs
;
for
(
auto
&
res
:
compute_resources
)
{
std
::
vector
<
std
::
string
>
path
;
uint64_t
transport_cost
=
ShortestPath
(
resource
,
res
,
res_mgr
.
lock
(),
path
);
transport_costs
.
push_back
(
transport_cost
);
paths
.
emplace_back
(
path
);
}
// step 2: select min cost, cost(resource) = avg_cost * task_to_do + transport_cost
uint64_t
min_cost
=
std
::
numeric_limits
<
uint64_t
>::
max
();
uint64_t
min_cost_idx
=
0
;
for
(
uint64_t
i
=
0
;
i
<
compute_resources
.
size
();
++
i
)
{
if
(
compute_resources
[
i
]
->
TotalTasks
()
==
0
)
{
min_cost_idx
=
i
;
break
;
}
uint64_t
cost
=
compute_resources
[
i
]
->
TaskAvgCost
()
*
compute_resources
[
i
]
->
NumOfTaskToExec
()
+
transport_costs
[
i
];
if
(
min_cost
>
cost
)
{
min_cost
=
cost
;
min_cost_idx
=
i
;
}
}
// step 3: set path in task
Path
task_path
(
paths
[
min_cost_idx
],
paths
[
min_cost_idx
].
size
()
-
1
);
task
->
path
()
=
task_path
;
}
if
(
resource
->
name
()
==
task
->
path
().
Last
())
{
resource
->
WakeupLoader
();
}
else
{
auto
next_res_name
=
task
->
path
().
Next
();
auto
next_res
=
res_mgr
.
lock
()
->
GetResource
(
next_res_name
);
event
->
task_table_item_
->
Move
();
next_res
->
task_table
().
Put
(
task
);
}
}
}
}
}
...
...
cpp/src/sdk/examples/grpcsimple/src/ClientTest.cpp
浏览文件 @
8a3ca627
...
...
@@ -39,7 +39,7 @@ constexpr int64_t BATCH_ROW_COUNT = 100000;
constexpr
int64_t
NQ
=
5
;
constexpr
int64_t
TOP_K
=
10
;
constexpr
int64_t
SEARCH_TARGET
=
5000
;
//change this value, result is different
constexpr
int64_t
ADD_VECTOR_LOOP
=
1
0
;
constexpr
int64_t
ADD_VECTOR_LOOP
=
1
;
constexpr
int64_t
SECONDS_EACH_HOUR
=
3600
;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录