Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Oneflow-Inc
oneflow
提交
ad0c00a1
O
oneflow
项目概览
Oneflow-Inc
/
oneflow
上一次同步 2 年多
通知
13
Star
2733
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oneflow
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
ad0c00a1
编写于
10月 10, 2017
作者:
W
willzhang4a58
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add fd with only read handler
上级
b20dea22
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
35 addition
and
14 deletion
+35
-14
oneflow/core/comm_network/epoll/io_event_poller.cpp
oneflow/core/comm_network/epoll/io_event_poller.cpp
+23
-7
oneflow/core/comm_network/epoll/io_event_poller.h
oneflow/core/comm_network/epoll/io_event_poller.h
+8
-0
oneflow/core/comm_network/epoll/socket_read_helper.cpp
oneflow/core/comm_network/epoll/socket_read_helper.cpp
+1
-1
oneflow/core/comm_network/epoll/socket_write_helper.cpp
oneflow/core/comm_network/epoll/socket_write_helper.cpp
+3
-6
未找到文件。
oneflow/core/comm_network/epoll/io_event_poller.cpp
浏览文件 @
ad0c00a1
...
...
@@ -25,28 +25,44 @@ IOEventPoller::~IOEventPoller() {
void
IOEventPoller
::
AddFd
(
int
fd
,
std
::
function
<
void
()
>
read_handler
,
std
::
function
<
void
()
>
write_handler
)
{
AddFd
(
fd
,
&
read_handler
,
&
write_handler
);
}
void
IOEventPoller
::
AddFdWithOnlyReadHandler
(
int
fd
,
std
::
function
<
void
()
>
read_handler
)
{
AddFd
(
fd
,
&
read_handler
,
nullptr
);
}
void
IOEventPoller
::
Start
()
{
thread_
=
std
::
thread
(
&
IOEventPoller
::
EpollLoop
,
this
);
}
void
IOEventPoller
::
AddFd
(
int
fd
,
std
::
function
<
void
()
>*
read_handler
,
std
::
function
<
void
()
>*
write_handler
)
{
unclosed_fd_cnt_
+=
1
;
fds_
.
push_back
(
fd
);
// Set Fd NONBLOCK
int
opt
=
fcntl
(
fd
,
F_GETFL
);
PCHECK
(
opt
!=
-
1
);
PCHECK
(
fcntl
(
fd
,
F_SETFL
,
opt
|
O_NONBLOCK
)
==
0
);
// Set CLOEXEC
opt
=
fcntl
(
fd
,
F_GETFD
);
PCHECK
(
opt
!=
-
1
);
PCHECK
(
fcntl
(
fd
,
F_SETFD
,
opt
|
FD_CLOEXEC
)
==
0
);
// New IOHandler on Heap
IOHandler
*
io_handler
=
new
IOHandler
;
i
o_handler
->
read_handler
=
read_handler
;
i
o_handler
->
write_handler
=
write_handler
;
i
f
(
read_handler
)
{
io_handler
->
read_handler
=
*
read_handler
;
}
i
f
(
write_handler
)
{
io_handler
->
write_handler
=
*
write_handler
;
}
io_handlers_
.
push_front
(
io_handler
);
// Add Fd to Epoll
epoll_event
ep_event
;
ep_event
.
events
=
EPOLLIN
|
EPOLLOUT
|
EPOLLET
;
ep_event
.
events
=
EPOLLET
;
if
(
read_handler
)
{
ep_event
.
events
|=
EPOLLIN
;
}
if
(
write_handler
)
{
ep_event
.
events
|=
EPOLLOUT
;
}
ep_event
.
data
.
ptr
=
io_handler
;
PCHECK
(
epoll_ctl
(
epfd_
,
EPOLL_CTL_ADD
,
fd
,
&
ep_event
)
==
0
);
}
void
IOEventPoller
::
Start
()
{
thread_
=
std
::
thread
(
&
IOEventPoller
::
EpollLoop
,
this
);
}
void
IOEventPoller
::
EpollLoop
()
{
while
(
unclosed_fd_cnt_
>
0
)
{
int
event_num
=
epoll_wait
(
epfd_
,
ep_events_
,
max_event_num_
,
-
1
);
...
...
oneflow/core/comm_network/epoll/io_event_poller.h
浏览文件 @
ad0c00a1
...
...
@@ -15,15 +15,23 @@ class IOEventPoller final {
void
AddFd
(
int
fd
,
std
::
function
<
void
()
>
read_handler
,
std
::
function
<
void
()
>
write_handler
);
void
AddFdWithOnlyReadHandler
(
int
fd
,
std
::
function
<
void
()
>
read_handler
);
void
Start
();
private:
struct
IOHandler
{
IOHandler
()
{
read_handler
=
[]()
{
UNEXPECTED_RUN
();
};
write_handler
=
[]()
{
UNEXPECTED_RUN
();
};
}
std
::
function
<
void
()
>
read_handler
;
std
::
function
<
void
()
>
write_handler
;
};
void
AddFd
(
int
fd
,
std
::
function
<
void
()
>*
read_handler
,
std
::
function
<
void
()
>*
write_handler
);
void
EpollLoop
();
static
const
int
max_event_num_
;
...
...
oneflow/core/comm_network/epoll/socket_read_helper.cpp
浏览文件 @
ad0c00a1
...
...
@@ -57,7 +57,7 @@ bool SocketReadHelper::DoCurRead(
void
SocketReadHelper
::
SetStatusWhenMsgHeadDone
()
{
switch
(
cur_msg_
.
msg_type
)
{
#define MAKE_ENTRY(x, y) \
case SocketMsgType::k##x: SetStatusWhen##x##MsgHeadDone();
case SocketMsgType::k##x: SetStatusWhen##x##MsgHeadDone();
break;
OF_PP_FOR_EACH_TUPLE
(
MAKE_ENTRY
,
SOCKET_MSG_TYPE_SEQ
);
#undef MAKE_ENTRY
default:
UNEXPECTED_RUN
();
...
...
oneflow/core/comm_network/epoll/socket_write_helper.cpp
浏览文件 @
ad0c00a1
...
...
@@ -21,12 +21,9 @@ SocketWriteHelper::SocketWriteHelper(int sockfd, IOEventPoller* poller) {
sockfd_
=
sockfd
;
queue_not_empty_fd_
=
eventfd
(
0
,
0
);
PCHECK
(
queue_not_empty_fd_
!=
-
1
);
poller
->
AddFd
(
queue_not_empty_fd_
,
std
::
bind
(
&
SocketWriteHelper
::
ProcessQueueNotEmptyEvent
,
this
),
[
this
]()
{
// TODO: delete this log
LOG
(
INFO
)
<<
"fd "
<<
queue_not_empty_fd_
<<
" writeable"
;
});
poller
->
AddFdWithOnlyReadHandler
(
queue_not_empty_fd_
,
std
::
bind
(
&
SocketWriteHelper
::
ProcessQueueNotEmptyEvent
,
this
));
cur_msg_queue_
=
new
std
::
queue
<
SocketMsg
>
;
pending_msg_queue_
=
new
std
::
queue
<
SocketMsg
>
;
cur_write_handle_
=
&
SocketWriteHelper
::
InitMsgWriteHandle
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录