Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
milvus
提交
4bc7d367
milvus
项目概览
BaiXuePrincess
/
milvus
与 Fork 源项目一致
从无法访问的项目Fork
通知
7
Star
4
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
milvus
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4bc7d367
编写于
10月 30, 2019
作者:
W
wxyu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Set task state MOVED after resource copy it completed
Former-commit-id: 5e42161b2303b49a6fac6a2acfad3899ad47c055
上级
56e734df
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
34 addition
and
49 deletion
+34
-49
CHANGELOG.md
CHANGELOG.md
+1
-0
core/src/scheduler/JobMgr.cpp
core/src/scheduler/JobMgr.cpp
+1
-1
core/src/scheduler/Scheduler.cpp
core/src/scheduler/Scheduler.cpp
+1
-1
core/src/scheduler/TaskTable.cpp
core/src/scheduler/TaskTable.cpp
+2
-17
core/src/scheduler/TaskTable.h
core/src/scheduler/TaskTable.h
+7
-11
core/src/scheduler/action/Action.h
core/src/scheduler/action/Action.h
+3
-3
core/src/scheduler/action/PushTaskToNeighbour.cpp
core/src/scheduler/action/PushTaskToNeighbour.cpp
+11
-9
core/src/scheduler/job/Job.cpp
core/src/scheduler/job/Job.cpp
+1
-1
core/src/scheduler/resource/Resource.cpp
core/src/scheduler/resource/Resource.cpp
+4
-0
core/unittest/scheduler/test_tasktable.cpp
core/unittest/scheduler/test_tasktable.cpp
+3
-6
未找到文件。
CHANGELOG.md
浏览文件 @
4bc7d367
...
...
@@ -15,6 +15,7 @@ Please mark all change in change log and use the ticket from JIRA.
-
\#
96 - Remove .a file in milvus/lib for docker-version
-
\#
118 - Using shared_ptr instead of weak_ptr to avoid performance loss
-
\#
122 - Add unique id for Job
-
\#
130 - Set task state MOVED after resource copy it completed
## Feature
-
\#
115 - Using new structure for tasktable
...
...
core/src/scheduler/JobMgr.cpp
浏览文件 @
4bc7d367
...
...
@@ -91,7 +91,7 @@ JobMgr::worker_function() {
// disk resources NEVER be empty.
if
(
auto
disk
=
res_mgr_
->
GetDiskResources
()[
0
].
lock
())
{
for
(
auto
&
task
:
tasks
)
{
disk
->
task_table
().
Put
(
task
);
disk
->
task_table
().
Put
(
task
,
nullptr
);
}
}
}
...
...
core/src/scheduler/Scheduler.cpp
浏览文件 @
4bc7d367
...
...
@@ -120,7 +120,7 @@ Scheduler::OnLoadCompleted(const EventPtr& event) {
if
(
resource
->
HasExecutor
()
==
false
)
{
load_completed_event
->
task_table_item_
->
Move
();
}
Action
::
PushTaskToAllNeighbour
(
load_completed_event
->
task_table_item_
->
task
,
resource
);
Action
::
PushTaskToAllNeighbour
(
load_completed_event
->
task_table_item_
,
resource
);
break
;
}
default:
{
break
;
}
...
...
core/src/scheduler/TaskTable.cpp
浏览文件 @
4bc7d367
...
...
@@ -264,8 +264,8 @@ TaskTable::PickToExecute(uint64_t limit) {
}
void
TaskTable
::
Put
(
TaskPtr
task
)
{
auto
item
=
std
::
make_shared
<
TaskTableItem
>
();
TaskTable
::
Put
(
TaskPtr
task
,
TaskTableItemPtr
from
)
{
auto
item
=
std
::
make_shared
<
TaskTableItem
>
(
std
::
move
(
from
)
);
item
->
id
=
id_
++
;
item
->
task
=
std
::
move
(
task
);
item
->
state
=
TaskTableItemState
::
START
;
...
...
@@ -276,21 +276,6 @@ TaskTable::Put(TaskPtr task) {
}
}
void
TaskTable
::
Put
(
std
::
vector
<
TaskPtr
>&
tasks
)
{
for
(
auto
&
task
:
tasks
)
{
auto
item
=
std
::
make_shared
<
TaskTableItem
>
();
item
->
id
=
id_
++
;
item
->
task
=
std
::
move
(
task
);
item
->
state
=
TaskTableItemState
::
START
;
item
->
timestamp
.
start
=
get_current_timestamp
();
table_
.
put
(
std
::
move
(
item
));
}
if
(
subscriber_
)
{
subscriber_
();
}
}
size_t
TaskTable
::
TaskToExecute
()
{
size_t
count
=
0
;
...
...
core/src/scheduler/TaskTable.h
浏览文件 @
4bc7d367
...
...
@@ -58,8 +58,12 @@ struct TaskTimestamp : public interface::dumpable {
Dump
()
const
override
;
};
struct
TaskTableItem
;
using
TaskTableItemPtr
=
std
::
shared_ptr
<
TaskTableItem
>
;
struct
TaskTableItem
:
public
interface
::
dumpable
{
TaskTableItem
()
:
id
(
0
),
task
(
nullptr
),
state
(
TaskTableItemState
::
INVALID
),
mutex
()
{
explicit
TaskTableItem
(
TaskTableItemPtr
f
=
nullptr
)
:
id
(
0
),
task
(
nullptr
),
state
(
TaskTableItemState
::
INVALID
),
mutex
(),
from
(
std
::
move
(
f
))
{
}
TaskTableItem
(
const
TaskTableItem
&
src
)
=
delete
;
...
...
@@ -70,6 +74,7 @@ struct TaskTableItem : public interface::dumpable {
TaskTableItemState
state
;
// the state;
std
::
mutex
mutex
;
TaskTimestamp
timestamp
;
TaskTableItemPtr
from
;
bool
IsFinish
();
...
...
@@ -96,8 +101,6 @@ struct TaskTableItem : public interface::dumpable {
Dump
()
const
override
;
};
using
TaskTableItemPtr
=
std
::
shared_ptr
<
TaskTableItem
>
;
class
TaskTable
:
public
interface
::
dumpable
{
public:
TaskTable
()
:
table_
(
1ULL
<<
16ULL
)
{
...
...
@@ -120,14 +123,7 @@ class TaskTable : public interface::dumpable {
* Put one task;
*/
void
Put
(
TaskPtr
task
);
/*
* Put tasks back of task table;
* Called by DBImpl;
*/
void
Put
(
std
::
vector
<
TaskPtr
>&
tasks
);
Put
(
TaskPtr
task
,
TaskTableItemPtr
from
=
nullptr
);
size_t
TaskToExecute
();
...
...
core/src/scheduler/action/Action.h
浏览文件 @
4bc7d367
...
...
@@ -28,13 +28,13 @@ namespace scheduler {
class
Action
{
public:
static
void
PushTaskToNeighbourRandomly
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
);
PushTaskToNeighbourRandomly
(
TaskTableItemPtr
task_item
,
const
ResourcePtr
&
self
);
static
void
PushTaskToAllNeighbour
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
);
PushTaskToAllNeighbour
(
TaskTableItemPtr
task_item
,
const
ResourcePtr
&
self
);
static
void
PushTaskToResource
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
dest
);
PushTaskToResource
(
TaskTableItemPtr
task_item
,
const
ResourcePtr
&
dest
);
static
void
DefaultLabelTaskScheduler
(
const
ResourceMgrPtr
&
res_mgr
,
ResourcePtr
resource
,
...
...
core/src/scheduler/action/PushTaskToNeighbour.cpp
浏览文件 @
4bc7d367
...
...
@@ -59,7 +59,7 @@ get_neighbours_with_connetion(const ResourcePtr& self) {
}
void
Action
::
PushTaskToNeighbourRandomly
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
)
{
Action
::
PushTaskToNeighbourRandomly
(
TaskTableItemPtr
task_item
,
const
ResourcePtr
&
self
)
{
auto
neighbours
=
get_neighbours_with_connetion
(
self
);
if
(
not
neighbours
.
empty
())
{
std
::
vector
<
uint64_t
>
speeds
;
...
...
@@ -78,7 +78,7 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
for
(
uint64_t
i
=
0
;
i
<
speeds
.
size
();
++
i
)
{
rd_speed
-=
speeds
[
i
];
if
(
rd_speed
<=
0
)
{
neighbours
[
i
].
first
->
task_table
().
Put
(
task
);
neighbours
[
i
].
first
->
task_table
().
Put
(
task
_item
->
task
,
task_item
);
return
;
}
}
...
...
@@ -89,22 +89,23 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr& task, const ResourcePtr& self
}
void
Action
::
PushTaskToAllNeighbour
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
self
)
{
Action
::
PushTaskToAllNeighbour
(
TaskTableItemPtr
task_item
,
const
ResourcePtr
&
self
)
{
auto
neighbours
=
get_neighbours
(
self
);
for
(
auto
&
neighbour
:
neighbours
)
{
neighbour
->
task_table
().
Put
(
task
);
neighbour
->
task_table
().
Put
(
task
_item
->
task
,
task_item
);
}
}
void
Action
::
PushTaskToResource
(
const
TaskPtr
&
task
,
const
ResourcePtr
&
dest
)
{
dest
->
task_table
().
Put
(
task
);
Action
::
PushTaskToResource
(
TaskTableItemPtr
task_item
,
const
ResourcePtr
&
dest
)
{
dest
->
task_table
().
Put
(
task
_item
->
task
,
task_item
);
}
void
Action
::
DefaultLabelTaskScheduler
(
const
ResourceMgrPtr
&
res_mgr
,
ResourcePtr
resource
,
std
::
shared_ptr
<
LoadCompletedEvent
>
event
)
{
if
(
not
resource
->
HasExecutor
()
&&
event
->
task_table_item_
->
Move
())
{
auto
task_item
=
event
->
task_table_item_
;
auto
task
=
event
->
task_table_item_
->
task
;
auto
search_task
=
std
::
static_pointer_cast
<
XSearchTask
>
(
task
);
bool
moved
=
false
;
...
...
@@ -119,7 +120,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
if
(
index
!=
nullptr
)
{
moved
=
true
;
auto
dest_resource
=
res_mgr
->
GetResource
(
ResourceType
::
GPU
,
i
);
PushTaskToResource
(
event
->
task_table_item_
->
task
,
dest_resource
);
PushTaskToResource
(
event
->
task_table_item_
,
dest_resource
);
break
;
}
}
...
...
@@ -127,7 +128,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
}
if
(
not
moved
)
{
PushTaskToNeighbourRandomly
(
task
,
resource
);
PushTaskToNeighbourRandomly
(
task
_item
,
resource
);
}
}
}
...
...
@@ -135,6 +136,7 @@ Action::DefaultLabelTaskScheduler(const ResourceMgrPtr& res_mgr, ResourcePtr res
void
Action
::
SpecifiedResourceLabelTaskScheduler
(
const
ResourceMgrPtr
&
res_mgr
,
ResourcePtr
resource
,
std
::
shared_ptr
<
LoadCompletedEvent
>
event
)
{
auto
task_item
=
event
->
task_table_item_
;
auto
task
=
event
->
task_table_item_
->
task
;
if
(
resource
->
type
()
==
ResourceType
::
DISK
)
{
// step 1: calculate shortest path per resource, from disk to compute resource
...
...
@@ -213,7 +215,7 @@ Action::SpecifiedResourceLabelTaskScheduler(const ResourceMgrPtr& res_mgr, Resou
// next_res->task_table().Put(task);
// }
event
->
task_table_item_
->
Move
();
next_res
->
task_table
().
Put
(
task
);
next_res
->
task_table
().
Put
(
task
,
task_item
);
}
}
...
...
core/src/scheduler/job/Job.cpp
浏览文件 @
4bc7d367
...
...
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "Job.h"
#include "
scheduler/job/
Job.h"
namespace
milvus
{
namespace
scheduler
{
...
...
core/src/scheduler/resource/Resource.cpp
浏览文件 @
4bc7d367
...
...
@@ -180,6 +180,10 @@ Resource::loader_function() {
}
LoadFile
(
task_item
->
task
);
task_item
->
Loaded
();
if
(
task_item
->
from
)
{
task_item
->
from
->
Moved
();
task_item
->
from
=
nullptr
;
}
if
(
subscriber_
)
{
auto
event
=
std
::
make_shared
<
LoadCompletedEvent
>
(
shared_from_this
(),
task_item
);
subscriber_
(
std
::
static_pointer_cast
<
Event
>
(
event
));
...
...
core/unittest/scheduler/test_tasktable.cpp
浏览文件 @
4bc7d367
...
...
@@ -193,16 +193,13 @@ TEST_F(TaskTableBaseTest, PUT_INVALID_TEST) {
TEST_F
(
TaskTableBaseTest
,
PUT_BATCH
)
{
std
::
vector
<
milvus
::
scheduler
::
TaskPtr
>
tasks
{
task1_
,
task2_
};
empty_table_
.
Put
(
tasks
);
for
(
auto
&
task
:
tasks
)
{
empty_table_
.
Put
(
task
);
}
ASSERT_EQ
(
empty_table_
.
at
(
0
)
->
task
,
task1_
);
ASSERT_EQ
(
empty_table_
.
at
(
1
)
->
task
,
task2_
);
}
TEST_F
(
TaskTableBaseTest
,
PUT_EMPTY_BATCH
)
{
std
::
vector
<
milvus
::
scheduler
::
TaskPtr
>
tasks
{};
empty_table_
.
Put
(
tasks
);
}
TEST_F
(
TaskTableBaseTest
,
SIZE
)
{
ASSERT_EQ
(
empty_table_
.
size
(),
0
);
empty_table_
.
Put
(
task1_
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录