提交 c126c1a9 编写于 作者: W willzhang4a58

request read msg


Former-commit-id: a1b01564
上级 6e50be08
...@@ -18,7 +18,16 @@ ...@@ -18,7 +18,16 @@
namespace oneflow { namespace oneflow {
enum class SocketMsgType { kRequestWrite = 0, kActor }; #define SOCKET_MSG_TYPE_SEQ \
OF_PP_MAKE_TUPLE_SEQ(RequestWrite, request_write) \
OF_PP_MAKE_TUPLE_SEQ(RequestRead, request_read) \
OF_PP_MAKE_TUPLE_SEQ(Actor, actor)
enum class SocketMsgType {
#define MAKE_ENTRY(x, y) k##x,
OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ)
#undef MAKE_ENTRY
};
struct RequestWriteMsg { struct RequestWriteMsg {
const void* src_token; const void* src_token;
...@@ -27,11 +36,18 @@ struct RequestWriteMsg { ...@@ -27,11 +36,18 @@ struct RequestWriteMsg {
void* read_id; void* read_id;
}; };
struct RequestReadMsg {
const void* src_token;
const void* dst_token;
void* read_id;
};
struct SocketMsg { struct SocketMsg {
SocketMsgType msg_type; SocketMsgType msg_type;
union { union {
RequestWriteMsg request_write_msg; #define MAKE_ENTRY(x, y) x##Msg y##_msg;
ActorMsg actor_msg; OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ)
#undef MAKE_ENTRY
}; };
}; };
......
#include "oneflow/core/comm_network/epoll/socket_write_helper.h" #include "oneflow/core/comm_network/epoll/socket_write_helper.h"
#include "oneflow/core/comm_network/epoll/socket_memory_desc.h"
#ifdef PLATFORM_POSIX #ifdef PLATFORM_POSIX
...@@ -93,13 +94,41 @@ bool SocketWriteHelper::DoCurWrite( ...@@ -93,13 +94,41 @@ bool SocketWriteHelper::DoCurWrite(
} }
} }
bool SocketWriteHelper::SetStatusWhenMsgHeadDone() { TODO(); } bool SocketWriteHelper::SetStatusWhenMsgHeadDone() {
switch (cur_msg_.msg_type) {
#define MAKE_ENTRY(x, y) \
case SocketMsgType::k##x: return SetStatusWhen##x##MsgHeadDone();
OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ);
#undef MAKE_ENTRY
default: UNEXPECTED_RUN();
}
UNEXPECTED_RUN();
}
bool SocketWriteHelper::SetStatusWhenMsgBodyDone() { bool SocketWriteHelper::SetStatusWhenMsgBodyDone() {
cur_write_handle_ = &SocketWriteHelper::InitMsgWriteHandle; cur_write_handle_ = &SocketWriteHelper::InitMsgWriteHandle;
return true; return true;
} }
bool SocketWriteHelper::SetStatusWhenRequestWriteMsgHeadDone() {
cur_write_handle_ = &SocketWriteHelper::InitMsgWriteHandle;
return true;
}
bool SocketWriteHelper::SetStatusWhenRequestReadMsgHeadDone() {
const void* src_token = cur_msg_.request_read_msg.src_token;
auto src_mem_desc = static_cast<const SocketMemDesc*>(src_token);
write_ptr_ = reinterpret_cast<const char*>(src_mem_desc->mem_ptr);
write_size_ = src_mem_desc->byte_size;
cur_write_handle_ = &SocketWriteHelper::MsgBodyWriteHandle;
return true;
}
bool SocketWriteHelper::SetStatusWhenActorMsgHeadDone() {
cur_write_handle_ = &SocketWriteHelper::InitMsgWriteHandle;
return true;
}
} // namespace oneflow } // namespace oneflow
#endif // PLATFORM_POSIX #endif // PLATFORM_POSIX
...@@ -31,6 +31,10 @@ class SocketWriteHelper final { ...@@ -31,6 +31,10 @@ class SocketWriteHelper final {
bool SetStatusWhenMsgHeadDone(); bool SetStatusWhenMsgHeadDone();
bool SetStatusWhenMsgBodyDone(); bool SetStatusWhenMsgBodyDone();
#define MAKE_ENTRY(x, y) bool SetStatusWhen##x##MsgHeadDone();
OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ);
#undef MAKE_ENTRY
int sockfd_; int sockfd_;
CpuStream* cpu_stream_; CpuStream* cpu_stream_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册