dist_reshape.py 28.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License

15
from .common import DistributedOperatorImplContainer
16
from .common import DistributedOperatorImpl
17
from .common import register_distributed_operator_impl_container
C
caozhou 已提交
18
from .common import register_distributed_operator_impl, is_parameter_related
19 20
from ..utils import is_dim_shard
from ..utils import compute_compatible_and_update_dim_mapping
21
from ..utils import set_dist_op_desc_original_id
22
from .dist_default import DistributedDefaultImpl0
C
caozhou 已提交
23 24 25
from ..cost import build_comp_desc_from_dist_op, build_comp_costs_from_descs
from ..cost import Reshape2OpCost
from ..cost import Reshape2GradOpCost
26
from ..cost import build_dp_costs
C
caozhou 已提交
27
from paddle.distributed.fleet.meta_optimizers.common import OpRole
28 29


30
class DistributedReshape2(DistributedOperatorImplContainer):
    """Implementation container instantiated for the ``reshape2`` op type.

    It adds no behavior of its own; construction is delegated entirely to
    ``DistributedOperatorImplContainer``.
    """

    def __init__(self, op_type):
        super(DistributedReshape2, self).__init__(op_type)
33 34


35
# Register a DistributedReshape2 container for the ``reshape2`` op type.
register_distributed_operator_impl_container(DistributedReshape2(op_type="reshape2"))
36 37 38 39


