# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License

15
import abc
16
from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole
17
from ..dist_attribute import OperatorDistributedAttribute
18
from ..utils import _get_comm_group, _get_corresponding_rank, is_optimize_op
19
from ..process_group import new_process_group
20

# Registry of distributed operator impl containers, keyed by op type.
# Filled by register_distributed_operator_impl_container().
_g_distributed_operator_impl_containers = {}

# Op types treated as elementwise by is_elementwise_op(), in addition to any
# op type whose name contains the substring "elementwise".
_g_elementwise_ops = [
    "elementwise",
    "gelu",
    "dropout",
    "cast",
    "gather",
    "concat",
    "fused_softmax_mask_upper_triangle",
]

# Dist op types flagged as backward-only. Not referenced in this part of the
# module; presumably ops with no forward counterpart (TODO confirm at use site).
BACKWARD_ONLY_DIST_OPS = {'check_finite_and_unscale', 'update_loss_scaling'}


class ParallelMode:
    """
    Parallel-mode tags for communication or auxiliary operators.

    The values are plain strings. In this module, '/' + DataParallel is set as
    the ``op_namescope`` attribute of inserted gradient allreduce/scale ops.

    NOTE: ``PipelineParalel`` (sic) keeps its misspelled name and value;
    renaming it would break any existing references.
    """
    DataParallel = "auto_parallel/data_parallel"
    ModelParallel = "auto_parallel/model_parallel"
    PipelineParalel = "auto_parallel/pipeline_paralel"
    MoEParallel = "auto_parallel/moe_parallel"


def is_elementwise_op(op_type):
    """Return True if ``op_type`` should be handled as an elementwise op.

    An op type qualifies when it is listed in ``_g_elementwise_ops`` or when
    its name contains the substring ``"elementwise"``.
    """
    return op_type in _g_elementwise_ops or "elementwise" in op_type


class DistributedOperatorImplContainer:
    """Holds the distributed implementations registered for one op type.

    Implementations are kept in registration order, and each one gets its
    position in that order assigned to its ``idx`` when registered.
    """

    def __init__(self, op_type):
        self._type = op_type
        self._impls = []

    @property
    def type(self):
        """The op type this container serves."""
        return self._type

    @type.setter
    def type(self, op_type):
        self._type = op_type

    @property
    def impls(self):
        """The registered implementations, in registration order."""
        return self._impls

    def register_impl(self, dist_impl):
        """Append ``dist_impl`` and set its ``idx`` to its list position.

        Raises:
            AssertionError: if ``dist_impl.type`` differs from this
                container's type.
        """
        assert self.type == dist_impl.type, \
            "Op type of container must be same as that of the implementation."
        dist_impl.idx = len(self.impls)
        self._impls.append(dist_impl)

    def get_impl(self, impl_idx):
        """Return the implementation registered at position ``impl_idx``."""
        return self._impls[impl_idx]

    def get_input_compatible_impls(self, dist_op):
        """Return impls whose ``is_input_compatible(dist_op)`` is truthy."""
        return [
            impl for impl in self.impls if impl.is_input_compatible(dist_op)
        ]

    def get_output_compatible_impls(self, dist_op):
        """Return impls whose ``is_output_compatible(dist_op)`` is truthy."""
        return [
            impl for impl in self.impls if impl.is_output_compatible(dist_op)
        ]

    def get_compatible_impls(self, dist_op):
        """Return impls whose ``is_auto_compatible(dist_op)`` is truthy."""
        return [
            impl for impl in self.impls if impl.is_auto_compatible(dist_op)
        ]


