diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp new file mode 100644 index 0000000000000000000000000000000000000000..32eb62704607c6a471b028fdb73384e9361f8bb1 --- /dev/null +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -0,0 +1,38 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "CpuResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +CpuResource::CpuResource(std::string name) + : Resource(std::move(name), ResourceType::CPU) {} + +void CpuResource::LoadFile(TaskPtr task) { + //if (src.type == DISK) { + // fd = open(filename); + // content = fd.read(); + // close(fd); + //} else if (src.type == CPU) { + // memcpy(src, dest, len); + //} else if (src.type == GPU) { + // cudaMemcpyD2H(src, dest); + //} else { + // // unknown type, exception + //} +} + +void CpuResource::Process(TaskPtr task) { + +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index 995615d1ab8a8c298a9f732c2afca1e00f2d2dc7..be1340e954c6453225a1afa6dbef4a8ae3e3745c 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -17,29 +17,14 @@ namespace engine { class CpuResource : public Resource { public: explicit - CpuResource(std::string name) - : Resource(std::move(name), ResourceType::CPU) {} + CpuResource(std::string name); protected: void - LoadFile(TaskPtr task) override { -// if (src.type == DISK) { -// fd = open(filename); -// content = fd.read(); -// close(fd); -// } else if (src.type == CPU) { -// memcpy(src, dest, len); -// } else if (src.type == GPU) { -// cudaMemcpyD2H(src, dest); -// } else { -// // unknown type, exception -// } - } + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override { - task->Execute(); - } + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp new file mode 100644 index 0000000000000000000000000000000000000000..dcc0687ac46c414c1775782ab7f4c5480e5deb45 --- /dev/null +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "DiskResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +DiskResource::DiskResource(std::string name) + : Resource(std::move(name), ResourceType::DISK) {} + +void DiskResource::LoadFile(TaskPtr task) { + +} + +void DiskResource::Process(TaskPtr task) { + +} + +} +} +} + diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index 77d2e978792aa3b0e3ed8d7c526b27994cc8d4ec..39211dbb664e8ab5c1d343d85889b917a9c34870 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -16,15 +16,14 @@ namespace engine { class DiskResource : public Resource { public: explicit - DiskResource(std::string name) - : Resource(std::move(name), ResourceType::DISK) {} + DiskResource(std::string name); protected: void - LoadFile(TaskPtr task) override {} + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override {} + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp new file mode 100644 index 0000000000000000000000000000000000000000..00d5df05b46fd70d9652a9ca618879d7c23f582d --- /dev/null +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -0,0 +1,28 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "GpuResource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + + +GpuResource::GpuResource(std::string name) + : Resource(std::move(name), ResourceType::GPU) {} + +void GpuResource::LoadFile(TaskPtr task) { + +} + +void GpuResource::Process(TaskPtr task) { + +} + +} +} +} diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index 74fae13b75aff90a590c07c2ac1547b31bb4a28b..84bf163284db9aaa825f3b773fcda94fa5ea783a 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -16,15 +16,14 @@ namespace engine { class GpuResource : public Resource { public: explicit - GpuResource(std::string name) - : Resource(std::move(name), ResourceType::GPU) {} + GpuResource(std::string name); protected: void - LoadFile(TaskPtr task) override {} + LoadFile(TaskPtr task) override; void - Process(TaskPtr task) override {} + Process(TaskPtr task) override; }; } diff --git a/cpp/src/scheduler/resource/Node.cpp b/cpp/src/scheduler/resource/Node.cpp new file mode 100644 index 0000000000000000000000000000000000000000..76d61fe858125a8ece1b8177137d6e8541df72e5 --- /dev/null +++ b/cpp/src/scheduler/resource/Node.cpp @@ -0,0 +1,66 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include +#include "Node.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +Node::Node() { + static std::atomic_uint_fast8_t counter(0); + id_ = counter++; +} + +void Node::DelNeighbour(const NeighbourNodePtr &neighbour_ptr) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_ptr.lock()) { + auto search = neighbours_.find(s->id_); + if (search != neighbours_.end()) { + neighbours_.erase(search); + } + } +} + +bool Node::IsNeighbour(const NeighbourNodePtr &neighbour_ptr) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_ptr.lock()) { + auto search = neighbours_.find(s->id_); + if (search != neighbours_.end()) { + return true; + } + } + return false; +} + +std::vector Node::GetNeighbours() { + std::lock_guard lk(mutex_); + std::vector ret; + for (auto &e : neighbours_) { + ret.push_back(e.second); + } + return ret; +} + +std::string Node::Dump() { + // TODO(linxj): what's that? + return std::__cxx11::string(); +} + +void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { + std::lock_guard lk(mutex_); + if (auto s = neighbour_node.lock()) { + Neighbour neighbour(neighbour_node, connection); + neighbours_[s->id_] = neighbour; + } + // else do nothing, consider it.. +} + +} +} +} diff --git a/cpp/src/scheduler/resource/Node.h b/cpp/src/scheduler/resource/Node.h index 61ba4a343b15ec8e052bf1da84cd1bdcb4b11a30..a57987ca9c91b3409ede283e982c13faaf09d8cd 100644 --- a/cpp/src/scheduler/resource/Node.h +++ b/cpp/src/scheduler/resource/Node.h @@ -7,6 +7,7 @@ #include #include +#include #include "../TaskTable.h" #include "Connection.h" @@ -28,29 +29,31 @@ struct Neighbour { Connection connection; }; +// TODO(linxj): return type void -> Status class Node { public: + Node(); + void - AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { - Neighbour neighbour(neighbour_node, connection); - neighbours_.emplace_back(neighbour); - } + AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection); void - DelNeighbour(NeighbourNodePtr neighbour_ptr) {} + DelNeighbour(const NeighbourNodePtr &neighbour_ptr); bool - IsNeighbour(NeighbourNodePtr neighbour_ptr) {} + IsNeighbour(const NeighbourNodePtr& neighbour_ptr); - const std::vector & - GetNeighbours() {} + std::vector + GetNeighbours(); public: std::string Dump(); private: - std::vector neighbours_; + std::mutex mutex_; + uint8_t id_; + std::map neighbours_; }; using NodePtr = std::shared_ptr; diff --git a/cpp/src/scheduler/resource/RegisterHandler.h b/cpp/src/scheduler/resource/RegisterHandler.h new file mode 100644 index 0000000000000000000000000000000000000000..02c55da1e752a0e6d9ec97a5a1efdccbca58235a --- /dev/null +++ b/cpp/src/scheduler/resource/RegisterHandler.h @@ -0,0 +1,24 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#pragma once + +#include + +namespace zilliz { +namespace milvus { +namespace engine { + +class RegisterHandler { + public: + virtual void Exec() = 0; +}; + +using RegisterHandlerPtr = std::shared_ptr; + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7f5925647495166205524ce8e927b8dec0e4e8b7 --- /dev/null +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -0,0 +1,98 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#include "Resource.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +Resource::Resource(std::string name, ResourceType type) + : name_(std::move(name)), + type_(type), + running_(false), + load_flag_(false), + exec_flag_(false) { +} + +void Resource::Start() { + loader_thread_ = std::thread(&Resource::loader_function, this); + executor_thread_ = std::thread(&Resource::executor_function, this); +} + +void Resource::Stop() { + running_ = false; + WakeupLoader(); + WakeupExecutor(); +} + +TaskTable &Resource::task_table() { + return task_table_; +} + +void Resource::WakeupExecutor() { + exec_cv_.notify_one(); +} + +void Resource::WakeupLoader() { + load_cv_.notify_one(); +} + +TaskPtr Resource::pick_task_load() { + auto indexes = PickToLoad(task_table_, 3); + for (auto index : indexes) { + // try to set one task loading, then return + if (task_table_.Load(index)) + return task_table_.Get(index).task; + // else try next + } + return nullptr; +} + +TaskPtr Resource::pick_task_execute() { + auto indexes = PickToExecute(task_table_, 3); + for (auto index : indexes) { + // try to set one task executing, then return + if (task_table_.Execute(index)) + return task_table_.Get(index).task; + // else try next + } + return nullptr; +} + +void Resource::loader_function() { + while (running_) { + std::unique_lock lock(load_mutex_); + load_cv_.wait(lock, [&] { return load_flag_; }); + auto task = pick_task_load(); + if (task) { + LoadFile(task); + GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec(); + } + } +} + +void Resource::executor_function() { + GetRegisterFunc(RegisterType::START_UP)->Exec(); + while (running_) { + std::unique_lock lock(exec_mutex_); + exec_cv_.wait(lock, [&] { return exec_flag_; }); + auto task = pick_task_execute(); + if (task) { + Process(task); + GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec(); + } + } +} + +RegisterHandlerPtr Resource::GetRegisterFunc(const RegisterType &type) { + // construct object each time. + return register_table_[type](); +} + +} +} +} \ No newline at end of file diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index bd1d11aa3c94deb6ebfc17c862287741c6b77eab..2961e281fae5b9e62cbdcd2dff8f8921f8c544dc 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -15,8 +15,9 @@ #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" -#include "Node.h" #include "Connection.h" +#include "Node.h" +#include "RegisterHandler.h" namespace zilliz { @@ -29,92 +30,50 @@ enum class ResourceType { GPU = 2 }; +enum class RegisterType { + START_UP, + ON_FINISH_TASK, + ON_COPY_COMPLETED, + ON_TASK_TABLE_UPDATED, +}; + class Resource : public Node { public: - void - Start() { - loader_thread_ = std::thread(&Resource::loader_function, this); - executor_thread_ = std::thread(&Resource::executor_function, this); + /* + * Event function MUST be a short function, never blocking; + */ + template + void Register_T(const RegisterType& type) { + register_table_.emplace(type, [] { return std::make_shared(); }); } + RegisterHandlerPtr + GetRegisterFunc(const RegisterType& type); + void - Stop() { - running_ = false; - WakeupLoader(); - WakeupExecutor(); - } + Start(); + + void + Stop(); TaskTable & - task_table() { - return task_table_; - } + task_table(); public: /* * wake up executor; */ void - WakeupExecutor() { - exec_cv_.notify_one(); - } + WakeupExecutor(); /* * wake up loader; */ void - WakeupLoader() { - load_cv_.notify_one(); - } - -public: - /* - * Event function MUST be a short function, never blocking; - */ - - /* - * Register on start up event; - */ - void - RegisterOnStartUp(std::function func) { - on_start_up_ = func; - } - - /* - * Register on finish one task event; - */ - void - RegisterOnFinishTask(std::function func) { - on_finish_task_ = func; - } - - /* - * Register on copy task data completed event; - */ - void - RegisterOnCopyCompleted(std::function func) { - on_copy_completed_ = func; - } - - /* - * Register on task table updated event; - */ - void - RegisterOnTaskTableUpdated(std::function func) { - on_task_table_updated_ = func; - } + WakeupLoader(); protected: - Resource(std::string name, ResourceType type) - : name_(std::move(name)), - type_(type), - on_start_up_(nullptr), - on_finish_task_(nullptr), - on_copy_completed_(nullptr), - on_task_table_updated_(nullptr), - running_(false), - load_flag_(false), - exec_flag_(false) { - } + Resource(std::string name, ResourceType type); // TODO: SearchContextPtr to TaskPtr /* @@ -142,67 +101,27 @@ private: * Order by start time; */ TaskPtr - pick_task_load() { - auto indexes = PickToLoad(task_table_, 3); - for (auto index : indexes) { - // try to set one task loading, then return - if (task_table_.Load(index)) - return task_table_.Get(index).task; - // else try next - } - return nullptr; - } + pick_task_load(); /* * Pick one task to execute; * Pick by start time and priority; */ TaskPtr - pick_task_execute() { - auto indexes = PickToExecute(task_table_, 3); - for (auto index : indexes) { - // try to set one task executing, then return - if (task_table_.Execute(index)) - return task_table_.Get(index).task; - // else try next - } - return nullptr; - } + pick_task_execute(); private: /* * Only called by load thread; */ void - loader_function() { - while (running_) { - std::unique_lock lock(load_mutex_); - load_cv_.wait(lock, [&] { return load_flag_; }); - auto task = pick_task_load(); - if (task) { - LoadFile(task); - on_copy_completed_(); - } - } - - } + loader_function(); /* * Only called by worker thread; */ void - executor_function() { - on_start_up_(); - while (running_) { - std::unique_lock lock(exec_mutex_); - exec_cv_.wait(lock, [&] { return exec_flag_; }); - auto task = pick_task_execute(); - if (task) { - Process(task); - on_finish_task_(); - } - } - } + executor_function(); private: @@ -211,10 +130,7 @@ private: TaskTable task_table_; - std::function on_start_up_; - std::function on_finish_task_; - std::function on_copy_completed_; - std::function on_task_table_updated_; + std::map> register_table_; bool running_; std::thread loader_thread_;