From 2c9bb7224d794caebb341b5a310ab3694e4d6381 Mon Sep 17 00:00:00 2001 From: wxyu Date: Sat, 17 Aug 2019 17:10:18 +0800 Subject: [PATCH] MS-371 Add TaskTableUpdatedEvent Former-commit-id: cfcd409967d2e9d8f1747432f3b99c5e75ded768 --- cpp/src/scheduler/ResourceMgr.h | 1 + cpp/src/scheduler/Scheduler.h | 3 + cpp/src/scheduler/TaskTable.cpp | 7 ++ cpp/src/scheduler/TaskTable.h | 7 ++ cpp/src/scheduler/resource/Resource.cpp | 6 ++ cpp/src/scheduler/resource/Resource.h | 1 + cpp/src/scheduler/task/TestTask.cpp | 26 +++++++ cpp/src/scheduler/task/TestTask.h | 34 ++++++++++ cpp/unittest/scheduler/resource_test.cpp | 83 +++++++++++++++++++++++ cpp/unittest/scheduler/scheduler_test.cpp | 17 +++++ 10 files changed, 185 insertions(+) create mode 100644 cpp/src/scheduler/task/TestTask.cpp create mode 100644 cpp/src/scheduler/task/TestTask.h create mode 100644 cpp/unittest/scheduler/resource_test.cpp create mode 100644 cpp/unittest/scheduler/scheduler_test.cpp diff --git a/cpp/src/scheduler/ResourceMgr.h b/cpp/src/scheduler/ResourceMgr.h index 714924c9..fc7744cd 100644 --- a/cpp/src/scheduler/ResourceMgr.h +++ b/cpp/src/scheduler/ResourceMgr.h @@ -88,6 +88,7 @@ private: }; +using ResourceMgrPtr = std::shared_ptr; using ResourceMgrWPtr = std::weak_ptr; } diff --git a/cpp/src/scheduler/Scheduler.h b/cpp/src/scheduler/Scheduler.h index 66088dd5..5e508262 100644 --- a/cpp/src/scheduler/Scheduler.h +++ b/cpp/src/scheduler/Scheduler.h @@ -5,6 +5,7 @@ ******************************************************************************/ #pragma once +#include #include #include #include @@ -114,6 +115,8 @@ private: std::thread worker_thread_; }; +using SchedulerPtr = std::shared_ptr; + } } } diff --git a/cpp/src/scheduler/TaskTable.cpp b/cpp/src/scheduler/TaskTable.cpp index bac4d245..807d35be 100644 --- a/cpp/src/scheduler/TaskTable.cpp +++ b/cpp/src/scheduler/TaskTable.cpp @@ -5,6 +5,7 @@ ******************************************************************************/ #include "TaskTable.h" +#include "event/TaskTableUpdatedEvent.h" #include @@ -19,6 +20,9 @@ TaskTable::Put(TaskPtr task) { item->task = std::move(task); item->state = TaskTableItemState::LOADED; table_.push_back(item); + if (subscriber_) { + subscriber_(); + } } void @@ -29,6 +33,9 @@ TaskTable::Put(std::vector &tasks) { item->state = TaskTableItemState::LOADED; table_.push_back(item); } + if (subscriber_) { + subscriber_(); + } } diff --git a/cpp/src/scheduler/TaskTable.h b/cpp/src/scheduler/TaskTable.h index 528c2f3d..70a28993 100644 --- a/cpp/src/scheduler/TaskTable.h +++ b/cpp/src/scheduler/TaskTable.h @@ -10,6 +10,7 @@ #include #include "task/SearchTask.h" +#include "event/Event.h" namespace zilliz { @@ -48,6 +49,11 @@ class TaskTable { public: TaskTable() = default; + inline void + RegisterSubscriber(std::function subscriber) { + subscriber_ = std::move(subscriber); + } + /* * Put one task; */ @@ -162,6 +168,7 @@ public: private: // TODO: map better ? std::deque table_; + std::function subscriber_ = nullptr; }; diff --git a/cpp/src/scheduler/resource/Resource.cpp b/cpp/src/scheduler/resource/Resource.cpp index cf9ff9d8..ea6f71a3 100644 --- a/cpp/src/scheduler/resource/Resource.cpp +++ b/cpp/src/scheduler/resource/Resource.cpp @@ -16,6 +16,12 @@ Resource::Resource(std::string name, ResourceType type) running_(false), load_flag_(false), exec_flag_(false) { + task_table_.RegisterSubscriber([&] { + if (subscriber_) { + auto event = std::make_shared(shared_from_this()); + subscriber_(std::static_pointer_cast(event)); + } + }); } void Resource::Start() { diff --git a/cpp/src/scheduler/resource/Resource.h b/cpp/src/scheduler/resource/Resource.h index a9a1454e..ab7aab7b 100644 --- a/cpp/src/scheduler/resource/Resource.h +++ b/cpp/src/scheduler/resource/Resource.h @@ -16,6 +16,7 @@ #include "../event/StartUpEvent.h" #include "../event/CopyCompletedEvent.h" #include "../event/FinishTaskEvent.h" +#include "../event/TaskTableUpdatedEvent.h" #include "../TaskTable.h" #include "../task/Task.h" #include "../Cost.h" diff --git a/cpp/src/scheduler/task/TestTask.cpp b/cpp/src/scheduler/task/TestTask.cpp new file mode 100644 index 00000000..a974482e --- /dev/null +++ b/cpp/src/scheduler/task/TestTask.cpp @@ -0,0 +1,26 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "TestTask.h" + +namespace zilliz { +namespace milvus { +namespace engine { + +void +TestTask::Load(LoadType type, uint8_t device_id) { + load_count_++; +} + +void +TestTask::Execute() { + exec_count_++; +} + +} +} +} + diff --git a/cpp/src/scheduler/task/TestTask.h b/cpp/src/scheduler/task/TestTask.h new file mode 100644 index 00000000..5f49d2e3 --- /dev/null +++ b/cpp/src/scheduler/task/TestTask.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ +#pragma once + +#include "Task.h" + + +namespace zilliz { +namespace milvus { +namespace engine { + +class TestTask : public Task { +public: + TestTask() = default; + +public: + void + Load(LoadType type, uint8_t device_id) override; + + void + Execute() override; + +public: + uint64_t load_count_; + uint64_t exec_count_; +}; + + +} +} +} diff --git a/cpp/unittest/scheduler/resource_test.cpp b/cpp/unittest/scheduler/resource_test.cpp new file mode 100644 index 00000000..afb484b3 --- /dev/null +++ b/cpp/unittest/scheduler/resource_test.cpp @@ -0,0 +1,83 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include "scheduler/resource/Resource.h" +#include "scheduler/resource/DiskResource.h" +#include "scheduler/resource/CpuResource.h" +#include "scheduler/resource/GpuResource.h" +#include "scheduler/task/Task.h" +#include "scheduler/task/TestTask.h" +#include "scheduler/ResourceFactory.h" +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + +class ResourceTest : public testing::Test { +protected: + void + SetUp() override { + disk_resource_ = ResourceFactory::Create("disk"); + cpu_resource_ = ResourceFactory::Create("cpu"); + gpu_resource_ = ResourceFactory::Create("gpu"); + flag_ = false; + + auto subscriber = [&](EventPtr) { + std::unique_lock lock(mutex_); + flag_ = true; + cv_.notify_one(); + }; + + disk_resource_->RegisterSubscriber(subscriber); + cpu_resource_->RegisterSubscriber(subscriber); + gpu_resource_->RegisterSubscriber(subscriber); + } + + void + Wait() { + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] { return flag_; }); + } + + ResourcePtr disk_resource_; + ResourcePtr cpu_resource_; + ResourcePtr gpu_resource_; + bool flag_; + std::mutex mutex_; + std::condition_variable cv_; +}; + + +TEST_F(ResourceTest, cpu_resource_test) { + auto task = std::make_shared(); + cpu_resource_->task_table().Put(task); + cpu_resource_->WakeupLoader(); + Wait(); + ASSERT_EQ(task->load_count_, 1); + flag_ = false; + cpu_resource_->WakeupExecutor(); + Wait(); + ASSERT_EQ(task->exec_count_, 1); +} + +TEST_F(ResourceTest, gpu_resource_test) { + auto task = std::make_shared(); + gpu_resource_->task_table().Put(task); + gpu_resource_->WakeupLoader(); + Wait(); + ASSERT_EQ(task->load_count_, 1); + flag_ = false; + gpu_resource_->WakeupExecutor(); + Wait(); + ASSERT_EQ(task->exec_count_, 1); +} + + +} +} +} diff --git a/cpp/unittest/scheduler/scheduler_test.cpp b/cpp/unittest/scheduler/scheduler_test.cpp new file mode 100644 index 00000000..787ae593 --- /dev/null +++ b/cpp/unittest/scheduler/scheduler_test.cpp @@ -0,0 +1,17 @@ +/******************************************************************************* + * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited. + * Proprietary and confidential. + ******************************************************************************/ + +#include + + +namespace zilliz { +namespace milvus { +namespace engine { + + +} +} +} -- GitLab