graph_wrapper.py 29.9 KB
Newer Older
Y
yelrose 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This package provides interface to help building static computational graph
for PaddlePaddle.
"""

import warnings
import numpy as np
import paddle.fluid as fluid
Y
Yelrose 已提交
22
import paddle.fluid.layers as L
Y
yelrose 已提交
23 24 25 26 27

from pgl.utils import op
from pgl.utils import paddle_helper
from pgl.utils.logger import log

Y
Yelrose 已提交
28
__all__ = ["BaseGraphWrapper", "GraphWrapper", "StaticGraphWrapper", "BatchGraphWrapper"]
Y
yelrose 已提交
29

Y
Yelrose 已提交
30
def send(src, dst, nfeat, efeat, message_func):
Y
yelrose 已提交
31 32
    """Send message from src to dst.
    """
Y
Yelrose 已提交
33 34
    src_feat = op.RowReader(nfeat, src)
    dst_feat = op.RowReader(nfeat, dst)
Y
yelrose 已提交
35 36 37 38
    msg = message_func(src_feat, dst_feat, efeat)
    return msg


W
Webbley 已提交
39 40
def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, num_nodes,
         num_edges):
Y
yelrose 已提交
41 42 43 44 45 46 47 48
    """Recv message from given msg to dst nodes.
    """
    if reduce_function == "sum":
        if isinstance(msg, dict):
            raise TypeError("The message for build-in function"
                            " should be Tensor not dict.")

        try:
W
Webbley 已提交
49
            out_dim = msg.shape[-1]
Y
Yelrose 已提交
50
            init_output = L.fill_constant(
51
                shape=[num_nodes, out_dim], value=0, dtype=msg.dtype)
Y
yelrose 已提交
52
            init_output.stop_gradient = False
Y
Yelrose 已提交
53
            empty_msg_flag = L.cast(num_edges > 0, dtype=msg.dtype)
W
Webbley 已提交
54
            msg = msg * empty_msg_flag
Y
yelrose 已提交
55 56 57 58 59 60 61
            output = paddle_helper.scatter_add(init_output, dst, msg)
            return output
        except TypeError as e:
            warnings.warn(
                "scatter_add is not supported with paddle version <= 1.5")

            def sum_func(message):
Y
Yelrose 已提交
62
                return L.sequence_pool(message, "sum")
Y
yelrose 已提交
63 64 65 66 67

            reduce_function = sum_func

    bucketed_msg = op.nested_lod_reset(msg, bucketing_index)
    output = reduce_function(bucketed_msg)
W
Webbley 已提交
68
    output_dim = output.shape[-1]
69

Y
Yelrose 已提交
70
    empty_msg_flag = L.cast(num_edges > 0, dtype=output.dtype)
W
Webbley 已提交
71
    output = output * empty_msg_flag
Y
yelrose 已提交
72

Y
Yelrose 已提交
73
    init_output = L.fill_constant(
74
        shape=[num_nodes, output_dim], value=0, dtype=output.dtype)
W
Webbley 已提交
75
    init_output.stop_gradient = True
Y
Yelrose 已提交
76
    final_output = L.scatter(init_output, uniq_dst, output)
W
Webbley 已提交
77
    return final_output
Y
yelrose 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95


class BaseGraphWrapper(object):
    """This module implement base class for graph wrapper.

    Currently our PGL is developed based on static computational mode of
    paddle (we'll support dynamic computational model later). We need to build
    model upon a virtual data holder. BaseGraphWrapper provide a virtual
    graph structure that users can build deep learning models
    based on this virtual graph. And then feed real graph data to run
    the models. Moreover, we provide convenient message-passing interface
    (send & recv) for building graph neural networks.

    NOTICE: Don't use this BaseGraphWrapper directly. Use :code:`GraphWrapper`
    and :code:`StaticGraphWrapper` to create graph wrapper instead.
    """

    def __init__(self):
L
liweibin 已提交
96 97
        self.node_feat_tensor_dict = {}
        self.edge_feat_tensor_dict = {}
Y
yelrose 已提交
98 99 100 101 102 103
        self._edges_src = None
        self._edges_dst = None
        self._num_nodes = None
        self._indegree = None
        self._edge_uniq_dst = None
        self._edge_uniq_dst_count = None
W
Webbley 已提交
104 105
        self._graph_lod = None
        self._num_graph = None
Y
Yelrose 已提交
106
        self._num_edges = None
L
liweibin 已提交
107 108 109 110
        self._data_name_prefix = ""

    def __repr__(self):
        return self._data_name_prefix
Y
yelrose 已提交
111

Y
Yelrose 已提交
112
    def send(self, message_func, nfeat_list=None, efeat_list=None):
Y
yelrose 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
        """Send message from all src nodes to dst nodes.

        The UDF message function should has the following format.

        .. code-block:: python

            def message_func(src_feat, dst_feat, edge_feat):
                '''
                    Args:
                        src_feat: the node feat dict attached to the src nodes.
                        dst_feat: the node feat dict attached to the dst nodes.
                        edge_feat: the edge feat dict attached to the
                                   corresponding (src, dst) edges.

                    Return:
                        It should return a tensor or a dictionary of tensor. And each tensor
                        should have a shape of (num_edges, dims).
                '''
                pass

        Args:
            message_func: UDF function.
            nfeat_list: a list of names or tuple (name, tensor)
            efeat_list: a list of names or tuple (name, tensor)

        Return:
            A dictionary of tensor representing the message. Each of the values
            in the dictionary has a shape (num_edges, dim) which should be collected
            by :code:`recv` function.
        """
        if efeat_list is None:
            efeat_list = {}
Y
Yelrose 已提交
145

Y
yelrose 已提交
146 147 148 149 150
        if nfeat_list is None:
            nfeat_list = {}

        src, dst = self.edges
        nfeat = {}
Y
Yelrose 已提交
151

Y
yelrose 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
        for feat in nfeat_list:
            if isinstance(feat, str):
                nfeat[feat] = self.node_feat[feat]
            else:
                name, tensor = feat
                nfeat[name] = tensor

        efeat = {}
        for feat in efeat_list:
            if isinstance(feat, str):
                efeat[feat] = self.edge_feat[feat]
            else:
                name, tensor = feat
                efeat[name] = tensor

Y
Yelrose 已提交
167
        msg = send(src, dst, nfeat, efeat, message_func)
Y
yelrose 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
        return msg

    def recv(self, msg, reduce_function):
        """Recv message and aggregate the message by reduce_fucntion

        The UDF reduce_function function should has the following format.

        .. code-block:: python

            def reduce_func(msg):
                '''
                    Args:
                        msg: A LodTensor or a dictionary of LodTensor whose batch_size
                             is equals to the number of unique dst nodes.

                    Return:
                        It should return a tensor with shape (batch_size, out_dims). The
                        batch size should be the same as msg.
                '''
                pass

        Args:
            msg: A tensor or a dictionary of tensor created by send function..

            reduce_function: UDF reduce function or strings "sum" as built-in function.
                             The built-in "sum" will use scatter_add to optimized the speed.

        Return:
            A tensor with shape (num_nodes, out_dims). The output for nodes with no message
            will be zeros.
        """
        output = recv(
            dst=self._edges_dst,
            uniq_dst=self._edge_uniq_dst,
Y
Yelrose 已提交
202
            bucketing_index=self._edge_uniq_dst_count,
Y
yelrose 已提交
203 204
            msg=msg,
            reduce_function=reduce_function,
W
Webbley 已提交
205 206
            num_edges=self._num_edges,
            num_nodes=self._num_nodes)
Y
yelrose 已提交
207 208 209 210 211 212 213 214
        return output

    @property
    def edges(self):
        """Return a tuple of edge Tensor (src, dst).

        Return:
            A tuple of Tensor (src, dst). Src and dst are both
Y
Yelrose 已提交
215
            tensor with shape (num_edges, ) and dtype int64.
Y
yelrose 已提交
216 217 218 219 220 221 222 223
        """
        return self._edges_src, self._edges_dst

    @property
    def num_nodes(self):
        """Return a variable of number of nodes

        Return:
Y
Yelrose 已提交
224
            A variable with shape (1,) as the number of nodes in int64.
Y
yelrose 已提交
225 226 227
        """
        return self._num_nodes

W
Webbley 已提交
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
    @property
    def graph_lod(self):
        """Return graph index for graphs

        Return:
            A variable with shape [None ]  as the Lod information of multiple-graph.
        """
        return self._graph_lod

    @property
    def num_graph(self):
        """Return a variable of number of graphs

        Return:
            A variable with shape (1,) as the number of Graphs in int64.
        """
        return self._num_graph

Y
yelrose 已提交
246 247 248 249 250 251 252 253
    @property
    def edge_feat(self):
        """Return a dictionary of tensor representing edge features.

        Return:
            A dictionary whose keys are the feature names and the values
            are feature tensor.
        """
L
liweibin 已提交
254
        return self.edge_feat_tensor_dict
Y
yelrose 已提交
255 256 257 258 259 260 261 262 263

    @property
    def node_feat(self):
        """Return a dictionary of tensor representing node features.

        Return:
            A dictionary whose keys are the feature names and the values
            are feature tensor.
        """
L
liweibin 已提交
264
        return self.node_feat_tensor_dict
Y
yelrose 已提交
265 266 267 268 269

    def indegree(self):
        """Return the indegree tensor for all nodes.

        Return:
Y
Yelrose 已提交
270
            A tensor of shape (num_nodes, ) in int64.
Y
yelrose 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284
        """
        return self._indegree


class StaticGraphWrapper(BaseGraphWrapper):
    """Implement a graph wrapper that the data of the graph won't
    be changed and it can be fit into the GPU or CPU memory. This
    can reduce the time of swapping large data from GPU and CPU.

    Args:
        name: The graph data prefix

        graph: The static graph that should be put into memory

W
Webbley 已提交
285
        place: fluid.CPUPlace or fluid.CUDAPlace(n) indicating the
Y
yelrose 已提交
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
               device to hold the graph data.

    Examples:

        If we have a immutable graph and it can be fit into the GPU or CPU.
        we can just use a :code:`StaticGraphWrapper` to pre-place the graph
        data into devices.

        .. code-block:: python

            import numpy as np
            import paddle.fluid as fluid
            from pgl.graph import Graph
            from pgl.graph_wrapper import StaticGraphWrapper

            place = fluid.CPUPlace()
            exe = fluid.Excecutor(place)

            num_nodes = 5
            edges = [ (0, 1), (1, 2), (3, 4)]
            feature = np.random.randn(5, 100)
            edge_feature = np.random.randn(3, 100)
            graph = Graph(num_nodes=num_nodes,
                        edges=edges,
                        node_feat={
                            "feature": feature
                        },
                        edge_feat={
                            "edge_feature": edge_feature
                        })

            graph_wrapper = StaticGraphWrapper(name="graph",
                        graph=graph,
                        place=place)

            # build your deep graph model

            # Initialize parameters for deep graph model
            exe.run(fluid.default_startup_program())

            # Initialize graph data
            graph_wrapper.initialize(place)
    """

    def __init__(self, name, graph, place):
        super(StaticGraphWrapper, self).__init__()
L
liweibin 已提交
332
        self._data_name_prefix = name
Y
yelrose 已提交
333 334 335 336 337 338 339 340 341 342 343
        self._initializers = []
        self.__create_graph_attr(graph)

    def __create_graph_attr(self, graph):
        """Create graph attributes for paddlepaddle.
        """
        src, dst, eid = graph.sorted_edges(sort_by="dst")
        indegree = graph.indegree()
        nodes = graph.nodes
        uniq_dst = nodes[indegree > 0]
        uniq_dst_count = indegree[indegree > 0]
Y
Yelrose 已提交
344 345
        uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
        uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
W
Webbley 已提交
346 347 348 349 350 351 352 353 354 355 356
        graph_lod = graph.graph_lod
        num_graph = graph.num_graph

        num_edges = len(src)
        if num_edges == 0:
            # Fake Graph
            src = np.array([0], dtype="int64")
            dst = np.array([0], dtype="int64")
            eid = np.array([0], dtype="int64")
            uniq_dst_count = np.array([0, 1], dtype="int32")
            uniq_dst = np.array([0], dtype="int64")
Y
yelrose 已提交
357 358 359 360 361 362 363 364 365 366

        edge_feat = {}

        for key, value in graph.edge_feat.items():
            edge_feat[key] = value[eid]
        node_feat = graph.node_feat

        self.__create_graph_node_feat(node_feat, self._initializers)
        self.__create_graph_edge_feat(edge_feat, self._initializers)

W
Webbley 已提交
367 368 369 370 371 372 373 374 375 376 377 378 379 380
        self._num_edges, init = paddle_helper.constant(
            dtype="int64",
            value=np.array(
                [num_edges], dtype="int64"),
            name=self._data_name_prefix + '/num_edges')
        self._initializers.append(init)

        self._num_graph, init = paddle_helper.constant(
            dtype="int64",
            value=np.array(
                [num_graph], dtype="int64"),
            name=self._data_name_prefix + '/num_graph')
        self._initializers.append(init)

Y
yelrose 已提交
381
        self._edges_src, init = paddle_helper.constant(
Y
Yelrose 已提交
382
            dtype="int64",
Y
yelrose 已提交
383
            value=src,
L
liweibin 已提交
384
            name=self._data_name_prefix + '/edges_src')
Y
yelrose 已提交
385 386 387
        self._initializers.append(init)

        self._edges_dst, init = paddle_helper.constant(
Y
Yelrose 已提交
388
            dtype="int64",
Y
yelrose 已提交
389
            value=dst,
L
liweibin 已提交
390
            name=self._data_name_prefix + '/edges_dst')
Y
yelrose 已提交
391 392 393
        self._initializers.append(init)

        self._num_nodes, init = paddle_helper.constant(
Y
Yelrose 已提交
394
            dtype="int64",
Y
yelrose 已提交
395 396
            hide_batch_size=False,
            value=np.array([graph.num_nodes]),
L
liweibin 已提交
397
            name=self._data_name_prefix + '/num_nodes')
Y
yelrose 已提交
398 399 400
        self._initializers.append(init)

        self._edge_uniq_dst, init = paddle_helper.constant(
L
liweibin 已提交
401
            name=self._data_name_prefix + "/uniq_dst",
Y
Yelrose 已提交
402
            dtype="int64",
Y
yelrose 已提交
403 404 405 406
            value=uniq_dst)
        self._initializers.append(init)

        self._edge_uniq_dst_count, init = paddle_helper.constant(
L
liweibin 已提交
407
            name=self._data_name_prefix + "/uniq_dst_count",
Y
yelrose 已提交
408 409 410 411
            dtype="int32",
            value=uniq_dst_count)
        self._initializers.append(init)

W
Webbley 已提交
412 413 414 415 416 417
        self._graph_lod, init = paddle_helper.constant(
            name=self._data_name_prefix + "/graph_lod",
            dtype="int32",
            value=graph_lod)
        self._initializers.append(init)

Y
yelrose 已提交
418
        self._indegree, init = paddle_helper.constant(
L
liweibin 已提交
419
            name=self._data_name_prefix + "/indegree",
Y
Yelrose 已提交
420
            dtype="int64",
Y
yelrose 已提交
421 422 423 424 425 426 427 428 429
            value=indegree)
        self._initializers.append(init)

    def __create_graph_node_feat(self, node_feat, collector):
        """Convert node features into paddlepaddle tensor.
        """
        for node_feat_name, node_feat_value in node_feat.items():
            node_feat_shape = node_feat_value.shape
            node_feat_dtype = node_feat_value.dtype
L
liweibin 已提交
430
            self.node_feat_tensor_dict[
Y
yelrose 已提交
431
                node_feat_name], init = paddle_helper.constant(
L
liweibin 已提交
432
                    name=self._data_name_prefix + '/node_feat/' +
Y
Yelrose 已提交
433
                    node_feat_name,
Y
yelrose 已提交
434 435 436 437 438 439 440 441 442 443
                    dtype=node_feat_dtype,
                    value=node_feat_value)
            collector.append(init)

    def __create_graph_edge_feat(self, edge_feat, collector):
        """Convert edge features into paddlepaddle tensor.
        """
        for edge_feat_name, edge_feat_value in edge_feat.items():
            edge_feat_shape = edge_feat_value.shape
            edge_feat_dtype = edge_feat_value.dtype
L
liweibin 已提交
444
            self.edge_feat_tensor_dict[
Y
yelrose 已提交
445
                edge_feat_name], init = paddle_helper.constant(
L
liweibin 已提交
446
                    name=self._data_name_prefix + '/edge_feat/' +
Y
Yelrose 已提交
447
                    edge_feat_name,
Y
yelrose 已提交
448 449 450 451 452 453 454 455
                    dtype=edge_feat_dtype,
                    value=edge_feat_value)
            collector.append(init)

    def initialize(self, place):
        """Placing the graph data into the devices.

        Args:
W
Webbley 已提交
456
            place: fluid.CPUPlace or fluid.CUDAPlace(n) indicating the
Y
yelrose 已提交
457 458 459 460 461 462 463 464 465 466 467
                   device to hold the graph data.
        """
        log.info(
            "StaticGraphWrapper.initialize must be called after startup program"
        )
        for init_func in self._initializers:
            init_func(place)


class GraphWrapper(BaseGraphWrapper):
    """Implement a graph wrapper that creates a graph data holders
Y
Yelrose 已提交
468
    that attributes and features in the graph are :code:`L.data`.
Y
yelrose 已提交
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526
    And we provide interface :code:`to_feed` to help converting :code:`Graph`
    data into :code:`feed_dict`.

    Args:
        name: The graph data prefix

        node_feat: A list of tuples that decribe the details of node
                   feature tenosr. Each tuple mush be (name, shape, dtype)
                   and the first dimension of the shape must be set unknown
                   (-1 or None) or we can easily use :code:`Graph.node_feat_info()`
                   to get the node_feat settings.

        edge_feat: A list of tuples that decribe the details of edge
                   feature tenosr. Each tuple mush be (name, shape, dtype)
                   and the first dimension of the shape must be set unknown
                   (-1 or None) or we can easily use :code:`Graph.edge_feat_info()`
                   to get the edge_feat settings.

    Examples:

        .. code-block:: python

            import numpy as np
            import paddle.fluid as fluid
            from pgl.graph import Graph
            from pgl.graph_wrapper import GraphWrapper

            place = fluid.CPUPlace()
            exe = fluid.Excecutor(place)

            num_nodes = 5
            edges = [ (0, 1), (1, 2), (3, 4)]
            feature = np.random.randn(5, 100)
            edge_feature = np.random.randn(3, 100)
            graph = Graph(num_nodes=num_nodes,
                        edges=edges,
                        node_feat={
                            "feature": feature
                        },
                        edge_feat={
                            "edge_feature": edge_feature
                        })

            graph_wrapper = GraphWrapper(name="graph",
                        node_feat=graph.node_feat_info(),
                        edge_feat=graph.edge_feat_info())

            # build your deep graph model
            ...

            # Initialize parameters for deep graph model
            exe.run(fluid.default_startup_program())

            for i in range(10):
                feed_dict = graph_wrapper.to_feed(graph)
                ret = exe.run(fetch_list=[...], feed=feed_dict )
    """

Y
yelrose 已提交
527
    def __init__(self, name, node_feat=[], edge_feat=[], **kwargs):
Y
yelrose 已提交
528
        super(GraphWrapper, self).__init__()
Y
Yelrose 已提交
529
        # collect holders for PyReader
L
liweibin 已提交
530
        self._data_name_prefix = name
Y
Yelrose 已提交
531
        self._holder_list = []
Y
yelrose 已提交
532 533 534 535 536 537 538 539 540 541 542 543
        self.__create_graph_attr_holders()
        for node_feat_name, node_feat_shape, node_feat_dtype in node_feat:
            self.__create_graph_node_feat_holders(
                node_feat_name, node_feat_shape, node_feat_dtype)

        for edge_feat_name, edge_feat_shape, edge_feat_dtype in edge_feat:
            self.__create_graph_edge_feat_holders(
                edge_feat_name, edge_feat_shape, edge_feat_dtype)

    def __create_graph_attr_holders(self):
        """Create data holders for graph attributes.
        """
Y
Yelrose 已提交
544
        self._num_edges = L.data(
W
Webbley 已提交
545 546 547 548 549
            self._data_name_prefix + '/num_edges',
            shape=[1],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
Y
Yelrose 已提交
550
        self._num_graph = L.data(
W
Webbley 已提交
551 552 553 554 555
            self._data_name_prefix + '/num_graph',
            shape=[1],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
Y
Yelrose 已提交
556
        self._edges_src = L.data(
L
liweibin 已提交
557
            self._data_name_prefix + '/edges_src',
Y
yelrose 已提交
558 559
            shape=[None],
            append_batch_size=False,
Y
Yelrose 已提交
560
            dtype="int64",
Y
yelrose 已提交
561
            stop_gradient=True)
Y
Yelrose 已提交
562
        self._edges_dst = L.data(
L
liweibin 已提交
563
            self._data_name_prefix + '/edges_dst',
Y
yelrose 已提交
564 565
            shape=[None],
            append_batch_size=False,
Y
Yelrose 已提交
566
            dtype="int64",
Y
yelrose 已提交
567
            stop_gradient=True)
Y
Yelrose 已提交
568
        self._num_nodes = L.data(
L
liweibin 已提交
569
            self._data_name_prefix + '/num_nodes',
Y
yelrose 已提交
570 571
            shape=[1],
            append_batch_size=False,
Y
Yelrose 已提交
572
            dtype='int64',
Y
yelrose 已提交
573
            stop_gradient=True)
W
Webbley 已提交
574

Y
Yelrose 已提交
575
        self._edge_uniq_dst = L.data(
L
liweibin 已提交
576
            self._data_name_prefix + "/uniq_dst",
Y
yelrose 已提交
577 578
            shape=[None],
            append_batch_size=False,
Y
Yelrose 已提交
579
            dtype="int64",
Y
yelrose 已提交
580
            stop_gradient=True)
W
Webbley 已提交
581

Y
Yelrose 已提交
582
        self._graph_lod = L.data(
W
Webbley 已提交
583 584 585 586 587 588
            self._data_name_prefix + "/graph_lod",
            shape=[None],
            append_batch_size=False,
            dtype="int32",
            stop_gradient=True)

Y
Yelrose 已提交
589
        self._edge_uniq_dst_count = L.data(
L
liweibin 已提交
590
            self._data_name_prefix + "/uniq_dst_count",
Y
yelrose 已提交
591 592 593 594
            shape=[None],
            append_batch_size=False,
            dtype="int32",
            stop_gradient=True)
W
Webbley 已提交
595

Y
Yelrose 已提交
596
        self._indegree = L.data(
L
liweibin 已提交
597
            self._data_name_prefix + "/indegree",
Y
yelrose 已提交
598 599
            shape=[None],
            append_batch_size=False,
Y
Yelrose 已提交
600
            dtype="int64",
Y
yelrose 已提交
601
            stop_gradient=True)
Y
Yelrose 已提交
602
        self._holder_list.extend([
W
Webbley 已提交
603 604 605 606 607 608 609 610
            self._edges_src,
            self._edges_dst,
            self._num_nodes,
            self._edge_uniq_dst,
            self._edge_uniq_dst_count,
            self._indegree,
            self._graph_lod,
            self._num_graph,
W
Webbley 已提交
611
            self._num_edges,
Y
Yelrose 已提交
612
        ])
Y
yelrose 已提交
613 614 615 616 617

    def __create_graph_node_feat_holders(self, node_feat_name, node_feat_shape,
                                         node_feat_dtype):
        """Create data holders for node features.
        """
Y
Yelrose 已提交
618
        feat_holder = L.data(
L
liweibin 已提交
619
            self._data_name_prefix + '/node_feat/' + node_feat_name,
Y
yelrose 已提交
620 621 622 623
            shape=node_feat_shape,
            append_batch_size=False,
            dtype=node_feat_dtype,
            stop_gradient=True)
L
liweibin 已提交
624
        self.node_feat_tensor_dict[node_feat_name] = feat_holder
Y
Yelrose 已提交
625
        self._holder_list.append(feat_holder)
Y
yelrose 已提交
626 627 628 629 630

    def __create_graph_edge_feat_holders(self, edge_feat_name, edge_feat_shape,
                                         edge_feat_dtype):
        """Create edge holders for edge features.
        """
Y
Yelrose 已提交
631
        feat_holder = L.data(
L
liweibin 已提交
632
            self._data_name_prefix + '/edge_feat/' + edge_feat_name,
Y
yelrose 已提交
633 634 635 636
            shape=edge_feat_shape,
            append_batch_size=False,
            dtype=edge_feat_dtype,
            stop_gradient=True)
L
liweibin 已提交
637
        self.edge_feat_tensor_dict[edge_feat_name] = feat_holder
Y
Yelrose 已提交
638
        self._holder_list.append(feat_holder)
Y
yelrose 已提交
639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656

    def to_feed(self, graph):
        """Convert the graph into feed_dict.

        This function helps to convert graph data into feed dict
        for :code:`fluid.Excecutor` to run the model.

        Args:
            graph: the :code:`Graph` data object

        Return:
            A dictionary contains data holder names and its corresponding
            data.
        """
        feed_dict = {}
        src, dst, eid = graph.sorted_edges(sort_by="dst")
        indegree = graph.indegree()
        nodes = graph.nodes
W
Webbley 已提交
657
        num_edges = len(src)
Y
yelrose 已提交
658 659
        uniq_dst = nodes[indegree > 0]
        uniq_dst_count = indegree[indegree > 0]
Y
Yelrose 已提交
660 661
        uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
        uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
W
Webbley 已提交
662 663 664 665 666 667 668 669 670 671 672
        num_graph = graph.num_graph
        graph_lod = graph.graph_lod

        if num_edges == 0:
            # Fake Graph
            src = np.array([0], dtype="int64")
            dst = np.array([0], dtype="int64")
            eid = np.array([0], dtype="int64")

            uniq_dst_count = np.array([0, 1], dtype="int32")
            uniq_dst = np.array([0], dtype="int64")
Y
yelrose 已提交
673 674 675 676 677 678 679

        edge_feat = {}

        for key, value in graph.edge_feat.items():
            edge_feat[key] = value[eid]
        node_feat = graph.node_feat

W
Webbley 已提交
680 681
        feed_dict[self._data_name_prefix + '/num_edges'] = np.array(
            [num_edges], dtype="int64")
L
liweibin 已提交
682 683 684
        feed_dict[self._data_name_prefix + '/edges_src'] = src
        feed_dict[self._data_name_prefix + '/edges_dst'] = dst
        feed_dict[self._data_name_prefix + '/num_nodes'] = np.array(
W
Webbley 已提交
685
            [graph.num_nodes], dtype="int64")
L
liweibin 已提交
686 687 688
        feed_dict[self._data_name_prefix + '/uniq_dst'] = uniq_dst
        feed_dict[self._data_name_prefix + '/uniq_dst_count'] = uniq_dst_count
        feed_dict[self._data_name_prefix + '/indegree'] = indegree
W
Webbley 已提交
689 690 691 692
        feed_dict[self._data_name_prefix + '/graph_lod'] = graph_lod
        feed_dict[self._data_name_prefix + '/num_graph'] = np.array(
            [num_graph], dtype="int64")
        feed_dict[self._data_name_prefix + '/indegree'] = indegree
L
liweibin 已提交
693 694 695

        for key in self.node_feat_tensor_dict:
            feed_dict[self._data_name_prefix + '/node_feat/' +
Y
Yelrose 已提交
696
                      key] = node_feat[key]
Y
yelrose 已提交
697

L
liweibin 已提交
698 699
        for key in self.edge_feat_tensor_dict:
            feed_dict[self._data_name_prefix + '/edge_feat/' +
Y
Yelrose 已提交
700
                      key] = edge_feat[key]
Y
yelrose 已提交
701 702

        return feed_dict
Y
Yelrose 已提交
703 704 705 706 707 708

    @property
    def holder_list(self):
        """Return the holder list.
        """
        return self._holder_list
Y
Yelrose 已提交
709 710 711 712 713 714 715 716 717 718 719 720 721 722


def get_degree(edge, num_nodes):
    init_output = L.fill_constant(
        shape=[num_nodes], value=0, dtype="float32")
    init_output.stop_gradient = True
    final_output = L.scatter(init_output,
                       edge,
                       L.full_like(edge, 1, dtype="float32"),
                       overwrite=False)
    return final_output

class DropEdgeWrapper(BaseGraphWrapper):
    """Implement of Edge Drop """
Y
Yelrose 已提交
723
    def __init__(self, graph_wrapper, dropout, keep_self_loop=True):
Y
Yelrose 已提交
724 725 726 727 728 729 730 731 732 733 734 735 736
        super(DropEdgeWrapper, self).__init__()

        # Copy Node's information
        for key, value in graph_wrapper.node_feat.items():
            self.node_feat_tensor_dict[key] = value

        self._num_nodes = graph_wrapper.num_nodes 
        self._graph_lod = graph_wrapper.graph_lod
        self._num_graph = graph_wrapper.num_graph
     
        # Dropout Edges
        src, dst = graph_wrapper.edges
        u = L.uniform_random(shape=L.cast(L.shape(src), 'int64'), min=0., max=1.)
Y
Yelrose 已提交
737
        
Y
Yelrose 已提交
738 739 740 741 742 743

        # Avoid Empty Edges
        keeped = L.cast(u > dropout, dtype="float32")
        self._num_edges = L.reduce_sum(L.cast(keeped, "int32"))
        keeped = keeped + L.cast(self._num_edges == 0, dtype="float32")

Y
Yelrose 已提交
744 745 746 747
        if keep_self_loop:
            self_loop = L.cast(src == dst, dtype="float32")
            keeped = keeped + self_loop

Y
Yelrose 已提交
748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765
        keeped = (keeped > 0.5)
        src = paddle_helper.masked_select(src, keeped)
        dst = paddle_helper.masked_select(dst, keeped)
        src.stop_gradient=True
        dst.stop_gradient=True
        self._edges_src = src 
        self._edges_dst = dst 

        for key, value in graph_wrapper.edge_feat.items():
            self.edge_feat_tensor_dict[key] = paddle_helper.masked_select(value, keeped)
        
        self._edge_uniq_dst, _, uniq_count = L.unique_with_counts(dst, dtype="int32")
        self._edge_uniq_dst.stop_gradient=True
        last = L.reduce_sum(uniq_count, keep_dim=True)
        uniq_count = L.cumsum(uniq_count, exclusive=True)
        self._edge_uniq_dst_count = L.concat([uniq_count, last])
        self._edge_uniq_dst_count.stop_gradient=True
        self._indegree = get_degree(self._edges_dst, self._num_nodes)
Y
Yelrose 已提交
766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851


class BatchGraphWrapper(BaseGraphWrapper):
    """Implement a graph wrapper that user can use their own data holder. 
    And this graph wrapper support multiple graphs which is benefit for data parallel algorithms.

    Args:
        num_nodes (int32 or int64): Shape [ num_graph ]. 

        num_edges (int32 or int64): Shape [ num_graph ]. 

        edges (int32 or int64): Shape [ total_num_edges_in_the_graphs, 2 ]
   
        node_feats: A dictionary for node features. Each value should be tensor
                    with shape [ total_num_nodes_in_the_graphs, feature_size]

        edge_feats: A dictionary for edge features. Each value should be tensor
                    with shape [ total_num_edges_in_the_graphs, feature_size]

    """
    def __init__(self, num_nodes, num_edges, edges, node_feats=None, edge_feats=None):
        super(BatchGraphWrapper, self).__init__()

        node_shift, edge_lod = self.__build_meta_data(num_nodes, num_edges)
        self.__build_edges(edges, node_shift, edge_lod)

        # assign node features
        if node_feats is not None:
            for key, value in node_feats.items():
                self.node_feat_tensor_dict[key] = value 
 
        # assign edge features
        if edge_feats is not None:
            for key, value in edge_feats.items():
                self.edge_feat_tensor_dict[key] = value

        # other meta-data 
        self._edge_uniq_dst, _, uniq_count = L.unique_with_counts(self._edges_dst, dtype="int32")
        self._edge_uniq_dst.stop_gradient=True
        last = L.reduce_sum(uniq_count, keep_dim=True)
        uniq_count = L.cumsum(uniq_count, exclusive=True)
        self._edge_uniq_dst_count = L.concat([uniq_count, last])
        self._edge_uniq_dst_count.stop_gradient=True
        self._indegree = get_degree(self._edges_dst, self._num_nodes)

    def __build_meta_data(self, num_nodes, num_edges):
        """ Merge information for nodes and edges.
        """
        num_nodes = L.reshape(num_nodes, [-1])
        num_edges = L.reshape(num_edges, [-1])
        num_nodes = paddle_helper.ensure_dtype(num_nodes, dtype="int32")
        num_edges = paddle_helper.ensure_dtype(num_edges, dtype="int32")

        num_graph = L.shape(num_nodes)[0]
        sum_num_nodes = L.reduce_sum(num_nodes)
        sum_num_edges = L.reduce_sum(num_edges)
        edge_lod = L.concat([L.cumsum(num_edges, exclusive=True), sum_num_edges])

        node_shift = L.cumsum(num_nodes, exclusive=True)
        graph_lod = L.concat([node_shift, sum_num_nodes])
        self._num_nodes = sum_num_nodes
        self._num_edges = sum_num_edges
        self._num_graph = num_graph
        self._graph_lod = graph_lod
        return node_shift, edge_lod


    def __build_edges(self, edges, node_shift, edge_lod):
        """ Merge subgraph edges. 
        """
        src = edges[:, 0]
        dst = edges[:, 1]
        src = L.reshape(src, [-1])
        dst = L.reshape(dst, [-1])
        src = paddle_helper.ensure_dtype(src, dtype="int32")
        dst = paddle_helper.ensure_dtype(dst, dtype="int32")
        # preprocess edges
        lod_dst = L.lod_reset(dst, edge_lod)
        node_shift = L.reshape(node_shift, [-1, 1])
        node_shift = L.sequence_expand_as(node_shift, lod_dst)
        node_shift = L.reshape(node_shift, [-1])
        src = src + node_shift
        dst = dst + node_shift
        # sort edges
        self._edges_dst, index  = L.argsort(dst)
        self._edges_src = L.gather(src, index, overwrite=False)