diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 714924c9b906adda40b8d9750b84545255a6dfbd..fc7744cd2bdb14d9ef5fa5d0de270da0393524a3 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -88,6 +88,7 @@ private: }; +using ResourceMgrPtr = std::shared_ptr; using ResourceMgrWPtr = std::weak_ptr; } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 66088dd5b6e350bf1d030248c1f27e5b1a2a6398..5e50826238c985bf5b66b684ea55feb5588f8cc6 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -5,6 +5,7 @@ ******************************************************************************/ #pragma once +#include #include #include #include @@ -114,6 +115,8 @@ private: std::thread worker_thread_; }; +using SchedulerPtr = std::shared_ptr; + } } } diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index bac4d245da21fc2e262c168fdf7cbd3138f97bc7..807d35be4ac907baa852956fb0b4446b440dc2ba 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -5,6 +5,7 @@ ******************************************************************************/ #include "TaskTable.h" +#include "event/TaskTableUpdatedEvent.h" #include @@ -19,6 +20,9 @@ TaskTable::Put(TaskPtr task) { item->task = std::move(task); item->state = TaskTableItemState::LOADED; table_.push_back(item); + if (subscriber_) { + subscriber_(); + } } void @@ -29,6 +33,9 @@ TaskTable::Put(std::vector &tasks) { item->state = TaskTableItemState::LOADED; table_.push_back(item); } + if (subscriber_) { + subscriber_(); + } } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 528c2f3d5cf7577ea8682897aa6de057d1315338..70a2899331239c14d7baaf055186a3823ae668a4 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -10,6 +10,7 @@ #include #include "task/SearchTask.h" +#include "event/Event.h" namespace zilliz { @@ -48,6 +49,11 @@ class TaskTable { public: TaskTable() = default; + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + /* * Put one task; */ @@ -162,6 +168,7 @@ public: private: // TODO: map better ? std::deque table_; + std::function subscriber_ = nullptr; }; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index cf9ff9d882b6bb2e7b062aeda882a75d81b11339..ea6f71a3591632f82c04c6cb499f6f22a2d89979 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -16,6 +16,12 @@ Resource::Resource(std::string name, ResourceType type) running_(false), load_flag_(false), exec_flag_(false) { + task_table_.RegisterSubscriber([&] { + if (subscriber_) { + auto event = std::make_shared(shared_from_this()); + subscriber_(std::static_pointer_cast(event)); + } + }); } void Resource::Start() { diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index a9a1454ed532d2e4af6781b5d2d71a04e1c8cfa6..ab7aab7bacb58b16afad00d46b376329bdcb5ce1 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -16,6 +16,7 @@ #include "../event/StartUpEvent.h" #include "../event/CopyCompletedEvent.h" #include "../event/FinishTaskEvent.h" +#include "../event/TaskTableUpdatedEvent.h" #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a974482e52a6fc65bd59b12980d6b376325817d4 --- /dev/null +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "TestTask.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +void +TestTask::Load(LoadType type, uint8_t device_id) { + load_count_++; +} + +void +TestTask::Execute() { + exec_count_++; +} + +} +} +} + diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h new file mode 100644 index 0000000000000000000000000000000000000000..5f49d2e31ebc3399511fdaa463e47c3420f95329 --- /dev/null +++ b/cpp/src/scheduler/task/TestTask.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Task.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class TestTask : public Task { +public: + TestTask() = default; + +public: + void + Load(LoadType type, uint8_t device_id) override; + + void + Execute() override; + +public: + uint64_t load_count_; + uint64_t exec_count_; +}; + + +} +} +} diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..afb484b376775a1f94ca6f13e7900b67e9187247 --- /dev/null +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -0,0 +1,83 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "scheduler/resource/Resource.h" +#include "scheduler/resource/DiskResource.h" +#include "scheduler/resource/CpuResource.h" +#include "scheduler/resource/GpuResource.h" +#include "scheduler/task/Task.h" +#include "scheduler/task/TestTask.h" +#include "scheduler/ResourceFactory.h" +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class ResourceTest : public testing::Test { +protected: + void + SetUp() override { + disk_resource_ = ResourceFactory::Create("disk"); + cpu_resource_ = ResourceFactory::Create("cpu"); + gpu_resource_ = ResourceFactory::Create("gpu"); + flag_ = false; + + auto subscriber = [&](EventPtr) { + std::unique_lock lock(mutex_); + flag_ = true; + cv_.notify_one(); + }; + + disk_resource_->RegisterSubscriber(subscriber); + cpu_resource_->RegisterSubscriber(subscriber); + gpu_resource_->RegisterSubscriber(subscriber); + } + + void + Wait() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return flag_; }); + } + + ResourcePtr disk_resource_; + ResourcePtr cpu_resource_; + ResourcePtr gpu_resource_; + bool flag_; + std::mutex mutex_; + std::condition_variable cv_; +}; + + +TEST_F(ResourceTest, cpu_resource_test) { + auto task = std::make_shared(); + cpu_resource_->task_table().Put(task); + cpu_resource_->WakeupLoader(); + Wait(); + ASSERT_EQ(task->load_count_, 1); + flag_ = false; + cpu_resource_->WakeupExecutor(); + Wait(); + ASSERT_EQ(task->exec_count_, 1); +} + +TEST_F(ResourceTest, gpu_resource_test) { + auto task = std::make_shared(); + gpu_resource_->task_table().Put(task); + gpu_resource_->WakeupLoader(); + Wait(); + ASSERT_EQ(task->load_count_, 1); + flag_ = false; + gpu_resource_->WakeupExecutor(); + Wait(); + ASSERT_EQ(task->exec_count_, 1); +} + + +} +} +} diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..787ae59329c41cf69a99edc01e7c5dc51ea155b3 --- /dev/null +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -0,0 +1,17 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + + +} +} +}