common.py 17.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License

15
import abc
16
from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole
17
from ..dist_attribute import OperatorDistributedAttribute
18
from ..utils import _get_comm_group, _get_corresponding_rank, is_optimize_op
19
from ..process_group import new_process_group
20

21 22
# Registry mapping an op type to its DistributedOperatorImplContainer; it is
# filled by register_distributed_operator_impl_container().
_g_distributed_operator_impl_containers = dict()

# Op types that is_elementwise_op() accepts by exact name (it additionally
# accepts any op type whose name contains "elementwise").
_g_elementwise_ops = [
    "elementwise",
    "gelu",
    "dropout",
    "cast",
    "gather",
    "concat",
    "fused_softmax_mask_upper_triangle",
]

# Set of op type names; presumably ops that only take part in the backward
# pass -- TODO confirm against the users of this constant.
BACKWARD_ONLY_DIST_OPS = {"check_finite_and_unscale", "update_loss_scaling"}
33 34


35
class ParallelMode:
    """
    String tags naming the parallel mode of a communication or auxiliary
    operator.

    A tag is stored in an op's ``op_namescope`` attribute, e.g.
    ``'/' + ParallelMode.DataParallel``, and recognised later by substring
    matching.
    """

    DataParallel = "auto_parallel/data_parallel"
    ModelParallel = "auto_parallel/model_parallel"
    # NOTE: both the attribute name and its value are misspelled ("Paralel").
    # They are kept unchanged so existing users and stored tags keep working.
    PipelineParalel = "auto_parallel/pipeline_paralel"
    # Correctly spelled alias with exactly the same value as PipelineParalel.
    PipelineParallel = PipelineParalel
    MoEParallel = "auto_parallel/moe_parallel"


46
def is_elementwise_op(op_type):
    """
    Tell whether ``op_type`` is handled as an elementwise op.

    An op type qualifies when it is listed in ``_g_elementwise_ops`` or when
    its name contains the substring "elementwise".
    """
    return op_type in _g_elementwise_ops or "elementwise" in op_type
52 53


54
class DistributedOperatorImplContainer:
    """
    Holds the distributed implementations registered for one op type.

    Implementations are kept in registration order. Each one receives its
    position in that order as ``idx`` when it is registered.
    """

    def __init__(self, op_type):
        self._type = op_type
        self._impls = []

    @property
    def type(self):
        """Op type served by this container."""
        return self._type

    @type.setter
    def type(self, op_type):
        self._type = op_type

    @property
    def impls(self):
        """Registered implementations, in registration order."""
        return self._impls

    def register_impl(self, dist_impl):
        """
        Append ``dist_impl`` and set its ``idx`` to its position.

        Raises:
            AssertionError: if ``dist_impl.type`` differs from ``self.type``.
        """
        assert (
            self.type == dist_impl.type
        ), "Op type of container must be same as that of the implementation."
        dist_impl.idx = len(self.impls)
        self._impls.append(dist_impl)

    def get_impl(self, impl_idx):
        """Return the implementation registered at position ``impl_idx``."""
        return self._impls[impl_idx]

    def get_input_compatible_impls(self, dist_op):
        """Impls whose ``is_input_compatible(dist_op)`` is truthy, in order."""
        return [
            impl for impl in self.impls if impl.is_input_compatible(dist_op)
        ]

    def get_output_compatible_impls(self, dist_op):
        """Impls whose ``is_output_compatible(dist_op)`` is truthy, in order."""
        return [
            impl for impl in self.impls if impl.is_output_compatible(dist_op)
        ]

    def get_compatible_impls(self, dist_op):
        """Impls whose ``is_auto_compatible(dist_op)`` is truthy, in order."""
        return [
            impl for impl in self.impls if impl.is_auto_compatible(dist_op)
        ]


