Commit 2f829aaa authored by Megvii Engine Team

test(imperative): speed up integration test

GitOrigin-RevId: b03075fcfc4c72e2c398ca8c9fba18f5eb22224b
Parent: a1ca390d
# -*- coding: utf-8 -*-
import numpy as np
import megengine
import megengine.autodiff as ad
import megengine.optimizer as optimizer
from megengine import Parameter, tensor
from megengine.module import Module
class Simple(Module):
    """Minimal module with one learnable scalar, used to exercise gradients
    through an advanced-indexing op (``x[:, 0]``)."""

    def __init__(self):
        super().__init__()
        # single learnable scalar, initialised to 1.0
        self.a = Parameter([1.0], dtype=np.float32)

    def forward(self, x):
        # first column of x, scaled by the parameter
        return x[:, 0] * self.a
def test_ai():
    """One SGD step through an advanced-indexing forward updates ``a``.

    With lr=1.0 and loss = sum(x[:, 0] * a) over an all-ones input of shape
    ``dshape``, d(loss)/da equals ``dshape[0]``, so ``a`` moves from 1.0 to
    ``1.0 - dshape[0]``.
    """
    dshape = (10, 10)
    net = Simple()
    gm = ad.GradManager().attach(net.parameters())
    optim = optimizer.SGD(net.parameters(), lr=1.0)
    optim.clear_grad()

    inp = tensor(np.ones(dshape, dtype=np.float32))
    with gm:
        gm.backward(net(inp).sum())
    optim.step()

    np.testing.assert_almost_equal(
        net.a.numpy(), np.array([1.0 - dshape[0]]).astype(np.float32)
    )
......@@ -7,7 +7,9 @@ import pytest
import megengine as mge
import megengine.autodiff as ad
import megengine.functional as F
import megengine.optimizer as optim
from megengine import Tensor
from megengine.core import set_option
from megengine.module import Linear, Module
from megengine.optimizer import SGD
from megengine.traced_module import trace_module
......@@ -66,8 +68,13 @@ class XORNet(Module):
return x
@pytest.mark.parametrize("test_traced_module", [True, False])
def test_training_converge(test_traced_module):
@pytest.mark.parametrize(
"test_traced_module, with_drop, grad_clip",
[(False, False, False), (True, True, True)],
)
def test_training_converge(test_traced_module, with_drop, grad_clip):
if with_drop:
set_option("enable_drop", 1)
net = XORNet()
if test_traced_module:
inp = Tensor(np.random.random((14, 2)))
......@@ -81,6 +88,8 @@ def test_training_converge(test_traced_module):
pred = net(data)
loss = F.nn.cross_entropy(pred, label)
gm.backward(loss)
if grad_clip:
optim.clip_grad_norm(net.parameters(), max_norm=0.2, ord=2.0)
return loss
def infer(data):
......@@ -89,11 +98,13 @@ def test_training_converge(test_traced_module):
train_dataset = minibatch_generator()
losses = []
for data, label in itertools.islice(train_dataset, 2000):
for data, label in itertools.islice(train_dataset, 1500):
data = Tensor(data, dtype=np.float32)
label = Tensor(label, dtype=np.int32)
opt.clear_grad()
loss = train(data, label)
if grad_clip:
optim.clip_grad_value(net.parameters(), lower=-0.1, upper=0.1)
opt.step()
losses.append(loss.numpy())
......@@ -110,3 +121,6 @@ def test_training_converge(test_traced_module):
assert precision == 1.0, "Test precision must be high enough, get {}".format(
precision
)
if with_drop:
set_option("enable_drop", 0)
# -*- coding: utf-8 -*-
import itertools
import numpy as np
import megengine as mge
import megengine.autodiff as ad
import megengine.functional as F
from megengine import Tensor
from megengine.core import get_option, set_option
from megengine.module import Linear, Module
from megengine.optimizer import SGD
batch_size = 64
data_shape = (batch_size, 2)
label_shape = (batch_size,)
def minibatch_generator():
while True:
inp_data = np.zeros((batch_size, 2))
label = np.zeros(batch_size, dtype=np.int32)
for i in range(batch_size):
# [x0, x1], sampled from U[-1, 1]
inp_data[i, :] = np.random.rand(2) * 2 - 1
label[i] = 0 if np.prod(inp_data[i]) < 0 else 1
yield inp_data.astype(np.float32), label.astype(np.int32)
def calculate_precision(data: np.ndarray, pred: np.ndarray) -> float:
    """Fraction of predictions whose argmax matches the sign-product label.

    :param data: input points, one ``[x, y]`` pair per row
    :param pred: network scores, one ``[score_0, score_1]`` row per point
    :return: accuracy in ``[0.0, 1.0]``
    """
    assert len(data) == len(pred)
    hits = sum(
        1
        for point, scores in zip(data, pred)
        if np.argmax(scores) == (0 if np.prod(point) < 0 else 1)
    )
    return hits / len(data)
