Unverified · Commit ccf74fa5 · Authored by: Webbley · Committed by: GitHub

Merge pull request #2 from PaddlePaddle/master

update from pgl
......@@ -30,13 +30,13 @@ The newly released PGL supports heterogeneous graph learning on both walk based
## Highlight: Efficiency - Support Scatter-Gather and LodTensor Message Passing
One of the most important benefits of graph neural networks compared to other models is the ability to use node-to-node connectivity information, but coding the communication between nodes is very cumbersome. In PGL we adopt a **Message Passing Paradigm** similar to [DGL](https://github.com/dmlc/dgl) to help build customized graph neural networks easily. Users only need to write ```send``` and ```recv``` functions to implement a simple GCN. As shown in the following figure, in the first step the send function is defined on the edges of the graph, and the user can customize the send function ![](http://latex.codecogs.com/gif.latex?\\phi^e}) to send messages from the source to the target node. In the second step, the recv function ![](http://latex.codecogs.com/gif.latex?\\phi^v}) is responsible for aggregating ![](http://latex.codecogs.com/gif.latex?\\oplus}) the messages from different sources.
One of the most important benefits of graph neural networks compared to other models is the ability to use node-to-node connectivity information, but coding the communication between nodes is very cumbersome. In PGL we adopt a **Message Passing Paradigm** similar to [DGL](https://github.com/dmlc/dgl) to help build customized graph neural networks easily. Users only need to write ```send``` and ```recv``` functions to implement a simple GCN. As shown in the following figure, in the first step the send function is defined on the edges of the graph, and the user can customize the send function ![](http://latex.codecogs.com/gif.latex?\\phi^e) to send messages from the source to the target node. In the second step, the recv function ![](http://latex.codecogs.com/gif.latex?\\phi^v) is responsible for aggregating ![](http://latex.codecogs.com/gif.latex?\\oplus) the messages from different sources.
<img src="./docs/source/_static/message_passing_paradigm.png" alt="The basic idea of message passing paradigm" width="800">
As shown on the left of the following figure, to support general user-defined message aggregation functions, DGL uses a degree-bucketing method that groups nodes with the same degree into a batch and then applies the aggregation function ![](http://latex.codecogs.com/gif.latex?\\oplus}) to each batch serially. For our PGL UDF aggregation functions, we organize the messages as a [LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html) in [PaddlePaddle](https://github.com/PaddlePaddle/Paddle), treating the messages as variable-length sequences, and we **utilize the features of LodTensor in Paddle to obtain fast parallel aggregation**.
As shown on the left of the following figure, to support general user-defined message aggregation functions, DGL uses a degree-bucketing method that groups nodes with the same degree into a batch and then applies the aggregation function ![](http://latex.codecogs.com/gif.latex?\\oplus) to each batch serially. For our PGL UDF aggregation functions, we organize the messages as a [LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html) in [PaddlePaddle](https://github.com/PaddlePaddle/Paddle), treating the messages as variable-length sequences, and we **utilize the features of LodTensor in Paddle to obtain fast parallel aggregation**.
<img src="./docs/source/_static/parallel_degree_bucketing.png" alt="The parallel degree bucketing of PGL" width="800">
......
......@@ -29,11 +29,11 @@ Paddle Graph Learning (PGL)是一个基于[PaddlePaddle](https://github.com/Padd
# Highlight: Efficiency - Support for Scatter-Gather and LodTensor Message Passing
Compared with ordinary models, the biggest advantage of graph neural networks is that they exploit the information carried by node-to-node connections. However, writing code that models these connections is quite cumbersome. PGL adopts a **Message Passing Paradigm** similar to [DGL](https://github.com/dmlc/dgl) as its interface for building graph neural networks. Users only need to write simple ```send``` and ```recv``` functions to easily implement a simple GCN. As shown in the figure below, the send function is first defined on the edges between nodes, and the user-defined send function ![](http://latex.codecogs.com/gif.latex?\\phi^e}) sends a message from the source node to the target node. The recv function ![](http://latex.codecogs.com/gif.latex?\\phi^v}) then gathers these messages with an aggregation function ![](http://latex.codecogs.com/gif.latex?\\oplus}).
Compared with ordinary models, the biggest advantage of graph neural networks is that they exploit the information carried by node-to-node connections. However, writing code that models these connections is quite cumbersome. PGL adopts a **Message Passing Paradigm** similar to [DGL](https://github.com/dmlc/dgl) as its interface for building graph neural networks. Users only need to write simple ```send``` and ```recv``` functions to easily implement a simple GCN. As shown in the figure below, the send function is first defined on the edges between nodes, and the user-defined send function ![](http://latex.codecogs.com/gif.latex?\\phi^e) sends a message from the source node to the target node. The recv function ![](http://latex.codecogs.com/gif.latex?\\phi^v) then gathers these messages with an aggregation function ![](http://latex.codecogs.com/gif.latex?\\oplus).
<img src="./docs/source/_static/message_passing_paradigm.png" alt="The basic idea of message passing paradigm" width="800">
As shown in the left part of the figure below, to accommodate user-defined aggregation functions, DGL uses degree bucketing to group nodes of the same degree into blocks and then applies the aggregation function ![](http://latex.codecogs.com/gif.latex?\\oplus}) to each block. For PGL's user-defined aggregation functions, we instead process the messages as a PaddlePaddle [LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html), treating a group of messages as variable-length sequences, and **exploit the LodTensor support in PaddlePaddle for fast parallel message aggregation**.
As shown in the left part of the figure below, to accommodate user-defined aggregation functions, DGL uses degree bucketing to group nodes of the same degree into blocks and then applies the aggregation function ![](http://latex.codecogs.com/gif.latex?\\oplus) to each block. For PGL's user-defined aggregation functions, we instead process the messages as a PaddlePaddle [LodTensor](http://www.paddlepaddle.org/documentation/docs/en/1.4/user_guides/howto/basic_concept/lod_tensor_en.html), treating a group of messages as variable-length sequences, and **exploit the LodTensor support in PaddlePaddle for fast parallel message aggregation**.
<img src="./docs/source/_static/parallel_degree_bucketing.png" alt="The parallel degree bucketing of PGL" width="800">
......
......@@ -6,54 +6,32 @@ information (e.g., text attributes) to efficiently generate node embeddings for
For high scalability, we use Redis as the distributed graph storage solution and train GraphSAGE against the Redis server.
### Datasets (Quickstart)
The reddit dataset should be downloaded from [reddit_adj.npz](https://drive.google.com/open?id=174vb0Ws7Vxk_QTUtxqTgDHSQ4El4qDHt) and [reddit.npz](https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2Jthe). The details for Reddit Dataset can be found [here](https://cs.stanford.edu/people/jure/pubs/graphsage-nips17.pdf).
The reddit dataset should be downloaded from [reddit_adj.npz](https://drive.google.com/open?id=174vb0Ws7Vxk_QTUtxqTgDHSQ4El4qDHt) and [reddit.npz](https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J). The details for Reddit Dataset can be found [here](https://cs.stanford.edu/people/jure/pubs/graphsage-nips17.pdf).
Alternatively, the Reddit dataset has been preprocessed and packed into a Docker image, which can be pulled with the following command.
- reddit.npz: https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J
- reddit_adj.npz: https://drive.google.com/open?id=174vb0Ws7Vxk_QTUtxqTgDHSQ4El4qDHt
```sh
docker pull githubutilities/reddit_redis_demo:v0.1
```
Download `reddit.npz` and `reddit_adj.npz` into the `data` directory for further preprocessing.
### Dependencies
```txt
- paddlepaddle>=1.6
- pgl
- scipy
- redis==2.10.6
- redis-py-cluster==1.3.6
```
```sh
pip install -r requirements.txt
```
### How to run
#### 1. Start the Reddit data service
#### 1. Preprocess and start the Reddit data service
```sh
docker run \
--net=host \
-d --rm \
--name reddit_demo \
-it githubutilities/reddit_redis_demo:v0.1 \
/bin/bash -c "/bin/bash ./before_hook.sh && /bin/bash"
docker logs -f `docker ps -aqf "name=reddit_demo"`
pushd ./redis_setup
/bin/bash ./before_hook.sh
popd
```
#### 2. Train the GraphSAGE model
```sh
python train.py --use_cuda --epoch 10 --graphsage_type graphsage_mean --sample_workers 10
sh ./cloud_run.sh
```
#### Hyperparameters
- epoch: Number of training epochs. (default: 10)
- use_cuda: Train on GPU if set.
- graphsage_type: One of four aggregator types: "graphsage_mean", "graphsage_maxpool", "graphsage_meanpool" and "graphsage_lstm".
- sample_workers: The number of workers for multiprocess subgraph sampling.
- lr: Learning rate.
- batch_size: Batch size.
- samples_1: The max neighbors for the first hop neighbor sampling. (default: 25)
- samples_2: The max neighbors for the second hop neighbor sampling. (default: 10)
- hidden_size: The hidden size of the GraphSAGE models.
#!/bin/bash
set -x
mode=${1}
source ./utils.sh
unset http_proxy https_proxy
source ./local_config
if [ ! -d ${log_dir} ]; then
mkdir ${log_dir}
fi
for((i=0;i<${PADDLE_PSERVERS_NUM};i++))
do
echo "start ps server: ${i}"
echo $log_dir
TRAINING_ROLE="PSERVER" PADDLE_TRAINER_ID=${i} sh job.sh &> $log_dir/pserver.$i.log &
done
# give the parameter servers time to come up before starting the trainers
sleep 10s
for((j=0;j<${PADDLE_TRAINERS_NUM};j++))
do
echo "start ps work: ${j}"
TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} sh job.sh &> $log_dir/worker.$j.log &
done
tail -f $log_dir/worker.0.log
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import time
import os
import math
import numpy as np
import paddle.fluid as F
import paddle.fluid.layers as L
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from pgl.utils.logger import log
from model import GraphsageModel
from utils import load_config
import reader
def init_role():
# reset the place according to role of parameter server
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
paddle_role = role_maker.Role.WORKER
place = F.CPUPlace()
if training_role == "PSERVER":
paddle_role = role_maker.Role.SERVER
# set the fleet runtime environment according to configure
ports = os.getenv("PADDLE_PORT", "6174").split(",")
pserver_ips = os.getenv("PADDLE_PSERVERS").split(",") # ip,ip...
eplist = []
if len(ports) > 1:
# local debug mode, multi port
for port in ports:
eplist.append(':'.join([pserver_ips[0], port]))
else:
# distributed mode, multi ip
for ip in pserver_ips:
eplist.append(':'.join([ip, ports[0]]))
pserver_endpoints = eplist # ip:port,ip:port...
worker_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
role = role_maker.UserDefinedRoleMaker(
current_id=trainer_id,
role=paddle_role,
worker_num=worker_num,
server_endpoints=pserver_endpoints)
fleet.init(role)
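# Example (assumed) environment for a local two-pserver, two-trainer run,
# matching local_config in this example directory:
#   PADDLE_PSERVERS="127.0.0.1", PADDLE_PORT=6184,6185 (multiple ports => local debug mode),
#   PADDLE_TRAINERS_NUM=2, TRAINING_ROLE in {PSERVER, TRAINER}, PADDLE_TRAINER_ID=0..1.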
def optimization(base_lr, loss, optimizer='adam'):
if optimizer == 'sgd':
optimizer = F.optimizer.SGD(base_lr)
elif optimizer == 'adam':
optimizer = F.optimizer.Adam(base_lr, lazy_mode=True)
else:
raise ValueError("unknown optimizer type %s" % optimizer)
log.info('learning rate:%f' % (base_lr))
# create the DistributeTranspiler config
config = DistributeTranspilerConfig()
config.sync_mode = False
#config.runtime_split_send_recv = False
config.slice_var_up = False
#create the distributed optimizer
optimizer = fleet.distributed_optimizer(optimizer, config)
optimizer.minimize(loss)
def build_compiled_prog(train_program, model_loss):
num_threads = int(os.getenv("CPU_NUM", 10))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
exec_strategy = F.ExecutionStrategy()
exec_strategy.num_threads = num_threads
#exec_strategy.use_experimental_executor = True
build_strategy = F.BuildStrategy()
build_strategy.enable_inplace = True
#build_strategy.memory_optimize = True
build_strategy.memory_optimize = False
build_strategy.remove_unnecessary_lock = False
if num_threads > 1:
build_strategy.reduce_strategy = F.BuildStrategy.ReduceStrategy.Reduce
compiled_prog = F.compiler.CompiledProgram(
train_program).with_data_parallel(loss_name=model_loss.name)
return compiled_prog
def fake_py_reader(data_iter, num):
def fake_iter():
queue = []
for idx, data in enumerate(data_iter()):
queue.append(data)
if len(queue) == num:
yield queue
queue = []
if len(queue) > 0:
while len(queue) < num:
queue.append(queue[-1])
yield queue
return fake_iter
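# Illustration (assumed usage): fake_py_reader groups a sample stream into
# lists of `num` feed dicts so that a single fetch can feed every device.
# With num=2 the stream d1, d2, d3 yields [d1, d2] and then [d3, d3]; the
# last group is padded by repeating its final sample.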
def train_prog(exe, program, model, pyreader, args):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
start = time.time()
batch = 0
total_loss = 0.
total_acc = 0.
total_sample = 0
for epoch_idx in range(args.num_epoch):
for step, batch_feed_dict in enumerate(pyreader()):
try:
cpu_time = time.time()
batch += 1
batch_loss, batch_acc = exe.run(
program,
feed=batch_feed_dict,
fetch_list=[model.loss, model.acc])
end = time.time()
if batch % args.log_per_step == 0:
log.info(
"Batch %s Loss %s Acc %s \t Speed(per batch) %.5lf/%.5lf sec"
% (batch, np.mean(batch_loss), np.mean(batch_acc), (end - start) /batch, (end - cpu_time)))
if step % args.steps_per_save == 0:
save_path = args.save_path
if trainer_id == 0:
model_path = os.path.join(save_path, "%s" % step)
fleet.save_persistables(exe, model_path)
except Exception as e:
log.info("Pyreader train error")
log.exception(e)
def main(args):
log.info("start")
worker_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
num_devices = int(os.getenv("CPU_NUM", 10))
model = GraphsageModel(args)
loss = model.forward()
train_iter = reader.get_iter(args, model.graph_wrapper, 'train')
pyreader = fake_py_reader(train_iter, num_devices)
# init fleet
init_role()
optimization(args.lr, loss, args.optimizer)
# init and run server or worker
if fleet.is_server():
fleet.init_server(args.warm_start_from_dir)
fleet.run_server()
if fleet.is_worker():
log.info("start init worker done")
fleet.init_worker()
#just the worker, load the sample
log.info("init worker done")
exe = F.Executor(F.CPUPlace())
exe.run(fleet.startup_program)
log.info("Startup done")
compiled_prog = build_compiled_prog(fleet.main_program, loss)
train_prog(exe, compiled_prog, model, pyreader, args)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='graphsage')
parser.add_argument("-c", "--config", type=str, default="./config.yaml")
args = parser.parse_args()
config = load_config(args.config)
log.info(config)
main(config)
# model config
hidden_size: 128
num_class: 41
samples: [25, 10]
graphsage_type: "graphsage_mean"
# training config
num_epoch: 10
batch_size: 128
num_sample_workers: 10
optimizer: "adam"
lr: 0.01
warm_start_from_dir: null
steps_per_save: 1000
log_per_step: 1
save_path: "./checkpoints"
log_dir: "./logs"
CPU_NUM: 1
#!/bin/bash
set -x
source ./utils.sh
export CPU_NUM=$CPU_NUM
export FLAGS_rpc_deadline=3000000
export FLAGS_communicator_send_queue_size=1
export FLAGS_communicator_min_send_grad_num_before_recv=0
export FLAGS_communicator_max_merge_var_num=1
export FLAGS_communicator_merge_sparse_grad=0
python -u cluster_train.py -c config.yaml
#!/bin/bash
export PADDLE_TRAINERS_NUM=2
export PADDLE_PSERVERS_NUM=2
export PADDLE_PORT=6184,6185
export PADDLE_PSERVERS="127.0.0.1"
......@@ -11,10 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
graphsage model.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import math
import pgl
import numpy as np
import paddle
import paddle.fluid.layers as L
import paddle.fluid as F
import paddle.fluid as fluid
def copy_send(src_feat, dst_feat, edge_feat):
return src_feat["h"]
......@@ -128,3 +140,87 @@ def graphsage_lstm(gw, feature, hidden_size, act, name):
output = fluid.layers.concat([self_feature, neigh_feature], axis=1)
output = fluid.layers.l2_normalize(output, axis=1)
return output
def build_graph_model(graph_wrapper, num_class, k_hop, graphsage_type,
hidden_size):
node_index = fluid.layers.data(
"node_index", shape=[None], dtype="int64", append_batch_size=False)
node_label = fluid.layers.data(
"node_label", shape=[None, 1], dtype="int64", append_batch_size=False)
#feature = fluid.layers.gather(feature, graph_wrapper.node_feat['feats'])
feature = graph_wrapper.node_feat['feats']
feature.stop_gradient = True
for i in range(k_hop):
if graphsage_type == 'graphsage_mean':
feature = graphsage_mean(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_mean_%s" % i)
elif graphsage_type == 'graphsage_meanpool':
feature = graphsage_meanpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_meanpool_%s" % i)
elif graphsage_type == 'graphsage_maxpool':
feature = graphsage_maxpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
elif graphsage_type == 'graphsage_lstm':
feature = graphsage_lstm(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
else:
raise ValueError("graphsage type %s is not"
" implemented" % graphsage_type)
feature = fluid.layers.gather(feature, node_index)
logits = fluid.layers.fc(feature,
num_class,
act=None,
name='classification_layer')
proba = fluid.layers.softmax(logits)
loss = fluid.layers.softmax_with_cross_entropy(
logits=logits, label=node_label)
loss = fluid.layers.mean(loss)
acc = fluid.layers.accuracy(input=proba, label=node_label, k=1)
return loss, acc
class GraphsageModel(object):
def __init__(self, args):
self.args = args
def forward(self):
args = self.args
graph_wrapper = pgl.graph_wrapper.GraphWrapper(
"sub_graph", node_feat=[('feats', [None, 602], np.dtype('float32'))])
loss, acc = build_graph_model(
graph_wrapper,
num_class=args.num_class,
hidden_size=args.hidden_size,
graphsage_type=args.graphsage_type,
k_hop=len(args.samples))
loss.persistable = True
self.graph_wrapper = graph_wrapper
self.loss = loss
self.acc = acc
return loss
......@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import numpy as np
import pickle as pkl
import paddle
......@@ -147,3 +149,48 @@ def multiprocess_graph_reader(
return reader()
def load_data():
"""
data from https://github.com/matenure/FastGCN/issues/8
reddit.npz: https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J
reddit_index_label.npz is preprocessed from reddit.npz, with the feats key removed.
"""
data_dir = os.path.dirname(os.path.abspath(__file__))
data = np.load(os.path.join(data_dir, "data/reddit_index_label.npz"))
num_class = 41
train_label = data['y_train']
val_label = data['y_val']
test_label = data['y_test']
train_index = data['train_index']
val_index = data['val_index']
test_index = data['test_index']
return {
"train_index": train_index,
"train_label": train_label,
"val_label": val_label,
"val_index": val_index,
"test_index": test_index,
"test_label": test_label,
"num_class": 41
}
def get_iter(args, graph_wrapper, mode):
data = load_data()
train_iter = multiprocess_graph_reader(
graph_wrapper,
samples=args.samples,
num_workers=args.num_sample_workers,
batch_size=args.batch_size,
node_index=data['train_index'],
node_label=data["train_label"])
return train_iter
if __name__ == '__main__':
# smoke test: train_iter must first be built, e.g. train_iter = get_iter(config, graph_wrapper, 'train')
for e in train_iter():
print(e)
#!/bin/bash
set -x
srcdir=./src
# Data preprocessing
python ./src/preprocess.py
# Download and compile redis
export PATH=$PWD/redis-5.0.5/src:$PATH
if [ ! -f ./redis.tar.gz ]; then
curl https://codeload.github.com/antirez/redis/tar.gz/5.0.5 -o ./redis.tar.gz
fi
tar -xzf ./redis.tar.gz
cd ./redis-5.0.5/
make
cd -
# Install python deps
python -m pip install -U pip
pip install -r ./src/requirements.txt -U
# Run redis server
sh ./src/run_server.sh
# Dumping data into redis
source ./redis_graph.cfg
sh ./src/dump_data.sh $edge_path $server_list $num_nodes $node_feat_path
exit 0
# dump config
edge_path=../data/edge.txt
node_feat_path=../data/feats.npz
num_nodes=232965
server_list=./server.list
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import json
import logging
from collections import defaultdict
import tqdm
import redis
from redis._compat import b, unicode, bytes, long, basestring
from rediscluster.nodemanager import NodeManager
from rediscluster.crc import crc16
import argparse
import time
import pickle
import numpy as np
import scipy.sparse as sp
log = logging.getLogger(__name__)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
root.addHandler(handler)
def encode(value):
"""
Return a bytestring representation of the value.
This method is copied from Redis' connection.py:Connection.encode
"""
if isinstance(value, bytes):
return value
elif isinstance(value, (int, long)):
value = b(str(value))
elif isinstance(value, float):
value = b(repr(value))
elif not isinstance(value, basestring):
value = unicode(value)
if isinstance(value, unicode):
value = value.encode('utf-8')
return value
def crc16_hash(data):
return crc16(encode(data))
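# How keys are sharded (used by every build_* function below): a key such as
# "d:42" goes to bucket crc16_hash("d:42") % num_bucket, all keys of a bucket
# are stored in a single redis hash named "part-<bucket>", and the cluster
# node owning that hash is looked up via nodemanager.keyslot("part-<bucket>").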
def get_redis(startup_host, startup_port):
startup_nodes = [{"host": startup_host, "port": startup_port}, ]
nodemanager = NodeManager(startup_nodes=startup_nodes)
nodemanager.initialize()
rs = {}
for node, config in nodemanager.nodes.items():
rs[node] = redis.Redis(
host=config["host"], port=config["port"], decode_responses=False)
return rs, nodemanager
def load_data(edge_path):
src, dst = [], []
with open(edge_path, "r") as f:
for i in tqdm.tqdm(f):
s, d, _ = i.split()
s = int(s)
d = int(d)
src.append(s)
dst.append(d)
dst.append(s)
src.append(d)
src = np.array(src, dtype="int64")
dst = np.array(dst, dtype="int64")
return src, dst
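# Note: load_data inserts every edge in both directions, so the returned
# src/dst arrays describe an undirected graph.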
def build_edge_index(edge_path, num_nodes, startup_host, startup_port,
num_bucket):
#src, dst = load_data(edge_path)
rs, nodemanager = get_redis(startup_host, startup_port)
dst_mp, edge_mp = defaultdict(list), defaultdict(list)
with open(edge_path) as f:
for l in tqdm.tqdm(f):
a, b, idx = l.rstrip().split('\t')
a, b, idx = int(a), int(b), int(idx)
dst_mp[a].append(b)
edge_mp[a].append(idx)
part_dst_dicts = {}
for i in tqdm.tqdm(range(num_nodes)):
#if len(edge_index.v[i]) == 0:
# continue
#v = edge_index.v[i].astype("int64").reshape([-1, 1])
#e = edge_index.eid[i].astype("int64").reshape([-1, 1])
if i not in dst_mp:
continue
v = np.array(dst_mp[i]).astype('int64').reshape([-1, 1])
e = np.array(edge_mp[i]).astype('int64').reshape([-1, 1])
o = np.hstack([v, e])
key = "d:%s" % i
part = crc16_hash(key) % num_bucket
if part not in part_dst_dicts:
part_dst_dicts[part] = {}
dst_dicts = part_dst_dicts[part]
dst_dicts["d:%s" % i] = o.tobytes()
if len(dst_dicts) > 10000:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, dst_dicts)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_dst_dicts[part] = {}
for part, dst_dicts in part_dst_dicts.items():
if len(dst_dicts) > 0:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, dst_dicts)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_dst_dicts[part] = {}
log.info("dst_dict Done")
def build_edge_id(edge_path, num_nodes, startup_host, startup_port,
num_bucket):
src, dst = load_data(edge_path)
rs, nodemanager = get_redis(startup_host, startup_port)
part_edge_dict = {}
for i in tqdm.tqdm(range(len(src))):
key = "e:%s" % i
part = crc16_hash(key) % num_bucket
if part not in part_edge_dict:
part_edge_dict[part] = {}
edge_dict = part_edge_dict[part]
edge_dict["e:%s" % i] = int(src[i]) * num_nodes + int(dst[i])
if len(edge_dict) > 10000:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, edge_dict)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_edge_dict[part] = {}
for part, edge_dict in part_edge_dict.items():
if len(edge_dict) > 0:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, edge_dict)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_edge_dict[part] = {}
def build_infos(edge_path, num_nodes, startup_host, startup_port, num_bucket):
src, dst = load_data(edge_path)
rs, nodemanager = get_redis(startup_host, startup_port)
slot = nodemanager.keyslot("num_nodes")
node = nodemanager.slots[slot][0]['name']
res = rs[node].set("num_nodes", num_nodes)
slot = nodemanager.keyslot("num_edges")
node = nodemanager.slots[slot][0]['name']
rs[node].set("num_edges", len(src))
slot = nodemanager.keyslot("nf:infos")
node = nodemanager.slots[slot][0]['name']
rs[node].set("nf:infos", json.dumps([['feats', [-1, 602], 'float32'], ]))
slot = nodemanager.keyslot("ef:infos")
node = nodemanager.slots[slot][0]['name']
rs[node].set("ef:infos", json.dumps([]))
def build_node_feat(node_feat_path, num_nodes, startup_host, startup_port, num_bucket):
assert node_feat_path != "", "node_feat_path empty!"
feat_dict = np.load(node_feat_path)
for k in feat_dict.keys():
feat = feat_dict[k]
assert feat.shape[0] == num_nodes, "num_nodes invalid"
rs, nodemanager = get_redis(startup_host, startup_port)
part_feat_dict = {}
for k in feat_dict.keys():
feat = feat_dict[k]
for i in tqdm.tqdm(range(num_nodes)):
key = "nf:%s:%i" % (k, i)
value = feat[i].tobytes()
part = crc16_hash(key) % num_bucket
if part not in part_feat_dict:
part_feat_dict[part] = {}
part_feat = part_feat_dict[part]
part_feat[key] = value
if len(part_feat) > 100:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, part_feat)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_feat_dict[part] = {}
for part, part_feat in part_feat_dict.items():
if len(part_feat) > 0:
slot = nodemanager.keyslot("part-%s" % part)
node = nodemanager.slots[slot][0]['name']
while True:
res = rs[node].hmset("part-%s" % part, part_feat)
if res:
break
log.info("HMSET FAILED RETRY connected %s" % node)
time.sleep(1)
part_feat_dict[part] = {}
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='gen_redis_conf')
parser.add_argument('--startup_port', type=int, required=True)
parser.add_argument('--startup_host', type=str, required=True)
parser.add_argument('--edge_path', type=str, default="")
parser.add_argument('--node_feat_path', type=str, default="")
parser.add_argument('--num_nodes', type=int, default=0)
parser.add_argument('--num_bucket', type=int, default=64)
parser.add_argument(
'--mode',
type=str,
required=True,
help="choose one of the following modes (clear, edge_index, edge_id, graph_attr)"
)
args = parser.parse_args()
log.info("Mode: {}".format(args.mode))
if args.mode == 'edge_index':
build_edge_index(args.edge_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
elif args.mode == 'edge_id':
build_edge_id(args.edge_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
elif args.mode == 'graph_attr':
build_infos(args.edge_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
elif args.mode == 'node_feat':
build_node_feat(args.node_feat_path, args.num_nodes, args.startup_host,
args.startup_port, args.num_bucket)
else:
raise ValueError("%s mode not found" % args.mode)
filter(){
# keep only the servers in the list that actually answer on their port
lines=`cat $1`
rm $1
for line in $lines; do
remote_host=`echo $line | cut -d":" -f1`
remote_port=`echo $line | cut -d":" -f2`
nc -z $remote_host $remote_port
if [[ $? == 0 ]]; then
echo $line >> $1
fi
done
}
dump_data(){
filter $server_list
python ./src/start_cluster.py --server_list $server_list --replicas 0
address=`head -n 1 $server_list`
ip=`echo $address | cut -d":" -f1`
port=`echo $address | cut -d":" -f2`
python ./src/build_graph.py --startup_host $ip \
--startup_port $port \
--mode node_feat \
--node_feat_path $feat_fn \
--num_nodes $num_nodes
# build edge index
python ./src/build_graph.py --startup_host $ip \
--startup_port $port \
--mode edge_index \
--edge_path $edge_path \
--num_nodes $num_nodes
# build edge id
#python ./src/build_graph.py --startup_host $ip \
# --startup_port $port \
# --mode edge_id \
# --edge_path $edge_path \
# --num_nodes $num_nodes
# build graph attr
python ./src/build_graph.py --startup_host $ip \
--startup_port $port \
--mode graph_attr \
--edge_path $edge_path \
--num_nodes $num_nodes
}
if [ $# -ne 4 ]; then
echo 'usage: sh dump_data.sh edge_path server_list num_nodes feat_fn'
exit
fi
num_nodes=$3
server_list=$2
edge_path=$1
feat_fn=$4
dump_data
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import socket
import argparse
import os
temp = """port %s
bind %s
daemonize yes
pidfile /var/run/redis_%s.pid
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 50000
logfile "redis.log"
appendonly yes"""
def gen_config(ports):
if len(ports) == 0:
raise ValueError("No ports")
ip = socket.gethostbyname(socket.gethostname())
print("Generate redis conf")
for port in ports:
try:
os.mkdir("%s" % port)
except OSError:
print("port %s directory already exists" % port)
with open("%s/redis.conf" % port, 'w') as f:
f.write(temp % (port, ip, port))
print("Generate Start Server Scripts")
with open("start_server.sh", "w") as f:
f.write("set -x\n")
for ind, port in enumerate(ports):
f.write("# %s %s start\n" % (ip, port))
if ind > 0:
f.write("cd ..\n")
f.write("cd %s\n" % port)
f.write("redis-server redis.conf\n")
f.write("\n")
print("Generate Stop Server Scripts")
with open("stop_server.sh", "w") as f:
f.write("set -x\n")
for ind, port in enumerate(ports):
f.write("# %s %s shutdown\n" % (ip, port))
f.write("redis-cli -h %s -p %s shutdown\n" % (ip, port))
f.write("\n")
with open("server.list", "w") as f:
for ind, port in enumerate(ports):
f.write("%s:%s\n" % (ip, port))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='gen_redis_conf')
parser.add_argument('--ports', nargs='+', type=int, default=[])
args = parser.parse_args()
gen_config(args.ports)
import os
import sys
import numpy as np
import scipy.sparse as sp
def _load_config(fn):
ret = {}
with open(fn) as f:
for l in f:
if l.strip() == '' or l.startswith('#'):
continue
k, v = l.strip().split('=')
ret[k] = v
return ret
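# Expected file format: one key=value pair per line; lines starting with '#'
# and blank lines are ignored. For example (from redis_graph.cfg):
#   edge_path=../data/edge.txt
#   num_nodes=232965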
def _prepro(config):
data = np.load("../data/reddit.npz")
adj = sp.load_npz("../data/reddit_adj.npz")
adj = adj.tocoo()
src = adj.row
dst = adj.col
with open(config['edge_path'], 'w') as f:
for idx, e in enumerate(zip(src, dst)):
s, d = e
l = "{}\t{}\t{}\n".format(s, d, idx)
f.write(l)
feats = data['feats'].astype(np.float32)
np.savez(config['node_feat_path'], feats=feats)
if __name__ == '__main__':
config = _load_config('./redis_graph.cfg')
_prepro(config)
numpy
scipy
tqdm
redis==2.10.6
redis-py-cluster==1.3.6
start_server(){
ports=""
for i in {7430..7439}; do
nc -z localhost $i
if [[ $? != 0 ]]; then
ports="$ports $i"
fi
done
python ./src/gen_redis_conf.py --ports $ports
bash ./start_server.sh  # start the redis servers
}
start_server
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
def build_clusters(server_list, replicas):
servers = []
with open(server_list) as f:
for line in f:
servers.append(line.strip())
cmd = "echo yes | redis-cli --cluster create"
for server in servers:
cmd += ' %s ' % server
cmd += '--cluster-replicas %s' % replicas
print(cmd)
os.system(cmd)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='start_cluster')
parser.add_argument('--server_list', type=str, required=True)
parser.add_argument('--replicas', type=int, default=0)
args = parser.parse_args()
build_clusters(args.server_list, args.replicas)
#!/bin/bash
source ./redis_graph.cfg
url=`head -n1 $server_list`
shuf $edge_path | head -n 1000 | python ./test/test_redis_graph.py $url
#!/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################
#
# Copyright (c) 2019 Baidu.com, Inc. All Rights Reserved
#
# File: test_redis_graph.py
# Author: suweiyue(suweiyue@baidu.com)
# Date: 2019/08/19 16:28:18
#
########################################################################
"""
Comment.
"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import sys
import numpy as np
import tqdm
from pgl.redis_graph import RedisGraph
if __name__ == '__main__':
host, port = sys.argv[1].split(':')
port = int(port)
redis_configs = [{"host": host, "port": port}, ]
graph = RedisGraph("reddit-graph", redis_configs, num_parts=64)
#nodes = np.arange(0, 100)
#for i in range(0, 100):
for l in tqdm.tqdm(sys.stdin):
l_sp = l.rstrip().split('\t')
if len(l_sp) != 2:
continue
i, j = int(l_sp[0]), int(l_sp[1])
nodes = graph.sample_predecessor(np.array([i]), 10000)
assert j in nodes
pgl==1.1.0
pyyaml
paddlepaddle==1.6.1
scipy
redis==2.10.6
redis-py-cluster==1.3.6
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
import time
import numpy as np
import scipy.sparse as sp
from sklearn.preprocessing import StandardScaler
import pgl
from pgl.utils.logger import log
from pgl.utils import paddle_helper
import paddle
import paddle.fluid as fluid
import reader
from model import graphsage_mean, graphsage_meanpool,\
graphsage_maxpool, graphsage_lstm
def load_data():
"""
data from https://github.com/matenure/FastGCN/issues/8
reddit.npz: https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J
reddit_index_label.npz is preprocessed from reddit.npz, with the feats key removed.
"""
data_dir = os.path.dirname(os.path.abspath(__file__))
data = np.load(os.path.join(data_dir, "data/reddit_index_label.npz"))
num_class = 41
train_label = data['y_train']
val_label = data['y_val']
test_label = data['y_test']
train_index = data['train_index']
val_index = data['val_index']
test_index = data['test_index']
return {
"train_index": train_index,
"train_label": train_label,
"val_label": val_label,
"val_index": val_index,
"test_index": test_index,
"test_label": test_label,
"num_class": 41
}
def build_graph_model(graph_wrapper, num_class, k_hop, graphsage_type,
hidden_size):
node_index = fluid.layers.data(
"node_index", shape=[None], dtype="int64", append_batch_size=False)
node_label = fluid.layers.data(
"node_label", shape=[None, 1], dtype="int64", append_batch_size=False)
#feature = fluid.layers.gather(feature, graph_wrapper.node_feat['feats'])
feature = graph_wrapper.node_feat['feats']
feature.stop_gradient = True
for i in range(k_hop):
if graphsage_type == 'graphsage_mean':
feature = graphsage_mean(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_mean_%s" % i)
elif graphsage_type == 'graphsage_meanpool':
feature = graphsage_meanpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_meanpool_%s" % i)
elif graphsage_type == 'graphsage_maxpool':
feature = graphsage_maxpool(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
elif graphsage_type == 'graphsage_lstm':
feature = graphsage_lstm(
graph_wrapper,
feature,
hidden_size,
act="relu",
name="graphsage_maxpool_%s" % i)
else:
raise ValueError("graphsage type %s is not"
" implemented" % graphsage_type)
feature = fluid.layers.gather(feature, node_index)
logits = fluid.layers.fc(feature,
num_class,
act=None,
name='classification_layer')
proba = fluid.layers.softmax(logits)
loss = fluid.layers.softmax_with_cross_entropy(
logits=logits, label=node_label)
loss = fluid.layers.mean(loss)
acc = fluid.layers.accuracy(input=proba, label=node_label, k=1)
return loss, acc
def run_epoch(batch_iter,
exe,
program,
prefix,
model_loss,
model_acc,
epoch,
log_per_step=100):
batch = 0
total_loss = 0.
total_acc = 0.
total_sample = 0
start = time.time()
for batch_feed_dict in batch_iter():
batch += 1
batch_loss, batch_acc = exe.run(program,
fetch_list=[model_loss, model_acc],
feed=batch_feed_dict)
if batch % log_per_step == 0:
log.info("Batch %s %s-Loss %s %s-Acc %s" %
(batch, prefix, batch_loss, prefix, batch_acc))
num_samples = len(batch_feed_dict["node_index"])
total_loss += batch_loss * num_samples
total_acc += batch_acc * num_samples
total_sample += num_samples
end = time.time()
log.info("%s Epoch %s Loss %.5lf Acc %.5lf Speed(per batch) %.5lf sec" %
(prefix, epoch, total_loss / total_sample,
total_acc / total_sample, (end - start) / batch))
def main(args):
data = load_data()
log.info("preprocess finish")
log.info("Train Examples: %s" % len(data["train_index"]))
log.info("Val Examples: %s" % len(data["val_index"]))
log.info("Test Examples: %s" % len(data["test_index"]))
place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace()
train_program = fluid.Program()
startup_program = fluid.Program()
samples = []
if args.samples_1 > 0:
samples.append(args.samples_1)
if args.samples_2 > 0:
samples.append(args.samples_2)
with fluid.program_guard(train_program, startup_program):
graph_wrapper = pgl.graph_wrapper.GraphWrapper(
"sub_graph", node_feat=[('feats', [None, 602], np.dtype('float32'))])
model_loss, model_acc = build_graph_model(
graph_wrapper,
num_class=data["num_class"],
hidden_size=args.hidden_size,
graphsage_type=args.graphsage_type,
k_hop=len(samples))
test_program = train_program.clone(for_test=True)
with fluid.program_guard(train_program, startup_program):
adam = fluid.optimizer.Adam(learning_rate=args.lr)
adam.minimize(model_loss)
exe = fluid.Executor(place)
exe.run(startup_program)
train_iter = reader.multiprocess_graph_reader(
graph_wrapper,
samples=samples,
num_workers=args.sample_workers,
batch_size=args.batch_size,
node_index=data['train_index'],
node_label=data["train_label"])
val_iter = reader.multiprocess_graph_reader(
graph_wrapper,
samples=samples,
num_workers=args.sample_workers,
batch_size=args.batch_size,
node_index=data['val_index'],
node_label=data["val_label"])
test_iter = reader.multiprocess_graph_reader(
graph_wrapper,
samples=samples,
num_workers=args.sample_workers,
batch_size=args.batch_size,
node_index=data['test_index'],
node_label=data["test_label"])
for epoch in range(args.epoch):
run_epoch(
train_iter,
program=train_program,
exe=exe,
prefix="train",
model_loss=model_loss,
model_acc=model_acc,
log_per_step=1,
epoch=epoch)
run_epoch(
val_iter,
program=test_program,
exe=exe,
prefix="val",
model_loss=model_loss,
model_acc=model_acc,
log_per_step=10000,
epoch=epoch)
run_epoch(
test_iter,
program=test_program,
prefix="test",
exe=exe,
model_loss=model_loss,
model_acc=model_acc,
log_per_step=10000,
epoch=epoch)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='graphsage')
parser.add_argument("--use_cuda", action='store_true', help="use_cuda")
parser.add_argument(
"--normalize", action='store_true', help="normalize features")
parser.add_argument(
"--symmetry", action='store_true', help="undirect graph")
parser.add_argument("--graphsage_type", type=str, default="graphsage_mean")
parser.add_argument("--sample_workers", type=int, default=10)
parser.add_argument("--epoch", type=int, default=10)
parser.add_argument("--hidden_size", type=int, default=128)
parser.add_argument("--batch_size", type=int, default=128)
parser.add_argument("--lr", type=float, default=0.01)
parser.add_argument("--samples_1", type=int, default=25)
parser.add_argument("--samples_2", type=int, default=10)
args = parser.parse_args()
log.info(args)
main(args)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of some helper functions"""
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import os
import time
import yaml
import numpy as np
from pgl.utils.logger import log
class AttrDict(dict):
"""Attr dict
"""
def __init__(self, d):
self.dict = d
def __getattr__(self, attr):
value = self.dict[attr]
if isinstance(value, dict):
return AttrDict(value)
else:
return value
def __str__(self):
return str(self.dict)
def load_config(config_file):
"""Load config file"""
with open(config_file) as f:
if hasattr(yaml, 'FullLoader'):
config = yaml.load(f, Loader=yaml.FullLoader)
else:
config = yaml.load(f)
return AttrDict(config)
unset http_proxy https_proxy
set -x
mode=${1:-local}
config=${2:-"./config.yaml"}
# parse yaml file
function parse_yaml {
local prefix=$2
local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
......@@ -20,26 +16,5 @@ function parse_yaml {
}
}'
}
eval $(parse_yaml $config)
export CPU_NUM=$CPU_NUM
export FLAGS_rpc_deadline=3000000
export FLAGS_rpc_retry_times=1000
if [[ $async_mode == "True" ]];then
echo "async_mode is True"
else
export FLAGS_communicator_send_queue_size=1
export FLAGS_communicator_min_send_grad_num_before_recv=0
export FLAGS_communicator_max_merge_var_num=1 # important!
export FLAGS_communicator_merge_sparse_grad=0
fi
export FLAGS_communicator_recv_wait_times=5000000
mkdir -p output
python ./train.py --conf $config
if [[ $TRAINING_ROLE == "TRAINER" ]];then
python ./infer.py --conf $config
fi
eval $(parse_yaml "$(dirname "${BASH_SOURCE}")"/config.yaml)
......@@ -32,10 +32,14 @@ Thanks to the flexibility and usability of PGL, **ERNIESage** can be quickly imp
- pgl>=1.1
## Dataformat
In the example data ```data.txt```, part of NLPCC2016-DBQA is used, and the format is "query \t answer" for each line.
```text
NLPCC2016-DBQA is a sub-task of NLPCC-ICCPOL 2016 Shared Task which is hosted by NLPCC(Natural Language Processing and Chinese Computing), this task targets on selecting documents from the candidates to answer the questions. [url: http://tcci.ccf.org.cn/conference/2016/dldoc/evagline2.pdf]
```
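For illustration only, a (hypothetical) tab-separated line of ```data.txt``` would look like:
```text
what causes ocean tides	Ocean tides are mainly caused by the gravitational pull of the moon.
```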
## How to run
We adopt [PaddlePaddle Fleet](https://github.com/PaddlePaddle/Fleet) as our distributed training framework. The files ```config/*.yaml``` are example config files for hyperparameters.
We adopt [PaddlePaddle Fleet](https://github.com/PaddlePaddle/Fleet) as our distributed training framework. The files ```config/*.yaml``` are example config files for hyperparameters. Among them, the ERNIE model checkpoint ```ckpt_path``` and the vocabulary ```ernie_vocab_file``` can be downloaded from the [ERNIE](https://github.com/PaddlePaddle/ERNIE) page.
```sh
# train ERNIESage in distributed gpu mode.
......
......@@ -8,7 +8,7 @@
<img src="./docs/source/_static/text_graph.png" alt="Text Graph" width="800">
**ERNIESage**, proposed by the PGL team, is short for ERNIE SAmple aggreGatE. The model captures text semantics and graph structure at the same time, effectively improving performance on Text Graph applications. [**ERNIE**](https://github.com/PaddlePaddle/ERNIE) is Baidu's continual-learning semantic understanding framework based on knowledge enhancement.
**ERNIESage**, proposed by the PGL team, is short for ERNIE SAmple aggreGatE. The model captures text semantics and graph structure at the same time, effectively improving performance on Text Graph applications. [**ERNIE**](https://github.com/PaddlePaddle/ERNIE) is Baidu's continual-learning semantic understanding framework based on knowledge enhancement.
**ERNIESage** is the result of combining ERNIE with GraphSAGE; it is short for ERNIE SAmple aggreGatE, and its structure is shown in the figure below. The main idea is to use ERNIE as the aggregation function (Aggregators) to model both the semantic and structural relations between a node and its neighbors. ERNIESage models text in the neighbor-aggregation stage: the text of the center node is concatenated with the text of all its neighbor nodes; the pretrained ERNIE model then performs message aggregation, capturing the relations between the center node and its neighbors; finally, with ERNIESage's distinctive attention mask, under which neighbors are invisible to each other, and its independent position-embedding scheme, the relations between sentences and between words in a TextGraph can be modeled easily.
......@@ -32,17 +32,22 @@
- pgl>=1.1
## Dataformat
The example data ```data.txt``` uses part of NLPCC2016-DBQA; the format is "query \t answer" for each line.
```text
NLPCC2016-DBQA is an evaluation task organized by NLPCC (Natural Language Processing and Chinese Computing) in 2016; its goal is to select documents from the candidates to answer given questions. [link: http://tcci.ccf.org.cn/conference/2016/dldoc/evagline2.pdf]
```
## How to run
We adopt [PaddlePaddle Fleet](https://github.com/PaddlePaddle/Fleet) as our distributed training framework. The files ```config/*.yaml``` contain configurations for training ERNIESage.
We adopt [PaddlePaddle Fleet](https://github.com/PaddlePaddle/Fleet) as our distributed training framework. The files ```config/*.yaml``` contain configurations for training ERNIESage, where the ERNIE model checkpoint ```ckpt_path``` and the vocabulary ```ernie_vocab_file``` can be downloaded from the [ERNIE](https://github.com/PaddlePaddle/ERNIE) page.
```sh
# Train ERNIESage in distributed GPU mode or single-machine mode
sh local_run.sh config/enriesage_v1_gpu.yaml
sh local_run.sh config/erniesage_v2_gpu.yaml
# Train ERNIESage in distributed CPU mode
sh local_run.sh config/enriesage_v1_cpu.yaml
sh local_run.sh config/erniesage_v2_cpu.yaml
```
## Hyperparameters
......
......@@ -4,9 +4,9 @@
learner_type: "cpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 2
CPU_NUM: 10
epoch: 20
batch_size: 4
CPU_NUM: 16
epoch: 3
log_per_step: 1
save_per_step: 100
output_path: "./output"
......@@ -31,6 +31,7 @@ final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
neg_type: "batch_neg"
# infer config ------
infer_model: "./output/last"
......@@ -48,7 +49,7 @@ ernie_config:
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
sent_type_vocab_size: 2
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
......
......@@ -6,9 +6,9 @@ optimizer_type: "adam"
lr: 0.00005
batch_size: 32
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
epoch: 3
log_per_step: 10
save_per_step: 1000
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
......@@ -31,6 +31,7 @@ final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
neg_type: "batch_neg"
# infer config ------
infer_model: "./output/last"
......@@ -48,7 +49,7 @@ ernie_config:
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
sent_type_vocab_size: 2
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
......
This diff is collapsed.
......@@ -24,7 +24,7 @@ from pgl.sample import edge_hash
class GraphGenerator(BaseDataGenerator):
def __init__(self, graph_wrappers, data, batch_size, samples,
num_workers, feed_name_list, use_pyreader,
phase, graph_data_path, shuffle=True, buf_size=1000):
phase, graph_data_path, shuffle=True, buf_size=1000, neg_type="batch_neg"):
super(GraphGenerator, self).__init__(
buf_size=buf_size,
......@@ -40,6 +40,7 @@ class GraphGenerator(BaseDataGenerator):
self.phase = phase
self.load_graph(graph_data_path)
self.num_layers = len(graph_wrappers)
self.neg_type = neg_type
def load_graph(self, graph_data_path):
self.graph = pgl.graph.MemmapGraph(graph_data_path)
......@@ -72,7 +73,11 @@ class GraphGenerator(BaseDataGenerator):
batch_src = np.array(batch_src, dtype="int64")
batch_dst = np.array(batch_dst, dtype="int64")
sampled_batch_neg = alias_sample(batch_dst.shape, self.alias, self.events)
if self.neg_type == "batch_neg":
neg_shape = [1]
else:
neg_shape = batch_dst.shape
sampled_batch_neg = alias_sample(neg_shape, self.alias, self.events)
if len(batch_neg) > 0:
batch_neg = np.concatenate([batch_neg, sampled_batch_neg], 0)
......@@ -80,12 +85,14 @@ class GraphGenerator(BaseDataGenerator):
batch_neg = sampled_batch_neg
if self.phase == "train":
#ignore_edges = np.concatenate([np.stack([batch_src, batch_dst], 1), np.stack([batch_dst, batch_src], 1)], 0)
ignore_edges = set()
else:
ignore_edges = set()
nodes = np.unique(np.concatenate([batch_src, batch_dst, batch_neg], 0))
subgraphs = graphsage_sample(self.graph, nodes, self.samples, ignore_edges=ignore_edges)
#subgraphs[0].reindex_to_parrent_nodes(subgraphs[0].nodes)
feed_dict = {}
for i in range(self.num_layers):
feed_dict.update(self.graph_wrappers[i].to_feed(subgraphs[i]))
......@@ -97,8 +104,8 @@ class GraphGenerator(BaseDataGenerator):
feed_dict["user_index"] = np.array(sub_src_idx, dtype="int64")
feed_dict["item_index"] = np.array(sub_dst_idx, dtype="int64")
#feed_dict["neg_item_index"] = np.array(sub_neg_idx, dtype="int64")
feed_dict["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]]
feed_dict["neg_item_index"] = np.array(sub_neg_idx, dtype="int64")
feed_dict["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]].astype(np.int64)
return feed_dict
def __call__(self):
......
......@@ -59,8 +59,7 @@ def run_predict(py_reader,
log_per_step=1,
args=None):
if args.input_type == "text":
id2str = np.load(os.path.join(args.graph_path, "id2str.npy"), mmap_mode="r")
id2str = io.open(os.path.join(args.graph_path, "terms.txt"), encoding=args.encoding).readlines()
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
......@@ -72,7 +71,7 @@ def run_predict(py_reader,
for batch_feed_dict in py_reader():
batch += 1
batch_usr_feat, batch_ad_feat, batch_src_real_index = exe.run(
batch_usr_feat, batch_ad_feat, _, batch_src_real_index = exe.run(
program,
feed=batch_feed_dict,
fetch_list=model_dict.outputs)
......@@ -82,7 +81,7 @@ def run_predict(py_reader,
for ufs, _, sri in zip(batch_usr_feat, batch_ad_feat, batch_src_real_index):
if args.input_type == "text":
sri = id2str[int(sri)]
sri = id2str[int(sri)].strip("\n")
line = "{}\t{}\n".format(sri, tostr(ufs))
fout.write(line)
......@@ -183,5 +182,6 @@ if __name__ == "__main__":
parser.add_argument("--conf", type=str, default="./config.yaml")
args = parser.parse_args()
config = edict(yaml.load(open(args.conf), Loader=yaml.FullLoader))
config.loss_type = "hinge"
print(config)
main(config)
......@@ -26,6 +26,17 @@ from paddle.fluid.incubate.fleet.collective import fleet as cfleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet as tfleet
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from tensorboardX import SummaryWriter
from paddle.fluid.transpiler.distribute_transpiler import DistributedMode
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import TrainerRuntimeConfig
# hack: pin the communicator flags so gradients are sent immediately (no merging, queue size 1)
base_get_communicator_flags = TrainerRuntimeConfig.get_communicator_flags
def get_communicator_flags(self):
flag_dict = base_get_communicator_flags(self)
flag_dict['communicator_max_merge_var_num'] = str(1)
flag_dict['communicator_send_queue_size'] = str(1)
return flag_dict
TrainerRuntimeConfig.get_communicator_flags = get_communicator_flags
class Learner(object):
......@@ -132,8 +143,6 @@ class TranspilerLearner(Learner):
self.model = model
def optimize(self, loss, optimizer_type, lr):
strategy = DistributeTranspilerConfig()
strategy.sync_mode = False
log.info('learning rate:%f' % lr)
if optimizer_type == "sgd":
optimizer = F.optimizer.SGD(learning_rate=lr)
......@@ -143,7 +152,8 @@ class TranspilerLearner(Learner):
else:
raise ValueError("Unknown Optimizer %s" % optimizer_type)
#create the DistributeTranspiler configure
optimizer = tfleet.distributed_optimizer(optimizer, strategy)
self.strategy = StrategyFactory.create_sync_strategy()
optimizer = tfleet.distributed_optimizer(optimizer, self.strategy)
optimizer.minimize(loss)
def init_and_run_ps_worker(self, ckpt_path):
......@@ -193,6 +203,7 @@ class CollectiveLearner(Learner):
def optimize(self, loss, optimizer_type, lr):
optimizer = F.optimizer.Adam(learning_rate=lr)
dist_strategy = DistributedStrategy()
dist_strategy.enable_sequential_execution = True
optimizer = cfleet.distributed_optimizer(optimizer, strategy=dist_strategy)
_, param_grads = optimizer.minimize(loss, F.default_startup_program())
......
......@@ -36,7 +36,7 @@ transpiler_local_train(){
for((i=0;i<${PADDLE_PSERVERS_NUM};i++))
do
echo "start ps server: ${i}"
TRAINING_ROLE="PSERVER" PADDLE_TRAINER_ID=${i} sh job.sh local $config \
TRAINING_ROLE="PSERVER" PADDLE_TRAINER_ID=${i} python ./train.py --conf $config \
&> $BASE/pserver.$i.log &
echo $! >> job_id
done
......@@ -44,23 +44,20 @@ transpiler_local_train(){
for((j=0;j<${PADDLE_TRAINERS_NUM};j++))
do
echo "start ps work: ${j}"
TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} sh job.sh local $config \
echo $! >> job_id
TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} python ./train.py --conf $config
TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} python ./infer.py --conf $config
done
}
collective_local_train(){
export PATH=./python27-gcc482-gpu/bin/:$PATH
echo `which python`
python -m paddle.distributed.launch train.py --conf $config
python -m paddle.distributed.launch infer.py --conf $config
}
eval $(parse_yaml $config)
unalias python
python3 ./preprocessing/dump_graph.py -i $input_data -o $graph_path --encoding $encoding \
-l $max_seqlen --vocab_file $ernie_vocab_file
python ./preprocessing/dump_graph.py -i $input_data -o $graph_path --encoding $encoding -l $max_seqlen --vocab_file $ernie_vocab_file
if [[ $learner_type == "cpu" ]];then
transpiler_local_train
......
......@@ -40,7 +40,7 @@ class BaseGraphWrapperBuilder(object):
# all graph have same node_feat_info
graph_wrappers.append(
pgl.graph_wrapper.GraphWrapper(
"layer_%s" % i, place, node_feat=self.node_feature_info, edge_feat=self.edge_feature_info))
"layer_%s" % i, node_feat=self.node_feature_info, edge_feat=self.edge_feature_info))
return graph_wrappers
......@@ -129,7 +129,9 @@ class BaseNet(object):
"user_index", shape=[None], dtype="int64", append_batch_size=False)
item_index = L.data(
"item_index", shape=[None], dtype="int64", append_batch_size=False)
return [user_index, item_index]
neg_item_index = L.data(
"neg_item_index", shape=[None], dtype="int64", append_batch_size=False)
return [user_index, item_index, neg_item_index]
def build_embedding(self, graph_wrappers, inputs=None):
num_embed = int(np.load(os.path.join(self.config.graph_path, "num_nodes.npy")))
......@@ -177,18 +179,58 @@ class BaseNet(object):
outputs.append(src_real_index)
return inputs, outputs
def all_gather(X):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
if trainer_num == 1:
copy_X = X * 1
copy_X.stop_gradient = True
return copy_X
Xs = []
for i in range(trainer_num):
copy_X = X * 1
copy_X = L.collective._broadcast(copy_X, i, True)
copy_X.stop_gradient=True
Xs.append(copy_X)
if len(Xs) > 1:
Xs = L.concat(Xs, 0)
Xs.stop_gradient = True
else:
Xs = Xs[0]
return Xs
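# all_gather lets the "all_hinge" loss below draw negatives from every
# trainer: each trainer broadcasts its tensor, and the concatenated result is
# marked stop_gradient so gradients flow only through the local copies.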
class BaseLoss(object):
def __init__(self, config):
self.config = config
def __call__(self, outputs):
user_feat, item_feat = outputs[0], outputs[1]
user_feat, item_feat, neg_item_feat = outputs[0], outputs[1], outputs[2]
loss_type = self.config.loss_type
if self.config.neg_type == "batch_neg":
neg_item_feat = item_feat
# Calc Loss
if self.config.loss_type == "hinge":
pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
neg = L.matmul(user_feat, item_feat, transpose_y=True) # [B, B]
neg = L.matmul(user_feat, neg_item_feat, transpose_y=True) # [B, B]
loss = L.reduce_mean(L.relu(neg - pos + self.config.margin))
elif self.config.loss_type == "all_hinge":
pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
all_pos = all_gather(pos) # [B * n, 1]
all_neg_item_feat = all_gather(neg_item_feat) # [B * n, d]
all_user_feat = all_gather(user_feat) # [B * n, d]
neg1 = L.matmul(user_feat, all_neg_item_feat, transpose_y=True) # [B, B * n]
neg2 = L.matmul(all_user_feat, neg_item_feat, transpose_y=True) # [B * n, B]
loss1 = L.reduce_mean(L.relu(neg1 - pos + self.config.margin))
loss2 = L.reduce_mean(L.relu(neg2 - all_pos + self.config.margin))
#loss = (loss1 + loss2) / 2
loss = loss1 + loss2
elif self.config.loss_type == "softmax":
pass
# TODO
......
......@@ -59,6 +59,8 @@ class ErnieModel(object):
def __init__(self,
src_ids,
sentence_ids,
position_ids=None,
input_mask=None,
task_ids=None,
config=None,
weight_sharing=True,
......@@ -66,8 +68,10 @@ class ErnieModel(object):
name=""):
self._set_config(config, name, weight_sharing)
input_mask = self._build_input_mask(src_ids)
position_ids = self._build_position_ids(src_ids)
if position_ids is None:
position_ids = self._build_position_ids(src_ids)
if input_mask is None:
input_mask = self._build_input_mask(src_ids)
self._build_model(src_ids, position_ids, sentence_ids, task_ids,
input_mask)
self._debug_summary(input_mask)
......@@ -100,7 +104,7 @@ class ErnieModel(object):
zero = L.fill_constant([1], dtype='int64', value=0)
input_mask = L.logical_not(L.equal(src_ids,
zero)) # assume pad id == 0
input_mask = L.cast(input_mask, 'float')
input_mask = L.cast(input_mask, 'float32')
input_mask.stop_gradient = True
return input_mask
......@@ -338,7 +342,7 @@ class ErnieGraphModel(ErnieModel):
L.range(
0, slot_seqlen, 1, dtype='int32'), [1, slot_seqlen, 1],
inplace=True) # [1, slot_seqlen, 1]
a_position_ids = L.expand(a_position_ids, [src_batch, 1, 1]) # [B, slot_seqlen * num_b, 1]
a_position_ids = L.expand(a_position_ids, [src_batch, 1, 1]) # [B, slot_seqlen, 1]
zero = L.fill_constant([1], dtype='int64', value=0)
input_mask = L.cast(L.equal(src_ids[:, :slot_seqlen], zero), "int32") # assume pad id == 0 [B, slot_seqlen, 1]
......
......@@ -19,8 +19,6 @@ from contextlib import contextmanager
import paddle.fluid as fluid
import paddle.fluid.layers as L
import paddle.fluid.layers as layers
#import propeller.paddle as propeller
#from propeller import log
# determine this at the beginning
to_3d = lambda a: a # will change later
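# to_2d / to_3d are rebound inside encoder()/graph_encoder(): to_2d gathers
# non-pad tokens into a packed [num_tokens, d_model] tensor, to_3d scatters them
# back to [batch, seqlen, d_model], so the transformer stack skips pad positions.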
......@@ -85,7 +83,7 @@ def multi_head_attention(queries,
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
reshaped = layers.reshape(
x=x, shape=[0, 0, n_head, hidden_size // n_head], inplace=True)
x=x, shape=[0, 0, n_head, hidden_size // n_head], inplace=True)
# permute the dimensions into:
# [batch_size, n_head, max_sequence_len, hidden_size_per_head]
......@@ -262,7 +260,6 @@ def encoder_layer(enc_input,
with the post_process_layer to add residual connection, layer normalization
and dropout.
"""
#L.Print(L.reduce_mean(enc_input), message='1')
attn_output, ctx_multiheads_attn = multi_head_attention(
pre_process_layer(
enc_input,
......@@ -279,7 +276,6 @@ def encoder_layer(enc_input,
attention_dropout,
param_initializer=param_initializer,
name=name + '_multi_head_att')
#L.Print(L.reduce_mean(attn_output), message='1')
attn_output = post_process_layer(
enc_input,
attn_output,
......@@ -287,7 +283,6 @@ def encoder_layer(enc_input,
prepostprocess_dropout,
name=name + '_post_att')
#L.Print(L.reduce_mean(attn_output), message='2')
ffd_output = positionwise_feed_forward(
pre_process_layer(
attn_output,
......@@ -300,14 +295,12 @@ def encoder_layer(enc_input,
hidden_act,
param_initializer=param_initializer,
name=name + '_ffn')
#L.Print(L.reduce_mean(ffd_output), message='3')
ret = post_process_layer(
attn_output,
ffd_output,
postprocess_cmd,
prepostprocess_dropout,
name=name + '_post_ffn')
#L.Print(L.reduce_mean(ret), message='4')
return ret, ctx_multiheads_attn, ffd_output
......@@ -374,7 +367,7 @@ def encoder(enc_input,
encoder_layer.
"""
#global to_2d, to_3d #, batch, seqlen, dynamic_dim
global to_2d, to_3d #, batch, seqlen, dynamic_dim
d_shape = L.shape(input_mask)
pad_idx = build_pad_idx(input_mask)
attn_bias = build_attn_bias(input_mask, n_head, enc_input.dtype)
......@@ -391,14 +384,14 @@ def encoder(enc_input,
# if attn_bias.dtype != enc_input.dtype:
# attn_bias = L.cast(attn_bias, enc_input.dtype)
# def to_2d(t_3d):
# t_2d = L.gather_nd(t_3d, pad_idx)
# return t_2d
def to_2d(t_3d):
t_2d = L.gather_nd(t_3d, pad_idx)
return t_2d
# def to_3d(t_2d):
# t_3d = L.scatter_nd(
# pad_idx, t_2d, shape=[d_shape[0], d_shape[1], d_model])
# return t_3d
def to_3d(t_2d):
t_3d = L.scatter_nd(
pad_idx, t_2d, shape=[d_shape[0], d_shape[1], d_model])
return t_3d
enc_input = to_2d(enc_input)
all_hidden = []
......@@ -456,32 +449,20 @@ def graph_encoder(enc_input,
encoder_layer.
"""
#global to_2d, to_3d #, batch, seqlen, dynamic_dim
global to_2d, to_3d #, batch, seqlen, dynamic_dim
d_shape = L.shape(input_mask)
pad_idx = build_pad_idx(input_mask)
attn_bias = build_graph_attn_bias(input_mask, n_head, enc_input.dtype, slot_seqlen)
#attn_bias = build_attn_bias(input_mask, n_head, enc_input.dtype)
# d_batch = d_shape[0]
# d_seqlen = d_shape[1]
# pad_idx = L.where(
# L.cast(L.reshape(input_mask, [d_batch, d_seqlen]), 'bool'))
# attn_bias = L.matmul(
# input_mask, input_mask, transpose_y=True) # [batch, seq, seq]
# attn_bias = (1. - attn_bias) * -10000.
# attn_bias = L.stack([attn_bias] * n_head, 1)
# if attn_bias.dtype != enc_input.dtype:
# attn_bias = L.cast(attn_bias, enc_input.dtype)
# def to_2d(t_3d):
# t_2d = L.gather_nd(t_3d, pad_idx)
# return t_2d
def to_2d(t_3d):
t_2d = L.gather_nd(t_3d, pad_idx)
return t_2d
# def to_3d(t_2d):
# t_3d = L.scatter_nd(
# pad_idx, t_2d, shape=[d_shape[0], d_shape[1], d_model])
# return t_3d
def to_3d(t_2d):
t_3d = L.scatter_nd(
pad_idx, t_2d, shape=[d_shape[0], d_shape[1], d_model])
return t_3d
enc_input = to_2d(enc_input)
all_hidden = []
......
......@@ -3,8 +3,6 @@ import paddle.fluid as F
import paddle.fluid.layers as L
from models.base import BaseNet, BaseGNNModel
from models.ernie_model.ernie import ErnieModel
from models.ernie_model.ernie import ErnieGraphModel
from models.ernie_model.ernie import ErnieConfig
class ErnieSageV2(BaseNet):
......@@ -16,19 +14,52 @@ class ErnieSageV2(BaseNet):
return inputs + [term_ids]
def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
def build_position_ids(src_ids, dst_ids):
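"""Position ids for the packed [CLS] + src + dst sequence: dst positions
continue from src_seqlen but are shifted back by the per-example src pad
length so that the two segments stay contiguous."""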
src_shape = L.shape(src_ids)
src_batch = src_shape[0]
src_seqlen = src_shape[1]
dst_seqlen = src_seqlen - 1 # without cls
src_position_ids = L.reshape(
L.range(
0, src_seqlen, 1, dtype='int32'), [1, src_seqlen, 1],
inplace=True) # [1, src_seqlen, 1]
src_position_ids = L.expand(src_position_ids, [src_batch, 1, 1]) # [B, src_seqlen, 1]
zero = L.fill_constant([1], dtype='int64', value=0)
input_mask = L.cast(L.equal(src_ids, zero), "int32") # assume pad id == 0 [B, src_seqlen, 1]
src_pad_len = L.reduce_sum(input_mask, 1, keep_dim=True) # [B, 1, 1]
dst_position_ids = L.reshape(
L.range(
src_seqlen, src_seqlen+dst_seqlen, 1, dtype='int32'), [1, dst_seqlen, 1],
inplace=True) # [1, dst_seqlen, 1]
dst_position_ids = L.expand(dst_position_ids, [src_batch, 1, 1]) # [B, dst_seqlen, 1]
dst_position_ids = dst_position_ids - src_pad_len # [B, dst_seqlen, 1]
position_ids = L.concat([src_position_ids, dst_position_ids], 1)
position_ids = L.cast(position_ids, 'int64')
position_ids.stop_gradient = True
return position_ids
def ernie_send(src_feat, dst_feat, edge_feat):
"""doc"""
# input_ids
cls = L.fill_constant_batch_size_like(src_feat["term_ids"], [-1, 1, 1], "int64", 1)
src_ids = L.concat([cls, src_feat["term_ids"]], 1)
dst_ids = dst_feat["term_ids"]
# sent_ids
sent_ids = L.concat([L.zeros_like(src_ids), L.ones_like(dst_ids)], 1)
term_ids = L.concat([src_ids, dst_ids], 1)
# position_ids
position_ids = build_position_ids(src_ids, dst_ids)
term_ids.stop_gradient = True
sent_ids.stop_gradient = True
ernie = ErnieModel(
term_ids, sent_ids,
term_ids, sent_ids, position_ids,
config=self.config.ernie_config)
feature = ernie.get_pooled_output()
return feature
......
......@@ -18,14 +18,12 @@ import paddle.fluid.layers as L
from models.base import BaseNet, BaseGNNModel
from models.ernie_model.ernie import ErnieModel
from models.ernie_model.ernie import ErnieGraphModel
from models.ernie_model.ernie import ErnieConfig
from models.message_passing import copy_send
class ErnieSageV3(BaseNet):
def __init__(self, config):
super(ErnieSageV3, self).__init__(config)
self.config.layer_type = "ernie_recv_sum"
def build_inputs(self):
inputs = super(ErnieSageV3, self).build_inputs()
......@@ -36,11 +34,10 @@ class ErnieSageV3(BaseNet):
def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
def ernie_recv(feat):
"""doc"""
# TODO maxlen 400
#pad_value = L.cast(L.assign(input=np.array([0], dtype=np.int32)), "int64")
num_neighbor = self.config.samples[0]
pad_value = L.zeros([1], "int64")
out, _ = L.sequence_pad(feat, pad_value=pad_value, maxlen=10)
out = L.reshape(out, [0, 400])
out, _ = L.sequence_pad(feat, pad_value=pad_value, maxlen=num_neighbor)
out = L.reshape(out, [0, self.config.max_seqlen*num_neighbor])
return out
def erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name):
......@@ -74,7 +71,7 @@ class ErnieSageV3(BaseNet):
act,
initializer,
learning_rate=fc_lr,
name="%s_%s" % (self.config.layer_type, i))
name="%s_%s" % ("erniesage_v3", i))
features.append(feature)
return features
......@@ -86,17 +83,16 @@ class ErnieSageV3(BaseNet):
ernie = ErnieGraphModel(
src_ids=feat,
config=ernie_config,
slot_seqlen=self.config.max_seqlen,
name="student_")
slot_seqlen=self.config.max_seqlen)
feat = ernie.get_pooled_output()
fc_lr = self.config.lr / 0.001
feat= L.fc(feat,
self.config.hidden_size,
act="relu",
param_attr=F.ParamAttr(name=name + "_l",
learning_rate=fc_lr),
)
feat = L.l2_normalize(feat, axis=1)
# feat = L.fc(feat,
# self.config.hidden_size,
# act="relu",
# param_attr=F.ParamAttr(name=name + "_l",
# learning_rate=fc_lr),
# )
#feat = L.l2_normalize(feat, axis=1)
if self.config.final_fc:
feat = L.fc(feat,
......
......@@ -36,7 +36,7 @@ from tokenization import FullTokenizer
def term2id(string, tokenizer, max_seqlen):
string = string.split("\t")[1]
#string = string.split("\t")[1]
tokens = tokenizer.tokenize(string)
ids = tokenizer.convert_tokens_to_ids(tokens)
ids = ids[:max_seqlen-1]
......@@ -53,6 +53,7 @@ def dump_graph(args):
term_file = io.open(os.path.join(args.outpath, "terms.txt"), "w", encoding=args.encoding)
terms = []
count = 0
item_distribution = []
with io.open(args.inpath, encoding=args.encoding) as f:
edges = []
......@@ -66,6 +67,7 @@ def dump_graph(args):
str2id[s] = count
count += 1
term_file.write(str(col_idx) + "\t" + col + "\n")
item_distribution.append(0)
slots.append(str2id[s])
......@@ -74,6 +76,7 @@ def dump_graph(args):
neg_samples.append(slots[2:])
edges.append((src, dst))
edges.append((dst, src))
item_distribution[dst] += 1
term_file.close()
edges = np.array(edges, dtype="int64")
......@@ -82,31 +85,27 @@ def dump_graph(args):
log.info("building graph...")
graph = pgl.graph.Graph(num_nodes=num_nodes, edges=edges)
indegree = graph.indegree()
graph.indegree()
graph.outdegree()
graph.dump(args.outpath)
# dump alias sample table
sqrt_indegree = np.sqrt(indegree)
distribution = 1. * sqrt_indegree / sqrt_indegree.sum()
item_distribution = np.array(item_distribution)
item_distribution = np.sqrt(item_distribution)
distribution = 1. * item_distribution / item_distribution.sum()
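# negative-sampling distribution: sqrt-smoothed item frequency (each item's
# count as a dst node), compiled into an alias table for O(1) sampling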
alias, events = alias_sample_build_table(distribution)
np.save(os.path.join(args.outpath, "alias.npy"), alias)
np.save(os.path.join(args.outpath, "events.npy"), events)
np.save(os.path.join(args.outpath, "neg_samples.npy"), np.array(neg_samples))
log.info("End Build Graph")
def dump_id2str_map(args):
log.info("Dump id2str map starting...")
id2str = np.array([line.strip("\n") for line in open(os.path.join(args.outpath, "terms.txt"), "r", encoding=args.encoding)])
np.save(os.path.join(args.outpath, "id2str.npy"), id2str)
log.info("Dump id2str map done.")
def dump_node_feat(args):
log.info("Dump node feat starting...")
id2str = np.load(os.path.join(args.outpath, "id2str.npy"), mmap_mode="r")
id2str = [line.strip("\n").split("\t")[1] for line in io.open(os.path.join(args.outpath, "terms.txt"), encoding=args.encoding)]
pool = multiprocessing.Pool()
tokenizer = FullTokenizer(args.vocab_file)
term_ids = pool.map(partial(term2id, tokenizer=tokenizer, max_seqlen=args.max_seqlen), id2str)
np.save(os.path.join(args.outpath, "term_ids.npy"), np.array(term_ids))
np.save(os.path.join(args.outpath, "term_ids.npy"), np.array(term_ids, np.uint16))
log.info("Dump node feat done.")
pool.terminate()
......@@ -119,5 +118,4 @@ if __name__ == "__main__":
parser.add_argument("-o", "--outpath", type=str, default=None)
args = parser.parse_args()
dump_graph(args)
dump_id2str_map(args)
dump_node_feat(args)
......@@ -32,8 +32,9 @@ class TrainData(object):
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
log.info("trainer_id: %s, trainer_count: %s." % (trainer_id, trainer_count))
edges = np.load(os.path.join(graph_path, "edges.npy"), allow_pickle=True)
bidirectional_edges = np.load(os.path.join(graph_path, "edges.npy"), allow_pickle=True)
# edges are bidirectional: (src, dst) and (dst, src) were appended in pairs,
# so every second row keeps exactly one direction.
edges = bidirectional_edges[0::2]
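# stride over the edges so each trainer trains on a disjoint shard of pairs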
train_usr = edges[trainer_id::trainer_count, 0]
train_ad = edges[trainer_id::trainer_count, 1]
returns = {
......@@ -73,7 +74,8 @@ def main(config):
use_pyreader=config.use_pyreader,
phase="train",
graph_data_path=config.graph_path,
shuffle=True)
shuffle=True,
neg_type=config.neg_type)
log.info("build graph reader done.")
......