From 51fd58477edf28b80add4cde6e712cda647faaf7 Mon Sep 17 00:00:00 2001 From: Webbley Date: Fri, 6 Mar 2020 17:25:54 +0800 Subject: [PATCH] fix bug in pgl --- pgl/graph.py | 116 +++++++++++++++++++++++++++--- pgl/graph_kernel.pyx | 15 +++- pgl/graph_wrapper.py | 135 ++++++++++++++++++++++++++++++----- pgl/tests/test_hetergraph.py | 49 +++++++++++++ 4 files changed, 285 insertions(+), 30 deletions(-) diff --git a/pgl/graph.py b/pgl/graph.py index c0ac181..30a1475 100644 --- a/pgl/graph.py +++ b/pgl/graph.py @@ -20,8 +20,9 @@ import numpy as np import pickle as pkl import time import pgl.graph_kernel as graph_kernel +from collections import defaultdict -__all__ = ['Graph', 'SubGraph'] +__all__ = ['Graph', 'SubGraph', 'MultiGraph'] def _hide_num_nodes(shape): @@ -140,11 +141,11 @@ class Graph(object): self._edges = edges self._num_nodes = num_nodes - if len(edges) == 0: - raise ValueError("The Graph have no edges.") - self._adj_src_index = None self._adj_dst_index = None + self.indegree() + self._num_graph = 1 + self._graph_lod = np.array([0, self.num_nodes], dtype="int32") def dump(self, path): if not os.path.exists(path): @@ -176,10 +177,15 @@ class Graph(object): """Return an EdgeIndex object for src. """ if self._adj_src_index is None: + if len(self._edges) == 0: + u = np.array([], dtype="int64") + v = np.array([], dtype="int64") + else: + u = self._edges[:, 0] + v = self._edges[:, 1] + self._adj_src_index = EdgeIndex( - u=self._edges[:, 0], - v=self._edges[:, 1], - num_nodes=self._num_nodes) + u=u, v=v, num_nodes=self._num_nodes) return self._adj_src_index @property @@ -187,10 +193,15 @@ class Graph(object): """Return an EdgeIndex object for dst. """ if self._adj_dst_index is None: + if len(self._edges) == 0: + v = np.array([], dtype="int64") + u = np.array([], dtype="int64") + else: + v = self._edges[:, 0] + u = self._edges[:, 1] + self._adj_dst_index = EdgeIndex( - u=self._edges[:, 1], - v=self._edges[:, 0], - num_nodes=self._num_nodes) + u=u, v=v, num_nodes=self._num_nodes) return self._adj_dst_index @property @@ -777,6 +788,16 @@ class Graph(object): cur_nodes = nxt_nodes return walk + @property + def num_graph(self): + """ Return Number of Graphs""" + return self._num_graph + + @property + def graph_lod(self): + """ Return Graph Lod Index for Paddle Computation""" + return self._graph_lod + class SubGraph(Graph): """Implementation of SubGraph in pgl. @@ -832,6 +853,81 @@ class SubGraph(Graph): return graph_kernel.map_nodes(nodes, self._to_reindex) +class MultiGraph(Graph): + """Implementation of multiple disjoint graph structure in pgl. + + This is a simple implementation of graph structure in pgl. + + Args: + graph_list : A list of Graph Instances + + Examples: + + .. 
code-block:: python + + batch_graph = MultiGraph([graph1, graph2, graph3]) + + """ + + def __init__(self, graph_list): + num_nodes = np.sum([g.num_nodes for g in graph_list]) + node_feat = self._join_node_feature(graph_list) + edge_feat = self._join_edge_feature(graph_list) + edges = self._join_edges(graph_list) + super(MultiGraph, self).__init__( + num_nodes=num_nodes, + edges=edges, + node_feat=node_feat, + edge_feat=edge_feat) + self._num_graph = len(graph_list) + self._src_graph = graph_list + graph_lod = [g.num_nodes for g in graph_list] + graph_lod = np.cumsum(graph_lod, dtype="int32") + graph_lod = np.insert(graph_lod, 0, 0) + self._graph_lod = graph_lod + + def __getitem__(self, index): + return self._src_graph[index] + + def _join_node_feature(self, graph_list): + """join node features for multiple graph""" + node_feat = defaultdict(lambda: []) + for graph in graph_list: + for key in graph.node_feat: + node_feat[key].append(graph.node_feat[key]) + ret_node_feat = {} + for key in node_feat: + ret_node_feat[key] = np.vstack(node_feat[key]) + return ret_node_feat + + def _join_edge_feature(self, graph_list): + """join edge features for multiple graph""" + edge_feat = defaultdict(lambda: []) + for graph in graph_list: + for key in graph.edge_feat: + efeat = graph.edge_feat[key] + if len(efeat) > 0: + edge_feat[key].append(efeat) + + ret_edge_feat = {} + for key in edge_feat: + ret_edge_feat[key] = np.vstack(edge_feat[key]) + return ret_edge_feat + + def _join_edges(self, graph_list): + """join edges for multiple graph""" + list_edges = [] + start_offset = 0 + for graph in graph_list: + edges = graph.edges + if len(edges) > 0: + edges = edges + start_offset + list_edges.append(edges) + start_offset += graph.num_nodes + edges = np.vstack(list_edges) + return edges + + class MemmapEdgeIndex(EdgeIndex): def __init__(self, path): self._degree = np.load(os.path.join(path, 'degree.npy'), mmap_mode="r") diff --git a/pgl/graph_kernel.pyx b/pgl/graph_kernel.pyx index adcd343..5e5f289 100644 --- a/pgl/graph_kernel.pyx +++ b/pgl/graph_kernel.pyx @@ -219,7 +219,11 @@ def sample_subset(list nids, long long maxdegree, shuffle=False): output.append(nids[inc]) else: sample_size = buff_size if buff_size <= maxdegree else maxdegree - subset_choose_index(sample_size, nids[inc], rnd, buff_nid, offset) + if isinstance(nids[inc], list): + tmp = np.array(nids[inc], dtype=np.int64) + else: + tmp = nids[inc] + subset_choose_index(sample_size, tmp, rnd, buff_nid, offset) output.append(buff_nid[offset:offset+sample_size]) offset += sample_size return output @@ -252,7 +256,14 @@ def sample_subset_with_eid(list nids, list eids, long long maxdegree, shuffle=Fa output_eid.append(eids[inc]) else: sample_size = buff_size if buff_size <= maxdegree else maxdegree - subset_choose_index_eid(sample_size, nids[inc], eids[inc], rnd, buff_nid, buff_eid, offset) + if isinstance(nids[inc], list): + tmp = np.array(nids[inc], dtype=np.int64) + tmp_eids = np.array(eids[inc], dtype=np.int64) + else: + tmp = nids[inc] + tmp_eids = eids[inc] + + subset_choose_index_eid(sample_size, tmp, tmp_eids, rnd, buff_nid, buff_eid, offset) output.append(buff_nid[offset:offset+sample_size]) output_eid.append(buff_eid[offset:offset+sample_size]) offset += sample_size diff --git a/pgl/graph_wrapper.py b/pgl/graph_wrapper.py index 7f1ac82..85b9a7b 100644 --- a/pgl/graph_wrapper.py +++ b/pgl/graph_wrapper.py @@ -36,19 +36,22 @@ def send(src, dst, nfeat, efeat, message_func): return msg -def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, 
node_ids): +def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, num_nodes, + num_edges): """Recv message from given msg to dst nodes. """ + empty_msg_flag = fluid.layers.cast(num_edges > 0, dtype="float32") if reduce_function == "sum": if isinstance(msg, dict): raise TypeError("The message for build-in function" " should be Tensor not dict.") try: - out_dims = msg.shape[-1] - init_output = fluid.layers.fill_constant_batch_size_like( - node_ids, shape=[1, out_dims], value=0, dtype="float32") + out_dim = msg.shape[-1] + init_output = fluid.layers.fill_constant( + shape=[num_nodes, out_dim], value=0, dtype="float32") init_output.stop_gradient = False + msg = msg * empty_msg_flag output = paddle_helper.scatter_add(init_output, dst, msg) return output except TypeError as e: @@ -60,17 +63,16 @@ def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, node_ids): reduce_function = sum_func - # convert msg into lodtensor bucketed_msg = op.nested_lod_reset(msg, bucketing_index) - # Check dim for bucketed_msg equal to out_dims output = reduce_function(bucketed_msg) - out_dims = output.shape[-1] + output_dim = output.shape[-1] + output = output * empty_msg_flag - init_output = fluid.layers.fill_constant_batch_size_like( - node_ids, shape=[1, out_dims], value=0, dtype="float32") - init_output.stop_gradient = False - output = fluid.layers.scatter(init_output, uniq_dst, output) - return output + init_output = fluid.layers.fill_constant( + shape=[num_nodes, output_dim], value=0, dtype="float32") + init_output.stop_gradient = True + final_output = fluid.layers.scatter(init_output, uniq_dst, output) + return final_output class BaseGraphWrapper(object): @@ -98,6 +100,8 @@ class BaseGraphWrapper(object): self._edge_uniq_dst = None self._edge_uniq_dst_count = None self._node_ids = None + self._graph_lod = None + self._num_graph = None self._data_name_prefix = "" def __repr__(self): @@ -194,7 +198,8 @@ class BaseGraphWrapper(object): bucketing_index=self._edge_uniq_dst_count, msg=msg, reduce_function=reduce_function, - node_ids=self._node_ids) + num_edges=self._num_edges, + num_nodes=self._num_nodes) return output @property @@ -216,6 +221,24 @@ class BaseGraphWrapper(object): """ return self._num_nodes + @property + def graph_lod(self): + """Return graph index for graphs + + Return: + A variable with shape [None ] as the Lod information of multiple-graph. + """ + return self._graph_lod + + @property + def num_graph(self): + """Return a variable of number of graphs + + Return: + A variable with shape (1,) as the number of Graphs in int64. + """ + return self._num_graph + @property def edge_feat(self): """Return a dictionary of tensor representing edge features. @@ -309,7 +332,6 @@ class StaticGraphWrapper(BaseGraphWrapper): def __create_graph_attr(self, graph): """Create graph attributes for paddlepaddle. 
""" - src, dst = list(zip(*graph.edges)) src, dst, eid = graph.sorted_edges(sort_by="dst") indegree = graph.indegree() nodes = graph.nodes @@ -317,6 +339,17 @@ class StaticGraphWrapper(BaseGraphWrapper): uniq_dst_count = indegree[indegree > 0] uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32') uniq_dst_count = np.insert(uniq_dst_count, 0, 0) + graph_lod = graph.graph_lod + num_graph = graph.num_graph + + num_edges = len(src) + if num_edges == 0: + # Fake Graph + src = np.array([0], dtype="int64") + dst = np.array([0], dtype="int64") + eid = np.array([0], dtype="int64") + uniq_dst_count = np.array([0, 1], dtype="int32") + uniq_dst = np.array([0], dtype="int64") edge_feat = {} @@ -327,6 +360,20 @@ class StaticGraphWrapper(BaseGraphWrapper): self.__create_graph_node_feat(node_feat, self._initializers) self.__create_graph_edge_feat(edge_feat, self._initializers) + self._num_edges, init = paddle_helper.constant( + dtype="int64", + value=np.array( + [num_edges], dtype="int64"), + name=self._data_name_prefix + '/num_edges') + self._initializers.append(init) + + self._num_graph, init = paddle_helper.constant( + dtype="int64", + value=np.array( + [num_graph], dtype="int64"), + name=self._data_name_prefix + '/num_graph') + self._initializers.append(init) + self._edges_src, init = paddle_helper.constant( dtype="int64", value=src, @@ -358,6 +405,12 @@ class StaticGraphWrapper(BaseGraphWrapper): value=uniq_dst_count) self._initializers.append(init) + self._graph_lod, init = paddle_helper.constant( + name=self._data_name_prefix + "/graph_lod", + dtype="int32", + value=graph_lod) + self._initializers.append(init) + node_ids_value = np.arange(0, graph.num_nodes, dtype="int64") self._node_ids, init = paddle_helper.constant( name=self._data_name_prefix + "/node_ids", @@ -496,6 +549,18 @@ class GraphWrapper(BaseGraphWrapper): def __create_graph_attr_holders(self): """Create data holders for graph attributes. 
""" + self._num_edges = fluid.layers.data( + self._data_name_prefix + '/num_edges', + shape=[1], + append_batch_size=False, + dtype="int64", + stop_gradient=True) + self._num_graph = fluid.layers.data( + self._data_name_prefix + '/num_graph', + shape=[1], + append_batch_size=False, + dtype="int64", + stop_gradient=True) self._edges_src = fluid.layers.data( self._data_name_prefix + '/edges_src', shape=[None], @@ -514,18 +579,28 @@ class GraphWrapper(BaseGraphWrapper): append_batch_size=False, dtype='int64', stop_gradient=True) + self._edge_uniq_dst = fluid.layers.data( self._data_name_prefix + "/uniq_dst", shape=[None], append_batch_size=False, dtype="int64", stop_gradient=True) + + self._graph_lod = fluid.layers.data( + self._data_name_prefix + "/graph_lod", + shape=[None], + append_batch_size=False, + dtype="int32", + stop_gradient=True) + self._edge_uniq_dst_count = fluid.layers.data( self._data_name_prefix + "/uniq_dst_count", shape=[None], append_batch_size=False, dtype="int32", stop_gradient=True) + self._node_ids = fluid.layers.data( self._data_name_prefix + "/node_ids", shape=[None], @@ -539,9 +614,15 @@ class GraphWrapper(BaseGraphWrapper): dtype="int64", stop_gradient=True) self._holder_list.extend([ - self._edges_src, self._edges_dst, self._num_nodes, - self._edge_uniq_dst, self._edge_uniq_dst_count, self._node_ids, - self._indegree + self._edges_src, + self._edges_dst, + self._num_nodes, + self._edge_uniq_dst, + self._edge_uniq_dst_count, + self._node_ids, + self._indegree, + self._graph_lod, + self._num_graph, ]) def __create_graph_node_feat_holders(self, node_feat_name, node_feat_shape, @@ -587,10 +668,22 @@ class GraphWrapper(BaseGraphWrapper): src, dst, eid = graph.sorted_edges(sort_by="dst") indegree = graph.indegree() nodes = graph.nodes + num_edges = len(src) uniq_dst = nodes[indegree > 0] uniq_dst_count = indegree[indegree > 0] uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32') uniq_dst_count = np.insert(uniq_dst_count, 0, 0) + num_graph = graph.num_graph + graph_lod = graph.graph_lod + + if num_edges == 0: + # Fake Graph + src = np.array([0], dtype="int64") + dst = np.array([0], dtype="int64") + eid = np.array([0], dtype="int64") + + uniq_dst_count = np.array([0, 1], dtype="int32") + uniq_dst = np.array([0], dtype="int64") edge_feat = {} @@ -598,14 +691,20 @@ class GraphWrapper(BaseGraphWrapper): edge_feat[key] = value[eid] node_feat = graph.node_feat + feed_dict[self._data_name_prefix + '/num_edges'] = np.array( + [num_edges], dtype="int64") feed_dict[self._data_name_prefix + '/edges_src'] = src feed_dict[self._data_name_prefix + '/edges_dst'] = dst feed_dict[self._data_name_prefix + '/num_nodes'] = np.array( - graph.num_nodes) + [graph.num_nodes], dtype="int64") feed_dict[self._data_name_prefix + '/uniq_dst'] = uniq_dst feed_dict[self._data_name_prefix + '/uniq_dst_count'] = uniq_dst_count feed_dict[self._data_name_prefix + '/node_ids'] = graph.nodes feed_dict[self._data_name_prefix + '/indegree'] = indegree + feed_dict[self._data_name_prefix + '/graph_lod'] = graph_lod + feed_dict[self._data_name_prefix + '/num_graph'] = np.array( + [num_graph], dtype="int64") + feed_dict[self._data_name_prefix + '/indegree'] = indegree for key in self.node_feat_tensor_dict: feed_dict[self._data_name_prefix + '/node_feat/' + diff --git a/pgl/tests/test_hetergraph.py b/pgl/tests/test_hetergraph.py index 769541c..7e824b1 100644 --- a/pgl/tests/test_hetergraph.py +++ b/pgl/tests/test_hetergraph.py @@ -70,6 +70,55 @@ class HeterGraphTest(unittest.TestCase): 
self.assertEqual(len(nodes), batch_size) self.assertListEqual(list(nodes), ground[idx]) + def test_sample_successor(self): + print() + nodes = [4, 5, 8] + md = 2 + succes = self.graph.sample_successor( + edge_type='p2a', nodes=nodes, max_degree=md, return_eids=False) + self.assertIsInstance(succes, list) + ground = [[10, 11, 12, 14, 13], [], [14]] + for succ, g in zip(succes, ground): + self.assertIsInstance(succ, np.ndarray) + for i in succ: + self.assertIn(i, g) + + nodes = [4] + succes = self.graph.sample_successor( + edge_type='p2a', nodes=nodes, max_degree=md, return_eids=False) + self.assertIsInstance(succes, list) + ground = [[10, 11, 12, 14, 13]] + for succ, g in zip(succes, ground): + self.assertIsInstance(succ, np.ndarray) + for i in succ: + self.assertIn(i, g) + + def test_successor(self): + print() + nodes = [4, 5, 8] + e_type = 'p2a' + succes = self.graph.successor( + edge_type=e_type, + nodes=nodes, ) + + self.assertIsInstance(succes, np.ndarray) + ground = [[10, 11, 12, 14, 13], [], [14]] + for succ, g in zip(succes, ground): + self.assertIsInstance(succ, np.ndarray) + self.assertCountEqual(succ, g) + + nodes = [4] + e_type = 'p2a' + succes = self.graph.successor( + edge_type=e_type, + nodes=nodes, ) + + self.assertIsInstance(succes, np.ndarray) + ground = [[10, 11, 12, 14, 13]] + for succ, g in zip(succes, ground): + self.assertIsInstance(succ, np.ndarray) + self.assertCountEqual(succ, g) + def test_sample_nodes(self): print() p_ground = [4, 5, 6, 7, 8, 9] -- GitLab
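The MultiGraph class added above batches several disjoint graphs into one big graph: node ids of later graphs are shifted by the node counts of the graphs before them, graph_lod records the cumulative node offsets (one entry per graph plus a leading zero), and num_graph reports how many graphs were merged. A minimal usage sketch, using only the constructor and properties introduced in this patch (the two toy graphs and their edges are made up for illustration):

    import numpy as np
    from pgl.graph import Graph, MultiGraph

    # Two small disjoint graphs: 3 nodes / 2 edges and 2 nodes / 1 edge.
    g1 = Graph(num_nodes=3, edges=np.array([[0, 1], [1, 2]], dtype="int64"))
    g2 = Graph(num_nodes=2, edges=np.array([[0, 1]], dtype="int64"))

    batch = MultiGraph([g1, g2])
    print(batch.num_graph)     # 2
    print(batch.graph_lod)     # [0 3 5] -- cumulative node offsets per graph
    print(batch.edges)         # g2's edge is shifted by g1.num_nodes: [[0 1] [1 2] [3 4]]
    print(batch[1].num_nodes)  # __getitem__ returns the original Graph: 2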
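The graph_wrapper changes make message passing safe for graphs with zero edges: the old "The Graph have no edges" ValueError in Graph.__init__ is removed, both wrappers feed a single fake self-loop edge (src = dst = 0) when num_edges == 0 so the scatter/LoD machinery still has input, and recv multiplies every message by empty_msg_flag = fluid.layers.cast(num_edges > 0, dtype="float32"), so the fake edge contributes nothing to the aggregation. A NumPy sketch of the "sum" reduce path, only to illustrate the masking idea (this is not the Paddle/Fluid code; paddle_helper.scatter_add is approximated here with np.add.at):

    import numpy as np

    def recv_sum(dst, msg, num_nodes, num_edges):
        # Zero out all messages when the graph has no real edges,
        # mirroring empty_msg_flag = fluid.layers.cast(num_edges > 0, dtype="float32").
        empty_msg_flag = np.float32(num_edges > 0)
        msg = msg * empty_msg_flag
        # Scatter-add each message row into its destination node's row.
        output = np.zeros((num_nodes, msg.shape[-1]), dtype="float32")
        np.add.at(output, dst, msg)
        return output

    # Empty graph: the wrapper feeds one fake edge 0 -> 0, but num_edges == 0
    # masks its message, so every node's aggregated feature stays zero.
    fake_dst = np.array([0], dtype="int64")
    fake_msg = np.ones((1, 4), dtype="float32")
    print(recv_sum(fake_dst, fake_msg, num_nodes=3, num_edges=0))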