未验证 提交 643fd2f4 编写于 作者: W WangXi 提交者: GitHub

[FleetExecutor]Add interceptor message handle (#37093)

上级 8a2ce0f2
...@@ -27,6 +27,16 @@ Interceptor::Interceptor(int64_t interceptor_id, TaskNode* node) ...@@ -27,6 +27,16 @@ Interceptor::Interceptor(int64_t interceptor_id, TaskNode* node)
Interceptor::~Interceptor() { interceptor_thread_.join(); } Interceptor::~Interceptor() { interceptor_thread_.join(); }
void Interceptor::RegisterInterceptorHandle(InterceptorHandle handle) {
handle_ = handle;
}
void Interceptor::Handle(const InterceptorMessage& msg) {
if (handle_) {
handle_(msg);
}
}
std::condition_variable& Interceptor::GetCondVar() { std::condition_variable& Interceptor::GetCondVar() {
// get the conditional var // get the conditional var
return cond_var_; return cond_var_;
...@@ -47,6 +57,13 @@ bool Interceptor::EnqueueRemoteInterceptorMessage( ...@@ -47,6 +57,13 @@ bool Interceptor::EnqueueRemoteInterceptorMessage(
return true; return true;
} }
void Interceptor::Send(int64_t dst_id,
std::unique_ptr<InterceptorMessage> msg) {
msg->set_src_id(interceptor_id_);
msg->set_dst_id(dst_id);
// send interceptor msg
}
void Interceptor::PoolTheMailbox() { void Interceptor::PoolTheMailbox() {
// pool the local mailbox, parse the Message // pool the local mailbox, parse the Message
while (true) { while (true) {
...@@ -67,6 +84,8 @@ void Interceptor::PoolTheMailbox() { ...@@ -67,6 +84,8 @@ void Interceptor::PoolTheMailbox() {
// break the pooling thread // break the pooling thread
break; break;
} }
Handle(interceptor_message);
} }
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#pragma once #pragma once
#include <condition_variable> #include <condition_variable>
#include <functional>
#include <map> #include <map>
#include <memory> #include <memory>
#include <queue> #include <queue>
...@@ -32,6 +33,9 @@ namespace distributed { ...@@ -32,6 +33,9 @@ namespace distributed {
class TaskNode; class TaskNode;
class Interceptor { class Interceptor {
public:
using InterceptorHandle = std::function<void(const InterceptorMessage&)>;
public: public:
Interceptor() = delete; Interceptor() = delete;
...@@ -39,6 +43,11 @@ class Interceptor { ...@@ -39,6 +43,11 @@ class Interceptor {
virtual ~Interceptor(); virtual ~Interceptor();
// register interceptor handle
void RegisterInterceptorHandle(InterceptorHandle handle);
void Handle(const InterceptorMessage& msg);
// return the interceptor id // return the interceptor id
int64_t GetInterceptorId() const; int64_t GetInterceptorId() const;
...@@ -49,6 +58,8 @@ class Interceptor { ...@@ -49,6 +58,8 @@ class Interceptor {
bool EnqueueRemoteInterceptorMessage( bool EnqueueRemoteInterceptorMessage(
const InterceptorMessage& interceptor_message); const InterceptorMessage& interceptor_message);
void Send(int64_t dst_id, std::unique_ptr<InterceptorMessage> msg);
DISABLE_COPY_AND_ASSIGN(Interceptor); DISABLE_COPY_AND_ASSIGN(Interceptor);
private: private:
...@@ -65,6 +76,9 @@ class Interceptor { ...@@ -65,6 +76,9 @@ class Interceptor {
// node need to be handled by this interceptor // node need to be handled by this interceptor
TaskNode* node_; TaskNode* node_;
// interceptor handle which process message
InterceptorHandle handle_{nullptr};
// mutex to control read/write conflict for remote mailbox // mutex to control read/write conflict for remote mailbox
std::mutex remote_mailbox_mutex_; std::mutex remote_mailbox_mutex_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册