提交 e0f650cf 编写于 作者: W wxyu

MS-361 Add event in resource


Former-commit-id: b0f8d2e22ba4f3576127208968857b91a9c6830f
上级 7996b5a9
......@@ -18,6 +18,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-355 - Add copy interface in ExcutionEngine
- MS-357 - Add minimum schedule function
- MS-359 - Add cost test in new scheduler
- MS-361 - Add event in resource
## New Feature
- MS-343 - Implement ResourceMgr
......
......@@ -7,6 +7,7 @@
#include "ResourceMgr.h"
#include "db/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
......@@ -21,31 +22,22 @@ ResourceMgr::Add(ResourcePtr &&resource) {
ResourceWPtr ret(resource);
std::lock_guard<std::mutex> lck(resources_mutex_);
if(running_) {
if (running_) {
ENGINE_LOG_ERROR << "ResourceMgr is running, not allow to add resource";
return ret;
}
if (resource->Type() == ResourceType::DISK) {
disk_resources_.emplace_back(ResourceWPtr(resource));
}
resources_.emplace_back(resource);
size_t index = resources_.size() - 1;
// TODO: update interface
// resource->RegisterOnStartUp([&] {
// start_up_event_[index] = true;
// event_cv_.notify_one();
// });
// resource->RegisterOnFinishTask([&] {
// finish_task_event_[index] = true;
// event_cv_.notify_one();
// });
// resource->RegisterOnCopyCompleted([&] {
// copy_completed_event_[index] = true;
// event_cv_.notify_one();
// });
// resource->RegisterOnTaskTableUpdated([&] {
// task_table_updated_event_[index] = true;
// event_cv_.notify_one();
// });
resource->RegisterSubscriber([&](EventPtr event) {
queue_.emplace(event);
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.notify_one();
});
return ret;
}
......@@ -61,31 +53,17 @@ ResourceMgr::Connect(ResourceWPtr &res1, ResourceWPtr &res2, Connection &connect
void
ResourceMgr::EventProcess() {
while (running_) {
std::unique_lock <std::mutex> lock(resources_mutex_);
event_cv_.wait(lock, [this] { return !resources_.empty(); });
std::unique_lock<std::mutex> lock(event_mutex_);
event_cv_.wait(lock, [this] { return !queue_.empty(); });
if(!running_) {
if (!running_) {
break;
}
for (uint64_t i = 0; i < resources_.size(); ++i) {
ResourceWPtr res(resources_[i]);
if (start_up_event_[i]) {
on_start_up_(res);
start_up_event_[i] = false;
}
if (finish_task_event_[i]) {
on_finish_task_(res);
finish_task_event_[i] = false;
}
if (copy_completed_event_[i]) {
on_copy_completed_(res);
copy_completed_event_[i] = false;
}
if (task_table_updated_event_[i]) {
on_task_table_updated_(res);
task_table_updated_event_[i] = false;
}
auto event = queue_.front();
queue_.pop();
if (subscriber_) {
subscriber_(event);
}
}
}
......
......@@ -10,10 +10,12 @@
#include <vector>
#include <memory>
#include <mutex>
#include <queue>
#include <condition_variable>
#include "resource/Resource.h"
namespace zilliz {
namespace milvus {
namespace engine {
......@@ -23,6 +25,15 @@ public:
ResourceMgr();
/******** Management Interface ********/
inline void
RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
subscriber_ = std::move(subscriber);
}
std::vector<ResourceWPtr> &
GetDiskResources() {
return disk_resources_;
}
/*
* Add resource into Resource Management;
......@@ -51,41 +62,6 @@ public:
// TODO: add stats interface(low)
public:
/******** Event Register Interface ********/
/*
* Register on start up event;
*/
void
RegisterOnStartUp(std::function<void(ResourceWPtr)> &func) {
on_start_up_ = func;
}
/*
* Register on finish one task event;
*/
void
RegisterOnFinishTask(std::function<void(ResourceWPtr)> &func) {
on_finish_task_ = func;
}
/*
* Register on copy task data completed event;
*/
void
RegisterOnCopyCompleted(std::function<void(ResourceWPtr)> &func) {
on_copy_completed_ = func;
}
/*
* Register on task table updated event;
*/
void
RegisterOnTaskTableUpdated(std::function<void(ResourceWPtr)> &func) {
on_task_table_updated_ = func;
}
public:
/******** Utlitity Functions ********/
......@@ -97,22 +73,19 @@ private:
EventProcess();
private:
std::queue<EventPtr> queue_;
std::function<void(EventPtr)> subscriber_ = nullptr;
bool running_;
std::vector<ResourceWPtr> disk_resources_;
std::vector<ResourcePtr> resources_;
mutable std::mutex resources_mutex_;
std::thread worker_thread_;
std::mutex event_mutex_;
std::condition_variable event_cv_;
std::vector<bool> start_up_event_;
std::vector<bool> finish_task_event_;
std::vector<bool> copy_completed_event_;
std::vector<bool> task_table_updated_event_;
std::function<void(ResourceWPtr)> on_start_up_;
std::function<void(ResourceWPtr)> on_finish_task_;
std::function<void(ResourceWPtr)> on_copy_completed_;
std::function<void(ResourceWPtr)> on_task_table_updated_;
};
using ResourceMgrWPtr = std::weak_ptr<ResourceMgr>;
......
......@@ -45,23 +45,23 @@ schedule(const ResourceWPtr &res) {
}
void
StartUpEvent::Process() {
schedule(resource_);
Scheduler::OnStartUp(const EventPtr &event) {
schedule(event->resource_);
}
void
FinishTaskEvent::Process() {
schedule(resource_);
Scheduler::OnFinishTask(const EventPtr &event) {
schedule(event->resource_);
}
void
CopyCompletedEvent::Process() {
schedule(resource_);
Scheduler::OnCopyCompleted(const EventPtr &event) {
schedule(event->resource_);
}
void
TaskTableUpdatedEvent::Process() {
schedule(resource_);
Scheduler::OnTaskTableUpdated(const EventPtr &event) {
schedule(event->resource_);
}
std::string
......
......@@ -18,60 +18,6 @@ namespace zilliz {
namespace milvus {
namespace engine {
class Event {
public:
explicit
Event(ResourceWPtr &resource) : resource_(resource) {}
public:
virtual void
Process() = 0;
protected:
ResourceWPtr resource_;
};
using EventPtr = std::shared_ptr<Event>;
class StartUpEvent : public Event {
public:
explicit
StartUpEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
Process() override;
};
class FinishTaskEvent : public Event {
public:
explicit
FinishTaskEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
Process() override;
};
class CopyCompletedEvent : public Event {
public:
explicit
CopyCompletedEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
Process() override;
};
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(ResourceWPtr &resource) : Event(resource) {}
public:
void
Process() override;
};
class Scheduler {
public:
......@@ -90,52 +36,65 @@ public:
worker_thread_ = std::thread(&Scheduler::worker_thread_, this);
}
public:
std::string
Dump();
private:
/******** Events ********/
/*
* Process start up events;
*/
inline void
OnStartUp(ResourceWPtr &resource) {
auto event = std::make_shared<StartUpEvent>(resource);
event_queue_.push(event);
}
void
OnStartUp(const EventPtr &event);
/*
* Process finish task events;
*/
inline void
OnFinishTask(ResourceWPtr &resource) {
auto event = std::make_shared<FinishTaskEvent>(resource);
event_queue_.push(event);
}
void
OnFinishTask(const EventPtr &event);
/*
* Process copy completed events;
*/
inline void
OnCopyCompleted(ResourceWPtr &resource) {
auto event = std::make_shared<CopyCompletedEvent>(resource);
event_queue_.push(event);
}
void
OnCopyCompleted(const EventPtr &event);
/*
* Process task table updated events;
*/
inline void
OnTaskTableUpdated(ResourceWPtr &resource) {
auto event = std::make_shared<TaskTableUpdatedEvent>(resource);
event_queue_.push(event);
}
public:
std::string
Dump();
void
OnTaskTableUpdated(const EventPtr &event);
private:
/*
* Dispatch event to event handler;
*/
void
Process(const EventPtr &event) {
switch (event->Type()) {
case EventType::START_UP: {
OnStartUp(event);
break;
}
case EventType::COPY_COMPLETED: {
OnCopyCompleted(event);
break;
}
case EventType::FINISH_TASK: {
OnFinishTask(event);
break;
}
case EventType::TASK_TABLE_UPDATED: {
OnTaskTableUpdated(event);
break;
}
default: {
break;
}
}
}
/*
* Called by worker_thread_;
*/
......@@ -143,7 +102,7 @@ private:
worker_function() {
while (running_) {
auto event = event_queue_.front();
event->Process();
Process(event);
}
}
......
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
#include "../TaskTable.h"
namespace zilliz {
namespace milvus {
namespace engine {
class CopyCompletedEvent : public Event {
public:
CopyCompletedEvent(std::weak_ptr<Resource> resource, TaskTableItem &task_table_item)
: Event(EventType::COPY_COMPLETED, std::move(resource)),
task_table_item_(task_table_item) {}
public:
TaskTableItem &task_table_item_;
};
}
}
}
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
namespace zilliz {
namespace milvus {
namespace engine {
enum class EventType {
START_UP,
COPY_COMPLETED,
FINISH_TASK,
TASK_TABLE_UPDATED
};
class Resource;
class Event {
public:
explicit
Event(EventType type, std::weak_ptr<Resource> resource)
: type_(type),
resource_(std::move(resource)) {}
inline EventType
Type() const {
return type_;
}
public:
EventType type_;
std::weak_ptr<Resource> resource_;
};
using EventPtr = std::shared_ptr<Event>;
}
}
}
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
namespace zilliz {
namespace milvus {
namespace engine {
class FinishTaskEvent : public Event {
public:
FinishTaskEvent(std::weak_ptr<Resource> resource, TaskTableItem &task_table_item)
: Event(EventType::FINISH_TASK, std::move(resource)),
task_table_item_(task_table_item) {}
public:
TaskTableItem &task_table_item_;
};
}
}
}
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
namespace zilliz {
namespace milvus {
namespace engine {
class StartUpEvent : public Event {
public:
explicit
StartUpEvent(std::weak_ptr<Resource> resource)
: Event(EventType::START_UP, std::move(resource)) {}
};
}
}
}
\ No newline at end of file
/*******************************************************************************
* copyright 上海赜睿信息科技有限公司(zilliz) - all rights reserved
* unauthorized copying of this file, via any medium is strictly prohibited.
* proprietary and confidential.
******************************************************************************/
#pragma once
#include "Event.h"
namespace zilliz {
namespace milvus {
namespace engine {
class TaskTableUpdatedEvent : public Event {
public:
explicit
TaskTableUpdatedEvent(std::weak_ptr<Resource> resource)
: Event(EventType::TASK_TABLE_UPDATED, std::move(resource)) {}
};
}
}
}
......@@ -70,20 +70,30 @@ void Resource::loader_function() {
auto task = pick_task_load();
if (task) {
LoadFile(task);
GetRegisterFunc(RegisterType::ON_COPY_COMPLETED)->Exec();
if (subscriber_) {
// auto event = std::make_shared<CopyCompletedEvent>(shared_from_this(), task);
// subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
void Resource::executor_function() {
GetRegisterFunc(RegisterType::START_UP)->Exec();
if (subscriber_) {
// auto event = std::make_shared<StartUpEvent>(shared_from_this());
// subscriber_(std::static_pointer_cast<Event>(event));
}
while (running_) {
std::unique_lock<std::mutex> lock(exec_mutex_);
exec_cv_.wait(lock, [&] { return exec_flag_; });
auto task = pick_task_execute();
if (task) {
Process(task);
GetRegisterFunc(RegisterType::ON_FINISH_TASK)->Exec();
if (subscriber_) {
// auto event = std::make_shared<FinishTaskEvent>(shared_from_this(), task);
// subscriber_(std::static_pointer_cast<Event>(event));
}
}
}
}
......
......@@ -12,6 +12,10 @@
#include <functional>
#include <condition_variable>
#include "../event/Event.h"
#include "../event/StartUpEvent.h"
#include "../event/CopyCompletedEvent.h"
#include "../event/FinishTaskEvent.h"
#include "../TaskTable.h"
#include "../task/Task.h"
#include "../Cost.h"
......@@ -37,18 +41,28 @@ enum class RegisterType {
ON_TASK_TABLE_UPDATED,
};
class Resource : public Node {
class Resource : public Node, public std::enable_shared_from_this<Resource> {
public:
/*
* Event function MUST be a short function, never blocking;
*/
template <typename T>
void Register_T(const RegisterType& type) {
template<typename T>
void Register_T(const RegisterType &type) {
register_table_.emplace(type, [] { return std::make_shared<T>(); });
}
RegisterHandlerPtr
GetRegisterFunc(const RegisterType& type);
GetRegisterFunc(const RegisterType &type);
inline void
RegisterSubscriber(std::function<void(EventPtr)> subscriber) {
subscriber_ = std::move(subscriber);
}
inline ResourceType
Type() const {
return type_;
}
void
Start();
......@@ -131,8 +145,11 @@ private:
TaskTable task_table_;
std::map<RegisterType, std::function<RegisterHandlerPtr()>> register_table_;
std::function<void(EventPtr)> subscriber_ = nullptr;
bool running_;
bool loader_running_ = false;
bool executor_running_ = false;
std::thread loader_thread_;
std::thread executor_thread_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册