auto_parallel_gradient_merge.py 12.3 KB
Newer Older
1
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
2
#
3 4 5
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9 10 11 12 13 14
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
from typing import Any, Dict, List, Tuple
16 17

import paddle
18 19
from paddle.distributed.auto_parallel.process_mesh import ProcessMesh
from paddle.distributed.auto_parallel.static.process_group import (
20
    get_world_process_group,
21
)
22
from paddle.distributed.auto_parallel.static.utils import (
23
    is_optimize_op,
24
    naive_set_dist_op_attr_for_program_by_mesh_and_mapping,
25
    set_var_dist_attr,
26
)
27
from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole
28
from paddle.framework import core
29
from paddle.static import device_guard
30 31

from .pass_base import PassBase, PassType, register_pass
32

33
# Process group handle obtained once at import time. Its `.ranks` is what the
# gradient-merge control variables and ops below are annotated with when their
# distributed attributes are set.
world_process_group = get_world_process_group()
34 35 36 37 38 39 40 41 42 43 44


def _remove_and_get_optimizer_op(main_program, dist_context):
    """Strip the optimizer ops out of the global block and hand back copies.

    A new block is created on ``main_program`` and, for every op in the global
    block for which ``is_optimize_op`` is true, a copy of its desc is appended
    to that new block.  The original ops are then removed from the global
    block.  When ``dist_context`` is truthy the op is also deleted from it.

    Args:
        main_program: program whose global block is scanned and modified.
        dist_context: distributed context to purge the removed ops from; a
            falsy value skips that step.

    Returns:
        The list of copied op descs, in their original order.
    """
    global_block = main_program.global_block()
    scratch_block = main_program._create_block()

    saved_descs = []
    doomed_positions = []
    for position, candidate in enumerate(global_block.ops):
        if not is_optimize_op(candidate):
            continue

        # Keep a copy of the op desc in the scratch block.
        copied_desc = scratch_block.desc.append_op()
        copied_desc.copy_from(candidate.desc)
        saved_descs.append(copied_desc)
        doomed_positions.append(position)

        # Forget the op in the distributed context as well.
        if dist_context:
            dist_context.del_dist_op_for_program(candidate)

    # Remove from the back so the remaining recorded positions stay valid,
    # then sync once after all removals.
    for position in reversed(doomed_positions):
        global_block._remove_op(position, sync=False)
    global_block._sync_with_cpp()

    return saved_descs


63
def _get_gm_cond_var(main_program, k_steps, dist_context):
    """Create the step counter and the boolean condition for gradient merge.

    Four persistable, ``force_cpu`` global variables of shape ``[1]`` are
    created: ``gradient_merge_k`` (int32, ``int(k_steps)``),
    ``gradient_merge_zero`` (int32, 0), ``gradient_merge_step`` (int32, 0) and
    ``gradient_merge_cond`` (bool, False).  Under ``device_guard("cpu")`` three
    ops tagged ``OpRole.Backward`` are appended to the global block:

        step = step + 1
        step = step % k
        cond = (step == 0)

    Every variable and op gets its distributed attribute set with dims
    mapping ``[-1]`` over ``world_process_group.ranks``.

    Args:
        main_program: program whose global block receives the ops.
        k_steps: merge interval; converted with ``int()``.
        dist_context: distributed context the attributes are recorded in.

    Returns:
        The ``gradient_merge_cond`` variable.
    """
    global_block = main_program.global_block()

    def _make_scalar_var(name, dtype, value):
        # Create one shape-[1] global var and annotate it right away.
        var = paddle.static.create_global_var(
            name=name,
            shape=[1],
            value=value,
            dtype=dtype,
            persistable=True,
            force_cpu=True,
        )
        set_var_dist_attr(dist_context, var, [-1], world_process_group.ranks)
        return var

    def _append_op_on_world_mesh(**op_kwargs):
        # Append one op to the global block and annotate it right away.
        op = global_block.append_op(**op_kwargs)
        naive_set_dist_op_attr_for_program_by_mesh_and_mapping(
            op, ProcessMesh(world_process_group.ranks), [-1], dist_context
        )
        return op

    # Constants.
    k_step_var = _make_scalar_var("gradient_merge_k", 'int32', int(k_steps))
    zero_var = _make_scalar_var("gradient_merge_zero", 'int32', 0)

    # Running step counter and the resulting condition.
    step_var = _make_scalar_var("gradient_merge_step", 'int32', 0)
    cond_var = _make_scalar_var("gradient_merge_cond", 'bool', False)

    with device_guard("cpu"):
        # step_var += 1
        _append_op_on_world_mesh(
            type='increment',
            inputs={'X': [step_var]},
            outputs={'Out': [step_var]},
            attrs={'step': 1.0, OP_ROLE_KEY: OpRole.Backward},
        )

        # step_var %= k_step
        _append_op_on_world_mesh(
            type='elementwise_mod',
            inputs={'X': step_var, 'Y': k_step_var},
            outputs={'Out': step_var},
            attrs={
                'axis': -1,
                'use_mkldnn': False,
                OP_ROLE_KEY: OpRole.Backward,
            },
        )

        # cond_var = (step_var == 0)
        _append_op_on_world_mesh(
            type='equal',
            inputs={'X': step_var, 'Y': zero_var},
            outputs={'Out': cond_var},
            attrs={OP_ROLE_KEY: OpRole.Backward},
        )

    return cond_var