class DistributedOperatorImpl(abc.ABC):
    """Abstract base for one distributed implementation of an operator.

    Subclasses must implement the three compatibility checks and the static
    ``forward`` / ``backward`` methods. ``type`` and ``idx`` start as None and
    are assigned when the implementation is registered for an op type.
    """

    def __init__(self, name):
        self._name = name
        self._type = None
        self._idx = None
        # Flags not read in this module; presumably set by subclasses
        # (TODO confirm).
        self._forward_implemented = False
        self._backward_implemented = False

    @property
    def name(self):
        """Name given to this implementation at construction."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def type(self):
        """Op type this implementation is registered for (None until set)."""
        return self._type

    @type.setter
    def type(self, op_type):
        self._type = op_type

    @property
    def idx(self):
        """Position within its container (None until registered)."""
        return self._idx

    @idx.setter
    def idx(self, impl_idx):
        self._idx = impl_idx

    @abc.abstractmethod
    def is_input_compatible(self, dist_op):
        """Return whether ``dist_op``'s inputs fit this implementation."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @abc.abstractmethod
    def is_output_compatible(self, dist_op):
        """Return whether ``dist_op``'s outputs fit this implementation."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @abc.abstractmethod
    def is_auto_compatible(self, dist_op):
        """Return whether ``dist_op`` fully fits this implementation."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @staticmethod
    @abc.abstractmethod
    def forward(dist_ctx, *args, **kwargs):
        """Forward-phase hook; must be overridden by subclasses."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @staticmethod
    @abc.abstractmethod
    def backward(dist_ctx, *grad_outputs, **kwargs):
        """Backward-phase hook; must be overridden by subclasses."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    def update_dims_mapping(self, dist_op):
        """Optional hook; raises NotImplementedError unless overridden."""
        raise NotImplementedError("Please Implement this method in Subclass.")


def register_distributed_operator_impl_container(container):
    """Register ``container`` under ``container.type``.

    A container previously registered for the same op type is replaced.
    """
    # Item assignment mutates the module-level dict; no ``global`` needed.
    _g_distributed_operator_impl_containers[container.type] = container


def get_distributed_operator_impl_container(op_type):
    """Return the container registered for ``op_type``, or None if absent."""
    return _g_distributed_operator_impl_containers.get(op_type)


def register_distributed_operator_impl(op_type, dist_impl):
    """Attach ``dist_impl`` to the container registered for ``op_type``.

    Sets ``dist_impl.type`` to ``op_type`` and then calls the container's
    ``register_impl`` with it.

    Raises:
        AssertionError: if no container has been registered for ``op_type``.
    """
    dist_op_impl_container = get_distributed_operator_impl_container(op_type)
    if dist_op_impl_container is None:
        # Raise explicitly: a bare ``assert False`` is stripped under
        # ``python -O`` and the impl would then be silently dropped.
        raise AssertionError(
            "Must register distributed operator registry first.")
    dist_impl.type = op_type
    dist_op_impl_container.register_impl(dist_impl)


def find_compatible_distributed_operator_impls(dist_op, fwd=True, partial=True):
    """
    Collect the distributed impls compatible with ``dist_op``.

    Containers are searched in this priority order, and any that are not
    registered are skipped:
      1. the container registered for the op's own type;
      2. the "elementwise" container, only if ``is_elementwise_op`` accepts
         the op type;
      3. the "default" container.
    Choosing among the candidates (e.g. with a cost model) is left to the
    caller.

    Args:
        dist_op: distributed operator whose ``serial_op.type`` selects the
            op-specific container.
        fwd (bool): only consulted when ``partial`` is True. True checks input
            compatibility, False checks output compatibility.
        partial (bool): if True, check one side only (see ``fwd``); if False,
            check full compatibility via ``get_compatible_impls``.

    Returns:
        list | None: every compatible impl, in the priority order above, or
        None if there is none.
    """
    op_type = dist_op.serial_op.type
    op_container = get_distributed_operator_impl_container(op_type)
    eltwise_container = get_distributed_operator_impl_container("elementwise")
    default_container = get_distributed_operator_impl_container("default")

    # Pick the compatibility query once instead of repeating the container
    # walk for each (partial, fwd) combination.
    if not partial:
        getter_name = "get_compatible_impls"
    elif fwd:
        getter_name = "get_input_compatible_impls"
    else:
        getter_name = "get_output_compatible_impls"

    candidate_containers = [op_container]
    if eltwise_container and is_elementwise_op(op_type):
        candidate_containers.append(eltwise_container)
    candidate_containers.append(default_container)

    compatible_impls = []
    for container in candidate_containers:
        if container:
            compatible_impls.extend(getattr(container, getter_name)(dist_op))

    return compatible_impls or None