class DistributedReshapeImpl0(DistributedOperatorImpl):
    """Distributed ``reshape2`` impl whose output has exactly one more
    dimension than ``X``, appended at the back.

    The leading output dimensions share ``X``'s dims mapping. The trailing
    output dimension must not be sharded.
    """

    def __init__(self, name):
        super().__init__(name)
        self._forward_implemented = True
        self._backward_implemented = False

    @staticmethod
    def _get_local_shape(shape_list, dims_mapping, process_mesh_shape):
        """Return a copy of the ``shape`` attribute localized to one rank.

        Every dimension ``idx`` whose mesh axis ``dims_mapping[idx]`` is
        non-negative (sharded) is floor-divided by the size of that mesh
        axis. Replicated dimensions and entries beyond ``len(dims_mapping)``
        are kept unchanged. The input list is not modified.

        NOTE(review): if ``shape_list`` holds placeholder values such as
        ``-1`` or ``0``, floor division by a positive mesh size leaves them
        unchanged.
        """
        local_shape = list(shape_list)
        for idx, mesh_axis in enumerate(dims_mapping):
            if mesh_axis >= 0 and idx < len(local_shape):
                local_shape[idx] = local_shape[idx] // process_mesh_shape[mesh_axis]
        return local_shape

    def calc_cost(self, op_role, dist_op, ctx, cluster):
        """Return the backward cost for backward ops, else the forward cost."""
        if int(op_role) == int(OpRole.Backward):
            cost = self.calc_bwd_cost(dist_op, ctx, cluster)
        else:
            cost = self.calc_fwd_cost(dist_op, ctx, cluster)
        assert cost is not None
        return cost

    def calc_fwd_cost(self, dist_op, ctx, cluster):
        """Return a one-element list with the ``Reshape2OpCost`` cost mapping.

        The op's ``shape`` attribute is localized with the ``Out`` dims
        mapping, so the cost is built from the per-rank target shape.
        """
        op = dist_op.serial_op
        dist_attr = dist_op.dist_attr

        # got dist attribute info
        dim_mapping = dist_attr.get_output_dims_mapping(op.output("Out")[0])
        shape_list = self._get_local_shape(
            op.desc.attr("shape"), dim_mapping, dist_attr.process_mesh.topology
        )

        # calc comp op cost
        desc_mapping = build_comp_desc_from_dist_op(
            dist_op=dist_op, dist_context=ctx
        )
        for key in desc_mapping:
            desc_mapping[key]["shape"] = shape_list

        processes = dist_attr.process_mesh.processes
        cost_mapping = build_comp_costs_from_descs(
            Reshape2OpCost, ctx, processes, desc_mapping, cluster
        )
        return [cost_mapping]

    def calc_bwd_cost(self, dist_op, ctx, cluster):
        """Return the list of backward costs for this op.

        The first entry is the ``Reshape2GradOpCost`` compute cost. For every
        non-gradient input that is parameter-related, and whose first
        dimension is sharded over a mesh axis of size > 1,
        ``build_dp_costs`` appends the data-parallel cost of
        ``<input>@GRAD`` to the same list.
        """
        # calc comp op cost
        desc_mapping = build_comp_desc_from_dist_op(
            dist_op=dist_op, dist_context=ctx
        )
        dist_attr = dist_op.dist_attr
        process_mesh = dist_attr.process_mesh
        processes = process_mesh.processes
        cost_mapping = build_comp_costs_from_descs(
            Reshape2GradOpCost, ctx, processes, desc_mapping, cluster
        )
        res = [cost_mapping]

        backward_op = dist_op.serial_op
        main_block = backward_op.block
        mesh_shape = process_mesh.topology
        for input_name in backward_op.desc.input_names():
            for varname in backward_op.desc.input(input_name):
                if "@GRAD" in varname or not is_parameter_related(
                    varname, main_block
                ):
                    continue
                # NOTE input var's dim_mapping of backward op should be the
                # same with input var instead of corresponding varname of
                # forward op
                var_dim_mapping = dist_attr.get_input_dims_mapping(varname)
                batch_size_axis = var_dim_mapping[0]
                if batch_size_axis > -1 and mesh_shape[batch_size_axis] > 1:
                    # build_dp_costs appends into ``res`` in place.
                    build_dp_costs(
                        res,
                        dist_op,
                        ctx,
                        [varname + "@GRAD"],
                        {"use_calc_stream": True},
                        batch_size_axis,
                        cluster,
                    )
        return res

    def is_input_compatible(self, dist_op):
        """Return True if ``Out`` has exactly one more dim than ``X``."""
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(op_desc.input('X')[0])
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(
            op_desc.output('Out')[0]
        )
        return len(x_dims_mapping) == len(out_dims_mapping) - 1

    def is_output_compatible(self, dist_op):
        """Return True if ``Out`` has one more dim than ``X`` and that
        trailing output dim is not sharded."""
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(op_desc.input('X')[0])
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(
            op_desc.output('Out')[0]
        )
        if len(x_dims_mapping) != len(out_dims_mapping) - 1:
            return False
        return not is_dim_shard(out_dims_mapping[-1])

    def is_auto_compatible(self, dist_op):
        """Return True if the ``X``, ``Out`` and ``XShape`` mappings agree.

        Requires the input/output checks to pass, and requires every leading
        ``Out`` dim to match ``X``'s mapping. ``XShape[0]`` must be -1 and
        ``XShape[1:]`` must equal ``X``'s mapping.
        """
        if (not self.is_input_compatible(dist_op)) or (
            not self.is_output_compatible(dist_op)
        ):
            return False

        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_name = op_desc.input('X')[0]
        out_name = op_desc.output('Out')[0]
        x_shape_name = op_desc.output('XShape')[0]
        x_shape_dims_mapping = op_dist_attr.get_output_dims_mapping(x_shape_name)
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(x_name)
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(out_name)

        for idx, dim_mapping in enumerate(out_dims_mapping[:-1]):
            if x_dims_mapping[idx] != dim_mapping:
                return False

        if x_shape_dims_mapping[0] != -1:
            return False

        return x_shape_dims_mapping[1:] == x_dims_mapping[:]

    def update_dims_mapping(self, dist_op):
        """Propagate sharding between ``X`` and the leading ``Out`` dims.

        Each ``X`` dim ``i`` is made compatible with ``Out`` dim ``i``, in
        place. Then ``XShape[1:]`` is overwritten with ``X``'s mapping.
        Returns True if the ``X``/``Out`` propagation changed anything.

        NOTE(review): refreshing ``XShape`` does not by itself set the
        returned flag.
        """
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_name = op_desc.input('X')[0]
        out_name = op_desc.output('Out')[0]
        x_shape_name = op_desc.output('XShape')[0]
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(x_name)
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(out_name)
        x_shape_dims_mapping = op_dist_attr.get_output_dims_mapping(x_shape_name)

        changed = False
        for i in range(len(x_dims_mapping)):
            dim_changed = compute_compatible_and_update_dim_mapping(
                [x_dims_mapping, out_dims_mapping], [i, i]
            )
            if dim_changed:
                changed = True

        for i, mapping in enumerate(x_dims_mapping):
            x_shape_dims_mapping[i + 1] = mapping

        return changed

    @staticmethod
    def forward(ctx, *args, **kwargs):
        """
        kwargs: inputname_mapping & outputname_mapping

        Appends a copy of the source ``reshape2`` op to the work block. The
        copy is rebound to the var names given in ``kwargs``, and its
        ``shape`` attribute is localized with the ``Out`` dims mapping.
        """
        dist_op_context = ctx.dist_op_context
        main_block = dist_op_context.work_block
        src_op = dist_op_context.cur_src_op
        op_dist_attr = ctx.get_op_dist_attr_for_program(src_op)
        assert (
            op_dist_attr is not None
        ), "forward op [{}] don't have dist attribute !".format(str(src_op))

        # check validation of inputs / outputs
        for input_name in src_op.desc.input_names():
            assert input_name in kwargs, "input [{}] is not given".format(
                input_name
            )
            assert len(kwargs[input_name]) == len(
                src_op.desc.input(input_name)
            ), "number of tensor for input [{}] is not match".format(input_name)
        for output_name in src_op.desc.output_names():
            assert output_name in kwargs, "output [{}] is not given".format(
                output_name
            )
            assert len(kwargs[output_name]) == len(
                src_op.desc.output(output_name)
            ), "number of tensor for output [{}] is not match".format(
                output_name
            )

        X_var = main_block.var(kwargs['X'][0])
        Out_var = main_block.var(kwargs['Out'][0])
        XShape_var = main_block.var(kwargs['XShape'][0])
        ShapeTensor_var_list = list(kwargs['ShapeTensor'])
        Shape_var_list = list(kwargs['Shape'])

        # got dist attribute info and modify target shape
        dim_mapping = op_dist_attr.get_output_dims_mapping(Out_var.name)
        shape_list = DistributedReshapeImpl0._get_local_shape(
            src_op.desc.attr("shape"),
            dim_mapping,
            op_dist_attr.process_mesh.topology,
        )

        # create op
        new_op_desc = main_block.append_op(type='nop').desc
        new_op_desc.copy_from(src_op.desc)
        set_dist_op_desc_original_id(new_op_desc, src_op.desc, ctx)
        new_op_desc.set_input('ShapeTensor', ShapeTensor_var_list)
        new_op_desc.set_input('Shape', Shape_var_list)
        new_op_desc.set_input('X', [X_var.name])
        new_op_desc.set_output('XShape', [XShape_var.name])
        new_op_desc.set_output('Out', [Out_var.name])
        new_op_desc._set_attr('shape', shape_list)

    @staticmethod
    def backward(ctx, *args, **kwargs):
        """Delegate the backward pass to ``DistributedDefaultImpl0``."""
        DistributedDefaultImpl0.backward(ctx, *args, **kwargs)
