未验证 提交 0a3fd011 编写于 作者: 乔龙飞 Qiao Longfei 提交者: GitHub

Merge pull request #15661 from jacquesqiao/fix-cpu-broadcast

cpu reduce mode did not need to broadcast params test=develop
...@@ -133,12 +133,15 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { ...@@ -133,12 +133,15 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
void AppendMultiDevPass(const BuildStrategy &strategy) { void AppendMultiDevPass(const BuildStrategy &strategy) {
ir::Pass *multi_devices_pass; ir::Pass *multi_devices_pass;
if (strategy_.is_distribution_) { if (strategy_.is_distribution_) {
VLOG(3) << "multi device dist train mode";
multi_devices_pass = AppendPass("dist_multi_devices_pass").get(); multi_devices_pass = AppendPass("dist_multi_devices_pass").get();
} else { } else {
if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) {
VLOG(3) << "multi device allreduce mode";
multi_devices_pass = multi_devices_pass =
AppendPass("allreduce_mode_multi_devices_pass").get(); AppendPass("allreduce_mode_multi_devices_pass").get();
} else if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { } else if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kReduce) {
VLOG(3) << "multi device reduce mode";
multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get(); multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get();
} else { } else {
PADDLE_THROW("Unknown reduce strategy."); PADDLE_THROW("Unknown reduce strategy.");
......
...@@ -731,7 +731,6 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result, ...@@ -731,7 +731,6 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result,
} }
} }
insert_op = true; insert_op = true;
need_broadcast_var_ = true;
} else if (OpHaveRole(*node, OpRole::kDist)) { } else if (OpHaveRole(*node, OpRole::kDist)) {
int op_dev_id = CreateDistTrainOp(result, node); int op_dev_id = CreateDistTrainOp(result, node);
if (node->Op()->Type() == "concat") { if (node->Op()->Type() == "concat") {
...@@ -925,9 +924,8 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result, ...@@ -925,9 +924,8 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result,
} }
void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const { void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const {
if (need_broadcast_var_ || // only GPU reduce mode need to broadcast parameters to each device.
(UseGPU() && if (UseGPU() && strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) {
strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce)) {
if (strategy_.fuse_broadcast_op_) { if (strategy_.fuse_broadcast_op_) {
CreateFusedBroadcastOp(result, bcast_var_name_set_); CreateFusedBroadcastOp(result, bcast_var_name_set_);
} else { } else {
......
...@@ -174,7 +174,6 @@ class DistSSAGraphBuilder : public BalanceVarSSAGraphBuilder { ...@@ -174,7 +174,6 @@ class DistSSAGraphBuilder : public BalanceVarSSAGraphBuilder {
int CreateDistTrainOp(ir::Graph *result, ir::Node *node) const; int CreateDistTrainOp(ir::Graph *result, ir::Node *node) const;
mutable std::vector<std::unordered_set<std::string>> bcast_var_name_set_; mutable std::vector<std::unordered_set<std::string>> bcast_var_name_set_;
mutable bool need_broadcast_var_{false};
}; };
std::unordered_set<std::string> &MultiDevSSAGraphBuilder(); std::unordered_set<std::string> &MultiDevSSAGraphBuilder();
......
...@@ -19,6 +19,7 @@ import sys ...@@ -19,6 +19,7 @@ import sys
from .. import compat as cpt from .. import compat as cpt
from . import core from . import core
from . import framework
__all__ = ['CompiledProgram', 'ExecutionStrategy', 'BuildStrategy'] __all__ = ['CompiledProgram', 'ExecutionStrategy', 'BuildStrategy']
...@@ -34,6 +35,15 @@ def _place_obj(place): ...@@ -34,6 +35,15 @@ def _place_obj(place):
return p return p
def _is_pserver_mode(main_program):
main = main_program if main_program \
else framework.default_main_program()
for op in main.global_block().ops:
if op.type in ["send", "recv"]:
return True
return False
class CompiledProgram(object): class CompiledProgram(object):
""" """
Compiles a Program for execution. Compiles a Program for execution.
...@@ -110,6 +120,7 @@ class CompiledProgram(object): ...@@ -110,6 +120,7 @@ class CompiledProgram(object):
self._exec_strategy = ExecutionStrategy() self._exec_strategy = ExecutionStrategy()
if self._build_strategy is None: if self._build_strategy is None:
self._build_strategy = BuildStrategy() self._build_strategy = BuildStrategy()
self._build_strategy.is_distribution = _is_pserver_mode(self._program)
return self return self
def with_inference_optimize(self, config): def with_inference_optimize(self, config):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册