未验证 提交 9916d223 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge pull request #3 from frankwhzhang/master

add dpsgd benchmark
...@@ -201,11 +201,14 @@ class FLRunTimeJob(FLJobBase): ...@@ -201,11 +201,14 @@ class FLRunTimeJob(FLJobBase):
main_fn = "%s/trainer.main.program" % folder_name main_fn = "%s/trainer.main.program" % folder_name
self._trainer_main_program = self._load_program(main_fn) self._trainer_main_program = self._load_program(main_fn)
send_fn = "%s/trainer.send.program" % folder_name try:
self._trainer_send_program = self._load_program(send_fn) send_fn = "%s/trainer.send.program" % folder_name
self._trainer_send_program = self._load_program(send_fn)
recv_fn = "%s/trainer.recv.program" % folder_name
self._trainer_recv_program = self._load_program(recv_fn) recv_fn = "%s/trainer.recv.program" % folder_name
self._trainer_recv_program = self._load_program(recv_fn)
except:
pass
endpoints_fn = "%s/endpoints" % folder endpoints_fn = "%s/endpoints" % folder
self._endpoints = self._load_endpoints(endpoints_fn) self._endpoints = self._load_endpoints(endpoints_fn)
......
...@@ -110,10 +110,43 @@ class DPSGDStrategy(FLStrategyBase): ...@@ -110,10 +110,43 @@ class DPSGDStrategy(FLStrategyBase):
def __init__(self): def __init__(self):
super(DPSGDStrategy, self).__init__() super(DPSGDStrategy, self).__init__()
@property
def learning_rate(self):
return self._learning_rate
@learning_rate.setter
def learning_rate(self, s):
self._learning_rate = s
@property
def clip(self):
return self._clip
@clip.setter
def clip(self, s):
self._clip = s
@property
def batch_size(self):
return self._batch_size
@batch_size.setter
def batch_size(self, s):
self._batch_size = s
@property
def sigma(self):
return self._sigma
@sigma.setter
def sigma(self, s):
self._sigma = s
def minimize(self, optimizer=None, losses=[]): def minimize(self, optimizer=None, losses=[]):
""" """
Do nothing in DPSGDStrategy in minimize function Define Dpsgd optimizer
""" """
optimizer = fluid.optimizer.Dpsgd(self._learning_rate, clip=self._clip, batch_size=self._batch_size, sigma=self._sigma)
optimizer.minimize(losses[0]) optimizer.minimize(losses[0])
def _build_trainer_program_for_job( def _build_trainer_program_for_job(
...@@ -128,7 +161,7 @@ class DPSGDStrategy(FLStrategyBase): ...@@ -128,7 +161,7 @@ class DPSGDStrategy(FLStrategyBase):
trainers=trainers, trainers=trainers,
sync_mode=sync_mode, sync_mode=sync_mode,
startup_program=startup_program) startup_program=startup_program)
main = transpiler.get_trainer_program() main = transpiler.get_trainer_program(wait_port=False)
job._trainer_startup_programs.append(startup_program) job._trainer_startup_programs.append(startup_program)
job._trainer_main_programs.append(main) job._trainer_main_programs.append(main)
......
...@@ -8,44 +8,41 @@ class Model(object): ...@@ -8,44 +8,41 @@ class Model(object):
def __init__(self): def __init__(self):
pass pass
def mlp(self, inputs, label, hidden_size=128): def lr_network(self):
self.concat = fluid.layers.concat(inputs, axis=1) self.inputs = fluid.layers.data(name='img', shape=[1, 28, 28], dtype="float32")
self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu') self.label = fluid.layers.data(name='label', shape=[1],dtype='int64')
self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu') self.predict = fluid.layers.fc(input=self.inputs, size=10, act='softmax')
self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax') self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=self.label)
self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label) self.accuracy = fluid.layers.accuracy(input=self.predict, label=self.label)
self.accuracy = fluid.layers.accuracy(input=self.predict, label=label) self.loss = fluid.layers.mean(self.sum_cost)
self.loss = fluid.layers.reduce_mean(self.sum_cost)
self.startup_program = fluid.default_startup_program() self.startup_program = fluid.default_startup_program()
inputs = [fluid.layers.data( \
name=str(slot_id), shape=[5],
dtype="float32")
for slot_id in range(3)]
label = fluid.layers.data( \
name="label",
shape=[1],
dtype='int64')
model = Model() model = Model()
model.mlp(inputs, label) model.lr_network()
STEP_EPSILON = 10 STEP_EPSILON = 0.1
DELTA = 0.00001 DELTA = 0.00001
SIGMA = math.sqrt(2.0 * math.log(1.25/DELTA)) / STEP_EPSILON SIGMA = math.sqrt(2.0 * math.log(1.25/DELTA)) / STEP_EPSILON
CLIP = 10.0 CLIP = 4.0
batch_size = 1 batch_size = 64
job_generator = JobGenerator() job_generator = JobGenerator()
optimizer = fluid.optimizer.Dpsgd(0.1, clip=CLIP, batch_size=float(batch_size), sigma=0.0 * SIGMA) optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer) job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss]) job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program) job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[model.inputs.name, model.label.name], [model.loss.name, model.accuracy.name])
build_strategy = FLStrategyFactory() build_strategy = FLStrategyFactory()
build_strategy.dpsgd = True build_strategy.dpsgd = True
build_strategy.inner_step = 1 build_strategy.inner_step = 1
strategy = build_strategy.create_fl_strategy() strategy = build_strategy.create_fl_strategy()
strategy.learning_rate = 0.1
strategy.clip = CLIP
strategy.batch_size = float(batch_size)
strategy.sigma = CLIP * SIGMA
# endpoints will be collected through the cluster # endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected # in this example, we suppose endpoints have been collected
......
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob from paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np import numpy
import sys import sys
import paddle
import paddle.fluid as fluid
import logging
import math
def reader(): logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%M-%Y %H:%M:%S", level=logging.DEBUG)
for i in range(1000):
data_dict = {}
for i in range(3):
data_dict[str(i)] = np.random.rand(1, 5).astype('float32')
data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
yield data_dict
trainer_id = int(sys.argv[1]) # trainer id for each guest trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config" job_path = "fl_job_config"
...@@ -18,11 +16,55 @@ job.load_trainer_job(job_path, trainer_id) ...@@ -18,11 +16,55 @@ job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job) trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.start() trainer.start()
step_i = 0 test_program = trainer._main_program.clone(for_test=True)
train_reader = paddle.batch(
paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
batch_size=64)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=64)
img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())
def train_test(train_test_program, train_test_feed, train_test_reader):
acc_set = []
for test_data in train_test_reader():
acc_np = trainer.exe.run(
program=train_test_program,
feed=train_test_feed.feed(test_data),
fetch_list=["accuracy_0.tmp_0"])
acc_set.append(float(acc_np[0]))
acc_val_mean = numpy.array(acc_set).mean()
return acc_val_mean
def compute_privacy_budget(sample_ratio, epsilon, step, delta):
E = 2 * epsilon * math.sqrt(step * sample_ratio)
print("({0}, {1})-DP".format(E, delta))
output_folder = "model_node%d" % trainer_id
epoch_id = 0
step = 0
while not trainer.stop(): while not trainer.stop():
step_i += 1 epoch_id += 1
print("batch %d start train" % (step_i)) if epoch_id > 40:
for data in reader(): break
trainer.run(feed=data, fetch=[]) print("epoch %d start train" % (epoch_id))
if step_i % 100 == 0: for step_id, data in enumerate(train_reader()):
trainer.save_inference_program(output_folder) acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
step += 1
# print("acc:%.3f" % (acc[0]))
acc_val = train_test(
train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
compute_privacy_budget(sample_ratio=0.001, epsilon=0.1, step=step, delta=0.00001)
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder)
#!/bin/bash
echo "Stop service!"
ps -ef | grep -E "fl" | grep -v grep | awk '{print $2}' | xargs kill -9
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册