提交 74b65a6c 编写于 作者: J jhjiangcs

optimize sec_agg demo according to daxiang's comments.

上级 6eecc3ef
......@@ -16,7 +16,7 @@ python setup.py install
### Model
The simplest Softmax regression model is to get features with input layer passing through a fully connected layer and then compute and ouput probabilities of multiple classifications directly via Softmax function [[PaddlePaddle tutorial: recognize digits](https://github.com/PaddlePaddle/book/tree/develop/02.recognize_digits#references)].
The simplest Softmax regression model passes the input layer through a fully connected layer to obtain features, and then directly computes and outputs the probabilities of multiple classes via the Softmax function [[PaddlePaddle tutorial: recognize digits](https://github.com/PaddlePaddle/book/tree/develop/02.recognize_digits#references)].
### Datasets
......@@ -80,6 +80,10 @@ job_generator.set_infer_feed_and_target_names(
build_strategy = FLStrategyFactory()
build_strategy.sec_agg = True
param_name_list = []
param_name_list.append("fc_0.w_0.opti.trainer_") # need trainer_id when running
param_name_list.append("fc_0.b_0.opti.trainer_")
build_strategy.param_name_list = param_name_list
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
......@@ -95,9 +99,13 @@ job_generator.generate_fl_job(
How to work in RunTime
```shell
python3 -u fl_server.py >server0.log &
python3 -u fl_trainer.py 0 data/ >trainer0.log &
python3 -u fl_trainer.py 1 data/ >trainer1.log &
python3 fl_master.py
sleep 2
python3 -u fl_server.py >log/server0.log &
sleep 2
python3 -u fl_trainer.py 0 >log/trainer0.log &
sleep 2
python3 -u fl_trainer.py 1 >log/trainer1.log &
```
In fl_server.py, we load and run the FL server job.
......@@ -112,12 +120,19 @@ server.set_server_job(job)
server.start()
```
In fl_trainer.py, we prepare the MNIST dataset, load and run the FL trainer job, then evaluate the accuracy. Before training , we first prepare the party's private key and other party's public key. Then, each party generates mask using Diffie-Hellman key aggregate protocal with its parivate key and other's public key [1], and masks the model parameters before uploading to the server. Finally, the server can remove the masks by aggregating the parameters from all the parties.
In fl_trainer.py, we prepare the MNIST dataset, load and run the FL trainer job, and then evaluate the accuracy. Before training, each party prepares its own private key and the other parties' public keys. Then, for every other party, a party derives a shared random noise value using the Diffie-Hellman key agreement protocol with its own private key and that party's public key [1]. If the other party's id is larger than this party's id, the noise is added to the model parameters; if it is smaller, the noise is subtracted from the model parameters. The model parameters are thus masked before being uploaded to the server. Finally, because each pairwise noise is added by one party and subtracted by the other, the noises cancel out when the server aggregates the masked parameters from all the parties.
```python
# Log everything at DEBUG level to log/test.log, truncating the file on start.
# The date format uses %m (month); the previous %M is the *minute* directive,
# which produced timestamps such as "day-minute-year".
logging.basicConfig(filename="log/test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%m-%Y %H:%M:%S", level=logging.DEBUG)
logger = logging.getLogger("FLTrainer")
BATCH_SIZE = 64
train_reader = paddle.batch(
paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
batch_size=16)
batch_size=BATCH_SIZE)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=BATCH_SIZE)
trainer_num = 2
trainer_id = int(sys.argv[1]) # trainer id for each guest
......@@ -126,63 +141,54 @@ job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.trainer_id = trainer_id
trainer.trainer_num = trainer_num
trainer.key_dir = "./keys/"
trainer.start()
output_folder = "fl_model"
epoch_id = 0
step_i = 0
inputs = fluid.layers.data(name='x', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[inputs, label], place=fluid.CPUPlace())
# for test
test_program = trainer._main_program.clone(for_test=True)
def train_test(train_test_program,
               train_test_feed, train_test_reader):
    """Run the test program over every batch and average the results.

    Each batch produced by ``train_test_reader()`` is converted with
    ``train_test_feed.feed`` and executed through the module-level
    ``trainer``'s executor, fetching "accuracy_0.tmp_0" and "mean_0.tmp_0"
    (the latter is treated as the batch loss).

    Returns:
        A ``(mean_loss, mean_accuracy)`` tuple -- note that the loss comes
        first.
    """
    fetch_names = ["accuracy_0.tmp_0", "mean_0.tmp_0"]
    accuracies, losses = [], []
    for batch in train_test_reader():
        batch_acc, batch_loss = trainer.exe.run(
            program=train_test_program,
            feed=train_test_feed.feed(batch),
            fetch_list=fetch_names)
        accuracies.append(float(batch_acc))
        losses.append(float(batch_loss))
    return numpy.array(losses).mean(), numpy.array(accuracies).mean()
# for test
while not trainer.stop():
epoch_id += 1
print("epoch %d start train" % (epoch_id))
starttime = datetime.datetime.now()
# prepare the aggregated parameters
param_name_list = []
param_name_list.append("fc_0.b_0.opti.trainer_" + str(trainer_id))
param_name_list.append("fc_0.b_0.opti.trainer_" + str(trainer_id))
inputs = fluid.layers.data(name='x', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[inputs, label], place=fluid.CPUPlace())
scale = pow(10.0, 5)
# 1. load priv key and other's pub key
# party_name = Party(trainer_id + 1)
dh = DiffieHellman(group=15, key_length=256)
dh.load_private_key(str(trainer_id) + "_priv_key.txt")
digestmod="SHA256"
for data in train_reader():
step_i += 1
noise = 0.0
# 2. generate noise
secagg_starttime = datetime.datetime.now()
key = str(step_i).encode("utf-8")
for i in range(trainer_num):
if i != trainer_id:
f = open(str(i) + "_pub_key.txt", "r")
public_key = int(f.read())
dh.generate_shared_secret(public_key, echo_return_key=True)
msg = dh.shared_key.encode("utf-8")
hex_res1 = hmac.new(key=key, msg=msg, digestmod=digestmod).hexdigest()
current_noise = int(hex_res1[0:8], 16) / scale
if i > trainer_id:
noise = noise + current_noise
else:
noise = noise - current_noise
if step_i % 100 == 0:
print("Step: {0}".format(step_i))
# 3. add noise between training and sending.
trainer.step_id = step_i
accuracy, = trainer.run(feed=feeder.feed(data),
fetch=["top_k_0.tmp_0"],
param_name_list=param_name_list,
mask=noise)
fetch=["accuracy_0.tmp_0"])
if step_i % 100 == 0:
print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
endtime = datetime.datetime.now()
print("time cost: {0}".format(endtime - starttime))
avg_loss_val, acc_val = train_test(train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with Epoch %d, avg_cost: %s, acc: %s" %(epoch_id, avg_loss_val, acc_val))
if epoch_id > 40:
break
......@@ -192,4 +198,4 @@ while not trainer.stop():
[1] Aaron Segal, Antonio Marcedone, Benjamin Kreuter, Daniel Ramage, H. Brendan McMahan, Karn Seth, Keith Bonawitz, Sarvar Patel, Vladimir Ivanov. **Practical Secure Aggregation for Privacy-Preserving Machine Learning**, The 24th ACM Conference on Computer and Communications Security (**CCS**), 2017
\ No newline at end of file
[1] Aaron Segal, Antonio Marcedone, Benjamin Kreuter, Daniel Ramage, H. Brendan McMahan, Karn Seth, Keith Bonawitz, Sarvar Patel, Vladimir Ivanov. **Practical Secure Aggregation for Privacy-Preserving Machine Learning**, The 24th ACM Conference on Computer and Communications Security (**CCS**), 2017
......@@ -74,7 +74,7 @@ class FLStrategyFactory(object):
strategy._dpsgd = True
strategy._sec_agg = False
elif self._sec_agg == True:
strategy = FedAvgStrategy()
strategy = SecAggStrategy()
strategy._fed_avg = False
strategy._dpsgd = False
strategy._sec_agg = True
......@@ -162,7 +162,7 @@ class DPSGDStrategy(FLStrategyBase):
"""
Define Dpsgd optimizer
"""
# optimizer = fluid.optimizer.Dpsgd(self._learning_rate, clip=self._clip, batch_size=self._batch_size, sigma=self._sigma)
optimizer = fluid.optimizer.Dpsgd(self._learning_rate, clip=self._clip, batch_size=self._batch_size, sigma=self._sigma)
optimizer.minimize(losses[0])
def _build_trainer_program_for_job(
......@@ -255,87 +255,23 @@ class FedAvgStrategy(FLStrategyBase):
job._server_main_programs.append(main_prog)
class SecAggStrategy(FedAvgStrategy):
    """
    SecAggStrategy: a FedAvgStrategy variant for secure aggregation, following
    Aaron Segal, Antonio Marcedone, Benjamin Kreuter, et al.
    Practical Secure Aggregation for Privacy-Preserving Machine Learning,
    The 24th ACM Conference on Computer and Communications Security ( CCS2017 ).

    On top of FedAvgStrategy it only carries ``param_name_list``, the list of
    parameter names that the secure-aggregation trainer reads from the
    strategy and masks with noise before sending.
    """

    def __init__(self):
        super(SecAggStrategy, self).__init__()
        # Starts empty; expected to be filled in through the property below.
        self._param_name_list = []

    @property
    def param_name_list(self):
        """Names of the parameters to be masked before upload."""
        return self._param_name_list

    @param_name_list.setter
    def param_name_list(self, s):
        self._param_name_list = s
......@@ -14,6 +14,8 @@
import paddle.fluid as fluid
import logging
import numpy
import hmac
from .diffiehellman.diffiehellman import DiffieHellman
class FLTrainerFactory(object):
def __init__(self):
......@@ -79,6 +81,7 @@ class FLTrainer(object):
# TODO(guru4elephant): add connection with master
return False
class FedAvgTrainer(FLTrainer):
def __init__(self):
super(FedAvgTrainer, self).__init__()
......@@ -97,26 +100,18 @@ class FedAvgTrainer(FLTrainer):
def reset(self):
self.cur_step = 0
def run(self, feed, fetch, train_id, mask):
def run(self, feed, fetch):
self._logger.debug("begin to run FedAvgTrainer, cur_step=%d, inner_step=%d" %
(self.cur_step, self._step))
if self.cur_step % self._step == 0:
self._logger.debug("begin to run recv program")
self.exe.run(self._recv_program)
scope = fluid.global_scope()
print("****** fc_0.b_0: {0}".format(numpy.array(scope.find_var("fc_0.b_0").get_tensor())))
print("****** fc_0.w_0: {0}".format(numpy.array(scope.find_var("fc_0.w_0").get_tensor())))
self._logger.debug("begin to run current step")
loss = self.exe.run(self._main_program,
feed=feed,
fetch_list=fetch)
if self.cur_step % self._step == 0:
self._logger.debug("begin to run send program")
scope = fluid.global_scope()
name1 = "fc_0.b_0.opti.trainer_" + str(train_id)
name2 = "fc_0.w_0.opti.trainer_" + str(train_id)
fluid.global_scope().var(name1).get_tensor().set(numpy.array(scope.find_var(name1).get_tensor()) + mask, fluid.CPUPlace())
fluid.global_scope().var(name2).get_tensor().set(numpy.array(scope.find_var(name2).get_tensor()) + mask, fluid.CPUPlace())
self.exe.run(self._send_program)
self.cur_step += 1
return loss
......@@ -130,6 +125,38 @@ class SecAggTrainer(FLTrainer):
super(SecAggTrainer, self).__init__()
pass
# Id of this trainer. run() uses it to locate "<id>_priv_key.txt", to skip
# itself when looping over parties, to choose the sign of each pairwise noise
# term, and as the suffix appended to every entry of the parameter name list.
# NOTE(review): not assigned in the constructor lines visible here, so it must
# be set before run() is called.
@property
def trainer_id(self):
return self._trainer_id
@trainer_id.setter
def trainer_id(self, s):
self._trainer_id = s
# Total number of trainers. run() iterates over range(trainer_num) to combine
# one noise term per other party.
# NOTE(review): not assigned in the constructor lines visible here, so it must
# be set before run() is called.
@property
def trainer_num(self):
return self._trainer_num
@trainer_num.setter
def trainer_num(self, s):
self._trainer_num = s
# Directory prefix for the key files. run() concatenates it directly with
# "<id>_priv_key.txt" / "<id>_pub_key.txt", so the value needs to end with a
# path separator (the demo sets "./keys/").
@property
def key_dir(self):
return self._key_dir
@key_dir.setter
def key_dir(self, s):
self._key_dir = s
# Current step counter supplied by the caller. run() uses its string form as
# the HMAC key when deriving the per-step noise, so all trainers presumably
# have to set the same value for the same step for the noise to cancel --
# TODO confirm against the server-side aggregation.
@property
def step_id(self):
return self._step_id
@step_id.setter
def step_id(self, s):
self._step_id = s
def start(self):
self.exe = fluid.Executor(fluid.CPUPlace())
self.exe.run(self._startup_program)
......@@ -139,11 +166,14 @@ class SecAggTrainer(FLTrainer):
super(SecAggTrainer, self).set_trainer_job(job)
self._send_program = job._trainer_send_program
self._recv_program = job._trainer_recv_program
# Store the inner step count on the instance: run() reads self._step to decide
# when to run the recv/send programs. The previous "self_step = ..." only bound
# an unused local variable and never updated the trainer.
self._step = job._strategy._inner_step
# Parameter names (without the trainer id suffix) that run() masks with noise.
self._param_name_list = job._strategy._param_name_list
def reset(self):
self.cur_step = 0
def run(self, feed, fetch, param_name_list, mask):
def run(self, feed, fetch):
self._logger.debug("begin to run SecAggTrainer, cur_step=%d, inner_step=%d" %
(self.cur_step, self._step))
if self.cur_step % self._step == 0:
......@@ -156,9 +186,30 @@ class SecAggTrainer(FLTrainer):
fetch_list=fetch)
if self.cur_step % self._step == 0:
self._logger.debug("begin to run send program")
noise = 0.0
scale = pow(10.0, 5)
digestmod="SHA256"
# 1. load priv key and other's pub key
dh = DiffieHellman(group=15, key_length=256)
dh.load_private_key(self._key_dir + str(self._trainer_id) + "_priv_key.txt")
key = str(self._step_id).encode("utf-8")
for i in range(self._trainer_num):
if i != self._trainer_id:
f = open(self._key_dir + str(i) + "_pub_key.txt", "r")
public_key = int(f.read())
dh.generate_shared_secret(public_key, echo_return_key=True)
msg = dh.shared_key.encode("utf-8")
hex_res1 = hmac.new(key=key, msg=msg, digestmod=digestmod).hexdigest()
current_noise = int(hex_res1[0:8], 16) / scale
if i > self._trainer_id:
noise = noise + current_noise
else:
noise = noise - current_noise
scope = fluid.global_scope()
for param_name in param_name_list:
fluid.global_scope().var(param_name).get_tensor().set(numpy.array(scope.find_var(param_name).get_tensor()) + mask, fluid.CPUPlace())
for param_name in self._param_name_list:
fluid.global_scope().var(param_name + str(self._trainer_id)).get_tensor().set(
numpy.array(scope.find_var(param_name + str(self._trainer_id)).get_tensor()) + noise, fluid.CPUPlace())
self.exe.run(self._send_program)
self.cur_step += 1
return loss
......
......@@ -35,7 +35,14 @@ job_generator.set_infer_feed_and_target_names(
[inputs.name, label.name], [model.loss.name])
build_strategy = FLStrategyFactory()
#build_strategy.fed_avg = True
build_strategy.sec_agg = True
param_name_list = []
param_name_list.append("fc_0.w_0.opti.trainer_") # need trainer_id when running
param_name_list.append("fc_0.b_0.opti.trainer_")
build_strategy.param_name_list = param_name_list
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
......
......@@ -10,16 +10,17 @@ import datetime
import math
import hashlib
import hmac
from diffiehellman.diffiehellman import DiffieHellman
# from enum import Enum
# Party = Enum('Party', ('alice', 'bob'))
logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%M-%Y %H:%M:%S", level=logging.DEBUG)
# Log everything at DEBUG level to log/test.log, truncating the file on start.
# The date format uses %m (month); the previous %M is the *minute* directive,
# which produced timestamps such as "day-minute-year".
logging.basicConfig(filename="log/test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%m-%Y %H:%M:%S", level=logging.DEBUG)
logger = logging.getLogger("FLTrainer")
BATCH_SIZE = 64
train_reader = paddle.batch(
paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
batch_size=16)
batch_size=BATCH_SIZE)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=BATCH_SIZE)
trainer_num = 2
trainer_id = int(sys.argv[1]) # trainer id for each guest
......@@ -28,72 +29,56 @@ job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.trainer_id = trainer_id
trainer.trainer_num = trainer_num
trainer.key_dir = "./keys/"
trainer.start()
output_folder = "fl_model"
epoch_id = 0
step_i = 0
while not trainer.stop():
epoch_id += 1
print("epoch %d start train" % (epoch_id))
starttime = datetime.datetime.now()
# prepare the aggregated parameters
param_name_list = []
param_name_list.append("fc_0.b_0.opti.trainer_" + str(trainer_id))
param_name_list.append("fc_0.b_0.opti.trainer_" + str(trainer_id))
inputs = fluid.layers.data(name='x', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[inputs, label], place=fluid.CPUPlace())
inputs = fluid.layers.data(name='x', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[inputs, label], place=fluid.CPUPlace())
scale = pow(10.0, 5)
# 1. load priv key and other's pub key
# party_name = Party(trainer_id + 1)
dh = DiffieHellman(group=15, key_length=256)
dh.load_private_key(str(trainer_id) + "_priv_key.txt")
# for test
test_program = trainer._main_program.clone(for_test=True)
digestmod="SHA256"
def train_test(train_test_program,
               train_test_feed, train_test_reader):
    """Run the test program over every batch and average the results.

    Each batch produced by ``train_test_reader()`` is converted with
    ``train_test_feed.feed`` and executed through the module-level
    ``trainer``'s executor, fetching "accuracy_0.tmp_0" and "mean_0.tmp_0"
    (the latter is treated as the batch loss).

    Returns:
        A ``(mean_loss, mean_accuracy)`` tuple -- note that the loss comes
        first.
    """
    fetch_names = ["accuracy_0.tmp_0", "mean_0.tmp_0"]
    accuracies, losses = [], []
    for batch in train_test_reader():
        batch_acc, batch_loss = trainer.exe.run(
            program=train_test_program,
            feed=train_test_feed.feed(batch),
            fetch_list=fetch_names)
        accuracies.append(float(batch_acc))
        losses.append(float(batch_loss))
    return numpy.array(losses).mean(), numpy.array(accuracies).mean()
# for test
while not trainer.stop():
epoch_id += 1
print("epoch %d start train" % (epoch_id))
for data in train_reader():
step_i += 1
noise = 0.0
# 2. generate noise
secagg_starttime = datetime.datetime.now()
key = str(step_i).encode("utf-8")
for i in range(trainer_num):
if i != trainer_id:
f = open(str(i) + "_pub_key.txt", "r")
public_key = int(f.read())
dh.generate_shared_secret(public_key, echo_return_key=True)
msg = dh.shared_key.encode("utf-8")
hex_res1 = hmac.new(key=key, msg=msg, digestmod=digestmod).hexdigest()
current_noise = int(hex_res1[0:8], 16) / scale
if i > trainer_id:
noise = noise + current_noise
else:
noise = noise - current_noise
#secagg_endtime = datetime.datetime.now()
#print("Epoch: {0}, step: {1}".format(epoch_id, step_i))
#print("secagg time cost: {0}".format(secagg_endtime - secagg_starttime))
if step_i % 100 == 0:
print("Step: {0}".format(step_i))
# 3. add noise between training and sending.
trainer.step_id = step_i
accuracy, = trainer.run(feed=feeder.feed(data),
fetch=["top_k_0.tmp_0"],
param_name_list=param_name_list,
mask=noise)
#train_endtime = datetime.datetime.now()
#print("train time cost: {0}".format(train_endtime - secagg_endtime))
print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
endtime = datetime.datetime.now()
print("time cost: {0}".format(endtime - starttime))
fetch=["accuracy_0.tmp_0"])
if step_i % 100 == 0:
print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
avg_loss_val, acc_val = train_test(train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with Epoch %d, avg_cost: %s, acc: %s" %(epoch_id, avg_loss_val, acc_val))
if epoch_id > 40:
break
if step_i % 100 == 0:
#print("Epoch: {0},loss: {1}".format(step_i, loss_value[0]))
trainer.save_inference_program(output_folder)
unset http_proxy
unset https_proxy
if [ ! -d log ];then
mkdir log
fi
python3 fl_master.py
sleep 2
python3 -u fl_server.py >server0.log &
python3 -u fl_server.py >log/server0.log &
sleep 2
python3 -u fl_trainer.py 0 >trainer0.log &
python3 -u fl_trainer.py 0 >log/trainer0.log &
sleep 2
python3 -u fl_trainer.py 1 >trainer1.log &
python3 -u fl_trainer.py 1 >log/trainer1.log &
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册