未验证 提交 9f629ac3 编写于 作者: W Weiyue Su 提交者: GitHub

Merge pull request #116 from WeiyueSu/main

erniesage: reconstitution
# Global Enviroment Settings # Global Enviroment Settings
# #
# trainer config ------ # trainer config ------
task: "link_predict"
learner_type: "gpu" learner_type: "gpu"
optimizer_type: "adam" optimizer_type: "adam"
lr: 0.00005 lr: 0.00005
batch_size: 32 batch_size: 32
CPU_NUM: 10 CPU_NUM: 10
epoch: 20 epoch: 3
log_per_step: 1 log_per_step: 1
save_per_step: 100 save_per_step: 1000
output_path: "./output" output_path: "./output"
ckpt_path: "./ernie_base_ckpt" ckpt_path: "./ernie_base_ckpt"
# data config ------ # data config ------
input_data: "./data.txt" train_data: "./example_data/link_predict/graph_data.txt"
graph_path: "./workdir" graph_data: "./example_data/link_predict/train_data.txt"
graph_work_path: "./workdir"
sample_workers: 1 sample_workers: 1
use_pyreader: true use_pyreader: true
input_type: "text" input_type: "text"
# model config ------ # model config ------
samples: [10] samples: [10]
model_type: "ErnieSageModelV1" model_type: "ERNIESageV2"
layer_type: "graphsage_sum" layer_type: "graphsage_sum"
max_seqlen: 40 max_seqlen: 40
...@@ -30,8 +33,9 @@ num_layers: 1 ...@@ -30,8 +33,9 @@ num_layers: 1
hidden_size: 128 hidden_size: 128
final_fc: true final_fc: true
final_l2_norm: true final_l2_norm: true
loss_type: "hinge" loss_type: "global_hinge"
margin: 0.3 margin: 0.1
neg_type: "batch_neg"
# infer config ------ # infer config ------
infer_model: "./output/last" infer_model: "./output/last"
......
# Global Enviroment Settings # Global Enviroment Settings
# #
# trainer config ------ # trainer config ------
task: "node_classification"
learner_type: "gpu" learner_type: "gpu"
optimizer_type: "adam" optimizer_type: "adam"
lr: 0.00005 lr: 0.00005
batch_size: 32 batch_size: 32
CPU_NUM: 10 CPU_NUM: 10
epoch: 3 epoch: 3
log_per_step: 10 log_per_step: 1
save_per_step: 1000 save_per_step: 1000
output_path: "./output" output_path: "./output"
ckpt_path: "./ernie_base_ckpt" ckpt_path: "./ernie_base_ckpt"
# data config ------ # data config ------
input_data: "./data.txt" graph_data: "./example_data/node_classification/graph_data.txt"
graph_path: "./workdir" train_data: "./example_data/node_classification/train_data.txt"
graph_work_path: "./workdir"
sample_workers: 1 sample_workers: 1
use_pyreader: true use_pyreader: true
input_type: "text" input_type: "text"
# model config ------ # model config ------
num_label: 10
samples: [10] samples: [10]
model_type: "ErnieSageModelV2" model_type: "ERNIESageV2"
layer_type: "graphsage_sum"
max_seqlen: 40 max_seqlen: 40
...@@ -29,8 +34,8 @@ num_layers: 1 ...@@ -29,8 +34,8 @@ num_layers: 1
hidden_size: 128 hidden_size: 128
final_fc: true final_fc: true
final_l2_norm: true final_l2_norm: true
loss_type: "hinge" loss_type: "softmax_with_cross_entropy"
margin: 0.3 margin: 0.1
neg_type: "batch_neg" neg_type: "batch_neg"
# infer config ------ # infer config ------
...@@ -49,7 +54,7 @@ ernie_config: ...@@ -49,7 +54,7 @@ ernie_config:
max_position_embeddings: 513 max_position_embeddings: 513
num_attention_heads: 12 num_attention_heads: 12
num_hidden_layers: 12 num_hidden_layers: 12
sent_type_vocab_size: 2 sent_type_vocab_size: 4
task_type_vocab_size: 3 task_type_vocab_size: 3
vocab_size: 18000 vocab_size: 18000
use_task_id: false use_task_id: false
......
# Global Enviroment Settings
#
# trainer config ------
learner_type: "cpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 2
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV1"
layer_type: "graphsage_sum"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "cpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 4
CPU_NUM: 16
epoch: 3
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV2"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
neg_type: "batch_neg"
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 2
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "cpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 2
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV3"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
# Global Enviroment Settings
#
# trainer config ------
learner_type: "gpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 32
CPU_NUM: 10
epoch: 20
log_per_step: 1
save_per_step: 100
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
input_data: "./data.txt"
graph_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
samples: [10]
model_type: "ErnieSageModelV3"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "hinge"
margin: 0.3
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
attention_probs_dropout_prob: 0.1
hidden_act: "relu"
hidden_dropout_prob: 0.1
hidden_size: 768
initializer_range: 0.02
max_position_embeddings: 513
num_attention_heads: 12
num_hidden_layers: 12
sent_type_vocab_size: 4
task_type_vocab_size: 3
vocab_size: 18000
use_task_id: false
use_fp16: false
...@@ -74,17 +74,15 @@ class GraphGenerator(BaseDataGenerator): ...@@ -74,17 +74,15 @@ class GraphGenerator(BaseDataGenerator):
batch_dst = np.array(batch_dst, dtype="int64") batch_dst = np.array(batch_dst, dtype="int64")
if self.neg_type == "batch_neg": if self.neg_type == "batch_neg":
neg_shape = [1] batch_neg = batch_dst
else: else:
# TODO user define shape of neg_sample
neg_shape = batch_dst.shape neg_shape = batch_dst.shape
sampled_batch_neg = alias_sample(neg_shape, self.alias, self.events) sampled_batch_neg = alias_sample(neg_shape, self.alias, self.events)
if len(batch_neg) > 0:
batch_neg = np.concatenate([batch_neg, sampled_batch_neg], 0) batch_neg = np.concatenate([batch_neg, sampled_batch_neg], 0)
else:
batch_neg = sampled_batch_neg
if self.phase == "train": if self.phase == "train":
# TODO user define ignore edges or not
#ignore_edges = np.concatenate([np.stack([batch_src, batch_dst], 1), np.stack([batch_dst, batch_src], 1)], 0) #ignore_edges = np.concatenate([np.stack([batch_src, batch_dst], 1), np.stack([batch_dst, batch_src], 1)], 0)
ignore_edges = set() ignore_edges = set()
else: else:
...@@ -92,7 +90,8 @@ class GraphGenerator(BaseDataGenerator): ...@@ -92,7 +90,8 @@ class GraphGenerator(BaseDataGenerator):
nodes = np.unique(np.concatenate([batch_src, batch_dst, batch_neg], 0)) nodes = np.unique(np.concatenate([batch_src, batch_dst, batch_neg], 0))
subgraphs = graphsage_sample(self.graph, nodes, self.samples, ignore_edges=ignore_edges) subgraphs = graphsage_sample(self.graph, nodes, self.samples, ignore_edges=ignore_edges)
#subgraphs[0].reindex_to_parrent_nodes(subgraphs[0].nodes) subgraphs[0].node_feat["index"] = subgraphs[0].reindex_to_parrent_nodes(subgraphs[0].nodes).astype(np.int64)
subgraphs[0].node_feat["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]].astype(np.int64)
feed_dict = {} feed_dict = {}
for i in range(self.num_layers): for i in range(self.num_layers):
feed_dict.update(self.graph_wrappers[i].to_feed(subgraphs[i])) feed_dict.update(self.graph_wrappers[i].to_feed(subgraphs[i]))
...@@ -103,9 +102,11 @@ class GraphGenerator(BaseDataGenerator): ...@@ -103,9 +102,11 @@ class GraphGenerator(BaseDataGenerator):
sub_neg_idx = subgraphs[0].reindex_from_parrent_nodes(batch_neg) sub_neg_idx = subgraphs[0].reindex_from_parrent_nodes(batch_neg)
feed_dict["user_index"] = np.array(sub_src_idx, dtype="int64") feed_dict["user_index"] = np.array(sub_src_idx, dtype="int64")
feed_dict["item_index"] = np.array(sub_dst_idx, dtype="int64") feed_dict["pos_item_index"] = np.array(sub_dst_idx, dtype="int64")
feed_dict["neg_item_index"] = np.array(sub_neg_idx, dtype="int64") feed_dict["neg_item_index"] = np.array(sub_neg_idx, dtype="int64")
feed_dict["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]].astype(np.int64)
feed_dict["user_real_index"] = np.array(batch_src, dtype="int64")
feed_dict["pos_item_real_index"] = np.array(batch_dst, dtype="int64")
return feed_dict return feed_dict
def __call__(self): def __call__(self):
...@@ -124,3 +125,37 @@ class GraphGenerator(BaseDataGenerator): ...@@ -124,3 +125,37 @@ class GraphGenerator(BaseDataGenerator):
class NodeClassificationGenerator(GraphGenerator):
def batch_fn(self, batch_ex):
# batch_ex = [
# (node, label),
# (node, label),
# ]
#
batch_node = []
batch_label = []
for batch in batch_ex:
batch_node.append(batch[0])
batch_label.append(batch[1])
if len(batch_node) != self.batch_size:
if self.phase == "train":
return None #Skip
batch_node = np.array(batch_node, dtype="int64")
batch_label = np.array(batch_label, dtype="int64")
subgraphs = graphsage_sample(self.graph, batch_node, self.samples)
subgraphs[0].node_feat["index"] = subgraphs[0].reindex_to_parrent_nodes(subgraphs[0].nodes).astype(np.int64)
subgraphs[0].node_feat["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]].astype(np.int64)
feed_dict = {}
for i in range(self.num_layers):
feed_dict.update(self.graph_wrappers[i].to_feed(subgraphs[i]))
# only reindex from first subgraph
sub_node_idx = subgraphs[0].reindex_from_parrent_nodes(batch_node)
feed_dict["node_index"] = np.array(sub_node_idx, dtype="int64")
feed_dict["node_real_index"] = np.array(batch_node, dtype="int64")
feed_dict["label"] = np.array(batch_label, dtype="int64")
return feed_dict
...@@ -34,7 +34,7 @@ from pgl.utils import paddle_helper ...@@ -34,7 +34,7 @@ from pgl.utils import paddle_helper
import paddle import paddle
import paddle.fluid as F import paddle.fluid as F
from models.model_factory import Model from models.model import LinkPredictModel
from dataset.graph_reader import GraphGenerator from dataset.graph_reader import GraphGenerator
...@@ -59,7 +59,7 @@ def run_predict(py_reader, ...@@ -59,7 +59,7 @@ def run_predict(py_reader,
log_per_step=1, log_per_step=1,
args=None): args=None):
id2str = io.open(os.path.join(args.graph_path, "terms.txt"), encoding=args.encoding).readlines() id2str = io.open(os.path.join(args.graph_work_path, "terms.txt"), encoding=args.encoding).readlines()
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
...@@ -71,7 +71,7 @@ def run_predict(py_reader, ...@@ -71,7 +71,7 @@ def run_predict(py_reader,
for batch_feed_dict in py_reader(): for batch_feed_dict in py_reader():
batch += 1 batch += 1
batch_usr_feat, batch_ad_feat, _, batch_src_real_index = exe.run( batch_usr_feat, _, _, batch_src_real_index, _ = exe.run(
program, program,
feed=batch_feed_dict, feed=batch_feed_dict,
fetch_list=model_dict.outputs) fetch_list=model_dict.outputs)
...@@ -79,7 +79,7 @@ def run_predict(py_reader, ...@@ -79,7 +79,7 @@ def run_predict(py_reader,
if batch % log_per_step == 0: if batch % log_per_step == 0:
log.info("Predict %s finished" % batch) log.info("Predict %s finished" % batch)
for ufs, _, sri in zip(batch_usr_feat, batch_ad_feat, batch_src_real_index): for ufs, sri in zip(batch_usr_feat, batch_src_real_index):
if args.input_type == "text": if args.input_type == "text":
sri = id2str[int(sri)].strip("\n") sri = id2str[int(sri)].strip("\n")
line = "{}\t{}\n".format(sri, tostr(ufs)) line = "{}\t{}\n".format(sri, tostr(ufs))
...@@ -108,7 +108,7 @@ def _warmstart(exe, program, path='params'): ...@@ -108,7 +108,7 @@ def _warmstart(exe, program, path='params'):
) )
def main(config): def main(config):
model = Model.factory(config) model = LinkPredictModel(config)
if config.learner_type == "cpu": if config.learner_type == "cpu":
place = F.CPUPlace() place = F.CPUPlace()
...@@ -143,7 +143,7 @@ def main(config): ...@@ -143,7 +143,7 @@ def main(config):
build_strategy=build_strategy, build_strategy=build_strategy,
exec_strategy=exec_strategy) exec_strategy=exec_strategy)
num_nodes = int(np.load(os.path.join(config.graph_path, "num_nodes.npy"))) num_nodes = int(np.load(os.path.join(config.graph_work_path, "num_nodes.npy")))
predict_data = PredictData(num_nodes) predict_data = PredictData(num_nodes)
...@@ -156,7 +156,7 @@ def main(config): ...@@ -156,7 +156,7 @@ def main(config):
feed_name_list=[var.name for var in model.feed_list], feed_name_list=[var.name for var in model.feed_list],
use_pyreader=config.use_pyreader, use_pyreader=config.use_pyreader,
phase="predict", phase="predict",
graph_data_path=config.graph_path, graph_data_path=config.graph_work_path,
shuffle=False) shuffle=False)
if config.learner_type == "cpu": if config.learner_type == "cpu":
......
...@@ -22,27 +22,26 @@ from pgl.utils.logger import log ...@@ -22,27 +22,26 @@ from pgl.utils.logger import log
from pgl.utils import paddle_helper from pgl.utils import paddle_helper
from learner import Learner from learner import Learner
from models.model_factory import Model from models.model import LinkPredictModel
from dataset.graph_reader import GraphGenerator from dataset.graph_reader import GraphGenerator
class TrainData(object): class TrainData(object):
def __init__(self, graph_path): def __init__(self, graph_work_path):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
log.info("trainer_id: %s, trainer_count: %s." % (trainer_id, trainer_count)) log.info("trainer_id: %s, trainer_count: %s." % (trainer_id, trainer_count))
bidirectional_edges = np.load(os.path.join(graph_path, "edges.npy"), allow_pickle=True) edges = np.load(os.path.join(graph_work_path, "train_data.npy"), allow_pickle=True)
# edges is bidirectional. # edges is bidirectional.
edges = bidirectional_edges[0::2]
train_usr = edges[trainer_id::trainer_count, 0] train_usr = edges[trainer_id::trainer_count, 0]
train_ad = edges[trainer_id::trainer_count, 1] train_ad = edges[trainer_id::trainer_count, 1]
returns = { returns = {
"train_data": [train_usr, train_ad] "train_data": [train_usr, train_ad]
} }
if os.path.exists(os.path.join(graph_path, "neg_samples.npy")): if os.path.exists(os.path.join(graph_work_path, "neg_samples.npy")):
neg_samples = np.load(os.path.join(graph_path, "neg_samples.npy"), allow_pickle=True) neg_samples = np.load(os.path.join(graph_work_path, "neg_samples.npy"), allow_pickle=True)
if neg_samples.size != 0: if neg_samples.size != 0:
train_negs = neg_samples[trainer_id::trainer_count] train_negs = neg_samples[trainer_id::trainer_count]
returns["train_data"].append(train_negs) returns["train_data"].append(train_negs)
...@@ -50,7 +49,7 @@ class TrainData(object): ...@@ -50,7 +49,7 @@ class TrainData(object):
self.data = returns self.data = returns
def __getitem__(self, index): def __getitem__(self, index):
return [ data[index] for data in self.data["train_data"]] return [data[index] for data in self.data["train_data"]]
def __len__(self): def __len__(self):
return len(self.data["train_data"][0]) return len(self.data["train_data"][0])
...@@ -58,10 +57,10 @@ class TrainData(object): ...@@ -58,10 +57,10 @@ class TrainData(object):
def main(config): def main(config):
# Select Model # Select Model
model = Model.factory(config) model = LinkPredictModel(config)
# Build Train Edges # Build Train Edges
data = TrainData(config.graph_path) data = TrainData(config.graph_work_path)
# Build Train Data # Build Train Data
train_iter = GraphGenerator( train_iter = GraphGenerator(
...@@ -73,7 +72,7 @@ def main(config): ...@@ -73,7 +72,7 @@ def main(config):
feed_name_list=[var.name for var in model.feed_list], feed_name_list=[var.name for var in model.feed_list],
use_pyreader=config.use_pyreader, use_pyreader=config.use_pyreader,
phase="train", phase="train",
graph_data_path=config.graph_path, graph_data_path=config.graph_work_path,
shuffle=True, shuffle=True,
neg_type=config.neg_type) neg_type=config.neg_type)
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import glob
import os
import numpy as np
import pgl
import paddle.fluid as F
import paddle.fluid.layers as L
from models import message_passing
def get_layer(layer_type, gw, feature, hidden_size, act, initializer, learning_rate, name, is_test=False):
return getattr(message_passing, layer_type)(gw, feature, hidden_size, act, initializer, learning_rate, name)
class BaseGraphWrapperBuilder(object):
def __init__(self, config):
self.config = config
self.node_feature_info = []
self.edge_feature_info = []
def __call__(self):
place = F.CPUPlace()
graph_wrappers = []
for i in range(self.config.num_layers):
# all graph have same node_feat_info
graph_wrappers.append(
pgl.graph_wrapper.GraphWrapper(
"layer_%s" % i, node_feat=self.node_feature_info, edge_feat=self.edge_feature_info))
return graph_wrappers
class GraphsageGraphWrapperBuilder(BaseGraphWrapperBuilder):
def __init__(self, config):
super(GraphsageGraphWrapperBuilder, self).__init__(config)
self.node_feature_info.append(('index', [None], np.dtype('int64')))
class BaseGNNModel(object):
def __init__(self, config):
self.config = config
self.graph_wrapper_builder = self.gen_graph_wrapper_builder(config)
self.net_fn = self.gen_net_fn(config)
self.feed_list_builder = self.gen_feed_list_builder(config)
self.data_loader_builder = self.gen_data_loader_builder(config)
self.loss_fn = self.gen_loss_fn(config)
self.build()
def gen_graph_wrapper_builder(self, config):
return GraphsageGraphWrapperBuilder(config)
def gen_net_fn(self, config):
return BaseNet(config)
def gen_feed_list_builder(self, config):
return BaseFeedListBuilder(config)
def gen_data_loader_builder(self, config):
return BaseDataLoaderBuilder(config)
def gen_loss_fn(self, config):
return BaseLoss(config)
def build(self):
self.graph_wrappers = self.graph_wrapper_builder()
self.inputs, self.outputs = self.net_fn(self.graph_wrappers)
self.feed_list = self.feed_list_builder(self.inputs, self.graph_wrappers)
self.data_loader = self.data_loader_builder(self.feed_list)
self.loss = self.loss_fn(self.outputs)
class BaseFeedListBuilder(object):
def __init__(self, config):
self.config = config
def __call__(self, inputs, graph_wrappers):
feed_list = []
for i in range(len(graph_wrappers)):
feed_list.extend(graph_wrappers[i].holder_list)
feed_list.extend(inputs)
return feed_list
class BaseDataLoaderBuilder(object):
def __init__(self, config):
self.config = config
def __call__(self, feed_list):
data_loader = F.io.PyReader(
feed_list=feed_list, capacity=20, use_double_buffer=True, iterable=True)
return data_loader
class BaseNet(object):
def __init__(self, config):
self.config = config
def take_final_feature(self, feature, index, name):
"""take final feature"""
feat = L.gather(feature, index, overwrite=False)
if self.config.final_fc:
feat = L.fc(feat,
self.config.hidden_size,
param_attr=F.ParamAttr(name=name + '_w'),
bias_attr=F.ParamAttr(name=name + '_b'))
if self.config.final_l2_norm:
feat = L.l2_normalize(feat, axis=1)
return feat
def build_inputs(self):
user_index = L.data(
"user_index", shape=[None], dtype="int64", append_batch_size=False)
item_index = L.data(
"item_index", shape=[None], dtype="int64", append_batch_size=False)
neg_item_index = L.data(
"neg_item_index", shape=[None], dtype="int64", append_batch_size=False)
return [user_index, item_index, neg_item_index]
def build_embedding(self, graph_wrappers, inputs=None):
num_embed = int(np.load(os.path.join(self.config.graph_path, "num_nodes.npy")))
is_sparse = self.config.trainer_type == "Transpiler"
embed = L.embedding(
input=L.reshape(graph_wrappers[0].node_feat['index'], [-1, 1]),
size=[num_embed, self.config.hidden_size],
is_sparse=is_sparse,
param_attr=F.ParamAttr(name="node_embedding", initializer=F.initializer.Uniform(
low=-1. / self.config.hidden_size,
high=1. / self.config.hidden_size)))
return embed
def gnn_layers(self, graph_wrappers, feature):
features = [feature]
initializer = None
fc_lr = self.config.lr / 0.001
for i in range(self.config.num_layers):
if i == self.config.num_layers - 1:
act = None
else:
act = "leaky_relu"
feature = get_layer(
self.config.layer_type,
graph_wrappers[i],
feature,
self.config.hidden_size,
act,
initializer,
learning_rate=fc_lr,
name="%s_%s" % (self.config.layer_type, i))
features.append(feature)
return features
def __call__(self, graph_wrappers):
inputs = self.build_inputs()
feature = self.build_embedding(graph_wrappers, inputs)
features = self.gnn_layers(graph_wrappers, feature)
outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs]
src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
outputs.append(src_real_index)
return inputs, outputs
def all_gather(X):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
if trainer_num == 1:
copy_X = X * 1
copy_X.stop_gradients=True
return copy_X
Xs = []
for i in range(trainer_num):
copy_X = X * 1
copy_X = L.collective._broadcast(copy_X, i, True)
copy_X.stop_gradient=True
Xs.append(copy_X)
if len(Xs) > 1:
Xs=L.concat(Xs, 0)
Xs.stop_gradient=True
else:
Xs = Xs[0]
return Xs
class BaseLoss(object):
def __init__(self, config):
self.config = config
def __call__(self, outputs):
user_feat, item_feat, neg_item_feat = outputs[0], outputs[1], outputs[2]
loss_type = self.config.loss_type
if self.config.neg_type == "batch_neg":
neg_item_feat = item_feat
# Calc Loss
if self.config.loss_type == "hinge":
pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
neg = L.matmul(user_feat, neg_item_feat, transpose_y=True) # [B, B]
loss = L.reduce_mean(L.relu(neg - pos + self.config.margin))
elif self.config.loss_type == "all_hinge":
pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
all_pos = all_gather(pos) # [B * n, 1]
all_neg_item_feat = all_gather(neg_item_feat) # [B * n, 1]
all_user_feat = all_gather(user_feat) # [B * n, 1]
neg1 = L.matmul(user_feat, all_neg_item_feat, transpose_y=True) # [B, B * n]
neg2 = L.matmul(all_user_feat, neg_item_feat, transpose_y=True) # [B *n, B]
loss1 = L.reduce_mean(L.relu(neg1 - pos + self.config.margin))
loss2 = L.reduce_mean(L.relu(neg2 - all_pos + self.config.margin))
#loss = (loss1 + loss2) / 2
loss = loss1 + loss2
elif self.config.loss_type == "softmax":
pass
# TODO
# pos = L.reduce_sum(user_feat * item_feat, -1, keep_dim=True) # [B, 1]
# neg = L.matmul(user_feat, neg_feat, transpose_y=True) # [B, B]
# logits = L.concat([pos, neg], -1) # [B, 1+B]
# labels = L.fill_constant_batch_size_like(logits, [-1, 1], "int64", 0)
# loss = L.reduce_mean(L.softmax_with_cross_entropy(logits, labels))
else:
raise ValueError
return loss
import numpy as np
import pgl import pgl
import paddle.fluid as F import paddle.fluid as F
import paddle.fluid.layers as L import paddle.fluid.layers as L
from models.base import BaseNet, BaseGNNModel
from models.ernie_model.ernie import ErnieModel from models.ernie_model.ernie import ErnieModel
from models.ernie_model.ernie import ErnieGraphModel
from models.ernie_model.ernie import ErnieConfig
from models import message_passing
from models.message_passing import copy_send
class ErnieSageV2(BaseNet): def get_layer(layer_type, gw, feature, hidden_size, act, initializer, learning_rate, name, is_test=False):
return getattr(message_passing, layer_type)(gw, feature, hidden_size, act, initializer, learning_rate, name)
def build_inputs(self):
inputs = super(ErnieSageV2, self).build_inputs() class Encoder(object):
term_ids = L.data(
"term_ids", shape=[None, self.config.max_seqlen], dtype="int64", append_batch_size=False) def __init__(self, config):
return inputs + [term_ids] self.config = config
@classmethod
def factory(cls, config):
model_type = config.model_type
if model_type == "ERNIESageV1":
return ERNIESageV1Encoder(config)
elif model_type == "ERNIESageV2":
return ERNIESageV2Encoder(config)
elif model_type == "ERNIESageV3":
return ERNIESageV3Encoder(config)
elif model_type == "ERNIESageV4":
return ERNIESageV4Encoder(config)
else:
raise ValueError
def __call__(self, graph_wrappers, inputs):
raise NotImplementedError
class ERNIESageV1Encoder(Encoder):
def __call__(self, graph_wrappers, inputs):
feature = self.build_embedding(graph_wrappers[0].node_feat["term_ids"])
initializer = None
fc_lr = self.config.lr / 0.001
for i in range(self.config.num_layers):
if i == self.config.num_layers - 1:
act = None
else:
act = "leaky_relu"
feature = get_layer(
self.config.layer_type,
graph_wrappers[i],
feature,
self.config.hidden_size,
act,
initializer,
learning_rate=fc_lr,
name="%s_%s" % (self.config.layer_type, i))
final_feats = [self.take_final_feature(feature, i, "final_fc") for i in inputs]
return final_feats
def build_embedding(self, term_ids):
term_ids = L.unsqueeze(term_ids, [-1])
ernie_config = self.config.ernie_config
ernie = ErnieModel(
src_ids=term_ids,
sentence_ids=L.zeros_like(term_ids),
task_ids=None,
config=ernie_config,
use_fp16=False,
name="")
feature = ernie.get_pooled_output()
return feature
def take_final_feature(self, feature, index, name):
"""take final feature"""
feat = L.gather(feature, index, overwrite=False)
if self.config.final_fc:
feat = L.fc(feat,
self.config.hidden_size,
param_attr=F.ParamAttr(name=name + '_w'),
bias_attr=F.ParamAttr(name=name + '_b'))
if self.config.final_l2_norm:
feat = L.l2_normalize(feat, axis=1)
return feat
class ERNIESageV2Encoder(Encoder):
def __call__(self, graph_wrappers, inputs):
feature = graph_wrappers[0].node_feat["term_ids"]
feature = self.gnn_layer(graph_wrappers[0], feature, self.config.hidden_size, 'leaky_relu', None, 1., "erniesage_v2_0")
initializer = None
fc_lr = self.config.lr / 0.001
for i in range(1, self.config.num_layers):
if i == self.config.num_layers - 1:
act = None
else:
act = "leaky_relu"
feature = get_layer(
self.config.layer_type,
graph_wrappers[i],
feature,
self.config.hidden_size,
act,
initializer,
learning_rate=fc_lr,
name="%s_%s" % (self.config.layer_type, i))
final_feats = [self.take_final_feature(feature, i, "final_fc") for i in inputs]
return final_feats
def take_final_feature(self, feature, index, name):
"""take final feature"""
feat = L.gather(feature, index, overwrite=False)
if self.config.final_fc:
feat = L.fc(feat,
self.config.hidden_size,
param_attr=F.ParamAttr(name=name + '_w'),
bias_attr=F.ParamAttr(name=name + '_b'))
if self.config.final_l2_norm:
feat = L.l2_normalize(feat, axis=1)
return feat
def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name): def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
def build_position_ids(src_ids, dst_ids): def build_position_ids(src_ids, dst_ids):
...@@ -97,6 +211,36 @@ class ErnieSageV2(BaseNet): ...@@ -97,6 +211,36 @@ class ErnieSageV2(BaseNet):
return output return output
return erniesage_v2_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name) return erniesage_v2_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name)
class ERNIESageV3Encoder(Encoder):
def __call__(self, graph_wrappers, inputs):
feature = graph_wrappers[0].node_feat["term_ids"]
feature = self.gnn_layer(graph_wrappers[0], feature, self.config.hidden_size, 'leaky_relu', None, 1., "erniesage_v3_0")
final_feats = [self.take_final_feature(feature, i, "final_fc") for i in inputs]
return final_feats
def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
def ernie_recv(feat):
"""doc"""
num_neighbor = self.config.samples[0]
pad_value = L.zeros([1], "int64")
out, _ = L.sequence_pad(feat, pad_value=pad_value, maxlen=num_neighbor)
out = L.reshape(out, [0, self.config.max_seqlen*num_neighbor])
return out
def erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name):
msg = gw.send(copy_send, nfeat_list=[("h", feature)])
neigh_feature = gw.recv(msg, ernie_recv)
neigh_feature = L.cast(L.unsqueeze(neigh_feature, [-1]), "int64")
feature = L.unsqueeze(feature, [-1])
cls = L.fill_constant_batch_size_like(feature, [-1, 1, 1], "int64", 1)
term_ids = L.concat([cls, feature[:, :-1], neigh_feature], 1)
term_ids.stop_gradient = True
return term_ids
return erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name)
def gnn_layers(self, graph_wrappers, feature): def gnn_layers(self, graph_wrappers, feature):
features = [feature] features = [feature]
...@@ -116,20 +260,28 @@ class ErnieSageV2(BaseNet): ...@@ -116,20 +260,28 @@ class ErnieSageV2(BaseNet):
act, act,
initializer, initializer,
learning_rate=fc_lr, learning_rate=fc_lr,
name="%s_%s" % ("erniesage_v2", i)) name="%s_%s" % ("erniesage_v3", i))
features.append(feature) features.append(feature)
return features return features
def __call__(self, graph_wrappers): def take_final_feature(self, feature, index, name):
inputs = self.build_inputs() """take final feature"""
feature = inputs[-1] feat = L.gather(feature, index, overwrite=False)
features = self.gnn_layers(graph_wrappers, feature)
outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs[:-1]] ernie_config = self.config.ernie_config
src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0]) ernie = ErnieGraphModel(
outputs.append(src_real_index) src_ids=feat,
return inputs, outputs config=ernie_config,
slot_seqlen=self.config.max_seqlen)
feat = ernie.get_pooled_output()
fc_lr = self.config.lr / 0.001
if self.config.final_fc:
feat = L.fc(feat,
self.config.hidden_size,
param_attr=F.ParamAttr(name=name + '_w'),
bias_attr=F.ParamAttr(name=name + '_b'))
class ErnieSageModelV2(BaseGNNModel): if self.config.final_l2_norm:
def gen_net_fn(self, config): feat = L.l2_normalize(feat, axis=1)
return ErnieSageV2(config) return feat
import pgl
import paddle.fluid as F
import paddle.fluid.layers as L
from models.base import BaseNet, BaseGNNModel
from models.ernie_model.ernie import ErnieModel
from models.ernie_model.ernie import ErnieGraphModel
from models.ernie_model.ernie import ErnieConfig
class ErnieSageV1(BaseNet):
def build_inputs(self):
inputs = super(ErnieSageV1, self).build_inputs()
term_ids = L.data(
"term_ids", shape=[None, self.config.max_seqlen], dtype="int64", append_batch_size=False)
return inputs + [term_ids]
def build_embedding(self, graph_wrappers, term_ids):
term_ids = L.unsqueeze(term_ids, [-1])
ernie_config = self.config.ernie_config
ernie = ErnieModel(
src_ids=term_ids,
sentence_ids=L.zeros_like(term_ids),
task_ids=None,
config=ernie_config,
use_fp16=False,
name="student_")
feature = ernie.get_pooled_output()
return feature
def __call__(self, graph_wrappers):
inputs = self.build_inputs()
feature = self.build_embedding(graph_wrappers, inputs[-1])
features = self.gnn_layers(graph_wrappers, feature)
outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs[:-1]]
src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
outputs.append(src_real_index)
return inputs, outputs
class ErnieSageModelV1(BaseGNNModel):
def gen_net_fn(self, config):
return ErnieSageV1(config)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pgl
import paddle.fluid as F
import paddle.fluid.layers as L
from models.base import BaseNet, BaseGNNModel
from models.ernie_model.ernie import ErnieModel
from models.ernie_model.ernie import ErnieGraphModel
from models.message_passing import copy_send
class ErnieSageV3(BaseNet):
def __init__(self, config):
super(ErnieSageV3, self).__init__(config)
def build_inputs(self):
inputs = super(ErnieSageV3, self).build_inputs()
term_ids = L.data(
"term_ids", shape=[None, self.config.max_seqlen], dtype="int64", append_batch_size=False)
return inputs + [term_ids]
def gnn_layer(self, gw, feature, hidden_size, act, initializer, learning_rate, name):
def ernie_recv(feat):
"""doc"""
num_neighbor = self.config.samples[0]
pad_value = L.zeros([1], "int64")
out, _ = L.sequence_pad(feat, pad_value=pad_value, maxlen=num_neighbor)
out = L.reshape(out, [0, self.config.max_seqlen*num_neighbor])
return out
def erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name):
msg = gw.send(copy_send, nfeat_list=[("h", feature)])
neigh_feature = gw.recv(msg, ernie_recv)
neigh_feature = L.cast(L.unsqueeze(neigh_feature, [-1]), "int64")
feature = L.unsqueeze(feature, [-1])
cls = L.fill_constant_batch_size_like(feature, [-1, 1, 1], "int64", 1)
term_ids = L.concat([cls, feature[:, :-1], neigh_feature], 1)
term_ids.stop_gradient = True
return term_ids
return erniesage_v3_aggregator(gw, feature, hidden_size, act, initializer, learning_rate, name)
def gnn_layers(self, graph_wrappers, feature):
features = [feature]
initializer = None
fc_lr = self.config.lr / 0.001
for i in range(self.config.num_layers):
if i == self.config.num_layers - 1:
act = None
else:
act = "leaky_relu"
feature = self.gnn_layer(
graph_wrappers[i],
feature,
self.config.hidden_size,
act,
initializer,
learning_rate=fc_lr,
name="%s_%s" % ("erniesage_v3", i))
features.append(feature)
return features
def take_final_feature(self, feature, index, name):
"""take final feature"""
feat = L.gather(feature, index, overwrite=False)
ernie_config = self.config.ernie_config
ernie = ErnieGraphModel(
src_ids=feat,
config=ernie_config,
slot_seqlen=self.config.max_seqlen)
feat = ernie.get_pooled_output()
fc_lr = self.config.lr / 0.001
# feat = L.fc(feat,
# self.config.hidden_size,
# act="relu",
# param_attr=F.ParamAttr(name=name + "_l",
# learning_rate=fc_lr),
# )
#feat = L.l2_normalize(feat, axis=1)
if self.config.final_fc:
feat = L.fc(feat,
self.config.hidden_size,
param_attr=F.ParamAttr(name=name + '_w'),
bias_attr=F.ParamAttr(name=name + '_b'))
if self.config.final_l2_norm:
feat = L.l2_normalize(feat, axis=1)
return feat
def __call__(self, graph_wrappers):
inputs = self.build_inputs()
feature = inputs[-1]
features = self.gnn_layers(graph_wrappers, feature)
outputs = [self.take_final_feature(features[-1], i, "final_fc") for i in inputs[:-1]]
src_real_index = L.gather(graph_wrappers[0].node_feat['index'], inputs[0])
outputs.append(src_real_index)
return inputs, outputs
class ErnieSageModelV3(BaseGNNModel):
def gen_net_fn(self, config):
return ErnieSageV3(config)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import glob
import os
import numpy as np
import pgl
import paddle.fluid as F
import paddle.fluid.layers as L
class Loss(object):
def __init__(self, config):
self.config = config
@classmethod
def factory(cls, config):
loss_type = config.loss_type
if loss_type == "hinge":
return HingeLoss(config)
elif loss_type == "global_hinge":
return GlobalHingeLoss(config)
elif loss_type == "softmax_with_cross_entropy":
return lambda logits, label: L.reduce_mean(L.softmax_with_cross_entropy(logits, label))
else:
raise ValueError
class HingeLoss(Loss):
def __call__(self, user_feat, pos_item_feat, neg_item_feat):
pos = L.reduce_sum(user_feat * pos_item_feat, -1, keep_dim=True) # [B, 1]
neg = L.matmul(user_feat, neg_item_feat, transpose_y=True) # [B, B]
loss = L.reduce_mean(L.relu(neg - pos + self.config.margin))
return loss
def all_gather(X):
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_num = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
if trainer_num == 1:
copy_X = X * 1
copy_X.stop_gradient=True
return copy_X
Xs = []
for i in range(trainer_num):
copy_X = X * 1
copy_X = L.collective._broadcast(copy_X, i, True)
copy_X.stop_gradient=True
Xs.append(copy_X)
if len(Xs) > 1:
Xs=L.concat(Xs, 0)
Xs.stop_gradient=True
else:
Xs = Xs[0]
return Xs
class GlobalHingeLoss(Loss):
def __call__(self, user_feat, pos_item_feat, neg_item_feat):
pos = L.reduce_sum(user_feat * pos_item_feat, -1, keep_dim=True) # [B, 1]
all_pos = all_gather(pos) # [B * n, 1]
all_neg_item_feat = all_gather(neg_item_feat) # [B * n, 1]
all_user_feat = all_gather(user_feat) # [B * n, 1]
neg1 = L.matmul(user_feat, all_neg_item_feat, transpose_y=True) # [B, B * n]
neg2 = L.matmul(all_user_feat, neg_item_feat, transpose_y=True) # [B *n, B]
loss1 = L.reduce_mean(L.relu(neg1 - pos + self.config.margin))
loss2 = L.reduce_mean(L.relu(neg2 - all_pos + self.config.margin))
loss = loss1 + loss2
return loss
此差异已折叠。
from models.base import BaseGNNModel
from models.ernie import ErnieModel
from models.erniesage_v1 import ErnieSageModelV1
from models.erniesage_v2 import ErnieSageModelV2
from models.erniesage_v3 import ErnieSageModelV3
class Model(object):
@classmethod
def factory(cls, config):
name = config.model_type
if name == "BaseGNNModel":
return BaseGNNModel(config)
if name == "ErnieModel":
return ErnieModel(config)
if name == "ErnieSageModelV1":
return ErnieSageModelV1(config)
if name == "ErnieSageModelV2":
return ErnieSageModelV2(config)
if name == "ErnieSageModelV3":
return ErnieSageModelV3(config)
else:
raise ValueError
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册