fleet_base.py 66.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
16
import copy
17
import warnings
18
import paddle
19
import os
20
from types import MethodType
21
import numpy as np
22
from paddle.fluid.framework import dygraph_only, _global_flags
23
from paddle.fluid import compiler
24
from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase
25
from .strategy_compiler import StrategyCompiler
26
from .distributed_strategy import DistributedStrategy
27 28
from .meta_optimizer_factory import MetaOptimizerFactory
from .runtime_factory import RuntimeFactory
29
from paddle.fluid.wrapped_decorator import wrap_decorator
30
from paddle.fluid.dygraph import parallel_helper
31
from paddle.fluid.ir import apply_build_strategy
32
from . import topology as tp
33
from .topology import ParallelMode
34
from ..meta_parallel import TensorParallel, model_parallel_random_seed
J
JZ-LIANG 已提交
35
from ..meta_parallel import PipelineParallel, ShardingParallel
K
kuizhiqing 已提交
36
from ..meta_optimizers import HybridParallelOptimizer, HeterParallelOptimizer
37
from paddle import _C_ops
38 39
from paddle.fluid import core
from paddle.fluid.dygraph import to_variable
40 41
from paddle.distributed.fleet.utils.recompute import RecomputeFunction
from paddle.fluid.dygraph.varbase_patch_methods import _grad_scalar
42

43 44
__all__ = []

45 46 47 48
_grad_scalar = None


class _RecomputeModelWrapper(paddle.nn.Layer):
    """Run a ``paddle.nn.Sequential`` model with segment-wise recomputation.

    The child layers of ``model`` are split into consecutive segments of
    ``len(layers) // segments`` layers. Every segment except the last one is
    executed through ``RecomputeFunction.apply``; the last segment, which
    also absorbs any remainder layers, is executed directly.

    Args:
        model (paddle.nn.Sequential): The model whose children are segmented.
        segments (int, optional): Requested number of segments. The value is
            clamped to the range ``[1, number of child layers]`` so that a
            segment never ends up with zero layers. Default: 2.
        preserve_rng_state (bool, optional): Forwarded unchanged to
            ``RecomputeFunction.apply``. Default: True.
    """

    def __init__(self, model, segments=2, preserve_rng_state=True):
        super(_RecomputeModelWrapper, self).__init__()
        assert isinstance(model, paddle.nn.Sequential), (
            "The model passed to RecomputeModelWrapper must be of type "
            "paddle.nn.Sequential.")
        self._model = model
        self._preserve_rng_state = preserve_rng_state
        self._layers = list(model.children())
        # Asking for more segments than there are layers used to produce a
        # segment size of 0, which made forward() fail with
        # "range() arg 3 must not be zero"; segments == 0 raised
        # ZeroDivisionError. Clamp so both cases degrade gracefully.
        self._segments = max(1, min(segments, len(self._layers)))
        self._segment_size = len(self._layers) // self._segments

    def _run_func(self, begin, end):
        """Return a callable that applies layers ``[begin, end)`` in order."""

        def do_run(input):
            for i in range(begin, end):
                input = self._layers[i](input)
            return input

        return do_run

    def _checkpoint(self, func, *args, **kwargs):
        """Execute ``func`` through ``RecomputeFunction.apply``.

        Only positional ``args`` are forwarded; ``kwargs`` are accepted but
        not passed on.
        """
        return RecomputeFunction.apply(func, self._preserve_rng_state, *args)

    def forward(self, input):
        """Apply all layers, recomputing every segment except the last."""
        end = 0
        # A segment size of 0 only remains for an empty Sequential; skip the
        # loop then, because range() rejects a zero step.
        if self._segment_size > 0:
            for begin in range(0, self._segment_size * (self._segments - 1),
                               self._segment_size):
                end = begin + self._segment_size
                input = self._checkpoint(self._run_func(begin, end), input)
        return self._run_func(end, len(self._layers))(input)

81

82 83 84 85 86 87 88 89 90 91 92 93 94 95
def apply_ir_passes(main_program, startup_program, config):
    """Apply the user's build strategy to the programs as IR passes.

    A copy of ``config._user_defined_strategy.build_strategy`` is made first.
    If the global flag ``FLAGS_apply_pass_to_program`` is off, that copy is
    returned untouched. Otherwise the passes are applied through
    ``apply_build_strategy`` and its result is returned.

    Args:
        main_program: The main program. When it carries a non-empty
            ``_pipeline_opt``, the ``"section_program"`` entry is used
            instead.
        startup_program: The startup program. In the pipeline case the
            ``"startup_program"`` entry of its ``_pipeline_opt`` is used.
        config: Object exposing ``_user_defined_strategy`` and
            ``_is_collective``.

    Returns:
        The copied build strategy when passes are disabled, otherwise the
        value returned by ``apply_build_strategy``.
    """
    build_strategy = config._user_defined_strategy.build_strategy._copy()
    passes_enabled = _global_flags()['FLAGS_apply_pass_to_program']
    if not passes_enabled:
        return build_strategy

    # Pipeline programs keep the real programs inside `_pipeline_opt`.
    pipeline_cfg = getattr(main_program, "_pipeline_opt", {})
    if pipeline_cfg:
        main_program = pipeline_cfg["section_program"]
        startup_program = startup_program._pipeline_opt["startup_program"]

    pass_attrs = {"use_cuda": config._is_collective}

    if config._user_defined_strategy.fuse_all_reduce_ops:
        if build_strategy.fuse_all_optimizer_ops:
            # FIXME(zjl): currently, fuse_all_optimizer_ops
            # have conflict with fuse_all_reduce_ops because
            # RawProgramOptimizer also inserts coalesce_tensor
            # into program. These two procedures may conflict
            # in which vars are to be fused.
            warnings.warn(
                'Currently, the fuse_all_optimizer_ops pass has conflict with fuse_all_reduce_ops pass. Disable the fuse_all_optimizer_ops pass temporarily.'
            )
            build_strategy.fuse_all_optimizer_ops = False

    return apply_build_strategy(main_program, startup_program, build_strategy,
                                pass_attrs)


109
def _inited_runtime_handler_(func):
    """Wrap ``func`` so it only runs when a runtime handle is available.

    The wrapper inspects the first positional argument (the object the
    method is bound to) and raises ``ValueError`` when its
    ``_runtime_handle`` attribute is ``None``; otherwise it calls ``func``
    with the unchanged arguments and returns its result.
    """

    def __impl__(*args, **kwargs):
        owner = args[0]

        if owner._runtime_handle is not None:
            return func(*args, **kwargs)

        raise ValueError("Fleet can not find suitable runtime handler")

    return __impl__


122
def _is_non_distributed_check_(func):
    """Wrap ``func`` so it becomes a no-op for a non-distributed fleet.

    The wrapper inspects the first positional argument. When that object has
    a role maker whose ``_is_non_distributed()`` returns exactly ``True``, a
    warning is emitted and ``None`` is returned without calling ``func``.
    In every other case ``func`` is called and its result returned.
    """

    def __impl__(*args, **kwargs):
        owner = args[0]
        role_maker = owner._role_maker

        non_distributed = (role_maker is not None
                           and role_maker._is_non_distributed() is True)
        if not non_distributed:
            return func(*args, **kwargs)

        warnings.warn(
            "%s() function doesn't work when use non_distributed fleet." %
            (func.__name__))
        return None

    return __impl__


139
# Decorator built from `_inited_runtime_handler_` via paddle's
# `wrap_decorator`; the decorated call raises ValueError while the owner's
# `_runtime_handle` is None.
inited_runtime_handler = wrap_decorator(_inited_runtime_handler_)
140
# Decorator built from `_is_non_distributed_check_` via paddle's
# `wrap_decorator`; the decorated call warns and returns None when the
# owner's role maker reports a non-distributed run.
is_non_distributed_check = wrap_decorator(_is_non_distributed_check_)
141 142


