提交 a92342e3 编写于 作者: P peng.xu

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-422 Support DeleteTask in Multi-GpuResource case

See merge request megasearch/milvus!438

Former-commit-id: a61639ee813b8446f5472282d61e25bdd9b4ce08
......@@ -57,6 +57,8 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-415 - Add command tasktable to dump all tasktables
- MS-418 - Update server_config.template file, set CPU compute only default
- MS-419 - Move index_file_size from IndexParam to TableSchema
- MS-421 - Add TaskLabel in scheduler
- MS-422 - Support DeleteTask in Multi-GpuResource case
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -54,7 +54,7 @@ ResourceMgr::Connect(const std::string &name1, const std::string &name2, Connect
auto res2 = get_resource_by_name(name2);
if (res1 && res2) {
res1->AddNeighbour(std::static_pointer_cast<Node>(res2), connection);
res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
// res2->AddNeighbour(std::static_pointer_cast<Node>(res1), connection);
}
}
......
......@@ -104,18 +104,29 @@ Scheduler::OnStartUp(const EventPtr &event) {
void
Scheduler::OnFinishTask(const EventPtr &event) {
if (auto resource = event->resource_.lock()) {
}
}
void
Scheduler::OnCopyCompleted(const EventPtr &event) {
auto load_completed_event = std::static_pointer_cast<CopyCompletedEvent>(event);
if (auto resource = event->resource_.lock()) {
resource->WakeupExecutor();
if (resource->Type() == ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_);
} else {
Action::PushTaskToNeighbourHasExecutor(event->resource_);
auto task_table_type = load_completed_event->task_table_item_->task->label()->Type();
switch (task_table_type) {
case TaskLabelType::DEFAULT: {
if (not resource->HasExecutor() && load_completed_event->task_table_item_->Move()) {
Action::PushTaskToNeighbourRandomly(load_completed_event->task_table_item_->task, resource);
}
break;
}
case TaskLabelType::BROADCAST: {
Action::PushTaskToAllNeighbour(load_completed_event->task_table_item_->task, resource);
break;
}
default: {
break;
}
}
}
}
......
......@@ -14,23 +14,11 @@ namespace engine {
class Action {
public:
/*
* Push task to neighbour;
*/
static void
PushTaskToNeighbour(const ResourceWPtr &self);
PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self);
/*
* Push task to neighbour that has executor;
*/
static void
PushTaskToNeighbourHasExecutor(const ResourceWPtr &self);
/*
* Pull task From neighbour;
*/
static void
PullTaskFromNeighbour(const ResourceWPtr &self);
PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self);
};
......
......@@ -13,85 +13,37 @@ namespace zilliz {
namespace milvus {
namespace engine {
void
next(std::list<ResourcePtr> &neighbours, std::list<ResourcePtr>::iterator &it) {
it++;
if (neighbours.end() == it) {
it = neighbours.begin();
}
}
// TODO: this function called with only on tasks, so it will always push task to first neighbour
void
push_task_round_robin(TaskTable &self_task_table, std::list<ResourcePtr> &neighbours) {
CacheMgr cache;
auto it = neighbours.begin();
if (it == neighbours.end()) return;
auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
std::vector<ResourcePtr>
get_neighbours(const ResourcePtr &self) {
std::vector<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
// task = task->Clone();
(*it)->task_table().Put(task);
next(neighbours, it);
}
auto resource = std::static_pointer_cast<Resource>(node);
neighbours.emplace_back(resource);
}
return neighbours;
}
void
push_task_randomly(TaskTable &self_task_table, std::vector<ResourcePtr> &neighbours) {
Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
const ResourcePtr &self) {
auto neighbours = get_neighbours(self);
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, self_task_table.Size());
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
neighbours[dist(mt)]->task_table().Put(task);
}
}
}
void
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
neighbours.emplace_back(resource);
}
push_task_round_robin(self->task_table(), neighbours);
}
void
Action::PushTaskToNeighbourHasExecutor(const ResourceWPtr &res) {
auto self = res.lock();
if (not self) return;
std::list<ResourcePtr> l_neighbours;
std::vector<ResourcePtr> v_neighbours;
for (auto &neighbour_node : self->GetNeighbours()) {
auto node = neighbour_node.neighbour_node.lock();
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
if (resource->HasExecutor()) {
l_neighbours.push_back(resource);
v_neighbours.push_back(resource);
Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) {
auto neighbours = get_neighbours(self);
for (auto &neighbour : neighbours) {
neighbour->task_table().Put(task);
}
}
// push_task_round_robin(self->task_table(), l_neighbours);
push_task_randomly(self->task_table(), v_neighbours);
}
......
......@@ -29,6 +29,7 @@ namespace zilliz {
namespace milvus {
namespace engine {
// TODO(wxyu): Storage, Route, Executor
enum class ResourceType {
DISK = 0,
CPU = 1,
......
......@@ -5,10 +5,12 @@
******************************************************************************/
#pragma once
#include "db/scheduler/context/SearchContext.h"
#include "db/scheduler/task/IScheduleTask.h"
#include "scheduler/tasklabel/TaskLabel.h"
#include <string>
#include <memory>
#include <src/db/scheduler/context/SearchContext.h>
#include "src/db/scheduler/task/IScheduleTask.h"
namespace zilliz {
......@@ -36,6 +38,21 @@ public:
explicit
Task(TaskType type) : type_(type) {}
/*
* Just Getter;
*/
inline TaskType
Type() const { return type_; }
/*
* Getter and Setter;
*/
inline TaskLabelPtr &
label() {
return label_;
}
public:
virtual void
Load(LoadType type, uint8_t device_id) = 0;
......@@ -46,13 +63,11 @@ public:
virtual TaskPtr
Clone() = 0;
inline TaskType
Type() const { return type_; }
public:
std::vector<SearchContextPtr> search_contexts_;
ScheduleTaskPtr task_;
TaskType type_;
TaskLabelPtr label_ = nullptr;
};
......
......@@ -5,6 +5,8 @@
******************************************************************************/
#include "TaskConvert.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
namespace zilliz {
......@@ -17,6 +19,7 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) {
case ScheduleTaskType::kIndexLoad: {
auto load_task = std::static_pointer_cast<IndexLoadTask>(schedule_task);
auto task = std::make_shared<XSearchTask>(load_task->file_);
task->label() = std::make_shared<DefaultLabel>();
task->search_contexts_ = load_task->search_contexts_;
task->task_ = schedule_task;
return task;
......@@ -24,6 +27,7 @@ TaskConvert(const ScheduleTaskPtr &schedule_task) {
case ScheduleTaskType::kDelete: {
auto delete_task = std::static_pointer_cast<DeleteTask>(schedule_task);
auto task = std::make_shared<XDeleteTask>(delete_task->context_);
task->label() = std::make_shared<BroadcastLabel>();
return task;
}
default: {
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "TaskLabel.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
class BroadcastLabel : public TaskLabel {
public:
BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) {}
};
using BroadcastLabelPtr = std::shared_ptr<BroadcastLabel>;
}
}
}
......@@ -3,19 +3,23 @@
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Action.h"
#include "TaskLabel.h"
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
void
Action::PullTaskFromNeighbour(const ResourceWPtr &self) {
// TODO: implement
}
class DefaultLabel : public TaskLabel {
public:
DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {}
};
using DefaultLabelPtr = std::shared_ptr<DefaultLabel>;
}
}
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "TaskLabel.h"
#include <string>
#include <memory>
class Resource;
using ResourceWPtr = std::weak_ptr<Resource>;
namespace zilliz {
namespace milvus {
namespace engine {
class SpecResLabel : public TaskLabel {
public:
SpecResLabel(const ResourceWPtr &resource)
: TaskLabel(TaskLabelType::SPECIAL_RESOURCE), resource_(resource) {}
inline ResourceWPtr &
resource() const {
return resource_;
}
inline std::string &
resource_name() const {
return resource_name_;
}
private:
ResourceWPtr resource_;
std::string resource_name_;
}
using SpecResLabelPtr = std::make_shared<SpecResLabel>;
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
enum class TaskLabelType {
DEFAULT, // means can be executed in any resource
SPECIAL_RESOURCE, // means must executing in special resource
BROADCAST, // means all enable-executor resource must execute task
};
class TaskLabel {
public:
inline TaskLabelType
Type() const {
return type_;
}
protected:
TaskLabel(TaskLabelType type) : type_(type) {}
private:
TaskLabelType type_;
};
using TaskLabelPtr = std::shared_ptr<TaskLabel>;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册