From d3730036ea2091ae0989dbe9c9206dca3b947514 Mon Sep 17 00:00:00 2001
From: Megvii Engine Team
Date: Wed, 29 Apr 2020 14:02:05 +0800
Subject: [PATCH] test(mge/dist): add unit tests for megengine.distributed.utils

GitOrigin-RevId: 32336c66f1cf68b818093148f1f356caa170f8cb
---
 .../megengine/distributed/__init__.py        |   2 +
 python_module/test/run.sh                    |  23 +-
 .../test/unit/distributed/test_functional.py | 294 ++++++++++++++++++
 .../test/unit/distributed/test_util.py       | 163 ++++++++++
 4 files changed, 473 insertions(+), 9 deletions(-)
 create mode 100644 python_module/test/unit/distributed/test_functional.py
 create mode 100644 python_module/test/unit/distributed/test_util.py

diff --git a/python_module/megengine/distributed/__init__.py b/python_module/megengine/distributed/__init__.py
index d84f6e030..fb6e8033a 100644
--- a/python_module/megengine/distributed/__init__.py
+++ b/python_module/megengine/distributed/__init__.py
@@ -17,6 +17,7 @@ from .functional import (
     reduce_sum,
 )
 from .util import (
+    get_backend,
     get_master_ip,
     get_master_port,
     get_rank,
@@ -24,4 +25,5 @@ from .util import (
     group_barrier,
     init_process_group,
     is_distributed,
+    synchronized,
 )
diff --git a/python_module/test/run.sh b/python_module/test/run.sh
index c13066003..59a66e6f2 100755
--- a/python_module/test/run.sh
+++ b/python_module/test/run.sh
@@ -1,14 +1,19 @@
 #!/bin/bash -e
 
+ignore_list="--ignore test/unit/module/test_pytorch.py \
+             --ignore test/pytorch_comparison \
+             --ignore test/unit/hub/test_hub.py \
+             --ignore test/unit/data \
+             --ignore test/integration/manual \
+             --ignore megengine/module/pytorch \
+             --ignore test/unit/module/test_external.py"
+test_dirs="megengine test"
+
 pushd $(dirname "${BASH_SOURCE[0]}")/.. >/dev/null
-    pytest -xv -m 'not internet' \
+    pytest -xv -m 'isolated_distributed' \
+        --json-report --json-report-file=time_python_test.json \
+        $ignore_list $test_dirs
+    pytest -xv -m 'not internet and not isolated_distributed' \
         --json-report --json-report-file=time_python_test.json \
-        --ignore test/unit/module/test_pytorch.py \
-        --ignore test/pytorch_comparison \
-        --ignore test/unit/hub/test_hub.py \
-        --ignore test/unit/data \
-        --ignore test/integration/manual \
-        --ignore megengine/module/pytorch \
-        --ignore test/unit/module/test_external.py \
-        megengine test
+        $ignore_list $test_dirs
 popd >/dev/null
diff --git a/python_module/test/unit/distributed/test_functional.py b/python_module/test/unit/distributed/test_functional.py
new file mode 100644
index 000000000..f3c981b2f
--- /dev/null
+++ b/python_module/test/unit/distributed/test_functional.py
@@ -0,0 +1,294 @@
+# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+#
+# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+import multiprocessing as mp
+
+import numpy as np
+import pytest
+
+import megengine as mge
+import megengine.distributed as dist
+from megengine.core import Parameter, tensor
+
+
+def _init_process_group_wrapper(world_size, rank, dev, backend, q):
+    if rank == 0:
+        dist.init_process_group("localhost", 0, world_size, rank, dev, backend)
+        q.put(dist.get_master_port())
+    else:
+        port = q.get()
+        dist.init_process_group("localhost", port, world_size, rank, dev, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_reduce_sum():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = tensor(data)
+        output = dist.functional.reduce_sum(inp, "x")
+        if rank == 0:
+            assert np.allclose(output.numpy(), expect)
+        else:
+            assert np.allclose(output.numpy(), 0)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = np.random.rand(*shape).astype("float32")
+        z = x + y
+        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
+        p1 = mp.Process(target=worker, args=(1, y, backend, None, port_queue))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 3), (8, 10), (99, 77)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_broadcast():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = tensor(data)
+        output = dist.functional.broadcast(inp, "x")
+        assert np.allclose(output.numpy(), expect)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = x + 1
+        p0 = mp.Process(target=worker, args=(0, x, backend, x, port_queue))
+        p1 = mp.Process(target=worker, args=(1, y, backend, x, port_queue))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 3), (8, 10), (99, 77)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_all_gather():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = tensor(data)
+        output = dist.functional.all_gather(inp, "x")
+        assert np.allclose(output.numpy(), expect)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = np.random.rand(*shape).astype("float32")
+        z = np.concatenate((x, y))
+        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
+        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 3), (8, 10), (99, 77)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_reduce_scatter_sum():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = tensor(data)
+        output = dist.functional.reduce_scatter_sum(inp, "x")
+        assert np.allclose(output.numpy(), expect)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = np.random.rand(*shape).astype("float32")
+        z = x + y
+        p0 = mp.Process(
+            target=worker, args=(0, x, backend, z[: shape[0] // 2], port_queue)
+        )
+        p1 = mp.Process(
+            target=worker, args=(1, y, backend, z[shape[0] // 2 :], port_queue)
+        )
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 4), (8, 10), (88, 44)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_all_reduce_sum():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = tensor(data)
+        output = dist.functional.all_reduce_sum(inp, "x")
+        assert np.allclose(output.numpy(), expect)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = np.random.rand(*shape).astype("float32")
+        z = x + y
+        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
+        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 3), (8, 10), (99, 77)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_all_reduce_max():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = tensor(data)
+        output = dist.functional.all_reduce_max(inp, "x")
+        assert np.allclose(output.numpy(), expect)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = np.random.rand(*shape).astype("float32")
+        z = np.maximum(x, y)
+        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
+        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 3), (8, 10), (99, 77)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_all_reduce_min():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = tensor(data)
+        output = dist.functional.all_reduce_min(inp, "x")
+        assert np.allclose(output.numpy(), expect)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = np.random.rand(*shape).astype("float32")
+        z = np.minimum(x, y)
+        p0 = mp.Process(target=worker, args=(0, x, backend, z, port_queue))
+        p1 = mp.Process(target=worker, args=(1, y, backend, z, port_queue))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 3), (8, 10), (99, 77)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_bcast_param():
+    world_size = 2
+
+    def worker(rank, data, backend, expect, port_queue):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, port_queue)
+        inp = Parameter(data)
+        dist.functional.bcast_param(inp, "x")
+        assert np.allclose(inp.numpy(), expect)
+
+    def check(shape, backend):
+        port_queue = mp.Queue()
+        x = np.random.rand(*shape).astype("float32")
+        y = x + 1
+        p0 = mp.Process(target=worker, args=(0, x, backend, x, port_queue))
+        p1 = mp.Process(target=worker, args=(1, y, backend, x, port_queue))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    for shape in [(2, 3), (8, 10), (99, 77)]:
+        for backend in ["nccl", "ucx"]:
+            check(shape, backend)
diff --git a/python_module/test/unit/distributed/test_util.py b/python_module/test/unit/distributed/test_util.py
new file mode 100644
index 000000000..bbb4dd41d
--- /dev/null
+++ b/python_module/test/unit/distributed/test_util.py
@@ -0,0 +1,163 @@
+# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+#
+# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+import multiprocessing as mp
+import queue
+from time import sleep
+
+import pytest
+
+import megengine as mge
+import megengine._internal as mgb
+import megengine.distributed as dist
+
+_LOCALHOST = "127.0.0.1"
+
+
+def _assert_q_empty(q):
+    try:
+        res = q.get(timeout=1)
+    except Exception as e:
+        assert isinstance(e, queue.Empty)
+    else:
+        assert False, "queue is not empty"
+
+
+def _assert_q_val(q, val):
+    ret = q.get()
+    assert ret == val
+
+
+def _init_process_group_wrapper(world_size, rank, dev, backend, q):
+    if rank == 0:
+        dist.init_process_group(_LOCALHOST, 0, world_size, rank, dev, backend)
+        q.put(dist.get_master_port())
+    else:
+        port = q.get()
+        dist.init_process_group(_LOCALHOST, port, world_size, rank, dev, backend)
+
+
+@pytest.mark.isolated_distributed
+def test_create_mm_server():
+    def worker():
+        if not mge.is_cuda_available():
+            return
+        port = mgb.config.create_mm_server("0.0.0.0", 0)
+        assert port > 0
+        res = mgb.config.create_mm_server("0.0.0.0", port)
+        assert res == -1
+
+    p = mp.Process(target=worker)
+
+    p.start()
+
+    p.join(10)
+
+    assert p.exitcode == 0
+
+
+@pytest.mark.isolated_distributed
+def test_init_process_group():
+    world_size = 2
+
+    def worker(rank, backend, q):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, q)
+        assert dist.is_distributed() == True
+        assert dist.get_master_ip() == _LOCALHOST
+        assert dist.get_master_port() > 0
+        assert dist.get_world_size() == world_size
+        assert dist.get_rank() == rank
+        assert dist.get_backend() == backend
+
+    def check(backend):
+        Q = mp.Queue()
+        p0 = mp.Process(target=worker, args=(0, backend, Q))
+        p1 = mp.Process(target=worker, args=(1, backend, Q))
+
+        p0.start()
+        p1.start()
+
+        p0.join(10)
+        p1.join(10)
+
+        assert p0.exitcode == 0 and p1.exitcode == 0
+
+    check("nccl")
+    check("ucx")
+
+
+@pytest.mark.isolated_distributed
+def test_group_barrier():
+    world_size = 2
+    ip = "127.0.0.1"
+    backend = "nccl"
+
+    def worker(rank, q):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, q)
+        dist.group_barrier()
+        if rank == 0:
+            dist.group_barrier()
+            q.put(0)  # to be observed in rank 1
+        else:
+            _assert_q_empty(q)  # q.put(0) is not executed in rank 0
+            dist.group_barrier()
+            _assert_q_val(q, 0)  # q.put(0) executed in rank 0
+
+    Q = mp.Queue()
+    p0 = mp.Process(target=worker, args=(0, Q))
+    p1 = mp.Process(target=worker, args=(1, Q))
+
+    p0.start()
+    p1.start()
+
+    p0.join(10)
+    p1.join(10)
+
+    assert p0.exitcode == 0 and p1.exitcode == 0
+
+
+@pytest.mark.isolated_distributed
+def test_synchronized():
+    world_size = 2
+    backend = "nccl"
+
+    @dist.synchronized
+    def func(rank, q):
+        q.put(rank)
+
+    def worker(rank, q):
+        if not mge.is_cuda_available():
+            return
+        _init_process_group_wrapper(world_size, rank, rank, backend, q)
+        dist.group_barrier()
+        if rank == 0:
+            func(0, q)  # q.put(0)
+            q.put(2)
+        else:
+            _assert_q_val(q, 0)  # func executed in rank 0
+            _assert_q_empty(q)  # q.put(2) is not executed
+            func(1, q)
+            _assert_q_val(
+                q, 1
+            )  # func in rank 1 executed earlier than q.put(2) in rank 0
+            _assert_q_val(q, 2)  # q.put(2) executed in rank 0
+
+    Q = mp.Queue()
+    p0 = mp.Process(target=worker, args=(0, Q))
+    p1 = mp.Process(target=worker, args=(1, Q))
+
+    p0.start()
+    p1.start()
+
+    p0.join(10)
+    p1.join(10)
+
+    assert p0.exitcode == 0 and p1.exitcode == 0
-- 
GitLab