#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import logging
from paddle_fl.core.scheduler.agent_master import FLWorkerAgent

class FLTrainerFactory(object):
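    """Builds a concrete trainer (e.g. FedAvgTrainer) from a compiled FL job's strategy."""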
    def __init__(self):
        pass

    def create_fl_trainer(self, job):
        strategy = job._strategy
        trainer = None
        if strategy._fed_avg == True:
            trainer = FedAvgTrainer()
        elif strategy._dpsgd == True:
            trainer = FLTrainer()
        trainer.set_trainer_job(job)
        return trainer


class FLTrainer(object):
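    """Base federated trainer: runs the compiled trainer-side programs locally
    and coordinates training rounds with the scheduler via FLWorkerAgent."""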
    def __init__(self):
        self._logger = logging.getLogger("FLTrainer")

    def set_trainer_job(self, job):
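        """Bind the per-trainer startup/main programs and job metadata to this trainer."""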
        self._startup_program = \
            job._trainer_startup_program
        self._main_program = \
            job._trainer_main_program
        self._step = job._strategy._inner_step
        self._feed_names = job._feed_names
        self._target_names = job._target_names
        self._scheduler_ep = job._scheduler_ep
        self._current_ep = None
        self.cur_step = 0

    def start(self):
        #current_ep = "to be added"
        self.agent = FLWorkerAgent(self._scheduler_ep, self._current_ep)
        self.agent.connect_scheduler()
        self.exe = fluid.Executor(fluid.CPUPlace())
        self.exe.run(self._startup_program)

    def run(self, feed, fetch):
        self._logger.debug("begin to run")
        self.exe.run(self._main_program,
                     feed=feed,
                     fetch_list=fetch)
        self._logger.debug("end to run current batch")
        self.cur_step += 1

    def save_inference_program(self, output_folder):
        target_vars = []
        infer_program = self._main_program.clone(for_test=True)
        for name in self._target_names:
            tmp_var = self._main_program.block(0)._find_var_recursive(name)
            target_vars.append(tmp_var)
        fluid.io.save_inference_model(
            output_folder,
            self._feed_names,
            target_vars,
            self.exe,
            main_program=infer_program)

    def stop(self):
        # ask for termination with master endpoint
        # currently not open sourced, will release the code later
        # TODO(guru4elephant): add connection with master
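        # Handshake with the scheduler: report that this worker finished the
        # current round, then wait until it is permitted to join the next one.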
        if self.cur_step != 0:
            while not self.agent.finish_training():
                print('wait others finish')
                continue
        while not self.agent.can_join_training():
            print("wait permit")
            continue
        print("ready to train")
        return False

class FedAvgTrainer(FLTrainer):
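    """Federated Averaging trainer: trains locally for `_step` (inner_step)
    batches, exchanging parameters with the server via the send/recv programs."""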
    def __init__(self):
        super(FedAvgTrainer, self).__init__()

    def start(self):
	#current_ep = "to be added"
        self.agent = FLWorkerAgent(self._scheduler_ep, self._current_ep)
	self.agent.connect_scheduler()
        self.exe = fluid.Executor(fluid.CPUPlace())
        self.exe.run(self._startup_program)

    def set_trainer_job(self, job):
        super(FedAvgTrainer, self).set_trainer_job(job)
        self._send_program = job._trainer_send_program
        self._recv_program = job._trainer_recv_program

    def reset(self):
        self.cur_step = 0

    def run(self, feed, fetch):
        self._logger.debug("begin to run FedAvgTrainer, cur_step=%d, inner_step=%d" %
                           (self.cur_step, self._step))
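        # At the start of each averaging window, pull the latest global
        # parameters from the server.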
        if self.cur_step % self._step == 0:
            self._logger.debug("begin to run recv program")
            self.exe.run(self._recv_program)
        self._logger.debug("begin to run current step")
        loss = self.exe.run(self._main_program,
                            feed=feed,
                            fetch_list=fetch)
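        # On the same boundary step, push the locally updated parameters back
        # to the server for aggregation.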
        if self.cur_step % self._step == 0:
            self._logger.debug("begin to run send program")
            self.exe.run(self._send_program)
        self.cur_step += 1
        return loss
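

# A typical driving loop, mirroring the PaddleFL examples (the reader, feeder,
# fetch target, and output_folder names below are illustrative placeholders,
# not part of this module):
#
#   trainer.start()
#   while not trainer.stop():
#       for data in train_reader():
#           loss = trainer.run(feed=feeder.feed(data), fetch=[avg_loss.name])
#       trainer.save_inference_program(output_folder)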