def is_parameter_related(varname, block):
    """Return ``is_parameter`` of the base variable behind ``varname``.

    Derived names are mapped back to their base variable by cutting at the
    first occurrence of each marker ``.subprog_``, ``.cast_fp`` and
    ``.quantized``, applied in that order, before looking the name up.

    Raises:
        AssertionError: if the base variable is not found in ``block``.
    """
    for marker in (".subprog_", ".cast_fp", ".quantized"):
        # partition() leaves the name unchanged when the marker is absent.
        varname = varname.partition(marker)[0]
    assert block.has_var(varname)
    return block.var(varname).is_parameter


def infer_shape(block, src_var, src_var_dist_attr, op_input_dist_attr):
    """Infer the shape of ``src_var`` under ``op_input_dist_attr``.

    For each dim of ``block.var(src_var.name).shape``:
      * if ``src_var_dist_attr`` maps it to a mesh axis (dims_mapping != -1),
        it is multiplied by that axis' size from the source process mesh;
      * if ``op_input_dist_attr`` maps it to a mesh axis, the result is
        floor-divided by that axis' size from the input process mesh.
    Negative dims (unknown size, e.g. -1) are passed through unchanged;
    scaling them would produce meaningless values such as -2.

    Returns:
        list: the inferred shape, one entry per dim.
    """
    var_shape = block.var(src_var.name).shape
    var_topology = src_var_dist_attr.process_mesh.topology
    var_dims_mapping = src_var_dist_attr.dims_mapping
    input_topology = op_input_dist_attr.process_mesh.topology
    input_dims_mapping = op_input_dist_attr.dims_mapping

    exact_shape = []
    for idx, size in enumerate(var_shape):
        if size < 0:
            # Unknown dim: there is nothing to rescale.
            exact_shape.append(size)
            continue
        if var_dims_mapping[idx] != -1:
            size = size * var_topology[var_dims_mapping[idx]]
        if input_dims_mapping[idx] != -1:
            size = size // input_topology[input_dims_mapping[idx]]
        exact_shape.append(size)

    return exact_shape


def set_comm_op_dist_attr_for_program(new_op, process_mesh, tensor_dist_attr,
                                      ctx):
    """Attach a dist attr to a communication op, sharing one tensor attr.

    The op is placed on ``process_mesh`` and every one of its input and
    output arguments is assigned ``tensor_dist_attr``. The resulting
    attribute is stored in ``ctx``.

    Raises:
        AssertionError: if ``process_mesh`` or ``tensor_dist_attr`` is None.
    """
    assert process_mesh is not None
    assert tensor_dist_attr is not None

    op_dist_attr = OperatorDistributedAttribute()
    op_dist_attr.process_mesh = process_mesh
    op_desc = new_op.desc
    for arg_name in op_desc.input_arg_names():
        op_dist_attr.set_input_dist_attr(arg_name, tensor_dist_attr)
    for arg_name in op_desc.output_arg_names():
        op_dist_attr.set_output_dist_attr(arg_name, tensor_dist_attr)
    ctx.set_op_dist_attr_for_program(new_op, op_dist_attr)


def naive_copy_op_dist_attr_for_program(new_op, ref_op, ctx):
    """Copy ``ref_op``'s dist attr onto ``new_op``, slot by slot.

    The process mesh is copied. For every input/output slot name of
    ``ref_op``, the tensor dist attr of ``ref_op``'s single argument in that
    slot is assigned to ``new_op``'s single argument in the same slot. The new
    attribute is stored in ``ctx``.

    Raises:
        AssertionError: if ``new_op`` lacks a slot that ``ref_op`` has, or a
            slot of either op does not hold exactly one argument.
    """
    ref_dist_attr = ctx.get_op_dist_attr_for_program(ref_op)
    new_op_dist_attr = OperatorDistributedAttribute()
    new_op_dist_attr.process_mesh = ref_dist_attr.process_mesh

    # "Naive": each slot is required to carry exactly one argument, so the
    # mapping between ref and new arguments is by slot name alone.
    for input_name in ref_op.input_names:
        assert input_name in new_op.input_names
        assert len(ref_op.input(input_name)) == 1
        assert len(new_op.input(input_name)) == 1

        ref_tensor_dist_attr = ref_dist_attr.get_input_dist_attr(
            ref_op.input(input_name)[0])
        new_op_dist_attr.set_input_dist_attr(
            new_op.input(input_name)[0], ref_tensor_dist_attr)

    for output_name in ref_op.output_names:
        assert output_name in new_op.output_names
        assert len(ref_op.output(output_name)) == 1
        assert len(new_op.output(output_name)) == 1

        ref_tensor_dist_attr = ref_dist_attr.get_output_dist_attr(
            ref_op.output(output_name)[0])
        new_op_dist_attr.set_output_dist_attr(
            new_op.output(output_name)[0], ref_tensor_dist_attr)

    ctx.set_op_dist_attr_for_program(new_op, new_op_dist_attr)


