parallelizer.py 5.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
#   Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Z
zhaoyingli 已提交
15
import logging
16
import paddle
Z
zhaoyingli 已提交
17
from paddle.distributed.utils import get_logger
18
from paddle.distributed.fleet import cloud_utils
19
import paddle.fluid.core as core
20 21
from .dist_context import DistributedContext
from .dist_context import get_default_distributed_context
22
from .dist_context import set_default_distributed_context
C
caozhou 已提交
23
from .completion import complete_annotation, complete_backward_annotation
24
from .partitioner import Partitioner
25
from .process_group import get_all_process_groups
26
from .utils import make_data_unshard
Z
zhaoyingli 已提交
27
from .utils import set_grad_var_shape
C
caozhou 已提交
28
from .reshard import reshard
Z
zhaoyingli 已提交
29 30 31
# from .auto_search import auto_search

# Module-level logger shared by this file, obtained from get_logger with
# the logging.INFO level.
_logger = get_logger(logging.INFO)
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47


class AutoParallelizer:
    """
    Main controller class that drives the auto parallel process.

    The process is triggered through the `parallelize` method. To do its
    work the controller keeps the Fleet object it was built from, the
    optimizer and the distributed strategy read from that Fleet object, and
    a newly created DistributedContext holding the context information.
    """

    def __init__(self, fleet):
        """
        Keep `fleet` and cache the pieces that later steps read from it.

        Args:
            fleet: object exposing `user_defined_optimizer` and
                `_user_defined_strategy`.
        """
        self._fleet = fleet
        self._optimizer = fleet.user_defined_optimizer
        self._dist_strategy = fleet._user_defined_strategy
        self._dist_context = DistributedContext()

    def _remove_distributed_attrs(self, main_program):
        """
        Remove from every op of `main_program` each attribute whose name
        contains the auto parallel suffix.

        Args:
            main_program: program whose blocks and ops are visited; its ops
                are modified in place.
        """
        marker = core.kAutoParallelSuffix()
        # Only ops are handled here: distributed attributes for variables
        # have been removed in a previous step.
        for blk in main_program.blocks:
            for operator in blk.ops:
                for name in operator.attr_names:
                    if marker not in name:
                        continue
                    operator._remove_attr(name)

    def parallelize(self,
                    loss,
                    startup_program,
                    parameter_list=None,
                    no_grad_set=None):
        """
        Run the auto parallel pipeline on the program that owns `loss`.

        Steps, in order: annotation completion, forward partition, backward
        and optimizer application, grad var shape fix-up, removal of
        distributed attributes, data unsharding, resharding, instantiation
        of the process groups that contain the current rank, and publishing
        the distributed context as the default one.

        Args:
            loss: loss variable; the main program is taken from
                `loss.block.program`.
            startup_program: startup program; must not be None.
            parameter_list: accepted but not used by this method.
            no_grad_set: accepted but not used by this method.

        Returns:
            A tuple `(dist_optimize_ops, dist_params_grads,
            partitioned_startup_prog, partitioned_main_prog)`.

        Raises:
            AssertionError: if `startup_program` is None.
            NotImplementedError: if the strategy's `auto_search` is set.
        """
        assert startup_program is not None
        serial_main_prog = loss.block.program
        strategy = self._dist_strategy
        context = self._dist_context

        if strategy.auto_search:
            # Auto search is not wired in yet (its import is commented out
            # at the top of the file), so this path always fails.
            _logger.info("Start search dist attr.")
            raise NotImplementedError("Auto search has not implemented")

        # Annotation completion
        _logger.info("Start annotation dist attr.")
        annotated_main_prog = complete_annotation(serial_main_prog, context)

        # Logical partition
        rank = paddle.distributed.get_rank()
        partitioner = Partitioner(strategy, context, rank)
        forward_result = partitioner.transpile_forward(annotated_main_prog,
                                                       startup_program)
        dist_main_prog, dist_startup_prog = forward_result
        params_and_grads = partitioner.apply_backward(
            loss, annotated_main_prog, startup_program, dist_main_prog,
            dist_startup_prog)
        optimize_ops = partitioner.apply_optimize(
            self._optimizer, params_and_grads, dist_main_prog,
            dist_startup_prog)

        # Set the grad var shape
        set_grad_var_shape(dist_main_prog, context)

        # The last step: remove all distributed attributes to be compatible
        # with inference.
        self._remove_distributed_attrs(dist_main_prog)
        make_data_unshard(dist_main_prog, dist_startup_prog, context)

        reshard(dist_main_prog, dist_startup_prog, rank, context)

        # Instantiate communication only for the process groups that
        # include the current rank.
        for group in get_all_process_groups():
            if rank in group.ranks:
                group.instantiate()

        # Copy distributed info to the default context
        set_default_distributed_context(context)

        return (optimize_ops, params_and_grads, dist_startup_prog,
                dist_main_prog)