test_param_pack.py 2.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import platform

import numpy as np
import pytest

import megengine
import megengine.autodiff as ad
import megengine.distributed as dist
import megengine.optimizer as optimizer
from megengine import Parameter, tensor
from megengine.distributed.helper import get_device_count_by_fork
from megengine.module import Module
from megengine.optimizer import SGD


class Simple(Module):
    def __init__(self):
        super().__init__()
        self.params = [Parameter(1.0, dtype=np.float32) for i in range(10)]

    def forward(self, x):
        for p in self.params:
            x = x * p
        return x


@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
@pytest.mark.isolated_distributed
@pytest.mark.skipif(
    platform.system() == "Windows", reason="windows disable MGB_ENABLE_OPR_MM"
)
def test_param_pack():
    data = np.ones([1], dtype="float32")

    @dist.launcher
    def worker():
        net = Simple()
        opt = SGD(net.parameters(), lr=0.1)

        gm = ad.GradManager().attach(
            net.parameters(), callbacks=[dist.make_allreduce_cb("MEAN", dist.WORLD)]
        )

        opt.clear_grad()
        with gm:
            x = tensor(data)
            loss = net(x)
            loss = loss.sum()
            gm.backward(loss)
        for p in net.params:
            np.testing.assert_equal(p.grad.numpy(), 1)

    worker()


@pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device")
@pytest.mark.isolated_distributed
@pytest.mark.skipif(
    platform.system() == "Windows", reason="windows disable MGB_ENABLE_OPR_MM"
)
def test_param_pack_with_no_param():
    data = np.ones([1], dtype="float32")

    @dist.launcher
    def worker():
        net = Simple()
        opt = SGD(net.parameters(), lr=0.1)

        allreduce_cb = dist.make_allreduce_cb("MEAN", dist.WORLD)
        allreduce_cb._param_pack_thd = 0
        gm = ad.GradManager().attach(net.parameters(), callbacks=[allreduce_cb])

        opt.clear_grad()
        with gm:
            x = tensor(data)
            loss = net(x)
            loss = loss.sum()
            gm.backward(loss)
        for p in net.params:
            np.testing.assert_equal(p.grad.numpy(), 1)

    worker()