提交 6c42b3e9 编写于 作者: W wxyu

MS-459 Add cache for pick function in tasktable


Former-commit-id: ad4fc4b67499cc06b0fe0937c1ec16566d483239
上级 01f2db33
......@@ -74,6 +74,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-442 - Merge Knowhere
- MS-445 - Rename CopyCompleted to LoadCompleted
- MS-451 - Update server_config.template file, set GPU compute default
- MS-459 - Add cache for pick function in tasktable
## New Feature
- MS-343 - Implement ResourceMgr
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Cost.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::vector<uint64_t>
PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) {
std::vector<uint64_t> indexes;
for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) {
if (task_table[i]->state == TaskTableItemState::LOADED) {
indexes.push_back(i);
++count;
}
}
return indexes;
}
std::vector<uint64_t>
PickToLoad(TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) {
if (task_table[i]->state == TaskTableItemState::START) {
indexes.push_back(i);
++count;
}
}
return indexes;
}
std::vector<uint64_t>
PickToExecute(TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) {
if (task_table[i]->state == TaskTableItemState::LOADED) {
indexes.push_back(i);
++count;
}
}
return indexes;
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include "task/Task.h"
#include "TaskTable.h"
#include "CacheMgr.h"
namespace zilliz {
namespace milvus {
namespace engine {
// TODO: Policy interface
// TODO: collect statistics
/*
* select tasks to move;
* call from scheduler;
*/
std::vector<uint64_t>
PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit);
/*
* select task to load
* call from resource;
* I DONT SURE NEED THIS;
*/
std::vector<uint64_t>
PickToLoad(TaskTable &task_table, uint64_t limit);
/*
* select task to execute;
* call from resource;
* I DONT SURE NEED THIS;
*/
std::vector<uint64_t>
PickToExecute(TaskTable &task_table, uint64_t limit);
}
}
}
......@@ -6,7 +6,6 @@
#include <src/cache/GpuCacheMgr.h>
#include "Scheduler.h"
#include "Cost.h"
#include "action/Action.h"
......
......@@ -53,6 +53,11 @@ ToString(const TaskTimestamp &timestamp) {
return ss.str();
}
bool
TaskTableItem::IsFinish() {
return state == TaskTableItemState::MOVED || state == TaskTableItemState::EXECUTED;
}
bool
TaskTableItem::Load() {
std::unique_lock<std::mutex> lock(mutex);
......@@ -133,6 +138,38 @@ TaskTableItem::Dump() {
return ss.str();
}
std::vector<uint64_t>
TaskTable::PickToLoad(uint64_t limit) {
std::vector<uint64_t> indexes;
bool cross = false;
for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) {
if (not cross && table_[i]->IsFinish()) {
last_finish_ = i;
} else if (table_[i]->state == TaskTableItemState::START) {
cross = true;
indexes.push_back(i);
++count;
}
}
return indexes;
}
std::vector<uint64_t>
TaskTable::PickToExecute(uint64_t limit) {
std::vector<uint64_t> indexes;
bool cross = false;
for (uint64_t i = last_finish_, count = 0; i < table_.size() && count < limit; ++i) {
if (not cross && table_[i]->IsFinish()) {
last_finish_ = i;
} else if (table_[i]->state == TaskTableItemState::LOADED) {
cross = true;
indexes.push_back(i);
++count;
}
}
return indexes;
}
void
TaskTable::Put(TaskPtr task) {
std::lock_guard<std::mutex> lock(id_mutex_);
......
......@@ -54,6 +54,9 @@ struct TaskTableItem {
uint8_t priority; // just a number, meaningless;
bool
IsFinish();
bool
Load();
......@@ -141,6 +144,13 @@ public:
std::deque<TaskTableItemPtr>::iterator begin() { return table_.begin(); }
std::deque<TaskTableItemPtr>::iterator end() { return table_.end(); }
public:
std::vector<uint64_t>
PickToLoad(uint64_t limit);
std::vector<uint64_t>
PickToExecute(uint64_t limit);
public:
/******** Action ********/
......@@ -182,7 +192,7 @@ public:
* Called by executor;
*/
inline bool
Executed(uint64_t index){
Executed(uint64_t index) {
return table_[index]->Executed();
}
......@@ -193,7 +203,7 @@ public:
*/
inline bool
Move(uint64_t index){
Move(uint64_t index) {
return table_[index]->Move();
}
......@@ -203,7 +213,7 @@ public:
* Called by scheduler;
*/
inline bool
Moved(uint64_t index){
Moved(uint64_t index) {
return table_[index]->Moved();
}
......@@ -220,6 +230,9 @@ private:
mutable std::mutex id_mutex_;
std::deque<TaskTableItemPtr> table_;
std::function<void(void)> subscriber_ = nullptr;
// cache last finish avoid Pick task from begin always
uint64_t last_finish_ = 0;
};
......
......@@ -80,7 +80,7 @@ void Resource::WakeupExecutor() {
}
TaskTableItemPtr Resource::pick_task_load() {
auto indexes = PickToLoad(task_table_, 10);
auto indexes = task_table_.PickToLoad(10);
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(index))
......@@ -91,7 +91,7 @@ TaskTableItemPtr Resource::pick_task_load() {
}
TaskTableItemPtr Resource::pick_task_execute() {
auto indexes = PickToExecute(task_table_, 3);
auto indexes = task_table_.PickToExecute(3);
for (auto index : indexes) {
// try to set one task executing, then return
if (task_table_.Execute(index))
......
......@@ -19,7 +19,6 @@
#include "../event/TaskTableUpdatedEvent.h"
#include "../TaskTable.h"
#include "../task/Task.h"
#include "../Cost.h"
#include "Connection.h"
#include "Node.h"
#include "RegisterHandler.h"
......
#include "scheduler/TaskTable.h"
#include "scheduler/Cost.h"
#include <gtest/gtest.h>
#include "scheduler/task/TestTask.h"
using namespace zilliz::milvus::engine;
class CostTest : public ::testing::Test {
protected:
void
SetUp() override {
TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<TestTask>(dummy);
table_.Put(task);
}
table_.Get(0)->state = TaskTableItemState::INVALID;
table_.Get(1)->state = TaskTableItemState::START;
table_.Get(2)->state = TaskTableItemState::LOADING;
table_.Get(3)->state = TaskTableItemState::LOADED;
table_.Get(4)->state = TaskTableItemState::EXECUTING;
table_.Get(5)->state = TaskTableItemState::EXECUTED;
table_.Get(6)->state = TaskTableItemState::MOVING;
table_.Get(7)->state = TaskTableItemState::MOVED;
}
TaskTable table_;
};
TEST_F(CostTest, pick_to_move) {
CacheMgr cache;
auto indexes = PickToMove(table_, cache, 10);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 3);
}
TEST_F(CostTest, pick_to_load) {
auto indexes = PickToLoad(table_, 10);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 1);
}
TEST_F(CostTest, pick_to_executed) {
auto indexes = PickToExecute(table_, 10);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 3);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册