# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import paddle
from paddle.fluid import (program_guard, layers, default_main_program,
                          default_startup_program)

from .meta_optimizer_base import MetaOptimizerBase
from .common import OpRole, OP_ROLE_KEY, CollectiveHelper, is_update_op


class LocalSGDOptimizer(MetaOptimizerBase):
    def __init__(self, optimizer):
        super(LocalSGDOptimizer, self).__init__(optimizer)
        self.inner_opt = optimizer
        self.meta_optimizers_white_list = []
28
        self.meta_optimizers_black_list = ["GraphExecutionOptimizer"]
Y
Yi Liu 已提交
29 30 31
        self.snapshot_key = '@SNAPSHOT'

    def _can_apply(self):
32 33 34
        if not self.role_maker._is_collective:
            return False

Y
Yi Liu 已提交
35 36 37 38 39 40
        if not self.user_defined_strategy.localsgd:
            return False

        if self.role_maker.worker_num() <= 1:
            return False

S
ShenLiang 已提交
41
        return isinstance(self.inner_opt, paddle.optimizer.momentum.Momentum) \
42 43 44
            or isinstance(self.inner_opt, paddle.fluid.optimizer.Momentum) \
            or isinstance(self.inner_opt, paddle.optimizer.sgd.SGD) \
            or isinstance(self.inner_opt, paddle.fluid.optimizer.SGD)

    def _disable_strategy(self, dist_strategy):
        dist_strategy.localsgd = False
48
        dist_strategy.localsgd_configs = {}
    def _enable_strategy(self, dist_strategy, context):
51 52 53
        dist_strategy.localsgd = True
        dist_strategy.localsgd_configs = {"k_steps": 1}

    def snapshot_name(self, param_name):
        return param_name + self.snapshot_key

    def create_snapshot_vars(self, program):
        block = program.global_block()

        non_dist_params = []
        for param in block.iter_parameters():
            if not param.is_distributed:
                non_dist_params.append(param)

        p2s = []
        for param in non_dist_params:
            snapshot = block.create_var(
                name=self.snapshot_name(param.name),
                shape=param.shape,
                persistable=True,
                stop_gradient=True,
                dtype=param.dtype)
            p2s.append([param, snapshot])
        return p2s

    def init_snapshot_vars(self, startup_program, param2snapshot):
        with program_guard(startup_program):
            for param, snapshot in param2snapshot:
                layers.assign(param, snapshot)

    def minimize_impl(self,
                      loss,
                      startup_program=None,
                      parameter_list=None,
                      no_grad_set=None):
        minimized = self.inner_opt.minimize(
            loss, startup_program=startup_program)

        init_k_steps = self.user_defined_strategy.localsgd_configs['k_steps']
        auto_steps = self.user_defined_strategy.auto

        if startup_program is None:
            startup_program = default_startup_program()
        main_block = loss.block

        self.nrings = 2
        collective_helper = CollectiveHelper(self.role_maker, self.nrings)
        collective_helper.update_startup_program(startup_program)
99 100
        p2s = self.create_snapshot_vars(startup_program)
        self.init_snapshot_vars(startup_program, p2s)
Y
Yi Liu 已提交
101

102 103
        p2s = self.create_snapshot_vars(main_block.program)
        with program_guard(main_block.program, startup_program):
Y
Yi Liu 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
            step = layers.autoincreased_step_counter(begin=0)
            k_steps = layers.create_global_var(
                name="k_steps",
                shape=[1],
                value=init_k_steps,
                dtype='int64',
                persistable=True)
            last_step = layers.create_global_var(
                name="last_step",
                shape=[1],
                value=int(0),
                dtype='int64',
                persistable=True)

            if auto_steps:
119 120 121
                avg_loss = layers.collective._c_allreduce(
                    loss) / self.role_maker.worker_num()

Y
Yi Liu 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
                lr_0 = layers.create_global_var(
                    name="lr_0",
                    shape=[1],
                    value=float(0),
                    dtype='float32',
                    persistable=True)
                loss_0 = layers.create_global_var(
                    name="loss_0",
                    shape=[1],
                    value=float(0),
                    dtype='float32',
                    persistable=True)

                global_lr = self.inner_opt._global_learning_rate()

                def initialize():
                    layers.assign(loss, loss_0)
                    layers.assign(global_lr, lr_0)

                layers.cond(step == 0, initialize)

            def communicate():
144
                sub_block = default_main_program().current_block()
Y
Yi Liu 已提交
145
                ring_id = -1
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
                for param, snapshot in p2s:
                    sub_block.append_op(
                        type='elementwise_sub',
                        inputs={'X': [snapshot],
                                'Y': [param]},
                        outputs={'Out': [param]},
                        attrs={OP_ROLE_KEY: OpRole.Optimize})
                    sub_block.append_op(
                        type='c_sync_calc_stream',
                        inputs={'X': param},
                        outputs={'Out': param},
                        attrs={OP_ROLE_KEY: OpRole.Optimize})
                    ring_id = (ring_id + 1) % self.nrings
                    sub_block.append_op(
                        type='c_allreduce_sum',
                        inputs={'X': [param]},
                        outputs={'Out': [param]},
                        attrs={
                            'ring_id': ring_id,
                            OP_ROLE_KEY: OpRole.Optimize
                        })
Y
Yi Liu 已提交
167 168

                for ring_id in range(self.nrings):
169
                    sub_block.append_op(
Y
Yi Liu 已提交
170 171 172 173 174 175 176 177
                        type='c_sync_comm_stream',
                        inputs={'X': param},
                        outputs={'Out': param},
                        attrs={
                            'ring_id': ring_id,
                            OP_ROLE_KEY: OpRole.Optimize
                        })

178 179
                for param, snapshot in p2s:
                    sub_block.append_op(
Y
Yi Liu 已提交
180 181 182 183 184 185 186
                        type='scale',
                        inputs={'X': [param]},
                        outputs={'Out': [param]},
                        attrs={
                            'scale': 1.0 / self.role_maker.worker_num(),
                            OP_ROLE_KEY: OpRole.Optimize
                        })
187
                    sub_block.append_op(
Y
Yi Liu 已提交
188 189 190 191 192
                        type='elementwise_sub',
                        inputs={'X': [snapshot],
                                'Y': [param]},
                        outputs={'Out': [param]},
                        attrs={OP_ROLE_KEY: OpRole.Optimize})
193
                    sub_block.append_op(
Y
Yi Liu 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
                        type='assign',
                        inputs={'X': [param]},
                        outputs={'Out': [snapshot]},
                        attrs={OP_ROLE_KEY: OpRole.Optimize})

                if auto_steps:
                    next_local_steps = layers.cast(
                        layers.ceil(
                            layers.sqrt(lr_0 * loss / (global_lr * loss_0) *
                                        float(init_k_steps))),
                        dtype='int64')
                    max_local_steps = layers.fill_constant(
                        shape=[1], dtype='int64', value=16)
                    next_local_steps = layers.elementwise_min(next_local_steps,
                                                              max_local_steps)
                    layers.assign(next_local_steps, k_steps)
                layers.assign(step, last_step)

            layers.cond(step - last_step == k_steps, communicate)

        return minimized