Commit 132f3723 authored by Q qjing666

add k8s deployment solution

Parent 95cdf430
@@ -146,3 +146,41 @@ class JobGenerator(object):
        local_job.set_target_names(self._target_names)
        local_job.set_strategy(fl_strategy)
        local_job.save(output)

    def generate_fl_job_for_k8s(self,
                                fl_strategy,
                                server_pod_endpoints=[],
                                server_service_endpoints=[],
                                worker_num=1,
                                output=None):
        local_job = FLCompileTimeJob()
        assert len(self._losses) > 0
        assert self._startup_prog is not None
        assert fl_strategy is not None
        assert output is not None
        fl_strategy.minimize(self._optimizer, self._losses)

        # The strategy generates the startup and main programs
        # of each worker and of the servers.
        for trainer_id in range(worker_num):
            startup_program = self._startup_prog.clone()
            main_program = self._losses[0].block.program.clone()
            # Trainer programs dial the servers through the Kubernetes service.
            fl_strategy._build_trainer_program_for_job(
                trainer_id, program=main_program,
                ps_endpoints=server_service_endpoints, trainers=worker_num,
                sync_mode=True, startup_program=startup_program,
                job=local_job)

        startup_program = self._startup_prog.clone()
        main_program = self._losses[0].block.program.clone()
        # Server programs bind to their pod-local endpoints.
        fl_strategy._build_server_programs_for_job(
            program=main_program, ps_endpoints=server_pod_endpoints,
            trainers=worker_num, sync_mode=True,
            startup_program=startup_program, job=local_job)

        local_job.set_feed_names(self._feed_names)
        local_job.set_target_names(self._target_names)
        local_job.set_strategy(fl_strategy)
        local_job.save(output)
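Note the two endpoint lists: trainer programs are built against server_service_endpoints (the stable Kubernetes service address trainers dial), while server programs are built against server_pod_endpoints (the address the server process binds inside its pod). A minimal sketch of the call, with hypothetical endpoint values:

# Sketch only; the real values come from the cluster (see fl_master.py below).
pod_endpoints = ["0.0.0.0:8181"]        # bind address inside the server pod
service_endpoints = ["fl-server:8181"]  # service address trainers connect to
job_generator.generate_fl_job_for_k8s(
    fl_strategy=strategy,
    server_pod_endpoints=pod_endpoints,
    server_service_endpoints=service_endpoints,
    worker_num=2,
    output="fl_job_config")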
import argparse
import paddle.fluid as fluid
import os
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory
def parse_args():
    parser = argparse.ArgumentParser(description="master")
    parser.add_argument(
        '--trainer_num',
        type=int,
        default=2,
        help='number of trainers (default: 2)')
    return parser.parse_args()
class Model(object):
    def __init__(self):
        pass

    def mlp(self, inputs, label, hidden_size=128):
        self.concat = fluid.layers.concat(inputs, axis=1)
        self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
        self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
        self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
        self.sum_cost = fluid.layers.cross_entropy(input=self.predict, label=label)
        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
        self.loss = fluid.layers.reduce_mean(self.sum_cost)
        self.startup_program = fluid.default_startup_program()
inputs = [
    fluid.layers.data(name=str(slot_id), shape=[5], dtype="float32")
    for slot_id in range(3)
]
label = fluid.layers.data(name="label", shape=[1], dtype='int64')
model = Model()
model.mlp(inputs, label)
job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names(
[x.name for x in inputs], [model.predict.name])
build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()
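# With fed_avg=True the factory builds a Federated Averaging strategy;
# inner_step=10 means each trainer runs 10 local steps between global
# parameter synchronizations (this is the trainer._step checked in
# fl_trainer.py below).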
# Endpoints are normally collected from the cluster; in this example we
# assume they have already been collected.
server_service_ip = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + \
    os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER']
service_endpoints = [server_service_ip]
pod_endpoints = ["0.0.0.0:8181"]
output = "fl_job_config"
args = parse_args()
num_trainer = args.trainer_num
#job_generator.generate_fl_job(
# strategy, server_endpoints=endpoints, worker_num=num_trainer, output=output)
# fl_job_config will be dispatched to workers
job_generator.generate_fl_job_for_k8s(
    strategy, server_pod_endpoints=pod_endpoints,
    server_service_endpoints=service_endpoints,
    worker_num=num_trainer, output=output)
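The FL_SERVER_SERVICE_* variables used above are the service-discovery environment variables Kubernetes injects into pods created after the service exists. A small sketch of the mapping (the service name "fl-server" and its port name are inferred from the variable names):

import os
# FL_SERVER_SERVICE_HOST           -> cluster IP of the "fl-server" service
# FL_SERVER_SERVICE_PORT_FL_SERVER -> its port named "fl-server"
endpoint = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + \
    os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER']
print(endpoint)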
python fl_master.py --trainer_num 2
tar -zcvf fl_job_config.tar.gz fl_job_config
# Serve the job config over HTTP so the other pods can fetch it.
# SimpleHTTPServer is Python 2 only; on Python 3 use `python3 -m http.server 8000`.
python -m SimpleHTTPServer 8000
import argparse
from paddle_fl.core.scheduler.agent_master import FLScheduler
def parse_args():
    parser = argparse.ArgumentParser(description="scheduler")
    parser.add_argument(
        '--trainer_num',
        type=int,
        default=2,
        help='number of trainers (default: 2)')
    return parser.parse_args()
args = parse_args()
num_trainer = args.trainer_num
worker_num = num_trainer
server_num = 1
# Define the number of workers/servers and the port the scheduler listens on.
scheduler = FLScheduler(worker_num, server_num, port=9091)
scheduler.set_sample_worker_num(worker_num)
scheduler.init_env()
print("init env done.")
scheduler.start_fl_training()
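set_sample_worker_num sets how many trainers the scheduler samples each round; passing worker_num, as above, makes every trainer participate in every round. A sketch of partial participation, with hypothetical numbers:

# scheduler = FLScheduler(worker_num=10, server_num=1, port=9091)
# scheduler.set_sample_worker_num(5)  # sample 5 of the 10 trainers per round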
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle_fl as fl
import os
import paddle.fluid as fluid
from paddle_fl.core.server.fl_server import FLServer
from paddle_fl.core.master.fl_job import FLRunTimeJob
import time
server = FLServer()
server_id = 0
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_server_job(job_path, server_id)
# Scheduler endpoint, from the Kubernetes service environment variables.
job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + \
    os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
# job._endpoints = os.environ['POD_IP'] + ":" + os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER']
# (POD_IP is typically exposed to the pod through the Kubernetes downward API.)
server.set_server_job(job)
# Endpoint this server is reachable at.
server._current_ep = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + \
    os.environ['FL_SERVER_SERVICE_PORT_FL_SERVER']
print(job._scheduler_ep, server._current_ep)
server.start()
print("connect")
export GLOG_v=3   # verbose glog output for debugging
# Retry until the master's HTTP server is up and the job config can be fetched.
wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
while [ $? -ne 0 ]
do
    sleep 3
    wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
done
tar -xf fl_job_config.tar.gz
python -u fl_server.py > server.log 2>&1
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np
import os
import sys
import logging
import time

logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)

def reader():
    # Yield 1000 random samples: three float feature slots and a binary label.
    for i in range(1000):
        data_dict = {}
        for slot in range(3):
            data_dict[str(slot)] = np.random.rand(1, 5).astype('float32')
        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
        yield data_dict
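# The reader above feeds random data for demonstration only. A real reader
# with the same contract might look like this (the file name and layout are
# hypothetical):
#
# def reader():
#     data = np.load("train_data.npz")  # arrays "0", "1", "2" and "label"
#     for row in range(data["label"].shape[0]):
#         sample = {str(i): data[str(i)][row:row + 1] for i in range(3)}
#         sample["label"] = data["label"][row:row + 1].astype('int64')
#         yield sample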
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
#job._scheduler_ep = "127.0.0.1:9091" # Inform the scheduler IP to trainer
job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
trainer = FLTrainerFactory().create_fl_trainer(job)
trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
trainer.start()
print(trainer._scheduler_ep, trainer._current_ep)
output_folder = "fl_model"
epoch_id = 0
while not trainer.stop():
    print("epoch %d start train" % (epoch_id))
    train_step = 0
    for data in reader():
        trainer.run(feed=data, fetch=[])
        train_step += 1
        # trainer._step equals the strategy's inner_step; stop local
        # training here so the parameters can be synchronized.
        if train_step == trainer._step:
            break
    epoch_id += 1
    if epoch_id % 5 == 0:
        trainer.save_inference_program(output_folder)
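The loop above checkpoints the model to fl_model every five epochs. Assuming save_inference_program writes a standard fluid inference-model directory, the checkpoint can be loaded for local inference (paddle.fluid 1.x API):

import numpy as np
import paddle.fluid as fluid

exe = fluid.Executor(fluid.CPUPlace())
program, feed_names, fetch_targets = fluid.io.load_inference_model("fl_model", exe)
feed = {name: np.random.rand(1, 5).astype("float32") for name in feed_names}
result = exe.run(program, feed=feed, fetch_list=fetch_targets)
print(result[0])  # softmax probabilities from the two-class head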
# Download the config file
wget
# Download the image
sudo docker pull [paddle-fl image]
# Start a container
sudo docker run --name paddlefl -it -v $PWD:/root [paddle-fl image] /bin/bash
# Copy the config file into the container
sudo docker cp /path/to/config paddlefl:/path/to/config/file/at/container
# Run the program, passing this party's trainer id (e.g. 0)
python -u fl_trainer.py 0 > trainer.log &
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np
import sys
import os
import logging
import time
logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)

def reader():
    # Yield 1000 random samples: three float feature slots and a binary label.
    for i in range(1000):
        data_dict = {}
        for slot in range(3):
            data_dict[str(slot)] = np.random.rand(1, 5).astype('float32')
        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
        yield data_dict
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
#job._scheduler_ep = "127.0.0.1:9091" # Inform the scheduler IP to trainer
job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
trainer = FLTrainerFactory().create_fl_trainer(job)
#trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
trainer._current_ep = os.environ['TRAINER0_SERVICE_HOST'] + ":" + os.environ['TRAINER0_SERVICE_PORT_TRAINER0']
trainer.start()
print(trainer._scheduler_ep, trainer._current_ep)
output_folder = "fl_model"
epoch_id = 0
while not trainer.stop():
    print("epoch %d start train" % (epoch_id))
    train_step = 0
    for data in reader():
        trainer.run(feed=data, fetch=[])
        train_step += 1
        if train_step == trainer._step:
            break
    epoch_id += 1
    if epoch_id % 5 == 0:
        trainer.save_inference_program(output_folder)
# Retry until the master's HTTP server is up and the job config can be fetched.
wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
while [ $? -ne 0 ]
do
    sleep 3
    wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
done
tar -xf fl_job_config.tar.gz
# Give the scheduler and server time to start before training.
sleep 10
python -u fl_trainer.py 0
from paddle_fl.core.trainer.fl_trainer import FLTrainerFactory
from paddle_fl.core.master.fl_job import FLRunTimeJob
import numpy as np
import sys
import os
import logging
import time
logging.basicConfig(
    filename="test.log",
    filemode="w",
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%d-%m-%Y %H:%M:%S",
    level=logging.DEBUG)

def reader():
    # Yield 1000 random samples: three float feature slots and a binary label.
    for i in range(1000):
        data_dict = {}
        for slot in range(3):
            data_dict[str(slot)] = np.random.rand(1, 5).astype('float32')
        data_dict["label"] = np.random.randint(2, size=(1, 1)).astype('int64')
        yield data_dict
trainer_id = int(sys.argv[1]) # trainer id for each guest
job_path = "fl_job_config"
job = FLRunTimeJob()
job.load_trainer_job(job_path, trainer_id)
#job._scheduler_ep = "127.0.0.1:9091" # Inform the scheduler IP to trainer
job._scheduler_ep = os.environ['FL_SCHEDULER_SERVICE_HOST'] + ":" + os.environ['FL_SCHEDULER_SERVICE_PORT_FL_SCHEDULER']
trainer = FLTrainerFactory().create_fl_trainer(job)
#trainer._current_ep = "127.0.0.1:{}".format(9000+trainer_id)
trainer._current_ep = os.environ['TRAINER1_SERVICE_HOST'] + ":" + os.environ['TRAINER1_SERVICE_PORT_TRAINER1']
trainer.start()
print(trainer._scheduler_ep, trainer._current_ep)
output_folder = "fl_model"
epoch_id = 0
while not trainer.stop():
    print("epoch %d start train" % (epoch_id))
    train_step = 0
    for data in reader():
        trainer.run(feed=data, fetch=[])
        train_step += 1
        if train_step == trainer._step:
            break
    epoch_id += 1
    if epoch_id % 5 == 0:
        trainer.save_inference_program(output_folder)
# Retry until the master's HTTP server is up and the job config can be fetched.
wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
while [ $? -ne 0 ]
do
    sleep 3
    wget ${FL_MASTER_SERVICE_HOST}:${FL_MASTER_SERVICE_PORT_FL_MASTER}/fl_job_config.tar.gz
done
tar -xf fl_job_config.tar.gz
# Give the scheduler and server time to start before training.
sleep 10
python -u fl_trainer.py 1
@@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" PaddleFL version string """
fl_version = "0.1.10"
module_proto_version = "0.1.10"
fl_version = "0.1.11"
module_proto_version = "0.1.11"