Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Oneflow-Inc
oneflow
提交
bebddcc9
O
oneflow
项目概览
Oneflow-Inc
/
oneflow
上一次同步 2 年多
通知
13
Star
2733
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oneflow
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
bebddcc9
编写于
1月 17, 2019
作者:
S
Shiyuan Shang-Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
update
Former-commit-id: 97b0367c3d63c40a2c2ba6a4b272796ed706088a
上级
e2799a39
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
17 addition
and
10 deletion
+17
-10
oneflow/core/comm_network/epoll/epoll_comm_network.cpp
oneflow/core/comm_network/epoll/epoll_comm_network.cpp
+10
-6
oneflow/core/comm_network/epoll/socket_message.h
oneflow/core/comm_network/epoll/socket_message.h
+2
-0
oneflow/core/comm_network/epoll/socket_read_helper.cpp
oneflow/core/comm_network/epoll/socket_read_helper.cpp
+2
-2
oneflow/core/comm_network/epoll/socket_write_helper.cpp
oneflow/core/comm_network/epoll/socket_write_helper.cpp
+3
-2
未找到文件。
oneflow/core/comm_network/epoll/epoll_comm_network.cpp
浏览文件 @
bebddcc9
...
...
@@ -71,7 +71,8 @@ void EpollCommNet::SendActorMsg(int64_t dst_machine_id, const ActorMsg& actor_ms
SocketMsg
msg
;
msg
.
msg_type
=
SocketMsgType
::
kActor
;
msg
.
actor_msg
=
actor_msg
;
int32_t
link_i
=
std
::
uniform_int_distribution
<
int32_t
>
(
0
,
epoll_conf_
.
link_num
())(
random_gen_
);
int32_t
link_i
=
std
::
uniform_int_distribution
<
int32_t
>
(
0
,
epoll_conf_
.
link_num
()
-
1
)(
random_gen_
);
GetSocketHelper
(
dst_machine_id
,
link_i
)
->
AsyncWrite
(
msg
);
}
...
...
@@ -90,8 +91,10 @@ void EpollCommNet::SendSocketMsg(int64_t dst_machine_id, const SocketMsg& total_
total_byte_size
-=
offset
;
SocketMsg
msg
;
msg
.
msg_type
=
total_msg
.
msg_type
;
msg
.
request_read_msg
.
src_token
=
NewMemDesc
(
src_mem_desc
->
mem_ptr
+
link_i
*
offset
,
byte_size
);
msg
.
request_read_msg
.
dst_token
=
NewMemDesc
(
dst_mem_desc
->
mem_ptr
+
link_i
*
offset
,
byte_size
);
msg
.
request_read_msg
.
src_token
=
total_msg
.
request_read_msg
.
src_token
;
msg
.
request_read_msg
.
dst_token
=
total_msg
.
request_read_msg
.
dst_token
;
msg
.
request_read_msg
.
offset
=
link_i
*
offset
;
msg
.
request_read_msg
.
byte_size
=
byte_size
;
msg
.
request_read_msg
.
read_id
=
total_msg
.
request_read_msg
.
read_id
;
msg
.
request_read_msg
.
part_num
=
part_num
;
GetSocketHelper
(
dst_machine_id
,
link_i
)
->
AsyncWrite
(
msg
);
...
...
@@ -189,7 +192,7 @@ void EpollCommNet::InitSockets() {
}
SocketHelper
*
EpollCommNet
::
GetSocketHelper
(
int64_t
machine_id
,
int32_t
link_index
)
{
int
sockfd
=
machine_id2sockfds_
.
at
(
machine_id
*
epoll_conf_
.
link_num
()
+
link_index
)
;
int
sockfd
=
machine_id2sockfds_
[
machine_id
*
epoll_conf_
.
link_num
()
+
link_index
]
;
return
sockfd2helper_
.
at
(
sockfd
);
}
...
...
@@ -201,13 +204,14 @@ void EpollCommNet::DoRead(void* read_id, int64_t src_machine_id, void* src_token
msg
.
request_write_msg
.
dst_machine_id
=
Global
<
MachineCtx
>::
Get
()
->
this_machine_id
();
msg
.
request_write_msg
.
dst_token
=
dst_token
;
msg
.
request_write_msg
.
read_id
=
read_id
;
int32_t
link_i
=
std
::
uniform_int_distribution
<
int32_t
>
(
0
,
epoll_conf_
.
link_num
())(
random_gen_
);
int32_t
link_i
=
std
::
uniform_int_distribution
<
int32_t
>
(
0
,
epoll_conf_
.
link_num
()
-
1
)(
random_gen_
);
GetSocketHelper
(
src_machine_id
,
link_i
)
->
AsyncWrite
(
msg
);
}
void
EpollCommNet
::
PartReadDone
(
void
*
read_id
,
int32_t
part_num
)
{
int32_t
&
part_read_done_cnt
=
read_id2part_done_cnt_
.
at
(
read_id
);
std
::
unique_lock
<
std
::
mutex
>
lck
(
part_done_cnt_mtx_
);
int32_t
&
part_read_done_cnt
=
read_id2part_done_cnt_
.
at
(
read_id
);
part_read_done_cnt
++
;
if
(
part_read_done_cnt
==
part_num
)
{
ReadDone
(
read_id
);
...
...
oneflow/core/comm_network/epoll/socket_message.h
浏览文件 @
bebddcc9
...
...
@@ -40,6 +40,8 @@ struct RequestWriteMsg {
struct
RequestReadMsg
{
void
*
src_token
;
void
*
dst_token
;
int64_t
offset
;
int64_t
byte_size
;
void
*
read_id
;
int32_t
part_num
;
};
...
...
oneflow/core/comm_network/epoll/socket_read_helper.cpp
浏览文件 @
bebddcc9
...
...
@@ -82,8 +82,8 @@ void SocketReadHelper::SetStatusWhenRequestWriteMsgHeadDone() {
void
SocketReadHelper
::
SetStatusWhenRequestReadMsgHeadDone
()
{
auto
mem_desc
=
static_cast
<
const
SocketMemDesc
*>
(
cur_msg_
.
request_read_msg
.
dst_token
);
read_ptr_
=
reinterpret_cast
<
char
*>
(
mem_desc
->
mem_ptr
);
read_size_
=
mem_desc
->
byte_size
;
read_ptr_
=
reinterpret_cast
<
char
*>
(
mem_desc
->
mem_ptr
)
+
cur_msg_
.
request_read_msg
.
offset
;
read_size_
=
cur_msg_
.
request_read_msg
.
byte_size
;
cur_read_handle_
=
&
SocketReadHelper
::
MsgBodyReadHandle
;
}
...
...
oneflow/core/comm_network/epoll/socket_write_helper.cpp
浏览文件 @
bebddcc9
...
...
@@ -116,8 +116,9 @@ void SocketWriteHelper::SetStatusWhenRequestWriteMsgHeadDone() {
void
SocketWriteHelper
::
SetStatusWhenRequestReadMsgHeadDone
()
{
const
void
*
src_token
=
cur_msg_
.
request_read_msg
.
src_token
;
auto
src_mem_desc
=
static_cast
<
const
SocketMemDesc
*>
(
src_token
);
write_ptr_
=
reinterpret_cast
<
const
char
*>
(
src_mem_desc
->
mem_ptr
);
write_size_
=
src_mem_desc
->
byte_size
;
write_ptr_
=
reinterpret_cast
<
const
char
*>
(
src_mem_desc
->
mem_ptr
)
+
cur_msg_
.
request_read_msg
.
offset
;
write_size_
=
cur_msg_
.
request_read_msg
.
byte_size
;
cur_write_handle_
=
&
SocketWriteHelper
::
MsgBodyWriteHandle
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录