未验证 提交 28007898 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge pull request #82 from qjing666/program

update load program function and add detection demo
......@@ -199,23 +199,29 @@ class JobGenerator(object):
local_job.set_strategy(fl_strategy)
local_job.save(output)
def save_program(self, program_path, input_list, hidden_vars, loss):
def save_program(self,
main_prog,
startup_prog,
program_path,
input_list,
hidden_vars,
loss,
learning_rate=None):
if not os.path.exists(program_path):
os.makedirs(program_path)
main_program_str = fluid.default_main_program(
).desc.serialize_to_string()
startup_program_str = fluid.default_startup_program(
).desc.serialize_to_string()
params = fluid.default_main_program().global_block().all_parameters()
main_program_str = main_prog.desc.serialize_to_string()
startup_program_str = startup_prog.desc.serialize_to_string()
params = main_prog.global_block().all_parameters()
para_info = []
for pa in params:
para_info.append(pa.name)
with open(program_path + '/input_names', 'w') as fout:
for input in input_list:
fout.write("%s\n" % input.name)
with open(program_path + '/hidden_vars', 'w') as fout:
for var in hidden_vars:
fout.write("%s:%s\n" % (var[0], var[1].name))
fout.write("%s\n" % input)
if hidden_vars != None:
with open(program_path + '/hidden_vars', 'w') as fout:
for var in hidden_vars:
fout.write("%s:%s\n" % (var[0], var[1].name))
with open(program_path + '/para_info', 'w') as fout:
for item in para_info:
fout.write("%s\n" % item)
......@@ -225,6 +231,9 @@ class JobGenerator(object):
fout.write(main_program_str)
with open(program_path + '/loss_name', 'w') as fout:
fout.write(loss.name)
if type(learning_rate) == fluid.Variable:
with open(program_path + '/lr_name', 'w') as fout:
fout.write(learning_rate.name)
def generate_fl_job_from_program(self, strategy, endpoints, worker_num,
program_input, output):
......@@ -252,6 +261,12 @@ class JobGenerator(object):
with open(program_input + '/loss_name', 'r') as fin:
loss_name = fin.read()
if os.path.exists(program_input + '/lr_name'):
with open(program_input + '/lr_name', 'r') as fin:
lr_name = fin.read()
else:
lr_name = None
for item in para_list:
para = new_main.global_block().var(item)
para.regularizer = None
......@@ -262,9 +277,20 @@ class JobGenerator(object):
for var in new_main.list_vars():
if var.name == loss_name:
loss = var
if lr_name != None:
if var.name == lr_name:
lr = var
with fluid.program_guard(new_main, new_startup):
optimizer = fluid.optimizer.SGD(learning_rate=0.1,
parameter_list=para_list)
if lr_name != None:
optimizer = fluid.optimizer.MomentumOptimizer(
learning_rate=lr, momentum=0.9, parameter_list=para_list)
else:
optimizer = fluid.optimizer.MomentumOptimizer(
learning_rate=0.00001,
momentum=0.9,
parameter_list=para_list)
exe.run(new_startup)
strategy.minimize(optimizer, loss)
......
......@@ -695,7 +695,8 @@ class FLDistributeTranspiler(object):
opti_to_param = dict()
param_to_opti = dict()
for op in self.optimize_ops:
if (op.type == "sgd") or (op.type == "adam"):
if (op.type == "sgd") or (op.type == "adam") or (
op.type == "momentum"):
origin_name = op.output("ParamOut")
var = self.origin_program.global_block().var(origin_name[0])
new_var_name = "%s.opti.trainer_%d" % (origin_name[0],
......
......@@ -11,8 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import paddle.fluid as fluid
import logging
from paddle.fluid.executor import global_scope
import pickle
from paddle.fluid.io import is_belong_to_optimizer
from paddle_fl.paddle_fl.core.scheduler.agent_master import FLWorkerAgent
import numpy
import hmac
......@@ -83,6 +87,43 @@ class FLTrainer(object):
self.exe,
main_program=infer_program)
def save(self, parameter_dir, model_path):
base_name = os.path.basename(model_path)
assert base_name != "", \
"The input model_path MUST be format of dirname/filename [dirname\\filename in Windows system], but received model_path is empty string."
dir_name = os.path.dirname(model_path)
if dir_name and not os.path.exists(dir_name):
os.makedirs(dir_name)
def get_tensor(var_name):
t = global_scope().find_var(var_name).get_tensor()
return numpy.array(t)
parameter_list = []
with open(parameter_dir + '/para_info', 'r') as fin:
for line in fin:
current_para = line[:-1]
parameter_list.append(current_para)
param_dict = {p: get_tensor(p) for p in parameter_list}
with open(model_path + ".pdparams", 'wb') as f:
pickle.dump(param_dict, f, protocol=2)
optimizer_var_list = list(
filter(is_belong_to_optimizer, self._main_program.list_vars()))
opt_dict = {p.name: get_tensor(p.name) for p in optimizer_var_list}
with open(model_path + ".pdopt", 'wb') as f:
pickle.dump(opt_dict, f, protocol=2)
main_program = self._main_program.clone()
self._main_program.desc.flush()
main_program.desc._set_version()
fluid.core.save_op_compatible_info(self._main_program.desc)
with open(model_path + ".pdmodel", "wb") as f:
f.write(self._main_program.desc.serialize_to_string())
def stop(self):
# ask for termination with master endpoint
# currently not open sourced, will release the code later
......
# Example of a detection model training with FedAvg
This document introduce how to start a detection model training in PaddleFL with our pre-defined program. Now we only provide faster_rcnn, more models will be updated.
### Dependencies
- paddlepaddle>=1.8
- paddle_fl>=1.0.1
Please use pip which has paddlepaddle installed
```sh
pip install paddle_fl
```
### How to Run
#### Download the dataset
```sh
# download and unzip the dataset
sh download.sh
```
#### Start training
Before training, please modify the following paths according to your environment.
```python
# In run.sh, change the path to you PaddleDetection
export PYTHONPATH=/path/to/PaddleDetection
# In fl_train.py, change the path to your fl_fruit dataset that is downloaded in download.sh.
# Note, the path should be absolute path rather than relative path. Otherwise, error will be raised.
#line 41
self.root_path = '/path/to/your/fl_fruit'
#line 42
self.anno_path = '/path/to/your/fl_fruit/train' + str(trainer_id) + '.txt'
# line 44
self.image_dir = '/path/to/your/fl_fruit/JPEGImages'
# line 57
label_list='/path/to/your/fl_fruit/label_list.txt'
```
After modifying the path, you can run the following shell directly.
```sh
sh run.sh
```
wget --no-check-certificate https://paddlefl.bj.bcebos.com/datasets/fl_fruit.tar.gz
tar -xf fl_fruit.tar.gz
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import paddle_fl.paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
# Fedavg
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 5
strategy = build_strategy.create_fl_strategy()
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
program_file = "faster_rcnn_program"
job_generator = JobGenerator()
job_generator.generate_fl_job_from_program(
strategy=strategy,
endpoints=endpoints,
worker_num=2,
program_input=program_file,
output=output)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_fl.paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 2
server_num = 1
#Define number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(2)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#import paddle_fl.paddle_fl as fl
import paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.paddle_fl.core.server.fl_server import FLServer
from paddle_fl.paddle_fl.core.master.fl_job import FLRunTimeJob
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = "127.0.0.1:9091" # IP address for scheduler
server.set_server_job(job)
server._current_ep = "127.0.0.1:8181" # IP address for server
server.start()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_fl.paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np
import sys
import paddle
import pickle
import paddle.fluid as fluid
import logging
import math
import unittest
import os
from ppdet.data.source.voc import VOCDataSet
from ppdet.data.reader import Reader
from ppdet.utils.download import get_path
from ppdet.utils.download import DATASET_HOME
from ppdet.data.transform.operators import DecodeImage, RandomFlipImage, NormalizeImage, ResizeImage, Permute
from ppdet.data.transform.batch_operators import PadBatch
trainer_id = int(sys.argv[1]) # trainer id for each guest
class DataReader():
def __init__(self):
""" setup
"""
self.root_path = '/path/to/your/fl_fruit'
self.anno_path = '/path/to/your/fl_fruit/train' + str(
trainer_id) + '.txt'
self.image_dir = '/path/to/your/fl_fruit/JPEGImages'
def tearDownClass(self):
""" tearDownClass """
pass
def test_loader(self):
coco_loader = VOCDataSet(
dataset_dir=self.image_dir,
image_dir=self.root_path,
anno_path=self.anno_path,
sample_num=240,
use_default_label=False,
label_list='/path/to/your/fl_fruit/label_list.txt')
sample_trans = [
DecodeImage(to_rgb=True), RandomFlipImage(), NormalizeImage(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225],
is_scale=True,
is_channel_first=False), ResizeImage(
target_size=800, max_size=1333, interp=1),
Permute(to_bgr=False)
]
batch_trans = [PadBatch(pad_to_stride=32, use_padded_im_info=True), ]
inputs_def = {
'fields':
['image', 'im_info', 'im_id', 'gt_bbox', 'gt_class', 'is_crowd'],
}
data_loader = Reader(
coco_loader,
sample_transforms=sample_trans,
batch_transforms=batch_trans,
batch_size=1,
shuffle=True,
drop_empty=True,
inputs_def=inputs_def)()
return data_loader
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091" # Inform scheduler IP address to trainer
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
trainer.start(fluid.CUDAPlace(trainer_id))
test_program = trainer._main_program.clone(for_test=True)
image = fluid.layers.data(
name='image', shape=[3, None, None], dtype='float32', lod_level=0)
im_info = fluid.layers.data(
name='im_info', shape=[None, 3], dtype='float32', lod_level=0)
im_id = fluid.layers.data(
name='im_id', shape=[None, 1], dtype='int64', lod_level=0)
gt_bbox = fluid.layers.data(
name='gt_bbox', shape=[None, 4], dtype='float32', lod_level=1)
gt_class = fluid.layers.data(
name='gt_class', shape=[None, 1], dtype='int32', lod_level=1)
is_crowd = fluid.layers.data(
name='is_crowd', shape=[None, 1], dtype='int32', lod_level=1)
place = fluid.CUDAPlace(trainer_id)
feeder = fluid.DataFeeder(
feed_list=[image, im_info, im_id, gt_bbox, gt_class, is_crowd],
place=place)
output_folder = "5_model_node%d" % trainer_id
epoch_id = 0
step = 0
para_dir = "faster_rcnn_program"
while not trainer.stop():
epoch_id += 1
if epoch_id > 120:
break
print("epoch %d start train" % (epoch_id))
test_class = DataReader()
data_loader = test_class.test_loader()
for step_id, data in enumerate(data_loader):
acc = trainer.run(feeder.feed(data), fetch=['sum_0.tmp_0'])
step += 1
print("step: {}, loss: {}".format(step, acc))
if trainer_id == 0:
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save(para_dir, save_dir)
wget --no-check-certificate https://paddlefl.bj.bcebos.com/detection_programs/faster_rcnn_program.tar.gz
tar -xf faster_rcnn_program.tar.gz
unset http_proxy
unset https_proxy
export PYTHONPATH=/path/to/PaddleDetection
CUDA_VISIBLE_DEVICES=0,1
python fl_master.py
sleep 2
python -u fl_scheduler.py >scheduler.log &
sleep 2
python -u fl_server.py >server0.log &
sleep 2
python -u fl_trainer.py 0 > 5_trainer0.log &
sleep 2
python -u fl_trainer.py 1 > 5_trainer1.log &
sleep 2
......@@ -61,6 +61,7 @@ def train_test(train_test_program, train_test_feed, train_test_reader):
return acc_val_mean
para_dir = 'load_file'
output_folder = "model_node%d" % trainer_id
epoch_id = 0
step = 0
......@@ -80,5 +81,6 @@ while not trainer.stop():
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder)
if trainer_id == 0:
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save(para_dir, save_dir)
......@@ -25,12 +25,13 @@ sum_cost = fluid.layers.cross_entropy(input=predict, label=label)
accuracy = fluid.layers.accuracy(input=predict, label=label)
avg_cost = fluid.layers.mean(sum_cost, name="loss")
startup_program = fluid.default_startup_program()
main_program = fluid.default_main_program()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
job_generator = JobGenerator()
program_path = './load_file'
job_generator.save_program(program_path, [input, label],
[['predict', predict], ['accuracy', accuracy]],
avg_cost)
job_generator.save_program(
main_program, startup_program, program_path, [input, label],
[['predict', predict], ['accuracy', accuracy]], avg_cost)
......@@ -12,5 +12,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" PaddleFL version string """
fl_version = "1.0.0"
module_proto_version = "1.0.0"
fl_version = "1.0.1"
module_proto_version = "1.0.1"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册