diff --git a/python/paddle_fl/paddle_fl/core/master/job_generator.py b/python/paddle_fl/paddle_fl/core/master/job_generator.py index 64feb7d8083699459d05f3ccee6185cd00194312..a69ffa16c294ed13ec15e46a25301a3214adf4c3 100644 --- a/python/paddle_fl/paddle_fl/core/master/job_generator.py +++ b/python/paddle_fl/paddle_fl/core/master/job_generator.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os import paddle.fluid as fluid from .fl_job import FLCompileTimeJob @@ -197,3 +198,99 @@ class JobGenerator(object): local_job.set_target_names(self._target_names) local_job.set_strategy(fl_strategy) local_job.save(output) + + def save_program(self, program_path, input_list, hidden_vars, loss): + if not os.path.exists(program_path): + os.makedirs(program_path) + main_program_str = fluid.default_main_program( + ).desc.serialize_to_string() + startup_program_str = fluid.default_startup_program( + ).desc.serialize_to_string() + params = fluid.default_main_program().global_block().all_parameters() + para_info = [] + for pa in params: + para_info.append(pa.name) + with open(program_path + '/input_names', 'w') as fout: + for input in input_list: + fout.write("%s\n" % input.name) + with open(program_path + '/hidden_vars', 'w') as fout: + for var in hidden_vars: + fout.write("%s:%s\n" % (var[0], var[1].name)) + with open(program_path + '/para_info', 'w') as fout: + for item in para_info: + fout.write("%s\n" % item) + with open(program_path + '/startup_program', "wb") as fout: + fout.write(startup_program_str) + with open(program_path + '/main_program', "wb") as fout: + fout.write(main_program_str) + with open(program_path + '/loss_name', 'w') as fout: + fout.write(loss.name) + + def generate_fl_job_from_program(self, strategy, endpoints, worker_num, + program_input, output): + local_job = FLCompileTimeJob() + with open(program_input + '/startup_program', "rb") as fin: + program_desc_str = fin.read() + new_startup = fluid.Program.parse_from_string(program_desc_str) + + with open(program_input + '/main_program', "rb") as fin: + program_desc_str = fin.read() + new_main = fluid.Program.parse_from_string(program_desc_str) + + para_list = [] + with open(program_input + '/para_info', 'r') as fin: + for line in fin: + current_para = line[:-1] + para_list.append(current_para) + + input_list = [] + with open(program_input + '/input_names', 'r') as fin: + for line in fin: + current_input = line[:-1] + input_list.append(current_input) + + with open(program_input + '/loss_name', 'r') as fin: + loss_name = fin.read() + + for item in para_list: + para = new_main.global_block().var(item) + para.regularizer = None + para.optimize_attr = {'learning_rate': 1.0} + para.trainable = True + exe = fluid.Executor(fluid.CPUPlace()) + loss = None + for var in new_main.list_vars(): + if var.name == loss_name: + loss = var + with fluid.program_guard(new_main, new_startup): + optimizer = fluid.optimizer.SGD(learning_rate=0.1, + parameter_list=para_list) + exe.run(new_startup) + strategy.minimize(optimizer, loss) + + for trainer_id in range(worker_num): + startup_program = new_startup.clone() + main_program = loss.block.program.clone() + strategy._build_trainer_program_for_job( + trainer_id, + program=main_program, + ps_endpoints=endpoints, + trainers=worker_num, + sync_mode=True, + startup_program=startup_program, + job=local_job) + + startup_program = new_startup.clone() + main_program = loss.block.program.clone() + strategy._build_server_programs_for_job( + program=main_program, + ps_endpoints=endpoints, + trainers=worker_num, + sync_mode=True, + startup_program=startup_program, + job=local_job) + + local_job.set_feed_names(input_list) + local_job.set_target_names([loss.name]) + local_job.set_strategy(strategy) + local_job.save(output) diff --git a/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_master.py b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_master.py new file mode 100644 index 0000000000000000000000000000000000000000..0932a4d016a593ed1f2c2fb537cbd1259421737c --- /dev/null +++ b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_master.py @@ -0,0 +1,34 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle_fl.paddle_fl as fl +import paddle.fluid as fluid +from paddle_fl.paddle_fl.core.master.job_generator import JobGenerator +from paddle_fl.paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory + +build_strategy = FLStrategyFactory() +build_strategy.fed_avg = True +build_strategy.inner_step = 10 +strategy = build_strategy.create_fl_strategy() + +endpoints = ["127.0.0.1:8181"] +output = "fl_job_config" +program_file = "./load_file" +job_generator = JobGenerator() +job_generator.generate_fl_job_from_program( + strategy=strategy, + endpoints=endpoints, + worker_num=2, + program_input=program_file, + output=output) diff --git a/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_scheduler.py b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_scheduler.py new file mode 100644 index 0000000000000000000000000000000000000000..a8f507e678acc0f4c6faabcf8d5fcaea7a7e3ba0 --- /dev/null +++ b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_scheduler.py @@ -0,0 +1,24 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle_fl.paddle_fl.core.scheduler.agent_master import FLScheduler + +worker_num = 2 +server_num = 1 +#Define number of worker/server and the port for scheduler +scheduler = FLScheduler(worker_num, server_num, port=9091) +scheduler.set_sample_worker_num(2) +scheduler.init_env() +print("init env done.") +scheduler.start_fl_training() diff --git a/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_server.py b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_server.py new file mode 100644 index 0000000000000000000000000000000000000000..aa1e9a86504673968cdfb969de5321df5d33277e --- /dev/null +++ b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_server.py @@ -0,0 +1,27 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle_fl.paddle_fl as fl +import paddle.fluid as fluid +from paddle_fl.paddle_fl.core.server.fl_server import FLServer +from paddle_fl.paddle_fl.core.master.fl_job import FLRunTimeJob +server = FLServer() +server_id = 0 +job_path = "fl_job_config" +job = FLRunTimeJob() +job.load_server_job(job_path, server_id) +job._scheduler_ep = "127.0.0.1:9091" # IP address for scheduler +server.set_server_job(job) +server._current_ep = "127.0.0.1:8181" # IP address for server +server.start() diff --git a/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_trainer.py b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_trainer.py new file mode 100644 index 0000000000000000000000000000000000000000..8a1be916edda07c78f086f1c47b8e1f50713cdb2 --- /dev/null +++ b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/fl_trainer.py @@ -0,0 +1,83 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle_fl.paddle_fl.core.trainer.fl_trainer import FLTrainerFactory +from paddle_fl.paddle_fl.core.master.fl_job import FLRunTimeJob +import numpy +import sys +import paddle +import paddle.fluid as fluid +import logging +import math + +logging.basicConfig( + filename="test.log", + filemode="w", + format="%(asctime)s %(name)s:%(levelname)s:%(message)s", + datefmt="%d-%M-%Y %H:%M:%S", + level=logging.DEBUG) + +trainer_id = int(sys.argv[1]) # trainer id for each guest +job_path = "fl_job_config" +job = FLRunTimeJob() +job.load_trainer_job(job_path, trainer_id) +job._scheduler_ep = "127.0.0.1:9091" # Inform scheduler IP address to trainer +trainer = FLTrainerFactory().create_fl_trainer(job) +trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id) +trainer.start() +test_program = trainer._main_program.clone(for_test=True) + +train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.mnist.train(), buf_size=500), + batch_size=64) +test_reader = paddle.batch(paddle.dataset.mnist.test(), batch_size=64) + +input = fluid.layers.data(name='input', shape=[1, 28, 28], dtype='float32') +label = fluid.layers.data(name='label', shape=[1], dtype='int64') +feeder = fluid.DataFeeder(feed_list=[input, label], place=fluid.CPUPlace()) + + +def train_test(train_test_program, train_test_feed, train_test_reader): + acc_set = [] + for test_data in train_test_reader(): + acc_np = trainer.exe.run(program=train_test_program, + feed=train_test_feed.feed(test_data), + fetch_list=["accuracy_0.tmp_0"]) + acc_set.append(float(acc_np[0])) + acc_val_mean = numpy.array(acc_set).mean() + return acc_val_mean + + +output_folder = "model_node%d" % trainer_id +epoch_id = 0 +step = 0 +while not trainer.stop(): + epoch_id += 1 + if epoch_id > 40: + break + print("epoch %d start train" % (epoch_id)) + for step_id, data in enumerate(train_reader()): + acc = trainer.run(feeder.feed(data), fetch=["accuracy_0.tmp_0"]) + step += 1 + + acc_val = train_test( + train_test_program=test_program, + train_test_reader=test_reader, + train_test_feed=feeder) + + print("Test with epoch %d, accuracy: %s" % (epoch_id, acc_val)) + + save_dir = (output_folder + "/epoch_%d") % epoch_id + trainer.save_inference_program(output_folder) diff --git a/python/paddle_fl/paddle_fl/examples/generate_job_from_program/program_saver.py b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/program_saver.py new file mode 100644 index 0000000000000000000000000000000000000000..e6f752880bbaf30005eeb59ac2d9765707eeaf06 --- /dev/null +++ b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/program_saver.py @@ -0,0 +1,36 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import json +import paddle.fluid as fluid +from paddle_fl.paddle_fl.core.master.job_generator import JobGenerator + +input = fluid.layers.data(name='input', shape=[1, 28, 28], dtype="float32") +label = fluid.layers.data(name='label', shape=[1], dtype='int64') +feeder = fluid.DataFeeder(feed_list=[input, label], place=fluid.CPUPlace()) +predict = fluid.layers.fc(input=input, size=10, act='softmax') +sum_cost = fluid.layers.cross_entropy(input=predict, label=label) +accuracy = fluid.layers.accuracy(input=predict, label=label) +avg_cost = fluid.layers.mean(sum_cost, name="loss") +startup_program = fluid.default_startup_program() +place = fluid.CPUPlace() +exe = fluid.Executor(place) +exe.run(startup_program) + +job_generator = JobGenerator() +program_path = './load_file' +job_generator.save_program(program_path, [input, label], + [['predict', predict], ['accuracy', accuracy]], + avg_cost) diff --git a/python/paddle_fl/paddle_fl/examples/generate_job_from_program/run.sh b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..5a19898109a546f5f15463544a1d54d60d9cfd4b --- /dev/null +++ b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/run.sh @@ -0,0 +1,14 @@ +unset http_proxy +unset https_proxy +python program_saver.py + +python fl_master.py +sleep 2 +python -u fl_scheduler.py >scheduler.log & +sleep 2 +python -u fl_server.py >server0.log & +sleep 2 +python -u fl_trainer.py 0 >trainer0.log & +sleep 2 +python -u fl_trainer.py 1 > trainer1.log & +sleep 2 diff --git a/python/paddle_fl/paddle_fl/examples/generate_job_from_program/stop.sh b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/stop.sh new file mode 100644 index 0000000000000000000000000000000000000000..148759658e3986ddebf9fa4153874157a4b738de --- /dev/null +++ b/python/paddle_fl/paddle_fl/examples/generate_job_from_program/stop.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +echo "Stop service!" + +ps -ef | grep -E "fl" | grep -v grep | awk '{print $2}' | xargs kill -9