collective_alltoall_single.py 2.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import numpy as np
18 19

import paddle
20 21 22 23 24
import paddle.distributed as dist


class TestCollectiveAllToAllSingle(unittest.TestCase):
    def setUp(self):
25 26 27
        assert (
            not paddle.distributed.is_initialized()
        ), "The distributed environment has not been initialized."
28
        dist.init_parallel_env()
29 30 31
        assert (
            paddle.distributed.is_initialized()
        ), "The distributed environment has been initialized."
32 33 34 35 36 37 38 39 40

    def test_collective_alltoall_single(self):
        rank = dist.get_rank()
        size = dist.get_world_size()

        # case 1
        input = paddle.ones([size, size], dtype='int64') * rank
        output = paddle.empty([size, size], dtype='int64')
        expected_output = paddle.concat(
41 42
            [paddle.ones([1, size], dtype='int64') * i for i in range(size)]
        )
43 44 45 46 47 48 49 50 51 52 53 54 55

        group = dist.new_group([0, 1])
        dist.alltoall_single(input, output, group=group)

        np.testing.assert_allclose(output.numpy(), expected_output.numpy())
        dist.destroy_process_group(group)

        # case 2
        in_split_sizes = [i + 1 for i in range(size)]
        out_split_sizes = [rank + 1 for i in range(size)]

        input = paddle.ones([sum(in_split_sizes), size], dtype='float32') * rank
        output = paddle.empty([(rank + 1) * size, size], dtype='float32')
56 57 58 59 60 61
        expected_output = paddle.concat(
            [
                paddle.ones([rank + 1, size], dtype='float32') * i
                for i in range(size)
            ]
        )
62 63

        group = dist.new_group([0, 1])
64 65 66 67 68 69 70 71
        task = dist.alltoall_single(
            input,
            output,
            in_split_sizes,
            out_split_sizes,
            sync_op=False,
            group=group,
        )
72 73 74 75 76 77 78
        task.wait()

        np.testing.assert_allclose(output.numpy(), expected_output.numpy())
        dist.destroy_process_group(group)

    def tearDown(self):
        dist.destroy_process_group()
79 80 81
        assert (
            not paddle.distributed.is_initialized()
        ), "The distributed environment has been deinitialized."
82 83 84 85


if __name__ == '__main__':
    unittest.main()