diff --git a/imperative/python/megengine/module/__init__.py b/imperative/python/megengine/module/__init__.py
index 9a47e1c044fe0378e108e30b6d350f5378a2351f..ea57ed3c03631ffb8ae4b7f71e59f4982fd4d6f0 100644
--- a/imperative/python/megengine/module/__init__.py
+++ b/imperative/python/megengine/module/__init__.py
@@ -38,8 +38,12 @@ from .rnn import LSTM, RNN, LSTMCell, RNNCell
 from .sequential import Sequential
 from .sliding_window import SlidingWindow, SlidingWindowTranspose
 from .vision import (
+    ActiveBlur,
     AdditiveElemwise,
     AdditiveGaussianNoise,
     AdditiveLaplaceNoise,
     AdditivePoissonNoise,
+    Emboss,
+    LinearContrast,
+    Sharpen,
 )
diff --git a/imperative/python/megengine/module/vision.py b/imperative/python/megengine/module/vision.py
index 8e8cf2b7b98e270ce182d1c60dd09b0e7fb16dfb..bf841a1a0a5599abc3cf3b40c9efff2ea31e883e 100644
--- a/imperative/python/megengine/module/vision.py
+++ b/imperative/python/megengine/module/vision.py
@@ -1,7 +1,25 @@
+import math
+import numbers
+from functools import lru_cache
+
 import numpy as np
 
+from ..core.ops import builtin
+from ..core.tensor.utils import subgraph_fn
+from ..functional import (
+    arange,
+    broadcast_to,
+    clip,
+    flatten,
+    full_like,
+    gather,
+    mul,
+    reshape,
+    zeros,
+)
 from ..functional.elemwise import abs, add, log
 from ..functional.math import sign
+from ..functional.nn import conv2d, pad
 from ..functional.tensor import broadcast_to
 from ..random.rng import RNG
 from ..tensor import Tensor
@@ -152,3 +170,374 @@ class AdditiveGaussianNoise(AdditiveElemwise):
         assert isinstance(seed, int)
         self._seed = seed
         self.rng_func = RNG(seed).normal
