# ascend_group.py
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import paddle.fluid as fluid
from paddle.fluid import unique_name
import paddle.fluid.core as core
import paddle
from paddle.fluid.layer_helper import LayerHelper
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_optimizers.ascend import ascend_parser, ascend_optimizer
from collections import namedtuple

# Minimal stand-ins used by train(): it wraps a program as
# Loss(Block(program)), so the object handed to AscendOptimizer.minimize
# exposes that program as ``loss.block.program``.
Block = namedtuple('Block', ('program',))
Loss = namedtuple('Loss', ('block',))
# Switch Paddle to static-graph mode before any program is built.
paddle.enable_static()

# Op-role enum and the attribute names under which op roles are stored;
# init_communicator() tags the ops it appends with OP_ROLE_KEY.
OpRole = core.op_proto_and_checker_maker.OpRole
OP_ROLE_KEY = core.op_proto_and_checker_maker.kOpRoleAttrName()
OP_ROLE_VAR_KEY = core.op_proto_and_checker_maker.kOpRoleVarAttrName()

# Initialise fleet in collective mode at import time; the endpoint and rank
# queries at the bottom of this file read from it.
role = fleet.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)

def init_communicator(startup_program, main_program, current_endpoint, endpoints, ring_id):
    """Append communicator-setup ops for one ring plus a test all-reduce.

    Two ops are appended to the global block of ``startup_program``:
    ``c_gen_nccl_id``, which writes a persistable RAW variable, and
    ``c_comm_init``, which reads that variable.  Inside a program guard on
    ``main_program`` a ``fill_constant`` tensor (shape ``[1]``, float32,
    value 2.5) is created and a ``c_allreduce_sum`` op is appended that uses
    it as both input and output.  Both programs are printed at the end.

    Args:
        startup_program: Program that receives the two setup ops.
        main_program: Program that receives the constant and the all-reduce.
        current_endpoint: Endpoint the programs are being built for.  Must be
            an element of ``endpoints``.
        endpoints: Endpoints of all members of this group.  The position of
            ``current_endpoint`` in this sequence is used as its rank, and
            the length as ``nranks``.  The sequence is not modified.
        ring_id: Value written to the ``ring_id`` attribute of
            ``c_comm_init`` and ``c_allreduce_sum``.

    Raises:
        ValueError: If ``current_endpoint`` is not in ``endpoints``.
    """
    # Validate explicitly: list.index() raises instead of returning a
    # negative value, so a post-hoc ``assert rank >= 0`` can never fire.
    if current_endpoint not in endpoints:
        raise ValueError(
            "current_endpoint {!r} is not a member of endpoints {!r}".format(
                current_endpoint, endpoints))

    nranks = len(endpoints)
    group_rank = endpoints.index(current_endpoint)
    # Every member except the one at ``group_rank`` (i.e. drop the first
    # occurrence of current_endpoint, leaving the caller's list untouched).
    other_endpoints = [
        endpoint for position, endpoint in enumerate(endpoints)
        if position != group_rank
    ]

    block = startup_program.global_block()
    nccl_id_var = block.create_var(
        name=unique_name.generate('nccl_id'),
        persistable=True,
        type=core.VarDesc.VarType.RAW)
    block.append_op(
        type='c_gen_nccl_id',
        inputs={},
        outputs={'Out': nccl_id_var},
        attrs={
            'rank': group_rank,
            'endpoint': current_endpoint,
            'other_endpoints': other_endpoints,
            OP_ROLE_KEY: OpRole.Forward,
        })
    block.append_op(
        type='c_comm_init',
        inputs={'X': nccl_id_var},
        outputs={},
        attrs={
            'nranks': nranks,
            'rank': group_rank,
            'ring_id': ring_id,
            OP_ROLE_KEY: OpRole.Forward,
        })

    with fluid.program_guard(main_program):
        op_type = "c_allreduce_sum"
        data = fluid.layers.fill_constant(shape=[1], dtype='float32', value=2.5)
        # NOTE(review): **locals() forwards every local variable as a keyword
        # argument; LayerHelper presumably only needs ``op_type`` here --
        # confirm before trimming.
        helper = LayerHelper(op_type, **locals())
        # In-place all-reduce: the constant is both the input and the output.
        helper.append_op(
            type=op_type,
            inputs={'X': [data]},
            outputs={'Out': [data]},
            attrs={'ring_id': ring_id,
                   'use_calc_stream': True})

    print("startup program:", startup_program)
    print("main program:", main_program)

