Commit 51fd5847 authored by W Webbley

fix bug in pgl

Parent 59098f85
@@ -20,8 +20,9 @@
 import numpy as np
 import pickle as pkl
 import time
 import pgl.graph_kernel as graph_kernel
+from collections import defaultdict

-__all__ = ['Graph', 'SubGraph']
+__all__ = ['Graph', 'SubGraph', 'MultiGraph']

 def _hide_num_nodes(shape):
@@ -140,11 +141,11 @@ class Graph(object):
         self._edges = edges
         self._num_nodes = num_nodes

-        if len(edges) == 0:
-            raise ValueError("The Graph have no edges.")

         self._adj_src_index = None
         self._adj_dst_index = None
+        self.indegree()
+        self._num_graph = 1
+        self._graph_lod = np.array([0, self.num_nodes], dtype="int32")

     def dump(self, path):
         if not os.path.exists(path):
@@ -176,10 +177,15 @@ class Graph(object):
         """Return an EdgeIndex object for src.
         """
         if self._adj_src_index is None:
-            self._adj_src_index = EdgeIndex(
-                u=self._edges[:, 0],
-                v=self._edges[:, 1],
-                num_nodes=self._num_nodes)
+            if len(self._edges) == 0:
+                u = np.array([], dtype="int64")
+                v = np.array([], dtype="int64")
+            else:
+                u = self._edges[:, 0]
+                v = self._edges[:, 1]
+            self._adj_src_index = EdgeIndex(
+                u=u, v=v, num_nodes=self._num_nodes)
         return self._adj_src_index

     @property
@@ -187,10 +193,15 @@ class Graph(object):
         """Return an EdgeIndex object for dst.
         """
         if self._adj_dst_index is None:
-            self._adj_dst_index = EdgeIndex(
-                u=self._edges[:, 1],
-                v=self._edges[:, 0],
-                num_nodes=self._num_nodes)
+            if len(self._edges) == 0:
+                v = np.array([], dtype="int64")
+                u = np.array([], dtype="int64")
+            else:
+                v = self._edges[:, 0]
+                u = self._edges[:, 1]
+            self._adj_dst_index = EdgeIndex(
+                u=u, v=v, num_nodes=self._num_nodes)
         return self._adj_dst_index

     @property
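With the empty-edge guard above, a Graph built with zero edges now produces empty int64 index arrays instead of raising inside EdgeIndex. A minimal numpy sketch (not PGL's actual EdgeIndex code) of why the degree bookkeeping stays well defined in that case:

    import numpy as np

    # An edgeless graph: the dst column is an empty int64 array, so the
    # per-node indegree derived from it is all zeros and the cumsum/insert
    # steps used later by the graph wrappers remain valid.
    num_nodes = 4
    v = np.array([], dtype="int64")
    indegree = np.bincount(v, minlength=num_nodes)   # -> array([0, 0, 0, 0])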
@@ -777,6 +788,16 @@ class Graph(object):
             cur_nodes = nxt_nodes
         return walk

+    @property
+    def num_graph(self):
+        """ Return Number of Graphs"""
+        return self._num_graph
+
+    @property
+    def graph_lod(self):
+        """ Return Graph Lod Index for Paddle Computation"""
+        return self._graph_lod

 class SubGraph(Graph):
     """Implementation of SubGraph in pgl.
@@ -832,6 +853,81 @@ class SubGraph(Graph):
         return graph_kernel.map_nodes(nodes, self._to_reindex)

+class MultiGraph(Graph):
+    """Implementation of multiple disjoint graph structure in pgl.
+
+    This is a simple implementation of graph structure in pgl.
+
+    Args:
+        graph_list : A list of Graph Instances
+
+    Examples:
+        .. code-block:: python
+
+            batch_graph = MultiGraph([graph1, graph2, graph3])
+
+    """
+
+    def __init__(self, graph_list):
+        num_nodes = np.sum([g.num_nodes for g in graph_list])
+        node_feat = self._join_node_feature(graph_list)
+        edge_feat = self._join_edge_feature(graph_list)
+        edges = self._join_edges(graph_list)
+        super(MultiGraph, self).__init__(
+            num_nodes=num_nodes,
+            edges=edges,
+            node_feat=node_feat,
+            edge_feat=edge_feat)
+        self._num_graph = len(graph_list)
+        self._src_graph = graph_list
+        graph_lod = [g.num_nodes for g in graph_list]
+        graph_lod = np.cumsum(graph_lod, dtype="int32")
+        graph_lod = np.insert(graph_lod, 0, 0)
+        self._graph_lod = graph_lod
+
+    def __getitem__(self, index):
+        return self._src_graph[index]
+    def _join_node_feature(self, graph_list):
+        """join node features for multiple graph"""
+        node_feat = defaultdict(lambda: [])
+        for graph in graph_list:
+            for key in graph.node_feat:
+                node_feat[key].append(graph.node_feat[key])
+        ret_node_feat = {}
+        for key in node_feat:
+            ret_node_feat[key] = np.vstack(node_feat[key])
+        return ret_node_feat
+
+    def _join_edge_feature(self, graph_list):
+        """join edge features for multiple graph"""
+        edge_feat = defaultdict(lambda: [])
+        for graph in graph_list:
+            for key in graph.edge_feat:
+                efeat = graph.edge_feat[key]
+                if len(efeat) > 0:
+                    edge_feat[key].append(efeat)
+        ret_edge_feat = {}
+        for key in edge_feat:
+            ret_edge_feat[key] = np.vstack(edge_feat[key])
+        return ret_edge_feat
+
+    def _join_edges(self, graph_list):
+        """join edges for multiple graph"""
+        list_edges = []
+        start_offset = 0
+        for graph in graph_list:
+            edges = graph.edges
+            if len(edges) > 0:
+                edges = edges + start_offset
+                list_edges.append(edges)
+            start_offset += graph.num_nodes
+        edges = np.vstack(list_edges)
+        return edges
 class MemmapEdgeIndex(EdgeIndex):
     def __init__(self, path):
         self._degree = np.load(os.path.join(path, 'degree.npy'), mmap_mode="r")
......
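The MultiGraph class added above batches disjoint graphs by stacking their features, offsetting their edge indices, and recording per-graph node boundaries in graph_lod. A short numpy sketch of that bookkeeping, assuming made-up node counts for the three graphs in the docstring example:

    import numpy as np

    # Hypothetical node counts for graph1, graph2, graph3 in
    # MultiGraph([graph1, graph2, graph3]).
    nodes_per_graph = [3, 5, 2]

    # Same construction as MultiGraph.__init__: cumulative offsets with a leading
    # zero. Nodes of graph i occupy merged ids graph_lod[i]:graph_lod[i + 1].
    graph_lod = np.insert(np.cumsum(nodes_per_graph, dtype="int32"), 0, 0)
    # graph_lod -> array([ 0,  3,  8, 10], dtype=int32)

    # Same offsetting as _join_edges: local edge ids of the graph at index 1
    # are shifted by its node offset graph_lod[1] before stacking.
    edges_local = np.array([[0, 1], [1, 2]], dtype="int64")
    edges_in_batch = edges_local + graph_lod[1]      # -> [[3, 4], [4, 5]]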
@@ -219,7 +219,11 @@ def sample_subset(list nids, long long maxdegree, shuffle=False):
             output.append(nids[inc])
         else:
             sample_size = buff_size if buff_size <= maxdegree else maxdegree
-            subset_choose_index(sample_size, nids[inc], rnd, buff_nid, offset)
+            if isinstance(nids[inc], list):
+                tmp = np.array(nids[inc], dtype=np.int64)
+            else:
+                tmp = nids[inc]
+            subset_choose_index(sample_size, tmp, rnd, buff_nid, offset)
             output.append(buff_nid[offset:offset+sample_size])
             offset += sample_size
     return output
@@ -252,7 +256,14 @@ def sample_subset_with_eid(list nids, list eids, long long maxdegree, shuffle=False):
             output_eid.append(eids[inc])
         else:
             sample_size = buff_size if buff_size <= maxdegree else maxdegree
-            subset_choose_index_eid(sample_size, nids[inc], eids[inc], rnd, buff_nid, buff_eid, offset)
+            if isinstance(nids[inc], list):
+                tmp = np.array(nids[inc], dtype=np.int64)
+                tmp_eids = np.array(eids[inc], dtype=np.int64)
+            else:
+                tmp = nids[inc]
+                tmp_eids = eids[inc]
+            subset_choose_index_eid(sample_size, tmp, tmp_eids, rnd, buff_nid, buff_eid, offset)
             output.append(buff_nid[offset:offset+sample_size])
             output_eid.append(buff_eid[offset:offset+sample_size])
             offset += sample_size
......
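Both sampler changes above coerce neighbour sets that arrive as plain Python lists into contiguous int64 arrays before the typed subset_choose_index helpers run. A standalone sketch of the same coercion; the helper name is illustrative and not part of PGL:

    import numpy as np

    def as_int64_array(nids):
        # Mirror of the isinstance guard above: the typed Cython kernels expect
        # an int64 buffer, so lists are converted and ndarrays pass through.
        if isinstance(nids, list):
            return np.array(nids, dtype=np.int64)
        return nids

    as_int64_array([3, 1, 2])      # -> array([3, 1, 2])
    as_int64_array(np.arange(4))   # returned unchanged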
@@ -36,19 +36,22 @@ def send(src, dst, nfeat, efeat, message_func):
     return msg

-def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, node_ids):
+def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, num_nodes,
+         num_edges):
     """Recv message from given msg to dst nodes.
     """
+    empty_msg_flag = fluid.layers.cast(num_edges > 0, dtype="float32")
     if reduce_function == "sum":
         if isinstance(msg, dict):
             raise TypeError("The message for build-in function"
                             " should be Tensor not dict.")
         try:
-            out_dims = msg.shape[-1]
-            init_output = fluid.layers.fill_constant_batch_size_like(
-                node_ids, shape=[1, out_dims], value=0, dtype="float32")
+            out_dim = msg.shape[-1]
+            init_output = fluid.layers.fill_constant(
+                shape=[num_nodes, out_dim], value=0, dtype="float32")
             init_output.stop_gradient = False
+            msg = msg * empty_msg_flag
             output = paddle_helper.scatter_add(init_output, dst, msg)
             return output
         except TypeError as e:
@@ -60,17 +63,16 @@ def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, node_ids):
             reduce_function = sum_func

-    # convert msg into lodtensor
     bucketed_msg = op.nested_lod_reset(msg, bucketing_index)
-    # Check dim for bucketed_msg equal to out_dims
     output = reduce_function(bucketed_msg)
-    out_dims = output.shape[-1]
+    output_dim = output.shape[-1]
+    output = output * empty_msg_flag

-    init_output = fluid.layers.fill_constant_batch_size_like(
-        node_ids, shape=[1, out_dims], value=0, dtype="float32")
-    init_output.stop_gradient = False
-    output = fluid.layers.scatter(init_output, uniq_dst, output)
-    return output
+    init_output = fluid.layers.fill_constant(
+        shape=[num_nodes, output_dim], value=0, dtype="float32")
+    init_output.stop_gradient = True
+    final_output = fluid.layers.scatter(init_output, uniq_dst, output)
+    return final_output
 class BaseGraphWrapper(object):
@@ -98,6 +100,8 @@ class BaseGraphWrapper(object):
         self._edge_uniq_dst = None
         self._edge_uniq_dst_count = None
         self._node_ids = None
+        self._graph_lod = None
+        self._num_graph = None
         self._data_name_prefix = ""

     def __repr__(self):
@@ -194,7 +198,8 @@ class BaseGraphWrapper(object):
             bucketing_index=self._edge_uniq_dst_count,
             msg=msg,
             reduce_function=reduce_function,
-            node_ids=self._node_ids)
+            num_edges=self._num_edges,
+            num_nodes=self._num_nodes)
         return output

     @property
@@ -216,6 +221,24 @@
         """
         return self._num_nodes

+    @property
+    def graph_lod(self):
+        """Return graph index for graphs
+
+        Return:
+            A variable with shape [None ] as the Lod information of multiple-graph.
+        """
+        return self._graph_lod
+
+    @property
+    def num_graph(self):
+        """Return a variable of number of graphs
+
+        Return:
+            A variable with shape (1,) as the number of Graphs in int64.
+        """
+        return self._num_graph
+
     @property
     def edge_feat(self):
         """Return a dictionary of tensor representing edge features.
@@ -309,7 +332,6 @@ class StaticGraphWrapper(BaseGraphWrapper):
     def __create_graph_attr(self, graph):
         """Create graph attributes for paddlepaddle.
         """
-        src, dst = list(zip(*graph.edges))
         src, dst, eid = graph.sorted_edges(sort_by="dst")
         indegree = graph.indegree()
         nodes = graph.nodes
@@ -317,6 +339,17 @@ class StaticGraphWrapper(BaseGraphWrapper):
         uniq_dst_count = indegree[indegree > 0]
         uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
         uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
+        graph_lod = graph.graph_lod
+        num_graph = graph.num_graph
+
+        num_edges = len(src)
+        if num_edges == 0:
+            # Fake Graph
+            src = np.array([0], dtype="int64")
+            dst = np.array([0], dtype="int64")
+            eid = np.array([0], dtype="int64")
+            uniq_dst_count = np.array([0, 1], dtype="int32")
+            uniq_dst = np.array([0], dtype="int64")

         edge_feat = {}
@@ -327,6 +360,20 @@ class StaticGraphWrapper(BaseGraphWrapper):
         self.__create_graph_node_feat(node_feat, self._initializers)
         self.__create_graph_edge_feat(edge_feat, self._initializers)

+        self._num_edges, init = paddle_helper.constant(
+            dtype="int64",
+            value=np.array(
+                [num_edges], dtype="int64"),
+            name=self._data_name_prefix + '/num_edges')
+        self._initializers.append(init)
+
+        self._num_graph, init = paddle_helper.constant(
+            dtype="int64",
+            value=np.array(
+                [num_graph], dtype="int64"),
+            name=self._data_name_prefix + '/num_graph')
+        self._initializers.append(init)
+
         self._edges_src, init = paddle_helper.constant(
             dtype="int64",
             value=src,
@@ -358,6 +405,12 @@ class StaticGraphWrapper(BaseGraphWrapper):
             value=uniq_dst_count)
         self._initializers.append(init)

+        self._graph_lod, init = paddle_helper.constant(
+            name=self._data_name_prefix + "/graph_lod",
+            dtype="int32",
+            value=graph_lod)
+        self._initializers.append(init)
+
         node_ids_value = np.arange(0, graph.num_nodes, dtype="int64")
         self._node_ids, init = paddle_helper.constant(
             name=self._data_name_prefix + "/node_ids",
@@ -496,6 +549,18 @@ class GraphWrapper(BaseGraphWrapper):
     def __create_graph_attr_holders(self):
         """Create data holders for graph attributes.
         """
+        self._num_edges = fluid.layers.data(
+            self._data_name_prefix + '/num_edges',
+            shape=[1],
+            append_batch_size=False,
+            dtype="int64",
+            stop_gradient=True)
+        self._num_graph = fluid.layers.data(
+            self._data_name_prefix + '/num_graph',
+            shape=[1],
+            append_batch_size=False,
+            dtype="int64",
+            stop_gradient=True)
         self._edges_src = fluid.layers.data(
             self._data_name_prefix + '/edges_src',
             shape=[None],
@@ -514,18 +579,28 @@ class GraphWrapper(BaseGraphWrapper):
             append_batch_size=False,
             dtype='int64',
             stop_gradient=True)
         self._edge_uniq_dst = fluid.layers.data(
             self._data_name_prefix + "/uniq_dst",
             shape=[None],
             append_batch_size=False,
             dtype="int64",
             stop_gradient=True)
+        self._graph_lod = fluid.layers.data(
+            self._data_name_prefix + "/graph_lod",
+            shape=[None],
+            append_batch_size=False,
+            dtype="int32",
+            stop_gradient=True)
         self._edge_uniq_dst_count = fluid.layers.data(
             self._data_name_prefix + "/uniq_dst_count",
             shape=[None],
             append_batch_size=False,
             dtype="int32",
             stop_gradient=True)
         self._node_ids = fluid.layers.data(
             self._data_name_prefix + "/node_ids",
             shape=[None],
@@ -539,9 +614,15 @@ class GraphWrapper(BaseGraphWrapper):
             dtype="int64",
             stop_gradient=True)
         self._holder_list.extend([
-            self._edges_src, self._edges_dst, self._num_nodes,
-            self._edge_uniq_dst, self._edge_uniq_dst_count, self._node_ids,
-            self._indegree
+            self._edges_src,
+            self._edges_dst,
+            self._num_nodes,
+            self._edge_uniq_dst,
+            self._edge_uniq_dst_count,
+            self._node_ids,
+            self._indegree,
+            self._graph_lod,
+            self._num_graph,
         ])
     def __create_graph_node_feat_holders(self, node_feat_name, node_feat_shape,
@@ -587,10 +668,22 @@ class GraphWrapper(BaseGraphWrapper):
         src, dst, eid = graph.sorted_edges(sort_by="dst")
         indegree = graph.indegree()
         nodes = graph.nodes
+        num_edges = len(src)
         uniq_dst = nodes[indegree > 0]
         uniq_dst_count = indegree[indegree > 0]
         uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
         uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
+        num_graph = graph.num_graph
+        graph_lod = graph.graph_lod
+        if num_edges == 0:
+            # Fake Graph
+            src = np.array([0], dtype="int64")
+            dst = np.array([0], dtype="int64")
+            eid = np.array([0], dtype="int64")
+            uniq_dst_count = np.array([0, 1], dtype="int32")
+            uniq_dst = np.array([0], dtype="int64")

         edge_feat = {}
@@ -598,14 +691,20 @@ class GraphWrapper(BaseGraphWrapper):
             edge_feat[key] = value[eid]
         node_feat = graph.node_feat

+        feed_dict[self._data_name_prefix + '/num_edges'] = np.array(
+            [num_edges], dtype="int64")
         feed_dict[self._data_name_prefix + '/edges_src'] = src
         feed_dict[self._data_name_prefix + '/edges_dst'] = dst
-        feed_dict[self._data_name_prefix + '/num_nodes'] = np.array(
-            graph.num_nodes)
+        feed_dict[self._data_name_prefix + '/num_nodes'] = np.array(
+            [graph.num_nodes], dtype="int64")
         feed_dict[self._data_name_prefix + '/uniq_dst'] = uniq_dst
         feed_dict[self._data_name_prefix + '/uniq_dst_count'] = uniq_dst_count
         feed_dict[self._data_name_prefix + '/node_ids'] = graph.nodes
         feed_dict[self._data_name_prefix + '/indegree'] = indegree
+        feed_dict[self._data_name_prefix + '/graph_lod'] = graph_lod
+        feed_dict[self._data_name_prefix + '/num_graph'] = np.array(
+            [num_graph], dtype="int64")
+        feed_dict[self._data_name_prefix + '/indegree'] = indegree

         for key in self.node_feat_tensor_dict:
             feed_dict[self._data_name_prefix + '/node_feat/' +
......
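With the holder and feed changes above, the wrapper now feeds three additional entries per batch. A hedged sketch of what they look like for a two-graph batch, assuming the wrapper was created with the name prefix "graph" (the keys follow the _data_name_prefix + '/...' pattern in the diff; the numbers are illustrative):

    import numpy as np

    extra_feed = {
        "graph/num_edges": np.array([3], dtype="int64"),        # real edges in the batch
        "graph/num_graph": np.array([2], dtype="int64"),        # graphs in the batch
        "graph/graph_lod": np.array([0, 3, 5], dtype="int32"),  # per-graph node boundaries
    }
    # These join the existing edges_src / edges_dst / num_nodes / uniq_dst /
    # uniq_dst_count / node_ids / indegree entries produced for feeding.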
@@ -70,6 +70,55 @@ class HeterGraphTest(unittest.TestCase):
             self.assertEqual(len(nodes), batch_size)
             self.assertListEqual(list(nodes), ground[idx])

+    def test_sample_successor(self):
+        print()
+        nodes = [4, 5, 8]
+        md = 2
+        succes = self.graph.sample_successor(
+            edge_type='p2a', nodes=nodes, max_degree=md, return_eids=False)
+        self.assertIsInstance(succes, list)
+        ground = [[10, 11, 12, 14, 13], [], [14]]
+        for succ, g in zip(succes, ground):
+            self.assertIsInstance(succ, np.ndarray)
+            for i in succ:
+                self.assertIn(i, g)
+
+        nodes = [4]
+        succes = self.graph.sample_successor(
+            edge_type='p2a', nodes=nodes, max_degree=md, return_eids=False)
+        self.assertIsInstance(succes, list)
+        ground = [[10, 11, 12, 14, 13]]
+        for succ, g in zip(succes, ground):
+            self.assertIsInstance(succ, np.ndarray)
+            for i in succ:
+                self.assertIn(i, g)
+    def test_successor(self):
+        print()
+        nodes = [4, 5, 8]
+        e_type = 'p2a'
+        succes = self.graph.successor(
+            edge_type=e_type,
+            nodes=nodes, )
+        self.assertIsInstance(succes, np.ndarray)
+        ground = [[10, 11, 12, 14, 13], [], [14]]
+        for succ, g in zip(succes, ground):
+            self.assertIsInstance(succ, np.ndarray)
+            self.assertCountEqual(succ, g)
+
+        nodes = [4]
+        e_type = 'p2a'
+        succes = self.graph.successor(
+            edge_type=e_type,
+            nodes=nodes, )
+        self.assertIsInstance(succes, np.ndarray)
+        ground = [[10, 11, 12, 14, 13]]
+        for succ, g in zip(succes, ground):
+            self.assertIsInstance(succ, np.ndarray)
+            self.assertCountEqual(succ, g)
     def test_sample_nodes(self):
         print()
         p_ground = [4, 5, 6, 7, 8, 9]
......