class XORNet(Module):
    """Three-layer tanh MLP for the XOR-sign task.

    The forward pass ends with a numerically redundant ``(x + x) / 2``
    followed by ``_drop()`` — kept deliberately to exercise the drop path.
    """

    def __init__(self):
        # attribute order preserved: sizes are set before Module init
        self.mid_layers = 14
        self.num_class = 2
        super().__init__()
        self.fc0 = Linear(self.num_class, self.mid_layers, bias=True)
        self.fc1 = Linear(self.mid_layers, self.mid_layers, bias=True)
        self.fc2 = Linear(self.mid_layers, self.num_class, bias=True)

    def forward(self, x):
        hidden = F.tanh(self.fc0(x))
        hidden = F.tanh(self.fc1(hidden))
        hidden = self.fc2(hidden)
        out = (hidden + hidden) / 2  # in order to test drop()
        out._drop()
        return out
def test_training_converge_with_drop():
    """XOR training reaches low loss and perfect grid accuracy with the
    ``enable_drop`` option turned on for the duration of the test."""
    set_option("enable_drop", 1)
    net = XORNet()
    opt = SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
    gm = ad.GradManager().attach(net.parameters())

    def train(data, label):
        # forward + backward under the grad manager; step happens outside
        with gm:
            pred = net(data)
            loss = F.nn.cross_entropy(pred, label)
            gm.backward(loss)
        return loss

    def infer(data):
        return net(data)

    losses = []
    for data, label in itertools.islice(minibatch_generator(), 2000):
        opt.clear_grad()
        loss = train(
            Tensor(data, dtype=np.float32), Tensor(label, dtype=np.int32)
        )
        opt.step()
        losses.append(loss.numpy())
    assert np.mean(losses[-100:]) < 0.1, "Final training Loss must be low enough"

    # evaluate on a regular 10x10 grid over [-1, 1]^2
    ngrid = 10
    axis = np.linspace(-1.0, 1.0, ngrid)
    xx, yy = np.meshgrid(axis, axis)
    grid = np.concatenate(
        (xx.reshape((ngrid * ngrid, 1)), yy.reshape((ngrid * ngrid, 1))), axis=1
    )
    data = mge.tensor(grid.astype(np.float32))
    pred = infer(Tensor(data)).numpy()
    precision = calculate_precision(data.numpy(), pred)
    assert precision == 1.0, "Test precision must be high enough, get {}".format(
        precision
    )
    set_option("enable_drop", 0)
