Unverified commit b86d5c68, authored by: H Huang Zhengjie, committed by: GitHub

Merge pull request #32 from PaddlePaddle/develop

Merge from develop; version 1.0.2
# Distributed metapath2vec in PGL
[metapath2vec](https://ericdongyx.github.io/papers/KDD17-dong-chawla-swami-metapath2vec.pdf) is an algorithmic framework for representation learning in heterogeneous networks, which contain multiple types of nodes and edges. Given a heterogeneous graph, metapath2vec first generates meta-path-based random walks and then trains a skip-gram model on them to learn node embeddings. Based on PGL, we reproduce the metapath2vec algorithm in distributed mode.
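For intuition, here is a minimal, framework-free sketch of a meta-path-guided random walk; the `neighbors` lookup is a hypothetical stand-in for a heterogeneous graph, not PGL's API (the actual walker used in this example is `pgl.sample.metapath_randomwalk`).
```python
import random

def metapath_walk(neighbors, start_node, meta_path, walk_len):
    """Sketch of one meta-path-guided random walk.

    `neighbors[(node, node_type)]` is assumed to return the neighbors of
    `node` that have type `node_type`, e.g. built from the p2a/p2c edge files.
    """
    # "c2p-p2a-a2p-p2c" -> target node types ["p", "a", "p", "c"], cycled
    hop_types = [edge.split('2')[1] for edge in meta_path.split('-')]
    walk = [start_node]
    cur = start_node
    for hop in range(walk_len - 1):
        n_type = hop_types[hop % len(hop_types)]
        candidates = neighbors.get((cur, n_type), [])
        if not candidates:  # dead end: stop this walk early
            break
        cur = random.choice(candidates)
        walk.append(cur)
    return walk
```
With DBLP's ```meta_path: "c2p-p2a-a2p-p2c"``` and ```first_node_type: "c"```, each walk alternates conference, paper, author, paper, conference, and so on.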
## Datasets
DBLP: The dataset contains 14376 papers (P), 20 conferences (C), 14475 authors (A), and 8920 terms (T), for 37791 nodes in total.
You can download the dataset from [here](https://github.com/librahu/HIN-Datasets-for-Recommendation-and-Network-Embedding).
We use the ```DBLP``` dataset as an example. After downloading, place the files in, say, ```./data/DBLP/```.
## Dependencies
- paddlepaddle>=1.6
- pgl>=1.0.0
## How to run
Before training, run the command below to preprocess the data.
```sh
python data_process.py --data_path ./data/DBLP --output_path ./data/data_processed
```
We adopt [PaddlePaddle Fleet](https://github.com/PaddlePaddle/Fleet) as our distributed training framework. ```config.yaml``` is the configuration file for metapath2vec hyperparameters, and ```local_config``` is the configuration file for the PaddlePaddle parameter servers. By default, we use 2 pservers and 2 trainers. You can use ```cloud_run.sh``` to start up the parameter servers and model trainers.
For example, to train metapath2vec in distributed mode on the DBLP dataset:
```sh
# train metapath2vec in distributed mode.
sh cloud_run.sh
# multiclass task example
python multi_class.py --dataset ./data/data_processed/author_label.txt --ckpt_path ./checkpoints/2000 --num_nodes 37791
```
## Hyperparameters
All hyperparameters are stored in the ```config.yaml``` file, so you can edit ```config.yaml``` before training to adjust them as you like.
Some important hyperparameters in config.yaml (a loading example follows this list):
- **edge_path**: the directory of graph data that you want to load
- **lr**: learning rate
- **neg_num**: number of negative samples
- **num_walks**: number of walks started from each node
- **walk_len**: walk length
- **meta_path**: meta path scheme
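For a quick sanity check, the snippet below loads ```config.yaml``` the same way ```cluster_train.py``` does (via ```load_config``` from ```utils.py```) and estimates the number of training steps per worker; the trainer and device counts are assumptions matching the default 2-trainer, ```CPU_NUM: 16``` setup.
```python
import math

from utils import load_config  # AttrDict wrapper defined in utils.py

config = load_config("./config.yaml")

# Mirrors the train_steps formula in cluster_train.py, assuming
# 2 trainers (PADDLE_TRAINERS_NUM) and CPU_NUM = 16 devices per trainer.
worker_num, num_devices = 2, 16
train_steps = math.ceil(config.num_nodes * config.epochs /
                        config.batch_size / num_devices / worker_num)

print("meta path:", config.meta_path)
print("negative samples per positive:", config.neg_num)
print("estimated train steps per worker:", train_steps)
```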
#!/bin/bash
set -x
mode=${1}
source ./utils.sh
unset http_proxy https_proxy
source ./local_config
if [ ! -d ${log_dir} ]; then
mkdir ${log_dir}
fi
for((i=0;i<${PADDLE_PSERVERS_NUM};i++))
do
echo "start ps server: ${i}"
echo $log_dir
TRAINING_ROLE="PSERVER" PADDLE_TRAINER_ID=${i} sh job.sh &> $log_dir/pserver.$i.log &
done
sleep 10s
for((j=0;j<${PADDLE_TRAINERS_NUM};j++))
do
echo "start ps work: ${j}"
TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} sh job.sh &> $log_dir/worker.$j.log &
done
tail -f $log_dir/worker.0.log
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import time
import os
import math
import numpy as np
import paddle.fluid as F
import paddle.fluid.layers as L
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from pgl.utils.logger import log
from model import Metapath2vecModel
from graph import m2vGraph
from utils import load_config
from walker import multiprocess_data_generator
def init_role():
# reset the place according to role of parameter server
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
paddle_role = role_maker.Role.WORKER
place = F.CPUPlace()
if training_role == "PSERVER":
paddle_role = role_maker.Role.SERVER
# set the fleet runtime environment according to the config
ports = os.getenv("PADDLE_PORT", "6174").split(",")
pserver_ips = os.getenv("PADDLE_PSERVERS").split(",") # ip,ip...
eplist = []
if len(ports) > 1:
# local debug mode, multi port
for port in ports:
eplist.append(':'.join([pserver_ips[0], port]))
else:
# distributed mode, multi ip
for ip in pserver_ips:
eplist.append(':'.join([ip, ports[0]]))
pserver_endpoints = eplist # ip:port,ip:port...
worker_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
role = role_maker.UserDefinedRoleMaker(
current_id=trainer_id,
role=paddle_role,
worker_num=worker_num,
server_endpoints=pserver_endpoints)
fleet.init(role)
def optimization(base_lr, loss, train_steps, optimizer='sgd'):
decayed_lr = L.learning_rate_scheduler.polynomial_decay(
learning_rate=base_lr,
decay_steps=train_steps,
end_learning_rate=0.0001 * base_lr,
power=1.0,
cycle=False)
if optimizer == 'sgd':
optimizer = F.optimizer.SGD(decayed_lr)
elif optimizer == 'adam':
optimizer = F.optimizer.Adam(decayed_lr, lazy_mode=True)
else:
raise ValueError
log.info('learning rate:%f' % (base_lr))
# create the DistributeTranspiler config
config = DistributeTranspilerConfig()
config.sync_mode = False
#config.runtime_split_send_recv = False
config.slice_var_up = False
#create the distributed optimizer
optimizer = fleet.distributed_optimizer(optimizer, config)
optimizer.minimize(loss)
def build_compiled_prog(train_program, model_loss):
num_threads = int(os.getenv("CPU_NUM", 10))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
exec_strategy = F.ExecutionStrategy()
exec_strategy.num_threads = num_threads
#exec_strategy.use_experimental_executor = True
build_strategy = F.BuildStrategy()
build_strategy.enable_inplace = True
#build_strategy.memory_optimize = True
build_strategy.memory_optimize = False
build_strategy.remove_unnecessary_lock = False
if num_threads > 1:
build_strategy.reduce_strategy = F.BuildStrategy.ReduceStrategy.Reduce
compiled_prog = F.compiler.CompiledProgram(
train_program).with_data_parallel(loss_name=model_loss.name)
return compiled_prog
def train_prog(exe, program, loss, node2vec_pyreader, args, train_steps):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
step = 0
if not os.path.exists(args.save_path):
os.makedirs(args.save_path)
while True:
try:
begin_time = time.time()
loss_val, = exe.run(program, fetch_list=[loss])
log.info("step %s: loss %.5f speed: %.5f s/step" %
(step, np.mean(loss_val), time.time() - begin_time))
step += 1
except F.core.EOFException:
node2vec_pyreader.reset()
if step % args.steps_per_save == 0 or step == train_steps:
save_path = args.save_path
if trainer_id == 0:
model_path = os.path.join(save_path, "%s" % step)
fleet.save_persistables(exe, model_path)
if step == train_steps:
break
def main(args):
log.info("start")
worker_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
num_devices = int(os.getenv("CPU_NUM", 10))
model = Metapath2vecModel(config=args)
pyreader = model.pyreader
loss = model.forward()
# init fleet
init_role()
train_steps = math.ceil(args.num_nodes * args.epochs / args.batch_size /
num_devices / worker_num)
log.info("Train step: %s" % train_steps)
real_batch_size = args.batch_size * args.walk_len * args.win_size
if args.optimizer == "sgd":
args.lr *= real_batch_size
optimization(args.lr, loss, train_steps, args.optimizer)
# init and run server or worker
if fleet.is_server():
fleet.init_server(args.warm_start_from_dir)
fleet.run_server()
if fleet.is_worker():
log.info("start init worker done")
fleet.init_worker()
# worker only: load the training samples
log.info("init worker done")
exe = F.Executor(F.CPUPlace())
exe.run(fleet.startup_program)
log.info("Startup done")
dataset = m2vGraph(args)
log.info("Build graph done.")
data_generator = multiprocess_data_generator(args, dataset)
cur_time = time.time()
for idx, _ in enumerate(data_generator()):
log.info("iter %s: %s s" % (idx, time.time() - cur_time))
cur_time = time.time()
if idx == 100:
break
pyreader.decorate_tensor_provider(data_generator)
pyreader.start()
compiled_prog = build_compiled_prog(fleet.main_program, loss)
train_prog(exe, compiled_prog, loss, pyreader, args, train_steps)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='metapath2vec')
parser.add_argument("-c", "--config", type=str, default="./config.yaml")
args = parser.parse_args()
config = load_config(args.config)
log.info(config)
main(config)
# graph data config
edge_path: "./data/data_processed"
edge_files: "p2a:paper_author.txt,p2c:paper_conference.txt,p2t:paper_type.txt"
node_types_file: "node_types.txt"
num_nodes: 37791
symmetry: True
# skipgram pair data config
win_size: 5
neg_num: 5
# average; m2v_plus
neg_sample_type: "average"
# random walk config
# m2v; multi_m2v;
walk_mode: "m2v"
meta_path: "c2p-p2a-a2p-p2c"
first_node_type: "c"
walk_len: 24
batch_size: 4
node_shuffle: True
node_files: null
num_sample_workers: 2
# model config
embed_dim: 64
is_sparse: True
# only use when num_nodes > 100,000,000; slower than normal embedding
is_distributed: False
# training config
epochs: 10
optimizer: "sgd"
lr: 1.0
warm_start_from_dir: null
walkpath_files: "None"
train_files: "None"
steps_per_save: 1000
save_path: "./checkpoints"
log_dir: "./logs"
CPU_NUM: 16
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Data preprocessing for DBLP dataset"""
import sys
import os
import argparse
import numpy as np
from collections import OrderedDict
AUTHOR = 14475
PAPER = 14376
CONF = 20
TYPE = 8920
LABEL = 4
def build_node_types(meta_node, outfile):
"""build_node_types"""
nt_ori2new = {}
with open(outfile, 'w') as writer:
offset = 0
for node_type, num_nodes in meta_node.items():
ori_id2new_id = {}
for i in range(num_nodes):
writer.write("%d\t%s\n" % (offset + i, node_type))
ori_id2new_id[i + 1] = offset + i
nt_ori2new[node_type] = ori_id2new_id
offset += num_nodes
return nt_ori2new
def remapping_index(args, src_dict, dst_dict, ori_file, new_file):
"""remapping_index"""
ori_file = os.path.join(args.data_path, ori_file)
new_file = os.path.join(args.output_path, new_file)
with open(ori_file, 'r') as reader, open(new_file, 'w') as writer:
for line in reader:
slots = line.strip().split()
s = int(slots[0])
d = int(slots[1])
new_s = src_dict[s]
new_d = dst_dict[d]
writer.write("%d\t%d\n" % (new_s, new_d))
def author_label(args, ori_id2pgl_id, ori_file, real_file, new_file):
"""author_label"""
ori_file = os.path.join(args.data_path, ori_file)
real_file = os.path.join(args.data_path, real_file)
new_file = os.path.join(args.output_path, new_file)
real_id2pgl_id = {}
with open(ori_file, 'r') as reader:
for line in reader:
slots = line.strip().split()
ori_id = int(slots[0])
real_id = int(slots[1])
pgl_id = ori_id2pgl_id[ori_id]
real_id2pgl_id[real_id] = pgl_id
with open(real_file, 'r') as reader, open(new_file, 'w') as writer:
for line in reader:
slots = line.strip().split()
real_id = int(slots[0])
label = int(slots[1])
pgl_id = real_id2pgl_id[real_id]
writer.write("%d\t%d\n" % (pgl_id, label))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='DBLP data preprocessing')
parser.add_argument(
'--data_path',
default=None,
type=str,
help='original data path(default: None)')
parser.add_argument(
'--output_path',
default=None,
type=str,
help='output path(default: None)')
args = parser.parse_args()
meta_node = OrderedDict()
meta_node['a'] = AUTHOR
meta_node['p'] = PAPER
meta_node['c'] = CONF
meta_node['t'] = TYPE
if not os.path.exists(args.output_path):
os.makedirs(args.output_path)
node_types_file = os.path.join(args.output_path, "node_types.txt")
nt_ori2new = build_node_types(meta_node, node_types_file)
remapping_index(args, nt_ori2new['p'], nt_ori2new['a'], 'paper_author.dat',
'paper_author.txt')
remapping_index(args, nt_ori2new['p'], nt_ori2new['c'],
'paper_conference.dat', 'paper_conference.txt')
remapping_index(args, nt_ori2new['p'], nt_ori2new['t'], 'paper_type.dat',
'paper_type.txt')
author_label(args, nt_ori2new['a'], 'author_map_id.dat',
'author_label.dat', 'author_label.txt')
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
import os
import numpy as np
import pickle as pkl
import tqdm
import time
import random
from pgl.utils.logger import log
from pgl import heter_graph
class m2vGraph(object):
"""Implemetation of graph in order to sample metapath random walk.
"""
def __init__(self, config):
self.edge_path = config.edge_path
self.num_nodes = config.num_nodes
self.symmetry = config.symmetry
edge_files = config.edge_files
node_types_file = config.node_types_file
self.edge_file_list = []
for pair in edge_files.split(','):
e_type, filename = pair.split(':')
filename = os.path.join(self.edge_path, filename)
self.edge_file_list.append((e_type, filename))
self.node_types_file = os.path.join(self.edge_path, node_types_file)
self.build_graph()
def build_graph(self):
"""Build pgl heterogeneous graph.
"""
edges_by_types = {}
npy = self.edge_file_list[0][1] + ".npy"
if os.path.exists(npy):
log.info("load data from numpy file")
for pair in self.edge_file_list:
edges_by_types[pair[0]] = np.load(pair[1] + ".npy")
else:
log.info("load data from txt file")
for pair in self.edge_file_list:
edges_by_types[pair[0]] = self.load_edges(pair[1])
# np.save(pair[1] + ".npy", edges_by_types[pair[0]])
for e_type, edges in edges_by_types.items():
log.info(["number of %s edges: " % e_type, len(edges)])
if self.symmetry:
tmp = {}
for key, edges in edges_by_types.items():
n_list = key.split('2')
re_key = n_list[1] + '2' + n_list[0]
tmp[re_key] = edges_by_types[key][:, [1, 0]]
edges_by_types.update(tmp)
log.info(["finished loadding symmetry edges."])
node_types = self.load_node_types(self.node_types_file)
assert len(node_types) == self.num_nodes, \
"num_nodes should be equal to the length of node_types"
log.info(["number of nodes: ", len(node_types)])
node_features = {
'index': np.array([i for i in range(self.num_nodes)]).reshape(
-1, 1).astype(np.int64)
}
self.graph = heter_graph.HeterGraph(
num_nodes=self.num_nodes,
edges=edges_by_types,
node_types=node_types,
node_feat=node_features)
def load_edges(self, file_, symmetry=False):
"""Load edges from file.
"""
edges = []
with open(file_, 'r') as reader:
for line in reader:
items = line.strip().split()
src, dst = int(items[0]), int(items[1])
edges.append((src, dst))
if symmetry:
edges.append((dst, src))
edges = np.array(list(set(edges)), dtype=np.int64)
# edges = list(set(edges))
return edges
def load_node_types(self, file_):
"""Load node types
"""
node_types = []
log.info("node_types_file name: %s" % file_)
with open(file_, 'r') as reader:
for line in reader:
items = line.strip().split()
node_id = int(items[0])
n_type = items[1]
node_types.append((node_id, n_type))
return node_types
#!/bin/bash
set -x
source ./utils.sh
export CPU_NUM=$CPU_NUM
export FLAGS_rpc_deadline=3000000
export FLAGS_communicator_send_queue_size=1
export FLAGS_communicator_min_send_grad_num_before_recv=0
export FLAGS_communicator_max_merge_var_num=1
export FLAGS_communicator_merge_sparse_grad=0
python -u cluster_train.py -c config.yaml
#!/bin/bash
export PADDLE_TRAINERS_NUM=2
export PADDLE_PSERVERS_NUM=2
export PADDLE_PORT=6184,6185
export PADDLE_PSERVERS="127.0.0.1"
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
metapath2vec model.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import math
import paddle.fluid.layers as L
import paddle.fluid as F
def distributed_embedding(input,
dict_size,
hidden_size,
initializer,
name,
num_part=16,
is_sparse=False,
learning_rate=1.0):
_part_size = hidden_size // num_part
if hidden_size % num_part != 0:
_part_size += 1
output_embedding = []
p_num = 0
while hidden_size > 0:
_part_size = min(_part_size, hidden_size)
hidden_size -= _part_size
print("part", p_num, "size=", (dict_size, _part_size))
part_embedding = L.embedding(
input=input,
size=(dict_size, int(_part_size)),
is_sparse=is_sparse,
is_distributed=False,
param_attr=F.ParamAttr(
name=name + '_part%s' % p_num,
initializer=initializer,
learning_rate=learning_rate))
p_num += 1
output_embedding.append(part_embedding)
return L.concat(output_embedding, -1)
class Metapath2vecModel(object):
def __init__(self, config, embedding_lr=1.0):
self.config = config
self.neg_num = self.config.neg_num
self.num_nodes = self.config.num_nodes
self.embed_dim = self.config.embed_dim
self.is_sparse = self.config.is_sparse
self.is_distributed = self.config.is_distributed
self.embedding_lr = embedding_lr
self.pyreader = L.py_reader(
capacity=70,
shapes=[[-1, 1, 1], [-1, self.neg_num + 1, 1]],
dtypes=['int64', 'int64'],
lod_levels=[0, 0],
name='train',
use_double_buffer=True)
bound = 1. / math.sqrt(self.embed_dim)
self.embed_init = F.initializer.Uniform(low=-bound, high=bound)
self.loss = None
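# split the embedding table into num_part shards so each shard stays below
# 2^31 bytes (~2 GB), assuming 4-byte float32 entries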
max_hidden_size = int(math.pow(2, 31) / 4 / self.num_nodes)
self.num_part = int(math.ceil(1. * self.embed_dim / max_hidden_size))
def forward(self):
src, dsts = L.read_file(self.pyreader)
if self.is_sparse:
src = L.reshape(src, [-1, 1])
dsts = L.reshape(dsts, [-1, 1])
if self.num_part is not None and self.num_part != 1 and not self.is_distributed:
src_embed = distributed_embedding(
src,
self.num_nodes,
self.embed_dim,
self.embed_init,
"weight",
self.num_part,
self.is_sparse,
learning_rate=self.embedding_lr)
dsts_embed = distributed_embedding(
dsts,
self.num_nodes,
self.embed_dim,
self.embed_init,
"weight",
self.num_part,
self.is_sparse,
learning_rate=self.embedding_lr)
else:
src_embed = L.embedding(
src, (self.num_nodes, self.embed_dim),
self.is_sparse,
self.is_distributed,
param_attr=F.ParamAttr(
name="weight",
learning_rate=self.embedding_lr,
initializer=self.embed_init))
dsts_embed = L.embedding(
dsts, (self.num_nodes, self.embed_dim),
self.is_sparse,
self.is_distributed,
param_attr=F.ParamAttr(
name="weight",
learning_rate=self.embedding_lr,
initializer=self.embed_init))
if self.is_sparse:
src_embed = L.reshape(src_embed, [-1, 1, self.embed_dim])
dsts_embed = L.reshape(dsts_embed,
[-1, self.neg_num + 1, self.embed_dim])
logits = L.matmul(
src_embed, dsts_embed,
transpose_y=True) # [batch_size, 1, neg_num+1]
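# one positive column and neg_num negative columns; the positive is weighted
# by neg_num so positives and negatives contribute equally to the loss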
pos_label = L.fill_constant_batch_size_like(logits, [-1, 1, 1],
"float32", 1)
neg_label = L.fill_constant_batch_size_like(
logits, [-1, 1, self.neg_num], "float32", 0)
label = L.concat([pos_label, neg_label], -1)
pos_weight = L.fill_constant_batch_size_like(logits, [-1, 1, 1],
"float32", self.neg_num)
neg_weight = L.fill_constant_batch_size_like(
logits, [-1, 1, self.neg_num], "float32", 1)
weight = L.concat([pos_weight, neg_weight], -1)
weight.stop_gradient = True
label.stop_gradient = True
loss = L.sigmoid_cross_entropy_with_logits(logits, label)
loss = loss * weight
loss = L.reduce_mean(loss)
loss = loss * ((self.neg_num + 1) / 2 / self.neg_num)
loss.persistable = True
self.loss = loss
return loss
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Optimized Multiprocessing Reader for PaddlePaddle
"""
import multiprocessing
import numpy as np
import time
import paddle.fluid as fluid
import pyarrow
def _serialize_serializable(obj):
"""Serialize Feed Dict
"""
return {"type": type(obj), "data": obj.__dict__}
def _deserialize_serializable(obj):
"""Deserialize Feed Dict
"""
val = obj["type"].__new__(obj["type"])
val.__dict__.update(obj["data"])
return val
context = pyarrow.default_serialization_context()
context.register_type(
object,
"object",
custom_serializer=_serialize_serializable,
custom_deserializer=_deserialize_serializable)
def serialize_data(data):
"""serialize_data"""
return pyarrow.serialize(data, context=context).to_buffer().to_pybytes()
def deserialize_data(data):
"""deserialize_data"""
return pyarrow.deserialize(data, context=context)
def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
"""
multiprocess_reader uses Python multiprocessing to read data from the given
readers and then uses multiprocessing.Queue or multiprocessing.Pipe to merge
all the data. The number of processes equals the number of input readers;
each process calls one reader.
multiprocessing.Queue requires read/write access to /dev/shm, which some
platforms do not support.
You need to create multiple readers first; these readers should be independent
of each other so that each process can work independently.
An example:
.. code-block:: python
reader0 = reader(["file01", "file02"])
reader1 = reader(["file11", "file12"])
reader2 = reader(["file21", "file22"])
reader = multiprocess_reader([reader0, reader1, reader2],
queue_size=100, use_pipe=False)
"""
assert type(readers) is list and len(readers) > 0
def _read_into_queue(reader, queue):
"""read_into_queue"""
for sample in reader():
if sample is None:
raise ValueError("sample has None")
queue.put(serialize_data(sample))
queue.put(serialize_data(None))
def queue_reader():
"""queue_reader"""
queue = multiprocessing.Queue(queue_size)
for reader in readers:
p = multiprocessing.Process(
target=_read_into_queue, args=(reader, queue))
p.start()
reader_num = len(readers)
finish_num = 0
while finish_num < reader_num:
sample = deserialize_data(queue.get())
if sample is None:
finish_num += 1
else:
yield sample
def _read_into_pipe(reader, conn):
"""read_into_pipe"""
for sample in reader():
if sample is None:
raise ValueError("sample has None!")
conn.send(serialize_data(sample))
conn.send(serialize_data(None))
conn.close()
def pipe_reader():
"""pipe_reader"""
conns = []
for reader in readers:
parent_conn, child_conn = multiprocessing.Pipe()
conns.append(parent_conn)
p = multiprocessing.Process(
target=_read_into_pipe, args=(reader, child_conn))
p.start()
reader_num = len(readers)
finish_num = 0
conn_to_remove = []
finish_flag = np.zeros(len(conns), dtype="int32")
while finish_num < reader_num:
for conn_id, conn in enumerate(conns):
if finish_flag[conn_id] > 0:
continue
buff = conn.recv()
now = time.time()
sample = deserialize_data(buff)
out = time.time() - now
if sample is None:
finish_num += 1
conn.close()
finish_flag[conn_id] = 1
else:
yield sample
if use_pipe:
return pipe_reader
else:
return queue_reader
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This file provides the multi-class classification task for evaluating the embeddings learned by the metapath2vec model.
"""
import argparse
import sys
import os
import tqdm
import time
import math
import logging
import random
import pickle as pkl
import numpy as np
import sklearn.metrics
from sklearn.metrics import f1_score
import pgl
import paddle.fluid as fluid
import paddle.fluid.layers as fl
def load_data(file_):
"""Load data for node classification.
"""
words_label = []
line_count = 0
with open(file_, 'r') as reader:
for line in reader:
line_count += 1
tokens = line.strip().split()
word, label = int(tokens[0]), int(tokens[1]) - 1
words_label.append((word, label))
words_label = np.array(words_label, dtype=np.int64)
np.random.shuffle(words_label)
logging.info('%d/%d word_label pairs have been loaded' %
(len(words_label), line_count))
return words_label
def node_classify_model(config):
"""Build node classify model.
"""
nodes = fl.data('nodes', shape=[None, 1], dtype='int64')
labels = fl.data('labels', shape=[None, 1], dtype='int64')
embed_nodes = fl.embedding(
input=nodes,
size=[config.num_nodes, config.embed_dim],
param_attr=fluid.ParamAttr(name='weight'))
embed_nodes.stop_gradient = True
probs = fl.fc(input=embed_nodes, size=config.num_labels, act='softmax')
predict = fl.argmax(probs, axis=-1)
loss = fl.cross_entropy(input=probs, label=labels)
loss = fl.reduce_mean(loss)
return {
'loss': loss,
'probs': probs,
'predict': predict,
'labels': labels,
}
def run_epoch(exe, prog, model, feed_dict, lr):
"""Run training process of every epoch.
"""
if lr is None:
loss, predict = exe.run(prog,
feed=feed_dict,
fetch_list=[model['loss'], model['predict']],
return_numpy=True)
lr_ = 0
else:
loss, predict, lr_ = exe.run(
prog,
feed=feed_dict,
fetch_list=[model['loss'], model['predict'], lr],
return_numpy=True)
macro_f1 = f1_score(feed_dict['labels'], predict, average="macro")
micro_f1 = f1_score(feed_dict['labels'], predict, average="micro")
return {
'loss': loss,
'pred': predict,
'lr': lr_,
'macro_f1': macro_f1,
'micro_f1': micro_f1
}
def main(args):
"""main function for training node classification task.
"""
words_label = load_data(args.dataset)
# split data for training and testing
split_position = int(words_label.shape[0] * args.train_percent)
train_words_label = words_label[0:split_position, :]
test_words_label = words_label[split_position:, :]
place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace()
train_prog = fluid.Program()
test_prog = fluid.Program()
startup_prog = fluid.Program()
with fluid.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
model = node_classify_model(args)
test_prog = train_prog.clone(for_test=True)
with fluid.program_guard(train_prog, startup_prog):
lr = fl.polynomial_decay(args.lr, 1000, 0.001)
adam = fluid.optimizer.Adam(lr)
adam.minimize(model['loss'])
exe = fluid.Executor(place)
exe.run(startup_prog)
def existed_params(var):
if not isinstance(var, fluid.framework.Parameter):
return False
return os.path.exists(os.path.join(args.ckpt_path, var.name))
fluid.io.load_vars(
exe, args.ckpt_path, main_program=train_prog, predicate=existed_params)
# load_param(args.ckpt_path, ['content'])
feed_dict = {}
X = train_words_label[:, 0].reshape(-1, 1)
labels = train_words_label[:, 1].reshape(-1, 1)
logging.info('%d/%d data to train' %
(labels.shape[0], words_label.shape[0]))
test_feed_dict = {}
test_X = test_words_label[:, 0].reshape(-1, 1)
test_labels = test_words_label[:, 1].reshape(-1, 1)
logging.info('%d/%d data to test' %
(test_labels.shape[0], words_label.shape[0]))
for epoch in range(args.epochs):
feed_dict['nodes'] = X
feed_dict['labels'] = labels
train_result = run_epoch(exe, train_prog, model, feed_dict, lr)
test_feed_dict['nodes'] = test_X
test_feed_dict['labels'] = test_labels
test_result = run_epoch(exe, test_prog, model, test_feed_dict, lr=None)
logging.info(
'epoch %d | lr %.4f | train_loss %.5f | train_macro_F1 %.4f | train_micro_F1 %.4f | test_loss %.5f | test_macro_F1 %.4f | test_micro_F1 %.4f'
% (epoch, train_result['lr'], train_result['loss'],
train_result['macro_f1'], train_result['micro_f1'],
test_result['loss'], test_result['macro_f1'],
test_result['micro_f1']))
logging.info(
'final_test_macro_f1 score: %.4f | final_test_micro_f1 score: %.4f' %
(test_result['macro_f1'], test_result['micro_f1']))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='multi_class')
parser.add_argument(
'--dataset',
default=None,
type=str,
help='training and testing data file(default: None)')
parser.add_argument(
'--ckpt_path', default=None, type=str, help='checkpoint path(default: None)')
parser.add_argument("--use_cuda", action='store_true', help="use_cuda")
parser.add_argument(
'--train_percent',
default=0.5,
type=float,
help='train_percent(default: 0.5)')
parser.add_argument(
'--num_labels',
default=4,
type=int,
help='number of labels(default: 4)')
parser.add_argument(
'--epochs',
default=100,
type=int,
help='number of epochs for training(default: 100)')
parser.add_argument(
'--lr',
default=0.025,
type=float,
help='learning rate(default: 0.025)')
parser.add_argument(
'--num_nodes', default=0, type=int, help='number of nodes')
parser.add_argument(
'--embed_dim',
default=64,
type=int,
help='dimension of embedding(default: 64)')
args = parser.parse_args()
log_format = '%(asctime)s-%(levelname)s-%(name)s: %(message)s'
logging.basicConfig(level='INFO', format=log_format)
main(args)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of some helper functions"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import os
import time
import yaml
import numpy as np
from pgl.utils.logger import log
class AttrDict(dict):
"""Attr dict
"""
def __init__(self, d):
self.dict = d
def __getattr__(self, attr):
value = self.dict[attr]
if isinstance(value, dict):
return AttrDict(value)
else:
return value
def __str__(self):
return str(self.dict)
def load_config(config_file):
"""Load config file"""
with open(config_file) as f:
if hasattr(yaml, 'FullLoader'):
config = yaml.load(f, Loader=yaml.FullLoader)
else:
config = yaml.load(f)
return AttrDict(config)
# parse yaml file
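# Flattens the keys of config.yaml into shell variables (e.g. log_dir, CPU_NUM)
# that cloud_run.sh and job.sh read.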
function parse_yaml {
local prefix=$2
local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
sed -ne "s|^\($s\):|\1|" \
-e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \
-e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $1 |
awk -F$fs '{
indent = length($1)/2;
vname[indent] = $2;
for (i in vname) {if (i > indent) {delete vname[i]}}
if (length($3) > 0) {
vn=""; for (i=0; i<indent; i++) {vn=(vn)(vname[i])("_")}
printf("%s%s%s=\"%s\"\n", "'$prefix'",vn, $2, $3);
}
}'
}
eval $(parse_yaml "$(dirname "${BASH_SOURCE}")"/config.yaml)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""doc
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
import time
import io
import os
import numpy as np
import random
from pgl.utils.logger import log
from pgl.sample import metapath_randomwalk
from pgl.graph_kernel import skip_gram_gen_pair
from pgl.graph_kernel import alias_sample_build_table
from utils import load_config
from graph import m2vGraph
import mp_reader
class NodeGenerator(object):
"""Node generator"""
def __init__(self, config, graph):
self.config = config
self.graph = graph
self.batch_size = self.config.batch_size
self.shuffle = self.config.node_shuffle
self.node_files = self.config.node_files
self.first_node_type = self.config.first_node_type
self.walk_mode = self.config.walk_mode
def __call__(self):
if self.walk_mode == "m2v":
generator = self.m2v_node_generate
log.info("node gen mode is : %s" % (self.walk_mode))
elif self.walk_mode == "multi_m2v":
generator = self.multi_m2v_node_generate
log.info("node gen mode is : %s" % (self.walk_mode))
elif self.walk_mode == "files":
generator = self.files_node_generate
log.info("node gen mode is : %s" % (self.walk_mode))
else:
generator = self.m2v_node_generate
log.info("node gen mode is : %s" % (self.walk_mode))
while True:
for nodes in generator():
yield nodes
def m2v_node_generate(self):
"""m2v_node_generate"""
for nodes in self.graph.node_batch_iter(
batch_size=self.batch_size,
n_type=self.first_node_type,
shuffle=self.shuffle):
yield nodes
def multi_m2v_node_generate(self):
"""multi_m2v_node_generate"""
n_type_list = self.first_node_type.split(';')
num_n_type = len(n_type_list)
node_types = np.unique(self.graph.node_types).tolist()
node_generators = {}
for n_type in node_types:
node_generators[n_type] = \
self.graph.node_batch_iter(self.batch_size, n_type=n_type)
cc = 0
while True:
idx = cc % num_n_type
n_type = n_type_list[idx]
try:
nodes = next(node_generators[n_type])
except StopIteration as e:
log.info("node generator of type [%s] is exhausted" % n_type)
break
yield (nodes, idx)
cc += 1
def files_node_generate(self):
"""files_node_generate"""
nodes = []
for filename in self.node_files:
with io.open(filename) as inf:
for line in inf:
node = int(line.strip('\n\t'))
nodes.append(node)
if len(nodes) == self.batch_size:
yield nodes
nodes = []
if len(nodes):
yield nodes
class WalkGenerator(object):
"""Walk generator"""
def __init__(self, config, dataset):
self.config = config
self.dataset = dataset
self.graph = self.dataset.graph
self.walk_mode = self.config.walk_mode
self.node_generator = NodeGenerator(self.config, self.graph)
if self.walk_mode == "multi_m2v":
num_path = len(self.config.meta_path.split(';'))
num_first_node_type = len(self.config.first_node_type.split(';'))
assert num_first_node_type == num_path, \
"In [multi_m2v] walk_mode, the number of metapath should be the same \
as the number of first_node_type"
assert num_path > 1, "In [multi_m2v] walk_mode, the number of metapath\
should be greater than 1"
def __call__(self):
np.random.seed(os.getpid())
if self.walk_mode == "m2v":
walk_generator = self.m2v_walk
log.info("walk mode is : %s" % (self.walk_mode))
elif self.walk_mode == "multi_m2v":
walk_generator = self.multi_m2v_walk
log.info("walk mode is : %s" % (self.walk_mode))
else:
raise ValueError("walk_mode [%s] is not matched" % self.walk_mode)
for walks in walk_generator():
yield walks
def m2v_walk(self):
"""Metapath2vec walker"""
for nodes in self.node_generator():
walks = metapath_randomwalk(
self.graph, nodes, self.config.meta_path, self.config.walk_len)
yield walks
def multi_m2v_walk(self):
"""Multi metapath2vec walker"""
meta_paths = self.config.meta_path.split(';')
for nodes, idx in self.node_generator():
walks = metapath_randomwalk(self.graph, nodes, meta_paths[idx],
self.config.walk_len)
yield walks
class DataGenerator(object):
def __init__(self, config, dataset):
self.config = config
self.dataset = dataset
self.graph = self.dataset.graph
self.walk_generator = WalkGenerator(self.config, self.dataset)
def __call__(self):
generator = self.pair_generate
for src, pos, negs in generator():
dst = np.concatenate([pos, negs], 1)
yield src, dst
def pair_generate(self):
for walks in self.walk_generator():
try:
src_list, pos_list = [], []
for walk in walks:
s, p = skip_gram_gen_pair(walk, self.config.win_size)
src_list.append(s), pos_list.append(p)
src = [s for x in src_list for s in x]
pos = [s for x in pos_list for s in x]
if len(src) == 0:
continue
negs = self.negative_sample(
src,
pos,
neg_num=self.config.neg_num,
neg_sample_type=self.config.neg_sample_type)
src = np.array(src, dtype=np.int64).reshape(-1, 1, 1)
pos = np.array(pos, dtype=np.int64).reshape(-1, 1, 1)
yield src, pos, negs
except Exception as e:
log.exception(e)
def negative_sample(self, src, pos, neg_num, neg_sample_type):
if neg_sample_type == "average":
neg_sample_size = [len(pos), neg_num, 1]
negs = np.random.randint(
low=0, high=self.graph.num_nodes, size=neg_sample_size)
elif neg_sample_type == "m2v_plus":
negs = []
for s in src:
neg = self.graph.sample_nodes(
sample_num=neg_num, n_type=self.graph.node_types[s])
negs.append(neg)
negs = np.vstack(negs).reshape(-1, neg_num, 1)
else: # equal to "average"
neg_sample_size = [len(pos), neg_num, 1]
negs = np.random.randint(
low=0, high=self.graph.num_nodes, size=neg_sample_size)
negs = negs.astype(np.int64)
return negs
def multiprocess_data_generator(config, dataset):
"""Multiprocess data generator.
"""
if config.num_sample_workers == 1:
data_generator = DataGenerator(config, dataset)
else:
pool = [
DataGenerator(config, dataset)
for i in range(config.num_sample_workers)
]
data_generator = mp_reader.multiprocess_reader(
pool, use_pipe=True, queue_size=100)
return data_generator
if __name__ == "__main__":
config_file = "./config.yaml"
config = load_config(config_file)
dataset = m2vGraph(config)
data_generator = multiprocess_data_generator(config, dataset)
start = time.time()
cc = 0
for src, dst in data_generator():
log.info(src.shape)
log.info("time: %.6f" % (time.time() - start))
start = time.time()
cc += 1
if cc == 100:
break
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""test ogb
"""
import argparse
import pgl
import numpy as np
import paddle.fluid as fluid
from pgl.contrib.ogb.graphproppred.dataset_pgl import PglGraphPropPredDataset
from pgl.utils import paddle_helper
from ogb.graphproppred import Evaluator
from pgl.contrib.ogb.graphproppred.mol_encoder import AtomEncoder, BondEncoder
def train(exe, batch_size, graph_wrapper, train_program, splitted_idx, dataset,
evaluator, fetch_loss, fetch_pred):
"""Train"""
graphs, labels = dataset[splitted_idx["train"]]
perm = np.arange(0, len(graphs))
np.random.shuffle(perm)
start_batch = 0
batch_no = 0
pred_output = np.zeros_like(labels, dtype="float32")
while start_batch < len(perm):
batch_index = perm[start_batch:start_batch + batch_size]
start_batch += batch_size
batch_graph = pgl.graph.MultiGraph(graphs[batch_index])
batch_label = labels[batch_index]
batch_valid = (batch_label == batch_label).astype("float32")
batch_label = np.nan_to_num(batch_label).astype("float32")
feed_dict = graph_wrapper.to_feed(batch_graph)
feed_dict["label"] = batch_label
feed_dict["weight"] = batch_valid
loss, pred = exe.run(train_program,
feed=feed_dict,
fetch_list=[fetch_loss, fetch_pred])
pred_output[batch_index] = pred
batch_no += 1
print("train", evaluator.eval({"y_true": labels, "y_pred": pred_output}))
def evaluate(exe, batch_size, graph_wrapper, val_program, splitted_idx,
dataset, mode, evaluator, fetch_pred):
"""Eval"""
graphs, labels = dataset[splitted_idx[mode]]
perm = np.arange(0, len(graphs))
start_batch = 0
batch_no = 0
pred_output = np.zeros_like(labels, dtype="float32")
while start_batch < len(perm):
batch_index = perm[start_batch:start_batch + batch_size]
start_batch += batch_size
batch_graph = pgl.graph.MultiGraph(graphs[batch_index])
feed_dict = graph_wrapper.to_feed(batch_graph)
pred = exe.run(val_program, feed=feed_dict, fetch_list=[fetch_pred])
pred_output[batch_index] = pred[0]
batch_no += 1
print(mode, evaluator.eval({"y_true": labels, "y_pred": pred_output}))
def send_func(src_feat, dst_feat, edge_feat):
"""Send"""
return src_feat["h"] + edge_feat["h"]
class GNNModel(object):
"""GNNModel"""
def __init__(self, name, emb_dim, num_task, num_layers):
self.num_task = num_task
self.emb_dim = emb_dim
self.num_layers = num_layers
self.name = name
self.atom_encoder = AtomEncoder(name=name, emb_dim=emb_dim)
self.bond_encoder = BondEncoder(name=name, emb_dim=emb_dim)
def forward(self, graph):
"""foward"""
h_node = self.atom_encoder(graph.node_feat['feat'])
h_edge = self.bond_encoder(graph.edge_feat['feat'])
for layer in range(self.num_layers):
msg = graph.send(
send_func,
nfeat_list=[("h", h_node)],
efeat_list=[("h", h_edge)])
h_node = graph.recv(msg, 'sum') + h_node
h_node = fluid.layers.fc(h_node,
size=self.emb_dim,
name=self.name + '_%s' % layer,
act="relu")
graph_nodes = pgl.layers.graph_pooling(graph, h_node, "average")
graph_pred = fluid.layers.fc(graph_nodes, self.num_task, name="final")
return graph_pred
def main():
"""main
"""
# Training settings
parser = argparse.ArgumentParser(description='Graph Dataset')
parser.add_argument(
'--epochs',
type=int,
default=100,
help='number of epochs to train (default: 100)')
parser.add_argument(
'--dataset',
type=str,
default="ogbg-mol-tox21",
help='dataset name (default: ogbg-mol-tox21)')
args = parser.parse_args()
place = fluid.CPUPlace() # Dataset too big to use GPU
### automatic dataloading and splitting
dataset = PglGraphPropPredDataset(name=args.dataset)
splitted_idx = dataset.get_idx_split()
### automatic evaluator. takes dataset name as input
evaluator = Evaluator(args.dataset)
graph_data, label = dataset[:2]
batch_graph = pgl.graph.MultiGraph(graph_data)
graph_data = batch_graph
train_program = fluid.Program()
startup_program = fluid.Program()
test_program = fluid.Program()
# degree normalize
graph_data.edge_feat["feat"] = graph_data.edge_feat["feat"].astype("int64")
graph_data.node_feat["feat"] = graph_data.node_feat["feat"].astype("int64")
model = GNNModel(
name="gnn", num_task=dataset.num_tasks, emb_dim=64, num_layers=2)
with fluid.program_guard(train_program, startup_program):
gw = pgl.graph_wrapper.GraphWrapper(
"graph",
place=place,
node_feat=graph_data.node_feat_info(),
edge_feat=graph_data.edge_feat_info())
pred = model.forward(gw)
sigmoid_pred = fluid.layers.sigmoid(pred)
val_program = train_program.clone(for_test=True)
initializer = []
with fluid.program_guard(train_program, startup_program):
train_label = fluid.layers.data(
name="label", dtype="float32", shape=[None, dataset.num_tasks])
train_weight = fluid.layers.data(
name="weight", dtype="float32", shape=[None, dataset.num_tasks])
train_loss_t = fluid.layers.sigmoid_cross_entropy_with_logits(
x=pred, label=train_label) * train_weight
train_loss_t = fluid.layers.reduce_sum(train_loss_t)
adam = fluid.optimizer.Adam(
learning_rate=1e-2,
regularization=fluid.regularizer.L2DecayRegularizer(
regularization_coeff=0.0005))
adam.minimize(train_loss_t)
exe = fluid.Executor(place)
exe.run(startup_program)
for epoch in range(1, args.epochs + 1):
print("Epoch", epoch)
train(exe, 128, gw, train_program, splitted_idx, dataset, evaluator,
train_loss_t, sigmoid_pred)
evaluate(exe, 128, gw, val_program, splitted_idx, dataset, "valid",
evaluator, sigmoid_pred)
evaluate(exe, 128, gw, val_program, splitted_idx, dataset, "test",
evaluator, sigmoid_pred)
if __name__ == "__main__":
main()
@@ -14,10 +14,13 @@
 """test ogb
 """
 import argparse
+import time
-import pgl
+import logging
 import numpy as np
 import paddle.fluid as fluid
+import pgl
 from pgl.contrib.ogb.linkproppred.dataset_pgl import PglLinkPropPredDataset
 from pgl.utils import paddle_helper
 from ogb.linkproppred import Evaluator
@@ -44,12 +47,12 @@ class GNNModel(object):
         self.src_nodes = fluid.layers.data(
             name='src_nodes',
-            shape=[None, 1],
+            shape=[None],
             dtype='int64', )
         self.dst_nodes = fluid.layers.data(
             name='dst_nodes',
-            shape=[None, 1],
+            shape=[None],
             dtype='int64', )
         self.edge_label = fluid.layers.data(
@@ -63,7 +66,6 @@ class GNNModel(object):
             shape=[self.num_nodes, self.emb_dim],
             dtype="float32",
             name=self.name + "_embedding")
-        # edge_attr = fluid.layers.fc(graph.edge_feat["feat"], size=self.emb_dim)
         for layer in range(self.num_layers):
             msg = graph.send(
@@ -83,8 +85,8 @@ class GNNModel(object):
                 name=self.name + '_bias_%s' % layer)
             h = fluid.layers.elementwise_add(h, bias, act="relu")
-        src = fluid.layers.gather(h, self.src_nodes)
-        dst = fluid.layers.gather(h, self.dst_nodes)
+        src = fluid.layers.gather(h, self.src_nodes, overwrite=False)
+        dst = fluid.layers.gather(h, self.dst_nodes, overwrite=False)
         edge_embed = src * dst
         pred = fluid.layers.fc(input=edge_embed,
                                size=1,
@@ -107,17 +109,22 @@ def main():
     parser.add_argument(
         '--epochs',
         type=int,
-        default=100,
+        default=4,
         help='number of epochs to train (default: 100)')
     parser.add_argument(
         '--dataset',
         type=str,
         default="ogbl-ppa",
         help='dataset name (default: protein protein associations)')
+    parser.add_argument('--use_cuda', action='store_true')
+    parser.add_argument('--batch_size', type=int, default=5120)
+    parser.add_argument('--embed_dim', type=int, default=64)
+    parser.add_argument('--num_layers', type=int, default=2)
+    parser.add_argument('--lr', type=float, default=0.001)
     args = parser.parse_args()
+    print(args)
-    #place = fluid.CUDAPlace(0)
-    place = fluid.CPUPlace()  # Dataset too big to use GPU
+    place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace()
     ### automatic dataloading and splitting
     print("loadding dataset")
@@ -135,19 +142,20 @@ def main():
     train_program = fluid.Program()
     startup_program = fluid.Program()
-    test_program = fluid.Program()
     # degree normalize
     indegree = graph_data.indegree()
     norm = np.zeros_like(indegree, dtype="float32")
     norm[indegree > 0] = np.power(indegree[indegree > 0], -0.5)
     graph_data.node_feat["norm"] = np.expand_dims(norm, -1).astype("float32")
-    # graph_data.node_feat["index"] = np.array([i for i in range(graph_data.num_nodes)], dtype=np.int64).reshape(-1,1)
     with fluid.program_guard(train_program, startup_program):
         model = GNNModel(
             name="gnn",
             num_nodes=graph_data.num_nodes,
-            emb_dim=64,
-            num_layers=2)
+            emb_dim=args.embed_dim,
+            num_layers=args.num_layers)
         gw = pgl.graph_wrapper.GraphWrapper(
             "graph",
             place,
@@ -158,50 +166,106 @@ def main():
     val_program = train_program.clone(for_test=True)
     with fluid.program_guard(train_program, startup_program):
+        global_steps = int(splitted_edge['train_edge'].shape[0] /
+                           args.batch_size * 2)
+        learning_rate = fluid.layers.polynomial_decay(args.lr, global_steps,
+                                                      0.00005)
         adam = fluid.optimizer.Adam(
-            learning_rate=1e-2,
+            learning_rate=learning_rate,
             regularization=fluid.regularizer.L2DecayRegularizer(
                 regularization_coeff=0.0005))
         adam.minimize(loss)
     exe = fluid.Executor(place)
     exe.run(startup_program)
     feed = gw.to_feed(graph_data)
+    print("evaluate result before training: ")
+    result = test(exe, val_program, prob, evaluator, feed, splitted_edge)
+    print(result)
+    print("training")
+    cc = 0
     for epoch in range(1, args.epochs + 1):
-        feed['src_nodes'] = splitted_edge["train_edge"][:, 0].reshape(-1, 1)
-        feed['dst_nodes'] = splitted_edge["train_edge"][:, 1].reshape(-1, 1)
-        feed['edge_label'] = splitted_edge["train_edge_label"].astype(
-            "float32").reshape(-1, 1)
-        res_loss, y_pred = exe.run(train_program,
-                                   feed=feed,
-                                   fetch_list=[loss, prob])
-        print("Loss %s" % res_loss[0])
-        result = {}
-        print("Evaluating...")
-        feed['src_nodes'] = splitted_edge["valid_edge"][:, 0].reshape(-1, 1)
-        feed['dst_nodes'] = splitted_edge["valid_edge"][:, 1].reshape(-1, 1)
-        feed['edge_label'] = splitted_edge["valid_edge_label"].astype(
-            "float32").reshape(-1, 1)
-        y_pred = exe.run(val_program, feed=feed, fetch_list=[prob])[0]
-        input_dict = {
-            "y_true": splitted_edge["valid_edge_label"],
-            "y_pred": y_pred.reshape(-1, ),
-        }
-        result["valid"] = evaluator.eval(input_dict)
-        feed['src_nodes'] = splitted_edge["test_edge"][:, 0].reshape(-1, 1)
-        feed['dst_nodes'] = splitted_edge["test_edge"][:, 1].reshape(-1, 1)
-        feed['edge_label'] = splitted_edge["test_edge_label"].astype(
-            "float32").reshape(-1, 1)
-        y_pred = exe.run(val_program, feed=feed, fetch_list=[prob])[0]
-        input_dict = {
-            "y_true": splitted_edge["test_edge_label"],
-            "y_pred": y_pred.reshape(-1, ),
-        }
-        result["test"] = evaluator.eval(input_dict)
-        print(result)
+        for batch_data, batch_label in data_generator(
+                graph_data,
+                splitted_edge["train_edge"],
+                splitted_edge["train_edge_label"],
+                batch_size=args.batch_size):
+            feed['src_nodes'] = batch_data[:, 0].reshape(-1, 1)
+            feed['dst_nodes'] = batch_data[:, 1].reshape(-1, 1)
+            feed['edge_label'] = batch_label.astype("float32")
+            res_loss, y_pred, b_lr = exe.run(
+                train_program,
+                feed=feed,
+                fetch_list=[loss, prob, learning_rate])
+            if cc % 1 == 0:
+                print("epoch %d | step %d | lr %s | Loss %s" %
+                      (epoch, cc, b_lr[0], res_loss[0]))
+            cc += 1
+            if cc % 20 == 0:
+                print("Evaluating...")
+                result = test(exe, val_program, prob, evaluator, feed,
+                              splitted_edge)
+                print("epoch %d | step %d" % (epoch, cc))
+                print(result)
+def test(exe, val_program, prob, evaluator, feed, splitted_edge):
+    """Evaluation"""
+    result = {}
+    feed['src_nodes'] = splitted_edge["valid_edge"][:, 0].reshape(-1, 1)
+    feed['dst_nodes'] = splitted_edge["valid_edge"][:, 1].reshape(-1, 1)
+    feed['edge_label'] = splitted_edge["valid_edge_label"].astype(
+        "float32").reshape(-1, 1)
+    y_pred = exe.run(val_program, feed=feed, fetch_list=[prob])[0]
+    input_dict = {
+        "y_true": splitted_edge["valid_edge_label"],
+        "y_pred": y_pred.reshape(-1, ),
+    }
+    result["valid"] = evaluator.eval(input_dict)
+    feed['src_nodes'] = splitted_edge["test_edge"][:, 0].reshape(-1, 1)
+    feed['dst_nodes'] = splitted_edge["test_edge"][:, 1].reshape(-1, 1)
+    feed['edge_label'] = splitted_edge["test_edge_label"].astype(
+        "float32").reshape(-1, 1)
+    y_pred = exe.run(val_program, feed=feed, fetch_list=[prob])[0]
+    input_dict = {
+        "y_true": splitted_edge["test_edge_label"],
+        "y_pred": y_pred.reshape(-1, ),
+    }
+    result["test"] = evaluator.eval(input_dict)
+    return result
+def data_generator(graph, data, label_data, batch_size, shuffle=True):
+    """Data Generator"""
+    perm = np.arange(0, len(data))
+    if shuffle:
+        np.random.shuffle(perm)
+    offset = 0
+    while offset < len(perm):
+        batch_index = perm[offset:(offset + batch_size)]
+        offset += batch_size
+        pos_data = data[batch_index]
+        pos_label = label_data[batch_index]
+        neg_src_node = pos_data[:, 0]
+        neg_dst_node = np.random.choice(
+            pos_data.reshape(-1, ), size=len(neg_src_node))
+        neg_data = np.hstack(
+            [neg_src_node.reshape(-1, 1), neg_dst_node.reshape(-1, 1)])
+        exists = graph.has_edges_between(neg_src_node, neg_dst_node)
+        neg_data = neg_data[np.invert(exists)]
+        neg_label = np.zeros(shape=len(neg_data), dtype=np.int64)
+        batch_data = np.vstack([pos_data, neg_data])
+        label = np.vstack([pos_label.reshape(-1, 1), neg_label.reshape(-1, 1)])
+        yield batch_data, label
 if __name__ == "__main__":
@@ -13,7 +13,7 @@
 # limitations under the License.
 """Generate pgl apis
 """
-__version__ = "1.0.1"
+__version__ = "1.0.2"
 from pgl import layers
 from pgl import graph_wrapper
 from pgl import graph
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""MolEncoder for ogb
"""
import paddle.fluid as fluid
from ogb.utils.features import get_atom_feature_dims, get_bond_feature_dims
class AtomEncoder(object):
"""AtomEncoder for encoding node features"""
def __init__(self, name, emb_dim):
self.emb_dim = emb_dim
self.name = name
def __call__(self, x):
atom_feature = get_atom_feature_dims()
atom_input = fluid.layers.split(
x, num_or_sections=len(atom_feature), dim=-1)
outputs = None
count = 0
for _x, _atom_input_dim in zip(atom_input, atom_feature):
count += 1
emb = fluid.layers.embedding(
_x,
size=(_atom_input_dim, self.emb_dim),
param_attr=fluid.ParamAttr(
name=self.name + '_atom_feat_%s' % count))
if outputs is None:
outputs = emb
else:
outputs = outputs + emb
return outputs
class BondEncoder(object):
"""Bond for encoding edge features"""
def __init__(self, name, emb_dim):
self.emb_dim = emb_dim
self.name = name
def __call__(self, x):
bond_feature = get_bond_feature_dims()
bond_input = fluid.layers.split(
x, num_or_sections=len(bond_feature), dim=-1)
outputs = None
count = 0
for _x, _bond_input_dim in zip(bond_input, bond_feature):
count += 1
emb = fluid.layers.embedding(
_x,
size=(_bond_input_dim, self.emb_dim),
param_attr=fluid.ParamAttr(
name=self.name + '_bond_feat_%s' % count))
if outputs is None:
outputs = emb
else:
outputs = outputs + emb
return outputs
...@@ -60,7 +60,7 @@ class PglLinkPropPredDataset(object):
"""pre_process downloading data
"""
processed_dir = osp.join(self.root, 'processed')
pre_processed_file_path = osp.join(processed_dir, 'pgl_data_processed')
if osp.exists(pre_processed_file_path):
#TODO: Reload Preprocess files
......
...@@ -20,8 +20,9 @@ import numpy as np
import pickle as pkl
import time
import pgl.graph_kernel as graph_kernel
from collections import defaultdict
__all__ = ['Graph', 'SubGraph', 'MultiGraph']
def _hide_num_nodes(shape):
...@@ -140,11 +141,11 @@ class Graph(object):
self._edges = edges
self._num_nodes = num_nodes
self._adj_src_index = None
self._adj_dst_index = None
self.indegree()
self._num_graph = 1
self._graph_lod = np.array([0, self.num_nodes], dtype="int32")
def dump(self, path):
if not os.path.exists(path):
...@@ -176,10 +177,15 @@ class Graph(object):
"""Return an EdgeIndex object for src.
"""
if self._adj_src_index is None:
if len(self._edges) == 0:
u = np.array([], dtype="int64")
v = np.array([], dtype="int64")
else:
u = self._edges[:, 0]
v = self._edges[:, 1]
self._adj_src_index = EdgeIndex(
u=u, v=v, num_nodes=self._num_nodes)
return self._adj_src_index
@property
...@@ -187,10 +193,15 @@ class Graph(object):
"""Return an EdgeIndex object for dst.
"""
if self._adj_dst_index is None:
if len(self._edges) == 0:
v = np.array([], dtype="int64")
u = np.array([], dtype="int64")
else:
v = self._edges[:, 0]
u = self._edges[:, 1]
self._adj_dst_index = EdgeIndex(
u=u, v=v, num_nodes=self._num_nodes)
return self._adj_dst_index
@property
...@@ -777,6 +788,16 @@ class Graph(object):
cur_nodes = nxt_nodes
return walk
@property
def num_graph(self):
""" Return Number of Graphs"""
return self._num_graph
@property
def graph_lod(self):
""" Return Graph Lod Index for Paddle Computation"""
return self._graph_lod
class SubGraph(Graph):
"""Implementation of SubGraph in pgl.
...@@ -832,6 +853,81 @@ class SubGraph(Graph):
return graph_kernel.map_nodes(nodes, self._to_reindex)
class MultiGraph(Graph):
"""Implementation of multiple disjoint graph structure in pgl.
This is a simple implementation of graph structure in pgl.
Args:
graph_list : A list of Graph Instances
Examples:
.. code-block:: python
batch_graph = MultiGraph([graph1, graph2, graph3])
"""
def __init__(self, graph_list):
num_nodes = np.sum([g.num_nodes for g in graph_list])
node_feat = self._join_node_feature(graph_list)
edge_feat = self._join_edge_feature(graph_list)
edges = self._join_edges(graph_list)
super(MultiGraph, self).__init__(
num_nodes=num_nodes,
edges=edges,
node_feat=node_feat,
edge_feat=edge_feat)
self._num_graph = len(graph_list)
self._src_graph = graph_list
graph_lod = [g.num_nodes for g in graph_list]
graph_lod = np.cumsum(graph_lod, dtype="int32")
graph_lod = np.insert(graph_lod, 0, 0)
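# graph_lod holds cumulative node offsets [0, n1, n1 + n2, ...] so that the
# batched node features can be split back into per-graph segments.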
self._graph_lod = graph_lod
def __getitem__(self, index):
return self._src_graph[index]
def _join_node_feature(self, graph_list):
"""join node features for multiple graph"""
node_feat = defaultdict(lambda: [])
for graph in graph_list:
for key in graph.node_feat:
node_feat[key].append(graph.node_feat[key])
ret_node_feat = {}
for key in node_feat:
ret_node_feat[key] = np.vstack(node_feat[key])
return ret_node_feat
def _join_edge_feature(self, graph_list):
"""join edge features for multiple graph"""
edge_feat = defaultdict(lambda: [])
for graph in graph_list:
for key in graph.edge_feat:
efeat = graph.edge_feat[key]
if len(efeat) > 0:
edge_feat[key].append(efeat)
ret_edge_feat = {}
for key in edge_feat:
ret_edge_feat[key] = np.vstack(edge_feat[key])
return ret_edge_feat
def _join_edges(self, graph_list):
"""join edges for multiple graph"""
list_edges = []
start_offset = 0
for graph in graph_list:
edges = graph.edges
if len(edges) > 0:
edges = edges + start_offset
list_edges.append(edges)
start_offset += graph.num_nodes
edges = np.vstack(list_edges)
return edges
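For reference, a minimal usage sketch of `MultiGraph` with two toy graphs (the feature name `"h"` and the printed values are illustrative, inferred from the joining logic above):

```python
import numpy as np
from pgl.graph import Graph, MultiGraph

g1 = Graph(num_nodes=3, edges=[(0, 1), (1, 2)],
           node_feat={"h": np.ones((3, 4), dtype="float32")})
g2 = Graph(num_nodes=2, edges=[(0, 1)],
           node_feat={"h": np.zeros((2, 4), dtype="float32")})

batch = MultiGraph([g1, g2])
print(batch.num_graph)     # 2
print(batch.graph_lod)     # [0 3 5], cumulative node offsets of the two graphs
print(batch.num_nodes)     # 5
print(batch.edges)         # g2's edges are shifted by g1.num_nodes: [[0 1] [1 2] [3 4]]
print(batch[0].num_nodes)  # 3, __getitem__ returns the original Graph
```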
class MemmapEdgeIndex(EdgeIndex):
def __init__(self, path):
self._degree = np.load(os.path.join(path, 'degree.npy'), mmap_mode="r")
......
...@@ -219,7 +219,11 @@ def sample_subset(list nids, long long maxdegree, shuffle=False):
output.append(nids[inc])
else:
sample_size = buff_size if buff_size <= maxdegree else maxdegree
if isinstance(nids[inc], list):
tmp = np.array(nids[inc], dtype=np.int64)
else:
tmp = nids[inc]
subset_choose_index(sample_size, tmp, rnd, buff_nid, offset)
output.append(buff_nid[offset:offset+sample_size])
offset += sample_size
return output
...@@ -252,7 +256,14 @@ def sample_subset_with_eid(list nids, list eids, long long maxdegree, shuffle=Fa
output_eid.append(eids[inc])
else:
sample_size = buff_size if buff_size <= maxdegree else maxdegree
if isinstance(nids[inc], list):
tmp = np.array(nids[inc], dtype=np.int64)
tmp_eids = np.array(eids[inc], dtype=np.int64)
else:
tmp = nids[inc]
tmp_eids = eids[inc]
subset_choose_index_eid(sample_size, tmp, tmp_eids, rnd, buff_nid, buff_eid, offset)
output.append(buff_nid[offset:offset+sample_size])
output_eid.append(buff_eid[offset:offset+sample_size])
offset += sample_size
......
...@@ -36,19 +36,22 @@ def send(src, dst, nfeat, efeat, message_func):
return msg
def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, num_nodes,
num_edges):
"""Recv message from given msg to dst nodes.
"""
empty_msg_flag = fluid.layers.cast(num_edges > 0, dtype="float32")
if reduce_function == "sum":
if isinstance(msg, dict):
raise TypeError("The message for build-in function"
" should be Tensor not dict.")
try:
out_dim = msg.shape[-1]
init_output = fluid.layers.fill_constant(
shape=[num_nodes, out_dim], value=0, dtype="float32")
init_output.stop_gradient = False
msg = msg * empty_msg_flag
output = paddle_helper.scatter_add(init_output, dst, msg)
return output
except TypeError as e:
...@@ -60,17 +63,16 @@ def recv(dst, uniq_dst, bucketing_index, msg, reduce_function, node_ids):
reduce_function = sum_func
bucketed_msg = op.nested_lod_reset(msg, bucketing_index)
output = reduce_function(bucketed_msg)
output_dim = output.shape[-1]
output = output * empty_msg_flag
init_output = fluid.layers.fill_constant(
shape=[num_nodes, output_dim], value=0, dtype="float32")
init_output.stop_gradient = True
final_output = fluid.layers.scatter(init_output, uniq_dst, output)
return final_output
class BaseGraphWrapper(object):
...@@ -98,6 +100,8 @@ class BaseGraphWrapper(object):
self._edge_uniq_dst = None
self._edge_uniq_dst_count = None
self._node_ids = None
self._graph_lod = None
self._num_graph = None
self._data_name_prefix = ""
def __repr__(self):
...@@ -194,7 +198,8 @@ class BaseGraphWrapper(object):
bucketing_index=self._edge_uniq_dst_count,
msg=msg,
reduce_function=reduce_function,
num_edges=self._num_edges,
num_nodes=self._num_nodes)
return output
@property
...@@ -216,6 +221,24 @@ class BaseGraphWrapper(object):
"""
return self._num_nodes
@property
def graph_lod(self):
"""Return the graph LoD index of the batched graphs.
Return:
A variable with shape [None] as the LoD information of multiple graphs.
"""
return self._graph_lod
@property
def num_graph(self):
"""Return a variable with the number of graphs.
Return:
A variable with shape (1,) as the number of graphs in int64.
"""
return self._num_graph
@property
def edge_feat(self):
"""Return a dictionary of tensor representing edge features.
...@@ -309,7 +332,6 @@ class StaticGraphWrapper(BaseGraphWrapper):
def __create_graph_attr(self, graph):
"""Create graph attributes for paddlepaddle.
"""
src, dst, eid = graph.sorted_edges(sort_by="dst")
indegree = graph.indegree()
nodes = graph.nodes
...@@ -317,6 +339,17 @@ class StaticGraphWrapper(BaseGraphWrapper):
uniq_dst_count = indegree[indegree > 0]
uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
graph_lod = graph.graph_lod
num_graph = graph.num_graph
num_edges = len(src)
if num_edges == 0:
# Fake Graph
src = np.array([0], dtype="int64")
dst = np.array([0], dtype="int64")
eid = np.array([0], dtype="int64")
uniq_dst_count = np.array([0, 1], dtype="int32")
uniq_dst = np.array([0], dtype="int64")
edge_feat = {}
...@@ -327,6 +360,20 @@ class StaticGraphWrapper(BaseGraphWrapper):
self.__create_graph_node_feat(node_feat, self._initializers)
self.__create_graph_edge_feat(edge_feat, self._initializers)
self._num_edges, init = paddle_helper.constant(
dtype="int64",
value=np.array(
[num_edges], dtype="int64"),
name=self._data_name_prefix + '/num_edges')
self._initializers.append(init)
self._num_graph, init = paddle_helper.constant(
dtype="int64",
value=np.array(
[num_graph], dtype="int64"),
name=self._data_name_prefix + '/num_graph')
self._initializers.append(init)
self._edges_src, init = paddle_helper.constant(
dtype="int64",
value=src,
...@@ -358,6 +405,12 @@ class StaticGraphWrapper(BaseGraphWrapper):
value=uniq_dst_count)
self._initializers.append(init)
self._graph_lod, init = paddle_helper.constant(
name=self._data_name_prefix + "/graph_lod",
dtype="int32",
value=graph_lod)
self._initializers.append(init)
node_ids_value = np.arange(0, graph.num_nodes, dtype="int64")
self._node_ids, init = paddle_helper.constant(
name=self._data_name_prefix + "/node_ids",
...@@ -496,6 +549,18 @@ class GraphWrapper(BaseGraphWrapper):
def __create_graph_attr_holders(self):
"""Create data holders for graph attributes.
"""
self._num_edges = fluid.layers.data(
self._data_name_prefix + '/num_edges',
shape=[1],
append_batch_size=False,
dtype="int64",
stop_gradient=True)
self._num_graph = fluid.layers.data(
self._data_name_prefix + '/num_graph',
shape=[1],
append_batch_size=False,
dtype="int64",
stop_gradient=True)
self._edges_src = fluid.layers.data(
self._data_name_prefix + '/edges_src',
shape=[None],
...@@ -514,18 +579,28 @@ class GraphWrapper(BaseGraphWrapper):
append_batch_size=False,
dtype='int64',
stop_gradient=True)
self._edge_uniq_dst = fluid.layers.data(
self._data_name_prefix + "/uniq_dst",
shape=[None],
append_batch_size=False,
dtype="int64",
stop_gradient=True)
self._graph_lod = fluid.layers.data(
self._data_name_prefix + "/graph_lod",
shape=[None],
append_batch_size=False,
dtype="int32",
stop_gradient=True)
self._edge_uniq_dst_count = fluid.layers.data(
self._data_name_prefix + "/uniq_dst_count",
shape=[None],
append_batch_size=False,
dtype="int32",
stop_gradient=True)
self._node_ids = fluid.layers.data(
self._data_name_prefix + "/node_ids",
shape=[None],
...@@ -539,9 +614,16 @@ class GraphWrapper(BaseGraphWrapper):
dtype="int64",
stop_gradient=True)
self._holder_list.extend([
self._edges_src,
self._edges_dst,
self._num_nodes,
self._edge_uniq_dst,
self._edge_uniq_dst_count,
self._node_ids,
self._indegree,
self._graph_lod,
self._num_graph,
self._num_edges,
])
def __create_graph_node_feat_holders(self, node_feat_name, node_feat_shape,
...@@ -587,10 +669,22 @@ class GraphWrapper(BaseGraphWrapper):
src, dst, eid = graph.sorted_edges(sort_by="dst")
indegree = graph.indegree()
nodes = graph.nodes
num_edges = len(src)
uniq_dst = nodes[indegree > 0]
uniq_dst_count = indegree[indegree > 0]
uniq_dst_count = np.cumsum(uniq_dst_count, dtype='int32')
uniq_dst_count = np.insert(uniq_dst_count, 0, 0)
num_graph = graph.num_graph
graph_lod = graph.graph_lod
if num_edges == 0:
# Fake Graph
src = np.array([0], dtype="int64")
dst = np.array([0], dtype="int64")
eid = np.array([0], dtype="int64")
uniq_dst_count = np.array([0, 1], dtype="int32")
uniq_dst = np.array([0], dtype="int64")
edge_feat = {}
...@@ -598,14 +692,20 @@ class GraphWrapper(BaseGraphWrapper):
edge_feat[key] = value[eid]
node_feat = graph.node_feat
feed_dict[self._data_name_prefix + '/num_edges'] = np.array(
[num_edges], dtype="int64")
feed_dict[self._data_name_prefix + '/edges_src'] = src
feed_dict[self._data_name_prefix + '/edges_dst'] = dst
feed_dict[self._data_name_prefix + '/num_nodes'] = np.array(
[graph.num_nodes], dtype="int64")
feed_dict[self._data_name_prefix + '/uniq_dst'] = uniq_dst
feed_dict[self._data_name_prefix + '/uniq_dst_count'] = uniq_dst_count
feed_dict[self._data_name_prefix + '/node_ids'] = graph.nodes
feed_dict[self._data_name_prefix + '/indegree'] = indegree
feed_dict[self._data_name_prefix + '/graph_lod'] = graph_lod
feed_dict[self._data_name_prefix + '/num_graph'] = np.array(
[num_graph], dtype="int64")
for key in self.node_feat_tensor_dict:
feed_dict[self._data_name_prefix + '/node_feat/' +
......
...@@ -18,7 +18,10 @@ from pgl.layers import conv
from pgl.layers.conv import *
from pgl.layers import set2set
from pgl.layers.set2set import *
from pgl.layers import graph_pool
from pgl.layers.graph_pool import *
__all__ = []
__all__ += conv.__all__
__all__ += set2set.__all__
__all__ += graph_pool.__all__
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This package implements common layers to help building
graph neural networks.
"""
import paddle.fluid as fluid
from pgl import graph_wrapper
from pgl.utils import paddle_helper
from pgl.utils import op
__all__ = ['graph_pooling']
def graph_pooling(gw, node_feat, pool_type):
"""Implementation of graph pooling
This is an implementation of graph pooling
Args:
gw: Graph wrapper object (:code:`StaticGraphWrapper` or :code:`GraphWrapper`)
node_feat: A tensor with shape (num_nodes, feature_size).
pool_type: The type of pooling ("sum", "average" , "min")
Return:
A tensor with shape (num_graph, hidden_size)
"""
graph_feat = op.nested_lod_reset(node_feat, gw.graph_lod)
graph_feat = fluid.layers.sequence_pool(graph_feat, pool_type)
return graph_feat
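As an end-to-end illustration, a minimal sketch under the fluid-style PGL API assumed by this file (toy graphs; the names `"h"` and `"gw"` are made up, and the `GraphWrapper(name, place, node_feat=...)` constructor is assumed from the 1.x examples):

```python
import numpy as np
import paddle.fluid as fluid
import pgl

# Two toy graphs batched with MultiGraph; each node carries a 4-d feature "h".
g1 = pgl.graph.Graph(num_nodes=3, edges=[(0, 1), (1, 2)],
                     node_feat={"h": np.ones((3, 4), dtype="float32")})
g2 = pgl.graph.Graph(num_nodes=2, edges=[(0, 1)],
                     node_feat={"h": np.ones((2, 4), dtype="float32")})
batch = pgl.graph.MultiGraph([g1, g2])

place = fluid.CPUPlace()
gw = pgl.graph_wrapper.GraphWrapper(
    name="gw", place=place, node_feat=batch.node_feat_info())
pooled = pgl.layers.graph_pooling(gw, gw.node_feat["h"], "sum")

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
out = exe.run(fluid.default_main_program(),
              feed=gw.to_feed(batch), fetch_list=[pooled])[0]
print(out.shape)  # expected (2, 4): one pooled feature vector per graph
```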
...@@ -56,7 +56,7 @@ def edge_hash(src, dst):
def graphsage_sample(graph, nodes, samples, ignore_edges=[]):
"""Implementation of graphsage sample.
Reference paper: https://cs.stanford.edu/people/jure/pubs/graphsage-nips17.pdf.
Args:
graph: A pgl graph instance
...@@ -84,7 +84,6 @@ def graphsage_sample(graph, nodes, samples, ignore_edges=[]):
continue
batch_pred_nodes, batch_pred_eids = graph.sample_predecessor(
start_nodes, samples[layer_idx], return_eids=True)
start = time.time()
last_nodes_set = nodes_set
...@@ -120,7 +119,6 @@ def graphsage_sample(graph, nodes, samples, ignore_edges=[]):
# only for this task
subgraphs[i].node_feat["index"] = np.array(
layer_nodes[0], dtype="int64")
return subgraphs
......
...@@ -70,6 +70,55 @@ class HeterGraphTest(unittest.TestCase):
self.assertEqual(len(nodes), batch_size)
self.assertListEqual(list(nodes), ground[idx])
def test_sample_successor(self):
print()
nodes = [4, 5, 8]
md = 2
succes = self.graph.sample_successor(
edge_type='p2a', nodes=nodes, max_degree=md, return_eids=False)
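# sample_successor returns at most max_degree successors per node; every
# sampled id must belong to the node's full successor set listed below.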
self.assertIsInstance(succes, list)
ground = [[10, 11, 12, 14, 13], [], [14]]
for succ, g in zip(succes, ground):
self.assertIsInstance(succ, np.ndarray)
for i in succ:
self.assertIn(i, g)
nodes = [4]
succes = self.graph.sample_successor(
edge_type='p2a', nodes=nodes, max_degree=md, return_eids=False)
self.assertIsInstance(succes, list)
ground = [[10, 11, 12, 14, 13]]
for succ, g in zip(succes, ground):
self.assertIsInstance(succ, np.ndarray)
for i in succ:
self.assertIn(i, g)
def test_successor(self):
print()
nodes = [4, 5, 8]
e_type = 'p2a'
succes = self.graph.successor(
edge_type=e_type,
nodes=nodes, )
self.assertIsInstance(succes, np.ndarray)
ground = [[10, 11, 12, 14, 13], [], [14]]
for succ, g in zip(succes, ground):
self.assertIsInstance(succ, np.ndarray)
self.assertCountEqual(succ, g)
nodes = [4]
e_type = 'p2a'
succes = self.graph.successor(
edge_type=e_type,
nodes=nodes, )
self.assertIsInstance(succes, np.ndarray)
ground = [[10, 11, 12, 14, 13]]
for succ, g in zip(succes, ground):
self.assertIsInstance(succ, np.ndarray)
self.assertCountEqual(succ, g)
def test_sample_nodes(self):
print()
p_ground = [4, 5, 6, 7, 8, 9]
......