提交 5235ce6d 编写于 作者: J jingqinghe

update document

上级 4c25972a
......@@ -45,14 +45,14 @@ cd /path/to/PaddleFL
mkdir build && cd build
```
Execute compile commands, where `PYTHON_EXECUTABLE` is path to the python binary where the PaddlePaddle is installed, and `PYTHON_INCLUDE_DIRS` is the corresponding python include directory. You can get the `PYTHON_INCLUDE_DIRS` via the following command:
Execute compile commands, where `PYTHON_EXECUTABLE` is path to the python binary where the PaddlePaddle is installed, 'g++_path' is the path of G++ and `PYTHON_INCLUDE_DIRS` is the corresponding python include directory. You can get the `PYTHON_INCLUDE_DIRS` via the following command:
```sh
${PYTHON_EXECUTABLE} -c "from distutils.sysconfig import get_python_inc;print(get_python_inc())"
```
Then you can put the directory in the following command and make:
```sh
cmake ../ -DPYTHON_EXECUTABLE=${PYTHON_EXECUTABLE} -DPYTHON_INCLUDE_DIRS=${python_include_dir}
cmake ../ -DPYTHON_EXECUTABLE=${PYTHON_EXECUTABLE} -DPYTHON_INCLUDE_DIRS=${python_include_dir} -DCMAKE_CXX_COMPILER=${g++_path}
make -j$(nproc)
```
......
......@@ -8,6 +8,60 @@ PaddleFL是一个基于PaddlePaddle的开源联邦学习框架。研究人员可
## 编译与安装
### 使用docker安装
```sh
#Pull and run the docker
docker pull hub.baidubce.com/paddlefl/paddle_fl:latest
docker run --name <docker_name> --net=host -it -v $PWD:/root <image id> /bin/bash
#Install paddle_fl
pip install paddle_fl
```
### 从源码编译
#### 环境准备
* CentOS 6 or CentOS 7 (64 bit)
* Python 2.7.15+/3.5.1+/3.6/3.7 ( 64 bit) or above
* pip or pip3 9.0.1+ (64 bit)
* PaddlePaddle release 1.6.3
* Redis 5.0.8 (64 bit)
* GCC or G++ 4.8.3+
* cmake 3.15+
#### 克隆源代码并编译安装
获取源代码
```sh
git clone https://github.com/PaddlePaddle/PaddleFL
cd /path/to/PaddleFL
# Checkout stable release
mkdir build && cd build
```
执行编译指令, `PYTHON_EXECUTABLE` 为安装了PaddlePaddle的可执行python路径, `PYTHON_INCLUDE_DIRS` 是相应的include路径,可以用如下指令获得。'g++_path' 为指定的g++路径。
```sh
${PYTHON_EXECUTABLE} -c "from distutils.sysconfig import get_python_inc;print(get_python_inc())"
```
之后就可以执行编译和安装的指令
```sh
cmake ../ -DPYTHON_EXECUTABLE=${PYTHON_EXECUTABLE} -DPYTHON_INCLUDE_DIRS=${python_include_dir} -DCMAKE_CXX_COMPILER=${g++_path}
make -j$(nproc)
```
安装对应的安装包
```sh
make install
cd /path/to/PaddleFL/python
${PYTHON_EXECUTABLE} setup.py sdist bdist_wheel
pip or pip3 install dist/***.whl -U
```
## PaddleFL概述
### 横向联邦方案
......
# Example in Recognize Digits with DPSGD
# Example in Recognize Digits with Secure Aggragate
This document introduces how to use PaddleFL to train a model with Fl Strategy: Secure Aggregation. Using Secure Aggregation strategy, the server can aggregate the model parameters without learning the value of the parameters.
### Dependencies
- paddlepaddle>=1.6
- paddlepaddle>=1.8
### How to install PaddleFL
Please use python which has paddlepaddle installed
```
python setup.py install
pip install paddle_fl
```
### Model
......
# Example in CTR(Click-Through-Rate) with FedAvg
This document introduces how to use PaddleFL to train a model with Fl Strategy.
### Dependencies
- paddlepaddle>=1.8
### How to install PaddleFL
Please use pip which has paddlepaddle installed
```
pip install paddle_fl
```
### How to work in PaddleFL
PaddleFL has two phases , CompileTime and RunTime. In CompileTime, a federated learning task is defined by fl_master. In RunTime, a federated learning job is executed on fl_server and fl_trainer in distributed clusters.
```
sh run.sh
```
#### How to work in CompileTime
In this example, we implement compile time programs in fl_master.py
```
python fl_master.py
```
In fl_master.py, we first define FL-Strategy, User-Defined-Program and Distributed-Config. Then FL-Job-Generator generate FL-Job for federated server and worker.
```python
class Model(object):
def __init__(self):
pass
def mlp(self, inputs, label, hidden_size=128):
self.concat = fluid.layers.concat(inputs, axis=1)
self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
self.sum_cost = fluid.layers.cross_entropy(
input=self.predict, label=label)
self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
self.loss = fluid.layers.reduce_mean(self.sum_cost)
self.startup_program = fluid.default_startup_program()
inputs = [fluid.layers.data( \
name=str(slot_id), shape=[5],
dtype="float32")
for slot_id in range(3)]
label = fluid.layers.data( \
name="label",
shape=[1],
dtype='int64')
model = Model()
model.mlp(inputs, label)
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names([x.name for x in inputs],
[model.predict.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=2, output=output)
# fl_job_config will be dispatched to workers
```
#### How to work in RunTime
```
python -u fl_scheduler.py >scheduler.log &
python -u fl_server.py >server0.log &
python -u fl_trainer.py 0 >trainer0.log &
python -u fl_trainer.py 1 >trainer1.log &
```
In fl_scheduler.py, we let server and trainers to do registeration.
```
worker_num = 2
server_num = 1
# Define the number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(worker_num)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
```
In fl_server.py, we load and run the FL server job.
```
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = "127.0.0.1:9091" # IP address for scheduler
server.set_server_job(job)
server._current_ep = "127.0.0.1:8181" # IP address for server
server.start()
```
In fl_trainer.py, we load and run the FL trainer job, then evaluate the accuracy with test data and compute the privacy budget. The DataSet is ramdomly generated.
```
def reader():
for i in range(1000):
data_dict = {}
for i in range(3):
data_dict[str(i)] = np.random.rand(1, 5).astype('float32')
data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
yield data_dict
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091" # Inform the scheduler IP to trainer
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
place = fluid.CPUPlace()
trainer.start(place)
print(trainer._scheduler_ep, trainer._current_ep)
output_folder = "fl_model"
epoch_id = 0
while not trainer.stop():
print("batch %d start train" % (epoch_id))
train_step = 0
for data in reader():
trainer.run(feed=data, fetch=[])
train_step += 1
if train_step == trainer._step:
break
epoch_id += 1
if epoch_id % 5 == 0:
trainer.save_inference_program(output_folder)
```
# Example in Recognize Digits with DPSGD
This document introduces how to use PaddleFL to train a model with Fl Strategy: DPSGD.
### Dependencies
- paddlepaddle>=1.8
### How to install PaddleFL
Please use pip which has paddlepaddle installed
```
pip install paddle_fl
```
### Model
The simplest Softmax regression model is to get features with input layer passing through a fully connected layer and then compute and ouput probabilities of multiple classifications directly via Softmax function [[PaddlePaddle tutorial: recognize digits](https://github.com/PaddlePaddle/book/tree/develop/02.recognize_digits#references)].
### Datasets
Public Dataset [MNIST](http://yann.lecun.com/exdb/mnist/)
The dataset will downloaded automatically in the API and will be located under `/home/username/.cache/paddle/dataset/mnist`:
| filename | note |
| ----------------------- | ------------------------------- |
| train-images-idx3-ubyte | train data picture, 60,000 data |
| train-labels-idx1-ubyte | train data label, 60,000 data |
| t10k-images-idx3-ubyte | test data picture, 10,000 data |
| t10k-labels-idx1-ubyte | test data label, 10,000 data |
### How to work in PaddleFL
PaddleFL has two phases , CompileTime and RunTime. In CompileTime, a federated learning task is defined by fl_master. In RunTime, a federated learning job is executed on fl_server and fl_trainer in distributed clusters.
```
sh run.sh
```
#### How to work in CompileTime
In this example, we implement compile time programs in fl_master.py
```
python fl_master.py
```
In fl_master.py, we first define FL-Strategy, User-Defined-Program and Distributed-Config. Then FL-Job-Generator generate FL-Job for federated server and worker.
```python
class Model(object):
def __init__(self):
pass
def lr_network(self):
self.inputs = fluid.layers.data(name='img', shape=[1, 28, 28], dtype="float32")
self.label = fluid.layers.data(name='label', shape=[1],dtype='int64')
self.predict = fluid.layers.fc(input=self.inputs, size=10, act='softmax')
self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=self.label)
self.accuracy = fluid.layers.accuracy(input=self.predict, label=self.label)
self.loss = fluid.layers.mean(self.sum_cost)
self.startup_program = fluid.default_startup_program()
model = Model()
model.lr_network()
STEP_EPSILON = 0.1
DELTA = 0.00001
SIGMA = math.sqrt(2.0 * math.log(1.25/DELTA)) / STEP_EPSILON
CLIP = 4.0
batch_size = 64
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[model.inputs.name, model.label.name], [model.loss.name, model.accuracy.name])
build_strategy = FLStrategyFactory()
build_strategy.dpsgd = True
build_strategy.inner_step = 1
strategy = build_strategy.create_fl_strategy()
strategy.learning_rate = 0.1
strategy.clip = CLIP
strategy.batch_size = float(batch_size)
strategy.sigma = CLIP * SIGMA
# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=2, output=output)
```
#### How to work in RunTime
```
python -u fl_scheduler.py >scheduler.log &
python -u fl_server.py >server0.log &
python -u fl_trainer.py 0 >trainer0.log &
python -u fl_trainer.py 1 >trainer1.log &
python -u fl_trainer.py 2 >trainer2.log &
python -u fl_trainer.py 3 >trainer3.log &
```
In fl_scheduler.py, we let server and trainers to do registeration.
```
worker_num = 4
server_num = 1
#Define number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(4)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
```
In fl_server.py, we load and run the FL server job.
```
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = "127.0.0.1:9091" # IP address for scheduler
server.set_server_job(job)
server._current_ep = "127.0.0.1:8181" # IP address for server
server.start()
```
In fl_trainer.py, we load and run the FL trainer job, then evaluate the accuracy with test data and compute the privacy budget.
```
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.start()
```
```
def train_test(train_test_program, train_test_feed, train_test_reader):
acc_set = []
for test_data in train_test_reader():
acc_np = trainer.exe.run(
program=train_test_program,
feed=train_test_feed.feed(test_data),
fetch_list=["accuracy_0.tmp_0"])
acc_set.append(float(acc_np[0]))
acc_val_mean = numpy.array(acc_set).mean()
return acc_val_mean
def compute_privacy_budget(sample_ratio, epsilon, step, delta):
E = 2 * epsilon * math.sqrt(step * sample_ratio)
print("({0}, {1})-DP".format(E, delta))
output_folder = "model_node%d" % trainer_id
epoch_id = 0
step = 0
while not trainer.stop():
epoch_id += 1
if epoch_id > 40:
break
print("epoch %d start train" % (epoch_id))
for step_id, data in enumerate(train_reader()):
acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
step += 1
# print("acc:%.3f" % (acc[0]))
acc_val = train_test(
train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
compute_privacy_budget(sample_ratio=0.001, epsilon=0.1, step=step, delta=0.00001)
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder)
```
### Simulated experiments on public dataset MNIST
To show the effectiveness of DPSGD-based federated learning with PaddleFL, a simulated experiment is conducted on an open source dataset MNIST. From the figure given below, model evaluation results are similar between DPSGD-based federated learning and traditional parameter server training when the overall privacy budget *epsilon* is 1.3 or 0.13.
<img src="fl_dpsgd_benchmark.png" height=400 width=600 hspace='10'/> <br />
# Example in LEAF Dataset with FedAvg
This document introduces how to use PaddleFL to train a model with Fl Strategy: FedAvg.
### Dependencies
- paddlepaddle>=1.8
### How to install PaddleFL
Please use pip which has paddlepaddle installed
```
pip install paddle_fl
```
### Model
An CNN model which get features with two convolution layers and one fully connected layer and then compute and ouput probabilities of multiple classifications directly via Softmax function.
### Datasets
Public Dataset FEMNIST in [LEAF](https://github.com/TalwalkarLab/leaf)
### How to work in PaddleFL
PaddleFL has two phases , CompileTime and RunTime. In CompileTime, a federated learning task is defined by fl_master. In RunTime, a federated learning job is executed on fl_server and fl_trainer in distributed clusters.
```
sh run.sh
```
#### How to work in CompileTime
In this example, we implement compile time programs in fl_master.py
```
python fl_master.py
```
In fl_master.py, we first define FL-Strategy, User-Defined-Program and Distributed-Config. Then FL-Job-Generator generate FL-Job for federated server and worker.
```python
class Model(object):
def __init__(self):
pass
def cnn(self):
self.inputs = fluid.layers.data(
name='img', shape=[1, 28, 28], dtype="float32")
self.label = fluid.layers.data(name='label', shape=[1], dtype='int64')
self.conv_pool_1 = fluid.nets.simple_img_conv_pool(
input=self.inputs,
num_filters=20,
filter_size=5,
pool_size=2,
pool_stride=2,
act='relu')
self.conv_pool_2 = fluid.nets.simple_img_conv_pool(
input=self.conv_pool_1,
num_filters=50,
filter_size=5,
pool_size=2,
pool_stride=2,
act='relu')
self.predict = self.predict = fluid.layers.fc(input=self.conv_pool_2,
size=62,
act='softmax')
self.cost = fluid.layers.cross_entropy(
input=self.predict, label=self.label)
self.accuracy = fluid.layers.accuracy(
input=self.predict, label=self.label)
self.loss = fluid.layers.mean(self.cost)
self.startup_program = fluid.default_startup_program()
model = Model()
model.cnn()
job_generator = JobGenerator()
optimizer = fluid.optimizer.Adam(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[model.inputs.name, model.label.name],
[model.loss.name, model.accuracy.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 1
strategy = build_strategy.create_fl_strategy()
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=4, output=output)
```
#### How to work in RunTime
```
python -u fl_scheduler.py >scheduler.log &
python -u fl_server.py >server0.log &
for ((i=0;i<4;i++))
do
python -u fl_trainer.py $i >trainer$i.log &
done
```
In fl_scheduler.py, we let server and trainers to do registeration.
```
worker_num = 4
server_num = 1
# Define the number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(4)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
```
In fl_server.py, we load and run the FL server job.
```
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = "127.0.0.1:9091" # IP address for scheduler
server.set_server_job(job)
server._current_ep = "127.0.0.1:8181" # IP address for server
server.start()
```
In fl_trainer.py, we load and run the FL trainer job.
```
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091" # Inform the scheduler IP to trainer
print(job._target_names)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
place = fluid.CPUPlace()
trainer.start(place)
print(trainer._step)
test_program = trainer._main_program.clone(for_test=True)
img = fluid.layers.data(name='img', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace())
def train_test(train_test_program, train_test_feed, train_test_reader):
acc_set = []
for test_data in train_test_reader():
acc_np = trainer.exe.run(program=train_test_program,
feed=train_test_feed.feed(test_data),
fetch_list=["accuracy_0.tmp_0"])
acc_set.append(float(acc_np[0]))
acc_val_mean = numpy.array(acc_set).mean()
return acc_val_mean
epoch_id = 0
step = 0
epoch = 3000
count_by_step = False
if count_by_step:
output_folder = "model_node%d" % trainer_id
else:
output_folder = "model_node%d_epoch" % trainer_id
while not trainer.stop():
count = 0
epoch_id += 1
if epoch_id > epoch:
break
print("epoch %d start train" % (epoch_id))
#train_data,test_data= data_generater(trainer_id,inner_step=trainer._step,batch_size=64,count_by_step=count_by_step)
train_reader = paddle.batch(
paddle.reader.shuffle(
femnist.train(
trainer_id,
inner_step=trainer._step,
batch_size=64,
count_by_step=count_by_step),
buf_size=500),
batch_size=64)
test_reader = paddle.batch(
femnist.test(
trainer_id,
inner_step=trainer._step,
batch_size=64,
count_by_step=count_by_step),
batch_size=64)
if count_by_step:
for step_id, data in enumerate(train_reader()):
acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
step += 1
count += 1
print(count)
if count % trainer._step == 0:
break
# print("acc:%.3f" % (acc[0]))
else:
trainer.run_with_epoch(
train_reader, feeder, fetch=["accuracy_0.tmp_0"], num_epoch=1)
acc_val = train_test(
train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
if trainer_id == 0:
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder)
```
......@@ -9,6 +9,6 @@ python -u fl_server.py >server0.log &
sleep 2
for ((i=0;i<4;i++))
do
python -u fl_trainer.py $i >trainer$i.log &
sleep 2
python -u fl_trainer.py $i >trainer$i.log &
sleep 2
done
# Example to Load Program from a Pre-defined Model
This document introduces how to load a pre-defined model, and transfer into program that is usable by PaddleFL.
### Dependencies
- paddlepaddle>=1.8
- paddle_fl>=1.0
Please use pip which has paddlepaddle installed
```
pip install paddle_fl
```
### Compile Time
#### How to save a program
In program_saver.py, you can defind a model. And save the program in to 'load_file'
```
input = fluid.layers.data(name='input', shape=[1, 28, 28], dtype="float32")
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[input, label], place=fluid.CPUPlace())
predict = fluid.layers.fc(input=input, size=10, act='softmax')
sum_cost = fluid.layers.cross_entropy(input=predict, label=label)
accuracy = fluid.layers.accuracy(input=predict, label=label)
avg_cost = fluid.layers.mean(sum_cost, name="loss")
startup_program = fluid.default_startup_program()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
job_generator = JobGenerator()
program_path = './load_file'
job_generator.save_program(program_path, [input, label],
[['predict', predict], ['accuracy', accuracy]],
avg_cost)
```
#### How to load a program
In fl_master.py, you can load the program in 'load_file' and transfer it into an fl program.
```
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
program_file = "./load_file"
job_generator = JobGenerator()
job_generator.generate_fl_job_from_program(
strategy=strategy,
endpoints=endpoints,
worker_num=2,
program_input=program_file,
output=output)
```
#### How to work in RunTime
```
python -u fl_scheduler.py >scheduler.log &
python -u fl_server.py >server0.log &
python -u fl_trainer.py 0 >trainer0.log &
python -u fl_trainer.py 1 >trainer1.log &
```
In fl_scheduler.py, we let server and trainers to do registeration.
```
worker_num = 2
server_num = 1
#Define number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(2)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
```
In fl_server.py, we load and run the FL server job.
```
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = "127.0.0.1:9091" # IP address for scheduler
server.set_server_job(job)
server._current_ep = "127.0.0.1:8181" # IP address for server
server.start()
```
In fl_trainer.py, we load and run the FL trainer job, then evaluate the accuracy with test data.
```
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091" # Inform scheduler IP address to trainer
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
place = fluid.CPUPlace()
trainer.start(place)
test_program = trainer._main_program.clone(for_test=True)
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=500),
batch_size=64)
test_reader = paddle.batch(paddle.dataset.mnist.test(), batch_size=64)
input = fluid.layers.data(name='input', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[input, label], place=fluid.CPUPlace())
def train_test(train_test_program, train_test_feed, train_test_reader):
acc_set = []
for test_data in train_test_reader():
acc_np = trainer.exe.run(program=train_test_program,
feed=train_test_feed.feed(test_data),
fetch_list=["accuracy_0.tmp_0"])
acc_set.append(float(acc_np[0]))
acc_val_mean = numpy.array(acc_set).mean()
return acc_val_mean
output_folder = "model_node%d" % trainer_id
epoch_id = 0
step = 0
while not trainer.stop():
epoch_id += 1
if epoch_id > 40:
break
print("epoch %d start train" % (epoch_id))
for step_id, data in enumerate(train_reader()):
acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
step += 1
acc_val = train_test(
train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder)
```
......@@ -35,7 +35,8 @@ job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091" # Inform scheduler IP address to trainer
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
trainer.start()
place = fluid.CPUPlace()
trainer.start(place)
test_program = trainer._main_program.clone(for_test=True)
train_reader = paddle.batch(
......
# Example in recommendation with FedAvg
This document introduces how to use PaddleFL to train a model with Fl Strategy.
### Dependencies
- paddlepaddle>=1.8
### How to install PaddleFL
Please use pip which has paddlepaddle installed
```sh
pip install paddle_fl
```
### Model
[Gru4rec](https://arxiv.org/abs/1511.06939) is a classical session-based recommendation model. Detailed implementations with paddlepaddle is [here](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/gru4rec).
### Datasets
Public Dataset [Rsc15](https://2015.recsyschallenge.com)
```sh
#download data
sh download.sh
```
### How to work in PaddleFL
PaddleFL has two phases , CompileTime and RunTime. In CompileTime, a federated learning task is defined by fl_master. In RunTime, a federated learning job is executed on fl_server and fl_trainer in distributed clusters.
```sh
sh run.sh
```
### How to work in CompileTime
In this example, we implement compile time programs in fl_master.py
```sh
# please run fl_master to generate fl_job
python fl_master.py
```
In fl_master.py, we first define FL-Strategy, User-Defined-Program and Distributed-Config. Then FL-Job-Generator generate FL-Job for federated server and worker.
```python
# define model
model = Model()
model.gru4rec_network()
# define JobGenerator and set model config
# feed_name and target_name are config for save model.
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=2.0)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[x.name for x in model.inputs], [model.loss.name, model.recall.name])
# define FL-Strategy , we now support two flstrategy, fed_avg and dpsgd. Inner_step means fl_trainer locally train inner_step mini-batch.
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 1
strategy = build_strategy.create_fl_strategy()
# define Distributed-Config and generate fl_job
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=4, output=output)
```
### How to work in RunTime
```sh
python -u fl_scheduler.py >scheduler.log &
python -u fl_server.py >server0.log &
python -u fl_trainer.py 0 >trainer0.log &
python -u fl_trainer.py 1 >trainer1.log &
python -u fl_trainer.py 2 >trainer2.log &
python -u fl_trainer.py 3 >trainer3.log &
```
fl_trainer.py can define own reader according to data.
```python
r = Gru4rec_Reader()
train_reader = r.reader(train_file_dir, place, batch_size=10)
```
### Simulated experiments on real world dataset
To show the concept and effectiveness of horizontal federated learning with PaddleFL, a simulated experiment is conducted on an open source dataset with a real world task. In horizontal federated learning, a group of organizations are doing similar tasks based on private dataset and they are willing to collaborate on a certain task. The goal of the collaboration is to improve the task accuracy with federated learning.
The simulated experiment suppose all organizations have homogeneous dataset and homogeneous task which is an ideal case. The whole dataset is from small part of [Rsc15] and each organization has a subset as a private dataset. To show the performanc e improvement under federated learning, models based on each organization's private dataset are trained and a model under distributed federated learning is trained. A model based on traditional parameter server training is also trained where the whole dataset is owned by a single organization.
From the table and the figure given below, model evaluation results are similar between federated learning and traditional parameter server training. It is clear that compare with models trained with only private dataset, models' performance for each organization get significant improvement with federated learning.
```sh
# download code and readme
wget https://paddle-zwh.bj.bcebos.com/gru4rec_paddlefl_benchmark/gru4rec_benchmark.tar
```
| Dataset | training methods | FL Strategy | recall@20|
| --- | --- | --- |---|
| the whole dataset | private training | - | 0.504 |
| the whole dataset | federated learning | FedAvg | 0.504 |
| 1/4 of the whole dataset | private training | - | 0.286 |
| 1/4 of the whole dataset | private training | - | 0.277 |
| 1/4 of the whole dataset | private training | - | 0.269 |
| 1/4 of the whole dataset | private training | - | 0.282 |
<img src="fl_benchmark.png" height=300 width=500 hspace='10'/> <br />
# Example in Recognize Digits with Secure Aggragate
This document introduces how to use PaddleFL to train a model with Fl Strategy: Secure Aggregation. Using Secure Aggregation strategy, the server can aggregate the model parameters without learning the value of the parameters.
### Dependencies
- paddlepaddle>=1.8
### How to install PaddleFL
Please use pip which has paddlepaddle installed
```
pip install paddle_fl
```
### Model
The simplest Softmax regression model is to get features with input layer passing through a fully connected layer and then compute and ouput probabilities of multiple classes directly via Softmax function [[PaddlePaddle tutorial: recognize digits](https://github.com/PaddlePaddle/book/tree/develop/02.recognize_digits#references)].
### Datasets
Public Dataset [MNIST](http://yann.lecun.com/exdb/mnist/)
The dataset will downloaded automatically in the API and will be located under `/home/username/.cache/paddle/dataset/mnist`:
| filename | note |
| ----------------------- | ------------------------------- |
| train-images-idx3-ubyte | train data picture, 60,000 data |
| train-labels-idx1-ubyte | train data label, 60,000 data |
| t10k-images-idx3-ubyte | test data picture, 10,000 data |
| t10k-labels-idx1-ubyte | test data label, 10,000 data |
### How to work in PaddleFL
PaddleFL has two phases , CompileTime and RunTime. In CompileTime, a federated learning task is defined by fl_master. In RunTime, a federated learning job is executed on fl_server and fl_trainer in distributed clusters.
```
sh run.sh
```
#### How to work in CompileTime
In this example, we implement compile time programs in fl_master.py
```
python fl_master.py
```
In fl_master.py, we first define FL-Strategy, User-Defined-Program and Distributed-Config. Then FL-Job-Generator generate FL-Job for federated server and worker.
```python
def linear_regression(self, inputs, label):
param_attrs = fluid.ParamAttr(
name="fc_0.b_0",
initializer=fluid.initializer.ConstantInitializer(0.0))
param_attrs = fluid.ParamAttr(
name="fc_0.w_0",
initializer=fluid.initializer.ConstantInitializer(0.0))
self.predict = fluid.layers.fc(input=inputs, size=10, act='softmax', param_attr=param_attrs)
self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
self.loss = fluid.layers.mean(self.sum_cost)
self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
self.startup_program = fluid.default_startup_program()
inputs = fluid.layers.data(name='x', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
model = Model()
model.linear_regression(inputs, label)
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[inputs.name, label.name], [model.loss.name])
build_strategy = FLStrategyFactory()
build_strategy.sec_agg = True
param_name_list = []
param_name_list.append("fc_0.w_0.opti.trainer_") # need trainer_id when running
param_name_list.append("fc_0.b_0.opti.trainer_")
build_strategy.param_name_list = param_name_list
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=2, output=output)
```
#### How to work in RunTime
```shell
python3 fl_master.py
sleep 2
python3 -u fl_server.py >log/server0.log &
sleep 2
python3 -u fl_trainer.py 0 >log/trainer0.log &
sleep 2
python3 -u fl_trainer.py 1 >log/trainer1.log &
```
In fl_scheduler.py, we let server and trainers to do registeration.
```
worker_num = 2
server_num = 1
#Define number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(2)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
```
In fl_server.py, we load and run the FL server job.
```
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
server.set_server_job(job)
server.start()
```
In fl_trainer.py, we prepare the MNIST dataset, load and run the FL trainer job, then evaluate the accuracy. Before training , we first prepare the party's private key and other party's public key. Then, each party generates a random noise using Diffie-Hellman key aggregate protocol with its private key and each other's public key [1]. If the other party's id is larger than this party's id, the model parameters add this random noise. If the other party's id is less than this party's id, the model parameters subtract this random noise. So, and the model parameters is masked before uploading to the server. Finally, the random noises can be removed when aggregating the masked parameters from all the parties.
```python
logging.basicConfig(filename="log/test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%M-%Y %H:%M:%S", level=logging.DEBUG)
logger = logging.getLogger("FLTrainer")
BATCH_SIZE = 64
train_reader = paddle.batch(
paddle.reader.shuffle(paddle.dataset.mnist.train(), buf_size=500),
batch_size=BATCH_SIZE)
test_reader = paddle.batch(
paddle.dataset.mnist.test(), batch_size=BATCH_SIZE)
trainer_num = 2
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.trainer_id = trainer_id
trainer.trainer_num = trainer_num
trainer.key_dir = "./keys/"
trainer.start()
output_folder = "fl_model"
epoch_id = 0
step_i = 0
inputs = fluid.layers.data(name='x', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='y', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[inputs, label], place=fluid.CPUPlace())
# for test
test_program = trainer._main_program.clone(for_test=True)
def train_test(train_test_program,
train_test_feed, train_test_reader):
acc_set = []
avg_loss_set = []
for test_data in train_test_reader():
acc_np, avg_loss_np = trainer.exe.run(
program=train_test_program,
feed=train_test_feed.feed(test_data),
fetch_list=["accuracy_0.tmp_0", "mean_0.tmp_0"])
acc_set.append(float(acc_np))
avg_loss_set.append(float(avg_loss_np))
acc_val_mean = numpy.array(acc_set).mean()
avg_loss_val_mean = numpy.array(avg_loss_set).mean()
return avg_loss_val_mean, acc_val_mean
# for test
while not trainer.stop():
epoch_id += 1
print("epoch %d start train" % (epoch_id))
for data in train_reader():
step_i += 1
trainer.step_id = step_i
accuracy, = trainer.run(feed=feeder.feed(data),
fetch=["accuracy_0.tmp_0"])
if step_i % 100 == 0:
print("Epoch: {0}, step: {1}, accuracy: {2}".format(epoch_id, step_i, accuracy[0]))
avg_loss_val, acc_val = train_test(train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with Epoch %d, avg_cost: %s, acc: %s" %(epoch_id, avg_loss_val, acc_val))
if epoch_id > 40:
break
if step_i % 100 == 0:
trainer.save_inference_program(output_folder)
```
[1] Aaron Segal, Antonio Marcedone, Benjamin Kreuter, Daniel Ramage, H. Brendan McMahan, Karn Seth, Keith Bonawitz, Sarvar Patel, Vladimir Ivanov. **Practical Secure Aggregation for Privacy-Preserving Machine Learning**, The 24th ACM Conference on Computer and Communications Security (**CCS**), 2017
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册