diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 40de26bdd08dab610e98a7fde340c6543cf1e85e..25b31f8636136e4b787e7e3892498fc52246cb1f 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -229,8 +229,15 @@ class ParallelExecutorPrivate { // TODO(yy): Move this function somewhere ncclDataType_t ToNCCLDataType(std::type_index type) { - // FIXME!! - return ncclFloat; + if (type == typeid(float)) { // NOLINT + return ncclFloat; + } else if (type == typeid(double)) { // NOLINT + return ncclDouble; + } else if (type == typeid(int)) { // NOLINT + return ncclInt; + } else { + PADDLE_THROW("Not supported"); + } } ParallelExecutor::ParallelExecutor( @@ -479,30 +486,32 @@ void ParallelExecutor::BCastParamsToGPUs( ncclDataType_t data_type = ToNCCLDataType(main_tensor.type()); auto &dims = main_tensor.dims(); size_t numel = main_tensor.numel(); - std::vector> - mems; - mems.emplace_back(const_cast(main_tensor.data()), - &member_->GetNCCLCtx(member_->main_place_)); - for (auto &pair : member_->local_scopes_) { - if (pair.first == member_->main_place_) { - continue; - } + platform::dynload::ncclGroupStart(); + for (auto &pair : member_->local_scopes_) { auto local_scope = pair.second; auto *t = local_scope->Var(var_desc->Name())->GetMutable(); t->Resize(dims); - mems.emplace_back(t->mutable_data(pair.first, main_tensor.type()), - &member_->GetNCCLCtx(member_->main_place_)); + auto &nccl_ctx = member_->GetNCCLCtx(pair.first); + platform::dynload::ncclBcast( + t->mutable_data(pair.first, main_tensor.type()), numel, data_type, + 0, nccl_ctx.comm, nccl_ctx.stream()); } + platform::dynload::ncclGroupEnd(); + } + } - // TODO(yy): Invoke ncclBCast here. mems, numel, data_type. The mems[0] - // is the src, rests are dests. + for (auto &pair : member_->local_scopes_) { + member_->GetNCCLCtx(pair.first).ctx_->Wait(); - (void)(data_type); - (void)(numel); - } + auto &b = pair.second->FindVar("fc_1.b_0")->Get(); + framework::LoDTensor cpu; + framework::TensorCopy(b, platform::CPUPlace(), &cpu); + platform::DeviceContextPool::Instance().Get(b.place())->Wait(); + LOG(INFO) << *cpu.data(); } + #else PADDLE_THROW("Not compiled with CUDA"); #endif diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index 3604fdb2850706e331d38d266f8f0051ba8299c9..85a9f7697fa03840fed2ec9c1cacfc2c246e0b45 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -52,7 +52,7 @@ class ParallelExecutor(unittest.TestCase): adam = fluid.optimizer.Adam() adam.minimize(loss) act_places = [] - for each in [fluid.CUDAPlace(0)]: + for each in [fluid.CUDAPlace(0), fluid.CUDAPlace(1)]: p = fluid.core.Place() p.set_place(each) act_places.append(p)