提交 d23cc2da 编写于 作者: W wxyu

MS-377 Improve process thread trigger in ResourceMgr, Scheduler and TaskTable


Former-commit-id: d4221fdb00cc24f4e8d62700101a6a9e24bcd064
上级 23a2f5a6
......@@ -28,6 +28,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-374 - Add action definition
- MS-375 - Add Dump implementation for Event
- MS-376 - Add loader and executor enable flag in Resource avoid diskresource execute task
- MS-377 - Improve process thread trigger in ResourceMgr, Scheduler and TaskTable
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -33,11 +33,7 @@ ResourceMgr::Add(ResourcePtr &&resource) {
resources_.emplace_back(resource);
size_t index = resources_.size() - 1;
resource->RegisterSubscriber([&](EventPtr event) {
queue_.emplace(event);
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.notify_one();
});
resource->RegisterSubscriber(std::bind(&ResourceMgr::PostEvent, this, std::placeholders::_1));
return ret;
}
......@@ -46,27 +42,11 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect
if (auto observe_a = res1.lock()) {
if (auto observe_b = res2.lock()) {
observe_a->AddNeighbour(std::static_pointer_cast<Node>(observe_b), connection);
observe_b->AddNeighbour(std::static_pointer_cast<Node>(observe_a), connection);
}
}
}
void
ResourceMgr::EventProcess() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
if (!running_) {
break;
}
auto event = queue_.front();
queue_.pop();
if (subscriber_) {
subscriber_(event);
}
}
}
void
ResourceMgr::Start() {
......@@ -74,23 +54,33 @@ ResourceMgr::Start() {
for (auto &resource : resources_) {
resource->Start();
}
worker_thread_ = std::thread(&ResourceMgr::EventProcess, this);
running_ = true;
worker_thread_ = std::thread(&ResourceMgr::event_process, this);
}
void
ResourceMgr::Stop() {
std::lock_guard<std::mutex> lck(resources_mutex_);
running_ = false;
{
std::lock_guard<std::mutex> lock(event_mutex_);
running_ = false;
queue_.push(nullptr);
event_cv_.notify_one();
}
worker_thread_.join();
std::lock_guard<std::mutex> lck(resources_mutex_);
for (auto &resource : resources_) {
resource->Stop();
}
}
void
ResourceMgr::PostEvent(const EventPtr &event) {
std::unique_lock<std::mutex> lock(event_mutex_);
queue_.emplace(event);
event_cv_.notify_one();
}
std::string
ResourceMgr::Dump() {
std::string str = "ResourceMgr contains " + std::to_string(resources_.size()) + " resources.\n";
......@@ -103,6 +93,26 @@ ResourceMgr::Dump() {
return str;
}
void
ResourceMgr::event_process() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
auto event = queue_.front();
if (event == nullptr) {
break;
}
// ENGINE_LOG_DEBUG << "ResourceMgr process " << *event;
queue_.pop();
if (subscriber_) {
subscriber_(event);
}
}
}
}
}
}
......@@ -14,6 +14,7 @@
#include <condition_variable>
#include "resource/Resource.h"
#include "utils/Log.h"
namespace zilliz {
......@@ -59,6 +60,8 @@ public:
void
Stop();
void
PostEvent(const EventPtr& event);
// TODO: add stats interface(low)
......@@ -70,7 +73,7 @@ public:
private:
void
EventProcess();
event_process();
private:
std::queue<EventPtr> queue_;
......
......@@ -4,69 +4,129 @@
* Proprietary and confidential.
******************************************************************************/
#include <iostream>
#include "Scheduler.h"
#include "Cost.h"
#include "action/Action.h"
namespace zilliz {
namespace milvus {
namespace engine {
Scheduler::Scheduler(ResourceMgrWPtr res_mgr)
: running_(false),
res_mgr_(std::move(res_mgr)) {
if (auto mgr = res_mgr_.lock()) {
mgr->RegisterSubscriber(std::bind(&Scheduler::PostEvent, this, std::placeholders::_1));
}
}
void
push_task(ResourcePtr &self, ResourcePtr &other) {
auto self_task_table = self->task_table();
auto other_task_table = other->task_table();
if (!other_task_table.Empty()) {
CacheMgr cache;
auto indexes = PickToMove(self_task_table, cache, 1);
for (auto index : indexes) {
if (self_task_table.Move(index)) {
auto task = self_task_table.Get(index)->task;
other_task_table.Put(task);
// TODO: mark moved future
other->WakeupLoader();
other->WakeupExecutor();
}
}
Scheduler::Start() {
running_ = true;
worker_thread_ = std::thread(&Scheduler::worker_function, this);
}
void
Scheduler::Stop() {
{
std::lock_guard<std::mutex> lock(event_mutex_);
running_ = false;
event_queue_.push(nullptr);
event_cv_.notify_one();
}
worker_thread_.join();
}
void
Scheduler::PostEvent(const EventPtr &event) {
std::lock_guard<std::mutex> lock(event_mutex_);
event_queue_.push(event);
event_cv_.notify_one();
// SERVER_LOG_DEBUG << "Scheduler post " << *event;
}
std::string
Scheduler::Dump() {
return std::string();
}
void
schedule(const ResourceWPtr &res) {
if (auto self = res.lock()) {
for (auto &nei : self->GetNeighbours()) {
if (auto n = nei.neighbour_node.lock()) {
auto neighbour = std::static_pointer_cast<Resource>(n);
push_task(self, neighbour);
}
Scheduler::worker_function() {
while (running_) {
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !event_queue_.empty(); });
auto event = event_queue_.front();
if (event == nullptr) {
break;
}
// SERVER_LOG_DEBUG << "Scheduler process " << *event;
event_queue_.pop();
Process(event);
}
}
void
Scheduler::Process(const EventPtr &event) {
switch (event->Type()) {
case EventType::START_UP: {
OnStartUp(event);
break;
}
case EventType::COPY_COMPLETED: {
OnCopyCompleted(event);
break;
}
case EventType::FINISH_TASK: {
OnFinishTask(event);
break;
}
case EventType::TASK_TABLE_UPDATED: {
OnTaskTableUpdated(event);
break;
}
default: {
// TODO: logging
break;
}
}
}
void
Scheduler::OnStartUp(const EventPtr &event) {
schedule(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
}
void
Scheduler::OnFinishTask(const EventPtr &event) {
schedule(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupExecutor();
}
}
void
Scheduler::OnCopyCompleted(const EventPtr &event) {
schedule(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
resource->WakeupExecutor();
if (resource->Type()== ResourceType::DISK) {
Action::PushTaskToNeighbour(event->resource_);
}
}
}
void
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
schedule(event->resource_);
}
std::string
Scheduler::Dump() {
return std::string();
// Action::PushTaskToNeighbour(event->resource_);
if (auto resource = event->resource_.lock()) {
resource->WakeupLoader();
}
}
}
......
......@@ -13,6 +13,7 @@
#include "resource/Resource.h"
#include "ResourceMgr.h"
#include "utils/Log.h"
namespace zilliz {
......@@ -23,20 +24,32 @@ namespace engine {
class Scheduler {
public:
explicit
Scheduler(ResourceMgrWPtr res_mgr)
: running_(false),
res_mgr_(std::move(res_mgr)) {
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
// res_mgr.Register();
}
Scheduler(ResourceMgrWPtr res_mgr);
Scheduler(const Scheduler &) = delete;
Scheduler(Scheduler &&) = delete;
/*
* Start worker thread;
*/
void
Start();
/*
* Stop worker thread, join it;
*/
void
Start() {
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
}
Stop();
/*
* Post event to scheduler event queue;
*/
void
PostEvent(const EventPtr &event);
/*
* Dump as string;
*/
std::string
Dump();
......@@ -45,24 +58,37 @@ private:
/*
* Process start up events;
*
* Actions:
* Pull task from neighbours;
*/
void
OnStartUp(const EventPtr &event);
/*
* Process finish task events;
*
* Actions:
* Pull task from neighbours;
*/
void
OnFinishTask(const EventPtr &event);
/*
* Process copy completed events;
*
* Actions:
* Mark task source MOVED;
* Pull task from neighbours;
*/
void
OnCopyCompleted(const EventPtr &event);
/*
* Process task table updated events;
* Process task table updated events, which happened on task_table->put;
*
* Actions:
* Push task to neighbours;
*/
void
OnTaskTableUpdated(const EventPtr &event);
......@@ -72,40 +98,13 @@ private:
* Dispatch event to event handler;
*/
void
Process(const EventPtr &event) {
switch (event->Type()) {
case EventType::START_UP: {
OnStartUp(event);
break;
}
case EventType::COPY_COMPLETED: {
OnCopyCompleted(event);
break;
}
case EventType::FINISH_TASK: {
OnFinishTask(event);
break;
}
case EventType::TASK_TABLE_UPDATED: {
OnTaskTableUpdated(event);
break;
}
default: {
break;
}
}
}
Process(const EventPtr &event);
/*
* Called by worker_thread_;
*/
void
worker_function() {
while (running_) {
auto event = event_queue_.front();
Process(event);
}
}
worker_function();
private:
bool running_;
......@@ -113,6 +112,8 @@ private:
ResourceMgrWPtr res_mgr_;
std::queue<EventPtr> event_queue_;
std::thread worker_thread_;
std::mutex event_mutex_;
std::condition_variable event_cv_;
};
using SchedulerPtr = std::shared_ptr<Scheduler>;
......
......@@ -7,6 +7,7 @@
#include "TaskTable.h"
#include "event/TaskTableUpdatedEvent.h"
#include <vector>
#include <sstream>
namespace zilliz {
......@@ -16,7 +17,9 @@ namespace engine {
void
TaskTable::Put(TaskPtr task) {
std::lock_guard<std::mutex> lock(id_mutex_);
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
table_.push_back(item);
......@@ -27,8 +30,10 @@ TaskTable::Put(TaskPtr task) {
void
TaskTable::Put(std::vector<TaskPtr> &tasks) {
std::lock_guard<std::mutex> lock(id_mutex_);
for (auto &task : tasks) {
auto item = std::make_shared<TaskTableItem>();
item->id = id_++;
item->task = std::move(task);
item->state = TaskTableItemState::START;
table_.push_back(item);
......@@ -126,9 +131,30 @@ TaskTable::Executed(uint64_t index) {
return false;
}
std::string
ToString(TaskTableItemState state) {
switch (state) {
case TaskTableItemState::INVALID: return "INVALID";
case TaskTableItemState::START: return "START";
case TaskTableItemState::LOADING: return "LOADING";
case TaskTableItemState::LOADED: return "LOADED";
case TaskTableItemState::EXECUTING: return "EXECUTING";
case TaskTableItemState::EXECUTED: return "EXECUTED";
case TaskTableItemState::MOVING: return "MOVING";
case TaskTableItemState::MOVED: return "MOVED";
default: return "";
}
}
std::string
TaskTable::Dump() {
return std::string();
std::stringstream ss;
for (auto &item : table_) {
ss << "<" << item->id;
ss << ", " << ToString(item->state);
ss << ">" << std::endl;
}
return ss.str();
}
}
......
......@@ -49,6 +49,9 @@ class TaskTable {
public:
TaskTable() = default;
TaskTable(const TaskTable &) = delete;
TaskTable(TaskTable &&) = delete;
inline void
RegisterSubscriber(std::function<void(void)> subscriber) {
subscriber_ = std::move(subscriber);
......@@ -167,6 +170,8 @@ public:
private:
// TODO: map better ?
std::uint64_t id_ = 0;
mutable std::mutex id_mutex_;
std::deque<TaskTableItemPtr> table_;
std::function<void(void)> subscriber_ = nullptr;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册