143 144 145
class Fleet(object):
    """
    Unified API for distributed training of PaddlePaddle
146
    Please refer to https://github.com/PaddlePaddle/FleetX for details.
147 148 149 150 151


    Returns:
        Fleet: A Fleet instance

152
    Example for collective training:
1
123malin 已提交
153

154 155
        .. code-block:: python

1
123malin 已提交
156 157
            import paddle
            paddle.enable_static()
158
            import paddle.distributed.fleet as fleet
159 160 161

            fleet.init(is_collective=True)

162 163 164
            strategy = fleet.DistributedStrategy()
            optimizer = paddle.optimizer.SGD(learning_rate=0.001)
            optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
165 166 167 168 169 170 171 172

            # do distributed training


    Example for parameter server training:

        .. code-block:: python

1
123malin 已提交
173 174
            import paddle
            paddle.enable_static()
175 176
            import paddle.distributed.fleet as fleet
            strategy = fleet.DistributedStrategy()
S
ShenLiang 已提交
177
            fleet.init(strategy=strategy)
178

179
            optimizer = paddle.optimizer.SGD(learning_rate=0.001)
180
            optimizer = fleet.distributed_optimizer(optimizer)
181

182 183
            if fleet.is_first_worker():
                print("this is first worker")
184

185 186
            print("current node index: {}".format(fleet.worker_index()))
            print("total number of worker num: {}".format(fleet.worker_num()))
187

188 189 190
            if fleet.is_worker():
                print("this is worker")
            print("worker endpoints: {}".format(fleet.worker_endpoints(to_string=True)))
191

192 193
            print("server num: {}".format(fleet.server_num()))
            print("server endpoints: {}".format(fleet.server_endpoints(to_string=True)))
194

195 196 197
            if fleet.is_server():
                print("this is server")
            fleet.stop_worker()
198 199


200 201 202
    """

    def __init__(self):
203
        self._role_maker = None
204
        self.strategy_compiler = None
205
        self._is_collective = False
206
        self._runtime_handle = None
D
Dong Daxiang 已提交
207 208
        self._util = None
        self._context = {}
209

210
    def init(self, role_maker=None, is_collective=False, strategy=None):
        """
        Initialize the role maker and the communication environment of Fleet.

        The method stores a deep copy of ``strategy``, creates or adopts the
        role maker, generates the role, and then prepares the communication
        groups. In non-static (dygraph) mode it initializes the parallel
        environment and, unless ``heter_ccl_mode`` is enabled, the hybrid
        parallel environment. In static collective mode it registers the
        ``'global'`` and, when the model-parallel degree is greater than 1,
        the ``'model'`` communication groups.

        Args:
            role_maker (RoleMakerBase, optional): A ``RoleMakerBase`` containing the configuration
                of environment variables related to distributed training. If it is None, a
                ``PaddleCloudRoleMaker`` is created automatically. The default value is None.
            is_collective (Boolean, optional): Whether to use collective mode. The original
                documentation describes False as distributed training using CPU and True as
                using GPU. It is only used when ``role_maker`` is None; otherwise
                ``role_maker._is_collective`` is used. The default value is False.
            strategy (DistributedStrategy): Extra properties for distributed training.
                For details, please refer to paddle.distributed.fleet.DistributedStrategy.
                When None, a default ``DistributedStrategy()`` is used. Default: None.

        Returns:
            None

        Raises:
            ValueError: If ``role_maker`` is None and ``is_collective`` is not a ``bool``.
            ValueError: If ``role_maker`` is given but is not a ``RoleMakerBase`` instance.
            ValueError: If the run is non-distributed and collective, Paddle is compiled
                with CUDA, and the visible CUDA device count is not 1.

        Examples1:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

        Examples2:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init(is_collective=True)

        Examples3:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                role = fleet.PaddleCloudRoleMaker()
                fleet.init(role)

        Examples4:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                strategy = fleet.DistributedStrategy()
                fleet.init(strategy=strategy)

        """
        if strategy is None:
            strategy = DistributedStrategy()
        # Work on a private copy so later changes to the caller's object do
        # not affect this Fleet instance.
        self._user_defined_strategy = copy.deepcopy(strategy)

        if role_maker is None:
            if isinstance(is_collective, bool):
                self._is_collective = is_collective
                self._role_maker = PaddleCloudRoleMaker(
                    is_collective=self._is_collective)
            else:
                raise ValueError(
                    "`is_collective` should be instance of `bool`, but got {}".
                    format(type(is_collective)))
        else:
            if isinstance(role_maker, RoleMakerBase):
                self._role_maker = role_maker
                # The `is_collective` argument is ignored here; the role
                # maker's own setting wins.
                self._is_collective = role_maker._is_collective
            else:
                raise ValueError(
                    "`role_maker` should be subclass of `RoleMakerBase`, but got {}"
                    .format(type(role_maker)))
        self._role_maker._generate_role()

        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import — confirm.
        import paddle.distributed.fleet as fleet
        fleet.util._set_role_maker(self._role_maker)

        self.strategy_compiler = StrategyCompiler()

        # A non-distributed collective run on a CUDA build must see exactly
        # one CUDA device.
        if self._role_maker._is_non_distributed() and self._is_collective:
            if paddle.fluid.core.is_compiled_with_cuda():
                gpus_num = paddle.fluid.core.get_cuda_device_count()
                if gpus_num != 1:
                    raise ValueError(
                        "CUDA_VISIBLE_DEVICES shoule be set only 1 card if you use `python` to launch fleet program."
                    )

        if paddle.fluid.framework._non_static_mode():
            if self.worker_num() == 1:
                # if worker_num is 1, should construct default topology & hcg
                self._topology = tp.CommunicateTopology()
                self._hcg = tp.HybridCommunicateGroup(self._topology)
                return
            if parallel_helper._is_parallel_ctx_initialized():
                warnings.warn(
                    "The dygraph parallel environment has been initialized.")
            else:
                # FLAGS_nccl_nrings is used for dynamic graph multi-stream communication
                if "FLAGS_nccl_nrings" in os.environ:
                    warnings.warn(
                        "You have set the environment variable FLAGS_nccl_nrings "
                        "outside the program, so the nccl_comm_num in "
                        "DistributedStrategy will not take effect here.")
                else:
                    os.environ["FLAGS_nccl_nrings"] = str(
                        self._user_defined_strategy.nccl_comm_num)
                paddle.distributed.init_parallel_env()

            # hybrid parallel not support for npu/xpu
            if self._user_defined_strategy.heter_ccl_mode == False:
                # init hybrid parallel environment in dygraph
                if tp._HYBRID_PARALLEL_GROUP is None:
                    self._init_hybrid_parallel_env()
                else:
                    warnings.warn(
                        "The dygraph hybrid parallel environment has been initialized."
                    )
        elif self._is_collective:
            use_sharding = self._user_defined_strategy.sharding

            # global group
            global_rank = self.worker_index()
            global_world_size = self.worker_num()
            # NOTE(wangxi): see sharding_optimizer
            global_ring_id = 3 if use_sharding else 0
            global_ranks = list(range(global_world_size))

            # NOTE(review): presumably `_CommunicateGroup()` registers itself
            # as `tp._HYBRID_PARALLEL_GROUP` — confirm in topology.py.
            if tp._HYBRID_PARALLEL_GROUP is None: tp._CommunicateGroup()
            cg = tp._HYBRID_PARALLEL_GROUP
            self._hcg = cg
            cg.set_comm_group('global', global_rank, global_world_size,
                              global_ring_id, global_ranks)

            use_tensor_parallel = self._user_defined_strategy.tensor_parallel
            use_mp = use_sharding or use_tensor_parallel

            # hybrid group
            if use_mp is False: return

            mp_degree_sharding = 1
            mp_degree_tensor_parallel = 1
            if use_sharding:
                sharding_configs = self._user_defined_strategy.sharding_configs
                mp_degree_sharding = int(sharding_configs['mp_degree'])

            if use_tensor_parallel:
                tensor_parallel_configs = self._user_defined_strategy.tensor_parallel_configs
                mp_degree_tensor_parallel = int(
                    tensor_parallel_configs['tensor_parallel_degree'])

            # When both features are on they must agree on the degree.
            if use_sharding and use_tensor_parallel:
                assert mp_degree_sharding == mp_degree_tensor_parallel

            mp_degree = mp_degree_sharding if use_sharding else mp_degree_tensor_parallel

            if mp_degree > 1:
                assert global_world_size % mp_degree == 0
                # NOTE(wangxi): mp_ring_id sync with sharding_optimizer.py _build_groups
                mp_ring_id = 0
                # Consecutive global ranks form one model-parallel group.
                mp_rank = global_rank % mp_degree
                mp_group_id = global_rank // mp_degree
                mp_group_ranks = [
                    idx for idx in global_ranks
                    if idx // mp_degree == mp_group_id
                ]
                cg.set_comm_group('model', mp_rank, mp_degree, mp_ring_id,
                                  mp_group_ranks)