+
+
+def _get_value_range_of_dtype(dtype):
+    """Return ``(min, mid, max)`` of the value range of a numpy ``dtype``.
+
+    ``mid`` is ``0.0`` for floats, ``min + 0.5 * max`` for unsigned ints,
+    ``-0.5`` for signed ints and ``None`` for bool.
+
+    Raises:
+        Exception: if ``dtype.kind`` is not one of ``f``, ``u``, ``i``, ``b``.
+    """
+    if dtype.kind not in ["f", "u", "i", "b"]:
+        raise Exception(
+            "Cannot estimate value range of dtype '%s' "
+            "(type: %s)" % (str(dtype), type(dtype))
+        )
+    if dtype.kind == "f":
+        finfo = np.finfo(dtype)
+        value_min = finfo.min
+        value_mid = 0.0
+        value_max = finfo.max
+    if dtype.kind == "u":
+        iinfo = np.iinfo(dtype)
+        value_min = iinfo.min
+        value_mid = iinfo.min + 0.5 * iinfo.max
+        value_max = iinfo.max
+    if dtype.kind == "i":
+        iinfo = np.iinfo(dtype)
+        value_min = iinfo.min
+        value_mid = -0.5
+        value_max = iinfo.max
+    if dtype.kind == "b":
+        value_min = 0
+        value_mid = None
+        value_max = 1
+    return value_min, value_mid, value_max
+
+
+def _check_out_dtype(inp, input_dtype):
+    """Convert ``inp`` back to ``input_dtype``.
+
+    Bool results are thresholded at 0.5; for the listed integer/float16 types
+    the values are clipped to the representable range before the cast.
+    """
+    if input_dtype.name == "bool":
+        inp = inp > 0.5
+    elif input_dtype.name in ["uint8", "uint16", "int8", "int16", "int32", "float16"]:
+        min_dtype, _, max_dtype = _get_value_range_of_dtype(input_dtype)
+        inp = clip(inp, min_dtype, max_dtype)
+    inp = inp.astype(input_dtype)
+    return inp
+
+
+class ActiveBlur(Module):
+    r"""Base class of kernel-based augmentations.
+
+    ``forward`` reflect-pads the input by one pixel on each spatial side and
+    applies the kernel returned by :meth:`get_kernel` as a grouped convolution
+    (``groups=C``). Subclasses must implement :meth:`get_kernel` and provide
+    ``self.rng_func``, which :meth:`_get_parameter` calls for tuple parameters.
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def forward(self, inp):
+        assert isinstance(
+            inp, Tensor
+        ), "expected input is megengine.Tensor, but got {}".format(type(inp))
+        if inp.format == "nchw" or inp.format == "default":
+            _norm_inp = inp
+            N, C, H, W = inp.shape
+        else:
+            raise RuntimeError(
+                "expect you create Tensor with format NCHW, got format is {}".format(
+                    inp.format
+                )
+            )
+        kernel = self.get_kernel(_norm_inp, C)
+        pad_inp = pad(
+            _norm_inp, pad_width=((0, 0), (0, 0), (1, 1), (1, 1)), mode="reflect"
+        )
+        result = conv2d(pad_inp, kernel, groups=C)
+        result = _check_out_dtype(result, inp.dtype)
+        return result
+
+    def _get_parameter(self, param):
+        """Return ``param`` as float; a tuple ``(a, b)`` is sampled by ``self.rng_func(a, b)``."""
+        if isinstance(param, bool):
+            raise TypeError("The input parameter cannot be of bool value type. ")
+        if isinstance(param, (numbers.Integral, numbers.Real)):
+            return float(param)
+        elif isinstance(param, tuple):
+            assert len(param) == 2, (
+                "Expected parameter with type tuple to have exactly two "
+                "entries, but got %d." % len(param)
+            )
+            param = self.rng_func(param[0], param[1])
+            return float(param)
+        else:
+            raise TypeError("The input parameter has a wrong type. ")
+
+    def get_kernel(self, inp, c):
+        raise NotImplementedError()
+
+
+# Bounded cache: alpha/strength are often freshly sampled floats, so an
+# unbounded cache would keep one cached kernel builder per sampled value.
+@lru_cache(maxsize=128)
+def _get_EmbossKernel_op(alpha, strength, *, dtype=None, device=None):
+    @subgraph_fn(
+        "EmbossKernel", dtype=dtype, device=device, nr_inputs=2, gopt_level=None,
+    )
+    def EmbossKernel(input, f, c):
+        inp_e, inp_n = input[0:2]
+        c_alp = c(alpha, dtype="float32")
+        c_sub_alp = c(1 - alpha, dtype="float32")
+        c_stg = c(strength, dtype="float32")
+        c_1 = c(1, dtype="int32")
+        c_2 = c(2, dtype="int32")
+        c_3 = c(3, dtype="int32")
+
+        def _subtensor(src, axis, begin, end):
+            items = ((axis, (begin is not None), (end is not None), False, False),)
+            args = ()
+            if begin is not None:
+                args += (begin,)
+            if end is not None:
+                args += (end,)
+            return f(builtin.Subtensor(items=items), src, *args)
+
+        def _kernel_init(x):
+            # Top-left taps are decreased by strength, bottom-right taps increased.
+            k_1 = _subtensor(x, 0, None, c_1)
+            k_2 = _subtensor(x, 0, c_1, c_2)
+            k_3 = _subtensor(x, 0, c_2, c_3)
+            k_11 = f("-", _subtensor(k_1, 1, None, c_1), c_stg)
+            k_12_21 = f("-", _subtensor(k_1, 1, c_1, c_2), c_stg)
+            k_23_32 = f("+", _subtensor(k_2, 1, c_2, c_3), c_stg)
+            k_33 = f("+", _subtensor(k_3, 1, c_2, c_3), c_stg)
+            k_13 = _subtensor(k_1, 1, c_2, c_3)
+            k_22 = _subtensor(k_2, 1, c_1, c_2)
+            k_31 = _subtensor(k_3, 1, None, c_1)
+            nk_1 = f(builtin.Concat(axis=1), k_11, k_12_21, k_13,)
+            nk_2 = f(builtin.Concat(axis=1), k_12_21, k_22, k_23_32,)
+            nk_3 = f(builtin.Concat(axis=1), k_31, k_23_32, k_33,)
+            return f(builtin.Concat(axis=0), nk_1, nk_2, nk_3,)
+
+        def _kernel_calc(k_e, k_n):
+            k1 = f("*", k_n, c_sub_alp)
+            k2 = f("*", k_e, c_alp)
+            return f("+", k1, k2)
+
+        kernel_effect = _kernel_init(inp_e)
+        kernel = _kernel_calc(kernel_effect, inp_n)
+        return (kernel,), (False,)
+
+    return EmbossKernel
+
+
+class Emboss(ActiveBlur):
+    r"""Overlay an emboss effect and alpha-blend the result with the original input.
+    The embossed version pronounces highlights and shadows, enhances the high-frequency information of the image, and retains the low-frequency information of the image
+
+    Args:
+        alpha: Adjust visibility of embossed images. number or tuple of number, At ``0.0``, only the original image is visible, at ``1.0`` only its embossed version is visible. If a tuple ``(a, b)``, a random value will be sampled from the interval ``[a, b)``.
+        strength: emboss strength. Sane values are somewhere in the interval ``[0.0, 2.0)`` with ``1.0``, number or tuple of number, If a tuple ``(a, b)``, a random value will be sampled from the interval ``[a, b)``.
+        seed: random number seed of generator
+
+    Examples:
+        >>> import numpy as np
+        >>> inp = mge.tensor(np.random.randint(0, 255, size=(160,3,128,128)).astype("float32"))
+        >>> aug = mge.module.Emboss(alpha=(0.6, 0.8), strength=(0.6, 0.8), seed=1)
+        >>> out = aug(inp)
+    """
+
+    def __init__(self, alpha, strength, seed=None):
+        assert seed is None or isinstance(seed, int)
+        super().__init__()
+        self.alpha = alpha
+        self.strength = strength
+        self.rng_func = RNG(seed).uniform
+        self.seed = seed
+        self.matrix_nochange = Tensor(
+            np.array([[0, 0, 0], [0, 1, 0], [0, 0, 0]], dtype=np.float32)
+        )
+        self.matrix_effect = Tensor(
+            np.array([[-1, 0, 0], [0, 1, 0], [0, 0, 1]], dtype=np.float32)
+        )
+
+    def get_kernel(self, inp, c):
+        alpha = self._get_parameter(self.alpha)
+        strength = self._get_parameter(self.strength)
+
+        get_kernel_fn = _get_EmbossKernel_op(
+            alpha,
+            strength,
+            dtype=self.matrix_effect.dtype,
+            device=self.matrix_effect.device,
+        )
+        kernel, *_ = get_kernel_fn(self.matrix_effect, self.matrix_nochange)
+        kernel = broadcast_to(kernel, (c, 1, 1, kernel.shape[0], kernel.shape[1]))
+        return kernel
+
+
+# Bounded cache: alpha/lightness are often freshly sampled floats, so an
+# unbounded cache would keep one cached kernel builder per sampled value.
+@lru_cache(maxsize=128)
+def _get_SharpenKernel_op(alpha, lightness, *, dtype=None, device=None):
+    @subgraph_fn(
+        "SharpenKernel", dtype=dtype, device=device, nr_inputs=2, gopt_level=None,
+    )
+    def SharpenKernel(input, f, c):
+        inp_e, inp_n = input[0:2]
+        c_alp = c(alpha, dtype="float32")
+        c_sub_alp = c(1 - alpha, dtype="float32")
+        c_lts = c(lightness, dtype="float32")
+        c_1 = c(1, dtype="int32")
+        c_2 = c(2, dtype="int32")
+        c_3 = c(3, dtype="int32")
+
+        def _subtensor(src, axis, begin, end):
+            items = ((axis, (begin is not None), (end is not None), False, False),)
+            args = ()
+            if begin is not None:
+                args += (begin,)
+            if end is not None:
+                args += (end,)
+            return f(builtin.Subtensor(items=items), src, *args)
+
+        def _kernel_init(x):
+            # Only the centre tap changes: lightness is added to it.
+            k_1 = _subtensor(x, 0, None, c_1)
+            k_2 = _subtensor(x, 0, c_1, c_2)
+            k_3 = _subtensor(x, 0, c_2, c_3)
+            k_21 = _subtensor(k_2, 1, None, c_1)
+            k_22 = f("+", _subtensor(k_2, 1, c_1, c_2), c_lts)
+            k_23 = _subtensor(k_2, 1, c_2, c_3)
+            nk_2 = f(builtin.Concat(axis=1), k_21, k_22, k_23,)
+            return f(builtin.Concat(axis=0), k_1, nk_2, k_3,)
+
+        def _kernel_calc(k_e, k_n):
+            k1 = f("*", k_n, c_sub_alp)
+            k2 = f("*", k_e, c_alp)
+            return f("+", k1, k2)
+
+        kernel_effect = _kernel_init(inp_e)
+        kernel = _kernel_calc(kernel_effect, inp_n)
+        return (kernel,), (False,)
+
+    return SharpenKernel
+
+
+class Sharpen(ActiveBlur):
+    r"""Sharpen images and alpha-blend the result with the original input.
+    Args:
+        alpha: Adjust visibility of sharpened images. number or tuple of number, At ``0.0``, only the original image is visible, at ``1.0`` only its sharpened version is visible. If a tuple ``(a, b)``, a random value will be sampled from the interval ``[a, b)``.
+        lightness: Controls the brightness of sharpened images. Sane values are somewhere in the interval ``[0.5, 2.0)`` with ``1.0``, number or tuple of number, If a tuple ``(a, b)``, a random value will be sampled from the interval ``[a, b)``.
+        seed: random number seed of generator
+
+    Examples:
+        >>> import numpy as np
+        >>> inp = mge.tensor(np.random.randint(0, 255, size=(160,3,128,128)).astype("float32"))
+        >>> aug = mge.module.Sharpen(alpha=(0.6, 0.8), lightness=(0.6, 0.8), seed=1)
+        >>> out = aug(inp)
+    """
+
+    def __init__(self, alpha, lightness, seed=None):
+        assert seed is None or isinstance(seed, int)
+        super().__init__()
+        self.alpha = alpha
+        self.lightness = lightness
+        self.rng_func = RNG(seed).uniform
+        self.seed = seed
+        self.matrix_nochange = Tensor(
+            np.array([[0, 0, 0], [0, 1, 0], [0, 0, 0]], dtype=np.float32)
+        )
+        self.matrix_effect = Tensor(
+            np.array([[-1, -1, -1], [-1, 8, -1], [-1, -1, -1]], dtype=np.float32)
+        )
+
+    def get_kernel(self, inp, c):
+        alpha = self._get_parameter(self.alpha)
+        lightness = self._get_parameter(self.lightness)
+
+        get_kernel_fn = _get_SharpenKernel_op(
+            alpha,
+            lightness,
+            dtype=self.matrix_effect.dtype,
+            device=self.matrix_effect.device,
+        )
+        kernel, *_ = get_kernel_fn(self.matrix_effect, self.matrix_nochange)
+        kernel = broadcast_to(kernel, (c, 1, 1, kernel.shape[0], kernel.shape[1]))
+        return kernel
+
+
+class LinearContrast(Module):
+    r"""Adjust contrast by scaling each pixel to ``127 + alpha*(v-127)``.
+    Args:
+        alpha: number or tuple of number. If a tuple ``(a, b)``, a random value will be sampled from the interval ``[a, b)``.
+        per_channel: Whether to use (imagewise) the same sample(s) for all channels (False) or to sample value(s) for each channel (True). Setting this to True will therefore lead to different transformations per image and channel, otherwise only per image.
+        seed: random number seed of generator
+
+    Examples:
+        >>> import numpy as np
+        >>> inp = mge.tensor(np.random.randint(0, 255, size=(160,3,128,128)).astype("float32"))
+        >>> aug = mge.module.LinearContrast(alpha=(0.6, 0.8), per_channel=False, seed=1)
+        >>> out = aug(inp)
+    """
+
+    def __init__(self, alpha, per_channel=False, seed=None):
+        super().__init__()
+        self.alpha = alpha
+        self.seed = seed
+        self.per_channel = per_channel
+        self.rng_func = RNG(seed).uniform
+
+    def _get_parameter(self, param, size):
+        """Return a tensor of shape ``size``: filled with a number, or sampled from a tuple ``(a, b)``."""
+        if isinstance(param, bool):
+            raise TypeError("The input parameter cannot be of bool value type. ")
+        if isinstance(param, (numbers.Integral, numbers.Real)):
+            value = zeros(size, dtype="float32")
+            value = full_like(value, param)
+            return value
+        elif isinstance(param, tuple):
+            assert len(param) == 2, (
+                "Expected parameter with type tuple to have exactly two "
+                "entries, but got %d." % len(param)
+            )
+            value = self.rng_func(param[0], param[1], size)
+            return value
+        else:
+            raise TypeError("The input parameter has a wrong type. ")
+
+    def _get_table(self, size):
+        """Build a ``(size, 256)`` lookup table of contrast-adjusted uint8 values."""
+        shape = (size, 1)
+        alpha = self._get_parameter(self.alpha, shape)
+        # 256 entries so that every uint8 value (0..255) is a valid gather index.
+        table = arange(256).astype("float32")
+        table = broadcast_to(table, (size, 256))
+        table = 127 + mul((table - 127), alpha)
+        return clip(table, 0, 255)
+
+    def forward(self, inp: Tensor) -> Tensor:
+        """Apply the contrast change; uint8 inputs use a lookup table, other dtypes are computed directly."""
+        if inp.dtype.name == "uint8":
+            if self.per_channel is True:
+                flatten_inp = reshape(
+                    inp, (inp.shape[0] * inp.shape[1], inp.shape[2] * inp.shape[3])
+                ).astype("int32")
+            else:
+                flatten_inp = flatten(inp, 1).astype("int32")
+            table = self._get_table(flatten_inp.shape[0])
+            result = gather(table, 1, flatten_inp)
+            result = reshape(result, inp.shape).astype("uint8")
+            return result
+        else:
+            input_dtype = inp.dtype
+            _, center_value, _ = _get_value_range_of_dtype(input_dtype)
+            if self.per_channel is True:
+                size = (inp.shape[0], inp.shape[1], 1, 1)
+            else:
+                size = (inp.shape[0], 1, 1, 1)
+            alpha = self._get_parameter(self.alpha, size)
+            if input_dtype.kind in ["u", "i"]:
+                center_value = int(center_value)
+            result = center_value + mul(inp.astype("float32") - center_value, alpha)
+            result = result.astype(input_dtype)
+            return result
diff --git a/imperative/python/test/unit/module/test_vision.py b/imperative/python/test/unit/module/test_vision.py
index 9c0a7ce6189e21d4ee4ec3562c454a013f1306b2..d6db848ab3c09b655aeaa936eed5ead71d9ee727 100644
--- a/imperative/python/test/unit/module/test_vision.py
+++ b/imperative/python/test/unit/module/test_vision.py
@@ -4,10 +4,14 @@ import numpy as np
 import pytest
 
 from megengine import Tensor
+from megengine.functional import mean, zeros
 from megengine.module import (
     AdditiveGaussianNoise,
     AdditiveLaplaceNoise,
     AdditivePoissonNoise,
+    Emboss,
+    LinearContrast,
+    Sharpen,
 )
 
 
@@ -38,3 +42,49 @@ def test_AdditiveNoise(cls, per_channel, shape, format, seed):
         aug_ref = cls(per_channel=per_channel, seed=seed)
         aug_data_ref = aug_ref(input_tensor)
         np.testing.assert_allclose(aug_data, aug_data_ref)
+
+
+@pytest.mark.parametrize("cls", [Emboss, Sharpen])
+@pytest.mark.parametrize(
+    "shape, format, dtype",
+    [
+        ((128, 2, 160, 160), "default", np.uint8),
+        ((128, 2, 160, 160), "default", np.float32),
+    ],
+)
+@pytest.mark.parametrize(
+    "param1, param2", [(0.5, 0.7), (0.6, 0.8), ((0.6, 0.8), (0.6, 0.8)),],
+)
+@pytest.mark.parametrize("seed", [1024, None])
+def test_blur(cls, shape, format, dtype, param1, param2, seed):
+    input_array = np.random.randint(0, 256, size=shape).astype(dtype)
+    input_tensor = Tensor(input_array, device="xpux", format=format)
+
+    aug = cls(param1, param2, seed=seed)
+    aug_data = aug(input_tensor)
+    if seed is not None:  # fix rng seed
+        aug_ref = cls(param1, param2, seed=seed)
+        aug_data_ref = aug_ref(input_tensor)
+        np.testing.assert_allclose(aug_data, aug_data_ref)
+
+
+@pytest.mark.parametrize("per_channel", [False, True])
+@pytest.mark.parametrize(
+    "shape, format, dtype",
+    [
+        ((128, 2, 160, 160), "default", np.uint8),
+        ((128, 2, 160, 160), "default", np.float32),
+    ],
+)
+@pytest.mark.parametrize("param1", [0.6, 0.8, (0.6, 0.8)])
+@pytest.mark.parametrize("seed", [1024, None])
+def test_LinearContrast(per_channel, shape, format, dtype, param1, seed):
+    input_array = np.random.randint(0, 256, size=shape).astype(dtype)
+    input_tensor = Tensor(input_array, device="xpux", format=format)
+
+    aug = LinearContrast(param1, per_channel=per_channel, seed=seed)
+    aug_data = aug(input_tensor)
+    if seed is not None:  # fix rng seed
+        aug_ref = LinearContrast(param1, per_channel=per_channel, seed=seed)
+        aug_data_ref = aug_ref(input_tensor)
+        np.testing.assert_allclose(aug_data, aug_data_ref)