From e8240167905f8337f6e62c291b26dd21bae0f798 Mon Sep 17 00:00:00 2001 From: "He, Kai" Date: Mon, 14 Sep 2020 08:16:12 +0000 Subject: [PATCH] add reduce to mean_normalize --- .../mpc_protocol/aby3_operators.h | 14 ++ .../paddlefl_mpc/mpc_protocol/mpc_operators.h | 2 + .../operators/mpc_mean_normalize_op.cc | 36 ++++- .../operators/mpc_mean_normalize_op.h | 20 ++- .../mpc/layers/data_preprocessing.py | 133 +++--------------- python/paddle_fl/mpc/layers/math.py | 13 +- .../unittests/test_data_preprocessing.py | 9 +- 7 files changed, 93 insertions(+), 134 deletions(-) diff --git a/core/paddlefl_mpc/mpc_protocol/aby3_operators.h b/core/paddlefl_mpc/mpc_protocol/aby3_operators.h index 77325ec..70e4543 100644 --- a/core/paddlefl_mpc/mpc_protocol/aby3_operators.h +++ b/core/paddlefl_mpc/mpc_protocol/aby3_operators.h @@ -381,6 +381,20 @@ public: FixedTensor::calc_precision_recall(in, &out_); } + void div(const Tensor *lhs, const Tensor *rhs, Tensor *out) override { + + auto lhs_tuple = from_tensor(lhs); + auto rhs_tuple = from_tensor(rhs); + auto out_tuple = from_tensor(out); + + auto lhs_ = std::get<0>(lhs_tuple).get(); + auto rhs_ = std::get<0>(rhs_tuple).get(); + auto out_ = std::get<0>(out_tuple).get(); + + lhs_->long_div(rhs_, out_); + + } + private: template std::tuple< diff --git a/core/paddlefl_mpc/mpc_protocol/mpc_operators.h b/core/paddlefl_mpc/mpc_protocol/mpc_operators.h index 309b5f3..33f2f17 100644 --- a/core/paddlefl_mpc/mpc_protocol/mpc_operators.h +++ b/core/paddlefl_mpc/mpc_protocol/mpc_operators.h @@ -93,6 +93,8 @@ public: Tensor* out) = 0; virtual void calc_precision_recall(const Tensor* tp_fp_fn, Tensor* out) = 0; + + virtual void div(const Tensor *lhs, const Tensor *rhs, Tensor *out) = 0; }; } // mpc diff --git a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.cc b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.cc index d268ede..4cc6276 100644 --- a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.cc +++ b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.cc @@ -39,6 +39,9 @@ class MpcMeanNormalizationOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_EQ( ctx->HasInput("SampleNum"), true, platform::errors::InvalidArgument("Input(Sample) should not be null.")); + PADDLE_ENFORCE_EQ( + ctx->HasInput("TotalNum"), true, + platform::errors::InvalidArgument("Input(TotalNum) should not be null.")); PADDLE_ENFORCE_EQ(ctx->HasOutput("Range"), true, platform::errors::InvalidArgument( "Output(Range) should not be null.")); @@ -46,13 +49,11 @@ class MpcMeanNormalizationOp : public framework::OperatorWithKernel { platform::errors::InvalidArgument( "Output(Meanor) should not be null.")); - int64_t total_sample_num = - static_cast(ctx->Attrs().Get("total_sample_num")); - auto min_dims = ctx->GetInputDim("Min"); auto max_dims = ctx->GetInputDim("Max"); auto mean_dims = ctx->GetInputDim("Mean"); auto sample_num_dims = ctx->GetInputDim("SampleNum"); + auto total_num_dims = ctx->GetInputDim("TotalNum"); if (ctx->IsRuntime()) { PADDLE_ENFORCE_EQ(min_dims, max_dims, @@ -77,7 +78,7 @@ class MpcMeanNormalizationOp : public framework::OperatorWithKernel { PADDLE_ENFORCE_EQ( sample_num_dims.size(), 2, platform::errors::InvalidArgument( - "The dimension of Input(SampleNum) should be equal to 3 " + "The dimension of Input(SampleNum) should be equal to 2 " "(share_num, party_num). But received (%d)", sample_num_dims.size())); @@ -87,6 +88,27 @@ class MpcMeanNormalizationOp : public framework::OperatorWithKernel { "The party num of Input(SampleNum) and Input(Min) " "should be equal But received (%d) != (%d)", sample_num_dims[1], min_dims[1])); + + PADDLE_ENFORCE_EQ( + total_num_dims.size(), 2, + platform::errors::InvalidArgument( + "The dimension of Input(TotalNum) " + "should be 2, But received (%d) != (%d)", + total_num_dims.size(), 2)); + + PADDLE_ENFORCE_EQ( + sample_num_dims[0], total_num_dims[0], + platform::errors::InvalidArgument( + "The share num of Input(SampleNum) and Input(TotalNum) " + "should be equal But received (%d) != (%d)", + sample_num_dims[0], total_num_dims[0])); + + PADDLE_ENFORCE_EQ( + total_num_dims[1], 1, + platform::errors::InvalidArgument( + "The shape of Input(TotalNum) " + "should be [share_num, 1] But dims[1] received (%d) != (%d)", + total_num_dims[1], 1)); } ctx->SetOutputDim("Range", {mean_dims[0], mean_dims[2]}); @@ -121,6 +143,9 @@ class MpcMeanNormalizationOpMaker : public framework::OpProtoAndCheckerMaker { "(Tensor, default Tensor) A 1-D tensor with shape [P], " "where P is the party num. Each element contains " "sample num of party_i."); + AddInput("TotalNum", + "(Tensor, default Tensor) A 1-D tensor with shape [1], " + "Element contains sum of sample num of party_i."); AddOutput("Range", "(Tensor, default Tensor) A 1-D tensor with shape [N], " "where N is the feature num. Each element contains " @@ -129,10 +154,9 @@ class MpcMeanNormalizationOpMaker : public framework::OpProtoAndCheckerMaker { "(Tensor, default Tensor) A 1-D tensor with shape [N], " "where N is the feature num. Each element contains " "global mean of feature_i."); - AddAttr("total_sample_num", "(int) Sum of sample nums from all party."); AddComment(R"DOC( Mean normalization Operator. -When given Input(Min), Input(Max), Input(Mean) and Input(SampleNum), +When given Input(Min), Input(Max), Input(Mean), Input(SampleNum) and Input(TotalNum) this operator can be used to compute global range and mean for further feature scaling. Output(Range) is the global range of all features. diff --git a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h index 91f8ba2..dcf9bba 100644 --- a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h +++ b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h @@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include + #include "paddle/fluid/framework/op_registry.h" #include "mpc_op.h" @@ -28,6 +30,7 @@ class MpcMeanNormalizationKernel : public MpcOpKernel { const Tensor* max = context.Input("Max"); const Tensor* mean = context.Input("Mean"); const Tensor* sample_num = context.Input("SampleNum"); + const Tensor* total_num = context.Input("TotalNum"); Tensor* range = context.Output("Range"); Tensor* mean_out = context.Output("MeanOut"); @@ -65,9 +68,6 @@ class MpcMeanNormalizationKernel : public MpcOpKernel { range->mutable_data( framework::make_ddim({share_num, feat_num}), context.GetPlace(), 0); - // TODO: get total_sample_num by reduing size - int total_sample_num = context.Attr("total_sample_num"); - Tensor sample_num_; sample_num_.ShareDataWith(*sample_num); @@ -84,8 +84,20 @@ class MpcMeanNormalizationKernel : public MpcOpKernel { mean_out->mutable_data( framework::make_ddim({share_num, feat_num}), context.GetPlace(), 0); + Tensor total_num_; + + total_num_.mutable_data( + framework::make_ddim({share_num, feat_num}), context.GetPlace(), 0); + + // broadcasting total_num to shape [share_num, feat_num] + for (int i = 0; i < share_num; ++i) { + std::fill(total_num_.data() + i * feat_num, + total_num_.data() + (i + 1) * feat_num, + total_num->data()[i]); + } + mpc::MpcInstance::mpc_instance()->mpc_protocol() - ->mpc_operators()->scale(mean_out, 1.0 / total_sample_num, mean_out); + ->mpc_operators()->div(mean_out, &total_num_, mean_out); } }; diff --git a/python/paddle_fl/mpc/layers/data_preprocessing.py b/python/paddle_fl/mpc/layers/data_preprocessing.py index e219994..813f85f 100644 --- a/python/paddle_fl/mpc/layers/data_preprocessing.py +++ b/python/paddle_fl/mpc/layers/data_preprocessing.py @@ -17,10 +17,11 @@ mpc data preprocessing op layers. from paddle.fluid.data_feeder import check_type, check_dtype from ..framework import check_mpc_variable_and_dtype from ..mpc_layer_helper import MpcLayerHelper +from .math import reduce_sum __all__ = ['mean_normalize'] -def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num): +def mean_normalize(f_min, f_max, f_mean, sample_num): ''' Mean normalization is a method used to normalize the range of independent variables or features of data. @@ -40,7 +41,6 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num): sample_num (Variable): A 1-D tensor with shape [P], where P is the party num. Each element contains sample num of party_i. - total_sample_num (int): Sum of sample nums from all party. Returns: f_range (Variable): A 1-D tensor with shape [N], where N is the @@ -51,121 +51,26 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num): range of feature_i. Examples: .. code-block:: python - from multiprocessing import Manager - from multiprocessing import Process - import numpy as np - import paddle.fluid as fluid import paddle_fl.mpc as pfl_mpc - import mpc_data_utils as mdu - import paddle_fl.mpc.data_utils.aby3 as aby3 + pfl_mpc.init("aby3", role, "localhost", redis_server, redis_port) - redis_server = "127.0.0.1" - redis_port = 9937 - test_f_num = 100 - # party i owns 2 + 2*i rows of data - test_row_split = range(2, 10, 2) + # 2 for share, 4 for 4 party, 100 for feat_num + input_size = [2, 4, 100] + mi = pfl_mpc.data(name='mi', shape=input_size, dtype='int64') + ma = pfl_mpc.data(name='ma', shape=input_size, dtype='int64') + me = pfl_mpc.data(name='me', shape=input_size, dtype='int64') + sn = pfl_mpc.data(name='sn', shape=input_size[:-1], dtype='int64') - def mean_norm_naive(f_mat): - ma = np.amax(f_mat, axis=0) - mi = np.amin(f_mat, axis=0) - return ma - mi, np.mean(f_mat, axis=0) + out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi, f_max=ma, + f_mean=me, sample_num=sn) + exe = fluid.Executor(place=fluid.CPUPlace()) - def gen_data(f_num, sample_nums): - f_mat = np.random.rand(np.sum(sample_nums), f_num) - - f_min, f_max, f_mean = [], [], [] - - prev_idx = 0 - - for n in sample_nums: - i = prev_idx - j = i + n - - ma = np.amax(f_mat[i:j], axis=0) - mi = np.amin(f_mat[i:j], axis=0) - me = np.mean(f_mat[i:j], axis=0) - - f_min.append(mi) - f_max.append(ma) - f_mean.append(me) - - prev_idx += n - - f_min = np.array(f_min).reshape(sample_nums.size, f_num) - f_max = np.array(f_max).reshape(sample_nums.size, f_num) - f_mean = np.array(f_mean).reshape(sample_nums.size, f_num) - - return f_mat, f_min, f_max, f_mean - - - class MeanNormDemo: - - def mean_normalize(self, **kwargs): - """ - mean_normalize op ut - :param kwargs: - :return: - """ - role = kwargs['role'] - - pfl_mpc.init("aby3", role, "localhost", redis_server, redis_port) - - mi = pfl_mpc.data(name='mi', shape=self.input_size, dtype='int64') - ma = pfl_mpc.data(name='ma', shape=self.input_size, dtype='int64') - me = pfl_mpc.data(name='me', shape=self.input_size, dtype='int64') - sn = pfl_mpc.data(name='sn', shape=self.input_size, dtype='int64') - - out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi, f_max=ma, - f_mean=me, sample_num=sn, total_sample_num=self.total_num) - - exe = fluid.Executor(place=fluid.CPUPlace()) - - f_range, f_mean = exe.run(feed={'mi': kwargs['min'], - 'ma': kwargs['max'], 'me': kwargs['mean'], - 'sn': kwargs['sample_num']},fetch_list=[out0, out1]) - - self.f_range_list.append(f_range) - self.f_mean_list.append(f_mean) - - def run(self): - f_nums = test_f_num - sample_nums = np.array(test_row_split) - mat, mi, ma, me = gen_data(f_nums, sample_nums) - - self.input_size = [len(sample_nums), f_nums] - self.total_num = mat.shape[0] - - # simulating encrypting data - share = lambda x: np.array([x * mdu.mpc_one_share] * 2).astype('int64').reshape( - [2] + list(x.shape)) - - self.f_range_list = Manager().list() - self.f_mean_list = Manager().list() - - proc = list() - for role in range(3): - args = {'role': role, 'min': share(mi), 'max': share(ma), - 'mean': share(me), 'sample_num': share(sample_nums)} - p = Process(target=self.mean_normalize, kwargs=args) - - proc.append(p) - p.start() - - for p in proc: - p.join() - - f_r = aby3.reconstruct(np.array(self.f_range_list)) - f_m = aby3.reconstruct(np.array(self.f_mean_list)) - - plain_r, plain_m = mean_norm_naive(mat) - print("max error in featrue range:", np.max(np.abs(f_r - plain_r))) - print("max error in featrue mean:", np.max(np.abs(f_m - plain_m))) - - - MeanNormDemo().run() + # feed encrypted data + f_range, f_mean = exe.run(feed={'mi': f_min, 'ma': f_max, + 'me': f_mean, 'sn': sample_num}, fetch_list=[out0, out1]) ''' helper = MpcLayerHelper("mean_normalize", **locals()) @@ -180,6 +85,8 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num): f_range = helper.create_mpc_variable_for_type_inference(dtype=f_min.dtype) f_mean_out= helper.create_mpc_variable_for_type_inference(dtype=f_min.dtype) + total_num = reduce_sum(sample_num) + op_type = 'mean_normalize' helper.append_op( @@ -189,14 +96,12 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num): "Max": f_max, "Mean": f_mean, "SampleNum": sample_num, + "TotalNum": total_num, }, outputs={ "Range": f_range, "MeanOut": f_mean_out, }, - attrs={ - # TODO: remove attr total_sample_num, reducing sample_num instead - "total_sample_num": total_sample_num, - }) + ) return f_range, f_mean_out diff --git a/python/paddle_fl/mpc/layers/math.py b/python/paddle_fl/mpc/layers/math.py index 7eec73e..2f2f849 100644 --- a/python/paddle_fl/mpc/layers/math.py +++ b/python/paddle_fl/mpc/layers/math.py @@ -18,6 +18,7 @@ mpc math op layers. from ..framework import MpcVariable from ..framework import check_mpc_variable_and_dtype from ..mpc_layer_helper import MpcLayerHelper +from .ml import reshape __all__ = [ 'mean', @@ -125,7 +126,7 @@ def square_error_cost(input, label): square_out = helper.create_mpc_variable_for_type_inference(dtype=input.dtype) helper.append_op( - type='mpc_square', + type='mpc_square', inputs={'X': [minus_out]}, outputs={'Out': [square_out]}) return square_out @@ -158,14 +159,14 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None): Returns: out(MpcVariable): (Tensor) The output of mean op - Examples: + Examples: .. code-block:: python - + import paddle_fl.mpc as pfl_mpc pfl_mpc.init("aby3", int(args.role), "localhost", args.server, int(args.port)) data_1 = pfl_mpc.data(name='x', shape=[3, 3], dtype='int64') - pfl_mpc.layers.reshape(data_1, [1, 2]) # shape: [2, 1, 1] + pfl_mpc.layers.reshape(data_1, [1, 2]) # shape: [2, 1, 1] # data_1 = np.full(shape=(3, 4), fill_value=2) # reduce_sum: 24 """ @@ -178,7 +179,7 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None): "'dim' should not contain 0, because dim[0] is share number." ) else: - dim = [i for i in range(len(input.shape))][1:] + dim = [i for i in range(len(input.shape))][1:] attrs = { 'dim': dim, @@ -194,6 +195,8 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None): inputs={'X': input}, outputs={'Out': out}, attrs=attrs) + if out.shape == (2,): + out = reshape(out, list(out.shape) + [1]) return out diff --git a/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py b/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py index ec613c1..4cb082c 100644 --- a/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py +++ b/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py @@ -78,10 +78,10 @@ class TestOpMeanNormalize(test_op_base.TestOpBase): mi = pfl_mpc.data(name='mi', shape=self.input_size, dtype='int64') ma = pfl_mpc.data(name='ma', shape=self.input_size, dtype='int64') me = pfl_mpc.data(name='me', shape=self.input_size, dtype='int64') - sn = pfl_mpc.data(name='sn', shape=self.input_size, dtype='int64') - - out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi, f_max=ma, f_mean=me, sample_num=sn, total_sample_num=self.total_num) + sn = pfl_mpc.data(name='sn', shape=self.input_size[:-1], dtype='int64') + out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi, + f_max=ma, f_mean=me, sample_num=sn) exe = fluid.Executor(place=fluid.CPUPlace()) @@ -98,7 +98,6 @@ class TestOpMeanNormalize(test_op_base.TestOpBase): mat, mi, ma, me = gen_data(f_nums, sample_nums) self.input_size = [len(sample_nums), f_nums] - self.total_num = mat.shape[0] share = lambda x: np.array([x * mdu.mpc_one_share] * 2).astype('int64').reshape( [2] + list(x.shape)) @@ -116,7 +115,7 @@ class TestOpMeanNormalize(test_op_base.TestOpBase): plain_r, plain_m = mean_norm_naive(mat) self.assertTrue(np.allclose(f_r, plain_r, atol=1e-4)) - self.assertTrue(np.allclose(f_m, plain_m, atol=1e-3)) + self.assertTrue(np.allclose(f_m, plain_m, atol=1e-4)) if __name__ == '__main__': -- GitLab