# graph_wrapper.py
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This package provides interface to help building static computational graph
for PaddlePaddle.
"""

import warnings
import numpy as np
import paddle.fluid as fluid

from pgl.utils import op
from pgl.utils import paddle_helper
from pgl.utils.logger import log

__all__ = ["BaseGraphWrapper", "GraphWrapper", "StaticGraphWrapper"]


def send(src, dst, nfeat, efeat, message_func):
    """Build the per-edge messages.

    The node features are looked up once for the source endpoints and once
    for the destination endpoints via ``op.read_rows``; both results are
    handed to ``message_func`` together with the edge features.

    Args:
        src: index of the source node of every edge.
        dst: index of the destination node of every edge.
        nfeat: node features to read rows from.
        efeat: edge features, forwarded to ``message_func`` untouched.
        message_func: callable invoked as
            ``message_func(src_feat, dst_feat, efeat)``.

    Return:
        Whatever ``message_func`` returns.
    """
    return message_func(
        op.read_rows(nfeat, src), op.read_rows(nfeat, dst), efeat)


def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, num_nodes,
         num_edges):
    """Recv message from given msg to dst nodes.

    Args:
        dst: destination index of every edge; used as the index of
            ``paddle_helper.scatter_add`` on the built-in "sum" path.
        uniq_dst: index passed to ``fluid.layers.scatter`` to place the
            reduced rows into the output on the UDF path.
        bucketing_index: LoD information passed to ``op.nested_lod_reset``
            to group the messages before the reduce function is applied.
        msg: message tensor. A dict of tensors is only accepted by UDF
            reduce functions.
        reduce_function: the string "sum" (built-in) or a callable taking
            the bucketed message and returning a tensor.
        num_nodes: number of rows of the returned tensor.
        num_edges: variable holding the edge count; when it is not
            positive every message is multiplied by zero.

    Return:
        A float32 tensor with shape (num_nodes, out_dim).

    Raises:
        TypeError: if ``msg`` is a dict while the built-in "sum" is
            requested, or if ``reduce_function`` is neither "sum" nor
            callable.
    """
    # 1.0 if there is at least one edge, 0.0 otherwise. The graph wrappers in
    # this file substitute one fake edge for an empty edge list, so the
    # messages of that fake edge have to be masked out.
    empty_msg_flag = fluid.layers.cast(num_edges > 0, dtype="float32")
    if reduce_function == "sum":
        if isinstance(msg, dict):
            raise TypeError("The message for build-in function"
                            " should be Tensor not dict.")

        try:
            out_dim = msg.shape[-1]
            init_output = fluid.layers.fill_constant(
                shape=[num_nodes, out_dim], value=0, dtype="float32")
            init_output.stop_gradient = False
            msg = msg * empty_msg_flag
            output = paddle_helper.scatter_add(init_output, dst, msg)
            return output
        except TypeError:
            # Fall back to a LoD based sum when scatter_add is unavailable.
            warnings.warn(
                "scatter_add is not supported with paddle version <= 1.5")

            def sum_func(message):
                return fluid.layers.sequence_pool(message, "sum")

            reduce_function = sum_func

    if not callable(reduce_function):
        # Calling a non-callable would raise an opaque TypeError further
        # down; fail here with an explicit message instead.
        raise TypeError("reduce_function should be \"sum\" or a callable, "
                        "but got %r" % (reduce_function, ))

    bucketed_msg = op.nested_lod_reset(msg, bucketing_index)
    output = reduce_function(bucketed_msg)
    output_dim = output.shape[-1]
    output = output * empty_msg_flag

    init_output = fluid.layers.fill_constant(
        shape=[num_nodes, output_dim], value=0, dtype="float32")
    init_output.stop_gradient = True
    final_output = fluid.layers.scatter(init_output, uniq_dst, output)
    return final_output


class BaseGraphWrapper(object):
    """This module implement base class for graph wrapper.

    Currently our PGL is developed based on static computational mode of
    paddle (we'll support dynamic computational model later). We need to build
    model upon a virtual data holder. BaseGraphWrapper provide a virtual
    graph structure that users can build deep learning models
    based on this virtual graph. And then feed real graph data to run
    the models. Moreover, we provide convenient message-passing interface
    (send & recv) for building graph neural networks.

    NOTICE: Don't use this BaseGraphWrapper directly. Use :code:`GraphWrapper`
    and :code:`StaticGraphWrapper` to create graph wrapper instead.
    """

    def __init__(self):
        # Feature name -> tensor; filled in by the concrete wrappers.
        self.node_feat_tensor_dict = {}
        self.edge_feat_tensor_dict = {}
        # Graph structure placeholders; the concrete wrappers assign them.
        self._edges_src = None
        self._edges_dst = None
        self._num_nodes = None
        # Read by ``recv``; declared here so that every wrapper instance
        # has the attribute even before a subclass assigns it.
        self._num_edges = None
        self._indegree = None
        self._edge_uniq_dst = None
        self._edge_uniq_dst_count = None
        self._node_ids = None
        self._graph_lod = None
        self._num_graph = None
        self._data_name_prefix = ""

    def __repr__(self):
        return self._data_name_prefix

    @staticmethod
    def _collect_feats(feat_list, registered):
        """Resolve names / (name, tensor) pairs into a name -> tensor dict.

        Args:
            feat_list: iterable whose items are either a feature name
                (looked up in ``registered``) or a ``(name, tensor)`` pair.
            registered: dictionary of the features known to the wrapper.

        Return:
            A new dictionary mapping every requested name to its tensor.
        """
        feats = {}
        for feat in feat_list:
            if isinstance(feat, str):
                feats[feat] = registered[feat]
            else:
                name, tensor = feat
                feats[name] = tensor
        return feats

    def send(self, message_func, nfeat_list=None, efeat_list=None):
        """Send message from all src nodes to dst nodes.

        The UDF message function should has the following format.

        .. code-block:: python

            def message_func(src_feat, dst_feat, edge_feat):
                '''
                    Args:
                        src_feat: the node feat dict attached to the src nodes.
                        dst_feat: the node feat dict attached to the dst nodes.
                        edge_feat: the edge feat dict attached to the
                                   corresponding (src, dst) edges.

                    Return:
                        It should return a tensor or a dictionary of tensor. And each tensor
                        should have a shape of (num_edges, dims).
                '''
                pass

        Args:
            message_func: UDF function.
            nfeat_list: a list of names or tuple (name, tensor)
            efeat_list: a list of names or tuple (name, tensor)

        Return:
            A dictionary of tensor representing the message. Each of the values
            in the dictionary has a shape (num_edges, dim) which should be collected
            by :code:`recv` function.
        """
        if efeat_list is None:
            efeat_list = []
        if nfeat_list is None:
            nfeat_list = []

        src, dst = self.edges
        nfeat = self._collect_feats(nfeat_list, self.node_feat)
        efeat = self._collect_feats(efeat_list, self.edge_feat)

        # Resolves to the module-level ``send`` function, not this method.
        msg = send(src, dst, nfeat, efeat, message_func)
        return msg

    def recv(self, msg, reduce_function):
        """Recv message and aggregate the message by reduce_function

        The UDF reduce_function function should has the following format.

        .. code-block:: python

            def reduce_func(msg):
                '''
                    Args:
                        msg: A LodTensor or a dictionary of LodTensor whose batch_size
                             is equals to the number of unique dst nodes.

                    Return:
                        It should return a tensor with shape (batch_size, out_dims). The
                        batch size should be the same as msg.
                '''
                pass

        Args:
            msg: A tensor or a dictionary of tensor created by send function.

            reduce_function: UDF reduce function or strings "sum" as built-in function.
                             The built-in "sum" will use scatter_add to optimized the speed.

        Return:
            A tensor with shape (num_nodes, out_dims). The output for nodes with no message
            will be zeros.
        """
        # Resolves to the module-level ``recv`` function, not this method.
        output = recv(
            dst=self._edges_dst,
            uniq_dst=self._edge_uniq_dst,
            bucketing_index=self._edge_uniq_dst_count,
            msg=msg,
            reduce_function=reduce_function,
            num_edges=self._num_edges,
            num_nodes=self._num_nodes)
        return output

    @property
    def edges(self):
        """Return a tuple of edge Tensor (src, dst).

        Return:
            A tuple of Tensor (src, dst). Src and dst are both
            tensor with shape (num_edges, ) and dtype int64.
        """
        return self._edges_src, self._edges_dst

    @property
    def num_nodes(self):
        """Return a variable of number of nodes

        Return:
            A variable with shape (1,) as the number of nodes in int64.
        """
        return self._num_nodes

    @property
    def graph_lod(self):
        """Return graph index for graphs

        Return:
            A variable with shape [None] as the Lod information of multiple-graph.
        """
        return self._graph_lod

    @property
    def num_graph(self):
        """Return a variable of number of graphs

        Return:
            A variable with shape (1,) as the number of Graphs in int64.
        """
        return self._num_graph

    @property
    def edge_feat(self):
        """Return a dictionary of tensor representing edge features.

        Return:
            A dictionary whose keys are the feature names and the values
            are feature tensor.
        """
        return self.edge_feat_tensor_dict

    @property
    def node_feat(self):
        """Return a dictionary of tensor representing node features.

        Return:
            A dictionary whose keys are the feature names and the values
            are feature tensor.
        """
        return self.node_feat_tensor_dict

    def indegree(self):
        """Return the indegree tensor for all nodes.

        Return:
            A tensor of shape (num_nodes, ) in int64.
        """
        return self._indegree


class StaticGraphWrapper(BaseGraphWrapper):
    """Implement a graph wrapper that the data of the graph won't
    be changed and it can be fit into the GPU or CPU memory. This
    can reduce the time of swapping large data from GPU and CPU.

    Args:
        name: The graph data prefix

        graph: The static graph that should be put into memory

        place: fluid.CPUPlace or fluid.GPUPlace(n) indicating the
               device to hold the graph data.

    Examples:

        If we have a immutable graph and it can be fit into the GPU or CPU.
        we can just use a :code:`StaticGraphWrapper` to pre-place the graph
        data into devices.

        .. code-block:: python

            import numpy as np
            import paddle.fluid as fluid
            from pgl.graph import Graph
            from pgl.graph_wrapper import StaticGraphWrapper

            place = fluid.CPUPlace()
            exe = fluid.Executor(place)

            num_nodes = 5
            edges = [ (0, 1), (1, 2), (3, 4)]
            feature = np.random.randn(5, 100)
            edge_feature = np.random.randn(3, 100)
            graph = Graph(num_nodes=num_nodes,
                        edges=edges,
                        node_feat={
                            "feature": feature
                        },
                        edge_feat={
                            "edge_feature": edge_feature
                        })

            graph_wrapper = StaticGraphWrapper(name="graph",
                        graph=graph,
                        place=place)

            # build your deep graph model

            # Initialize parameters for deep graph model
            exe.run(fluid.default_startup_program())

            # Initialize graph data
            graph_wrapper.initialize(place)
    """

    def __init__(self, name, graph, place):
        # NOTE: ``place`` is not used by the constructor; the data is put on
        # a device when ``initialize(place)`` is called.
        super(StaticGraphWrapper, self).__init__()
        self._data_name_prefix = name
        # Callables collected while building the constants; each is invoked
        # with the target place by ``initialize``.
        self._initializers = []
        self.__create_graph_attr(graph)

    def __create_graph_attr(self, graph):
        """Create graph attributes for paddlepaddle.
        """
        src, dst, eid = graph.sorted_edges(sort_by="dst")
        indegree = graph.indegree()
        nodes = graph.nodes
        uniq_dst = nodes[indegree > 0]
        uniq_dst_count = indegree[indegree > 0]
        # Prefix sums with a leading 0: offsets of each destination's edges.
        uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
        uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
        graph_lod = graph.graph_lod
        num_graph = graph.num_graph

        num_edges = len(src)
        if num_edges == 0:
            # Fake Graph: a single self loop on node 0 stands in for the
            # empty edge list; ``num_edges`` keeps the real count (0).
            src = np.array([0], dtype="int64")
            dst = np.array([0], dtype="int64")
            eid = np.array([0], dtype="int64")
            uniq_dst_count = np.array([0, 1], dtype="int32")
            uniq_dst = np.array([0], dtype="int64")

        edge_feat = {}

        for key, value in graph.edge_feat.items():
            if num_edges == 0:
                # There is no feature row to pick for the fake edge
                # (indexing an empty array with [0] raises IndexError), so
                # give it a single all-zero row of matching width and dtype.
                value = np.asarray(value)
                edge_feat[key] = np.zeros(
                    (1, ) + value.shape[1:], dtype=value.dtype)
            else:
                # Reorder the rows to follow the dst-sorted edges.
                edge_feat[key] = value[eid]
        node_feat = graph.node_feat

        self.__create_graph_node_feat(node_feat, self._initializers)
        self.__create_graph_edge_feat(edge_feat, self._initializers)

        self._num_edges, init = paddle_helper.constant(
            dtype="int64",
            value=np.array(
                [num_edges], dtype="int64"),
            name=self._data_name_prefix + '/num_edges')
        self._initializers.append(init)

        self._num_graph, init = paddle_helper.constant(
            dtype="int64",
            value=np.array(
                [num_graph], dtype="int64"),
            name=self._data_name_prefix + '/num_graph')
        self._initializers.append(init)

        self._edges_src, init = paddle_helper.constant(
            dtype="int64",
            value=src,
            name=self._data_name_prefix + '/edges_src')
        self._initializers.append(init)

        self._edges_dst, init = paddle_helper.constant(
            dtype="int64",
            value=dst,
            name=self._data_name_prefix + '/edges_dst')
        self._initializers.append(init)

        # Explicit int64 so the array matches the declared dtype on every
        # platform (numpy's default integer is not always 64 bit).
        self._num_nodes, init = paddle_helper.constant(
            dtype="int64",
            hide_batch_size=False,
            value=np.array(
                [graph.num_nodes], dtype="int64"),
            name=self._data_name_prefix + '/num_nodes')
        self._initializers.append(init)

        self._edge_uniq_dst, init = paddle_helper.constant(
            name=self._data_name_prefix + "/uniq_dst",
            dtype="int64",
            value=uniq_dst)
        self._initializers.append(init)

        self._edge_uniq_dst_count, init = paddle_helper.constant(
            name=self._data_name_prefix + "/uniq_dst_count",
            dtype="int32",
            value=uniq_dst_count)
        self._initializers.append(init)

        self._graph_lod, init = paddle_helper.constant(
            name=self._data_name_prefix + "/graph_lod",
            dtype="int32",
            value=graph_lod)
        self._initializers.append(init)

        node_ids_value = np.arange(0, graph.num_nodes, dtype="int64")
        self._node_ids, init = paddle_helper.constant(
            name=self._data_name_prefix + "/node_ids",
            dtype="int64",
            value=node_ids_value)
        self._initializers.append(init)

        self._indegree, init = paddle_helper.constant(
            name=self._data_name_prefix + "/indegree",
            dtype="int64",
            value=indegree)
        self._initializers.append(init)

    def __create_graph_node_feat(self, node_feat, collector):
        """Convert node features into paddlepaddle tensor.

        Args:
            node_feat: dictionary mapping feature names to arrays.
            collector: list receiving the initializer of every constant.
        """
        for node_feat_name, node_feat_value in node_feat.items():
            node_feat_dtype = node_feat_value.dtype
            self.node_feat_tensor_dict[
                node_feat_name], init = paddle_helper.constant(
                    name=self._data_name_prefix + '/node_feat/' +
                    node_feat_name,
                    dtype=node_feat_dtype,
                    value=node_feat_value)
            collector.append(init)

    def __create_graph_edge_feat(self, edge_feat, collector):
        """Convert edge features into paddlepaddle tensor.

        Args:
            edge_feat: dictionary mapping feature names to arrays.
            collector: list receiving the initializer of every constant.
        """
        for edge_feat_name, edge_feat_value in edge_feat.items():
            edge_feat_dtype = edge_feat_value.dtype
            self.edge_feat_tensor_dict[
                edge_feat_name], init = paddle_helper.constant(
                    name=self._data_name_prefix + '/edge_feat/' +
                    edge_feat_name,
                    dtype=edge_feat_dtype,
                    value=edge_feat_value)
            collector.append(init)

    def initialize(self, place):
        """Placing the graph data into the devices.

        Args:
            place: fluid.CPUPlace or fluid.GPUPlace(n) indicating the
                   device to hold the graph data.
        """
        log.info(
            "StaticGraphWrapper.initialize must be called after startup program"
        )
        for init_func in self._initializers:
            init_func(place)


class GraphWrapper(BaseGraphWrapper):
    """Implement a graph wrapper that creates a graph data holders
    that attributes and features in the graph are :code:`fluid.layers.data`.
    And we provide interface :code:`to_feed` to help converting :code:`Graph`
    data into :code:`feed_dict`.

    Args:
        name: The graph data prefix

        place: fluid.CPUPlace or fluid.GPUPlace(n) indicating the
               device to hold the graph data.

        node_feat: A list of tuples that describe the details of node
                   feature tensor. Each tuple must be (name, shape, dtype)
                   and the first dimension of the shape must be set unknown
                   (-1 or None) or we can easily use :code:`Graph.node_feat_info()`
                   to get the node_feat settings. Defaults to no features.

        edge_feat: A list of tuples that describe the details of edge
                   feature tensor. Each tuple must be (name, shape, dtype)
                   and the first dimension of the shape must be set unknown
                   (-1 or None) or we can easily use :code:`Graph.edge_feat_info()`
                   to get the edge_feat settings. Defaults to no features.

    Examples:

        .. code-block:: python

            import numpy as np
            import paddle.fluid as fluid
            from pgl.graph import Graph
            from pgl.graph_wrapper import GraphWrapper

            place = fluid.CPUPlace()
            exe = fluid.Executor(place)

            num_nodes = 5
            edges = [ (0, 1), (1, 2), (3, 4)]
            feature = np.random.randn(5, 100)
            edge_feature = np.random.randn(3, 100)
            graph = Graph(num_nodes=num_nodes,
                        edges=edges,
                        node_feat={
                            "feature": feature
                        },
                        edge_feat={
                            "edge_feature": edge_feature
                        })

            graph_wrapper = GraphWrapper(name="graph",
                        place=place,
                        node_feat=graph.node_feat_info(),
                        edge_feat=graph.edge_feat_info())

            # build your deep graph model
            ...

            # Initialize parameters for deep graph model
            exe.run(fluid.default_startup_program())

            for i in range(10):
                feed_dict = graph_wrapper.to_feed(graph)
                ret = exe.run(fetch_list=[...], feed=feed_dict )
    """

    def __init__(self, name, place, node_feat=None, edge_feat=None):
        super(GraphWrapper, self).__init__()
        # ``None`` replaces the former mutable ``[]`` defaults.
        if node_feat is None:
            node_feat = []
        if edge_feat is None:
            edge_feat = []
        # collect holders for PyReader
        self._data_name_prefix = name
        self._holder_list = []
        self._place = place
        self.__create_graph_attr_holders()
        for node_feat_name, node_feat_shape, node_feat_dtype in node_feat:
            self.__create_graph_node_feat_holders(
                node_feat_name, node_feat_shape, node_feat_dtype)

        for edge_feat_name, edge_feat_shape, edge_feat_dtype in edge_feat:
            self.__create_graph_edge_feat_holders(
                edge_feat_name, edge_feat_shape, edge_feat_dtype)

    def __create_graph_attr_holders(self):
        """Create data holders for graph attributes.
        """
        self._num_edges = fluid.layers.data(
            self._data_name_prefix + '/num_edges',
            shape=[1],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
        self._num_graph = fluid.layers.data(
            self._data_name_prefix + '/num_graph',
            shape=[1],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
        self._edges_src = fluid.layers.data(
            self._data_name_prefix + '/edges_src',
            shape=[None],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
        self._edges_dst = fluid.layers.data(
            self._data_name_prefix + '/edges_dst',
            shape=[None],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
        self._num_nodes = fluid.layers.data(
            self._data_name_prefix + '/num_nodes',
            shape=[1],
            append_batch_size=False,
            dtype='int64',
            stop_gradient=True)

        self._edge_uniq_dst = fluid.layers.data(
            self._data_name_prefix + "/uniq_dst",
            shape=[None],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)

        self._graph_lod = fluid.layers.data(
            self._data_name_prefix + "/graph_lod",
            shape=[None],
            append_batch_size=False,
            dtype="int32",
            stop_gradient=True)

        self._edge_uniq_dst_count = fluid.layers.data(
            self._data_name_prefix + "/uniq_dst_count",
            shape=[None],
            append_batch_size=False,
            dtype="int32",
            stop_gradient=True)

        self._node_ids = fluid.layers.data(
            self._data_name_prefix + "/node_ids",
            shape=[None],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
        self._indegree = fluid.layers.data(
            self._data_name_prefix + "/indegree",
            shape=[None],
            append_batch_size=False,
            dtype="int64",
            stop_gradient=True)
        self._holder_list.extend([
            self._edges_src,
            self._edges_dst,
            self._num_nodes,
            self._edge_uniq_dst,
            self._edge_uniq_dst_count,
            self._node_ids,
            self._indegree,
            self._graph_lod,
            self._num_graph,
            self._num_edges,
        ])

    def __create_graph_node_feat_holders(self, node_feat_name, node_feat_shape,
                                         node_feat_dtype):
        """Create data holders for node features.
        """
        feat_holder = fluid.layers.data(
            self._data_name_prefix + '/node_feat/' + node_feat_name,
            shape=node_feat_shape,
            append_batch_size=False,
            dtype=node_feat_dtype,
            stop_gradient=True)
        self.node_feat_tensor_dict[node_feat_name] = feat_holder
        self._holder_list.append(feat_holder)

    def __create_graph_edge_feat_holders(self, edge_feat_name, edge_feat_shape,
                                         edge_feat_dtype):
        """Create edge holders for edge features.
        """
        feat_holder = fluid.layers.data(
            self._data_name_prefix + '/edge_feat/' + edge_feat_name,
            shape=edge_feat_shape,
            append_batch_size=False,
            dtype=edge_feat_dtype,
            stop_gradient=True)
        self.edge_feat_tensor_dict[edge_feat_name] = feat_holder
        self._holder_list.append(feat_holder)

    def to_feed(self, graph):
        """Convert the graph into feed_dict.

        This function helps to convert graph data into feed dict
        for :code:`fluid.Executor` to run the model.

        Args:
            graph: the :code:`Graph` data object

        Return:
            A dictionary contains data holder names and its corresponding
            data.
        """
        feed_dict = {}
        src, dst, eid = graph.sorted_edges(sort_by="dst")
        indegree = graph.indegree()
        nodes = graph.nodes
        num_edges = len(src)
        uniq_dst = nodes[indegree > 0]
        uniq_dst_count = indegree[indegree > 0]
        # Prefix sums with a leading 0: offsets of each destination's edges.
        uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
        uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
        num_graph = graph.num_graph
        graph_lod = graph.graph_lod

        if num_edges == 0:
            # Fake Graph: a single self loop on node 0 stands in for the
            # empty edge list; ``num_edges`` keeps the real count (0).
            src = np.array([0], dtype="int64")
            dst = np.array([0], dtype="int64")
            eid = np.array([0], dtype="int64")

            uniq_dst_count = np.array([0, 1], dtype="int32")
            uniq_dst = np.array([0], dtype="int64")

        edge_feat = {}

        for key, value in graph.edge_feat.items():
            if num_edges == 0:
                # There is no feature row to pick for the fake edge
                # (indexing an empty array with [0] raises IndexError), so
                # give it a single all-zero row of matching width and dtype.
                value = np.asarray(value)
                edge_feat[key] = np.zeros(
                    (1, ) + value.shape[1:], dtype=value.dtype)
            else:
                # Reorder the rows to follow the dst-sorted edges.
                edge_feat[key] = value[eid]
        node_feat = graph.node_feat

        prefix = self._data_name_prefix
        feed_dict[prefix + '/num_edges'] = np.array(
            [num_edges], dtype="int64")
        feed_dict[prefix + '/edges_src'] = src
        feed_dict[prefix + '/edges_dst'] = dst
        feed_dict[prefix + '/num_nodes'] = np.array(
            [graph.num_nodes], dtype="int64")
        feed_dict[prefix + '/uniq_dst'] = uniq_dst
        feed_dict[prefix + '/uniq_dst_count'] = uniq_dst_count
        feed_dict[prefix + '/node_ids'] = nodes
        feed_dict[prefix + '/indegree'] = indegree
        feed_dict[prefix + '/graph_lod'] = graph_lod
        feed_dict[prefix + '/num_graph'] = np.array(
            [num_graph], dtype="int64")

        # Only the features that have a holder in this wrapper are fed.
        for key in self.node_feat_tensor_dict:
            feed_dict[prefix + '/node_feat/' + key] = node_feat[key]

        for key in self.edge_feat_tensor_dict:
            feed_dict[prefix + '/edge_feat/' + key] = edge_feat[key]

        return feed_dict

    @property
    def holder_list(self):
        """Return the holder list.
        """
        return self._holder_list