Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
fd84fdcf
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
fd84fdcf
编写于
9月 05, 2019
作者:
S
starlord
浏览文件
操作
浏览文件
下载
差异文件
refine code
Former-commit-id: baaa681395909de65c1a2e588a621bc52d5ff416
上级
471b8be8
6963ff5f
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
193 addition
and
347 deletion
+193
-347
cpp/CHANGELOG.md
cpp/CHANGELOG.md
+1
-0
cpp/src/scheduler/Algorithm.cpp
cpp/src/scheduler/Algorithm.cpp
+11
-11
cpp/src/scheduler/CacheMgr.h
cpp/src/scheduler/CacheMgr.h
+0
-24
cpp/src/scheduler/ResourceMgr.cpp
cpp/src/scheduler/ResourceMgr.cpp
+71
-100
cpp/src/scheduler/ResourceMgr.h
cpp/src/scheduler/ResourceMgr.h
+40
-55
cpp/src/scheduler/SchedInst.cpp
cpp/src/scheduler/SchedInst.cpp
+1
-1
cpp/src/scheduler/Scheduler.cpp
cpp/src/scheduler/Scheduler.cpp
+3
-3
cpp/src/scheduler/TaskTable.cpp
cpp/src/scheduler/TaskTable.cpp
+12
-18
cpp/src/scheduler/TaskTable.h
cpp/src/scheduler/TaskTable.h
+4
-7
cpp/src/scheduler/Utils.cpp
cpp/src/scheduler/Utils.cpp
+4
-3
cpp/src/scheduler/Utils.h
cpp/src/scheduler/Utils.h
+1
-0
cpp/src/scheduler/resource/Connection.h
cpp/src/scheduler/resource/Connection.h
+1
-0
cpp/src/scheduler/resource/RegisterHandler.h
cpp/src/scheduler/resource/RegisterHandler.h
+0
-24
cpp/src/scheduler/resource/Resource.cpp
cpp/src/scheduler/resource/Resource.cpp
+21
-18
cpp/src/scheduler/resource/Resource.h
cpp/src/scheduler/resource/Resource.h
+22
-43
cpp/src/scheduler/task/DeleteTask.cpp
cpp/src/scheduler/task/DeleteTask.cpp
+0
-6
cpp/src/scheduler/task/DeleteTask.h
cpp/src/scheduler/task/DeleteTask.h
+0
-3
cpp/src/scheduler/task/SearchTask.cpp
cpp/src/scheduler/task/SearchTask.cpp
+0
-10
cpp/src/scheduler/task/SearchTask.h
cpp/src/scheduler/task/SearchTask.h
+0
-3
cpp/src/scheduler/task/Task.h
cpp/src/scheduler/task/Task.h
+0
-5
cpp/src/scheduler/task/TaskConvert.cpp
cpp/src/scheduler/task/TaskConvert.cpp
+0
-1
cpp/src/scheduler/task/TestTask.cpp
cpp/src/scheduler/task/TestTask.cpp
+0
-9
cpp/src/scheduler/task/TestTask.h
cpp/src/scheduler/task/TestTask.h
+0
-3
cpp/src/scheduler/tasklabel/TaskLabel.h
cpp/src/scheduler/tasklabel/TaskLabel.h
+1
-0
未找到文件。
cpp/CHANGELOG.md
浏览文件 @
fd84fdcf
...
...
@@ -86,6 +86,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
MS-459 - Add cache for pick function in tasktable
-
MS-482 - Change search stream transport to unary in grpc
-
MS-487 - Define metric type in CreateTable
-
MS-488 - Improve code format in scheduler
## New Feature
-
MS-343 - Implement ResourceMgr
...
...
cpp/src/scheduler/Algorithm.cpp
浏览文件 @
fd84fdcf
...
...
@@ -20,12 +20,12 @@ ShortestPath(const ResourcePtr &src,
std
::
vector
<
std
::
vector
<
std
::
string
>>
paths
;
uint64_t
num_of_resources
=
res_mgr
->
GetAllResouces
().
size
();
uint64_t
num_of_resources
=
res_mgr
->
GetAllResou
r
ces
().
size
();
std
::
unordered_map
<
uint64_t
,
std
::
string
>
id_name_map
;
std
::
unordered_map
<
std
::
string
,
uint64_t
>
name_id_map
;
for
(
uint64_t
i
=
0
;
i
<
num_of_resources
;
++
i
)
{
id_name_map
.
insert
(
std
::
make_pair
(
i
,
res_mgr
->
GetAllResou
ces
().
at
(
i
)
->
N
ame
()));
name_id_map
.
insert
(
std
::
make_pair
(
res_mgr
->
GetAllResou
ces
().
at
(
i
)
->
N
ame
(),
i
));
id_name_map
.
insert
(
std
::
make_pair
(
i
,
res_mgr
->
GetAllResou
rces
().
at
(
i
)
->
n
ame
()));
name_id_map
.
insert
(
std
::
make_pair
(
res_mgr
->
GetAllResou
rces
().
at
(
i
)
->
n
ame
(),
i
));
}
std
::
vector
<
std
::
vector
<
uint64_t
>
>
dis_matrix
;
...
...
@@ -40,23 +40,23 @@ ShortestPath(const ResourcePtr &src,
std
::
vector
<
bool
>
vis
(
num_of_resources
,
false
);
std
::
vector
<
uint64_t
>
dis
(
num_of_resources
,
MAXINT
);
for
(
auto
&
res
:
res_mgr
->
GetAllResouces
())
{
for
(
auto
&
res
:
res_mgr
->
GetAllResou
r
ces
())
{
auto
cur_node
=
std
::
static_pointer_cast
<
Node
>
(
res
);
auto
cur_neighbours
=
cur_node
->
GetNeighbours
();
for
(
auto
&
neighbour
:
cur_neighbours
)
{
auto
neighbour_res
=
std
::
static_pointer_cast
<
Resource
>
(
neighbour
.
neighbour_node
.
lock
());
dis_matrix
[
name_id_map
.
at
(
res
->
Name
())][
name_id_map
.
at
(
neighbour_res
->
N
ame
())]
=
dis_matrix
[
name_id_map
.
at
(
res
->
name
())][
name_id_map
.
at
(
neighbour_res
->
n
ame
())]
=
neighbour
.
connection
.
transport_cost
();
}
}
for
(
uint64_t
i
=
0
;
i
<
num_of_resources
;
++
i
)
{
dis
[
i
]
=
dis_matrix
[
name_id_map
.
at
(
src
->
N
ame
())][
i
];
dis
[
i
]
=
dis_matrix
[
name_id_map
.
at
(
src
->
n
ame
())][
i
];
}
vis
[
name_id_map
.
at
(
src
->
N
ame
())]
=
true
;
vis
[
name_id_map
.
at
(
src
->
n
ame
())]
=
true
;
std
::
vector
<
int64_t
>
parent
(
num_of_resources
,
-
1
);
for
(
uint64_t
i
=
0
;
i
<
num_of_resources
;
++
i
)
{
...
...
@@ -71,7 +71,7 @@ ShortestPath(const ResourcePtr &src,
vis
[
temp
]
=
true
;
if
(
i
==
0
)
{
parent
[
temp
]
=
name_id_map
.
at
(
src
->
N
ame
());
parent
[
temp
]
=
name_id_map
.
at
(
src
->
n
ame
());
}
for
(
uint64_t
j
=
0
;
j
<
num_of_resources
;
++
j
)
{
...
...
@@ -82,15 +82,15 @@ ShortestPath(const ResourcePtr &src,
}
}
int64_t
parent_idx
=
parent
[
name_id_map
.
at
(
dest
->
N
ame
())];
int64_t
parent_idx
=
parent
[
name_id_map
.
at
(
dest
->
n
ame
())];
if
(
parent_idx
!=
-
1
)
{
path
.
push_back
(
dest
->
N
ame
());
path
.
push_back
(
dest
->
n
ame
());
}
while
(
parent_idx
!=
-
1
)
{
path
.
push_back
(
id_name_map
.
at
(
parent_idx
));
parent_idx
=
parent
[
parent_idx
];
}
return
dis
[
name_id_map
.
at
(
dest
->
N
ame
())];
return
dis
[
name_id_map
.
at
(
dest
->
n
ame
())];
}
}
...
...
cpp/src/scheduler/CacheMgr.h
已删除
100644 → 0
浏览文件 @
471b8be8
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
// dummy cache_mgr
class
CacheMgr
{
};
using
CacheMgrPtr
=
std
::
shared_ptr
<
CacheMgr
>
;
}
}
}
cpp/src/scheduler/ResourceMgr.cpp
浏览文件 @
fd84fdcf
...
...
@@ -12,67 +12,31 @@ namespace zilliz {
namespace
milvus
{
namespace
engine
{
ResourceMgr
::
ResourceMgr
()
:
running_
(
false
)
{
}
uint64_t
ResourceMgr
::
GetNumOfComputeResource
()
{
uint64_t
count
=
0
;
for
(
auto
&
res
:
resources_
)
{
if
(
res
->
HasExecutor
())
{
++
count
;
}
}
return
count
;
}
std
::
vector
<
ResourcePtr
>
ResourceMgr
::
GetComputeResource
()
{
std
::
vector
<
ResourcePtr
>
result
;
void
ResourceMgr
::
Start
()
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
resources_mutex_
);
for
(
auto
&
resource
:
resources_
)
{
if
(
resource
->
HasExecutor
())
{
result
.
emplace_back
(
resource
);
}
}
return
result
;
}
uint64_t
ResourceMgr
::
GetNumGpuResource
()
const
{
uint64_t
num
=
0
;
for
(
auto
&
res
:
resources_
)
{
if
(
res
->
Type
()
==
ResourceType
::
GPU
)
{
num
++
;
}
resource
->
Start
();
}
return
num
;
running_
=
true
;
worker_thread_
=
std
::
thread
(
&
ResourceMgr
::
event_process
,
this
);
}
ResourcePtr
ResourceMgr
::
GetResource
(
ResourceType
type
,
uint64_t
device_id
)
{
for
(
auto
&
resource
:
resources_
)
{
if
(
resource
->
Type
()
==
type
&&
resource
->
DeviceId
()
==
device_id
)
{
return
resource
;
}
void
ResourceMgr
::
Stop
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
event_mutex_
);
running_
=
false
;
queue_
.
push
(
nullptr
);
event_cv_
.
notify_one
();
}
return
nullptr
;
}
worker_thread_
.
join
();
ResourcePtr
ResourceMgr
::
GetResourceByName
(
std
::
string
name
)
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
resources_mutex_
);
for
(
auto
&
resource
:
resources_
)
{
if
(
resource
->
Name
()
==
name
)
{
return
resource
;
}
resource
->
Stop
();
}
return
nullptr
;
}
std
::
vector
<
ResourcePtr
>
ResourceMgr
::
GetAllResouces
()
{
return
resources_
;
}
ResourceWPtr
...
...
@@ -85,75 +49,85 @@ ResourceMgr::Add(ResourcePtr &&resource) {
return
ret
;
}
if
(
resource
->
Type
()
==
ResourceType
::
DISK
)
{
resource
->
RegisterSubscriber
(
std
::
bind
(
&
ResourceMgr
::
post_event
,
this
,
std
::
placeholders
::
_1
));
if
(
resource
->
type
()
==
ResourceType
::
DISK
)
{
disk_resources_
.
emplace_back
(
ResourceWPtr
(
resource
));
}
resources_
.
emplace_back
(
resource
);
size_t
index
=
resources_
.
size
()
-
1
;
resource
->
RegisterSubscriber
(
std
::
bind
(
&
ResourceMgr
::
PostEvent
,
this
,
std
::
placeholders
::
_1
));
return
ret
;
}
void
ResourceMgr
::
Connect
(
const
std
::
string
&
name1
,
const
std
::
string
&
name2
,
Connection
&
connection
)
{
auto
res1
=
get_resource_by_nam
e
(
name1
);
auto
res2
=
get_resource_by_nam
e
(
name2
);
auto
res1
=
GetResourc
e
(
name1
);
auto
res2
=
GetResourc
e
(
name2
);
if
(
res1
&&
res2
)
{
res1
->
AddNeighbour
(
std
::
static_pointer_cast
<
Node
>
(
res2
),
connection
);
// TODO: enable when task balance supported
// res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
}
}
void
ResourceMgr
::
Connect
(
ResourceWPtr
&
res1
,
ResourceWPtr
&
res2
,
Connection
&
connection
)
{
if
(
auto
observe_a
=
res1
.
lock
())
{
if
(
auto
observe_b
=
res2
.
lock
())
{
observe_a
->
AddNeighbour
(
std
::
static_pointer_cast
<
Node
>
(
observe_b
),
connection
);
observe_b
->
AddNeighbour
(
std
::
static_pointer_cast
<
Node
>
(
observe_a
),
connection
);
}
}
ResourceMgr
::
Clear
()
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
resources_mutex_
);
disk_resources_
.
clear
();
resources_
.
clear
();
}
void
ResourceMgr
::
Start
()
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
resources_mutex_
);
std
::
vector
<
ResourcePtr
>
ResourceMgr
::
GetComputeResource
()
{
std
::
vector
<
ResourcePtr
>
result
;
for
(
auto
&
resource
:
resources_
)
{
resource
->
Start
();
if
(
resource
->
HasExecutor
())
{
result
.
emplace_back
(
resource
);
}
}
running_
=
true
;
worker_thread_
=
std
::
thread
(
&
ResourceMgr
::
event_process
,
this
);
return
result
;
}
void
ResourceMgr
::
Stop
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
event_mutex_
);
running_
=
false
;
queue_
.
push
(
nullptr
);
event_cv_
.
notify_one
();
ResourcePtr
ResourceMgr
::
GetResource
(
ResourceType
type
,
uint64_t
device_id
)
{
for
(
auto
&
resource
:
resources_
)
{
if
(
resource
->
type
()
==
type
&&
resource
->
device_id
()
==
device_id
)
{
return
resource
;
}
}
worker_thread_
.
join
();
return
nullptr
;
}
std
::
lock_guard
<
std
::
mutex
>
lck
(
resources_mutex_
);
ResourcePtr
ResourceMgr
::
GetResource
(
const
std
::
string
&
name
)
{
for
(
auto
&
resource
:
resources_
)
{
resource
->
Stop
();
if
(
resource
->
name
()
==
name
)
{
return
resource
;
}
}
return
nullptr
;
}
void
ResourceMgr
::
Clear
()
{
std
::
lock_guard
<
std
::
mutex
>
lck
(
resources_mutex_
);
disk_resources_
.
clear
();
resources_
.
clear
();
uint64_t
ResourceMgr
::
GetNumOfComputeResource
()
{
uint64_t
count
=
0
;
for
(
auto
&
res
:
resources_
)
{
if
(
res
->
HasExecutor
())
{
++
count
;
}
}
return
count
;
}
void
ResourceMgr
::
PostEvent
(
const
EventPtr
&
event
)
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
event_mutex_
);
queue_
.
emplace
(
event
);
event_cv_
.
notify_one
();
uint64_t
ResourceMgr
::
GetNumGpuResource
()
const
{
uint64_t
num
=
0
;
for
(
auto
&
res
:
resources_
)
{
if
(
res
->
type
()
==
ResourceType
::
GPU
)
{
num
++
;
}
}
return
num
;
}
std
::
string
...
...
@@ -180,14 +154,13 @@ ResourceMgr::DumpTaskTables() {
return
ss
.
str
();
}
ResourcePtr
ResourceMgr
::
get_resource_by_name
(
const
std
::
string
&
name
)
{
for
(
auto
&
res
:
resources_
)
{
if
(
res
->
Name
()
==
name
)
{
return
res
;
}
void
ResourceMgr
::
post_event
(
const
EventPtr
&
event
)
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
event_mutex_
);
queue_
.
emplace
(
event
);
}
return
nullptr
;
event_cv_
.
notify_one
()
;
}
void
...
...
@@ -203,8 +176,6 @@ ResourceMgr::event_process() {
break
;
}
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
if
(
subscriber_
)
{
subscriber_
(
event
);
}
...
...
cpp/src/scheduler/ResourceMgr.h
浏览文件 @
fd84fdcf
...
...
@@ -22,78 +22,63 @@ namespace engine {
class
ResourceMgr
{
public:
ResourceMgr
();
ResourceMgr
()
=
default
;
public:
/******** Management Interface ********/
void
Start
();
void
Stop
();
ResourceWPtr
Add
(
ResourcePtr
&&
resource
);
void
Connect
(
const
std
::
string
&
res1
,
const
std
::
string
&
res2
,
Connection
&
connection
);
void
Clear
();
inline
void
RegisterSubscriber
(
std
::
function
<
void
(
EventPtr
)
>
subscriber
)
{
subscriber_
=
std
::
move
(
subscriber
);
}
std
::
vector
<
ResourceWPtr
>
&
public:
/******** Management Interface ********/
inline
std
::
vector
<
ResourceWPtr
>
&
GetDiskResources
()
{
return
disk_resources_
;
}
uint64_t
GetNumGpuResource
()
const
;
// TODO: why return shared pointer
inline
std
::
vector
<
ResourcePtr
>
GetAllResources
()
{
return
resources_
;
}
std
::
vector
<
ResourcePtr
>
GetComputeResource
();
ResourcePtr
GetResource
(
ResourceType
type
,
uint64_t
device_id
);
ResourcePtr
GetResource
ByName
(
std
::
string
name
);
GetResource
(
const
std
::
string
&
name
);
std
::
vector
<
ResourcePtr
>
GetAllResouces
();
/*
* Return account of resource which enable executor;
*/
uint64_t
GetNumOfComputeResource
();
std
::
vector
<
ResourcePtr
>
GetComputeResource
();
/*
* Add resource into Resource Management;
* Generate functions on events;
* Functions only modify bool variable, like event trigger;
*/
ResourceWPtr
Add
(
ResourcePtr
&&
resource
);
void
Connect
(
const
std
::
string
&
res1
,
const
std
::
string
&
res2
,
Connection
&
connection
);
/*
* Create connection between A and B;
*/
void
Connect
(
ResourceWPtr
&
res1
,
ResourceWPtr
&
res2
,
Connection
&
connection
);
/*
* Synchronous start all resource;
* Last, start event process thread;
*/
void
Start
();
void
Stop
();
void
Clear
();
void
PostEvent
(
const
EventPtr
&
event
);
uint64_t
GetNumGpuResource
()
const
;
public:
// TODO: add stats interface(low)
public:
/******** Utlitity Functions ********/
/******** Utility Functions ********/
std
::
string
Dump
();
...
...
@@ -101,26 +86,26 @@ public:
DumpTaskTables
();
private:
ResourcePtr
get_resource_by_name
(
const
std
::
string
&
name
);
void
post_event
(
const
EventPtr
&
event
);
void
event_process
();
private:
std
::
queue
<
EventPtr
>
queue_
;
std
::
function
<
void
(
EventPtr
)
>
subscriber_
=
nullptr
;
bool
running_
;
bool
running_
=
false
;
std
::
vector
<
ResourceWPtr
>
disk_resources_
;
std
::
vector
<
ResourcePtr
>
resources_
;
mutable
std
::
mutex
resources_mutex_
;
std
::
thread
worker_thread_
;
std
::
queue
<
EventPtr
>
queue_
;
std
::
function
<
void
(
EventPtr
)
>
subscriber_
=
nullptr
;
std
::
mutex
event_mutex_
;
std
::
condition_variable
event_cv_
;
std
::
thread
worker_thread_
;
};
using
ResourceMgrPtr
=
std
::
shared_ptr
<
ResourceMgr
>
;
...
...
cpp/src/scheduler/SchedInst.cpp
浏览文件 @
fd84fdcf
...
...
@@ -48,7 +48,7 @@ StartSchedulerService() {
enable_loader
,
enable_executor
));
if
(
res
.
lock
()
->
T
ype
()
==
ResourceType
::
GPU
)
{
if
(
res
.
lock
()
->
t
ype
()
==
ResourceType
::
GPU
)
{
auto
pinned_memory
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_PIN_MEMORY
,
300
);
auto
temp_memory
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_TEMP_MEMORY
,
300
);
auto
resource_num
=
resconf
.
GetInt64Value
(
server
::
CONFIG_RESOURCE_NUM
,
2
);
...
...
cpp/src/scheduler/Scheduler.cpp
浏览文件 @
fd84fdcf
...
...
@@ -143,7 +143,7 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
auto
task
=
load_completed_event
->
task_table_item_
->
task
;
// if this resource is disk, assign it to smallest cost resource
if
(
self
->
T
ype
()
==
ResourceType
::
DISK
)
{
if
(
self
->
t
ype
()
==
ResourceType
::
DISK
)
{
// step 1: calculate shortest path per resource, from disk to compute resource
auto
compute_resources
=
res_mgr_
.
lock
()
->
GetComputeResource
();
std
::
vector
<
std
::
vector
<
std
::
string
>>
paths
;
...
...
@@ -176,11 +176,11 @@ Scheduler::OnLoadCompleted(const EventPtr &event) {
task
->
path
()
=
task_path
;
}
if
(
self
->
N
ame
()
==
task
->
path
().
Last
())
{
if
(
self
->
n
ame
()
==
task
->
path
().
Last
())
{
self
->
WakeupLoader
();
}
else
{
auto
next_res_name
=
task
->
path
().
Next
();
auto
next_res
=
res_mgr_
.
lock
()
->
GetResource
ByName
(
next_res_name
);
auto
next_res
=
res_mgr_
.
lock
()
->
GetResource
(
next_res_name
);
load_completed_event
->
task_table_item_
->
Move
();
next_res
->
task_table
().
Put
(
task
);
}
...
...
cpp/src/scheduler/TaskTable.cpp
浏览文件 @
fd84fdcf
...
...
@@ -6,6 +6,8 @@
#include "TaskTable.h"
#include "event/TaskTableUpdatedEvent.h"
#include "Utils.h"
#include <vector>
#include <sstream>
#include <ctime>
...
...
@@ -15,14 +17,6 @@ namespace zilliz {
namespace
milvus
{
namespace
engine
{
uint64_t
get_now_timestamp
()
{
std
::
chrono
::
time_point
<
std
::
chrono
::
system_clock
>
now
=
std
::
chrono
::
system_clock
::
now
();
auto
duration
=
now
.
time_since_epoch
();
auto
millis
=
std
::
chrono
::
duration_cast
<
std
::
chrono
::
milliseconds
>
(
duration
).
count
();
return
millis
;
}
std
::
string
ToString
(
TaskTableItemState
state
)
{
switch
(
state
)
{
...
...
@@ -64,7 +58,7 @@ TaskTableItem::Load() {
if
(
state
==
TaskTableItemState
::
START
)
{
state
=
TaskTableItemState
::
LOADING
;
lock
.
unlock
();
timestamp
.
load
=
get_
now
_timestamp
();
timestamp
.
load
=
get_
current
_timestamp
();
return
true
;
}
return
false
;
...
...
@@ -75,7 +69,7 @@ TaskTableItem::Loaded() {
if
(
state
==
TaskTableItemState
::
LOADING
)
{
state
=
TaskTableItemState
::
LOADED
;
lock
.
unlock
();
timestamp
.
loaded
=
get_
now
_timestamp
();
timestamp
.
loaded
=
get_
current
_timestamp
();
return
true
;
}
return
false
;
...
...
@@ -86,7 +80,7 @@ TaskTableItem::Execute() {
if
(
state
==
TaskTableItemState
::
LOADED
)
{
state
=
TaskTableItemState
::
EXECUTING
;
lock
.
unlock
();
timestamp
.
execute
=
get_
now
_timestamp
();
timestamp
.
execute
=
get_
current
_timestamp
();
return
true
;
}
return
false
;
...
...
@@ -97,8 +91,8 @@ TaskTableItem::Executed() {
if
(
state
==
TaskTableItemState
::
EXECUTING
)
{
state
=
TaskTableItemState
::
EXECUTED
;
lock
.
unlock
();
timestamp
.
executed
=
get_
now
_timestamp
();
timestamp
.
finish
=
get_
now
_timestamp
();
timestamp
.
executed
=
get_
current
_timestamp
();
timestamp
.
finish
=
get_
current
_timestamp
();
return
true
;
}
return
false
;
...
...
@@ -109,7 +103,7 @@ TaskTableItem::Move() {
if
(
state
==
TaskTableItemState
::
LOADED
)
{
state
=
TaskTableItemState
::
MOVING
;
lock
.
unlock
();
timestamp
.
move
=
get_
now
_timestamp
();
timestamp
.
move
=
get_
current
_timestamp
();
return
true
;
}
return
false
;
...
...
@@ -120,8 +114,8 @@ TaskTableItem::Moved() {
if
(
state
==
TaskTableItemState
::
MOVING
)
{
state
=
TaskTableItemState
::
MOVED
;
lock
.
unlock
();
timestamp
.
moved
=
get_
now
_timestamp
();
timestamp
.
finish
=
get_
now
_timestamp
();
timestamp
.
moved
=
get_
current
_timestamp
();
timestamp
.
finish
=
get_
current
_timestamp
();
return
true
;
}
return
false
;
...
...
@@ -177,7 +171,7 @@ TaskTable::Put(TaskPtr task) {
item
->
id
=
id_
++
;
item
->
task
=
std
::
move
(
task
);
item
->
state
=
TaskTableItemState
::
START
;
item
->
timestamp
.
start
=
get_
now
_timestamp
();
item
->
timestamp
.
start
=
get_
current
_timestamp
();
table_
.
push_back
(
item
);
if
(
subscriber_
)
{
subscriber_
();
...
...
@@ -192,7 +186,7 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) {
item
->
id
=
id_
++
;
item
->
task
=
std
::
move
(
task
);
item
->
state
=
TaskTableItemState
::
START
;
item
->
timestamp
.
start
=
get_
now
_timestamp
();
item
->
timestamp
.
start
=
get_
current
_timestamp
();
table_
.
push_back
(
item
);
}
if
(
subscriber_
)
{
...
...
cpp/src/scheduler/TaskTable.h
浏览文件 @
fd84fdcf
...
...
@@ -40,20 +40,17 @@ struct TaskTimestamp {
};
struct
TaskTableItem
{
TaskTableItem
()
:
id
(
0
),
state
(
TaskTableItemState
::
INVALID
),
mutex
()
,
priority
(
0
)
{}
TaskTableItem
()
:
id
(
0
),
state
(
TaskTableItemState
::
INVALID
),
mutex
()
{}
TaskTableItem
(
const
TaskTableItem
&
src
)
:
id
(
src
.
id
),
state
(
src
.
state
),
mutex
()
,
priority
(
src
.
priority
)
{}
:
id
(
src
.
id
),
state
(
src
.
state
),
mutex
()
{}
uint64_t
id
;
// auto increment from 0;
// TODO: add tag into task
TaskPtr
task
;
// the task;
TaskTableItemState
state
;
// the state;
std
::
mutex
mutex
;
TaskTimestamp
timestamp
;
uint8_t
priority
;
// just a number, meaningless;
bool
IsFinish
();
...
...
@@ -113,7 +110,7 @@ public:
Get
(
uint64_t
index
);
/*
* TODO
* TODO
(wxyu): BIG GC
* Remove sequence task which is DONE or MOVED from front;
* Called by ?
*/
...
...
@@ -135,6 +132,7 @@ public:
Size
()
{
return
table_
.
size
();
}
public:
TaskTableItemPtr
&
operator
[](
uint64_t
index
)
{
...
...
@@ -225,7 +223,6 @@ public:
Dump
();
private:
// TODO: map better ?
std
::
uint64_t
id_
=
0
;
mutable
std
::
mutex
id_mutex_
;
std
::
deque
<
TaskTableItemPtr
>
table_
;
...
...
cpp/src/scheduler/Utils.cpp
浏览文件 @
fd84fdcf
...
...
@@ -4,16 +4,17 @@
* Proprietary and confidential.
******************************************************************************/
#include <chrono>
#include "Utils.h"
#include <chrono>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
uint64_t
get_current_timestamp
()
{
get_current_timestamp
()
{
std
::
chrono
::
time_point
<
std
::
chrono
::
system_clock
>
now
=
std
::
chrono
::
system_clock
::
now
();
auto
duration
=
now
.
time_since_epoch
();
auto
millis
=
std
::
chrono
::
duration_cast
<
std
::
chrono
::
milliseconds
>
(
duration
).
count
();
...
...
cpp/src/scheduler/Utils.h
浏览文件 @
fd84fdcf
...
...
@@ -3,6 +3,7 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <cstdint>
...
...
cpp/src/scheduler/resource/Connection.h
浏览文件 @
fd84fdcf
...
...
@@ -15,6 +15,7 @@ namespace engine {
class
Connection
{
public:
// TODO: update construct function, speed: double->uint64_t
Connection
(
std
::
string
name
,
double
speed
)
:
name_
(
std
::
move
(
name
)),
speed_
(
speed
)
{}
...
...
cpp/src/scheduler/resource/RegisterHandler.h
已删除
100644 → 0
浏览文件 @
471b8be8
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
class
RegisterHandler
{
public:
virtual
void
Exec
()
=
0
;
};
using
RegisterHandlerPtr
=
std
::
shared_ptr
<
RegisterHandler
>
;
}
}
}
\ No newline at end of file
cpp/src/scheduler/resource/Resource.cpp
浏览文件 @
fd84fdcf
...
...
@@ -12,7 +12,8 @@ namespace zilliz {
namespace
milvus
{
namespace
engine
{
std
::
ostream
&
operator
<<
(
std
::
ostream
&
out
,
const
Resource
&
resource
)
{
std
::
ostream
&
operator
<<
(
std
::
ostream
&
out
,
const
Resource
&
resource
)
{
out
<<
resource
.
Dump
();
return
out
;
}
...
...
@@ -25,11 +26,9 @@ Resource::Resource(std::string name,
:
name_
(
std
::
move
(
name
)),
type_
(
type
),
device_id_
(
device_id
),
running_
(
false
),
enable_loader_
(
enable_loader
),
enable_executor_
(
enable_executor
),
load_flag_
(
false
),
exec_flag_
(
false
)
{
enable_executor_
(
enable_executor
)
{
// register subscriber in tasktable
task_table_
.
RegisterSubscriber
([
&
]
{
if
(
subscriber_
)
{
auto
event
=
std
::
make_shared
<
TaskTableUpdatedEvent
>
(
shared_from_this
());
...
...
@@ -38,7 +37,8 @@ Resource::Resource(std::string name,
});
}
void
Resource
::
Start
()
{
void
Resource
::
Start
()
{
running_
=
true
;
if
(
enable_loader_
)
{
loader_thread_
=
std
::
thread
(
&
Resource
::
loader_function
,
this
);
...
...
@@ -48,7 +48,8 @@ void Resource::Start() {
}
}
void
Resource
::
Stop
()
{
void
Resource
::
Stop
()
{
running_
=
false
;
if
(
enable_loader_
)
{
WakeupLoader
();
...
...
@@ -60,11 +61,8 @@ void Resource::Stop() {
}
}
TaskTable
&
Resource
::
task_table
()
{
return
task_table_
;
}
void
Resource
::
WakeupLoader
()
{
void
Resource
::
WakeupLoader
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
load_mutex_
);
load_flag_
=
true
;
...
...
@@ -72,7 +70,8 @@ void Resource::WakeupLoader() {
load_cv_
.
notify_one
();
}
void
Resource
::
WakeupExecutor
()
{
void
Resource
::
WakeupExecutor
()
{
{
std
::
lock_guard
<
std
::
mutex
>
lock
(
exec_mutex_
);
exec_flag_
=
true
;
...
...
@@ -80,6 +79,15 @@ void Resource::WakeupExecutor() {
exec_cv_
.
notify_one
();
}
uint64_t
Resource
::
NumOfTaskToExec
()
{
uint64_t
count
=
0
;
for
(
auto
&
task
:
task_table_
)
{
if
(
task
->
state
==
TaskTableItemState
::
LOADED
)
++
count
;
}
return
count
;
}
TaskTableItemPtr
Resource
::
pick_task_load
()
{
auto
indexes
=
task_table_
.
PickToLoad
(
10
);
for
(
auto
index
:
indexes
)
{
...
...
@@ -156,11 +164,6 @@ void Resource::executor_function() {
}
}
RegisterHandlerPtr
Resource
::
GetRegisterFunc
(
const
RegisterType
&
type
)
{
// construct object each time.
return
register_table_
[
type
]();
}
}
}
}
\ No newline at end of file
cpp/src/scheduler/resource/Resource.h
浏览文件 @
fd84fdcf
...
...
@@ -21,7 +21,6 @@
#include "../task/Task.h"
#include "Connection.h"
#include "Node.h"
#include "RegisterHandler.h"
namespace
zilliz
{
...
...
@@ -35,13 +34,6 @@ enum class ResourceType {
GPU
=
2
};
enum
class
RegisterType
{
START_UP
,
ON_FINISH_TASK
,
ON_COPY_COMPLETED
,
ON_TASK_TABLE_UPDATED
,
};
class
Resource
:
public
Node
,
public
std
::
enable_shared_from_this
<
Resource
>
{
public:
/*
...
...
@@ -68,56 +60,51 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
void
WakeupExecutor
();
public:
template
<
typename
T
>
void
Register_T
(
const
RegisterType
&
type
)
{
register_table_
.
emplace
(
type
,
[]
{
return
std
::
make_shared
<
T
>
();
});
}
RegisterHandlerPtr
GetRegisterFunc
(
const
RegisterType
&
type
);
inline
void
RegisterSubscriber
(
std
::
function
<
void
(
EventPtr
)
>
subscriber
)
{
subscriber_
=
std
::
move
(
subscriber
);
}
inline
virtual
std
::
string
Dump
()
const
{
return
"<Resource>"
;
}
public:
inline
std
::
string
N
ame
()
const
{
n
ame
()
const
{
return
name_
;
}
inline
ResourceType
T
ype
()
const
{
t
ype
()
const
{
return
type_
;
}
inline
uint64_t
DeviceId
()
{
device_id
()
const
{
return
device_id_
;
}
// TODO: better name?
TaskTable
&
task_table
()
{
return
task_table_
;
}
public:
inline
bool
HasLoader
()
{
HasLoader
()
const
{
return
enable_loader_
;
}
// TODO: better name?
inline
bool
HasExecutor
()
{
HasExecutor
()
const
{
return
enable_executor_
;
}
// TODO: const
uint64_t
NumOfTaskToExec
()
{
uint64_t
count
=
0
;
for
(
auto
&
task
:
task_table_
)
{
if
(
task
->
state
==
TaskTableItemState
::
LOADED
)
++
count
;
}
return
count
;
}
NumOfTaskToExec
();
// TODO: need double ?
inline
uint64_t
...
...
@@ -130,14 +117,6 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
return
total_task_
;
}
TaskTable
&
task_table
();
inline
virtual
std
::
string
Dump
()
const
{
return
"<Resource>"
;
}
friend
std
::
ostream
&
operator
<<
(
std
::
ostream
&
out
,
const
Resource
&
resource
);
protected:
...
...
@@ -198,6 +177,7 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
protected:
uint64_t
device_id_
;
std
::
string
name_
;
private:
ResourceType
type_
;
...
...
@@ -206,17 +186,16 @@ class Resource : public Node, public std::enable_shared_from_this<Resource> {
uint64_t
total_cost_
=
0
;
uint64_t
total_task_
=
0
;
std
::
map
<
RegisterType
,
std
::
function
<
RegisterHandlerPtr
()
>>
register_table_
;
std
::
function
<
void
(
EventPtr
)
>
subscriber_
=
nullptr
;
bool
running_
;
bool
running_
=
false
;
bool
enable_loader_
=
true
;
bool
enable_executor_
=
true
;
std
::
thread
loader_thread_
;
std
::
thread
executor_thread_
;
bool
load_flag_
;
bool
exec_flag_
;
bool
load_flag_
=
false
;
bool
exec_flag_
=
false
;
std
::
mutex
load_mutex_
;
std
::
mutex
exec_mutex_
;
std
::
condition_variable
load_cv_
;
...
...
cpp/src/scheduler/task/DeleteTask.cpp
浏览文件 @
fd84fdcf
...
...
@@ -24,12 +24,6 @@ XDeleteTask::Execute() {
delete_context_ptr_
->
ResourceDone
();
}
TaskPtr
XDeleteTask
::
Clone
()
{
auto
task
=
std
::
make_shared
<
XDeleteTask
>
(
delete_context_ptr_
);
return
task
;
}
}
}
}
cpp/src/scheduler/task/DeleteTask.h
浏览文件 @
fd84fdcf
...
...
@@ -24,9 +24,6 @@ public:
void
Execute
()
override
;
TaskPtr
Clone
()
override
;
public:
DeleteContextPtr
delete_context_ptr_
;
};
...
...
cpp/src/scheduler/task/SearchTask.cpp
浏览文件 @
fd84fdcf
...
...
@@ -193,16 +193,6 @@ XSearchTask::Execute() {
index_engine_
=
nullptr
;
}
TaskPtr
XSearchTask
::
Clone
()
{
auto
ret
=
std
::
make_shared
<
XSearchTask
>
(
file_
);
ret
->
index_id_
=
index_id_
;
ret
->
index_engine_
=
index_engine_
->
Clone
();
ret
->
search_contexts_
=
search_contexts_
;
ret
->
metric_l2
=
metric_l2
;
return
ret
;
}
Status
XSearchTask
::
ClusterResult
(
const
std
::
vector
<
long
>
&
output_ids
,
const
std
::
vector
<
float
>
&
output_distence
,
uint64_t
nq
,
...
...
cpp/src/scheduler/task/SearchTask.h
浏览文件 @
fd84fdcf
...
...
@@ -23,9 +23,6 @@ public:
void
Execute
()
override
;
TaskPtr
Clone
()
override
;
public:
static
Status
ClusterResult
(
const
std
::
vector
<
long
>
&
output_ids
,
const
std
::
vector
<
float
>
&
output_distence
,
...
...
cpp/src/scheduler/task/Task.h
浏览文件 @
fd84fdcf
...
...
@@ -68,14 +68,9 @@ public:
virtual
void
Execute
()
=
0
;
// TODO: dont use this method to support task move
virtual
TaskPtr
Clone
()
=
0
;
public:
Path
task_path_
;
std
::
vector
<
SearchContextPtr
>
search_contexts_
;
ScheduleTaskPtr
task_
;
TaskType
type_
;
TaskLabelPtr
label_
=
nullptr
;
};
...
...
cpp/src/scheduler/task/TaskConvert.cpp
浏览文件 @
fd84fdcf
...
...
@@ -21,7 +21,6 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) {
auto
task
=
std
::
make_shared
<
XSearchTask
>
(
load_task
->
file_
);
task
->
label
()
=
std
::
make_shared
<
DefaultLabel
>
();
task
->
search_contexts_
=
load_task
->
search_contexts_
;
task
->
task_
=
schedule_task
;
return
task
;
}
case
ScheduleTaskType
::
kDelete
:
{
...
...
cpp/src/scheduler/task/TestTask.cpp
浏览文件 @
fd84fdcf
...
...
@@ -27,15 +27,6 @@ TestTask::Execute() {
done_
=
true
;
}
TaskPtr
TestTask
::
Clone
()
{
TableFileSchemaPtr
dummy
=
nullptr
;
auto
ret
=
std
::
make_shared
<
TestTask
>
(
dummy
);
ret
->
load_count_
=
load_count_
;
ret
->
exec_count_
=
exec_count_
;
return
ret
;
}
void
TestTask
::
Wait
()
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
mutex_
);
...
...
cpp/src/scheduler/task/TestTask.h
浏览文件 @
fd84fdcf
...
...
@@ -23,9 +23,6 @@ public:
void
Execute
()
override
;
TaskPtr
Clone
()
override
;
void
Wait
();
...
...
cpp/src/scheduler/tasklabel/TaskLabel.h
浏览文件 @
fd84fdcf
...
...
@@ -25,6 +25,7 @@ public:
}
protected:
explicit
TaskLabel
(
TaskLabelType
type
)
:
type_
(
type
)
{}
private:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录