diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc
index 1cf4eb6c2989346c9e9acef648aa74615c7bcb10..d42bd0b16d7a84987517326af9567809fd29da4d 100644
--- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc
@@ -45,14 +45,35 @@ inline void InitVarsInScope(const std::vector<VarInfo> &var_infos, Scope *scope,
 
 // get CommContext and remote send and recv op
 void ProcessGraph(std::vector<ir::Graph *> graphs, Scope *scope) {
 #ifdef PADDLE_WITH_DISTRIBUTE
-  // init communicator here
-  auto *instance = operators::distributed::Communicator::GetInstance();
-  auto initialized = instance ? true : false;
-  PADDLE_ENFORCE_EQ(initialized, true,
-                    platform::errors::InvalidArgument(
-                        "Communicator is not Initialized, you may use "
-                        "FleetAPI(https://github.com/PaddlePaddle/Fleet/tree/"
-                        "develop/markdown_doc/transpiler)"));
+
+  bool need_communicator = false;
+
+  for (auto &node : graphs[0]->Nodes()) {
+    VLOG(3) << "node name " << node->Name();
+    if (node && node->IsOp()) {
+      if (node->Name() == "send") {
+        auto send_varnames =
+            BOOST_GET_CONST(std::vector<std::string>,
+                            node->Op()->GetNullableAttr("send_varnames"));
+
+        if (send_varnames.size() > 0) {
+          need_communicator = true;
+          break;
+        }
+      }
+    }
+  }
+
+  if (need_communicator) {
+    // init communicator here
+    auto *instance = operators::distributed::Communicator::GetInstance();
+    auto initialized = instance ? true : false;
+    PADDLE_ENFORCE_EQ(initialized, true,
+                      platform::errors::InvalidArgument(
+                          "Communicator is not Initialized, you may use "
+                          "FleetAPI(https://github.com/PaddlePaddle/Fleet/tree/"
+                          "develop/markdown_doc/transpiler)"));
+  }
 #endif
 }
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index a7d86411e203728116604ffafddf36a1cfaed9b3..d2c7397c85f8df155444d9272c7b75596f0fe169 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -579,7 +579,7 @@ class FleetTranspiler(Fleet):
             block.append_op(
                 type='recv_save',
                 attrs={
-                    "trainer_id": self._role_maker.worker_id(),
+                    "trainer_id": self._role_maker.worker_index(),
                     "shape": var.shape,
                     "slice_shapes":
                     [",".join([str(i) for i in var.shape])],
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
index 2056e3deb18476748df0e16bc18b59f0a1074d55..b96eff19e9b9c5d8e78b85e61b9a69afee106546 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py
@@ -329,7 +329,7 @@ class CompileTimeStrategy(object):
                 is_distributed = True if param_name in distibuted_varnames else False
 
-                ctx = self.build_ctx(grad, self.grad_var_mapping, True, False,
+                ctx = self.build_ctx(grad, self.grad_var_mapping, True, True,
                                      True, is_distributed)
                 send_ctx[ctx.var_name()] = ctx