diff --git a/paddle_fl/core/master/fl_job.py b/paddle_fl/core/master/fl_job.py
index dda29ef602739c5e9146969b0fa52229c05994a0..175d8dcda3ffeb534567bb7f312fda42c19dbf84 100644
--- a/paddle_fl/core/master/fl_job.py
+++ b/paddle_fl/core/master/fl_job.py
@@ -201,11 +201,14 @@ class FLRunTimeJob(FLJobBase):
         main_fn = "%s/trainer.main.program" % folder_name
         self._trainer_main_program = self._load_program(main_fn)

-        send_fn = "%s/trainer.send.program" % folder_name
-        self._trainer_send_program = self._load_program(send_fn)
-
-        recv_fn = "%s/trainer.recv.program" % folder_name
-        self._trainer_recv_program = self._load_program(recv_fn)
+        try:
+            send_fn = "%s/trainer.send.program" % folder_name
+            self._trainer_send_program = self._load_program(send_fn)
+
+            recv_fn = "%s/trainer.recv.program" % folder_name
+            self._trainer_recv_program = self._load_program(recv_fn)
+        except:
+            pass

         endpoints_fn = "%s/endpoints" % folder
         self._endpoints = self._load_endpoints(endpoints_fn)
diff --git a/paddle_fl/core/strategy/fl_strategy_base.py b/paddle_fl/core/strategy/fl_strategy_base.py
index f450b6de1da9a5342d2280073b7d40d78fa26245..95f1cc8483b57a9d93dbaa19e7f5dbdd3cd646e8 100644
--- a/paddle_fl/core/strategy/fl_strategy_base.py
+++ b/paddle_fl/core/strategy/fl_strategy_base.py
@@ -110,10 +110,43 @@ class DPSGDStrategy(FLStrategyBase):
     def __init__(self):
         super(DPSGDStrategy, self).__init__()

+    @property
+    def learning_rate(self):
+        return self._learning_rate
+
+    @learning_rate.setter
+    def learning_rate(self, s):
+        self._learning_rate = s
+
+    @property
+    def clip(self):
+        return self._clip
+
+    @clip.setter
+    def clip(self, s):
+        self._clip = s
+
+    @property
+    def batch_size(self):
+        return self._batch_size
+
+    @batch_size.setter
+    def batch_size(self, s):
+        self._batch_size = s
+
+    @property
+    def sigma(self):
+        return self._sigma
+
+    @sigma.setter
+    def sigma(self, s):
+        self._sigma = s
+
     def minimize(self, optimizer=None, losses=[]):
         """
-        Do nothing in DPSGDStrategy in minimize function
+        Define Dpsgd optimizer
         """
+        optimizer = fluid.optimizer.Dpsgd(self._learning_rate, clip=self._clip, batch_size=self._batch_size, sigma=self._sigma)
         optimizer.minimize(losses[0])

     def _build_trainer_program_for_job(
@@ -128,7 +161,7 @@ class DPSGDStrategy(FLStrategyBase):
             trainers=trainers,
             sync_mode=sync_mode,
             startup_program=startup_program)
-        main = transpiler.get_trainer_program()
+        main = transpiler.get_trainer_program(wait_port=False)
         job._trainer_startup_programs.append(startup_program)
         job._trainer_main_programs.append(main)

diff --git a/paddle_fl/examples/dpsgd_demo/fl_master.py b/paddle_fl/examples/dpsgd_demo/fl_master.py
index 37ce22919f2d15c7d471982c41e2ce6ec05292a1..b5c42c2e5f7348e47dba8a3ebadcf6d0f1709bb7 100644
--- a/paddle_fl/examples/dpsgd_demo/fl_master.py
+++ b/paddle_fl/examples/dpsgd_demo/fl_master.py
@@ -8,44 +8,41 @@ class Model(object):
     def __init__(self):
         pass

-    def mlp(self, inputs, label, hidden_size=128):
-        self.concat = fluid.layers.concat(inputs, axis=1)
-        self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
-        self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
-        self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
-        self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
-        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
-        self.loss = fluid.layers.reduce_mean(self.sum_cost)
+    def lr_network(self):
+        self.inputs = fluid.layers.data(name='img', shape=[1, 28, 28], dtype="float32")
+        self.label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+        self.predict = fluid.layers.fc(input=self.inputs, size=10, act='softmax')
+        self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=self.label)
+        self.accuracy = fluid.layers.accuracy(input=self.predict, label=self.label)
+        self.loss = fluid.layers.mean(self.sum_cost)
         self.startup_program = fluid.default_startup_program()

-inputs = [fluid.layers.data( \
-    name=str(slot_id), shape=[5],
-    dtype="float32")
-    for slot_id in range(3)]
-label = fluid.layers.data( \
-    name="label",
-    shape=[1],
-    dtype='int64')

 model = Model()
-model.mlp(inputs, label)
+model.lr_network()

-STEP_EPSILON = 10
+STEP_EPSILON = 0.1
 DELTA = 0.00001
 SIGMA = math.sqrt(2.0 * math.log(1.25/DELTA)) / STEP_EPSILON
-CLIP = 10.0
-batch_size = 1
+CLIP = 4.0
+batch_size = 64

 job_generator = JobGenerator()
-optimizer = fluid.optimizer.Dpsgd(0.1, clip=CLIP, batch_size=float(batch_size), sigma=0.0 * SIGMA)
+optimizer = fluid.optimizer.SGD(learning_rate=0.1)
 job_generator.set_optimizer(optimizer)
 job_generator.set_losses([model.loss])
 job_generator.set_startup_program(model.startup_program)
+job_generator.set_infer_feed_and_target_names(
+    [model.inputs.name, model.label.name], [model.loss.name, model.accuracy.name])

 build_strategy = FLStrategyFactory()
 build_strategy.dpsgd = True
 build_strategy.inner_step = 1
 strategy = build_strategy.create_fl_strategy()
+strategy.learning_rate = 0.1
+strategy.clip = CLIP
+strategy.batch_size = float(batch_size)
+strategy.sigma = CLIP * SIGMA

 # endpoints will be collected through the cluster
 # in this example, we suppose endpoints have been collected
diff --git a/paddle_fl/examples/dpsgd_demo/fl_trainer.py b/paddle_fl/examples/dpsgd_demo/fl_trainer.py
index e13e898ebc1fabfef33e7cf917fd554405d6942c..c57b2b1acd07f14459d274ce9f2e7ae7105ccb87 100644
--- a/paddle_fl/examples/dpsgd_demo/fl_trainer.py
+++ b/paddle_fl/examples/dpsgd_demo/fl_trainer.py
@@ -1,15 +1,13 @@
 from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
 from paddle_fl.core.master.fl_job import FLRunTimeJob
-import numpy as np
+import numpy
 import sys
+import paddle
+import paddle.fluid as fluid
+import logging
+import math

-def reader():
-    for i in range(1000):
-        data_dict = {}
-        for i in range(3):
-            data_dict[str(i)] = np.random.rand(1, 5).astype('float32')
-        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
-        yield data_dict
+logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%m-%Y %H:%M:%S", level=logging.DEBUG)

 trainer_id = int(sys.argv[1]) # trainer id for each guest
 job_path = "fl_job_config"
@@ -18,11 +16,55 @@ job.load_trainer_job(job_path, trainer_id)
 trainer = FLTrainerFactory().create_fl_trainer(job)
 trainer.start()

-step_i = 0
+test_program = trainer._main_program.clone(for_test=True)
+
+train_reader = paddle.batch(
+    paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
+    batch_size=64)
+test_reader = paddle.batch(
+    paddle.dataset.mnist.test(), batch_size=64)
+
+img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
+label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())
+
+def train_test(train_test_program, train_test_feed, train_test_reader):
+    acc_set = []
+    for test_data in train_test_reader():
+        acc_np = trainer.exe.run(
+            program=train_test_program,
+            feed=train_test_feed.feed(test_data),
+            fetch_list=["accuracy_0.tmp_0"])
+        acc_set.append(float(acc_np[0]))
+    acc_val_mean = numpy.array(acc_set).mean()
+    return acc_val_mean
+
+def compute_privacy_budget(sample_ratio, epsilon, step, delta):
+    E = 2 * epsilon * math.sqrt(step * sample_ratio)
+    print("({0}, {1})-DP".format(E, delta))
+
+output_folder = "model_node%d" % trainer_id
+epoch_id = 0
+step = 0
 while not trainer.stop():
-    step_i += 1
-    print("batch %d start train" % (step_i))
-    for data in reader():
-        trainer.run(feed=data, fetch=[])
-    if step_i % 100 == 0:
-        trainer.save_inference_program(output_folder)
+    epoch_id += 1
+    if epoch_id > 40:
+        break
+    print("epoch %d start train" % (epoch_id))
+    for step_id, data in enumerate(train_reader()):
+        acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
+        step += 1
+        # print("acc:%.3f" % (acc[0]))
+
+    acc_val = train_test(
+        train_test_program=test_program,
+        train_test_reader=test_reader,
+        train_test_feed=feeder)
+
+    print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
+    compute_privacy_budget(sample_ratio=0.001, epsilon=0.1, step=step, delta=0.00001)
+
+    save_dir = (output_folder + "/epoch_%d") % epoch_id
+    trainer.save_inference_program(save_dir)
+
+
diff --git a/paddle_fl/examples/dpsgd_demo/stop.sh b/paddle_fl/examples/dpsgd_demo/stop.sh
new file mode 100644
index 0000000000000000000000000000000000000000..148759658e3986ddebf9fa4153874157a4b738de
--- /dev/null
+++ b/paddle_fl/examples/dpsgd_demo/stop.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+echo "Stop service!"
+
+ps -ef | grep -E "fl" | grep -v grep | awk '{print $2}' | xargs kill -9
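
Note on the DP numbers above: fl_master.py derives the per-step Gaussian noise scale from (STEP_EPSILON, DELTA) and hands CLIP * SIGMA to the strategy, while fl_trainer.py reports the accumulated budget through compute_privacy_budget. The standalone Python sketch below only restates those formulas from the patch so they can be sanity-checked in isolation; the step count of 938 is an illustrative assumption (roughly one pass over MNIST's 60,000 training images at batch size 64), not a value taken from the code.

import math

# Values mirrored from fl_master.py in the patch above.
STEP_EPSILON = 0.1   # per-step epsilon
DELTA = 0.00001      # target delta
CLIP = 4.0           # gradient clipping bound

# Gaussian-mechanism noise scale: sigma = sqrt(2 * ln(1.25 / delta)) / epsilon;
# the master then sets strategy.sigma = CLIP * SIGMA.
SIGMA = math.sqrt(2.0 * math.log(1.25 / DELTA)) / STEP_EPSILON
print("sigma = %.3f, strategy.sigma = %.3f" % (SIGMA, CLIP * SIGMA))

# Mirrored from fl_trainer.py: accumulated (epsilon, delta) after `step` steps.
def compute_privacy_budget(sample_ratio, epsilon, step, delta):
    E = 2 * epsilon * math.sqrt(step * sample_ratio)
    print("({0}, {1})-DP".format(E, delta))

# Hypothetical step count: ~938 batches of 64 cover one epoch of MNIST.
compute_privacy_budget(sample_ratio=0.001, epsilon=STEP_EPSILON, step=938, delta=DELTA)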