def get_data_parallel_group(dist_ctx, op, act_grad_names, rank):
    """
    Deduce the data parallel communication group for the current operator.

    The first activation grad in ``act_grad_names`` whose dim 0 is mapped to
    a mesh axis of size > 1 decides the group: the ranks returned by
    ``_get_comm_group`` for that mesh axis and ``rank``.

    Args:
        dist_ctx (DistributedContext): dist context.
        op (Operator): the current (backward) operator.
        act_grad_names (list): names of the activation-grad inputs of ``op``.
        rank (int): global rank index of the current process.

    Returns:
        The group created by ``new_process_group``, or None when no
        activation grad has dim 0 mapped to a mesh axis larger than 1.
    """
    dp_group = None

    op_dist_attr = dist_ctx.get_op_dist_attr_for_program(op)
    process_mesh = op_dist_attr.process_mesh
    mesh_shape = process_mesh.topology
    # FIXME Hack for Pipeline Parallelism where the current operator
    # does not belong to the mesh the current rank belongs to.
    if rank not in process_mesh.processes:
        rank = _get_corresponding_rank(dist_ctx, process_mesh, rank)

    for var_name in act_grad_names:
        var_dim_mapping = op_dist_attr.get_input_dims_mapping(var_name)
        # An empty dims mapping (variable without shape) is treated as
        # "not sharded on the batch dim".
        # TODO utilize the batch_dim attr instead of "0" in future
        batch_size_axis = var_dim_mapping[0] if len(var_dim_mapping) > 0 else -1

        if batch_size_axis > -1 and mesh_shape[batch_size_axis] > 1:
            group_ranks = _get_comm_group(process_mesh.processes,
                                          process_mesh.topology,
                                          batch_size_axis, rank)
            dp_group = new_process_group(group_ranks)
            break

    return dp_group


