diff --git a/core/paddlefl_mpc/mpc_protocol/aby3_operators.h b/core/paddlefl_mpc/mpc_protocol/aby3_operators.h
index 77325ec609cc40e09d6dbaa33776072f124130da..70e4543709840a07077632c4d03fb693677b2eb0 100644
--- a/core/paddlefl_mpc/mpc_protocol/aby3_operators.h
+++ b/core/paddlefl_mpc/mpc_protocol/aby3_operators.h
@@ -381,6 +381,20 @@ public:
         FixedTensor::calc_precision_recall(in, &out_);
     }

+    void div(const Tensor *lhs, const Tensor *rhs, Tensor *out) override {
+
+        auto lhs_tuple = from_tensor(lhs);
+        auto rhs_tuple = from_tensor(rhs);
+        auto out_tuple = from_tensor(out);
+
+        auto lhs_ = std::get<0>(lhs_tuple).get();
+        auto rhs_ = std::get<0>(rhs_tuple).get();
+        auto out_ = std::get<0>(out_tuple).get();
+
+        lhs_->long_div(rhs_, out_);
+
+    }
+
 private:
     template <typename T>
     std::tuple<
diff --git a/core/paddlefl_mpc/mpc_protocol/mpc_operators.h b/core/paddlefl_mpc/mpc_protocol/mpc_operators.h
index 309b5f32445d964f5007b3d614d021d37d70310a..33f2f1730c2eba7cdbb408622c81466ce0b43659 100644
--- a/core/paddlefl_mpc/mpc_protocol/mpc_operators.h
+++ b/core/paddlefl_mpc/mpc_protocol/mpc_operators.h
@@ -93,6 +93,8 @@ public:
                                 Tensor* out) = 0;

    virtual void calc_precision_recall(const Tensor* tp_fp_fn, Tensor* out) = 0;
+
+    virtual void div(const Tensor *lhs, const Tensor *rhs, Tensor *out) = 0;
 };

 } // mpc
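For reference, a minimal plaintext sketch (not part of the patch) of the elementwise semantics the new `div` interface is expected to realize; the secure version operates on ABY3 fixed-point shares via `long_div`, so only the reconstructed values should match this numpy analogue:

```python
import numpy as np

def plaintext_div(lhs, rhs):
    # Elementwise division; the MPC div op should agree with this
    # on the decoded (reconstructed) fixed-point values.
    return lhs / rhs

# e.g. dividing a per-feature accumulator by a broadcast sample count
acc = np.array([[6.0, 9.0, 12.0]])
cnt = np.array([[3.0, 3.0, 3.0]])
print(plaintext_div(acc, cnt))  # [[2. 3. 4.]]
```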
But received (%d)", sample_num_dims.size())); @@ -87,6 +88,27 @@ class MpcMeanNormalizationOp : public framework::OperatorWithKernel { "The party num of Input(SampleNum) and Input(Min) " "should be equal But received (%d) != (%d)", sample_num_dims[1], min_dims[1])); + + PADDLE_ENFORCE_EQ( + total_num_dims.size(), 2, + platform::errors::InvalidArgument( + "The dimension of Input(TotalNum) " + "should be 2, But received (%d) != (%d)", + total_num_dims.size(), 2)); + + PADDLE_ENFORCE_EQ( + sample_num_dims[0], total_num_dims[0], + platform::errors::InvalidArgument( + "The share num of Input(SampleNum) and Input(TotalNum) " + "should be equal But received (%d) != (%d)", + sample_num_dims[0], total_num_dims[0])); + + PADDLE_ENFORCE_EQ( + total_num_dims[1], 1, + platform::errors::InvalidArgument( + "The shape of Input(TotalNum) " + "should be [share_num, 1] But dims[1] received (%d) != (%d)", + total_num_dims[1], 1)); } ctx->SetOutputDim("Range", {mean_dims[0], mean_dims[2]}); @@ -121,6 +143,9 @@ class MpcMeanNormalizationOpMaker : public framework::OpProtoAndCheckerMaker { "(Tensor, default Tensor) A 1-D tensor with shape [P], " "where P is the party num. Each element contains " "sample num of party_i."); + AddInput("TotalNum", + "(Tensor, default Tensor) A 1-D tensor with shape [1], " + "Element contains sum of sample num of party_i."); AddOutput("Range", "(Tensor, default Tensor) A 1-D tensor with shape [N], " "where N is the feature num. Each element contains " @@ -129,10 +154,9 @@ class MpcMeanNormalizationOpMaker : public framework::OpProtoAndCheckerMaker { "(Tensor, default Tensor) A 1-D tensor with shape [N], " "where N is the feature num. Each element contains " "global mean of feature_i."); - AddAttr("total_sample_num", "(int) Sum of sample nums from all party."); AddComment(R"DOC( Mean normalization Operator. -When given Input(Min), Input(Max), Input(Mean) and Input(SampleNum), +When given Input(Min), Input(Max), Input(Mean), Input(SampleNum) and Input(TotalNum) this operator can be used to compute global range and mean for further feature scaling. Output(Range) is the global range of all features. diff --git a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h index 91f8ba24e5bacb3c52992fb2fa137b444ec0b079..dcf9bba55817644ee25875d4deb53f41ac6428a3 100644 --- a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h +++ b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h @@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
diff --git a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h
index 91f8ba24e5bacb3c52992fb2fa137b444ec0b079..dcf9bba55817644ee25875d4deb53f41ac6428a3 100644
--- a/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h
+++ b/core/paddlefl_mpc/operators/mpc_mean_normalize_op.h
@@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */

+#include <algorithm>
+
 #include "paddle/fluid/framework/op_registry.h"

 #include "mpc_op.h"
@@ -28,6 +30,7 @@ class MpcMeanNormalizationKernel : public MpcOpKernel<T> {
     const Tensor* max = context.Input<Tensor>("Max");
     const Tensor* mean = context.Input<Tensor>("Mean");
     const Tensor* sample_num = context.Input<Tensor>("SampleNum");
+    const Tensor* total_num = context.Input<Tensor>("TotalNum");

     Tensor* range = context.Output<Tensor>("Range");
     Tensor* mean_out = context.Output<Tensor>("MeanOut");
@@ -65,9 +68,6 @@ class MpcMeanNormalizationKernel : public MpcOpKernel<T> {
     range->mutable_data<T>(
         framework::make_ddim({share_num, feat_num}), context.GetPlace(), 0);

-    // TODO: get total_sample_num by reduing size
-    int total_sample_num = context.Attr<int>("total_sample_num");
-
     Tensor sample_num_;

     sample_num_.ShareDataWith(*sample_num);
@@ -84,8 +84,20 @@ class MpcMeanNormalizationKernel : public MpcOpKernel<T> {
     mean_out->mutable_data<T>(
         framework::make_ddim({share_num, feat_num}), context.GetPlace(), 0);

+    Tensor total_num_;
+
+    total_num_.mutable_data<T>(
+        framework::make_ddim({share_num, feat_num}), context.GetPlace(), 0);
+
+    // broadcasting total_num to shape [share_num, feat_num]
+    for (int i = 0; i < share_num; ++i) {
+        std::fill(total_num_.data<T>() + i * feat_num,
+                  total_num_.data<T>() + (i + 1) * feat_num,
+                  total_num->data<T>()[i]);
+    }
+
     mpc::MpcInstance::mpc_instance()->mpc_protocol()
-        ->mpc_operators()->scale(mean_out, 1.0 / total_sample_num, mean_out);
+        ->mpc_operators()->div(mean_out, &total_num_, mean_out);
   }
 };
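A plaintext numpy sketch (illustrative only, not part of the patch) of the broadcast-and-divide that replaces the former public `scale` by `1.0 / total_sample_num`; in the kernel both operands are secret shares and the division goes through the new `div`/`long_div` path:

```python
import numpy as np

share_num, feat_num = 2, 5                       # illustrative sizes
mean_acc = np.arange(share_num * feat_num, dtype=np.float64).reshape(share_num, feat_num)
total_num = np.array([10.0, 10.0])               # one value per share, as in Input(TotalNum)

# mirror the C++ std::fill loop: each share row is filled with that share's total count
total_b = np.empty_like(mean_acc)
for i in range(share_num):
    total_b[i, :] = total_num[i]

# old: mean_acc * (1.0 / total_sample_num)   (public plaintext attribute)
# new: elementwise division by the broadcast, secret-shared total
mean_out = mean_acc / total_b
print(mean_out.shape)   # (2, 5)
```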
diff --git a/python/paddle_fl/mpc/layers/data_preprocessing.py b/python/paddle_fl/mpc/layers/data_preprocessing.py
index e21999441a34f18fddf41c966d652ca6c92d2317..813f85f4b562799e442b9f6d02c21c03f5c217f8 100644
--- a/python/paddle_fl/mpc/layers/data_preprocessing.py
+++ b/python/paddle_fl/mpc/layers/data_preprocessing.py
@@ -17,10 +17,11 @@ mpc data preprocessing op layers.
 from paddle.fluid.data_feeder import check_type, check_dtype
 from ..framework import check_mpc_variable_and_dtype
 from ..mpc_layer_helper import MpcLayerHelper
+from .math import reduce_sum

 __all__ = ['mean_normalize']

-def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num):
+def mean_normalize(f_min, f_max, f_mean, sample_num):
     '''
     Mean normalization is a method used to normalize the range of independent
     variables or features of data.
@@ -40,7 +41,6 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num):
         sample_num (Variable): A 1-D tensor with shape [P], where P is the
                                party num. Each element contains sample num
                                of party_i.
-        total_sample_num (int): Sum of sample nums from all party.

     Returns:
         f_range (Variable): A 1-D tensor with shape [N], where N is the
@@ -51,121 +51,26 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num):
                            range of feature_i.

     Examples:
         .. code-block:: python

-            from multiprocessing import Manager
-            from multiprocessing import Process
-            import numpy as np
-            import paddle.fluid as fluid
             import paddle_fl.mpc as pfl_mpc
-            import mpc_data_utils as mdu
-            import paddle_fl.mpc.data_utils.aby3 as aby3
+            pfl_mpc.init("aby3", role, "localhost", redis_server, redis_port)

-            redis_server = "127.0.0.1"
-            redis_port = 9937
-            test_f_num = 100
-            # party i owns 2 + 2*i rows of data
-            test_row_split = range(2, 10, 2)
+            # 2 for share, 4 for 4 party, 100 for feat_num
+            input_size = [2, 4, 100]
+            mi = pfl_mpc.data(name='mi', shape=input_size, dtype='int64')
+            ma = pfl_mpc.data(name='ma', shape=input_size, dtype='int64')
+            me = pfl_mpc.data(name='me', shape=input_size, dtype='int64')
+            sn = pfl_mpc.data(name='sn', shape=input_size[:-1], dtype='int64')

-            def mean_norm_naive(f_mat):
-                ma = np.amax(f_mat, axis=0)
-                mi = np.amin(f_mat, axis=0)
-                return ma - mi, np.mean(f_mat, axis=0)
+            out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi, f_max=ma,
+                    f_mean=me, sample_num=sn)

+            exe = fluid.Executor(place=fluid.CPUPlace())

-            def gen_data(f_num, sample_nums):
-                f_mat = np.random.rand(np.sum(sample_nums), f_num)
-
-                f_min, f_max, f_mean = [], [], []
-
-                prev_idx = 0
-
-                for n in sample_nums:
-                    i = prev_idx
-                    j = i + n
-
-                    ma = np.amax(f_mat[i:j], axis=0)
-                    mi = np.amin(f_mat[i:j], axis=0)
-                    me = np.mean(f_mat[i:j], axis=0)
-
-                    f_min.append(mi)
-                    f_max.append(ma)
-                    f_mean.append(me)
-
-                    prev_idx += n
-
-                f_min = np.array(f_min).reshape(sample_nums.size, f_num)
-                f_max = np.array(f_max).reshape(sample_nums.size, f_num)
-                f_mean = np.array(f_mean).reshape(sample_nums.size, f_num)
-
-                return f_mat, f_min, f_max, f_mean
-
-
-            class MeanNormDemo:
-
-                def mean_normalize(self, **kwargs):
-                    """
-                    mean_normalize op ut
-                    :param kwargs:
-                    :return:
-                    """
-                    role = kwargs['role']
-
-                    pfl_mpc.init("aby3", role, "localhost", redis_server, redis_port)
-
-                    mi = pfl_mpc.data(name='mi', shape=self.input_size, dtype='int64')
-                    ma = pfl_mpc.data(name='ma', shape=self.input_size, dtype='int64')
-                    me = pfl_mpc.data(name='me', shape=self.input_size, dtype='int64')
-                    sn = pfl_mpc.data(name='sn', shape=self.input_size, dtype='int64')
-
-                    out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi, f_max=ma,
-                            f_mean=me, sample_num=sn, total_sample_num=self.total_num)
-
-                    exe = fluid.Executor(place=fluid.CPUPlace())
-
-                    f_range, f_mean = exe.run(feed={'mi': kwargs['min'],
-                        'ma': kwargs['max'], 'me': kwargs['mean'],
-                        'sn': kwargs['sample_num']},fetch_list=[out0, out1])
-
-                    self.f_range_list.append(f_range)
-                    self.f_mean_list.append(f_mean)
-
-                def run(self):
-                    f_nums = test_f_num
-                    sample_nums = np.array(test_row_split)
-                    mat, mi, ma, me = gen_data(f_nums, sample_nums)
-
-                    self.input_size = [len(sample_nums), f_nums]
-                    self.total_num = mat.shape[0]
-
-                    # simulating encrypting data
-                    share = lambda x: np.array([x * mdu.mpc_one_share] * 2).astype('int64').reshape(
-                        [2] + list(x.shape))
-
-                    self.f_range_list = Manager().list()
-                    self.f_mean_list = Manager().list()
-
-                    proc = list()
-                    for role in range(3):
-                        args = {'role': role, 'min': share(mi), 'max': share(ma),
-                                'mean': share(me), 'sample_num': share(sample_nums)}
-                        p = Process(target=self.mean_normalize, kwargs=args)
-
-                        proc.append(p)
-                        p.start()
-
-                    for p in proc:
-                        p.join()
-
-                    f_r = aby3.reconstruct(np.array(self.f_range_list))
-                    f_m = aby3.reconstruct(np.array(self.f_mean_list))
-
-                    plain_r, plain_m = mean_norm_naive(mat)
-                    print("max error in featrue range:", np.max(np.abs(f_r - plain_r)))
-                    print("max error in featrue mean:", np.max(np.abs(f_m - plain_m)))
-
-
-            MeanNormDemo().run()
+            # feed encrypted data
+            f_range, f_mean = exe.run(feed={'mi': f_min, 'ma': f_max,
+                'me': f_mean, 'sn': sample_num}, fetch_list=[out0, out1])
     '''
     helper = MpcLayerHelper("mean_normalize", **locals())
@@ -180,6 +85,8 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num):
     f_range = helper.create_mpc_variable_for_type_inference(dtype=f_min.dtype)
     f_mean_out= helper.create_mpc_variable_for_type_inference(dtype=f_min.dtype)

+    total_num = reduce_sum(sample_num)
+
     op_type = 'mean_normalize'

     helper.append_op(
@@ -189,14 +96,12 @@ def mean_normalize(f_min, f_max, f_mean, sample_num, total_sample_num):
             "Max": f_max,
             "Mean": f_mean,
             "SampleNum": sample_num,
+            "TotalNum": total_num,
         },
         outputs={
             "Range": f_range,
             "MeanOut": f_mean_out,
         },
-        attrs={
-            # TODO: remove attr total_sample_num, reducing sample_num instead
-            "total_sample_num": total_sample_num,
-        })
+        )

     return f_range, f_mean_out
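As a rough plaintext reference (not part of the patch, and only implied by the removed `mean_norm_naive` docstring example and the unit test below), the layer is expected to behave like the following numpy computation on the per-party statistics, with the total count now obtained by summing `sample_num` instead of being passed as a plaintext attribute:

```python
import numpy as np

def mean_normalize_plain(f_min, f_max, f_mean, sample_num):
    # Plaintext sketch: f_min/f_max/f_mean have shape [party_num, feat_num],
    # sample_num has shape [party_num].
    f_range = f_max.max(axis=0) - f_min.min(axis=0)
    total = sample_num.sum()                              # role of Input(TotalNum)
    f_mean_out = (f_mean * sample_num[:, None]).sum(axis=0) / total
    return f_range, f_mean_out
```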
diff --git a/python/paddle_fl/mpc/layers/math.py b/python/paddle_fl/mpc/layers/math.py
index 7eec73e674d877a43a90f8cfa360e2dc22b2daed..2f2f8492956c1066366901ac58f21a8ad7502429 100644
--- a/python/paddle_fl/mpc/layers/math.py
+++ b/python/paddle_fl/mpc/layers/math.py
@@ -18,6 +18,7 @@ mpc math op layers.
 from ..framework import MpcVariable
 from ..framework import check_mpc_variable_and_dtype
 from ..mpc_layer_helper import MpcLayerHelper
+from .ml import reshape

 __all__ = [
     'mean',
@@ -125,7 +126,7 @@ def square_error_cost(input, label):
     square_out = helper.create_mpc_variable_for_type_inference(dtype=input.dtype)

     helper.append_op(
-        type='mpc_square', 
+        type='mpc_square',
         inputs={'X': [minus_out]},
         outputs={'Out': [square_out]})
     return square_out
@@ -158,14 +159,14 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None):
     Returns:
         out(MpcVariable): (Tensor) The output of mean op

-    Examples: 
+    Examples:
         .. code-block:: python
-        
+
             import paddle_fl.mpc as pfl_mpc

             pfl_mpc.init("aby3", int(args.role), "localhost", args.server, int(args.port))
             data_1 = pfl_mpc.data(name='x', shape=[3, 3], dtype='int64')
-            pfl_mpc.layers.reshape(data_1, [1, 2]) # shape: [2, 1, 1] 
+            pfl_mpc.layers.reshape(data_1, [1, 2]) # shape: [2, 1, 1]
             # data_1 = np.full(shape=(3, 4), fill_value=2)
             # reduce_sum: 24
     """
@@ -178,7 +179,7 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None):
                 "'dim' should not contain 0, because dim[0] is share number."
             )
         else:
-            dim = [i for i in range(len(input.shape))][1:] 
+            dim = [i for i in range(len(input.shape))][1:]

     attrs = {
         'dim': dim,
@@ -194,6 +195,8 @@ def reduce_sum(input, dim=None, keep_dim=False, name=None):
         inputs={'X': input},
         outputs={'Out': out},
         attrs=attrs)
+    if out.shape == (2,):
+        out = reshape(out, list(out.shape) + [1])
     return out
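A small numpy analogue (illustrative only; the real code works on MPC variables) of the shape bookkeeping behind the new reshape guard: reducing all non-share dims of a [share_num, party_num] tensor leaves shape (2,), while mpc_mean_normalize expects Input(TotalNum) to be [share_num, 1]:

```python
import numpy as np

sample_num = np.ones((2, 4), dtype=np.int64)                     # [share_num, party_num]
total = sample_num.sum(axis=tuple(range(1, sample_num.ndim)))    # dims 1.. reduced -> shape (2,)
total = total.reshape(list(total.shape) + [1])                   # -> shape (2, 1), matching [share_num, 1]
print(total.shape)   # (2, 1)
```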
diff --git a/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py b/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py
index ec613c1d3902e096ec6e0281d4a94e66cb71c7bb..4cb082c85483a987ab10b2291346df2f4fcc3af6 100644
--- a/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py
+++ b/python/paddle_fl/mpc/tests/unittests/test_data_preprocessing.py
@@ -78,10 +78,10 @@ class TestOpMeanNormalize(test_op_base.TestOpBase):
         mi = pfl_mpc.data(name='mi', shape=self.input_size, dtype='int64')
         ma = pfl_mpc.data(name='ma', shape=self.input_size, dtype='int64')
         me = pfl_mpc.data(name='me', shape=self.input_size, dtype='int64')
-        sn = pfl_mpc.data(name='sn', shape=self.input_size, dtype='int64')
-
-        out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi, f_max=ma, f_mean=me, sample_num=sn, total_sample_num=self.total_num)
+        sn = pfl_mpc.data(name='sn', shape=self.input_size[:-1], dtype='int64')

+        out0, out1 = pfl_mpc.layers.mean_normalize(f_min=mi,
+                f_max=ma, f_mean=me, sample_num=sn)
         exe = fluid.Executor(place=fluid.CPUPlace())

@@ -98,7 +98,6 @@ class TestOpMeanNormalize(test_op_base.TestOpBase):
         mat, mi, ma, me = gen_data(f_nums, sample_nums)

         self.input_size = [len(sample_nums), f_nums]
-        self.total_num = mat.shape[0]

         share = lambda x: np.array([x * mdu.mpc_one_share] * 2).astype('int64').reshape(
             [2] + list(x.shape))
@@ -116,7 +115,7 @@ class TestOpMeanNormalize(test_op_base.TestOpBase):
         plain_r, plain_m = mean_norm_naive(mat)

         self.assertTrue(np.allclose(f_r, plain_r, atol=1e-4))
-        self.assertTrue(np.allclose(f_m, plain_m, atol=1e-3))
+        self.assertTrue(np.allclose(f_m, plain_m, atol=1e-4))

 if __name__ == '__main__':