diff --git a/cpp/src/scheduler/Algorithm.cpp b/cpp/src/scheduler/Algorithm.cpp index b388c17bdd0b04fe4222ead7f0e389a07a4d06c2..10a585304a681f01e594b3985d8e8624ebbb5d9f 100644 --- a/cpp/src/scheduler/Algorithm.cpp +++ b/cpp/src/scheduler/Algorithm.cpp @@ -16,20 +16,23 @@ // under the License. -#include "Algorithm.h" +#include "scheduler/Algorithm.h" + +#include +#include +#include namespace zilliz { namespace milvus { namespace scheduler { -constexpr uint64_t MAXINT = std::numeric_limits::max(); +constexpr uint64_t MAXINT = std::numeric_limits::max(); uint64_t ShortestPath(const ResourcePtr &src, const ResourcePtr &dest, const ResourceMgrPtr &res_mgr, std::vector &path) { - std::vector> paths; uint64_t num_of_resources = res_mgr->GetAllResources().size(); @@ -53,7 +56,6 @@ ShortestPath(const ResourcePtr &src, std::vector vis(num_of_resources, false); std::vector dis(num_of_resources, MAXINT); for (auto &res : res_mgr->GetAllResources()) { - auto cur_node = std::static_pointer_cast(res); auto cur_neighbours = cur_node->GetNeighbours(); @@ -105,6 +107,6 @@ ShortestPath(const ResourcePtr &src, return dis[name_id_map.at(dest->name())]; } -} -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/Algorithm.h b/cpp/src/scheduler/Algorithm.h index be77cf351f428da9eb99b159b6027580366ed7c9..d7e0233ba0eb19f4c2e223e863ae53e5930a0451 100644 --- a/cpp/src/scheduler/Algorithm.h +++ b/cpp/src/scheduler/Algorithm.h @@ -30,8 +30,8 @@ uint64_t ShortestPath(const ResourcePtr &src, const ResourcePtr &dest, const ResourceMgrPtr &res_mgr, - std::vector& path); + std::vector &path); -} -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/Definition.h b/cpp/src/scheduler/Definition.h index 9ac647de8847e5f1a39b74cc24fd0853f319cc67..ce41aca48d958377346f7f64c6a065fc284b129c 100644 --- a/cpp/src/scheduler/Definition.h +++ b/cpp/src/scheduler/Definition.h @@ -30,7 +30,6 @@ #include "db/engine/EngineFactory.h" #include "db/engine/ExecutionEngine.h" - namespace zilliz { namespace milvus { namespace scheduler { @@ -43,6 +42,6 @@ using EngineFactory = engine::EngineFactory; using EngineType = engine::EngineType; using MetricType = engine::MetricType; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/JobMgr.cpp b/cpp/src/scheduler/JobMgr.cpp index 31db4b344ef35bad3e5ecd5e020f1eb91029afac..0406e98c4908fdd6bb56da9d27468a3457cfc11a 100644 --- a/cpp/src/scheduler/JobMgr.cpp +++ b/cpp/src/scheduler/JobMgr.cpp @@ -15,19 +15,19 @@ // specific language governing permissions and limitations // under the License. -#include "JobMgr.h" +#include "scheduler/JobMgr.h" #include "task/Task.h" #include "TaskCreator.h" +#include namespace zilliz { namespace milvus { namespace scheduler { -using namespace engine; - JobMgr::JobMgr(ResourceMgrPtr res_mgr) - : res_mgr_(std::move(res_mgr)) {} + : res_mgr_(std::move(res_mgr)) { +} void JobMgr::Start() { @@ -59,7 +59,9 @@ void JobMgr::worker_function() { while (running_) { std::unique_lock lock(mutex_); - cv_.wait(lock, [this] { return !queue_.empty(); }); + cv_.wait(lock, [this] { + return !queue_.empty(); + }); auto job = queue_.front(); queue_.pop(); lock.unlock(); @@ -84,6 +86,6 @@ JobMgr::build_task(const JobPtr &job) { return TaskCreator::Create(job); } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/JobMgr.h b/cpp/src/scheduler/JobMgr.h index 8d22b5d4f5a7e262fe11d176302525b7a4701917..49ba9154e37958c2bdb182708ece10a48b8b6b3a 100644 --- a/cpp/src/scheduler/JobMgr.h +++ b/cpp/src/scheduler/JobMgr.h @@ -31,15 +31,13 @@ #include "task/Task.h" #include "ResourceMgr.h" - namespace zilliz { namespace milvus { namespace scheduler { class JobMgr { -public: - explicit - JobMgr(ResourceMgrPtr res_mgr); + public: + explicit JobMgr(ResourceMgrPtr res_mgr); void Start(); @@ -47,18 +45,18 @@ public: void Stop(); -public: + public: void Put(const JobPtr &job); -private: + private: void worker_function(); std::vector build_task(const JobPtr &job); -private: + private: bool running_ = false; std::queue queue_; @@ -72,6 +70,6 @@ private: using JobMgrPtr = std::shared_ptr; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/ResourceFactory.cpp b/cpp/src/scheduler/ResourceFactory.cpp index 066b47aad7e38a855aa2e7ba4defead0ad48f950..de9b5bc717c6bab4cc59c3ff1bd904a62a9d0894 100644 --- a/cpp/src/scheduler/ResourceFactory.cpp +++ b/cpp/src/scheduler/ResourceFactory.cpp @@ -16,8 +16,7 @@ // under the License. -#include "ResourceFactory.h" - +#include "scheduler/ResourceFactory.h" namespace zilliz { namespace milvus { @@ -40,6 +39,6 @@ ResourceFactory::Create(const std::string &name, } } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/ResourceFactory.h b/cpp/src/scheduler/ResourceFactory.h index 39fb79154b9ae69376fae3e0f926dbc56a443d31..f7a47ef1e565e0fa3be527c26546cc5e6885bc04 100644 --- a/cpp/src/scheduler/ResourceFactory.h +++ b/cpp/src/scheduler/ResourceFactory.h @@ -25,13 +25,12 @@ #include "resource/GpuResource.h" #include "resource/DiskResource.h" - namespace zilliz { namespace milvus { namespace scheduler { class ResourceFactory { -public: + public: static std::shared_ptr Create(const std::string &name, const std::string &type, @@ -40,8 +39,6 @@ public: bool enable_executor = true); }; - -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/ResourceMgr.cpp b/cpp/src/scheduler/ResourceMgr.cpp index 0f9fab8e8dd39ac41d903d8044379e9ace6c20a8..6067b2eb01d332139fa0bf1b3281c81f17125d8b 100644 --- a/cpp/src/scheduler/ResourceMgr.cpp +++ b/cpp/src/scheduler/ResourceMgr.cpp @@ -16,15 +16,13 @@ // specific language governing permissions and limitations // under the License. -#include "ResourceMgr.h" +#include "scheduler/ResourceMgr.h" #include "utils/Log.h" - namespace zilliz { namespace milvus { namespace scheduler { - void ResourceMgr::Start() { std::lock_guard lck(resources_mutex_); @@ -186,7 +184,9 @@ void ResourceMgr::event_process() { while (running_) { std::unique_lock lock(event_mutex_); - event_cv_.wait(lock, [this] { return !queue_.empty(); }); + event_cv_.wait(lock, [this] { + return !queue_.empty(); + }); auto event = queue_.front(); queue_.pop(); @@ -201,6 +201,6 @@ ResourceMgr::event_process() { } } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 8bd045ab7d51ee220b2f4a92687ce83b21217b01..d03408a7df90954307bc87f8beb8e6a5b4c53a77 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -22,21 +22,21 @@ #include #include #include +#include #include #include "resource/Resource.h" #include "utils/Log.h" - namespace zilliz { namespace milvus { namespace scheduler { class ResourceMgr { -public: + public: ResourceMgr() = default; -public: + public: /******** Management Interface ********/ void Start(); @@ -58,7 +58,7 @@ public: subscriber_ = std::move(subscriber); } -public: + public: /******** Management Interface ********/ inline std::vector & GetDiskResources() { @@ -89,10 +89,10 @@ public: uint64_t GetNumGpuResource() const; -public: + public: // TODO: add stats interface(low) -public: + public: /******** Utility Functions ********/ std::string Dump(); @@ -100,14 +100,14 @@ public: std::string DumpTaskTables(); -private: + private: void post_event(const EventPtr &event); void event_process(); -private: + private: bool running_ = false; std::vector disk_resources_; @@ -120,13 +120,11 @@ private: std::condition_variable event_cv_; std::thread worker_thread_; - }; using ResourceMgrPtr = std::shared_ptr; using ResourceMgrWPtr = std::weak_ptr; -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/SchedInst.cpp b/cpp/src/scheduler/SchedInst.cpp index 5d65d35993d8207ad9b60dddf9c5c7b457b9e228..71b40de9eed2e6ca42f8f5a56f4c3e66a32925cf 100644 --- a/cpp/src/scheduler/SchedInst.cpp +++ b/cpp/src/scheduler/SchedInst.cpp @@ -16,12 +16,16 @@ // under the License. -#include "SchedInst.h" +#include "scheduler/SchedInst.h" #include "server/Config.h" #include "ResourceFactory.h" #include "knowhere/index/vector_index/IndexGPUIVF.h" #include "Utils.h" +#include +#include +#include +#include namespace zilliz { namespace milvus { @@ -165,6 +169,7 @@ StopSchedulerService() { SchedInst::GetInstance()->Stop(); ResMgrInst::GetInstance()->Stop(); } -} -} -} + +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/SchedInst.h b/cpp/src/scheduler/SchedInst.h index d23c200b05962b91240ea87b3642149b4714d062..4cca6ec5a957f28a589a3f715120a93177669c6d 100644 --- a/cpp/src/scheduler/SchedInst.h +++ b/cpp/src/scheduler/SchedInst.h @@ -24,13 +24,12 @@ #include #include - namespace zilliz { namespace milvus { namespace scheduler { class ResMgrInst { -public: + public: static ResourceMgrPtr GetInstance() { if (instance == nullptr) { @@ -42,13 +41,13 @@ public: return instance; } -private: + private: static ResourceMgrPtr instance; static std::mutex mutex_; }; class SchedInst { -public: + public: static SchedulerPtr GetInstance() { if (instance == nullptr) { @@ -60,13 +59,13 @@ public: return instance; } -private: + private: static SchedulerPtr instance; static std::mutex mutex_; }; class JobMgrInst { -public: + public: static scheduler::JobMgrPtr GetInstance() { if (instance == nullptr) { @@ -78,7 +77,7 @@ public: return instance; } -private: + private: static scheduler::JobMgrPtr instance; static std::mutex mutex_; }; @@ -89,6 +88,6 @@ StartSchedulerService(); void StopSchedulerService(); -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/Scheduler.cpp b/cpp/src/scheduler/Scheduler.cpp index abaccd4fcbd4cad6488e81660e5c1cf630310046..24f7bfe73b3e62f2a7b53b8fe3a2f96f6aa60da5 100644 --- a/cpp/src/scheduler/Scheduler.cpp +++ b/cpp/src/scheduler/Scheduler.cpp @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. - -#include "src/cache/GpuCacheMgr.h" +#include "scheduler/Scheduler.h" +#include "cache/GpuCacheMgr.h" #include "event/LoadCompletedEvent.h" -#include "Scheduler.h" #include "action/Action.h" #include "Algorithm.h" +#include namespace zilliz { namespace milvus { @@ -43,7 +43,6 @@ Scheduler::Scheduler(ResourceMgrWPtr res_mgr) std::bind(&Scheduler::OnFinishTask, this, std::placeholders::_1))); } - void Scheduler::Start() { running_ = true; @@ -79,7 +78,9 @@ void Scheduler::worker_function() { while (running_) { std::unique_lock lock(event_mutex_); - event_cv_.wait(lock, [this] { return !event_queue_.empty(); }); + event_cv_.wait(lock, [this] { + return !event_queue_.empty(); + }); auto event = event_queue_.front(); event_queue_.pop(); if (event == nullptr) { @@ -142,6 +143,6 @@ Scheduler::OnTaskTableUpdated(const EventPtr &event) { } } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index b591fdd256f5b6b59c83a133d290c6b500952bac..073f7eeb0d8d9617a01cf54ca82e7ba98797e420 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -22,22 +22,20 @@ #include #include #include +#include #include "resource/Resource.h" #include "ResourceMgr.h" #include "utils/Log.h" - namespace zilliz { namespace milvus { namespace scheduler { - // TODO: refactor, not friendly to unittest, logical in framework code class Scheduler { -public: - explicit - Scheduler(ResourceMgrWPtr res_mgr); + public: + explicit Scheduler(ResourceMgrWPtr res_mgr); Scheduler(const Scheduler &) = delete; Scheduler(Scheduler &&) = delete; @@ -66,7 +64,7 @@ public: std::string Dump(); -private: + private: /******** Events ********/ /* @@ -106,7 +104,7 @@ private: void OnTaskTableUpdated(const EventPtr &event); -private: + private: /* * Dispatch event to event handler; */ @@ -119,7 +117,7 @@ private: void worker_function(); -private: + private: bool running_; std::unordered_map> event_register_; @@ -133,7 +131,6 @@ private: using SchedulerPtr = std::shared_ptr; -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/TaskCreator.cpp b/cpp/src/scheduler/TaskCreator.cpp index 0d3b2a8a9698e24c719920397d598816d8a93807..8b0378c646299924a6fca2e10b0d82472d61bf83 100644 --- a/cpp/src/scheduler/TaskCreator.cpp +++ b/cpp/src/scheduler/TaskCreator.cpp @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include -#include "TaskCreator.h" +#include "scheduler/TaskCreator.h" +#include "scheduler/tasklabel/BroadcastLabel.h" #include "tasklabel/DefaultLabel.h" - namespace zilliz { namespace milvus { namespace scheduler { @@ -64,8 +63,6 @@ TaskCreator::Create(const DeleteJobPtr &job) { return tasks; } - -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/TaskCreator.h b/cpp/src/scheduler/TaskCreator.h index d4e87bd0a2ed4e65d12066329cb78d2e5e35a8c8..81cb25010f307c7424375a2340e4b8debc7cfa1b 100644 --- a/cpp/src/scheduler/TaskCreator.h +++ b/cpp/src/scheduler/TaskCreator.h @@ -34,17 +34,16 @@ #include "task/SearchTask.h" #include "task/DeleteTask.h" - namespace zilliz { namespace milvus { namespace scheduler { class TaskCreator { -public: + public: static std::vector Create(const JobPtr &job); -public: + public: static std::vector Create(const SearchJobPtr &job); @@ -52,6 +51,6 @@ public: Create(const DeleteJobPtr &job); }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index badcb46c2d16037ccd548203d17b0f5ca42e9ef3..a7343ee5093c38e443a6c742c35bfb3a52845fb1 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -16,7 +16,7 @@ // under the License. -#include "TaskTable.h" +#include "scheduler/TaskTable.h" #include "event/TaskTableUpdatedEvent.h" #include "Utils.h" @@ -24,7 +24,6 @@ #include #include - namespace zilliz { namespace milvus { namespace scheduler { @@ -75,6 +74,7 @@ TaskTableItem::Load() { } return false; } + bool TaskTableItem::Loaded() { std::unique_lock lock(mutex); @@ -86,6 +86,7 @@ TaskTableItem::Loaded() { } return false; } + bool TaskTableItem::Execute() { std::unique_lock lock(mutex); @@ -97,6 +98,7 @@ TaskTableItem::Execute() { } return false; } + bool TaskTableItem::Executed() { std::unique_lock lock(mutex); @@ -109,6 +111,7 @@ TaskTableItem::Executed() { } return false; } + bool TaskTableItem::Move() { std::unique_lock lock(mutex); @@ -120,6 +123,7 @@ TaskTableItem::Move() { } return false; } + bool TaskTableItem::Moved() { std::unique_lock lock(mutex); @@ -206,7 +210,6 @@ TaskTable::Put(std::vector &tasks) { } } - TaskTableItemPtr TaskTable::Get(uint64_t index) { return table_[index]; @@ -232,6 +235,6 @@ TaskTable::Dump() { return ss.str(); } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index c9557c8f9ee11c3564628eaddacb8f43b68c5645..819197c1adaeb589e63e27ff2ed784ce27d8b6e2 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -20,11 +20,13 @@ #include #include #include +#include +#include +#include #include "task/SearchTask.h" #include "event/Event.h" - namespace zilliz { namespace milvus { namespace scheduler { @@ -52,7 +54,8 @@ struct TaskTimestamp { }; struct TaskTableItem { - TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() {} + TaskTableItem() : id(0), task(nullptr), state(TaskTableItemState::INVALID), mutex() { + } TaskTableItem(const TaskTableItem &src) = delete; TaskTableItem(TaskTableItem &&) = delete; @@ -91,7 +94,7 @@ struct TaskTableItem { using TaskTableItemPtr = std::shared_ptr; class TaskTable { -public: + public: TaskTable() = default; TaskTable(const TaskTable &) = delete; @@ -145,24 +148,28 @@ public: return table_.size(); } -public: + public: TaskTableItemPtr & operator[](uint64_t index) { return table_[index]; } - std::deque::iterator begin() { return table_.begin(); } - std::deque::iterator end() { return table_.end(); } + std::deque::iterator begin() { + return table_.begin(); + } -public: + std::deque::iterator end() { + return table_.end(); + } + + public: std::vector PickToLoad(uint64_t limit); std::vector PickToExecute(uint64_t limit); -public: - + public: /******** Action ********/ // TODO: bool to Status @@ -227,14 +234,14 @@ public: return table_[index]->Moved(); } -public: + public: /* * Dump; */ std::string Dump(); -private: + private: std::uint64_t id_ = 0; mutable std::mutex id_mutex_; std::deque table_; @@ -246,7 +253,6 @@ private: uint64_t last_finish_ = -1; }; - -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/Utils.cpp b/cpp/src/scheduler/Utils.cpp index f207845c2052733e2240e691c15be3b5663f14b8..bb19950ffece8f6de4ad15e478e9ad7946989979 100644 --- a/cpp/src/scheduler/Utils.cpp +++ b/cpp/src/scheduler/Utils.cpp @@ -16,12 +16,11 @@ // under the License. -#include "Utils.h" +#include "scheduler/Utils.h" #include #include - namespace zilliz { namespace milvus { namespace scheduler { @@ -41,6 +40,6 @@ get_num_gpu() { return n_devices; } -} -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/Utils.h b/cpp/src/scheduler/Utils.h index f1056f104a900cbd19458a7b85e019255b48708c..c69028f0fa64e2f0d57d1f82291ebe3c3523d587 100644 --- a/cpp/src/scheduler/Utils.h +++ b/cpp/src/scheduler/Utils.h @@ -18,7 +18,6 @@ #include - namespace zilliz { namespace milvus { namespace scheduler { @@ -29,6 +28,6 @@ get_current_timestamp(); uint64_t get_num_gpu(); -} -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/action/Action.h b/cpp/src/scheduler/action/Action.h index 9f2fa9f6c5738da3a43f6adfc72b2318dc72041e..a5f67aa98d05cbb8a3a9c804f0e395e82d930c3e 100644 --- a/cpp/src/scheduler/action/Action.h +++ b/cpp/src/scheduler/action/Action.h @@ -17,16 +17,17 @@ #pragma once -#include "../resource/Resource.h" -#include "../ResourceMgr.h" +#include "scheduler/resource/Resource.h" +#include "scheduler/ResourceMgr.h" +#include namespace zilliz { namespace milvus { namespace scheduler { class Action { -public: + public: static void PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self); @@ -43,10 +44,8 @@ public: SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, ResourcePtr resource, std::shared_ptr event); - }; - -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp index d53903ba2dcb74b3ec0f83b44b74cf5e46d5e717..909112eb627c5a7c4d6e7d2b6f73a63656c95ae0 100644 --- a/cpp/src/scheduler/action/PushTaskToNeighbour.cpp +++ b/cpp/src/scheduler/action/PushTaskToNeighbour.cpp @@ -22,7 +22,6 @@ #include "src/cache/GpuCacheMgr.h" #include "Action.h" - namespace zilliz { namespace milvus { namespace scheduler { @@ -57,13 +56,12 @@ get_neighbours_with_connetion(const ResourcePtr &self) { return neighbours; } - void Action::PushTaskToNeighbourRandomly(const TaskPtr &task, const ResourcePtr &self) { auto neighbours = get_neighbours_with_connetion(self); if (not neighbours.empty()) { - std::vector speeds; + std::vector speeds; uint64_t total_speed = 0; for (auto &neighbour : neighbours) { uint64_t speed = neighbour.second.speed(); @@ -87,7 +85,6 @@ Action::PushTaskToNeighbourRandomly(const TaskPtr &task, } else { //TODO: process } - } void @@ -99,14 +96,14 @@ Action::PushTaskToAllNeighbour(const TaskPtr &task, const ResourcePtr &self) { } void -Action::PushTaskToResource(const TaskPtr& task, const ResourcePtr& dest) { +Action::PushTaskToResource(const TaskPtr &task, const ResourcePtr &dest) { dest->task_table().Put(task); } void Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, - ResourcePtr resource, - std::shared_ptr event) { + ResourcePtr resource, + std::shared_ptr event) { if (not resource->HasExecutor() && event->task_table_item_->Move()) { auto task = event->task_table_item_->task; auto search_task = std::static_pointer_cast(task); @@ -135,8 +132,8 @@ Action::DefaultLabelTaskScheduler(ResourceMgrWPtr res_mgr, void Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, - ResourcePtr resource, - std::shared_ptr event) { + ResourcePtr resource, + std::shared_ptr event) { auto task = event->task_table_item_->task; if (resource->type() == ResourceType::DISK) { // step 1: calculate shortest path per resource, from disk to compute resource @@ -181,7 +178,6 @@ Action::SpecifiedResourceLabelTaskScheduler(ResourceMgrWPtr res_mgr, } } -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/event/Event.h b/cpp/src/scheduler/event/Event.h index b00bd7f90255431f51e469aa1d8d6fbeda50212c..860c60c5b727b600972e73f86d32dc8bbace3ce9 100644 --- a/cpp/src/scheduler/event/Event.h +++ b/cpp/src/scheduler/event/Event.h @@ -18,6 +18,8 @@ #pragma once #include +#include +#include namespace zilliz { namespace milvus { @@ -33,11 +35,12 @@ enum class EventType { class Resource; class Event { -public: + public: explicit Event(EventType type, std::weak_ptr resource) : type_(type), - resource_(std::move(resource)) {} + resource_(std::move(resource)) { + } inline EventType Type() const { @@ -49,13 +52,13 @@ public: friend std::ostream &operator<<(std::ostream &out, const Event &event); -public: + public: EventType type_; std::weak_ptr resource_; }; using EventPtr = std::shared_ptr; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/event/EventDump.cpp b/cpp/src/scheduler/event/EventDump.cpp index 3d06dfd5f5529997f30127fcdb6a6869270711f9..a9ed751d88486bcf95950869be9a67eaf197de99 100644 --- a/cpp/src/scheduler/event/EventDump.cpp +++ b/cpp/src/scheduler/event/EventDump.cpp @@ -22,36 +22,40 @@ #include "FinishTaskEvent.h" #include "TaskTableUpdatedEvent.h" - namespace zilliz { namespace milvus { namespace scheduler { -std::ostream &operator<<(std::ostream &out, const Event &event) { +std::ostream & +operator<<(std::ostream &out, const Event &event) { out << event.Dump(); return out; } -std::ostream &operator<<(std::ostream &out, const StartUpEvent &event) { +std::ostream & +operator<<(std::ostream &out, const StartUpEvent &event) { out << event.Dump(); return out; } -std::ostream &operator<<(std::ostream &out, const LoadCompletedEvent &event) { +std::ostream & +operator<<(std::ostream &out, const LoadCompletedEvent &event) { out << event.Dump(); return out; } -std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event) { +std::ostream & +operator<<(std::ostream &out, const FinishTaskEvent &event) { out << event.Dump(); return out; } -std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) { +std::ostream & +operator<<(std::ostream &out, const TaskTableUpdatedEvent &event) { out << event.Dump(); return out; } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/event/FinishTaskEvent.h b/cpp/src/scheduler/event/FinishTaskEvent.h index 3ffa855f30eb393a94974edef674680c3cc786cc..f49acb16ad4955a1c06aeecad5c3985fe8c65ec3 100644 --- a/cpp/src/scheduler/event/FinishTaskEvent.h +++ b/cpp/src/scheduler/event/FinishTaskEvent.h @@ -17,18 +17,22 @@ #pragma once -#include "Event.h" +#include "scheduler/event/Event.h" +#include +#include +#include namespace zilliz { namespace milvus { namespace scheduler { class FinishTaskEvent : public Event { -public: + public: FinishTaskEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::FINISH_TASK, std::move(resource)), - task_table_item_(std::move(task_table_item)) {} + task_table_item_(std::move(task_table_item)) { + } inline std::string Dump() const override { @@ -37,10 +41,10 @@ public: friend std::ostream &operator<<(std::ostream &out, const FinishTaskEvent &event); -public: + public: TaskTableItemPtr task_table_item_; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/event/LoadCompletedEvent.h b/cpp/src/scheduler/event/LoadCompletedEvent.h index 286f26723ffc1ab9cb5e76d5bbefec3e977c0c3b..8d727f7435984b3573d44fcab44fbbe21e964504 100644 --- a/cpp/src/scheduler/event/LoadCompletedEvent.h +++ b/cpp/src/scheduler/event/LoadCompletedEvent.h @@ -17,19 +17,23 @@ #pragma once -#include "Event.h" -#include "../TaskTable.h" +#include "scheduler/event/Event.h" +#include "scheduler/TaskTable.h" +#include +#include +#include namespace zilliz { namespace milvus { namespace scheduler { class LoadCompletedEvent : public Event { -public: + public: LoadCompletedEvent(std::weak_ptr resource, TaskTableItemPtr task_table_item) : Event(EventType::LOAD_COMPLETED, std::move(resource)), - task_table_item_(std::move(task_table_item)) {} + task_table_item_(std::move(task_table_item)) { + } inline std::string Dump() const override { @@ -38,10 +42,10 @@ public: friend std::ostream &operator<<(std::ostream &out, const LoadCompletedEvent &event); -public: + public: TaskTableItemPtr task_table_item_; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/event/StartUpEvent.h b/cpp/src/scheduler/event/StartUpEvent.h index 089eb1377b855a89814c47a740387a5e8b010cac..8e4ad120dee2f5c297e190cce5209c2e71d1e692 100644 --- a/cpp/src/scheduler/event/StartUpEvent.h +++ b/cpp/src/scheduler/event/StartUpEvent.h @@ -17,18 +17,21 @@ #pragma once -#include "Event.h" +#include "scheduler/event/Event.h" +#include +#include +#include namespace zilliz { namespace milvus { namespace scheduler { class StartUpEvent : public Event { -public: - explicit - StartUpEvent(std::weak_ptr resource) - : Event(EventType::START_UP, std::move(resource)) {} + public: + explicit StartUpEvent(std::weak_ptr resource) + : Event(EventType::START_UP, std::move(resource)) { + } inline std::string Dump() const override { @@ -38,6 +41,6 @@ public: friend std::ostream &operator<<(std::ostream &out, const StartUpEvent &event); }; -} -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h index 70ee28fdab73f3c09d45068cce1c06c78811f10a..ec579b31bef4214d75d79bafe2f66ac81b246497 100644 --- a/cpp/src/scheduler/event/TaskTableUpdatedEvent.h +++ b/cpp/src/scheduler/event/TaskTableUpdatedEvent.h @@ -19,16 +19,19 @@ #include "Event.h" +#include +#include +#include namespace zilliz { namespace milvus { namespace scheduler { class TaskTableUpdatedEvent : public Event { -public: - explicit - TaskTableUpdatedEvent(std::weak_ptr resource) - : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {} + public: + explicit TaskTableUpdatedEvent(std::weak_ptr resource) + : Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) { + } inline std::string Dump() const override { @@ -38,7 +41,6 @@ public: friend std::ostream &operator<<(std::ostream &out, const TaskTableUpdatedEvent &event); }; - -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/job/DeleteJob.cpp b/cpp/src/scheduler/job/DeleteJob.cpp index 9066afafaac311dfb5ffbf55b880eaab60d4e507..9d917751c69616f4ffec2e4b8b2a0ed4cecaff7c 100644 --- a/cpp/src/scheduler/job/DeleteJob.cpp +++ b/cpp/src/scheduler/job/DeleteJob.cpp @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -#include "DeleteJob.h" +#include "scheduler/job/DeleteJob.h" +#include namespace zilliz { namespace milvus { @@ -29,15 +30,20 @@ DeleteJob::DeleteJob(JobId id, : Job(id, JobType::DELETE), table_id_(std::move(table_id)), meta_ptr_(std::move(meta_ptr)), - num_resource_(num_resource) {} + num_resource_(num_resource) { +} -void DeleteJob::WaitAndDelete() { +void +DeleteJob::WaitAndDelete() { std::unique_lock lock(mutex_); - cv_.wait(lock, [&] { return done_resource == num_resource_; }); + cv_.wait(lock, [&] { + return done_resource == num_resource_; + }); meta_ptr_->DeleteTableFiles(table_id_); } -void DeleteJob::ResourceDone() { +void +DeleteJob::ResourceDone() { { std::lock_guard lock(mutex_); ++done_resource; @@ -45,7 +51,6 @@ void DeleteJob::ResourceDone() { cv_.notify_one(); } -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/job/DeleteJob.h b/cpp/src/scheduler/job/DeleteJob.h index d82262b235acf81665a6b042b698e6c65464c4dc..7d8b20e47c5df683ed3828b3720637ae34c20020 100644 --- a/cpp/src/scheduler/job/DeleteJob.h +++ b/cpp/src/scheduler/job/DeleteJob.h @@ -30,26 +30,25 @@ #include "Job.h" #include "db/meta/Meta.h" - namespace zilliz { namespace milvus { namespace scheduler { class DeleteJob : public Job { -public: + public: DeleteJob(JobId id, std::string table_id, engine::meta::MetaPtr meta_ptr, uint64_t num_resource); -public: + public: void WaitAndDelete(); void ResourceDone(); -public: + public: std::string table_id() const { return table_id_; @@ -60,7 +59,7 @@ public: return meta_ptr_; } -private: + private: std::string table_id_; engine::meta::MetaPtr meta_ptr_; @@ -72,7 +71,6 @@ private: using DeleteJobPtr = std::shared_ptr; -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/job/Job.h b/cpp/src/scheduler/job/Job.h index 242c33a4e59580ac1a15f22d7af9453654a8cd71..c646a4f034a85d1a20f77409d54bf91dde00964f 100644 --- a/cpp/src/scheduler/job/Job.h +++ b/cpp/src/scheduler/job/Job.h @@ -27,7 +27,6 @@ #include #include - namespace zilliz { namespace milvus { namespace scheduler { @@ -42,7 +41,7 @@ enum class JobType { using JobId = std::uint64_t; class Job { -public: + public: inline JobId id() const { return id_; @@ -53,10 +52,11 @@ public: return type_; } -protected: - Job(JobId id, JobType type) : id_(id), type_(type) {} + protected: + Job(JobId id, JobType type) : id_(id), type_(type) { + } -private: + private: JobId id_; JobType type_; }; @@ -64,7 +64,6 @@ private: using JobPtr = std::shared_ptr; using JobWPtr = std::weak_ptr; -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/job/SearchJob.cpp b/cpp/src/scheduler/job/SearchJob.cpp index 65b6701b5f6a1f43942fe7418e1d748cbe78dd7e..dee7125fed1de745d3762a784d445368f98ce6d0 100644 --- a/cpp/src/scheduler/job/SearchJob.cpp +++ b/cpp/src/scheduler/job/SearchJob.cpp @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. +#include "scheduler/job/SearchJob.h" #include "utils/Log.h" -#include "SearchJob.h" - - namespace zilliz { namespace milvus { namespace scheduler { @@ -33,7 +31,8 @@ SearchJob::SearchJob(zilliz::milvus::scheduler::JobId id, topk_(topk), nq_(nq), nprobe_(nprobe), - vectors_(vectors) {} + vectors_(vectors) { +} bool SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) { @@ -48,11 +47,12 @@ SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) { return true; } - void SearchJob::WaitResult() { std::unique_lock lock(mutex_); - cv_.wait(lock, [this] { return index_files_.empty(); }); + cv_.wait(lock, [this] { + return index_files_.empty(); + }); SERVER_LOG_DEBUG << "SearchJob " << id() << " all done"; } @@ -69,14 +69,11 @@ SearchJob::GetResult() { return result_; } -Status& +Status & SearchJob::GetStatus() { return status_; } - -} -} -} - - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/job/SearchJob.h b/cpp/src/scheduler/job/SearchJob.h index 12edeab1994d11b08bceeeb90fc92c8411c46ef1..7bb7fbefbf4b24232bb826aa4a1909d900da662b 100644 --- a/cpp/src/scheduler/job/SearchJob.h +++ b/cpp/src/scheduler/job/SearchJob.h @@ -26,16 +26,15 @@ #include #include #include +#include #include "Job.h" #include "db/meta/MetaTypes.h" - namespace zilliz { namespace milvus { namespace scheduler { - using engine::meta::TableFileSchemaPtr; using Id2IndexMap = std::unordered_map; @@ -43,10 +42,10 @@ using Id2DistanceMap = std::vector>; using ResultSet = std::vector; class SearchJob : public Job { -public: + public: SearchJob(JobId id, uint64_t topk, uint64_t nq, uint64_t nprobe, const float *vectors); -public: + public: bool AddIndexFile(const TableFileSchemaPtr &index_file); @@ -62,7 +61,7 @@ public: Status & GetStatus(); -public: + public: uint64_t topk() const { return topk_; @@ -77,6 +76,7 @@ public: nprobe() const { return nprobe_; } + const float * vectors() const { return vectors_; @@ -87,7 +87,7 @@ public: return index_files_; } -private: + private: uint64_t topk_ = 0; uint64_t nq_ = 0; uint64_t nprobe_ = 0; @@ -105,7 +105,6 @@ private: using SearchJobPtr = std::shared_ptr; -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/Connection.h b/cpp/src/scheduler/resource/Connection.h index 38fbc65bb5e99cdb4646acb8820f52d3724f84e6..cf18b6c9a214c8947fbe060c7c61c51ba0832650 100644 --- a/cpp/src/scheduler/resource/Connection.h +++ b/cpp/src/scheduler/resource/Connection.h @@ -19,17 +19,18 @@ #include #include - +#include namespace zilliz { namespace milvus { namespace scheduler { class Connection { -public: + public: // TODO: update construct function, speed: double->uint64_t Connection(std::string name, double speed) - : name_(std::move(name)), speed_(speed) {} + : name_(std::move(name)), speed_(speed) { + } const std::string & name() const { @@ -46,7 +47,7 @@ public: return 1024 / speed_; } -public: + public: std::string Dump() const { std::stringstream ss; @@ -54,12 +55,11 @@ public: return ss.str(); } -private: + private: std::string name_; uint64_t speed_; }; - -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/CpuResource.cpp b/cpp/src/scheduler/resource/CpuResource.cpp index 3933bbb5a0813d86f2f99d16a48bfa646a2632db..5859dfd0cddcc51f985e1467ea1e973490d19b1b 100644 --- a/cpp/src/scheduler/resource/CpuResource.cpp +++ b/cpp/src/scheduler/resource/CpuResource.cpp @@ -16,29 +16,34 @@ // under the License. -#include "CpuResource.h" +#include "scheduler/resource/CpuResource.h" +#include namespace zilliz { namespace milvus { namespace scheduler { -std::ostream &operator<<(std::ostream &out, const CpuResource &resource) { +std::ostream & +operator<<(std::ostream &out, const CpuResource &resource) { out << resource.Dump(); return out; } CpuResource::CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor) - : Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) {} + : Resource(std::move(name), ResourceType::CPU, device_id, enable_loader, enable_executor) { +} -void CpuResource::LoadFile(TaskPtr task) { +void +CpuResource::LoadFile(TaskPtr task) { task->Load(LoadType::DISK2CPU, 0); } -void CpuResource::Process(TaskPtr task) { +void +CpuResource::Process(TaskPtr task) { task->Execute(); } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/CpuResource.h b/cpp/src/scheduler/resource/CpuResource.h index 1f6f235621b4009583e3f384828157e439e177a2..2226523fdfe55b8320ef950001e08f4d5a737257 100644 --- a/cpp/src/scheduler/resource/CpuResource.h +++ b/cpp/src/scheduler/resource/CpuResource.h @@ -21,13 +21,12 @@ #include "Resource.h" - namespace zilliz { namespace milvus { namespace scheduler { class CpuResource : public Resource { -public: + public: explicit CpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor); @@ -38,7 +37,7 @@ public: friend std::ostream &operator<<(std::ostream &out, const CpuResource &resource); -protected: + protected: void LoadFile(TaskPtr task) override; @@ -46,6 +45,6 @@ protected: Process(TaskPtr task) override; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/DiskResource.cpp b/cpp/src/scheduler/resource/DiskResource.cpp index 6495396fdbc78647fcc2a8d5b9720f1329cedb03..eee2424cc11ed16a94f04764dc7c08ce5800d4d1 100644 --- a/cpp/src/scheduler/resource/DiskResource.cpp +++ b/cpp/src/scheduler/resource/DiskResource.cpp @@ -15,14 +15,17 @@ // specific language governing permissions and limitations // under the License. -#include "DiskResource.h" +#include "scheduler/resource/DiskResource.h" +#include +#include namespace zilliz { namespace milvus { namespace scheduler { -std::ostream &operator<<(std::ostream &out, const DiskResource &resource) { +std::ostream & +operator<<(std::ostream &out, const DiskResource &resource) { out << resource.Dump(); return out; } @@ -31,15 +34,14 @@ DiskResource::DiskResource(std::string name, uint64_t device_id, bool enable_loa : Resource(std::move(name), ResourceType::DISK, device_id, enable_loader, enable_executor) { } -void DiskResource::LoadFile(TaskPtr task) { - +void +DiskResource::LoadFile(TaskPtr task) { } -void DiskResource::Process(TaskPtr task) { - +void +DiskResource::Process(TaskPtr task) { } -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/DiskResource.h b/cpp/src/scheduler/resource/DiskResource.h index b9e7a4035c8e716cbd2bd1865de63dc7181efd70..a7caf5c6622e97e0e8ed96a42b6907a1203439db 100644 --- a/cpp/src/scheduler/resource/DiskResource.h +++ b/cpp/src/scheduler/resource/DiskResource.h @@ -17,16 +17,16 @@ #pragma once - #include "Resource.h" +#include namespace zilliz { namespace milvus { namespace scheduler { class DiskResource : public Resource { -public: + public: explicit DiskResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor); @@ -37,7 +37,7 @@ public: friend std::ostream &operator<<(std::ostream &out, const DiskResource &resource); -protected: + protected: void LoadFile(TaskPtr task) override; @@ -45,6 +45,6 @@ protected: Process(TaskPtr task) override; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/GpuResource.cpp b/cpp/src/scheduler/resource/GpuResource.cpp index c3981e65cb2cd89bc4b21fe15f6730f467ad970a..3c7abc0b2937f14422a5f426c53b8c0671541ebb 100644 --- a/cpp/src/scheduler/resource/GpuResource.cpp +++ b/cpp/src/scheduler/resource/GpuResource.cpp @@ -16,29 +16,32 @@ // under the License. -#include "GpuResource.h" - +#include "scheduler/resource/GpuResource.h" namespace zilliz { namespace milvus { namespace scheduler { -std::ostream &operator<<(std::ostream &out, const GpuResource &resource) { +std::ostream & +operator<<(std::ostream &out, const GpuResource &resource) { out << resource.Dump(); return out; } GpuResource::GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor) - : Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) {} + : Resource(std::move(name), ResourceType::GPU, device_id, enable_loader, enable_executor) { +} -void GpuResource::LoadFile(TaskPtr task) { +void +GpuResource::LoadFile(TaskPtr task) { task->Load(LoadType::CPU2GPU, device_id_); } -void GpuResource::Process(TaskPtr task) { +void +GpuResource::Process(TaskPtr task) { task->Execute(); } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/GpuResource.h b/cpp/src/scheduler/resource/GpuResource.h index f08e767c51cfa871253b0b0b24101f25b0c3500e..9f19b07464463f6c6e49c53f9d203631f3a6b674 100644 --- a/cpp/src/scheduler/resource/GpuResource.h +++ b/cpp/src/scheduler/resource/GpuResource.h @@ -17,16 +17,17 @@ #pragma once - #include "Resource.h" +#include +#include namespace zilliz { namespace milvus { namespace scheduler { class GpuResource : public Resource { -public: + public: explicit GpuResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor); @@ -37,7 +38,7 @@ public: friend std::ostream &operator<<(std::ostream &out, const GpuResource &resource); -protected: + protected: void LoadFile(TaskPtr task) override; @@ -45,6 +46,6 @@ protected: Process(TaskPtr task) override; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/Node.cpp b/cpp/src/scheduler/resource/Node.cpp index 719bfb142731fdfa9d9bf37040b797b5a3c7ffb0..cf652b8ba0ac21d73bee593bdea39a45ca605c52 100644 --- a/cpp/src/scheduler/resource/Node.cpp +++ b/cpp/src/scheduler/resource/Node.cpp @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. +#include "scheduler/resource/Node.h" #include -#include "Node.h" - +#include namespace zilliz { namespace milvus { @@ -29,7 +29,8 @@ Node::Node() { id_ = counter++; } -std::vector Node::GetNeighbours() { +std::vector +Node::GetNeighbours() { std::lock_guard lk(mutex_); std::vector ret; for (auto &e : neighbours_) { @@ -38,7 +39,8 @@ std::vector Node::GetNeighbours() { return ret; } -std::string Node::Dump() { +std::string +Node::Dump() { std::stringstream ss; ss << "::neighbours:" << std::endl; for (auto &neighbour : neighbours_) { @@ -48,7 +50,8 @@ std::string Node::Dump() { return ss.str(); } -void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { +void +Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &connection) { std::lock_guard lk(mutex_); if (auto s = neighbour_node.lock()) { neighbours_.emplace(std::make_pair(s->id_, Neighbour(neighbour_node, connection))); @@ -56,6 +59,6 @@ void Node::AddNeighbour(const NeighbourNodePtr &neighbour_node, Connection &conn // else do nothing, consider it.. } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/Node.h b/cpp/src/scheduler/resource/Node.h index 46506da10d5a4fa0591990a7faa5a990048bc84c..00337a70094ae3cc2b399e2e4c47d83662c498bb 100644 --- a/cpp/src/scheduler/resource/Node.h +++ b/cpp/src/scheduler/resource/Node.h @@ -20,11 +20,11 @@ #include #include #include +#include -#include "../TaskTable.h" +#include "scheduler/TaskTable.h" #include "Connection.h" - namespace zilliz { namespace milvus { namespace scheduler { @@ -34,8 +34,9 @@ class Node; using NeighbourNodePtr = std::weak_ptr; struct Neighbour { - Neighbour(NeighbourNodePtr nei, Connection conn) - : neighbour_node(nei), connection(conn) {} + Neighbour(NeighbourNodePtr nei, Connection conn) + : neighbour_node(nei), connection(conn) { + } NeighbourNodePtr neighbour_node; Connection connection; @@ -43,7 +44,7 @@ struct Neighbour { // TODO(linxj): return type void -> Status class Node { -public: + public: Node(); void @@ -52,11 +53,11 @@ public: std::vector GetNeighbours(); -public: + public: std::string Dump(); -private: + private: std::mutex mutex_; uint8_t id_; std::map neighbours_; @@ -65,6 +66,6 @@ private: using NodePtr = std::shared_ptr; using NodeWPtr = std::weak_ptr; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index 22e7eb8502c9a6615b13bcb4c54ac402a2d6e0ba..4eb71b8ac52a97b967acbb493c86afa4137be487 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include -#include "../Utils.h" -#include "Resource.h" +#include "scheduler/resource/Resource.h" +#include "scheduler/Utils.h" +#include +#include namespace zilliz { namespace milvus { @@ -100,7 +101,8 @@ Resource::NumOfTaskToExec() { return count; } -TaskTableItemPtr Resource::pick_task_load() { +TaskTableItemPtr +Resource::pick_task_load() { auto indexes = task_table_.PickToLoad(10); for (auto index : indexes) { // try to set one task loading, then return @@ -111,7 +113,8 @@ TaskTableItemPtr Resource::pick_task_load() { return nullptr; } -TaskTableItemPtr Resource::pick_task_execute() { +TaskTableItemPtr +Resource::pick_task_execute() { auto indexes = task_table_.PickToExecute(3); for (auto index : indexes) { // try to set one task executing, then return @@ -122,10 +125,13 @@ TaskTableItemPtr Resource::pick_task_execute() { return nullptr; } -void Resource::loader_function() { +void +Resource::loader_function() { while (running_) { std::unique_lock lock(load_mutex_); - load_cv_.wait(lock, [&] { return load_flag_; }); + load_cv_.wait(lock, [&] { + return load_flag_; + }); load_flag_ = false; lock.unlock(); while (true) { @@ -140,18 +146,20 @@ void Resource::loader_function() { subscriber_(std::static_pointer_cast(event)); } } - } } -void Resource::executor_function() { +void +Resource::executor_function() { if (subscriber_) { auto event = std::make_shared(shared_from_this()); subscriber_(std::static_pointer_cast(event)); } while (running_) { std::unique_lock lock(exec_mutex_); - exec_cv_.wait(lock, [&] { return exec_flag_; }); + exec_cv_.wait(lock, [&] { + return exec_flag_; + }); exec_flag_ = false; lock.unlock(); while (true) { @@ -172,10 +180,9 @@ void Resource::executor_function() { subscriber_(std::static_pointer_cast(event)); } } - } } -} -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index 0b0eaae3043f581bddcea67f7ff212e7e452cb2b..1c18b1a2b2497502d41ccec24acde446f22cc60f 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -34,7 +35,6 @@ #include "Connection.h" #include "Node.h" - namespace zilliz { namespace milvus { namespace scheduler { @@ -104,7 +104,7 @@ class Resource : public Node, public std::enable_shared_from_this { return task_table_; } -public: + public: inline bool HasLoader() const { return enable_loader_; @@ -212,7 +212,6 @@ public: using ResourcePtr = std::shared_ptr; using ResourceWPtr = std::weak_ptr; -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/TestResource.cpp b/cpp/src/scheduler/resource/TestResource.cpp index b49ea12d77822caa5411e1d9173cf9be93c6a319..25560cf7ee65a05b2c9a577a34ca0f07d93b7cae 100644 --- a/cpp/src/scheduler/resource/TestResource.cpp +++ b/cpp/src/scheduler/resource/TestResource.cpp @@ -15,14 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include "TestResource.h" +#include "scheduler/resource/TestResource.h" +#include namespace zilliz { namespace milvus { namespace scheduler { -std::ostream &operator<<(std::ostream &out, const TestResource &resource) { +std::ostream & +operator<<(std::ostream &out, const TestResource &resource) { out << resource.Dump(); return out; } @@ -31,15 +33,16 @@ TestResource::TestResource(std::string name, uint64_t device_id, bool enable_loa : Resource(std::move(name), ResourceType::TEST, device_id, enable_loader, enable_executor) { } -void TestResource::LoadFile(TaskPtr task) { +void +TestResource::LoadFile(TaskPtr task) { task->Load(LoadType::TEST, 0); } -void TestResource::Process(TaskPtr task) { +void +TestResource::Process(TaskPtr task) { task->Execute(); } -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/resource/TestResource.h b/cpp/src/scheduler/resource/TestResource.h index 861efbd825add4d657d75ab8df1b67c381523eea..ac83a42c60c776fd364611e136bf0afb21fa0b87 100644 --- a/cpp/src/scheduler/resource/TestResource.h +++ b/cpp/src/scheduler/resource/TestResource.h @@ -17,16 +17,17 @@ #pragma once - #include "Resource.h" +#include +#include namespace zilliz { namespace milvus { namespace scheduler { class TestResource : public Resource { -public: + public: explicit TestResource(std::string name, uint64_t device_id, bool enable_loader, bool enable_executor); @@ -37,7 +38,7 @@ public: friend std::ostream &operator<<(std::ostream &out, const TestResource &resource); -protected: + protected: void LoadFile(TaskPtr task) override; @@ -45,6 +46,6 @@ protected: Process(TaskPtr task) override; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/DeleteTask.cpp b/cpp/src/scheduler/task/DeleteTask.cpp index 65a7a537fd5c2429944900801eb026cdd1d934ef..52579d67c67b5edd2061a84702703f5b8e599e03 100644 --- a/cpp/src/scheduler/task/DeleteTask.cpp +++ b/cpp/src/scheduler/task/DeleteTask.cpp @@ -16,19 +16,18 @@ // under the License. -#include "DeleteTask.h" - +#include "scheduler/task/DeleteTask.h" namespace zilliz { namespace milvus { namespace scheduler { XDeleteTask::XDeleteTask(const scheduler::DeleteJobPtr &delete_job) - : Task(TaskType::DeleteTask), delete_job_(delete_job) {} + : Task(TaskType::DeleteTask), delete_job_(delete_job) { +} void XDeleteTask::Load(LoadType type, uint8_t device_id) { - } void @@ -36,6 +35,6 @@ XDeleteTask::Execute() { delete_job_->ResourceDone(); } -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/DeleteTask.h b/cpp/src/scheduler/task/DeleteTask.h index b64f9a977f0ba66dd1876202fbf47660ec1fa036..608960e7c868b612dfc121a061e32bc9c28f34e0 100644 --- a/cpp/src/scheduler/task/DeleteTask.h +++ b/cpp/src/scheduler/task/DeleteTask.h @@ -20,15 +20,13 @@ #include "scheduler/job/DeleteJob.h" #include "Task.h" - namespace zilliz { namespace milvus { namespace scheduler { class XDeleteTask : public Task { -public: - explicit - XDeleteTask(const scheduler::DeleteJobPtr &job); + public: + explicit XDeleteTask(const scheduler::DeleteJobPtr &job); void Load(LoadType type, uint8_t device_id) override; @@ -36,10 +34,10 @@ public: void Execute() override; -public: + public: scheduler::DeleteJobPtr delete_job_; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/Path.h b/cpp/src/scheduler/task/Path.h index a463494b774c341772b63f1a2b44a6efcb61faa2..672dfff1b9b36c251f4f23ff4a6d8972050dbe4b 100644 --- a/cpp/src/scheduler/task/Path.h +++ b/cpp/src/scheduler/task/Path.h @@ -20,7 +20,6 @@ #include #include - namespace zilliz { namespace milvus { namespace scheduler { @@ -29,7 +28,8 @@ class Path { public: Path() = default; - Path(std::vector& path, uint64_t index) : path_(path), index_(index) {} + Path(std::vector &path, uint64_t index) : path_(path), index_(index) { + } void push_back(const std::string &str) { @@ -49,7 +49,6 @@ class Path { } else { return nullptr; } - } std::string @@ -67,14 +66,19 @@ class Path { return path_[index]; } - std::vector::iterator begin() { return path_.begin(); } - std::vector::iterator end() { return path_.end(); } + std::vector::iterator begin() { + return path_.begin(); + } + + std::vector::iterator end() { + return path_.end(); + } public: std::vector path_; uint64_t index_ = 0; }; -} -} -} \ No newline at end of file +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/SearchTask.cpp b/cpp/src/scheduler/task/SearchTask.cpp index 284152f14ec6ae6b8115c60d339215dd51d3c8f7..0c205fcafabcb2cfcd201a59f37fc72221c6e226 100644 --- a/cpp/src/scheduler/task/SearchTask.cpp +++ b/cpp/src/scheduler/task/SearchTask.cpp @@ -15,15 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include "SearchTask.h" +#include "scheduler/task/SearchTask.h" +#include "scheduler/job/SearchJob.h" #include "metrics/Metrics.h" #include "db/engine/EngineFactory.h" #include "utils/TimeRecorder.h" #include "utils/Log.h" #include -#include "scheduler/job/SearchJob.h" - +#include +#include namespace zilliz { namespace milvus { @@ -104,7 +105,6 @@ XSearchTask::XSearchTask(TableFileSchemaPtr file) (MetricType) file_->metric_type_, file_->nlist_); } - } void @@ -144,7 +144,7 @@ XSearchTask::Load(LoadType type, uint8_t device_id) { s = Status(SERVER_UNEXPECTED_ERROR, error_msg); } - if (auto job = job_.lock()){ + if (auto job = job_.lock()) { auto search_job = std::static_pointer_cast(job); search_job->SearchDone(file_->id_); search_job->GetStatus() = s; @@ -183,7 +183,7 @@ XSearchTask::Execute() { server::CollectDurationMetrics metrics(index_type_); - std::vector output_ids; + std::vector output_ids; std::vector output_distance; if (auto job = job_.lock()) { @@ -192,7 +192,7 @@ XSearchTask::Execute() { uint64_t nq = search_job->nq(); uint64_t topk = search_job->topk(); uint64_t nprobe = search_job->nprobe(); - const float* vectors = search_job->vectors(); + const float *vectors = search_job->vectors(); output_ids.resize(topk * nq); output_distance.resize(topk * nq); @@ -236,11 +236,12 @@ XSearchTask::Execute() { index_engine_ = nullptr; } -Status XSearchTask::ClusterResult(const std::vector &output_ids, - const std::vector &output_distance, - uint64_t nq, - uint64_t topk, - scheduler::ResultSet &result_set) { +Status +XSearchTask::ClusterResult(const std::vector &output_ids, + const std::vector &output_distance, + uint64_t nq, + uint64_t topk, + scheduler::ResultSet &result_set) { if (output_ids.size() < nq * topk || output_distance.size() < nq * topk) { std::string msg = "Invalid id array size: " + std::to_string(output_ids.size()) + " distance array size: " + std::to_string(output_distance.size()); @@ -275,10 +276,11 @@ Status XSearchTask::ClusterResult(const std::vector &output_ids, return Status::OK(); } -Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src, - scheduler::Id2DistanceMap &distance_target, - uint64_t topk, - bool ascending) { +Status +XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src, + scheduler::Id2DistanceMap &distance_target, + uint64_t topk, + bool ascending) { //Note: the score_src and score_target are already arranged by score in ascending order if (distance_src.empty()) { ENGINE_LOG_WARNING << "Empty distance source array"; @@ -349,10 +351,11 @@ Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src, return Status::OK(); } -Status XSearchTask::TopkResult(scheduler::ResultSet &result_src, - uint64_t topk, - bool ascending, - scheduler::ResultSet &result_target) { +Status +XSearchTask::TopkResult(scheduler::ResultSet &result_src, + uint64_t topk, + bool ascending, + scheduler::ResultSet &result_target) { if (result_target.empty()) { result_target.swap(result_src); return Status::OK(); @@ -381,7 +384,6 @@ Status XSearchTask::TopkResult(scheduler::ResultSet &result_src, return Status::OK(); } - -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/SearchTask.h b/cpp/src/scheduler/task/SearchTask.h index 6f0ec6bc530396a4be11a2c09c8e4d849095d311..7c19ba20f961966455e6aa798aa9bf356b735f34 100644 --- a/cpp/src/scheduler/task/SearchTask.h +++ b/cpp/src/scheduler/task/SearchTask.h @@ -21,6 +21,7 @@ #include "scheduler/job/SearchJob.h" #include "scheduler/Definition.h" +#include namespace zilliz { namespace milvus { @@ -28,9 +29,8 @@ namespace scheduler { // TODO: rewrite class XSearchTask : public Task { -public: - explicit - XSearchTask(TableFileSchemaPtr file); + public: + explicit XSearchTask(TableFileSchemaPtr file); void Load(LoadType type, uint8_t device_id) override; @@ -38,8 +38,8 @@ public: void Execute() override; -public: - static Status ClusterResult(const std::vector &output_ids, + public: + static Status ClusterResult(const std::vector &output_ids, const std::vector &output_distence, uint64_t nq, uint64_t topk, @@ -55,7 +55,7 @@ public: bool ascending, scheduler::ResultSet &result_target); -public: + public: TableFileSchemaPtr file_; size_t index_id_ = 0; @@ -66,6 +66,6 @@ public: static std::mutex merge_mutex_; }; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/Task.h b/cpp/src/scheduler/task/Task.h index 785ef9cfb7327baa9ea2e4d5748129ce80e575bc..3600c10f033b5de5fe0d7cf83fa0cea02e91037b 100644 --- a/cpp/src/scheduler/task/Task.h +++ b/cpp/src/scheduler/task/Task.h @@ -25,7 +25,6 @@ #include #include - namespace zilliz { namespace milvus { namespace scheduler { @@ -49,20 +48,22 @@ using TaskPtr = std::shared_ptr; // TODO: re-design class Task { -public: - explicit - Task(TaskType type) : type_(type) {} + public: + explicit Task(TaskType type) : type_(type) { + } /* * Just Getter; */ inline TaskType - Type() const { return type_; } + Type() const { + return type_; + } /* * Transport path; */ - inline Path& + inline Path & path() { return task_path_; } @@ -75,14 +76,14 @@ public: return label_; } -public: + public: virtual void Load(LoadType type, uint8_t device_id) = 0; virtual void Execute() = 0; -public: + public: Path task_path_; // std::vector search_contexts_; scheduler::JobWPtr job_; @@ -90,7 +91,6 @@ public: TaskLabelPtr label_ = nullptr; }; - -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp index 32e0a8587e458d91eff450324542965edd0b180c..fc66e562691d85f418d7a61bc5fb2dce5ef87d30 100644 --- a/cpp/src/scheduler/task/TestTask.cpp +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -15,16 +15,15 @@ // specific language governing permissions and limitations // under the License. - -#include -#include "TestTask.h" - +#include "scheduler/task/TestTask.h" +#include "cache/GpuCacheMgr.h" namespace zilliz { namespace milvus { namespace scheduler { -TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {} +TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) { +} void TestTask::Load(LoadType type, uint8_t device_id) { @@ -44,10 +43,11 @@ TestTask::Execute() { void TestTask::Wait() { std::unique_lock lock(mutex_); - cv_.wait(lock, [&] { return done_; }); -} - -} -} + cv_.wait(lock, [&] { + return done_; + }); } +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h index 3367506d88d7ef96190118729f33e285390d2a70..7051080861695e85cc4697d61b02d2cdbf000a9d 100644 --- a/cpp/src/scheduler/task/TestTask.h +++ b/cpp/src/scheduler/task/TestTask.h @@ -19,17 +19,15 @@ #include "SearchTask.h" - namespace zilliz { namespace milvus { namespace scheduler { class TestTask : public XSearchTask { -public: - explicit - TestTask(TableFileSchemaPtr& file); + public: + explicit TestTask(TableFileSchemaPtr &file); -public: + public: void Load(LoadType type, uint8_t device_id) override; @@ -39,7 +37,7 @@ public: void Wait(); -public: + public: uint64_t load_count_ = 0; uint64_t exec_count_ = 0; @@ -48,7 +46,6 @@ public: std::condition_variable cv_; }; - -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/tasklabel/BroadcastLabel.h b/cpp/src/scheduler/tasklabel/BroadcastLabel.h index 1c89460df01cac158402cec10f2a2ef36451e6ab..6fca107864f59aef82abe1678303e033efb74417 100644 --- a/cpp/src/scheduler/tasklabel/BroadcastLabel.h +++ b/cpp/src/scheduler/tasklabel/BroadcastLabel.h @@ -21,19 +21,19 @@ #include - namespace zilliz { namespace milvus { namespace scheduler { - class BroadcastLabel : public TaskLabel { -public: - BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) {} + public: + BroadcastLabel() : TaskLabel(TaskLabelType::BROADCAST) { + } }; using BroadcastLabelPtr = std::shared_ptr; -} -} -} +} // namespace scheduler +} // namespace milvus +} // namespace zilliz + diff --git a/cpp/src/scheduler/tasklabel/DefaultLabel.h b/cpp/src/scheduler/tasklabel/DefaultLabel.h index d61028117da227aac38e3f5b6f41269cede1eab4..7943c4f7c19355e2247a8d26eec80ffcdc45d334 100644 --- a/cpp/src/scheduler/tasklabel/DefaultLabel.h +++ b/cpp/src/scheduler/tasklabel/DefaultLabel.h @@ -21,20 +21,18 @@ #include - namespace zilliz { namespace milvus { namespace scheduler { class DefaultLabel : public TaskLabel { -public: - DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) {} + public: + DefaultLabel() : TaskLabel(TaskLabelType::DEFAULT) { + } }; using DefaultLabelPtr = std::shared_ptr; -} -} -} - - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/tasklabel/SpecResLabel.h b/cpp/src/scheduler/tasklabel/SpecResLabel.h index 8e94a7677b09c310ad861330092b5315a30bd162..cfc5aa94000a67e2e90de061ef1d008181fb528d 100644 --- a/cpp/src/scheduler/tasklabel/SpecResLabel.h +++ b/cpp/src/scheduler/tasklabel/SpecResLabel.h @@ -22,7 +22,6 @@ #include #include - class Resource; using ResourceWPtr = std::weak_ptr; @@ -32,9 +31,10 @@ namespace milvus { namespace scheduler { class SpecResLabel : public TaskLabel { -public: - SpecResLabel(const ResourceWPtr &resource) - : TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) {} + public: + explicit SpecResLabel(const ResourceWPtr &resource) + : TaskLabel(TaskLabelType::SPECIFIED_RESOURCE), resource_(resource) { + } inline ResourceWPtr & resource() { @@ -46,14 +46,13 @@ public: return resource_name_; } -private: + private: ResourceWPtr resource_; std::string resource_name_; }; using SpecResLabelPtr = std::shared_ptr(); -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz diff --git a/cpp/src/scheduler/tasklabel/TaskLabel.h b/cpp/src/scheduler/tasklabel/TaskLabel.h index 5d43adb7dbdee8203d652171a863a5e26a612193..d8d404e09477822188df280ab581aeb4afa245a8 100644 --- a/cpp/src/scheduler/tasklabel/TaskLabel.h +++ b/cpp/src/scheduler/tasklabel/TaskLabel.h @@ -30,23 +30,22 @@ enum class TaskLabelType { }; class TaskLabel { -public: + public: inline TaskLabelType Type() const { return type_; } -protected: - explicit - TaskLabel(TaskLabelType type) : type_(type) {} + protected: + explicit TaskLabel(TaskLabelType type) : type_(type) { + } -private: + private: TaskLabelType type_; }; using TaskLabelPtr = std::shared_ptr; -} -} -} - +} // namespace scheduler +} // namespace milvus +} // namespace zilliz