parallelizer_v2.py 12.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
#   Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
16 17
import time
import logging
18 19 20

from paddle.fluid import program_guard
from paddle.fluid.backward import append_backward
21
from paddle.fluid.framework import unique_name
22 23 24 25
from paddle.distributed.passes import new_pass

from .reshard import Resharder
from .partitioner import Partitioner
26
from .utils import set_grad_var_shape
27
from .utils import get_logger
28
from .process_group import get_world_process_group
29 30 31


class Parallelizer:
32

33 34 35 36
    def __init__(self, mode, completer, dist_context):
        self._mode = mode
        self._completer = completer
        self._dist_context = dist_context
37
        assert self._dist_context._is_initialized
38 39
        self._pass_context = self._dist_context.pass_context
        self._strategy = self._dist_context.strategy
40
        self._logger = get_logger(logging.INFO)
41 42 43 44 45

    def parallel_all(self):
        world_process_group = get_world_process_group()
        all_ranks = world_process_group.ranks
        for rank in all_ranks:
46
            # self._dist_context._backup(serial=True, dist=True)
47
            self.parallel(rank)
48
            # self._dist_context._restore(serial=True, dist=True)
49 50 51 52 53 54 55

    def parallel(self, rank):
        serial_main_program = self._dist_context.serial_main_program
        serial_startup_program = self._dist_context.serial_startup_program
        serial_optimizer = self._dist_context.serial_optimizer
        if self._mode == "train" and serial_optimizer:
            # Generate backward
56
            serial_loss = self._dist_context.serial_loss
57 58 59
            params_grads = self._generate_backward(serial_main_program,
                                                   serial_startup_program,
                                                   serial_loss)
60
            # Apply pre optimization passes
61
            time0 = time.time()
62 63 64
            serial_main_program, serial_startup_program, params_grads = self._apply_pre_optimization(
                serial_main_program, serial_startup_program, serial_loss,
                serial_optimizer, params_grads)
65 66 67
            self._logger.info(
                "within parallel apply_pre_optimization time: {}, mode {}".
                format(time.time() - time0, self._mode))
68
            # Do logical partition
69
            time0 = time.time()
70 71 72
            partitioner = Partitioner(self._dist_context, rank)
            dist_main_prog, dist_startup_prog, dist_params_grads = partitioner.partition(
                serial_main_program, serial_startup_program, params_grads)
73 74 75
            self._logger.info(
                "within parallel partitioner time: {}, mode {}".format(
                    time.time() - time0, self._mode))
76
            # Generate optimizer
77
            time0 = time.time()
78 79
            self._generate_optimizer(dist_main_prog, dist_startup_prog,
                                     serial_optimizer, dist_params_grads)
80 81 82
            self._logger.info(
                "within parallel optimizer time: {}, mode {}".format(
                    time.time() - time0, self._mode))
83
            # Do reshard process
84
            time0 = time.time()
85 86 87 88
            set_grad_var_shape(dist_main_prog, self._dist_context)
            resharder = Resharder(dist_main_prog, dist_startup_prog, rank,
                                  self._dist_context, dist_params_grads)
            resharder.reshard()
89 90 91
            self._logger.info(
                "within parallel reshard time: {}, mode {}".format(
                    time.time() - time0, self._mode))
92
            # Apply post optimization passes
93
            time0 = time.time()
94 95
            self._apply_post_optimization(dist_main_prog, dist_startup_prog,
                                          rank, dist_params_grads)
96 97 98
            self._logger.info(
                "within parallel apply_post_optimization time: {}, mode {}".
                format(time.time() - time0, self._mode))
99 100
        else:
            # Apply pre optimization passes
101 102 103 104 105 106 107
            time0 = time.time()
            self._apply_pre_optimization(serial_main_program,
                                         serial_startup_program, None, None,
                                         None)
            self._logger.info(
                "within parallel apply_pre_optimization time: {}, mode {}".
                format(time.time() - time0, self._mode))
108
            # Do logical partition
109
            time0 = time.time()
110 111 112 113
            partitioner = Partitioner(self._dist_context, rank)
            dist_main_prog, dist_startup_prog, dist_params_grads = partitioner.partition(
                serial_main_program, serial_startup_program, [])
            # Do reshard process
114 115 116 117
            self._logger.info(
                "within parallel partitioner time: {}, mode {}".format(
                    time.time() - time0, self._mode))
            time0 = time.time()
118 119 120
            resharder = Resharder(dist_main_prog, dist_startup_prog, rank,
                                  self._dist_context, [], 1)
            resharder.reshard()
121 122 123
            self._logger.info(
                "within parallel reshard time: {}, mode {}".format(
                    time.time() - time0, self._mode))
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
        # Clone program for test
        if self._mode != 'train':
            dist_main_prog = dist_main_prog.clone(for_test=True)
            dist_startup_prog = dist_startup_prog.clone(for_test=True)

        # Store the distributed programs for further usages
        self._dist_context.dist_main_programs[rank] = dist_main_prog
        self._dist_context.dist_startup_programs[rank] = dist_startup_prog

    def _generate_backward(self, main_program, startup_program, loss):
        with program_guard(main_program, startup_program):
            params_grads = append_backward(
                loss, distop_context=self._dist_context.dist_op_context)
        self._completer.complete_backward_annotation(main_program)
        self._dist_context.block_state.parse_backward_blocks(main_program)
        return params_grads

    def _generate_optimizer(self, main_program, startup_program, optimizer,
                            params_grads):
