test_functional_distributed.py
# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import platform

import numpy as np
import pytest

import megengine as mge
import megengine.distributed as dist
from megengine import Parameter, tensor
from megengine.core._imperative_rt.core2 import sync
from megengine.device import get_default_device, set_default_device
from megengine.distributed.helper import get_device_count_by_fork
from megengine.functional.distributed import (
    all_gather,
    all_reduce_max,
    all_reduce_min,
    all_reduce_sum,
    all_to_all,
    broadcast,
    gather,
    reduce_scatter_sum,
    reduce_sum,
    remote_recv,
    remote_send,
    scatter,
)


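# Each test below launches two worker processes via dist.launcher(n_gpus=2);
# data[rank] is the input fed to each rank and expect[rank] the value it should observe.
# reduce_sum: inputs from both ranks are summed; only rank 0 checks the result.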
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_reduce_sum(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = reduce_sum(inp)
        if rank == 0:
            assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    z = x + y
    data = (x, y)
    expect = (z, None)
    worker(data, expect)


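# broadcast: rank 0's tensor is replicated to every rank.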
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_broadcast(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = broadcast(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = x + 1
    data = (x, y)
    expect = (x, x)
    worker(data, expect)


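# all_gather: inputs from all ranks are concatenated along axis 0 and every rank gets the result.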
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_gather(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_gather(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    z = np.concatenate((x, y))
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


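# reduce_scatter_sum: inputs are summed across ranks, then split along axis 0,
# with each rank keeping its own slice of the sum.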
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (88, 44)], ids=str)
@pytest.mark.isolated_distributed
def test_reduce_scatter_sum(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = reduce_scatter_sum(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    z = x + y
    data = (x, y)
    expect = (z[: shape[0] // 2], z[shape[0] // 2 :])
    worker(data, expect)


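# all_reduce_sum: elementwise sum across ranks; every rank gets the full result.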
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_sum(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_sum(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    z = x + y
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


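# all_reduce_max: elementwise maximum across ranks; every rank gets the full result.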
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_max(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_max(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    z = np.maximum(x, y)
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


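# all_reduce_min: elementwise minimum across ranks; every rank gets the full result.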
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(), (1,), (2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_reduce_min(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_min(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    z = np.minimum(x, y)
    data = (x, y)
    expect = (z, z)
    worker(data, expect)


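# gather: inputs from all ranks are concatenated along axis 0; only rank 0 checks the result.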
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (99, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_gather(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = gather(inp)
        if rank == 0:
            assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    z = np.concatenate((x, y))
    data = (x, y)
    expect = (z, None)
    worker(data, expect)


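# scatter: rank 0's tensor is split along axis 0 and each rank receives one slice.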
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (100, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_scatter(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = scatter(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = x + 1
    data = (x, y)
    expect = (x[: shape[0] // 2], x[shape[0] // 2 :])
    worker(data, expect)


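# all_to_all: each rank splits its input along axis 0 and exchanges slices,
# so rank i ends up with the i-th slice from every rank, concatenated.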
@pytest.mark.require_ngpu(2)
@pytest.mark.parametrize("shape", [(2, 3), (8, 10), (100, 77)], ids=str)
@pytest.mark.isolated_distributed
def test_all_to_all(shape):
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_to_all(inp)
        assert np.allclose(output.numpy(), expect[rank])

    x = np.random.random_sample(shape).astype("float32")
    y = np.random.random_sample(shape).astype("float32")
    a = np.concatenate((x[: shape[0] // 2], y[: shape[0] // 2]))
    b = np.concatenate((x[shape[0] // 2 :], y[shape[0] // 2 :]))
    data = (x, y)
    expect = (a, b)
    worker(data, expect)


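# point-to-point: rank 0 sends a tensor with remote_send, rank 1 receives it with
# remote_recv and checks both the device and the values.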
@pytest.mark.require_ngpu(2)
@pytest.mark.isolated_distributed
@pytest.mark.parametrize("shape", [(), (1,), (4, 5)], ids=str)
def test_io_remote(shape):
    @dist.launcher(n_gpus=2)
    def worker(val, shape):
        rank = dist.get_rank()
        if rank == 0:  # remote send
            x = tensor(val, device="xpu0")
            remote_send(x, 1)
            sync()
        else:  # remote recv
            y = remote_recv(0, shape, np.float32)
            assert y.device == get_default_device()
            np.testing.assert_almost_equal(val, y.numpy())

    val = np.random.random_sample(shape).astype("float32")
    worker(val, shape)