286

287 288 289

class DistributedReshapeImpl1(DistributedOperatorImpl):
    """Distributed ``reshape2`` impl whose ``X`` has exactly one more
    dimension than the output.

    The trailing ``X`` dimension must not be sharded. The remaining ``X``
    dimensions share their mapping with the output dimensions.
    """

    def __init__(self, name):
        super().__init__(name)
        self._forward_implemented = True
        self._backward_implemented = False

    @staticmethod
    def _get_local_shape(shape_list, dims_mapping, process_mesh_shape):
        """Return a copy of the ``shape`` attribute localized to one rank.

        Every dimension ``idx`` whose mesh axis ``dims_mapping[idx]`` is
        non-negative (sharded) is floor-divided by the size of that mesh
        axis. Replicated dimensions and entries beyond ``len(dims_mapping)``
        are kept unchanged. The input list is not modified.

        NOTE(review): if ``shape_list`` holds placeholder values such as
        ``-1`` or ``0``, floor division by a positive mesh size leaves them
        unchanged.
        """
        local_shape = list(shape_list)
        for idx, mesh_axis in enumerate(dims_mapping):
            if mesh_axis >= 0 and idx < len(local_shape):
                local_shape[idx] = local_shape[idx] // process_mesh_shape[mesh_axis]
        return local_shape

    def calc_cost(self, op_role, dist_op, ctx, cluster):
        """Return the backward cost for backward ops, else the forward cost."""
        if int(op_role) == int(OpRole.Backward):
            cost = self.calc_bwd_cost(dist_op, ctx, cluster)
        else:
            cost = self.calc_fwd_cost(dist_op, ctx, cluster)
        assert cost is not None
        return cost

    def calc_fwd_cost(self, dist_op, ctx, cluster):
        """Return a one-element list with the ``Reshape2OpCost`` cost mapping.

        The op's ``shape`` attribute is localized with the ``Out`` dims
        mapping, so the cost is built from the per-rank target shape.
        """
        op = dist_op.serial_op
        dist_attr = dist_op.dist_attr

        # got dist attribute info
        dim_mapping = dist_attr.get_output_dims_mapping(op.output("Out")[0])
        shape_list = self._get_local_shape(
            op.desc.attr("shape"), dim_mapping, dist_attr.process_mesh.topology
        )

        # calc comp op cost
        desc_mapping = build_comp_desc_from_dist_op(
            dist_op=dist_op, dist_context=ctx
        )
        for key in desc_mapping:
            desc_mapping[key]["shape"] = shape_list

        processes = dist_attr.process_mesh.processes
        cost_mapping = build_comp_costs_from_descs(
            Reshape2OpCost, ctx, processes, desc_mapping, cluster
        )
        return [cost_mapping]

    def calc_bwd_cost(self, dist_op, ctx, cluster):
        """Return the list of backward costs for this op.

        The first entry is the ``Reshape2GradOpCost`` compute cost. For every
        non-gradient input that is parameter-related, and whose first
        dimension is sharded over a mesh axis of size > 1,
        ``build_dp_costs`` appends the data-parallel cost of
        ``<input>@GRAD`` to the same list.
        """
        # calc comp op cost
        desc_mapping = build_comp_desc_from_dist_op(
            dist_op=dist_op, dist_context=ctx
        )
        dist_attr = dist_op.dist_attr
        process_mesh = dist_attr.process_mesh
        processes = process_mesh.processes
        cost_mapping = build_comp_costs_from_descs(
            Reshape2GradOpCost, ctx, processes, desc_mapping, cluster
        )
        res = [cost_mapping]

        backward_op = dist_op.serial_op
        main_block = backward_op.block
        mesh_shape = process_mesh.topology
        for input_name in backward_op.desc.input_names():
            for varname in backward_op.desc.input(input_name):
                # Only parameter-related inputs get a data-parallel gradient
                # cost. This is the same condition used by
                # DistributedReshapeImpl0; the previous ``not`` inverted it.
                if "@GRAD" in varname or not is_parameter_related(
                    varname, main_block
                ):
                    continue
                # NOTE input var's dim_mapping of backward op should be the
                # same with input var instead of corresponding varname of
                # forward op
                var_dim_mapping = dist_attr.get_input_dims_mapping(varname)
                batch_size_axis = var_dim_mapping[0]
                if batch_size_axis > -1 and mesh_shape[batch_size_axis] > 1:
                    # build_dp_costs appends into ``res`` in place.
                    build_dp_costs(
                        res,
                        dist_op,
                        ctx,
                        [varname + "@GRAD"],
                        {"use_calc_stream": True},
                        batch_size_axis,
                        cluster,
                    )
        return res

    def is_input_compatible(self, dist_op):
        """Return True if ``X`` has one more dim than ``Out`` and that
        trailing ``X`` dim is not sharded."""
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(op_desc.input('X')[0])
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(
            op_desc.output('Out')[0]
        )
        if len(x_dims_mapping) != len(out_dims_mapping) + 1:
            return False
        return not is_dim_shard(x_dims_mapping[-1])

    def is_output_compatible(self, dist_op):
        """Return True if ``X`` has exactly one more dim than ``Out``."""
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(op_desc.input('X')[0])
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(
            op_desc.output('Out')[0]
        )
        return len(x_dims_mapping) == len(out_dims_mapping) + 1

    def is_auto_compatible(self, dist_op):
        """Return True if the ``X``, ``Out`` and ``XShape`` mappings agree.

        The checks are:
        - the input/output checks pass;
        - the trailing ``X`` dim is unsharded;
        - every leading ``X`` dim matches the ``Out`` mapping;
        - ``XShape[0]`` is -1;
        - ``XShape[1:]`` equals ``X``'s mapping.
        """
        if (not self.is_input_compatible(dist_op)) or (
            not self.is_output_compatible(dist_op)
        ):
            return False

        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_name = op_desc.input('X')[0]
        out_name = op_desc.output('Out')[0]
        x_shape_name = op_desc.output('XShape')[0]
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(x_name)
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(out_name)
        x_shape_dims_mapping = op_dist_attr.get_output_dims_mapping(x_shape_name)

        if is_dim_shard(x_dims_mapping[-1]):
            return False

        for idx, item in enumerate(x_dims_mapping[:-1]):
            if out_dims_mapping[idx] != item:
                return False

        if x_shape_dims_mapping[0] != -1:
            return False

        return x_shape_dims_mapping[1:] == x_dims_mapping[:]

    def update_dims_mapping(self, dist_op):
        """Propagate sharding between the ``Out`` dims and leading ``X`` dims.

        Each ``Out`` dim ``i`` is made compatible with ``X`` dim ``i``, in
        place. Then ``XShape[1:]`` is overwritten with ``X``'s mapping.
        Returns True if the ``X``/``Out`` propagation changed anything.

        NOTE(review): refreshing ``XShape`` does not by itself set the
        returned flag.
        """
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_name = op_desc.input('X')[0]
        out_name = op_desc.output('Out')[0]
        x_shape_name = op_desc.output('XShape')[0]
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(x_name)
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(out_name)
        x_shape_dims_mapping = op_dist_attr.get_output_dims_mapping(x_shape_name)

        changed = False
        for i in range(len(out_dims_mapping)):
            dim_changed = compute_compatible_and_update_dim_mapping(
                [x_dims_mapping, out_dims_mapping], [i, i]
            )
            if dim_changed:
                changed = True

        for i, mapping in enumerate(x_dims_mapping):
            x_shape_dims_mapping[i + 1] = mapping

        return changed

    @staticmethod
    def forward(ctx, *args, **kwargs):
        """
        kwargs: inputname_mapping & outputname_mapping

        Appends a copy of the source ``reshape2`` op to the work block. The
        copy is rebound to the var names given in ``kwargs``, and its
        ``shape`` attribute is localized with the ``Out`` dims mapping.
        """
        dist_op_context = ctx.dist_op_context
        main_block = dist_op_context.work_block
        src_op = dist_op_context.cur_src_op
        op_dist_attr = ctx.get_op_dist_attr_for_program(src_op)
        assert (
            op_dist_attr is not None
        ), "forward op [{}] don't have dist attribute !".format(str(src_op))

        # check validation of inputs / outputs
        for input_name in src_op.desc.input_names():
            assert input_name in kwargs, "input [{}] is not given".format(
                input_name
            )
            assert len(kwargs[input_name]) == len(
                src_op.desc.input(input_name)
            ), "number of tensor for input [{}] is not match".format(input_name)
        for output_name in src_op.desc.output_names():
            assert output_name in kwargs, "output [{}] is not given".format(
                output_name
            )
            assert len(kwargs[output_name]) == len(
                src_op.desc.output(output_name)
            ), "number of tensor for output [{}] is not match".format(
                output_name
            )

        X_var = main_block.var(kwargs['X'][0])
        Out_var = main_block.var(kwargs['Out'][0])
        XShape_var = main_block.var(kwargs['XShape'][0])
        ShapeTensor_var_list = list(kwargs['ShapeTensor'])
        Shape_var_list = list(kwargs['Shape'])

        # got dist attribute info and modify target shape
        dim_mapping = op_dist_attr.get_output_dims_mapping(Out_var.name)
        shape_list = DistributedReshapeImpl1._get_local_shape(
            src_op.desc.attr("shape"),
            dim_mapping,
            op_dist_attr.process_mesh.topology,
        )

        # create op
        new_op_desc = main_block.append_op(type='nop').desc
        new_op_desc.copy_from(src_op.desc)
        set_dist_op_desc_original_id(new_op_desc, src_op.desc, ctx)
        new_op_desc.set_input('ShapeTensor', ShapeTensor_var_list)
        new_op_desc.set_input('Shape', Shape_var_list)
        new_op_desc.set_input('X', [X_var.name])
        new_op_desc.set_output('XShape', [XShape_var.name])
        new_op_desc.set_output('Out', [Out_var.name])
        new_op_desc._set_attr('shape', shape_list)

    @staticmethod
    def backward(ctx, *args, **kwargs):
        """Delegate the backward pass to ``DistributedDefaultImpl0``."""
        DistributedDefaultImpl0.backward(ctx, *args, **kwargs)
