# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from paddle import _legacy_C_ops
from paddle.fluid.data_feeder import check_variable_and_dtype
from paddle.fluid.framework import in_dygraph_mode
from paddle.fluid.layer_helper import LayerHelper


def _number_count(numbers, upper_range):
    """
    Calculate the expert count according to the gate index.

    Args:
        numbers (Tensor): The input gate index whose data type should be int32 or int64.
        upper_range (int): The number of the experts.

    Returns:
        out (Tensor): The output expert count.

    Examples:
        .. code-block:: python

            # required: distributed
            import paddle

            numbers = [
                [0, 2],
                [0, 2]
            ]
            upper_range = 6
            numbers = paddle.to_tensor(numbers, dtype="int32")
            number_count = paddle.distributed.utils.number_count(numbers, upper_range)
            print(number_count) # the result: [2, 0, 2, 0, 0, 0]
    """
    if in_dygraph_mode():
        # Dygraph: dispatch straight to the C++ op.
        return _legacy_C_ops.number_count(numbers, 'upper_range', upper_range)
    else:
        # Static graph: append the op to the current program.
        op_type = 'number_count'

        helper = LayerHelper(op_type, **locals())
        # The count tensor shares the integer dtype of the input gate indices.
        out = helper.create_variable_for_type_inference(dtype=numbers.dtype)

        helper.append_op(
            type=op_type,
            inputs={'numbers': numbers},
            outputs={'Out': out},
            attrs={'upper_range': upper_range},
        )
        return out


def _assign_pos(x, cum_count):
    """
    Assign pos decides which tokens should be fetched belong to
    specially expert orderingly.

    Args:
        x (Tensor): Every element in the list must be a Tensor whose data type
            should be float16, float32, float64, int32 or int64.
        cum_count (Tensor): The cumulative sum tokens of counters. Every element in the list must be a Tensor whose
            data type should be int64.

    Returns:
        out (Tensor): Assemble numbers in the order of counters.

    Examples:
        .. code-block:: python

            # required: distributed
            import paddle
            number_count = [2, 0, 2, 0]
            numbers = [
                [0, 2],
                [0, 2]
            ]
            number_count = paddle.to_tensor(number_count)
            numbers = paddle.to_tensor(numbers, dtype="int32")
            num_cum = paddle.cumsum(number_count)
            pos = paddle.distributed.utils.assign_pos(x=numbers, cum_count=num_cum)
            print(pos) # the result: (2, 0, 3, 1)
    """
    if in_dygraph_mode():
        # cum_count[-1] is the total number of effective tokens.
        return _legacy_C_ops.assign_pos(x, cum_count, cum_count[-1])
    else:
        op_type = 'assign_pos'

        helper = LayerHelper(op_type, **locals())
        # Output positions are indices, so they share cum_count's integer dtype.
        out = helper.create_variable_for_type_inference(dtype=cum_count.dtype)

        helper.append_op(
            type=op_type,
            inputs={
                'X': [x],
                'cum_count': [cum_count],
                "eff_num_len": [cum_count[-1]],
            },
            outputs={'Out': [out]},
        )
        return out


def _random_routing(topk_idx, topk_value, prob, topk=2):
    r"""
    Random routing of the top-k gate idx.
    ```
        out = topk_idx
        for i in len(topk_idx):
            if topk * value[i][topk-1] < prob[i]:
                out[i][topk-1] = -1
    ```
    Args:
        topk_idx (Tensor): gate idx, shape=(N, topk)
        topk_value (Tensor): values, shape = topk_idx.shape
        prob (Tensor): random prob, shape=(topk_idx.shape[0],)
        topk (int, optional): only topk=2 is supported. Default: 2.

    Returns:
        out (Tensor): the gate idx after random routing; dropped entries
            are marked with -1.

    Raises:
        RuntimeError: if called in static graph mode or with topk != 2.
    """
    if topk == 2:
        if in_dygraph_mode():
            return _legacy_C_ops.random_routing(prob, topk_value, topk_idx)
        else:
            raise RuntimeError("Not supporting static graph mode now")
    else:
        raise RuntimeError("only topk=2 is supported now")