# -*- coding: utf-8 -*-
import itertools
import numpy as np
import pytest
import megengine as mge
import megengine.autodiff as ad
import megengine.functional as F
import megengine.optimizer as optim
from megengine import Tensor
from megengine.jit import trace
from megengine.module import Linear, Module
from megengine.optimizer import SGD
from megengine.traced_module import trace_module
batch_size = 64
data_shape = (batch_size, 2)
label_shape = (batch_size,)
def minibatch_generator():
while True:
inp_data = np.zeros((batch_size, 2))
label = np.zeros(batch_size, dtype=np.int32)
for i in range(batch_size):
# [x0, x1], sampled from U[-1, 1]
inp_data[i, :] = np.random.rand(2) * 2 - 1
label[i] = 0 if np.prod(inp_data[i]) < 0 else 1
yield inp_data.astype(np.float32), label.astype(np.int32)
def calculate_precision(data: np.ndarray, pred: np.ndarray) -> float:
    """Return the share of rows where the predicted class matches the label.

    :param data: input points, one ``[x, y]`` pair per row
    :param pred: network scores, one row of class scores per point
    :return: accuracy in ``[0.0, 1.0]``
    """
    assert len(data) == len(pred)
    correct = 0
    for point, scores in zip(data, pred):
        # ground truth: 1 when coordinates share a sign, else 0
        expected = 0 if np.prod(point) < 0 else 1
        if np.argmax(scores) == expected:
            correct += 1
    return correct / len(data)
class XORNet(Module):
    """Three-layer tanh MLP mapping 2-D points to two class scores."""

    def __init__(self):
        # sizes assigned before Module init, matching the original order
        self.mid_layers = 14
        self.num_class = 2
        super().__init__()
        self.fc0 = Linear(self.num_class, self.mid_layers, bias=True)
        self.fc1 = Linear(self.mid_layers, self.mid_layers, bias=True)
        self.fc2 = Linear(self.mid_layers, self.num_class, bias=True)

    def forward(self, x):
        hidden = F.tanh(self.fc0(x))
        hidden = F.tanh(self.fc1(hidden))
        return self.fc2(hidden)
@pytest.mark.parametrize("test_traced_module", [True, False])
def test_training_converge(test_traced_module):
    """XOR training converges under ``trace`` with gradient clipping,
    optionally routed through ``trace_module`` first."""
    net = XORNet()
    if test_traced_module:
        net = trace_module(net, Tensor(np.random.random((14, 2))))
    opt = SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
    gm = ad.GradManager().attach(net.parameters())

    @trace(symbolic=False)
    def train(data, label):
        with gm:
            loss = F.nn.cross_entropy(net(data), label)
            gm.backward(loss)
        # clip gradient L2 norm before the optimizer step
        optim.clip_grad_norm(net.parameters(), max_norm=0.2, ord=2.0)
        return loss

    def infer(data):
        return net(data)

    losses = []
    for data, label in itertools.islice(minibatch_generator(), 2000):
        opt.clear_grad()
        loss = train(Tensor(data, dtype=np.float32), Tensor(label, dtype=np.int32))
        # element-wise clipping on top of the norm clipping inside train()
        optim.clip_grad_value(net.parameters(), lower=-0.1, upper=0.1)
        opt.step()
        losses.append(loss.numpy())

    assert (
        np.mean(losses[-100:]) < 0.1
    ), "Final training Loss must be low enough, get {}".format(np.mean(losses[-100:]))

    # evaluate on a regular 10x10 grid over [-1, 1]^2
    ngrid = 10
    axis = np.linspace(-1.0, 1.0, ngrid)
    xx, yy = np.meshgrid(axis, axis)
    grid = np.concatenate(
        (xx.reshape((ngrid * ngrid, 1)), yy.reshape((ngrid * ngrid, 1))), axis=1
    )
    data = mge.tensor(grid.astype(np.float32))
    pred = infer(data)
    precision = calculate_precision(data.numpy(), pred.numpy())
    assert precision == 1.0, "Test precision must be high enough, get {}".format(
        precision
    )