class DistributedOperatorImpl(abc.ABC):
    """
    Abstract base for one distributed implementation of an op type.

    Subclasses provide the three compatibility predicates and the static
    ``forward``/``backward`` methods. ``type`` and ``idx`` start as None and
    are plain read/write properties.
    """

    def __init__(self, name):
        self._name = name
        self._type = None
        self._idx = None
        # Both flags start as False; this class itself never reads them.
        self._forward_implemented = False
        self._backward_implemented = False

    # ---- read/write properties -------------------------------------------

    @property
    def name(self):
        """Name of this implementation."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    @property
    def type(self):
        """Op type this implementation is attached to (None by default)."""
        return self._type

    @type.setter
    def type(self, op_type):
        self._type = op_type

    @property
    def idx(self):
        """Position of this implementation in its container (None by default)."""
        return self._idx

    @idx.setter
    def idx(self, impl_idx):
        self._idx = impl_idx

    # ---- interface that subclasses must provide ---------------------------

    @abc.abstractmethod
    def is_input_compatible(self, dist_op):
        """Input-side compatibility predicate for ``dist_op``."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @abc.abstractmethod
    def is_output_compatible(self, dist_op):
        """Output-side compatibility predicate for ``dist_op``."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @abc.abstractmethod
    def is_auto_compatible(self, dist_op):
        """Full compatibility predicate for ``dist_op``."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @staticmethod
    @abc.abstractmethod
    def forward(dist_ctx, *args, **kwargs):
        """Static forward hook; must be overridden."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    @staticmethod
    @abc.abstractmethod
    def backward(dist_ctx, *grad_outputs, **kwargs):
        """Static backward hook; must be overridden."""
        raise NotImplementedError("Please Implement this method in Subclass.")

    def update_dims_mapping(self, dist_op):
        """Optional hook (not abstract); the base version always raises."""
        raise NotImplementedError("Please Implement this method in Subclass.")


162 163 164
def register_distributed_operator_impl_container(container):
    """
    Store ``container`` in the module registry under ``container.type``.

    A container already registered for that type is replaced.
    """
    # Item assignment mutates the module-level dict; no rebinding, so no
    # ``global`` declaration is needed.
    _g_distributed_operator_impl_containers[container.type] = container
165 166


167 168 169
def get_distributed_operator_impl_container(op_type):
    """Look up the container registered for ``op_type``; None if absent."""
    return _g_distributed_operator_impl_containers.get(op_type)
170 171


172 173
def register_distributed_operator_impl(op_type, dist_impl):
    """
    Register ``dist_impl`` as an implementation of ``op_type``.

    The container for ``op_type`` must already have been registered.
    ``dist_impl.type`` is set to ``op_type`` before the impl is added to that
    container.

    Raises:
        AssertionError: if no container is registered for ``op_type``.
    """
    dist_op_impl_container = get_distributed_operator_impl_container(op_type)
    if dist_op_impl_container is None:
        # Raised explicitly instead of ``assert False``. An assert statement
        # is removed under ``python -O``, which would silently skip the
        # registration. The exception type stays the same for callers.
        raise AssertionError(
            "Must register distributed operator registry first."
        )
    dist_impl.type = op_type
    dist_op_impl_container.register_impl(dist_impl)
179 180


181
def find_compatible_distributed_operator_impls(dist_op, fwd=True, partial=True):
    """
    Collect every distributed implementation compatible with ``dist_op``.

    Containers are searched in this order, and their matches are
    concatenated:

      1. the container registered for ``dist_op.serial_op.type``;
      2. the "elementwise" container, only if ``is_elementwise_op`` accepts
         the op type;
      3. the "default" container.

    Containers that are not registered are skipped. The whole list of
    matches is returned, not only the first one. Choosing the best candidate
    (e.g. with a cost model) is left for future work.

    Args:
        dist_op: the distributed operator to match. Its
            ``serial_op.type`` selects the op-specific container.
        fwd (bool): used only when ``partial`` is True. True matches on
            inputs (``get_input_compatible_impls``); False matches on outputs
            (``get_output_compatible_impls``).
        partial (bool): when False, match with ``get_compatible_impls``
            and ignore ``fwd``.

    Returns:
        list | None: the compatible implementations in the order above, or
        None if there are none.
    """
    op_type = dist_op.serial_op.type
    op_container = get_distributed_operator_impl_container(op_type)
    eltwise_container = get_distributed_operator_impl_container("elementwise")
    default_container = get_distributed_operator_impl_container("default")

    containers = [op_container]
    # is_elementwise_op is only consulted when an elementwise container
    # exists.
    if eltwise_container and is_elementwise_op(op_type):
        containers.append(eltwise_container)
    containers.append(default_container)

    compatible_impls = []
    for container in containers:
        if container:
            compatible_impls.extend(
                _query_compatible_impls(container, dist_op, fwd, partial)
            )

    # An empty result is reported as None.
    return compatible_impls or None


def _query_compatible_impls(container, dist_op, fwd, partial):
    """Run the compatibility query selected by ``fwd``/``partial`` on one container."""
    if not partial:
        return container.get_compatible_impls(dist_op)
    if fwd:
        return container.get_input_compatible_impls(dist_op)
    return container.get_output_compatible_impls(dist_op)
260 261


J
JZ-LIANG 已提交
262
def is_parameter_related(varname, block):
    """
    Return ``is_parameter`` of the base variable behind ``varname``.

    The name is cut at the first occurrence of each marker, applied in this
    order: ``.subprog_``, ``.cast_fp``, ``.quantized``. The remaining prefix
    is then looked up in ``block``.

    Raises:
        AssertionError: if ``block._find_var_recursive`` does not find the
            resulting name.
    """
    base_name = varname
    for marker in (".subprog_", ".cast_fp", ".quantized"):
        # partition()[0] is the text before the first occurrence of marker,
        # or the whole string when marker is absent.
        base_name = base_name.partition(marker)[0]
    assert block._find_var_recursive(base_name)
    return block._var_recursive(base_name).is_parameter


Z
zhaoyingli 已提交
274
def infer_shape(block, src_var, src_var_dist_attr, op_input_dist_attr):
    """
    Re-derive the shape of ``src_var`` for the distribution described by
    ``op_input_dist_attr``.

    1. Start from the shape stored in ``block`` for ``src_var``. Multiply
       every dim that ``src_var_dist_attr`` maps to a mesh axis by that
       axis' size in its process-mesh topology.
    2. Floor-divide every dim that ``op_input_dist_attr`` maps to a mesh
       axis by that axis' size.

    A dims-mapping entry of -1 leaves the dim unchanged.

    Returns:
        list: the resulting shape.
    """
    stored_shape = block._var_recursive(src_var.name).shape

    src_topology = src_var_dist_attr.process_mesh.topology
    src_mapping = src_var_dist_attr.dims_mapping
    expanded = [
        dim if src_mapping[i] == -1 else dim * src_topology[src_mapping[i]]
        for i, dim in enumerate(stored_shape)
    ]

    dst_topology = op_input_dist_attr.process_mesh.topology
    dst_mapping = op_input_dist_attr.dims_mapping
    return [
        dim if dst_mapping[i] == -1 else dim // dst_topology[dst_mapping[i]]
        for i, dim in enumerate(expanded)
    ]
298 299


300 301 302
def set_comm_op_dist_attr_for_program(
    new_op, process_mesh, tensor_dist_attr, ctx
):
    """
    Register an operator dist attr for ``new_op`` in ``ctx``.

    The attribute uses ``process_mesh``. The same ``tensor_dist_attr`` is
    assigned to every input and output argument of ``new_op``. Both
    ``process_mesh`` and ``tensor_dist_attr`` must be non-None (asserted).
    """
    assert process_mesh is not None
    assert tensor_dist_attr is not None

    op_attr = OperatorDistributedAttribute()
    op_attr.process_mesh = process_mesh
    op_desc = new_op.desc
    for arg_name in op_desc.input_arg_names():
        op_attr.set_input_dist_attr(arg_name, tensor_dist_attr)
    for arg_name in op_desc.output_arg_names():
        op_attr.set_output_dist_attr(arg_name, tensor_dist_attr)
    ctx.set_op_dist_attr_for_program(new_op, op_attr)


def naive_copy_op_dist_attr_for_program(new_op, ref_op, ctx):
    """
    Give ``new_op`` a slot-by-slot copy of ``ref_op``'s operator dist attr.

    ``new_op`` must have every input/output slot of ``ref_op``, and each such
    slot must hold exactly one variable in both ops (checked with
    ``assert``). The tensor dist attr of ``ref_op``'s variable in a slot is
    assigned to ``new_op``'s variable in the same slot. The process mesh is
    copied as well, and the new attribute is registered in ``ctx``.
    """
    src_attr = ctx.get_op_dist_attr_for_program(ref_op)
    dst_attr = OperatorDistributedAttribute()
    dst_attr.process_mesh = src_attr.process_mesh

    for slot in ref_op.input_names:
        assert slot in new_op.input_names
        assert len(ref_op.input(slot)) == 1
        assert len(new_op.input(slot)) == 1

        src_tensor_attr = src_attr.get_input_dist_attr(ref_op.input(slot)[0])
        dst_attr.set_input_dist_attr(new_op.input(slot)[0], src_tensor_attr)

    for slot in ref_op.output_names:
        assert slot in new_op.output_names
        assert len(ref_op.output(slot)) == 1
        assert len(new_op.output(slot)) == 1

        src_tensor_attr = src_attr.get_output_dist_attr(ref_op.output(slot)[0])
        dst_attr.set_output_dist_attr(new_op.output(slot)[0], src_tensor_attr)

    ctx.set_op_dist_attr_for_program(new_op, dst_attr)
346 347 348 349 350 351 352 353


def get_data_parallel_group(dist_ctx, op, act_grad_names, rank):
    """
    Deduce the data parallel communication group for the current operator.

    ``act_grad_names`` is scanned in order. The first activation grad whose
    dim 0 is mapped to a mesh axis of size > 1 decides the group. The group
    ranks come from ``_get_comm_group`` for that mesh axis and ``rank``.

    Args:
        dist_ctx (DistributedContext): dist context.
        op (Operator): the current (backward) operator.
        act_grad_names (list): names of the activation-grad inputs of ``op``.
        rank (int): global rank of the current process. If it is not in
            ``op``'s process mesh, it is first mapped to the corresponding
            rank via ``_get_corresponding_rank``.

    Returns:
        The group created by ``new_process_group``, or None if no activation
        grad is split along a mesh axis of size > 1.
    """
    op_dist_attr = dist_ctx.get_op_dist_attr_for_program(op)
    process_mesh = op_dist_attr.process_mesh
    mesh_shape = process_mesh.topology
    # FIXME Hack for Pipeline Parallelism where the current operator
    # does not belong to the mesh the current rank belongs to.
    if rank not in process_mesh.processes:
        rank = _get_corresponding_rank(dist_ctx, process_mesh, rank)

    for var_name in act_grad_names:
        var_dim_mapping = op_dist_attr.get_input_dims_mapping(var_name)
        # An empty dims mapping (the variable's shape may be None) is treated
        # as "not split along the batch axis".
        # TODO utilize the batch_dim attr instead of "0" in future
        batch_size_axis = var_dim_mapping[0] if len(var_dim_mapping) > 0 else -1

        if batch_size_axis > -1 and mesh_shape[batch_size_axis] > 1:
            group_ranks = _get_comm_group(
                process_mesh.processes,
                process_mesh.topology,
                batch_size_axis,
                rank,
            )
            return new_process_group(group_ranks)

    return None


def sync_and_scale_gradients(dist_ctx, op, dp_group, allreduce_var_names):
    """
    Insert the allreduce (and optional scale) ops that synchronize the
    parameter gradients produced by ``op`` under data parallelism.

    For each gradient name, a ``c_allreduce_sum`` over ``dp_group`` is
    appended to the work block of ``dist_ctx.dist_op_context``. When
    ``dist_ctx.gradient_scale`` is truthy, it is followed by a ``scale`` op
    with factor ``1.0 / len(dp_group.ranks)``. Both ops write the gradient
    in place and carry the data-parallel ``op_namescope``. Each one gets an
    operator dist attr built from ``op``'s process mesh and the gradient's
    output dims mapping in ``op``'s dist attr.

    Args:
        dist_ctx (DistributedContext): dist context.
        op (Operator): the current (backward) operator.
        dp_group: data parallel process group; its ``ranks`` and ``id`` are
            used.
        allreduce_var_names (list): names of the parameter grads among the
            outputs of ``op``.

    Raises:
        AssertionError: if ``op``'s dist attr has no output dims mapping for
            a gradient. The check runs before any op is inserted for that
            gradient.
    """
    op_dist_attr = dist_ctx.get_op_dist_attr_for_program(op)
    process_mesh = op_dist_attr.process_mesh
    dist_op_context = dist_ctx.dist_op_context
    main_block = dist_op_context.work_block
    dp_degree = len(dp_group.ranks)
    dp_namescope = '/' + ParallelMode.DataParallel

    for var_name in allreduce_var_names:
        grad_var = main_block.var(var_name)

        # Validate before mutating the block, so that a failure cannot leave
        # freshly inserted comm ops behind without a dist attr.
        dims_mapping = op_dist_attr.get_output_dims_mapping(grad_var.name)
        assert (
            dims_mapping is not None
        ), "Unexception: dims_mapping of output [{}] of op [{}] is None".format(
            grad_var.name, op_dist_attr.op_type
        )

        added_ops = []
        allreduce_op = main_block.append_op(
            type='c_allreduce_sum',
            inputs={'X': [grad_var]},
            outputs={'Out': [grad_var]},
            attrs={
                'ring_id': dp_group.id,
                'use_calc_stream': True,
                OP_ROLE_KEY: OpRole.Backward,
            },
        )
        allreduce_op._set_attr('op_namescope', dp_namescope)
        added_ops.append(allreduce_op)

        if dist_ctx.gradient_scale:
            scale_op = main_block.append_op(
                type='scale',
                inputs={'X': grad_var},
                outputs={'Out': grad_var},
                attrs={'scale': 1.0 / dp_degree, OP_ROLE_KEY: OpRole.Backward},
            )
            scale_op._set_attr('op_namescope', dp_namescope)
            added_ops.append(scale_op)

        # NOTE auxiliary op's dist attr should follow dist_op not dist_tensor
        for new_op in added_ops:
            new_op_attr = OperatorDistributedAttribute()
            new_op_attr.process_mesh = process_mesh
            new_op_attr.set_output_dims_mapping(grad_var.name, dims_mapping)
            new_op_attr.set_input_dims_mapping(grad_var.name, dims_mapping)
            dist_ctx.set_op_dist_attr_for_program(new_op, new_op_attr)


450 451 452
def gradient_synchronization(
    dist_ctx, op, act_grad_names, out_grad_names, rank
):
    """
    Insert data-parallel gradient synchronization for ``op``.

    Nothing is inserted in any of these cases:
      - the dist context is not in the backward phase;
      - ``op`` is an optimize op;
      - ``act_grad_names`` or ``out_grad_names`` is empty;
      - no data parallel group can be deduced from ``act_grad_names``.

    Otherwise, the grads named in ``out_grad_names`` are all-reduced (and
    possibly scaled) over that group by ``sync_and_scale_gradients``.

    Args:
        dist_ctx (DistributedContext): dist context.
        op (Operator): the current (backward) operator.
        act_grad_names (list): names of the activation-grad inputs of ``op``.
        out_grad_names (list): names of the parameter-grad outputs of ``op``.
        rank (int): global rank of the current process.
    """
    nothing_to_sync = (
        not is_in_backward_phase(dist_ctx)
        or is_optimize_op(op)
        or len(act_grad_names) == 0
        or len(out_grad_names) == 0
    )
    if nothing_to_sync:
        return

    dp_group = get_data_parallel_group(dist_ctx, op, act_grad_names, rank)
    if dp_group:
        sync_and_scale_gradients(dist_ctx, op, dp_group, out_grad_names)
481 482 483


def is_data_parallel_scale_op(op):
    """
    Tell whether ``op`` is a ``scale`` op tagged with the data-parallel
    namescope (its ``op_namescope`` attr contains
    ``ParallelMode.DataParallel``).
    """
    if op.type != "scale":
        return False
    op_desc = op.desc
    return op_desc.has_attr("op_namescope") and (
        ParallelMode.DataParallel in op_desc.attr("op_namescope")
    )
489 490 491


def is_data_parallel_reduce_op(op):
    """
    Tell whether ``op`` is a ``c_reduce_sum``/``c_allreduce_sum`` op tagged
    with the data-parallel namescope (its ``op_namescope`` attr contains
    ``ParallelMode.DataParallel``).
    """
    if op.type not in ["c_reduce_sum", "c_allreduce_sum"]:
        return False
    op_desc = op.desc
    return op_desc.has_attr("op_namescope") and (
        ParallelMode.DataParallel in op_desc.attr("op_namescope")
    )
497 498 499 500 501 502 503 504 505


def is_in_backward_phase(dist_ctx):
    """
    Return ``dist_ctx.dist_op_context.in_backward_phase()``.

    NOTE: high-order differentiation in Paddle currently does NOT separate
    gradient-computation ops of the forward phase from those of the backward
    phase (both have op_role=1). Without a separate signal, auto parallel
    would add gradient synchronization to forward-phase gradient ops, so this
    flag is used to tell the two phases apart for now.
    """
    op_context = dist_ctx.dist_op_context
    return op_context.in_backward_phase()