From ed4749285db4813263d872faa93c8124d87d51fe Mon Sep 17 00:00:00 2001
From: suweiyue
Date: Wed, 19 Aug 2020 00:01:23 +0800
Subject: [PATCH] erniesage: reconstitution

---
 ...v1_gpu.yaml => erniesage_linkpredict.yaml} |  19 +-
 .../erniesage/config/erniesage_v1_cpu.yaml    |  56 ----
 .../erniesage/config/erniesage_v2_cpu.yaml    |  56 ----
 .../erniesage/config/erniesage_v2_gpu.yaml    |  56 ----
 .../erniesage/config/erniesage_v3_cpu.yaml    |  55 ----
 .../erniesage/config/erniesage_v3_gpu.yaml    |  55 ----
 examples/erniesage/dataset/graph_reader.py    |  20 +-
 examples/erniesage/infer.py                   |  14 +-
 examples/erniesage/local_run.sh               |   2 +-
 examples/erniesage/models/base.py             | 244 ---------------
 examples/erniesage/models/encoder.py          | 287 ++++++++++++++++++
 examples/erniesage/models/erniesage_v1.py     |  42 ---
 examples/erniesage/models/erniesage_v2.py     | 135 --------
 examples/erniesage/models/erniesage_v3.py     | 119 --------
 examples/erniesage/models/loss.py             |  88 ++++++
 examples/erniesage/models/model.py            | 110 +++++++
 examples/erniesage/models/model_factory.py    |  24 --
 .../erniesage/preprocessing/dump_graph.py     |  41 ++-
 examples/erniesage/train.py                   |  18 +-
 19 files changed, 556 insertions(+), 885 deletions(-)
 rename examples/erniesage/config/{erniesage_v1_gpu.yaml => erniesage_linkpredict.yaml} (79%)
 delete mode 100644 examples/erniesage/config/erniesage_v1_cpu.yaml
 delete mode 100644 examples/erniesage/config/erniesage_v2_cpu.yaml
 delete mode 100644 examples/erniesage/config/erniesage_v2_gpu.yaml
 delete mode 100644 examples/erniesage/config/erniesage_v3_cpu.yaml
 delete mode 100644 examples/erniesage/config/erniesage_v3_gpu.yaml
 delete mode 100644 examples/erniesage/models/base.py
 create mode 100644 examples/erniesage/models/encoder.py
 delete mode 100644 examples/erniesage/models/erniesage_v1.py
 delete mode 100644 examples/erniesage/models/erniesage_v2.py
 delete mode 100644 examples/erniesage/models/erniesage_v3.py
 create mode 100644 examples/erniesage/models/loss.py
 create mode 100644 examples/erniesage/models/model.py
 delete mode 100644 examples/erniesage/models/model_factory.py

diff --git a/examples/erniesage/config/erniesage_v1_gpu.yaml b/examples/erniesage/config/erniesage_linkpredict.yaml
similarity index 79%
rename from examples/erniesage/config/erniesage_v1_gpu.yaml
rename to examples/erniesage/config/erniesage_linkpredict.yaml
index 7b883fe..b0209dc 100644
--- a/examples/erniesage/config/erniesage_v1_gpu.yaml
+++ b/examples/erniesage/config/erniesage_linkpredict.yaml
@@ -8,20 +8,22 @@ batch_size: 32
 CPU_NUM: 10
 epoch: 20
 log_per_step: 1
-save_per_step: 100
+save_per_step: 1000
 output_path: "./output"
 ckpt_path: "./ernie_base_ckpt"
 
 # data config ------
-input_data: "./data.txt"
-graph_path: "./workdir"
+train_data: "./data.txt"
+graph_data: "./data.txt"
+
+graph_work_path: "./workdir"
 sample_workers: 1
 use_pyreader: true
 input_type: "text"
 
 # model config ------
 samples: [10]
-model_type: "ErnieSageModelV1"
+model_type: "ERNIESageV3"
 layer_type: "graphsage_sum"
 
 max_seqlen: 40
@@ -31,7 +33,8 @@ hidden_size: 128
 final_fc: true
 final_l2_norm: true
 loss_type: "hinge"
-margin: 0.3
+margin: 0.1
+neg_type: "batch_neg"
 
 # infer config ------
 infer_model: "./output/last"
@@ -44,11 +47,11 @@ ernie_config:
     attention_probs_dropout_prob: 0.1
     hidden_act: "relu"
     hidden_dropout_prob: 0.1
-    hidden_size: 768
+    hidden_size: 128
     initializer_range: 0.02
     max_position_embeddings: 513
-    num_attention_heads: 12
-    num_hidden_layers: 12
+    num_attention_heads: 2
+    num_hidden_layers: 2
     sent_type_vocab_size: 4
     task_type_vocab_size: 3
     vocab_size: 18000
