提交 887c2c4f 编写于 作者: J jinhai

Merge branch 'branch-0.4.0' into 'branch-0.4.0'

MS-336 scheduler interface

See merge request megasearch/milvus!332

Former-commit-id: 47f143b78d0e6b3ff9af768d56872aafbf822df8
......@@ -19,3 +19,4 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-202 - Add Milvus Jenkins project email notification
- MS-215 - Add Milvus cluster CI/CD groovy file
- MS-277 - Update CUDA Version to V10.1
- MS-336 - Scheduler interface
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
// dummy cache_mgr
class CacheMgr {
};
using CacheMgrPtr = std::shared_ptr<CacheMgr>;
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include "Task.h"
#include "CacheMgr.h"
namespace zilliz {
namespace milvus {
namespace engine {
// TODO: Policy interface
// TODO: collect statistics
/*
* select tasks to move;
* call from scheduler;
*/
std::vector<TaskPtr>
PickToMove(const TaskTable &task_table, const CacheMgr &cache_mgr, double limit) {}
/*
* select task to load
* call from resource;
* I DONT SURE NEED THIS;
*/
std::vector<TaskPtr>
PickToLoad(TaskTable task_table, uint64_t limit) {}
/*
* select task to execute;
* call from resource;
* I DONT SURE NEED THIS;
*/
std::vector<TaskPtr>
PickToExecute(TaskTable task_table, uint64_t limit) {}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include <memory>
#include "resource/Resource.h"
#include "resource/CpuResource.h"
#include "resource/GpuResource.h"
#include "resource/DiskResource.h"
namespace zilliz {
namespace milvus {
namespace engine {
class ResourceFactory {
public:
static std::shared_ptr<Resource>
Create(const std::string &name, const std::string &alias = "") {
if (name == "disk") {
return std::make_shared<CpuResource>(alias);
} else if (name == "cpu") {
return std::make_shared<CpuResource>(alias);
} else if (name == "gpu") {
return std::make_shared<CpuResource>(alias);
} else {
return nullptr;
}
}
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include <vector>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
class ResourceMgr {
public:
ResourceMgr() : running_(false) {}
/******** Management Interface ********/
/*
* Add resource into Resource Management;
* Generate functions on events;
* Functions only modify bool variable, like event trigger;
*/
ResourceWPtr
Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
resources_.emplace_back(resource);
// resource->RegisterOnStartUp([] {
// start_up_event_[index] = true;
// });
// resource.RegisterOnFinishTask([] {
// finish_task_event_[index] = true;
// });
return ret;
}
/*
* Create connection between A and B;
*/
void
Connect(ResourceWPtr &A, ResourceWPtr &B, Connection &connection) {
if (auto observe_a = A.lock()) {
if (auto observe_b = B.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
}
}
}
/*
* Synchronous start all resource;
* Last, start event process thread;
*/
void
StartAll() {
for (auto &resource : resources_) {
resource->Start();
}
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
}
// TODO: add stats interface(low)
public:
/******** Event Register Interface ********/
/*
* Register on start up event;
*/
void
RegisterOnStartUp(std::function<void(ResourceWPtr)> &func) {
on_start_up_ = func;
}
/*
* Register on finish one task event;
*/
void
RegisterOnFinishTask(std::function<void(ResourceWPtr)> &func) {
on_finish_task_ = func;
}
/*
* Register on copy task data completed event;
*/
void
RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func);
/*
* Register on task table updated event;
*/
void
RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func);
public:
/******** Utlitity Functions ********/
std::string
Dump();
private:
void
EventProcess() {
while (running_) {
for (uint64_t i = 0; i < resources_.size(); ++i) {
if (start_up_event_[i]) {
on_start_up_(resources_[i]);
}
}
}
}
private:
bool running_;
std::vector<ResourcePtr> resources_;
std::thread worker_thread_;
std::vector<bool> start_up_event_;
std::vector<bool> finish_task_event_;
std::vector<bool> copy_completed_event_;
std::vector<bool> task_table_updated_event_;
std::function<void(ResourceWPtr)> on_start_up_;
std::function<void(ResourceWPtr)> on_finish_task_;
std::function<void(ResourceWPtr)> on_copy_completed_;
std::function<void(ResourceWPtr)> on_task_table_updated_;
};
using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>;
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include <mutex>
#include <thread>
#include <queue>
namespace zilliz {
namespace milvus {
namespace engine {
class Event {
public:
explicit
Event(ResourceWPtr &resource)
: resource_(resource) {}
public:
virtual void
Process() = 0;
private:
ResourceWPtr resource_;
};
using EventPtr = std::shared_ptr<Event>;
class StartUpEvent : public Event {
public:
explicit
StartUpEvent(ResourceWPtr &resource)
: Event(resource) {}
public:
void
Process() override;
};
class FinishTaskEvent : public Event {
public:
explicit
FinishTaskEvent(ResourceWPtr &resource)
: Event(resource) {}
public:
void
Process() override {
// for (nei : res->neighbours) {
// tasks = cost(nei->task_table(), nei->connection, limit = 3)
// res->task_table()->PutTasks(tasks);
// }
// res->WakeUpExec();
}
};
class CopyCompletedEvent : public Event {
public:
explicit
CopyCompletedEvent(ResourceWPtr &resource)
: Event(resource) {}
public:
void
Process() override;
};
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(ResourceWPtr &resource)
: Event(resource) {}
public:
void
Process() override;
};
class Scheduler {
public:
explicit
Scheduler(ResourceMgrWPtr res_mgr)
: running_(false),
res_mgr_(std::move(res_mgr)) {
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
}
void
Start() {}
/******** Events ********/
/*
* Process start up events;
*/
void
OnStartUp(ResourceWPtr &resource) {
// call from res_mgr, non-blocking, if queue size over limit, exception!
auto event = std::make_shared<StartUpEvent>(resource);
event_queue_.push(event);
}
/*
* Process finish task events;
*/
void
OnFinishTask(ResourceWPtr);
/*
* Process copy completed events;
*/
void
OnCopyCompleted(ResourceWPtr);
/*
* Process task table updated events;
*/
void
OnTaskTableUpdated(ResourceWPtr);
public:
std::string
Dump();
private:
void
worker_function() {
while (running_) {
auto event = event_queue_.front();
event->Process();
}
}
private:
bool running_;
ResourceMgrWPtr res_mgr_;
std::queue<EventPtr> event_queue_;
std::thread worker_thread_;
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
// dummy task
class Task {
public:
Task(const std::string &name) {}
void
Execute() {}
};
using TaskPtr = std::shared_ptr<Task>;
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include <deque>
#include <mutex>
#include "Task.h"
namespace zilliz {
namespace milvus {
namespace engine {
enum class TaskTableItemState {
INVALID,
START, // idle
LOADING, // loading data from other resource
LOADED, // ready to exec or move
EXECUTING, // executing, locking util executed or failed
EXECUTED, // executed, termination state
MOVING, // moving to another resource, locking util executed or failed
MOVED, // moved, termination state
};
struct TaskTableItem {
TaskTableItem() : id(0), state(TaskTableItemState::INVALID), mutex(), priority(0) {}
TaskTableItem(const TaskTableItem &src)
: id(src.id), state(src.state), mutex(), priority(src.priority) {}
uint64_t id; // auto increment from 0;
// TODO: add tag into task
TaskPtr task; // the task;
TaskTableItemState state; // the state;
std::mutex mutex;
uint8_t priority; // just a number, meaningless;
};
class TaskTable {
public:
TaskTable() = default;
explicit
TaskTable(std::vector<TaskPtr> &&tasks) {}
/*
* Put one task;
*/
void
Put(TaskPtr task) {}
/*
* Put tasks back of task table;
* Called by DBImpl;
*/
void
Put(std::vector<TaskPtr> &tasks) {}
/*
* Return task table item reference;
*/
TaskTableItem &
Get(uint64_t index) {}
/*
* TODO
* Remove sequence task which is DONE or MOVED from front;
* Called by ?
*/
void
Clear() {
// find first task is NOT (done or moved), erase from begin to it;
// auto iterator = table_.begin();
// while (iterator->state == TaskTableItemState::EXECUTED or
// iterator->state == TaskTableItemState::MOVED)
// iterator++;
// table_.erase(table_.begin(), iterator);
}
public:
/******** Action ********/
/*
* Move a task;
* Set state moving;
* Called by scheduler;
*/
// TODO: bool to Status
bool
Move(uint64_t index) {
auto &task = table_[index];
std::lock_guard<std::mutex> lock(task.mutex);
if (task.state == TaskTableItemState::START) {
task.state = TaskTableItemState::LOADING;
return true;
}
return false;
}
/*
* Move task finished;
* Set state moved;
* Called by scheduler;
*/
bool
Moved(uint64_t index) {}
/*
* Load a task;
* Set state loading;
* Called by loader;
*/
bool
Load(uint64_t index) {}
/*
* Load task finished;
* Set state loaded;
* Called by loader;
*/
bool
Loaded(uint64_t index) {}
/*
* Execute a task;
* Set state executing;
* Called by executor;
*/
bool
Execute(uint64_t index) {}
/*
* Execute task finished;
* Set state executed;
* Called by executor;
*/
bool
Executed(uint64_t index) {}
public:
/*
* Dump;
*/
std::string
Dump();
private:
// TODO: map better ?
std::deque<TaskTableItem> table_;
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include <sstream>
namespace zilliz {
namespace milvus {
namespace engine {
class Connection {
public:
Connection(std::string name, double speed)
: name_(std::move(name)), speed_(speed) {}
const std::string &
get_name() const {
return name_;
}
const double
get_speed() const {
return speed_;
}
public:
std::string
Dump() const {
std::stringstream ss;
ss << "<name: " << name_ << ", speed: " << speed_ << ">";
return ss.str();
}
private:
std::string name_;
double speed_;
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
class CpuResource : public Resource {
public:
explicit
CpuResource(std::string name)
: Resource(std::move(name), ResourceType::CPU) {}
protected:
void
LoadFile(TaskPtr task) override {
// if (src.type == DISK) {
// fd = open(filename);
// content = fd.read();
// close(fd);
// } else if (src.type == CPU) {
// memcpy(src, dest, len);
// } else if (src.type == GPU) {
// cudaMemcpyD2H(src, dest);
// } else {
// // unknown type, exception
// }
}
void
Process(TaskPtr task) override {
task->Execute();
}
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
class DiskResource : public Resource {
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
class GpuResource : public Resource {
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <vector>
#include <memory>
#include "../TaskTable.h"
#include "Connection.h"
namespace zilliz {
namespace milvus {
namespace engine {
class Node;
using NeighbourNodePtr = std::weak_ptr<Node>;
struct Neighbour {
NeighbourNodePtr neighbour_node;
Connection connection;
};
class Node {
public:
void
AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) {
Neighbour neighbour{.neighbour_node = neighbour_node, .connection = connection};
neighbours_.push_back(neighbour);
}
void
DelNeighbour(NeighbourNodePtr &neighbour_ptr);
bool
IsNeighbour(NeighbourNodePtr &neighbour_ptr);
std::vector<NeighbourNodePtr>
GetNeighbours();
public:
std::string
Dump();
private:
std::vector<Neighbour> neighbours_;
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include <string>
#include <vector>
#include <memory>
#include <thread>
#include <functional>
#include <condition_variable>
#include "../TaskTable.h"
#include "../Task.h"
#include "../Cost.h"
#include "Node.h"
#include "Connection.h"
namespace zilliz {
namespace milvus {
namespace engine {
enum class ResourceType {
DISK = 0,
CPU = 1,
GPU = 2
};
class Resource : public Node {
public:
void
Start() {
loader_thread_ = std::thread(&Resource::loader_function, this);
executor_thread_ = std::thread(&Resource::executor_function, this);
}
void
Stop() {
running_ = false;
WakeupLoader();
WakeupExecutor();
}
TaskTable &
task_table() {
return task_table_;
}
public:
/*
* wake up executor;
*/
void
WakeupExecutor() {
exec_cv_.notify_one();
}
/*
* wake up loader;
*/
void
WakeupLoader() {
load_cv_.notify_one();
}
public:
/*
* Event function MUST be a short function, never blocking;
*/
/*
* Register on start up event;
*/
void
RegisterOnStartUp(std::function<void(void)> func);
/*
* Register on finish one task event;
*/
void
RegisterOnFinishTask(std::function<void(void)> func);
/*
* Register on copy task data completed event;
*/
void
RegisterOnCopyCompleted(std::function<void(void)> func);
/*
* Register on task table updated event;
*/
void
RegisterOnTaskTableUpdated(std::function<void(void)> func);
protected:
Resource(std::string name, ResourceType type)
: name_(std::move(name)),
type_(type),
on_start_up_(nullptr),
on_finish_task_(nullptr),
on_copy_completed_(nullptr),
on_task_table_updated_(nullptr),
running_(false),
load_flag_(false),
exec_flag_(false) {
}
// TODO: SearchContextPtr to TaskPtr
/*
* Implementation by inherit class;
* Blocking function;
*/
virtual void
LoadFile(TaskPtr task) = 0;
/*
* Implementation by inherit class;
* Blocking function;
*/
virtual void
Process(TaskPtr task) = 0;
private:
/*
* These function should move to cost.h ???
* COST.H ???
*/
/*
* Pick one task to load;
* Order by start time;
*/
TaskPtr
pick_task_load() {
auto tasks = PickToLoad(task_table_, 3);
for (uint64_t i = 0; i < tasks.size(); ++i) {
// try to set one task loading, then return
if (task_table_.Load(i))
return task_table_.Get(i).task;
// else try next
}
return nullptr;
}
/*
* Pick one task to execute;
* Pick by start time and priority;
*/
TaskPtr
pick_task_execute() {
auto tasks = PickToExecute(task_table_, 3);
for (uint64_t i = 0; i < tasks.size(); ++i) {
// try to set one task executing, then return
if (task_table_.Execute(i))
return task_table_.Get(i).task;
// else try next
}
return nullptr;
}
private:
/*
* Only called by load thread;
*/
void
loader_function() {
while (running_) {
std::unique_lock<std::mutex> lock(load_mutex_);
load_cv_.wait(lock, [&] { return load_flag_; });
auto task = pick_task_load();
if (task) {
LoadFile(task);
on_copy_completed_();
}
}
}
/*
* Only called by worker thread;
*/
void
executor_function() {
on_start_up_();
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
auto task = pick_task_execute();
if (task) {
Process(task);
on_finish_task_();
}
}
}
private:
std::string name_;
ResourceType type_;
TaskTable task_table_;
std::function<void(void)> on_start_up_;
std::function<void(void)> on_finish_task_;
std::function<void(void)> on_copy_completed_;
std::function<void(void)> on_task_table_updated_;
bool running_;
std::thread loader_thread_;
std::thread executor_thread_;
bool load_flag_;
bool exec_flag_;
std::mutex load_mutex_;
std::mutex exec_mutex_;
std::condition_variable load_cv_;
std::condition_variable exec_cv_;
};
using ResourcePtr = std::shared_ptr<Resource>;
using ResourceWPtr = std::weak_ptr<Resource>;
}
}
}
......@@ -42,4 +42,5 @@ add_subdirectory(server)
add_subdirectory(db)
add_subdirectory(knowhere)
add_subdirectory(metrics)
add_subdirectory(scheduler)
#add_subdirectory(storage)
\ No newline at end of file
#-------------------------------------------------------------------------------
# Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
# Unauthorized copying of this file, via any medium is strictly prohibited.
# Proprietary and confidential.
#-------------------------------------------------------------------------------
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler/resource scheduler_resource_srcs)
aux_source_directory(${MILVUS_ENGINE_SRC}/scheduler scheduler_srcs)
aux_source_directory(./ test_srcs)
include_directories(/usr/local/cuda/include)
link_directories("/usr/local/cuda/lib64")
include_directories(/usr/include/mysql)
#add_definitions(-DBOOST_ERROR_CODE_HEADER_ONLY)
set(scheduler_test_src
${scheduler_resource_srcs}
${scheduler_srcs}
${test_srcs}
)
cuda_add_executable(scheduler_test ${scheduler_test_src})
set(scheduler_libs
sqlite
boost_system_static
boost_filesystem_static
lz4
mysqlpp
)
target_link_libraries(scheduler_test ${scheduler_libs} ${unittest_libs})
install(TARGETS scheduler_test DESTINATION bin)
#include "scheduler/ResourceFactory.h"
#include "scheduler/ResourceMgr.h"
#include "scheduler/Scheduler.h"
#include <gtest/gtest.h>
using namespace zilliz::milvus::engine;
int main() {
// ResourceMgr only compose resources, provide unified event
auto res_mgr = std::make_shared<ResourceMgr>();
auto disk = res_mgr->Add(ResourceFactory::Create("disk", "ssd"));
auto cpu = res_mgr->Add(ResourceFactory::Create("cpu"));
auto gpu1 = res_mgr->Add(ResourceFactory::Create("gpu"));
auto gpu2 = res_mgr->Add(ResourceFactory::Create("gpu"));
auto IO = Connection("IO", 500.0);
auto PCIE = Connection("IO", 11000.0);
res_mgr->Connect(disk, cpu, IO);
res_mgr->Connect(cpu, gpu1, PCIE);
res_mgr->Connect(cpu, gpu2, PCIE);
res_mgr->StartAll();
auto task1 = std::make_shared<Task>("123456789");
auto task2 = std::make_shared<Task>("222222222");
if (auto observe = disk.lock()) {
observe->task_table().Put(task1);
observe->task_table().Put(task2);
observe->task_table().Put(task1);
observe->task_table().Put(task1);
}
auto scheduler = new Scheduler(res_mgr);
scheduler->Start();
while (true) sleep(1);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册