def sync_and_scale_gradients(dist_ctx, op, dp_group, allreduce_var_names):
    """
    Insert allreduce (and optionally scale) ops for the parameter gradients
    of an operator in data parallelism.

    For each gradient var, a ``c_allreduce_sum`` over ``dp_group`` is appended
    to the work block of ``dist_ctx.dist_op_context``. When
    ``dist_ctx.gradient_scale`` is truthy, a ``scale`` op by
    ``1 / len(dp_group.ranks)`` follows. Both ops get the data-parallel
    ``op_namescope`` and a dist attr built from ``op``'s process mesh and the
    grad's output dims mapping on ``op``.

    Args:
        dist_ctx (DistributedContext): dist context.
        op (Operator): the current (backward) operator producing the grads.
        dp_group: data parallel process group; ``dp_group.id`` is the ring id
            and ``len(dp_group.ranks)`` the scaling degree.
        allreduce_var_names (list): names of the parameter grads among
            ``op``'s outputs.

    Raises:
        AssertionError: if a grad has no output dims mapping on ``op``.
    """
    op_dist_attr = dist_ctx.get_op_dist_attr_for_program(op)
    process_mesh = op_dist_attr.process_mesh
    main_block = dist_ctx.dist_op_context.work_block
    dp_degree = len(dp_group.ranks)
    dp_namescope = '/' + ParallelMode.DataParallel

    for var_name in allreduce_var_names:
        grad_var = main_block.var(var_name)
        # Check the dims mapping before appending any op, so a failure cannot
        # leave an allreduce for this grad in the block without a dist attr.
        dims_mapping = op_dist_attr.get_output_dims_mapping(grad_var.name)
        assert dims_mapping is not None, \
            "Unexpected: dims_mapping of output [{}] of op [{}] is None".format(
                grad_var.name, op_dist_attr.op_type)

        added_ops = []
        allreduce_op = main_block.append_op(type='c_allreduce_sum',
                                            inputs={'X': [grad_var]},
                                            outputs={'Out': [grad_var]},
                                            attrs={
                                                'ring_id': dp_group.id,
                                                'use_calc_stream': True,
                                                OP_ROLE_KEY: OpRole.Backward
                                            })
        allreduce_op._set_attr('op_namescope', dp_namescope)
        added_ops.append(allreduce_op)

        if dist_ctx.gradient_scale:
            scale_op = main_block.append_op(type='scale',
                                            inputs={'X': grad_var},
                                            outputs={'Out': grad_var},
                                            attrs={
                                                'scale': 1.0 / dp_degree,
                                                OP_ROLE_KEY: OpRole.Backward
                                            })
            scale_op._set_attr('op_namescope', dp_namescope)
            added_ops.append(scale_op)

        # NOTE auxiliary ops' dist attrs follow the dist op (its process
        # mesh), not the dist tensor.
        for new_op in added_ops:
            new_op_attr = OperatorDistributedAttribute()
            new_op_attr.process_mesh = process_mesh
            new_op_attr.set_output_dims_mapping(grad_var.name, dims_mapping)
            new_op_attr.set_input_dims_mapping(grad_var.name, dims_mapping)
            dist_ctx.set_op_dist_attr_for_program(new_op, new_op_attr)


def gradient_synchronization(dist_ctx, op, act_grad_names, out_grad_names,
                             rank):
    """
    Allreduce and scale (by the data parallel degree) the parameter
    gradients of an operator in data parallelism.

    Nothing is inserted when not in the backward phase, when ``op`` is an
    optimize op, when either name list is empty, or when no data parallel
    group can be deduced for ``op``.

    Args:
        dist_ctx (DistributedContext): dist context.
        op (Operator): the current (backward) operator.
        act_grad_names (list): names of the activation-grad inputs of ``op``.
        out_grad_names (list): names of the parameter-grad outputs of ``op``.
        rank (int): global rank index of the current process.
    """
    if not is_in_backward_phase(dist_ctx):
        return

    if is_optimize_op(op) or not act_grad_names or not out_grad_names:
        return

    dp_group = get_data_parallel_group(dist_ctx, op, act_grad_names, rank)
    if not dp_group:
        return

    sync_and_scale_gradients(dist_ctx, op, dp_group, out_grad_names)


def is_data_parallel_scale_op(op):
    """Tell whether ``op`` is a ``scale`` op in the data-parallel namescope."""
    if not op.type == "scale":
        return False
    op_desc = op.desc
    has_namescope = op_desc.has_attr("op_namescope")
    if not has_namescope:
        return has_namescope
    return ParallelMode.DataParallel in op_desc.attr("op_namescope")


def is_data_parallel_reduce_op(op):
    """Tell whether ``op`` is a ``c_reduce_sum``/``c_allreduce_sum`` op whose
    ``op_namescope`` contains the data-parallel tag."""
    return (op.type in ("c_reduce_sum", "c_allreduce_sum")
            and op.desc.has_attr("op_namescope")
            and ParallelMode.DataParallel in op.desc.attr("op_namescope"))


def is_in_backward_phase(dist_ctx):
    """Return what ``dist_ctx.dist_op_context.in_backward_phase()`` reports.

    NOTE: Paddle's higher-order differentiation currently does NOT
    distinguish gradient-computation operators built in the forward phase
    from those in the backward phase (both have op_role=1). That would lead
    auto parallel to add gradient synchronization to the forward-phase ones,
    so this flag is used to tell the two phases apart for now.
    """
    op_context = dist_ctx.dist_op_context
    return op_context.in_backward_phase()