# Commit 38057f1d, authored by liweibin
#
# add redis hetergraph
#
# Parent commit: a580ad0c
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""redis_hetergraph"""
import pgl
import redis
from redis import BlockingConnectionPool, StrictRedis
from redis._compat import b, unicode, bytes, long, basestring
from rediscluster.nodemanager import NodeManager
from rediscluster.crc import crc16
from collections import OrderedDict
import threading
import numpy as np
import time
import json
import pgl.graph as pgraph
import pickle as pkl
from pgl.utils.logger import log
import pgl.graph_kernel as graph_kernel
from pgl.contrib import heter_graph
import pgl.redis_graph as rg
class RedisHeterGraph(rg.RedisGraph):
    """Redis Heterogeneous Graph

    Accessor for a heterogeneous graph whose data lives in redis. Every
    edge-related redis key is prefixed with the edge type, for example
    ``"<e_type>:num_edges"``, ``"<e_type>:s:<node>"`` (successors),
    ``"<e_type>:d:<node>"`` (predecessors), ``"<e_type>:e:<eid>"`` (edges)
    and ``"<e_type>:ef:<key>:<eid>"`` (edge features).

    Args:
        name: Graph name, forwarded to :code:`RedisGraph.__init__`.
        edge_types: Iterable of the edge type names of this graph.
        redis_config: Redis configuration, forwarded to
            :code:`RedisGraph.__init__`.
        num_parts: Forwarded to :code:`RedisGraph.__init__`. The query
            helpers below read it back as ``self.num_parts`` (assumed to be
            set by the base class).
    """

    def __init__(self, name, edge_types, redis_config, num_parts):
        super(RedisHeterGraph, self).__init__(name, redis_config, num_parts)
        # Per-edge-type caches; each one is filled lazily on first access.
        self._num_edges = {}
        self.edge_types = edge_types
        self.e_type = None
        self._edge_feat_info = {}
        self._edge_feat_dtype = {}
        self._edge_feat_shape = {}

    def num_edges_by_type(self, e_type):
        """get edge number by specified edge type

        The value is read from the redis key ``"<e_type>:num_edges"`` the
        first time it is requested and cached afterwards.
        """
        if e_type not in self._num_edges:
            self._num_edges[e_type] = int(
                self._rs.get("%s:num_edges" % e_type))
        return self._num_edges[e_type]

    def num_edges(self):
        """num_edges

        Return:
            A dict mapping every edge type in ``self.edge_types`` to its
            number of edges.
        """
        return {
            e_type: self.num_edges_by_type(e_type)
            for e_type in self.edge_types
        }

    def edge_feat_info_by_type(self, e_type):
        """get edge features information by specified edge type

        The information is the JSON document stored under the redis key
        ``"<e_type>:ef:infos"``; an empty list is used when the key holds no
        value. The result is cached per edge type.
        """
        if e_type not in self._edge_feat_info:
            buff = self._rs.get("%s:ef:infos" % e_type)
            if buff is not None:
                self._edge_feat_info[e_type] = json.loads(buff.decode())
            else:
                self._edge_feat_info[e_type] = []
        return self._edge_feat_info[e_type]

    def edge_feat_info(self):
        """edge_feat_info

        Return:
            A dict mapping every edge type in ``self.edge_types`` to its
            edge feature information (see :code:`edge_feat_info_by_type`).
        """
        return {
            e_type: self.edge_feat_info_by_type(e_type)
            for e_type in self.edge_types
        }

    def edge_feat_shape(self, e_type, key):
        """edge_feat_shape

        Return the shape recorded for edge feature ``key`` of ``e_type``.
        Each feature information entry is unpacked as
        ``(key, shape, dtype)``.

        Raises:
            KeyError: if ``key`` is not a feature of ``e_type``.
        """
        if e_type not in self._edge_feat_shape:
            # Only the requested edge type is looked up, so unrelated edge
            # types are not queried just to answer this call.
            self._edge_feat_shape[e_type] = {
                k: shape
                for k, shape, _ in self.edge_feat_info_by_type(e_type)
            }
        return self._edge_feat_shape[e_type][key]

    def edge_feat_dtype(self, e_type, key):
        """edge_feat_dtype

        Return the dtype recorded for edge feature ``key`` of ``e_type``.
        Each feature information entry is unpacked as
        ``(key, shape, dtype)``.

        Raises:
            KeyError: if ``key`` is not a feature of ``e_type``.
        """
        if e_type not in self._edge_feat_dtype:
            # Only the requested edge type is looked up, so unrelated edge
            # types are not queried just to answer this call.
            self._edge_feat_dtype[e_type] = {
                k: dtype
                for k, _, dtype in self.edge_feat_info_by_type(e_type)
            }
        return self._edge_feat_dtype[e_type][key]

    @staticmethod
    def _pack_per_node(arrays):
        """Pack a list of per-node 1-D arrays into a single numpy array.

        When all arrays have the same length this is exactly
        ``np.array(arrays)``. When the lengths differ, a 1-D object array
        holding the individual arrays is built explicitly, because recent
        NumPy releases refuse to create ragged arrays implicitly.
        """
        if len(set(len(arr) for arr in arrays)) <= 1:
            return np.array(arrays)
        packed = np.empty(len(arrays), dtype=object)
        for pos, arr in enumerate(arrays):
            packed[pos] = arr
        return packed

    def _decode_neighbors(self, buffers, return_eids):
        """Decode raw adjacency buffers into neighbor ids (and edge ids).

        Each non-empty buffer is read as int64 values and reshaped to rows
        of two columns: column 0 is used as the neighbor id and column 1 as
        the edge id. A ``None`` buffer yields empty arrays.
        """
        neighbors = []
        eids = []
        for buff in buffers:
            if buff is None:
                neighbors.append(np.array([], dtype="int64"))
                eids.append(np.array([], dtype="int64"))
                continue
            # astype() copies the data out of the buffer-backed array.
            pairs = np.frombuffer(
                buff, dtype="int64").reshape([-1, 2]).astype("int64")
            neighbors.append(pairs[:, 0])
            eids.append(pairs[:, 1])
        if return_eids:
            return self._pack_per_node(neighbors), self._pack_per_node(eids)
        return self._pack_per_node(neighbors)

    def sample_predecessor(self, e_type, nodes, max_degree,
                           return_eids=False):
        """sample predecessor with the specified edge type

        Args:
            e_type: Edge type to follow.
            nodes: Node ids to query.
            max_degree: Passed to :code:`rg.hmget_sample_helper`
                (presumably the maximum number of sampled neighbors per
                node; confirm against that helper).
            return_eids: If True, also return the matching edge ids.

        Return:
            One entry per queried node holding its sampled predecessors,
            and, if ``return_eids`` is True, a second array with the edge
            ids.
        """
        query = ["%s:d:%s" % (e_type, n) for n in nodes]
        rets = rg.hmget_sample_helper(self._rs, query, self.num_parts,
                                      max_degree)
        return self._decode_neighbors(rets, return_eids)

    def sample_successor(self, e_type, nodes, max_degree, return_eids=False):
        """sample successor with the specified edge type

        Args:
            e_type: Edge type to follow.
            nodes: Node ids to query.
            max_degree: Passed to :code:`rg.hmget_sample_helper`
                (presumably the maximum number of sampled neighbors per
                node; confirm against that helper).
            return_eids: If True, also return the matching edge ids.

        Return:
            One entry per queried node holding its sampled successors, and,
            if ``return_eids`` is True, a second array with the edge ids.
        """
        query = ["%s:s:%s" % (e_type, n) for n in nodes]
        rets = rg.hmget_sample_helper(self._rs, query, self.num_parts,
                                      max_degree)
        return self._decode_neighbors(rets, return_eids)

    def predecessor(self, e_type, nodes, return_eids=False):
        """predecessor with the specified edge type

        Args:
            e_type: Edge type to follow.
            nodes: Node ids to query.
            return_eids: If True, also return the matching edge ids.

        Return:
            One entry per queried node holding its predecessors, and, if
            ``return_eids`` is True, a second array with the edge ids.
        """
        query = ["%s:d:%s" % (e_type, n) for n in nodes]
        ret = rg.hmget_helper(self._rs, query, self.num_parts)
        return self._decode_neighbors(ret, return_eids)

    def successor(self, e_type, nodes, return_eids=False):
        """successor with the specified edge type

        Args:
            e_type: Edge type to follow.
            nodes: Node ids to query.
            return_eids: If True, also return the matching edge ids.

        Return:
            One entry per queried node holding its successors, and, if
            ``return_eids`` is True, a second array with the edge ids.
        """
        query = ["%s:s:%s" % (e_type, n) for n in nodes]
        ret = rg.hmget_helper(self._rs, query, self.num_parts)
        return self._decode_neighbors(ret, return_eids)

    def get_edges_by_id(self, e_type, eids):
        """get_edges_by_id

        Each stored edge value is decoded as ``src * num_nodes + dst``.

        Return:
            An int64 array with one ``[src, dst]`` row per requested edge
            id.
        """
        queries = ["%s:e:%s" % (e_type, e) for e in eids]
        ret = rg.hmget_helper(self._rs, queries, self.num_parts)
        o = np.asarray(ret, dtype="int64")
        dst = o % self.num_nodes
        src = o // self.num_nodes
        data = np.hstack(
            [src.reshape([-1, 1]), dst.reshape([-1, 1])]).astype("int64")
        return data

    def get_edge_feat_by_id(self, e_type, key, eids):
        """get_edge_feat_by_id

        Fetch feature ``key`` for the given edge ids of ``e_type``. The raw
        values are concatenated, read with the dtype recorded for the
        feature and reshaped to its recorded shape.

        Args:
            e_type: Edge type the edge ids belong to.
            key: Name of the edge feature.
            eids: Iterable of integer edge ids.

        Return:
            The feature array, or None if the redis helper returned None.
        """
        queries = ["%s:ef:%s:%i" % (e_type, key, e) for e in eids]
        ret = rg.hmget_helper(self._rs, queries, self.num_parts)
        if ret is None:
            return None
        ret = b"".join(ret)
        data = np.frombuffer(ret, dtype=self.edge_feat_dtype(e_type, key))
        data = data.reshape(self.edge_feat_shape(e_type, key))
        return data

    def get_node_types(self, nodes):
        """get_node_types

        Return:
            A list with the decoded type of every node in ``nodes``, or
            None as soon as one node has no stored type.
        """
        queries = ["nt:%i" % n for n in nodes]
        ret = rg.hmget_helper(self._rs, queries, self.num_parts)
        node_types = []
        for buff in ret:
            if not buff:
                # A single missing type makes the whole answer unknown.
                # Return right away instead of appending to None on the
                # next iteration.
                return None
            node_types.append(buff.decode())
        return node_types

    def subgraph(self, nodes, eid, edges=None):
        """Generate heterogeneous subgraph with nodes and edge ids.

        WARNING: ALL NODES IN EID MUST BE INCLUDED BY NODES

        Args:
            nodes: Node ids which will be included in the subgraph.
            eid: Dict mapping edge type to the edge ids which will be
                included in the subgraph.
            edges: Optional dict mapping edge type to an array of
                ``[src, dst]`` rows. If None, the edges are fetched from
                redis using ``eid``.

        Return:
            A :code:`pgl.contrib.heter_graph.SubHeterGraph` object.
        """
        # Map original node ids to their position inside the subgraph.
        reindex = {node: ind for ind, node in enumerate(nodes)}

        _node_types = self.get_node_types(nodes)
        if _node_types is None:
            node_types = None
        else:
            node_types = [[reindex[idx], t]
                          for idx, t in zip(nodes, _node_types)]

        if edges is None:
            edges = {}
            for e_type, eid_list in eid.items():
                edges[e_type] = self.get_edges_by_id(e_type, eid_list)

        sub_edges = {}
        for e_type, edges_list in edges.items():
            sub_edges[e_type] = graph_kernel.map_edges(
                np.arange(
                    len(edges_list), dtype="int64"), edges_list, reindex)

        sub_edge_feat = {}
        for e_type, feat_info in self.edge_feat_info().items():
            type_edge_feat = {}
            # Features are fetched with the edge ids of this edge type
            # only. Edge types that carry features but have no ids in
            # `eid` keep an empty feature dict.
            if feat_info and e_type in eid:
                for key, _, _ in feat_info:
                    type_edge_feat[key] = self.get_edge_feat_by_id(
                        e_type, key, eid[e_type])
            sub_edge_feat[e_type] = type_edge_feat

        sub_node_feat = {}
        for key, _, _ in self.node_feat_info():
            sub_node_feat[key] = self.get_node_feat_by_id(key, nodes)

        subgraph = heter_graph.SubHeterGraph(
            num_nodes=len(nodes),
            edges=sub_edges,
            node_types=node_types,
            node_feat=sub_node_feat,
            edge_feat=sub_edge_feat,
            reindex=reindex)
        return subgraph
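# (Web page residue from the code-hosting UI, not part of the module:)
# Markdown is supported
# 0% .
# You are about to add 0 people to the discussion. Proceed with caution.
# Please finish editing this message first!
# To comment, please register.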