Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
216bd9f0
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
216bd9f0
编写于
8月 15, 2019
作者:
X
xj.lin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
MS-337 dev basic Resource
Former-commit-id: 81c6292f69db715897a1af97baef6b47396c8932
上级
3bec8a1a
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
334 addition
and
150 deletion
+334
-150
cpp/src/scheduler/resource/CpuResource.cpp
cpp/src/scheduler/resource/CpuResource.cpp
+38
-0
cpp/src/scheduler/resource/CpuResource.h
cpp/src/scheduler/resource/CpuResource.h
+3
-18
cpp/src/scheduler/resource/DiskResource.cpp
cpp/src/scheduler/resource/DiskResource.cpp
+28
-0
cpp/src/scheduler/resource/DiskResource.h
cpp/src/scheduler/resource/DiskResource.h
+3
-4
cpp/src/scheduler/resource/GpuResource.cpp
cpp/src/scheduler/resource/GpuResource.cpp
+28
-0
cpp/src/scheduler/resource/GpuResource.h
cpp/src/scheduler/resource/GpuResource.h
+3
-4
cpp/src/scheduler/resource/Node.cpp
cpp/src/scheduler/resource/Node.cpp
+66
-0
cpp/src/scheduler/resource/Node.h
cpp/src/scheduler/resource/Node.h
+12
-9
cpp/src/scheduler/resource/RegisterHandler.h
cpp/src/scheduler/resource/RegisterHandler.h
+24
-0
cpp/src/scheduler/resource/Resource.cpp
cpp/src/scheduler/resource/Resource.cpp
+98
-0
cpp/src/scheduler/resource/Resource.h
cpp/src/scheduler/resource/Resource.h
+31
-115
未找到文件。
cpp/src/scheduler/resource/CpuResource.cpp
0 → 100644
浏览文件 @
216bd9f0
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "CpuResource.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
CpuResource
::
CpuResource
(
std
::
string
name
)
:
Resource
(
std
::
move
(
name
),
ResourceType
::
CPU
)
{}
void
CpuResource
::
LoadFile
(
TaskPtr
task
)
{
//if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
//} else if (src.type == CPU) {
// memcpy(src, dest, len);
//} else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
//} else {
// // unknown type, exception
//}
}
void
CpuResource
::
Process
(
TaskPtr
task
)
{
}
}
}
}
\ No newline at end of file
cpp/src/scheduler/resource/CpuResource.h
浏览文件 @
216bd9f0
...
...
@@ -17,29 +17,14 @@ namespace engine {
class
CpuResource
:
public
Resource
{
public:
explicit
CpuResource
(
std
::
string
name
)
:
Resource
(
std
::
move
(
name
),
ResourceType
::
CPU
)
{}
CpuResource
(
std
::
string
name
);
protected:
void
LoadFile
(
TaskPtr
task
)
override
{
// if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
// } else if (src.type == CPU) {
// memcpy(src, dest, len);
// } else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
// } else {
// // unknown type, exception
// }
}
LoadFile
(
TaskPtr
task
)
override
;
void
Process
(
TaskPtr
task
)
override
{
task
->
Execute
();
}
Process
(
TaskPtr
task
)
override
;
};
}
...
...
cpp/src/scheduler/resource/DiskResource.cpp
0 → 100644
浏览文件 @
216bd9f0
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "DiskResource.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
DiskResource
::
DiskResource
(
std
::
string
name
)
:
Resource
(
std
::
move
(
name
),
ResourceType
::
DISK
)
{}
void
DiskResource
::
LoadFile
(
TaskPtr
task
)
{
}
void
DiskResource
::
Process
(
TaskPtr
task
)
{
}
}
}
}
cpp/src/scheduler/resource/DiskResource.h
浏览文件 @
216bd9f0
...
...
@@ -16,15 +16,14 @@ namespace engine {
class
DiskResource
:
public
Resource
{
public:
explicit
DiskResource
(
std
::
string
name
)
:
Resource
(
std
::
move
(
name
),
ResourceType
::
DISK
)
{}
DiskResource
(
std
::
string
name
);
protected:
void
LoadFile
(
TaskPtr
task
)
override
{}
LoadFile
(
TaskPtr
task
)
override
;
void
Process
(
TaskPtr
task
)
override
{}
Process
(
TaskPtr
task
)
override
;
};
}
...
...
cpp/src/scheduler/resource/GpuResource.cpp
0 → 100644
浏览文件 @
216bd9f0
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "GpuResource.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
GpuResource
::
GpuResource
(
std
::
string
name
)
:
Resource
(
std
::
move
(
name
),
ResourceType
::
GPU
)
{}
void
GpuResource
::
LoadFile
(
TaskPtr
task
)
{
}
void
GpuResource
::
Process
(
TaskPtr
task
)
{
}
}
}
}
cpp/src/scheduler/resource/GpuResource.h
浏览文件 @
216bd9f0
...
...
@@ -16,15 +16,14 @@ namespace engine {
class
GpuResource
:
public
Resource
{
public:
explicit
GpuResource
(
std
::
string
name
)
:
Resource
(
std
::
move
(
name
),
ResourceType
::
GPU
)
{}
GpuResource
(
std
::
string
name
);
protected:
void
LoadFile
(
TaskPtr
task
)
override
{}
LoadFile
(
TaskPtr
task
)
override
;
void
Process
(
TaskPtr
task
)
override
{}
Process
(
TaskPtr
task
)
override
;
};
}
...
...
cpp/src/scheduler/resource/Node.cpp
0 → 100644
浏览文件 @
216bd9f0
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <atomic>
#include "Node.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
Node
::
Node
()
{
static
std
::
atomic_uint_fast8_t
counter
(
0
);
id_
=
counter
++
;
}
void
Node
::
DelNeighbour
(
const
NeighbourNodePtr
&
neighbour_ptr
)
{
std
::
lock_guard
<
std
::
mutex
>
lk
(
mutex_
);
if
(
auto
s
=
neighbour_ptr
.
lock
())
{
auto
search
=
neighbours_
.
find
(
s
->
id_
);
if
(
search
!=
neighbours_
.
end
())
{
neighbours_
.
erase
(
search
);
}
}
}
bool
Node
::
IsNeighbour
(
const
NeighbourNodePtr
&
neighbour_ptr
)
{
std
::
lock_guard
<
std
::
mutex
>
lk
(
mutex_
);
if
(
auto
s
=
neighbour_ptr
.
lock
())
{
auto
search
=
neighbours_
.
find
(
s
->
id_
);
if
(
search
!=
neighbours_
.
end
())
{
return
true
;
}
}
return
false
;
}
std
::
vector
<
Neighbour
>
Node
::
GetNeighbours
()
{
std
::
lock_guard
<
std
::
mutex
>
lk
(
mutex_
);
std
::
vector
<
Neighbour
>
ret
;
for
(
auto
&
e
:
neighbours_
)
{
ret
.
push_back
(
e
.
second
);
}
return
ret
;
}
std
::
string
Node
::
Dump
()
{
// TODO(linxj): what's that?
return
std
::
__cxx11
::
string
();
}
void
Node
::
AddNeighbour
(
const
NeighbourNodePtr
&
neighbour_node
,
Connection
&
connection
)
{
std
::
lock_guard
<
std
::
mutex
>
lk
(
mutex_
);
if
(
auto
s
=
neighbour_node
.
lock
())
{
Neighbour
neighbour
(
neighbour_node
,
connection
);
neighbours_
[
s
->
id_
]
=
neighbour
;
}
// else do nothing, consider it..
}
}
}
}
cpp/src/scheduler/resource/Node.h
浏览文件 @
216bd9f0
...
...
@@ -7,6 +7,7 @@
#include <vector>
#include <memory>
#include <map>
#include "../TaskTable.h"
#include "Connection.h"
...
...
@@ -28,29 +29,31 @@ struct Neighbour {
Connection
connection
;
};
// TODO(linxj): return type void -> Status
class
Node
{
public:
Node
();
void
AddNeighbour
(
const
NeighbourNodePtr
&
neighbour_node
,
Connection
&
connection
)
{
Neighbour
neighbour
(
neighbour_node
,
connection
);
neighbours_
.
emplace_back
(
neighbour
);
}
AddNeighbour
(
const
NeighbourNodePtr
&
neighbour_node
,
Connection
&
connection
);
void
DelNeighbour
(
NeighbourNodePtr
neighbour_ptr
)
{}
DelNeighbour
(
const
NeighbourNodePtr
&
neighbour_ptr
);
bool
IsNeighbour
(
NeighbourNodePtr
neighbour_ptr
)
{}
IsNeighbour
(
const
NeighbourNodePtr
&
neighbour_ptr
);
const
std
::
vector
<
Neighbour
>
&
GetNeighbours
()
{}
std
::
vector
<
Neighbour
>
GetNeighbours
()
;
public:
std
::
string
Dump
();
private:
std
::
vector
<
Neighbour
>
neighbours_
;
std
::
mutex
mutex_
;
uint8_t
id_
;
std
::
map
<
uint8_t
,
Neighbour
>
neighbours_
;
};
using
NodePtr
=
std
::
shared_ptr
<
Node
>
;
...
...
cpp/src/scheduler/resource/RegisterHandler.h
0 → 100644
浏览文件 @
216bd9f0
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
class
RegisterHandler
{
public:
virtual
void
Exec
()
=
0
;
};
using
RegisterHandlerPtr
=
std
::
shared_ptr
<
RegisterHandler
>
;
}
}
}
\ No newline at end of file
cpp/src/scheduler/resource/Resource.cpp
0 → 100644
浏览文件 @
216bd9f0
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Resource.h"
namespace
zilliz
{
namespace
milvus
{
namespace
engine
{
Resource
::
Resource
(
std
::
string
name
,
ResourceType
type
)
:
name_
(
std
::
move
(
name
)),
type_
(
type
),
running_
(
false
),
load_flag_
(
false
),
exec_flag_
(
false
)
{
}
void
Resource
::
Start
()
{
loader_thread_
=
std
::
thread
(
&
Resource
::
loader_function
,
this
);
executor_thread_
=
std
::
thread
(
&
Resource
::
executor_function
,
this
);
}
void
Resource
::
Stop
()
{
running_
=
false
;
WakeupLoader
();
WakeupExecutor
();
}
TaskTable
&
Resource
::
task_table
()
{
return
task_table_
;
}
void
Resource
::
WakeupExecutor
()
{
exec_cv_
.
notify_one
();
}
void
Resource
::
WakeupLoader
()
{
load_cv_
.
notify_one
();
}
TaskPtr
Resource
::
pick_task_load
()
{
auto
indexes
=
PickToLoad
(
task_table_
,
3
);
for
(
auto
index
:
indexes
)
{
// try to set one task loading, then return
if
(
task_table_
.
Load
(
index
))
return
task_table_
.
Get
(
index
).
task
;
// else try next
}
return
nullptr
;
}
TaskPtr
Resource
::
pick_task_execute
()
{
auto
indexes
=
PickToExecute
(
task_table_
,
3
);
for
(
auto
index
:
indexes
)
{
// try to set one task executing, then return
if
(
task_table_
.
Execute
(
index
))
return
task_table_
.
Get
(
index
).
task
;
// else try next
}
return
nullptr
;
}
void
Resource
::
loader_function
()
{
while
(
running_
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
load_mutex_
);
load_cv_
.
wait
(
lock
,
[
&
]
{
return
load_flag_
;
});
auto
task
=
pick_task_load
();
if
(
task
)
{
LoadFile
(
task
);
GetRegisterFunc
(
RegisterType
::
ON_COPY_COMPLETED
)
->
Exec
();
}
}
}
void
Resource
::
executor_function
()
{
GetRegisterFunc
(
RegisterType
::
START_UP
)
->
Exec
();
while
(
running_
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
exec_mutex_
);
exec_cv_
.
wait
(
lock
,
[
&
]
{
return
exec_flag_
;
});
auto
task
=
pick_task_execute
();
if
(
task
)
{
Process
(
task
);
GetRegisterFunc
(
RegisterType
::
ON_FINISH_TASK
)
->
Exec
();
}
}
}
RegisterHandlerPtr
Resource
::
GetRegisterFunc
(
const
RegisterType
&
type
)
{
// construct object each time.
return
register_table_
[
type
]();
}
}
}
}
\ No newline at end of file
cpp/src/scheduler/resource/Resource.h
浏览文件 @
216bd9f0
...
...
@@ -15,8 +15,9 @@
#include "../TaskTable.h"
#include "../task/Task.h"
#include "../Cost.h"
#include "Node.h"
#include "Connection.h"
#include "Node.h"
#include "RegisterHandler.h"
namespace
zilliz
{
...
...
@@ -29,92 +30,50 @@ enum class ResourceType {
GPU
=
2
};
enum
class
RegisterType
{
START_UP
,
ON_FINISH_TASK
,
ON_COPY_COMPLETED
,
ON_TASK_TABLE_UPDATED
,
};
class
Resource
:
public
Node
{
public:
void
Start
()
{
loader_thread_
=
std
::
thread
(
&
Resource
::
loader_function
,
this
);
executor_thread_
=
std
::
thread
(
&
Resource
::
executor_function
,
this
);
/*
* Event function MUST be a short function, never blocking;
*/
template
<
typename
T
>
void
Register_T
(
const
RegisterType
&
type
)
{
register_table_
.
emplace
(
type
,
[]
{
return
std
::
make_shared
<
T
>
();
});
}
RegisterHandlerPtr
GetRegisterFunc
(
const
RegisterType
&
type
);
void
Stop
()
{
running_
=
false
;
WakeupLoader
();
WakeupExecutor
();
}
Start
();
void
Stop
();
TaskTable
&
task_table
()
{
return
task_table_
;
}
task_table
();
public:
/*
* wake up executor;
*/
void
WakeupExecutor
()
{
exec_cv_
.
notify_one
();
}
WakeupExecutor
();
/*
* wake up loader;
*/
void
WakeupLoader
()
{
load_cv_
.
notify_one
();
}
public:
/*
* Event function MUST be a short function, never blocking;
*/
/*
* Register on start up event;
*/
void
RegisterOnStartUp
(
std
::
function
<
void
(
void
)
>
func
)
{
on_start_up_
=
func
;
}
/*
* Register on finish one task event;
*/
void
RegisterOnFinishTask
(
std
::
function
<
void
(
void
)
>
func
)
{
on_finish_task_
=
func
;
}
/*
* Register on copy task data completed event;
*/
void
RegisterOnCopyCompleted
(
std
::
function
<
void
(
void
)
>
func
)
{
on_copy_completed_
=
func
;
}
/*
* Register on task table updated event;
*/
void
RegisterOnTaskTableUpdated
(
std
::
function
<
void
(
void
)
>
func
)
{
on_task_table_updated_
=
func
;
}
WakeupLoader
();
protected:
Resource
(
std
::
string
name
,
ResourceType
type
)
:
name_
(
std
::
move
(
name
)),
type_
(
type
),
on_start_up_
(
nullptr
),
on_finish_task_
(
nullptr
),
on_copy_completed_
(
nullptr
),
on_task_table_updated_
(
nullptr
),
running_
(
false
),
load_flag_
(
false
),
exec_flag_
(
false
)
{
}
Resource
(
std
::
string
name
,
ResourceType
type
);
// TODO: SearchContextPtr to TaskPtr
/*
...
...
@@ -142,67 +101,27 @@ private:
* Order by start time;
*/
TaskPtr
pick_task_load
()
{
auto
indexes
=
PickToLoad
(
task_table_
,
3
);
for
(
auto
index
:
indexes
)
{
// try to set one task loading, then return
if
(
task_table_
.
Load
(
index
))
return
task_table_
.
Get
(
index
).
task
;
// else try next
}
return
nullptr
;
}
pick_task_load
();
/*
* Pick one task to execute;
* Pick by start time and priority;
*/
TaskPtr
pick_task_execute
()
{
auto
indexes
=
PickToExecute
(
task_table_
,
3
);
for
(
auto
index
:
indexes
)
{
// try to set one task executing, then return
if
(
task_table_
.
Execute
(
index
))
return
task_table_
.
Get
(
index
).
task
;
// else try next
}
return
nullptr
;
}
pick_task_execute
();
private:
/*
* Only called by load thread;
*/
void
loader_function
()
{
while
(
running_
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
load_mutex_
);
load_cv_
.
wait
(
lock
,
[
&
]
{
return
load_flag_
;
});
auto
task
=
pick_task_load
();
if
(
task
)
{
LoadFile
(
task
);
on_copy_completed_
();
}
}
}
loader_function
();
/*
* Only called by worker thread;
*/
void
executor_function
()
{
on_start_up_
();
while
(
running_
)
{
std
::
unique_lock
<
std
::
mutex
>
lock
(
exec_mutex_
);
exec_cv_
.
wait
(
lock
,
[
&
]
{
return
exec_flag_
;
});
auto
task
=
pick_task_execute
();
if
(
task
)
{
Process
(
task
);
on_finish_task_
();
}
}
}
executor_function
();
private:
...
...
@@ -211,10 +130,7 @@ private:
TaskTable
task_table_
;
std
::
function
<
void
(
void
)
>
on_start_up_
;
std
::
function
<
void
(
void
)
>
on_finish_task_
;
std
::
function
<
void
(
void
)
>
on_copy_completed_
;
std
::
function
<
void
(
void
)
>
on_task_table_updated_
;
std
::
map
<
RegisterType
,
std
::
function
<
RegisterHandlerPtr
()
>>
register_table_
;
bool
running_
;
std
::
thread
loader_thread_
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录