fl_trainer.py 8.4 KB
Newer Older
G
guru4elephant 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
G
guru4elephant 已提交
15
import logging
Q
qjing666 已提交
16
from paddle_fl.core.scheduler.agent_master import FLWorkerAgent
17
import numpy
18
import hmac
19
#from .diffiehellman.diffiehellman import DiffieHellman
G
guru4elephant 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33

class FLTrainerFactory(object):
    """Build the trainer that matches a job's federated-learning strategy."""

    def __init__(self):
        pass

    def create_fl_trainer(self, job):
        """Create and configure a trainer for ``job``.

        The strategy flags are checked in order: ``_fed_avg`` selects
        ``FedAvgTrainer``, ``_dpsgd`` selects the plain ``FLTrainer`` and
        ``_sec_agg`` selects ``SecAggTrainer``.

        Args:
            job: FL job whose ``_strategy`` carries the strategy flags; it is
                also handed to the trainer's ``set_trainer_job``.

        Returns:
            The configured trainer instance.

        Raises:
            ValueError: if none of the supported strategy flags is set.
        """
        strategy = job._strategy
        if strategy._fed_avg:
            trainer = FedAvgTrainer()
        elif strategy._dpsgd:
            trainer = FLTrainer()
        elif strategy._sec_agg:
            trainer = SecAggTrainer()
        else:
            # Fail loudly instead of calling set_trainer_job() on None.
            raise ValueError(
                "unsupported FL strategy: expected one of _fed_avg, "
                "_dpsgd or _sec_agg to be set")
        # Configure exactly once, whichever trainer was chosen.
        trainer.set_trainer_job(job)
        return trainer


class FLTrainer(object):
    """Base federated-learning trainer.

    Runs the job's trainer programs on CPU through a ``fluid.Executor`` and
    coordinates with the FL scheduler through an ``FLWorkerAgent``.
    """

    def __init__(self):
        self._logger = logging.getLogger("FLTrainer")
        # Number of local batches run so far; also reset by set_trainer_job().
        # Initialized here so the attribute always exists.
        self.cur_step = 0

    def set_trainer_job(self, job):
        """Copy the programs and settings this trainer needs from ``job``.

        Args:
            job: FL job exposing the trainer startup/main programs, the
                strategy (for ``_inner_step``), the feed/target variable
                names and the scheduler endpoint.
        """
        self._startup_program = job._trainer_startup_program
        self._main_program = job._trainer_main_program
        self._step = job._strategy._inner_step
        self._feed_names = job._feed_names
        self._target_names = job._target_names
        self._scheduler_ep = job._scheduler_ep
        # This trainer's own endpoint is not known here; start() passes None
        # to FLWorkerAgent.
        self._current_ep = None
        self.cur_step = 0

    def start(self):
        """Register with the scheduler and initialize parameters.

        Creates an ``FLWorkerAgent``, connects it to the scheduler, then runs
        the startup program on a CPU executor.
        """
        # TODO: pass the trainer's own endpoint once it is available.
        self.agent = FLWorkerAgent(self._scheduler_ep, self._current_ep)
        self.agent.connect_scheduler()
        self.exe = fluid.Executor(fluid.CPUPlace())
        self.exe.run(self._startup_program)

    def run(self, feed, fetch):
        """Run one batch of the main program.

        Args:
            feed: feed dict forwarded to ``Executor.run``.
            fetch: fetch list forwarded to ``Executor.run``.

        Returns:
            The value returned by ``Executor.run`` for ``fetch``. This matches
            the ``run`` methods of the subclasses.
        """
        self._logger.debug("begin to run")
        results = self.exe.run(self._main_program,
                               feed=feed,
                               fetch_list=fetch)
        self._logger.debug("end to run current batch")
        self.cur_step += 1
        return results

    def save_inference_program(self, output_folder):
        """Save a test-mode copy of the main program as an inference model.

        Args:
            output_folder: directory passed to
                ``fluid.io.save_inference_model``.
        """
        infer_program = self._main_program.clone(for_test=True)
        # Target variables are looked up in block 0 of the main program.
        global_block = self._main_program.block(0)
        target_vars = [global_block._find_var_recursive(name)
                       for name in self._target_names]
        fluid.io.save_inference_model(
            output_folder,
            self._feed_names,
            target_vars,
            self.exe,
            main_program=infer_program)

    def stop(self):
        """Synchronize with the scheduler between training rounds.

        If this trainer ran at least one batch, it blocks until the agent
        reports that training has finished. It then blocks until the agent
        grants permission to join training again.

        Returns:
            Always ``False``.
        """
        # TODO(guru4elephant): ask for termination with the master endpoint
        # (that code is not open sourced yet).
        # NOTE(review): both loops poll the agent back-to-back with no delay
        # and print on every attempt; consider adding a back-off.
        if self.cur_step != 0:
            while not self.agent.finish_training():
                print('wait others finish')
        while not self.agent.can_join_training():
            print("wait permit")
        print("ready to train")
        return False
