# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import time

import numpy as np
import scipy.sparse as sp
from sklearn.preprocessing import StandardScaler

import pgl
from pgl.utils.logger import log
from pgl.utils import paddle_helper
import paddle
import paddle.fluid as fluid

import reader
from model import graphsage_mean, graphsage_meanpool, \
    graphsage_maxpool, graphsage_lstm


def load_data(normalize=True, symmetry=True):
    """Load the Reddit node-classification dataset.

    Data from https://github.com/matenure/FastGCN/issues/8
    reddit_adj.npz: https://drive.google.com/open?id=174vb0Ws7Vxk_QTUtxqTgDHSQ4El4qDHt
    reddit.npz: https://drive.google.com/open?id=19SphVl_Oe8SJ1r87Hr5a6znx3nJu1F2J
    """
    data = np.load("data/reddit.npz")
    adj = sp.load_npz("data/reddit_adj.npz")
    if symmetry:
        # Make the graph undirected by adding reverse edges.
        adj = adj + adj.T
    adj = adj.tocoo()
    src = adj.row
    dst = adj.col

    num_class = 41
    train_label = data['y_train']
    val_label = data['y_val']
    test_label = data['y_test']

    train_index = data['train_index']
    val_index = data['val_index']
    test_index = data['test_index']

    feature = data["feats"].astype("float32")
    if normalize:
        # Standardize features with statistics from the training split only,
        # so no information leaks from the validation/test splits.
        scaler = StandardScaler()
        scaler.fit(feature[train_index])
        feature = scaler.transform(feature)
    log.info("Feature shape %s" % (repr(feature.shape)))

    graph = pgl.graph.Graph(
        num_nodes=feature.shape[0],
        edges=list(zip(src, dst)),
        node_feat={"index": np.arange(
            0, len(feature), dtype="int32")})

    return {
        "graph": graph,
        "train_index": train_index,
        "train_label": train_label,
        "val_index": val_index,
        "val_label": val_label,
        "test_index": test_index,
        "test_label": test_label,
        "feature": feature,
        "num_class": num_class
    }


def build_graph_model(graph_wrapper, num_class, k_hop, graphsage_type,
                      hidden_size, feature):
    node_index = fluid.layers.data(
        "node_index", shape=[None], dtype="int32", append_batch_size=False)
    node_label = fluid.layers.data(
        "node_label", shape=[None, 1], dtype="int64", append_batch_size=False)

    # Gather the rows of the global feature table that belong to the sampled
    # subgraph; the raw features are constant and receive no gradient.
    feature = fluid.layers.gather(feature, graph_wrapper.node_feat['index'])
    feature.stop_gradient = True

    # Stack k_hop GraphSAGE layers of the requested aggregator type.
    for i in range(k_hop):
        if graphsage_type == 'graphsage_mean':
            feature = graphsage_mean(
                graph_wrapper,
                feature,
                hidden_size,
                act="relu",
                name="graphsage_mean_%s" % i)
        elif graphsage_type == 'graphsage_meanpool':
            feature = graphsage_meanpool(
                graph_wrapper,
                feature,
                hidden_size,
                act="relu",
                name="graphsage_meanpool_%s" % i)
        elif graphsage_type == 'graphsage_maxpool':
            feature = graphsage_maxpool(
                graph_wrapper,
                feature,
                hidden_size,
                act="relu",
                name="graphsage_maxpool_%s" % i)
        elif graphsage_type == 'graphsage_lstm':
            feature = graphsage_lstm(
                graph_wrapper,
                feature,
                hidden_size,
                act="relu",
                name="graphsage_lstm_%s" % i)
        else:
            raise ValueError("graphsage type %s is not"
                             " implemented" % graphsage_type)

    # Keep only the batch nodes and classify them.
    feature = fluid.layers.gather(feature, node_index)
    logits = fluid.layers.fc(feature,
                             num_class,
                             act=None,
                             name='classification_layer')
    proba = fluid.layers.softmax(logits)
    loss = fluid.layers.softmax_with_cross_entropy(
        logits=logits, label=node_label)
    loss = fluid.layers.mean(loss)
    acc = fluid.layers.accuracy(input=proba, label=node_label, k=1)
    return loss, acc

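# Note: run_epoch below assumes each batch_feed_dict yielded by the readers
# carries "node_index" and "node_label" entries (matching the data layers
# declared in build_graph_model), alongside the subgraph feeds produced by
# the GraphWrapper.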
def run_epoch(batch_iter,
              exe,
              program,
              prefix,
              model_loss,
              model_acc,
              epoch,
              log_per_step=100):
    """Run one pass over batch_iter and log sample-weighted loss/accuracy."""
    batch = 0
    total_loss = 0.
    total_acc = 0.
    total_sample = 0
    start = time.time()
    for batch_feed_dict in batch_iter():
        batch += 1
        batch_loss, batch_acc = exe.run(program,
                                        fetch_list=[model_loss, model_acc],
                                        feed=batch_feed_dict)

        if batch % log_per_step == 0:
            log.info("Batch %s %s-Loss %s %s-Acc %s" %
                     (batch, prefix, batch_loss, prefix, batch_acc))

        # Weight the running averages by the number of nodes in the batch.
        num_samples = len(batch_feed_dict["node_index"])
        total_loss += batch_loss * num_samples
        total_acc += batch_acc * num_samples
        total_sample += num_samples
    end = time.time()
    log.info("%s Epoch %s Loss %.5lf Acc %.5lf Speed(per batch) %.5lf sec" %
             (prefix, epoch, total_loss / total_sample,
              total_acc / total_sample, (end - start) / batch))


def main(args):
    data = load_data(args.normalize, args.symmetry)
    log.info("preprocess finish")
    log.info("Train Examples: %s" % len(data["train_index"]))
    log.info("Val Examples: %s" % len(data["val_index"]))
    log.info("Test Examples: %s" % len(data["test_index"]))
    log.info("Num nodes %s" % data["graph"].num_nodes)
    log.info("Num edges %s" % data["graph"].num_edges)
    log.info("Average Degree %s" % np.mean(data["graph"].indegree()))

    place = fluid.CUDAPlace(0) if args.use_cuda else fluid.CPUPlace()
    train_program = fluid.Program()
    startup_program = fluid.Program()

    # Neighbor-sampling fan-out per hop; a non-positive size disables that
    # hop, so the model depth (k_hop) equals len(samples).
    samples = []
    if args.samples_1 > 0:
        samples.append(args.samples_1)
    if args.samples_2 > 0:
        samples.append(args.samples_2)

    with fluid.program_guard(train_program, startup_program):
        feature, feature_init = paddle_helper.constant(
            "feat",
            dtype=data['feature'].dtype,
            value=data['feature'],
            hide_batch_size=False)

        graph_wrapper = pgl.graph_wrapper.GraphWrapper(
            "sub_graph", place, node_feat=data['graph'].node_feat_info())
        model_loss, model_acc = build_graph_model(
            graph_wrapper,
            num_class=data["num_class"],
            feature=feature,
            hidden_size=args.hidden_size,
            graphsage_type=args.graphsage_type,
            k_hop=len(samples))

    # Clone for inference before attaching the optimizer ops.
    test_program = train_program.clone(for_test=True)

    with fluid.program_guard(train_program, startup_program):
        adam = fluid.optimizer.Adam(learning_rate=args.lr)
        adam.minimize(model_loss)

    exe = fluid.Executor(place)
    exe.run(startup_program)
    feature_init(place)

    if args.sample_workers > 1:
        train_iter = reader.multiprocess_graph_reader(
            data['graph'],
            graph_wrapper,
            samples=samples,
            num_workers=args.sample_workers,
            batch_size=args.batch_size,
            node_index=data['train_index'],
            node_label=data["train_label"])
    else:
        train_iter = reader.graph_reader(
            data['graph'],
            graph_wrapper,
            samples=samples,
            batch_size=args.batch_size,
            node_index=data['train_index'],
            node_label=data["train_label"])

    if args.sample_workers > 1:
        val_iter = reader.multiprocess_graph_reader(
            data['graph'],
            graph_wrapper,
            samples=samples,
            num_workers=args.sample_workers,
            batch_size=args.batch_size,
            node_index=data['val_index'],
            node_label=data["val_label"])
    else:
        val_iter = reader.graph_reader(
            data['graph'],
            graph_wrapper,
            samples=samples,
            batch_size=args.batch_size,
            node_index=data['val_index'],
            node_label=data["val_label"])

    if args.sample_workers > 1:
        test_iter = reader.multiprocess_graph_reader(
            data['graph'],
            graph_wrapper,
            samples=samples,
            num_workers=args.sample_workers,
            batch_size=args.batch_size,
            node_index=data['test_index'],
            node_label=data["test_label"])
    else:
        test_iter = reader.graph_reader(
            data['graph'],
            graph_wrapper,
            samples=samples,
            batch_size=args.batch_size,
            node_index=data['test_index'],
            node_label=data["test_label"])
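    # Train on the training split each epoch, then report validation and test
    # metrics with the inference-only clone (test_program).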
    for epoch in range(args.epoch):
        run_epoch(
            train_iter,
            program=train_program,
            exe=exe,
            prefix="train",
            model_loss=model_loss,
            model_acc=model_acc,
            epoch=epoch)

        run_epoch(
            val_iter,
            program=test_program,
            exe=exe,
            prefix="val",
            model_loss=model_loss,
            model_acc=model_acc,
            log_per_step=10000,
            epoch=epoch)

        run_epoch(
            test_iter,
            program=test_program,
            prefix="test",
            exe=exe,
            model_loss=model_loss,
            model_acc=model_acc,
            log_per_step=10000,
            epoch=epoch)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='graphsage')
    parser.add_argument("--use_cuda", action='store_true', help="use CUDA")
    parser.add_argument(
        "--normalize", action='store_true', help="normalize features")
    parser.add_argument(
        "--symmetry", action='store_true', help="treat the graph as undirected")
    parser.add_argument(
        "--graphsage_type", type=str, default="graphsage_mean")
    parser.add_argument("--sample_workers", type=int, default=5)
    parser.add_argument("--epoch", type=int, default=10)
    parser.add_argument("--hidden_size", type=int, default=128)
    parser.add_argument("--batch_size", type=int, default=128)
    parser.add_argument("--lr", type=float, default=0.01)
    parser.add_argument("--samples_1", type=int, default=25)
    parser.add_argument("--samples_2", type=int, default=10)
    args = parser.parse_args()
    log.info(args)
    main(args)
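# Example invocation (assuming this script is saved as train.py and the two
# Reddit files from the docstring above live under data/):
#   python train.py --use_cuda --symmetry --normalize \
#       --graphsage_type graphsage_mean --samples_1 25 --samples_2 10
# The default fan-out of (25, 10) matches the two-hop setting from the
# GraphSAGE paper.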