# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import platform

import numpy as np
import pytest

import megengine as mge
import megengine.distributed as dist
from megengine import Parameter, tensor
from megengine.core._imperative_rt.core2 import sync
from megengine.device import get_default_device, set_default_device
from megengine.distributed.helper import get_device_count_by_fork
from megengine.functional.distributed import (
    all_gather,
    all_reduce_max,
    all_reduce_min,
    all_reduce_sum,
    all_to_all,
    broadcast,
    gather,
    reduce_scatter_sum,
    reduce_sum,
    remote_recv,
    remote_send,
    scatter,
)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_reduce_sum():
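    # reduce_sum reduces onto rank 0: only rank 0 should hold the summed
    # result, the other rank checks against a zero placeholder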
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = reduce_sum(inp)
        if rank == 0:
            assert np.allclose(output.numpy(), expect[rank])
        else:
            assert np.allclose(output.numpy(), 0)

    def check(shape):
        x = np.random.rand(*shape)
        y = np.random.rand(*shape)
        z = x + y
        data = (x, y)
        expect = (z, None)
        worker(data, expect)

    for shape in [(), (1,), (2, 3), (8, 10), (99, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_broadcast():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = broadcast(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape)
        y = x + 1
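        # broadcast sends rank 0's tensor to every rank, so both ranks expect x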
        data = (x, y)
        expect = (x, x)
        worker(data, expect)

    for shape in [(), (1,), (2, 3), (8, 10), (99, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_all_gather():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_gather(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = np.concatenate((x, y))
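        # all_gather concatenates the per-rank tensors along dim 0 on every rank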
        data = (x, y)
        expect = (z, z)
        worker(data, expect)

    for shape in [(2, 3), (8, 10), (99, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_reduce_scatter_sum():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = reduce_scatter_sum(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = x + y
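        # the summed tensor is split along dim 0: rank 0 keeps the first half,
        # rank 1 the second half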
        data = (x, y)
        expect = (z[: shape[0] // 2], z[shape[0] // 2 :])
        worker(data, expect)

    for shape in [(2, 4), (8, 10), (88, 44)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_all_reduce_sum():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_sum(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape)
        y = np.random.rand(*shape)
        z = x + y
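        # unlike reduce_sum, all_reduce_sum delivers the full sum to every rank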
        data = (x, y)
        expect = (z, z)
        worker(data, expect)

    for shape in [(), (1,), (2, 3), (8, 10), (99, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_all_reduce_max():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_max(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape)
        y = np.random.rand(*shape)
        z = np.maximum(x, y)
        data = (x, y)
        expect = (z, z)
        worker(data, expect)

    for shape in [(), (1,), (2, 3), (8, 10), (99, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_all_reduce_min():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_reduce_min(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape)
        y = np.random.rand(*shape)
        z = np.minimum(x, y)
        data = (x, y)
        expect = (z, z)
        worker(data, expect)

    for shape in [(), (1,), (2, 3), (8, 10), (99, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_gather():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = gather(inp)
        if rank == 0:
            assert np.allclose(output.numpy(), expect[rank])
        else:
            assert np.allclose(output.numpy(), 0)

    def check(shape):
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
        z = np.concatenate((x, y))
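        # gather concatenates onto rank 0 only; the other rank checks a zero placeholder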
        data = (x, y)
        expect = (z, None)
        worker(data, expect)

    for shape in [(2, 3), (8, 10), (99, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_scatter():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = scatter(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape).astype("float32")
        y = x + 1
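        # only rank 0's tensor is scattered, split along dim 0 across the ranks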
        data = (x, y)
        expect = (x[: shape[0] // 2], x[shape[0] // 2 :])
        worker(data, expect)

    for shape in [(2, 3), (8, 10), (100, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_all_to_all():
    @dist.launcher(n_gpus=2)
    def worker(data, expect):
        rank = dist.get_rank()
        inp = tensor(data[rank])
        output = all_to_all(inp)
        assert np.allclose(output.numpy(), expect[rank])

    def check(shape):
        x = np.random.rand(*shape).astype("float32")
        y = np.random.rand(*shape).astype("float32")
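        # all_to_all exchanges halves along dim 0: rank 0 ends up with both first
        # halves, rank 1 with both second halves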
        a = np.concatenate((x[: shape[0] // 2], y[: shape[0] // 2]))
        b = np.concatenate((x[shape[0] // 2 :], y[shape[0] // 2 :]))
        data = (x, y)
        expect = (a, b)
        worker(data, expect)

    for shape in [(2, 3), (8, 10), (100, 77)]:
        check(shape)


@pytest.mark.skipif(
    platform.system() == "Darwin", reason="GPU mode is not implemented on macOS yet"
)
@pytest.mark.skipif(
    platform.system() == "Windows", reason="MGB_ENABLE_OPR_MM is disabled on Windows"
)
@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu devices")
@pytest.mark.isolated_distributed
def test_io_remote():
    @dist.launcher(n_gpus=2)
    def worker(val, shape):
        rank = dist.get_rank()
        if rank == 0:  # remote send
            x = tensor(val, device="gpu0")
            remote_send(x, 1)
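            # wait for the pending send to be executed before the worker returns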
            sync()
        else:  # remote recv
            y = remote_recv(0, shape, np.float32)
            assert y.device == "gpu1"
            np.testing.assert_almost_equal(val, y.numpy())

    for shape in [(), (1,), (4, 5)]:
        val = np.random.rand(*shape)
        worker(val, shape)