# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from paddle.fluid import core
from paddle.fluid.optimizer import PipelineOptimizer
from paddle.static import (
    create_global_var,
    default_startup_program,
    device_guard,
)
from paddle.utils import unique_name

from ..utils.log_util import logger
from .common import (
    OP_ROLE_KEY,
    OP_ROLE_VAR_KEY,
    CollectiveHelper,
    OpRole,
    is_backward_op,
    is_optimizer_op,
    is_update_op,
)
from .meta_optimizer_base import MetaOptimizerBase
from .sharding import utils
from .sharding.fp16_helper import FP16Utils
from .sharding.gradient_clip_helper import GradientClipHelper
from .sharding.offload_helper import OffloadHelper
from .sharding.prune import ProgramDeps
from .sharding.shard import ProgramSegment, Shard
from .sharding.utils import (
    get_first_optimize_op_idx,
    get_grad_device,
    get_var_size,
    insert_allreduce_ops,
    insert_broadcast_ops,
    insert_cast_ops,
    insert_fill_constant_ops,
    insert_reduce_ops,
    insert_scale_loss_grad_ops,
    insert_sync_calc_op,
    insert_sync_comm_ops,
)
from .sharding.weight_decay_helper import WeightDecayHelper

__all__ = []


class ShardingOptimizer(MetaOptimizerBase):
    """Sharding Optimizer."""

    def __init__(self, optimizer):
        super().__init__(optimizer)
        self.inner_opt = optimizer
        self.meta_optimizers_white_list = [
            "RecomputeOptimizer",
            "AMPOptimizer",
            "LarsOptimizer",
            "LambOptimizer",
            "ASPOptimizer",
            # "ModelParallelOptimizer",
            # "PipelineOptimizer",
        ]
        self.meta_optimizers_black_list = []
        self._main_program = None
        self._startup_program = None
        self._segments = []
        # params and fp16 params are for broadcast
        self._params = set([])
        self._broadcast_vars = set([])
        # reduced grads to param name
        self._reduced_grads_to_param = {}
        self._shard = Shard()
        self._verbose = False

        # use sharding as outer parallelism (e.g. inner:Megatron & outer sharding)
        self.mp_degree = 1

    def _can_apply(self):
        if not self.role_maker._is_collective:
            return False
        if self.role_maker._worker_num() <= 1:
            return False
        return self.user_defined_strategy.sharding

    def _disable_strategy(self, dist_strategy):
        dist_strategy.sharding = False
        dist_strategy.sharding_configs = {}

    def _enable_strategy(self, dist_strategy, context):
        dist_strategy.sharding = True
        dist_strategy.sharding_configs = {"segment_broadcast_MB": 32}

    def _get_sharding_segment_strategy(self):
        """get
        self._sharding_segment_strategy
        1. if by_size:    self._broadcast_MB
        2. if by_anchors: self._sharding_segment_anchors
                          self._backward_remain_anchors
                          self._forward_remain_anchors
        """
        strategy = self.user_defined_strategy
        sharding_configs = strategy.sharding_configs
        segment_strategy = str(sharding_configs["sharding_segment_strategy"])

        if segment_strategy == "segment_broadcast_MB":
            self._broadcast_MB = sharding_configs["segment_broadcast_MB"]
            assert (
                self._broadcast_MB > 0
            ), "segment size should larger than zero !"
        elif segment_strategy == "segment_anchors":
            self._sharding_segment_anchors = sharding_configs["segment_anchors"]
            assert (
                len(self._sharding_segment_anchors) > 0
            ), "you should set the sharding segment anchors !"
            self._backward_remain_anchors = self._sharding_segment_anchors[:]
            self._forward_remain_anchors = []
        else:
            raise NotImplementedError(
                "the sharding segment strategy [{}] is not implemented".format(
                    str(segment_strategy)
                )
            )
        self._sharding_segment_strategy = segment_strategy

    def _get_hybrid_degree(self):
        """get
        self.hybrid_dp
        self.sharding_degree
        self.mp_degree
        self.pp_degree
        self.dp_degree
        """
        strategy = self.user_defined_strategy
        sharding_configs = strategy.sharding_configs

        # parallelism
        sharding_degree = int(sharding_configs["sharding_degree"])
        mp_degree = int(sharding_configs["mp_degree"])
        pp_degree = int(sharding_configs["pp_degree"])
        dp_degree = int(sharding_configs['dp_degree'])
        global_world_size = self.role_maker._worker_num()

        assert sharding_degree > 0, "sharding degree must be larger than zero"
        # pipeline setting
        # TODO (JZ-LIANG) should revise here to support mixed parallelism with pipeline
        if pp_degree > 1:
            assert strategy.pipeline is True

        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
            assert pp_degree == 2, (
                "For manually set pipeline, only " "pp_degree = 2 is supported."
            )
            assert (
                global_world_size == mp_degree * sharding_degree * dp_degree
            ), "global work size [{}], mp_degree [{}], sharding_degree [{}], dp_degree [{}].".format(
                global_world_size, mp_degree, sharding_degree, dp_degree
            )
        else:
            assert (
                global_world_size
                == mp_degree * sharding_degree * pp_degree * dp_degree
            ), "global work size [{}], mp_degree [{}], sharding_degree [{}], pp_degree [{}], dp_degree [{}].".format(
                global_world_size,
                mp_degree,
                sharding_degree,
                pp_degree,
                dp_degree,
            )
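        # e.g. (illustrative) 32 workers decompose as
        # mp_degree 2 * sharding_degree 4 * pp_degree 2 * dp_degree 2 = 32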

        # FIXME (JZ-LIANG) deprecated hybrid_dp
        if sharding_configs["hybrid_dp"]:
            logger.warning(
                "[hybrid_dp] API setting is deprecated. Now when "
                "dp_degree >= 2, its will be in hybrid dp mode automatically"
            )
            assert dp_degree >= 1

        self.hybrid_dp = True if dp_degree > 1 else False
        self.sharding_degree = sharding_degree
        self.mp_degree = mp_degree
        self.pp_degree = pp_degree
        self.dp_degree = dp_degree

    def _get_hybrid_dp_mode(self):
        """get
        self.hybrid_dp_mode = 'pp_hybrid_dp' or 'sharding_hybrid_dp'
        self.gradient_merge_mode = 'pp_gm' or 'sharding_gm'
        self._gradient_merge_acc_step
        self.pp_allreduce_in_optimize
        self._optimizer_sharding
        """
        strategy = self.user_defined_strategy
        sharding_configs = strategy.sharding_configs

        # NOTE (JZ-LIANG)
        # There are 2 kinds of modes for gradient-merge and hybrid-dp in mixed parallelism [sharding] and [pipeline].
        # We distinguish these two modes since the gm/hybrid-dp related allreduce should be inserted in different places
        # according to the mode to get the best performance:
        # sharding: communication within a node, and therefore should be inserted within the backward segment
        #           to overlap with bw calc, conducted every micro step.
        # pipeline: communication across nodes, and therefore should be inserted in the update segment,
        #           conducted just once per global step.
        dp_mode = None
        # dp here is the pure dp as the outermost parallelism
        if self.hybrid_dp:
            if self.pp_degree > 1:
                dp_mode = "pp_hybrid_dp"
            else:
                assert self.sharding_degree > 1, (
                    "by now we only support five kind of hybrid dp: sharding_hybrid_dp, "
                    "mp_sharding_hybrid_dp, pp_hybrid_dp, mp_sharding_pp_hybrid_dp, sharding_pp_hybrid_dp."
                )
                dp_mode = "sharding_hybrid_dp"

        # gradient merge
        gm_mode = None
        gm_acc_step = int(sharding_configs["gradient_merge_acc_step"])
        if self.pp_degree <= 1:
            gm_mode = "sharding_gm"
            self._grad2merged_grad = dict()
        else:
            gm_mode = "pp_gm"
            gm_acc_step = strategy.pipeline_configs['accumulate_steps']
            gradient_scale_configs = strategy.gradient_scale_configs
            assert gradient_scale_configs['scale_strategy'] == 'avg', (
                'For pipeline mode, the '
                'gradient scale mode should '
                'be "avg", but got {}'.format(
                    gradient_scale_configs['scale_strategy']
                )
            )
            # Note (Yuang Liu): this avg_loss flag determines where to do the average op for grad merge.
            # If True, do the sum first for gradient merge, then scale by gm_acc_step.
            # If False, scale the loss by gm_acc_step first, then do the sum for gradient merge.
            self.scale_gradient = gradient_scale_configs['scale_gradient']
        if gm_acc_step > 1:
            logger.info(
                "Gradient merge in [{}], acc step = [{}]".format(
                    gm_mode, gm_acc_step
                )
            )

        optimizer_sharding = False
        # TODO(wangxi): need to support dp_as_opt_sharding with sharding,
        #               and to support it without pp in the future
        if (
            self.sharding_degree == 1
            and self.dp_degree > 1
            and sharding_configs['_dp_as_optimizer_sharding']
            and self.pp_degree > 1
        ):
            optimizer_sharding = True

        self.hybrid_dp_mode = dp_mode
        self.gradient_merge_mode = gm_mode
        self._gradient_merge_acc_step = gm_acc_step
        self._optimizer_sharding = optimizer_sharding

        # this feature is designed for ascend, and should NOT be used in GPU training
        self.pp_allreduce_in_optimize = sharding_configs[
            "pp_allreduce_in_optimize"
        ]

    def _inner_opt_minimize(
        self, loss, startup_program, parameter_list, no_grad_set
    ):
        pipeline_configs = self.user_defined_strategy.pipeline_configs

        if self.inner_opt is None:
            raise ValueError(
                "self.inner_opt of ShardingOptimizer should not be None."
            )

        if self.pp_degree > 1:
            pp_optimizer = PipelineOptimizer(
                self.inner_opt, self._gradient_merge_acc_step
            )
            self._pp_optimizer = pp_optimizer

            global_rank = self.role_maker._worker_index()
            schedule_mode = pipeline_configs['schedule_mode']

            pipeline_opt = {
                'schedule_mode': schedule_mode,
                'micro_batch_size': pipeline_configs['micro_batch_size'],
                'local_rank': self.pp_rank,
                'global_rank': global_rank,
                'use_sharding': True,
                # TODO (JZ-LIANG) should revise here to support mixed parallelism with pipeline
                'ring_id': 20,
                'global_ring_id': 3,
                'mp_degree': self.mp_degree,
                'mp_rank': global_rank % self.mp_degree,
                'scale_gradient': self.scale_gradient,
            }
            main_program = loss.block.program
            main_program._pipeline_opt = pipeline_opt

            (
                optimize_ops,
                params_grads,
                program_list,
                self.pipeline_pair,
                self.pp_ring_map,
            ) = pp_optimizer.minimize(
                loss, startup_program, parameter_list, no_grad_set
            )
            assert self.pp_degree == len(program_list)
        else:
            optimize_ops, params_grads = self.inner_opt.minimize(
                loss, startup_program, parameter_list, no_grad_set
            )

        if startup_program is None:
            startup_program = default_startup_program()

        if self.pp_degree > 1:
            startup_program = startup_program._pipeline_opt['startup_program']
            print("pp_rank:", self.pp_rank)
            if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
                main_program = program_list[
                    int(os.getenv("PADDLE_MANUAL_PIPELINE_STAGE"))
                ]
            else:
                main_program = program_list[self.pp_rank]
            with open("main_%d" % self.role_maker._worker_index(), 'w') as f:
                f.writelines(str(main_program))
            main_block = main_program.global_block()
            new_params_grads = []
            for param, grad in params_grads:
                if main_block.has_var(param.name):
                    new_params_grads.append((param, grad))
            params_grads = new_params_grads
        else:
            main_block = loss.block

        startup_block = startup_program.global_block()
        self._main_program = main_block.program
        self._startup_program = startup_program

        if self.pp_degree > 1:
            pp_optimizer._rename_gradient_var_name(main_block)
            with open("main_%d" % self.role_maker._worker_index(), 'w') as f:
                f.writelines(str(main_program))

        return optimize_ops, params_grads

    def _apply_sharding_pass(self, params_grads):
        if self.sharding_degree == 1:
            return

        main_block = self._main_program.global_block()
        startup_block = self._startup_program.global_block()

        # step1: build shard
        self._build_shard(
            params_grads, self.sharding_rank, self.sharding_degree
        )

        # step2: split_program
        self._split_program(main_block)

        # step3: add broadcast and reduce ops
        self._add_broadcast_allreduce(main_block)
        main_block._sync_with_cpp()
        startup_block._sync_with_cpp()

        # step4: remove unneeded ops and vars from block
        self._prune_main_program(
            main_block,
            self._shard,
            [self.mp_ring_id, self.sharding_ring_id, self.pp_ring_id],
        )
        self._prune_startup_program(startup_block, self._shard)

    def _apply_opt_sharding_pass(self, params_grads):
        """outer dp as optimizer sharding"""
        if self._optimizer_sharding is False:
            return

        main_block = self._main_program.global_block()
        startup_block = self._startup_program.global_block()

        # step1: build shard
        self._build_shard(params_grads, self.dp_rank, self.dp_degree)

        # NOTE(wangxi): prune_main_program will prune cast if not add this
        for param, grad in params_grads:
            self._reduced_grads_to_param[grad.name] = param.name

        # step4: remove unneeded ops and vars from block
        self._prune_main_program(
            main_block,
            self._shard,
            [self.mp_ring_id, self.pp_ring_id, self.dp_ring_id],
        )
        self._prune_startup_program(startup_block, self._shard)

    def _insert_allreduce_for_pp(self, params_grads):
        if self.pp_degree == 1:
            return

        strategy = self.user_defined_strategy
        sharding_configs = strategy.sharding_configs

        main_block = self._main_program.global_block()
        startup_block = self._startup_program.global_block()

        # sharding-pp related logic
        # pp_optimizer._rename_gradient_var_name(main_block)
        # crop ops
        if self.sharding_degree > 1:
            for idx, op in reversed(list(enumerate(main_block.ops))):
                if is_update_op(op):
                    op_role_var = op.attr('op_role_var')
                    param_name = op_role_var[0]
                    if not self._shard.has_param(param_name):
                        main_block._remove_op(idx)

            for idx, op in reversed(list(enumerate(main_block.ops))):
                if op.type != 'cast':
                    continue
                in_name = op.input_arg_names[0]
                if in_name not in self._params:
                    continue
                # if self._shard.has_param(param_name): continue
                if in_name not in main_block.vars:
                    main_block._remove_op(idx)

        if self._optimizer_sharding:
            # TODO(wangxi): support fp16_allreduce with optimizer sharding
            strategy.fp16_allreduce = False

        shard = self._shard if self._optimizer_sharding else None
        accumulated_grad_names = self._pp_optimizer._accumulate_gradients(
            main_block, strategy=strategy, shard=shard
        )

        len_of_ops = len(main_block.ops)
        if self.scale_gradient:
            self._avg_grad_merge_after_sum(main_block, accumulated_grad_names)
        first_optimize_op_index = get_first_optimize_op_idx(main_block)

        if self.pp_allreduce_in_optimize:
            logger.info(
                "Pipeline Persistable grad is {}".format(accumulated_grad_names)
            )
            # FIXME(wangxi): accumulated_grad obtained from pipeline does not
            #  include sharding's param@BroadCast grad when
            #  pp_allreduce_in_optimize
            accumulated_grad_names = insert_reduce_ops(
                main_block,
                first_optimize_op_index,
                self.sharding_ring_id,
                accumulated_grad_names,
                self._shard,
                core.op_proto_and_checker_maker.OpRole.Optimize,
                use_calc_stream=True,
                rank=self.sharding_rank,
            )

            logger.info("PP-Sharding grad is {}".format(accumulated_grad_names))
            first_optimize_op_index += len(main_block.ops) - len_of_ops
            len_of_ops = len(main_block.ops)

        if self._optimizer_sharding:
            accumulated_grad_names = utils.insert_reduce_ops(
                main_block,
                first_optimize_op_index,
                self.dp_ring_id,
                accumulated_grad_names,
                self._shard,
                OpRole.Optimize,
                use_calc_stream=True,
                rank=self.dp_rank,
                strategy=strategy,
            )
            logger.info(
                "Optimizer grad in this rank {}".format(accumulated_grad_names)
            )
            first_optimize_op_index += len(main_block.ops) - len_of_ops
            len_of_ops = len(main_block.ops)

            # NOTE(wangxi): we fused after optimize_cast
            optimize_cast = sharding_configs['optimize_cast']
            optimizer_param = utils.insert_broadcast_param_ops(
                main_block,
                len_of_ops,
                self.dp_ring_id,
                [x[0].name for x in params_grads],
                self._shard,
                OpRole.Optimize,
                use_calc_stream=True,
                rank=self.dp_rank,
                strategy=None if optimize_cast else strategy,
            )
            logger.info(
                "Optimizer param in this rank {}".format(optimizer_param)
            )
            if not strategy.fuse_grad_merge and not optimize_cast:
                assert len(accumulated_grad_names) == len(optimizer_param)
        elif self.hybrid_dp and self.hybrid_dp_mode == "pp_hybrid_dp":
            insert_allreduce_ops(
                main_block,
                first_optimize_op_index,
                self.dp_ring_id,
                accumulated_grad_names,
                core.op_proto_and_checker_maker.OpRole.Optimize,
                use_calc_stream=True,
                user_defined_strategy=strategy,
            )
            first_optimize_op_index += len(main_block.ops) - len_of_ops
            len_of_ops = len(main_block.ops)

        # FIXME(wangxi): if fp16_allreduce, put cast fp16->fp32 there?

    def _avg_grad_merge_after_sum(self, main_block, accumulated_grad_names):
        if (
            self.user_defined_strategy.amp
            and self.user_defined_strategy.amp_configs[
                'use_dynamic_loss_scaling'
            ]
        ):
            # For AMP, if dynamic loss scaling is used, the avg
            # operation can simply be done by modifying the LossScaling op.
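            # (Scaling the Scale input of check_finite_and_unscale by acc_step
            # makes the unscale step also divide every merged gradient by
            # acc_step, which performs the averaging in one op.)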
            for idx, op in enumerate(main_block.ops):
                if op.type == 'check_finite_and_unscale':
                    loss_scale_name = op.input('Scale')[0]
                    loss_scaling_var = main_block.var(loss_scale_name)
                    loss_scale_tmp_var_name = loss_scale_name + '@TMP'
                    loss_scale_tmp_var = main_block.create_var(
                        name=loss_scale_tmp_var_name,
                        shape=loss_scaling_var.shape,
                        dtype=loss_scaling_var.dtype,
                    )
                    main_block._insert_op_without_sync(
                        idx,
                        type='scale',
                        inputs={'X': loss_scaling_var},
                        outputs={'Out': loss_scale_tmp_var},
                        attrs={
                            'scale': self._gradient_merge_acc_step,
                            'bias': 0.0,
                            'bias_after_scale': False,
                            OP_ROLE_KEY: OpRole.Optimize,
                        },
                    )
                    op._rename_input(loss_scale_name, loss_scale_tmp_var_name)
                    break
        else:
            # For pp, do the avg operation for gradient merge after merging
            # the gradient to meet the logic for gradient merge under pure dp.
            tmp_first_opt_idx = None
            for idx, op in enumerate(main_block.ops):
                if is_optimizer_op(op) and op.type != 'c_sync_comm_stream':
                    tmp_first_opt_idx = idx
                    break
            assert (
                tmp_first_opt_idx is not None
            ), 'Some error occurred: no optimize ops found'
            for grad in accumulated_grad_names:
                main_block._insert_op_without_sync(
                    tmp_first_opt_idx,
                    type='scale',
                    inputs={'X': grad},
                    outputs={'Out': grad},
                    attrs={
                        'scale': 1.0 / self._gradient_merge_acc_step,
                        'bias': 0.0,
                        'bias_after_scale': False,
                        OP_ROLE_KEY: OpRole.Optimize,
                    },
                )

    def _adapt_amp_clip_without_sharding(self):
        # if sharding is not used, adapt amp/clip for the remaining parallelism.
        # cast --> amp --> clip --> opt
        if self.sharding_degree > 1:
            return
        if self._optimizer_sharding:
            return

        main_block = self._main_program.global_block()
        startup_block = self._startup_program.global_block()

        # amp inf_var & clip global_norm_var

        rings = [self.mp_ring_id, self.pp_ring_id]
        # FIXME(wangxi): some problem with NPU found_finite, need sync with DP
        if core.is_compiled_with_npu():
            rings += [self.dp_ring_id]
        FP16Utils.sync_amp_check_nan_inf(main_block, rings)

        gradientclip_helper = GradientClipHelper(None)
        gradientclip_helper.sync_global_norm(
            main_block, [self.mp_ring_id, self.pp_ring_id], self.mp_rank
        )

    def _insert_loss_grad_scale_op(self):
        main_block = self._main_program.global_block()

        # step6: loss div dp_degree
        global_dp_degree = self.sharding_degree * self.dp_degree
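        # e.g. (illustrative) sharding_degree 4 with dp_degree 2 scales the loss grad by 1 / 8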
        assert int(global_dp_degree) == global_dp_degree
        if global_dp_degree > 1:
            insert_scale_loss_grad_ops(main_block, scale=global_dp_degree)

        main_block._sync_with_cpp()

    def _apply_optimize_offload_pass(self, params_grads):
        strategy = self.user_defined_strategy
        sharding_configs = strategy.sharding_configs
        main_block = self._main_program.global_block()
        startup_block = self._startup_program.global_block()

        mp_ring_id = self.mp_ring_id if self.mp_degree > 1 else None
        dp_ring_id = self.dp_ring_id if self.dp_degree > 1 else None
        offload_helper = OffloadHelper(
            mp_ring_id=mp_ring_id, dp_ring_id=dp_ring_id
        )

        # optimize offload should only be enabled when gradient merge is enabled and
        # acc_step is quite large (e.g. >> 100), since its memcpy cannot be
        # overlapped with calc; otherwise it will slow down training severely.
        if sharding_configs["optimize_offload"]:
            logger.info("Sharding with optimize offload !")
            offload_helper.offload(main_block, startup_block)
            # The optimize_cast is already included in offload_fp32param
            offload_helper.offload_fp32param(main_block, startup_block)
        elif sharding_configs['optimize_cast']:
            logger.info("Sharding with optimize cast !")
            # NOTE(wangxi): optimize_cast will persist fp16 param, it
            # will take more memory, but will be faster. Trade space for time.
            if self._optimizer_sharding:
                offload_helper.opt_sharding_cast_fp32param(
                    main_block, startup_block, [x[0].name for x in params_grads]
                )
                # NOTE(wangxi): fused after optimize_cast
                utils.fuse_opt_broadcast_param_ops(
                    main_block, dp_ring_id, self._shard, strategy=strategy
                )
            else:
                offload_helper.cast_fp32param_in_optimize(
                    main_block, startup_block
                )

    def _dump_program_for_debug(self):
        main_block = self._main_program.global_block()
        startup_block = self._startup_program.global_block()
        with open(
            "start_sharding_%d" % self.role_maker._worker_index(), 'w'
        ) as f:
            f.writelines(str(startup_block.program))
        with open(
            "main_sharding_%d" % self.role_maker._worker_index(), 'w'
        ) as f:
            f.writelines(str(main_block.program))

    def minimize_impl(
        self, loss, startup_program=None, parameter_list=None, no_grad_set=None
    ):
        # TODO: (JZ-LIANG) support multiple comm in future
        # self._nrings = self.user_defined_strategy.nccl_comm_num
        self._nrings_sharding = 1
        self._nrings_dp = 1

        self._get_sharding_segment_strategy()
        self._get_hybrid_degree()
        self._get_hybrid_dp_mode()

        # config sharding & dp groups
        self._build_groups()

        # inner optimize minimize
        optimize_ops, params_grads = self._inner_opt_minimize(
            loss, startup_program, parameter_list, no_grad_set
        )

        self._init_comm()

        self._apply_sharding_pass(params_grads)

        self._apply_opt_sharding_pass(params_grads)

        self._insert_allreduce_for_pp(params_grads)

        self._adapt_amp_clip_without_sharding()

        # loss div dp_degree
        self._insert_loss_grad_scale_op()

        # apply optimize offload or optimize cast
        self._apply_optimize_offload_pass(params_grads)

        # step6: (optional) sharding gradient merge
        self._sharding_gradient_merge()

        # # check op dependency
        # FIXME (JZ-LIANG) enable checking in future.
        # check_broadcast(main_block)
        # check_allreduce_sum(main_block, self._shard, self.sharding_ring_id,
        #                     self.dp_ring_id)

        # NOTE(JZ-LIANG) ensure that in both sharding_hybrid_dp & pp_hybrid_dp
        # the init param broadcast is called after startup pruning
        self._initialization_broadcast()

        # NOTE(wangxi): if a param is not persistable, program.clone will
        #  fail, so we remove the non-persistable param and recreate it as a var
        self._recreate_not_persist_param_as_var()

        self._dump_program_for_debug()

        # GPU needs to wait until the server is ready; GPU and NPU use a layered connection
        if not core.is_compiled_with_npu():
            self._wait()
        return optimize_ops, params_grads

    def _init_pair_comm(self, pair, ring_id):
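        """Create a two-rank communicator (ring `ring_id`) between the two pipeline
        stages in `pair`; this rank is 0 or 1 within that ring."""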
        pp_group_endpoints = [
            self.pp_group_endpoints[pair[0]],
            self.pp_group_endpoints[pair[1]],
        ]
        pp_rank = 0 if self.pp_rank == pair[0] else 1
        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None) is None:
            self._collective_helper._init_communicator(
                self._startup_program,
                self.current_endpoint,
                pp_group_endpoints,
                pp_rank,
                ring_id,
                False,
                sync=False,
            )

    def _init_npu_pipeline_comm(self, startup_block):
        # NOTE(wangxi): some bug with hccl, must set pp_degree be even number
        assert (self.pp_degree % 2) == 0

        max_ring_id = -1
        my_pair = []
        for pair in self.pipeline_pair:
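            # a (src, dst) stage pair is keyed as src * 1000 + dst, e.g. (2, 3) -> 2003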
            pair_key = pair[0] * 1000 + pair[1]
            ring_id = self.pp_ring_map[pair_key]
            max_ring_id = max(max_ring_id, ring_id)
            logger.info("pp pair:{}, ring_id: {}".format(pair, ring_id))

            if self.pp_rank in pair:
                my_pair.append(pair)

        # for example: self.pp_rank=2, self.pp_degree=4
        send_to_next_pair = (
            self.pp_rank,
            (self.pp_rank + 1) % self.pp_degree,
        )  # 2->3
        recv_from_next_pair = (
            (self.pp_rank + 1) % self.pp_degree,
            self.pp_rank,
        )  # 3->2
        recv_from_prev_pair = (
            (self.pp_rank - 1 + self.pp_degree) % self.pp_degree,
            self.pp_rank,
        )  # 1->2
        send_to_prev_pair = (
            self.pp_rank,
            (self.pp_rank - 1 + self.pp_degree) % self.pp_degree,
        )  # 2->1

        even = (self.pp_rank % 2) == 0

        # 1. even send to next, odd recv from prev, 0->1, 2->3
        pair = send_to_next_pair if even else recv_from_prev_pair
        ring_id = self.pp_ring_map[pair[0] * 1000 + pair[1]]
        self._init_pair_comm(pair, ring_id)
        my_pair.remove(pair)
        logger.info(
            "pair0(even->odd): pp pair:{}, ring_id: {}".format(pair, ring_id)
        )

        # 2. even recv from next, odd send to prev, 1->0, 3->2
        pair = recv_from_next_pair if even else send_to_prev_pair
        ring_id = self.pp_ring_map[pair[0] * 1000 + pair[1]]
        self._init_pair_comm(pair, ring_id)
        my_pair.remove(pair)
        logger.info(
            "pair1(even<-odd): pp pair:{}, ring_id: {}".format(pair, ring_id)
        )

        # if pp_degree is 2, only need pair(0->1, 1->0)
        if self.pp_degree > 2:
            # 3. odd send to next, even recv from prev, 1->2, 3->0
            pair = send_to_next_pair if not even else recv_from_prev_pair
            ring_id = self.pp_ring_map.get(
                pair[0] * 1000 + pair[1], max_ring_id + 1
            )  # 3->0 not in pp_ring_map
            self._init_pair_comm(pair, ring_id)
            if self.pp_rank != 0 and self.pp_rank != self.pp_degree - 1:
                my_pair.remove(pair)
            logger.info(
                "pair2(odd->even): pp pair:{}, ring_id: {}".format(
                    pair, ring_id
                )
            )

            # 4. odd recv from next, even send to prev, 2->1, 0->3
            pair = recv_from_next_pair if not even else send_to_prev_pair
            ring_id = self.pp_ring_map.get(
                pair[0] * 1000 + pair[1], max_ring_id + 2
            )  # 0->3 not in pp_ring_map
            self._init_pair_comm(pair, ring_id)
            if self.pp_rank != 0 and self.pp_rank != self.pp_degree - 1:
                my_pair.remove(pair)
            logger.info(
                "pair3(odd<-even): pp pair:{}, ring_id: {}".format(
                    pair, ring_id
                )
            )

        assert len(my_pair) == 0, (
            "Current pipeline does not support cross stage communication, "
            "please check unexpected pair {}".format(my_pair)
        )

    def _init_pipeline_comm(self, startup_block):
        # TODO (JZ-LIANG) to unify pp_rank_ and pp_rank
        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None) is None:
            self._collective_helper._init_communicator(
                self._startup_program,
                self.current_endpoint,
                self.pp_group_endpoints,
                self.pp_rank,
                self.pp_ring_id,
                False,
                sync=False,
            )

        if core.is_compiled_with_npu():
            self._init_npu_pipeline_comm(startup_block)
            return

        # GPU
        for pair in self.pipeline_pair:
            pair_key = pair[0] * 1000 + pair[1]
            ring_id = self.pp_ring_map[pair_key]
            logger.info("pp pair:{}, ring_id: {}".format(pair, ring_id))
            if self.pp_rank in pair:
                self._init_pair_comm(pair, ring_id)

    def _init_comm(self):
        # sync var
        startup_block = self._startup_program.global_block()

        # mp ring
        if self.mp_degree > 1:
            self._collective_helper._init_communicator(
                self._startup_program,
                self.current_endpoint,
                self.mp_group_endpoints,
                self.mp_rank,
                self.mp_ring_id,
                False,
                sync=False,
            )

        # sharding ring
        if self.sharding_degree > 1:
            self._collective_helper._init_communicator(
                self._startup_program,
                self.current_endpoint,
                self.sharding_group_endpoints,
                self.sharding_rank,
                self.sharding_ring_id,
                False,
                sync=False,
            )

        # pp ring
        if self.pp_degree > 1:
            self._init_pipeline_comm(startup_block)

        # pure dp ring
        if self.dp_degree > 1:
            self._collective_helper._init_communicator(
                self._startup_program,
                self.current_endpoint,
                self.dp_group_endpoints,
                self.dp_rank,
                self.dp_ring_id,
                False,
                sync=False,
            )

        startup_block._sync_with_cpp()

    def _build_shard(self, params_grads, shard_rank, shard_size):
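        """Assign each (param, grad) pair to one of `shard_size` ranks and collect
        the params that need to be broadcast between ranks."""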
        # step 2: split params
        self._params = set([x[0].name for x in params_grads])
        self._shard.setup(params_grads, shard_rank, shard_size)

        # step 3: get broadcast vars
        self._broadcast_vars = self._shard.find_broadcast_params(
            self._main_program.global_block()
        )

    def _wait(self):
        endpoints = self.global_endpoints[:]
        current_endpoint = endpoints[self.global_rank]
        if self.global_rank == 0:
            self._collective_helper._wait(current_endpoint, endpoints)

    def collect_segment(self, segment, op_idx, block):
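        """Close the current segment at op_idx + 1 (ops are scanned in reverse),
        prepend it to self._segments, and return a fresh segment ending at the
        same position."""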
        segment._start_idx = op_idx + 1
        self._segments.insert(0, segment)
        new_segment = ProgramSegment(block)
        new_segment._end_idx = op_idx + 1

        return new_segment

    def _split_program(self, block):
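        """Split the forward/backward ops into ProgramSegments, scanning the block
        in reverse from the last backward op.

        A new segment is cut either when the accumulated broadcast size reaches
        segment_broadcast_MB or when a segment anchor is met; each segment records
        the vars to broadcast / fill_constant / cast and the grads to reduce.
        """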
        for op_idx, op in reversed(list(enumerate(block.ops))):
            if int(op.attr('op_role')) != int(OpRole.Optimize):
                last_backward_op_idx = op_idx + 1
                break

        var2broadcast_time = dict()
        segment = ProgramSegment(block)
        segment._end_idx = last_backward_op_idx
        for op_idx in reversed(range(last_backward_op_idx)):
            op = block.ops[op_idx]
            assert int(op.attr('op_role')) != int(OpRole.Optimize)
            if self._sharding_segment_strategy == "segment_broadcast_MB":
                if segment._param_mem >= self._broadcast_MB:
                    segment = self.collect_segment(segment, op_idx, block)

            elif self._sharding_segment_strategy == "segment_anchors":
                if int(op.attr('op_role')) == int(OpRole.Backward):
                    for input_name in op.desc.input_arg_names():

                        # NOTE (JZ-LIANG) naive rule to support amp; if amp changes, this should be modified accordingly
                        if self.user_defined_strategy.amp:
                            if ".cast_fp16@GRAD" not in input_name:
                                continue
                            else:
                                input_name = input_name[
                                    : input_name.find(".cast_fp16@GRAD")
                                ]

                        if input_name in self._backward_remain_anchors:
                            segment = self.collect_segment(
                                segment, op_idx, block
                            )
                            assert (
                                input_name not in self._forward_remain_anchors
                            ), "segment anchor [{}] met twice !".format(
                                input_name
                            )
                            self._backward_remain_anchors.remove(input_name)
                            self._forward_remain_anchors.append(input_name)
                elif int(op.attr('op_role')) == int(OpRole.Forward):
                    for output_name in op.desc.output_arg_names():
                        if output_name in self._forward_remain_anchors:
                            segment = self.collect_segment(
                                segment, op_idx, block
                            )
                            self._forward_remain_anchors.remove(output_name)

            # find broadcast vars
            for input_name in op.desc.input_arg_names():
                if input_name not in self._broadcast_vars:
                    continue
                if input_name in segment._param2broadcast:
                    # skip broadcast because it reuses the old broadcast var
                    broadcast_name = segment._param2broadcast[input_name]
                    if input_name != broadcast_name:
                        op._rename_input(input_name, broadcast_name)
                    continue
                if self._shard.has_param(input_name):
                    broadcast_var_name = input_name
                else:
                    broadcast_var_name = unique_name.generate(
                        input_name + "@BroadCast"
                    )
                    segment._fill_constant_vars.append(broadcast_var_name)

                # (JZ-LIANG) should use Param base name ?
                broadcast_var_base_name = input_name
                if "subprog" in broadcast_var_base_name:
                    # remove suffix
                    broadcast_var_base_name = broadcast_var_base_name[
                        : broadcast_var_base_name.find(".subprog")
                    ]

                var2broadcast_time[broadcast_var_base_name] = (
                    var2broadcast_time.get(broadcast_var_base_name, 0) + 1
                )

                segment._param2broadcast[input_name] = broadcast_var_name
                segment._broadcast_vars.append(
                    (broadcast_var_name, self._shard.device(input_name))
                )
                segment._param_mem += get_var_size(
                    self._main_program.global_block().var(input_name)
                )

            # find reduce vars
            if self.pp_degree > 1 and self.pp_allreduce_in_optimize:
                # place pipeline gradient allreduce in optimize
                pass
            else:
                if is_backward_op(op) and OP_ROLE_VAR_KEY in op.attr_names:
                    op_role_var = op.all_attrs()[OP_ROLE_VAR_KEY]
                    if len(op_role_var) != 0:
                        assert len(op_role_var) % 2 == 0
                        for i in range(0, len(op_role_var), 2):
                            param, reduced_grad = (
                                op_role_var[i],
                                op_role_var[i + 1],
                            )
                            segment._allreduce_vars.append(reduced_grad)
                            assert (
                                reduced_grad not in self._reduced_grads_to_param
                            )
                            self._reduced_grads_to_param[reduced_grad] = param

            # find cast op
            if FP16Utils.is_fp16_cast_op(block, op, self._params):
                fp32_param = op.desc.input_arg_names()[0]
                fp16_param = op.desc.output_arg_names()[0]
                if self._shard.has_param(fp32_param):
                    segment._cast_ops[fp16_param] = fp32_param

        if segment._param_mem > 0:
            segment._start_idx = 0
            self._segments.insert(0, segment)

        if self._sharding_segment_strategy == "segment_anchors":
            assert (
                len(self._forward_remain_anchors) == 0
            ), "remain anchors {}".format(self._forward_remain_anchors)
            assert (
                len(self._backward_remain_anchors) == 0
            ), "remain anchors {}".format(self._backward_remain_anchors)

        if self._verbose:
            for varname in sorted(
                var2broadcast_time, key=var2broadcast_time.get, reverse=True
            ):
                logger.info(
                    "Sharding broadcast: [{}] times [{}]".format(
                        var2broadcast_time[varname], varname
                    )
                )
            for idx_ in range(len(self._segments)):
                logger.info("segment [{}] :".format(idx_))
                logger.info(
                    "start op: [{}]  [{}]".format(
                        block.ops[self._segments[idx_]._start_idx].desc.type(),
                        block.ops[
                            self._segments[idx_]._start_idx
                        ].desc.input_arg_names(),
                    )
                )
                logger.info(
                    "end   op: [{}]  [{}]".format(
                        block.ops[self._segments[idx_]._end_idx].desc.type(),
                        block.ops[
                            self._segments[idx_]._end_idx
                        ].desc.input_arg_names(),
                    )
                )
        return

    def _prune_main_program(self, block, shard, rings):
        """
        calculate deps from allredce op to optimize op,
        remove ops and vars not needed in this worker
1092 1093 1094 1095 1096

        1. prune regularization (weight decay)
        2. prune cast_fp32_to_fp16; update amp_infine_checking
        3. prune gradient_clip related; update global_norm_sum
        4. prune optimizer op + param + gradient
1097

1098 1099
        """
        weightdecay_helper = WeightDecayHelper()
        weightdecay_helper.prune_weight_decay(block, shard)

        # FIXME(wangxi): mp should prune duplicated param_grads
        # NOTE (JZ-LIANG) the sync of FoundInfinite should be among one entire Model Parallelism
        # group, and each Data Parallelism group should have its own sync of FoundInfinite;
        # amp could use the global group for sync
        FP16Utils.prune_fp16(block, shard, self._reduced_grads_to_param, rings)

        # clipbyglobalnorm should only use the Model parallelism group (mp-sharding-pp)
        gradientclip_helper = GradientClipHelper(None)
        gradientclip_helper.prune_gradient_clip(block, shard, rings)

        # build prog deps
        reduced_grads = []
        for idx, op in enumerate(block.ops):
            input_names = op.desc.input_arg_names()
            output_names = op.desc.output_arg_names()
            # FIXME(wangxi): need to use grads, pipeline grad is @GRAD@MERGE
            if (
                op.type == "c_allreduce_sum"
                and op.attr('use_model_parallel') is False
            ):
                assert len(output_names) == 1
                output_name = output_names[0]
                reduced_grads.append(output_name)

        # prune optimizer state and param
        pruned_opti_vars = []
        for var_name in list(block.vars.keys()):
            if shard.is_opti_var(var_name) and not shard.has_opt_var(var_name):
                pruned_opti_vars.append(var_name)
        program_deps = ProgramDeps(block, reduced_grads, pruned_opti_vars)

        # Init
        for var_name in program_deps._end_vars:
            program_deps._should_removed_var.add(var_name)

        # Prune
        for idx, op in reversed(list(enumerate(block.ops))):
            if op.type in [
                "c_allreduce_sum",
                "c_sync_comm_stream",
                "c_calc_comm_stream",
                "c_gen_nccl_id",
                "c_gen_bkcl_id",
                "c_comm_init",
                'send_v2',
                'recv_v2',
            ]:
                pass
            elif op.type == "conditional_block":
                assert op.desc.has_attr("sub_block")
                subblock_idx = op.desc.attr("sub_block").id
                subblock_deps = program_deps.get_sub_block_deps(subblock_idx)
                # only prune amp subblock
                if subblock_deps is None or not self._is_amp_subblock(op):
                    continue
                # init
                reversed_output_vars = []
                for output_name in op.desc.output("Out"):
                    if output_name in program_deps._should_removed_var:
                        subblock_deps._should_removed_var.add(output_name)
                        program_deps.crop_output_var_from_op(idx, output_name)
                    else:
                        reversed_output_vars.append(output_name)
                # prune
                for sub_op_idx, _ in reversed(
                    list(enumerate(subblock_deps._block.ops))
                ):
                    if subblock_deps.should_remove_op(sub_op_idx):
                        subblock_deps.remove_op(sub_op_idx)
                reversed_input_vars = []
                for input_name in op.desc.input('Input'):
                    if input_name not in subblock_deps._should_removed_var:
                        reversed_input_vars.append(input_name)
                    else:
                        program_deps.crop_input_var_from_op(idx, input_name)
                op.desc.set_input('Input', reversed_input_vars)
                op.desc.set_output('Out', reversed_output_vars)
            else:
                # if all outputs of this op are in _should_removed_var
                # _should_removed_var: opt state not in the current shard
                if program_deps.should_remove_op(idx):
                    # NOTE(wangxi): need reserve all param in optimizer_sharding
                    reserved_vars = (
                        self._params if self._optimizer_sharding else None
                    )
                    program_deps.remove_op(idx, reserved_vars)

        # NOTE (JZ-LIANG) revise and unify logic here
        # sharding support fp16_allreduce logic
        block._sync_with_cpp()
        for idx, op in reversed(list(enumerate(block.ops))):
            if op.type == 'concat' and is_optimizer_op(op):
                # remove inputs that are not on this card
                reserved_x = []
                for var_name in op.desc.input("X"):
                    if block.has_var(var_name):
                        reserved_x.append(var_name)
                op.desc.set_input('X', reserved_x)
        block._sync_with_cpp()
        return

    def _add_broadcast_allreduce(self, block):
        """
        add broadcast allreduce op
        if gradient_merge is enabled, insert related ops

        if combined with pipeline (grad accumulate),
        the grad allreduce should be done in the optimize role
        """
        if len(self._segments) < 1:
            return
        # sharding
        if self.pp_degree > 1 and self.pp_allreduce_in_optimize:
            for idx in range(len(self._segments)):
                assert len(self._segments[idx]._allreduce_vars) == 0

        # NOTE (JZ-LIANG) revise and unify logic here
        # fix the _end_idx for segments[-1] if pp is used.
        new_end_idx = self._segments[-1]._end_idx
        for idx in range(
            self._segments[-1]._end_idx - 1,
            self._segments[-1]._start_idx - 1,
            -1,
        ):
            op = block.ops[idx]
            if op.type == "fill_constant" or op.type == "sum":
                if "MERGED" in op.output_arg_names[0]:
                    new_end_idx = idx + 1
            elif op.type == "cast":
                if "@TMP" in op.output_arg_names[0]:
                    new_end_idx = idx + 1
        self._segments[-1]._end_idx = new_end_idx

        if self._segments[-1]._allreduce_vars:
            shard_allredue_vars = self._shard.filter_grads(
                self._segments[-1]._allreduce_vars
            )
            if (
                self.gradient_merge_mode != "sharding_gm"
                or self._gradient_merge_acc_step <= 1
            ):
                if (
                    self.hybrid_dp
                    and self.hybrid_dp_mode == "sharding_hybrid_dp"
                    and len(shard_allredue_vars) >= 1
                ):
                    insert_sync_comm_ops(
                        block,
                        self._segments[-1]._end_idx,
                        self.dp_ring_id,
                        shard_allredue_vars,
                    )
                    insert_allreduce_ops(
                        block,
                        self._segments[-1]._end_idx,
                        self.dp_ring_id,
                        shard_allredue_vars,
                        user_defined_strategy=self.user_defined_strategy,
                    )
            # gradient merge
            elif (
                self.gradient_merge_mode == "sharding_gm"
                and self._gradient_merge_acc_step > 1
            ):
                self.create_persistable_gradients_and_insert_merge_ops(
                    block,
                    self._startup_program.global_block(),
                    self._segments[-1]._end_idx,
                    shard_allredue_vars,
                    self._shard,
                )

            insert_sync_comm_ops(
                block,
                self._segments[-1]._end_idx,
                self.sharding_ring_id,
                self._segments[-1]._allreduce_vars,
            )
            # allreduce --> reduce
            insert_reduce_ops(
                block,
                self._segments[-1]._end_idx,
                self.sharding_ring_id,
                self._segments[-1]._allreduce_vars,
                self._shard,
                op_role=OpRole.Backward,
                use_calc_stream=False,
            )

        for idx, segment in reversed(list(enumerate(self._segments))):
            allreduce_vars = (
                self._segments[idx - 1]._allreduce_vars if idx > 0 else []
            )
            broadcast_vars = (
                self._segments[idx + 1]._broadcast_vars
                if idx < len(self._segments) - 1
                else []
            )
            fill_constant_vars = (
                self._segments[idx + 2]._fill_constant_vars
                if idx < len(self._segments) - 2
                else []
            )
            cast_ops = (
                self._segments[idx + 2]._cast_ops
                if idx < len(self._segments) - 2
                else {}
            )
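            # vars are intentionally taken from neighbouring segments: grads
            # to reduce come from the previous segment (idx - 1), params to
            # broadcast from the next segment (idx + 1), and fill_constant /
            # cast preparation from two segments ahead (idx + 2), so those
            # ops are emitted ahead of the segment that consumes them.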

            for op_idx in reversed(range(segment._start_idx, segment._end_idx)):
                op = block.ops[op_idx]
                for input_name in op.desc.input_arg_names():
                    if (
                        input_name in segment._param2broadcast
                        and input_name != segment._param2broadcast[input_name]
                    ):
                        op._rename_input(
                            input_name, segment._param2broadcast[input_name]
                        )

            for param_name, broadcast_name in segment._param2broadcast.items():
                if param_name != broadcast_name:
                    block.create_var(
                        name=broadcast_name,
                        shape=self._main_program.global_block()
                        .var(param_name)
                        .shape,
                        dtype=self._main_program.global_block()
                        .var(param_name)
                        .dtype,
                        persistable=False,
                    )

            # step1: remove cast ops
            block._sync_with_cpp()
            segment._end_idx += FP16Utils.remove_cast_op(
                block, self._params, segment, 0
            )

            # step2: add Sync ops
            shard_allredue_vars = self._shard.filter_grads(allreduce_vars)

            if (
                self.gradient_merge_mode != "sharding_gm"
                or self._gradient_merge_acc_step <= 1
            ):
                if (
                    self.hybrid_dp
                    and self.hybrid_dp_mode == "sharding_hybrid_dp"
                    and len(shard_allredue_vars) >= 1
                ):
                    insert_sync_comm_ops(
                        block,
                        segment._end_idx,
                        self.dp_ring_id,
                        shard_allredue_vars,
                    )

                    broad_cast_vars = [x[0] for x in broadcast_vars]
                    if len(broad_cast_vars) > 0:
                        insert_sync_comm_ops(
                            block,
                            segment._end_idx,
                            self.sharding_ring_id,
                            broad_cast_vars,
                        )
                else:
                    comm_dep_vars = allreduce_vars + [
                        x[0] for x in broadcast_vars
                    ]
                    if len(comm_dep_vars) > 0:
                        insert_sync_comm_ops(
                            block,
                            segment._end_idx,
                            self.sharding_ring_id,
                            comm_dep_vars,
                        )
            # gradient merge
            elif (
                self.gradient_merge_mode == "sharding_gm"
                and self._gradient_merge_acc_step > 1
            ):
                broad_cast_vars = [x[0] for x in broadcast_vars]
                if len(broad_cast_vars) > 0:
                    insert_sync_comm_ops(
                        block,
                        segment._end_idx,
                        self.sharding_ring_id,
                        broad_cast_vars,
                    )

            calc_dep_vars = (
                fill_constant_vars
                + [k for k, v in cast_ops.items()]
                + self._segments[idx]._allreduce_vars
            )

            if len(calc_dep_vars) > 0:
                insert_sync_calc_op(
                    block, segment._end_idx, [calc_dep_vars[-1]]
                )

            # step3: insert `fill_constant` ops
            insert_fill_constant_ops(
                block, segment._end_idx, fill_constant_vars
            )

            # step4: add `cast` ops
            insert_cast_ops(block, segment._end_idx, cast_ops)

            # step5: add broadcast ops
            # gradient merge
            if (
                self.gradient_merge_mode == "sharding_gm"
                and self._gradient_merge_acc_step > 1
            ):
                self.create_persistable_gradients_and_insert_merge_ops(
                    block,
                    self._startup_program.global_block(),
                    segment._start_idx,
                    shard_allredue_vars,
                    self._shard,
                )

            insert_broadcast_ops(
                block, segment._start_idx, self.sharding_ring_id, broadcast_vars
            )

            # step6: add all_reduce ops
            # dp
            if (
                self.gradient_merge_mode != "sharding_gm"
                or self._gradient_merge_acc_step <= 1
            ):
                if (
                    self.hybrid_dp
                    and self.hybrid_dp_mode == "sharding_hybrid_dp"
                    and len(shard_allredue_vars) >= 1
                ):
                    insert_allreduce_ops(
                        block,
                        segment._start_idx,
                        self.dp_ring_id,
                        shard_allredue_vars,
                        user_defined_strategy=self.user_defined_strategy,
                    )
                    insert_sync_comm_ops(
                        block,
                        segment._start_idx,
                        self.sharding_ring_id,
                        allreduce_vars,
                    )
            # gradient merge
            elif (
                self.gradient_merge_mode == "sharding_gm"
                and self._gradient_merge_acc_step > 1
            ):
                insert_sync_comm_ops(
                    block,
                    segment._start_idx,
                    self.sharding_ring_id,
                    allreduce_vars,
                )
            # sharding
            # allreduce --> reduce
            # TODO temp change
            if len(allreduce_vars) > 0:
                insert_reduce_ops(
                    block,
                    segment._start_idx,
                    self.sharding_ring_id,
                    allreduce_vars,
                    self._shard,
                    op_role=OpRole.Backward,
                    use_calc_stream=False,
                )

            block._sync_with_cpp()

        if self._segments[0]._broadcast_vars:
            broadcast_vars = [x[0] for x in self._segments[0]._broadcast_vars]
            insert_sync_comm_ops(
                block,
                self._segments[0]._start_idx,
                self.sharding_ring_id,
                broadcast_vars,
            )
            insert_broadcast_ops(
                block,
                self._segments[0]._start_idx,
                self.sharding_ring_id,
                self._segments[0]._broadcast_vars,
            )

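        # the first two segments have no earlier segment to host their
        # preparation ops, so their fill_constant and cast ops are emitted at
        # the start of the first segment instead.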
        fill_constant_vars = []
        for x in self._segments[:2]:
            fill_constant_vars += x._fill_constant_vars

        # join the cast ops of the first two segments
        cast_ops = {}
        for x in self._segments[:2]:
            for k, v in x._cast_ops.items():
                cast_ops[k] = v

        calc_deps_vars = fill_constant_vars + [k for k, v in cast_ops.items()]
        if fill_constant_vars or cast_ops:
            insert_sync_calc_op(
                block, self._segments[0]._start_idx, [calc_deps_vars[-1]]
            )

        if fill_constant_vars:
            insert_fill_constant_ops(
                block, self._segments[0]._start_idx, fill_constant_vars
            )

        if cast_ops:
            insert_cast_ops(block, self._segments[0]._start_idx, cast_ops)

        return

    def _prune_startup_program(self, block, shard):
        for idx, op in reversed(list(enumerate(block.ops))):
            for output_name in op.desc.output_arg_names():
                if shard.has_var(output_name):
                    continue
                if self._optimizer_sharding and shard.is_param(output_name):
                    continue
                # TODO: why remove the whole op when only one of its output vars is not in this shard?
                block._remove_op(idx, sync=False)
                break

        for var_name in list(block.vars.keys()):
            if shard.has_var(var_name):
                continue
            if self._optimizer_sharding and shard.is_param(var_name):
                continue
            block._remove_var(var_name, sync=False)
        block._sync_with_cpp()

    def _build_groups(self):
        """
        pre-assign ring ids
            mp: 0
            sharding: 1
            pure-dp: 2
            global: 3
            pp: 4
            pp-pair: >= 20
        if one parallelism is not enabled: -1
        only the parallelism hierarchy mp --> sharding --> pp --> dp is supported
        """
        # step 1: initialize nccl
        self.global_word_size = self.role_maker._worker_num()
        self.global_rank = self.role_maker._worker_index()
        self.global_endpoints = self.role_maker._get_trainer_endpoints()
        self.current_endpoint = self.global_endpoints[self.global_rank]
        self._collective_helper = CollectiveHelper(
            self.role_maker, nrings=self._nrings_sharding
        )
        assert (
            self.global_word_size % self.mp_degree == 0
        ), "global_word_size: {} should be divisible by the mp_degree: {}".format(
            self.global_word_size, self.mp_degree
        )
        assert (
            self.global_word_size % self.sharding_degree == 0
        ), "global_word_size: {} should be divisible by the sharding_degree: {}".format(
            self.global_word_size, self.sharding_degree
        )
        assert (
            self.global_word_size % self.pp_degree == 0
        ), "global_word_size: {} should be divisible by the pp_degree: {}".format(
            self.global_word_size, self.pp_degree
        )
        assert (
            self.global_word_size % self.dp_degree == 0
        ), "global_word_size: {} should be divisible by the dp_degree: {}".format(
            self.global_word_size, self.dp_degree
        )
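
        # Illustrative layout (innermost axis varies fastest): with 8 workers
        # and mp_degree=2, sharding_degree=2, pp_degree=2, dp_degree=1,
        # global rank 5 gets mp_rank=1 (mp group {4, 5}), sharding_rank=0
        # (sharding group {5, 7}) and pp_rank=1 (pp peers {1, 5}).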

        # mp group
        if self.mp_degree > 1:
            self.mp_ring_id = 0
            self.mp_rank = self.global_rank % self.mp_degree
            self.mp_group_id = self.global_rank // self.mp_degree
            self.mp_group_endpoints = [
                ep
                for idx, ep in enumerate(self.global_endpoints)
                if idx // self.mp_degree == self.mp_group_id
            ]
            assert self.current_endpoint in self.mp_group_endpoints
            assert (
                len(self.mp_group_endpoints) == self.mp_degree
            ), "num of mp worker in group is [{}], but mp group size is [{}]".format(
                len(self.mp_group_endpoints), self.mp_degree
            )
        else:
            self.mp_degree = 1
            self.mp_ring_id = -1
            self.mp_rank = -1
            self.mp_group_id = -1
            self.mp_group_endpoints = []

        # sharding
        if self.sharding_degree > 1:
            self.sharding_ring_id = 1
            self.sharding_rank = (
                self.global_rank // self.mp_degree
            ) % self.sharding_degree
            self.sharding_group_id = self.global_rank // (
                self.mp_degree * self.sharding_degree
            )
            # mp + sharding + ...
            if self.mp_degree > 1:
                self.sharding_group_endpoints = [
                    ep
                    for idx, ep in enumerate(self.global_endpoints)
                    if (idx // (self.mp_degree * self.sharding_degree))
                    == self.sharding_group_id
                    and idx % self.mp_degree == self.mp_rank
                ]
            # sharding + ...
            else:
                self.sharding_group_endpoints = [
                    ep
                    for idx, ep in enumerate(self.global_endpoints)
                    if (idx // (self.mp_degree * self.sharding_degree))
                    == self.sharding_group_id
                ]
            assert self.current_endpoint in self.sharding_group_endpoints
        else:
            self.sharding_degree = 1
            self.sharding_ring_id = -1
            self.sharding_rank = -1
            self.sharding_group_id = -1
            self.sharding_group_endpoints = []

        # pp
        if self.pp_degree > 1:
            self.pp_pair_ring_id = 20
            # pipeline global ring_id set to 4 for sharding0, mp1, dp2, global3
            self.pp_ring_id = 4
            self.pp_rank = (
                self.global_rank
                // (self.sharding_degree * self.mp_degree)
                % self.pp_degree
            )
            # NOTE: already adjusted for (outer-pure) dp
            self.pp_group_id = self.global_rank // (
                self.mp_degree * self.sharding_degree * self.pp_degree
            )
            pp_first_stage_idx = self.global_rank % (
                self.sharding_degree * self.mp_degree
            ) + self.pp_group_id * (
                self.mp_degree * self.sharding_degree * self.pp_degree
            )
            pp_stage_offset = self.sharding_degree * self.mp_degree
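            # e.g. mp_degree=2, sharding_degree=1, pp_degree=2, dp_degree=2
            # (8 ranks): rank 5 gets pp_group_id=1, pp_first_stage_idx=5 and
            # pp_stage_offset=2, so its pp group endpoints come from global
            # ranks [5, 7].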
            self.pp_group_endpoints = []
            for i in range(self.pp_degree):
                self.pp_group_endpoints.append(
                    self.global_endpoints[
                        pp_first_stage_idx + pp_stage_offset * i
                    ]
                )
            assert self.current_endpoint in self.pp_group_endpoints
        else:
            self.pp_ring_id = -1
            self.pp_degree = 1
            self.pp_pair_ring_id = -1
            self.pp_rank = -1
            self.pp_group_id = -1
            self.pp_group_endpoints = []

        # outer-pure-dp group
        # NOTE (JZ-LIANG) support outer-pure-dp to scale the throughput in 3D parallelism
        # e.g. mp-sharding-pp-dp
        # sharding-hybrid-dp is one scenario of outer-pure-dp
        local_pp_degree = self.pp_degree
        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
            assert self.pp_degree == 2, (
                "For manually set pipeline, only " "pp_degree = 2 is supported."
            )
            assert (
                self.global_word_size
                == self.mp_degree * self.sharding_degree * self.dp_degree
            ), "global work size [{}], mp_degree [{}], sharding_degree [{}], dp_degree [{}].".format(
                self.global_word_size,
                self.mp_degree,
                self.sharding_degree,
                self.dp_degree,
            )
            local_pp_degree = 1
        else:
            assert (
                self.global_word_size
                == self.mp_degree
                * self.sharding_degree
                * self.pp_degree
                * self.dp_degree
            ), "mp_degree: [{}], sharding_degree: [{}], pp_degree: [{}], dp_degree: [{}]; BUT global nrank: [{}]".format(
                self.mp_degree,
                self.sharding_degree,
                self.pp_degree,
                self.dp_degree,
                self.global_word_size,
            )

        if self.dp_degree > 1:
            self.dp_ring_id = 2
            self.dp_rank = self.global_rank // (
                self.sharding_degree * self.mp_degree * local_pp_degree
            )
            dp_first_rank_idx = self.global_rank % (
                self.sharding_degree * self.mp_degree * local_pp_degree
            )
            dp_offset = self.sharding_degree * self.mp_degree * local_pp_degree
            self.dp_group_endpoints = []
            for i in range(self.dp_degree):
                self.dp_group_endpoints.append(
                    self.global_endpoints[dp_first_rank_idx + dp_offset * i]
                )
            assert self.current_endpoint in self.dp_group_endpoints
            logger.info("Hybrid DP mode turned on!")
        else:
            self.dp_ring_id = -1
            self.dp_rank = -1
            self.dp_group_endpoints = []

        # global group
        # used for gen_nccl_comm_sync, amp nan/inf check, clip by global norm
        # NOTE (JZ-LIANG) when using the global ring to calc the global norm and dp_degree > 1, the allreduce result should be divided by dp_degree
        self.global_ring_id = 3

        logger.info("global word size: {}".format(self.global_word_size))
        logger.info("global rank: {}".format(self.global_rank))
        logger.info("global endpoints: {}".format(self.global_endpoints))
        logger.info("global ring id: {}".format(self.global_ring_id))
        logger.info("#####" * 6)

        logger.info("mp group size: {}".format(self.mp_degree))
        logger.info("mp rank: {}".format(self.mp_rank))
        logger.info("mp group id: {}".format(self.mp_group_id))
        logger.info("mp group endpoints: {}".format(self.mp_group_endpoints))
        logger.info("mp ring id: {}".format(self.mp_ring_id))
        logger.info("#####" * 6)

        logger.info("sharding group size: {}".format(self.sharding_degree))
        logger.info("sharding rank: {}".format(self.sharding_rank))
        logger.info("sharding group id: {}".format(self.sharding_group_id))
        logger.info(
            "sharding group endpoints: {}".format(self.sharding_group_endpoints)
        )
        logger.info("sharding ring id: {}".format(self.sharding_ring_id))
        logger.info("#####" * 6)

        logger.info("pp group size: {}".format(self.pp_degree))
        logger.info("pp rank: {}".format(self.pp_rank))
        logger.info("pp group id: {}".format(self.pp_group_id))
        logger.info("pp group endpoints: {}".format(self.pp_group_endpoints))
        logger.info("pp ring id: {}".format(self.pp_ring_id))
        logger.info("#####" * 6)

        logger.info("pure dp group size: {}".format(self.dp_degree))
        logger.info("pure dp rank: {}".format(self.dp_rank))
        logger.info(
            "pure dp group endpoints: {}".format(self.dp_group_endpoints)
        )
        logger.info("pure dp ring id: {}".format(self.dp_ring_id))
        logger.info("#####" * 6)

        return

    def _recreate_not_persist_param_as_var(self):
        def recreate_not_persist_param_as_var(program):
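            # re-create every non-persistable Parameter as a plain Variable
            # with the same shape/dtype/type; only the distributed attribute
            # is carried over, so the variable becomes an ordinary
            # (non-Parameter) intermediate tensor for the later passes.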
            block = program.global_block()
            params = block.all_parameters()
            for param in params:
                if param.persistable:
                    continue

                name = param.name
                shape = param.shape
                dtype = param.dtype
                type = param.type
                lod_level = param.lod_level
                stop_gradient = param.stop_gradient
                trainable = param.trainable
                optimize_attr = param.optimize_attr
                regularizer = param.regularizer
                have_dist_attr = False
                is_distributed = False
                if hasattr(param, 'is_distributed'):
                    have_dist_attr = True
                    is_distributed = param.is_distributed

                block._remove_var(name, sync=False)
                var = block.create_var(
                    name=name,
                    shape=shape,
                    dtype=dtype,
                    type=type,
                    lod_level=lod_level,
                    stop_gradient=stop_gradient,
                    trainable=trainable,
                    persistable=False,
                )
                if have_dist_attr:
                    var.is_distributed = is_distributed

            block._sync_with_cpp()

        recreate_not_persist_param_as_var(self._startup_program)
        recreate_not_persist_param_as_var(self._main_program)

    def _initialization_broadcast(self):
        """
        this function ensures that the initialization across the dp group is
        identical when hybrid-dp is used, and that the initialization of
        non-distributed params across the mp group is identical.
        """
        if self.dp_degree <= 1 and self.mp_degree <= 1:
            return

        startup_block = self._startup_program.global_block()

        params = startup_block.all_parameters()
        params_name = []
        not_dist_param_name = set()

        for param in params:
            params_name.append(param.name)
            if not hasattr(param, 'is_distributed') or not param.is_distributed:
                not_dist_param_name.add(param.name)

        # offload and optimize_cast will insert broadcast op
        broadcast_params = set()
        for op in startup_block.ops:
            if op.type == 'c_broadcast':
                broadcast_params.add(op.desc.output_arg_names()[0])

        for param in params_name:
            if param in broadcast_params:
                continue

            rings = []
            # need sync not distributed param in mp group
            if self.mp_degree > 1 and param in not_dist_param_name:
                rings.append(self.mp_ring_id)
            if self.dp_degree > 1:
                rings.append(self.dp_ring_id)

            for ring in rings:
                startup_block.append_op(
                    type='c_broadcast',
                    inputs={'X': param},
                    outputs={'Out': param},
                    attrs={
                        'ring_id': ring,
                        'root': 0,
                        'use_calc_stream': True,
                        OP_ROLE_KEY: OpRole.Forward,
                    },
                )

        startup_block._sync_with_cpp()

    # sharding gradient merge
    def create_persistable_gradients_and_insert_merge_ops(
        self, main_block, startup_block, insert_idx, grad_names, shard
    ):

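        # for every grad owned by this shard: create a persistable
        # <grad>@GradiantMerge buffer, accumulate the fresh grad into it with
        # an elementwise_add at insert_idx, and zero-initialize the buffer in
        # the startup program.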
        for grad_name in grad_names:
            assert (
                get_grad_device(grad_name, shard) == shard.worker_idx
            ), "trying to merge a gradient that does not belong to the current shard: [{}]".format(
                grad_name
            )
            persistable_grad_name = grad_name + '@GradiantMerge'
            assert (
                grad_name not in self._grad2merged_grad
            ), "grad [{}] is already in grad2merged_grad, maybe you hit a shared-weight case!".format(
                grad_name
            )
            self._grad2merged_grad[grad_name] = persistable_grad_name
            grad_var = main_block.var(grad_name)
            # create var
            gradient_merge_var = main_block.create_var(
                name=persistable_grad_name,
                shape=grad_var.shape,
                dtype=grad_var.dtype,
                persistable=True,
            )
            startup_gradient_merge_var = startup_block.create_var(
                name=persistable_grad_name,
                shape=grad_var.shape,
                dtype=grad_var.dtype,
                persistable=True,
            )

            # merge gradient
            main_block._insert_op_without_sync(
                insert_idx,
                type="elementwise_add",
                inputs={'X': grad_name, 'Y': gradient_merge_var},
                outputs={'Out': gradient_merge_var},
                attrs={
                    'axis': -1,
                    'use_mkldnn': False,
                    OP_ROLE_KEY: OpRole.Backward,
                },
            )

            # startup initialization
            startup_block.append_op(
                type="fill_constant",
                outputs={"Out": startup_gradient_merge_var},
                attrs={
                    "shape": grad_var.shape,
                    "dtype": grad_var.dtype,
                    "value": float(0),
                },
            )

        main_block._sync_with_cpp()
        startup_block._sync_with_cpp()

    def _create_gm_cond(self, main_block):
        # Add const var
        acc_step_var = create_global_var(
            name="gradient_merge_acc_step",
            shape=[1],
            value=int(self._gradient_merge_acc_step),
            dtype='int32',
            persistable=True,
            force_cpu=True,
        )

        zero_var = create_global_var(
            name="gradient_merge_zero",
            shape=[1],
            value=int(0),
            dtype='int32',
            persistable=True,
            force_cpu=True,
        )

        # Add step var & cond var
        current_step_var = create_global_var(
            name="gradient_merge_current_step",
            shape=[1],
            value=int(0),
            dtype='int32',
            persistable=True,
            force_cpu=True,
        )

        cond_var = main_block.create_var(
            name="gradient_merge_cond", shape=[1], dtype='bool'
        )

        with device_guard("cpu"):
            # step_var = (step_var + 1) % k_step
            main_block.append_op(
                type='increment',
                inputs={'X': [current_step_var]},
                outputs={'Out': [current_step_var]},
                attrs={'step': float(1), OP_ROLE_KEY: OpRole.Optimize},
            )

            main_block.append_op(
                type='elementwise_mod',
                inputs={'X': current_step_var, 'Y': acc_step_var},
                outputs={'Out': current_step_var},
                attrs={
                    'axis': -1,
                    OP_ROLE_KEY: OpRole.Optimize,
                    'use_mkldnn': False,
                },
            )

            # cond_var = (step_var == 0)
            main_block.append_op(
                type='equal',
                inputs={'X': current_step_var, 'Y': zero_var},
                outputs={'Out': cond_var},
                attrs={OP_ROLE_KEY: OpRole.Optimize},
            )
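        # e.g. with _gradient_merge_acc_step = 4, current_step_var cycles
        # through 1, 2, 3, 0, ... so cond_var becomes True on every 4th step,
        # which is when the conditional optimize block is executed.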
        # paddle.static.Print(current_step_var, message="in FWBW last conditional")
        return cond_var

    def _true_apply_gradient(self):
        """
        allreduce grad@gradientmerge in dp group
        grad@gradientmerge / acc_step
        re-create all optimize ops of the original main block and rename them
            cast(backward)
            amp
            clip
            opt
        # fill constant grad@gradientmerge
        """
        # current conditional block
        main_block = self._main_program.global_block()
        cur_block_idx = self._main_program.current_block_idx
        cur_block = self._main_program.current_block()
        self.cond_block = self._main_program.current_block()

        # cur_block's forward_block & backward_block is itself
        cur_block._set_forward_block_idx(cur_block_idx)

        # allreduce grad@gradientmerge
        if self.hybrid_dp:
            assert (
                self.dp_ring_id >= 0
            ), "dp_ring_id should be non-negative when in sharding & DP mode"
            for grad, merged_grad in self._grad2merged_grad.items():
                merged_grad_var = main_block.var(merged_grad)
                cur_block.append_op(
                    type='c_allreduce_sum',
                    inputs={'X': merged_grad_var},
                    outputs={'Out': merged_grad_var},
                    attrs={
                        'ring_id': self.dp_ring_id,
                        'use_calc_stream': True,
                        OP_ROLE_KEY: OpRole.Optimize,
                    },
                )

        # grad@gradientmerge / acc_step
        for grad, merged_grad in self._grad2merged_grad.items():
            # grad /= k_steps
            merged_grad_var = main_block.var(merged_grad)
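            # scaling by 1.0 / self._gradient_merge_acc_step turns the
            # accumulated sum of per-micro-step gradients into their mean
            # before the optimizer consumes it.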
            cur_block.append_op(
                type='scale',
                inputs={'X': merged_grad_var},
                outputs={'Out': merged_grad_var},
                attrs={
                    'scale': 1.0 / float(self._gradient_merge_acc_step),
                    'bias': 0.0,
                    'bias_after_scale': False,
                    OP_ROLE_KEY: OpRole.Optimize,
                },
            )

        # re-create optimize ops
        already_moved_var_names = []
        for op_desc in self.original_optimize_ops_desc:
            new_op_desc = cur_block.desc.append_op()
            new_op_desc.copy_from(op_desc)

            for input_name in new_op_desc.input_arg_names():
                if input_name in self._grad2merged_grad:
                    new_op_desc._rename_input(
                        input_name, self._grad2merged_grad[input_name]
                    )

            for output_name in new_op_desc.output_arg_names():
                if output_name in self._grad2merged_grad:
                    new_op_desc._rename_output(
                        output_name, self._grad2merged_grad[output_name]
                    )

                # move non temp optimize vars from block0 to cond block
                if (
                    output_name not in already_moved_var_names
                    and output_name not in self._grad2merged_grad.keys()
                ):
                    var_ = self._main_program.global_block().var(output_name)
                    if not var_.persistable:
                        # move
                        name_ = var_.name
                        shape_ = var_.shape
                        type_ = var_.dtype
                        self._main_program.global_block()._remove_var(
                            var_.name, sync=False
                        )
                        self.cond_block.create_var(
                            name=name_,
                            shape=shape_,
                            dtype=type_,
                            persistable=False,
                        )
                        already_moved_var_names.append(name_)

        self._main_program.global_block()._sync_with_cpp()
        cur_block._sync_with_cpp()

        # fill zero to grad@gradientmerge
        for grad, merged_grad in self._grad2merged_grad.items():
            merged_grad_var = main_block.var(merged_grad)
            cur_block.append_op(
                type='fill_constant',
                outputs={'Out': merged_grad_var},
                attrs={
                    "shape": merged_grad_var.shape,
                    "dtype": merged_grad_var.dtype,
                    "value": float(0),
                    OP_ROLE_KEY: OpRole.Optimize,
                },
            )

        # lr_var = main_block.var("gradient_merge_current_step")
        # paddle.static.Print(lr_var, message="in OPTIMIZE last conditional")

    def _sharding_gradient_merge(self):
        """
        copy all optimize ops from the original main block
        remove all optimize ops from the original main block
        create the cond block
        """
        if (
            self.gradient_merge_mode != "sharding_gm"
            or self._gradient_merge_acc_step <= 1
        ):
            return

        main_block = self._main_program.global_block()
        # copy original optimize ops to temp ops desc list
        # remove them from block 0
        tmp_copy_block = self._main_program._create_block()

        self.original_optimize_ops_desc = []
        for op_idx, op in reversed(list(enumerate(main_block.ops))):
            if int(op.attr('op_role')) != int(OpRole.Optimize):
                continue
            else:
                tmp_op_desc = tmp_copy_block.desc.append_op()
                tmp_op_desc.copy_from(op.desc)
                self.original_optimize_ops_desc.append(tmp_op_desc)
                main_block._remove_op(op_idx, sync=False)
        tmp_copy_block._sync_with_cpp()
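        # the ops were collected while walking block 0 in reverse, so flip
        # the list back to the original program order before replaying them
        # in the cond block.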
        self.original_optimize_ops_desc = list(
            reversed(self.original_optimize_ops_desc)
        )

        # back to block 0
        self._main_program._rollback()

        # create cond vars and ops at the end of block 0
        cond = self._create_gm_cond(main_block)

        # create cond block
        cond_block = self._main_program._create_block()
        self._true_apply_gradient()

        # back to block 0
        self._main_program._rollback()

        # cond op
        step_scope = self._main_program.global_block().create_var(
            type=core.VarDesc.VarType.STEP_SCOPES
        )
        conditional_block_op = self._main_program.global_block().append_op(
            type='conditional_block',
            inputs={
                'Cond': cond,
                'Input': [],
            },
            outputs={'Out': [], 'Scope': [step_scope]},
            attrs={
                'sub_block': cond_block,
                'is_scalar_condition': True,
            },
        )
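
        # NOTE: `cond` is evaluated as a scalar (is_scalar_condition=True);
        # the re-created optimize ops inside `cond_block` therefore only run
        # on the step where gradient accumulation completes.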