test_ascend_group.py 3.4 KB
Newer Older
G
gongweibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import paddle.fluid as fluid
from paddle.fluid import unique_name
import paddle.fluid.core as core
import paddle
from paddle.fluid.layer_helper import LayerHelper

paddle.enable_static()

OpRole = core.op_proto_and_checker_maker.OpRole
OP_ROLE_KEY = core.op_proto_and_checker_maker.kOpRoleAttrName()
OP_ROLE_VAR_KEY = core.op_proto_and_checker_maker.kOpRoleVarAttrName()

def init_communicator(startup_program, main_program, current_endpoint, endpoints, ring_id):
    nranks = len(endpoints)
    other_endpoints = endpoints[:]
    other_endpoints.remove(current_endpoint)
    group_rank=endpoints.index(current_endpoint)
    assert group_rank >=0

    block = startup_program.global_block()
    nccl_id_var = block.create_var(
        name=unique_name.generate('nccl_id'),
        persistable=True,
        type=core.VarDesc.VarType.RAW)
    block.append_op(
        type='c_gen_nccl_id',
        inputs={},
        outputs={'Out': nccl_id_var},
        attrs={
            'rank': group_rank,
            'endpoint': current_endpoint,
            'other_endpoints': other_endpoints,
            OP_ROLE_KEY: OpRole.Forward,
        })
    block.append_op(
        type='c_comm_init',
        inputs={'X': nccl_id_var},
        outputs={},
        attrs={
            'nranks': nranks,
            'rank': group_rank,
            'ring_id': ring_id,
            OP_ROLE_KEY: OpRole.Forward,
        })
    block.create_var(
        name="data",
        persistable=True,
        dtype='float32')

    with fluid.program_guard(main_program):
        op_type="c_allreduce_sum"
        data=fluid.layers.fill_constant(shape=[1], dtype='float32', value=2.5)
        helper = LayerHelper(op_type, **locals())
        helper.append_op(
            type=op_type,
            inputs={'X': [data]},
            outputs={'Out': [data]},
            attrs={'ring_id': ring_id,
                   'use_calc_stream': True})

def train():
    startup_programs=[]
    main_programs=[]


    trainer_endpoints=["127.0.0.1:6071","127.0.0.1:6072","127.0.0.1:6073","127.0.0.1:6074"]
    groups=[[], [], []]
    groups[0]=[trainer_endpoints[0], trainer_endpoints[1]]
    groups[1]=[trainer_endpoints[2], trainer_endpoints[3]]
    groups[2]=[trainer_endpoints[0], trainer_endpoints[2]]

    for i in range(len(trainer_endpoints)):
        startup_programs.append(fluid.Program())
        main_programs.append(fluid.Program())

    for idx, group in enumerate(groups):
        for te in group:
            te_idx = trainer_endpoints.index(te)
            startup_program = startup_programs[te_idx]
            main_program=main_programs[te_idx]
            init_communicator(startup_program, main_program, te, group, idx)

    print(len(startup_programs))
    print(startup_programs[0])
    print(main_programs[0])

train()