539

540

541 542
class DistributedReshapeImpl2(DistributedOperatorImpl):
    """Distributed ``reshape2`` impl for an ``X`` and output of equal rank.

    All but the last dimension share their mapping between ``X`` and the
    output. ``XShape[1:]`` tracks the output's mapping.
    """

    def __init__(self, name):
        super().__init__(name)
        self._forward_implemented = True
        self._backward_implemented = False

    @staticmethod
    def _get_local_shape(shape_list, dims_mapping, process_mesh_shape):
        """Return a copy of the ``shape`` attribute localized to one rank.

        Every dimension ``idx`` whose mesh axis ``dims_mapping[idx]`` is
        non-negative (sharded) is floor-divided by the size of that mesh
        axis. Replicated dimensions and entries beyond ``len(dims_mapping)``
        are kept unchanged. The input list is not modified.

        NOTE(review): if ``shape_list`` holds placeholder values such as
        ``-1`` or ``0``, floor division by a positive mesh size leaves them
        unchanged.
        """
        local_shape = list(shape_list)
        for idx, mesh_axis in enumerate(dims_mapping):
            if mesh_axis >= 0 and idx < len(local_shape):
                local_shape[idx] = local_shape[idx] // process_mesh_shape[mesh_axis]
        return local_shape

    def calc_cost(self, op_role, dist_op, ctx, cluster):
        """Return the backward cost for backward ops, else the forward cost."""
        if int(op_role) == int(OpRole.Backward):
            cost = self.calc_bwd_cost(dist_op, ctx, cluster)
        else:
            cost = self.calc_fwd_cost(dist_op, ctx, cluster)
        assert cost is not None
        return cost

    def calc_fwd_cost(self, dist_op, ctx, cluster):
        """Return a one-element list with the ``Reshape2OpCost`` cost mapping.

        The op's ``shape`` attribute is localized with the ``Out`` dims
        mapping, so the cost is built from the per-rank target shape.
        """
        op = dist_op.serial_op
        dist_attr = dist_op.dist_attr

        # got dist attribute info
        dim_mapping = dist_attr.get_output_dims_mapping(op.output("Out")[0])
        shape_list = self._get_local_shape(
            op.desc.attr("shape"), dim_mapping, dist_attr.process_mesh.topology
        )

        # calc comp op cost
        desc_mapping = build_comp_desc_from_dist_op(
            dist_op=dist_op, dist_context=ctx
        )
        for key in desc_mapping:
            desc_mapping[key]["shape"] = shape_list

        processes = dist_attr.process_mesh.processes
        cost_mapping = build_comp_costs_from_descs(
            Reshape2OpCost, ctx, processes, desc_mapping, cluster
        )
        return [cost_mapping]

    def calc_bwd_cost(self, dist_op, ctx, cluster):
        """Return the list of backward costs for this op.

        The first entry is the ``Reshape2GradOpCost`` compute cost. For every
        non-gradient input that is parameter-related, and whose first
        dimension is sharded over a mesh axis of size > 1,
        ``build_dp_costs`` appends the data-parallel cost of
        ``<input>@GRAD`` to the same list.
        """
        # calc comp op cost
        desc_mapping = build_comp_desc_from_dist_op(
            dist_op=dist_op, dist_context=ctx
        )
        dist_attr = dist_op.dist_attr
        process_mesh = dist_attr.process_mesh
        processes = process_mesh.processes
        cost_mapping = build_comp_costs_from_descs(
            Reshape2GradOpCost, ctx, processes, desc_mapping, cluster
        )
        res = [cost_mapping]

        backward_op = dist_op.serial_op
        main_block = backward_op.block
        mesh_shape = process_mesh.topology
        for input_name in backward_op.desc.input_names():
            for varname in backward_op.desc.input(input_name):
                # Only parameter-related inputs get a data-parallel gradient
                # cost. This is the same condition used by
                # DistributedReshapeImpl0; the previous ``not`` inverted it.
                if "@GRAD" in varname or not is_parameter_related(
                    varname, main_block
                ):
                    continue
                # NOTE input var's dim_mapping of backward op should be the
                # same with input var instead of corresponding varname of
                # forward op
                var_dim_mapping = dist_attr.get_input_dims_mapping(varname)
                batch_size_axis = var_dim_mapping[0]
                if batch_size_axis > -1 and mesh_shape[batch_size_axis] > 1:
                    # build_dp_costs appends into ``res`` in place.
                    build_dp_costs(
                        res,
                        dist_op,
                        ctx,
                        [varname + "@GRAD"],
                        {"use_calc_stream": True},
                        batch_size_axis,
                        cluster,
                    )
        return res

    def is_input_compatible(self, dist_op):
        """Return True if ``X`` and ``Out`` have the same rank."""
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(op_desc.input('X')[0])
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(
            op_desc.output('Out')[0]
        )
        return len(x_dims_mapping) == len(out_dims_mapping)

    def is_output_compatible(self, dist_op):
        """Return True if ``X`` and ``Out`` have the same rank."""
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(
            op_desc.output('Out')[0]
        )
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(op_desc.input('X')[0])
        return len(x_dims_mapping) == len(out_dims_mapping)

    def is_auto_compatible(self, dist_op):
        """Return True if the ``X``, ``Out`` and ``XShape`` mappings agree.

        Requires the input/output checks to pass, and requires all but the
        last ``X`` dim to match ``Out``. ``XShape[0]`` must be -1 and
        ``XShape[1:]`` must equal ``Out``'s mapping.
        """
        if (not self.is_input_compatible(dist_op)) or (
            not self.is_output_compatible(dist_op)
        ):
            return False

        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_name = op_desc.input('X')[0]
        out_name = op_desc.output('Out')[0]
        x_shape_name = op_desc.output('XShape')[0]
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(x_name)
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(out_name)
        x_shape_dims_mapping = op_dist_attr.get_output_dims_mapping(x_shape_name)

        for idx, item in enumerate(x_dims_mapping[:-1]):
            if out_dims_mapping[idx] != item:
                return False

        if x_shape_dims_mapping[0] != -1:
            return False

        return x_shape_dims_mapping[1:] == out_dims_mapping[:]

    def update_dims_mapping(self, dist_op):
        """Propagate sharding between all but the last ``X``/``Out`` dims.

        Each dim ``i < rank - 1`` of ``X`` and ``Out`` is made compatible, in
        place. Then ``XShape[1:]`` is overwritten with ``Out``'s mapping.
        Returns True if the ``X``/``Out`` propagation changed anything.

        NOTE(review): refreshing ``XShape`` does not by itself set the
        returned flag.
        """
        op_desc = dist_op.serial_op.desc
        op_dist_attr = dist_op.dist_attr
        x_name = op_desc.input('X')[0]
        out_name = op_desc.output('Out')[0]
        x_shape_name = op_desc.output('XShape')[0]
        x_dims_mapping = op_dist_attr.get_input_dims_mapping(x_name)
        out_dims_mapping = op_dist_attr.get_output_dims_mapping(out_name)
        x_shape_dims_mapping = op_dist_attr.get_output_dims_mapping(x_shape_name)

        changed = False
        for i in range(len(out_dims_mapping) - 1):
            dim_changed = compute_compatible_and_update_dim_mapping(
                [x_dims_mapping, out_dims_mapping], [i, i]
            )
            if dim_changed:
                changed = True

        for i, mapping in enumerate(out_dims_mapping):
            x_shape_dims_mapping[i + 1] = mapping

        return changed

    @staticmethod
    def forward(ctx, *args, **kwargs):
        """
        kwargs: inputname_mapping & outputname_mapping

        Appends a copy of the source ``reshape2`` op to the work block. The
        copy is rebound to the var names given in ``kwargs``, and its
        ``shape`` attribute is localized with the ``Out`` dims mapping.
        """
        dist_op_context = ctx.dist_op_context
        main_block = dist_op_context.work_block
        src_op = dist_op_context.cur_src_op
        op_dist_attr = ctx.get_op_dist_attr_for_program(src_op)
        assert (
            op_dist_attr is not None
        ), "forward op [{}] don't have dist attribute !".format(str(src_op))

        # check validation of inputs / outputs
        for input_name in src_op.desc.input_names():
            assert input_name in kwargs, "input [{}] is not given".format(
                input_name
            )
            assert len(kwargs[input_name]) == len(
                src_op.desc.input(input_name)
            ), "number of tensor for input [{}] is not match".format(input_name)
        for output_name in src_op.desc.output_names():
            assert output_name in kwargs, "output [{}] is not given".format(
                output_name
            )
            assert len(kwargs[output_name]) == len(
                src_op.desc.output(output_name)
            ), "number of tensor for output [{}] is not match".format(
                output_name
            )

        X_var = main_block.var(kwargs['X'][0])
        Out_var = main_block.var(kwargs['Out'][0])
        XShape_var = main_block.var(kwargs['XShape'][0])
        ShapeTensor_var_list = list(kwargs['ShapeTensor'])
        Shape_var_list = list(kwargs['Shape'])

        # got dist attribute info and modify target shape
        out_dim_mapping = op_dist_attr.get_output_dims_mapping(Out_var.name)
        shape_list = DistributedReshapeImpl2._get_local_shape(
            src_op.desc.attr("shape"),
            out_dim_mapping,
            op_dist_attr.process_mesh.topology,
        )

        # create op
        new_op_desc = main_block.append_op(type='nop').desc
        new_op_desc.copy_from(src_op.desc)
        set_dist_op_desc_original_id(new_op_desc, src_op.desc, ctx)
        new_op_desc.set_input('ShapeTensor', ShapeTensor_var_list)
        new_op_desc.set_input('Shape', Shape_var_list)
        new_op_desc.set_input('X', [X_var.name])
        new_op_desc.set_output('XShape', [XShape_var.name])
        new_op_desc.set_output('Out', [Out_var.name])
        new_op_desc._set_attr('shape', shape_list)

    @staticmethod
    def backward(ctx, *args, **kwargs):
        """Delegate the backward pass to ``DistributedDefaultImpl0``."""
        DistributedDefaultImpl0.backward(ctx, *args, **kwargs)


787
# Register the three reshape2 implementations, one at a time and in this
# order: add one trailing dim, remove one trailing dim, keep the rank.
for _impl_cls, _impl_name in (
    (DistributedReshapeImpl0, "add_one_dim_back"),
    (DistributedReshapeImpl1, "remove_one_dim_back"),
    (DistributedReshapeImpl2, "same_dim_shape"),
):
    register_distributed_operator_impl("reshape2", _impl_cls(_impl_name))
# Do not leak the loop variables into the module namespace.
del _impl_cls, _impl_name