未验证 提交 e0c7192c 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge pull request #54 from qjing666/load_program

Load program from a pre-defined program
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import paddle.fluid as fluid
from .fl_job import FLCompileTimeJob
......@@ -197,3 +198,99 @@ class JobGenerator(object):
local_job.set_target_names(self._target_names)
local_job.set_strategy(fl_strategy)
local_job.save(output)
def save_program(self, program_path, input_list, hidden_vars, loss):
if not os.path.exists(program_path):
os.makedirs(program_path)
main_program_str = fluid.default_main_program(
).desc.serialize_to_string()
startup_program_str = fluid.default_startup_program(
).desc.serialize_to_string()
params = fluid.default_main_program().global_block().all_parameters()
para_info = []
for pa in params:
para_info.append(pa.name)
with open(program_path + '/input_names', 'w') as fout:
for input in input_list:
fout.write("%s\n" % input.name)
with open(program_path + '/hidden_vars', 'w') as fout:
for var in hidden_vars:
fout.write("%s:%s\n" % (var[0], var[1].name))
with open(program_path + '/para_info', 'w') as fout:
for item in para_info:
fout.write("%s\n" % item)
with open(program_path + '/startup_program', "wb") as fout:
fout.write(startup_program_str)
with open(program_path + '/main_program', "wb") as fout:
fout.write(main_program_str)
with open(program_path + '/loss_name', 'w') as fout:
fout.write(loss.name)
def generate_fl_job_from_program(self, strategy, endpoints, worker_num,
program_input, output):
local_job = FLCompileTimeJob()
with open(program_input + '/startup_program', "rb") as fin:
program_desc_str = fin.read()
new_startup = fluid.Program.parse_from_string(program_desc_str)
with open(program_input + '/main_program', "rb") as fin:
program_desc_str = fin.read()
new_main = fluid.Program.parse_from_string(program_desc_str)
para_list = []
with open(program_input + '/para_info', 'r') as fin:
for line in fin:
current_para = line[:-1]
para_list.append(current_para)
input_list = []
with open(program_input + '/input_names', 'r') as fin:
for line in fin:
current_input = line[:-1]
input_list.append(current_input)
with open(program_input + '/loss_name', 'r') as fin:
loss_name = fin.read()
for item in para_list:
para = new_main.global_block().var(item)
para.regularizer = None
para.optimize_attr = {'learning_rate': 1.0}
para.trainable = True
exe = fluid.Executor(fluid.CPUPlace())
loss = None
for var in new_main.list_vars():
if var.name == loss_name:
loss = var
with fluid.program_guard(new_main, new_startup):
optimizer = fluid.optimizer.SGD(learning_rate=0.1,
parameter_list=para_list)
exe.run(new_startup)
strategy.minimize(optimizer, loss)
for trainer_id in range(worker_num):
startup_program = new_startup.clone()
main_program = loss.block.program.clone()
strategy._build_trainer_program_for_job(
trainer_id,
program=main_program,
ps_endpoints=endpoints,
trainers=worker_num,
sync_mode=True,
startup_program=startup_program,
job=local_job)
startup_program = new_startup.clone()
main_program = loss.block.program.clone()
strategy._build_server_programs_for_job(
program=main_program,
ps_endpoints=endpoints,
trainers=worker_num,
sync_mode=True,
startup_program=startup_program,
job=local_job)
local_job.set_feed_names(input_list)
local_job.set_target_names([loss.name])
local_job.set_strategy(strategy)
local_job.save(output)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle_fl.paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
endpoints = ["127.0.0.1:8181"]
output = "fl_job_config"
program_file = "./load_file"
job_generator = JobGenerator()
job_generator.generate_fl_job_from_program(
strategy=strategy,
endpoints=endpoints,
worker_num=2,
program_input=program_file,
output=output)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_fl.paddle_fl.core.scheduler.agent_master import FLScheduler
worker_num = 2
server_num = 1
#Define number of worker/server and the port for scheduler
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(2)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle_fl.paddle_fl as fl
import paddle.fluid as fluid
from paddle_fl.paddle_fl.core.server.fl_server import FLServer
from paddle_fl.paddle_fl.core.master.fl_job import FLRunTimeJob
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
job._scheduler_ep = "127.0.0.1:9091" # IP address for scheduler
server.set_server_job(job)
server._current_ep = "127.0.0.1:8181" # IP address for server
server.start()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_fl.paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy
import sys
import paddle
import paddle.fluid as fluid
import logging
import math
logging.basicConfig(
filename="test.log",
filemode="w",
format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
datefmt="%d-%M-%Y %H:%M:%S",
level=logging.DEBUG)
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
job._scheduler_ep = "127.0.0.1:9091" # Inform scheduler IP address to trainer
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
trainer.start()
test_program = trainer._main_program.clone(for_test=True)
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=500),
batch_size=64)
test_reader = paddle.batch(paddle.dataset.mnist.test(), batch_size=64)
input = fluid.layers.data(name='input', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[input, label], place=fluid.CPUPlace())
def train_test(train_test_program, train_test_feed, train_test_reader):
acc_set = []
for test_data in train_test_reader():
acc_np = trainer.exe.run(program=train_test_program,
feed=train_test_feed.feed(test_data),
fetch_list=["accuracy_0.tmp_0"])
acc_set.append(float(acc_np[0]))
acc_val_mean = numpy.array(acc_set).mean()
return acc_val_mean
output_folder = "model_node%d" % trainer_id
epoch_id = 0
step = 0
while not trainer.stop():
epoch_id += 1
if epoch_id > 40:
break
print("epoch %d start train" % (epoch_id))
for step_id, data in enumerate(train_reader()):
acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"])
step += 1
acc_val = train_test(
train_test_program=test_program,
train_test_reader=test_reader,
train_test_feed=feeder)
print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val))
save_dir = (output_folder + "/epoch_%d") % epoch_id
trainer.save_inference_program(output_folder)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import paddle.fluid as fluid
from paddle_fl.paddle_fl.core.master.job_generator import JobGenerator
input = fluid.layers.data(name='input', shape=[1, 28, 28], dtype="float32")
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(feed_list=[input, label], place=fluid.CPUPlace())
predict = fluid.layers.fc(input=input, size=10, act='softmax')
sum_cost = fluid.layers.cross_entropy(input=predict, label=label)
accuracy = fluid.layers.accuracy(input=predict, label=label)
avg_cost = fluid.layers.mean(sum_cost, name="loss")
startup_program = fluid.default_startup_program()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
job_generator = JobGenerator()
program_path = './load_file'
job_generator.save_program(program_path, [input, label],
[['predict', predict], ['accuracy', accuracy]],
avg_cost)
unset http_proxy
unset https_proxy
python program_saver.py
python fl_master.py
sleep 2
python -u fl_scheduler.py >scheduler.log &
sleep 2
python -u fl_server.py >server0.log &
sleep 2
python -u fl_trainer.py 0 >trainer0.log &
sleep 2
python -u fl_trainer.py 1 > trainer1.log &
sleep 2
#!/bin/bash
echo "Stop service!"
ps -ef | grep -E "fl" | grep -v grep | awk '{print $2}' | xargs kill -9
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册