# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import weakref

import numpy as np
import pytest

import megengine as mge
import megengine.distributed as dist
from megengine.core._imperative_rt import TensorAttr, imperative
from megengine.core._imperative_rt.imperative import sync
from megengine.core.autodiff.grad import Grad
from megengine.core.ops.builtin import Elemwise
from megengine.core.tensor.raw_tensor import as_raw_tensor
from megengine.core.tensor.tensor import Tensor, apply
from megengine.core.tensor.tensor_wrapper import TensorWrapper
from megengine.functional.distributed import remote_recv, remote_send


def _elwise(mode):
    # wrap an Elemwise op in a plain function that applies it
    op = Elemwise(mode=mode)

    def f(*args):
        (result,) = apply(op, *args)
        return result

    return f


add = _elwise("add")
mul = _elwise("mul")
cos = _elwise("cos")
relu = _elwise("relu")


def as_tensor(x):
    return Tensor(as_raw_tensor(x, device=mge.device.get_default_device()))


def save_to(self, name="grad"):
    # gradient callback that stores the computed grad as an attribute on `self`
    def callback(tensor, grad):
        setattr(self, name, grad)

    return callback


@pytest.mark.isolated_distributed
def test_dist_grad():
    world_size = 2
    x_np = np.random.rand(10).astype("float32")
    port = dist.get_free_ports(1)[0]
    server = dist.Server(port)

    def worker0():
        dist.init_process_group("localhost", port, world_size, 0, 0)
        mge.device.set_default_device("gpu0")
        grad = Grad()

        x = as_tensor(x_np)
        grad.wrt(x, callback=save_to(x))
        # need a placeholder to trace the send operator
        send_x = remote_send(x, 1)
        recv_x = remote_recv(1, x_np.shape, x_np.dtype, "gpu0")
        y = recv_x * recv_x

        grad([y], [as_tensor(np.ones_like(x_np))])
        np.testing.assert_almost_equal(x.grad.numpy(), x.numpy() * 2)

    def worker1():
        dist.init_process_group("localhost", port, world_size, 1, 1)
        mge.device.set_default_device("gpu1")
        grad = Grad()

        recv_x = remote_recv(0, x_np.shape, x_np.dtype, "gpu1")
        send_x = remote_send(recv_x, 0)

        grad([], [])

        # sync because grad has a send operator
        sync()
        send_x.device._cn._sync_all()

    import multiprocessing as mp

    p0 = mp.Process(target=worker0)
    p1 = mp.Process(target=worker1)
    p0.start()
    p1.start()
    p0.join(10)
    p1.join(10)
    assert p0.exitcode == 0 and p1.exitcode == 0


def test_grad():
    x_np = np.random.rand(10).astype("float32")
    x = as_tensor(x_np)

    grad = Grad().wrt(x, callback=save_to(x))

    y = cos(x)

    grad(y, as_tensor(np.ones_like(x_np)))
    np.testing.assert_almost_equal(x.grad.numpy(), -np.sin(x_np))


def test_grad_2():
    x_np = np.random.rand(10).astype("float32")
    x = as_tensor(x_np)

    grad = Grad().wrt(x, callback=save_to(x))

    y = mul(x, x)
    y = mul(y, y)

    grad(y, as_tensor(np.ones_like(x_np)))
    np.testing.assert_almost_equal(x.grad.numpy(), 4 * x_np ** 3, decimal=6)


@pytest.mark.skip(reason="high order gradient was not implemented yet")
def test_2nd_grad():
    x_np = np.random.rand(10).astype("float32")
    x = as_tensor(x_np)
    ones = as_tensor(np.ones_like(x_np))

    grad = Grad().wrt(x, callback=save_to(x))
    grad2 = Grad().wrt(x, callback=save_to(x))

    y = cos(x)

    grad(y, ones)
    np.testing.assert_almost_equal(x.grad.numpy(), -np.sin(x_np), decimal=5)

    grad2(x.grad, ones)
    np.testing.assert_almost_equal(x.grad.numpy(), -np.cos(x_np))


def test_grad_with_tensor_wrapper():
    x_np = np.random.rand(10).astype("float32")
    x = TensorWrapper(x_np)

    grad = Grad().wrt(x, callback=save_to(x))

    y = mul(x, x)
    y = mul(y, y)

    grad(y, TensorWrapper(np.ones_like(x_np)))
    np.testing.assert_almost_equal(x.grad.numpy(), 4 * x_np ** 3, decimal=6)


def test_grad_inplace():
    x_np = np.random.rand(10).astype("float32")
    x = TensorWrapper(x_np)

    grad = Grad().wrt(x, callback=save_to(x))

    y = mul(x, x)
    y *= y

    grad(y, TensorWrapper(np.ones_like(x_np)))
    np.testing.assert_almost_equal(x.grad.numpy(), 4 * x_np ** 3, decimal=6)


def test_elemwise_add():
    x_np = np.random.rand(10).astype("float32")
    y_np = np.random.rand(10, 10).astype("float32")
    dz_np = np.random.rand(10, 10).astype("float32")
    x = TensorWrapper(x_np)
    y = TensorWrapper(y_np)
    dz = TensorWrapper(dz_np)

    refs = {}

    def f(x, y):
        x = x * 2
        refs["x"] = weakref.ref(x.__wrapped__)
        refs["y"] = weakref.ref(y.__wrapped__)
        return x + y

    grad = Grad().wrt(x, callback=save_to(x))

    z = f(x, y)
    del y

    # add's backward needs neither input value, so both should have been released
    for k, r in refs.items():
        assert r() is None

    grad(z, dz)
    np.testing.assert_almost_equal(x.grad.numpy(), dz_np.sum(0) * 2, decimal=5)


def test_elemwise_relu():
    x_np = [1.0, -1.0]
    dz_np = [1.0]
    x = TensorWrapper(x_np)
    dz = TensorWrapper(dz_np)

    refs = {}

    def f(x):
        x = x * 2
        refs["x"] = weakref.ref(x.__wrapped__)
        return relu(x)

    grad = Grad().wrt(x, callback=save_to(x))

    z = f(x)
    # relu's backward needs only its output, so the input should be released
    assert refs["x"]() is None

    grad(z, dz)
    np.testing.assert_almost_equal(x.grad.numpy(), [2.0, 0])


def test_elemwise_relu_backward_fn():
    op = Elemwise(mode="relu").to_c()
    attr = TensorAttr()
    attr.dtype = "float32"
    attr.comp_node = "xpux"
    result = imperative.make_backward_graph(op, [attr], [True], [True])
    backward_graph, save_for_backward_mask, input_has_grad = result
    # mask covers (input, output, output_grad): relu saves its output and the
    # output grad for backward, but not its input
    assert save_for_backward_mask == [False, True, True], save_for_backward_mask
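# A minimal numerical cross-check of the analytic gradients exercised above.
# `numeric_grad` and `test_grad_against_numeric` are hypothetical additions
# for illustration: they assume `f` acts elementwise on a float32 numpy array,
# in which case a central finite difference of f itself equals the gradient
# produced by backprop with an all-ones dy.
def numeric_grad(f, x_np, eps=1e-3):
    return ((f(x_np + eps) - f(x_np - eps)) / (2 * eps)).astype("float32")


def test_grad_against_numeric():
    x_np = np.random.rand(10).astype("float32")
    x = TensorWrapper(x_np)

    grad = Grad().wrt(x, callback=save_to(x))

    y = mul(x, x)

    grad(y, TensorWrapper(np.ones_like(x_np)))
    # d/dx (x * x) = 2x; the finite-difference estimate should agree closely
    np.testing.assert_almost_equal(
        x.grad.numpy(), numeric_grad(lambda v: v * v, x_np), decimal=3
    )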