diff --git a/examples/erniesage/config/erniesage_v1_cpu.yaml b/examples/erniesage/config/erniesage_v1_cpu.yaml
deleted file mode 100644
index 1f7e5ed..0000000
--- a/examples/erniesage/config/erniesage_v1_cpu.yaml
+++ /dev/null
@@ -1,56 +0,0 @@
-# Global Enviroment Settings
-#
-# trainer config ------
-learner_type: "cpu"
-optimizer_type: "adam"
-lr: 0.00005
-batch_size: 2
-CPU_NUM: 10
-epoch: 20
-log_per_step: 1
-save_per_step: 100
-output_path: "./output"
-ckpt_path: "./ernie_base_ckpt"
-
-# data config ------
-input_data: "./data.txt"
-graph_path: "./workdir"
-sample_workers: 1
-use_pyreader: true
-input_type: "text"
-
-# model config ------
-samples: [10]
-model_type: "ErnieSageModelV1"
-layer_type: "graphsage_sum"
-
-max_seqlen: 40
-
-num_layers: 1
-hidden_size: 128
-final_fc: true
-final_l2_norm: true
-loss_type: "hinge"
-margin: 0.3
-
-# infer config ------
-infer_model: "./output/last"
-infer_batch_size: 128
-
-# ernie config ------
-encoding: "utf8"
-ernie_vocab_file: "./vocab.txt"
-ernie_config:
-    attention_probs_dropout_prob: 0.1
-    hidden_act: "relu"
-    hidden_dropout_prob: 0.1
-    hidden_size: 768
-    initializer_range: 0.02
-    max_position_embeddings: 513
-    num_attention_heads: 12
-    num_hidden_layers: 12
-    sent_type_vocab_size: 4
-    task_type_vocab_size: 3
-    vocab_size: 18000
-    use_task_id: false
-    use_fp16: false
diff --git a/examples/erniesage/config/erniesage_v2_cpu.yaml b/examples/erniesage/config/erniesage_v2_cpu.yaml
deleted file mode 100644
index 77b7805..0000000
--- a/examples/erniesage/config/erniesage_v2_cpu.yaml
+++ /dev/null
@@ -1,56 +0,0 @@
-# Global Enviroment Settings
-#
-# trainer config ------
-learner_type: "cpu"
-optimizer_type: "adam"
-lr: 0.00005
-batch_size: 4
-CPU_NUM: 16
-epoch: 3
-log_per_step: 1
-save_per_step: 100
-output_path: "./output"
-ckpt_path: "./ernie_base_ckpt"
-
-# data config ------
-input_data: "./data.txt"
-graph_path: "./workdir"
-sample_workers: 1
-use_pyreader: true
-input_type: "text"
-
-# model config ------
-samples: [10]
-model_type: "ErnieSageModelV2"
-
-max_seqlen: 40
-
-num_layers: 1
-hidden_size: 128
-final_fc: true
-final_l2_norm: true
-loss_type: "hinge"
-margin: 0.3
-neg_type: "batch_neg"
-
-# infer config ------
-infer_model: "./output/last"
-infer_batch_size: 128
-
-# ernie config ------
-encoding: "utf8"
-ernie_vocab_file: "./vocab.txt"
-ernie_config:
-    attention_probs_dropout_prob: 0.1
-    hidden_act: "relu"
-    hidden_dropout_prob: 0.1
-    hidden_size: 768
-    initializer_range: 0.02
-    max_position_embeddings: 513
-    num_attention_heads: 12
-    num_hidden_layers: 12
-    sent_type_vocab_size: 2
-    task_type_vocab_size: 3
-    vocab_size: 18000
-    use_task_id: false
-    use_fp16: false
diff --git a/examples/erniesage/config/erniesage_v2_gpu.yaml b/examples/erniesage/config/erniesage_v2_gpu.yaml
deleted file mode 100644
index 7a9b4af..0000000
--- a/examples/erniesage/config/erniesage_v2_gpu.yaml
+++ /dev/null
@@ -1,56 +0,0 @@
-# Global Enviroment Settings
-#
-# trainer config ------
-learner_type: "gpu"
-optimizer_type: "adam"
-lr: 0.00005
-batch_size: 32
-CPU_NUM: 10
-epoch: 3
-log_per_step: 10
-save_per_step: 1000
-output_path: "./output"
-ckpt_path: "./ernie_base_ckpt"
-
-# data config ------
-input_data: "./data.txt"
-graph_path: "./workdir"
-sample_workers: 1
-use_pyreader: true
-input_type: "text"
-
-# model config ------
-samples: [10]
-model_type: "ErnieSageModelV2"
-
-max_seqlen: 40
-
-num_layers: 1
-hidden_size: 128
-final_fc: true
-final_l2_norm: true
-loss_type: "hinge"
-margin: 0.3
-neg_type: "batch_neg"
-
-# infer config ------
-infer_model: "./output/last"
-infer_batch_size: 128
-
-# ernie config ------
-encoding: "utf8"
-ernie_vocab_file: "./vocab.txt"
-ernie_config:
-    attention_probs_dropout_prob: 0.1
-    hidden_act: "relu"
-    hidden_dropout_prob: 0.1
-    hidden_size: 768
-    initializer_range: 0.02
-    max_position_embeddings: 513
-    num_attention_heads: 12
-    num_hidden_layers: 12
-    sent_type_vocab_size: 2
-    task_type_vocab_size: 3
-    vocab_size: 18000
-    use_task_id: false
-    use_fp16: false
diff --git a/examples/erniesage/config/erniesage_v3_cpu.yaml b/examples/erniesage/config/erniesage_v3_cpu.yaml
deleted file mode 100644
index 2172a26..0000000
--- a/examples/erniesage/config/erniesage_v3_cpu.yaml
+++ /dev/null
@@ -1,55 +0,0 @@
-# Global Enviroment Settings
-#
-# trainer config ------
-learner_type: "cpu"
-optimizer_type: "adam"
-lr: 0.00005
-batch_size: 2
-CPU_NUM: 10
-epoch: 20
-log_per_step: 1
-save_per_step: 100
-output_path: "./output"
-ckpt_path: "./ernie_base_ckpt"
-
-# data config ------
-input_data: "./data.txt"
-graph_path: "./workdir"
-sample_workers: 1
-use_pyreader: true
-input_type: "text"
-
-# model config ------
-samples: [10]
-model_type: "ErnieSageModelV3"
-
-max_seqlen: 40
-
-num_layers: 1
-hidden_size: 128
-final_fc: true
-final_l2_norm: true
-loss_type: "hinge"
-margin: 0.3
-
-# infer config ------
-infer_model: "./output/last"
-infer_batch_size: 128
-
-# ernie config ------
-encoding: "utf8"
-ernie_vocab_file: "./vocab.txt"
-ernie_config:
-    attention_probs_dropout_prob: 0.1
-    hidden_act: "relu"
-    hidden_dropout_prob: 0.1
-    hidden_size: 768
-    initializer_range: 0.02
-    max_position_embeddings: 513
-    num_attention_heads: 12
-    num_hidden_layers: 12
-    sent_type_vocab_size: 4
-    task_type_vocab_size: 3
-    vocab_size: 18000
-    use_task_id: false
-    use_fp16: false
diff --git a/examples/erniesage/config/erniesage_v3_gpu.yaml b/examples/erniesage/config/erniesage_v3_gpu.yaml
deleted file mode 100644
index e53ab33..0000000
--- a/examples/erniesage/config/erniesage_v3_gpu.yaml
+++ /dev/null
@@ -1,55 +0,0 @@
-# Global Enviroment Settings
-#
-# trainer config ------
-learner_type: "gpu"
-optimizer_type: "adam"
-lr: 0.00005
-batch_size: 32
-CPU_NUM: 10
-epoch: 20
-log_per_step: 1
-save_per_step: 100
-output_path: "./output"
-ckpt_path: "./ernie_base_ckpt"
-
-# data config ------
-input_data: "./data.txt"
-graph_path: "./workdir"
-sample_workers: 1
-use_pyreader: true
-input_type: "text"
-
-# model config ------
-samples: [10]
-model_type: "ErnieSageModelV3"
-
-max_seqlen: 40
-
-num_layers: 1
-hidden_size: 128
-final_fc: true
-final_l2_norm: true
-loss_type: "hinge"
-margin: 0.3
-
-# infer config ------
-infer_model: "./output/last"
-infer_batch_size: 128
-
-# ernie config ------
-encoding: "utf8"
-ernie_vocab_file: "./vocab.txt"
-ernie_config:
-    attention_probs_dropout_prob: 0.1
-    hidden_act: "relu"
-    hidden_dropout_prob: 0.1
-    hidden_size: 768
-    initializer_range: 0.02
-    max_position_embeddings: 513
-    num_attention_heads: 12
-    num_hidden_layers: 12
-    sent_type_vocab_size: 4
-    task_type_vocab_size: 3
-    vocab_size: 18000
-    use_task_id: false
-    use_fp16: false
diff --git a/examples/erniesage/dataset/graph_reader.py b/examples/erniesage/dataset/graph_reader.py
index c811b56..5040942 100644
--- a/examples/erniesage/dataset/graph_reader.py
+++ b/examples/erniesage/dataset/graph_reader.py
@@ -74,17 +74,15 @@ class GraphGenerator(BaseDataGenerator):
         batch_dst = np.array(batch_dst, dtype="int64")
 
         if self.neg_type == "batch_neg":
-            neg_shape = [1]
+            batch_neg = batch_dst
         else:
+            # TODO user define shape of neg_sample
             neg_shape = batch_dst.shape
-        sampled_batch_neg = alias_sample(neg_shape, self.alias, self.events)
-
-        if len(batch_neg) > 0:
+            sampled_batch_neg = alias_sample(neg_shape, self.alias, self.events)
             batch_neg = np.concatenate([batch_neg, sampled_batch_neg], 0)
-        else:
-            batch_neg = sampled_batch_neg
 
         if self.phase == "train":
+            # TODO user define ignore edges or not
             #ignore_edges = np.concatenate([np.stack([batch_src, batch_dst], 1), np.stack([batch_dst, batch_src], 1)], 0)
             ignore_edges = set()
         else:
@@ -92,7 +90,8 @@ class GraphGenerator(BaseDataGenerator):
 
         nodes = np.unique(np.concatenate([batch_src, batch_dst, batch_neg], 0))
         subgraphs = graphsage_sample(self.graph, nodes, self.samples, ignore_edges=ignore_edges)
