diff --git a/paddle/fluid/operators/collective/CMakeLists.txt b/paddle/fluid/operators/collective/CMakeLists.txt
index 55ad52f6447174aedb9db2a995c3f8c4978b3013..a119518c76d34686c0b227dbe913a9c71b453337 100644
--- a/paddle/fluid/operators/collective/CMakeLists.txt
+++ b/paddle/fluid/operators/collective/CMakeLists.txt
@@ -22,6 +22,7 @@ endif()
 
 if(WITH_ASCEND)
   op_library(gen_nccl_id_op)
+  op_library(c_gen_nccl_id_op)
 endif()
 
 
diff --git a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc
index 26f639ebc98b9e4bc7a85546e224898e00a50a0d..18f2a40f3ddd0b8726f698e0484dc00c7302cf72 100644
--- a/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc
+++ b/paddle/fluid/operators/collective/c_gen_nccl_id_op.cc
@@ -23,11 +23,15 @@ limitations under the License. */
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/place.h"
 
+#ifdef PADDLE_WITH_NCCL
 #include "paddle/fluid/operators/collective/gen_nccl_id_op_helper.h"
+#endif
 
 namespace paddle {
 namespace operators {
 
+#ifdef PADDLE_WITH_NCCL
+
 class CGenNCCLIdOp : public framework::OperatorBase {
  public:
   CGenNCCLIdOp(const std::string& type,
@@ -57,6 +61,22 @@ class CGenNCCLIdOp : public framework::OperatorBase {
   }
 };
 
+#else
+class CGenNCCLIdOp : public framework::OperatorBase {
+ public:
+  CGenNCCLIdOp(const std::string& type,
+               const framework::VariableNameMap& inputs,
+               const framework::VariableNameMap& outputs,
+               const framework::AttributeMap& attrs)
+      : OperatorBase(type, inputs, outputs, attrs) {}
+
+  void RunImpl(const framework::Scope& scope,
+               const platform::Place& dev_place) const override {
+  }
+};
+
+#endif
+
 class CGenNCCLIdOpMaker : public framework::OpProtoAndCheckerMaker {
  public:
   void Make() override {
diff --git a/python/paddle/fluid/tests/unittests/test_ascend_group.py b/python/paddle/fluid/tests/unittests/test_ascend_group.py
new file mode 100644
index 0000000000000000000000000000000000000000..0ed9fc0b92bcf57fc92828d74ae9eae559c45b93
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_ascend_group.py
@@ -0,0 +1,104 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import time
+import paddle.fluid as fluid
+from paddle.fluid import unique_name
+import paddle.fluid.core as core
+import paddle
+from paddle.fluid.layer_helper import LayerHelper
+
+paddle.enable_static()
+
+OpRole = core.op_proto_and_checker_maker.OpRole
+OP_ROLE_KEY = core.op_proto_and_checker_maker.kOpRoleAttrName()
+OP_ROLE_VAR_KEY = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
+
+def init_communicator(startup_program, main_program, current_endpoint, endpoints, ring_id):
+    nranks = len(endpoints)
+    other_endpoints = endpoints[:]
+    other_endpoints.remove(current_endpoint)
+    group_rank = endpoints.index(current_endpoint)
+    assert group_rank >= 0
+
+    block = startup_program.global_block()
+    nccl_id_var = block.create_var(
+        name=unique_name.generate('nccl_id'),
+        persistable=True,
+        type=core.VarDesc.VarType.RAW)
+    block.append_op(
+        type='c_gen_nccl_id',
+        inputs={},
+        outputs={'Out': nccl_id_var},
+        attrs={
+            'rank': group_rank,
+            'endpoint': current_endpoint,
+            'other_endpoints': other_endpoints,
+            OP_ROLE_KEY: OpRole.Forward,
+        })
+    block.append_op(
+        type='c_comm_init',
+        inputs={'X': nccl_id_var},
+        outputs={},
+        attrs={
+            'nranks': nranks,
+            'rank': group_rank,
+            'ring_id': ring_id,
+            OP_ROLE_KEY: OpRole.Forward,
+        })
+    block.create_var(
+        name="data",
+        persistable=True,
+        dtype='float32')
+
+    with fluid.program_guard(main_program):
+        op_type = "c_allreduce_sum"
+        data = fluid.layers.fill_constant(shape=[1], dtype='float32', value=2.5)
+        helper = LayerHelper(op_type, **locals())
+        helper.append_op(
+            type=op_type,
+            inputs={'X': [data]},
+            outputs={'Out': [data]},
+            attrs={'ring_id': ring_id,
+                   'use_calc_stream': True})
+
+def train():
+    startup_programs = []
+    main_programs = []
+
+
+    trainer_endpoints = ["127.0.0.1:6071", "127.0.0.1:6072", "127.0.0.1:6073", "127.0.0.1:6074"]
+    groups = [[], [], []]
+    groups[0] = [trainer_endpoints[0], trainer_endpoints[1]]
+    groups[1] = [trainer_endpoints[2], trainer_endpoints[3]]
+    groups[2] = [trainer_endpoints[0], trainer_endpoints[2]]
+
+    for i in range(len(trainer_endpoints)):
+        startup_programs.append(fluid.Program())
+        main_programs.append(fluid.Program())
+
+    for idx, group in enumerate(groups):
+        for te in group:
+            te_idx = trainer_endpoints.index(te)
+            startup_program = startup_programs[te_idx]
+            main_program = main_programs[te_idx]
+            init_communicator(startup_program, main_program, te, group, idx)
+
+    print(len(startup_programs))
+    print(startup_programs[0])
+    print(main_programs[0])
+
+train()
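
For reference, a minimal driver sketch of how the per-rank programs built by train() could be executed. It is not part of the patch: run_rank is a hypothetical helper, and it assumes one device per rank (CUDAPlace used here since the programs contain NCCL ops) with every rank of a group launched concurrently so c_gen_nccl_id can exchange the NCCL id across the listed endpoints.

import paddle
import paddle.fluid as fluid

paddle.enable_static()

def run_rank(rank, startup_program, main_program):
    # Hypothetical helper: each trainer process runs its own program pair.
    place = fluid.CUDAPlace(rank)   # assumes device id == rank on this host
    exe = fluid.Executor(place)
    exe.run(startup_program)        # c_gen_nccl_id + c_comm_init build the ring
    exe.run(main_program)           # c_allreduce_sum over the initialized ring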