def _append_gradient_merge_backward_op(
    main_program,
    startup_program,
    params_grads: List[Tuple[Any, Any]],
    dist_context,
) -> Tuple[List[Tuple[Any, Any]], Dict[str, Any]]:
    """Add one accumulator per parameter and the op that accumulates into it.

    For every ``(param, grad)`` pair a persistable variable named
    ``<param.name>@GRAD@GradientMerge`` is created in both the main and the
    startup global block, with the parameter's shape and dtype.  The startup
    program fills it with zeros via ``fill_constant``; the main program gets
    an ``elementwise_add`` (role ``OpRole.Backward``) computing
    ``accumulator = grad + accumulator``.  The accumulator variable and the
    add op take the process mesh and dims mapping of the parameter's tensor
    dist attr.

    Args:
        main_program: program receiving the accumulators and add ops.
        startup_program: program receiving the zero-initialisation ops.
        params_grads: ``(param, grad)`` pairs to accumulate.
        dist_context: distributed context used to look up and record
            distributed attributes.

    Returns:
        A pair ``(new_params_to_grads, grad_to_gradient_merge)``: a list of
        two-element ``[param, accumulator_var]`` lists, and a dict mapping
        each ``grad.name`` to the name of its accumulator.

    Raises:
        AssertionError: if a parameter has type ``SELECTED_ROWS``, if the
            parameter is missing from the main block, or if it has no tensor
            dist attr.
    """
    main_block = main_program.global_block()
    startup_block = startup_program.global_block()

    # Validate every parameter before touching either program.
    for candidate, _unused_grad in params_grads:
        assert (
            candidate.type != core.VarDesc.VarType.SELECTED_ROWS
        ), "SELECTED_ROWS is not supported in GradientMergeOptimizer for now"

    # grad.name -> accumulator name, later used to rename optimizer op args.
    grad_to_gradient_merge = {}
    # [param, accumulator] pairs, later used for the scale / reset ops.
    new_params_to_grads = []

    for param, grad in params_grads:
        param_var = main_block.var(param.name)
        assert param_var is not None
        param_dist_attr = dist_context.get_tensor_dist_attr_for_program(
            param_var
        )
        assert param_dist_attr is not None

        merged_name = param.name + "@GRAD@GradientMerge"

        # Accumulator in the main program, placed like the parameter.
        accumulator = main_block.create_var(
            name=merged_name,
            shape=param_var.shape,
            dtype=param_var.dtype,
            persistable=True,
        )
        mesh = param_dist_attr.process_mesh
        dims_mapping = param_dist_attr.dims_mapping
        set_var_dist_attr(dist_context, accumulator, dims_mapping, mesh)

        # Same-named variable in the startup program, initialised to zero.
        startup_accumulator = startup_block.create_var(
            name=merged_name,
            shape=param_var.shape,
            dtype=param_var.dtype,
            persistable=True,
        )
        startup_block.append_op(
            type="fill_constant",
            outputs={"Out": startup_accumulator},
            attrs={
                "shape": param_var.shape,
                "dtype": param_var.dtype,
                "value": 0.0,
            },
        )

        # accumulator += grad
        accumulate_op = main_block.append_op(
            type="elementwise_add",
            inputs={'X': grad, 'Y': accumulator},
            outputs={'Out': accumulator},
            attrs={
                'axis': -1,
                'use_mkldnn': False,
                OP_ROLE_KEY: OpRole.Backward,
            },
        )
        new_params_to_grads.append([param, accumulator])
        grad_to_gradient_merge[grad.name] = accumulator.name
        naive_set_dist_op_attr_for_program_by_mesh_and_mapping(
            accumulate_op, mesh, dims_mapping, dist_context
        )

    return new_params_to_grads, grad_to_gradient_merge
