未验证 提交 0ee0d33e 编写于 作者: K kirayummy 提交者: GitHub

Merge pull request #3 from PaddlePaddle/master

update
# data and log
/examples/GaAN/dataset/
/examples/GaAN/log/
/examples/GaAN/__pycache__/
/examples/GaAN/params/
/DoorGod
# Virtualenv
/.venv/
/venv/
......
<img src="./docs/source/_static/logo.png" alt="The logo of Paddle Graph Learning (PGL)" width="320">
[![PyPi Latest Release](https://img.shields.io/pypi/v/pgl.svg)](https://pypi.org/project/pgl/)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](./LICENSE)
[DOC](https://pgl.readthedocs.io/en/latest/) | [Quick Start](https://pgl.readthedocs.io/en/latest/quick_start/instruction.html) | [中文](./README.zh.md)
## Breaking News !!
PGL v1.1 2020.4.29
- You can find **ERNIESage**, a novel model for modeling text and graph structures, and its introduction [here](./examples/erniesage/).
- PGL for [Open Graph Benchmark](https://github.com/snap-stanford/ogb) examples can be find [here](./ogb_examples/).
- We add newly graph level operators like **GraphPooling** and [**GraphNormalization**](https://arxiv.org/abs/2003.00982) for graph level predictions.
- We relase a PGL-KE toolkit [here](./examples/pgl-ke) including classical knowledge graph embedding t algorithms like TransE, TransR, RotatE.
------
Paddle Graph Learning (PGL) is an efficient and flexible graph learning framework based on [PaddlePaddle](https://github.com/PaddlePaddle/Paddle).
<img src="./docs/source/_static/framework_of_pgl.png" alt="The Framework of Paddle Graph Learning (PGL)" width="800">
<img src="./docs/source/_static/framework_of_pgl_en.png" alt="The Framework of Paddle Graph Learning (PGL)" width="800">
The newly released PGL supports heterogeneous graph learning on both walk based paradigm and message-passing based paradigm by providing MetaPath sampling and Message Passing mechanism on heterogeneous graph. Furthermor, The newly released PGL also support distributed graph storage and some distributed training algorithms, such as distributed deep walk and distributed graphsage. Combined with the PaddlePaddle deep learning framework, we are able to support both graph representation learning models and graph neural networks, and thus our framework has a wide range of graph-based applications.
......@@ -13,13 +30,13 @@ The newly released PGL supports heterogeneous graph learning on both walk based
## Highlight: Efficiency - Support Scatter-Gather and LodTensor Message Passing
One of the most important benefits of graph neural networks compared to other models is the ability to use node-to-node connectivity information, but coding the communication between nodes is very cumbersome. At PGL we adopt **Message Passing Paradigm** similar to [DGL](https://github.com/dmlc/dgl) to help to build a customize graph neural network easily. Users only need to write ```send``` and ```recv``` functions to easily implement a simple GCN. As shown in the following figure, for the first step the send function is defined on the edges of the graph, and the user can customize the send function ![](http://latex.codecogs.com/gif.latex?\\phi^e}) to send the message from the source to the target node. For the second step, the recv function ![](http://latex.codecogs.com/gif.latex?\\phi^v}) is responsible for aggregating ![](http://latex.codecogs.com/gif.latex?\\oplus}) messages together from different sources.
One of the most important benefits of graph neural networks compared to other models is the ability to use node-to-node connectivity information, but coding the communication between nodes is very cumbersome. At PGL we adopt **Message Passing Paradigm** similar to [DGL](https://github.com/dmlc/dgl) to help to build a customize graph neural network easily. Users only need to write ```send``` and ```recv``` functions to easily implement a simple GCN. As shown in the following figure, for the first step the send function is defined on the edges of the graph, and the user can customize the send function ![](http://latex.codecogs.com/gif.latex?\\phi^e) to send the message from the source to the target node. For the second step, the recv function ![](http://latex.codecogs.com/gif.latex?\\phi^v) is responsible for aggregating ![](http://latex.codecogs.com/gif.latex?\\oplus) messages together from different sources.
<img src="./docs/source/_static/message_passing_paradigm.png" alt="The basic idea of message passing paradigm" width="800">
As shown in the left of the following figure, to adapt general user-defined message aggregate functions, DGL uses the degree bucketing method to combine nodes with the same degree into a batch and then apply an aggregate function ![](http://latex.codecogs.com/gif.latex?\\oplus}) on each batch serially. For our PGL UDF aggregate function, we organize the message as a [LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html) in [PaddlePaddle](https://github.com/PaddlePaddle/Paddle) taking the message as variable length sequences. And we **utilize the features of LodTensor in Paddle to obtain fast parallel aggregation**.
As shown in the left of the following figure, to adapt general user-defined message aggregate functions, DGL uses the degree bucketing method to combine nodes with the same degree into a batch and then apply an aggregate function ![](http://latex.codecogs.com/gif.latex?\\oplus) on each batch serially. For our PGL UDF aggregate function, we organize the message as a [LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html) in [PaddlePaddle](https://github.com/PaddlePaddle/Paddle) taking the message as variable length sequences. And we **utilize the features of LodTensor in Paddle to obtain fast parallel aggregation**.
<img src="./docs/source/_static/parallel_degree_bucketing.png" alt="The parallel degree bucketing of PGL" width="800">
......@@ -82,10 +99,11 @@ In most cases of large-scale graph learning, we need distributed graph storage a
## Model Zoo
The following are 13 graph learning models that have been implemented in the framework. See the details [here](https://pgl.readthedocs.io/en/latest/introduction.html#highlight-tons-of-models)
The following graph learning models have been implemented in the framework. You can find more [examples](./examples) and the [details](https://pgl.readthedocs.io/en/latest/introduction.html#highlight-tons-of-models)
|Model | feature |
|---|---|
| [**ERNIESage**](./examples/erniesage/) | ERNIE SAmple aggreGatE for Text and Graph |
| GCN | Graph Convolutional Neural Networks |
| GAT | Graph Attention Network |
| GraphSage |Large-scale graph convolution network based on neighborhood sampling|
......
<img src="./docs/source/_static/logo.png" alt="The logo of Paddle Graph Learning (PGL)" width="320">
[![PyPi Latest Release](https://img.shields.io/pypi/v/pgl.svg)](https://pypi.org/project/pgl/)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](./LICENSE)
[文档](https://pgl.readthedocs.io/en/latest/) | [快速开始](https://pgl.readthedocs.io/en/latest/quick_start/instruction.html) | [English](./README.md)
## 最新消息
PGL v1.1 2020.4.29
- **ERNIESage**是PGL团队最新提出的模型,可以用于建模文本以及图结构信息。你可以在[这里](./examples/erniesage)看到详细的介绍。
- PGL现在提供[Open Graph Benchmark](https://github.com/snap-stanford/ogb)的一些例子,你可以在[这里](./ogb_examples)找到。
- 新增了图级别的算子包括**GraphPooling**以及[**GraphNormalization**](https://arxiv.org/abs/2003.00982),这样你就能实现更多复杂的图级别分类模型。
- 新增PGL-KE工具包,里面包含许多经典知识图谱图嵌入算法,包括TransE, TransR, RotatE,详情可见[这里](./examples/pgl-ke)
------
Paddle Graph Learning (PGL)是一个基于[PaddlePaddle](https://github.com/PaddlePaddle/Paddle)的高效易用的图学习框架
<img src="./docs/source/_static/framework_of_pgl.png" alt="The Framework of Paddle Graph Learning (PGL)" width="800">
......@@ -12,11 +29,11 @@ Paddle Graph Learning (PGL)是一个基于[PaddlePaddle](https://github.com/Padd
# 特色:高效性——支持Scatter-Gather及LodTensor消息传递
对比于一般的模型,图神经网络模型最大的优势在于它利用了节点与节点之间连接的信息。但是,如何通过代码来实现建模这些节点连接十分的麻烦。PGL采用与[DGL](https://github.com/dmlc/dgl)相似的**消息传递范式**用于作为构建图神经网络的接口。用于只需要简单的编写```send```还有```recv```函数就能够轻松的实现一个简单的GCN网络。如下图所示,首先,send函数被定义在节点之间的边上,用户自定义send函数![](http://latex.codecogs.com/gif.latex?\\phi^e})会把消息从源点发送到目标节点。然后,recv函数![](http://latex.codecogs.com/gif.latex?\\phi^v})负责将这些消息用汇聚函数 ![](http://latex.codecogs.com/gif.latex?\\oplus}) 汇聚起来。
对比于一般的模型,图神经网络模型最大的优势在于它利用了节点与节点之间连接的信息。但是,如何通过代码来实现建模这些节点连接十分的麻烦。PGL采用与[DGL](https://github.com/dmlc/dgl)相似的**消息传递范式**用于作为构建图神经网络的接口。用于只需要简单的编写```send```还有```recv```函数就能够轻松的实现一个简单的GCN网络。如下图所示,首先,send函数被定义在节点之间的边上,用户自定义send函数![](http://latex.codecogs.com/gif.latex?\\phi^e)会把消息从源点发送到目标节点。然后,recv函数![](http://latex.codecogs.com/gif.latex?\\phi^v)负责将这些消息用汇聚函数 ![](http://latex.codecogs.com/gif.latex?\\oplus) 汇聚起来。
<img src="./docs/source/_static/message_passing_paradigm.png" alt="The basic idea of message passing paradigm" width="800">
如下面左图所示,为了去适配用户定义的汇聚函数,DGL使用了Degree Bucketing来将相同度的节点组合在一个块,然后将汇聚函数![](http://latex.codecogs.com/gif.latex?\\oplus})作用在每个块之上。而对于PGL的用户定义汇聚函数,我们则将消息以PaddlePaddle的[LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html)的形式处理,将若干消息看作一组变长的序列,然后利用**LodTensor在PaddlePaddle的特性进行快速平行的消息聚合**
如下面左图所示,为了去适配用户定义的汇聚函数,DGL使用了Degree Bucketing来将相同度的节点组合在一个块,然后将汇聚函数![](http://latex.codecogs.com/gif.latex?\\oplus)作用在每个块之上。而对于PGL的用户定义汇聚函数,我们则将消息以PaddlePaddle的[LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html)的形式处理,将若干消息看作一组变长的序列,然后利用**LodTensor在PaddlePaddle的特性进行快速平行的消息聚合**
<img src="./docs/source/_static/parallel_degree_bucketing.png" alt="The parallel degree bucketing of PGL" width="800">
......@@ -77,10 +94,11 @@ Paddle Graph Learning (PGL)是一个基于[PaddlePaddle](https://github.com/Padd
## 丰富性——覆盖业界大部分图学习网络
下列是框架中已经自带实现的十三种图网络学习模型。详情请参考[这里](https://pgl.readthedocs.io/en/latest/introduction.html#highlight-tons-of-models)
下列是框架中部分已经实现的图网络模型,更多的模型在[这里](./examples)可以找到。详情请参考[这里](https://pgl.readthedocs.io/en/latest/introduction.html#highlight-tons-of-models)
| 模型 | 特点 |
|---|---|
| [**ERNIESage**](./examples/erniesage/) | 能同时建模文本以及图结构的ERNIE SAmple aggreGatE |
| GCN | 图卷积网络 |
| GAT | 基于Attention的图卷积网络 |
| GraphSage | 基于邻居采样的大规模图卷积网络 |
......
docs/source/_static/logo.png

45.2 KB | W: | H:

docs/source/_static/logo.png

487.0 KB | W: | H:

docs/source/_static/logo.png
docs/source/_static/logo.png
docs/source/_static/logo.png
docs/source/_static/logo.png
  • 2-up
  • Swipe
  • Onion skin
pgl.contrib.heter\_graph module: Heterogenous Graph Storage
pgl.heter\_graph module: Heterogenous Graph Storage
===============================
.. automodule:: pgl.contrib.heter_graph
.. automodule:: pgl.heter_graph
:members:
:undoc-members:
:show-inheritance:
pgl.contrib.heter\_graph\_wrapper module: Heterogenous Graph data holders for Paddle GNN.
pgl.heter\_graph\_wrapper module: Heterogenous Graph data holders for Paddle GNN.
=========================
.. automodule:: pgl.contrib.heter_graph_wrapper
.. automodule:: pgl.heter_graph_wrapper
:members:
:undoc-members:
:show-inheritance:
......@@ -9,5 +9,5 @@ API Reference
pgl.data_loader
pgl.utils.paddle_helper
pgl.utils.mp_reader
pgl.contrib.heter_graph
pgl.contrib.heter_graph_wrapper
pgl.heter_graph
pgl.heter_graph_wrapper
......@@ -19,8 +19,8 @@ def build_graph():
# Each node can be represented by a d-dimensional feature vector, here for simple, the feature vectors are randomly generated.
d = 16
feature = np.random.randn(num_node, d).astype("float32")
# each edge also can be represented by a feature vector
edge_feature = np.random.randn(len(edge_list), d).astype("float32")
# each edge has it own weight
edge_feature = np.random.randn(len(edge_list), 1).astype("float32")
# create a graph
g = graph.Graph(num_nodes = num_node,
......@@ -53,7 +53,6 @@ place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
# use GraphWrapper as a container for graph data to construct a graph neural network
gw = pgl.graph_wrapper.GraphWrapper(name='graph',
place = place,
node_feat=g.node_feat_info())
```
......@@ -67,13 +66,13 @@ In this tutorial, we use a simple Graph Convolutional Network(GCN) developed by
In PGL, we can easily implement a GCN layer as follows:
```python
# define GCN layer function
def gcn_layer(gw, feature, hidden_size, name, activation):
def gcn_layer(gw, nfeat, efeat, hidden_size, name, activation):
# gw is a GraphWrapper;feature is the feature vectors of nodes
# define message function
def send_func(src_feat, dst_feat, edge_feat):
# In this tutorial, we return the feature vector of the source node as message
return src_feat['h']
return src_feat['h'] * edge_feat['e']
# define reduce function
def recv_func(feat):
......@@ -81,7 +80,7 @@ def gcn_layer(gw, feature, hidden_size, name, activation):
return fluid.layers.sequence_pool(feat, pool_type='sum')
# trigger message to passing
msg = gw.send(send_func, nfeat_list=[('h', feature)])
msg = gw.send(send_func, nfeat_list=[('h', nfeat)], efeat_list=[('e', efeat)])
# recv funciton receives message and trigger reduce funcition to handle message
output = gw.recv(msg, recv_func)
output = fluid.layers.fc(output,
......@@ -93,10 +92,10 @@ def gcn_layer(gw, feature, hidden_size, name, activation):
```
After defining the GCN layer, we can construct a deeper GCN model with two GCN layers.
```python
output = gcn_layer(gw, gw.node_feat['feature'],
output = gcn_layer(gw, gw.node_feat['feature'], gw.edge_feat['edge_feature'],
hidden_size=8, name='gcn_layer_1', activation='relu')
output = gcn_layer(gw, output, hidden_size=1,
name='gcn_layer_2', activation=None)
output = gcn_layer(gw, output, gw.edge_feat['edge_feature'],
hidden_size=1, name='gcn_layer_2', activation=None)
```
## Step 3: data preprocessing
......
......@@ -58,8 +58,8 @@ Now, we can build a heterogenous graph by using PGL.
import paddle.fluid as fluid
import paddle.fluid.layers as fl
import pgl
from pgl.contrib import heter_graph
from pgl.contrib import heter_graph_wrapper
from pgl import heter_graph
from pgl import heter_graph_wrapper
g = heter_graph.HeterGraph(num_nodes=num_nodes,
edges=edges,
......@@ -77,7 +77,6 @@ place = fluid.CPUPlace()
# create a GraphWrapper as a container for graph data
gw = heter_graph_wrapper.HeterGraphWrapper(name='heter_graph',
place = place,
edge_types = g.edge_types_info(),
node_feat=g.node_feat_info(),
edge_feat=g.edge_feat_info())
......@@ -161,8 +160,3 @@ for epoch in range(30):
train_loss = exe.run(fluid.default_main_program(), feed=feed_dict, fetch_list=[loss], return_numpy=True)
print('Epoch %d | Loss: %f'%(epoch, train_loss[0]))
```
......@@ -53,7 +53,6 @@ class GATNE(object):
self.gw = heter_graph_wrapper.HeterGraphWrapper(
name="heter_graph",
place=place,
edge_types=self.graph.edge_types_info(),
node_feat=self.graph.node_feat_info(),
edge_feat=self.graph.edge_feat_info())
......
# GaAN: Gated Attention Networks for Learning on Large and Spatiotemporal Graphs
[GaAN](https://arxiv.org/abs/1803.07294) is a powerful neural network designed for machine learning on graph. It introduces an gated attention mechanism. Based on PGL, we reproduce the GaAN algorithm and train the model on [ogbn-proteins](https://ogb.stanford.edu/docs/nodeprop/#ogbn-proteins).
## Datasets
The ogbn-proteins dataset will be downloaded in directory ./dataset automatically.
## Dependencies
- [paddlepaddle >= 1.6](https://github.com/paddlepaddle/paddle)
- [pgl 1.1](https://github.com/PaddlePaddle/PGL)
- [ogb 1.1.1](https://github.com/snap-stanford/ogb)
## How to run
```bash
python train.py --lr 1e-2 --rc 0 --batch_size 1024 --epochs 100
```
or
```bash
source main.sh
```
### Hyperparameters
- use_gpu: whether to use gpu or not
- mini_data: use a small dataset to test code
- epochs: number of training epochs
- lr: learning rate
- rc: regularization coefficient
- log_path: the path of log
- batch_size: the number of batch size
- heads: the number of heads of attention
- hidden_size_a: the size of query and key vectors
- hidden_size_v: the size of value vectors
- hidden_size_m: the size of projection space for computing gates
- hidden_size_o: the size of output of GaAN layer
## Performance
We train our models for 100 epochs and report the **rocauc** on the test dataset.
|dataset|mean|std|#experiments|
|-|-|-|-|
|ogbn-proteins|0.7803|0.0073|10|
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This package implements common layers to help building
graph neural networks.
"""
import paddle.fluid as fluid
from pgl import graph_wrapper
from pgl.utils import paddle_helper
__all__ = ['gcn', 'gat', 'gin', 'gaan']
def gcn(gw, feature, hidden_size, activation, name, norm=None):
"""Implementation of graph convolutional neural networks (GCN)
This is an implementation of the paper SEMI-SUPERVISED CLASSIFICATION
WITH GRAPH CONVOLUTIONAL NETWORKS (https://arxiv.org/pdf/1609.02907.pdf).
Args:
gw: Graph wrapper object (:code:`StaticGraphWrapper` or :code:`GraphWrapper`)
feature: A tensor with shape (num_nodes, feature_size).
hidden_size: The hidden size for gcn.
activation: The activation for the output.
name: Gcn layer names.
norm: If :code:`norm` is not None, then the feature will be normalized. Norm must
be tensor with shape (num_nodes,) and dtype float32.
Return:
A tensor with shape (num_nodes, hidden_size)
"""
def send_src_copy(src_feat, dst_feat, edge_feat):
return src_feat["h"]
size = feature.shape[-1]
if size > hidden_size:
feature = fluid.layers.fc(feature,
size=hidden_size,
bias_attr=False,
param_attr=fluid.ParamAttr(name=name))
if norm is not None:
feature = feature * norm
msg = gw.send(send_src_copy, nfeat_list=[("h", feature)])
if size > hidden_size:
output = gw.recv(msg, "sum")
else:
output = gw.recv(msg, "sum")
output = fluid.layers.fc(output,
size=hidden_size,
bias_attr=False,
param_attr=fluid.ParamAttr(name=name))
if norm is not None:
output = output * norm
bias = fluid.layers.create_parameter(
shape=[hidden_size],
dtype='float32',
is_bias=True,
name=name + '_bias')
output = fluid.layers.elementwise_add(output, bias, act=activation)
return output
def gat(gw,
feature,
hidden_size,
activation,
name,
num_heads=8,
feat_drop=0.6,
attn_drop=0.6,
is_test=False):
"""Implementation of graph attention networks (GAT)
This is an implementation of the paper GRAPH ATTENTION NETWORKS
(https://arxiv.org/abs/1710.10903).
Args:
gw: Graph wrapper object (:code:`StaticGraphWrapper` or :code:`GraphWrapper`)
feature: A tensor with shape (num_nodes, feature_size).
hidden_size: The hidden size for gat.
activation: The activation for the output.
name: Gat layer names.
num_heads: The head number in gat.
feat_drop: Dropout rate for feature.
attn_drop: Dropout rate for attention.
is_test: Whether in test phrase.
Return:
A tensor with shape (num_nodes, hidden_size * num_heads)
"""
def send_attention(src_feat, dst_feat, edge_feat):
output = src_feat["left_a"] + dst_feat["right_a"]
output = fluid.layers.leaky_relu(
output, alpha=0.2) # (num_edges, num_heads)
return {"alpha": output, "h": src_feat["h"]}
def reduce_attention(msg):
alpha = msg["alpha"] # lod-tensor (batch_size, seq_len, num_heads)
h = msg["h"]
alpha = paddle_helper.sequence_softmax(alpha)
old_h = h
h = fluid.layers.reshape(h, [-1, num_heads, hidden_size])
alpha = fluid.layers.reshape(alpha, [-1, num_heads, 1])
if attn_drop > 1e-15:
alpha = fluid.layers.dropout(
alpha,
dropout_prob=attn_drop,
is_test=is_test,
dropout_implementation="upscale_in_train")
h = h * alpha
h = fluid.layers.reshape(h, [-1, num_heads * hidden_size])
h = fluid.layers.lod_reset(h, old_h)
return fluid.layers.sequence_pool(h, "sum")
if feat_drop > 1e-15:
feature = fluid.layers.dropout(
feature,
dropout_prob=feat_drop,
is_test=is_test,
dropout_implementation='upscale_in_train')
ft = fluid.layers.fc(feature,
hidden_size * num_heads,
bias_attr=False,
param_attr=fluid.ParamAttr(name=name + '_weight'))
left_a = fluid.layers.create_parameter(
shape=[num_heads, hidden_size],
dtype='float32',
name=name + '_gat_l_A')
right_a = fluid.layers.create_parameter(
shape=[num_heads, hidden_size],
dtype='float32',
name=name + '_gat_r_A')
reshape_ft = fluid.layers.reshape(ft, [-1, num_heads, hidden_size])
left_a_value = fluid.layers.reduce_sum(reshape_ft * left_a, -1)
right_a_value = fluid.layers.reduce_sum(reshape_ft * right_a, -1)
msg = gw.send(
send_attention,
nfeat_list=[("h", ft), ("left_a", left_a_value),
("right_a", right_a_value)])
output = gw.recv(msg, reduce_attention)
bias = fluid.layers.create_parameter(
shape=[hidden_size * num_heads],
dtype='float32',
is_bias=True,
name=name + '_bias')
bias.stop_gradient = True
output = fluid.layers.elementwise_add(output, bias, act=activation)
return output
def gin(gw,
feature,
hidden_size,
activation,
name,
init_eps=0.0,
train_eps=False):
"""Implementation of Graph Isomorphism Network (GIN) layer.
This is an implementation of the paper How Powerful are Graph Neural Networks?
(https://arxiv.org/pdf/1810.00826.pdf).
In their implementation, all MLPs have 2 layers. Batch normalization is applied
on every hidden layer.
Args:
gw: Graph wrapper object (:code:`StaticGraphWrapper` or :code:`GraphWrapper`)
feature: A tensor with shape (num_nodes, feature_size).
name: GIN layer names.
hidden_size: The hidden size for gin.
activation: The activation for the output.
init_eps: float, optional
Initial :math:`\epsilon` value, default is 0.
train_eps: bool, optional
if True, :math:`\epsilon` will be a learnable parameter.
Return:
A tensor with shape (num_nodes, hidden_size).
"""
def send_src_copy(src_feat, dst_feat, edge_feat):
return src_feat["h"]
epsilon = fluid.layers.create_parameter(
shape=[1, 1],
dtype="float32",
attr=fluid.ParamAttr(name="%s_eps" % name),
default_initializer=fluid.initializer.ConstantInitializer(
value=init_eps))
if not train_eps:
epsilon.stop_gradient = True
msg = gw.send(send_src_copy, nfeat_list=[("h", feature)])
output = gw.recv(msg, "sum") + feature * (epsilon + 1.0)
output = fluid.layers.fc(output,
size=hidden_size,
act=None,
param_attr=fluid.ParamAttr(name="%s_w_0" % name),
bias_attr=fluid.ParamAttr(name="%s_b_0" % name))
output = fluid.layers.layer_norm(
output,
begin_norm_axis=1,
param_attr=fluid.ParamAttr(
name="norm_scale_%s" % (name),
initializer=fluid.initializer.Constant(1.0)),
bias_attr=fluid.ParamAttr(
name="norm_bias_%s" % (name),
initializer=fluid.initializer.Constant(0.0)), )
if activation is not None:
output = getattr(fluid.layers, activation)(output)
output = fluid.layers.fc(output,
size=hidden_size,
act=activation,
param_attr=fluid.ParamAttr(name="%s_w_1" % name),
bias_attr=fluid.ParamAttr(name="%s_b_1" % name))
return output
def gaan(gw, feature, hidden_size_a, hidden_size_v, hidden_size_m, hidden_size_o, heads, name):
"""Implementation of GaAN"""
def send_func(src_feat, dst_feat, edge_feat):
# attention score of each edge
# E * (M * D1)
feat_query, feat_key = dst_feat['feat_query'], src_feat['feat_key']
# E * M * D1
old = feat_query
feat_query = fluid.layers.reshape(feat_query, [-1, heads, hidden_size_a])
feat_key = fluid.layers.reshape(feat_key, [-1, heads, hidden_size_a])
# E * M
alpha = fluid.layers.reduce_sum(feat_key * feat_query, dim=-1)
return {'dst_node_feat': dst_feat['node_feat'],
'src_node_feat': src_feat['node_feat'],
'feat_value': src_feat['feat_value'],
'alpha': alpha,
'feat_gate': src_feat['feat_gate']}
def recv_func(message):
dst_feat = message['dst_node_feat']
src_feat = message['src_node_feat']
x = fluid.layers.sequence_pool(dst_feat, 'average')
z = fluid.layers.sequence_pool(src_feat, 'average')
feat_gate = message['feat_gate']
g_max = fluid.layers.sequence_pool(feat_gate, 'max')
g = fluid.layers.concat([x, g_max, z], axis=1)
g = fluid.layers.fc(g, heads, bias_attr=False, act="sigmoid")
# softmax
alpha = message['alpha']
alpha = paddle_helper.sequence_softmax(alpha) # E * M
feat_value = message['feat_value'] # E * (M * D2)
old = feat_value
feat_value = fluid.layers.reshape(feat_value, [-1, heads, hidden_size_v]) # E * M * D2
feat_value = fluid.layers.elementwise_mul(feat_value, alpha, axis=0)
feat_value = fluid.layers.reshape(feat_value, [-1, heads*hidden_size_v]) # E * (M * D2)
feat_value = fluid.layers.lod_reset(feat_value, old)
feat_value = fluid.layers.sequence_pool(feat_value, 'sum') # N * (M * D2)
feat_value = fluid.layers.reshape(feat_value, [-1, heads, hidden_size_v]) # N * M * D2
output = fluid.layers.elementwise_mul(feat_value, g, axis=0)
output = fluid.layers.reshape(output, [-1, heads * hidden_size_v]) # N * (M * D2)
output = fluid.layers.concat([x, output], axis=1)
return output
# N * (D1 * M)
feat_key = fluid.layers.fc(feature, hidden_size_a * heads, bias_attr=False,
param_attr=fluid.ParamAttr(name=name + '_project_key'))
# N * (D2 * M)
feat_value = fluid.layers.fc(feature, hidden_size_v * heads, bias_attr=False,
param_attr=fluid.ParamAttr(name=name + '_project_value'))
# N * (D1 * M)
feat_query = fluid.layers.fc(feature, hidden_size_a * heads, bias_attr=False,
param_attr=fluid.ParamAttr(name=name + '_project_query'))
# N * Dm
feat_gate = fluid.layers.fc(feature, hidden_size_m, bias_attr=False,
param_attr=fluid.ParamAttr(name=name + '_project_gate'))
# send stage
message = gw.send(
send_func,
nfeat_list=[('node_feat', feature), ('feat_key', feat_key), ('feat_value', feat_value),
('feat_query', feat_query), ('feat_gate', feat_gate)],
efeat_list=None,
)
# recv stage
output = gw.recv(message, recv_func)
output = fluid.layers.fc(output, hidden_size_o, bias_attr=False,
param_attr=fluid.ParamAttr(name=name + '_project_output'))
output = fluid.layers.leaky_relu(output, alpha=0.1)
output = fluid.layers.dropout(output, dropout_prob=0.1)
return output
python3 train.py --epochs 100 --lr 1e-2 --rc 0 --batch_size 1024 --gpu_id 0 --exp_id 0
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle import fluid
from pgl.utils import paddle_helper
# from pgl.layers import gaan
from conv import gaan
class GaANModel(object):
def __init__(self, num_class, num_layers, hidden_size_a=24,
hidden_size_v=32, hidden_size_m=64, hidden_size_o=128,
heads=8, act='relu', name="GaAN"):
self.num_class = num_class
self.num_layers = num_layers
self.hidden_size_a = hidden_size_a
self.hidden_size_v = hidden_size_v
self.hidden_size_m = hidden_size_m
self.hidden_size_o = hidden_size_o
self.act = act
self.name = name
self.heads = heads
def forward(self, gw):
feature = gw.node_feat['node_feat']
for i in range(self.num_layers):
feature = gaan(gw, feature, self.hidden_size_a, self.hidden_size_v,
self.hidden_size_m, self.hidden_size_o, self.heads,
self.name+'_'+str(i))
pred = fluid.layers.fc(
feature, self.num_class, act=None, name=self.name + "_pred_output")
return pred
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
from ogb.nodeproppred import NodePropPredDataset, Evaluator
import pgl
import numpy as np
import os
import time
def get_graph_data(d_name="ogbn-proteins", mini_data=False):
"""
Param:
d_name: name of dataset
mini_data: if mini_data==True, only use a small dataset (for test)
"""
# import ogb data
dataset = NodePropPredDataset(name = d_name)
num_tasks = dataset.num_tasks # obtaining the number of prediction tasks in a dataset
split_idx = dataset.get_idx_split()
train_idx, valid_idx, test_idx = split_idx["train"], split_idx["valid"], split_idx["test"]
graph, label = dataset[0]
# reshape
graph["edge_index"] = graph["edge_index"].T
# mini dataset
if mini_data:
graph['num_nodes'] = 500
mask = (graph['edge_index'][:, 0] < 500)*(graph['edge_index'][:, 1] < 500)
graph["edge_index"] = graph["edge_index"][mask]
graph["edge_feat"] = graph["edge_feat"][mask]
label = label[:500]
train_idx = np.arange(0,400)
valid_idx = np.arange(400,450)
test_idx = np.arange(450,500)
# read/compute node feature
if mini_data:
node_feat_path = './dataset/ogbn_proteins_node_feat_small.npy'
else:
node_feat_path = './dataset/ogbn_proteins_node_feat.npy'
new_node_feat = None
if os.path.exists(node_feat_path):
print("Begin: read node feature".center(50, '='))
new_node_feat = np.load(node_feat_path)
print("End: read node feature".center(50, '='))
else:
print("Begin: compute node feature".center(50, '='))
start = time.perf_counter()
for i in range(graph['num_nodes']):
if i % 100 == 0:
dur = time.perf_counter() - start
print("{}/{}({}%), times: {:.2f}s".format(
i, graph['num_nodes'], i/graph['num_nodes']*100, dur
))
mask = (graph['edge_index'][:, 0] == i)
current_node_feat = np.mean(np.compress(mask, graph['edge_feat'], axis=0),
axis=0, keepdims=True)
if i == 0:
new_node_feat = [current_node_feat]
else:
new_node_feat.append(current_node_feat)
new_node_feat = np.concatenate(new_node_feat, axis=0)
print("End: compute node feature".center(50,'='))
print("Saving node feature in "+node_feat_path.center(50, '='))
np.save(node_feat_path, new_node_feat)
print("Saving finish".center(50,'='))
print(new_node_feat)
# create graph
g = pgl.graph.Graph(
num_nodes=graph["num_nodes"],
edges = graph["edge_index"],
node_feat = {'node_feat': new_node_feat},
edge_feat = None
)
print("Create graph")
print(g)
return g, label, train_idx, valid_idx, test_idx, Evaluator(d_name)
\ No newline at end of file
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import pickle as pkl
import paddle
import paddle.fluid as fluid
import pgl
import time
from pgl.utils import mp_reader
from pgl.utils.logger import log
import time
import copy
def node_batch_iter(nodes, node_label, batch_size):
"""node_batch_iter
"""
perm = np.arange(len(nodes))
np.random.shuffle(perm)
start = 0
while start < len(nodes):
index = perm[start:start + batch_size]
start += batch_size
yield nodes[index], node_label[index]
def traverse(item):
"""traverse
"""
if isinstance(item, list) or isinstance(item, np.ndarray):
for i in iter(item):
for j in traverse(i):
yield j
else:
yield item
def flat_node_and_edge(nodes):
"""flat_node_and_edge
"""
nodes = list(set(traverse(nodes)))
return nodes
def worker(batch_info, graph, graph_wrapper, samples):
"""Worker
"""
def work():
"""work
"""
_graph_wrapper = copy.copy(graph_wrapper)
_graph_wrapper.node_feat_tensor_dict = {}
for batch_train_samples, batch_train_labels in batch_info:
start_nodes = batch_train_samples
nodes = start_nodes
edges = []
for max_deg in samples:
pred_nodes = graph.sample_predecessor(
start_nodes, max_degree=max_deg)
for dst_node, src_nodes in zip(start_nodes, pred_nodes):
for src_node in src_nodes:
edges.append((src_node, dst_node))
last_nodes = nodes
nodes = [nodes, pred_nodes]
nodes = flat_node_and_edge(nodes)
# Find new nodes
start_nodes = list(set(nodes) - set(last_nodes))
if len(start_nodes) == 0:
break
subgraph = graph.subgraph(
nodes=nodes,
edges=edges,
with_node_feat=True,
with_edge_feat=True)
sub_node_index = subgraph.reindex_from_parrent_nodes(
batch_train_samples)
feed_dict = _graph_wrapper.to_feed(subgraph)
feed_dict["node_label"] = batch_train_labels
feed_dict["node_index"] = sub_node_index
feed_dict["parent_node_index"] = np.array(nodes, dtype="int64")
yield feed_dict
return work
def multiprocess_graph_reader(graph,
graph_wrapper,
samples,
node_index,
batch_size,
node_label,
with_parent_node_index=False,
num_workers=4):
"""multiprocess_graph_reader
"""
def parse_to_subgraph(rd, prefix, node_feat, _with_parent_node_index):
"""parse_to_subgraph
"""
def work():
"""work
"""
for data in rd():
feed_dict = data
for key in node_feat:
feed_dict[prefix + '/node_feat/' + key] = node_feat[key][
feed_dict["parent_node_index"]]
if not _with_parent_node_index:
del feed_dict["parent_node_index"]
yield feed_dict
return work
def reader():
"""reader"""
batch_info = list(
node_batch_iter(
node_index, node_label, batch_size=batch_size))
block_size = int(len(batch_info) / num_workers + 1)
reader_pool = []
for i in range(num_workers):
reader_pool.append(
worker(batch_info[block_size * i:block_size * (i + 1)], graph,
graph_wrapper, samples))
if len(reader_pool) == 1:
r = parse_to_subgraph(reader_pool[0],
repr(graph_wrapper), graph.node_feat,
with_parent_node_index)
else:
multi_process_sample = mp_reader.multiprocess_reader(
reader_pool, use_pipe=True, queue_size=1000)
r = parse_to_subgraph(multi_process_sample,
repr(graph_wrapper), graph.node_feat,
with_parent_node_index)
return paddle.reader.buffered(r, num_workers)
return reader()
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from preprocess import get_graph_data
import pgl
import argparse
import numpy as np
import time
from paddle import fluid
import reader
from train_tool import train_epoch, valid_epoch
from model import GaANModel
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="ogb Training")
parser.add_argument("--d_name", type=str, choices=["ogbn-proteins"], default="ogbn-proteins",
help="the name of dataset in ogb")
parser.add_argument("--model", type=str, choices=["GaAN"], default="GaAN",
help="the name of model")
parser.add_argument("--mini_data", type=str, choices=["True", "False"], default="False",
help="use a small dataset to test the code")
parser.add_argument("--use_gpu", type=bool, choices=[True, False], default=True,
help="use gpu")
parser.add_argument("--gpu_id", type=int, default=0,
help="the id of gpu")
parser.add_argument("--exp_id", type=int, default=0,
help="the id of experiment")
parser.add_argument("--epochs", type=int, default=100,
help="the number of training epochs")
parser.add_argument("--lr", type=float, default=1e-2,
help="learning rate of Adam")
parser.add_argument("--rc", type=float, default=0,
help="regularization coefficient")
parser.add_argument("--log_path", type=str, default="./log",
help="the path of log")
parser.add_argument("--batch_size", type=int, default=1024,
help="the number of batch size")
parser.add_argument("--heads", type=int, default=8,
help="the number of heads of attention")
parser.add_argument("--hidden_size_a", type=int, default=24,
help="the hidden size of query and key vectors")
parser.add_argument("--hidden_size_v", type=int, default=32,
help="the hidden size of value vectors")
parser.add_argument("--hidden_size_m", type=int, default=64,
help="the hidden size of projection for computing gates")
parser.add_argument("--hidden_size_o", type=int ,default=128,
help="the hidden size of each layer in GaAN")
args = parser.parse_args()
print("Parameters Setting".center(50, "="))
print("lr = {}, rc = {}, epochs = {}, batch_size = {}".format(args.lr, args.rc, args.epochs,
args.batch_size))
print("Experiment ID: {}".format(args.exp_id).center(50, "="))
print("training in GPU: {}".format(args.gpu_id).center(50, "="))
d_name = args.d_name
# get data
g, label, train_idx, valid_idx, test_idx, evaluator = get_graph_data(d_name=d_name,
mini_data=eval(args.mini_data))
if args.model == "GaAN":
graph_model = GaANModel(112, 3, args.hidden_size_a, args.hidden_size_v, args.hidden_size_m,
args.hidden_size_o, args.heads)
# training
samples = [25, 10] # 2-hop sample size
batch_size = args.batch_size
sample_workers = 1
place = fluid.CUDAPlace(args.gpu_id) if args.use_gpu else fluid.CPUPlace()
train_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(train_program, startup_program):
gw = pgl.graph_wrapper.GraphWrapper(
name='graph',
place = place,
node_feat=g.node_feat_info(),
edge_feat=g.edge_feat_info()
)
node_index = fluid.layers.data('node_index', shape=[None, 1], dtype="int64",
append_batch_size=False)
node_label = fluid.layers.data('node_label', shape=[None, 112], dtype="float32",
append_batch_size=False)
parent_node_index = fluid.layers.data('parent_node_index', shape=[None, 1], dtype="int64",
append_batch_size=False)
output = graph_model.forward(gw)
output = fluid.layers.gather(output, node_index)
score = fluid.layers.sigmoid(output)
loss = fluid.layers.sigmoid_cross_entropy_with_logits(
x=output, label=node_label)
loss = fluid.layers.mean(loss)
val_program = train_program.clone(for_test=True)
with fluid.program_guard(train_program, startup_program):
lr = args.lr
adam = fluid.optimizer.Adam(
learning_rate=lr,
regularization=fluid.regularizer.L2DecayRegularizer(
regularization_coeff=args.rc))
adam.minimize(loss)
exe = fluid.Executor(place)
exe.run(startup_program)
train_iter = reader.multiprocess_graph_reader(
g,
gw,
samples=samples,
num_workers=sample_workers,
batch_size=batch_size,
with_parent_node_index=True,
node_index=train_idx,
node_label=np.array(label[train_idx], dtype='float32'))
val_iter = reader.multiprocess_graph_reader(
g,
gw,
samples=samples,
num_workers=sample_workers,
batch_size=batch_size,
with_parent_node_index=True,
node_index=valid_idx,
node_label=np.array(label[valid_idx], dtype='float32'))
test_iter = reader.multiprocess_graph_reader(
g,
gw,
samples=samples,
num_workers=sample_workers,
batch_size=batch_size,
with_parent_node_index=True,
node_index=test_idx,
node_label=np.array(label[test_idx], dtype='float32'))
start = time.time()
print("Training Begin".center(50, "="))
best_valid = -1.0
for epoch in range(args.epochs):
start_e = time.time()
train_loss, train_rocauc = train_epoch(
train_iter, program=train_program, exe=exe, loss=loss, score=score,
evaluator=evaluator, epoch=epoch
)
valid_loss, valid_rocauc = valid_epoch(
val_iter, program=val_program, exe=exe, loss=loss, score=score,
evaluator=evaluator, epoch=epoch)
end_e = time.time()
print("Epoch {}: train_loss={:.4},val_loss={:.4}, train_rocauc={:.4}, val_rocauc={:.4}, s/epoch={:.3}".format(
epoch, train_loss, valid_loss, train_rocauc, valid_rocauc, end_e-start_e
))
if valid_rocauc > best_valid:
print("Update: new {}, old {}".format(valid_rocauc, best_valid))
best_valid = valid_rocauc
fluid.io.save_params(executor=exe, dirname='./params/'+str(args.exp_id), main_program=val_program)
print("Test Stage".center(50, "="))
fluid.io.load_params(executor=exe, dirname='./params/'+str(args.exp_id), main_program=val_program)
test_loss, test_rocauc = valid_epoch(
test_iter, program=val_program, exe=exe, loss=loss, score=score,
evaluator=evaluator, epoch=epoch)
end = time.time()
print("test_loss={:.4},test_rocauc={:.4}, Total Time={:.3}".format(
test_loss, test_rocauc, end-start
))
print("End".center(50, "="))
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from pgl.utils.logger import log
def train_epoch(batch_iter, exe, program, loss, score, evaluator, epoch, log_per_step=1):
batch = 0
total_loss = 0.0
total_sample = 0
result = 0
for batch_feed_dict in batch_iter():
batch += 1
batch_loss, y_pred = exe.run(program, fetch_list=[loss, score], feed=batch_feed_dict)
num_samples = len(batch_feed_dict["node_index"])
total_loss += batch_loss * num_samples
total_sample += num_samples
input_dict = {
"y_true": batch_feed_dict["node_label"],
"y_pred": y_pred
}
result += evaluator.eval(input_dict)["rocauc"]
return total_loss.item()/total_sample, result/batch
def valid_epoch(batch_iter, exe, program, loss, score, evaluator, epoch, log_per_step=1):
batch = 0
total_sample = 0
result = 0
total_loss = 0.0
for batch_feed_dict in batch_iter():
batch += 1
batch_loss, y_pred = exe.run(program, fetch_list=[loss, score], feed=batch_feed_dict)
input_dict = {
"y_true": batch_feed_dict["node_label"],
"y_pred": y_pred
}
result += evaluator.eval(input_dict)["rocauc"]
num_samples = len(batch_feed_dict["node_index"])
total_loss += batch_loss * num_samples
total_sample += num_samples
return total_loss.item()/total_sample, result/batch
# Self-Attention Graph Pooling
SAGPool is a graph pooling method based on self-attention. Self-attention uses graph convolution, which allows the pooling method to consider both node features and graph topology. Based on PGL, we implement the SAGPool algorithm and train the model on five datasets.
## Datasets
There are five datasets, including D&D, PROTEINS, NCI1, NCI109 and FRANKENSTEIN. You can download the datasets from [here](https://bj.bcebos.com/paddle-pgl/SAGPool/data.zip), and unzip it directly. The pkl format datasets should be in directory ./data.
## Dependencies
- [paddlepaddle >= 1.8](https://github.com/PaddlePaddle/paddle)
- [pgl 1.1](https://github.com/PaddlePaddle/PGL)
## How to run
```
python main.py --dataset_name DD --learning_rate 0.005 --weight_decay 0.00001
python main.py --dataset_name PROTEINS --learning_rate 0.001 --hidden_size 32 --weight_decay 0.00001
python main.py --dataset_name NCI1 --learning_rate 0.001 --weight_decay 0.00001
python main.py --dataset_name NCI109 --learning_rate 0.0005 --hidden_size 64 --weight_decay 0.0001 --patience 200
python main.py --dataset_name FRANKENSTEIN --learning_rate 0.001 --weight_decay 0.0001
```
## Hyperparameters
- seed: random seed
- batch\_size: the number of batch size
- learning\_rate: learning rate of optimizer
- weight\_decay: the weight decay for L2 regularization
- hidden\_size: the hidden size of gcn
- pooling\_ratio: the pooling ratio of SAGPool
- dropout\_ratio: the number of dropout ratio
- dataset\_name: the name of datasets, including DD, PROTEINS, NCI1, NCI109, FRANKENSTEIN
- epochs: maximum number of epochs
- patience: patience for early stopping
- use\_cuda: whether to use cuda
- save\_model: the name for the best model
## Performance
We evaluate the implemented method for 20 random seeds using 10-fold cross validation, following the same training procedures as in the paper.
| dataset | mean accuracy | standard deviation | mean accuracy(paper) | standard deviation(paper) |
| ------------ | ------------- | ------------------ | -------------------- | ------------------------- |
| DD | 74.4181 | 1.0244 | 76.19 | 0.94 |
| PROTEINS | 72.7858 | 0.6617 | 70.04 | 1.47 |
| NCI1 | 75.781 | 1.2125 | 74.18 | 1.2 |
| NCI109 | 74.3156 | 1.3 | 74.06 | 0.78 |
| FRANKENSTEIN | 60.7826 | 0.629 | 62.57 | 0.6 |
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--seed', type=int, default=777,
help='seed')
parser.add_argument('--batch_size', type=int, default=128,
help='batch size')
parser.add_argument('--learning_rate', type=float, default=0.0005,
help='learning rate')
parser.add_argument('--weight_decay', type=float, default=0.0001,
help='weight decay')
parser.add_argument('--hidden_size', type=int, default=128,
help='gcn hidden size')
parser.add_argument('--pooling_ratio', type=float, default=0.5,
help='pooling ratio of SAGPool')
parser.add_argument('--dropout_ratio', type=float, default=0.5,
help='dropout ratio')
parser.add_argument('--dataset_name', type=str, default='DD',
help='DD/PROTEINS/NCI1/NCI109/FRANKENSTEIN')
parser.add_argument('--epochs', type=int, default=100000,
help='maximum number of epochs')
parser.add_argument('--patience', type=int, default=50,
help='patience for early stopping')
parser.add_argument('--use_cuda', type=bool, default=True,
help='use cuda or cpu')
parser.add_argument('--save_model', type=str,
help='save model name')
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import random
import pgl
from pgl.utils.logger import log
from pgl.graph import Graph, MultiGraph
import numpy as np
import pickle
class BaseDataset(object):
def __init__(self):
pass
def __getitem__(self, idx):
raise NotImplementedError
def __len__(self):
raise NotImplementedError
class Subset(BaseDataset):
"""Subset of a dataset at specified indices.
Args:
dataset (Dataset): The whole Dataset
indices (sequence): Indices in the whole set selected for subset
"""
def __init__(self, dataset, indices):
self.dataset = dataset
self.indices = indices
def __getitem__(self, idx):
return self.dataset[self.indices[idx]]
def __len__(self):
return len(self.indices)
class Dataset(BaseDataset):
def __init__(self, args):
self.args = args
with open('data/%s.pkl' % args.dataset_name, 'rb') as f:
graphs_info_list = pickle.load(f)
self.pgl_graph_list = []
self.graph_label_list = []
for i in range(len(graphs_info_list) - 1):
graph = graphs_info_list[i]
edges_l, edges_r = graph["edge_src"], graph["edge_dst"]
# add self-loops
if self.args.dataset_name != "FRANKENSTEIN":
num_nodes = graph["num_nodes"]
x = np.arange(0, num_nodes)
edges_l = np.append(edges_l, x)
edges_r = np.append(edges_r, x)
edges = list(zip(edges_l, edges_r))
g = pgl.graph.Graph(num_nodes=graph["num_nodes"], edges=edges)
g.node_feat["feat"] = graph["node_feat"]
self.pgl_graph_list.append(g)
self.graph_label_list.append(graph["label"])
self.num_classes = graphs_info_list[-1]["num_classes"]
self.num_features = graphs_info_list[-1]["num_features"]
def __getitem__(self, idx):
return self.pgl_graph_list[idx], self.graph_label_list[idx]
def shuffle(self):
"""shuffle the dataset.
"""
cc = list(zip(self.pgl_graph_list, self.graph_label_list))
random.seed(self.args.seed)
random.shuffle(cc)
a, b = zip(*cc)
self.pgl_graph_list[:], self.graph_label_list[:] = a, b
def __len__(self):
return len(self.pgl_graph_list)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import paddle.fluid.layers as L
def norm_gcn(gw, feature, hidden_size, activation, name, norm=None):
"""Implementation of graph convolutional neural networks(GCN), using different
normalization method.
Args:
gw: Graph wrapper object.
feature: A tensor with shape (num_nodes, feature_size).
hidden_size: The hidden size for norm gcn.
activation: The activation for the output.
name: Norm gcn layer names.
norm: If norm is not None, then the feature will be normalized. Norm must
be tensor with shape (num_nodes,) and dtype float32.
Return:
A tensor with shape (num_nodes, hidden_size)
"""
size = feature.shape[-1]
feature = L.fc(feature,
size=hidden_size,
bias_attr=False,
param_attr=fluid.ParamAttr(name=name))
if norm is not None:
src, dst = gw.edges
norm_src = L.gather(norm, src, overwrite=False)
norm_dst = L.gather(norm, dst, overwrite=False)
norm = norm_src * norm_dst
def send_src_copy(src_feat, dst_feat, edge_feat):
return src_feat["h"] * norm
else:
def send_src_copy(src_feat, dst_feat, edge_feat):
return src_feat["h"]
msg = gw.send(send_src_copy, nfeat_list=[("h", feature)])
output = gw.recv(msg, "sum")
bias = L.create_parameter(
shape=[hidden_size],
dtype='float32',
is_bias=True,
name=name + '_bias')
output = L.elementwise_add(output, bias, act=activation)
return output
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import collections
import paddle
import pgl
from pgl.utils.logger import log
from pgl.graph import Graph, MultiGraph
def batch_iter(data, batch_size):
"""node_batch_iter
"""
size = len(data)
perm = np.arange(size)
np.random.shuffle(perm)
start = 0
while start < size:
index = perm[start:start + batch_size]
start += batch_size
yield data[index]
def scan_batch_iter(data, batch_size):
"""scan_batch_iter
"""
batch = []
for example in data.scan():
batch.append(example)
if len(batch) == batch_size:
yield batch
batch = []
if len(batch) > 0:
yield batch
def label_to_onehot(labels):
"""Return one-hot representations of labels
"""
onehot_labels = []
for label in labels:
if label == 0:
onehot_labels.append([1, 0])
else:
onehot_labels.append([0, 1])
onehot_labels = np.array(onehot_labels)
return onehot_labels
class GraphDataloader(object):
"""Graph Dataloader
"""
def __init__(self,
dataset,
graph_wrapper,
batch_size,
seed=0,
buf_size=1000,
shuffle=True):
self.shuffle = shuffle
self.seed = seed
self.batch_size = batch_size
self.dataset = dataset
self.buf_size = buf_size
self.graph_wrapper = graph_wrapper
def batch_fn(self, batch_examples):
""" batch_fun batch producer """
graphs = [b[0] for b in batch_examples]
labels = [b[1] for b in batch_examples]
join_graph = MultiGraph(graphs)
# normalize
indegree = join_graph.indegree()
norm = np.zeros_like(indegree, dtype="float32")
norm[indegree > 0] = np.power(indegree[indegree > 0], -0.5)
join_graph.node_feat["norm"] = np.expand_dims(norm, -1)
feed_dict = self.graph_wrapper.to_feed(join_graph)
labels = np.array(labels)
feed_dict["labels_1dim"] = labels
labels = label_to_onehot(labels)
feed_dict["labels"] = labels
graph_lod = join_graph.graph_lod
graph_id = []
for i in range(1, len(graph_lod)):
graph_node_num = graph_lod[i] - graph_lod[i - 1]
graph_id += [i - 1] * graph_node_num
graph_id = np.array(graph_id, dtype="int32")
feed_dict["graph_id"] = graph_id
return feed_dict
def batch_iter(self):
""" batch_iter """
if self.shuffle:
for batch in batch_iter(self, self.batch_size):
yield batch
else:
for batch in scan_batch_iter(self, self.batch_size):
yield batch
def __len__(self):
"""__len__"""
return len(self.dataset)
def __getitem__(self, idx):
"""__getitem__"""
if isinstance(idx, collections.Iterable):
return [self.dataset[bidx] for bidx in idx]
else:
return self.dataset[idx]
def __iter__(self):
"""__iter__"""
def func_run():
for batch_examples in self.batch_iter():
batch_dict = self.batch_fn(batch_examples)
yield batch_dict
r = paddle.reader.buffered(func_run, self.buf_size)
for batch in r():
yield batch
def scan(self):
"""scan"""
for example in self.dataset:
yield example
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as L
import pgl
from pgl.graph_wrapper import GraphWrapper
from pgl.utils.logger import log
from conv import norm_gcn
from pgl.layers.conv import gcn
def topk_pool(gw, score, graph_id, ratio):
"""Implementation of topk pooling, where k means pooling ratio.
Args:
gw: Graph wrapper object.
score: The attention score of all nodes, which is used to select
important nodes.
graph_id: The graphs that the nodes belong to.
ratio: The pooling ratio of nodes we want to select.
Return:
perm: The index of nodes we choose.
ratio_length: The selected node numbers of each graph.
"""
graph_lod = gw.graph_lod
graph_nodes = gw.num_nodes
num_graph = gw.num_graph
num_nodes = L.ones(shape=[graph_nodes], dtype="float32")
num_nodes = L.lod_reset(num_nodes, graph_lod)
num_nodes_per_graph = L.sequence_pool(num_nodes, pool_type='sum')
max_num_nodes = L.reduce_max(num_nodes_per_graph, dim=0)
max_num_nodes = L.cast(max_num_nodes, dtype="int32")
index = L.arange(0, gw.num_nodes, dtype="int64")
offset = L.gather(graph_lod, graph_id, overwrite=False)
index = (index - offset) + (graph_id * max_num_nodes)
index.stop_gradient = True
# padding
dense_score = L.fill_constant(shape=[num_graph * max_num_nodes],
dtype="float32", value=-999999)
index = L.reshape(index, shape=[-1])
dense_score = L.scatter(dense_score, index, updates=score)
num_graph = L.cast(num_graph, dtype="int32")
dense_score = L.reshape(dense_score,
shape=[num_graph, max_num_nodes])
# record the sorted index
_, sort_index = L.argsort(dense_score, axis=-1, descending=True)
# recover the index range
graph_lod = graph_lod[:-1]
graph_lod = L.reshape(graph_lod, shape=[-1, 1])
graph_lod = L.cast(graph_lod, dtype="int64")
sort_index = L.elementwise_add(sort_index, graph_lod, axis=-1)
sort_index = L.reshape(sort_index, shape=[-1, 1])
# use sequence_slice to choose selected node index
pad_lod = L.arange(0, (num_graph + 1) * max_num_nodes, step=max_num_nodes, dtype="int32")
sort_index = L.lod_reset(sort_index, pad_lod)
ratio_length = L.ceil(num_nodes_per_graph * ratio)
ratio_length = L.cast(ratio_length, dtype="int64")
ratio_length = L.reshape(ratio_length, shape=[-1, 1])
offset = L.zeros(shape=[num_graph, 1], dtype="int64")
choose_index = L.sequence_slice(input=sort_index, offset=offset, length=ratio_length)
perm = L.reshape(choose_index, shape=[-1])
return perm, ratio_length
def sag_pool(gw, feature, ratio, graph_id, dataset, name, activation=L.tanh):
"""Implementation of self-attention graph pooling (SAGPool)
This is an implementation of the paper SELF-ATTENTION GRAPH POOLING
(https://arxiv.org/pdf/1904.08082.pdf)
Args:
gw: Graph wrapper object.
feature: A tensor with shape (num_nodes, feature_size).
ratio: The pooling ratio of nodes we want to select.
graph_id: The graphs that the nodes belong to.
dataset: To differentiate FRANKENSTEIN dataset and other datasets.
name: The name of SAGPool layer.
activation: The activation function.
Return:
new_feature: A tensor with shape (num_nodes, feature_size), and the unselected
nodes' feature is masked by zero.
ratio_length: The selected node numbers of each graph.
"""
if dataset == "FRANKENSTEIN":
gcn_ = gcn
else:
gcn_ = norm_gcn
score = gcn_(gw=gw,
feature=feature,
hidden_size=1,
activation=None,
norm=gw.node_feat["norm"],
name=name)
score = L.squeeze(score, axes=[])
perm, ratio_length = topk_pool(gw, score, graph_id, ratio)
mask = L.zeros_like(score)
mask = L.cast(mask, dtype="float32")
updates = L.ones_like(perm)
updates = L.cast(updates, dtype="float32")
mask = L.scatter(mask, perm, updates)
new_feature = L.elementwise_mul(feature, mask, axis=0)
temp_score = activation(score)
new_feature = L.elementwise_mul(new_feature, temp_score, axis=0)
return new_feature, ratio_length
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import argparse
import pgl
from pgl.utils.logger import log
import paddle
import re
import time
import random
import numpy as np
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as L
import pgl
from pgl.utils.logger import log
from model import GlobalModel
from base_dataset import Subset, Dataset
from dataloader import GraphDataloader
from args import parser
import warnings
from sklearn.model_selection import KFold
warnings.filterwarnings("ignore")
def main(args, train_dataset, val_dataset, test_dataset):
"""main function for running one testing results.
"""
log.info("Train Examples: %s" % len(train_dataset))
log.info("Val Examples: %s" % len(val_dataset))
log.info("Test Examples: %s" % len(test_dataset))
train_program = fluid.Program()
train_program.random_seed = args.seed
startup_program = fluid.Program()
startup_program.random_seed = args.seed
if args.use_cuda:
place = fluid.CUDAPlace(0)
else:
place = fluid.CPUPlace()
exe = fluid.Executor(place)
log.info("building model")
with fluid.program_guard(train_program, startup_program):
with fluid.unique_name.guard():
graph_model = GlobalModel(args, dataset)
train_loader = GraphDataloader(train_dataset,
graph_model.graph_wrapper,
batch_size=args.batch_size)
optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate,
regularization=fluid.regularizer.L2DecayRegularizer(args.weight_decay))
optimizer.minimize(graph_model.loss)
exe.run(startup_program)
test_program = fluid.Program()
test_program = train_program.clone(for_test=True)
val_loader = GraphDataloader(val_dataset,
graph_model.graph_wrapper,
batch_size=args.batch_size,
shuffle=False)
test_loader = GraphDataloader(test_dataset,
graph_model.graph_wrapper,
batch_size=args.batch_size,
shuffle=False)
min_loss = 1e10
global_step = 0
for epoch in range(args.epochs):
for feed_dict in train_loader:
loss, pred = exe.run(train_program,
feed=feed_dict,
fetch_list=[graph_model.loss, graph_model.pred])
log.info("Epoch: %d, global_step: %d, Training loss: %f" \
% (epoch, global_step, loss))
global_step += 1
# validation
valid_loss = 0.
correct = 0.
for feed_dict in val_loader:
valid_loss_, correct_ = exe.run(test_program,
feed=feed_dict,
fetch_list=[graph_model.loss, graph_model.correct])
valid_loss += valid_loss_
correct += correct_
if epoch % 50 == 0:
log.info("Epoch:%d, Validation loss: %f, Validation acc: %f" \
% (epoch, valid_loss, correct / len(val_loader)))
if valid_loss < min_loss:
min_loss = valid_loss
patience = 0
path = "./save/%s" % args.dataset_name
if not os.path.exists(path):
os.makedirs(path)
fluid.save(train_program, "%s/%s" \
% (path, args.save_model))
log.info("Model saved at epoch %d" % epoch)
else:
patience += 1
if patience > args.patience:
break
correct = 0.
new_test_program = fluid.Program()
fluid.load(new_test_program, "./save/%s/%s" \
% (args.dataset_name, args.save_model), exe)
for feed_dict in test_loader:
correct_ = exe.run(test_program,
feed=feed_dict,
fetch_list=[graph_model.correct])
correct += correct_[0]
log.info("Test acc: %f" % (correct / len(test_loader)))
return correct / len(test_loader)
def split_10_cv(dataset, args):
"""10 folds cross validation
"""
dataset.shuffle()
X = np.array([0] * len(dataset))
y = X
kf = KFold(n_splits=10, shuffle=False)
i = 1
test_acc = []
for train_index, test_index in kf.split(X, y):
train_val_dataset = Subset(dataset, train_index)
test_dataset = Subset(dataset, test_index)
train_val_index_range = list(range(0, len(train_val_dataset)))
num_val = int(len(train_val_dataset) / 9)
val_dataset = Subset(train_val_dataset, train_val_index_range[:num_val])
train_dataset = Subset(train_val_dataset, train_val_index_range[num_val:])
log.info("######%d fold of 10-fold cross validation######" % i)
i += 1
test_acc_ = main(args, train_dataset, val_dataset, test_dataset)
test_acc.append(test_acc_)
mean_acc = sum(test_acc) / len(test_acc)
return mean_acc, test_acc
def random_seed_20(args, dataset):
"""run for 20 random seeds
"""
alist = random.sample(range(1,1000),20)
test_acc_fold = []
for seed in alist:
log.info('############ Seed %d ############' % seed)
args.seed = seed
test_acc_fold_, _ = split_10_cv(dataset, args)
log.info('Mean test acc at seed %d: %f' % (seed, test_acc_fold_))
test_acc_fold.append(test_acc_fold_)
mean_acc = sum(test_acc_fold) / len(test_acc_fold)
temp = [(acc - mean_acc) * (acc - mean_acc) for acc in test_acc_fold]
standard_std = math.sqrt(sum(temp) / len(test_acc_fold))
log.info('Final mean test acc using 20 random seeds(mean for 10-fold): %f' % (mean_acc))
log.info('Final standard std using 20 random seeds(mean for 10-fold): %f' % (standard_std))
if __name__ == "__main__":
args = parser.parse_args()
log.info('loading data...')
dataset = Dataset(args)
log.info("preprocess finish.")
args.num_classes = dataset.num_classes
args.num_features = dataset.num_features
random_seed_20(args, dataset)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from random import random
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as L
import pgl
from pgl.graph import Graph, MultiGraph
from pgl.graph_wrapper import GraphWrapper
from pgl.utils.logger import log
from pgl.layers.conv import gcn
from layers import sag_pool
from conv import norm_gcn
class GlobalModel(object):
"""Implementation of global pooling architecture with SAGPool.
"""
def __init__(self, args, dataset):
self.args = args
self.dataset = dataset
self.hidden_size = args.hidden_size
self.num_classes = args.num_classes
self.num_features = args.num_features
self.pooling_ratio = args.pooling_ratio
self.dropout_ratio = args.dropout_ratio
self.batch_size = args.batch_size
graph_data = []
g, label = self.dataset[0]
graph_data.append(g)
g, label = self.dataset[1]
graph_data.append(g)
batch_graph = MultiGraph(graph_data)
indegree = batch_graph.indegree()
norm = np.zeros_like(indegree, dtype="float32")
norm[indegree > 0] = np.power(indegree[indegree > 0], -0.5)
batch_graph.node_feat["norm"] = np.expand_dims(norm, -1)
graph_data = batch_graph
self.graph_wrapper = GraphWrapper(
name="graph",
node_feat=graph_data.node_feat_info()
)
self.labels = L.data(
"labels",
shape=[None, self.args.num_classes],
dtype="int32",
append_batch_size=False)
self.labels_1dim = L.data(
"labels_1dim",
shape=[None],
dtype="int32",
append_batch_size=False)
self.graph_id = L.data(
"graph_id",
shape=[None],
dtype="int32",
append_batch_size=False)
if self.args.dataset_name == "FRANKENSTEIN":
self.gcn = gcn
else:
self.gcn = norm_gcn
self.build_model()
def build_model(self):
node_features = self.graph_wrapper.node_feat["feat"]
output = self.gcn(gw=self.graph_wrapper,
feature=node_features,
hidden_size=self.hidden_size,
activation="relu",
norm=self.graph_wrapper.node_feat["norm"],
name="gcn_layer_1")
output1 = output
output = self.gcn(gw=self.graph_wrapper,
feature=output,
hidden_size=self.hidden_size,
activation="relu",
norm=self.graph_wrapper.node_feat["norm"],
name="gcn_layer_2")
output2 = output
output = self.gcn(gw=self.graph_wrapper,
feature=output,
hidden_size=self.hidden_size,
activation="relu",
norm=self.graph_wrapper.node_feat["norm"],
name="gcn_layer_3")
output = L.concat(input=[output1, output2, output], axis=-1)
output, ratio_length = sag_pool(gw=self.graph_wrapper,
feature=output,
ratio=self.pooling_ratio,
graph_id=self.graph_id,
dataset=self.args.dataset_name,
name="sag_pool_1")
output = L.lod_reset(output, self.graph_wrapper.graph_lod)
cat1 = L.sequence_pool(output, "sum")
ratio_length = L.cast(ratio_length, dtype="float32")
cat1 = L.elementwise_div(cat1, ratio_length, axis=-1)
cat2 = L.sequence_pool(output, "max")
output = L.concat(input=[cat2, cat1], axis=-1)
output = L.fc(output, size=self.hidden_size, act="relu")
output = L.dropout(output, dropout_prob=self.dropout_ratio)
output = L.fc(output, size=self.hidden_size // 2, act="relu")
output = L.fc(output, size=self.num_classes, act=None,
param_attr=fluid.ParamAttr(name="final_fc"))
self.labels = L.cast(self.labels, dtype="float32")
loss = L.sigmoid_cross_entropy_with_logits(x=output, label=self.labels)
self.loss = L.mean(loss)
pred = L.sigmoid(output)
self.pred = L.argmax(x=pred, axis=-1)
correct = L.equal(self.pred, self.labels_1dim)
correct = L.cast(correct, dtype="int32")
self.correct = L.reduce_sum(correct)
......@@ -65,7 +65,6 @@ def main(args):
with fluid.program_guard(train_program, startup_program):
gw = pgl.graph_wrapper.GraphWrapper(
name="graph",
place=place,
node_feat=dataset.graph.node_feat_info())
output = pgl.layers.gcn(gw,
......
......@@ -6,54 +6,32 @@ information (e.g., text attributes) to efficiently generate node embeddings for
For purpose of high scalability, we use redis as distribute graph storage solution and training graphsage against redis server.
### Datasets(Quickstart)
The reddit dataset should be downloaded from [reddit_adj.npz](https://drive.google.com/open?id=174vb0Ws7Vxk_QTUtxqTgDHSQ4El4qDHt) and [reddit.npz](https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2Jthe). The details for Reddit Dataset can be found [here](https://cs.stanford.edu/people/jure/pubs/graphsage-nips17.pdf).
The reddit dataset should be downloaded from [reddit_adj.npz](https://drive.google.com/open?id=174vb0Ws7Vxk_QTUtxqTgDHSQ4El4qDHt) and [reddit.npz](https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J). The details for Reddit Dataset can be found [here](https://cs.stanford.edu/people/jure/pubs/graphsage-nips17.pdf).
Alternatively, reddit dataset has been preprocessed and packed into docker image, which can be instantly pulled using following commands.
- reddit.npz: https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J
- reddit_adj.npz: https://drive.google.com/open?id=174vb0Ws7Vxk_QTUtxqTgDHSQ4El4qDHt
```sh
docker pull githubutilities/reddit_redis_demo:v0.1
```
Download `reddit.npz` and `reddit_adj.npz` into `data` directory for further preprocessing.
### Dependencies
```txt
- paddlepaddle>=1.6
- pgl
- scipy
- redis==2.10.6
- redis-py-cluster==1.3.6
```sh
pip install -r requirements.txt
```
### How to run
#### 1. Start reddit data service
#### 1. Preprocessing and start reddit data service
```sh
docker run \
--net=host \
-d --rm \
--name reddit_demo \
-it githubutilities/reddit_redis_demo:v0.1 \
/bin/bash -c "/bin/bash ./before_hook.sh && /bin/bash"
docker logs -f `docker ps -aqf "name=reddit_demo"`
pushd ./redis_setup
/bin/bash ./before_hook.sh
popd
```
#### 2. training GraphSAGE model
```sh
python train.py --use_cuda --epoch 10 --graphsage_type graphsage_mean --sample_workers 10
sh ./cloud_run.sh
```
#### Hyperparameters
- epoch: Number of epochs default (10)
- use_cuda: Use gpu if assign use_cuda.
- graphsage_type: We support 4 aggregator types including "graphsage_mean", "graphsage_maxpool", "graphsage_meanpool" and "graphsage_lstm".
- sample_workers: The number of workers for multiprocessing subgraph sample.
- lr: Learning rate.
- batch_size: Batch size.
- samples_1: The max neighbors for the first hop neighbor sampling. (default: 25)
- samples_2: The max neighbors for the second hop neighbor sampling. (default: 10)
- hidden_size: The hidden size of the GraphSAGE models.
#!/bin/bash
set -x
mode=${1}
source ./utils.sh
unset http_proxy https_proxy
source ./local_config
if [ ! -d ${log_dir} ]; then
mkdir ${log_dir}
fi
for((i=0;i<${PADDLE_PSERVERS_NUM};i++))
do
echo "start ps server: ${i}"
echo $log_dir
TRAINING_ROLE="PSERVER" PADDLE_TRAINER_ID=${i} sh job.sh &> $log_dir/pserver.$i.log &
done
sleep 10s
for((j=0;j<${PADDLE_TRAINERS_NUM};j++))
do
echo "start ps work: ${j}"
TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} sh job.sh &> $log_dir/worker.$j.log &
done
tail -f $log_dir/worker.0.log
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import time
import os
import math
import numpy as np
import paddle.fluid as F
import paddle.fluid.layers as L
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from pgl.utils.logger import log
from model import GraphsageModel
from utils import load_config
import reader
def init_role():
# reset the place according to role of parameter server
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
paddle_role = role_maker.Role.WORKER
place = F.CPUPlace()
if training_role == "PSERVER":
paddle_role = role_maker.Role.SERVER
# set the fleet runtime environment according to configure
ports = os.getenv("PADDLE_PORT", "6174").split(",")
pserver_ips = os.getenv("PADDLE_PSERVERS").split(",") # ip,ip...
eplist = []
if len(ports) > 1:
# local debug mode, multi port
for port in ports:
eplist.append(':'.join([pserver_ips[0], port]))
else:
# distributed mode, multi ip
for ip in pserver_ips:
eplist.append(':'.join([ip, ports[0]]))
pserver_endpoints = eplist # ip:port,ip:port...
worker_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
role = role_maker.UserDefinedRoleMaker(
current_id=trainer_id,
role=paddle_role,
worker_num=worker_num,
server_endpoints=pserver_endpoints)
fleet.init(role)
def optimization(base_lr, loss, optimizer='adam'):
if optimizer == 'sgd':
optimizer = F.optimizer.SGD(base_lr)
elif optimizer == 'adam':
optimizer = F.optimizer.Adam(base_lr, lazy_mode=True)
else:
raise ValueError
log.info('learning rate:%f' % (base_lr))
#create the DistributeTranspiler configure
config = DistributeTranspilerConfig()
config.sync_mode = False
#config.runtime_split_send_recv = False
config.slice_var_up = False
#create the distributed optimizer
optimizer = fleet.distributed_optimizer(optimizer, config)
optimizer.minimize(loss)
def build_complied_prog(train_program, model_loss):
num_threads = int(os.getenv("CPU_NUM", 10))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
exec_strategy = F.ExecutionStrategy()
exec_strategy.num_threads = num_threads
#exec_strategy.use_experimental_executor = True
build_strategy = F.BuildStrategy()
build_strategy.enable_inplace = True
#build_strategy.memory_optimize = True
build_strategy.memory_optimize = False
build_strategy.remove_unnecessary_lock = False
if num_threads > 1:
build_strategy.reduce_strategy = F.BuildStrategy.ReduceStrategy.Reduce
compiled_prog = F.compiler.CompiledProgram(
train_program).with_data_parallel(loss_name=model_loss.name)
return compiled_prog
def fake_py_reader(data_iter, num):
def fake_iter():
queue = []
for idx, data in enumerate(data_iter()):
queue.append(data)
if len(queue) == num:
yield queue
queue = []
if len(queue) > 0:
while len(queue) < num:
queue.append(queue[-1])
yield queue
return fake_iter
def train_prog(exe, program, model, pyreader, args):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
start = time.time()
batch = 0
total_loss = 0.
total_acc = 0.
total_sample = 0
for epoch_idx in range(args.num_epoch):
for step, batch_feed_dict in enumerate(pyreader()):
try:
cpu_time = time.time()
batch += 1
batch_loss, batch_acc = exe.run(
program,
feed=batch_feed_dict,
fetch_list=[model.loss, model.acc])
end = time.time()
if batch % args.log_per_step == 0:
log.info(
"Batch %s Loss %s Acc %s \t Speed(per batch) %.5lf/%.5lf sec"
% (batch, np.mean(batch_loss), np.mean(batch_acc), (end - start) /batch, (end - cpu_time)))
if step % args.steps_per_save == 0:
save_path = args.save_path
if trainer_id == 0:
model_path = os.path.join(save_path, "%s" % step)
fleet.save_persistables(exe, model_path)
except Exception as e:
log.info("Pyreader train error")
log.exception(e)
def main(args):
log.info("start")
worker_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
num_devices = int(os.getenv("CPU_NUM", 10))
model = GraphsageModel(args)
loss = model.forward()
train_iter = reader.get_iter(args, model.graph_wrapper, 'train')
pyreader = fake_py_reader(train_iter, num_devices)
# init fleet
init_role()
optimization(args.lr, loss, args.optimizer)
# init and run server or worker
if fleet.is_server():
fleet.init_server(args.warm_start_from_dir)
fleet.run_server()
if fleet.is_worker():
log.info("start init worker done")
fleet.init_worker()
#just the worker, load the sample
log.info("init worker done")
exe = F.Executor(F.CPUPlace())
exe.run(fleet.startup_program)
log.info("Startup done")
compiled_prog = build_complied_prog(fleet.main_program, loss)
train_prog(exe, compiled_prog, model, pyreader, args)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='metapath2vec')
parser.add_argument("-c", "--config", type=str, default="./config.yaml")
args = parser.parse_args()
config = load_config(args.config)
log.info(config)
main(config)
# model config
hidden_size: 128
num_class: 41
samples: [25, 10]
graphsage_type: "graphsage_mean"
# trainging config
num_epoch: 10
batch_size: 128
num_sample_workers: 10
optimizer: "adam"
lr: 0.01
warm_start_from_dir: null
steps_per_save: 1000
log_per_step: 1
save_path: "./checkpoints"
log_dir: "./logs"
CPU_NUM: 1
#!/bin/bash
set -x
source ./utils.sh
export CPU_NUM=$CPU_NUM
export FLAGS_rpc_deadline=3000000
export FLAGS_communicator_send_queue_size=1
export FLAGS_communicator_min_send_grad_num_before_recv=0
export FLAGS_communicator_max_merge_var_num=1
export FLAGS_communicator_merge_sparse_grad=0
python -u cluster_train.py -c config.yaml
#!/bin/bash
export PADDLE_TRAINERS_NUM=2
export PADDLE_PSERVERS_NUM=2
export PADDLE_PORT=6184,6185
export PADDLE_PSERVERS="127.0.0.1"
......@@ -11,10 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
graphsage model.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import math
import pgl
import numpy as np
import paddle
import paddle.fluid.layers as L
import paddle.fluid as F
import paddle.fluid as fluid
def copy_send(src_feat, dst_feat, edge_feat):
return src_feat["h"]
......@@ -128,3 +140,87 @@ def graphsage_lstm(gw, feature, hidden_size, act, name):
output = fluid.layers.concat([self_feature, neigh_feature], axis=1)
output = fluid.layers.l2_normalize(output, axis=1)
return output
def build_graph_model(graph_wrapper, num_class, k_hop, graphsage_type,
hidden_size):
node_index = fluid.layers.data(
"node_index", shape=[None], dtype="int64", append_batch_size=False)
node_label = fluid.layers.data(
"node_label", shape=[None, 1], dtype="int64", append_batch_size=False)
#feature = fluid.layers.gather(feature, graph_wrapper.node_feat['feats'])
feature = graph_wrapper.node_feat['feats']
feature.stop_gradient = True
for i in range(k_hop):
if graphsage_type == 'graphsage_mean':
feature = graphsage_mean(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_mean_%s" % i)
elif graphsage_type == 'graphsage_meanpool':
feature = graphsage_meanpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_meanpool_%s" % i)
elif graphsage_type == 'graphsage_maxpool':
feature = graphsage_maxpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
elif graphsage_type == 'graphsage_lstm':
feature = graphsage_lstm(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
else:
raise ValueError("graphsage type %s is not"
" implemented" % graphsage_type)
feature = fluid.layers.gather(feature, node_index)
logits = fluid.layers.fc(feature,
num_class,
act=None,
name='classification_layer')
proba = fluid.layers.softmax(logits)
loss = fluid.layers.softmax_with_cross_entropy(
logits=logits, label=node_label)
loss = fluid.layers.mean(loss)
acc = fluid.layers.accuracy(input=proba, label=node_label, k=1)
return loss, acc
class GraphsageModel(object):
def __init__(self, args):
self.args = args
def forward(self):
args = self.args
graph_wrapper = pgl.graph_wrapper.GraphWrapper(
"sub_graph", node_feat=[('feats', [None, 602], np.dtype('float32'))])
loss, acc = build_graph_model(
graph_wrapper,
num_class=args.num_class,
hidden_size=args.hidden_size,
graphsage_type=args.graphsage_type,
k_hop=len(args.samples))
loss.persistable = True
self.graph_wrapper = graph_wrapper
self.loss = loss
self.acc = acc
return loss
......@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import numpy as np
import pickle as pkl
import paddle
......@@ -147,3 +149,48 @@ def multiprocess_graph_reader(
return reader()
def load_data():
"""
data from https://github.com/matenure/FastGCN/issues/8
reddit.npz: https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J
reddit_index_label is preprocess from reddit.npz without feats key.
"""
data_dir = os.path.dirname(os.path.abspath(__file__))
data = np.load(os.path.join(data_dir, "data/reddit_index_label.npz"))
num_class = 41
train_label = data['y_train']
val_label = data['y_val']
test_label = data['y_test']
train_index = data['train_index']
val_index = data['val_index']
test_index = data['test_index']
return {
"train_index": train_index,
"train_label": train_label,
"val_label": val_label,
"val_index": val_index,
"test_index": test_index,
"test_label": test_label,
"num_class": 41
}
def get_iter(args, graph_wrapper, mode):
data = load_data()
train_iter = multiprocess_graph_reader(
graph_wrapper,
samples=args.samples,
num_workers=args.num_sample_workers,
batch_size=args.batch_size,
node_index=data['train_index'],
node_label=data["train_label"])
return train_iter
if __name__ == '__main__':
for e in train_iter():
print(e)
#!/bin/bash
set -x
srcdir=./src
# Data preprocessing
python ./src/preprocess.py
# Download and compile redis
export PATH=$PWD/redis-5.0.5/src:$PATH
if [ ! -f ./redis.tar.gz ]; then
curl https://codeload.github.com/antirez/redis/tar.gz/5.0.5 -o ./redis.tar.gz
fi
tar -xzf ./redis.tar.gz
cd ./redis-5.0.5/
make
cd -
# Install python deps
python -m pip install -U pip
pip install -r ./src/requirements.txt -U
# Run redis server
sh ./src/run_server.sh
# Dumping data into redis
source ./redis_graph.cfg
sh ./src/dump_data.sh $edge_path $server_list $num_nodes $node_feat_path
exit 0
# dump config
edge_path=../data/edge.txt
node_feat_path=../data/feats.npz
num_nodes=232965
server_list=./server.list
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import json
import logging
from collections import defaultdict
import tqdm
import redis
from redis._compat import b, unicode, bytes, long, basestring
from rediscluster.nodemanager import NodeManager
from rediscluster.crc import crc16
import argparse
import time
import pickle
import numpy as np
import scipy.sparse as sp
log = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
def encode(value):
"""
Return a bytestring representation of the value.
This method is copied from Redis' connection.py:Connection.encode
"""
if isinstance(value, bytes):
return value
elif isinstance(value, (int, long)):
value = b(str(value))
elif isinstance(value, float):
value = b(repr(value))
elif not isinstance(value, basestring):
value = unicode(value)
if isinstance(value, unicode):
value = value.encode('utf-8')
return value
def crc16_hash(data):
return crc16(encode(data))
def get_redis(startup_host, startup_port):
startup_nodes = [{"host": startup_host, "port": startup_port}, ]
nodemanager = NodeManager(startup_nodes=startup_nodes)
nodemanager.initialize()
rs = {}
for node, config in nodemanager.nodes.items():
rs[node] = redis.Redis(
host=config["host"], port=config["port"], decode_responses=False)
return rs, nodemanager
def load_data(edge_path):
src, dst = [], []
with open(edge_path, "r") as f:
for i in tqdm.tqdm(f):
s, d, _ = i.split()
s = int(s)
d = int(d)
src.append(s)
dst.append(d)
dst.append(s)
src.append(d)
src = np.array(src, dtype="int64")
dst = np.array(dst, dtype="int64")
return src, dst
def build_edge_index(edge_path, num_nodes, startup_host, startup_port,
num_bucket):
#src, dst = load_data(edge_path)
rs, nodemanager = get_redis(startup_host, startup_port)
dst_mp, edge_mp = defaultdict(list), defaultdict(list)
with open(edge_path) as f:
for l in tqdm.tqdm(f):
a, b, idx = l.rstrip().split('\t')
a, b, idx = int(a), int(b), int(idx)
dst_mp[a].append(b)
edge_mp[a].append(idx)
part_dst_dicts = {}
for i in tqdm.tqdm(range(num_nodes)):
#if len(edge_index.v[i]) == 0:
# continue
#v = edge_index.v[i].astype("int64").reshape([-1, 1])
#e = edge_index.eid[i].astype("int64").reshape([-1, 1])
if i not in dst_mp:
continue
v = np.array(dst_mp[i]).astype('int64').reshape([-1, 1])
e = np.array(edge_mp[i]).astype('int64').reshape([-1, 1])
o = np.hstack([v, e])
key = "d:%s" % i
part = crc16_hash(key) % num_bucket
if part not in part_dst_dicts:
part_dst_dicts[part] = {}
dst_dicts = part_dst_dicts[part]
dst_dicts["d:%s" % i] = o.tobytes()
if len(dst_dicts) > 10000:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, dst_dicts)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_dst_dicts[part] = {}
for part, dst_dicts in part_dst_dicts.items():
if len(dst_dicts) > 0:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, dst_dicts)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_dst_dicts[part] = {}
log.info("dst_dict Done")
def build_edge_id(edge_path, num_nodes, startup_host, startup_port,
num_bucket):
src, dst = load_data(edge_path)
rs, nodemanager = get_redis(startup_host, startup_port)
part_edge_dict = {}
for i in tqdm.tqdm(range(len(src))):
key = "e:%s" % i
part = crc16_hash(key) % num_bucket
if part not in part_edge_dict:
part_edge_dict[part] = {}
edge_dict = part_edge_dict[part]
edge_dict["e:%s" % i] = int(src[i]) * num_nodes + int(dst[i])
if len(edge_dict) > 10000:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, edge_dict)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_edge_dict[part] = {}
for part, edge_dict in part_edge_dict.items():
if len(edge_dict) > 0:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, edge_dict)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_edge_dict[part] = {}
def build_infos(edge_path, num_nodes, startup_host, startup_port, num_bucket):
src, dst = load_data(edge_path)
rs, nodemanager = get_redis(startup_host, startup_port)
slot = nodemanager.keyslot("num_nodes")
node = nodemanager.slots[slot][0]['name']
res = rs[node].set("num_nodes", num_nodes)
slot = nodemanager.keyslot("num_edges")
node = nodemanager.slots[slot][0]['name']
rs[node].set("num_edges", len(src))
slot = nodemanager.keyslot("nf:infos")
node = nodemanager.slots[slot][0]['name']
rs[node].set("nf:infos", json.dumps([['feats', [-1, 602], 'float32'], ]))
slot = nodemanager.keyslot("ef:infos")
node = nodemanager.slots[slot][0]['name']
rs[node].set("ef:infos", json.dumps([]))
def build_node_feat(node_feat_path, num_nodes, startup_host, startup_port, num_bucket):
assert node_feat_path != "", "node_feat_path empty!"
feat_dict = np.load(node_feat_path)
for k in feat_dict.keys():
feat = feat_dict[k]
assert feat.shape[0] == num_nodes, "num_nodes invalid"
rs, nodemanager = get_redis(startup_host, startup_port)
part_feat_dict = {}
for k in feat_dict.keys():
feat = feat_dict[k]
for i in tqdm.tqdm(range(num_nodes)):
key = "nf:%s:%i" % (k, i)
value = feat[i].tobytes()
part = crc16_hash(key) % num_bucket
if part not in part_feat_dict:
part_feat_dict[part] = {}
part_feat = part_feat_dict[part]
part_feat[key] = value
if len(part_feat) > 100:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, part_feat)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_feat_dict[part] = {}
for part, part_feat in part_feat_dict.items():
if len(part_feat) > 0:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, part_feat)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_feat_dict[part] = {}
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='gen_redis_conf')
parser.add_argument('--startup_port', type=int, required=True)
parser.add_argument('--startup_host', type=str, required=True)
parser.add_argument('--edge_path', type=str, default="")
parser.add_argument('--node_feat_path', type=str, default="")
parser.add_argument('--num_nodes', type=int, default=0)
parser.add_argument('--num_bucket', type=int, default=64)
parser.add_argument(
'--mode',
type=str,
required=True,
help="choose one of the following modes (clear, edge_index, edge_id, graph_attr)"
)
args = parser.parse_args()
log.info("Mode: {}".format(args.mode))
if args.mode == 'edge_index':
build_edge_index(args.edge_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
elif args.mode == 'edge_id':
build_edge_id(args.edge_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
elif args.mode == 'graph_attr':
build_infos(args.edge_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
elif args.mode == 'node_feat':
build_node_feat(args.node_feat_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
else:
raise ValueError("%s mode not found" % args.mode)
filter(){
lines=`cat $1`
rm $1
for line in $lines; do
remote_host=`echo $line | cut -d":" -f1`
remote_port=`echo $line | cut -d":" -f2`
nc -z $remote_host $remote_port
if [[ $? == 0 ]]; then
echo $line >> $1
fi
done
}
dump_data(){
filter $server_list
python ./src/start_cluster.py --server_list $server_list --replicas 0
address=`head -n 1 $server_list`
ip=`echo $address | cut -d":" -f1`
port=`echo $address | cut -d":" -f2`
python ./src/build_graph.py --startup_host $ip \
--startup_port $port \
--mode node_feat \
--node_feat_path $feat_fn \
--num_nodes $num_nodes
# build edge index
python ./src/build_graph.py --startup_host $ip \
--startup_port $port \
--mode edge_index \
--edge_path $edge_path \
--num_nodes $num_nodes
# build edge id
#python ./src/build_graph.py --startup_host $ip \
# --startup_port $port \
# --mode edge_id \
# --edge_path $edge_path \
# --num_nodes $num_nodes
# build graph attr
python ./src/build_graph.py --startup_host $ip \
--startup_port $port \
--mode graph_attr \
--edge_path $edge_path \
--num_nodes $num_nodes
}
if [ $# -ne 4 ]; then
echo 'sh edge_path server_list num_nodes feat_fn'
exit
fi
num_nodes=$3
server_list=$2
edge_path=$1
feat_fn=$4
dump_data
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import socket
import argparse
import os
temp = """port %s
bind %s
daemonize yes
pidfile /var/run/redis_%s.pid
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 50000
logfile "redis.log"
appendonly yes"""
def gen_config(ports):
if len(ports) == 0:
raise ValueError("No ports")
ip = socket.gethostbyname(socket.gethostname())
print("Generate redis conf")
for port in ports:
try:
os.mkdir("%s" % port)
except:
print("port %s directory already exists" % port)
pass
with open("%s/redis.conf" % port, 'w') as f:
f.write(temp % (port, ip, port))
print("Generate Start Server Scripts")
with open("start_server.sh", "w") as f:
f.write("set -x\n")
for ind, port in enumerate(ports):
f.write("# %s %s start\n" % (ip, port))
if ind > 0:
f.write("cd ..\n")
f.write("cd %s\n" % port)
f.write("redis-server redis.conf\n")
f.write("\n")
print("Generate Stop Server Scripts")
with open("stop_server.sh", "w") as f:
f.write("set -x\n")
for ind, port in enumerate(ports):
f.write("# %s %s shutdown\n" % (ip, port))
f.write("redis-cli -h %s -p %s shutdown\n" % (ip, port))
f.write("\n")
with open("server.list", "w") as f:
for ind, port in enumerate(ports):
f.write("%s:%s\n" % (ip, port))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='gen_redis_conf')
parser.add_argument('--ports', nargs='+', type=int, default=[])
args = parser.parse_args()
gen_config(args.ports)
import os
import sys
import numpy as np
import scipy.sparse as sp
def _load_config(fn):
ret = {}
with open(fn) as f:
for l in f:
if l.strip() == '' or l.startswith('#'):
continue
k, v = l.strip().split('=')
ret[k] = v
return ret
def _prepro(config):
data = np.load("../data/reddit.npz")
adj = sp.load_npz("../data/reddit_adj.npz")
adj = adj.tocoo()
src = adj.row
dst = adj.col
with open(config['edge_path'], 'w') as f:
for idx, e in enumerate(zip(src, dst)):
s, d = e
l = "{}\t{}\t{}\n".format(s, d, idx)
f.write(l)
feats = data['feats'].astype(np.float32)
np.savez(config['node_feat_path'], feats=feats)
if __name__ == '__main__':
config = _load_config('./redis_graph.cfg')
_prepro(config)
numpy
scipy
tqdm
redis==2.10.6
redis-py-cluster==1.3.6
start_server(){
ports=""
for i in {7430..7439}; do
nc -z localhost $i
if [[ $? != 0 ]]; then
ports="$ports $i"
fi
done
python ./src/gen_redis_conf.py --ports $ports
bash ./start_server.sh #启动服务器
}
start_server
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
def build_clusters(server_list, replicas):
servers = []
with open(server_list) as f:
for line in f:
servers.append(line.strip())
cmd = "echo yes | redis-cli --cluster create"
for server in servers:
cmd += ' %s ' % server
cmd += '--cluster-replicas %s' % replicas
print(cmd)
os.system(cmd)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='start_cluster')
parser.add_argument('--server_list', type=str, required=True)
parser.add_argument('--replicas', type=int, default=0)
args = parser.parse_args()
build_clusters(args.server_list, args.replicas)
#!/bin/bash
source ./redis_graph.cfg
url=`head -n1 $server_list`
shuf $edge_path | head -n 1000 | python ./test/test_redis_graph.py $url
#!/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################
#
# Copyright (c) 2019 Baidu.com, Inc. All Rights Reserved
#
# File: test_redis_graph.py
# Author: suweiyue(suweiyue@baidu.com)
# Date: 2019/08/19 16:28:18
#
########################################################################
"""
Comment.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import sys
import numpy as np
import tqdm
from pgl.redis_graph import RedisGraph
if __name__ == '__main__':
host, port = sys.argv[1].split(':')
port = int(port)
redis_configs = [{"host": host, "port": port}, ]
graph = RedisGraph("reddit-graph", redis_configs, num_parts=64)
#nodes = np.arange(0, 100)
#for i in range(0, 100):
for l in tqdm.tqdm(sys.stdin):
l_sp = l.rstrip().split('\t')
if len(l_sp) != 2:
continue
i, j = int(l_sp[0]), int(l_sp[1])
nodes = graph.sample_predecessor(np.array([i]), 10000)
assert j in nodes
pgl==1.1.0
pyyaml
paddlepaddle==1.6.1
scipy
redis==2.10.6
redis-py-cluster==1.3.6
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
import time
import numpy as np
import scipy.sparse as sp
from sklearn.preprocessing import StandardScaler
import pgl
from pgl.utils.logger import log
from pgl.utils import paddle_helper
import paddle
import paddle.fluid as fluid
import reader
from model import graphsage_mean, graphsage_meanpool,\
graphsage_maxpool, graphsage_lstm
def load_data():
"""
data from https://github.com/matenure/FastGCN/issues/8
reddit.npz: https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J
reddit_index_label is preprocess from reddit.npz without feats key.
"""
data_dir = os.path.dirname(os.path.abspath(__file__))
data = np.load(os.path.join(data_dir, "data/reddit_index_label.npz"))
num_class = 41
train_label = data['y_train']
val_label = data['y_val']
test_label = data['y_test']
train_index = data['train_index']
val_index = data['val_index']
test_index = data['test_index']
return {
"train_index": train_index,
"train_label": train_label,
"val_label": val_label,
"val_index": val_index,
"test_index": test_index,
"test_label": test_label,
"num_class": 41
}
def build_graph_model(graph_wrapper, num_class, k_hop, graphsage_type,
hidden_size):
node_index = fluid.layers.data(
"node_index", shape=[None], dtype="int64", append_batch_size=False)
node_label = fluid.layers.data(
"node_label", shape=[None, 1], dtype="int64", append_batch_size=False)
#feature = fluid.layers.gather(feature, graph_wrapper.node_feat['feats'])
feature = graph_wrapper.node_feat['feats']
feature.stop_gradient = True
for i in range(k_hop):
if graphsage_type == 'graphsage_mean':
feature = graphsage_mean(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_mean_%s" % i)
elif graphsage_type == 'graphsage_meanpool':
feature = graphsage_meanpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_meanpool_%s" % i)
elif graphsage_type == 'graphsage_maxpool':
feature = graphsage_maxpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
elif graphsage_type == 'graphsage_lstm':
feature = graphsage_lstm(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
else:
raise ValueError("graphsage type %s is not"
" implemented" % graphsage_type)
feature = fluid.layers.gather(feature, node_index)
logits = fluid.layers.fc(feature,
num_class,
act=None,
name='classification_layer')
proba = fluid.layers.softmax(logits)
loss = fluid.layers.softmax_with_cross_entropy(
logits=logits, label=node_label)
loss = fluid.layers.mean(loss)
acc = fluid.layers.accuracy(input=proba, label=node_label, k=1)
return loss, acc
def run_epoch(batch_iter,
exe,
program,
prefix,
model_loss,
model_acc,
epoch,
log_per_step=100):
batch = 0
total_loss = 0.
total_acc = 0.
total_sample = 0
start = time.time()
for batch_feed_dict in batch_iter():
batch += 1
batch_loss, batch_acc = exe.run(program,
fetch_list=[model_loss, model_acc],
feed=batch_feed_dict)
if batch % log_per_step == 0:
log.info("Batch %s %s-Loss %s %s-Acc %s" %
(batch, prefix, batch_loss, prefix, batch_acc))
num_samples = len(batch_feed_dict["node_index"])
total_loss += batch_loss * num_samples
total_acc += batch_acc * num_samples
total_sample += num_samples
end = time.time()
log.info("%s Epoch %s Loss %.5lf Acc %.5lf Speed(per batch) %.5lf sec" %
(prefix, epoch, total_loss / total_sample,
total_acc / total_sample, (end - start) / batch))
def main(args):
data = load_data()
log.info("preprocess finish")
log.info("Train Examples: %s" % len(data["train_index"]))
log.info("Val Examples: %s" % len(data["val_index"]))
log.info("Test Examples: %s" % len(data["test_index"]))
place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace()
train_program = fluid.Program()
startup_program = fluid.Program()
samples = []
if args.samples_1 > 0:
samples.append(args.samples_1)
if args.samples_2 > 0:
samples.append(args.samples_2)
with fluid.program_guard(train_program, startup_program):
graph_wrapper = pgl.graph_wrapper.GraphWrapper(
"sub_graph", fluid.CPUPlace(), node_feat=[('feats', [None, 602], np.dtype('float32'))])
model_loss, model_acc = build_graph_model(
graph_wrapper,
num_class=data["num_class"],
hidden_size=args.hidden_size,
graphsage_type=args.graphsage_type,
k_hop=len(samples))
test_program = train_program.clone(for_test=True)
with fluid.program_guard(train_program, startup_program):
adam = fluid.optimizer.Adam(learning_rate=args.lr)
adam.minimize(model_loss)
exe = fluid.Executor(place)
exe.run(startup_program)
train_iter = reader.multiprocess_graph_reader(
graph_wrapper,
samples=samples,
num_workers=args.sample_workers,
batch_size=args.batch_size,
node_index=data['train_index'],
node_label=data["train_label"])
val_iter = reader.multiprocess_graph_reader(
graph_wrapper,
samples=samples,
num_workers=args.sample_workers,
batch_size=args.batch_size,
node_index=data['val_index'],
node_label=data["val_label"])
test_iter = reader.multiprocess_graph_reader(
graph_wrapper,
samples=samples,
num_workers=args.sample_workers,
batch_size=args.batch_size,
node_index=data['test_index'],
node_label=data["test_label"])
for epoch in range(args.epoch):
run_epoch(
train_iter,
program=train_program,
exe=exe,
prefix="train",
model_loss=model_loss,
model_acc=model_acc,
log_per_step=1,
epoch=epoch)
run_epoch(
val_iter,
program=test_program,
exe=exe,
prefix="val",
model_loss=model_loss,
model_acc=model_acc,
log_per_step=10000,
epoch=epoch)
run_epoch(
test_iter,
program=test_program,
prefix="test",
exe=exe,
model_loss=model_loss,
model_acc=model_acc,
log_per_step=10000,
epoch=epoch)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='graphsage')
parser.add_argument("--use_cuda", action='store_true', help="use_cuda")
parser.add_argument(
"--normalize", action='store_true', help="normalize features")
parser.add_argument(
"--symmetry", action='store_true', help="undirect graph")
parser.add_argument("--graphsage_type", type=str, default="graphsage_mean")
parser.add_argument("--sample_workers", type=int, default=10)
parser.add_argument("--epoch", type=int, default=10)
parser.add_argument("--hidden_size", type=int, default=128)
parser.add_argument("--batch_size", type=int, default=128)
parser.add_argument("--lr", type=float, default=0.01)
parser.add_argument("--samples_1", type=int, default=25)
parser.add_argument("--samples_2", type=int, default=10)
args = parser.parse_args()
log.info(args)
main(args)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of some helper functions"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import os
import time
import yaml
import numpy as np
from pgl.utils.logger import log
class AttrDict(dict):
"""Attr dict
"""
def __init__(self, d):
self.dict = d
def __getattr__(self, attr):
value = self.dict[attr]
if isinstance(value, dict):
return AttrDict(value)
else:
return value
def __str__(self):
return str(self.dict)
def load_config(config_file):
"""Load config file"""
with open(config_file) as f:
if hasattr(yaml, 'FullLoader'):
config = yaml.load(f, Loader=yaml.FullLoader)
else:
config = yaml.load(f)
return AttrDict(config)
# parse yaml file
function parse_yaml {
local prefix=$2
local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
sed -ne "s|^\($s\):|\1|" \
-e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \
-e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $1 |
awk -F$fs '{
indent = length($1)/2;
vname[indent] = $2;
for (i in vname) {if (i > indent) {delete vname[i]}}
if (length($3) > 0) {
vn=""; for (i=0; i<indent; i++) {vn=(vn)(vname[i])("_")}
printf("%s%s%s=\"%s\"\n", "'$prefix'",vn, $2, $3);
}
}'
}
eval $(parse_yaml "$(dirname "${BASH_SOURCE}")"/config.yaml)
# ERNIESage in PGL
[中文版 README](./README.md)
## Introduction
In many industrial applications, there is often a special graph shown below: Text Graph. As the name implies, the node attributes of such graph consist of text, and the edges provide structural information. Take the search scenario for example, nodes can be expressed by search query, web page titles, and web page content, while the edges are constructed by user feedback or hyperlink information.
<img src="./docs/source/_static/text_graph.png" alt="Text Graph" width="800">
**ERNIESage** (abbreviation of ERNIE SAmple aggreGatE), a model proposed by the PGL team, effectively improves the performance on text graph by simultaneously modeling text semantics and graph structure information. It's worth mentioning that [**ERNIE**](https://github.com/PaddlePaddle/ERNIE) in **ERNIESage** is a continual pre-training framework for language understanding launched by Baidu.
**ERNIESage** is an aggregation of ERNIE and GraphSAGE. Its structure is shown in the figure below. The main idea is to use ERNIE as an aggregation function (Aggregators) to model the semantic and structural relationship between its own nodes and neighbor nodes. In addition, for the position-independent characteristics of neighbor nodes, attention mask and independent position embedding mechanism for neighbor blindness are designed.
<img src="./docs/source/_static/ernie_aggregator.png" alt="ERNIESage" width="800">
GraphSAGE with ID feature can only model the graph structure information, while ERNIE can only deal with the text. With the help of PGL, the proposed **ERNIESage** model can combine the advantages of both models. Take the following recommendation example of text graph, we can see that **ERNIESage** achieves the best performance when compared to single ERNIE model or GraphSAGE model.
<img src="./docs/source/_static/ERNIESage_result.png" alt="ERNIESage_result" width="800">
Thanks to the flexibility and usability of PGL, **ERNIESage** can be quickly implemented under PGL's Message Passing paradigm. Acutally, there are four PGL version of ERNIESage:
- **ERNIESage v1**: ERNIE is applied to the NODE of the text graph;
- **ERNIESage v2**: ERNIE is applied to the EDGE of the text graph;
- **ERNIESage v3**: ERNIE is applied to the first order neighbors and center node;
- **ERNIESage v4**: ERNIE is applied to the N-order neighbors and center node.
<img src="./docs/source/_static/ERNIESage_v1_4.png" alt="ERNIESage_v1_4" width="800">
## Dependencies
- paddlepaddle>=1.7
- pgl>=1.1
## Dataformat
In the example data ```data.txt```, part of NLPCC2016-DBQA is used, and the format is "query \t answer" for each line.
```text
NLPCC2016-DBQA is a sub-task of NLPCC-ICCPOL 2016 Shared Task which is hosted by NLPCC(Natural Language Processing and Chinese Computing), this task targets on selecting documents from the candidates to answer the questions. [url: http://tcci.ccf.org.cn/conference/2016/dldoc/evagline2.pdf]
```
## How to run
We adopt [PaddlePaddle Fleet](https://github.com/PaddlePaddle/Fleet) as our distributed training frameworks ```config/*.yaml``` are some example config files for hyperparameters. Among them, the ERNIE model checkpoint ```ckpt_path``` and the vocabulary ```ernie_vocab_file``` can be downloaded on the [ERNIE](https://github.com/PaddlePaddle/ERNIE) page.
```sh
# train ERNIESage in distributed gpu mode.
sh local_run.sh config/enriesage_v1_gpu.yaml
# train ERNIESage in distributed cpu mode.
sh local_run.sh config/enriesage_v1_cpu.yaml
```
## Hyperparamters
- learner_type: `gpu` or `cpu`; gpu use fleet Collective mode, cpu use fleet Transpiler mode.
## Citation
```
@misc{ERNIESage,
author = {PGL Team},
title = {ERNIESage: ERNIE SAmple aggreGatE},
year = {2020},
publisher = {GitHub},
journal = {GitHub repository},
howpublished = {\url{https://github.com/PaddlePaddle/PGL/tree/master/examples/erniesage},
}
```
# 使用PGL实现ERNIESage
[ENG Readme](./README.en.md)
## 背景介绍
在很多工业应用中,往往出现如下图所示的一种特殊的图:Text Graph。顾名思义,图的节点属性由文本构成,而边的构建提供了结构信息。如搜索场景下的Text Graph,节点可由搜索词、网页标题、网页正文来表达,用户反馈和超链信息则可构成边关系。
<img src="./docs/source/_static/text_graph.png" alt="Text Graph" width="800">
**ERNIESage** 由PGL团队提出,是ERNIE SAmple aggreGatE的简称,该模型可以同时建模文本语义与图结构信息,有效提升 Text Graph 的应用效果。其中 [**ERNIE**](https://github.com/PaddlePaddle/ERNIE) 是百度推出的基于知识增强的持续学习语义理解框架。
**ERNIESage** 是 ERNIE 与 GraphSAGE 碰撞的结果,是 ERNIE SAmple aggreGatE 的简称,它的结构如下图所示,主要思想是通过 ERNIE 作为聚合函数(Aggregators),建模自身节点和邻居节点的语义与结构关系。ERNIESage 对于文本的建模是构建在邻居聚合的阶段,中心节点文本会与所有邻居节点文本进行拼接;然后通过预训练的 ERNIE 模型进行消息汇聚,捕捉中心节点以及邻居节点之间的相互关系;最后使用 ERNIESage 搭配独特的邻居互相看不见的 Attention Mask 和独立的 Position Embedding 体系,就可以轻松构建 TextGraph 中句子之间以及词之间的关系。
<img src="./docs/source/_static/ernie_aggregator.png" alt="ERNIESage" width="800">
使用ID特征的GraphSAGE只能够建模图的结构信息,而单独的ERNIE只能处理文本信息。通过PGL搭建的图与文本的桥梁,**ERNIESage**能够很简单的把GraphSAGE以及ERNIE的优点结合一起。以下面TextGraph的场景,**ERNIESage**的效果能够比单独的ERNIE以及GraphSAGE模型都要好。
<img src="./docs/source/_static/ERNIESage_result.png" alt="ERNIESage_result" width="800">
**ERNIESage**可以很轻松地在PGL中的消息传递范式中进行实现,目前PGL提供了4个版本的ERNIESage模型:
- **ERNIESage v1**: ERNIE 作用于text graph节点上;
- **ERNIESage v2**: ERNIE 作用在text graph的边上;
- **ERNIESage v3**: ERNIE 作用于一阶邻居及起边上;
- **ERNIESage v4**: ERNIE 作用于N阶邻居及边上;
<img src="./docs/source/_static/ERNIESage_v1_4.png" alt="ERNIESage_v1_4" width="800">
## 环境依赖
- paddlepaddle>=1.7
- pgl>=1.1
## Dataformat
示例数据```data.txt```中使用了NLPCC2016-DBQA的部分数据,格式为每行"query \t answer"。
```text
NLPCC2016-DBQA 是由国际自然语言处理和中文计算会议 NLPCC 于 2016 年举办的评测任务,其目标是从候选中找到合适的文档作为问题的答案。[链接: http://tcci.ccf.org.cn/conference/2016/dldoc/evagline2.pdf]
```
## How to run
我们采用了[PaddlePaddle Fleet](https://github.com/PaddlePaddle/Fleet)作为我们的分布式训练框架,在```config/*.yaml```中,有部分用于训练ERNIESage的配置, 其中ERNIE模型```ckpt_path```以及词表```ernie_vocab_file```[ERNIE](https://github.com/PaddlePaddle/ERNIE)下载。
```sh
# 分布式GPU模式或单机模式ERNIESage
sh local_run.sh config/erniesage_v2_gpu.yaml
# 分布式CPU模式训练ERNIESage
sh local_run.sh config/erniesage_v2_cpu.yaml
```
## Hyperparamters
- learner_type: `gpu` or `cpu`; gpu 使用fleet Collective 模式, cpu 使用fleet Transpiler 模式.
## Citation
```
@misc{ERNIESage,
author = {PGL Team},
title = {ERNIESage: ERNIE SAmple aggreGatE},
year = {2020},
publisher = {GitHub},
journal = {GitHub repository},
howpublished = {\url{https://github.com/PaddlePaddle/PGL/tree/master/examples/erniesage},
}
```
# Global Enviroment Settings
#
# trainer config ------
learner_type: "cpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 2
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV1"
layer_type: "graphsage_sum"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "gpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 32
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV1"
layer_type: "graphsage_sum"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "cpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 4
CPU_NUM: 16
epoch: 3
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV2"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
neg_type: "batch_neg"
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 2
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "gpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 32
CPU_NUM: 10
epoch: 3
log_per_step: 10
save_per_step: 1000
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV2"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
neg_type: "batch_neg"
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 2
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "cpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 2
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV3"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "gpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 32
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV3"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
"""Ernie
"""
from models.base import BaseNet, BaseGNNModel
class Ernie(BaseNet):
def build_inputs(self):
inputs = super(Ernie, self).build_inputs()
term_ids = L.data(
"term_ids", shape=[None, self.config.max_seqlen], dtype="int64", append_batch_size=False)
return inputs + [term_ids]
def build_embedding(self, graph_wrappers, term_ids):
term_ids = L.unsqueeze(term_ids, [-1])
ernie_config = self.config.ernie_config
ernie = ErnieModel(
src_ids=term_ids,
sentence_ids=L.zeros_like(term_ids),
task_ids=None,
config=ernie_config,
use_fp16=False,
name="student_")
feature = ernie.get_pooled_output()
return feature
def __call__(self, graph_wrappers):
inputs = self.build_inputs()
feature = self.build_embedding(graph_wrappers, inputs[-1])
features = [feature]
outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs[:-1]]
src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
outputs.append(src_real_index)
return inputs, outputs
class ErnieModel(BaseGNNModel):
def gen_net_fn(self, config):
return Ernie(config)
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -44,7 +44,6 @@ def main(args):
with fluid.program_guard(train_program, startup_program):
gw = pgl.graph_wrapper.GraphWrapper(
name="graph",
place=place,
node_feat=dataset.graph.node_feat_info())
output = pgl.layers.gat(gw,
......
此差异已折叠。
此差异已折叠。
......@@ -204,8 +204,8 @@ def main(args):
graph_wrapper = pgl.graph_wrapper.GraphWrapper(
"sub_graph",
fluid.CPUPlace(),
node_feat=data['graph'].node_feat_info())
model_loss, model_acc = build_graph_model(
graph_wrapper,
num_class=data["num_class"],
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册