提交 f944df41 编写于 作者: Y Yu Kun

Merge remote-tracking branch 'upstream/branch-0.4.0' into branch-0.4.0


Former-commit-id: 81ebc3a6ab38577ba4945b7db4022d9725586f19
......@@ -8,7 +8,7 @@ container('publish-docker') {
sh "curl -O -u anonymous: ftp://192.168.1.126/data/${PROJECT_NAME}/engine/${JOB_NAME}-${BUILD_ID}/${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
sh "tar zxvf ${PROJECT_NAME}-engine-${PACKAGE_VERSION}.tar.gz"
try {
docker.withRegistry('https://registry.zilliz.com', 'a54e38ef-c424-4ea9-9224-b25fc20e3924') {
docker.withRegistry('https://registry.zilliz.com', '${params.DOCKER_PUBLISH_USER}') {
def customImage = docker.build("${PROJECT_NAME}/engine:${DOCKER_VERSION}")
customImage.push()
}
......
......@@ -16,6 +16,22 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-350 - Remove knowhere submodule
- MS-354 - Add task class and interface in scheduler
- MS-355 - Add copy interface in ExcutionEngine
- MS-357 - Add minimum schedule function
- MS-359 - Add cost test in new scheduler
- MS-361 - Add event in resource
- MS-364 - Modify tasktableitem in tasktable
- MS-365 - Use tasktableitemptr instead in event
- MS-366 - Implement TaskTable
- MS-368 - Implement cost.cpp
- MS-371 - Add TaskTableUpdatedEvent
- MS-373 - Add resource test
- MS-374 - Add action definition
- MS-375 - Add Dump implementation for Event
- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task
- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable
- MS-378 - Debug and Update normal_test in scheduler unittest
- MS-379 - Add Dump implementation in Resource
- MS-380 - Update resource loader and executor, work util all finished
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -606,9 +606,9 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
&TableFileSchema::engine_type_);
auto match_tableid = c(&TableFileSchema::table_id_) == table_id;
auto is_raw = c(&TableFileSchema::file_type_) == (int) TableFileSchema::RAW;
auto is_toindex = c(&TableFileSchema::file_type_) == (int) TableFileSchema::TO_INDEX;
auto is_index = c(&TableFileSchema::file_type_) == (int) TableFileSchema::INDEX;
std::vector<int> file_type = {(int) TableFileSchema::RAW, (int) TableFileSchema::TO_INDEX, (int) TableFileSchema::INDEX};
auto match_type = in(&TableFileSchema::file_type_, file_type);
TableSchema table_schema;
table_schema.table_id_ = table_id;
......@@ -617,23 +617,23 @@ Status SqliteMetaImpl::FilesToSearch(const std::string &table_id,
decltype(ConnectorPtr->select(select_columns)) result;
if (partition.empty() && ids.empty()) {
auto filter = where(match_tableid and (is_raw or is_toindex or is_index));
auto filter = where(match_tableid and match_type);
result = ConnectorPtr->select(select_columns, filter);
}
else if (partition.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto filter = where(match_tableid and match_fileid and (is_raw or is_toindex or is_index));
auto filter = where(match_tableid and match_fileid and match_type);
result = ConnectorPtr->select(select_columns, filter);
}
else if (!partition.empty() && ids.empty()) {
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_date and (is_raw or is_toindex or is_index));
auto filter = where(match_tableid and match_date and match_type);
result = ConnectorPtr->select(select_columns, filter);
}
else if (!partition.empty() && !ids.empty()) {
auto match_fileid = in(&TableFileSchema::id_, ids);
auto match_date = in(&TableFileSchema::date_, partition);
auto filter = where(match_tableid and match_fileid and match_date and (is_raw or is_toindex or is_index));
auto filter = where(match_tableid and match_fileid and match_date and match_type);
result = ConnectorPtr->select(select_columns, filter);
}
......
......@@ -12,22 +12,40 @@ namespace milvus {
namespace engine {
std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {
PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit) {
std::vector<uint64_t> indexes;
for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) {
if (task_table[i]->state == TaskTableItemState::LOADED) {
indexes.push_back(i);
++count;
}
}
return indexes;
}
std::vector<uint64_t>
PickToLoad(const TaskTable &task_table, uint64_t limit) {
PickToLoad(TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) {
if (task_table[i]->state == TaskTableItemState::START) {
indexes.push_back(i);
++count;
}
}
return indexes;
}
std::vector<uint64_t>
PickToExecute(const TaskTable &task_table, uint64_t limit) {
PickToExecute(TaskTable &task_table, uint64_t limit) {
std::vector<uint64_t> indexes;
for (uint64_t i = 0, count = 0; i < task_table.Size() && count < limit; ++i) {
if (task_table[i]->state == TaskTableItemState::LOADED) {
indexes.push_back(i);
++count;
}
}
return indexes;
}
......
......@@ -23,7 +23,7 @@ namespace engine {
* call from scheduler;
*/
std::vector<uint64_t>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit);
PickToMove(TaskTable &task_table, const CacheMgr &cache_mgr, uint64_t limit);
/*
......@@ -32,7 +32,7 @@ PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit)
* I DONT SURE NEED THIS;
*/
std::vector<uint64_t>
PickToLoad(const TaskTable &task_table, uint64_t limit);
PickToLoad(TaskTable &task_table, uint64_t limit);
/*
* select task to execute;
......@@ -40,7 +40,7 @@ PickToLoad(const TaskTable &task_table, uint64_t limit);
* I DONT SURE NEED THIS;
*/
std::vector<uint64_t>
PickToExecute(const TaskTable &task_table, uint64_t limit);
PickToExecute(TaskTable &task_table, uint64_t limit);
}
......
......@@ -7,6 +7,7 @@
#include "ResourceMgr.h"
#include "db/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
......@@ -21,30 +22,18 @@ ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
std::lock_guard<std::mutex> lck(resources_mutex_);
if(running_) {
if (running_) {
ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
return ret;
}
if (resource->Type() == ResourceType::DISK) {
disk_resources_.emplace_back(ResourceWPtr(resource));
}
resources_.emplace_back(resource);
size_t index = resources_.size() - 1;
resource->RegisterOnStartUp([&] {
start_up_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterOnFinishTask([&] {
finish_task_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterOnCopyCompleted([&] {
copy_completed_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterOnTaskTableUpdated([&] {
task_table_updated_event_[index] = true;
event_cv_.notify_one();
});
resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1));
return ret;
}
......@@ -53,41 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect
if (auto observe_a = res1.lock()) {
if (auto observe_b = res2.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
observe_b->AddNeighbour(std::static_pointer_cast<Node>(observe_a), connection);
}
}
}
void
ResourceMgr::EventProcess() {
while (running_) {
std::unique_lock <std::mutex> lock(resources_mutex_);
event_cv_.wait(lock, [this] { return !resources_.empty(); });
if(!running_) {
break;
}
for (uint64_t i = 0; i < resources_.size(); ++i) {
ResourceWPtr res(resources_[i]);
if (start_up_event_[i]) {
on_start_up_(res);
start_up_event_[i] = false;
}
if (finish_task_event_[i]) {
on_finish_task_(res);
finish_task_event_[i] = false;
}
if (copy_completed_event_[i]) {
on_copy_completed_(res);
copy_completed_event_[i] = false;
}
if (task_table_updated_event_[i]) {
on_task_table_updated_(res);
task_table_updated_event_[i] = false;
}
}
}
}
void
ResourceMgr::Start() {
......@@ -95,23 +54,33 @@ ResourceMgr::Start() {
for (auto &resource : resources_) {
resource->Start();
}
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
running_ = true;
worker_thread_ = std::thread(&ResourceMgr::event_process, this);
}
void
ResourceMgr::Stop() {
std::lock_guard<std::mutex> lck(resources_mutex_);
running_ = false;
{
std::lock_guard<std::mutex> lock(event_mutex_);
running_ = false;
queue_.push(nullptr);
event_cv_.notify_one();
}
worker_thread_.join();
std::lock_guard<std::mutex> lck(resources_mutex_);
for (auto &resource : resources_) {
resource->Stop();
}
}
void
ResourceMgr::PostEvent(const EventPtr &event) {
std::unique_lock<std::mutex> lock(event_mutex_);
queue_.emplace(event);
event_cv_.notify_one();
}
std::string
ResourceMgr::Dump() {
std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";
......@@ -124,6 +93,26 @@ ResourceMgr::Dump() {
return str;
}
void
ResourceMgr::event_process() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
auto event = queue_.front();
if (event == nullptr) {
break;
}
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
queue_.pop();
if (subscriber_) {
subscriber_(event);
}
}
}
}
}
}
......@@ -10,9 +10,12 @@
#include <vector>
#include <memory>
#include <mutex>
#include <queue>
#include <condition_variable>
#include "resource/Resource.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
......@@ -23,6 +26,15 @@ public:
ResourceMgr();
/******** Management Interface ********/
inline void
RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
subscriber_ = std::move(subscriber);
}
std::vector<ResourceWPtr> &
GetDiskResources() {
return disk_resources_;
}
/*
* Add resource into Resource Management;
......@@ -48,43 +60,10 @@ public:
void
Stop();
// TODO: add stats interface(low)
public:
/******** Event Register Interface ********/
/*
* Register on start up event;
*/
void
RegisterOnStartUp(std::function<void(ResourceWPtr)> &func) {
on_start_up_ = func;
}
PostEvent(const EventPtr& event);
/*
* Register on finish one task event;
*/
void
RegisterOnFinishTask(std::function<void(ResourceWPtr)> &func) {
on_finish_task_ = func;
}
/*
* Register on copy task data completed event;
*/
void
RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func) {
on_copy_completed_ = func;
}
/*
* Register on task table updated event;
*/
void
RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func) {
on_task_table_updated_ = func;
}
// TODO: add stats interface(low)
public:
/******** Utlitity Functions ********/
......@@ -94,27 +73,25 @@ public:
private:
void
EventProcess();
event_process();
private:
std::queue<EventPtr> queue_;
std::function<void(EventPtr)> subscriber_ = nullptr;
bool running_;
std::vector<ResourceWPtr> disk_resources_;
std::vector<ResourcePtr> resources_;
mutable std::mutex resources_mutex_;
std::thread worker_thread_;
std::mutex event_mutex_;
std::condition_variable event_cv_;
std::vector<bool> start_up_event_;
std::vector<bool> finish_task_event_;
std::vector<bool> copy_completed_event_;
std::vector<bool> task_table_updated_event_;
std::function<void(ResourceWPtr)> on_start_up_;
std::function<void(ResourceWPtr)> on_finish_task_;
std::function<void(ResourceWPtr)> on_copy_completed_;
std::function<void(ResourceWPtr)> on_task_table_updated_;
};
using ResourceMgrPtr = std::shared_ptr<ResourceMgr>;
using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>;
}
......
......@@ -5,40 +5,46 @@
******************************************************************************/
#include "Scheduler.h"
#include "Cost.h"
#include "action/Action.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
StartUpEvent::Process() {
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
: running_(false),
res_mgr_(std::move(res_mgr)) {
if (auto mgr = res_mgr_.lock()) {
mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
}
}
void
FinishTaskEvent::Process() {
// for (nei : res->neighbours) {
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
// res->task_table()->PutTasks(tasks);
// }
// res->WakeUpExec();
}
void
CopyCompletedEvent::Process() {
Scheduler::Start() {
running_ = true;
worker_thread_ = std::thread(&Scheduler::worker_function, this);
}
void
TaskTableUpdatedEvent::Process() {
Scheduler::Stop() {
{
std::lock_guard<std::mutex> lock(event_mutex_);
running_ = false;
event_queue_.push(nullptr);
event_cv_.notify_one();
}
worker_thread_.join();
}
void
Scheduler::Start() {
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
Scheduler::PostEvent(const EventPtr &event) {
std::lock_guard<std::mutex> lock(event_mutex_);
event_queue_.push(event);
event_cv_.notify_one();
// SERVER_LOG_DEBUG << "Scheduler post " << *event;
}
std::string
......@@ -49,8 +55,76 @@ Scheduler::Dump() {
void
Scheduler::worker_function() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
auto event = event_queue_.front();
event->Process();
if (event == nullptr) {
break;
}
// SERVER_LOG_DEBUG << "Scheduler process " << *event;
event_queue_.pop();
Process(event);
}
}
void
Scheduler::Process(const EventPtr &event) {
switch (event->Type()) {
case EventType::START_UP: {
OnStartUp(event);
break;
}
case EventType::COPY_COMPLETED: {
OnCopyCompleted(event);
break;
}
case EventType::FINISH_TASK: {
OnFinishTask(event);
break;
}
case EventType::TASK_TABLE_UPDATED: {
OnTaskTableUpdated(event);
break;
}
default: {
// TODO: logging
break;
}
}
}
void
Scheduler::OnStartUp(const EventPtr &event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
}
void
Scheduler::OnFinishTask(const EventPtr &event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupExecutor();
}
}
void
Scheduler::OnCopyCompleted(const EventPtr &event) {
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
resource->WakeupExecutor();
if (resource->Type()== ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_);
}
}
}
void
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
// Action::PushTaskToNeighbour(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
}
......
......@@ -5,6 +5,7 @@
******************************************************************************/
#pragma once
#include <memory>
#include <string>
#include <mutex>
#include <thread>
......@@ -12,128 +13,93 @@
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
class Event {
public:
explicit
Event(ResourceWPtr &resource) : resource_(resource) {}
public:
virtual void
Process() = 0;
private:
ResourceWPtr resource_;
};
using EventPtr = std::shared_ptr<Event>;
class StartUpEvent : public Event {
class Scheduler {
public:
explicit
StartUpEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
Process() override;
};
Scheduler(ResourceMgrWPtr res_mgr);
class FinishTaskEvent : public Event {
public:
explicit
FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {}
Scheduler(const Scheduler &) = delete;
Scheduler(Scheduler &&) = delete;
public:
/*
* Start worker thread;
*/
void
Process() override;
};
class CopyCompletedEvent : public Event {
public:
explicit
CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {}
Start();
public:
/*
* Stop worker thread, join it;
*/
void
Process() override;
};
Stop();
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {}
public:
/*
* Post event to scheduler event queue;
*/
void
Process() override;
};
class Scheduler {
public:
explicit
Scheduler(ResourceMgrWPtr res_mgr)
: running_(false),
res_mgr_(std::move(res_mgr)) {
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
}
PostEvent(const EventPtr &event);
void
Start();
/*
* Dump as string;
*/
std::string
Dump();
public:
private:
/******** Events ********/
/*
* Process start up events;
*
* Actions:
* Pull task from neighbours;
*/
inline void
OnStartUp(ResourceWPtr &resource) {
auto event = std::make_shared<StartUpEvent>(resource);
event_queue_.push(event);
}
void
OnStartUp(const EventPtr &event);
/*
* Process finish task events;
*
* Actions:
* Pull task from neighbours;
*/
inline void
OnFinishTask(ResourceWPtr &resource) {
auto event = std::make_shared<FinishTaskEvent>(resource);
event_queue_.push(event);
}
void
OnFinishTask(const EventPtr &event);
/*
* Process copy completed events;
*
* Actions:
* Mark task source MOVED;
* Pull task from neighbours;
*/
inline void
OnCopyCompleted(ResourceWPtr &resource) {
auto event = std::make_shared<CopyCompletedEvent>(resource);
event_queue_.push(event);
}
void
OnCopyCompleted(const EventPtr &event);
/*
* Process task table updated events;
* Process task table updated events, which happened on task_table->put;
*
* Actions:
* Push task to neighbours;
*/
inline void
OnTaskTableUpdated(ResourceWPtr &resource) {
auto event = std::make_shared<TaskTableUpdatedEvent>(resource);
event_queue_.push(event);
}
public:
std::string
Dump();
void
OnTaskTableUpdated(const EventPtr &event);
private:
/*
* Dispatch event to event handler;
*/
void
Process(const EventPtr &event);
/*
* Called by worker_thread_;
*/
......@@ -146,8 +112,12 @@ private:
ResourceMgrWPtr res_mgr_;
std::queue<EventPtr> event_queue_;
std::thread worker_thread_;
std::mutex event_mutex_;
std::condition_variable event_cv_;
};
using SchedulerPtr = std::shared_ptr<Scheduler>;
}
}
}
......
......@@ -5,29 +5,46 @@
******************************************************************************/
#include "TaskTable.h"
#include "event/TaskTableUpdatedEvent.h"
#include <vector>
#include <sstream>
namespace zilliz {
namespace milvus {
namespace engine {
TaskTable::TaskTable(std::vector<TaskPtr> &&tasks) {
}
void
TaskTable::Put(TaskPtr task) {
std::lock_guard<std::mutex> lock(id_mutex_);
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
table_.push_back(item);
if (subscriber_) {
subscriber_();
}
}
void
TaskTable::Put(std::vector<TaskPtr> &tasks) {
std::lock_guard<std::mutex> lock(id_mutex_);
for (auto &task : tasks) {
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
table_.push_back(item);
}
if (subscriber_) {
subscriber_();
}
}
TaskTableItem &
TaskTableItemPtr
TaskTable::Get(uint64_t index) {
return table_[index];
}
......@@ -46,9 +63,9 @@ bool
TaskTable::Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task.mutex);
if (task.state == TaskTableItemState::START) {
task.state = TaskTableItemState::LOADING;
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::MOVING;
return true;
}
return false;
......@@ -56,32 +73,88 @@ TaskTable::Move(uint64_t index) {
bool
TaskTable::Moved(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::MOVING) {
task->state = TaskTableItemState::MOVED;
return true;
}
return false;
}
bool
TaskTable::Load(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::START) {
task->state = TaskTableItemState::LOADING;
return true;
}
return false;
}
bool
TaskTable::Loaded(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADING) {
task->state = TaskTableItemState::LOADED;
return true;
}
return false;
}
bool
TaskTable::Execute(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::LOADED) {
task->state = TaskTableItemState::EXECUTING;
return true;
}
return false;
}
bool
TaskTable::Executed(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task->mutex);
if (task->state == TaskTableItemState::EXECUTING) {
task->state = TaskTableItemState::EXECUTED;
return true;
}
return false;
}
std::string
ToString(TaskTableItemState state) {
switch (state) {
case TaskTableItemState::INVALID: return "INVALID";
case TaskTableItemState::START: return "START";
case TaskTableItemState::LOADING: return "LOADING";
case TaskTableItemState::LOADED: return "LOADED";
case TaskTableItemState::EXECUTING: return "EXECUTING";
case TaskTableItemState::EXECUTED: return "EXECUTED";
case TaskTableItemState::MOVING: return "MOVING";
case TaskTableItemState::MOVED: return "MOVED";
default: return "";
}
}
std::string
TaskTable::Dump() {
return std::string();
std::stringstream ss;
for (auto &item : table_) {
ss << "<" << item->id;
ss << ", " << ToString(item->state);
ss << ">" << std::endl;
}
return ss.str();
}
}
......
......@@ -10,6 +10,7 @@
#include <mutex>
#include "task/SearchTask.h"
#include "event/Event.h"
namespace zilliz {
......@@ -31,7 +32,7 @@ struct TaskTableItem {
TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {}
TaskTableItem(const TaskTableItem &src)
: id(src.id), state(src.state), mutex(), priority(src.priority) {}
: id(src.id), state(src.state), mutex(), priority(src.priority) {}
uint64_t id; // auto increment from 0;
// TODO: add tag into task
......@@ -42,12 +43,19 @@ struct TaskTableItem {
uint8_t priority; // just a number, meaningless;
};
using TaskTableItemPtr = std::shared_ptr<TaskTableItem>;
class TaskTable {
public:
TaskTable() = default;
explicit
TaskTable(std::vector<TaskPtr> &&tasks);
TaskTable(const TaskTable &) = delete;
TaskTable(TaskTable &&) = delete;
inline void
RegisterSubscriber(std::function<void(void)> subscriber) {
subscriber_ = std::move(subscriber);
}
/*
* Put one task;
......@@ -65,7 +73,7 @@ public:
/*
* Return task table item reference;
*/
TaskTableItem &
TaskTableItemPtr
Get(uint64_t index);
/*
......@@ -76,6 +84,29 @@ public:
void
Clear();
/*
* Return true if task table empty, otherwise false;
*/
inline bool
Empty() {
return table_.empty();
}
/*
* Return size of task table;
*/
inline size_t
Size() {
return table_.size();
}
public:
TaskTableItemPtr &
operator[](uint64_t index) {
return table_[index];
}
std::deque<TaskTableItemPtr>::iterator begin() { return table_.begin(); }
std::deque<TaskTableItemPtr>::iterator end() { return table_.end(); }
public:
......@@ -139,7 +170,10 @@ public:
private:
// TODO: map better ?
std::deque<TaskTableItem> table_;
std::uint64_t id_ = 0;
mutable std::mutex id_mutex_;
std::deque<TaskTableItemPtr> table_;
std::function<void(void)> subscriber_ = nullptr;
};
......
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "../resource/Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
class Action {
public:
/*
* Push task to neighbour;
*/
static void
PushTaskToNeighbour(const ResourceWPtr &self);
/*
* Pull task From neighbour;
*/
static void
PullTaskFromNeighbour(const ResourceWPtr &self);
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Action.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
Action::PullTaskFromNeighbour(const ResourceWPtr &self) {
// TODO: implement
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Action.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
push_task(const ResourcePtr &self, const ResourcePtr &other) {
auto &self_task_table = self->task_table();
auto &other_task_table = other->task_table();
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 1);
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
other_task_table.Put(task);
// TODO: mark moved future
}
}
}
void
Action::PushTaskToNeighbour(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &neighbour : self->GetNeighbours()) {
if (auto n = neighbour.neighbour_node.lock()) {
push_task(self, std::static_pointer_cast<Resource>(n));
}
}
}
}
}
}
}
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
#include "../TaskTable.h"
namespace zilliz {
namespace milvus {
namespace engine {
class CopyCompletedEvent : public Event {
public:
CopyCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::COPY_COMPLETED, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
inline std::string
Dump() const override {
return "<CopyCompletedEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event);
public:
TaskTableItemPtr task_table_item_;
};
}
}
}
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
enum class EventType {
START_UP,
COPY_COMPLETED,
FINISH_TASK,
TASK_TABLE_UPDATED
};
class Resource;
class Event {
public:
explicit
Event(EventType type, std::weak_ptr<Resource> resource)
: type_(type),
resource_(std::move(resource)) {}
inline EventType
Type() const {
return type_;
}
inline virtual std::string
Dump() const {
return "<Event>";
}
friend std::ostream &operator<<(std::ostream &out, const Event &event);
public:
EventType type_;
std::weak_ptr<Resource> resource_;
};
using EventPtr = std::shared_ptr<Event>;
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "Event.h"
#include "StartUpEvent.h"
#include "CopyCompletedEvent.h"
#include "FinishTaskEvent.h"
#include "TaskTableUpdatedEvent.h"
namespace zilliz {
namespace milvus {
namespace engine {
std::ostream &operator<<(std::ostream &out, const Event &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const CopyCompletedEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) {
out << event.Dump();
return out;
}
std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) {
out << event.Dump();
return out;
}
}
}
}
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
namespace zilliz {
namespace milvus {
namespace engine {
class FinishTaskEvent : public Event {
public:
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItemPtr task_table_item)
: Event(EventType::FINISH_TASK, std::move(resource)),
task_table_item_(std::move(task_table_item)) {}
inline std::string
Dump() const override {
return "<FinishTaskEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event);
public:
TaskTableItemPtr task_table_item_;
};
}
}
}
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
namespace zilliz {
namespace milvus {
namespace engine {
class StartUpEvent : public Event {
public:
explicit
StartUpEvent(std::weak_ptr<Resource> resource)
: Event(EventType::START_UP, std::move(resource)) {}
inline std::string
Dump() const override {
return "<StartUpEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event);
};
}
}
}
\ No newline at end of file
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
namespace zilliz {
namespace milvus {
namespace engine {
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {}
inline std::string
Dump() const override {
return "<TaskTableUpdatedEvent>";
}
friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event);
};
}
}
}
......@@ -11,26 +11,20 @@ namespace zilliz {
namespace milvus {
namespace engine {
std::ostream &operator<<(std::ostream &out, const CpuResource &resource) {
out << resource.Dump();
return out;
}
CpuResource::CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
void CpuResource::LoadFile(TaskPtr task) {
//if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
//} else if (src.type == CPU) {
// memcpy(src, dest, len);
//} else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
//} else {
// // unknown type, exception
//}
task->Load(LoadType::DISK2CPU, 0);
}
void CpuResource::Process(TaskPtr task) {
task->Execute();
}
}
......
......@@ -19,6 +19,13 @@ public:
explicit
CpuResource(std::string name);
inline std::string
Dump() const override {
return "<CpuResource>";
}
friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource);
protected:
void
LoadFile(TaskPtr task) override;
......
......@@ -10,9 +10,14 @@ namespace zilliz {
namespace milvus {
namespace engine {
std::ostream &operator<<(std::ostream &out, const DiskResource &resource) {
out << resource.Dump();
return out;
}
DiskResource::DiskResource(std::string name)
: Resource(std::move(name), ResourceType::DISK) {}
: Resource(std::move(name), ResourceType::DISK, true, false) {
}
void DiskResource::LoadFile(TaskPtr task) {
......
......@@ -18,6 +18,13 @@ public:
explicit
DiskResource(std::string name);
inline std::string
Dump() const override {
return "<DiskResource>";
}
friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource);
protected:
void
LoadFile(TaskPtr task) override;
......
......@@ -11,16 +11,20 @@ namespace zilliz {
namespace milvus {
namespace engine {
std::ostream &operator<<(std::ostream &out, const GpuResource &resource) {
out << resource.Dump();
return out;
}
GpuResource::GpuResource(std::string name)
: Resource(std::move(name), ResourceType::GPU) {}
void GpuResource::LoadFile(TaskPtr task) {
task->Load(LoadType::CPU2GPU, 0);
}
void GpuResource::Process(TaskPtr task) {
task->Execute();
}
}
......
......@@ -18,6 +18,13 @@ public:
explicit
GpuResource(std::string name);
inline std::string
Dump() const override {
return "<GpuResource>";
}
friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource);
protected:
void
LoadFile(TaskPtr task) override;
......
......@@ -55,8 +55,7 @@ std::string Node::Dump() {
void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
std::lock_guard<std::mutex> lk(mutex_);
if (auto s = neighbour_node.lock()) {
Neighbour neighbour(neighbour_node, connection);
neighbours_[s->id_] = neighbour;
neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection)));
}
// else do nothing, consider it..
}
......
......@@ -10,54 +10,85 @@ namespace zilliz {
namespace milvus {
namespace engine {
Resource::Resource(std::string name, ResourceType type)
std::ostream &operator<<(std::ostream &out, const Resource &resource) {
out << resource.Dump();
return out;
}
Resource::Resource(std::string name,
ResourceType type,
bool enable_loader,
bool enable_executor)
: name_(std::move(name)),
type_(type),
running_(false),
enable_loader_(enable_loader),
enable_executor_(enable_executor),
load_flag_(false),
exec_flag_(false) {
task_table_.RegisterSubscriber([&] {
if (subscriber_) {
auto event = std::make_shared<TaskTableUpdatedEvent>(shared_from_this());
subscriber_(std::static_pointer_cast<Event>(event));
}
});
}
void Resource::Start() {
loader_thread_ = std::thread(&Resource::loader_function, this);
executor_thread_ = std::thread(&Resource::executor_function, this);
running_ = true;
if (enable_loader_) {
loader_thread_ = std::thread(&Resource::loader_function, this);
}
if (enable_executor_) {
executor_thread_ = std::thread(&Resource::executor_function, this);
}
}
void Resource::Stop() {
running_ = false;
WakeupLoader();
WakeupExecutor();
if (enable_loader_) {
WakeupLoader();
loader_thread_.join();
}
if (enable_executor_) {
WakeupExecutor();
executor_thread_.join();
}
}
TaskTable &Resource::task_table() {
return task_table_;
}
void Resource::WakeupExecutor() {
exec_cv_.notify_one();
}
void Resource::WakeupLoader() {
std::lock_guard<std::mutex> lock(load_mutex_);
load_flag_ = true;
load_cv_.notify_one();
}
TaskPtr Resource::pick_task_load() {
void Resource::WakeupExecutor() {
std::lock_guard<std::mutex> lock(exec_mutex_);
exec_flag_ = true;
exec_cv_.notify_one();
}
TaskTableItemPtr Resource::pick_task_load() {
auto indexes = PickToLoad(task_table_, 3);
for (auto index : indexes) {
// try to set one task loading, then return
if (task_table_.Load(index))
return task_table_.Get(index).task;
return task_table_.Get(index);
// else try next
}
return nullptr;
}
TaskPtr Resource::pick_task_execute() {
TaskTableItemPtr Resource::pick_task_execute() {
auto indexes = PickToExecute(task_table_, 3);
for (auto index : indexes) {
// try to set one task executing, then return
if (task_table_.Execute(index))
return task_table_.Get(index).task;
return task_table_.Get(index);
// else try next
}
return nullptr;
......@@ -67,24 +98,46 @@ void Resource::loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
auto task = pick_task_load();
if (task) {
LoadFile(task);
GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec();
load_flag_ = false;
while (true) {
auto task_item = pick_task_load();
if (task_item == nullptr) {
break;
}
LoadFile(task_item->task);
// TODO: wrapper loaded
task_item->state = TaskTableItemState::LOADED;
if (subscriber_) {
auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
void Resource::executor_function() {
GetRegisterFunc(RegisterType::START_UP)->Exec();
if (subscriber_) {
auto event = std::make_shared<StartUpEvent>(shared_from_this());
subscriber_(std::static_pointer_cast<Event>(event));
}
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
auto task = pick_task_execute();
if (task) {
Process(task);
GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec();
exec_flag_ = false;
while (true) {
auto task_item = pick_task_execute();
if (task_item == nullptr) {
break;
}
Process(task_item->task);
task_item->state = TaskTableItemState::EXECUTED;
if (subscriber_) {
auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task_item);
subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
......
......@@ -12,6 +12,11 @@
#include <functional>
#include <condition_variable>
#include "../event/Event.h"
#include "../event/StartUpEvent.h"
#include "../event/CopyCompletedEvent.h"
#include "../event/FinishTaskEvent.h"
#include "../event/TaskTableUpdatedEvent.h"
#include "../TaskTable.h"
#include "../task/Task.h"
#include "../Cost.h"
......@@ -37,18 +42,28 @@ enum class RegisterType {
ON_TASK_TABLE_UPDATED,
};
class Resource : public Node {
class Resource : public Node, public std::enable_shared_from_this<Resource> {
public:
/*
* Event function MUST be a short function, never blocking;
*/
template <typename T>
void Register_T(const RegisterType& type) {
template<typename T>
void Register_T(const RegisterType &type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); });
}
RegisterHandlerPtr
GetRegisterFunc(const RegisterType& type);
GetRegisterFunc(const RegisterType &type);
inline void
RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
subscriber_ = std::move(subscriber);
}
inline ResourceType
Type() const {
return type_;
}
void
Start();
......@@ -59,21 +74,31 @@ public:
TaskTable &
task_table();
inline virtual std::string
Dump() const {
return "<Resource>";
}
friend std::ostream &operator<<(std::ostream &out, const Resource &resource);
public:
/*
* wake up executor;
* wake up loader;
*/
void
WakeupExecutor();
WakeupLoader();
/*
* wake up loader;
/*
* wake up executor;
*/
void
WakeupLoader();
WakeupExecutor();
protected:
Resource(std::string name, ResourceType type);
Resource(std::string name,
ResourceType type,
bool enable_loader = true,
bool enable_executor = true);
// TODO: SearchContextPtr to TaskPtr
/*
......@@ -100,14 +125,14 @@ private:
* Pick one task to load;
* Order by start time;
*/
TaskPtr
TaskTableItemPtr
pick_task_load();
/*
* Pick one task to execute;
* Pick by start time and priority;
*/
TaskPtr
TaskTableItemPtr
pick_task_execute();
private:
......@@ -123,7 +148,6 @@ private:
void
executor_function();
private:
std::string name_;
ResourceType type_;
......@@ -131,8 +155,11 @@ private:
TaskTable task_table_;
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
std::function<void(EventPtr)> subscriber_ = nullptr;
bool running_;
bool enable_loader_ = true;
bool enable_executor_ = true;
std::thread loader_thread_;
std::thread executor_thread_;
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "TestTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
TestTask::Load(LoadType type, uint8_t device_id) {
load_count_++;
}
void
TestTask::Execute() {
exec_count_++;
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Task.h"
namespace zilliz {
namespace milvus {
namespace engine {
class TestTask : public Task {
public:
TestTask() = default;
public:
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
public:
uint64_t load_count_;
uint64_t exec_count_;
};
}
}
}
......@@ -8,6 +8,7 @@
#include "knowhere/index/vector_index/idmap.h"
#include "knowhere/index/vector_index/gpu_ivf.h"
#include "knowhere/common/exception.h"
#include "knowhere/index/vector_index/cloner.h"
#include "vec_impl.h"
#include "data_transfer.h"
......@@ -152,6 +153,22 @@ VecIndexPtr VecIndexImpl::CopyToCpu(const Config &cfg) {
return std::make_shared<VecIndexImpl>(cpu_index, type);
}
VecIndexPtr VecIndexImpl::Clone() {
auto clone_index = std::make_shared<VecIndexImpl>(index_->Clone(), type);
clone_index->dim = dim;
return clone_index;
}
int64_t VecIndexImpl::GetDeviceId() {
if (auto device_idx = std::dynamic_pointer_cast<GPUIndex>(index_)){
return device_idx->GetGpuDevice();
}
else {
return -1; // -1 == cpu
}
return 0;
}
float *BFIndex::GetRawVectors() {
auto raw_index = std::dynamic_pointer_cast<IDMAP>(index_);
if (raw_index) { return raw_index->GetRawVectors(); }
......
......@@ -33,6 +33,8 @@ class VecIndexImpl : public VecIndex {
server::KnowhereError Add(const long &nb, const float *xb, const long *ids, const Config &cfg) override;
zilliz::knowhere::BinarySet Serialize() override;
server::KnowhereError Load(const zilliz::knowhere::BinarySet &index_binary) override;
VecIndexPtr Clone() override;
int64_t GetDeviceId() override;
server::KnowhereError Search(const long &nq, const float *xq, float *dist, long *ids, const Config &cfg) override;
protected:
......
......@@ -63,6 +63,10 @@ class VecIndex {
virtual VecIndexPtr CopyToCpu(const Config &cfg = Config()) = 0;
virtual VecIndexPtr Clone() = 0;
virtual int64_t GetDeviceId() = 0;
virtual IndexType GetType() = 0;
virtual int64_t Dimension() = 0;
......
......@@ -14,6 +14,8 @@ aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler scheduler_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/context scheduler_context_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/db/scheduler/task scheduler_task_files)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/action scheduler_action_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/event scheduler_event_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/task scheduler_task_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
......@@ -35,6 +37,9 @@ include_directories(/usr/include/mysql)
set(scheduler_test_src
${unittest_srcs}
${test_srcs}
${scheduler_action_srcs}
${scheduler_event_srcs}
${scheduler_resource_srcs}
${scheduler_task_srcs}
${scheduler_srcs}
......
#include "scheduler/TaskTable.h"
#include "scheduler/Cost.h"
#include <gtest/gtest.h>
using namespace zilliz::milvus::engine;
class CostTest : public ::testing::Test {
protected:
void
SetUp() override {
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<XSearchTask>();
table_.Put(task);
}
table_.Get(0)->state = TaskTableItemState::INVALID;
table_.Get(1)->state = TaskTableItemState::START;
table_.Get(2)->state = TaskTableItemState::LOADING;
table_.Get(3)->state = TaskTableItemState::LOADED;
table_.Get(4)->state = TaskTableItemState::EXECUTING;
table_.Get(5)->state = TaskTableItemState::EXECUTED;
table_.Get(6)->state = TaskTableItemState::MOVING;
table_.Get(7)->state = TaskTableItemState::MOVED;
}
TaskTable table_;
};
TEST_F(CostTest, pick_to_move) {
CacheMgr cache;
auto indexes = PickToMove(table_, cache, 10);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 3);
}
TEST_F(CostTest, pick_to_load) {
auto indexes = PickToLoad(table_, 10);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 1);
}
TEST_F(CostTest, pick_to_executed) {
auto indexes = PickToExecute(table_, 10);
ASSERT_EQ(indexes.size(), 1);
ASSERT_EQ(indexes[0], 3);
}
......@@ -10,6 +10,8 @@ protected:
SetUp() override {
node1_ = std::make_shared<Node>();
node2_ = std::make_shared<Node>();
node3_ = std::make_shared<Node>();
node4_ = std::make_shared<Node>();
auto pcie = Connection("PCIe", 11.0);
......
#include "scheduler/ResourceFactory.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/Scheduler.h"
#include "scheduler/task/TestTask.h"
#include "utils/Log.h"
#include <gtest/gtest.h>
using namespace zilliz::milvus::engine;
TEST(normal_test, DISABLED_test1) {
TEST(normal_test, test1) {
// ResourceMgr only compose resources, provide unified event
auto res_mgr = std::make_shared<ResourceMgr>();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd"));
......@@ -23,17 +24,62 @@ TEST(normal_test, DISABLED_test1) {
res_mgr->Start();
auto task1 = std::make_shared<XSearchTask>();
auto task2 = std::make_shared<XSearchTask>();
auto scheduler = new Scheduler(res_mgr);
scheduler->Start();
auto task1 = std::make_shared<TestTask>();
auto task2 = std::make_shared<TestTask>();
auto task3 = std::make_shared<TestTask>();
auto task4 = std::make_shared<TestTask>();
if (auto observe = disk.lock()) {
observe->task_table().Put(task1);
observe->task_table().Put(task2);
observe->task_table().Put(task1);
observe->task_table().Put(task1);
observe->task_table().Put(task3);
observe->task_table().Put(task4);
}
auto scheduler = new Scheduler(res_mgr);
scheduler->Start();
// if (auto disk_r = disk.lock()) {
// if (auto cpu_r = cpu.lock()) {
// if (auto gpu1_r = gpu1.lock()) {
// if (auto gpu2_r = gpu2.lock()) {
// std::cout << "<<<<<<<<<<before<<<<<<<<<<" << std::endl;
// std::cout << "disk:" << std::endl;
// std::cout << disk_r->task_table().Dump() << std::endl;
// std::cout << "cpu:" << std::endl;
// std::cout << cpu_r->task_table().Dump() << std::endl;
// std::cout << "gpu1:" << std::endl;
// std::cout << gpu1_r->task_table().Dump() << std::endl;
// std::cout << "gpu2:" << std::endl;
// std::cout << gpu2_r->task_table().Dump() << std::endl;
// std::cout << ">>>>>>>>>>before>>>>>>>>>>" << std::endl;
// }
// }
// }
// }
sleep(5);
// if (auto disk_r = disk.lock()) {
// if (auto cpu_r = cpu.lock()) {
// if (auto gpu1_r = gpu1.lock()) {
// if (auto gpu2_r = gpu2.lock()) {
// std::cout << "<<<<<<<<<<after<<<<<<<<<<" << std::endl;
// std::cout << "disk:" << std::endl;
// std::cout << disk_r->task_table().Dump() << std::endl;
// std::cout << "cpu:" << std::endl;
// std::cout << cpu_r->task_table().Dump() << std::endl;
// std::cout << "gpu1:" << std::endl;
// std::cout << gpu1_r->task_table().Dump() << std::endl;
// std::cout << "gpu2:" << std::endl;
// std::cout << gpu2_r->task_table().Dump() << std::endl;
// std::cout << ">>>>>>>>>>after>>>>>>>>>>" << std::endl;
// }
// }
// }
// }
scheduler->Stop();
res_mgr->Stop();
while (true) sleep(1);
ASSERT_EQ(task1->load_count_, 1);
ASSERT_EQ(task1->exec_count_, 1);
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/resource/Resource.h"
#include "scheduler/resource/DiskResource.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/GpuResource.h"
#include "scheduler/task/Task.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceFactory.h"
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
class ResourceTest : public testing::Test {
protected:
void
SetUp() override {
disk_resource_ = ResourceFactory::Create("disk");
cpu_resource_ = ResourceFactory::Create("cpu");
gpu_resource_ = ResourceFactory::Create("gpu");
resources_.push_back(disk_resource_);
resources_.push_back(cpu_resource_);
resources_.push_back(gpu_resource_);
auto subscriber = [&](EventPtr event) {
if (event->Type() == EventType::COPY_COMPLETED) {
std::lock_guard<std::mutex> lock(load_mutex_);
++load_count_;
cv_.notify_one();
}
if (event->Type() == EventType::FINISH_TASK) {
std::lock_guard<std::mutex> lock(load_mutex_);
++exec_count_;
cv_.notify_one();
}
};
disk_resource_->RegisterSubscriber(subscriber);
cpu_resource_->RegisterSubscriber(subscriber);
gpu_resource_->RegisterSubscriber(subscriber);
disk_resource_->Start();
cpu_resource_->Start();
gpu_resource_->Start();
}
void
TearDown() override {
disk_resource_->Stop();
cpu_resource_->Stop();
gpu_resource_->Stop();
}
void
WaitLoader(uint64_t count) {
std::unique_lock<std::mutex> lock(load_mutex_);
cv_.wait(lock, [&] { return load_count_ == count; });
}
void
WaitExecutor(uint64_t count) {
std::unique_lock<std::mutex> lock(exec_mutex_);
cv_.wait(lock, [&] { return exec_count_ == count; });
}
ResourcePtr disk_resource_;
ResourcePtr cpu_resource_;
ResourcePtr gpu_resource_;
std::vector<ResourcePtr> resources_;
uint64_t load_count_ = 0;
uint64_t exec_count_ = 0;
std::mutex load_mutex_;
std::mutex exec_mutex_;
std::condition_variable cv_;
};
TEST_F(ResourceTest, cpu_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>();
tasks.push_back(task);
cpu_resource_->task_table().Put(task);
}
cpu_resource_->WakeupLoader();
WaitLoader(NUM);
// std::cout << "after WakeupLoader" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
}
cpu_resource_->WakeupExecutor();
WaitExecutor(NUM);
// std::cout << "after WakeupExecutor" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
}
TEST_F(ResourceTest, gpu_resource_test) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>();
tasks.push_back(task);
gpu_resource_->task_table().Put(task);
}
gpu_resource_->WakeupLoader();
WaitLoader(NUM);
// std::cout << "after WakeupLoader" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->load_count_, 1);
}
gpu_resource_->WakeupExecutor();
WaitExecutor(NUM);
// std::cout << "after WakeupExecutor" << std::endl;
// std::cout << cpu_resource_->task_table().Dump();
for (uint64_t i = 0; i < NUM; ++i) {
ASSERT_EQ(tasks[i]->exec_count_, 1);
}
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
}
}
}
......@@ -45,8 +45,6 @@ protected:
invalid_task_ = nullptr;
task1_ = std::make_shared<XSearchTask>();
task2_ = std::make_shared<XSearchTask>();
empty_table_ = TaskTable();
}
TaskPtr invalid_task_;
......@@ -58,19 +56,19 @@ protected:
TEST_F(TaskTableBaseTest, put_task) {
empty_table_.Put(task1_);
ASSERT_EQ(empty_table_.Get(0).task, task1_);
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
}
TEST_F(TaskTableBaseTest, put_invalid_test) {
empty_table_.Put(invalid_task_);
ASSERT_EQ(empty_table_.Get(0).task, invalid_task_);
ASSERT_EQ(empty_table_.Get(0)->task, invalid_task_);
}
TEST_F(TaskTableBaseTest, put_batch) {
std::vector<TaskPtr> tasks{task1_, task2_};
empty_table_.Put(tasks);
ASSERT_EQ(empty_table_.Get(0).task, task1_);
ASSERT_EQ(empty_table_.Get(1).task, task2_);
ASSERT_EQ(empty_table_.Get(0)->task, task1_);
ASSERT_EQ(empty_table_.Get(1)->task, task2_);
}
TEST_F(TaskTableBaseTest, put_empty_batch) {
......@@ -89,14 +87,14 @@ protected:
table1_.Put(task);
}
table1_.Get(0).state = TaskTableItemState::INVALID;
table1_.Get(1).state = TaskTableItemState::START;
table1_.Get(2).state = TaskTableItemState::LOADING;
table1_.Get(3).state = TaskTableItemState::LOADED;
table1_.Get(4).state = TaskTableItemState::EXECUTING;
table1_.Get(5).state = TaskTableItemState::EXECUTED;
table1_.Get(6).state = TaskTableItemState::MOVING;
table1_.Get(7).state = TaskTableItemState::MOVED;
table1_.Get(0)->state = TaskTableItemState::INVALID;
table1_.Get(1)->state = TaskTableItemState::START;
table1_.Get(2)->state = TaskTableItemState::LOADING;
table1_.Get(3)->state = TaskTableItemState::LOADED;
table1_.Get(4)->state = TaskTableItemState::EXECUTING;
table1_.Get(5)->state = TaskTableItemState::EXECUTED;
table1_.Get(6)->state = TaskTableItemState::MOVING;
table1_.Get(7)->state = TaskTableItemState::MOVED;
}
TaskTable table1_;
......@@ -106,22 +104,22 @@ TEST_F(TaskTableAdvanceTest, load) {
table1_.Load(1);
table1_.Loaded(2);
ASSERT_EQ(table1_.Get(1).state, TaskTableItemState::LOADING);
ASSERT_EQ(table1_.Get(2).state, TaskTableItemState::LOADED);
ASSERT_EQ(table1_.Get(1)->state, TaskTableItemState::LOADING);
ASSERT_EQ(table1_.Get(2)->state, TaskTableItemState::LOADED);
}
TEST_F(TaskTableAdvanceTest, execute) {
table1_.Execute(3);
table1_.Executed(4);
ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::EXECUTING);
ASSERT_EQ(table1_.Get(4).state, TaskTableItemState::EXECUTED);
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::EXECUTING);
ASSERT_EQ(table1_.Get(4)->state, TaskTableItemState::EXECUTED);
}
TEST_F(TaskTableAdvanceTest, move) {
table1_.Move(3);
table1_.Moved(6);
ASSERT_EQ(table1_.Get(3).state, TaskTableItemState::MOVING);
ASSERT_EQ(table1_.Get(6).state, TaskTableItemState::MOVED);
ASSERT_EQ(table1_.Get(3)->state, TaskTableItemState::MOVING);
ASSERT_EQ(table1_.Get(6)->state, TaskTableItemState::MOVED);
}
......@@ -39,6 +39,14 @@ public:
}
engine::VecIndexPtr Clone() override {
return zilliz::milvus::engine::VecIndexPtr();
}
int64_t GetDeviceId() override {
return 0;
}
engine::IndexType GetType() override {
return engine::IndexType::INVALID;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册