diff --git a/paddle/fluid/operators/nccl_op.cu.cc b/paddle/fluid/operators/nccl_op.cu.cc
index 7637c7ed163f64502fcbd1dae12d373067ce5787..333aed2903e7873aa799bd34468b2e05ef2e556c 100644
--- a/paddle/fluid/operators/nccl_op.cu.cc
+++ b/paddle/fluid/operators/nccl_op.cu.cc
@@ -47,11 +47,8 @@ class NCCLAllReduceKernel : public framework::OpKernel {
     auto ins = ctx.MultiInput("X");
     auto outs = ctx.MultiOutput("Out");
 
-    LOG(INFO) << "------------------";
     std::string reduction = ctx.Attr("reduction");
-    LOG(INFO) << "------------------";
     ncclRedOp_t reduction_op_ = ncclSum;
-    LOG(INFO) << "------------------";
 
     if (reduction == "ncclMin") {
       reduction_op_ = ncclMin;
@@ -65,19 +62,14 @@ class NCCLAllReduceKernel : public framework::OpKernel {
       PADDLE_THROW("Invalid reduction. default ncclSum.");
     }
 
-    LOG(INFO) << "------------------";
     auto* comm = ctx.Input("Communicator");
 
-    LOG(INFO) << "------------------";
     auto stream = ctx.cuda_device_context().stream();
-    LOG(INFO) << "------------------";
 
     // device id
     int gpu_id = boost::get(ctx.GetPlace()).GetDeviceId();
-    LOG(INFO) << "------------------";
     int idx = comm->GetCommId(gpu_id);
-    LOG(INFO) << "------------------";
 
     for (size_t i = 0; i < ins.size(); ++i) {
       VLOG(1) << "gpu : "
               << " invoke allreduce. send " << ins[i]->numel() << " recv "
diff --git a/paddle/fluid/operators/parallel_do_op.cc b/paddle/fluid/operators/parallel_do_op.cc
index ff5730bfe77a909ae59e18361c6157831c7eff23..f808c7130668c8f38bd1a195ab0a9bc80d2887a5 100644
--- a/paddle/fluid/operators/parallel_do_op.cc
+++ b/paddle/fluid/operators/parallel_do_op.cc
@@ -151,7 +151,6 @@ class ParallelDoOp : public framework::OperatorBase {
     }
     WaitOnPlaces(places);
 
-    // PADDLE_ENFORCE_EQ(places.size(), sub_scopes.size());
     std::vector> workers;
     workers.reserve(places.size());
     for (size_t place_idx = 0; place_idx < sub_scopes.size(); ++place_idx) {
@@ -219,21 +218,18 @@ class ParallelDoGradOp : public framework::OperatorBase {
     auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
                            ->Get>();
     auto &places = scope.FindVar(Input(kPlaces))->Get();
-    // PADDLE_ENFORCE_EQ(places.size(), sub_scopes.size());
 
     // feed output@grad
     SplitTensorAndMoveTensorToScopes(
         scope, const_cast *>(&sub_scopes), places,
         Inputs(framework::GradVarName(kOutputs)));
     WaitOnPlaces(places);
-    LOG(INFO) << "places " << places.size();
 
     // exe run
     std::vector> workers;
     for (size_t i = 0; i < sub_scopes.size(); ++i) {
       auto &place = places[i];
       auto *cur_scope = sub_scopes[i];
-      LOG(INFO) << place;
 
       // execute
       workers.emplace_back(framework::Async([program, cur_scope, place, block] {
@@ -242,7 +238,6 @@ class ParallelDoGradOp : public framework::OperatorBase {
                                      false /*create_local_scope*/);
       }));
     }
-    LOG(INFO) << "places " << places.size();
     for (auto &worker : workers) {
       worker.wait();
     }
diff --git a/python/paddle/v2/fluid/backward.py b/python/paddle/v2/fluid/backward.py
index 8ec9db81b38a70c12e883ed340199578575dec0c..6da4325c64f4ee7c715bfe97a083147251dc629e 100644
--- a/python/paddle/v2/fluid/backward.py
+++ b/python/paddle/v2/fluid/backward.py
@@ -230,44 +230,19 @@ def _callback_lookup_(op):
 
             def __call__(self, block, context):
                 if not self.has_inserted_nccl_init:
-                    # global_block = block.program.global_block()
-                    # op_desc = global_block.desc.append_op()
-                    # var_desc = global_block.desc.var('nccl_com__do_not_change_')
-                    # var_desc.set_type(core.VarDesc.VarType.NCCL_COM)
-                    # self.nccl_com = global_block.create_var(
-                    #     name='nccl_com', type=core.VarDesc.VarType.NCCL_COM)
-                    # framework.Operator(
-                    #     global_block,
-                    #     type='ncclInit',
-                    #     desc=op_desc,
-                    #     inputs={},
-                    #     outputs={'Communicator': [self.nccl_com]})
                     op_desc = _create_op_desc_(
                         "ncclInit",
                         {"parallel_scopes": self.parallel_scopes_name},
                         {"Communicator": ['nccl_com__do_not_change_']}, {})
-                    # block.desc.append_op().copy_from(op_desc)
                     print(serialize_op_decs(op_desc))
                     block.program.global_block().desc.append_op().copy_from(
                         op_desc)
                     self.has_inserted_nccl_init = True
 
                 current_op_desc = context["__current_op_desc__"]
-                # print(serialize_op_decs(context))
                 for o_param in current_op_desc.output_names():
                     for o_argu in current_op_desc.output(o_param):
                         if o_argu in self.param_grad_names:
-                            # # print("reduce", o_argu)
-                            # op_desc = block.desc.append_op()
-                            # op_desc.set_type("ncclAllReduce")
-                            # op_desc.set_input("X", [o_argu])
-                            #
-                            # # FIXME(tonyyang-svail):
-                            # # Looks like nccl_com has been changed to nccl_com_0
-                            # op_desc.set_input("Communicator", ['nccl_com_0'])
-                            # out_var = block.create_var()
-                            # op_desc.set_output("Out", [out_var.name])
-                            # op_desc.set_attr("reduction", "ncclSum")
                             allreduce_out_name = o_argu + "__nccl_all_reduce__"
                             op_desc = _create_op_desc_(
                                 "ncclAllReduce", {
diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py
index 2914c8dbaaebbf58349051ec65d785ab8d0b7df2..66bb6442af9c5b9bb6fb445bf8700127b2c95450 100644
--- a/python/paddle/v2/fluid/tests/test_parallel_op.py
+++ b/python/paddle/v2/fluid/tests/test_parallel_op.py
@@ -175,7 +175,9 @@ class ParallelOpTest(BaseParallelForTest):
     def test_simple_fc(self):
         self.run_test(
             callback=self.__network__,
-            feed={'img': numpy.random.random(size=(8, 784)).astype('float32')},
+            feed={
+                'img': numpy.random.random(size=(51, 784)).astype('float32')
+            },
             fetch=['fc1.w@GRAD'])
 
     def test_fc_with_tiny_data(self):
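
Note on the backward.py hunk (sketch, not part of the patch): the retained code path builds operator descriptors through _create_op_desc_(op_type, inputs, outputs, attrs) instead of the hand-rolled append_op calls that this patch deletes as dead comments. The self-contained sketch below imitates that calling convention with a plain-dict stub so the per-gradient ncclAllReduce insertion can be read in isolation; make_op_desc, the communicator name, and the ncclSum attribute are illustrative assumptions, not fluid APIs or values confirmed beyond what the diff itself shows.

def make_op_desc(op_type, inputs, outputs, attrs):
    # Stand-in for fluid's _create_op_desc_: returns a plain dict rather than a
    # real framework OpDesc, purely so this sketch runs on its own.
    return {"type": op_type, "inputs": inputs, "outputs": outputs, "attrs": attrs}


def allreduce_ops_for(param_grad_names, output_args):
    # Mirrors the loop kept in ParallelDoCallBack.__call__: every output argument
    # that is a parameter gradient gets one ncclAllReduce descriptor writing into
    # <grad>__nccl_all_reduce__.
    ops = []
    for o_argu in output_args:
        if o_argu in param_grad_names:
            allreduce_out_name = o_argu + "__nccl_all_reduce__"
            ops.append(
                make_op_desc(
                    "ncclAllReduce",
                    {
                        "X": [o_argu],
                        # Communicator name assumed to match the ncclInit output above.
                        "Communicator": ['nccl_com__do_not_change_']
                    },
                    {"Out": [allreduce_out_name]},
                    # ncclSum assumed; it is the value the kernel initializes to.
                    {"reduction": "ncclSum"}))
    return ops


if __name__ == "__main__":
    # fc1.w@GRAD is a parameter gradient, fc1.tmp_0 is not, so only one op is emitted.
    for op in allreduce_ops_for(["fc1.w@GRAD"], ["fc1.w@GRAD", "fc1.tmp_0"]):
        print(op["type"], op["inputs"]["X"], "->", op["outputs"]["Out"])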
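
Note on the test_parallel_op.py hunk (sketch, not part of the patch): the feed for test_simple_fc grows from 8 to 51 rows, presumably so that the batch no longer divides evenly across the parallel places and the split and merge path is exercised with ragged shards. The snippet below only illustrates that property with numpy.array_split; it is not the routine parallel_do actually uses to scatter the tensor.

import numpy

# 51 rows of 784 features, matching the new feed in test_simple_fc.
batch = numpy.random.random(size=(51, 784)).astype('float32')

for num_places in (1, 2, 4):
    shards = numpy.array_split(batch, num_places)
    # With 2 or 4 places the first dimensions differ (26/25 and 13/13/13/12),
    # which a batch of 8 rows split over 2 or 4 places would never produce.
    print(num_places, [shard.shape[0] for shard in shards])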