Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
BaiXuePrincess
Paddle
提交
657a4f94
P
Paddle
项目概览
BaiXuePrincess
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
657a4f94
编写于
1月 28, 2019
作者:
Q
Qiao Longfei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
code can compile
上级
c7e38680
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
38 addition
and
33 deletion
+38
-33
paddle/fluid/operators/distributed/parameter_send.cc
paddle/fluid/operators/distributed/parameter_send.cc
+25
-23
paddle/fluid/operators/distributed/parameter_send.h
paddle/fluid/operators/distributed/parameter_send.h
+8
-6
paddle/fluid/operators/distributed_ops/CMakeLists.txt
paddle/fluid/operators/distributed_ops/CMakeLists.txt
+2
-2
paddle/fluid/operators/distributed_ops/send_op.cc
paddle/fluid/operators/distributed_ops/send_op.cc
+3
-2
未找到文件。
paddle/fluid/operators/distributed/parameter_send.cc
浏览文件 @
657a4f94
...
@@ -38,27 +38,27 @@ using SelectedRows = framework::SelectedRows;
...
@@ -38,27 +38,27 @@ using SelectedRows = framework::SelectedRows;
using
DDim
=
framework
::
DDim
;
using
DDim
=
framework
::
DDim
;
template
<
typename
T
>
template
<
typename
T
>
void
send
(
const
std
::
string
&
var_name
,
void
ParameterSend
<
T
>::
operator
()(
const
std
::
string
&
var_name
,
const
std
::
vector
<
std
::
string
>&
send_varnames
,
const
std
::
vector
<
std
::
string
>
&
send_varnames
,
const
std
::
vector
<
std
::
string
>&
epmap
,
const
std
::
vector
<
std
::
string
>
&
epmap
,
const
std
::
vector
<
int64_t
>&
height_sections
,
const
std
::
vector
<
int64_t
>
&
height_sections
,
const
framework
::
ExecutionContext
&
ctx
,
const
framework
::
Scope
&
scope
,
const
framework
::
ExecutionContext
&
ctx
,
bool
sync
)
{
const
framework
::
Scope
&
scope
,
bool
sync
)
{
framework
::
Scope
*
local_scope
=
scope
.
NewTmpScope
();
framework
::
Scope
*
local_scope
=
scope
.
NewTmpScope
();
platform
::
DeviceContextPool
&
pool
=
platform
::
DeviceContextPool
::
Instance
();
platform
::
DeviceContextPool
&
pool
=
platform
::
DeviceContextPool
::
Instance
();
auto
&
cpu_ctx
=
*
pool
.
Get
(
platform
::
CPUPlace
());
auto
&
cpu_ctx
=
*
pool
.
Get
(
platform
::
CPUPlace
());
auto
&
actual_ctx
=
*
pool
.
Get
(
ctx
.
GetPlace
());
auto
&
actual_ctx
=
*
pool
.
Get
(
ctx
.
GetPlace
());
distributed
::
RPCClient
*
rpc_client
=
distributed
::
RPCClient
*
rpc_client
=
distributed
::
RPCClient
::
GetInstance
<
RPCCLIENT_T
>
(
distributed
::
RPCClient
::
GetInstance
<
RPCCLIENT_T
>
(
ctx
.
Attr
<
int
>
(
"trainer_id"
));
ctx
.
Attr
<
int
>
(
"trainer_id"
));
auto
*
send_var
=
scope
.
FindVar
(
var_name
);
auto
*
send_var
=
scope
.
FindVar
(
var_name
);
size_t
out_num
=
send_varnames
.
size
();
size_t
out_num
=
send_varnames
.
size
();
if
(
send_var
->
IsType
<
framework
::
LoDTensor
>
())
{
if
(
send_var
->
IsType
<
framework
::
LoDTensor
>
())
{
auto
&
send_tensor
=
send_var
->
Get
<
framework
::
LoDTensor
>
();
auto
&
send_tensor
=
send_var
->
Get
<
framework
::
LoDTensor
>
();
auto
&
send_tensor_dims
=
send_tensor
.
dims
();
auto
&
send_tensor_dims
=
send_tensor
.
dims
();
std
::
vector
<
framework
::
DDim
>
outs_dims
;
std
::
vector
<
framework
::
DDim
>
outs_dims
;
outs_dims
.
reserve
(
out_num
);
outs_dims
.
reserve
(
out_num
);
...
@@ -89,13 +89,13 @@ void send(const std::string& var_name,
...
@@ -89,13 +89,13 @@ void send(const std::string& var_name,
// create output var in local scope
// create output var in local scope
size_t
row_offset
=
0
;
size_t
row_offset
=
0
;
for
(
auto
i
=
0
;
i
<
out_num
;
++
i
)
{
for
(
auto
i
=
0
;
i
<
out_num
;
++
i
)
{
auto
*
out
=
auto
*
out
=
local_scope
->
Var
(
send_varnames
[
i
])
->
GetMutable
<
framework
::
Tensor
>
();
local_scope
->
Var
(
send_varnames
[
i
])
->
GetMutable
<
framework
::
Tensor
>
();
*
out
=
send_tensor
.
Slice
(
row_offset
,
row_offset
+
outs_dims
[
i
][
0
]);
*
out
=
send_tensor
.
Slice
(
row_offset
,
row_offset
+
outs_dims
[
i
][
0
]);
row_offset
+=
outs_dims
[
i
][
0
];
row_offset
+=
outs_dims
[
i
][
0
];
}
}
}
else
if
(
send_var
->
IsType
<
framework
::
SelectedRows
>
())
{
}
else
if
(
send_var
->
IsType
<
framework
::
SelectedRows
>
())
{
auto
&
send_slr
=
send_var
->
Get
<
framework
::
SelectedRows
>
();
auto
&
send_slr
=
send_var
->
Get
<
framework
::
SelectedRows
>
();
auto
abs_sections
=
ToAbsoluteSection
(
height_sections
);
auto
abs_sections
=
ToAbsoluteSection
(
height_sections
);
auto
send_rows
=
send_slr
.
rows
();
auto
send_rows
=
send_slr
.
rows
();
...
@@ -109,9 +109,9 @@ void send(const std::string& var_name,
...
@@ -109,9 +109,9 @@ void send(const std::string& var_name,
auto
src
=
send_slr
.
value
().
data
<
T
>
();
auto
src
=
send_slr
.
value
().
data
<
T
>
();
// create output var in local scope
// create output var in local scope
std
::
vector
<
framework
::
SelectedRows
*>
outs
;
std
::
vector
<
framework
::
SelectedRows
*>
outs
;
for
(
auto
&
name
:
send_varnames
)
{
for
(
auto
&
name
:
send_varnames
)
{
auto
*
out
=
local_scope
->
Var
(
name
)
->
GetMutable
<
framework
::
SelectedRows
>
();
auto
*
out
=
local_scope
->
Var
(
name
)
->
GetMutable
<
framework
::
SelectedRows
>
();
outs
.
push_back
(
out
);
outs
.
push_back
(
out
);
}
}
...
@@ -163,8 +163,8 @@ void send(const std::string& var_name,
...
@@ -163,8 +163,8 @@ void send(const std::string& var_name,
std
::
vector
<
distributed
::
VarHandlePtr
>
rets
;
std
::
vector
<
distributed
::
VarHandlePtr
>
rets
;
for
(
size_t
i
=
0
;
i
<
send_varnames
.
size
();
i
++
)
{
for
(
size_t
i
=
0
;
i
<
send_varnames
.
size
();
i
++
)
{
auto
&
send_var_name
=
send_varnames
[
i
];
auto
&
send_var_name
=
send_varnames
[
i
];
auto
&
endpoint
=
epmap
[
i
];
auto
&
endpoint
=
epmap
[
i
];
if
(
NeedSend
(
*
local_scope
,
send_var_name
))
{
if
(
NeedSend
(
*
local_scope
,
send_var_name
))
{
VLOG
(
3
)
<<
"sending "
<<
send_var_name
<<
" to "
<<
endpoint
;
VLOG
(
3
)
<<
"sending "
<<
send_var_name
<<
" to "
<<
endpoint
;
rets
.
push_back
(
rpc_client
->
AsyncSendVar
(
endpoint
,
cpu_ctx
,
*
local_scope
,
rets
.
push_back
(
rpc_client
->
AsyncSendVar
(
endpoint
,
cpu_ctx
,
*
local_scope
,
...
@@ -183,6 +183,8 @@ void send(const std::string& var_name,
...
@@ -183,6 +183,8 @@ void send(const std::string& var_name,
delete
local_scope
;
delete
local_scope
;
}
}
template
struct
ParameterSend
<
float
>;
};
// namespace distributed
};
// namespace distributed
};
// namespace operators
};
// namespace operators
};
// namespace paddle
};
// namespace paddle
paddle/fluid/operators/distributed/parameter_send.h
浏览文件 @
657a4f94
...
@@ -24,12 +24,14 @@ namespace operators {
...
@@ -24,12 +24,14 @@ namespace operators {
namespace
distributed
{
namespace
distributed
{
template
<
typename
T
>
template
<
typename
T
>
void
send
(
const
std
::
string
&
var_name
,
struct
ParameterSend
{
const
std
::
vector
<
std
::
string
>&
send_varnames
,
void
operator
()(
const
std
::
string
&
var_name
,
const
std
::
vector
<
std
::
string
>&
epmap
,
const
std
::
vector
<
std
::
string
>
&
send_varnames
,
const
std
::
vector
<
int64_t
>&
height_sections
,
const
std
::
vector
<
std
::
string
>
&
epmap
,
const
framework
::
ExecutionContext
&
context
,
const
std
::
vector
<
int64_t
>
&
height_sections
,
const
framework
::
Scope
&
scope
,
bool
sync
);
const
framework
::
ExecutionContext
&
context
,
const
framework
::
Scope
&
scope
,
bool
sync
);
};
};
// namespace distributed
};
// namespace distributed
};
// namespace operators
};
// namespace operators
...
...
paddle/fluid/operators/distributed_ops/CMakeLists.txt
浏览文件 @
657a4f94
...
@@ -2,9 +2,9 @@ include(operators)
...
@@ -2,9 +2,9 @@ include(operators)
set
(
DISTRIBUTE_DEPS
""
)
set
(
DISTRIBUTE_DEPS
""
)
if
(
WITH_GRPC
)
if
(
WITH_GRPC
)
set
(
DISTRIBUTE_DEPS sendrecvop_rpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node
)
set
(
DISTRIBUTE_DEPS sendrecvop_rpc
parameter_send
grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node
)
else
()
else
()
set
(
DISTRIBUTE_DEPS sendrecvop_rpc brpc leveldb snappystream snappy protobuf ssl crypto zlib node
)
set
(
DISTRIBUTE_DEPS sendrecvop_rpc
parameter_send
brpc leveldb snappystream snappy protobuf ssl crypto zlib node
)
if
(
WITH_BRPC_RDMA
)
if
(
WITH_BRPC_RDMA
)
find_library
(
IBVERBS_LIBRARY NAMES ibverbs
)
find_library
(
IBVERBS_LIBRARY NAMES ibverbs
)
ADD_LIBRARY
(
ibverbs SHARED IMPORTED GLOBAL
)
ADD_LIBRARY
(
ibverbs SHARED IMPORTED GLOBAL
)
...
...
paddle/fluid/operators/distributed_ops/send_op.cc
浏览文件 @
657a4f94
...
@@ -51,8 +51,9 @@ class SendOp : public framework::OperatorBase {
...
@@ -51,8 +51,9 @@ class SendOp : public framework::OperatorBase {
platform
::
DeviceContextPool
::
Instance
();
platform
::
DeviceContextPool
::
Instance
();
auto
*
dev_ctx
=
pool
.
Get
(
place
);
auto
*
dev_ctx
=
pool
.
Get
(
place
);
auto
exe_ctx
=
framework
::
ExecutionContext
(
*
this
,
scope
,
*
dev_ctx
,
ctx
);
auto
exe_ctx
=
framework
::
ExecutionContext
(
*
this
,
scope
,
*
dev_ctx
,
ctx
);
distributed
::
send
<
float
>
(
ins
[
0
],
send_varnames
,
epmap
,
height_sections
,
auto
send_functor
=
distributed
::
ParameterSend
<
float
>
();
exe_ctx
,
scope
,
static_cast
<
bool
>
(
sync_send
));
send_functor
(
ins
[
0
],
send_varnames
,
epmap
,
height_sections
,
exe_ctx
,
scope
,
static_cast
<
bool
>
(
sync_send
));
}
else
{
}
else
{
platform
::
DeviceContextPool
&
pool
=
platform
::
DeviceContextPool
&
pool
=
platform
::
DeviceContextPool
::
Instance
();
platform
::
DeviceContextPool
::
Instance
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录