#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import unittest
import os
import paddle.fluid.core as core
import paddle.fluid as fluid
from paddle.distributed.fleet.base.private_helper_function import wait_server_ready
import paddle

# Turn on static-graph mode at import time: the test case in this file
# assembles a fluid.Program by hand and runs it through a fluid.Executor.
paddle.enable_static()


class TestCCommInitOp(unittest.TestCase):
    """Run a program made of a ``c_gen_nccl_id`` op followed by ``c_comm_init``.

    The trainer layout is read from the environment variables
    ``PADDLE_TRAINER_ENDPOINTS`` (comma-separated), ``PADDLE_CURRENT_ENDPOINT``
    and ``FLAGS_selected_gpus``. The test contains no explicit assertions; it
    passes when the executor runs the program without raising.
    """

    def setUp(self):
        """Derive rank information from the environment and build an executor."""
        env = os.environ

        self.endpoints = env.get("PADDLE_TRAINER_ENDPOINTS").split(',')
        self.current_endpoint = env.get("PADDLE_CURRENT_ENDPOINT")

        # The rank is this trainer's position in the full endpoint list, so it
        # has to be computed (together with nranks) before the list is pruned.
        self.nranks = len(self.endpoints)
        self.rank = self.endpoints.index(self.current_endpoint)

        self.gpu_id = int(env.get("FLAGS_selected_gpus"))
        self.place = fluid.CUDAPlace(self.gpu_id)
        self.exe = fluid.Executor(self.place)

        # Drop this trainer's own entry (it sits at index ``rank``). From here
        # on ``endpoints`` and ``other_endpoints`` name the same list object,
        # which holds only the peers.
        del self.endpoints[self.rank]
        self.other_endpoints = self.endpoints

        # Only rank 0 waits on its peers.
        # NOTE(review): presumably wait_server_ready blocks until every listed
        # endpoint accepts connections -- confirm against the helper.
        if self.rank == 0:
            wait_server_ready(self.other_endpoints)

    def test_specifying_devices(self):
        """Initialise ring 0 with an explicit ``device_id`` attribute."""
        main_program = fluid.Program()
        global_block = main_program.global_block()

        # Persistable RAW variable: written by c_gen_nccl_id, read by
        # c_comm_init.
        nccl_id = global_block.create_var(
            name=fluid.unique_name.generate('nccl_id'),
            persistable=True,
            type=fluid.core.VarDesc.VarType.RAW)

        gen_id_attrs = {
            'rank': self.rank,
            'endpoint': self.current_endpoint,
            'other_endpoints': self.other_endpoints
        }
        global_block.append_op(
            type='c_gen_nccl_id',
            inputs={},
            outputs={'Out': nccl_id},
            attrs=gen_id_attrs)

        comm_init_attrs = {
            'nranks': self.nranks,
            'rank': self.rank,
            'ring_id': 0,
            'device_id': self.gpu_id
        }
        global_block.append_op(
            type='c_comm_init',
            inputs={'X': nccl_id},
            outputs={},
            attrs=comm_init_attrs)

        self.exe.run(main_program)


# Run the test case through unittest's command-line runner when this file is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main()