Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
PaddlePaddle
Paddle
提交
9cc1eb43
P
Paddle
项目概览
PaddlePaddle
/
Paddle
大约 1 年 前同步成功
通知
2298
Star
20931
Fork
5422
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1423
列表
看板
标记
里程碑
合并请求
543
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1,423
Issue
1,423
列表
看板
标记
里程碑
合并请求
543
合并请求
543
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9cc1eb43
编写于
6月 21, 2018
作者:
Y
Yancey
提交者:
GitHub
6月 21, 2018
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11221 from Yancey1989/overlap_memcpy_with_dist
overlap rpc op memcpy in distributed training
上级
bfe5dc63
7e6518e8
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
127 addition
and
62 deletion
+127
-62
paddle/fluid/framework/details/multi_devices_graph_builder.cc
...le/fluid/framework/details/multi_devices_graph_builder.cc
+95
-50
paddle/fluid/framework/details/multi_devices_graph_builder.h
paddle/fluid/framework/details/multi_devices_graph_builder.h
+10
-7
paddle/fluid/framework/details/ssa_graph_builder.h
paddle/fluid/framework/details/ssa_graph_builder.h
+1
-0
paddle/fluid/framework/parallel_executor.cc
paddle/fluid/framework/parallel_executor.cc
+18
-5
paddle/fluid/framework/parallel_executor.h
paddle/fluid/framework/parallel_executor.h
+3
-0
未找到文件。
paddle/fluid/framework/details/multi_devices_graph_builder.cc
浏览文件 @
9cc1eb43
...
...
@@ -57,6 +57,7 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder(
for
(
auto
&
p
:
params
)
{
grad_names_
.
insert
(
GradVarName
(
p
));
}
balance_vars_
.
resize
(
places_
.
size
(),
0
);
}
void
MultiDevSSAGraphBuilder
::
CreateOpHandleIOs
(
SSAGraph
*
result
,
...
...
@@ -140,11 +141,30 @@ bool MultiDevSSAGraphBuilder::IsDistTrainOp(
checker
(
op
.
InputArgumentNames
(),
recv_vars
);
}
size_t
MultiDevSSAGraphBuilder
::
GetAppropriateDeviceID
(
const
std
::
vector
<
std
::
string
>
&
var_names
)
const
{
int64_t
numel_sum
=
0
;
for
(
auto
var_name
:
var_names
)
{
auto
var_desc
=
all_vars_
.
at
(
var_name
);
PADDLE_ENFORCE_NOT_NULL
(
var_desc
);
auto
dim
=
framework
::
make_ddim
(
var_desc
->
GetShape
());
int64_t
numel
=
framework
::
product
(
dim
);
PADDLE_ENFORCE_GT
(
numel
,
0
);
numel_sum
+=
numel
;
}
auto
smallest
=
std
::
min_element
(
std
::
begin
(
balance_vars_
),
std
::
end
(
balance_vars_
));
size_t
dev_id
=
static_cast
<
size_t
>
(
std
::
distance
(
std
::
begin
(
balance_vars_
),
smallest
));
balance_vars_
[
dev_id
]
+=
numel_sum
;
return
dev_id
;
}
std
::
unique_ptr
<
SSAGraph
>
MultiDevSSAGraphBuilder
::
Build
(
const
ProgramDesc
&
program
)
const
{
std
::
unordered_map
<
std
::
string
,
VarDesc
*>
all_vars
;
for
(
auto
*
var
:
program
.
Block
(
0
).
AllVars
())
{
all_vars
[
var
->
Name
()]
=
var
;
all_vars
_
.
emplace
(
var
->
Name
(),
var
)
;
}
auto
graph
=
new
SSAGraph
();
...
...
@@ -161,35 +181,16 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
auto
send_vars
=
FindDistTrainSendVars
(
program
);
auto
recv_vars
=
FindDistTrainRecvVars
(
program
);
std
::
vector
<
std
::
unordered_set
<
std
::
string
>>
var_name_on_devices
;
std
::
vector
<
std
::
unordered_set
<
std
::
string
>>
bcast_var_name_set
;
var_name_on_devices
.
resize
(
places_
.
size
());
bcast_var_name_set
.
resize
(
places_
.
size
());
size_t
cur_device_id
=
0
;
std
::
vector
<
int64_t
>
balance_grads
(
places_
.
size
(),
0
);
auto
get_appropriate_dev
=
[
&
](
std
::
string
&
g_name
)
->
size_t
{
auto
var_desc
=
all_vars
.
at
(
g_name
);
PADDLE_ENFORCE_NOT_NULL
(
var_desc
);
auto
dim
=
framework
::
make_ddim
(
var_desc
->
GetShape
());
int64_t
numel
=
framework
::
product
(
dim
);
PADDLE_ENFORCE_GE
(
numel
,
0
);
auto
smallest
=
std
::
min_element
(
std
::
begin
(
balance_grads
),
std
::
end
(
balance_grads
));
size_t
dev_id
=
static_cast
<
size_t
>
(
std
::
distance
(
std
::
begin
(
balance_grads
),
smallest
));
balance_grads
[
dev_id
]
+=
numel
;
return
dev_id
;
};
bool
is_forwarding
=
true
;
for
(
auto
*
op
:
program
.
Block
(
0
).
AllOps
())
{
if
(
boost
::
get
<
int
>
(
op
->
GetAttr
(
OpProtoAndCheckerMaker
::
OpRoleAttrName
()))
==
static_cast
<
int
>
(
OpRole
::
kRPC
))
{
// append rpc op if program is distributed trainer main program.
// always use the first device
CreateRPCOp
(
&
result
,
*
op
);
}
else
if
(
IsDistTrainOp
(
*
op
,
send_vars
,
recv_vars
))
{
CreateDistTrainOp
(
&
result
,
*
op
);
...
...
@@ -201,13 +202,13 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
}
is_forwarding
=
false
;
}
else
{
int
op_dev_id
=
GetOpDeviceID
(
var_name_on_devices
,
*
op
);
int
op_dev_id
=
GetOpDeviceID
(
*
op
);
if
(
op_dev_id
==
-
1
)
{
// var on all device
CreateComputationalOps
(
&
result
,
*
op
,
places_
.
size
());
}
else
{
CreateComputationalOp
(
&
result
,
*
op
,
op_dev_id
);
for
(
auto
&
var_name
:
op
->
OutputArgumentNames
())
{
var_name_on_devices
[
op_dev_id
].
emplace
(
var_name
);
var_name_on_devices
_
.
emplace
(
var_name
,
op_dev_id
);
}
}
if
(
!
is_forwarding
&&
places_
.
size
()
>
1
)
{
...
...
@@ -230,13 +231,13 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
switch
(
strategy_
.
reduce_
)
{
case
BuildStrategy
::
ReduceStrategy
::
kReduce
:
cur_device_id
=
get_appropriate_dev
(
g_name
);
cur_device_id
=
GetAppropriateDeviceID
({
g_name
}
);
CreateReduceOp
(
&
result
,
g_name
,
cur_device_id
);
var_name_on_devices
[
cur_device_id
].
emplace
(
g_name
);
var_name_on_devices
_
.
emplace
(
g_name
,
cur_device_id
);
bcast_var_name_set
[
cur_device_id
].
emplace
(
p_name
);
break
;
case
BuildStrategy
::
ReduceStrategy
::
kAllReduce
:
if
(
IsSparseGradient
(
all_vars
,
g_name
))
{
if
(
IsSparseGradient
(
g_name
))
{
CreateReduceOp
(
&
result
,
g_name
,
0
);
CreateBroadcastOp
(
&
result
,
g_name
,
0
);
}
else
{
...
...
@@ -273,11 +274,9 @@ std::unique_ptr<SSAGraph> MultiDevSSAGraphBuilder::Build(
return
std
::
unique_ptr
<
SSAGraph
>
(
graph
);
}
bool
MultiDevSSAGraphBuilder
::
IsSparseGradient
(
const
std
::
unordered_map
<
std
::
string
,
VarDesc
*>
&
all_vars
,
const
std
::
string
&
og
)
const
{
PADDLE_ENFORCE
(
all_vars
.
count
(
og
)
!=
0
);
if
(
all_vars
.
at
(
og
)
->
GetType
()
==
proto
::
VarType
::
SELECTED_ROWS
)
{
bool
MultiDevSSAGraphBuilder
::
IsSparseGradient
(
const
std
::
string
&
og
)
const
{
PADDLE_ENFORCE
(
all_vars_
.
count
(
og
)
!=
0
);
if
(
all_vars_
.
at
(
og
)
->
GetType
()
==
proto
::
VarType
::
SELECTED_ROWS
)
{
return
true
;
}
return
false
;
...
...
@@ -363,24 +362,23 @@ bool MultiDevSSAGraphBuilder::IsParameterGradientOnce(
return
is_pg_once
;
}
int
MultiDevSSAGraphBuilder
::
GetOpDeviceID
(
const
std
::
vector
<
std
::
unordered_set
<
std
::
string
>>
&
var_name_on_devices
,
const
OpDesc
&
op
)
const
{
int
MultiDevSSAGraphBuilder
::
GetOpDeviceID
(
const
OpDesc
&
op
)
const
{
if
(
strategy_
.
reduce_
!=
BuildStrategy
::
ReduceStrategy
::
kReduce
)
{
return
-
1
;
}
int
var_dev_id
=
-
1
;
for
(
auto
&
var_name
:
op
.
InputArgumentNames
())
{
if
(
var_dev_id
!=
-
1
)
break
;
for
(
size_t
i
=
0
;
i
<
var_name_on_devices
.
size
();
++
i
)
{
if
(
var_name_on_devices
[
i
].
count
(
var_name
))
{
var_dev_id
=
static_cast
<
int
>
(
i
);
break
;
}
for
(
auto
&
varname
:
op
.
InputArgumentNames
())
{
int
dev_id
=
GetVarDeviceID
(
varname
);
if
(
dev_id
!=
-
1
)
{
return
dev_id
;
}
}
return
var_dev_id
;
return
-
1
;
}
int
MultiDevSSAGraphBuilder
::
GetVarDeviceID
(
const
std
::
string
&
varname
)
const
{
auto
got
=
var_name_on_devices_
.
find
(
varname
);
return
got
==
var_name_on_devices_
.
end
()
?
-
1
:
got
->
second
;
}
void
MultiDevSSAGraphBuilder
::
CreateScaleLossGradOp
(
SSAGraph
*
result
)
const
{
...
...
@@ -463,7 +461,30 @@ void MultiDevSSAGraphBuilder::ConnectOp(SSAGraph *result, OpHandleBase *op,
void
MultiDevSSAGraphBuilder
::
CreateDistTrainOp
(
SSAGraph
*
result
,
const
OpDesc
&
op
)
const
{
CreateComputationalOp
(
result
,
op
,
0
);
int
op_dev_id
=
-
1
;
if
(
op
.
Type
()
==
"split_byref"
)
{
op_dev_id
=
GetVarDeviceID
(
op
.
InputArgumentNames
()[
0
]);
if
(
strategy_
.
reduce_
==
BuildStrategy
::
ReduceStrategy
::
kAllReduce
)
{
op_dev_id
=
GetAppropriateDeviceID
(
op
.
InputArgumentNames
());
for
(
auto
&
varname
:
op
.
InputArgumentNames
())
{
var_name_on_devices_
.
emplace
(
varname
,
op_dev_id
);
}
}
for
(
auto
&
varname
:
op
.
OutputArgumentNames
())
{
var_name_on_devices_
.
emplace
(
varname
,
op_dev_id
);
}
}
else
if
(
op
.
Type
()
==
"concat"
)
{
op_dev_id
=
GetVarDeviceID
(
op
.
InputArgumentNames
()[
0
]);
}
else
{
PADDLE_ENFORCE
(
"the distribute training related op should be in [split_byref, "
"concat]."
);
}
PADDLE_ENFORCE
(
op_dev_id
!=
-
1
,
"can not find right place for distributed op: %s"
,
op
.
Type
());
CreateComputationalOp
(
result
,
op
,
op_dev_id
);
if
(
op
.
Type
()
==
"concat"
)
{
ConnectOp
(
result
,
result
->
ops_
.
back
().
get
(),
"fetch_barrier"
);
}
...
...
@@ -471,8 +492,34 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(SSAGraph *result,
void
MultiDevSSAGraphBuilder
::
CreateRPCOp
(
SSAGraph
*
result
,
const
OpDesc
&
op
)
const
{
result
->
ops_
.
emplace_back
(
new
RPCOpHandle
(
op
,
local_scopes_
[
0
],
op
.
Type
(),
places_
[
0
]));
int
op_dev_id
=
-
1
;
if
(
op
.
Type
()
==
"send"
)
{
op_dev_id
=
GetVarDeviceID
(
op
.
InputArgumentNames
()[
0
]);
// the variable name which contains .block means it was splited by
// split_byref op
// so that we can balance the variable blocks to all the pserver instances.
if
(
strategy_
.
reduce_
==
BuildStrategy
::
ReduceStrategy
::
kAllReduce
&&
op
.
InputArgumentNames
()[
0
].
find
(
".block"
)
==
std
::
string
::
npos
)
{
op_dev_id
=
GetAppropriateDeviceID
(
op
.
InputArgumentNames
());
for
(
auto
&
varname
:
op
.
InputArgumentNames
())
{
var_name_on_devices_
.
emplace
(
varname
,
op_dev_id
);
}
}
}
else
if
(
op
.
Type
()
==
"recv"
)
{
op_dev_id
=
GetAppropriateDeviceID
(
op
.
OutputArgumentNames
());
for
(
auto
&
varname
:
op
.
OutputArgumentNames
())
{
var_name_on_devices_
.
emplace
(
varname
,
op_dev_id
);
}
}
else
{
// send_barrier and fetch_barrier op can be scheduled on device 0
op_dev_id
=
0
;
}
PADDLE_ENFORCE
(
op_dev_id
!=
-
1
,
"can not find the right place for rpc op: %s"
,
op
.
Type
());
result
->
ops_
.
emplace_back
(
new
RPCOpHandle
(
op
,
local_scopes_
[
op_dev_id
],
op
.
Type
(),
places_
[
op_dev_id
]));
if
(
op
.
Type
()
==
"send_barrier"
)
{
ConnectOp
(
result
,
result
->
ops_
.
back
().
get
(),
"send"
);
...
...
@@ -488,9 +535,7 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(SSAGraph *result,
"send, send_barrier. recv, fetch_barrier]"
);
}
// TODO(Yancey1989): schedule rpc op on different place may
// increate throughput
CreateOpHandleIOs
(
result
,
op
,
0
);
CreateOpHandleIOs
(
result
,
op
,
op_dev_id
);
}
bool
MultiDevSSAGraphBuilder
::
IsScaleLossOp
(
const
OpDesc
&
op
)
const
{
...
...
paddle/fluid/framework/details/multi_devices_graph_builder.h
浏览文件 @
9cc1eb43
...
...
@@ -47,10 +47,11 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
#endif
std
::
unique_ptr
<
SSAGraph
>
Build
(
const
ProgramDesc
&
program
)
const
override
;
int
GetVarDeviceID
(
const
std
::
string
&
varname
)
const
;
private:
void
CreateOpHandleIOs
(
SSAGraph
*
result
,
const
OpDesc
&
op
,
size_t
pla
ce_id
)
const
;
size_t
devi
ce_id
)
const
;
private:
std
::
string
loss_var_name_
;
...
...
@@ -96,21 +97,23 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const
std
::
string
&
og
,
std
::
unordered_set
<
std
::
string
>
*
og_has_been_broadcast
)
const
;
int
GetOpDeviceID
(
const
std
::
vector
<
std
::
unordered_set
<
std
::
string
>>
&
var_name_on_devices
,
const
OpDesc
&
op
)
const
;
int
GetOpDeviceID
(
const
OpDesc
&
op
)
const
;
void
InsertAllReduceOp
(
SSAGraph
*
result
,
const
std
::
string
&
og
)
const
;
void
CreateBroadcastOp
(
SSAGraph
*
result
,
const
std
::
string
&
p_name
,
size_t
src_dev_id
)
const
;
bool
IsSparseGradient
(
const
std
::
unordered_map
<
std
::
string
,
VarDesc
*>
&
all_vars
,
const
std
::
string
&
og
)
const
;
bool
IsSparseGradient
(
const
std
::
string
&
og
)
const
;
size_t
GetAppropriateDeviceID
(
const
std
::
vector
<
std
::
string
>
&
var_names
)
const
;
private:
BuildStrategy
strategy_
;
mutable
std
::
unordered_map
<
std
::
string
,
VarDesc
*>
all_vars_
;
mutable
std
::
unordered_map
<
std
::
string
,
int
>
var_name_on_devices_
;
mutable
std
::
vector
<
int64_t
>
balance_vars_
;
void
SetCommunicationContext
(
OpHandleBase
*
op_handle
,
const
platform
::
Place
&
p
)
const
;
...
...
paddle/fluid/framework/details/ssa_graph_builder.h
浏览文件 @
9cc1eb43
...
...
@@ -30,6 +30,7 @@ class SSAGraphBuilder {
SSAGraphBuilder
()
{}
virtual
~
SSAGraphBuilder
()
{}
virtual
std
::
unique_ptr
<
SSAGraph
>
Build
(
const
ProgramDesc
&
program
)
const
=
0
;
virtual
int
GetVarDeviceID
(
const
std
::
string
&
var_name
)
const
{
return
-
1
;
}
DISABLE_COPY_AND_ASSIGN
(
SSAGraphBuilder
);
...
...
paddle/fluid/framework/parallel_executor.cc
浏览文件 @
9cc1eb43
...
...
@@ -110,7 +110,6 @@ ParallelExecutor::ParallelExecutor(
// Step 3. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
details
::
SSAGraphBuilderFactory
builder_factory
(
member_
->
places_
,
loss_var_name
,
params
,
member_
->
local_scopes_
,
build_strategy
);
...
...
@@ -122,9 +121,10 @@ ParallelExecutor::ParallelExecutor(
#endif
}
builder_
=
std
::
move
(
builder_factory
.
Create
());
member_
->
executor_
.
reset
(
new
details
::
ThreadedSSAGraphExecutor
(
exec_strategy
,
member_
->
local_scopes_
,
places
,
builder_
factory
.
Create
()
->
Build
(
main_program
)));
builder_
->
Build
(
main_program
)));
member_
->
executor_
.
reset
(
new
details
::
ScopeBufferedSSAGraphExecutor
(
exec_strategy
,
member_
->
local_scopes_
,
std
::
move
(
var_infos
),
...
...
@@ -133,10 +133,22 @@ ParallelExecutor::ParallelExecutor(
void
ParallelExecutor
::
BCastParamsToGPUs
(
const
std
::
unordered_set
<
std
::
string
>
&
vars
)
const
{
auto
*
main_scope
=
member_
->
local_scopes_
[
0
];
// the the initialize bcast, all vars would be bcast from device(0), otherwise
// bcast from the specified device.
bool
initialize
=
builder_
.
get
()
==
nullptr
?
true
:
false
;
for
(
auto
&
var
:
vars
)
{
auto
*
main_var
=
main_scope
->
FindVar
(
var
);
int
var_dev_id
=
builder_
.
get
()
==
nullptr
?
-
1
:
builder_
->
GetVarDeviceID
(
var
);
if
(
!
initialize
&&
var_dev_id
==
-
1
)
continue
;
framework
::
Variable
*
main_var
=
nullptr
;
if
(
initialize
)
{
main_var
=
member_
->
local_scopes_
[
0
]
->
FindVar
(
var
);
}
else
{
main_var
=
member_
->
local_scopes_
[
var_dev_id
]
->
FindVar
(
var
);
}
if
(
main_var
==
nullptr
||
!
main_var
->
IsType
<
LoDTensor
>
())
{
continue
;
}
...
...
@@ -151,7 +163,8 @@ void ParallelExecutor::BCastParamsToGPUs(
for
(
size_t
i
=
0
;
i
<
member_
->
places_
.
size
();
++
i
)
{
auto
place
=
member_
->
places_
[
i
];
void
*
buffer
;
if
(
i
==
0
)
{
if
((
initialize
&&
i
==
0
)
||
(
!
initialize
&&
i
==
var_dev_id
))
{
buffer
=
const_cast
<
void
*>
(
main_tensor
.
data
<
void
>
());
}
else
{
auto
local_scope
=
member_
->
local_scopes_
[
i
];
...
...
paddle/fluid/framework/parallel_executor.h
浏览文件 @
9cc1eb43
...
...
@@ -19,12 +19,14 @@ limitations under the License. */
#include <unordered_set>
#include <vector>
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/multi_devices_graph_builder.h"
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h"
namespace
paddle
{
namespace
framework
{
...
...
@@ -68,6 +70,7 @@ class ParallelExecutor {
private:
ParallelExecutorPrivate
*
member_
;
std
::
unique_ptr
<
details
::
SSAGraphBuilder
>
builder_
;
};
}
// namespace framework
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录