test_data_preprocessing.py 3.6 KB
Newer Older
H
He, Kai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124

#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module tests data preprocessing.

"""
import unittest
from multiprocessing import Manager

import numpy as np
import paddle.fluid as fluid
import paddle_fl.mpc as pfl_mpc
import mpc_data_utils as mdu
import paddle_fl.mpc.data_utils.aby3 as aby3

import test_op_base


def mean_norm_naive(f_mat):
    """
    Plaintext reference for the mean-normalize statistics.

    :param f_mat: 2-D array whose rows are samples and whose columns are
        features (reductions run along axis 0).
    :return: tuple ``(f_range, f_mean)`` where ``f_range`` is the per-column
        ``max - min`` and ``f_mean`` is the per-column mean.
    """
    col_mean = np.mean(f_mat, axis=0)
    col_max = np.amax(f_mat, axis=0)
    col_min = np.amin(f_mat, axis=0)
    col_range = col_max - col_min

    return col_range, col_mean


def gen_data(f_num, sample_nums):
    """
    Generate a random feature matrix and per-segment min/max/mean statistics.

    The matrix rows are split into consecutive segments whose lengths are
    given by ``sample_nums``; statistics are computed column-wise inside each
    segment.

    :param f_num: number of feature columns.
    :param sample_nums: segment lengths. May be a numpy array or any plain
        sequence of ints (list/tuple); it is normalised with ``np.asarray``.
    :return: tuple ``(f_mat, f_min, f_max, f_mean)`` where ``f_mat`` has shape
        ``(sum(sample_nums), f_num)`` and each statistic array has shape
        ``(len(sample_nums), f_num)``.
    """
    # Normalise once so that `.size` below works for lists and tuples too.
    counts = np.asarray(sample_nums)
    f_mat = np.random.rand(np.sum(counts), f_num)

    f_min, f_max, f_mean = [], [], []

    start = 0
    for n in counts:
        stop = start + n
        segment = f_mat[start:stop]

        f_min.append(np.amin(segment, axis=0))
        f_max.append(np.amax(segment, axis=0))
        f_mean.append(np.mean(segment, axis=0))

        # Next segment begins where this one ended.
        start = stop

    stat_shape = (counts.size, f_num)
    f_min = np.array(f_min).reshape(stat_shape)
    f_max = np.array(f_max).reshape(stat_shape)
    f_mean = np.array(f_mean).reshape(stat_shape)

    return f_mat, f_min, f_max, f_mean

class TestOpMeanNormalize(test_op_base.TestOpBase):
    """Unit test for the ``pfl_mpc.layers.mean_normalize`` op."""

    def mean_normalize(self, **kwargs):
        """
        Build and run the mean_normalize op for one party.

        :param kwargs: must contain ``role`` plus the arrays to feed under
            the keys ``min``, ``max``, ``mean`` and ``sample_num``.
        :return: None. The fetched outputs are appended to
            ``self.f_range_list`` and ``self.f_mean_list``.
        """
        party = kwargs['role']

        pfl_mpc.init("aby3", party, "localhost", self.server, int(self.port))

        # NOTE(review): 'sn' is declared with the same 2-D shape as the
        # feature tensors, while the fed sample_num share is built from a
        # 1-D array -- confirm the declared shape is what the op expects.
        inputs = {
            name: pfl_mpc.data(name=name, shape=self.input_size, dtype='int64')
            for name in ('mi', 'ma', 'me', 'sn')
        }

        range_var, mean_var = pfl_mpc.layers.mean_normalize(
            f_min=inputs['mi'],
            f_max=inputs['ma'],
            f_mean=inputs['me'],
            sample_num=inputs['sn'],
            total_sample_num=self.total_num)

        executor = fluid.Executor(place=fluid.CPUPlace())

        feed = {
            'mi': kwargs['min'],
            'ma': kwargs['max'],
            'me': kwargs['mean'],
            'sn': kwargs['sample_num'],
        }
        fetched_range, fetched_mean = executor.run(
            feed=feed, fetch_list=[range_var, mean_var])

        self.f_range_list.append(fetched_range)
        self.f_mean_list.append(fetched_mean)

    def test_mean_normalize(self):
        """
        Run mean_normalize through multi_party_run and compare the
        reconstructed outputs with the plaintext numpy reference.
        """
        feature_count = 100
        sample_nums = np.array(range(2, 10, 2))
        plain_mat, seg_min, seg_max, seg_mean = gen_data(feature_count, sample_nums)

        self.input_size = [len(sample_nums), feature_count]
        self.total_num = plain_mat.shape[0]

        def share(x):
            # Scale by mdu.mpc_one_share, duplicate along a new leading axis
            # of length 2, and cast to int64.
            scaled = x * mdu.mpc_one_share
            stacked = np.array([scaled, scaled]).astype('int64')
            return stacked.reshape([2] + list(x.shape))

        # Manager-backed lists are used to collect the outputs appended by
        # mean_normalize.
        self.f_range_list = Manager().list()
        self.f_mean_list = Manager().list()

        ret = self.multi_party_run(
            target=self.mean_normalize,
            min=share(seg_min),
            max=share(seg_max),
            mean=share(seg_mean),
            sample_num=share(sample_nums))

        self.assertEqual(ret[0], True)

        revealed_range = aby3.reconstruct(np.array(self.f_range_list))
        revealed_mean = aby3.reconstruct(np.array(self.f_mean_list))

        expected_range, expected_mean = mean_norm_naive(plain_mat)
        self.assertTrue(np.allclose(revealed_range, expected_range, atol=1e-4))
        self.assertTrue(np.allclose(revealed_mean, expected_mean, atol=1e-3))


# Run the unittest test runner when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()