143 144
        # NOTE: `apply_gradients` will add an Accumulator for a parameter only once,
        # but optimizer will be called repeatedly in re-launch, so optimizer need to be copied.
145
        optimizer = copy.deepcopy(optimizer)
146
        self._dist_context._lr_optimizer = optimizer
147
        with program_guard(main_program, startup_program):
148 149
            with unique_name.guard("opt_"):
                optimizer_ops = optimizer.apply_gradients(params_grads)
150 151 152 153 154 155 156
        self._completer.complete_update_annotation(main_program)
        return optimizer_ops

    def _apply_pre_optimization(self, main_program, startup_program, loss,
                                optimizer, params_grads):
        if self._strategy is None:
            return
157 158 159

        # apply quantization pass
        # The pass can be applied when mode must be 'train'
160 161
        if self._mode == 'train' and self._strategy.qat.enable:
            config = copy.deepcopy(self._strategy.qat.to_dict())
162 163 164 165 166 167 168 169 170 171 172
            config["dist_context"] = self._dist_context
            config["params_grads"] = params_grads
            auto_parallel_quantization_pass = new_pass(
                "auto_parallel_quantization", config)
            auto_parallel_quantization_pass.apply([main_program],
                                                  [startup_program],
                                                  self._pass_context)
            main_program = self._pass_context.get_attr("main_program")
            startup_program = self._pass_context.get_attr("startup_program")
            params_grads = self._pass_context.get_attr("params_grads")

173
        # apply amp pass
174 175
        # FIXME we disenable amp for eval since it has a little bug with
        # eval program and which will be fixed in future
176
        if self._strategy.amp.enable:
177
            config = copy.deepcopy(self._strategy.amp.to_dict())
178 179 180 181 182 183 184 185
            config["dist_context"] = self._dist_context
            config["params_grads"] = params_grads
            config["loss"] = loss
            config["input_data"] = self._dist_context.serial_feed_vars["inputs"] \
                + self._dist_context.serial_feed_vars["labels"]
            if config["use_pure_fp16"]:
                config["base_opt"] = optimizer
                auto_parallel_fp16_pass = new_pass("auto_parallel_fp16", config)
186 187
                auto_parallel_fp16_pass.apply([main_program], [startup_program],
                                              self._pass_context)
188
                loss = auto_parallel_fp16_pass.get_loss()
189 190 191 192
            else:
                auto_parallel_amp_pass = new_pass("auto_parallel_amp", config)
                auto_parallel_amp_pass.apply([main_program], [startup_program],
                                             self._pass_context)
193
                loss = auto_parallel_amp_pass.get_loss()
194 195

        # apply recompute pass
196
        # recompute is then train-only optimization
197 198
        if self._mode == "train" and self._strategy.recompute.enable:
            config = copy.deepcopy(self._strategy.recompute.to_dict())
199 200 201 202 203
            config["dist_context"] = self._dist_context
            config["no_grad_set"] = None
            config["loss"] = loss
            auto_parallel_recompute_pass = new_pass("auto_parallel_recompute",
                                                    config)
204 205
            auto_parallel_recompute_pass.apply([main_program],
                                               [startup_program],
206
                                               self._pass_context)
207

208 209
        return main_program, startup_program, params_grads

210 211 212 213
    def _apply_post_optimization(self, main_program, startup_program, rank,
                                 params_grads):
        if self._strategy is None:
            return
214 215 216 217 218

        # data parallel optimization
        config = {}
        config["dist_context"] = self._dist_context
        config["global_rank"] = rank
219
        config["use_sharding"] = self._strategy.sharding.enable
220 221 222
        dp_pass = new_pass("auto_parallel_data_parallel_optimization", config)
        dp_pass.apply([main_program], [startup_program], self._pass_context)

223 224
        if self._strategy.sharding.enable:
            config = copy.deepcopy(self._strategy.sharding.to_dict())
225 226 227 228 229
            config["dist_context"] = self._dist_context
            config["params_grads"] = params_grads
            config["global_rank"] = rank
            auto_parallel_sharding_pass = new_pass("auto_parallel_sharding",
                                                   config)
230
            auto_parallel_sharding_pass.apply([main_program], [startup_program],
231
                                              self._pass_context)
232
            params_grads = self._pass_context.get_attr("params_grads")
233

234 235
        # GradClip is train-only optimization
        if self._mode == "train":
236
            config = copy.deepcopy(self._strategy.sharding.to_dict())
237 238 239 240 241 242 243 244 245
            config["dist_context"] = self._dist_context
            config["params_grads"] = params_grads
            config["rank_id"] = rank
            auto_parallel_clip_pass = new_pass("auto_parallel_grad_clip",
                                               config)
            auto_parallel_clip_pass.apply([main_program], [startup_program],
                                          self._pass_context)

        # gradient_merge is then train-only optimization
246 247
        if self._mode == "train" and self._strategy.gradient_merge.enable:
            config = copy.deepcopy(self._strategy.gradient_merge.to_dict())
248 249 250 251
            config["dist_context"] = self._dist_context
            config["params_grads"] = params_grads
            auto_parallel_gradient_merge_pass = new_pass(
                "auto_parallel_gradient_merge_pass", config)
252 253
            auto_parallel_gradient_merge_pass.apply([main_program],
                                                    [startup_program],
254
                                                    self._pass_context)