379 380 381 382 383 384 385 386

    def _init_hybrid_parallel_env(self):
        """Initialize the hybrid parallel environment.

        Reads ``dp_degree``, ``mp_degree``, ``pp_degree`` and
        ``sharding_degree`` from the strategy's ``hybrid_configs``,
        normalises them, builds the ``CommunicateTopology`` with the axes
        ``["data", "pipe", "sharding", "model"]`` and creates the
        ``HybridCommunicateGroup`` from it. When ``mp_degree`` is greater
        than 1 the model-parallel random seed is set as well.

        Raises:
            AssertionError: If ``mp_degree``, ``pp_degree`` or
                ``sharding_degree`` is negative.
        """
        self.hybrid_configs = self._user_defined_strategy.hybrid_configs
        self.dp_degree = self.hybrid_configs["dp_degree"]
        self.mp_degree = self.hybrid_configs["mp_degree"]
        self.pp_degree = self.hybrid_configs["pp_degree"]
        self.sharding_degree = self.hybrid_configs["sharding_degree"]

        assert self.mp_degree >= 0, "mp_degree should be greater or equal to 0"
        assert self.pp_degree >= 0, "pp_degree should be greater or equal to 0"
        assert self.sharding_degree >= 0, "sharding_degree should be greater or equal to 0"

        # 0 is accepted by the asserts above and is treated as 1.
        # sharding_degree was previously left at 0, which put a zero-sized
        # dimension into the topology.
        self.mp_degree = max(self.mp_degree, 1)
        self.pp_degree = max(self.pp_degree, 1)
        self.sharding_degree = max(self.sharding_degree, 1)

        if self.dp_degree < 0:
            # A negative dp_degree means "derive it from the world size".
            # NOTE(review): sharding_degree is not part of this division —
            # confirm whether it should be.
            nranks = paddle.distributed.get_world_size()
            self.dp_degree = nranks // (self.mp_degree * self.pp_degree)

        self.dp_degree = max(self.dp_degree, 1)

        self._topology = tp.CommunicateTopology(
            hybrid_group_names=["data", "pipe", "sharding", "model"],
            dims=[
                self.dp_degree, self.pp_degree, self.sharding_degree,
                self.mp_degree
            ])

        self._hcg = tp.HybridCommunicateGroup(self._topology)

        if self.mp_degree > 1:
            tensor_parallel_configs = self._user_defined_strategy.tensor_parallel_configs
            tensor_init_seed = tensor_parallel_configs["tensor_init_seed"]
            # -1 selects the helper's default seed handling.
            if tensor_init_seed == -1:
                model_parallel_random_seed()
            else:
                model_parallel_random_seed(tensor_init_seed)

419 420 421 422 423 424 425 426
    def get_hybrid_communicate_group(self):
        assert self._hcg is not None
        return self._hcg

    def get_hybrid_parallel_topology(self):
        assert self._topology is not None
        return self._topology

427 428 429 430 431 432 433
    def is_first_worker(self):
        """
        Check whether the node is the first instance of worker.

        Returns:
            bool: True if this is the first node of worker,
                  False if not.
434

435 436 437 438 439 440 441 442
        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_first_worker()

443
        """
444
        return self._role_maker._is_first_worker()
445 446 447 448 449 450 451

    def worker_index(self):
        """
        Get current worker index.

        Returns:
            int: node id
452 453 454 455

        Examples:

            .. code-block:: python
1
123malin 已提交
456

457 458 459 460
                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_index()

461
        """
462
        return self._role_maker._worker_index()
463 464 465 466 467 468 469

    def worker_num(self):
        """
        Get current total worker number.

        Returns:
            int: worker numbers
1
123malin 已提交
470

471
        Examples:
1
123malin 已提交
472

473 474 475 476 477 478
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_num()

479
        """
480
        return self._role_maker._worker_num()
481

482 483 484 485 486 487 488 489 490 491 492 493
    def node_num(self):
        return self._role_maker._get_node_num()

    def local_rank(self):
        return self._role_maker._get_local_rank()

    def local_device_ids(self):
        return self._role_maker._get_local_device_ids()

    def world_device_ids(self):
        return self._role_maker._get_world_device_ids()

494 495 496 497 498 499 500
    def is_worker(self):
        """
        Check whether the node is an instance of worker.

        Returns:
            bool: True if this is a node of worker,
                  False if not.
501 502

        Examples:
1
123malin 已提交
503

504 505 506 507 508 509
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_worker()

510
        """
511
        return self._role_maker._is_worker()
512 513 514

    def worker_endpoints(self, to_string=False):
        """
515
        Get current worker endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].
516 517 518

        Returns:
            list/string: server endpoints
519 520

        Examples:
1
123malin 已提交
521

522 523 524 525 526 527
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.worker_endpoints()

528 529
        """
        if to_string:
530
            return ",".join(self._role_maker._get_trainer_endpoints())
531
        else:
532
            return self._role_maker._get_trainer_endpoints()
533 534 535 536 537 538 539

    def server_num(self):
        """
        Get current total worker number.

        Returns:
            int: server number
540 541

        Examples:
1
123malin 已提交
542

543
            .. code-block:: python
1
123malin 已提交
544 545 546 547

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_num()
548
        """
549
        return len(self._role_maker._get_pserver_endpoints())
550 551 552 553 554 555 556

    def server_index(self):
        """
        Get current server index.

        Returns:
            int: node id
557 558

        Examples:
1
123malin 已提交
559

560 561 562 563 564 565
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_index()

566
        """
567
        return self._role_maker._server_index()
568 569 570 571 572 573 574

    def server_endpoints(self, to_string=False):
        """
        Get current server endpoints, such as ["127.0.0.1:1001", "127.0.0.1:1002"].

        Returns:
            list/string: server endpoints
575 576

        Examples:
1
123malin 已提交
577

578 579 580 581 582 583
            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.server_endpoints()

584
        """
585

586
        if to_string:
587
            return ",".join(self._role_maker._get_pserver_endpoints())
588
        else:
589
            return self._role_maker._get_pserver_endpoints()
590 591 592 593 594 595 596 597

    def is_server(self):
        """
        Check whether the node is an instance of server.

        Returns:
            bool: True if this is a node of server,
                  False if not.
598 599 600 601

        Examples:

            .. code-block:: python
1
123malin 已提交
602

603 604 605 606
                import paddle.distributed.fleet as fleet
                fleet.init()
                fleet.is_server()

607
        """
608 609
        return self._role_maker._is_server()

610 611
    def barrier_worker(self):
        """
612 613 614 615
        barrier all workers

        Returns:
            None
616
        """
617
        self._role_maker._barrier("worker")
618

619
    @is_non_distributed_check
620
    @inited_runtime_handler
621
    def init_worker(self, scopes=None):
622
        """
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640
        initialize `Communicator` for parameter server training.


        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_worker()

641
        """
642
        self._runtime_handle._init_worker(scopes)
643

644
    @is_non_distributed_check
645
    @inited_runtime_handler
646
    def init_server(self, *args, **kwargs):
647
        """
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
        init_server executor to initialize startup program,
        if the `args` is not empty, it will run load_persistables for increment training.


        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_server()

667
        """
668
        self._runtime_handle._init_server(*args, **kwargs)
669

Z
zmxdream 已提交
670 671
    @is_non_distributed_check
    @inited_runtime_handler
T
Thunderbrook 已提交
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694
    def load_model(self, path, mode):
        """
        load fleet model from path


        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.load_model("path", "mode")

        """
        self._runtime_handle.load_model(path, mode)

695
    @is_non_distributed_check
696
    @inited_runtime_handler
697 698
    def run_server(self):
        """
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716
        run server will run pserver main program with executor.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                if fleet.is_server():
                    fleet.init_server()

717 718 719
        """
        self._runtime_handle._run_server()

720
    @is_non_distributed_check
721
    @inited_runtime_handler
722 723
    def stop_worker(self):
        """
724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740
        stop `Communicator` and give training complete notice to parameter server.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_server()

741 742 743
        """
        self._runtime_handle._stop_worker()

Z
zmxdream 已提交
744 745
    @is_non_distributed_check
    @inited_runtime_handler
