提交 9a2577a9 编写于 作者: G guru4elephant

initial commit to github

上级 69ab9ec6
| Github account | name |
|---|---|
| guru4elephant | Daxiang Dong |
\ No newline at end of file
| guru4elephant | Daxiang Dong |
| frankwhzhang | Wenhui Zhang |
\ No newline at end of file
# PaddleFL
Federated Deep Learning in PaddlePaddle
PaddleFL is an open source federated learning framework based on PaddlePaddle. Researchers can easily replicate and compare different federated learning algorithms with PaddleFL. Developers can also benefit from PaddleFL in that it is easy to deploy a federated learning system in large scale distributed clusters. In PaddleFL, serveral federated learning strategies will be provided with application in computer vision, natural language processing, recommendation and so on. Application of traditional machine learning training strategies such as Multi-task learning, Transfer Learning in Federated Learning settings will be provided. Based on PaddlePaddle's large scale distributed training and elastic scheduling of training job on Kubernetes, PaddleFL can be easily deployed based on full-stack open sourced software.
## Federated Learning
Data is becoming more and more expensive nowadays, and sharing of raw data is very hard across organizations. Federated Learning aims to solve the problem of data isolation and secure sharing of data knowledge among organizations. The concept of federated learning is proposed by researchers in Google [1, 2, 3].
## Overview of PaddleFL
<img src='images/FL-framework.png' width = "1300" height = "310" align="middle"/>
In PaddleFL, horizontal and vertical federated learning strategies will be implemented according to the categorization given in [4]. Application demonstrations in natural language processing, computer vision and recommendation will be provided in PaddleFL.
#### Federated Learning Strategy
- **Vertical Federated Learning**: Logistic Regression with PrivC, Neural Network with third-party PrivC [5]
- **Horizontal Federated Learning**: Federated Averaging [2], Differential Privacy [6]
#### Training Strategy
- **Multi Task Learning** [7]
- **Transfer Learning** [8]
- **Active Learning**
## Framework design of PaddleFL
<img src='images/FL-training.png' width = "1300" height = "310" align="middle"/>
In PaddleFL, components for defining a federated learning task and training a federated learning job are as follows:
#### Compile Time
- **FL-Strategy**: a user can define federated learning strategies with FL-Strategy such as Fed-Avg[1]
- **User-Defined-Program**: PaddlePaddle's program that defines the machine learning model structure and training strategies such as multi-task learning.
- **Distributed-Config**: In federated learning, a system should be deployed in distributed settings. Distributed Training Config defines distributed training node information.
- **FL-Job-Generator**: Given FL-Strategy, User-Defined Program and Distributed Training Config, FL-Job for federated server and worker will be generated through FL Job Generator. FL-Jobs will be sent to organizations and federated parameter server for run-time execution.
#### Run Time
- **FL-Server**: federated parameter server that usually runs in cloud or third-party clusters.
- **FL-Worker**: Each organization participates in federated learning will have one or more federated workers that will communicate with the federated parameter server.
## Install Guide
``` shell
python setup.py install
python -c "import paddle_fl as fl"
```
## Quick-Start Example
``` shell
cd paddle_fl/demo
python fl_master.py
python fl_server.py 2> server0.errlog > server0.stdlog &
python fl_trainer.py 0 2> trainer0.errlog > trainer0.stdlog &
python fl_trainer.py 1 2> trainer0.errlog > trainer0.stdlog &
```
## Benchmark task
Gru4Rec [9] introduces recurrent neural network model in session-based recommendation. PaddlePaddle's Gru4Rec implementation is in https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/gru4rec.
## On Going and Future Work
- Experimental benchmark with public datasets in federated learning settings.
- Federated Learning Systems deployment methods in Kubernetes.
- Vertical Federated Learning Strategies and more horizontal federated learning strategies will be open sourced.
## Reference
[1]. Jakub Konečný, H. Brendan McMahan, Daniel Ramage, Peter Richtárik. **Federated Optimization: Distributed Machine Learning for On-Device Intelligence.** 2016
[2]. H. Brendan McMahan, Eider Moore, Daniel Ramage, Blaise Agüera y Arcas. **Federated Learning of Deep Networks using Model Averaging.** 2017
[3]. Jakub Konečný, H. Brendan McMahan, Felix X. Yu, Peter Richtárik, Ananda Theertha Suresh, Dave Bacon. **Federated Learning: Strategies for Improving Communication Efficiency.** 2016
[4]. Qiang Yang, Yang Liu, Tianjian Chen, Yongxin Tong. **Federated Machine Learning: Concept and Applications.** 2019
[5]. Kai He, Liu Yang, Jue Hong, Jinghua Jiang, Jieming Wu, Xu Dong et al. **PrivC - A framework for efficient Secure Two-Party Computation. In Proceedings of 15th EAI International Conference on Security and Privacy in Communication Networks.** SecureComm 2019
[6]. Martín Abadi, Andy Chu, Ian Goodfellow, H. Brendan McMahan, Ilya Mironov, Kunal Talwar, Li Zhang. **Deep Learning with Differential Privacy.** 2016
[7]. Virginia Smith, Chao-Kai Chiang, Maziar Sanjabi, Ameet Talwalkar. **Federated Multi-Task Learning** 2016
[8]. Yang Liu, Tianjian Chen, Qiang Yang. **Secure Federated Transfer Learning.** 2018
[9]. Balázs Hidasi, Alexandros Karatzoglou, Linas Baltrunas, Domonkos Tikk. **Session-based Recommendations with Recurrent Neural Networks.** 2016
\ No newline at end of file
# PaddleFL
PaddleFL是一个基于PaddlePaddle的开源联邦学习框架。研究人员可以很轻松地用PaddleFL复制和比较不同的联邦学习算法。开发人员也可以从padderFL中获益,因为用PaddleFL在大规模分布式集群中部署联邦学习系统很容易。PaddleFL提供了很多联邦学习策略及其在计算机视觉、自然语言处理、推荐算法等领域的应用。此外,PaddleFL还将提供传统机器学习训练策略的应用,例如多任务学习、联邦学习环境下的转移学习。依靠着PaddlePaddle的大规模分布式训练和Kubernetes的训练工作弹性调度,PaddleFL可以基于全栈开源软件轻松地部署。
## 联邦学习
如今,数据变得越来越昂贵,而且跨组织共享原始数据非常困难。联合学习旨在解决组织间数据隔离和数据知识安全共享的问题。联邦学习的概念是由谷歌的研究人员提出的[1,2,3]。
## PaddleFL概述
<img src='images/FL-framework.png' width = "1300" height = "310" align="middle"/>
在padderfl中,水平和垂直联合学习策略将根据[4]中给出的分类来实现。PaddleFL也将提供在自然语言处理,计算机视觉和推荐算法等领域的应用演示。
#### 联邦学习策略
- **垂直联邦学习**: 带privc的逻辑回归,带第三方privc的神经网络[5]
- **水平联邦学习**: 联邦平均 [2],差分隐私 [6]
#### 训练策略
- **多任务学习** [7]
- **迁移学习** [8]
- **主动学习**
## PaddleFL框架设计
<img src='images/FL-training.png' width = "1300" height = "310" align="middle"/>
在PadderFL中,用于定义联邦学习任务和联邦学习训练工作的组件如下:
#### 编译时
- **FL-Strategy**: 用户可以使用FL-Strategy定义联邦学习策略,例如Fed-Avg[1]。
- **User-Defined-Program**: PaddlePaddle的程序定义了机器学习模型结构和训练策略,如多任务学习。
- **Distributed-Config**: 在联邦学习中,系统应该部署在分布式环境中。分布式训练配置定义分布式训练节点信息。
- **FL-Job-Generator**: 给定FL-Strategy, User-Defined Program 和 Distributed Training Config,联邦服务端和工作端的FL-Job将通过FL Job Generator生成。FL-Jobs 被发送到组织和联邦参数服务器以进行运行时执行。
#### 运行时
- **FL-Server**: 在云或第三方集群中运行的联邦参数服务器。
- **FL-Worker**: 参与联合学习的每个组织都将有一个或多个与联合参数服务器通信的联合工作者。
## 安装指南
``` shell
python setup.py install
python -c "import paddle_fl as fl"
```
## 快速入门
``` shell
cd paddle_fl/demo
python fl_master.py
python fl_server.py 2> server0.errlog > server0.stdlog &
python fl_trainer.py 0 2> trainer0.errlog > trainer0.stdlog &
python fl_trainer.py 1 2> trainer0.errlog > trainer0.stdlog &
```
## 性能测试
Gru4Rec [9] 在基于会话的推荐中引入了递归神经网络模型。PaddlePaddle的GRU4RC实现代码在 https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/gru4rec.
## 正在进行与发展方向
- 联邦学习在公共数据集上的实验基准。
- kubernetes中联邦学习系统的部署方法。
- 垂直联合学习策略和更多的水平联合学习策略将是开源的。
## 参考文献
[1]. Jakub Konečný, H. Brendan McMahan, Daniel Ramage, Peter Richtárik. **Federated Optimization: Distributed Machine Learning for On-Device Intelligence.** 2016
[2]. H. Brendan McMahan, Eider Moore, Daniel Ramage, Blaise Agüera y Arcas. **Federated Learning of Deep Networks using Model Averaging.** 2017
[3]. Jakub Konečný, H. Brendan McMahan, Felix X. Yu, Peter Richtárik, Ananda Theertha Suresh, Dave Bacon. **Federated Learning: Strategies for Improving Communication Efficiency.** 2016
[4]. Qiang Yang, Yang Liu, Tianjian Chen, Yongxin Tong. **Federated Machine Learning: Concept and Applications.** 2019
[5]. Kai He, Liu Yang, Jue Hong, Jinghua Jiang, Jieming Wu, Xu Dong et al. **PrivC - A framework for efficient Secure Two-Party Computation. In Proceedings of 15th EAI International Conference on Security and Privacy in Communication Networks.** SecureComm 2019
[6]. Martín Abadi, Andy Chu, Ian Goodfellow, H. Brendan McMahan, Ilya Mironov, Kunal Talwar, Li Zhang. **Deep Learning with Differential Privacy.** 2016
[7]. Virginia Smith, Chao-Kai Chiang, Maziar Sanjabi, Ameet Talwalkar. **Federated Multi-Task Learning** 2016
[8]. Yang Liu, Tianjian Chen, Qiang Yang. **Secure Federated Transfer Learning.** 2018
[9]. Balázs Hidasi, Alexandros Karatzoglou, Linas Baltrunas, Domonkos Tikk. **Session-based Recommendations with Recurrent Neural Networks.** 2016
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import six
from . import common
from . import core
from . import dataset
from . import reader
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .master.fl_job import FLCompileTimeJob
from .master.fl_job import FLRunTimeJob
from .master.job_generator import JobGenerator
from .strategy.fl_strategy_base import DPSGDStrategy
from .strategy.fl_strategy_base import FedAvgStrategy
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import paddle.fluid as fluid
class FLJobBase(object):
"""
FLJobBase is fl job base class, responsible for save and load
a federated learning job
"""
def __init__(self):
pass
def _save_str_list(self, items, output):
with open(output, "w") as fout:
for item in items:
fout.write(item + "\n")
def _load_str_list(self, input_file):
res = []
with open(input_file, "r") as fin:
res.append(fin.readline().strip())
return res
def _save_strategy(self, strategy, output_file):
import pickle
pickle.dump(strategy, open(output_file, "wb"))
def _save_endpoints(self, endpoints, output_file):
with open(output_file, "w") as fout:
for ep in endpoints:
fout.write(str(ep) + "\n")
def _load_endpoints(self, input_file):
ep_list = []
with open(input_file, "r") as fin:
for line in fin:
ep_list.append(line.strip())
return ep_list
def _save_program(self, program, output_file):
with open(output_file, "wb") as fout:
fout.write(program.desc.serialize_to_string())
def _save_readable_program(self, program, output_file):
with open(output_file, "w") as fout:
fout.write(str(program))
def _load_program(self, input_file):
with open(input_file, "rb") as fin:
program_desc_str = fin.read()
return fluid.Program.parse_from_string(program_desc_str)
return None
class FLCompileTimeJob(FLJobBase):
"""
FLCompileTimeJob is a container for compile time job in federated learning.
trainer startup programs, trainer main programs and other trainer programs
are in FLCompileTimeJob. Also, server main programs and server startup programs
are in this class. FLCompileTimeJob has server endpoints for debugging as well
"""
def __init__(self):
self._trainer_startup_programs = []
self._trainer_recv_programs = []
self._trainer_main_programs = []
self._trainer_send_programs = []
self._server_startup_programs = []
self._server_main_programs = []
self._server_endpoints = []
def set_strategy(self, strategy):
self._strategy = strategy
def set_server_endpoints(self, ps_endpoints):
self._server_endpoints = ps_endpoints
def set_feed_names(self, names):
self._feed_names = names
def set_target_names(self, names):
self._target_names = names
def save(self, folder=None):
server_num = len(self._server_startup_programs)
trainer_num = len(self._trainer_startup_programs)
send_prog_num = len(self._trainer_send_programs)
for i in range(server_num):
server_folder = "%s/server%d" % (folder, i)
os.system("mkdir -p %s" % server_folder)
server_startup = self._server_startup_programs[i]
server_main = self._server_main_programs[i]
self._save_program(
server_startup,
"%s/server.startup.program" % server_folder)
self._save_program(
server_main,
"%s/server.main.program" % server_folder)
self._save_readable_program(
server_startup,
"%s/server.startup.program.txt" % server_folder)
self._save_readable_program(
server_main,
"%s/server.main.program.txt" % server_folder)
for i in range(trainer_num):
trainer_folder = "%s/trainer%d" % (folder, i)
os.system("mkdir -p %s" % trainer_folder)
trainer_startup = self._trainer_startup_programs[i]
trainer_main = self._trainer_main_programs[i]
self._save_program(
trainer_startup,
"%s/trainer.startup.program" % trainer_folder)
self._save_program(
trainer_main,
"%s/trainer.main.program" % trainer_folder)
self._save_readable_program(
trainer_startup,
"%s/trainer.startup.program.txt" % trainer_folder)
self._save_readable_program(
trainer_main,
"%s/trainer.main.program.txt" % trainer_folder)
for i in range(send_prog_num):
trainer_folder = "%s/trainer%d" % (folder, i)
trainer_send = self._trainer_send_programs[i]
trainer_recv = self._trainer_recv_programs[i]
self._save_program(
trainer_send,
"%s/trainer.send.program" % trainer_folder)
self._save_program(
trainer_recv,
"%s/trainer.recv.program" % trainer_folder)
self._save_readable_program(
trainer_send,
"%s/trainer.send.program.txt" % trainer_folder)
self._save_readable_program(
trainer_recv,
"%s/trainer.recv.program.txt" % trainer_folder)
self._save_str_list(self._feed_names,
"%s/feed_names" % folder)
self._save_str_list(self._target_names,
"%s/target_names" % folder)
self._save_endpoints(self._server_endpoints,
"%s/endpoints" % folder)
self._save_strategy(self._strategy,
"%s/strategy.pkl" % folder)
class FLRunTimeJob(FLJobBase):
"""
FLRunTimeJob is a contrainer for run time job in federated leanring.
A trainer or a server can load FLRunTimeJob. Only necessary programs
can be loaded in FLRunTimeJob
"""
def __init__(self):
self._trainer_startup_program = None
self._trainer_recv_program = None
self._trainer_main_program = None
self._trainer_send_program = None
self._server_startup_program = None
self._server_main_program = None
self._feed_names = None
self._target_names = None
def _load_strategy(self, input_file):
import pickle
return pickle.load(open(input_file, "rb"))
def load_trainer_job(self, folder=None, trainer_id=0):
"""
Load trainer job given training folder and trainer id
Currently, a trainer_id is assigned to a trainer node, and
corresponding FL Job will be sent to the trainer node.
Args:
folder(str): FL Job folder name
trainer_id(int): trainer index for current job
Return:
None
"""
folder_name = "%s/trainer%d" % (folder, trainer_id)
startup_fn = "%s/trainer.startup.program" % folder_name
self._trainer_startup_program = self._load_program(startup_fn)
main_fn = "%s/trainer.main.program" % folder_name
self._trainer_main_program = self._load_program(main_fn)
send_fn = "%s/trainer.send.program" % folder_name
self._trainer_send_program = self._load_program(send_fn)
recv_fn = "%s/trainer.recv.program" % folder_name
self._trainer_recv_program = self._load_program(recv_fn)
endpoints_fn = "%s/endpoints" % folder
self._endpoints = self._load_endpoints(endpoints_fn)
strategy_fn = "%s/strategy.pkl" % folder
self._strategy = self._load_strategy(strategy_fn)
feed_names_fn = "%s/feed_names" % folder
self._feed_names = self._load_str_list(feed_names_fn)
target_names_fn = "%s/target_names" % folder
self._target_names = self._load_str_list(target_names_fn)
def load_server_job(self, folder=None, server_id=0):
"""
Load server job given training folder and server_id
Currently, a server_id is assigned to a server node, and
corresponding FL Job will be sent to the server node.
Args:
folder(str): FL Job folder name
server_id(int): server index for current job
Return:
None
"""
folder_name = "%s/server%d" % (folder, server_id)
startup_fn = "%s/server.startup.program" % folder_name
self._server_startup_program = self._load_program(startup_fn)
main_fn = "%s/server.main.program" % folder_name
self._server_main_program = self._load_program(main_fn)
endpoints_fn = "%s/endpoints" % folder
self._endpoints = self._load_endpoints(endpoints_fn)
import pickle
strategy_fn = "%s/strategy.pkl" % folder
self._strategy = self._load_strategy(strategy_fn)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
from .fl_job import FLCompileTimeJob
class JobGenerator(object):
"""
A JobGenerator is responsible for generating distributed federated
learning configs. Before federated learning job starts, organizations
need to define a deep learning model together to do horizontal federated
learning.
"""
def __init__(self):
# worker num for federated learning
self._worker_num = 0
# startup program
self._startup_prog = None
# inner optimizer
self._optimizer = \
fluid.optimizer.SGD(learning_rate=0.001)
def set_optimizer(self, optimizer):
"""
Set optimizer of current job
"""
self._optimizer = optimizer
def set_losses(self, losses):
"""
Set losses of current job
losses can be a list of loss so that we can do
optimization on multiple losses
"""
self._losses = losses
def set_startup_program(self, startup=None):
"""
set startup program for user defined program
"""
if startup == None:
startup = fluid.default_startup_program()
self._startup_prog = startup
def set_infer_feed_and_target_names(self, feed_names, target_names):
self._feed_names = feed_names
self._target_names = target_names
def generate_fl_job(self,
fl_strategy,
server_endpoints=[],
worker_num=1,
output=None):
"""
Generate Federated Learning Job, based on user defined configs
Args:
fl_strategy(FLStrategyBase): federated learning strategy defined by current federated users
server_endpoints(List(str)): endpoints for federated server nodes
worker_num(int): number of training nodes
output(str): output directory of generated fl job
Returns:
None
Examples:
import paddle.fluid as fluid
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
input_x = fluid.layers.data(name="input_x", shape=[10], dtype="float32")
label = fluid.layers.data(name="label", shape[1], dtype="int64")
fc0 = fluid.layers.fc(input=input_x, size=2, act='sigmoid')
cost = fluid.layers.cross_entropy(input=fc0, label=label)
loss = fluid.layers.reduce_mean(cost)
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([loss])
server_endpoints = [127.0.0.1:8181]
worker_num = 10
build_strategy = FLStrategyFactor()
build_strategy.fed_avg = True
strategy = build_strategy.create_fl_strategy()
job_output_dir = "fl_job_config"
job_generator.generate_fl_job(strategy,
server_endpoints=server_endpoints,
worker_num=1,
output=output)
"""
local_job = FLCompileTimeJob()
assert len(self._losses) > 0
assert self._startup_prog != None
assert fl_strategy != None
assert output != None
fl_strategy.minimize(self._optimizer, self._losses)
# strategy can generate startup and main program
# of a single worker and servers
for trainer_id in range(worker_num):
startup_program = self._startup_prog.clone()
main_program = self._losses[0].block.program.clone()
fl_strategy._build_trainer_program_for_job(
trainer_id, program=main_program,
ps_endpoints=server_endpoints, trainers=worker_num,
sync_mode=True, startup_program=startup_program,
job=local_job)
startup_program = self._startup_prog.clone()
main_program = self._losses[0].block.program.clone()
fl_strategy._build_server_programs_for_job(
program=main_program, ps_endpoints=server_endpoints,
trainers=worker_num, sync_mode=True,
startup_program=startup_program, job=local_job)
local_job.set_feed_names(self._feed_names)
local_job.set_target_names(self._target_names)
local_job.set_strategy(fl_strategy)
local_job.save(output)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
class FLServer(object):
def __init__(self):
self._startup_program = None
self._main_program = None
def set_server_job(self, job):
# need to parse startup and main program in job
# need to parse current endpoint
# need to parse master endpoint
self._startup_program = job._server_startup_program
self._main_program = job._server_main_program
def start(self):
exe = fluid.Executor(fluid.CPUPlace())
exe.run(self._startup_program)
exe.run(self._main_program)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from .program_utils import *
from .ufind import *
from .checkport import *
from .vars_distributed import *
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import time
import socket
from contextlib import closing
from six import string_types
def wait_server_ready(endpoints):
"""
Wait until parameter servers are ready, use connext_ex to detect
port readiness.
Args:
endpoints (list): endpoints string list, like:
["127.0.0.1:8080", "127.0.0.1:8081"]
Examples:
.. code-block:: python
wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"])
"""
assert not isinstance(endpoints, string_types)
while True:
all_ok = True
not_ready_endpoints = []
for ep in endpoints:
ip_port = ep.split(":")
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex((ip_port[0], int(ip_port[1])))
if result != 0:
all_ok = False
not_ready_endpoints.append(ep)
if not all_ok:
sys.stderr.write("server not ready, wait 3 sec to retry...\n")
sys.stderr.write("not ready endpoints:" + str(not_ready_endpoints) +
"\n")
sys.stderr.flush()
time.sleep(3)
else:
break
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import six
from paddle.fluid import core
import paddle
def delete_ops(block, ops):
for op in ops:
try:
idx = list(block.ops).index(op)
block._remove_op(idx)
except Exception as e:
print(e)
def find_op_by_input_arg(block, arg_name):
for index, op in enumerate(block.ops):
if arg_name in op.input_arg_names:
return index
return -1
def find_op_by_output_arg(block, arg_name, reverse=False):
if reverse:
pos = len(block.ops) - 1
while pos >= 0:
op = block.ops[pos]
if arg_name in op.output_arg_names:
return pos
pos -= 1
else:
for index, op in enumerate(block.ops):
if arg_name in op.output_arg_names:
return index
return -1
def get_indent_space(indent, space_num=4):
ret = ""
for i in range(0, indent * space_num):
ret += " "
return ret
def variable_to_code(var):
"""
Get readable codes of fluid variable.
Args:
var: A fluid operator.
Returns:
string: The formatted string.
"""
if var.type == core.VarDesc.VarType.SELECTED_ROWS or var.type == core.VarDesc.VarType.LOD_TENSOR:
var_str = "{name} : fluid.{type}.shape{shape}.astype({dtype})".\
format(i="{", e="}", name=var.name, type=var.type, shape=var.shape, dtype=var.dtype)
else:
var_str = "{name} : fluid.{type})".\
format(i="{", e="}", name=var.name, type=var.type)
if type(var) == paddle.fluid.framework.Parameter:
if var.trainable:
var_str = "trainable parameter " + var_str
else:
var_str = "parameter " + var_str
else:
var_str = "var " + var_str
if var.persistable:
var_str = "persist " + var_str
return var_str
def op_to_code(op, skip_op_callstack=True):
"""
Get readable codes of fluid operator.
Args:
op: A fluid operator.
Returns:
string: The foramtted string.
"""
outputs_str = "{"
for i in range(0, len(op.output_names)):
outputs_str += "{name}=".format(name=op.output_names[i])
o = op.output(op.output_names[i])
outputs_str += "{value}".format(value=o)
if i != len(op.output_names) - 1:
outputs_str += ", "
outputs_str += "}"
inputs_str = "{"
for i in range(0, len(op.input_names)):
inputs_str += "{name}=".format(name=op.input_names[i])
o = op.input(op.input_names[i])
inputs_str += "{value}".format(value=o)
if i != len(op.input_names) - 1:
inputs_str += ", "
inputs_str += "}"
attr_names = sorted(op.attr_names)
attrs_str = ""
for i in range(0, len(attr_names)):
name = attr_names[i]
if skip_op_callstack and name == "op_callstack":
continue
attr_type = op.desc.attr_type(name)
if attr_type == core.AttrType.BLOCK:
a = "{name} = block[{value}]".format(
name=name, type=attr_type, value=op._block_attr_id(name))
attrs_str += a
if i != len(attr_names) - 1:
attrs_str += ", "
continue
if attr_type == core.AttrType.BLOCKS:
a = "{name} = blocks{value}".format(
name=name, type=attr_type, value=op._blocks_attr_ids(name))
attrs_str += a
if i != len(attr_names) - 1:
attrs_str += ", "
continue
a = "{name} = {value}".format(
name=name, type=attr_type, value=op.desc.attr(name))
attrs_str += a
if i != len(attr_names) - 1:
attrs_str += ", "
if outputs_str != "{}":
op_str = "{outputs} = {op_type}(inputs={inputs}, {attrs})".\
format(outputs = outputs_str, op_type=op.type, inputs=inputs_str, attrs=attrs_str)
else:
op_str = "{op_type}(inputs={inputs}, {attrs})".\
format(op_type=op.type, inputs=inputs_str, attrs=attrs_str)
return op_str
def block_to_code(block, block_idx, fout=None, skip_op_callstack=False):
indent = 0
print(
"{0}{1} // block {2}".format(get_indent_space(indent), '{', block_idx),
file=fout)
indent += 1
# sort all vars
all_vars = sorted(six.iteritems(block.vars), key=lambda x: x[0])
for var in all_vars:
print(
"{}{}".format(get_indent_space(indent), variable_to_code(var[1])),
file=fout)
if len(all_vars) > 0:
print("", file=fout)
for op in block.ops:
print(
"{}{}".format(
get_indent_space(indent), op_to_code(op, skip_op_callstack)),
file=fout)
indent -= 1
print("{0}{1}".format(get_indent_space(indent), '}'), file=fout)
def program_to_code(prog, fout=None, skip_op_callstack=True):
"""
Print readable codes of fluid program.
Args:
prog : A fluid program.
An example result like bellow:
https://github.com/PaddlePaddle/Paddle/pull/12673
"""
block_idx = 0
for block in prog.blocks:
block_to_code(block, block_idx, fout, skip_op_callstack)
block_idx += 1
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
class PSDispatcher(object):
"""
PSDispatcher is the base class for dispatching vars
into different pserver instance.
You need to implement the `dispatch` inferface.
"""
def __init__(self, pserver_endpoints):
self._eps = pserver_endpoints
self._step = 0
@property
def eps(self):
return self._eps
def reset(self):
self._step = 0
def dispatch(self, varlist):
"""
Args:
varlist(list): a list of Variables
Returns:
a map of pserver endpoint -> varname
"""
AssertionError("Interface has not been implemented.")
class HashName(PSDispatcher):
"""
Hash variable names to several endpoints using python
"hash()" function.
Args:
pserver_endpoints (list): list of endpoint(ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
rr = RoundRobin(pserver_endpoints)
rr.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super(self.__class__, self).__init__(pserver_endpoints)
def _hash_block(self, block_str, total):
return hash(block_str) % total
def dispatch(self, varlist):
eplist = []
for var in varlist:
server_id = self._hash_block(var.name(), len(self._eps))
server_for_param = self._eps[server_id]
eplist.append(server_for_param)
return eplist
class RoundRobin(PSDispatcher):
"""
Distribute variables to serveral endpoints using
RondRobin<https://en.wikipedia.org/wiki/Round-robin_scheduling> method.
Args:
pserver_endpoints (list): list of endpoint(ip:port).
Examples:
.. code-block:: python
pserver_endpoints = ["127.0.0.1:6007", "127.0.0.1:6008"]
vars = ["var1","var2","var3","var4","var5"]
rr = RoundRobin(pserver_endpoints)
rr.dispatch(vars)
"""
def __init__(self, pserver_endpoints):
super(self.__class__, self).__init__(pserver_endpoints)
def dispatch(self, varlist):
eplist = []
for var in varlist:
server_for_param = self._eps[self._step]
eplist.append(server_for_param)
self._step += 1
if self._step >= len(self._eps):
self._step = 0
return eplist
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
class UnionFind(object):
""" Union-find data structure.
Union-find is a data structure that keeps track of a set of elements partitioned
into a number of disjoint (non-overlapping) subsets.
Reference:
https://en.wikipedia.org/wiki/Disjoint-set_data_structure
Args:
elements(list): The initialize element list.
"""
def __init__(self, elementes=None):
self._parents = [] # index -> parent index
self._index = {} # element -> index
self._curr_idx = 0
if not elementes:
elementes = []
for ele in elementes:
self._parents.append(self._curr_idx)
self._index.update({ele: self._curr_idx})
self._curr_idx += 1
def find(self, x):
# Find the root index of given element x,
# execute the path compress while findind the root index
if not x in self._index:
return -1
idx = self._index[x]
while idx != self._parents[idx]:
t = self._parents[idx]
self._parents[idx] = self._parents[t]
idx = t
return idx
def union(self, x, y):
# Union two given element
x_root = self.find(x)
y_root = self.find(y)
if x_root == y_root:
return
self._parents[x_root] = y_root
def is_connected(self, x, y):
# If two given elements have the same root index,
# then they are connected.
return self.find(x) == self.find(y)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddle.fluid.framework import Variable
class VarStruct(object):
"""
record part properties of a Variable in python.
"""
def __init__(self, name, shape, dtype, type, lod_level, persistable):
self.name = name
self.shape = shape
self.dtype = dtype
self.type = type
self.lod_level = lod_level
self.persistable = persistable
class VarDistributed(object):
"""
a class to record the var distributed on parameter servers.
the class will record the relationship between origin var and slice var.
the slice var's properties, such as type/shape/offset/endpoint.
"""
def __init__(self,
origin_var,
slice_var,
is_slice=None,
block_id=None,
offset=None,
vtype=None,
endpoint=None):
"""
Args:
origin_var(Variable|VarStruct): origin var properties
slice_var(Variable|VarStruct): slice var properties
is_slice(bool|None): slice or not, slice_var=True/False and its block size > 8192 are the judgement standard.
block_id(int|None): the number about the slice var.
offset(int|None): if the slice var is sliced, offset is the numel before the var.
vtype(str|None): a tag, such as Optimizer/Param/RemoteProfetch.
endpoint(str|None): which parameter the slice var on, such as "127.0.0.1:1001"
"""
if isinstance(origin_var, Variable):
self.origin = self.__create_var_struct(origin_var)
else:
self.origin = origin_var
if isinstance(slice_var, Variable):
self.slice = self.__create_var_struct(slice_var)
else:
self.slice = slice_var
if self.equal(self.origin, self.slice):
self.is_slice = False
self.block_id = 0
self.offset = 0
else:
self.is_slice = True
self.block_id = 0
self.offset = 0
if is_slice is not None:
self.is_slice = is_slice
if block_id is not None:
self.block_id = block_id
if offset is not None:
self.offset = offset
self.vtype = vtype
self.endpoint = endpoint
@staticmethod
def __create_var_struct(var):
return VarStruct(var.name, var.shape, var.dtype, var.type,
var.lod_level, var.persistable)
@staticmethod
def equal(var1, var2):
"""
the two var is equal or not.
Returns:
bool: equal will return True else False
"""
assert isinstance(var1, VarStruct) and isinstance(var2, VarStruct)
return var1.name == var2.name and \
var1.type == var2.type and \
var1.shape == var2.shape and \
var1.dtype == var2.dtype and \
var1.lod_level == var2.lod_level and \
var1.persistable == var2.persistable
def __str__(self):
origin_var_str = "{name} : fluid.{type}.shape{shape}.astype({dtype})". \
format(i="{", e="}", name=self.origin.name, type=self.origin.type,
shape=self.origin.shape, dtype=self.origin.dtype)
slice_var_str = "{name} : fluid.{type}.shape{shape}.astype({dtype})" \
".slice({is_slice}).block({block_id}).offset({offset})". \
format(i="{", e="}", name=self.slice.name, type=self.slice.type,
shape=self.slice.shape, dtype=self.slice.dtype,
is_slice=self.is_slice, block_id=self.block_id, offset=self.offset)
return "var owned: {}, origin var: ( {} ), slice var: ( {} ), endpoint: {} ".format(
self.vtype, origin_var_str, slice_var_str, self.endpoint)
class VarsDistributed(object):
"""
a gather about VarDistributed with many methods to find distributed vars.
through the class, we can get overview about the distributed parameters on parameter servers.
this class may centralized and convenient for developer to manage and get variable's distribute.
other module can also use this to find variables such io.py.
"""
def __init__(self):
self.distributed_vars = []
def add_distributed_var(self,
origin_var,
slice_var,
is_slice=None,
block_id=None,
offset=None,
vtype=None,
endpoint=None):
"""
add distributed var in this.
Args:
origin_var(Variable|VarStruct): origin var properties
slice_var(Variable|VarStruct): slice var properties
is_slice(bool|None): slice or not, slice_var=True/False and its block size > 8192 are the judgement standard.
block_id(int|None): the number about the slice var.
offset(int|None): if the slice var is sliced, offset is the numel before the var.
vtype(str|None): a tag, such as Optimizer/Param/RemoteProfetch.
endpoint(str|None): which parameter the slice var on, such as "127.0.0.1:1001"
Returns:
None
"""
self.distributed_vars.append(
VarDistributed(origin_var, slice_var, is_slice, block_id, offset,
vtype, endpoint))
def get_distributed_var_by_slice(self, var_name):
"""
get distributed var by conditions.
Args:
var_name(str): slice var name, such as "w.traier0.block1"
Returns:
VarDistributed: distributed var.
"""
for dist_var in self.distributed_vars:
if dist_var.slice.name == var_name:
return dist_var
return None
@staticmethod
def equal(var1, var2):
"""
the two var is equal or not.
Returns:
bool: equal will return True else False
"""
return var1.name == var2.name and \
var1.type == var2.type and \
var1.shape == var2.shape and \
var1.dtype == var2.dtype and \
var1.lod_level == var2.lod_level and \
var1.persistable == var2.persistable
def get_distributed_var_by_origin_and_ep(self, origin_var_name, endpoint):
"""
get distributed var by conditions.
Args:
origin_var_name(str):
endpoint(str): the parameter endpoint, such as "127.0.0.1:1001"
Returns:
VarDistributed: distributed var.
"""
for dist_var in self.distributed_vars:
if dist_var.origin.name == origin_var_name and dist_var.endpoint == endpoint:
return dist_var
return None
def get_distributed_vars_by_vtypes(self, vtypes, groupby=False):
"""
get distributed vars by conditions.
Args:
vtype(str|None): distributed var's vtype, such as "Optimizer", "RemotePrefetch"
groupby(bool|False): group by origin var or not.
Returns:
list: distributed var list.
dict: distributed var map when groupby=True
"""
vtype_vars = []
for var in self.distributed_vars:
if var.vtype in vtypes:
vtype_vars.append(var)
if not groupby:
return vtype_vars
params_map = {}
for var in vtype_vars:
origin_var_name = var.origin.name
if origin_var_name in params_map.keys():
optimizers = params_map.get(origin_var_name)
else:
optimizers = []
optimizers.append(var)
params_map[origin_var_name] = optimizers
return params_map
def get_distributed_vars_by_ep(self, endpoint, vtype=None):
"""
get distributed vars by conditions.
Args:
endpoint(str): the parameter server endpoint, such as "127.0.0.1:2001"
vtype(str|None): distributed var's vtype, such as "Optimizer", "RemotePrefetch"
Returns:
list: distributed var list.
"""
endpoint_vars = []
for var in self.distributed_vars:
if var.endpoint == endpoint:
endpoint_vars.append(var)
if not vtype:
return endpoint_vars
vtype_vars = []
for var in endpoint_vars:
if var.vtype == vtype:
vtype_vars.append(var)
return vtype_vars
def overview(self):
"""
get the overview string about all params on all parameter servers.
Returns:
Str: overview string.
"""
vars_str = []
for var in self.distributed_vars:
vars_str.append(str(var))
return "\n".join(vars_str)
此差异已折叠。
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .fl_distribute_transpiler import FLDistributeTranspiler
from paddle.fluid.optimizer import SGD
import paddle.fluid as fluid
class FLStrategyFactory(object):
"""
FLStrategyFactory is a FLStrategy builder
Users can define strategy config to create different FLStrategy
"""
def __init__(self):
self._fed_avg = False
self._dpsgd = False
self._inner_step = 1
@property
def fed_avg(self):
return self._fed_avg
@fed_avg.setter
def fed_avg(self, s):
self._fed_avg = s
@property
def dpsgd(self):
return self._dpsgd
@fed_avg.setter
def dpsgd(self, s):
self._dpsgd = s
@property
def inner_step(self):
return self._inner_step
@inner_step.setter
def inner_step(self, s):
self._inner_step = s
def create_fl_strategy(self):
"""
The factory method
"""
if self._fed_avg == True:
strategy = FedAvgStrategy()
strategy._fed_avg = True
strategy._dpsgd = False
elif self._dpsgd == True:
strategy = DPSGDStrategy()
strategy._fed_avg = False
strategy._dpsgd = True
strategy._inner_step = self._inner_step
return strategy
class FLStrategyBase(object):
"""
FLStrategyBase is federated learning algorithm container
"""
def __init__(self):
self._fed_avg = False
self._dpsgd = False
self._inner_step = 1
pass
def minimize(self, optimizer=None, losses=[]):
"""
minmize can do minimization as paddle.fluid.Optimizer.minimize does
this function can be overloaded so that for some FLStrategy, the
program should be transpiled before minimize
Args:
optimizer(paddle.fluid.optimizer): the user defined optimizer
losses(List(Variable)): list of loss variables in paddle.fluid
"""
for loss in losses:
optimizer.minimize(loss)
def _build_trainer_program_for_job(
self, trainer_id=0, program=None,
ps_endpoints=[], trainers=0,
sync_mode=True, startup_program=None,
job=None):
pass
def _build_server_programs_for_job(
self, program=None, ps_endpoints=[],
trainers=0, sync_mode=True,
startup_program=None, job=None):
pass
class DPSGDStrategy(FLStrategyBase):
"""
DPSGDStrategy: Deep Learning with Differential Privacy. 2016
"""
def __init__(self):
super(DPSGDStrategy, self).__init__()
def minimize(self, optimizer=None, losses=[]):
"""
Do nothing in DPSGDStrategy in minimize function
"""
optimizer.minimize(losses[0])
def _build_trainer_program_for_job(
self, trainer_id=0, program=None,
ps_endpoints=[], trainers=0,
sync_mode=True, startup_program=None,
job=None):
transpiler = fluid.DistributeTranspiler()
transpiler.transpile(trainer_id,
program=program,
pservers=",".join(ps_endpoints),
trainers=trainers,
sync_mode=sync_mode,
startup_program=startup_program)
main = transpiler.get_trainer_program()
job._trainer_startup_programs.append(startup_program)
job._trainer_main_programs.append(main)
def _build_server_programs_for_job(
self, program=None, ps_endpoints=[],
trainers=0, sync_mode=True,
startup_program=None, job=None):
transpiler = fluid.DistributeTranspiler()
trainer_id = 0
transpiler.transpile(
trainer_id,
program=program,
pservers=",".join(ps_endpoints),
trainers=trainers,
sync_mode=sync_mode,
startup_program=startup_program)
job.set_server_endpoints(ps_endpoints)
for endpoint in ps_endpoints:
main_prog = transpiler.get_pserver_program(endpoint)
startup_prog = transpiler.get_startup_program(endpoint, main_prog)
job._server_startup_programs.append(startup_prog)
job._server_main_programs.append(main_prog)
class FedAvgStrategy(FLStrategyBase):
"""
FedAvgStrategy: this is model averaging optimization proposed in
H. Brendan McMahan, Eider Moore, Daniel Ramage, Blaise Aguera y Arcas. Federated Learning of Deep Networks using Model Averaging. 2017
"""
def __init__(self):
super(FedAvgStrategy, self).__init__()
def minimize(self, optimizer=None, losses=[]):
"""
minimize the first loss as in paddle.fluid
"""
optimizer.minimize(losses[0])
def _build_trainer_program_for_job(
self, trainer_id=0, program=None,
ps_endpoints=[], trainers=0,
sync_mode=True, startup_program=None,
job=None):
transpiler = FLDistributeTranspiler()
transpiler.transpile(trainer_id,
program=program,
pservers=",".join(ps_endpoints),
trainers=trainers,
sync_mode=sync_mode,
startup_program=startup_program)
recv, main, send = transpiler.get_trainer_program()
job._trainer_startup_programs.append(startup_program)
job._trainer_main_programs.append(main)
job._trainer_send_programs.append(send)
job._trainer_recv_programs.append(recv)
def _build_server_programs_for_job(
self, program=None, ps_endpoints=[],
trainers=0, sync_mode=True,
startup_program=None, job=None):
transpiler = FLDistributeTranspiler()
trainer_id = 0
transpiler.transpile(
trainer_id,
program=program,
pservers=",".join(ps_endpoints),
trainers=trainers,
sync_mode=sync_mode,
startup_program=startup_program)
job.set_server_endpoints(ps_endpoints)
for endpoint in ps_endpoints:
main_prog = transpiler.get_pserver_program(endpoint)
startup_prog = transpiler.get_startup_program(endpoint, main_prog)
job._server_startup_programs.append(startup_prog)
job._server_main_programs.append(main_prog)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
class FLTrainerFactory(object):
def __init__(self):
pass
def create_fl_trainer(self, job):
strategy = job._strategy
trainer = None
if strategy._fed_avg == True:
trainer = FedAvgTrainer()
trainer.set_trainer_job(job)
elif strategy._dpsgd == True:
trainer = FLTrainer()
trainer.set_trainer_job(job)
trainer.set_trainer_job(job)
return trainer
class FLTrainer(object):
def __init__(self):
pass
def set_trainer_job(self, job):
self._startup_program = \
job._trainer_startup_program
self._main_program = \
job._trainer_main_program
self._step = job._strategy._inner_step
self._feed_names = job._feed_names
self._target_names = job._target_names
def start(self):
self.exe = fluid.Executor(fluid.CPUPlace())
self.exe.run(self._startup_program)
def train_inner_loop(self, reader):
now_step = 0
for data in reader():
now_step += 1
if now_step > self._step:
break
self.exe.run(self._main_program,
feed=data,
fetch_list=[])
def save_inference_program(self, output_folder):
target_vars = []
infer_program = self._main_program.clone(for_test=True)
for name in self._target_names:
tmp_var = self._main_program.block(0)._find_var_recursive(name)
target_vars.append(tmp_var)
fluid.io.save_inference_model(
output_folder,
self._feed_names,
target_vars,
self.exe,
main_program=infer_program)
def stop(self):
# ask for termination with master endpoint
# currently not open sourced, will release the code later
# TODO(guru4elephant): add connection with master
return False
class FedAvgTrainer(FLTrainer):
def __init__(self):
super(FedAvgTrainer, self).__init__()
pass
def start(self):
self.exe = fluid.Executor(fluid.CPUPlace())
self.exe.run(self._startup_program)
def set_trainer_job(self, job):
super(FedAvgTrainer, self).set_trainer_job(job)
self._send_program = job._trainer_send_program
self._recv_program = job._trainer_recv_program
def train_inner_loop(self, reader):
self.exe.run(self._recv_program)
now_step = 0
for data in reader():
now_step += 1
if now_step > self._step:
break
self.exe.run(self._main_program,
feed=data,
fetch_list=[])
self.exe.run(self._send_program)
def stop(self):
return False
import paddle.fluid as fluid
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
class Model(object):
def __init__(self):
pass
def mlp(self, inputs, label, hidden_size=128):
self.concat = fluid.layers.concat(inputs, axis=1)
self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
self.loss = fluid.layers.reduce_mean(self.sum_cost)
self.startup_program = fluid.default_startup_program()
inputs = [fluid.layers.data( \
name=str(slot_id), shape=[5],
dtype="float32")
for slot_id in range(3)]
label = fluid.layers.data( \
name="label",
shape=[1],
dtype='int64')
model = Model()
model.mlp(inputs, label)
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[x.name for x in inputs], [model.predict.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 1
strategy = build_strategy.create_fl_strategy()
# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=2, output=output)
# fl_job_config will be dispatched to workers
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.core.server.fl_server import FLServer
from paddle_fl.core.master.fl_job import FLRunTimeJob
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
server.set_server_job(job)
server.start()
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np
import sys
def reader():
for i in range(1000):
data_dict = {}
for i in range(3):
data_dict[str(i)] = np.random.rand(1, 5).astype('float32')
data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
yield data_dict
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.start()
output_folder = "fl_model"
step_i = 0
while not trainer.stop():
step_i += 1
print("batch %d start train" % (step_i))
trainer.train_inner_loop(reader)
trainer.save_inference_program(output_folder)
unset http_proxy
unset https_proxy
python fl_master.py
sleep 2
python -u fl_server.py >server0.log &
sleep 2
python -u fl_trainer.py 0 >trainer0.log &
sleep 2
python -u fl_trainer.py 1 >trainer1.log &
import paddle.fluid as fluid
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
import math
class Model(object):
def __init__(self):
pass
def mlp(self, inputs, label, hidden_size=128):
self.concat = fluid.layers.concat(inputs, axis=1)
self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
self.loss = fluid.layers.reduce_mean(self.sum_cost)
self.startup_program = fluid.default_startup_program()
inputs = [fluid.layers.data( \
name=str(slot_id), shape=[5],
dtype="float32")
for slot_id in range(3)]
label = fluid.layers.data( \
name="label",
shape=[1],
dtype='int64')
model = Model()
model.mlp(inputs, label)
STEP_EPSILON = 10
DELTA = 0.00001
SIGMA = math.sqrt(2.0 * math.log(1.25/DELTA)) / STEP_EPSILON
CLIP = 10.0
batch_size = 1
job_generator = JobGenerator()
optimizer = fluid.optimizer.Dpsgd(0.1, clip=CLIP, batch_size=float(batch_size), sigma=0.0 * SIGMA)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
build_strategy = FLStrategyFactory()
build_strategy.dpsgd = True
build_strategy.inner_step = 1
strategy = build_strategy.create_fl_strategy()
# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=2, output=output)
# fl_job_config will be dispatched to workers
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.core.server.fl_server import FLServer
from paddle_fl.core.master.fl_job import FLRunTimeJob
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
server.set_server_job(job)
server.start()
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np
import sys
def reader():
for i in range(1000):
data_dict = {}
for i in range(3):
data_dict[str(i)] = np.random.rand(1, 5).astype('float32')
data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
yield data_dict
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.start()
step_i = 0
while not trainer.stop():
step_i += 1
print("batch %d start train" % (step_i))
trainer.train_inner_loop(reader)
python fl_master.py
sleep 2
python -u fl_server.py >server0.log &
sleep 2
python -u fl_trainer.py 0 >trainer0.log &
sleep 2
python -u fl_trainer.py 1 >trainer1.log &
wget --no-check-certificate https://paddlefl.bj.bcebos.com/rsc15_benchmark/mid_data.tar
tar xvf mid_data.tar
import paddle.fluid as fluid
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
class Model(object):
def __init__(self):
pass
def gru4rec_network(self,
vocab_size=37483,
hid_size=10,
init_low_bound=-0.04,
init_high_bound=0.04):
""" network definition """
emb_lr_x = 10.0
gru_lr_x = 1.0
fc_lr_x = 1.0
# Input data
src_wordseq = fluid.layers.data(
name="src_wordseq", shape=[1], dtype="int64", lod_level=1)
dst_wordseq = fluid.layers.data(
name="dst_wordseq", shape=[1], dtype="int64", lod_level=1)
emb = fluid.layers.embedding(
input=src_wordseq,
size=[vocab_size, hid_size],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=emb_lr_x),
#is_distributed=True,
is_sparse=False)
fc0 = fluid.layers.fc(input=emb,
size=hid_size * 3,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=gru_lr_x))
gru_h0 = fluid.layers.dynamic_gru(
input=fc0,
size=hid_size,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=gru_lr_x))
fc = fluid.layers.fc(input=gru_h0,
size=vocab_size,
act='softmax',
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=init_low_bound, high=init_high_bound),
learning_rate=fc_lr_x))
cost = fluid.layers.cross_entropy(input=fc, label=dst_wordseq)
acc = fluid.layers.accuracy(input=fc, label=dst_wordseq, k=20)
self.loss = fluid.layers.mean(x=cost)
self.startup_program = fluid.default_startup_program()
model = Model()
model.gru4rec_network()
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=2.0)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
job_generator.generate_fl_job(
strategy, server_endpoints=endpoints, worker_num=2, output=output)
# fl_job_config will be dispatched to workers
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.core.server.fl_server import FLServer
from paddle_fl.core.master.fl_job import FLRunTimeJob
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
server.set_server_job(job)
server.start()
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
from paddle_fl.reader.gru4rec_reader import Gru4rec_Reader
import paddle.fluid as fluid
import numpy as np
import sys
import os
trainer_id = int(sys.argv[1]) # trainer id for each guest
place = fluid.CPUPlace()
train_file_dir = "mid_data/node1/0/"
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer.start()
r = Gru4rec_Reader()
train_reader = r.reader(train_file_dir, place)
step_i = 0
while not trainer.stop():
step_i += 1
print("batch %d start train" % (step_i))
trainer.train_inner_loop(train_reader)
python fl_master.py
sleep 2
python -u fl_server.py >server0.log &
sleep 2
python -u fl_trainer.py 0 >trainer0.log &
sleep 2
python -u fl_trainer.py 1 >trainer1.log &
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .gru4rec_reader import Gru4rec_Reader
import paddle.fluid as fluid
import numpy as np
import os
class Gru4rec_Reader:
def __init__(self):
pass
def to_lodtensor(self, data, place):
""" convert to LODtensor """
seq_lens = [len(seq) for seq in data]
cur_len = 0
lod = [cur_len]
for l in seq_lens:
cur_len += l
lod.append(cur_len)
flattened_data = np.concatenate(data, axis=0).astype("int64")
flattened_data = flattened_data.reshape([len(flattened_data), 1])
res = fluid.LoDTensor()
res.set(flattened_data, place)
res.set_lod([lod])
return res
def lod_reader(self, reader, place):
def feed_reader():
for data in reader():
lod_src_wordseq = self.to_lodtensor([dat[0] for dat in data],
place)
lod_dst_wordseq = self.to_lodtensor([dat[1] for dat in data],
place)
fe_data = {}
fe_data["src_wordseq"] = lod_src_wordseq
fe_data["dst_wordseq"] = lod_dst_wordseq
yield fe_data
return feed_reader
def sort_batch(self, reader, batch_size, sort_group_size, drop_last=False):
"""
Create a batched reader.
"""
def batch_reader():
r = reader()
b = []
for instance in r:
b.append(instance)
if len(b) == sort_group_size:
sortl = sorted(b, key=lambda x: len(x[0]), reverse=True)
b = []
c = []
for sort_i in sortl:
c.append(sort_i)
if (len(c) == batch_size):
yield c
c = []
if drop_last == False and len(b) != 0:
sortl = sorted(b, key=lambda x: len(x[0]), reverse=True)
c = []
for sort_i in sortl:
c.append(sort_i)
if (len(c) == batch_size):
yield c
c = []
# Batch size check
batch_size = int(batch_size)
if batch_size <= 0:
raise ValueError("batch_size should be a positive integeral value, "
"but got batch_size={}".format(batch_size))
return batch_reader
def reader_creator(self, file_dir):
def reader():
files = os.listdir(file_dir)
for fi in files:
with open(file_dir + '/' + fi, "r") as f:
for l in f:
l = l.strip().split()
l = [w for w in l]
src_seq = l[:len(l) - 1]
trg_seq = l[1:]
yield src_seq, trg_seq
return reader
def reader(self, file_dir, place, batch_size=5):
""" prepare the English Pann Treebank (PTB) data """
print("start constuct word dict")
reader = self.sort_batch(self.reader_creator(file_dir), batch_size, batch_size * 20)
return self.lod_reader(reader, place)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PaddleFL version string """
fl_version = "0.1.0"
module_proto_version = "0.1.0"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册