diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index ba35de8dddfa51063905dbc8c02b8ba378e9eab1..b993330b24f8fe04a00c94399344a1f906a6fcd5 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -872,14 +872,8 @@ void ParallelExecutor::BCastParamsToDevices(
       std::vector<void *> buffers;
       buffers.reserve(member_->places_.size());
       size_t numel = main_tensor.numel();
-      // TODO(liuyuhui): BKCL only support parameters using float type,
-      // other parameters need to be strongly converted to float before
-      // broadcasting,
-      // but broadcast is equivalent to no type of operation, does not affect
-      // correctness.
-      BKCLDataType data_type = BKCL_FLOAT;
-      // BKCLDataType data_type =
-      // platform::ToBKCLDataType(framework::TransToProtoVarType(main_tensor.dtype()));
+      auto dtype = framework::TransToProtoVarType(main_tensor.dtype());
+      BKCLDataType data_type = platform::ToBKCLDataType(dtype);
       for (size_t i = 0; i < member_->places_.size(); ++i) {
         auto place = member_->places_[i];
         void *buffer;
@@ -904,33 +898,21 @@ void ParallelExecutor::BCastParamsToDevices(
               member_->places_.size()));
       {
         auto *bkcl_ctxs = member_->bkcl_ctxs_->DefaultFlatCtx();
-
-        PADDLE_ENFORCE_EQ(
-            bkcl_group_start(),
-            BKCL_SUCCESS,
-            platform::errors::Unavailable("bkcl_group_start failed"));
+        platform::BKCLGroupGuard guard;
         for (size_t i = 0; i < member_->places_.size(); ++i) {
           auto &bkcl_ctx = bkcl_ctxs->at(member_->places_[i]);
-          auto broadcast_numel = numel;
-          if (framework::TransToProtoVarType(main_tensor.dtype()) ==
-              framework::proto::VarType::INT64) {
-            broadcast_numel *= 2;
-          }
           PADDLE_ENFORCE_EQ(
               bkcl_broadcast(bkcl_ctx.comm(),
                              buffers[i],
                              buffers[i],
-                             broadcast_numel,
+                             numel,
                              data_type,
                              0,
                              NULL),
               BKCL_SUCCESS,
               platform::errors::Unavailable("bkcl_broadcast failed"));
         }
-        PADDLE_ENFORCE_EQ(
-            bkcl_group_end(),
-            BKCL_SUCCESS,
-            platform::errors::Unavailable("bkcl_group_end failed"));
+        bkcl_ctxs->WaitAll();
       }
 #else
       PADDLE_THROW(
diff --git a/paddle/fluid/imperative/reducer.cc b/paddle/fluid/imperative/reducer.cc
index 106cff0b6c7e31959c4273223b44ee43a5c49fcf..659a27bb8c95b14c3ae24a2a53f6089aa4f5815e 100644
--- a/paddle/fluid/imperative/reducer.cc
+++ b/paddle/fluid/imperative/reducer.cc
@@ -819,8 +819,6 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
   }
 }
 
-// TODO(liuyuhui): If BKCL support non-blocking communication, it should be
-// fixed as same as multi gpus card training.
 void Reducer::MarkGroupReady(size_t group_index) {
   PADDLE_ENFORCE_GE(
       group_index,
diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py
index 1d5d07f01cbd69d82bedcd18810a50afffc5c701..cc6ab5384ca4ef99e85495ab8e792d5516656f1d 100644
--- a/python/paddle/distributed/parallel.py
+++ b/python/paddle/distributed/parallel.py
@@ -397,10 +397,7 @@ class DataParallel(layers.Layer):
             ), "ProcessGroup must be an instance of Group in DataParallel."
 
             # sync buffer and params
-            # TODO(liuyuhui) Currently not support xpu. xpu is
-            # still broadcasting parameters when calling layer
-            if not paddle.is_compiled_with_xpu():
-                sync_params_buffers(self._layers)
+            sync_params_buffers(self._layers)
 
             self.comm_buffer_size = int(comm_buffer_size * 1024 * 1024)
             # NOTE(shenliang03): We can set environment variables to control
diff --git a/python/paddle/nn/layer/layers.py b/python/paddle/nn/layer/layers.py
index 6155fdae7eee1c6f95d3e7c07f5bf971d5548f34..abd15b00424898ded2d1a9d18e327b58319d75ca 100644
--- a/python/paddle/nn/layer/layers.py
+++ b/python/paddle/nn/layer/layers.py
@@ -1199,8 +1199,6 @@ class Layer:
         pass
 
     def _dygraph_call_func(self, *inputs, **kwargs):
-        from paddle.distributed import parallel_helper
-
         for forward_pre_hook in self._forward_pre_hooks.values():
             hook_result = forward_pre_hook(self, inputs)
             if hook_result is not None:
@@ -1212,17 +1210,6 @@ class Layer:
             with program_desc_tracing_guard(False):
                 self._build_once(*inputs, **kwargs)
 
-                # TODO(liuyuhui) Only xpu broadcast parameters here.
-                # The other device is to call _sync_params_buffers in DataParallel
-                # to realize the parameter synchronization among multiply cards.
-                if (
-                    parallel_helper._is_data_parallel_mode()
-                    and paddle.is_compiled_with_xpu()
-                ):
-                    parallel_helper._broadcast_parameters(
-                        self._parameters.values()
-                    )
-
             self._built = True
 
         if in_profiler_mode():
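
The first hunk of `parallel_executor.cc` drops the `BKCL_FLOAT` workaround: the old code reinterpreted every parameter as float32 and doubled `numel` for INT64 tensors, which only works because a broadcast is type-oblivious and just has to move the right number of bytes. The quick check below spells out that byte arithmetic; it is plain standalone C++, not Paddle code, and with `platform::ToBKCLDataType` now supplying the real element type no count adjustment is needed.

```cpp
#include <cstdint>
#include <cstdio>

int main() {
  // Byte count the removed workaround relied on: N int64 elements
  // reinterpreted as 2*N float32 elements cover exactly the same bytes.
  static_assert(sizeof(std::int64_t) == 2 * sizeof(float),
                "int64 is twice the width of float32");

  const std::size_t numel = 1000;  // number of int64 elements
  const std::size_t bytes_as_int64 = numel * sizeof(std::int64_t);
  const std::size_t bytes_as_float = (numel * 2) * sizeof(float);
  std::printf("%zu == %zu\n", bytes_as_int64, bytes_as_float);  // 8000 == 8000
}
```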
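The second hunk replaces the paired `bkcl_group_start()` / `bkcl_group_end()` enforcement with a single `platform::BKCLGroupGuard guard;`. Below is a minimal sketch of that RAII pattern, assuming the guard mirrors the existing `platform::NCCLGroupGuard` (group opened in the constructor, closed in the destructor); `group_start()`, `group_end()`, and `GroupGuard` here are illustrative stand-ins, not the BKCL API.

```cpp
#include <iostream>
#include <stdexcept>

// Stand-ins for bkcl_group_start()/bkcl_group_end(); the real calls live in
// the BKCL runtime and are only available in an XPU build.
void group_start() { std::cout << "group_start\n"; }
void group_end() { std::cout << "group_end\n"; }

// RAII guard: opening the group in the constructor and closing it in the
// destructor guarantees group_end() runs on every path out of the scope,
// including early returns and exceptions.
class GroupGuard {
 public:
  GroupGuard() { group_start(); }
  ~GroupGuard() { group_end(); }
  GroupGuard(const GroupGuard &) = delete;
  GroupGuard &operator=(const GroupGuard &) = delete;
};

void BroadcastAll(bool fail_midway) {
  GroupGuard guard;  // replaces the explicit start/end calls plus their checks
  for (int i = 0; i < 4; ++i) {
    if (fail_midway && i == 2) {
      throw std::runtime_error("broadcast failed");  // group is still closed
    }
    std::cout << "broadcast to device " << i << "\n";
  }
}  // ~GroupGuard() closes the group here

int main() {
  BroadcastAll(false);
  try {
    BroadcastAll(true);
  } catch (const std::exception &e) {
    std::cout << "caught: " << e.what() << "\n";
  }
}
```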