-        #subgraphs[0].reindex_to_parrent_nodes(subgraphs[0].nodes)
+        subgraphs[0].node_feat["index"] = subgraphs[0].reindex_to_parrent_nodes(subgraphs[0].nodes).astype(np.int64)
+        subgraphs[0].node_feat["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]].astype(np.int64)
         feed_dict = {}
         for i in range(self.num_layers):
             feed_dict.update(self.graph_wrappers[i].to_feed(subgraphs[i]))
@@ -103,9 +102,12 @@
         sub_neg_idx = subgraphs[0].reindex_from_parrent_nodes(batch_neg)
 
         feed_dict["user_index"] = np.array(sub_src_idx, dtype="int64")
-        feed_dict["item_index"] = np.array(sub_dst_idx, dtype="int64")
+        feed_dict["pos_item_index"] = np.array(sub_dst_idx, dtype="int64")
         feed_dict["neg_item_index"] = np.array(sub_neg_idx, dtype="int64")
-        feed_dict["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]].astype(np.int64)
+
+        feed_dict["user_real_index"] = np.array(batch_src, dtype="int64")
+        feed_dict["pos_item_real_index"] = np.array(batch_dst, dtype="int64")
+        feed_dict["neg_item_real_index"] = np.array(batch_neg, dtype="int64")
         return feed_dict
 
     def __call__(self):
diff --git a/examples/erniesage/infer.py b/examples/erniesage/infer.py
index 9fc4aee..b7231a4 100644
--- a/examples/erniesage/infer.py
+++ b/examples/erniesage/infer.py
@@ -34,7 +34,7 @@ from pgl.utils import paddle_helper
 import paddle
 import paddle.fluid as F
 
-from models.model_factory import Model
+from models.model import LinkPredictModel
 from dataset.graph_reader import GraphGenerator
 
 
@@ -59,7 +59,7 @@ def run_predict(py_reader,
                 log_per_step=1,
                 args=None):
 
-    id2str = io.open(os.path.join(args.graph_path, "terms.txt"), encoding=args.encoding).readlines()
+    id2str = io.open(os.path.join(args.graph_work_path, "terms.txt"), encoding=args.encoding).readlines()
 
     trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
     trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
@@ -71,7 +71,7 @@ def run_predict(py_reader,
 
     for batch_feed_dict in py_reader():
         batch += 1
-        batch_usr_feat, batch_ad_feat, _, batch_src_real_index = exe.run(
+        batch_usr_feat, _, _, batch_src_real_index, _, _ = exe.run(
            program,
            feed=batch_feed_dict,
            fetch_list=model_dict.outputs)
@@ -79,7 +79,7 @@ def run_predict(py_reader,
         if batch % log_per_step == 0:
             log.info("Predict %s finished" % batch)
 
-        for ufs, _, sri in zip(batch_usr_feat, batch_ad_feat, batch_src_real_index):
+        for ufs, sri in zip(batch_usr_feat, batch_src_real_index):
             if args.input_type == "text":
                 sri = id2str[int(sri)].strip("\n")
             line = "{}\t{}\n".format(sri, tostr(ufs))
@@ -108,7 +108,7 @@ def _warmstart(exe, program, path='params'):
     )
 
 def main(config):
-    model = Model.factory(config)
+    model = LinkPredictModel(config)
 
     if config.learner_type == "cpu":
         place = F.CPUPlace()
@@ -143,7 +143,7 @@ def main(config):
             build_strategy=build_strategy,
             exec_strategy=exec_strategy)
 
-    num_nodes = int(np.load(os.path.join(config.graph_path, "num_nodes.npy")))
+    num_nodes = int(np.load(os.path.join(config.graph_work_path, "num_nodes.npy")))
 
     predict_data = PredictData(num_nodes)
 
@@ -156,7 +156,7 @@ def main(config):
         feed_name_list=[var.name for var in model.feed_list],
         use_pyreader=config.use_pyreader,
         phase="predict",
-        graph_data_path=config.graph_path,
+        graph_data_path=config.graph_work_path,
         shuffle=False)
 
     if config.learner_type == "cpu":
diff --git a/examples/erniesage/local_run.sh b/examples/erniesage/local_run.sh
index 6a11daf..9a27d71 100644
--- a/examples/erniesage/local_run.sh
+++ b/examples/erniesage/local_run.sh
@@ -57,7 +57,7 @@ collective_local_train(){
 
 eval $(parse_yaml $config)
 
-python ./preprocessing/dump_graph.py -i $input_data -o $graph_path --encoding $encoding -l $max_seqlen --vocab_file $ernie_vocab_file
+python ./preprocessing/dump_graph.py -i $train_data -g $graph_data -o $graph_work_path --encoding $encoding -l $max_seqlen --vocab_file $ernie_vocab_file
 
 if [[ $learner_type == "cpu" ]];then
     transpiler_local_train
diff --git a/examples/erniesage/models/base.py b/examples/erniesage/models/base.py
deleted file mode 100644
index e93fd5f..0000000
--- a/examples/erniesage/models/base.py
+++ /dev/null
@@ -1,244 +0,0 @@
-# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import time
-import glob
-import os
-
-import numpy as np
-
-import pgl
-import paddle.fluid as F
-import paddle.fluid.layers as L
-
-from models import message_passing
-
-def get_layer(layer_type, gw, feature, hidden_size, act, initializer, learning_rate, name, is_test=False):
-    return getattr(message_passing, layer_type)(gw, feature, hidden_size, act, initializer, learning_rate, name)
-
-
-class BaseGraphWrapperBuilder(object):
-    def __init__(self, config):
-        self.config = config
-        self.node_feature_info = []
-        self.edge_feature_info = []
-
-    def __call__(self):
-        place = F.CPUPlace()
-        graph_wrappers = []
-        for i in range(self.config.num_layers):
-            # all graph have same node_feat_info
-            graph_wrappers.append(
-                pgl.graph_wrapper.GraphWrapper(
-                    "layer_%s" % i, node_feat=self.node_feature_info, edge_feat=self.edge_feature_info))
-        return graph_wrappers
-
-
-class GraphsageGraphWrapperBuilder(BaseGraphWrapperBuilder):
-    def __init__(self, config):
-        super(GraphsageGraphWrapperBuilder, self).__init__(config)
-        self.node_feature_info.append(('index', [None], np.dtype('int64')))
-
-
-class BaseGNNModel(object):
-    def __init__(self, config):
-        self.config = config
-        self.graph_wrapper_builder = self.gen_graph_wrapper_builder(config)
-        self.net_fn = self.gen_net_fn(config)
-        self.feed_list_builder = self.gen_feed_list_builder(config)
-        self.data_loader_builder = self.gen_data_loader_builder(config)
-        self.loss_fn = self.gen_loss_fn(config)
-        self.build()
-
-
-    def gen_graph_wrapper_builder(self, config):
-        return GraphsageGraphWrapperBuilder(config)
-
-    def gen_net_fn(self, config):
-        return BaseNet(config)
-
-    def gen_feed_list_builder(self, config):
-        return BaseFeedListBuilder(config)
-
-    def gen_data_loader_builder(self, config):
-        return BaseDataLoaderBuilder(config)
-
-    def gen_loss_fn(self, config):
-        return BaseLoss(config)
-
-    def build(self):
-        self.graph_wrappers = self.graph_wrapper_builder()
-        self.inputs, self.outputs = self.net_fn(self.graph_wrappers)
-        self.feed_list = self.feed_list_builder(self.inputs, self.graph_wrappers)
-        self.data_loader = self.data_loader_builder(self.feed_list)
-        self.loss = self.loss_fn(self.outputs)
-
-class BaseFeedListBuilder(object):
-    def __init__(self, config):
-        self.config = config
-
-    def __call__(self, inputs, graph_wrappers):
-        feed_list = []
-        for i in range(len(graph_wrappers)):
-            feed_list.extend(graph_wrappers[i].holder_list)
-        feed_list.extend(inputs)
-        return feed_list
-
-
-class BaseDataLoaderBuilder(object):
-    def __init__(self, config):
-        self.config = config
-
-    def __call__(self, feed_list):
-        data_loader = F.io.PyReader(
-            feed_list=feed_list, capacity=20, use_double_buffer=True, iterable=True)
-        return data_loader
-
-
-
-class BaseNet(object):
-    def __init__(self, config):
-        self.config = config
-
-    def take_final_feature(self, feature, index, name):
-        """take final feature"""
-        feat = L.gather(feature, index, overwrite=False)
-
-        if self.config.final_fc:
-            feat = L.fc(feat,
-                        self.config.hidden_size,
-                        param_attr=F.ParamAttr(name=name + '_w'),
-                        bias_attr=F.ParamAttr(name=name + '_b'))
-
-        if self.config.final_l2_norm:
-            feat = L.l2_normalize(feat, axis=1)
-        return feat
-
-    def build_inputs(self):
-        user_index = L.data(
-            "user_index", shape=[None], dtype="int64", append_batch_size=False)
-        item_index = L.data(
-            "item_index", shape=[None], dtype="int64", append_batch_size=False)
-        neg_item_index = L.data(
-            "neg_item_index", shape=[None], dtype="int64", append_batch_size=False)
-        return [user_index, item_index, neg_item_index]
-
-    def build_embedding(self, graph_wrappers, inputs=None):
-        num_embed = int(np.load(os.path.join(self.config.graph_path, "num_nodes.npy")))
-        is_sparse = self.config.trainer_type == "Transpiler"
-
-        embed = L.embedding(
-            input=L.reshape(graph_wrappers[0].node_feat['index'], [-1, 1]),
-            size=[num_embed, self.config.hidden_size],
-            is_sparse=is_sparse,
-            param_attr=F.ParamAttr(name="node_embedding", initializer=F.initializer.Uniform(
-                low=-1. / self.config.hidden_size,
-                high=1. / self.config.hidden_size)))
-        return embed
-
-    def gnn_layers(self, graph_wrappers, feature):
-        features = [feature]
-
-        initializer = None
-        fc_lr = self.config.lr / 0.001
-
-        for i in range(self.config.num_layers):
-            if i == self.config.num_layers - 1:
-                act = None
-            else:
-                act = "leaky_relu"
-
-            feature = get_layer(
-                self.config.layer_type,
-                graph_wrappers[i],
-                feature,
-                self.config.hidden_size,
-                act,
-                initializer,
-                learning_rate=fc_lr,
-                name="%s_%s" % (self.config.layer_type, i))
-            features.append(feature)
-        return features
-
-    def __call__(self, graph_wrappers):
-        inputs = self.build_inputs()
-        feature = self.build_embedding(graph_wrappers, inputs)
-        features = self.gnn_layers(graph_wrappers, feature)
-        outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs]
-        src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
-        outputs.append(src_real_index)
-        return inputs, outputs
-
-def all_gather(X):
-    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
-    trainer_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
-    if trainer_num == 1:
-        copy_X = X * 1
-        copy_X.stop_gradients=True
-        return copy_X
-
-    Xs = []
-    for i in range(trainer_num):
-        copy_X = X * 1
-        copy_X = L.collective._broadcast(copy_X, i, True)
-        copy_X.stop_gradient=True
-        Xs.append(copy_X)
-
-    if len(Xs) > 1:
-        Xs=L.concat(Xs, 0)
-        Xs.stop_gradient=True
-    else:
-        Xs = Xs[0]
-    return Xs
-
-class BaseLoss(object):
-    def __init__(self, config):
-        self.config = config
-
-    def __call__(self, outputs):
-        user_feat, item_feat, neg_item_feat = outputs[0], outputs[1], outputs[2]
-        loss_type = self.config.loss_type
-
-        if self.config.neg_type == "batch_neg":
-            neg_item_feat = item_feat
-        # Calc Loss
-        if self.config.loss_type == "hinge":
-            pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
-            neg = L.matmul(user_feat, neg_item_feat, transpose_y=True) # [B, B]
-            loss = L.reduce_mean(L.relu(neg - pos + self.config.margin))
-        elif self.config.loss_type == "all_hinge":
-            pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
-            all_pos = all_gather(pos) # [B * n, 1]
-            all_neg_item_feat = all_gather(neg_item_feat) # [B * n, 1]
-            all_user_feat = all_gather(user_feat) # [B * n, 1]
-
-            neg1 = L.matmul(user_feat, all_neg_item_feat, transpose_y=True) # [B, B * n]
-            neg2 = L.matmul(all_user_feat, neg_item_feat, transpose_y=True) # [B *n, B]
-
-            loss1 = L.reduce_mean(L.relu(neg1 - pos + self.config.margin))
-            loss2 = L.reduce_mean(L.relu(neg2 - all_pos + self.config.margin))
-
-            #loss = (loss1 + loss2) / 2
-            loss = loss1 + loss2
-
-        elif self.config.loss_type == "softmax":
-            pass
-            # TODO
-            # pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
-            # neg = L.matmul(user_feat, neg_feat, transpose_y=True) # [B, B]
-            # logits = L.concat([pos, neg], -1) # [B, 1+B]
-            # labels = L.fill_constant_batch_size_like(logits, [-1, 1], "int64", 0)
-            # loss = L.reduce_mean(L.softmax_with_cross_entropy(logits, labels))
-        else:
-            raise ValueError
-        return loss
diff --git a/examples/erniesage/models/encoder.py b/examples/erniesage/models/encoder.py
new file mode 100644
index 0000000..72ac6b0
--- /dev/null
+++ b/examples/erniesage/models/encoder.py
@@ -0,0 +1,287 @@
+import numpy as np
+import pgl
+import paddle.fluid as F
+import paddle.fluid.layers as L
+from models.ernie_model.ernie import ErnieModel
+from models.ernie_model.ernie import ErnieGraphModel
+from models.ernie_model.ernie import ErnieConfig
+from models import message_passing
+from models.message_passing import copy_send
+
+
+def get_layer(layer_type, gw, feature, hidden_size, act, initializer, learning_rate, name, is_test=False):
+    return getattr(message_passing, layer_type)(gw, feature, hidden_size, act, initializer, learning_rate, name)
+
+
+class Encoder(object):
+
+    def __init__(self, config):
+        self.config = config
+
+    @classmethod
+    def factory(cls, config):
+        model_type = config.model_type
+        if model_type == "ERNIESageV1":
+            return ERNIESageV1Encoder(config)
+        elif model_type == "ERNIESageV2":
+            return ERNIESageV2Encoder(config)
+        elif model_type == "ERNIESageV3":
+            return ERNIESageV3Encoder(config)
+        elif model_type == "ERNIESageV4":
+            return ERNIESageV4Encoder(config)
+        else:
+            raise ValueError
+
+    def __call__(self, graph_wrappers, inputs):
+        raise NotImplementedError
+
+
+class ERNIESageV1Encoder(Encoder):
+    def __call__(self, graph_wrappers, inputs):
+        feature = self.build_embedding(graph_wrappers[0].node_feat["term_ids"])
+
+        initializer = None
+        fc_lr = self.config.lr / 0.001
+        for i in range(self.config.num_layers):
+            if i == self.config.num_layers - 1:
+                act = None
+            else:
+                act = "leaky_relu"
+            feature = get_layer(
+                self.config.layer_type,
+                graph_wrappers[i],
+                feature,
+                self.config.hidden_size,
+                act,
+                initializer,
+                learning_rate=fc_lr,
+                name="%s_%s" % (self.config.layer_type, i))
+
+        final_feats = [self.take_final_feature(feature, i, "final_fc") for i in inputs]
+        return final_feats
+
+    def build_embedding(self, term_ids):
+        term_ids = L.unsqueeze(term_ids, [-1])
+        ernie_config = self.config.ernie_config
+        ernie = ErnieModel(
+            src_ids=term_ids,
+            sentence_ids=L.zeros_like(term_ids),
+            task_ids=None,
+            config=ernie_config,
+            use_fp16=False,
+            name="")
+        feature = ernie.get_pooled_output()
+        return feature
+
+    def take_final_feature(self, feature, index, name):
+        """take final feature"""
+        feat = L.gather(feature, index, overwrite=False)
+
+        if self.config.final_fc:
+            feat = L.fc(feat,
+                        self.config.hidden_size,
+                        param_attr=F.ParamAttr(name=name + '_w'),
+                        bias_attr=F.ParamAttr(name=name + '_b'))
+
+        if self.config.final_l2_norm:
+            feat = L.l2_normalize(feat, axis=1)
+        return feat
+
+
+class ERNIESageV2Encoder(Encoder):
+
+    def __call__(self, graph_wrappers, inputs):
+        feature = graph_wrappers[0].node_feat["term_ids"]
+        feature = self.gnn_layer(graph_wrappers[0], feature, self.config.hidden_size, 'leaky_relu', None, 1., "erniesage_v2_0")
+
+        initializer = None
+        fc_lr = self.config.lr / 0.001
+        for i in range(1, self.config.num_layers):
+            if i == self.config.num_layers - 1:
+                act = None
+            else:
+                act = "leaky_relu"
+            feature = get_layer(
+                self.config.layer_type,
+                graph_wrappers[i],
+                feature,
+                self.config.hidden_size,
+                act,
+                initializer,
+                learning_rate=fc_lr,
+                name="%s_%s" % (self.config.layer_type, i))
+
+        final_feats = [self.take_final_feature(feature, i, "final_fc") for i in inputs]
+        return final_feats
+
+    def take_final_feature(self, feature, index, name):
+        """take final feature"""
+        feat = L.gather(feature, index, overwrite=False)
+
+        if self.config.final_fc:
+            feat = L.fc(feat,
+                        self.config.hidden_size,
+                        param_attr=F.ParamAttr(name=name + '_w'),
+                        bias_attr=F.ParamAttr(name=name + '_b'))
+
+        if self.config.final_l2_norm:
+            feat = L.l2_normalize(feat, axis=1)
+        return feat
+
+    def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
+        def build_position_ids(src_ids, dst_ids):
+            src_shape = L.shape(src_ids)
+            src_batch = src_shape[0]
+            src_seqlen = src_shape[1]
+            dst_seqlen = src_seqlen - 1 # without cls
+
+            src_position_ids = L.reshape(
+                L.range(
+                    0, src_seqlen, 1, dtype='int32'), [1, src_seqlen, 1],
+                inplace=True) # [1, slot_seqlen, 1]
+            src_position_ids = L.expand(src_position_ids, [src_batch, 1, 1]) # [B, slot_seqlen * num_b, 1]
+            zero = L.fill_constant([1], dtype='int64', value=0)
+            input_mask = L.cast(L.equal(src_ids, zero), "int32") # assume pad id == 0 [B, slot_seqlen, 1]
+            src_pad_len = L.reduce_sum(input_mask, 1, keep_dim=True) # [B, 1, 1]
+
+            dst_position_ids = L.reshape(
+                L.range(
+                    src_seqlen, src_seqlen+dst_seqlen, 1, dtype='int32'), [1, dst_seqlen, 1],
+                inplace=True) # [1, slot_seqlen, 1]
+            dst_position_ids = L.expand(dst_position_ids, [src_batch, 1, 1]) # [B, slot_seqlen, 1]
+            dst_position_ids = dst_position_ids - src_pad_len # [B, slot_seqlen, 1]
+
+            position_ids = L.concat([src_position_ids, dst_position_ids], 1)
+            position_ids = L.cast(position_ids, 'int64')
+            position_ids.stop_gradient = True
+            return position_ids
+
+
+        def ernie_send(src_feat, dst_feat, edge_feat):
+            """doc"""
+            # input_ids
+            cls = L.fill_constant_batch_size_like(src_feat["term_ids"], [-1, 1, 1], "int64", 1)
+            src_ids = L.concat([cls, src_feat["term_ids"]], 1)
+            dst_ids = dst_feat["term_ids"]
+
+            # sent_ids
+            sent_ids = L.concat([L.zeros_like(src_ids), L.ones_like(dst_ids)], 1)
+            term_ids = L.concat([src_ids, dst_ids], 1)
+
+            # position_ids
+            position_ids = build_position_ids(src_ids, dst_ids)
+
+            term_ids.stop_gradient = True
+            sent_ids.stop_gradient = True
+            ernie = ErnieModel(
+                term_ids, sent_ids, position_ids,
+                config=self.config.ernie_config)
+            feature = ernie.get_pooled_output()
+            return feature
+
+        def erniesage_v2_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name):
+            feature = L.unsqueeze(feature, [-1])
+            msg = gw.send(ernie_send, nfeat_list=[("term_ids", feature)])
+            neigh_feature = gw.recv(msg, lambda feat: F.layers.sequence_pool(feat, pool_type="sum"))
+
+            term_ids = feature
+            cls = L.fill_constant_batch_size_like(term_ids, [-1, 1, 1], "int64", 1)
+            term_ids = L.concat([cls, term_ids], 1)
+            term_ids.stop_gradient = True
+            ernie = ErnieModel(
+                term_ids, L.zeros_like(term_ids),
+                config=self.config.ernie_config)
+            self_feature = ernie.get_pooled_output()
+
+            self_feature = L.fc(self_feature,
+                                hidden_size,
+                                act=act,
+                                param_attr=F.ParamAttr(name=name + "_l.w_0",
+                                                       learning_rate=learning_rate),
+                                bias_attr=name+"_l.b_0"
+                                )
+            neigh_feature = L.fc(neigh_feature,
+                                 hidden_size,
+                                 act=act,
+                                 param_attr=F.ParamAttr(name=name + "_r.w_0",
+                                                        learning_rate=learning_rate),
+                                 bias_attr=name+"_r.b_0"
+                                 )
+            output = L.concat([self_feature, neigh_feature], axis=1)
+            output = L.l2_normalize(output, axis=1)
+            return output
+        return erniesage_v2_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name)
+
+
+class ERNIESageV3Encoder(Encoder):
+
+    def __call__(self, graph_wrappers, inputs):
+        feature = graph_wrappers[0].node_feat["term_ids"]
+        feature = self.gnn_layer(graph_wrappers[0], feature, self.config.hidden_size, 'leaky_relu', None, 1., "erniesage_v3_0")
+
+        final_feats = [self.take_final_feature(feature, i, "final_fc") for i in inputs]
+        return final_feats
+
+    def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
+        def ernie_recv(feat):
+            """doc"""
+            num_neighbor = self.config.samples[0]
+            pad_value = L.zeros([1], "int64")
+            out, _ = L.sequence_pad(feat, pad_value=pad_value, maxlen=num_neighbor)
+            out = L.reshape(out, [0, self.config.max_seqlen*num_neighbor])
+            return out
+
+        def erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name):
+            msg = gw.send(copy_send, nfeat_list=[("h", feature)])
+            neigh_feature = gw.recv(msg, ernie_recv)
+            neigh_feature = L.cast(L.unsqueeze(neigh_feature, [-1]), "int64")
+
+            feature = L.unsqueeze(feature, [-1])
+            cls = L.fill_constant_batch_size_like(feature, [-1, 1, 1], "int64", 1)
+            term_ids = L.concat([cls, feature[:, :-1], neigh_feature], 1)
+            term_ids.stop_gradient = True
+            return term_ids
+        return erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name)
+
+    def gnn_layers(self, graph_wrappers, feature):
+        features = [feature]
+
+        initializer = None
+        fc_lr = self.config.lr / 0.001
+
+        for i in range(self.config.num_layers):
+            if i == self.config.num_layers - 1:
+                act = None
+            else:
+                act = "leaky_relu"
+
+            feature = self.gnn_layer(
+                graph_wrappers[i],
+                feature,
+                self.config.hidden_size,
+                act,
+                initializer,
+                learning_rate=fc_lr,
+                name="%s_%s" % ("erniesage_v3", i))
+            features.append(feature)
+        return features
+
+    def take_final_feature(self, feature, index, name):
+        """take final feature"""
+        feat = L.gather(feature, index, overwrite=False)
+
+        ernie_config = self.config.ernie_config
+        ernie = ErnieGraphModel(
+            src_ids=feat,
+            config=ernie_config,
+            slot_seqlen=self.config.max_seqlen)
+        feat = ernie.get_pooled_output()
+        fc_lr = self.config.lr / 0.001
+
+        if self.config.final_fc:
+            feat = L.fc(feat,
+                        self.config.hidden_size,
+                        param_attr=F.ParamAttr(name=name + '_w'),
+                        bias_attr=F.ParamAttr(name=name + '_b'))
+
+        if self.config.final_l2_norm:
+            feat = L.l2_normalize(feat, axis=1)
+        return feat
diff --git a/examples/erniesage/models/erniesage_v1.py b/examples/erniesage/models/erniesage_v1.py
deleted file mode 100644
index 696231a..0000000
--- a/examples/erniesage/models/erniesage_v1.py
+++ /dev/null
@@ -1,42 +0,0 @@
-import pgl
-import paddle.fluid as F
-import paddle.fluid.layers as L
-from models.base import BaseNet, BaseGNNModel
-from models.ernie_model.ernie import ErnieModel
-from models.ernie_model.ernie import ErnieGraphModel
-from models.ernie_model.ernie import ErnieConfig
-
-class ErnieSageV1(BaseNet):
-
-    def build_inputs(self):
-        inputs = super(ErnieSageV1, self).build_inputs()
-        term_ids = L.data(
-            "term_ids", shape=[None, self.config.max_seqlen], dtype="int64", append_batch_size=False)
-        return inputs + [term_ids]
-
-    def build_embedding(self, graph_wrappers, term_ids):
-        term_ids = L.unsqueeze(term_ids, [-1])
-        ernie_config = self.config.ernie_config
-        ernie = ErnieModel(
-            src_ids=term_ids,
-            sentence_ids=L.zeros_like(term_ids),
-            task_ids=None,
-            config=ernie_config,
-            use_fp16=False,
-            name="student_")
-        feature = ernie.get_pooled_output()
-        return feature
-
-    def __call__(self, graph_wrappers):
-        inputs = self.build_inputs()
-        feature = self.build_embedding(graph_wrappers, inputs[-1])
-        features = self.gnn_layers(graph_wrappers, feature)
-        outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs[:-1]]
-        src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
-        outputs.append(src_real_index)
-        return inputs, outputs
-
-
-class ErnieSageModelV1(BaseGNNModel):
-    def gen_net_fn(self, config):
-        return ErnieSageV1(config)
diff --git a/examples/erniesage/models/erniesage_v2.py b/examples/erniesage/models/erniesage_v2.py
deleted file mode 100644
index 7ad9a26..0000000
--- a/examples/erniesage/models/erniesage_v2.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import pgl
-import paddle.fluid as F
-import paddle.fluid.layers as L
-from models.base import BaseNet, BaseGNNModel
-from models.ernie_model.ernie import ErnieModel
-
-
-class ErnieSageV2(BaseNet):
-
-    def build_inputs(self):
-        inputs = super(ErnieSageV2, self).build_inputs()
-        term_ids = L.data(
-            "term_ids", shape=[None, self.config.max_seqlen], dtype="int64", append_batch_size=False)
-        return inputs + [term_ids]
-
-    def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
-        def build_position_ids(src_ids, dst_ids):
-            src_shape = L.shape(src_ids)
-            src_batch = src_shape[0]
-            src_seqlen = src_shape[1]
-            dst_seqlen = src_seqlen - 1 # without cls
-
-            src_position_ids = L.reshape(
-                L.range(
-                    0, src_seqlen, 1, dtype='int32'), [1, src_seqlen, 1],
-                inplace=True) # [1, slot_seqlen, 1]
-            src_position_ids = L.expand(src_position_ids, [src_batch, 1, 1]) # [B, slot_seqlen * num_b, 1]
-            zero = L.fill_constant([1], dtype='int64', value=0)
-            input_mask = L.cast(L.equal(src_ids, zero), "int32") # assume pad id == 0 [B, slot_seqlen, 1]
-            src_pad_len = L.reduce_sum(input_mask, 1, keep_dim=True) # [B, 1, 1]
-
-            dst_position_ids = L.reshape(
-                L.range(
-                    src_seqlen, src_seqlen+dst_seqlen, 1, dtype='int32'), [1, dst_seqlen, 1],
-                inplace=True) # [1, slot_seqlen, 1]
-            dst_position_ids = L.expand(dst_position_ids, [src_batch, 1, 1]) # [B, slot_seqlen, 1]
-            dst_position_ids = dst_position_ids - src_pad_len # [B, slot_seqlen, 1]
-
-            position_ids = L.concat([src_position_ids, dst_position_ids], 1)
-            position_ids = L.cast(position_ids, 'int64')
-            position_ids.stop_gradient = True
-            return position_ids
-
-
-        def ernie_send(src_feat, dst_feat, edge_feat):
-            """doc"""
-            # input_ids
-            cls = L.fill_constant_batch_size_like(src_feat["term_ids"], [-1, 1, 1], "int64", 1)
-            src_ids = L.concat([cls, src_feat["term_ids"]], 1)
-            dst_ids = dst_feat["term_ids"]
-
-            # sent_ids
-            sent_ids = L.concat([L.zeros_like(src_ids), L.ones_like(dst_ids)], 1)
-            term_ids = L.concat([src_ids, dst_ids], 1)
-
-            # position_ids
-            position_ids = build_position_ids(src_ids, dst_ids)
-
-            term_ids.stop_gradient = True
-            sent_ids.stop_gradient = True
-            ernie = ErnieModel(
-                term_ids, sent_ids, position_ids,
-                config=self.config.ernie_config)
-            feature = ernie.get_pooled_output()
-            return feature
-
-        def erniesage_v2_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name):
-            feature = L.unsqueeze(feature, [-1])
-            msg = gw.send(ernie_send, nfeat_list=[("term_ids", feature)])
-            neigh_feature = gw.recv(msg, lambda feat: F.layers.sequence_pool(feat, pool_type="sum"))
-
-            term_ids = feature
-            cls = L.fill_constant_batch_size_like(term_ids, [-1, 1, 1], "int64", 1)
-            term_ids = L.concat([cls, term_ids], 1)
-            term_ids.stop_gradient = True
-            ernie = ErnieModel(
-                term_ids, L.zeros_like(term_ids),
-                config=self.config.ernie_config)
-            self_feature = ernie.get_pooled_output()
-
-            self_feature = L.fc(self_feature,
-                                hidden_size,
-                                act=act,
-                                param_attr=F.ParamAttr(name=name + "_l.w_0",
-                                                       learning_rate=learning_rate),
-                                bias_attr=name+"_l.b_0"
-                                )
-            neigh_feature = L.fc(neigh_feature,
-                                 hidden_size,
-                                 act=act,
-                                 param_attr=F.ParamAttr(name=name + "_r.w_0",
-                                                        learning_rate=learning_rate),
-                                 bias_attr=name+"_r.b_0"
-                                 )
-            output = L.concat([self_feature, neigh_feature], axis=1)
-            output = L.l2_normalize(output, axis=1)
-            return output
-        return erniesage_v2_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name)
-
-    def gnn_layers(self, graph_wrappers, feature):
-        features = [feature]
-
-        initializer = None
-        fc_lr = self.config.lr / 0.001
-
-        for i in range(self.config.num_layers):
-            if i == self.config.num_layers - 1:
-                act = None
-            else:
-                act = "leaky_relu"
-
-            feature = self.gnn_layer(
-                graph_wrappers[i],
-                feature,
-                self.config.hidden_size,
-                act,
-                initializer,
-                learning_rate=fc_lr,
-                name="%s_%s" % ("erniesage_v2", i))
-            features.append(feature)
-        return features
-
-    def __call__(self, graph_wrappers):
-        inputs = self.build_inputs()
-        feature = inputs[-1]
-        features = self.gnn_layers(graph_wrappers, feature)
-        outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs[:-1]]
-        src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
-        outputs.append(src_real_index)
-        return inputs, outputs
-
-
-class ErnieSageModelV2(BaseGNNModel):
-    def gen_net_fn(self, config):
-        return ErnieSageV2(config)
diff --git a/examples/erniesage/models/erniesage_v3.py b/examples/erniesage/models/erniesage_v3.py
deleted file mode 100644
index 44c3ae8..0000000
--- a/examples/erniesage/models/erniesage_v3.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import pgl
-import paddle.fluid as F
-import paddle.fluid.layers as L
-
-from models.base import BaseNet, BaseGNNModel
-from models.ernie_model.ernie import ErnieModel
-from models.ernie_model.ernie import ErnieGraphModel
-from models.message_passing import copy_send
-
-
-class ErnieSageV3(BaseNet):
-    def __init__(self, config):
-        super(ErnieSageV3, self).__init__(config)
-
-    def build_inputs(self):
-        inputs = super(ErnieSageV3, self).build_inputs()
-        term_ids = L.data(
-            "term_ids", shape=[None, self.config.max_seqlen], dtype="int64", append_batch_size=False)
-        return inputs + [term_ids]
-
-    def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
-        def ernie_recv(feat):
-            """doc"""
-            num_neighbor = self.config.samples[0]
-            pad_value = L.zeros([1], "int64")
-            out, _ = L.sequence_pad(feat, pad_value=pad_value, maxlen=num_neighbor)
-            out = L.reshape(out, [0, self.config.max_seqlen*num_neighbor])
-            return out
-
-        def erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name):
-            msg = gw.send(copy_send, nfeat_list=[("h", feature)])
-            neigh_feature = gw.recv(msg, ernie_recv)
-            neigh_feature = L.cast(L.unsqueeze(neigh_feature, [-1]), "int64")
-
-            feature = L.unsqueeze(feature, [-1])
-            cls = L.fill_constant_batch_size_like(feature, [-1, 1, 1], "int64", 1)
-            term_ids = L.concat([cls, feature[:, :-1], neigh_feature], 1)
-            term_ids.stop_gradient = True
-            return term_ids
-        return erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name)
-
-    def gnn_layers(self, graph_wrappers, feature):
-        features = [feature]
-
-        initializer = None
-        fc_lr = self.config.lr / 0.001
-
-        for i in range(self.config.num_layers):
-            if i == self.config.num_layers - 1:
-                act = None
-            else:
-                act = "leaky_relu"
-
-            feature = self.gnn_layer(
-                graph_wrappers[i],
-                feature,
-                self.config.hidden_size,
-                act,
-                initializer,
-                learning_rate=fc_lr,
-                name="%s_%s" % ("erniesage_v3", i))
-            features.append(feature)
-        return features
-
-    def take_final_feature(self, feature, index, name):
-        """take final feature"""
-        feat = L.gather(feature, index, overwrite=False)
-
-        ernie_config = self.config.ernie_config
-        ernie = ErnieGraphModel(
-            src_ids=feat,
-            config=ernie_config,
-            slot_seqlen=self.config.max_seqlen)
-        feat = ernie.get_pooled_output()
-        fc_lr = self.config.lr / 0.001
-        # feat = L.fc(feat,
-        #            self.config.hidden_size,
-        #            act="relu",
-        #            param_attr=F.ParamAttr(name=name + "_l",
-        #                                   learning_rate=fc_lr),
-        #            )
-        #feat = L.l2_normalize(feat, axis=1)
-
-        if self.config.final_fc:
-            feat = L.fc(feat,
-                        self.config.hidden_size,
-                        param_attr=F.ParamAttr(name=name + '_w'),
-                        bias_attr=F.ParamAttr(name=name + '_b'))
-
-        if self.config.final_l2_norm:
-            feat = L.l2_normalize(feat, axis=1)
-        return feat
-
-    def __call__(self, graph_wrappers):
-        inputs = self.build_inputs()
-        feature = inputs[-1]
-        features = self.gnn_layers(graph_wrappers, feature)
-        outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs[:-1]]
-        src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
-        outputs.append(src_real_index)
-        return inputs, outputs
-
-
-class ErnieSageModelV3(BaseGNNModel):
-    def gen_net_fn(self, config):
-        return ErnieSageV3(config)
diff --git a/examples/erniesage/models/loss.py b/examples/erniesage/models/loss.py
new file mode 100644
index 0000000..ef45340
--- /dev/null
+++ b/examples/erniesage/models/loss.py
@@ -0,0 +1,88 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+import glob
+import os
+
+import numpy as np
+
+import pgl
+import paddle.fluid as F
+import paddle.fluid.layers as L
+
+
+class Loss(object):
+
+    def __init__(self, config):
+        self.config = config
+
+    @classmethod
+    def factory(cls, config):
+        loss_type = config.loss_type
+        if loss_type == "hinge":
+            return HingeLoss(config)
+        elif loss_type == "global_hinge":
+            return GlobalHingeLoss(config)
+        else:
+            raise ValueError
+
+
+class HingeLoss(Loss):
+
+    def __call__(self, user_feat, pos_item_feat, neg_item_feat):
+        pos = L.reduce_sum(user_feat * pos_item_feat, -1, keep_dim=True) # [B, 1]
+        neg = L.matmul(user_feat, neg_item_feat, transpose_y=True) # [B, B]
+        loss = L.reduce_mean(L.relu(neg - pos + self.config.margin))
+        return loss
+
+
+def all_gather(X):
+    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+    trainer_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
+    if trainer_num == 1:
+        copy_X = X * 1
+        copy_X.stop_gradients=True
+        return copy_X
+
+    Xs = []
+    for i in range(trainer_num):
+        copy_X = X * 1
+        copy_X = L.collective._broadcast(copy_X, i, True)
+        copy_X.stop_gradient=True
+        Xs.append(copy_X)
+
+    if len(Xs) > 1:
+        Xs=L.concat(Xs, 0)
+        Xs.stop_gradient=True
+    else:
+        Xs = Xs[0]
+    return Xs
+
+
+class GlobalHingeLoss(Loss):
+
+    def __call__(self, user_feat, pos_item_feat, neg_item_feat):
+        pos = L.reduce_sum(user_feat * pos_item_feat, -1, keep_dim=True) # [B, 1]
+        all_pos = all_gather(pos) # [B * n, 1]
+        all_neg_item_feat = all_gather(neg_item_feat) # [B * n, 1]
+        all_user_feat = all_gather(user_feat) # [B * n, 1]
+
+        neg1 = L.matmul(user_feat, all_neg_item_feat, transpose_y=True) # [B, B * n]
+        neg2 = L.matmul(all_user_feat, neg_item_feat, transpose_y=True) # [B *n, B]
+
+        loss1 = L.reduce_mean(L.relu(neg1 - pos + self.config.margin))
+        loss2 = L.reduce_mean(L.relu(neg2 - all_pos + self.config.margin))
+
+        loss = loss1 + loss2
+        return loss
diff --git a/examples/erniesage/models/model.py b/examples/erniesage/models/model.py
new file mode 100644
index 0000000..a8c18ce
--- /dev/null
+++ b/examples/erniesage/models/model.py
@@ -0,0 +1,110 @@
+import numpy as np
+import pgl
+import paddle.fluid as F
+import paddle.fluid.layers as L
+from models.encoder import Encoder
+from models.loss import Loss
+
+class BaseModel(object):
+
+    def __init__(self, config):
+        self.config = config
+        datas, graph_wrappers, loss, outputs = self.forward()
+        self.build(datas, graph_wrappers, loss, outputs)
+
+    def forward(self):
+        raise NotImplementedError
+
+    def build(self, datas, graph_wrappers, loss, outputs):
+        self.datas = datas
+        self.graph_wrappers = graph_wrappers
+        self.loss = loss
+        self.outputs = outputs
+        self.build_feed_list()
+        self.build_data_loader()
+
+    def build_feed_list(self):
+        self.feed_list = []
+        for i in range(len(self.graph_wrappers)):
+            self.feed_list.extend(self.graph_wrappers[i].holder_list)
+        self.feed_list.extend(self.datas)
+
+    def build_data_loader(self):
+        self.data_loader = F.io.PyReader(
+            feed_list=self.feed_list, capacity=20, use_double_buffer=True, iterable=True)
+
+
+class LinkPredictModel(BaseModel):
+
+    def forward(self):
+        # datas
+        user_index = L.data(
+            "user_index", shape=[None], dtype="int64", append_batch_size=False)
+        pos_item_index = L.data(
+            "pos_item_index", shape=[None], dtype="int64", append_batch_size=False)
+        neg_item_index = L.data(
+            "neg_item_index", shape=[None], dtype="int64", append_batch_size=False)
+        user_real_index = L.data(
+            "user_real_index", shape=[None], dtype="int64", append_batch_size=False)
+        pos_item_real_index = L.data(
+            "pos_item_real_index", shape=[None], dtype="int64", append_batch_size=False)
+        neg_item_real_index = L.data(
+            "neg_item_real_index", shape=[None], dtype="int64", append_batch_size=False)
+        datas = [user_index, pos_item_index, neg_item_index, user_real_index, pos_item_real_index, neg_item_real_index]
+
+        # graph_wrappers
+        graph_wrappers = []
+        node_feature_info, edge_feature_info = [], []
+        node_feature_info.append(('index', [None], np.dtype('int64')))
+        node_feature_info.append(('term_ids', [None, None], np.dtype('int64')))
+        for i in range(self.config.num_layers):
+            graph_wrappers.append(
+                pgl.graph_wrapper.GraphWrapper(
+                    "layer_%s" % i, node_feat=node_feature_info, edge_feat=edge_feature_info))
+
+        # encoder model
+        encoder = Encoder.factory(self.config)
+        outputs = encoder(graph_wrappers, [user_index, pos_item_index, neg_item_index])
+        user_feat, pos_item_feat, neg_item_feat = outputs
+
+        # loss
+        if self.config.neg_type == "batch_neg":
+            neg_item_feat = pos_item_feat
+        loss_func = Loss.factory(self.config)
+        loss = loss_func(user_feat, pos_item_feat, neg_item_feat)
+
+        # set datas, graph_wrappers, loss, outputs
+        return datas, graph_wrappers, loss, outputs + [user_real_index, pos_item_real_index, neg_item_real_index]
+
+
+class NodeClassificationModel(BaseModel):
+
+    def forward(self):
+        # inputs
+        node_index = L.data(
+            "node_index", shape=[None], dtype="int64", append_batch_size=False)
+        label = L.data(
+            "label", shape=[None], dtype="int64", append_batch_size=False)
+        datas = [node_index, label]
+
+        # graph_wrappers
+        graph_wrappers = []
+        node_feature_info = []
+        node_feature_info.append(('index', [None], np.dtype('int64')))
+        node_feature_info.append(('term_ids', [None, None], np.dtype('int64')))
+        for i in range(self.config.num_layers):
+            graph_wrappers.append(
+                pgl.graph_wrapper.GraphWrapper(
+                    "layer_%s" % i, node_feat=node_feature_info))
+
+        # encoder model
+        encoder = Encoder.factory(self.config)
+        outputs = encoder(graph_wrappers, [node_index])
+        feat = outputs[0]
+
+        # loss
+        loss_func = Loss.factory(self.config)
+        loss = loss_func(feat, label)  # assumes a classification loss is registered in Loss.factory
+
+        # set datas, graph_wrappers, loss, outputs
+        return datas, graph_wrappers, loss, outputs
diff --git a/examples/erniesage/models/model_factory.py b/examples/erniesage/models/model_factory.py
deleted file mode 100644
index 0f69bb1..0000000
--- a/examples/erniesage/models/model_factory.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from models.base import BaseGNNModel
-from models.ernie import ErnieModel
-from models.erniesage_v1 import ErnieSageModelV1
-from models.erniesage_v2 import ErnieSageModelV2
-from models.erniesage_v3 import ErnieSageModelV3
-
-class Model(object):
-    @classmethod
-    def factory(cls, config):
-        name = config.model_type
-        if name == "BaseGNNModel":
-            return BaseGNNModel(config)
-        if name == "ErnieModel":
-            return ErnieModel(config)
-        if name == "ErnieSageModelV1":
-            return ErnieSageModelV1(config)
-        if name == "ErnieSageModelV2":
-            return ErnieSageModelV2(config)
-        if name == "ErnieSageModelV3":
-            return ErnieSageModelV3(config)
-        else:
-            raise ValueError
-
-
diff --git a/examples/erniesage/preprocessing/dump_graph.py b/examples/erniesage/preprocessing/dump_graph.py
index d1558b3..96e02e1 100644
--- a/examples/erniesage/preprocessing/dump_graph.py
+++ b/examples/erniesage/preprocessing/dump_graph.py
@@ -54,12 +54,13 @@ def dump_graph(args):
     terms = []
     count = 0
     item_distribution = []
