提交 2c9bb722 编写于 作者: W wxyu

MS-371 Add TaskTableUpdatedEvent


Former-commit-id: cfcd409967d2e9d8f1747432f3b99c5e75ded768
上级 2ea773b9
...@@ -88,6 +88,7 @@ private: ...@@ -88,6 +88,7 @@ private:
}; };
using ResourceMgrPtr = std::shared_ptr<ResourceMgr>;
using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>; using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>;
} }
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
******************************************************************************/ ******************************************************************************/
#pragma once #pragma once
#include <memory>
#include <string> #include <string>
#include <mutex> #include <mutex>
#include <thread> #include <thread>
...@@ -114,6 +115,8 @@ private: ...@@ -114,6 +115,8 @@ private:
std::thread worker_thread_; std::thread worker_thread_;
}; };
using SchedulerPtr = std::shared_ptr<Scheduler>;
} }
} }
} }
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
******************************************************************************/ ******************************************************************************/
#include "TaskTable.h" #include "TaskTable.h"
#include "event/TaskTableUpdatedEvent.h"
#include <vector> #include <vector>
...@@ -19,6 +20,9 @@ TaskTable::Put(TaskPtr task) { ...@@ -19,6 +20,9 @@ TaskTable::Put(TaskPtr task) {
item->task = std::move(task); item->task = std::move(task);
item->state = TaskTableItemState::LOADED; item->state = TaskTableItemState::LOADED;
table_.push_back(item); table_.push_back(item);
if (subscriber_) {
subscriber_();
}
} }
void void
...@@ -29,6 +33,9 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) { ...@@ -29,6 +33,9 @@ TaskTable::Put(std::vector<TaskPtr> &tasks) {
item->state = TaskTableItemState::LOADED; item->state = TaskTableItemState::LOADED;
table_.push_back(item); table_.push_back(item);
} }
if (subscriber_) {
subscriber_();
}
} }
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <mutex> #include <mutex>
#include "task/SearchTask.h" #include "task/SearchTask.h"
#include "event/Event.h"
namespace zilliz { namespace zilliz {
...@@ -48,6 +49,11 @@ class TaskTable { ...@@ -48,6 +49,11 @@ class TaskTable {
public: public:
TaskTable() = default; TaskTable() = default;
inline void
RegisterSubscriber(std::function<void(void)> subscriber) {
subscriber_ = std::move(subscriber);
}
/* /*
* Put one task; * Put one task;
*/ */
...@@ -162,6 +168,7 @@ public: ...@@ -162,6 +168,7 @@ public:
private: private:
// TODO: map better ? // TODO: map better ?
std::deque<TaskTableItemPtr> table_; std::deque<TaskTableItemPtr> table_;
std::function<void(void)> subscriber_ = nullptr;
}; };
......
...@@ -16,6 +16,12 @@ Resource::Resource(std::string name, ResourceType type) ...@@ -16,6 +16,12 @@ Resource::Resource(std::string name, ResourceType type)
running_(false), running_(false),
load_flag_(false), load_flag_(false),
exec_flag_(false) { exec_flag_(false) {
task_table_.RegisterSubscriber([&] {
if (subscriber_) {
auto event = std::make_shared<TaskTableUpdatedEvent>(shared_from_this());
subscriber_(std::static_pointer_cast<Event>(event));
}
});
} }
void Resource::Start() { void Resource::Start() {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "../event/StartUpEvent.h" #include "../event/StartUpEvent.h"
#include "../event/CopyCompletedEvent.h" #include "../event/CopyCompletedEvent.h"
#include "../event/FinishTaskEvent.h" #include "../event/FinishTaskEvent.h"
#include "../event/TaskTableUpdatedEvent.h"
#include "../TaskTable.h" #include "../TaskTable.h"
#include "../task/Task.h" #include "../task/Task.h"
#include "../Cost.h" #include "../Cost.h"
......
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "TestTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
TestTask::Load(LoadType type, uint8_t device_id) {
load_count_++;
}
void
TestTask::Execute() {
exec_count_++;
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#pragma once
#include "Task.h"
namespace zilliz {
namespace milvus {
namespace engine {
class TestTask : public Task {
public:
TestTask() = default;
public:
void
Load(LoadType type, uint8_t device_id) override;
void
Execute() override;
public:
uint64_t load_count_;
uint64_t exec_count_;
};
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "scheduler/resource/Resource.h"
#include "scheduler/resource/DiskResource.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/GpuResource.h"
#include "scheduler/task/Task.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceFactory.h"
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
class ResourceTest : public testing::Test {
protected:
void
SetUp() override {
disk_resource_ = ResourceFactory::Create("disk");
cpu_resource_ = ResourceFactory::Create("cpu");
gpu_resource_ = ResourceFactory::Create("gpu");
flag_ = false;
auto subscriber = [&](EventPtr) {
std::unique_lock<std::mutex> lock(mutex_);
flag_ = true;
cv_.notify_one();
};
disk_resource_->RegisterSubscriber(subscriber);
cpu_resource_->RegisterSubscriber(subscriber);
gpu_resource_->RegisterSubscriber(subscriber);
}
void
Wait() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return flag_; });
}
ResourcePtr disk_resource_;
ResourcePtr cpu_resource_;
ResourcePtr gpu_resource_;
bool flag_;
std::mutex mutex_;
std::condition_variable cv_;
};
TEST_F(ResourceTest, cpu_resource_test) {
auto task = std::make_shared<TestTask>();
cpu_resource_->task_table().Put(task);
cpu_resource_->WakeupLoader();
Wait();
ASSERT_EQ(task->load_count_, 1);
flag_ = false;
cpu_resource_->WakeupExecutor();
Wait();
ASSERT_EQ(task->exec_count_, 1);
}
TEST_F(ResourceTest, gpu_resource_test) {
auto task = std::make_shared<TestTask>();
gpu_resource_->task_table().Put(task);
gpu_resource_->WakeupLoader();
Wait();
ASSERT_EQ(task->load_count_, 1);
flag_ = false;
gpu_resource_->WakeupExecutor();
Wait();
ASSERT_EQ(task->exec_count_, 1);
}
}
}
}
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include <gtest/gtest.h>
namespace zilliz {
namespace milvus {
namespace engine {
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册