T
tangwei12 已提交
746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779
    def save(self, dirname, feed=None, fetch=None, **configs):
        """
        Save an inference model or the persistable variables to ``dirname``.

        When both ``feed`` and ``fetch`` are empty, the persistables are
        saved through the runtime handle. Otherwise an inference model is
        saved, using the fetch variables looked up by name in the global
        block of the default main program. A CPU executor is created
        internally in both cases.

        Args:
            dirname (str): Target directory, passed to the runtime handle.
            feed (list, optional): Feed variables, each given as a name
                (``str``) or a ``paddle.static.Variable``. Default: None,
                treated as an empty list.
            fetch (list, optional): Fetch variables, each given as a name
                (``str``) or a ``paddle.static.Variable``. Default: None,
                treated as an empty list.
            **configs: Only ``"mode"`` is read. It is converted with
                ``int()`` and used as the mode when saving persistables;
                0 is used when it is absent.

        Returns:
            None

        Raises:
            ValueError: If an item of ``feed`` or ``fetch`` is neither a
                ``str`` nor a ``paddle.static.Variable``.
        """
        feed = [] if feed is None else feed
        fetch = [] if fetch is None else fetch

        def _collect_names(items, role):
            # Normalise a list of names / Variables to a list of names.
            names = []
            for item in items:
                if isinstance(item, str):
                    names.append(item)
                elif isinstance(item, paddle.static.Variable):
                    names.append(item.name)
                else:
                    # `role` keeps the message accurate; the fetch branch
                    # used to complain about "feed".
                    raise ValueError(
                        "{} must be [str|Variable]".format(role))
            return names

        place = paddle.CPUPlace()
        executor = paddle.static.Executor(place)

        if not feed and not fetch:
            increment_mode = int(configs["mode"]) if "mode" in configs else 0
            self._runtime_handle._save_persistables(executor,
                                                    dirname,
                                                    main_program=None,
                                                    mode=increment_mode)
            return

        feeded_var_names = _collect_names(feed, "feed")
        fetch_var_names = _collect_names(fetch, "fetch")

        # Look the block up once instead of once per fetch name.
        global_block = paddle.static.default_main_program().global_block()
        fetch_vars = [global_block.var(name) for name in fetch_var_names]

        self._runtime_handle._save_inference_model(executor, dirname,
                                                   feeded_var_names,
                                                   fetch_vars, None, True,
                                                   0)
T
tangwei12 已提交
792

Z
zmxdream 已提交
793 794
    @is_non_distributed_check
    @inited_runtime_handler
795 796 797 798 799 800
    def save_inference_model(self,
                             executor,
                             dirname,
                             feeded_var_names,
                             target_vars,
                             main_program=None,
801 802
                             export_for_deployment=True,
                             mode=0):
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821
        """
        save inference model for inference.

        Returns:
            None

        Examples:

            .. code-block:: python

                import paddle.distributed.fleet as fleet
                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                fleet.init_server()

        """
T
tangwei12 已提交
822 823 824
        # warnings.warn(
        #     "'save_inference_model' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
        # )
825

826 827 828 829
        self._runtime_handle._save_inference_model(executor, dirname,
                                                   feeded_var_names,
                                                   target_vars, main_program,
                                                   export_for_deployment, mode)
830

Z
zmxdream 已提交
831 832
    @is_non_distributed_check
    @inited_runtime_handler
833
    def save_persistables(self, executor, dirname, main_program=None, mode=0):
        """
        Save all persistable tensors of :code:`main_program` into the folder
        :code:`dirname`.

        The work is delegated to the runtime handle's ``_save_persistables``.

        Args:
            executor(Executor): The executor to run for saving persistable tensors.
                                You can refer to :ref:`api_guide_executor_en` for
                                more details.

            dirname(str): The saving directory path.
                                When you need to save the parameter to the memory, set it to None.
            main_program(Program, optional): The program whose persistable tensors will
                                             be saved. Default: None.
            mode(int, optional): Forwarded unchanged to the runtime handle, which
                                 defines its meaning. Default: 0.


        Returns:
            None

        Examples:

            .. code-block:: text

                import paddle
                paddle.enable_static()
                import paddle.distributed.fleet as fleet

                fleet.init()

                # build net
                # fleet.distributed_optimizer(...)

                exe = paddle.static.Executor(paddle.CPUPlace())
                fleet.save_persistables(exe, "dirname", paddle.static.default_main_program())

        """
        # warnings.warn(
        #     "'save_persistables' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
        # )

        self._runtime_handle._save_persistables(executor, dirname, main_program,
                                                mode)
880

Z
zhaocaibei123 已提交
881 882 883 884 885
    @is_non_distributed_check
    @inited_runtime_handler
    def save_cache_model(self, dirname, **configs):
        """
        Forward a cache-model save request to the runtime handle.

        Args:
            dirname: Passed through as the first argument of the runtime
                handle's ``_save_cache_model``.
            **configs: Extra keyword options, forwarded unchanged.

        Returns:
            Whatever the runtime handle's ``_save_cache_model`` returns.
        """
        runtime = self._runtime_handle
        outcome = runtime._save_cache_model(dirname, **configs)
        return outcome

886
    def shrink(self, threshold=None):
        """
        Forward a shrink request to the runtime handle.

        Args:
            threshold(optional): Passed unchanged to the runtime handle's
                ``_shrink``, which defines its meaning. Default: None.

        Returns:
            None
        """
        self._runtime_handle._shrink(threshold)

889
    def distributed_optimizer(self, optimizer, strategy=None):
        """
        Optimizer for distributed training.

        For the distributed training, this method would rebuild a new instance of DistributedOptimizer.
        Which has basic Optimizer function and special features for distributed training.

        Args:
            optimizer(Optimizer): The user-defined optimizer to be used for
                distributed training.
            strategy(DistributedStrategy): Extra properties for distributed optimizer.
                It is recommended to use DistributedStrategy in fleet.init(). The strategy
                here is for compatibility. If the strategy in fleet.distributed_optimizer()
                is not None, then it will overwrite the DistributedStrategy in fleet.init(),
                which will take effect in distributed training.

        Returns:
            In static-graph mode, the Fleet instance itself. In dygraph mode,
            the original ``optimizer`` when there is at most one worker,
            otherwise a ``HybridParallelOptimizer`` (or a
            ``HeterParallelOptimizer`` when ``heter_ccl_mode`` is enabled in
            the strategy).

        Examples:

            .. code-block:: python

                import paddle
                import paddle.distributed.fleet as fleet
                fleet.init(is_collective=True)
                strategy = fleet.DistributedStrategy()
                optimizer = paddle.optimizer.SGD(learning_rate=0.001)
                optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)

        """
        self.user_defined_optimizer = optimizer

        if strategy is not None:
            if self._is_collective:
                warnings.warn(
                    "It is recommended to use DistributedStrategy "
                    "in fleet.init(). The strategy here is only for compatibility. "
                    "If the strategy in fleet.distributed_optimizer() is "
                    "not None, then it will overwrite the DistributedStrategy in fleet.init(), "
                    "which will take effect in distributed training.")
            # Keep a private copy so later caller-side mutation has no effect.
            self._user_defined_strategy = copy.deepcopy(strategy)

        # Every call starts from an empty context.
        self._context = {}

        # Static graph: the Fleet object itself plays the optimizer role.
        if not paddle.fluid.framework._non_static_mode():
            return self

        # Dygraph with a single worker needs no distributed wrapper.
        if self.worker_num() <= 1:
            return optimizer

        # Equality comparison is kept as in the original code path.
        if self._user_defined_strategy.heter_ccl_mode == False:
            return HybridParallelOptimizer(optimizer, self._hcg,
                                           self._user_defined_strategy)
        return HeterParallelOptimizer(optimizer, self._user_defined_strategy)

945
    @dygraph_only