224 225 226


def _create_cond_block_and_update_optimizer(
    main_program,
    cond_var,
    new_params_to_grads: List[Tuple[Any, Any]],
    grad_to_gradient_merge: Dict[str, str],
    optimize_ops_desc: List[Any],
    k_steps,
    avg,
):
    """Wrap the optimizer ops in a conditional branch driven by ``cond_var``.

    The branch body, built through ``paddle.static.nn.cond`` with no false
    branch, does the following in order:

    1. when ``avg`` is truthy, scales every accumulated gradient in place by
       ``1.0 / k_steps``;
    2. appends a copy of each desc in ``optimize_ops_desc``, renaming any
       input/output argument found in ``grad_to_gradient_merge`` and dropping
       the op-role-var attribute when present;
    3. fills every accumulated gradient with ``0.0``.

    The last op of the global block after the ``cond`` call is then tagged
    ``OpRole.Optimize``.

    Args:
        main_program: program being rewritten.
        cond_var: boolean variable selecting whether the branch runs.
        new_params_to_grads: ``(param, accumulated_grad)`` pairs.
        grad_to_gradient_merge: original grad name -> accumulator name.
        optimize_ops_desc: saved optimizer op descs to replay in the branch.
        k_steps: divisor used for averaging.
        avg: whether to average the accumulated gradients before the update.
    """

    def _point_at_merged_grads(desc):
        # Swap every known gradient name for its accumulator, inputs first.
        for arg_name in desc.input_arg_names():
            if arg_name in grad_to_gradient_merge:
                desc._rename_input(arg_name, grad_to_gradient_merge[arg_name])

        for arg_name in desc.output_arg_names():
            if arg_name in grad_to_gradient_merge:
                desc._rename_output(arg_name, grad_to_gradient_merge[arg_name])

    def _apply_merged_gradients():
        block_idx = main_program.current_block_idx
        block = main_program.current_block()

        # The branch block is registered as its own forward block.
        block._set_forward_block_idx(block_idx)
        op_maker = core.op_proto_and_checker_maker

        if avg:
            for _, merged_grad in new_params_to_grads:
                # grad /= k_steps
                block.append_op(
                    type='scale',
                    inputs={'X': merged_grad},
                    outputs={'Out': merged_grad},
                    attrs={
                        'scale': 1.0 / k_steps,
                        'bias': 0.0,
                        'bias_after_scale': False,
                    },
                )
                # `.op` is presumably the scale op just appended -- TODO confirm.
                merged_grad.op._set_attr(OP_ROLE_KEY, OpRole.Optimize)

        # Replay the saved optimizer ops inside the branch.
        for saved_desc in optimize_ops_desc:
            replayed = block.desc.append_op()
            replayed.copy_from(saved_desc)

            _point_at_merged_grads(replayed)

            # Drop op_role_var if the copied op carries it.
            role_var_attr = op_maker.kOpRoleVarAttrName()
            if replayed.has_attr(role_var_attr):
                replayed.remove_attr(role_var_attr)

            # NOTE(review): if input_arg_names() returns a list of names this
            # is an exact-match membership test on the suffix itself, not a
            # substring test -- confirm whether this branch is ever taken.
            if core.grad_var_suffix() in replayed.input_arg_names():
                grad_name = replayed.input("Grad")[0]
                # TODO FIXME(xym) support fp16
                replayed.set_input("Grad", [grad_name + '@GradientMerge'])

        main_program.global_block()._sync_with_cpp()
        block._sync_with_cpp()

        # Reset the accumulators to zero for the next merge window.
        for _, merged_grad in new_params_to_grads:
            paddle.tensor.fill_constant(
                shape=merged_grad.shape,
                dtype=merged_grad.dtype,
                value=0.0,
                out=merged_grad,
            )
            merged_grad.op._set_attr(OP_ROLE_KEY, op_maker.OpRole.Optimize)

    paddle.static.nn.cond(
        cond_var, true_fn=_apply_merged_gradients, false_fn=None
    )
    # Tag whatever op now sits last in the global block as an optimize op.
    main_program.global_block().ops[-1]._set_attr(
        OP_ROLE_KEY, OpRole.Optimize
    )
