From 66634c7e0b574d1b3ff0f50844d5b2871bbea96f Mon Sep 17 00:00:00 2001 From: willzhang4a58 Date: Thu, 28 Sep 2017 16:56:56 +0800 Subject: [PATCH] read helper init Former-commit-id: 61ce791b32c71895be838f0540b0b6f0c0e90592 --- .../comm_network/epoll/socket_read_helper.cpp | 77 +++++++++++++++++++ .../comm_network/epoll/socket_read_helper.h | 27 ++++++- .../epoll/socket_write_helper.cpp | 2 +- 3 files changed, 102 insertions(+), 4 deletions(-) create mode 100644 oneflow/core/comm_network/epoll/socket_read_helper.cpp diff --git a/oneflow/core/comm_network/epoll/socket_read_helper.cpp b/oneflow/core/comm_network/epoll/socket_read_helper.cpp new file mode 100644 index 0000000000..eea978afb4 --- /dev/null +++ b/oneflow/core/comm_network/epoll/socket_read_helper.cpp @@ -0,0 +1,77 @@ +#include "oneflow/core/comm_network/epoll/socket_read_helper.h" +#include "oneflow/core/comm_network/epoll/epoll_data_comm_network.h" + +#ifdef PLATFORM_POSIX + +namespace oneflow { + +SocketReadHelper::~SocketReadHelper() { TODO(); } + +SocketReadHelper::SocketReadHelper(int sockfd, CpuStream* cpu_stream) { + sockfd_ = sockfd; + cpu_stream_ = cpu_stream; + cur_read_handle_ = &SocketReadHelper::MsgHeadReadHandle; + read_ptr_ = reinterpret_cast(&cur_msg_); + read_size_ = sizeof(cur_msg_); +} + +void SocketReadHelper::NotifyMeSocketReadable() { + cpu_stream_->SendWork( + std::bind(&SocketReadHelper::ReadUntilSocketNotReadable, this)); +} + +void SocketReadHelper::ReadUntilSocketNotReadable() { + while ((this->*cur_read_handle_)()) {} +} + +bool SocketReadHelper::MsgHeadReadHandle() { + DoCurRead(&SocketReadHelper::SetStatusWhenMsgHeadDone); +} + +bool SocketReadHelper::MsgBodyReadHandle() { + DoCurRead(&SocketReadHelper::SetStatusWhenMsgBodyDone); +} + +bool SocketReadHelper::DoCurRead( + bool (SocketReadHelper::*set_cur_read_done)()) { + ssize_t n = read(sockfd_, read_ptr_, read_size_); + if (n == read_size_) { + return (this->*set_cur_read_done)(); + } else if (n >= 0) { + read_ptr_ += n; + read_size_ -= n; + return true; + } else { + CHECK_EQ(n, -1); + PCHECK(errno == EAGAIN || errno == EWOULDBLOCK); + return false; + } +} + +bool SocketReadHelper::SetStatusWhenMsgHeadDone() { + switch (cur_msg_.msg_type) { +#define MAKE_ENTRY(x, y) \ + case SocketMsgType::k##x: return SetStatusWhen##x##MsgHeadDone(); + OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ); +#undef MAKE_ENTRY + default: UNEXPECTED_RUN(); + } + UNEXPECTED_RUN(); +} + +bool SocketReadHelper::SetStatusWhenMsgBodyDone() { + cur_read_handle_ = &SocketReadHelper::MsgHeadReadHandle; + read_ptr_ = reinterpret_cast(&cur_msg_); + read_size_ = sizeof(cur_msg_); + return true; +} + +bool SocketReadHelper::SetStatusWhenRequestWriteMsgHeadDone() { TODO(); } + +bool SocketReadHelper::SetStatusWhenRequestReadMsgHeadDone() { TODO(); } + +bool SocketReadHelper::SetStatusWhenActorMsgHeadDone() { TODO(); } + +} // namespace oneflow + +#endif // PLATFORM_POSIX diff --git a/oneflow/core/comm_network/epoll/socket_read_helper.h b/oneflow/core/comm_network/epoll/socket_read_helper.h index 86f7990426..29c7f13c8d 100644 --- a/oneflow/core/comm_network/epoll/socket_read_helper.h +++ b/oneflow/core/comm_network/epoll/socket_read_helper.h @@ -1,6 +1,7 @@ #ifndef ONEFLOW_CORE_COMM_NETWORK_EPOLL_SOCKET_READ_HELPER_H_ #define ONEFLOW_CORE_COMM_NETWORK_EPOLL_SOCKET_READ_HELPER_H_ +#include "oneflow/core/comm_network/epoll/socket_message.h" #include "oneflow/core/device/cpu_stream.h" #ifdef PLATFORM_POSIX @@ -11,13 +12,33 @@ class SocketReadHelper final { public: OF_DISALLOW_COPY_AND_MOVE(SocketReadHelper); SocketReadHelper() = delete; - ~SocketReadHelper() { TODO(); } + ~SocketReadHelper(); - SocketReadHelper(int sockfd, CpuStream* cpu_stream) { TODO(); } + SocketReadHelper(int sockfd, CpuStream* cpu_stream); - void NotifyMeSocketReadable() { TODO(); } + void NotifyMeSocketReadable(); private: + void ReadUntilSocketNotReadable(); + + bool MsgHeadReadHandle(); + bool MsgBodyReadHandle(); + + bool DoCurRead(bool (SocketReadHelper::*set_cur_read_done)()); + bool SetStatusWhenMsgHeadDone(); + bool SetStatusWhenMsgBodyDone(); + +#define MAKE_ENTRY(x, y) bool SetStatusWhen##x##MsgHeadDone(); + OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ); +#undef MAKE_ENTRY + + int sockfd_; + CpuStream* cpu_stream_; + + SocketMsg cur_msg_; + bool (SocketReadHelper::*cur_read_handle_)(); + char* read_ptr_; + size_t read_size_; }; } // namespace oneflow diff --git a/oneflow/core/comm_network/epoll/socket_write_helper.cpp b/oneflow/core/comm_network/epoll/socket_write_helper.cpp index 163e32dbd4..98066aa07d 100644 --- a/oneflow/core/comm_network/epoll/socket_write_helper.cpp +++ b/oneflow/core/comm_network/epoll/socket_write_helper.cpp @@ -84,7 +84,7 @@ bool SocketWriteHelper::DoCurWrite( if (n == write_size_) { return (this->*set_cur_write_done)(); } else if (n >= 0) { - write_ptr_ = write_ptr_ + n; + write_ptr_ += n; write_size_ -= n; return true; } else { -- GitLab