946
    def distributed_model(self, model):
        """
        Return distributed data parallel model (Only work in dygraph mode)

        The model is optionally decorated for AMP and wrapped for recompute
        according to the user-defined strategy, then wrapped by the parallel
        class that matches the current parallel mode.

        Args:
            model (Layer): the user-defined model which inherits Layer.

        Returns:
            The input ``model`` itself when there is at most one worker;
            otherwise the model after the strategy-selected wrappers have
            been applied.

        Examples:

            .. code-block:: python

                import paddle
                import paddle.nn as nn
                from paddle.distributed import fleet

                class LinearNet(nn.Layer):
                    def __init__(self):
                        super(LinearNet, self).__init__()
                        self._linear1 = nn.Linear(10, 10)
                        self._linear2 = nn.Linear(10, 1)

                    def forward(self, x):
                        return self._linear2(self._linear1(x))

                # 1. initialize fleet environment
                fleet.init(is_collective=True)

                # 2. create layer & optimizer
                layer = LinearNet()
                loss_fn = nn.MSELoss()
                adam = paddle.optimizer.Adam(
                    learning_rate=0.001, parameters=layer.parameters())

                # 3. get data_parallel model using fleet
                adam = fleet.distributed_optimizer(adam)
                dp_layer = fleet.distributed_model(layer)

                # 4. run layer
                inputs = paddle.randn([10, 10], 'float32')
                outputs = dp_layer(inputs)
                labels = paddle.randn([10, 1], 'float32')
                loss = loss_fn(outputs, labels)

                print("loss:", loss.numpy())

                loss.backward()

                adam.step()
                adam.clear_grad()


        """
        # Declared up front: the AMP branch below rebinds this module-level name.
        global _grad_scalar

        assert model is not None, "model should not be None"
        if self.worker_num() <= 1:
            return model

        strategy = self._user_defined_strategy

        def _wrap_data_parallel(layer):
            # Shared by the heter-ccl path and the plain data-parallel path.
            return paddle.DataParallel(
                layer,
                comm_buffer_size=strategy.fuse_grad_size_in_MB,
                last_comm_buffer_size=strategy.last_comm_group_size_MB,
                find_unused_parameters=strategy.find_unused_parameters)

        if strategy.amp == True:
            amp_configs = strategy.amp_configs
            # Pure fp16 corresponds to AMP level "O2"; only then is the model
            # itself decorated.
            if amp_configs['use_pure_fp16']:
                model = paddle.amp.decorate(models=model,
                                            optimizers=None,
                                            level="O2",
                                            master_weight=None,
                                            save_dtype=None)

            # NOTE(review): this rebinds `_grad_scalar` in this module only;
            # presumably the module it was imported from keeps its own
            # binding -- confirm that this is the intended effect.
            _grad_scalar = paddle.amp.GradScaler(
                init_loss_scaling=amp_configs['init_loss_scaling'],
                incr_ratio=amp_configs['incr_ratio'],
                decr_ratio=amp_configs['decr_ratio'],
                incr_every_n_steps=amp_configs['incr_every_n_steps'],
                decr_every_n_nan_or_inf=amp_configs['decr_every_n_nan_or_inf'],
                use_dynamic_loss_scaling=amp_configs[
                    'use_dynamic_loss_scaling'])

        if strategy.recompute == True:
            model = _RecomputeModelWrapper(model)

        # Heterogeneous ccl mode always uses plain DataParallel and skips the
        # parallel-mode dispatch below.
        if strategy.heter_ccl_mode == True:
            return _wrap_data_parallel(model)

        parallel_mode = self._hcg.get_parallel_mode()
        if parallel_mode == ParallelMode.SHARDING_PARALLEL:
            model = ShardingParallel(model, self._hcg, strategy=strategy)
        elif parallel_mode == ParallelMode.DATA_PARALLEL:
            # NOTE (JZ-LIANG) init parameters broadcast within sharding group
            # normally it should be done inside DataParallel
            if self.sharding_degree > 1:
                from paddle.distributed.fleet.utils.hybrid_parallel_util import broadcast_sharding_parameters
                assert self.sharding_degree == self._hcg.get_sharding_parallel_world_size(
                )
                broadcast_sharding_parameters(model, self._hcg)
            model = _wrap_data_parallel(model)
        elif parallel_mode == ParallelMode.TENSOR_PARALLEL:
            model = TensorParallel(model, self._hcg, strategy=strategy)
        elif parallel_mode == ParallelMode.PIPELINE_PARALLEL:
            model = PipelineParallel(model, self._hcg, strategy=strategy)

        return model
1081 1082 1083 1084 1085

    @dygraph_only
    def state_dict(self):
        """
        Fetch the state dict of the user-defined optimizer.
        (Only work in dygraph mode)

        Returns:
            state_dict(dict) : dict contains all the Tensor used by optimizer

        Examples:
            .. code-block:: python

                import numpy as np
                import paddle
                from paddle.distributed import fleet

                fleet.init(is_collective=True)

                value = np.arange(26).reshape(2, 13).astype("float32")
                a = paddle.to_tensor(value)

                layer = paddle.nn.Linear(13, 5)
                adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())

                adam = fleet.distributed_optimizer(adam)
                dp_layer = fleet.distributed_model(layer)
                state_dict = adam.state_dict()
        """
        # The call is served by the optimizer the user handed to fleet.
        inner_optimizer = self.user_defined_optimizer
        return inner_optimizer.state_dict()

    @dygraph_only
    def set_state_dict(self, state_dict):
        """
        Load a state dict into the user-defined optimizer.
        (Only work in dygraph mode)

        Args:
            state_dict(dict) : Dict contains all the Tensor needed by optimizer

        Returns:
            Whatever the wrapped optimizer's ``set_state_dict`` returns.

        Examples:
            .. code-block:: python

                import numpy as np
                import paddle
                from paddle.distributed import fleet

                fleet.init(is_collective=True)

                value = np.arange(26).reshape(2, 13).astype("float32")
                a = paddle.to_tensor(value)

                layer = paddle.nn.Linear(13, 5)
                adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())

                adam = fleet.distributed_optimizer(adam)
                dp_layer = fleet.distributed_model(layer)
                state_dict = adam.state_dict()
                paddle.save(state_dict, "paddle_dy")
                para_state_dict = paddle.load("paddle_dy")
                adam.set_state_dict(para_state_dict)
        """
        # The call is served by the optimizer the user handed to fleet.
        inner_optimizer = self.user_defined_optimizer
        return inner_optimizer.set_state_dict(state_dict)

    @dygraph_only
    def set_lr(self, value):
        """
        Manually set the learning rate on the user-defined optimizer.
        (Only work in dygraph mode)

        Args:
            value (float|Tensor): the value of learning rate

        Returns:
            Whatever the wrapped optimizer's ``set_lr`` returns.

        Examples:
            .. code-block:: python

                import numpy as np
                import paddle
                from paddle.distributed import fleet

                fleet.init(is_collective=True)

                value = np.arange(26).reshape(2, 13).astype("float32")
                a = paddle.to_tensor(value)

                layer = paddle.nn.Linear(13, 5)
                adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())

                adam = fleet.distributed_optimizer(adam)
                dp_layer = fleet.distributed_model(layer)

                lr_list = [0.2, 0.3, 0.4, 0.5, 0.6]
                for i in range(5):
                    adam.set_lr(lr_list[i])
                    lr = adam.get_lr()
                    print("current lr is {}".format(lr))
                # Print:
                #    current lr is 0.2
                #    current lr is 0.3
                #    current lr is 0.4
                #    current lr is 0.5
                #    current lr is 0.6
        """
        # The call is served by the optimizer the user handed to fleet.
        inner_optimizer = self.user_defined_optimizer
        return inner_optimizer.set_lr(value)

    @dygraph_only
    def get_lr(self):
        """
        Read the current-step learning rate from the user-defined optimizer.
        (Only work in dygraph mode)

        Returns:
            float: The learning rate of the current step.

        Examples:

            .. code-block:: python

                import numpy as np
                import paddle
                from paddle.distributed import fleet

                fleet.init(is_collective=True)

                value = np.arange(26).reshape(2, 13).astype("float32")
                a = paddle.to_tensor(value)

                layer = paddle.nn.Linear(13, 5)
                adam = paddle.optimizer.Adam(learning_rate=0.01, parameters=layer.parameters())

                adam = fleet.distributed_optimizer(adam)
                dp_layer = fleet.distributed_model(layer)

                lr = adam.get_lr()
                print(lr) # 0.01
        """
        # The call is served by the optimizer the user handed to fleet.
        inner_optimizer = self.user_defined_optimizer
        return inner_optimizer.get_lr()

    @dygraph_only
    def step(self):
        """
        Run one update step of the user-defined optimizer.
        (Only work in dygraph mode)

        Returns:
            Whatever the wrapped optimizer's ``step`` returns.

        Examples:

            .. code-block:: python

                import paddle
                import paddle.nn as nn
                from paddle.distributed import fleet

                class LinearNet(nn.Layer):
                    def __init__(self):
                        super(LinearNet, self).__init__()
                        self._linear1 = nn.Linear(10, 10)
                        self._linear2 = nn.Linear(10, 1)

                    def forward(self, x):
                        return self._linear2(self._linear1(x))

                # 1. initialize fleet environment
                fleet.init(is_collective=True)

                # 2. create layer & optimizer
                layer = LinearNet()
                loss_fn = nn.MSELoss()
                adam = paddle.optimizer.Adam(
                    learning_rate=0.001, parameters=layer.parameters())

                # 3. get data_parallel model using fleet
                adam = fleet.distributed_optimizer(adam)
                dp_layer = fleet.distributed_model(layer)

                # 4. run layer
                inputs = paddle.randn([10, 10], 'float32')
                outputs = dp_layer(inputs)
                labels = paddle.randn([10, 1], 'float32')
                loss = loss_fn(outputs, labels)

                print("loss:", loss.numpy())

                loss.backward()

                adam.step()
                adam.clear_grad()


        """
        # The call is served by the optimizer the user handed to fleet.
        inner_optimizer = self.user_defined_optimizer
        return inner_optimizer.step()

    @dygraph_only
    def clear_grad(self):
        """
        Clear the gradients of all parameters optimized by the user-defined
        optimizer.
        (Only work in dygraph mode)

        Returns:
            Whatever the wrapped optimizer's ``clear_grad`` returns.

        Examples:

            .. code-block:: python

                import paddle
                import paddle.nn as nn
                from paddle.distributed import fleet

                class LinearNet(nn.Layer):
                    def __init__(self):
                        super(LinearNet, self).__init__()
                        self._linear1 = nn.Linear(10, 10)
                        self._linear2 = nn.Linear(10, 1)

                    def forward(self, x):
                        return self._linear2(self._linear1(x))

                # 1. initialize fleet environment
                fleet.init(is_collective=True)

                # 2. create layer & optimizer
                layer = LinearNet()
                loss_fn = nn.MSELoss()
                adam = paddle.optimizer.Adam(
                    learning_rate=0.001, parameters=layer.parameters())

                # 3. get data_parallel model using fleet
                adam = fleet.distributed_optimizer(adam)
                dp_layer = fleet.distributed_model(layer)

                # 4. run layer
                inputs = paddle.randn([10, 10], 'float32')
                outputs = dp_layer(inputs)
                labels = paddle.randn([10, 1], 'float32')
                loss = loss_fn(outputs, labels)

                print("loss:", loss.numpy())

                loss.backward()

                adam.step()
                adam.clear_grad()

        """
        # The call is served by the optimizer the user handed to fleet.
        inner_optimizer = self.user_defined_optimizer
        return inner_optimizer.clear_grad()