# -*- coding: utf-8 -*-
import subprocess
import numpy as np
import pytest
import megengine
import megengine.autodiff as ad
import megengine.optimizer as optimizer
from megengine import Parameter, tensor
from megengine.module import Module
class Simple(Module):
    """Module with a single learnable scalar; forward scales its input."""

    def __init__(self):
        super().__init__()
        self.a = Parameter([1.23], dtype=np.float32)

    def forward(self, x):
        return x * self.a
def test_hello_world():
    """A single SGD step with lr=1.0 subtracts the gradient from ``a``.

    loss = 2.34 * a, so d(loss)/da = 2.34 and a goes from 1.23 to
    1.23 - 2.34.
    """
    net = Simple()
    gm = ad.GradManager().attach(net.parameters())
    optim = optimizer.SGD(net.parameters(), lr=1.0)
    optim.clear_grad()

    data = tensor([2.34])
    with gm:
        gm.backward(net(data))
    optim.step()

    np.testing.assert_almost_equal(
        net.a.numpy(), np.array([1.23 - 2.34]).astype(np.float32)
    )
# -*- coding: utf-8 -*-
import itertools
import os
import numpy as np
import pytest
import megengine
import megengine.autodiff as ad
import megengine.optimizer as optimizer
from megengine import Parameter, tensor
from megengine.jit import trace
from megengine.module import Module
class Simple(Module):
    """Single-parameter module: multiplies the input by a scalar weight."""

    def __init__(self):
        super().__init__()
        self.a = Parameter([1.23], dtype="float32")

    def forward(self, x):
        return x * self.a
@pytest.mark.parametrize("trace_mode", [True, False, None])
@pytest.mark.parametrize("inplace_mode", [True, False])
def test_sgd_momentum(monkeypatch, trace_mode, inplace_mode):
    # Check the SGD momentum buffer across train and eval steps, under
    # symbolic trace, imperative trace, and untraced execution, with and
    # without the inplace parameter-update path.
    with monkeypatch.context() as mk:
        # toggle the inplace-update code path via environment variable
        mk.setenv("MEGENGINE_INPLACE_UPDATE", str(int(inplace_mode)))

        def train_func(data, *, model=None, optim=None, gm=None):
            # One full optimization step. NOTE(review): `net` is captured
            # from the enclosing scope by late binding (it is defined below),
            # while `optim`/`gm` come in as keyword arguments.
            optim.clear_grad()
            with gm:
                loss = net(data)
                gm.backward(loss)
            optim.step()
            return loss

        if trace_mode is not None:
            train_func = trace(symbolic=trace_mode)(train_func)

        def eval_func(data, *, model=None, optim=None, gm=None):
            # forward only; must leave the optimizer state untouched
            loss = net(data)
            return loss

        if trace_mode is not None:
            eval_func = trace(symbolic=trace_mode)(eval_func)

        net = Simple()
        optim = optimizer.SGD(net.parameters(), lr=1.0, momentum=0.9)
        gm = ad.GradManager().attach(net.parameters())
        data = tensor([2.34])

        train_func(data, model=net, optim=optim, gm=gm)
        # after the first step the momentum buffer equals the raw gradient
        np.testing.assert_almost_equal(
            optim._state[net.a]["momentum_buffer"].numpy(), 2.34
        )

        # do 3 steps of infer
        for _ in range(3):
            loss = eval_func(data)
            # loss reflects the post-step parameter (1.23 - 2.34); inference
            # must not change the momentum buffer
            np.testing.assert_almost_equal(loss.numpy(), 2.34 * (1.23 - 2.34), 5)
            np.testing.assert_almost_equal(
                optim._state[net.a]["momentum_buffer"].numpy(), 2.34
            )

        # do a step of train
        train_func(data, model=net, optim=optim, gm=gm)
        np.testing.assert_almost_equal(loss.numpy(), 2.34 * (1.23 - 2.34), 5)
        # second step: buffer = momentum * old_buffer + gradient
        np.testing.assert_almost_equal(
            optim._state[net.a]["momentum_buffer"].numpy(), 0.9 * 2.34 + 2.34, 5
        )
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册