def train(world_endpoints, world_device_ids, local_device_ids, local_rank):
    """Build per-trainer programs for three fixed rings and run the local one.

    One (startup, main) program pair is created per entry of
    ``world_endpoints``.  Three two-member groups are formed from the first
    four endpoints -- (0, 1), (2, 3) and (0, 2) -- and, for every member of
    every group, ``init_communicator`` is called on that member's program
    pair with the group's position (0, 1, 2) as ``ring_id``.  The pair at
    index ``local_rank`` is then handed to ``AscendOptimizer.minimize`` and
    its main program is executed on a CPU-place executor.

    Args:
        world_endpoints: Endpoints of all trainers.  At least four are
            required; endpoints beyond the fourth get empty programs.
        world_device_ids: Accepted for interface compatibility; not used.
        local_device_ids: Accepted for interface compatibility; not used.
        local_rank: Index into the per-trainer program lists selecting the
            programs to optimise and run.

    Raises:
        ValueError: If fewer than four endpoints are supplied.
    """
    trainer_endpoints = world_endpoints

    # The groups below index endpoints 0..3; fail early with a clear message
    # instead of a bare IndexError from the middle of the group construction.
    if len(trainer_endpoints) < 4:
        raise ValueError(
            "train() needs at least 4 trainer endpoints to build its groups, "
            "got {}: {!r}".format(len(trainer_endpoints), trainer_endpoints))

    # Trainer indices that make up each ring; the ring id is the position
    # of the group in this list.
    group_members = [(0, 1), (2, 3), (0, 2)]
    groups = [
        [trainer_endpoints[member] for member in members]
        for members in group_members
    ]
    print("groups:", groups)

    # One startup/main pair per trainer, created in startup-then-main order.
    startup_programs = []
    main_programs = []
    for _ in trainer_endpoints:
        startup_programs.append(fluid.Program())
        main_programs.append(fluid.Program())

    for ring_id, (members, group) in enumerate(zip(group_members, groups)):
        for trainer_idx in members:
            init_communicator(
                startup_programs[trainer_idx],
                main_programs[trainer_idx],
                trainer_endpoints[trainer_idx],
                group,
                ring_id)

    print(len(startup_programs))
    print(startup_programs[local_rank])
    print(main_programs[local_rank])

    print("local rank: ", local_rank)
    print("local startup program: ", startup_programs[local_rank])

    # NOTE(review): the program lists are ordered like world_endpoints, but
    # the caller in this file passes fleet.local_rank(); confirm the two
    # agree when more than one node is involved.
    startup_program = startup_programs[local_rank]
    main_program = main_programs[local_rank]
    loss = Loss(Block(main_program))
    optimizer = ascend_optimizer.AscendOptimizer(None, fetch_list=[])
    optimizer.minimize(loss, startup_program, auto_dp=True)

    # Only the main program is executed here; startup_program is handed to
    # minimize() above and is not run directly.
    # NOTE(review): presumably the optimizer takes care of it -- confirm.
    exe = paddle.static.Executor(paddle.CPUPlace())
    exe.run(main_program)


# Query the cluster layout from fleet (initialised near the top of this
# file), print it, and hand it to train().
worker_endpoints = fleet.worker_endpoints()
world_device_ids = fleet.world_device_ids()
local_device_ids = fleet.local_device_ids()
local_rank = int(fleet.local_rank())

print("worker_endpoints:", worker_endpoints)
print("world_device_ids:", world_device_ids)
print("local_device_ids:", local_device_ids)
print("local_rank:", local_rank)

train(worker_endpoints, world_device_ids, local_device_ids, local_rank)