G
guru4elephant 已提交
99

100

G
guru4elephant 已提交
101 102 103 104 105 106
class FedAvgTrainer(FLTrainer):
    """Trainer for the FedAvg strategy.

    Pulls parameters with the recv program, trains locally, and pushes
    parameters with the send program. This happens either once every
    ``inner_step`` batches (``run``) or once around a multi-epoch pass
    (``run_with_epoch``). ``__init__`` and ``start`` are inherited unchanged
    from ``FLTrainer``.
    """

    def set_trainer_job(self, job):
        """Configure from ``job``, including the send/recv programs."""
        super(FedAvgTrainer, self).set_trainer_job(job)
        self._send_program = job._trainer_send_program
        self._recv_program = job._trainer_recv_program

    def reset(self):
        """Reset the local step counter to zero."""
        self.cur_step = 0

    def run_with_epoch(self, reader, feeder, fetch, num_epoch):
        """Train ``num_epoch`` passes between a single recv and a single send.

        Args:
            reader: callable returning an iterable of batches; called once
                per epoch.
            feeder: object whose ``feed(data)`` builds the feed for a batch
                (presumably a ``fluid.DataFeeder`` -- confirm with callers).
            fetch: fetch list forwarded to ``Executor.run``.
            num_epoch: number of passes over ``reader()``.
        """
        self._logger.debug("begin to run recv program")
        self.exe.run(self._recv_program)
        for epoch in range(num_epoch):
            print(epoch)
            for data in reader():
                self.exe.run(self._main_program,
                             feed=feeder.feed(data),
                             fetch_list=fetch)
            # Here cur_step counts epochs, whereas run() counts batches.
            self.cur_step += 1
        self._logger.debug("begin to run send program")
        self.exe.run(self._send_program)

    def run(self, feed, fetch):
        """Run one local batch, syncing every ``inner_step`` batches.

        On a batch where ``cur_step % inner_step == 0``, the recv program
        runs before the batch and the send program runs after it.

        Args:
            feed: feed dict forwarded to ``Executor.run``.
            fetch: fetch list forwarded to ``Executor.run``.

        Returns:
            The value returned by ``Executor.run`` for ``fetch``.
        """
        self._logger.debug("begin to run FedAvgTrainer, cur_step=%d, inner_step=%d" %
                           (self.cur_step, self._step))
        sync_step = self.cur_step % self._step == 0
        if sync_step:
            self._logger.debug("begin to run recv program")
            self.exe.run(self._recv_program)
        self._logger.debug("begin to run current step")
        loss = self.exe.run(self._main_program,
                            feed=feed,
                            fetch_list=fetch)
        if sync_step:
            self._logger.debug("begin to run send program")
            self.exe.run(self._send_program)
        self.cur_step += 1
        return loss
150 151 152
       
 

G
giddenslee 已提交
153 154