+    edges = []
+    train_data = []
 
-    with io.open(args.inpath, encoding=args.encoding) as f:
-        edges = []
+    with io.open(args.graphpath, encoding=args.encoding) as f:
         for idx, line in enumerate(f):
             if idx % 100000 == 0:
-                log.info("%s readed %s lines" % (args.inpath, idx))
+                log.info("%s read %s lines" % (args.graphpath, idx))
             slots = []
             for col_idx, col in enumerate(line.strip("\n").split("\t")):
                 s = col[:args.max_seqlen]
@@ -68,20 +69,40 @@ def dump_graph(args):
                     count += 1
                     term_file.write(str(col_idx) + "\t" + col + "\n")
                     item_distribution.append(0)
-
                 slots.append(str2id[s])
 
             src = slots[0]
            dst = slots[1]
-            neg_samples.append(slots[2:])
+            #neg_samples.append(slots[2:])
             edges.append((src, dst))
            edges.append((dst, src))
            item_distribution[dst] += 1
+    edges = np.array(edges, dtype="int64")
+
+    with io.open(args.inpath, encoding=args.encoding) as f:
+        for idx, line in enumerate(f):
+            if idx % 100000 == 0:
+                log.info("%s read %s lines" % (args.inpath, idx))
+            slots = []
+            for col_idx, col in enumerate(line.strip("\n").split("\t")):
+                s = col[:args.max_seqlen]
+                if s not in str2id:
+                    str2id[s] = count
+                    count += 1
+                    term_file.write(str(col_idx) + "\t" + col + "\n")
+                    item_distribution.append(0)
+                slots.append(str2id[s])
+
+            src = slots[0]
+            dst = slots[1]
+            neg_samples.append(slots[2:])
+            train_data.append((src, dst))
+    train_data = np.array(train_data, dtype="int64")
+
+    term_file.close()
+    num_nodes = len(str2id)
+    str2id.clear()
 
