提交 60eaf967 编写于 作者: X xiaoli.liu@intel.com

Clean unittest code.

test=develop
上级 157e79e8
...@@ -20,217 +20,91 @@ import numpy as np ...@@ -20,217 +20,91 @@ import numpy as np
import paddle.fluid.core as core import paddle.fluid.core as core
from op_test import OpTest from op_test import OpTest
from test_pool2d_op import TestPool2D_Op, avg_pool2D_forward_naive, max_pool2D_forward_naive
def adaptive_start_index(index, input_size, output_size): class TestPool2dMKLDNNInt8_Op(TestPool2D_Op):
return int(np.floor(index * input_size / output_size)) def init_kernel_type(self):
def adaptive_end_index(index, input_size, output_size):
return int(np.ceil((index + 1) * input_size / output_size))
def max_pool2D_forward_naive(x,
ksize,
strides,
paddings,
global_pool=0,
ceil_mode=False,
exclusive=True,
adaptive=False):
N, C, H, W = x.shape
if global_pool == 1:
ksize = [H, W]
if adaptive:
H_out, W_out = ksize
else:
H_out = (H - ksize[0] + 2 * paddings[0] + strides[0] - 1
) // strides[0] + 1 if ceil_mode else (
H - ksize[0] + 2 * paddings[0]) // strides[0] + 1
W_out = (W - ksize[1] + 2 * paddings[1] + strides[1] - 1
) // strides[1] + 1 if ceil_mode else (
W - ksize[1] + 2 * paddings[1]) // strides[1] + 1
out = np.zeros((N, C, H_out, W_out))
for i in range(H_out):
for j in range(W_out):
if adaptive:
r_start = adaptive_start_index(i, H, ksize[0])
r_end = adaptive_end_index(i, H, ksize[0])
c_start = adaptive_start_index(j, W, ksize[1])
c_end = adaptive_end_index(j, W, ksize[1])
else:
r_start = np.max((i * strides[0] - paddings[0], 0))
r_end = np.min((i * strides[0] + ksize[0] - paddings[0], H))
c_start = np.max((j * strides[1] - paddings[1], 0))
c_end = np.min((j * strides[1] + ksize[1] - paddings[1], W))
x_masked = x[:, :, r_start:r_end, c_start:c_end]
out[:, :, i, j] = np.max(x_masked, axis=(2, 3))
return out
def avg_pool2D_forward_naive(x,
ksize,
strides,
paddings,
global_pool=0,
ceil_mode=False,
exclusive=True,
adaptive=False):
N, C, H, W = x.shape
if global_pool == 1:
ksize = [H, W]
if adaptive:
H_out, W_out = ksize
else:
H_out = (H - ksize[0] + 2 * paddings[0] + strides[0] - 1
) // strides[0] + 1 if ceil_mode else (
H - ksize[0] + 2 * paddings[0]) // strides[0] + 1
W_out = (W - ksize[1] + 2 * paddings[1] + strides[1] - 1
) // strides[1] + 1 if ceil_mode else (
W - ksize[1] + 2 * paddings[1]) // strides[1] + 1
out = np.zeros((N, C, H_out, W_out))
for i in range(H_out):
for j in range(W_out):
if adaptive:
r_start = adaptive_start_index(i, H, ksize[0])
r_end = adaptive_end_index(i, H, ksize[0])
c_start = adaptive_start_index(j, W, ksize[1])
c_end = adaptive_end_index(j, W, ksize[1])
else:
r_start = np.max((i * strides[0] - paddings[0], 0))
r_end = np.min((i * strides[0] + ksize[0] - paddings[0], H))
c_start = np.max((j * strides[1] - paddings[1], 0))
c_end = np.min((j * strides[1] + ksize[1] - paddings[1], W))
x_masked = x[:, :, r_start:r_end, c_start:c_end]
field_size = ((r_end - r_start) * (c_end - c_start)) \
if (exclusive or adaptive) else (ksize[0] * ksize[1])
out[:, :, i, j] = np.sum(x_masked, axis=(2, 3)) / field_size
return out
class TestPool2D_Op(OpTest):
def setUp(self):
self.op_type = "pool2d"
self.use_cudnn = False
self.use_mkldnn = True self.use_mkldnn = True
self.dtype = np.int8
self.init_test_case()
self.init_global_pool()
self.init_pool_type()
self.init_ceil_mode()
self.init_exclusive()
self.init_adaptive()
if self.global_pool:
self.paddings = [0 for _ in range(len(self.paddings))]
input = np.random.random(self.shape).astype(self.dtype)
output = self.pool2D_forward_naive(
input, self.ksize, self.strides, self.paddings, self.global_pool,
self.ceil_mode, self.exclusive, self.adaptive).astype(self.dtype)
self.inputs = {'X': OpTest.np_dtype_to_fluid_dtype(input)}
self.attrs = {
'strides': self.strides,
'paddings': self.paddings,
'ksize': self.ksize,
'pooling_type': self.pool_type,
'global_pooling': self.global_pool,
'use_cudnn': self.use_cudnn,
'use_mkldnn': self.use_mkldnn,
'ceil_mode': self.ceil_mode,
'data_format':
'AnyLayout', # TODO(dzhwinter) : should be fix latter
'exclusive': self.exclusive,
'adaptive': self.adaptive
}
self.outputs = {'Out': output}
def test_check_output(self):
self.check_output_with_place(core.CPUPlace(), atol=1e-5)
def init_test_case(self): def init_data_type(self):
self.shape = [2, 3, 5, 5]
self.ksize = [3, 3]
self.strides = [1, 1]
self.paddings = [0, 0]
self.dtype = np.int8 self.dtype = np.int8
def init_pool_type(self): def setUp(self):
self.pool_type = "avg" TestPool2D_Op.setUp(self)
self.pool2D_forward_naive = avg_pool2D_forward_naive assert self.dtype in [np.int8, np.uint8
], 'Dtype should be int8 or uint8'
def init_global_pool(self):
self.global_pool = True
def init_ceil_mode(self):
self.ceil_mode = False
def init_exclusive(self): def test_check_output(self):
self.exclusive = True self.check_output_with_place(core.CPUPlace(), atol=1e-5)
def init_adaptive(self): def test_check_grad(self):
self.adaptive = False pass
class TestCase1(TestPool2D_Op): class TestCase1Avg(TestPool2dMKLDNNInt8_Op):
def init_test_case(self): def init_test_case(self):
self.shape = [2, 3, 7, 7] self.shape = [2, 3, 7, 7]
self.ksize = [3, 3] self.ksize = [3, 3]
self.strides = [1, 1] self.strides = [1, 1]
self.paddings = [0, 0] self.paddings = [0, 0]
self.dtype = np.int8
def init_pool_type(self):
self.pool_type = "avg"
self.pool2D_forward_naive = avg_pool2D_forward_naive
def init_global_pool(self): def init_global_pool(self):
self.global_pool = False self.global_pool = False
class TestCase2(TestPool2D_Op): class TestCase2Avg(TestPool2dMKLDNNInt8_Op):
def init_test_case(self): def init_test_case(self):
self.shape = [2, 3, 7, 7] self.shape = [2, 3, 7, 7]
self.ksize = [3, 3] self.ksize = [3, 3]
self.strides = [1, 1] self.strides = [1, 1]
self.paddings = [1, 1] self.paddings = [1, 1]
self.dtype = np.uint8
def init_pool_type(self):
self.pool_type = "avg"
self.pool2D_forward_naive = avg_pool2D_forward_naive
def init_global_pool(self): def init_global_pool(self):
self.global_pool = False self.global_pool = False
class TestCase3(TestPool2D_Op): class TestCase0Max(TestPool2dMKLDNNInt8_Op):
def init_test_case(self):
self.shape = [2, 3, 7, 7]
self.ksize = [3, 3]
self.strides = [1, 1]
self.paddings = [0, 0]
self.dtype = np.int8
def init_pool_type(self): def init_pool_type(self):
self.pool_type = "max" self.pool_type = "max"
self.pool2D_forward_naive = max_pool2D_forward_naive self.pool2D_forward_naive = max_pool2D_forward_naive
class TestCase4(TestCase1): class TestCase1Max(TestCase1Avg):
def init_test_case(self): def init_pool_type(self):
self.shape = [2, 3, 7, 7] self.pool_type = "max"
self.ksize = [3, 3] self.pool2D_forward_naive = max_pool2D_forward_naive
self.strides = [1, 1]
self.paddings = [1, 1]
self.dtype = np.uint8
class TestCase2Max(TestCase2Avg):
def init_pool_type(self): def init_pool_type(self):
self.pool_type = "max" self.pool_type = "max"
self.pool2D_forward_naive = max_pool2D_forward_naive self.pool2D_forward_naive = max_pool2D_forward_naive
def create_test_s8_u8_class(parent):
class TestS8Case(parent):
def init_data_type(self):
self.dtype = np.int8
class TestU8Case(parent):
def init_data_type(self):
self.dtype = np.uint8
cls_name_s8 = "{0}_{1}".format(parent.__name__, "mkldnn_s8")
cls_name_u8 = "{0}_{1}".format(parent.__name__, "mkldnn_u8")
TestS8Case.__name__ = cls_name_s8
TestU8Case.__name__ = cls_name_u8
globals()[cls_name_s8] = TestS8Case
globals()[cls_name_u8] = TestU8Case
create_test_s8_u8_class(TestPool2dMKLDNNInt8_Op)
create_test_s8_u8_class(TestCase1Avg)
create_test_s8_u8_class(TestCase2Avg)
create_test_s8_u8_class(TestCase0Max)
create_test_s8_u8_class(TestCase1Max)
create_test_s8_u8_class(TestCase2Max)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -18,35 +18,22 @@ import unittest ...@@ -18,35 +18,22 @@ import unittest
from test_pool2d_op import TestPool2D_Op, TestCase1, TestCase2, TestCase3, TestCase4, TestCase5 from test_pool2d_op import TestPool2D_Op, TestCase1, TestCase2, TestCase3, TestCase4, TestCase5
class TestMKLDNNCase1(TestPool2D_Op): def create_test_mkldnn_class(parent):
def init_kernel_type(self): class TestMKLDNNCase(parent):
self.use_mkldnn = True def init_kernel_type(self):
self.use_mkldnn = True
class TestMKLDNNCase2(TestCase1): cls_name = "{0}_{1}".format(parent.__name__, "MKLDNNOp")
def init_kernel_type(self): TestMKLDNNCase.__name__ = cls_name
self.use_mkldnn = True globals()[cls_name] = TestMKLDNNCase
class TestMKLDNNCase3(TestCase2): create_test_mkldnn_class(TestPool2D_Op)
def init_kernel_type(self): create_test_mkldnn_class(TestCase1)
self.use_mkldnn = True create_test_mkldnn_class(TestCase2)
create_test_mkldnn_class(TestCase3)
create_test_mkldnn_class(TestCase4)
class TestMKLDNNCase4(TestCase3): create_test_mkldnn_class(TestCase5)
def init_kernel_type(self):
self.use_mkldnn = True
class TestMKLDNNCase5(TestCase4):
def init_kernel_type(self):
self.use_mkldnn = True
class TestMKLDNNCase6(TestCase5):
def init_kernel_type(self):
self.use_mkldnn = True
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -115,7 +115,7 @@ class TestPool2D_Op(OpTest): ...@@ -115,7 +115,7 @@ class TestPool2D_Op(OpTest):
self.op_type = "pool2d" self.op_type = "pool2d"
self.use_cudnn = False self.use_cudnn = False
self.use_mkldnn = False self.use_mkldnn = False
self.dtype = np.float32 self.init_data_type()
self.init_test_case() self.init_test_case()
self.init_global_pool() self.init_global_pool()
self.init_kernel_type() self.init_kernel_type()
...@@ -177,6 +177,9 @@ class TestPool2D_Op(OpTest): ...@@ -177,6 +177,9 @@ class TestPool2D_Op(OpTest):
def init_kernel_type(self): def init_kernel_type(self):
pass pass
def init_data_type(self):
self.dtype = np.float32
def init_pool_type(self): def init_pool_type(self):
self.pool_type = "avg" self.pool_type = "avg"
self.pool2D_forward_naive = avg_pool2D_forward_naive self.pool2D_forward_naive = avg_pool2D_forward_naive
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册