1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358
    def _get_amp_optimizer(self):
        # imitate target optimizer retrieval
        amp_optimizer = None
        for optimizer in self.strategy_compiler._get_applied_meta_optimizer():
            if hasattr(optimizer, 'amp_init'):
                amp_optimizer = optimizer
                break

        if amp_optimizer is None:
            if hasattr(self.user_defined_optimizer, 'amp_init'):
                amp_optimizer = self.user_defined_optimizer

        assert amp_optimizer is not None, \
            "amp_init can only be used when the amp(auto mixed precision) strategy is turned on."
        return amp_optimizer

    def get_loss_scaling(self):
        """Return the real-time loss scaling factor.

        The value is read from the optimizer returned by
        ``_get_amp_optimizer``.

        Returns:
            Whatever that optimizer's ``get_loss_scaling`` returns.

        Raises:
            AssertionError: If no optimizer exposing ``amp_init`` is found.
        """
        amp_optimizer = self._get_amp_optimizer()
        return amp_optimizer.get_loss_scaling()

H
huangxu96 已提交
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423
    def amp_init(self,
                 place,
                 scope=None,
                 test_program=None,
                 use_fp16_test=False):
        """
        Init the amp training, such as cast fp32 parameters to fp16 type.

        The call is forwarded to the optimizer returned by
        ``_get_amp_optimizer``.

        Args:
            place(CUDAPlace): place is used to initialize
                fp16 parameters with fp32 values.
            scope(Scope): The scope is used to find fp32 parameters.
            test_program(Program): The program is used for testing.
            use_fp16_test(bool): Whether to use fp16 testing.

        Returns:
            Whatever the AMP optimizer's ``amp_init`` returns.

        Raises:
            AssertionError: If no optimizer exposing ``amp_init`` is found.

        Examples:
            .. code-block:: python

                import numpy as np
                import paddle
                import paddle.nn.functional as F
                paddle.enable_static()

                def run_example_code():
                    place = paddle.CUDAPlace(0)
                    exe = paddle.static.Executor(place)
                    data = paddle.static.data(name='X', shape=[None, 1, 28, 28], dtype='float32')
                    conv2d = paddle.static.nn.conv2d(input=data, num_filters=6, filter_size=3)
                    # 1) Use fp16_guard to control the range of fp16 kernels used.
                    with paddle.static.amp.fp16_guard():
                        bn = paddle.static.nn.batch_norm(input=conv2d, act="relu")
                        pool = F.max_pool2d(bn, kernel_size=2, stride=2)
                        hidden = paddle.static.nn.fc(pool, size=10)
                        loss = paddle.mean(hidden)
                    # 2) Create the optimizer and set `multi_precision` to True.
                    # Setting `multi_precision` to True can avoid the poor accuracy
                    # or the slow convergence in a way.
                    optimizer = paddle.optimizer.Momentum(learning_rate=0.01, multi_precision=True)
                    # 3) These ops in `custom_black_list` will keep in the float32 computation type.
                    amp_list = paddle.static.amp.CustomOpLists(
                        custom_black_list=['pool2d'])
                    # 4) The entry of Paddle AMP.
                    # Enable pure fp16 training by setting `use_pure_fp16` to True.
                    optimizer = paddle.static.amp.decorate(
                        optimizer,
                        amp_list,
                        init_loss_scaling=128.0,
                        use_dynamic_loss_scaling=True,
                        use_pure_fp16=True)
                    # If you don't use the default_startup_program(), you should pass
                    # your defined `startup_program` into `minimize`.
                    optimizer.minimize(loss)
                    exe.run(paddle.static.default_startup_program())
                    # 5) Use `amp_init` after FP32 parameters initialization(such as `exe.run(startup_program)`).
                    # If you want to perform the testing process, you should pass `test_program` into `amp_init`.
                    optimizer.amp_init(place, scope=paddle.static.global_scope())

                if paddle.is_compiled_with_cuda() and len(paddle.static.cuda_places()) > 0:
                    run_example_code()
        """
        amp_optimizer = self._get_amp_optimizer()
        return amp_optimizer.amp_init(place, scope, test_program, use_fp16_test)
H
huangxu96 已提交
1426

D
Dong Daxiang 已提交
1427 1428 1429 1430 1431 1432 1433 1434 1435
    def _final_strategy(self):
        if "valid_strategy" not in self._context:
            print(
                "WARNING: You may need to call minimize function before this function is called"
            )
            return {}
        else:
            return self._context["valid_strategy"]

1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453
    def _get_applied_meta_list(self):
        if "applied_meta_list" not in self._context:
            print(
                "WARNING: You may need to call minimize function before _get_applied_meta_list called"
            )
            return []
        else:
            return self._context["applied_meta_list"]

    def _get_applied_graph_list(self):
        if "applied_graph_list" not in self._context:
            print(
                "WARNING: You may need to call minimize function before _get_applied_graph_list called"
            )
            return []
        else:
            return self._context["applied_graph_list"]