-    term_file.close()
-    edges = np.array(edges, dtype="int64")
-    num_nodes = len(str2id)
-    str2id.clear()
     log.info("building graph...")
     graph = pgl.graph.Graph(num_nodes=num_nodes, edges=edges)
     indegree = graph.indegree()
@@ -94,6 +115,7 @@ def dump_graph(args):
     item_distribution = np.sqrt(item_distribution)
     distribution = 1. * item_distribution / item_distribution.sum()
     alias, events = alias_sample_build_table(distribution)
+    np.save(os.path.join(args.outpath, "train_data.npy"), train_data)
    np.save(os.path.join(args.outpath, "alias.npy"), alias)
    np.save(os.path.join(args.outpath, "events.npy"), events)
    np.save(os.path.join(args.outpath, "neg_samples.npy"), np.array(neg_samples))
@@ -112,6 +134,7 @@ def dump_node_feat(args):
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description='main')
     parser.add_argument("-i", "--inpath", type=str, default=None)
+    parser.add_argument("-g", "--graphpath", type=str, default=None)
     parser.add_argument("-l", "--max_seqlen", type=int, default=30)
     parser.add_argument("--vocab_file", type=str, default="./vocab.txt")
     parser.add_argument("--encoding", type=str, default="utf8")
diff --git a/examples/erniesage/train.py b/examples/erniesage/train.py
index cc3255c..33d5b92 100644
--- a/examples/erniesage/train.py
+++ b/examples/erniesage/train.py
@@ -22,17 +22,17 @@ from pgl.utils.logger import log
 from pgl.utils import paddle_helper
 
 from learner import Learner
