# test_ftrl_op.py -- committed by kavyasrinet
import unittest
import numpy as np
from op_test import OpTest


class TestFTRLOp(OpTest):
    """Test case for the "ftrl" operator using a NumPy reference computation."""

    def setUp(self):
        """Create the operator inputs/attributes and the expected outputs.

        The expected values are computed here with NumPy and stored in
        ``self.outputs`` under the keys 'ParamOut', 'SquaredAccumOut' and
        'LinearAccumOut'.
        """
        self.op_type = "ftrl"

        shape = (102, 105)
        # Parameter is drawn before the gradient; keep this order so the
        # sequence of random draws stays the same.
        param = np.random.random(shape).astype("float32")
        grad = np.random.random(shape).astype("float32")
        squared_acc = np.full(shape, 0.1).astype("float32")
        linear_acc = np.full(shape, 0.1).astype("float32")
        learning_rate = np.array([0.01]).astype("float32")
        l1_strength = 0.1
        l2_strength = 0.2
        power = -0.5

        self.inputs = {
            'Param': param,
            'SquaredAccumulator': squared_acc,
            'LinearAccumulator': linear_acc,
            'Grad': grad,
            'LearningRate': learning_rate
        }
        # NOTE(review): 'learning_rate' carries the same array object as the
        # 'LearningRate' input -- confirm the operator expects it as an
        # attribute too.
        self.attrs = {
            'l1': l1_strength,
            'l2': l2_strength,
            'lr_power': power,
            'learning_rate': learning_rate
        }

        # Transform applied to an accumulator: a plain square root when the
        # power is exactly -0.5, otherwise accum ** (-power).
        if power == -0.5:
            transform = np.sqrt
        else:

            def transform(accum):
                return np.power(accum, -power)

        grad_squared = grad * grad
        updated_acc = squared_acc + grad_squared
        updated_acc_t = transform(updated_acc)

        # Linear accumulator update.
        sigma = (updated_acc_t - transform(squared_acc)) / learning_rate
        expected_linear = linear_acc + grad - sigma * param

        # Parameter update: zero wherever |linear| <= l1, otherwise the
        # quotient numerator / denominator.
        numerator = (l1_strength * np.sign(expected_linear) - expected_linear)
        denominator = (updated_acc_t / learning_rate) + (2 * l2_strength)
        quotient = numerator / denominator
        expected_param = np.where(
            np.abs(expected_linear) > l1_strength, quotient, 0.0)

        expected_squared_acc = squared_acc + grad_squared

        self.outputs = {
            'ParamOut': expected_param,
            'SquaredAccumOut': expected_squared_acc,
            'LinearAccumOut': expected_linear
        }

    def test_check_output(self):
        """Delegate to ``check_output()`` provided by the base class."""
        self.check_output()


# When executed directly as a script, hand control to unittest's
# command-line entry point.
if __name__ == "__main__":
    unittest.main()