302 303


304 305 306
def parse_program(
    main_program, startup_program, params_grads, k_steps, avg, dist_context
):
    """Rewrite ``main_program`` so the optimizer runs only on merge steps.

    Runs four steps in order: pull the optimizer ops out of the main program,
    append the gradient accumulation ops, build the merge condition variable,
    and re-insert the optimizer ops inside a conditional block.

    Args:
        main_program: program to rewrite in place.
        startup_program: program that receives accumulator initialisation.
        params_grads: ``(param, grad)`` pairs to merge.
        k_steps: merge interval.
        avg: whether accumulated gradients are averaged before the update.
        dist_context: distributed context passed through to the helpers.
    """
    # 1. Detach the optimizer ops; this leaves a freshly created block
    #    current, so roll back to the previous one before appending more ops.
    saved_optimizer_descs = _remove_and_get_optimizer_op(
        main_program, dist_context
    )
    main_program._rollback()

    # 2. Accumulate each gradient into its merge variable.
    merged_pairs, grad_rename_map = _append_gradient_merge_backward_op(
        main_program, startup_program, params_grads, dist_context
    )

    # 3. Build the condition that decides when to apply the update.
    merge_cond = _get_gm_cond_var(main_program, k_steps, dist_context)

    # 4. Put the optimizer ops back, guarded by that condition.
    _create_cond_block_and_update_optimizer(
        main_program=main_program,
        cond_var=merge_cond,
        new_params_to_grads=merged_pairs,
        grad_to_gradient_merge=grad_rename_map,
        optimize_ops_desc=saved_optimizer_descs,
        k_steps=k_steps,
        avg=avg,
    )
334 335 336 337 338


@register_pass("auto_parallel_gradient_merge_pass")
class GradientMergePass(PassBase):
    """Pass registered as ``auto_parallel_gradient_merge_pass``.

    Attributes read through ``get_attr``:
        k_steps: merge interval; initialised to -1 and must not be below 1
            for the pass to be considered valid.
        avg: whether merged gradients are averaged; initialised to True.
        dist_context: distributed context handed to ``parse_program``.
        params_grads: ``(param, grad)`` pairs handed to ``parse_program``.
    """

    def __init__(self):
        super().__init__()
        self.set_attr("k_steps", -1)
        self.set_attr("avg", True)

    def _check_self(self):
        """Return False when ``k_steps`` is below 1, True otherwise."""
        return not (self.get_attr("k_steps") < 1)

    def _check_conflict(self, other_pass):
        """Always return True, whatever ``other_pass`` is."""
        return True

    def _type(self):
        """Return ``PassType.COMM_OPT``."""
        return PassType.COMM_OPT

    def _apply_single_impl(self, main_program, startup_program, context):
        """Apply the gradient-merge rewrite to one main/startup program pair.

        ``context`` is accepted but not used by this implementation.
        """
        merge_interval = self.get_attr("k_steps", -1)
        average = self.get_attr("avg", False)
        dist_ctx = self.get_attr("dist_context")
        pairs = self.get_attr("params_grads")

        with paddle.static.program_guard(main_program, startup_program):
            parse_program(
                main_program=main_program,
                startup_program=startup_program,
                params_grads=pairs,
                k_steps=merge_interval,
                avg=average,
                dist_context=dist_ctx,
            )

        main_program._sync_with_cpp()