# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Master-side script that builds a small MLP and emits an FL job config.

The script declares the network with ``paddle.fluid``, registers its
optimizer, loss, startup program and inference feed/target names with a
``JobGenerator``, and finally calls ``generate_fl_job_for_k8s`` to write the
job configuration into the ``fl_job_config`` output.

The server service endpoint is read from the environment variables
``FL_SERVER_SERVICE_HOST`` and ``FL_SERVER_SERVICE_PORT_FL_SERVER``; a
``KeyError`` is raised if either one is not set.
"""

import argparse
import os

import paddle.fluid as fluid
import paddle_fl as fl
from paddle_fl.core.master.job_generator import JobGenerator
from paddle_fl.core.strategy.fl_strategy_base import FLStrategyFactory


def parse_args():
    """Parse the command-line options of the master script.

    Returns:
        argparse.Namespace: namespace with a single integer attribute
        ``trainer_num`` (defaults to 2).
    """
    parser = argparse.ArgumentParser(description="master")
    parser.add_argument(
        '--trainer_num',
        type=int,
        default=2,
        help='number of trainer(default: 2)')
    return parser.parse_args()


class Model(object):
    """Container for the layers and programs of the example network."""

    def __init__(self):
        pass

    def mlp(self, inputs, label, hidden_size=128):
        """Build the classification network and store its pieces on ``self``.

        The inputs are concatenated along axis 1 and passed through two
        ReLU fully-connected layers (256 and 128 units) followed by a
        2-unit softmax layer. Cross-entropy, accuracy and the mean loss are
        attached, and the default startup program is recorded.

        Args:
            inputs: list of feature variables to concatenate.
            label: label variable used for the cross-entropy and accuracy.
            hidden_size: accepted for interface compatibility; the layer
                sizes are currently fixed and this value is not used.
        """
        self.concat = fluid.layers.concat(inputs, axis=1)
        self.fc1 = fluid.layers.fc(input=self.concat, size=256, act='relu')
        self.fc2 = fluid.layers.fc(input=self.fc1, size=128, act='relu')
        self.predict = fluid.layers.fc(input=self.fc2, size=2, act='softmax')
        self.sum_cost = fluid.layers.cross_entropy(
            input=self.predict, label=label)
        self.accuracy = fluid.layers.accuracy(input=self.predict, label=label)
        self.loss = fluid.layers.reduce_mean(self.sum_cost)
        self.startup_program = fluid.default_startup_program()


# Three float32 feature slots named "0", "1", "2" plus one int64 label.
inputs = [
    fluid.layers.data(name=str(slot_id), shape=[5], dtype="float32")
    for slot_id in range(3)
]
label = fluid.layers.data(name="label", shape=[1], dtype='int64')

model = Model()
model.mlp(inputs, label)

job_generator = JobGenerator()
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
job_generator.set_optimizer(optimizer)
job_generator.set_losses([model.loss])
job_generator.set_startup_program(model.startup_program)
job_generator.set_infer_feed_and_target_names([x.name for x in inputs],
                                              [model.predict.name])

build_strategy = FLStrategyFactory()
build_strategy.fed_avg = True
build_strategy.inner_step = 10
strategy = build_strategy.create_fl_strategy()

# endpoints will be collected through the cluster
# in this example, we suppose endpoints have been collected
server_service_ip = os.environ['FL_SERVER_SERVICE_HOST'] + ":" + os.environ[
    'FL_SERVER_SERVICE_PORT_FL_SERVER']
service_endpoints = [server_service_ip]
pod_endpoints = ["0.0.0.0:8181"]
output = "fl_job_config"
args = parse_args()
num_trainer = args.trainer_num

# Alternative entry point kept for reference (not used here):
#job_generator.generate_fl_job(
#    strategy, server_endpoints=endpoints, worker_num=num_trainer, output=output)

# fl_job_config will be dispatched to workers
# worker_num follows --trainer_num instead of a hard-coded 2, so the
# command-line flag actually takes effect (its default is still 2).
job_generator.generate_fl_job_for_k8s(
    strategy,
    server_pod_endpoints=pod_endpoints,
    server_service_endpoints=service_endpoints,
    worker_num=num_trainer,
    output=output)