reduce.py 5.8 KB
Newer Older
1
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
W
wuhuachaocoding 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
import paddle
16 17
from paddle import framework
from paddle.distributed.communication import stream
18

W
wuhuachaocoding 已提交
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36

class ReduceOp:
    """

    Specify the type of operation used for element-wise reductions.
    It should be one of the following values:

        ReduceOp.SUM

        ReduceOp.MAX

        ReduceOp.MIN

        ReduceOp.PROD

    Examples:
        .. code-block:: python

37 38 39 40 41 42 43 44 45 46 47 48
            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
            >>> import paddle
            >>> import paddle.distributed as dist

            >>> dist.init_parallel_env()
            >>> if dist.get_rank() == 0:
            ...     data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
            >>> else:
            ...     data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
            >>> dist.all_reduce(data, op=dist.ReduceOp.SUM)
            >>> print(data)
            >>> # [[5, 7, 9], [5, 7, 9]] (2 GPUs)
W
wuhuachaocoding 已提交
49
    """
50

W
wuhuachaocoding 已提交
51 52 53 54 55
    SUM = 0
    MAX = 1
    MIN = 2
    PROD = 3
    AVG = 4
56 57 58


def _get_reduce_op(reduce_op, func_name):
59
    if framework.in_dynamic_mode():
60
        if reduce_op == ReduceOp.SUM:
61
            return framework.core.ReduceOp.SUM
62
        elif reduce_op == ReduceOp.MAX:
63
            return framework.core.ReduceOp.MAX
64
        elif reduce_op == ReduceOp.MIN:
65
            return framework.core.ReduceOp.MIN
66
        elif reduce_op == ReduceOp.PROD:
67
            return framework.core.ReduceOp.PRODUCT
68 69
    else:
        if reduce_op == ReduceOp.SUM:
70
            return f'c_{func_name}_sum'
71
        elif reduce_op == ReduceOp.MAX:
72
            return f'c_{func_name}_max'
73
        elif reduce_op == ReduceOp.MIN:
74
            return f'c_{func_name}_min'
75
        elif reduce_op == ReduceOp.PROD:
76
            return f'c_{func_name}_prod'
77
        else:
78
            return f'c_{func_name}'
79

80
    raise ValueError(f"Unknown reduce_op type for {func_name}.")
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108


def reduce(tensor, dst, op=ReduceOp.SUM, group=None, sync_op=True):
    """

    Reduce a tensor to the destination from all others. As shown below, one process is started with a GPU and the data of this process is represented
    by its group rank. The destination of the reduce operator is GPU0 and the process is sum. Through reduce operator,
    the GPU0 will owns the sum of all data from all GPUs.

    .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/reduce.png
        :width: 800
        :alt: reduce
        :align: center

    Args:
        tensor (Tensor): The output Tensor for the destination and the input Tensor otherwise. Its data type
            should be float16, float32, float64, int32, int64, int8, uint8, bool or bfloat16.
        dst (int): The destination rank id.
        op (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD, optional): The operation used. Default value is ReduceOp.SUM.
        group (Group, optional): The group instance return by new_group or None for global default group.
        sync_op (bool, optional): Whether this op is a sync op. The default value is True.

    Returns:
        Return a task object.

    Examples:
        .. code-block:: python

109 110 111 112 113 114 115 116 117 118 119 120 121
            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
            >>> import paddle
            >>> import paddle.distributed as dist

            >>> dist.init_parallel_env()
            >>> if dist.get_rank() == 0:
            ...     data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
            >>> else:
            ...     data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
            >>> dist.reduce(data, dst=0)
            >>> print(data)
            >>> # [[5, 7, 9], [5, 7, 9]] (2 GPUs, out for rank 0)
            >>> # [[1, 2, 3], [1, 2, 3]] (2 GPUs, out for rank 1)
122
    """
123 124 125 126 127 128 129 130
    return stream.reduce(
        tensor,
        dst=dst,
        op=op,
        group=group,
        sync_op=sync_op,
        use_calc_stream=False,
    )
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184

    # code below will be removed after we remove the old dygraph
    if group is not None and not group.is_member():
        return
    use_calc_stream = sync_op
    ring_id = 0 if group is None else group.id
    gdst = dst if group is None else group.get_group_rank(dst)
    assert gdst >= 0, "dst rank out of group, need global rank"

    if op == ReduceOp.SUM:
        return paddle._legacy_C_ops.c_reduce_sum(
            tensor,
            tensor,
            'use_calc_stream',
            use_calc_stream,
            'ring_id',
            ring_id,
            'root_id',
            gdst,
        )
    elif op == ReduceOp.MAX:
        return paddle._legacy_C_ops.c_reduce_max(
            tensor,
            tensor,
            'use_calc_stream',
            use_calc_stream,
            'ring_id',
            ring_id,
            'root_id',
            gdst,
        )
    elif op == ReduceOp.MIN:
        return paddle._legacy_C_ops.c_reduce_min(
            tensor,
            tensor,
            'use_calc_stream',
            use_calc_stream,
            'ring_id',
            ring_id,
            'root_id',
            gdst,
        )
    elif op == ReduceOp.PROD:
        return paddle._legacy_C_ops.c_reduce_prod(
            tensor,
            tensor,
            'use_calc_stream',
            use_calc_stream,
            'ring_id',
            ring_id,
            'root_id',
            gdst,
        )
    else:
185
        raise ValueError(f"Unknown parameter: {op}.")