def _limit_by_capacity(expert_count, capacity, n_worker):
    """
    Limit the expert count by capacity.

    Args:
        expert_count (Tensor): The input expert count whose data type should be int32 or int64.
        capacity (Tensor): The input capacity whose data type should be int32 or int64,
            and the number of elements of capacity should equal expert_count.numel() / n_worker.
        n_worker (int): The number of the workers.

    Returns:
        out (Tensor): The output expert count limited by capacity.

    Examples:
        .. code-block:: python

            # required: distributed
            import paddle
            expert_count = [1, 2, 2, 8, 3, 6]
            capacity = [5, 5, 5]
            n_worker = 2
            expert_count = paddle.to_tensor(expert_count, dtype="int32")
            capacity = paddle.to_tensor(capacity, dtype="int32")
            out = paddle.distributed.utils.limit_by_capacity(expert_count, capacity, n_worker)
            print(out) # the result: [1, 2, 2, 4, 3, 3]
    """
    if in_dygraph_mode():
        return _legacy_C_ops.limit_by_capacity(
            expert_count, capacity, 'n_worker', n_worker
        )
    else:
        op_type = 'limit_by_capacity'

        helper = LayerHelper(op_type, **locals())
        # The clipped count keeps the same integer dtype as the input count.
        out = helper.create_variable_for_type_inference(
            dtype=expert_count.dtype
        )

        helper.append_op(
            type=op_type,
            inputs={'expert_count': expert_count, 'capacity': capacity},
            outputs={'Out': out},
            attrs={'n_worker': n_worker},
        )
        return out


def _prune_gate_by_capacity(gate_idx, expert_count, n_expert, n_worker):
    """
    Prune gate by capacity (only support CUDA).

    Args:
        gate_idx (Tensor): Represents the gate_id sequence corresponding to the input data with type int32, int64.
        expert_count (Tensor): The quantity value counted on the gate_id sequence of the input data with type int32, int64.
        n_expert (int): The number of experts.
        n_worker (int): The number of workers on the trainer with type int64.

    Returns:
        new_gate_idx (Tensor): The gate_id sequence corresponding to the new input data after passing through prune.

    Examples:
        .. code-block:: python

            import paddle
            gate_idx = paddle.to_tensor([1, 3, 3, 3, 3, 2, 1, 1], dtype='int32')
            expert_count = paddle.to_tensor([0, 3, 1, 3, 0, 0, 0, 0], dtype='int32')
            n_expert = 8
            n_worker = 1
            new_gate_id = paddle.distributed.utils.prune_gate_by_capacity(gate_idx, expert_count, n_expert, n_worker)
            print(new_gate_id)
            # Tensor(shape=[8], dtype=int32, place=CUDAPlace(0), stop_gradient=True,
              [1, 3, 3, 3, -1, 2, 1, 1])
    """
    if in_dygraph_mode():
        return _legacy_C_ops.prune_gate_by_capacity(
            gate_idx, expert_count, "n_expert", n_expert, "n_worker", n_worker
        )
    else:
        # Static graph path validates the input dtypes up front.
        check_variable_and_dtype(
            gate_idx,
            'GateIdx',
            ['int32', 'int64'],
            'paddle.distributed.utils.prune_gate_by_capacity',
        )
        check_variable_and_dtype(
            expert_count,
            'ExpertCount',
            ['int32', 'int64'],
            'paddle.distributed.utils.prune_gate_by_capacity',
        )

        helper = LayerHelper('prune_gate_by_capacity', **locals())
        # Pruned indices keep the dtype of the original gate indices.
        new_gate_idx = helper.create_variable_for_type_inference(
            dtype=gate_idx.dtype
        )
        helper.append_op(
            type='prune_gate_by_capacity',
            inputs={'GateIdx': gate_idx, "ExpertCount": expert_count},
            outputs={'NewGateIdx': new_gate_idx},
            attrs={"n_expert": n_expert, "n_worker": n_worker},
        )

        return new_gate_idx