Unverified · Commit ec8e8782, authored by 乔龙飞 Qiao Longfei, committed by GitHub

Merge pull request #15840 from jacquesqiao/revert-15684-revert-15661-fix-cpu-broadcast

fix cpu broadcast
@@ -135,12 +135,15 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
   void AppendMultiDevPass(const BuildStrategy &strategy) {
     ir::Pass *multi_devices_pass;
     if (strategy_.is_distribution_) {
+      VLOG(3) << "multi device parameter server mode";
       multi_devices_pass = AppendPass("dist_multi_devices_pass").get();
     } else {
       if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) {
+        VLOG(3) << "multi devices collective mode with allreduce";
         multi_devices_pass =
             AppendPass("allreduce_mode_multi_devices_pass").get();
       } else if (strategy.reduce_ == BuildStrategy::ReduceStrategy::kReduce) {
+        VLOG(3) << "multi devices collective mode with reduce";
         multi_devices_pass = AppendPass("reduce_mode_multi_devices_pass").get();
       } else {
         PADDLE_THROW("Unknown reduce strategy.");
......
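For reference, the branch taken in AppendMultiDevPass above is driven by the Python-side BuildStrategy. A minimal usage sketch, assuming the paddle.fluid 1.x API of this era (the variable name is illustrative):

import paddle.fluid as fluid

build_strategy = fluid.BuildStrategy()
# kAllReduce on the C++ side corresponds to ReduceStrategy.AllReduce here and
# selects allreduce_mode_multi_devices_pass; ReduceStrategy.Reduce selects
# reduce_mode_multi_devices_pass.
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce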
@@ -937,9 +937,21 @@ void DistSSAGraphBuilder::InsertCollectiveOp(ir::Graph *result,
 }

 void DistSSAGraphBuilder::InsertPostprocessOps(ir::Graph *result) const {
-  if (need_broadcast_var_ ||
-      (UseGPU() &&
-       strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce)) {
+  // Broadcast received parameters when training in parameter server mode.
+  if (need_broadcast_var_) {
+    // There are 4 conditions:
+    // 1. GPU && Reduce: Reduce gradients, then broadcast gradients to other
+    //    GPUs. Need to broadcast received parameters to other GPUs.
+    // 2. GPU && AllReduce: AllReduce all gradients to each GPU. Need to
+    //    broadcast received parameters to other GPUs.
+    // 3. CPU && AllReduce: AllReduce all gradients to each thread. Need to
+    //    broadcast received parameters to other scopes.
+    // 4. CPU && Reduce: because all parameters share the same memory, there is
+    //    no need to broadcast received parameters.
+    if (!UseGPU() &&
+        strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) {
+      return;
+    }
     if (strategy_.fuse_broadcast_op_) {
       CreateFusedBroadcastOp(result, bcast_var_name_set_);
     } else {
......
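The four cases in the new comment reduce to a small decision rule. The sketch below is a hypothetical pure-Python mirror of the C++ logic, not code from the repository:

def need_broadcast_received_params(need_broadcast_var, use_gpu, reduce_strategy):
    # Broadcasting is only relevant when parameters are received from a
    # parameter server (need_broadcast_var is true).
    if not need_broadcast_var:
        return False
    # CPU && Reduce: all threads share the same parameter memory, so the
    # received parameters are already visible everywhere.
    if not use_gpu and reduce_strategy == "Reduce":
        return False
    # GPU && Reduce, GPU && AllReduce, CPU && AllReduce: broadcast is needed.
    return True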
@@ -19,6 +19,7 @@ import sys
 from .. import compat as cpt
 from . import core
+from . import framework

 __all__ = ['CompiledProgram', 'ExecutionStrategy', 'BuildStrategy']
@@ -110,6 +111,8 @@ class CompiledProgram(object):
         self._exec_strategy = ExecutionStrategy()
         if self._build_strategy is None:
             self._build_strategy = BuildStrategy()
+        self._build_strategy.is_distribution = framework.is_pserver_mode(
+            self._program)
         return self

     def with_inference_optimize(self, config):
......
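With this change, CompiledProgram infers parameter-server mode from the program itself, so the trainer-side compile-and-run flow stays unchanged. A minimal sketch, assuming the fluid 1.x API and that `loss` comes from the user's model definition:

import paddle.fluid as fluid

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
compiled_prog = fluid.compiler.CompiledProgram(
    fluid.default_main_program()).with_data_parallel(
        loss_name=loss.name,  # assumes `loss` is defined by the model above
        build_strategy=fluid.BuildStrategy())
# is_distribution is now set automatically from send/recv ops in the program.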
@@ -87,6 +87,15 @@ def _current_expected_place():
     return _imperative_current_expected_place_


+def is_pserver_mode(main_program):
+    main = main_program if main_program \
+        else default_main_program()
+    for op in main.global_block().ops:
+        if op.type in ["send", "recv"]:
+            return True
+    return False
+
+
 class NameScope(object):
     def __init__(self, name="", parent=None):
         self._children = dict()
......
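The new helper simply scans the program's global block for the send/recv ops that the distribute transpiler inserts. A hedged usage sketch (fluid 1.x):

import paddle.fluid as fluid
from paddle.fluid import framework

prog = fluid.default_main_program()
# True only if send/recv ops are present, i.e. the program is a trainer
# program running in parameter server mode.
print(framework.is_pserver_mode(prog))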
@@ -29,15 +29,6 @@ ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy
 BuildStrategy = core.ParallelExecutor.BuildStrategy


-def _is_pserver_mode(main_program):
-    main = main_program if main_program \
-        else framework.default_main_program()
-    for op in main.global_block().ops:
-        if op.type in ["send", "recv"]:
-            return True
-    return False
-
-
 class ParallelExecutor(object):
     """
     ParallelExecutor is designed for data parallelism, which focuses on distributing
@@ -140,7 +131,7 @@ class ParallelExecutor(object):
         # FIXME(zcd): is_distribution_ is a temporary field, because in pserver mode,
         # num_trainers is 1, so the current fields of build_strategy don't tell if
         # it's a distributed model.
-        build_strategy.is_distribution = _is_pserver_mode(
+        build_strategy.is_distribution = framework.is_pserver_mode(
             main_program) or num_trainers > 1

         # step4: get main_program, scope, local_scopes
......
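ParallelExecutor now reuses the shared framework.is_pserver_mode helper, so distribution mode is detected consistently whether the program was produced by the transpiler or num_trainers is greater than one. A minimal construction sketch, assuming the fluid 1.x API and a user-defined `loss`:

import paddle.fluid as fluid

pe = fluid.ParallelExecutor(
    use_cuda=False,                              # CPU broadcast path fixed by this PR
    loss_name=loss.name,                         # assumes `loss` is defined above
    main_program=fluid.default_main_program(),
    build_strategy=fluid.BuildStrategy())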