From 132f37233be89809b597bed857c29c02e3d82bc1 Mon Sep 17 00:00:00 2001
From: qjing666
Date: Wed, 26 Feb 2020 16:00:20 +0800
Subject: [PATCH] add k8s deployment solution

---
 paddle_fl/core/master/job_generator.py        | 38 ++++++++++
 .../k8s_deployment/master/fl_master.py        | 71 +++++++++++++++++++
 .../k8s_deployment/master/run_master.sh       |  3 +
 .../k8s_deployment/scheduler/fl_scheduler.py  | 23 ++++++
 .../k8s_deployment/scheduler/run_scheduler.sh |  3 +
 .../k8s_deployment/server/fl_server.py        | 32 +++++++++
 .../k8s_deployment/server/run_server.sh       |  9 +++
 .../k8s_deployment/trainer/fl_trainer.py      | 42 ++++++++++++
 .../k8s_deployment/trainer/run_trainer.sh     | 14 ++++
 .../k8s_deployment/trainer0/fl_trainer.py     | 43 +++++++++++
 .../k8s_deployment/trainer0/test_trainer.sh   |  9 +++
 .../k8s_deployment/trainer1/fl_trainer.py     | 43 +++++++++++
 .../k8s_deployment/trainer1/test_trainer.sh   |  9 +++
 paddle_fl/version.py                          |  4 +-
 14 files changed, 341 insertions(+), 2 deletions(-)
 create mode 100644 paddle_fl/examples/k8s_deployment/master/fl_master.py
 create mode 100644 paddle_fl/examples/k8s_deployment/master/run_master.sh
 create mode 100644 paddle_fl/examples/k8s_deployment/scheduler/fl_scheduler.py
 create mode 100644 paddle_fl/examples/k8s_deployment/scheduler/run_scheduler.sh
 create mode 100644 paddle_fl/examples/k8s_deployment/server/fl_server.py
 create mode 100644 paddle_fl/examples/k8s_deployment/server/run_server.sh
 create mode 100644 paddle_fl/examples/k8s_deployment/trainer/fl_trainer.py
 create mode 100644 paddle_fl/examples/k8s_deployment/trainer/run_trainer.sh
 create mode 100644 paddle_fl/examples/k8s_deployment/trainer0/fl_trainer.py
 create mode 100644 paddle_fl/examples/k8s_deployment/trainer0/test_trainer.sh
 create mode 100644 paddle_fl/examples/k8s_deployment/trainer1/fl_trainer.py
 create mode 100644 paddle_fl/examples/k8s_deployment/trainer1/test_trainer.sh

diff --git a/paddle_fl/core/master/job_generator.py b/paddle_fl/core/master/job_generator.py
index e429124..374e085 100644
--- a/paddle_fl/core/master/job_generator.py
+++ b/paddle_fl/core/master/job_generator.py
@@ -146,3 +146,41 @@ class JobGenerator(object):
         local_job.set_target_names(self._target_names)
         local_job.set_strategy(fl_strategy)
         local_job.save(output)
+
+
+    def generate_fl_job_for_k8s(self,
+                                fl_strategy,
+                                server_pod_endpoints=[],
+                                server_service_endpoints=[],
+                                worker_num=1,
+                                output=None):
+
+        local_job = FLCompileTimeJob()
+        assert len(self._losses) > 0
+        assert self._startup_prog is not None
+        assert fl_strategy is not None
+        assert output is not None
+        fl_strategy.minimize(self._optimizer, self._losses)
+
+        # the strategy generates the startup and main programs
+        # for each worker and for the servers
+        for trainer_id in range(worker_num):
+            startup_program = self._startup_prog.clone()
+            main_program = self._losses[0].block.program.clone()
+            fl_strategy._build_trainer_program_for_job(
+                trainer_id, program=main_program,
+                ps_endpoints=server_service_endpoints, trainers=worker_num,
+                sync_mode=True, startup_program=startup_program,
+                job=local_job)
+
+        startup_program = self._startup_prog.clone()
+        main_program = self._losses[0].block.program.clone()
+        fl_strategy._build_server_programs_for_job(
+            program=main_program, ps_endpoints=server_pod_endpoints,
+            trainers=worker_num, sync_mode=True,
+            startup_program=startup_program, job=local_job)
+
+        local_job.set_feed_names(self._feed_names)
+        local_job.set_target_names(self._target_names)
+        local_job.set_strategy(fl_strategy)
+        local_job.save(output)
diff --git a/paddle_fl/examples/k8s_deployment/master/fl_master.py b/paddle_fl/examples/k8s_deployment/master/fl_master.py
new file mode 100644
index 0000000..8b3f4f2
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/master/fl_master.py
@@ -0,0 +1,71 @@
+import argparse
+import paddle.fluid as fluid
+import os
+import paddle_fl as fl
+from paddle_fl.core.master.job_generator import JobGenerator
+from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="master")
+    parser.add_argument(
+        '--trainer_num',
+        type=int,
+        default=2,
+        help='number of trainers (default: 2)')
+
+    return parser.parse_args()
+
+
+class Model(object):
+    def __init__(self):
+        pass
+
+    def mlp(self, inputs, label, hidden_size=128):
+        self.concat = fluid.layers.concat(inputs, axis=1)
+        self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
+        self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
+        self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
+        self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
+        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
+        self.loss = fluid.layers.reduce_mean(self.sum_cost)
+        self.startup_program = fluid.default_startup_program()
+
+inputs = [fluid.layers.data( \
+    name=str(slot_id), shape=[5],
+    dtype="float32")
+    for slot_id in range(3)]
+label = fluid.layers.data( \
+    name="label",
+    shape=[1],
+    dtype='int64')
+
+model = Model()
+model.mlp(inputs, label)
+
+job_generator = JobGenerator()
+optimizer = fluid.optimizer.SGD(learning_rate=0.1)
+job_generator.set_optimizer(optimizer)
+job_generator.set_losses([model.loss])
+job_generator.set_startup_program(model.startup_program)
+job_generator.set_infer_feed_and_target_names(
+    [x.name for x in inputs], [model.predict.name])
+
+build_strategy = FLStrategyFactory()
+build_strategy.fed_avg = True
+build_strategy.inner_step = 10
+strategy = build_strategy.create_fl_strategy()
+
+# endpoints will be collected through the cluster;
+# in this example, we assume they have already been collected
+server_service_ip = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER']
+service_endpoints = [server_service_ip]
+pod_endpoints = ["0.0.0.0:8181"]
+output = "fl_job_config"
+args = parse_args()
+num_trainer = args.trainer_num
+#job_generator.generate_fl_job(
+#    strategy, server_endpoints=endpoints, worker_num=num_trainer, output=output)
+# fl_job_config will be dispatched to workers
+
+job_generator.generate_fl_job_for_k8s(
+    strategy, server_pod_endpoints=pod_endpoints, server_service_endpoints=service_endpoints, worker_num=num_trainer, output=output)
diff --git a/paddle_fl/examples/k8s_deployment/master/run_master.sh b/paddle_fl/examples/k8s_deployment/master/run_master.sh
new file mode 100644
index 0000000..b07187e
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/master/run_master.sh
@@ -0,0 +1,3 @@
+python fl_master.py --trainer_num 2
+tar -zcvf fl_job_config.tar.gz fl_job_config
+python -m SimpleHTTPServer 8000
diff --git a/paddle_fl/examples/k8s_deployment/scheduler/fl_scheduler.py b/paddle_fl/examples/k8s_deployment/scheduler/fl_scheduler.py
new file mode 100644
index 0000000..c93f132
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/scheduler/fl_scheduler.py
@@ -0,0 +1,23 @@
+import argparse
+from paddle_fl.core.scheduler.agent_master import FLScheduler
+
+def parse_args():
+    parser = argparse.ArgumentParser(description="scheduler")
+    parser.add_argument(
+        '--trainer_num',
+        type=int,
+        default=2,
+        help='number of trainers (default: 2)')
+
+    return parser.parse_args()
+
+args = parse_args()
+num_trainer = args.trainer_num
+worker_num = num_trainer
+server_num = 1
+# define the number of workers/servers and the port for the scheduler
+scheduler = FLScheduler(worker_num, server_num, port=9091)
+scheduler.set_sample_worker_num(worker_num)
+scheduler.init_env()
+print("init env done.")
+scheduler.start_fl_training()
diff --git a/paddle_fl/examples/k8s_deployment/scheduler/run_scheduler.sh b/paddle_fl/examples/k8s_deployment/scheduler/run_scheduler.sh
new file mode 100644
index 0000000..13494e2
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/scheduler/run_scheduler.sh
@@ -0,0 +1,3 @@
+python fl_scheduler.py --trainer_num 2
+
+
diff --git a/paddle_fl/examples/k8s_deployment/server/fl_server.py b/paddle_fl/examples/k8s_deployment/server/fl_server.py
new file mode 100644
index 0000000..a833ee9
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/server/fl_server.py
@@ -0,0 +1,32 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle_fl as fl
+import os
+import paddle.fluid as fluid
+from paddle_fl.core.server.fl_server import FLServer
+from paddle_fl.core.master.fl_job import FLRunTimeJob
+import time
+server = FLServer()
+server_id = 0
+job_path = "fl_job_config"
+job = FLRunTimeJob()
+job.load_server_job(job_path, server_id)
+job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']  # scheduler endpoint (host:port)
+#job._endpoints = os.environ['POD_IP'] + ":" + os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER'] # endpoint for this server
+server.set_server_job(job)
+server._current_ep = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER']  # this server's endpoint (host:port)
+print(job._scheduler_ep, server._current_ep)
+server.start()
+print("server started")
diff --git a/paddle_fl/examples/k8s_deployment/server/run_server.sh b/paddle_fl/examples/k8s_deployment/server/run_server.sh
new file mode 100644
index 0000000..203d96c
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/server/run_server.sh
@@ -0,0 +1,9 @@
+export GLOG_v=3
+wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
+while [ $? -ne 0 ]
+do
+    sleep 3
+    wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
+done
+tar -xf fl_job_config.tar.gz
+python -u fl_server.py > server.log 2>&1
diff --git a/paddle_fl/examples/k8s_deployment/trainer/fl_trainer.py b/paddle_fl/examples/k8s_deployment/trainer/fl_trainer.py
new file mode 100644
index 0000000..ef19579
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/trainer/fl_trainer.py
@@ -0,0 +1,42 @@
+from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
+from paddle_fl.core.master.fl_job import FLRunTimeJob
+import numpy as np
+import sys
+import os
+import logging
+import time
+logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%m-%Y %H:%M:%S", level=logging.DEBUG)
+
+
+def reader():
+    for i in range(1000):
+        data_dict = {}
+        for slot_id in range(3):
+            data_dict[str(slot_id)] = np.random.rand(1, 5).astype('float32')
+        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
+        yield data_dict
+
+trainer_id = int(sys.argv[1])  # trainer id for each worker
+job_path = "fl_job_config"
+job = FLRunTimeJob()
+job.load_trainer_job(job_path, trainer_id)
+#job._scheduler_ep = "127.0.0.1:9091" # inform the trainer of the scheduler endpoint
+job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
+trainer = FLTrainerFactory().create_fl_trainer(job)
+trainer._current_ep = "127.0.0.1:{}".format(9000 + trainer_id)
+trainer.start()
+print(trainer._scheduler_ep, trainer._current_ep)
+output_folder = "fl_model"
+epoch_id = 0
+while not trainer.stop():
+    print("epoch %d start train" % epoch_id)
+    train_step = 0
+    for data in reader():
+        trainer.run(feed=data, fetch=[])
+        train_step += 1
+        if train_step == trainer._step:
+            break
+    epoch_id += 1
+    if epoch_id % 5 == 0:
+        trainer.save_inference_program(output_folder)
+
diff --git a/paddle_fl/examples/k8s_deployment/trainer/run_trainer.sh b/paddle_fl/examples/k8s_deployment/trainer/run_trainer.sh
new file mode 100644
index 0000000..88e27b1
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/trainer/run_trainer.sh
@@ -0,0 +1,14 @@
+# Download the job config file
+wget
+
+# Pull the PaddleFL image
+sudo docker pull [paddle-fl image]
+
+# Start the container
+sudo docker run --name paddlefl -it -v $PWD:/root [paddle-fl image] /bin/bash
+
+sudo docker cp /path/to/config paddlefl:/path/to/config/file/at/container
+
+# Run the trainer; TRAINER_ID must be set to this trainer's id (e.g. 0 or 1)
+
+python -u fl_trainer.py ${TRAINER_ID} > trainer.log &
diff --git a/paddle_fl/examples/k8s_deployment/trainer0/fl_trainer.py b/paddle_fl/examples/k8s_deployment/trainer0/fl_trainer.py
new file mode 100644
index 0000000..bd76eef
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/trainer0/fl_trainer.py
@@ -0,0 +1,43 @@
+from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
+from paddle_fl.core.master.fl_job import FLRunTimeJob
+import numpy as np
+import sys
+import os
+import logging
+import time
+logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%m-%Y %H:%M:%S", level=logging.DEBUG)
+
+
+def reader():
+    for i in range(1000):
+        data_dict = {}
+        for slot_id in range(3):
+            data_dict[str(slot_id)] = np.random.rand(1, 5).astype('float32')
+        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
+        yield data_dict
+
+trainer_id = int(sys.argv[1])  # trainer id for each worker
+job_path = "fl_job_config"
+job = FLRunTimeJob()
+job.load_trainer_job(job_path, trainer_id)
+#job._scheduler_ep = "127.0.0.1:9091" # inform the trainer of the scheduler endpoint
+job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
+trainer = FLTrainerFactory().create_fl_trainer(job)
+#trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
+trainer._current_ep = os.environ['TRAINER0_SERVICE_HOST'] + ":" + os.environ['TRAINER0_SERVICE_PORT_TRAINER0']
+trainer.start()
+print(trainer._scheduler_ep, trainer._current_ep)
+output_folder = "fl_model"
+epoch_id = 0
+while not trainer.stop():
+    print("epoch %d start train" % epoch_id)
+    train_step = 0
+    for data in reader():
+        trainer.run(feed=data, fetch=[])
+        train_step += 1
+        if train_step == trainer._step:
+            break
+    epoch_id += 1
+    if epoch_id % 5 == 0:
+        trainer.save_inference_program(output_folder)
+
diff --git a/paddle_fl/examples/k8s_deployment/trainer0/test_trainer.sh b/paddle_fl/examples/k8s_deployment/trainer0/test_trainer.sh
new file mode 100644
index 0000000..cfcedcf
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/trainer0/test_trainer.sh
@@ -0,0 +1,9 @@
+wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
+while [ $? -ne 0 ]
+do
+    sleep 3
+    wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
+done
+tar -xf fl_job_config.tar.gz
+sleep 10
+python -u fl_trainer.py 0
diff --git a/paddle_fl/examples/k8s_deployment/trainer1/fl_trainer.py b/paddle_fl/examples/k8s_deployment/trainer1/fl_trainer.py
new file mode 100644
index 0000000..5cbe285
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/trainer1/fl_trainer.py
@@ -0,0 +1,43 @@
+from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
+from paddle_fl.core.master.fl_job import FLRunTimeJob
+import numpy as np
+import sys
+import os
+import logging
+import time
+logging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%m-%Y %H:%M:%S", level=logging.DEBUG)
+
+
+def reader():
+    for i in range(1000):
+        data_dict = {}
+        for slot_id in range(3):
+            data_dict[str(slot_id)] = np.random.rand(1, 5).astype('float32')
+        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
+        yield data_dict
+
+trainer_id = int(sys.argv[1])  # trainer id for each worker
+job_path = "fl_job_config"
+job = FLRunTimeJob()
+job.load_trainer_job(job_path, trainer_id)
+#job._scheduler_ep = "127.0.0.1:9091" # inform the trainer of the scheduler endpoint
+job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
+trainer = FLTrainerFactory().create_fl_trainer(job)
+#trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
+trainer._current_ep = os.environ['TRAINER1_SERVICE_HOST'] + ":" + os.environ['TRAINER1_SERVICE_PORT_TRAINER1']
+trainer.start()
+print(trainer._scheduler_ep, trainer._current_ep)
+output_folder = "fl_model"
+epoch_id = 0
+while not trainer.stop():
+    print("epoch %d start train" % epoch_id)
+    train_step = 0
+    for data in reader():
+        trainer.run(feed=data, fetch=[])
+        train_step += 1
+        if train_step == trainer._step:
+            break
+    epoch_id += 1
+    if epoch_id % 5 == 0:
+        trainer.save_inference_program(output_folder)
+
diff --git a/paddle_fl/examples/k8s_deployment/trainer1/test_trainer.sh b/paddle_fl/examples/k8s_deployment/trainer1/test_trainer.sh
new file mode 100644
index 0000000..82dd83a
--- /dev/null
+++ b/paddle_fl/examples/k8s_deployment/trainer1/test_trainer.sh
@@ -0,0 +1,9 @@
+wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
+while [ $? -ne 0 ]
+do
+    sleep 3
+    wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
+done
+tar -xf fl_job_config.tar.gz
+sleep 10
+python -u fl_trainer.py 1
diff --git a/paddle_fl/version.py b/paddle_fl/version.py
index 2f9278c..4060d4a 100644
--- a/paddle_fl/version.py
+++ b/paddle_fl/version.py
@@ -12,6 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """ PaddleFL version string """
-fl_version = "0.1.10"
-module_proto_version = "0.1.10"
+fl_version = "0.1.11"
+module_proto_version = "0.1.11"
 
-- 
GitLab
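
Note: the scripts in this patch rely on Kubernetes service-discovery environment variables (FL_MASTER_SERVICE_HOST, FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER, TRAINER0_SERVICE_HOST, and so on). Kubernetes injects <SERVICE>_SERVICE_HOST and <SERVICE>_SERVICE_PORT_<PORTNAME> into every pod for each Service that already exists when the pod starts, so this deployment assumes Services named fl-master, fl-scheduler, fl-server, trainer0, and trainer1, each exposing a port with the same name, and pods created after those Services. The manifests are not part of this patch; the sketch below shows one way the fl-server Service might be declared. The app label and the 8181 port are assumptions, the port inferred from pod_endpoints = ["0.0.0.0:8181"] in fl_master.py.

    # Minimal sketch (assumed names): a Service whose name and port name
    # yield FL_SERVER_SERVICE_HOST / FL_SERVER_SERVICE_PORT_FL_SERVER
    # in every pod started after it.
    kubectl apply -f - <<'EOF'
    apiVersion: v1
    kind: Service
    metadata:
      name: fl-server        # dashes become underscores in the env var names
    spec:
      selector:
        app: fl-server       # assumed pod label; not defined in this patch
      ports:
      - name: fl-server      # named port -> FL_SERVER_SERVICE_PORT_FL_SERVER
        port: 8181
        targetPort: 8181
    EOF

Naming the port matters: an unnamed port only produces FL_SERVER_SERVICE_PORT, while the scripts above read the per-port variable FL_SERVER_SERVICE_PORT_FL_SERVER.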