提交 66634c7e 编写于 作者: W willzhang4a58

read helper init


Former-commit-id: 61ce791b
上级 c126c1a9
#include "oneflow/core/comm_network/epoll/socket_read_helper.h"
#include "oneflow/core/comm_network/epoll/epoll_data_comm_network.h"
#ifdef PLATFORM_POSIX
namespace oneflow {
SocketReadHelper::~SocketReadHelper() { TODO(); }
SocketReadHelper::SocketReadHelper(int sockfd, CpuStream* cpu_stream) {
sockfd_ = sockfd;
cpu_stream_ = cpu_stream;
cur_read_handle_ = &SocketReadHelper::MsgHeadReadHandle;
read_ptr_ = reinterpret_cast<char*>(&cur_msg_);
read_size_ = sizeof(cur_msg_);
}
void SocketReadHelper::NotifyMeSocketReadable() {
cpu_stream_->SendWork(
std::bind(&SocketReadHelper::ReadUntilSocketNotReadable, this));
}
void SocketReadHelper::ReadUntilSocketNotReadable() {
while ((this->*cur_read_handle_)()) {}
}
bool SocketReadHelper::MsgHeadReadHandle() {
DoCurRead(&SocketReadHelper::SetStatusWhenMsgHeadDone);
}
bool SocketReadHelper::MsgBodyReadHandle() {
DoCurRead(&SocketReadHelper::SetStatusWhenMsgBodyDone);
}
bool SocketReadHelper::DoCurRead(
bool (SocketReadHelper::*set_cur_read_done)()) {
ssize_t n = read(sockfd_, read_ptr_, read_size_);
if (n == read_size_) {
return (this->*set_cur_read_done)();
} else if (n >= 0) {
read_ptr_ += n;
read_size_ -= n;
return true;
} else {
CHECK_EQ(n, -1);
PCHECK(errno == EAGAIN || errno == EWOULDBLOCK);
return false;
}
}
bool SocketReadHelper::SetStatusWhenMsgHeadDone() {
switch (cur_msg_.msg_type) {
#define MAKE_ENTRY(x, y) \
case SocketMsgType::k##x: return SetStatusWhen##x##MsgHeadDone();
OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ);
#undef MAKE_ENTRY
default: UNEXPECTED_RUN();
}
UNEXPECTED_RUN();
}
bool SocketReadHelper::SetStatusWhenMsgBodyDone() {
cur_read_handle_ = &SocketReadHelper::MsgHeadReadHandle;
read_ptr_ = reinterpret_cast<char*>(&cur_msg_);
read_size_ = sizeof(cur_msg_);
return true;
}
bool SocketReadHelper::SetStatusWhenRequestWriteMsgHeadDone() { TODO(); }
bool SocketReadHelper::SetStatusWhenRequestReadMsgHeadDone() { TODO(); }
bool SocketReadHelper::SetStatusWhenActorMsgHeadDone() { TODO(); }
} // namespace oneflow
#endif // PLATFORM_POSIX
#ifndef ONEFLOW_CORE_COMM_NETWORK_EPOLL_SOCKET_READ_HELPER_H_
#define ONEFLOW_CORE_COMM_NETWORK_EPOLL_SOCKET_READ_HELPER_H_
#include "oneflow/core/comm_network/epoll/socket_message.h"
#include "oneflow/core/device/cpu_stream.h"
#ifdef PLATFORM_POSIX
......@@ -11,13 +12,33 @@ class SocketReadHelper final {
public:
OF_DISALLOW_COPY_AND_MOVE(SocketReadHelper);
SocketReadHelper() = delete;
~SocketReadHelper() { TODO(); }
~SocketReadHelper();
SocketReadHelper(int sockfd, CpuStream* cpu_stream) { TODO(); }
SocketReadHelper(int sockfd, CpuStream* cpu_stream);
void NotifyMeSocketReadable() { TODO(); }
void NotifyMeSocketReadable();
private:
void ReadUntilSocketNotReadable();
bool MsgHeadReadHandle();
bool MsgBodyReadHandle();
bool DoCurRead(bool (SocketReadHelper::*set_cur_read_done)());
bool SetStatusWhenMsgHeadDone();
bool SetStatusWhenMsgBodyDone();
#define MAKE_ENTRY(x, y) bool SetStatusWhen##x##MsgHeadDone();
OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, SOCKET_MSG_TYPE_SEQ);
#undef MAKE_ENTRY
int sockfd_;
CpuStream* cpu_stream_;
SocketMsg cur_msg_;
bool (SocketReadHelper::*cur_read_handle_)();
char* read_ptr_;
size_t read_size_;
};
} // namespace oneflow
......
......@@ -84,7 +84,7 @@ bool SocketWriteHelper::DoCurWrite(
if (n == write_size_) {
return (this->*set_cur_write_done)();
} else if (n >= 0) {
write_ptr_ = write_ptr_ + n;
write_ptr_ += n;
write_size_ -= n;
return true;
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册