1454 1455 1456 1457 1458 1459 1460 1461 1462
    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        Add distributed operations to minimize ``loss`` by updating ``parameter_list``.

        A single loss is handled by ``_minimize_impl``. A ``list`` of losses
        is handled by ``_minimize_losses_impl`` and is accepted only in PS mode.

        Args:
            loss (Tensor|list): A ``Tensor`` containing the value to minimize,
                or, in PS mode only, a list of such values.
            startup_program (Program, optional): :ref:`api_fluid_Program` for
                initializing parameters in ``parameter_list``. The default value
                is None, at this time :ref:`api_fluid_default_startup_program` will be used.
            parameter_list (Iterable, optional): Iterable of ``Tensor`` or ``Tensor.name`` to update
                to minimize ``loss``. The default value is None, at this time all parameters
                will be updated.
            no_grad_set (set, optional): Set of ``Tensor``  or ``Tensor.name`` that don't need
                to be updated. The default value is None.

        Returns:
            tuple: tuple (optimize_ops, params_grads), A list of operators appended
            by minimize and a list of (param, grad) tensor pairs, param is
            ``Parameter``, grad is the gradient value corresponding to the parameter.
            The returned tuple can be passed to ``fetch_list`` in ``Executor.run()`` to
            indicate program pruning. If so, the program will be pruned by ``feed`` and
            ``fetch_list`` before run, see details in ``Executor``.

        Raises:
            ValueError: If ``loss`` is a list while running in dygraph mode,
                in non-distributed mode, or in collective mode.

        Examples:

            .. code-block:: python

                import paddle
                paddle.enable_static()
                import paddle.distributed.fleet as fleet
                import paddle.nn.functional as F

                hid_dim = 10
                label_dim = 2
                input_x = paddle.static.data(name='x', shape=[None, 13], dtype='float32')
                input_y = paddle.static.data(name='y', shape=[None, 1], dtype='int64')
                fc_1 = paddle.static.nn.fc(x=input_x, size=hid_dim, activation='tanh')
                fc_2 = paddle.static.nn.fc(x=fc_1, size=hid_dim, activation='tanh')
                prediction = paddle.static.nn.fc(x=[fc_2], size=label_dim, activation='softmax')
                cost = F.cross_entropy(input=prediction, label=input_y)
                avg_cost = paddle.mean(x=cost)

                fleet.init(is_collective=True)
                strategy = fleet.DistributedStrategy()
                optimizer = paddle.optimizer.SGD(learning_rate=0.001)
                optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
                optimizer.minimize(avg_cost)

                # for more examples, please reference https://github.com/PaddlePaddle/FleetX

        """
        # Common case: a single loss value.
        if not isinstance(loss, list):
            return self._minimize_impl(loss, startup_program, parameter_list,
                                       no_grad_set)

        # A list of losses is rejected outside PS mode.
        if (paddle.fluid.framework._non_static_mode()
                or self._role_maker._is_non_distributed()
                or self._is_collective):
            raise ValueError("loss can be list only in PS mode")
        return self._minimize_losses_impl(loss, startup_program,
                                          parameter_list, no_grad_set)

    def _minimize_impl(self,
                       loss,
                       startup_program=None,
                       parameter_list=None,
                       no_grad_set=None):
D
Dong Daxiang 已提交
1524 1525 1526
        context = {}
        context["user_defined_strategy"] = copy.deepcopy(
            self._user_defined_strategy)
J
Jiabin Yang 已提交
1527
        if paddle.fluid.framework._non_static_mode():
1528 1529
            # imitate target optimizer retrieval
            target_opt = self.user_defined_optimizer
D
Dong Daxiang 已提交
1530
            self._context = context
1531 1532
            return target_opt.minimize(loss)

1533 1534
        # cache original feed forward program
        self.origin_main_program = loss.block.program
B
Baibaifan 已提交
1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550
        # add distributed attr
        if not hasattr(self.origin_main_program, "distributed_info_"):
            setattr(self.origin_main_program, "distributed_info_", dict())
            self.origin_main_program.distributed_info_[
                "dp_degree"] = self._user_defined_strategy.sharding_configs[
                    "dp_degree"]
            self.origin_main_program.distributed_info_[
                "mp_degree"] = self._user_defined_strategy.sharding_configs[
                    "mp_degree"]
            self.origin_main_program.distributed_info_[
                "pp_degree"] = self._user_defined_strategy.sharding_configs[
                    "pp_degree"]
            self.origin_main_program.distributed_info_[
                "sharding_degree"] = self._user_defined_strategy.sharding_configs[
                    "sharding_degree"]

1551
        context["origin_main_program"] = self.origin_main_program
1552
        context["origin_main_programs"] = [self.origin_main_program]
1553
        context["loss"] = loss
1554 1555
        if startup_program == None:
            self.origin_startup_program = \
1556 1557
                paddle.static.default_startup_program().clone(for_test=False)
            startup_program = paddle.static.default_startup_program()
1558 1559 1560
        else:
            self.origin_startup_program = \
                startup_program.clone(for_test=False)
1561

1562
        context["origin_startup_program"] = startup_program
1563
        context["origin_startup_programs"] = [startup_program]
1564
        context["role_maker"] = self._role_maker
1565

1566
        # Use the auto-parallel's routines instead
1567
        if self._user_defined_strategy.semi_auto or self._user_defined_strategy.auto_search:
1568 1569 1570 1571
            from ...auto_parallel.parallelizer import AutoParallelizer
            auto_parallelizer = AutoParallelizer(self)
            optimize_ops, params_grads, dist_startup_prog, dist_main_prog = auto_parallelizer.parallelize(
                loss, startup_program, parameter_list, no_grad_set)
1572

1573 1574
            return optimize_ops, params_grads, dist_startup_prog, dist_main_prog

1575 1576 1577 1578
        # compile time
        distributed_optimizer_list = \
            MetaOptimizerFactory()._get_valid_meta_optimizers(
                self.user_defined_optimizer)
D
Dong Daxiang 已提交
1579

D
Dong Daxiang 已提交
1580 1581 1582
        context["user_defined_strategy"] = copy.deepcopy(
            self._user_defined_strategy)
        copy_user_defined_strategy = copy.deepcopy(self._user_defined_strategy)
1583 1584 1585 1586 1587 1588

        # trigger the auto-parallel in very strict condition
        # strategy = DistributedStrategy()
        # strategy.auto = True
        # optimizer = paddle.optimizer.SGD(learning_rate=0.1)
        # optimizer = fleet.distributed_optimizer(optimizer, strategy)
D
Dong Daxiang 已提交
1589
        if copy_user_defined_strategy._is_strict_auto():
1590 1591
            # turn on all the strategy for each optimizer
            for opt in distributed_optimizer_list:
D
Dong Daxiang 已提交
1592
                opt._enable_strategy(copy_user_defined_strategy, context)
1593

1594 1595
        valid_optimizer_list = []
        valid_graph_optimizer_list = []
D
Dong Daxiang 已提交
1596
        can_not_apply_optimizer_list = []
1597 1598 1599 1600
        # recall meta optimizers for ranking
        for opt in distributed_optimizer_list:
            opt._set_basic_info(loss, self._role_maker,
                                self.user_defined_optimizer,
D
Dong Daxiang 已提交
1601
                                copy_user_defined_strategy)
1602 1603
            if opt._can_apply() and not opt._is_graph_out():
                valid_optimizer_list.append(opt)
D
Dong Daxiang 已提交
1604
            elif opt._can_apply() and opt._is_graph_out():
1605
                valid_graph_optimizer_list.append(opt)
D
Dong Daxiang 已提交
1606 1607
            else:
                can_not_apply_optimizer_list.append(opt)
1608
        # combine recalled meta optimizers to be a valid meta optimizer
D
Dong Daxiang 已提交
1609
        meta_optimizer, graph_optimizer = \
1610 1611
            self.strategy_compiler.generate_optimizer(
                loss, self._role_maker, self.user_defined_optimizer,
D
Dong Daxiang 已提交
1612
                copy_user_defined_strategy, valid_optimizer_list,
1613
                valid_graph_optimizer_list)
D
Dong Daxiang 已提交
1614

D
Dong Daxiang 已提交
1615
        valid_strategy = self.strategy_compiler._get_valid_strategy(
D
Dong Daxiang 已提交
1616 1617 1618
            copy_user_defined_strategy, can_not_apply_optimizer_list)

        context["valid_strategy"] = copy.deepcopy(valid_strategy)
1619 1620
        # print("valid_strategy:", context["valid_strategy"])
        # print("user_defined_strategy:", context["user_defined_strategy"])
1621

1622 1623 1624 1625 1626 1627
        applied_meta_list = self.strategy_compiler._get_applied_meta_list()
        applied_graph_list = self.strategy_compiler._get_applied_graph_list()

        context['applied_meta_list'] = applied_meta_list
        context['applied_graph_list'] = applied_graph_list

D
Dong Daxiang 已提交
1628
        self._context = context
1629

D
Dong Daxiang 已提交
1630
        self.valid_strategy = valid_strategy
1631
        self.valid_strategy._enable_env()
D
Dong Daxiang 已提交
1632

1633 1634
        optimize_ops = []
        params_grads = []
1635

1636 1637 1638 1639 1640 1641 1642 1643
        if self._role_maker._is_non_distributed() and not self._is_collective:
            if self._runtime_handle is None:
                self._runtime_handle = RuntimeFactory()._create_runtime(context)

            compiled_program = compiler.CompiledProgram(
                self.origin_main_program).with_data_parallel(
                    loss_name=loss.name, share_vars_from=None)
            loss.block.program._graph = compiled_program
1644 1645 1646 1647
            return self.user_defined_optimizer.minimize(loss,
                                                        startup_program,
                                                        parameter_list,
                                                        no_grad_set=no_grad_set)
1648

1649
        if meta_optimizer:
1650
            # print("before minimize program id:", id(loss.block.program))
1651
            optimize_ops, params_grads = meta_optimizer.minimize(
M
MRXLT 已提交
1652
                loss, startup_program, parameter_list, no_grad_set=no_grad_set)
1653
            # print("after minimize program id:", id(loss.block.program))
1654

1655
            default_program = paddle.static.default_main_program()
1656
            # print("default program id:", id(default_program))
1657 1658 1659

            if id(default_program) != id(loss.block.program):
                paddle.fluid.framework.switch_main_program(loss.block.program)
1660
            # print("default program id after switch:", id(default_program))
1661

1662 1663
        else:
            optimize_ops, params_grads = self.user_defined_optimizer.minimize(
M
MRXLT 已提交
1664
                loss, startup_program, parameter_list, no_grad_set=no_grad_set)
1665

1666 1667
        context["program_optimize_ops"] = optimize_ops
        context["program_params_grads"] = params_grads
1668

1669
        if graph_optimizer:
1670
            # print("before graph minimize program id:", id(loss.block.program))
D
Dong Daxiang 已提交
1671
            optimize_ops, params_grads = graph_optimizer.minimize(
M
MRXLT 已提交
1672
                loss, startup_program, parameter_list, no_grad_set=no_grad_set)
1673 1674 1675 1676
            # since we do not encourage users to use graph operations
            # if a graph optimizer takes effect, mostly
            # optimizers_ops and params_grads are None
            # i.e. users can not modify current computation graph anymore
1677 1678
            context["graph_optimize_ops"] = optimize_ops
            context["graph_optimize_grads"] = params_grads
1679 1680
        else:
            apply_ir_passes(loss.block.program, startup_program, self)
1681

1682 1683
        if not self._role_maker._is_heter_parameter_server_mode:
            program = paddle.static.default_main_program()
1684 1685 1686 1687 1688
            opt_info = {} if program._fleet_opt is None else program._fleet_opt
            opt_info["mpi_size"] = self.worker_num()
            opt_info["mpi_rank"] = self.worker_index()
            for k, v in self._user_defined_strategy.trainer_desc_configs.items(
            ):
1689
                if v or k not in opt_info:
1690
                    opt_info[k] = v
1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761
            program._fleet_opt = opt_info

        if self._runtime_handle is None:
            self._runtime_handle = RuntimeFactory()._create_runtime(context)

        import paddle.distributed.fleet as fleet
        fleet.util._set_strategy(context["valid_strategy"])

        return optimize_ops, params_grads

    def _minimize_losses_impl(self,
                              losses,
                              startup_programs=None,
                              parameter_list=None,
                              no_grad_set=None):
        """
        Minimize a list of losses through ``ParameterServerOptimizer``.

        Builds the fleet context from the programs that own ``losses``,
        delegates the optimization to
        ``ParameterServerOptimizer.minimize_losses_impl``, writes the trainer
        options into ``_fleet_opt`` of every loss program and creates the
        runtime handle if it does not exist yet.

        Args:
            losses (list): Loss variables; ``losses[0]`` provides the program
                cached as ``self.origin_main_program``.
            startup_programs (list, optional): Startup programs passed on to
                the optimizer. May only be None when ``losses`` holds exactly
                one loss, in which case the default startup program is used.
            parameter_list (optional): Forwarded unchanged to
                ``ParameterServerOptimizer.minimize_losses_impl``.
            no_grad_set (optional): Forwarded unchanged to
                ``ParameterServerOptimizer.minimize_losses_impl``.

        Returns:
            tuple: ``(optimize_ops, params_grads)`` exactly as returned by
            ``ParameterServerOptimizer.minimize_losses_impl``.

        Raises:
            ValueError: If ``startup_programs`` is None and ``losses`` does
                not contain exactly one loss.
        """
        context = {}

        # cache original feed forward program
        self.origin_main_program = losses[0].block.program
        context["origin_main_program"] = self.origin_main_program
        context["origin_main_programs"] = [
            loss.block.program for loss in losses
        ]
        context["loss"] = losses

        if startup_programs is None:
            if len(losses) != 1:
                raise ValueError(
                    "startup_program can't be None when loss is list.")
            startup_programs = [paddle.static.default_startup_program()]

        # keep a clone of the first startup program; the context itself
        # refers to the programs that were passed in (not to clones)
        self.origin_startup_program = startup_programs[0].clone(for_test=False)
        context["origin_startup_program"] = startup_programs[0]
        context["origin_startup_programs"] = list(startup_programs)

        context["role_maker"] = self._role_maker

        # two independent deep copies: one kept as the user's strategy, one
        # used as the valid strategy (no strategy compilation on this path)
        context["user_defined_strategy"] = copy.deepcopy(
            self._user_defined_strategy)
        context["valid_strategy"] = copy.deepcopy(self._user_defined_strategy)

        self._context = context

        self.valid_strategy = context["valid_strategy"]
        self.valid_strategy._enable_env()

        from ..meta_optimizers import ParameterServerOptimizer
        ps_optimizer = ParameterServerOptimizer(self.user_defined_optimizer)
        ps_optimizer._set_basic_info(losses, self._role_maker,
                                     self.user_defined_optimizer,
                                     self._user_defined_strategy)
        optimize_ops, params_grads = ps_optimizer.minimize_losses_impl(
            losses, startup_programs, parameter_list, no_grad_set=no_grad_set)

        context["program_optimize_ops"] = optimize_ops
        context["program_params_grads"] = params_grads

        trainer_desc_configs = self._user_defined_strategy.trainer_desc_configs
        for loss in losses:
            program = loss.block.program
            opt_info = {} if program._fleet_opt is None else program._fleet_opt
            opt_info["mpi_size"] = self.worker_num()
            opt_info["mpi_rank"] = self.worker_index()
            for k, v in trainer_desc_configs.items():
                # a truthy strategy value always overrides; a falsy one is
                # only used to fill in a key that is still missing
                if v or k not in opt_info:
                    opt_info[k] = v
            program._fleet_opt = opt_info

        if self._runtime_handle is None:
            self._runtime_handle = RuntimeFactory()._create_runtime(context)

        import paddle.distributed.fleet as fleet
        fleet.util._set_strategy(context["valid_strategy"])

        return optimize_ops, params_grads
1778 1779 1780

    @dygraph_only
    def distributed_scaler(self, scaler):
        """
        Adapt a gradient scaler for tensor-parallel / pipeline-parallel runs.

        When the hybrid communicate group reports ``TENSOR_PARALLEL`` or
        ``PIPELINE_PARALLEL`` mode, ``scaler._unscale`` is replaced by a
        method that unscales the FP16 and FP32 gradients separately and then
        all-reduces the found-inf flag with ``ReduceOp.MAX``. In any other
        mode the scaler is returned untouched.

        Args:
            scaler: The scaler object to adapt. It is modified in place; the
                replacement method reads its ``_enable`` and ``_scale``
                attributes and writes ``_found_inf``.

        Returns:
            The same ``scaler`` object that was passed in.
        """

        # NOTE: ``self`` inside ``unscale_method`` is the scaler it gets bound
        # to through ``MethodType`` below, not the fleet object.
        def unscale_method(self, optimizer):
            if not self._enable:
                return

            fp16_dtype = core.VarDesc.VarType.FP16
            fp32_dtype = core.VarDesc.VarType.FP32
            param_grads_fp16 = []
            param_grads_fp32 = []

            param_groups = getattr(optimizer, '_param_groups', None)
            if param_groups and isinstance(param_groups[0], dict):
                for group in param_groups:
                    for param in group['params']:
                        grad = param._grad_ivar()
                        if grad is None:
                            continue
                        # every gradient that is not FP16 goes to the
                        # FP32 list in this branch
                        if grad.dtype == fp16_dtype:
                            param_grads_fp16.append(grad)
                        else:
                            param_grads_fp32.append(grad)
            else:
                for param in optimizer._parameter_list:
                    grad = param._grad_ivar()
                    if grad is None:
                        continue
                    # NOTE(review): unlike the param-group branch, gradients
                    # that are neither FP16 nor FP32 are skipped here and so
                    # stay scaled - confirm this asymmetry is intended.
                    if grad.dtype == fp16_dtype:
                        param_grads_fp16.append(grad)
                    elif grad.dtype == fp32_dtype:
                        param_grads_fp32.append(grad)

            temp_found_inf_fp16 = to_variable(np.array([0]).astype(np.bool_))
            temp_found_inf_fp32 = to_variable(np.array([0]).astype(np.bool_))
            # the gradient lists are passed as both input and output
            if param_grads_fp16:
                _C_ops.check_finite_and_unscale(param_grads_fp16, self._scale,
                                                param_grads_fp16,
                                                temp_found_inf_fp16)
            if param_grads_fp32:
                _C_ops.check_finite_and_unscale(param_grads_fp32, self._scale,
                                                param_grads_fp32,
                                                temp_found_inf_fp32)

            self._found_inf = 1 if temp_found_inf_fp16 or temp_found_inf_fp32 else 0
            is_found_inf = paddle.to_tensor([self._found_inf], dtype="int32")

            # TODO(shenliang03) Since dp allreduce in the optimizer is
            # after the gradscaler, check_finite needs to synchronize global
            # information. In the future, we should use check_group to speed.
            paddle.distributed.all_reduce(is_found_inf,
                                          op=paddle.distributed.ReduceOp.MAX,
                                          group=None)
            self._found_inf = is_found_inf.numpy()[0]

        # Only tensor_parallel and pipeline_parallel need to modify scaler
        if self._hcg.get_parallel_mode() in (ParallelMode.TENSOR_PARALLEL,
                                             ParallelMode.PIPELINE_PARALLEL):
            scaler._unscale = MethodType(unscale_method, scaler)

        return scaler