-from models.model_factory import Model
+from models.model import LinkPredictModel
 from dataset.graph_reader import GraphGenerator
 
 
 class TrainData(object):
-    def __init__(self, graph_path):
+    def __init__(self, graph_work_path):
         trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
         trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
         log.info("trainer_id: %s, trainer_count: %s." % (trainer_id, trainer_count))
 
-        bidirectional_edges = np.load(os.path.join(graph_path, "edges.npy"), allow_pickle=True)
+        bidirectional_edges = np.load(os.path.join(graph_work_path, "train_data.npy"), allow_pickle=True)
         # edges is bidirectional.
         edges = bidirectional_edges[0::2]
         train_usr = edges[trainer_id::trainer_count, 0]
@@ -41,8 +41,8 @@ class TrainData(object):
             "train_data": [train_usr, train_ad]
         }
 
-        if os.path.exists(os.path.join(graph_path, "neg_samples.npy")):
-            neg_samples = np.load(os.path.join(graph_path, "neg_samples.npy"), allow_pickle=True)
+        if os.path.exists(os.path.join(graph_work_path, "neg_samples.npy")):
+            neg_samples = np.load(os.path.join(graph_work_path, "neg_samples.npy"), allow_pickle=True)
             if neg_samples.size != 0:
                 train_negs = neg_samples[trainer_id::trainer_count]
                 returns["train_data"].append(train_negs)
@@ -50,7 +50,7 @@ class TrainData(object):
         self.data = returns
 
     def __getitem__(self, index):
-        return [ data[index] for data in self.data["train_data"]]
+        return [data[index] for data in self.data["train_data"]]
 
     def __len__(self):
         return len(self.data["train_data"][0])
@@ -58,10 +58,10 @@ class TrainData(object):
 
 def main(config):
     # Select Model
-    model = Model.factory(config)
+    model = LinkPredictModel(config)
 
     # Build Train Edges
-    data = TrainData(config.graph_path)
+    data = TrainData(config.graph_work_path)
 
     # Build Train Data
     train_iter = GraphGenerator(
@@ -73,7 +73,7 @@ def main(config):
         feed_name_list=[var.name for var in model.feed_list],
         use_pyreader=config.use_pyreader,
         phase="train",
-        graph_data_path=config.graph_path,
+        graph_data_path=config.graph_work_path,
         shuffle=True,
         neg_type=config.neg_type)
-- 
GitLab