155 156 157 158 159
class SecAggTrainer(FLTrainer):
    """Trainer for secure aggregation.

    On every send step the local parameters are shifted by a scalar mask.
    The mask is built from HMACs of pairwise Diffie-Hellman shared secrets
    with every other trainer. For each peer, trainer i adds the term when
    peer > i and subtracts it when peer < i. Assuming all trainers share the
    same ``step_id``, these terms presumably cancel when the parameters are
    summed across trainers.

    ``trainer_id``, ``trainer_num``, ``key_dir`` and ``step_id`` must be set
    before the first send step; they have no defaults.
    """

    def __init__(self):
        super(SecAggTrainer, self).__init__()

    @property
    def trainer_id(self):
        return self._trainer_id

    @trainer_id.setter
    def trainer_id(self, s):
        self._trainer_id = s

    @property
    def trainer_num(self):
        return self._trainer_num

    @trainer_num.setter
    def trainer_num(self, s):
        self._trainer_num = s

    @property
    def key_dir(self):
        # Path prefix; key files are "<key_dir><id>_priv_key.txt" and
        # "<key_dir><id>_pub_key.txt".
        return self._key_dir

    @key_dir.setter
    def key_dir(self, s):
        self._key_dir = s

    @property
    def step_id(self):
        # Used as the HMAC key when deriving the mask.
        return self._step_id

    @step_id.setter
    def step_id(self, s):
        self._step_id = s

    def start(self):
        """Run the startup program on CPU and reset the step counter.

        Unlike the base class, no scheduler agent is created.
        """
        self.exe = fluid.Executor(fluid.CPUPlace())
        self.exe.run(self._startup_program)
        self.cur_step = 0

    def set_trainer_job(self, job):
        """Configure from ``job``: send/recv programs and masked params."""
        super(SecAggTrainer, self).set_trainer_job(job)
        self._send_program = job._trainer_send_program
        self._recv_program = job._trainer_recv_program
        self._step = job._strategy._inner_step
        # Base names of the parameters to mask before sending.
        self._param_name_list = job._strategy._param_name_list

    def reset(self):
        """Reset the local step counter to zero."""
        self.cur_step = 0

    def _compute_noise(self):
        """Return this trainer's scalar mask for the current ``step_id``."""
        # Imported lazily so the Diffie-Hellman helper is only required
        # when secure aggregation actually sends parameters.
        import hashlib
        from .diffiehellman.diffiehellman import DiffieHellman

        scale = pow(10.0, 5)
        noise = 0.0
        # 1. load own private key; 2. combine with each peer's public key.
        dh = DiffieHellman(group=15, key_length=256)
        dh.load_private_key(self._key_dir + str(self._trainer_id) + "_priv_key.txt")
        key = str(self._step_id).encode("utf-8")
        for peer_id in range(self._trainer_num):
            if peer_id == self._trainer_id:
                continue
            with open(self._key_dir + str(peer_id) + "_pub_key.txt", "r") as f:
                public_key = int(f.read())
            dh.generate_shared_secret(public_key, echo_return_key=True)
            msg = dh.shared_key.encode("utf-8")
            digest = hmac.new(key=key, msg=msg, digestmod=hashlib.sha256).hexdigest()
            # First 32 bits of the HMAC, scaled down to a small float.
            current_noise = int(digest[0:8], 16) / scale
            if peer_id > self._trainer_id:
                noise += current_noise
            else:
                noise -= current_noise
        return noise

    def _mask_params(self, noise):
        """Add ``noise`` in place to every ``<param_name><trainer_id>`` var."""
        scope = fluid.global_scope()
        suffix = str(self._trainer_id)
        for param_name in self._param_name_list:
            var_name = param_name + suffix
            target = scope.var(var_name).get_tensor()
            current = numpy.array(scope.find_var(var_name).get_tensor())
            target.set(current + noise, fluid.CPUPlace())

    def run(self, feed, fetch):
        """Run one local batch, syncing every ``inner_step`` batches.

        On a batch where ``cur_step % inner_step == 0``, the recv program
        runs before the batch. After the batch, the parameters are masked
        and the send program runs.

        Args:
            feed: feed dict forwarded to ``Executor.run``.
            fetch: fetch list forwarded to ``Executor.run``.

        Returns:
            The value returned by ``Executor.run`` for ``fetch``.
        """
        self._logger.debug("begin to run SecAggTrainer, cur_step=%d, inner_step=%d" %
                           (self.cur_step, self._step))
        sync_step = self.cur_step % self._step == 0
        if sync_step:
            self._logger.debug("begin to run recv program")
            self.exe.run(self._recv_program)
        self._logger.debug("begin to run current step")
        loss = self.exe.run(self._main_program,
                            feed=feed,
                            fetch_list=fetch)
        if sync_step:
            self._logger.debug("begin to run send program")
            self._mask_params(self._compute_noise())
            self.exe.run(self._send_program)
        self.cur_step += 1
        return loss

    def stop(self):
        """No scheduler handshake for this trainer; always returns False."""
        return False