Unverified commit ee16006b, authored Jan 04, 2021 by WangXi, committed by GitHub on Jan 04, 2021.
Optimization grad merge performance (#29784)
Parent commit: e891f4da
Showing 21 changed files with 917 additions and 189 deletions (+917, -189).
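At its core, the change teaches Paddle's graph executor to combine gradient merge (accumulating gradients over k micro-steps) with regular and fused allreduce: the new GradMergeAllReduceOpHandle and FusedGradMergeAllReduceOpHandle only launch communication when a boolean condition variable is true. A toy Python sketch of the gating idea, with illustrative names rather than Paddle's API:

# Toy sketch of the gating this commit implements in C++ (names are
# illustrative, not Paddle's API): the allreduce runs only on the step
# where the grad-merge condition flips true, so k micro-steps pay for
# one communication round instead of k.
def grad_merge_all_reduce(step, k_steps, merged_grad, all_reduce):
    cond = (step + 1) % k_steps == 0  # the "grad_merge_cond" variable
    if cond:
        all_reduce(merged_grad)       # communicate once per k steps
    # otherwise no-op: merged_grad keeps accumulating locally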
Changed files:

paddle/fluid/framework/details/CMakeLists.txt (+4, -0)
paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.cc (+132, -0)
paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h (+111, -0)
paddle/fluid/framework/details/multi_devices_helper.h (+3, -1)
paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc (+18, -8)
paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h (+1, -0)
paddle/fluid/framework/ir/coalesce_grad_tensor_pass.cc (+67, -7)
paddle/fluid/framework/ir/fuse_optimizer_ops_pass/fuse_optimizer_op_pass.cc (+11, -3)
paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt (+2, -1)
paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc (+75, -13)
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc (+86, -23)
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h (+12, -7)
paddle/fluid/framework/parallel_executor.cc (+11, -0)
paddle/fluid/operators/coalesce_tensor_op.cc (+22, -2)
python/paddle/fluid/layers/control_flow.py (+4, -1)
python/paddle/fluid/optimizer.py (+201, -98)
python/paddle/fluid/tests/unittests/CMakeLists.txt (+1, -0)
python/paddle/fluid/tests/unittests/dist_mnist_gradient_merge.py (+62, -0)
python/paddle/fluid/tests/unittests/test_dist_base.py (+14, -0)
python/paddle/fluid/tests/unittests/test_dist_mnist_gradient_merge.py (+57, -0)
python/paddle/fluid/tests/unittests/test_optimizer.py (+23, -25)
paddle/fluid/framework/details/CMakeLists.txt
View file @ ee16006b

@@ -28,6 +28,8 @@ if(WITH_GPU)
       dynload_cuda variable_visitor)
   nv_library(fused_all_reduce_op_handle SRCS fused_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
       dynload_cuda variable_visitor place device_memory_aligment)
+  nv_library(grad_merge_all_reduce_op_handle SRCS grad_merge_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor
+      ddim memory dynload_cuda variable_visitor place device_memory_aligment all_reduce_op_handle fused_all_reduce_op_handle)
   if(WITH_DGC)
     nv_library(sparse_all_reduce_op_handle SRCS sparse_all_reduce_op_handle.cc DEPS op_handle_base scope
@@ -50,6 +52,8 @@ else()
       variable_visitor)
   cc_library(fused_all_reduce_op_handle SRCS fused_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory
       variable_visitor place device_memory_aligment)
+  cc_library(grad_merge_all_reduce_op_handle SRCS grad_merge_all_reduce_op_handle.cc DEPS op_handle_base scope lod_tensor
+      ddim memory variable_visitor place device_memory_aligment all_reduce_op_handle fused_all_reduce_op_handle)
   if(WITH_DISTRIBUTE)
     cc_library(reduce_op_handle SRCS reduce_op_handle.cc DEPS op_handle_base variable_visitor scope
         ddim selected_rows_functor)
paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.cc
0 → 100644
View file @ ee16006b

// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h"

#include <algorithm>

#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/reduce_and_gather.h"
#include "paddle/fluid/framework/details/variable_visitor.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/platform/gpu_info.h"
#include "paddle/fluid/platform/profiler.h"

#ifdef PADDLE_WITH_NCCL
DECLARE_bool(sync_nccl_allreduce);
#endif

namespace paddle {
namespace framework {
namespace details {

#if defined(PADDLE_WITH_NCCL)
GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places,
    const std::string &grad_merge_cond_name,
    const platform::NCCLCommunicator *ctxs)
    : AllReduceOpHandle(node, local_scopes, places, ctxs),
      grad_merge_cond_name_(grad_merge_cond_name) {}
#elif defined(PADDLE_WITH_XPU_BKCL)
GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places,
    const std::string &grad_merge_cond_name,
    const platform::BKCLCommunicator *ctxs)
    : AllReduceOpHandle(node, local_scopes, places, ctxs),
      grad_merge_cond_name_(grad_merge_cond_name) {}
#else
GradMergeAllReduceOpHandle::GradMergeAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places,
    const std::string &grad_merge_cond_name)
    : AllReduceOpHandle(node, local_scopes, places),
      grad_merge_cond_name_(grad_merge_cond_name) {}
#endif

void GradMergeAllReduceOpHandle::RunImpl() {
  PADDLE_ENFORCE_GT(local_scopes_.size(), 0,
                    platform::errors::PreconditionNotMet(
                        "The number of local scope should be > 0, but got %zu.",
                        local_scopes_.size()));

  auto *local_scope = local_exec_scopes_[0];
  auto cond_var = local_scope->FindVar(grad_merge_cond_name_);
  PADDLE_ENFORCE_NOT_NULL(
      cond_var, platform::errors::NotFound(
                    "Variable %s is not found in scope.", cond_var));
  bool cond = *cond_var->Get<LoDTensor>().data<bool>();

  if (cond) {
    AllReduceOpHandle::RunImpl();
  }
}

std::string GradMergeAllReduceOpHandle::Name() const {
  return "grad_merge_all_reduce";
}

#if defined(PADDLE_WITH_NCCL)
FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
    const std::string &grad_merge_cond_name,
    const platform::NCCLCommunicator *ctxs)
    : FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce,
                             ctxs),
      grad_merge_cond_name_(grad_merge_cond_name) {}
#elif defined(PADDLE_WITH_XPU_BKCL)
FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
    const std::string &grad_merge_cond_name,
    const platform::BKCLCommunicator *ctxs)
    : FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce,
                             ctxs),
      grad_merge_cond_name_(grad_merge_cond_name) {}
#else
FusedGradMergeAllReduceOpHandle::FusedGradMergeAllReduceOpHandle(
    ir::Node *node, const std::vector<Scope *> &local_scopes,
    const std::vector<platform::Place> &places, const size_t num_of_all_reduce,
    const std::string &grad_merge_cond_name)
    : FusedAllReduceOpHandle(node, local_scopes, places, num_of_all_reduce),
      grad_merge_cond_name_(grad_merge_cond_name) {}
#endif

void FusedGradMergeAllReduceOpHandle::RunImpl() {
  PADDLE_ENFORCE_GT(local_scopes_.size(), 0,
                    platform::errors::PreconditionNotMet(
                        "The number of local scope should be > 0, but got %zu.",
                        local_scopes_.size()));

  auto *local_scope = local_exec_scopes_[0];
  auto cond_var = local_scope->FindVar(grad_merge_cond_name_);
  PADDLE_ENFORCE_NOT_NULL(
      cond_var, platform::errors::NotFound(
                    "Variable %s is not found in scope.", cond_var));
  bool cond = *cond_var->Get<LoDTensor>().data<bool>();

  if (cond) {
    VLOG(10) << "run fused grad merge all reduce";
    FusedAllReduceOpHandle::RunImpl();
  }
}

std::string FusedGradMergeAllReduceOpHandle::Name() const {
  return "fused_grad_merge_all_reduce";
}

}  // namespace details
}  // namespace framework
}  // namespace paddle
paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h
0 → 100644
View file @ ee16006b

// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <string>
#include <utility>
#include <vector>

#include "paddle/fluid/framework/details/all_reduce_op_handle.h"
#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"

namespace paddle {
namespace framework {
namespace ir {
class Node;
}  // namespace ir
}  // namespace framework
namespace platform {
class NCCLCommunicator;
}  // namespace platform
}  // namespace paddle

#if defined(PADDLE_WITH_NCCL)
#include "paddle/fluid/framework/details/nccl_op_handle.h"
#include "paddle/fluid/platform/nccl_helper.h"
#endif

namespace paddle {
namespace framework {
namespace details {

class GradMergeAllReduceOpHandle : public AllReduceOpHandle {
 public:
#if defined(PADDLE_WITH_NCCL)
  GradMergeAllReduceOpHandle(ir::Node *node,
                             const std::vector<Scope *> &local_scopes,
                             const std::vector<platform::Place> &places,
                             const std::string &grad_merge_cond_name,
                             const platform::NCCLCommunicator *ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
  GradMergeAllReduceOpHandle(ir::Node *node,
                             const std::vector<Scope *> &local_scopes,
                             const std::vector<platform::Place> &places,
                             const std::string &grad_merge_cond_name,
                             const platform::BKCLCommunicator *ctxs);
#else
  GradMergeAllReduceOpHandle(ir::Node *node,
                             const std::vector<Scope *> &local_scopes,
                             const std::vector<platform::Place> &places,
                             const std::string &grad_merge_cond_name);
#endif

  std::string Name() const override;

  std::string GradMergeCondName() { return grad_merge_cond_name_; }

 protected:
  void RunImpl() override;

 private:
  std::string grad_merge_cond_name_;
};

class FusedGradMergeAllReduceOpHandle : public FusedAllReduceOpHandle {
 public:
#if defined(PADDLE_WITH_NCCL)
  FusedGradMergeAllReduceOpHandle(ir::Node *node,
                                  const std::vector<Scope *> &local_scopes,
                                  const std::vector<platform::Place> &places,
                                  const size_t num_of_all_reduce,
                                  const std::string &grad_merge_cond_name,
                                  const platform::NCCLCommunicator *ctxs);
#elif defined(PADDLE_WITH_XPU_BKCL)
  FusedGradMergeAllReduceOpHandle(ir::Node *node,
                                  const std::vector<Scope *> &local_scopes,
                                  const std::vector<platform::Place> &places,
                                  const size_t num_of_all_reduce,
                                  const std::string &grad_merge_cond_name,
                                  const platform::BKCLCommunicator *ctxs);
#else
  FusedGradMergeAllReduceOpHandle(ir::Node *node,
                                  const std::vector<Scope *> &local_scopes,
                                  const std::vector<platform::Place> &places,
                                  const size_t num_of_all_reduce,
                                  const std::string &grad_merge_cond_name);
#endif

  std::string Name() const override;

 protected:
  void RunImpl() override;

 private:
  std::string grad_merge_cond_name_;
};

}  // namespace details
}  // namespace framework
}  // namespace paddle
paddle/fluid/framework/details/multi_devices_helper.h
View file @ ee16006b

@@ -22,6 +22,7 @@
 #include <vector>
 #include "paddle/fluid/framework/details/op_handle_base.h"
+#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
 #include "paddle/fluid/framework/details/var_handle.h"
 #include "paddle/fluid/framework/ir/graph.h"
 #include "paddle/fluid/framework/ir/pass.h"
@@ -62,7 +63,7 @@ constexpr char kUseHierarchicalAllReduce[] = "use_hierarchical_allreduce";
 typedef std::unordered_set<VarHandleBase *> GraphDepVars;
 constexpr char kGraphDepVars[] = "dep_vars";

-typedef std::unordered_set<std::string> FusedVars;
+typedef std::unordered_map<std::string, details::VariableInfo> FusedVars;
 constexpr char kFusedVars[] = "fused_vars";
 constexpr char kFusedVarNamePrefix[] = "@FUSEDVAR@";
@@ -78,6 +79,7 @@ constexpr char kParamsAndSparseGrads[] = "params_and_sparse_grads";
 typedef std::vector<ProgramDesc> ProgramDescs;
 constexpr char kProgramDescs[] = "program_descs";
+constexpr char kStartupProgramDescs[] = "startup_program_descs";

 typedef std::unordered_set<std::string> PinnedVars;
 constexpr char kPinnedVars[] = "pinned_vars";
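The typedef change above is the pivot for the rest of the patch: FusedVars used to record only the names of fused variables, and it now maps each name to a details::VariableInfo so later stages know the type and persistable flag. A rough Python analogue of the new attribute shape (the example entry is hypothetical):

# Rough analogue of the new FusedVars graph attribute (example entry is
# made up for illustration): a name -> info map instead of a bare name set.
fused_vars = {
    "@FUSEDVAR@@GRAD@fc_0.w_0@GRAD": {
        "type": "LOD_TENSOR",
        "persistable": True,  # allocated once, survives DropScope()
    },
}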
paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.cc
View file @ ee16006b

@@ -123,17 +123,27 @@ void ScopeBufferedSSAGraphExecutor::InitVariables() {
   }

   const ir::Graph &graph = Graph();
+  if (!is_initialized_) {
+    // startup_program_descs only need to be executed once
+    if (graph.Has(details::kStartupProgramDescs)) {
+      auto &program_descs =
+          graph.Get<details::ProgramDescs>(details::kStartupProgramDescs);
+      for (auto &program_desc : program_descs) {
+        for (auto &op_desc : program_desc.Block(0).AllOps()) {
+          for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
+            auto op = OpRegistry::CreateOp(*op_desc);
+            op->Run(*local_exec_scopes_[i], places_[i]);
+          }
+        }
+      }
+    }
+    is_initialized_ = true;
+  }

   if (graph.Has(details::kProgramDescs)) {
     auto &program_descs =
         graph.Get<details::ProgramDescs>(details::kProgramDescs);
+    // Init vars
+    auto &fused_grad_vars = graph.Get<details::FusedVars>(details::kFusedVars);
+    for (size_t i = 0; i < local_exec_scopes_.size(); ++i) {
+      for (auto &var_name : fused_grad_vars) {
+        auto var = local_exec_scopes_[i]->Var(var_name);
+        var->GetMutable<LoDTensor>();
+      }
+    }
     for (auto &program_desc : program_descs) {
       for (auto &op_desc : program_desc.Block(0).AllOps()) {
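The executor change above introduces a run-once path: program descs stored under kStartupProgramDescs (for example, allocation of persistable fused buffers) execute a single time, guarded by is_initialized_, while kProgramDescs still re-runs whenever scopes are re-initialized. A small Python sketch of that pattern, with assumed names:

# Run-once vs. re-run pattern used by the executor above (assumed names,
# not Paddle's API).
class ScopeBufferedExecutor:
    def __init__(self, startup_programs, main_programs):
        self.startup_programs = startup_programs  # run exactly once
        self.main_programs = main_programs        # re-run after DropScope()
        self._initialized = False

    def init_variables(self):
        if not self._initialized:
            for prog in self.startup_programs:
                prog()  # e.g. allocate persistable fused gradient buffers
            self._initialized = True
        for prog in self.main_programs:
            prog()  # e.g. re-create non-persistable buffers that were dropped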
paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h
View file @ ee16006b

@@ -64,6 +64,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
   bool DropScopeOrNot() const;

+  bool is_initialized_{false};
   size_t drop_scope_counter_{0};
   ExecutionStrategy strategy_;
   std::unique_ptr<SSAGraphExecutor> underlying_executor_;
paddle/fluid/framework/ir/coalesce_grad_tensor_pass.cc
View file @ ee16006b

@@ -199,19 +199,42 @@ class CoalesceGradTensorPass : public ir::Pass {
     if (!result->Has(details::kFusedGrads)) {
       result->Set(details::kFusedGrads, new details::FusedGrads);
     }
+    if (!result->Has(details::kStartupProgramDescs)) {
+      result->Set(details::kStartupProgramDescs, new details::ProgramDescs);
+    }
     if (!result->Has(details::kProgramDescs)) {
       result->Set(details::kProgramDescs, new details::ProgramDescs);
     }
+
+    auto type = GetTypeOfVar(vars_info, params_grads.front().second);
+
+    bool persistable = false;
+    for (auto &p_g : params_grads) {
+      if (IsPersistableVar(vars_info, p_g.second)) {
+        // NOTE. If one of the grads is persistable, then the fused_grad_var
+        // should be set to persistable.
+        persistable = true;
+        break;
+      }
+    }
+
     // the fused_var_name should be unique, so it appends
     // params_grads.begin()->second.
     auto fused_grad_var_name = std::string(details::kFusedVarNamePrefix) +
                                "@GRAD@" + params_grads.begin()->second;
+    // what a pity, visual c++ unsupport {.type_ = type}
+    details::VariableInfo var_info;
+    var_info.name_ = fused_grad_var_name;
+    var_info.type_ = type;
+    var_info.persistable_ = persistable;
+
     auto &fused_var_set = result->Get<details::FusedVars>(details::kFusedVars);
     PADDLE_ENFORCE_EQ(
         fused_var_set.count(fused_grad_var_name), 0,
         platform::errors::AlreadyExists("Var(%s) is duplicate in FusedVars.",
                                         fused_grad_var_name));
-    fused_var_set.insert(fused_grad_var_name);
+    fused_var_set.insert({fused_grad_var_name, var_info});
+
     result->Get<details::FusedGrads>(details::kFusedGrads)
         .emplace_back(fused_grad_var_name);
@@ -414,6 +437,13 @@ class CoalesceGradTensorPass : public ir::Pass {
     return var_desc->GetType();
   }

+  bool IsPersistableVar(
+      const std::unordered_map<std::string, std::vector<ir::Node *>>
+          &vars_info,
+      const std::string &name) const {
+    auto var_desc = GetVarDescFromVarsInfo(vars_info, name);
+    return var_desc->Persistable();
+  }
+
  private:
   bool IsLoDTensorType(const proto::VarType::Type &type) const {
     // Current only support LOD_TENSOR.
@@ -494,18 +524,46 @@ class CoalesceGradTensorPass : public ir::Pass {
           DataTypeToString(next_dtype), DataTypeToString(dtype)));
     }

-    result->Get<details::ProgramDescs>(details::kProgramDescs).emplace_back();
-    ProgramDesc &program_desc =
-        result->Get<details::ProgramDescs>(details::kProgramDescs).back();
-    auto *global_block = program_desc.MutableBlock(0);
-    AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype,
-                              global_block);
+    bool any_persistable = false;
+    bool all_persistable = true;
+    for (auto &p_g : params_grads) {
+      if (IsPersistableVar(vars_info, p_g.second)) {
+        any_persistable = true;
+      } else {
+        all_persistable = false;
+      }
+    }
+
+    if (all_persistable) {
+      // All grads are persistable, only need to be executed once at the
+      // beginning.
+      result->Get<details::ProgramDescs>(details::kStartupProgramDescs)
+          .emplace_back();
+      ProgramDesc &program_desc =
+          result->Get<details::ProgramDescs>(details::kStartupProgramDescs)
+              .back();
+      auto *global_block = program_desc.MutableBlock(0);
+      AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype,
+                                all_persistable, global_block);
+    } else {
+      // NOTE. In scope_buffered_ssa_graph_executor, after each execution of
+      // DropScope(), non persistable vars will be Erase or Clear. So
+      // coalesce_tensor op needs to be executed again after the execution
+      // of DropScope().
+      result->Get<details::ProgramDescs>(details::kProgramDescs).emplace_back();
+      ProgramDesc &program_desc =
+          result->Get<details::ProgramDescs>(details::kProgramDescs).back();
+      auto *global_block = program_desc.MutableBlock(0);
+      AppendAllocSpaceForVarsOp(params_name, grads_name, fused_var_name, dtype,
+                                any_persistable, global_block);
+    }
   }

   void AppendAllocSpaceForVarsOp(const std::vector<std::string> &params_name,
                                  const std::vector<std::string> &grads_name,
                                  const std::string &fused_var_name,
                                  const proto::VarType::Type &dtype,
+                                 bool persistable,
                                  BlockDesc *global_block) const {
     auto op_desc = global_block->AppendOp();
     op_desc->SetType("coalesce_tensor");
@@ -513,6 +571,8 @@ class CoalesceGradTensorPass : public ir::Pass {
     op_desc->SetOutput("Output", grads_name);
     op_desc->SetOutput("FusedOutput", {fused_var_name});
     op_desc->SetAttr("dtype", static_cast<int>(dtype));
+
+    op_desc->SetAttr("persist_output", persistable);
   }
 };
 }  // namespace ir
paddle/fluid/framework/ir/fuse_optimizer_ops_pass/fuse_optimizer_op_pass.cc
View file @ ee16006b

@@ -76,6 +76,9 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const {
   result.Set(details::kFusedOptType, new details::FusedOptType);
   result.Get<details::FusedOptType>(details::kFusedOptType) = fuse_op_type;
+  if (!result.Has(details::kStartupProgramDescs)) {
+    result.Set(details::kStartupProgramDescs, new details::ProgramDescs);
+  }
   if (!result.Has(details::kProgramDescs)) {
     result.Set(details::kProgramDescs, new details::ProgramDescs);
   }
@@ -100,7 +103,12 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const {
             fused_var_set.count(fused_var_name), 0,
             platform::errors::AlreadyExists(
                 "The fused variable(%s) already exists.", fused_var_name));
-        fused_var_set.insert(fused_var_name);
+        // FIXME(wangxi). update persistable
+        details::VariableInfo var_info;
+        var_info.name_ = fused_var_name;
+        var_info.type_ = proto::VarType::LOD_TENSOR;
+        var_info.persistable_ = false;
+        fused_var_set.insert({fused_var_name, var_info});
         fused_vars_name.emplace(var_name, fused_var_name);
       }
@@ -151,8 +159,8 @@ void FuseOptimizerOpPass::ApplyImpl(ir::Graph *graph) const {
       return;
     }
     auto &fused_vars = result.Get<details::FusedVars>(details::kFusedVars);
-    auto iter =
-        std::find(fused_vars.begin(), fused_vars.end(), fused_grad.front());
+    auto iter = fused_vars.find(fused_grad.front());
     PADDLE_ENFORCE_EQ(
         iter != fused_vars.end(), true,
         platform::errors::NotFound("Not found the fused gradient variable."));
paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt
View file @ ee16006b

@@ -4,6 +4,7 @@ cc_library(multi_devices_graph_print_pass SRCS multi_devices_graph_print_pass.cc
 cc_library(multi_devices_graph_check_pass SRCS multi_devices_graph_check_pass.cc DEPS multi_devices_helper)

 set(ALL_REDUCE_OP_HANDLES all_reduce_op_handle)
+set(ALL_REDUCE_OP_HANDLES grad_merge_all_reduce_op_handle)
 if(WITH_GPU AND WITH_DGC)
   list(APPEND ALL_REDUCE_OP_HANDLES sparse_all_reduce_op_handle)
 endif()
@@ -13,7 +14,7 @@ cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_
 cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass)
 cc_library(set_reader_device_info_utils SRCS set_reader_device_info_utils.cc DEPS graph graph_helper pass multi_devices_graph_pass)
-cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle)
+cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle
+  grad_merge_all_reduce_op_handle)
 cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS all_reduce_op_handle graph graph_helper pass)
 cc_library(backward_optimizer_op_deps_pass SRCS backward_optimizer_op_deps_pass.cc DEPS graph graph_helper pass)
 cc_library(add_reader_dependency_pass SRCS add_reader_dependency_pass.cc DEPS graph graph_helper pass)
paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc
View file @ ee16006b

@@ -19,6 +19,7 @@
 #include "paddle/fluid/framework/details/all_reduce_op_handle.h"
 #include "paddle/fluid/framework/details/container_cast.h"
 #include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h"
+#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h"
 #include "paddle/fluid/framework/details/multi_devices_helper.h"
 #include "paddle/fluid/framework/ir/graph_helper.h"
@@ -164,6 +165,38 @@ class FuseAllReduceOpPass : public ir::Pass {
       const platform::BKCLCommunicator *multi_bkcl_ctxs,
 #endif
       ir::Graph *result) const {
+    bool is_grad_merge = false;
+    std::string grad_merge_cond_name;
+    for (auto &op : all_reduce_ops) {
+      auto *grad_merge_all_reduce_op_handle =
+          dynamic_cast<details::GradMergeAllReduceOpHandle *>(
+              &op->Wrapper<details::OpHandleBase>());
+      if (grad_merge_all_reduce_op_handle) {
+        if (is_grad_merge) {
+          auto this_grad_merge_cond_name =
+              grad_merge_all_reduce_op_handle->GradMergeCondName();
+          PADDLE_ENFORCE_EQ(
+              grad_merge_cond_name, this_grad_merge_cond_name,
+              platform::errors::InvalidArgument(
+                  "grad_merge_cond_name is not same in different all_reduce, "
+                  "prev_grad_merge_cond_name is %s, this_grad_merge_cond_name "
+                  "is %s",
+                  grad_merge_cond_name, this_grad_merge_cond_name));
+        } else {
+          is_grad_merge = true;
+          grad_merge_cond_name =
+              grad_merge_all_reduce_op_handle->GradMergeCondName();
+        }
+      } else {
+        PADDLE_ENFORCE_EQ(is_grad_merge, false,
+                          platform::errors::InvalidArgument(
+                              "if use grad_merge, all of allreduce must be "
+                              "grad_merge_allreduce"));
+      }
+    }
+    VLOG(6) << "fused allreduce use_grad_merge=" << is_grad_merge;
+
     std::vector<details::VarHandleBase *> inputs;
     std::vector<details::VarHandleBase *> outputs;
     for (auto &op : all_reduce_ops) {
@@ -189,13 +222,16 @@ class FuseAllReduceOpPass : public ir::Pass {
 #if defined(PADDLE_WITH_NCCL)
     CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
-                           local_scopes, multi_nccl_ctxs, result);
+                           local_scopes, is_grad_merge, grad_merge_cond_name,
+                           multi_nccl_ctxs, result);
 #elif defined(PADDLE_WITH_XPU_BKCL)
     CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
-                           local_scopes, multi_bkcl_ctxs, result);
+                           local_scopes, is_grad_merge, grad_merge_cond_name,
+                           multi_bkcl_ctxs, result);
 #else
     CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places,
-                           local_scopes, result);
+                           local_scopes, is_grad_merge, grad_merge_cond_name,
+                           result);
 #endif
   }
@@ -205,26 +241,52 @@ class FuseAllReduceOpPass : public ir::Pass {
       const std::vector<details::VarHandleBase *> &outputs,
       const size_t num_of_all_reduce,
       const std::vector<platform::Place> &places,
       const std::vector<Scope *> &local_scopes,
+      bool is_grad_merge, const std::string &grad_merge_cond_name,
 #if defined(PADDLE_WITH_NCCL)
       const platform::NCCLCommunicator *multi_nccl_ctxs,
 #elif defined(PADDLE_WITH_XPU_BKCL)
       const platform::BKCLCommunicator *multi_bkcl_ctxs,
 #endif
       ir::Graph *result) const {
+    details::FusedAllReduceOpHandle *op_handle = NULL;
+    if (is_grad_merge) {
+#if defined(PADDLE_WITH_NCCL)
+      op_handle = new details::FusedGradMergeAllReduceOpHandle(
+          result->CreateEmptyNode("fused_all_reduce",
+                                  ir::Node::Type::kOperation),
+          local_scopes, places, num_of_all_reduce, grad_merge_cond_name,
+          multi_nccl_ctxs);
+#elif defined(PADDLE_WITH_XPU_BKCL)
+      op_handle = new details::FusedGradMergeAllReduceOpHandle(
+          result->CreateEmptyNode("fused_all_reduce",
+                                  ir::Node::Type::kOperation),
+          local_scopes, places, num_of_all_reduce, grad_merge_cond_name,
+          multi_bkcl_ctxs);
+#else
+      op_handle = new details::FusedGradMergeAllReduceOpHandle(
+          result->CreateEmptyNode("fused_all_reduce",
+                                  ir::Node::Type::kOperation),
+          local_scopes, places, num_of_all_reduce, grad_merge_cond_name);
+#endif
+    } else {
 #if defined(PADDLE_WITH_NCCL)
-    auto *op_handle = new details::FusedAllReduceOpHandle(
-        result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
-        local_scopes, places, num_of_all_reduce, multi_nccl_ctxs);
+      op_handle = new details::FusedAllReduceOpHandle(
+          result->CreateEmptyNode("fused_all_reduce",
+                                  ir::Node::Type::kOperation),
+          local_scopes, places, num_of_all_reduce, multi_nccl_ctxs);
 #elif defined(PADDLE_WITH_XPU_BKCL)
-    auto *op_handle = new details::FusedAllReduceOpHandle(
-        result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
-        local_scopes, places, num_of_all_reduce, multi_bkcl_ctxs);
+      op_handle = new details::FusedAllReduceOpHandle(
+          result->CreateEmptyNode("fused_all_reduce",
+                                  ir::Node::Type::kOperation),
+          local_scopes, places, num_of_all_reduce, multi_bkcl_ctxs);
 #else
-    auto *op_handle = new details::FusedAllReduceOpHandle(
-        result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation),
-        local_scopes, places, num_of_all_reduce);
+      op_handle = new details::FusedAllReduceOpHandle(
+          result->CreateEmptyNode("fused_all_reduce",
+                                  ir::Node::Type::kOperation),
+          local_scopes, places, num_of_all_reduce);
 #endif
+    }

     for (auto in : inputs) {
       op_handle->AddInput(in);
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc
View file @ ee16006b

@@ -25,6 +25,7 @@
 #include "paddle/fluid/framework/details/computation_op_handle.h"
 #include "paddle/fluid/framework/details/fetch_barrier_op_handle.h"
 #include "paddle/fluid/framework/details/fused_broadcast_op_handle.h"
+#include "paddle/fluid/framework/details/grad_merge_all_reduce_op_handle.h"
 #include "paddle/fluid/framework/details/reduce_op_handle.h"
 #include "paddle/fluid/framework/details/rpc_op_handle.h"
 #include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h"
@@ -255,7 +256,7 @@ void MultiDevSSAGraphBuilderBase::ApplyImpl(ir::Graph *graph) const {
         VLOG(10) << "Bcast " << g_name << " for parameter " << p_name
                  << " op_type " << node->Op()->Type();
         if (NeedCollectiveForGrad(g_name, sorted_ops)) {
-          InsertCollectiveOp(&result, p_name, g_name);
+          InsertCollectiveOp(&result, node, p_name, g_name);
         }
       }
     }
@@ -481,45 +482,77 @@ void MultiDevSSAGraphBuilderBase::CreateComputationalOp(ir::Graph *result,
 }

 void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result,
+                                                    ir::Node *node,
                                                     const std::string &og,
                                                     bool is_encoded) const {
+  details::OpHandleBase *op_handle = nullptr;
+
+  const std::string GRAD_MERGE_COND_NAME = "grad_merge_cond_name";
+  bool is_grad_merge = node->Op()->HasAttr(GRAD_MERGE_COND_NAME);
+  std::string grad_merge_cond_name;
+  PADDLE_ENFORCE_EQ((is_encoded && is_grad_merge), false,
+                    platform::errors::InvalidArgument(
+                        "DGC and GradMerge cannot use at same time, while "
+                        "use_dgc=%d, use_grad_merge=%d",
+                        is_encoded, is_grad_merge));
+
   auto append_allreduce_op = [&](
       const std::vector<Scope *> &scopes,
       const std::vector<platform::Place> &places) -> details::OpHandleBase * {
-#if defined(PADDLE_WITH_DGC) && defined(PADDLE_WITH_NCCL)
     if (is_encoded) {
+#if defined(PADDLE_WITH_DGC) && defined(PADDLE_WITH_NCCL)
       result->Get<GraphOps>(kGraphOps).emplace_back(
           new details::SparseAllReduceOpHandle(
              result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
              scopes, places, multi_nccl_ctxs_, is_encoded,
              strategy_.num_trainers_ * places_.size()));
+#else
+      PADDLE_THROW(platform::errors::PreconditionNotMet(
+          "This version of PaddlePaddle does NOT support DGC, "
+          "but got DGC grad in CreateAllReduceOp. "
+          "Please compile PaddlePaddle WITH_DGC first."));
+#endif
+    } else if (is_grad_merge) {
+      grad_merge_cond_name = BOOST_GET_CONST(
+          std::string, node->Op()->GetAttr(GRAD_MERGE_COND_NAME));
+      VLOG(10) << "og=" << og << " use grad_merge_allreduce";
+#if defined(PADDLE_WITH_NCCL)
+      result->Get<GraphOps>(kGraphOps).emplace_back(
+          new details::GradMergeAllReduceOpHandle(
+              result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
+              scopes, places, grad_merge_cond_name, multi_nccl_ctxs_));
+#elif defined(PADDLE_WITH_XPU_BKCL)
+      result->Get<GraphOps>(kGraphOps).emplace_back(
+          new datails::GradMergeAllReduceOpHandle(
+              result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
+              scopes, places, grad_merge_cond_name, multi_bkcl_ctxs_));
+#else
+      result->Get<GraphOps>(kGraphOps).emplace_back(
+          new details::GradMergeAllReduceOpHandle(
+              result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
+              scopes, places, grad_merge_cond_name));
+#endif
     } else {
+#ifdef PADDLE_WITH_NCCL
       result->Get<GraphOps>(kGraphOps).emplace_back(
           new details::AllReduceOpHandle(
               result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
               scopes, places, multi_nccl_ctxs_));
-    }
-#elif defined(PADDLE_WITH_NCCL)
-    result->Get<GraphOps>(kGraphOps).emplace_back(
-        new details::AllReduceOpHandle(
-            result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
-            scopes, places, multi_nccl_ctxs_));
 #elif defined(PADDLE_WITH_XPU_BKCL)
       result->Get<GraphOps>(kGraphOps).emplace_back(
           new details::AllReduceOpHandle(
              result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
              scopes, places, multi_bkcl_ctxs_));
 #else
       result->Get<GraphOps>(kGraphOps).emplace_back(
           new details::AllReduceOpHandle(
              result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
              scopes, places));
 #endif
+    }
     return result->Get<GraphOps>(kGraphOps).back();
   };

-  details::OpHandleBase *op_handle = nullptr;
   if (!strategy_.enable_parallel_graph_)
     op_handle = append_allreduce_op(local_scopes_, places_);
@@ -546,6 +579,36 @@ void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result,
       op_handle->AddOutput(var);
       VLOG(10) << "all_reduce_op_handle add output " << og
                << ", handle:" << var->DebugString();
+
+      if (is_grad_merge) {
+        // NOTE(wangxi). grad_merge_cond_var is used by
+        // GradMergeAllReduceOpHandle, but it is not the input of
+        // grad_merge_all_reduce_op_handle. So we must add dep_var to resolve
+        // WAR data hazard, for grad_merge_all_reduce_op_handle may be
+        // executed before grad_merge_cond_op.
+        auto &grad_merge_cond_vars = result->Get<details::GraphVars>(
+            details::kGraphVars)[i][grad_merge_cond_name];
+        PADDLE_ENFORCE_EQ(
+            grad_merge_cond_vars.empty(), false,
+            platform::errors::InvalidArgument(
+                "Can not find Var(%s) in Place[%d] "
+                "Paddle Can not add GradMergeAllReduce OP for Var(%s).",
+                grad_merge_cond_name, i, og));
+        auto &grad_merge_cond_var = grad_merge_cond_vars.back();
+        auto *cond_op = grad_merge_cond_var->GeneratedOp();
+        PADDLE_ENFORCE_NOT_NULL(
+            cond_op,
+            platform::errors::Fatal(
+                "grad_merge_cond_var(%s)'s generated op handle must not be "
+                "NULL",
+                grad_merge_cond_name));
+
+        auto *dep_var = new details::DummyVarHandle(result->CreateControlDepVar());
+        result->Get<details::GraphDepVars>(details::kGraphDepVars)
+            .emplace(dep_var);
+        cond_op->AddOutput(dep_var);
+        op_handle->AddInput(dep_var);
+      }
     }
   }
 }
@@ -650,16 +713,16 @@ void MultiDevSSAGraphBuilderBase::CreateIsolatedVarNode(
 }

 void AllReduceSSAGraphBuilder::InsertCollectiveOp(
-    ir::Graph *result, const std::string &p_name,
+    ir::Graph *result, ir::Node *node, const std::string &p_name,
     const std::string &g_name) const {
   if (IsSparseGradient(g_name)) {
     CreateReduceOp(result, g_name, 0);
     CreateBroadcastOp(result, g_name, 0);
   } else {
 #if defined(PADDLE_WITH_DGC)
-    CreateAllReduceOp(result, g_name, IsEncoded(p_name));
+    CreateAllReduceOp(result, node, g_name, IsEncoded(p_name));
 #else
-    CreateAllReduceOp(result, g_name);
+    CreateAllReduceOp(result, node, g_name);
 #endif
   }
 }
@@ -750,7 +813,7 @@ void ReduceSSAGraphBuilder::ResetState() const {
 }

 void ReduceSSAGraphBuilder::InsertCollectiveOp(
-    ir::Graph *result, const std::string &p_name,
+    ir::Graph *result, ir::Node *node, const std::string &p_name,
     const std::string &g_name) const {
   size_t cur_device_id = GetAppropriateDeviceID({g_name});
   CreateReduceOp(result, g_name, cur_device_id);
@@ -1128,7 +1191,7 @@ bool AllReduceSSAGraphBuilder::IsEncoded(const std::string &p_name) const {
 }
 #endif

-void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result,
+void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, ir::Node *node,
                                              const std::string &p_name,
                                              const std::string &g_name) const {
   // collective gradient to each device
@@ -1144,7 +1207,7 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result,
         CreateReduceOp(result, g_name, 0);
         CreateBroadcastOp(result, g_name, 0);
       } else {
-        CreateAllReduceOp(result, g_name);
+        CreateAllReduceOp(result, node, g_name);
       }
       break;
     default:
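The dep_var block above is worth calling out: the condition variable is read by the allreduce handle without being one of its declared inputs, so the pass inserts a dummy control-dependency variable from the op that produces the condition to the allreduce, preventing the scheduler from running the allreduce first. A toy Python sketch of that wiring (toy graph types, not Paddle's ir API):

# Toy sketch of the control-dependency fix (not Paddle's ir API): a dummy
# variable carries no data but orders the allreduce after the cond producer.
def add_control_dep(graph, cond_op, all_reduce_op):
    dep_var = graph.create_control_dep_var()  # data-free edge
    cond_op.add_output(dep_var)               # written when cond is computed
    all_reduce_op.add_input(dep_var)          # allreduce must wait for it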
paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h
View file @ ee16006b

@@ -66,7 +66,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
   virtual std::vector<ir::Node *> SortOperations(const ir::Graph &graph) const;

-  virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
+  virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
+                                  const std::string &p_name,
                                   const std::string &g_name) const = 0;

   virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const;
@@ -96,8 +97,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
   bool IsSparseGradient(const std::string &og) const;

-  void CreateAllReduceOp(ir::Graph *result, const std::string &og,
-                         bool is_encoded = false) const;
+  void CreateAllReduceOp(ir::Graph *result, ir::Node *node,
+                         const std::string &og, bool is_encoded = false) const;

   void CreateBroadcastOp(ir::Graph *result, const std::string &p_name,
                          size_t src_dev_id) const;
@@ -134,7 +135,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass {
 class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
  protected:
-  virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
+  virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
+                                  const std::string &p_name,
                                   const std::string &g_name) const;

   virtual void InsertPostprocessOps(ir::Graph *result) const {}
@@ -144,7 +146,8 @@ class AllReduceSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
 class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase {
  protected:
-  void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
+  void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
+                          const std::string &p_name,
                           const std::string &g_name) const override {}

   bool NeedCollectiveForGrad(const std::string &grad_name,
@@ -183,7 +186,8 @@ class ReduceSSAGraphBuilder : public BalanceVarSSAGraphBuilder {
  protected:
   virtual void Init() const;

-  virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
+  virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
+                                  const std::string &p_name,
                                   const std::string &g_name) const;

   virtual bool DealWithSpecialOp(ir::Graph *result, ir::Node *node) const;
@@ -212,7 +216,8 @@ class DistSSAGraphBuilder : public BalanceVarSSAGraphBuilder {
   virtual void InsertPostprocessOps(ir::Graph *result) const;

-  virtual void InsertCollectiveOp(ir::Graph *result, const std::string &p_name,
+  virtual void InsertCollectiveOp(ir::Graph *result, ir::Node *node,
+                                  const std::string &p_name,
                                   const std::string &g_name) const;

   virtual void ResetState() const;
paddle/fluid/framework/parallel_executor.cc
View file @ ee16006b

@@ -847,6 +847,17 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
     }
   }

+  if (graph->Has(details::kFusedVars)) {
+    auto &fused_vars = graph->Get<details::FusedVars>(details::kFusedVars);
+    for (auto &fused_var : fused_vars) {
+      var_infos.emplace_back();
+      var_infos.back() = fused_var.second;
+
+      member_->is_persistable_.emplace(fused_var.first,
+                                       fused_var.second.persistable_);
+    }
+  }
+
   std::unordered_map<Scope *, Scope *> scope_map;
   for (auto *scope : member_->local_scopes_) {
     auto &local_exec_scope = scope->NewScope();
paddle/fluid/operators/coalesce_tensor_op.cc  View file @ ee16006b

@@ -64,7 +64,7 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
           platform::errors::InvalidArgument(
               "The output variable %s of CoalesceTensor operator "
               "is not LoDTensor.",
-              in_var_names[i]));
+              out_var_names[i]));
     }
 
     auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
@@ -123,6 +123,22 @@ class CoalesceTensorOpKernel : public framework::OpKernel<T> {
       math::SetConstant<DeviceContext, T> set_constant;
       set_constant(dev_ctx, fused_tensor,
                    static_cast<T>(context.Attr<float>("constant")));
+    } else if (context.Attr<bool>("persist_output")) {
+      for (size_t i = 0; i < out_var_names.size(); ++i) {
+        size_t len = static_cast<size_t>(out_tensors[i]->numel());
+        auto sub_tensor = fused_tensor->Slice(
+            static_cast<int64_t>(offset), static_cast<int64_t>(offset + len));
+        // some var may not persistable, or persistable var may not init
+        if (out_tensors[i]->IsInitialized()) {
+          framework::TensorCopy(*out_tensors[i], context.GetPlace(), dev_ctx,
+                                &sub_tensor);
+        }
+        offset += use_align
+                      ? platform::Alignment(len * size_of_dtype,
+                                            context.GetPlace()) /
+                            size_of_dtype
+                      : len;
+      }
     }
 
     // Make the outputs point to the continuous space.
@@ -225,6 +241,9 @@ class CoalesceTensorOpMaker : public framework::OpProtoAndCheckerMaker {
     AddAttr<bool>("set_constant",
                   "Whether to set the Output with a constant value.")
         .SetDefault(false);
+    AddAttr<bool>("persist_output",
+                  "Whether to persist the original Output value.")
+        .SetDefault(false);
     AddAttr<float>("constant",
                    "If set_constant is true, the constant value will be used "
                    "to set the Output.")
@@ -250,7 +269,8 @@ Note that, the dtype of Input should be the same, and the dim of Input
 and Output should equal.
 The tensors of Input and Output could be the same or different. And
 coalesce_tensor allows copying the value of Input to Output, or
-setting the Output with a constant value.
+setting the Output with a constant value, or persist the original Output
+value.
 )DOC");
   }
...
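
The `persist_output` branch added above copies each already-initialized output tensor into its aligned slice of the fused buffer, so the original output values survive coalescing. A minimal NumPy sketch of that copy-with-alignment logic, assuming float32 and a hypothetical 256-byte alignment (not the real kernel):

```python
import numpy as np

ALIGN = 256   # bytes; stands in for platform::Alignment, value is assumed
SIZEOF = 4    # float32

def aligned_numel(numel):
    # round the element count up so the next slice starts on an aligned offset
    return -(-numel * SIZEOF // ALIGN) * ALIGN // SIZEOF

# (shape, tensor-or-None) pairs standing in for out_tensors;
# None models an output variable that is not yet initialized
outs = [((3,), np.ones(3, np.float32)),
        ((5,), None),
        ((4,), np.full(4, 2.0, np.float32))]

fused = np.zeros(sum(aligned_numel(int(np.prod(s))) for s, _ in outs),
                 np.float32)
offset = 0
for shape, t in outs:
    numel = int(np.prod(shape))
    if t is not None:                     # mirrors out_tensors[i]->IsInitialized()
        fused[offset:offset + numel] = t  # mirrors framework::TensorCopy into the slice
    offset += aligned_numel(numel)        # mirrors the use_align branch
print(fused[:aligned_numel(3) + 5])
```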
python/paddle/fluid/layers/control_flow.py  View file @ ee16006b

@@ -2188,8 +2188,11 @@ class ConditionalBlock(object):
 
     def need_append_conditional_block_grad(self, inside_block):
         grad_sub_block_idx = inside_block.backward_block_idx
+        inside_block_idx = inside_block.idx
 
-        return grad_sub_block_idx != -1
+        # if inside_block have grad_block and grad_block is not itself,
+        # we will append conditional block grad.
+        return grad_sub_block_idx != -1 and grad_sub_block_idx != inside_block_idx
 
     def append_conditional_block_grad(self, parent_block, inside_block,
                                       conditional_block_op):
...
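
This guard matters for the new gradient-merge path: the conditional block built by `true_apply_gradient` registers itself as its own forward/backward block, so no extra `conditional_block_grad` should be appended for it. A hypothetical stand-in (`FakeBlock` is not a Paddle class) showing the predicate's three cases:

```python
class FakeBlock(object):
    def __init__(self, idx, backward_block_idx):
        self.idx = idx
        self.backward_block_idx = backward_block_idx

def need_append_conditional_block_grad(inside_block):
    grad_sub_block_idx = inside_block.backward_block_idx
    # append a grad block only if one exists and it is not the block itself
    return grad_sub_block_idx != -1 and grad_sub_block_idx != inside_block.idx

print(need_append_conditional_block_grad(FakeBlock(1, -1)))  # False: no grad block
print(need_append_conditional_block_grad(FakeBlock(1, 1)))   # False: block is its own grad block
print(need_append_conditional_block_grad(FakeBlock(1, 2)))   # True: separate grad block exists
```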
python/paddle/fluid/optimizer.py  View file @ ee16006b

@@ -5063,6 +5063,8 @@ class GradientMergeOptimizer(object):
             print("step=%d, cost=%f" % (i, cost_val[0]))
     """
 
+    GRAD_MERGE_COND_NAME = "grad_merge_cond_name"
+
     def __init__(self, inner_optimizer, k_steps=1, avg=True):
         if framework.in_dygraph_mode():
             raise Exception(
@@ -5078,6 +5080,7 @@ class GradientMergeOptimizer(object):
         self.k_steps = k_steps
         self.type = "gradient_merge"
         self.avg = avg
+        self._optimize_ops = None
 
     def _set_k_steps(self, k_steps):
         self.k_steps = k_steps
@@ -5085,12 +5088,12 @@ class GradientMergeOptimizer(object):
     def _set_avg(self, avg):
         self.avg = avg
 
-    def minimize(self,
-                 loss,
-                 startup_program=None,
-                 parameter_list=None,
-                 no_grad_set=None):
+    def backward(self,
+                 loss,
+                 startup_program=None,
+                 parameter_list=None,
+                 no_grad_set=None,
+                 callbacks=None):
         assert isinstance(loss, Variable), "The loss should be an Variable."
         assert (
             parameter_list is None
@@ -5101,26 +5104,142 @@ class GradientMergeOptimizer(object):
         params_grads = self.inner_optimizer.backward(
             loss, startup_program=startup_program)
+        return params_grads
+
+    def apply_optimize(self, loss, startup_program, params_grads):
+        program = loss.block.program
+        with program_guard(program, startup_program):
+            optimize_ops = self.apply_gradients(params_grads)
+        return optimize_ops
+
+    def _is_the_backward_op(self, op):
+        op_maker = core.op_proto_and_checker_maker
+        backward = core.op_proto_and_checker_maker.OpRole.Backward
+        if op_maker.kOpRoleVarAttrName() in op.attr_names and \
+                int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(backward):
+            return True
+        return False
+
+    def _remove_op_role_var(self, param, grad):
+        op_maker = core.op_proto_and_checker_maker
+        op = grad.op
+        assert self._is_the_backward_op(op), \
+            'grad.op={} is not the backward op which produces the grad={}' \
+            .format(op, grad.name)
+
+        block = grad.block
+        var_attr = op.all_attrs()[op_maker.kOpRoleVarAttrName()]
+        assert param.name in var_attr, \
+            'when using GradientMergeOptimizer, param={} must be in var_attr={}' \
+            .format(param.name, var_attr)
+        assert grad.name in var_attr, \
+            'when using GradientMergeOptimizer, grad={} must be in var_attr={}' \
+            .format(param.name, var_attr)
+
+        # remove (param, grad) from op_role_var
+        var_attr.remove(param.name)
+        var_attr.remove(grad.name)
+        if len(var_attr) > 1:
+            op._set_attr(op_maker.kOpRoleVarAttrName(), var_attr)
+        else:
+            op._remove_attr(op_maker.kOpRoleVarAttrName())
+
+    def _add_gm_op_role_var(self, op, param, grad, cond):
+        grad.op = op
+        op_maker = core.op_proto_and_checker_maker
+        backward = op_maker.OpRole.Backward
+
+        # NOTE(wangxi). When distributed, we will insert grad_merge_all_reduce_op_handle
+        # in multi_devices_graph_pass, which will allreduce(grad) if cond is True, else
+        # do nothing.
+        # In this way, the gradient can be merged first, and then communicate when the
+        # condition is met, reducing the number of communications to increase the
+        # speed.
+        op._set_attr(self.GRAD_MERGE_COND_NAME, cond.name)
+        op._set_attr(op_maker.kOpRoleAttrName(), backward)
+        op._set_attr(op_maker.kOpRoleVarAttrName(), [param.name, grad.name])
+
+    def _get_gm_cond_var(self, main_block):
+        # Add const var
+        k_step_var = layers.create_global_var(
+            name="gradient_merge_k",
+            shape=[1],
+            value=int(self.k_steps),
+            dtype='int32',
+            persistable=True,
+            force_cpu=True)
+
+        zero_var = layers.create_global_var(
+            name="gradient_merge_zero",
+            shape=[1],
+            value=int(0),
+            dtype='int32',
+            persistable=True,
+            force_cpu=True)
+
+        # Add step var & cond var
+        step_var = layers.create_global_var(
+            name="gradient_merge_step",
+            shape=[1],
+            value=int(0),
+            dtype='int32',
+            persistable=True,
+            force_cpu=True)
+
+        cond_var = layers.create_global_var(
+            name="gradient_merge_cond",
+            shape=[1],
+            value=bool(0),
+            dtype='bool',
+            persistable=True,
+            force_cpu=True)
+
+        with device_guard("cpu"):
+            # step_var = (step_var + 1) % k_step
+            layers.increment(x=step_var, value=1.0, in_place=True)
+            main_block.append_op(
+                type='elementwise_mod',
+                inputs={'X': step_var,
+                        'Y': k_step_var},
+                outputs={'Out': step_var},
+                attrs={'axis': -1,
+                       'use_mkldnn': False})
+
+            # cond_var = (step_var == 0)
+            main_block.append_op(
+                type='equal',
+                inputs={'X': step_var,
+                        'Y': zero_var},
+                outputs={'Out': cond_var})
+
+        return cond_var
+
+    def apply_gradients(self, params_grads):
+        main_program = default_main_program()
+        startup_program = default_startup_program()
+        main_block = main_program.global_block()
+        startup_block = startup_program.global_block()
+
+        cond = self._get_gm_cond_var(main_block)
 
         #TODO(mapingshuo) support sparse embedding
-        for k, v in params_grads:
+        # step1: remove grad.op's op_role_var
+        for param, grad in params_grads:
             assert (
-                v.type != core.VarDesc.VarType.SELECTED_ROWS
+                param.type != core.VarDesc.VarType.SELECTED_ROWS
             ), "SELECTED_ROWS is not supported in GradientMergeOptimizer for now"
 
-        param_to_grad = {k.name: v for (k, v) in params_grads}
-
-        # Get startup_program and main_program
-        if startup_program is None:
-            startup_program = default_startup_program()
-        main_block = loss.block
+            self._remove_op_role_var(param, grad)
 
-        # add some vars to the main_program and startup_program
-        param_to_grad = {k.name: v for (k, v) in params_grads}
-        startup_block = startup_program.global_block()
-        param_names = param_to_grad.keys()
         param_to_gradient_merge = {}
 
-        for param_name in param_names:
+        new_params_grads = []
+        # step2: create gradient_merge var and init with 0
+        # and update op_role_var
+        for param, grad in params_grads:
+            param_name = param.name
             param_var = main_block.var(param_name)
             assert (param_var is not None)
             gradient_merge_var = main_block.create_var(
@@ -5129,6 +5248,7 @@ class GradientMergeOptimizer(object):
                 dtype=param_var.dtype,
                 persistable=True)
             param_to_gradient_merge[param_name] = gradient_merge_var
+
             startup_gradient_merge_var = startup_block.create_var(
                 name=param_name + "@GRAD@GradientMerge",
                 shape=param_var.shape,
@@ -5143,92 +5263,75 @@ class GradientMergeOptimizer(object):
                     "value": float(0),
                 })
 
-        with framework.program_guard(main_block.program, startup_program):
-            # Add Var k to main prog and startup prog
-            gradient_merge_k = layers.create_global_var(
-                name="gradient_merge_k",
-                shape=[1],
-                value=int(self.k_steps),
-                dtype='int32',
-                persistable=True)
+            # grad_merge += grad
+            new_grad_op = main_block.append_op(
+                type="elementwise_add",
+                inputs={'X': grad,
+                        'Y': gradient_merge_var},
+                outputs={'Out': gradient_merge_var},
+                attrs={'axis': -1,
+                       'use_mkldnn': False})
+            self._add_gm_op_role_var(new_grad_op, param, gradient_merge_var,
+                                     cond)
+            new_params_grads.append([param, gradient_merge_var])
 
-            # Add Var step
-            gradient_merge_step = layers.create_global_var(
-                name="gradient_merge_step",
-                shape=[1],
-                value=int(0),
-                dtype='int32',
-                persistable=True)
-            layers.increment(x=gradient_merge_step, value=1.0, in_place=True)
+        def true_apply_gradient():
+            cur_block_idx = main_program.current_block_idx
+            cur_block = main_program.current_block()
 
-            # gradient merge
-            zero_var = layers.fill_constant(
-                shape=[1], dtype='float32', value=0.0)
-            one_var = layers.fill_constant(
-                shape=[1], dtype='float32', value=1.0)
+            # cur_block's forward_block & backward_block is itself
+            cur_block._set_forward_block_idx(cur_block_idx)
 
-            mod = layers.elementwise_mod(gradient_merge_step, gradient_merge_k)
-            with layers.control_flow.Switch() as switch:
-                with switch.case(mod != zero_var):
-                    # 1. update the gradient_merge_vars
-                    #  gradient_merge_vars += gradient_vars
-                    cur_block = main_block.program.current_block()
-                    for param_name in param_names:
-                        grad = param_to_grad[param_name]
-                        grad_merge = param_to_gradient_merge[param_name]
-                        cur_block.append_op(
-                            type="elementwise_add",
-                            inputs={'X': grad,
-                                    'Y': grad_merge},
-                            outputs={'Out': grad_merge},
-                            attrs={'axis': -1,
-                                   'use_mkldnn': False})
+            if self.avg:
+                for param, new_grad in new_params_grads:
+                    # grad /= k_steps
+                    cur_block.append_op(
+                        type='scale',
+                        inputs={'X': new_grad},
+                        outputs={'Out': new_grad},
+                        attrs={
+                            'scale': 1.0 / self.k_steps,
+                            'bias': 0.0,
+                            'bias_after_scale': False
+                        })
 
-                with switch.default():
-                    # 1. update the graient_vars
-                    #     gradient_vars += gradient_merge_vars
-                    cur_block_idx = main_block.program.current_block_idx
-                    cur_block = main_block.program.current_block()
-                    for param_name in param_names:
-                        grad = param_to_grad[param_name]
-                        grad_merge = param_to_gradient_merge[param_name]
-                        if self.avg:
-                            tmp_var = layers.elementwise_add(grad, grad_merge)
-                            cur_block.append_op(
-                                type='scale',
-                                inputs={'X': tmp_var},
-                                outputs={'Out': grad},
-                                attrs={
-                                    'scale': 1.0 / self.k_steps,
-                                    'bias': 0.0,
-                                    'bias_after_scale': False
-                                })
-                        else:
-                            cur_block.append_op(
-                                type="elementwise_add",
-                                inputs={'X': grad,
-                                        'Y': grad_merge},
-                                outputs={'Out': grad},
-                                attrs={'axis': -1,
-                                       'use_mkldnn': False})
+            for param, new_grad in new_params_grads:
+                # NOTE. regularization will append ops to grad.block,
+                # while new_grad's real block is global_block,
+                # but we want append regularization ops to cur_block,
+                # so we set new_grad.block = cur_block
+                new_grad.block = cur_block
 
-                    # 2. apply_optimize
-                    target_grad_block = main_block.program._create_block(
-                        parent_idx=cur_block.parent_idx)
-                    target_grad_block._set_forward_block_idx(cur_block_idx)
-                    main_block.program.current_block_idx = cur_block_idx
-
-                    optimize_ops = self.inner_optimizer.apply_optimize(
-                        loss,
-                        startup_program=startup_program,
-                        params_grads=params_grads)
-
-                    # 3. clear gradient_merge_vars
-                    for param_name in param_names:
-                        grad_merge = param_to_gradient_merge[param_name]
-                        layers.fill_constant(
-                            shape=grad_merge.shape,
-                            dtype=grad_merge.dtype,
-                            value=0.0,
-                            out=grad_merge)
+            self._optimize_ops = self.inner_optimizer.apply_gradients(
+                new_params_grads)
+
+            # clear gradient_merge_vars
+            for param, new_grad in new_params_grads:
+                layers.fill_constant(
+                    shape=new_grad.shape,
+                    dtype=new_grad.dtype,
+                    value=0.0,
+                    out=new_grad)
+
+        # step3. apply gradient
+        layers.cond(cond, true_fn=true_apply_gradient, false_fn=None)
+
+        return self._optimize_ops
+
+    def minimize(self,
+                 loss,
+                 startup_program=None,
+                 parameter_list=None,
+                 no_grad_set=None):
+        assert isinstance(loss, Variable), "The loss should be an Variable."
+
+        params_grads = self.backward(
+            loss,
+            startup_program=startup_program,
+            parameter_list=parameter_list,
+            no_grad_set=no_grad_set)
+
+        optimize_ops = self.apply_optimize(
+            loss, startup_program=startup_program, params_grads=params_grads)
 
         return optimize_ops, params_grads
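
For context, a minimal end-to-end sketch of the optimizer this diff rewrites, using the fluid static-graph API; the toy network and feed names are illustrative, not from this PR. With `k_steps=4`, gradients from four minibatches are accumulated into the `@GRAD@GradientMerge` vars, averaged (since `avg=True`), and applied by the inner SGD once every fourth step:

```python
import numpy as np
import paddle
import paddle.fluid as fluid

paddle.enable_static()

x = fluid.layers.data(name='x', shape=[32], dtype='float32')
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
pred = fluid.layers.fc(input=x, size=1)
cost = fluid.layers.mean(fluid.layers.square_error_cost(input=pred, label=y))

sgd = fluid.optimizer.SGD(learning_rate=0.01)
# accumulate 4 minibatches of gradients, then apply one averaged update
opt = fluid.optimizer.GradientMergeOptimizer(sgd, k_steps=4, avg=True)
opt.minimize(cost)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
for i in range(8):
    feed = {'x': np.random.rand(16, 32).astype('float32'),
            'y': np.random.rand(16, 1).astype('float32')}
    cost_val = exe.run(feed=feed, fetch_list=[cost])
    print("step=%d, cost=%f" % (i, cost_val[0]))
```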
python/paddle/fluid/tests/unittests/CMakeLists.txt  View file @ ee16006b

@@ -658,6 +658,7 @@ if (WITH_DISTRIBUTE AND NOT APPLE)
     if(WITH_GPU)
         set_tests_properties(test_c_comm_init_op PROPERTIES TIMEOUT 120)
         set_tests_properties(test_fleet_checkpoint PROPERTIES TIMEOUT 120)
+        set_tests_properties(test_dist_mnist_gradient_merge PROPERTIES TIMEOUT 120)
     endif()
 endif()
...
python/paddle/fluid/tests/unittests/dist_mnist_gradient_merge.py
0 → 100644  View file @ ee16006b

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
import paddle.fluid as fluid
from test_dist_base import TestDistRunnerBase, runtime_main
from dist_mnist import cnn_model

DTYPE = "float32"
paddle.dataset.mnist.fetch()

# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1


class TestDistMnist2x2(TestDistRunnerBase):
    def get_model(self, batch_size=2):
        # Input data
        images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

        # Train program
        predict = cnn_model(images)
        cost = fluid.layers.cross_entropy(input=predict, label=label)
        avg_cost = fluid.layers.mean(x=cost)

        # Evaluator
        batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
        batch_acc = fluid.layers.accuracy(
            input=predict, label=label, total=batch_size_tensor)

        inference_program = fluid.default_main_program().clone()
        # Optimization
        opt = fluid.optimizer.MomentumOptimizer(
            learning_rate=0.001, momentum=0.9)
        opt = fluid.optimizer.GradientMergeOptimizer(opt, 2)

        # Reader
        train_reader = paddle.batch(
            paddle.dataset.mnist.test(), batch_size=batch_size)
        test_reader = paddle.batch(
            paddle.dataset.mnist.test(), batch_size=batch_size)
        opt.minimize(avg_cost)
        return inference_program, avg_cost, train_reader, test_reader, batch_acc, predict


if __name__ == "__main__":
    runtime_main(TestDistMnist2x2)
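
A quick sanity check on what this model exercises: with per-trainer `batch_size=2` and `GradientMergeOptimizer(opt, 2)` merging two minibatches per update, plus the two trainers implied by the 2x2 naming (an assumption, not asserted by the script itself), each momentum update effectively sees

```python
batch_size, k_steps, trainers = 2, 2, 2
print("samples per parameter update:", batch_size * k_steps * trainers)  # 8
```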
python/paddle/fluid/tests/unittests/test_dist_base.py  View file @ ee16006b

@@ -15,6 +15,7 @@
 from __future__ import print_function
 import time
+import ast
 import unittest
 import os
 import sys
@@ -373,6 +374,10 @@ class TestDistRunnerBase(object):
         build_stra.enable_inplace = False
         build_stra.memory_optimize = False
 
+        if args.fuse_all_reduce is not None:
+            sys.stderr.write('fuse_all_reduce={}'.format(args.fuse_all_reduce))
+            build_stra.fuse_all_reduce_ops = args.fuse_all_reduce
+
         if args.hogwild:
             build_stra.async_mode = True
@@ -620,6 +625,11 @@ def runtime_main(test_class):
         type=bool,
         default=False)
     parser.add_argument('--sync_batch_norm', action='store_true')
+    parser.add_argument(
+        '--fuse_all_reduce',
+        required=False,
+        type=ast.literal_eval,
+        default=None)
 
     args = parser.parse_args()
@@ -688,6 +698,7 @@ class TestDistBase(unittest.TestCase):
         self._ut4grad_allreduce = False
         self._use_hallreduce = False
         self._save_model = False
+        self._fuse_all_reduce = None
         self._setup_config()
 
         global DIST_UT_PORT
@@ -971,6 +982,9 @@ class TestDistBase(unittest.TestCase):
         if self._enable_backward_deps:
             tr_cmd += " --enable_backward_deps"
 
+        if self._fuse_all_reduce is not None:
+            tr_cmd += " --fuse_all_reduce {}".format(self._fuse_all_reduce)
+
         if self._gpu_fleet_api:
             tr_cmd += " --gpu_fleet_api"
         if self._use_local_sgd:
...
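
A note on the `type=ast.literal_eval` choice above: `type=bool` would turn any non-empty string, including `"False"`, into `True`, whereas `literal_eval` parses `"True"`/`"False"` into real booleans and leaves the flag tri-state (`None` meaning "don't touch the build strategy"). A standalone sketch:

```python
import argparse
import ast

parser = argparse.ArgumentParser()
parser.add_argument('--fuse_all_reduce', required=False,
                    type=ast.literal_eval, default=None)

print(parser.parse_args([]).fuse_all_reduce)                              # None
print(parser.parse_args(['--fuse_all_reduce', 'True']).fuse_all_reduce)   # True
print(parser.parse_args(['--fuse_all_reduce', 'False']).fuse_all_reduce)  # False
```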
python/paddle/fluid/tests/unittests/test_dist_mnist_gradient_merge.py
0 → 100644  View file @ ee16006b

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import os
import unittest
from test_dist_base import TestDistBase

flag_name = os.path.splitext(__file__)[0]


class TestDistMnistGradMerge(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._use_reduce = False
        self._nccl2_mode = True

    def test_dist_train(self):
        import paddle.fluid as fluid
        if fluid.core.is_compiled_with_cuda():
            self.check_with_place(
                "dist_mnist_gradient_merge.py",
                delta=1e-5,
                check_error_log=True,
                log_name=flag_name)


class TestDistMnistGradMergeNoFuse(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._use_reduce = False
        self._nccl2_mode = True
        self._fuse_all_reduce = False

    def test_dist_train(self):
        import paddle.fluid as fluid
        if fluid.core.is_compiled_with_cuda():
            self.check_with_place(
                "dist_mnist_gradient_merge.py",
                delta=1e-5,
                check_error_log=True,
                log_name=flag_name + "_no_fuse")


if __name__ == "__main__":
    unittest.main()
python/paddle/fluid/tests/unittests/test_optimizer.py  View file @ ee16006b

@@ -25,6 +25,7 @@ import numpy as np
 from paddle.fluid.backward import append_backward
 from paddle.fluid.framework import Program, program_guard, convert_np_dtype_to_dtype_
 import paddle
+paddle.enable_static()
 
 
 class TestOptimizer(unittest.TestCase):
@@ -1011,37 +1012,34 @@ class TestGradientMergeOptimizer(unittest.TestCase):
         with framework.program_guard(main_program, init_program):
             ops, params_grads = opt.minimize(cost)
 
-        self.assertEqual(main_program.num_blocks, 4)
+        self.assertEqual(main_program.num_blocks, 2)
 
         # main block
-        self.assertEqual(len(cost.block.ops), 17)
-        self.assertEqual([op.type for op in cost.block.ops], [
-            'mul', 'elementwise_add', 'mean', 'fill_constant', 'mean_grad',
-            'elementwise_add_grad', 'mul_grad', 'increment', 'fill_constant',
-            'fill_constant', 'elementwise_mod', 'cast', 'not_equal',
-            'logical_not', 'conditional_block', 'conditional_block',
-            'conditional_block_grad'
-        ])
+        self.assertEqual(len(cost.block.ops), 13)
+        self.assertEqual(
+            [op.type for op in cost.block.ops],
+            [
+                'mul',
+                'elementwise_add',
+                'mean',
+                'fill_constant',
+                'mean_grad',
+                'elementwise_add_grad',
+                'mul_grad',
+                'increment',  # step += 1
+                'elementwise_mod',  # step %= k_steps
+                'equal',  # cond_var == (step == 0)
+                'elementwise_add',
+                'elementwise_add',
+                'conditional_block',
+            ])
 
-        # merge block
-        self.assertEqual(len(main_program.block(1).ops), 2)
-        self.assertEqual([op.type for op in main_program.block(1).ops], [
-            'elementwise_add', 'elementwise_add'
-        ])
+        # optimize block
+        self.assertEqual(len(main_program.block(1).ops), 6)
+        self.assertEqual([op.type for op in main_program.block(1).ops], [
+            'scale', 'scale', 'sgd', 'sgd', 'fill_constant', 'fill_constant'
+        ])
 
-        # reset block
-        self.assertEqual(len(main_program.block(2).ops), 6)
-        self.assertEqual([op.type for op in main_program.block(2).ops], [
-            'elementwise_add', 'scale', 'elementwise_add', 'scale',
-            'fill_constant', 'fill_constant'
-        ])
-
-        # optimize block
-        self.assertEqual(len(main_program.block(3).ops), 2)
-        self.assertEqual([op.type for op in main_program.block(3).ops],
-                         ['sgd', 'sgd'])
 
 class TestOptimizerDtype(unittest.TestCase):
     '''
...
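
The new op sequence asserted above (increment, elementwise_mod, equal, conditional_block) is just a k-step counter gating one optimize block. A pure-Python model of that control flow, assuming `k_steps=2` as in the test:

```python
k_steps, step = 2, 0
for i in range(6):
    step = (step + 1) % k_steps   # increment + elementwise_mod
    cond = (step == 0)            # equal
    # the conditional_block body (scale, sgd, fill_constant) runs only when cond
    print("iter=%d apply_update=%s" % (i, cond))
```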