Commit ca424b7b authored by S suweiyue

erniesage: add node classification task

Parent ed474928
# Global Enviroment Settings
#
# trainer config ------
+task: "link_predict"
learner_type: "gpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 32
CPU_NUM: 10
-epoch: 20
+epoch: 3
log_per_step: 1
save_per_step: 1000
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
-train_data: "./data.txt"
+train_data: "./example_data/link_predict/graph_data.txt"
-graph_data: "./data.txt"
+graph_data: "./example_data/link_predict/train_data.txt"
graph_work_path: "./workdir"
sample_workers: 1
@@ -23,7 +24,7 @@ input_type: "text"
# model config ------
samples: [10]
-model_type: "ERNIESageV3"
+model_type: "ERNIESageV2"
layer_type: "graphsage_sum"
max_seqlen: 40
@@ -32,7 +33,7 @@ num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
-loss_type: "hinge"
+loss_type: "global_hinge"
margin: 0.1
neg_type: "batch_neg"
@@ -47,11 +48,11 @@ ernie_config:
    attention_probs_dropout_prob: 0.1
    hidden_act: "relu"
    hidden_dropout_prob: 0.1
-    hidden_size: 128
+    hidden_size: 768
    initializer_range: 0.02
    max_position_embeddings: 513
-    num_attention_heads: 2
+    num_attention_heads: 12
-    num_hidden_layers: 2
+    num_hidden_layers: 12
    sent_type_vocab_size: 4
    task_type_vocab_size: 3
    vocab_size: 18000
...
# Global Enviroment Settings
#
# trainer config ------
task: "node_classification"
learner_type: "gpu"
optimizer_type: "adam"
lr: 0.00005
batch_size: 32
CPU_NUM: 10
epoch: 3
log_per_step: 1
save_per_step: 1000
output_path: "./output"
ckpt_path: "./ernie_base_ckpt"
# data config ------
graph_data: "./example_data/node_classification/graph_data.txt"
train_data: "./example_data/node_classification/train_data.txt"
graph_work_path: "./workdir"
sample_workers: 1
use_pyreader: true
input_type: "text"
# model config ------
num_label: 10
samples: [10]
model_type: "ERNIESageV2"
layer_type: "graphsage_sum"
max_seqlen: 40
num_layers: 1
hidden_size: 128
final_fc: true
final_l2_norm: true
loss_type: "softmax_with_cross_entropy"
margin: 0.1
neg_type: "batch_neg"
# infer config ------
infer_model: "./output/last"
infer_batch_size: 128
# ernie config ------
encoding: "utf8"
ernie_vocab_file: "./vocab.txt"
ernie_config:
    attention_probs_dropout_prob: 0.1
    hidden_act: "relu"
    hidden_dropout_prob: 0.1
    hidden_size: 768
    initializer_range: 0.02
    max_position_embeddings: 513
    num_attention_heads: 12
    num_hidden_layers: 12
    sent_type_vocab_size: 4
    task_type_vocab_size: 3
    vocab_size: 18000
    use_task_id: false
    use_fp16: false
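For reference, the preprocessing script in this commit (dump_graph.py, below) reads graph_data as tab-separated text fields whose first two columns become one bidirectional edge, and reads the node-classification train_data as a text column followed by an integer class label (num_label is 10 in this config, so 0–9). A minimal sketch that writes two such files with made-up contents (paths and texts are illustrative only, not taken from the repository):

import io

# Hypothetical example data: each graph_data.txt line yields the edge
# (col0, col1) in both directions; each train_data.txt line pairs a node's
# text with an integer class id.
graph_lines = [
    u"paddle graph learning\tgraph neural network",
    u"graph neural network\terniesage model",
]
train_lines = [
    u"paddle graph learning\t0",
    u"erniesage model\t3",
]

with io.open("graph_data.txt", "w", encoding="utf8") as f:
    f.write(u"\n".join(graph_lines) + u"\n")
with io.open("train_data.txt", "w", encoding="utf8") as f:
    f.write(u"\n".join(train_lines) + u"\n")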
@@ -105,9 +105,8 @@ class GraphGenerator(BaseDataGenerator):
        feed_dict["pos_item_index"] = np.array(sub_dst_idx, dtype="int64")
        feed_dict["neg_item_index"] = np.array(sub_neg_idx, dtype="int64")
-       feed_dict["user_real_index"] = np.array(sub_src_idx, dtype="int64")
-       feed_dict["pos_item_real_index"] = np.array(sub_dst_idx, dtype="int64")
-       feed_dict["neg_item_real_index"] = np.array(sub_neg_idx, dtype="int64")
+       feed_dict["user_real_index"] = np.array(batch_src, dtype="int64")
+       feed_dict["pos_item_real_index"] = np.array(batch_dst, dtype="int64")
        return feed_dict

    def __call__(self):
@@ -126,3 +125,37 @@ class GraphGenerator(BaseDataGenerator):
+class NodeClassificationGenerator(GraphGenerator):
+    def batch_fn(self, batch_ex):
+        # batch_ex = [
+        #     (node, label),
+        #     (node, label),
+        # ]
+        #
+        batch_node = []
+        batch_label = []
+        for batch in batch_ex:
+            batch_node.append(batch[0])
+            batch_label.append(batch[1])
+
+        if len(batch_node) != self.batch_size:
+            if self.phase == "train":
+                return None  # Skip
+
+        batch_node = np.array(batch_node, dtype="int64")
+        batch_label = np.array(batch_label, dtype="int64")
+
+        subgraphs = graphsage_sample(self.graph, batch_node, self.samples)
+        subgraphs[0].node_feat["index"] = subgraphs[0].reindex_to_parrent_nodes(subgraphs[0].nodes).astype(np.int64)
+        subgraphs[0].node_feat["term_ids"] = self.term_ids[subgraphs[0].node_feat["index"]].astype(np.int64)
+
+        feed_dict = {}
+        for i in range(self.num_layers):
+            feed_dict.update(self.graph_wrappers[i].to_feed(subgraphs[i]))
+
+        # only reindex from first subgraph
+        sub_node_idx = subgraphs[0].reindex_from_parrent_nodes(batch_node)
+
+        feed_dict["node_index"] = np.array(sub_node_idx, dtype="int64")
+        feed_dict["node_real_index"] = np.array(batch_node, dtype="int64")
+        feed_dict["label"] = np.array(batch_label, dtype="int64")
+        return feed_dict
@@ -71,7 +71,7 @@ def run_predict(py_reader,
    for batch_feed_dict in py_reader():
        batch += 1
-       batch_usr_feat, _, _, batch_src_real_index, _, _ = exe.run(
+       batch_usr_feat, _, _, batch_src_real_index, _ = exe.run(
            program,
            feed=batch_feed_dict,
            fetch_list=model_dict.outputs)
...
@@ -32,9 +32,8 @@ class TrainData(object):
        trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        log.info("trainer_id: %s, trainer_count: %s." % (trainer_id, trainer_count))
-       bidirectional_edges = np.load(os.path.join(graph_work_path, "train_data.npy"), allow_pickle=True)
+       edges = np.load(os.path.join(graph_work_path, "train_data.npy"), allow_pickle=True)
        # edges is bidirectional.
-       edges = bidirectional_edges[0::2]
        train_usr = edges[trainer_id::trainer_count, 0]
        train_ad = edges[trainer_id::trainer_count, 1]
        returns = {
...
@@ -34,6 +34,8 @@ class Loss(object):
            return HingeLoss(config)
        elif loss_type == "global_hinge":
            return GlobalHingeLoss(config)
+       elif loss_type == "softmax_with_cross_entropy":
+           return lambda logits, label: L.reduce_mean(L.softmax_with_cross_entropy(logits, label))
        else:
            raise ValueError
...
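The new branch simply wraps Paddle's layer-level op in a callable. A minimal standalone sketch of what that lambda computes, assuming the paddle.fluid 1.x static-graph API used throughout this repository (shapes mirror NodeClassificationModel below: logits of shape [N, num_label], int64 labels reshaped to [N, 1]):

import numpy as np
import paddle.fluid as F
import paddle.fluid.layers as L

# Build a tiny static program: mean softmax cross-entropy over a batch.
logits = L.data("logits", shape=[None, 10], dtype="float32", append_batch_size=False)
label = L.data("label", shape=[None, 1], dtype="int64", append_batch_size=False)
loss = L.reduce_mean(L.softmax_with_cross_entropy(logits, label))

exe = F.Executor(F.CPUPlace())
exe.run(F.default_startup_program())
out, = exe.run(feed={"logits": np.random.rand(4, 10).astype("float32"),
                     "label": np.random.randint(0, 10, (4, 1)).astype("int64")},
               fetch_list=[loss])
print(out)  # scalar: average cross-entropy over the 4 examples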
@@ -48,9 +48,7 @@ class LinkPredictModel(BaseModel):
            "user_real_index", shape=[None], dtype="int64", append_batch_size=False)
        pos_item_real_index = L.data(
            "pos_item_real_index", shape=[None], dtype="int64", append_batch_size=False)
-       neg_item_real_index = L.data(
-           "neg_item_real_index", shape=[None], dtype="int64", append_batch_size=False)
-       datas = [user_index, pos_item_index, neg_item_index, user_real_index, pos_item_real_index, neg_item_real_index]
+       datas = [user_index, pos_item_index, neg_item_index, user_real_index, pos_item_real_index]
        # graph_wrappers
        graph_wrappers = []
@@ -74,7 +72,7 @@ class LinkPredictModel(BaseModel):
        loss = loss_func(user_feat, pos_item_feat, neg_item_feat)
        # set datas, graph_wrappers, loss, outputs
-       return datas, graph_wrappers, loss, outputs + [user_real_index, pos_item_real_index, neg_item_real_index]
+       return datas, graph_wrappers, loss, outputs + [user_real_index, pos_item_real_index]

class NodeClassificationModel(BaseModel):
@@ -83,9 +81,11 @@ class NodeClassificationModel(BaseModel):
        # inputs
        node_index = L.data(
            "node_index", shape=[None], dtype="int64", append_batch_size=False)
+       node_real_index = L.data(
+           "node_real_index", shape=[None], dtype="int64", append_batch_size=False)
        label = L.data(
            "label", shape=[None], dtype="int64", append_batch_size=False)
-       datas = [node_index, label]
+       datas = [node_index, node_real_index, label]
        # graph_wrappers
        graph_wrappers = []
@@ -100,11 +100,12 @@ class NodeClassificationModel(BaseModel):
        # encoder model
        encoder = Encoder.factory(self.config)
        outputs = encoder(graph_wrappers, [node_index])
-       feat = outputs
+       feat = outputs[0]
+       logits = L.fc(feat, self.config.num_label)
        # loss
+       label = L.reshape(label, [-1, 1])
        loss_func = Loss.factory(self.config)
-       loss = loss_func(feat1, feat2, feat3, label)
+       loss = loss_func(logits, label)
-       # set datas, graph_wrappers, loss, outputs
-       return datas, graph_wrappers, loss, outputs
+       return datas, graph_wrappers, loss, outputs + [node_real_index, logits]
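Since the encoder is called here with a single index list, outputs holds one feature tensor, so the fetch list returned by NodeClassificationModel is effectively [feat, node_real_index, logits]; that matches the three-way unpack (_, batch_node_real_index, batch_logits) used by run_predict in the new inference script below.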
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import pickle
import time
import glob
import os
import io
import traceback
import pickle as pkl
role = os.getenv("TRAINING_ROLE", "TRAINER")
import numpy as np
import yaml
from easydict import EasyDict as edict
import pgl
from pgl.utils.logger import log
from pgl.utils import paddle_helper
import paddle
import paddle.fluid as F
from models.model import NodeClassificationModel
from dataset.graph_reader import NodeClassificationGenerator
class PredictData(object):
    def __init__(self, num_nodes):
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        train_usr = np.arange(trainer_id, num_nodes, trainer_count)
        #self.data = (train_usr, train_usr)
        self.data = train_usr

    def __getitem__(self, index):
        return [self.data[index], self.data[index]]


def tostr(data_array):
    return " ".join(["%.5lf" % d for d in data_array])
def run_predict(py_reader,
                exe,
                program,
                model_dict,
                log_per_step=1,
                args=None):
    id2str = io.open(os.path.join(args.graph_work_path, "terms.txt"), encoding=args.encoding).readlines()
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))

    if not os.path.exists(args.output_path):
        os.mkdir(args.output_path)
    fout = io.open("%s/part-%s" % (args.output_path, trainer_id), "w", encoding="utf8")

    batch = 0
    for batch_feed_dict in py_reader():
        batch += 1
        _, batch_node_real_index, batch_logits = exe.run(
            program,
            feed=batch_feed_dict,
            fetch_list=model_dict.outputs)

        if batch % log_per_step == 0:
            log.info("Predict %s finished" % batch)

        for idx, logits in zip(batch_node_real_index, batch_logits):
            if args.input_type == "text":
                text = id2str[int(idx)].strip("\n").split("\t")[-1]
            #prediction = np.argmax(logits)
            prediction = logits[1]
            line = "{}\t{}\n".format(text, prediction)
            fout.write(line)
    fout.close()
def _warmstart(exe, program, path='params'):
    def _existed_persitables(var):
        #if not isinstance(var, fluid.framework.Parameter):
        #    return False
        if not F.io.is_persistable(var):
            return False
        param_path = os.path.join(path, var.name)
        log.info("Loading parameter: {} persistable: {} exists: {}".format(
            param_path,
            F.io.is_persistable(var),
            os.path.exists(param_path),
        ))
        return os.path.exists(param_path)

    F.io.load_vars(
        exe,
        path,
        main_program=program,
        predicate=_existed_persitables
    )
def main(config):
    model = NodeClassificationModel(config)

    if config.learner_type == "cpu":
        place = F.CPUPlace()
    elif config.learner_type == "gpu":
        gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = F.CUDAPlace(gpu_id)
    else:
        raise ValueError

    exe = F.Executor(place)

    val_program = F.default_main_program().clone(for_test=True)
    exe.run(F.default_startup_program())
    _warmstart(exe, F.default_startup_program(), path=config.infer_model)

    num_threads = int(os.getenv("CPU_NUM", 1))
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
    exec_strategy = F.ExecutionStrategy()
    exec_strategy.num_threads = num_threads
    build_strategy = F.BuildStrategy()
    build_strategy.enable_inplace = True
    build_strategy.memory_optimize = True
    build_strategy.remove_unnecessary_lock = False
    build_strategy.memory_optimize = False

    if num_threads > 1:
        build_strategy.reduce_strategy = F.BuildStrategy.ReduceStrategy.Reduce

    val_compiled_prog = F.compiler.CompiledProgram(
        val_program).with_data_parallel(
            build_strategy=build_strategy,
            exec_strategy=exec_strategy)

    num_nodes = int(np.load(os.path.join(config.graph_work_path, "num_nodes.npy")))

    predict_data = PredictData(num_nodes)
    predict_iter = NodeClassificationGenerator(
        graph_wrappers=model.graph_wrappers,
        batch_size=config.infer_batch_size,
        data=predict_data,
        samples=config.samples,
        num_workers=config.sample_workers,
        feed_name_list=[var.name for var in model.feed_list],
        use_pyreader=config.use_pyreader,
        phase="predict",
        graph_data_path=config.graph_work_path,
        shuffle=False)

    if config.learner_type == "cpu":
        model.data_loader.decorate_batch_generator(
            predict_iter, places=F.cpu_places())
    elif config.learner_type == "gpu":
        gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
        place = F.CUDAPlace(gpu_id)
        model.data_loader.decorate_batch_generator(
            predict_iter, places=place)
    else:
        raise ValueError

    run_predict(model.data_loader,
                program=val_compiled_prog,
                exe=exe,
                model_dict=model,
                args=config)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='main')
    parser.add_argument("--conf", type=str, default="./config.yaml")
    args = parser.parse_args()
    config = edict(yaml.load(open(args.conf), Loader=yaml.FullLoader))
    print(config)
    main(config)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import argparse
import traceback
import yaml
import numpy as np
from easydict import EasyDict as edict
from pgl.utils.logger import log
from pgl.utils import paddle_helper
from learner import Learner
from models.model import LinkPredictModel
from models.model import NodeClassificationModel
from dataset.graph_reader import NodeClassificationGenerator
class TrainData(object):
    def __init__(self, graph_work_path):
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        log.info("trainer_id: %s, trainer_count: %s." % (trainer_id, trainer_count))

        edges = np.load(os.path.join(graph_work_path, "train_data.npy"), allow_pickle=True)
        # train_data.npy holds (node, label) pairs produced by dump_graph.py.
        train_node = edges[trainer_id::trainer_count, 0]
        train_label = edges[trainer_id::trainer_count, 1]
        returns = {
            "train_data": [train_node, train_label]
        }
        log.info("Load train_data done.")
        self.data = returns

    def __getitem__(self, index):
        return [data[index] for data in self.data["train_data"]]

    def __len__(self):
        return len(self.data["train_data"][0])
def main(config):
    # Select Model
    model = NodeClassificationModel(config)

    # Build Train Edges
    data = TrainData(config.graph_work_path)

    # Build Train Data
    train_iter = NodeClassificationGenerator(
        graph_wrappers=model.graph_wrappers,
        batch_size=config.batch_size,
        data=data,
        samples=config.samples,
        num_workers=config.sample_workers,
        feed_name_list=[var.name for var in model.feed_list],
        use_pyreader=config.use_pyreader,
        phase="train",
        graph_data_path=config.graph_work_path,
        shuffle=True,
        neg_type=config.neg_type)
    log.info("build graph reader done.")

    learner = Learner.factory(config.learner_type)
    learner.build(model, train_iter, config)

    learner.start()
    learner.stop()
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='main')
    parser.add_argument("--conf", type=str, default="./config.yaml")
    args = parser.parse_args()
    config = edict(yaml.load(open(args.conf), Loader=yaml.FullLoader))
    print(config)
    main(config)
...@@ -44,19 +44,8 @@ def term2id(string, tokenizer, max_seqlen): ...@@ -44,19 +44,8 @@ def term2id(string, tokenizer, max_seqlen):
ids = ids + [0] * (max_seqlen - len(ids)) ids = ids + [0] * (max_seqlen - len(ids))
return ids return ids
def load_graph(args, str2id, term_file, terms, item_distribution):
def dump_graph(args):
if not os.path.exists(args.outpath):
os.makedirs(args.outpath)
neg_samples = []
str2id = dict()
term_file = io.open(os.path.join(args.outpath, "terms.txt"), "w", encoding=args.encoding)
terms = []
count = 0
item_distribution = []
edges = [] edges = []
train_data = []
with io.open(args.graphpath, encoding=args.encoding) as f: with io.open(args.graphpath, encoding=args.encoding) as f:
for idx, line in enumerate(f): for idx, line in enumerate(f):
if idx % 100000 == 0: if idx % 100000 == 0:
...@@ -65,20 +54,22 @@ def dump_graph(args): ...@@ -65,20 +54,22 @@ def dump_graph(args):
for col_idx, col in enumerate(line.strip("\n").split("\t")): for col_idx, col in enumerate(line.strip("\n").split("\t")):
s = col[:args.max_seqlen] s = col[:args.max_seqlen]
if s not in str2id: if s not in str2id:
str2id[s] = count str2id[s] = len(str2id)
count += 1
term_file.write(str(col_idx) + "\t" + col + "\n") term_file.write(str(col_idx) + "\t" + col + "\n")
item_distribution.append(0) item_distribution.append(0)
slots.append(str2id[s]) slots.append(str2id[s])
src = slots[0] src = slots[0]
dst = slots[1] dst = slots[1]
#neg_samples.append(slots[2:])
edges.append((src, dst)) edges.append((src, dst))
edges.append((dst, src)) edges.append((dst, src))
item_distribution[dst] += 1 item_distribution[dst] += 1
edges = np.array(edges, dtype="int64") edges = np.array(edges, dtype="int64")
return edges
def load_link_predict_train_data(args, str2id, term_file, terms, item_distribution):
train_data = []
neg_samples = []
with io.open(args.inpath, encoding=args.encoding) as f: with io.open(args.inpath, encoding=args.encoding) as f:
for idx, line in enumerate(f): for idx, line in enumerate(f):
if idx % 100000 == 0: if idx % 100000 == 0:
...@@ -87,8 +78,7 @@ def dump_graph(args): ...@@ -87,8 +78,7 @@ def dump_graph(args):
for col_idx, col in enumerate(line.strip("\n").split("\t")): for col_idx, col in enumerate(line.strip("\n").split("\t")):
s = col[:args.max_seqlen] s = col[:args.max_seqlen]
if s not in str2id: if s not in str2id:
str2id[s] = count str2id[s] = len(str2id)
count += 1
term_file.write(str(col_idx) + "\t" + col + "\n") term_file.write(str(col_idx) + "\t" + col + "\n")
item_distribution.append(0) item_distribution.append(0)
slots.append(str2id[s]) slots.append(str2id[s])
...@@ -98,6 +88,48 @@ def dump_graph(args): ...@@ -98,6 +88,48 @@ def dump_graph(args):
neg_samples.append(slots[2:]) neg_samples.append(slots[2:])
train_data.append((src, dst)) train_data.append((src, dst))
train_data = np.array(train_data, dtype="int64") train_data = np.array(train_data, dtype="int64")
np.save(os.path.join(args.outpath, "train_data.npy"), train_data)
if len(neg_samples) != 0:
np.save(os.path.join(args.outpath, "neg_samples.npy"), np.array(neg_samples))
def load_node_classification_train_data(args, str2id, term_file, terms, item_distribution):
train_data = []
neg_samples = []
with io.open(args.inpath, encoding=args.encoding) as f:
for idx, line in enumerate(f):
if idx % 100000 == 0:
log.info("%s readed %s lines" % (args.inpath, idx))
slots = []
col_idx = 0
slots = line.strip("\n").split("\t")
col = slots[0]
label = int(slots[1])
text = col[:args.max_seqlen]
if text not in str2id:
str2id[text] = len(str2id)
term_file.write(str(col_idx) + "\t" + col + "\n")
item_distribution.append(0)
src = str2id[text]
train_data.append([src, label])
train_data = np.array(train_data, dtype="int64")
np.save(os.path.join(args.outpath, "train_data.npy"), train_data)
def dump_graph(args):
if not os.path.exists(args.outpath):
os.makedirs(args.outpath)
str2id = dict()
term_file = io.open(os.path.join(args.outpath, "terms.txt"), "w", encoding=args.encoding)
terms = []
item_distribution = []
edges = load_graph(args, str2id, term_file, terms, item_distribution)
#load_train_data(args, str2id, term_file, terms, item_distribution)
if args.task == "link_predict":
load_link_predict_train_data(args, str2id, term_file, terms, item_distribution)
elif args.task == "node_classification":
load_node_classification_train_data(args, str2id, term_file, terms, item_distribution)
else:
raise ValueError
term_file.close() term_file.close()
num_nodes = len(str2id) num_nodes = len(str2id)
...@@ -115,15 +147,13 @@ def dump_graph(args): ...@@ -115,15 +147,13 @@ def dump_graph(args):
item_distribution = np.sqrt(item_distribution) item_distribution = np.sqrt(item_distribution)
distribution = 1. * item_distribution / item_distribution.sum() distribution = 1. * item_distribution / item_distribution.sum()
alias, events = alias_sample_build_table(distribution) alias, events = alias_sample_build_table(distribution)
np.save(os.path.join(args.outpath, "train_data.npy"), train_data)
np.save(os.path.join(args.outpath, "alias.npy"), alias) np.save(os.path.join(args.outpath, "alias.npy"), alias)
np.save(os.path.join(args.outpath, "events.npy"), events) np.save(os.path.join(args.outpath, "events.npy"), events)
np.save(os.path.join(args.outpath, "neg_samples.npy"), np.array(neg_samples))
log.info("End Build Graph") log.info("End Build Graph")
def dump_node_feat(args): def dump_node_feat(args):
log.info("Dump node feat starting...") log.info("Dump node feat starting...")
id2str = [line.strip("\n").split("\t")[1] for line in io.open(os.path.join(args.outpath, "terms.txt"), encoding=args.encoding)] id2str = [line.strip("\n").split("\t")[-1] for line in io.open(os.path.join(args.outpath, "terms.txt"), encoding=args.encoding)]
pool = multiprocessing.Pool() pool = multiprocessing.Pool()
tokenizer = FullTokenizer(args.vocab_file) tokenizer = FullTokenizer(args.vocab_file)
term_ids = pool.map(partial(term2id, tokenizer=tokenizer, max_seqlen=args.max_seqlen), id2str) term_ids = pool.map(partial(term2id, tokenizer=tokenizer, max_seqlen=args.max_seqlen), id2str)
...@@ -138,6 +168,7 @@ if __name__ == "__main__": ...@@ -138,6 +168,7 @@ if __name__ == "__main__":
parser.add_argument("-l", "--max_seqlen", type=int, default=30) parser.add_argument("-l", "--max_seqlen", type=int, default=30)
parser.add_argument("--vocab_file", type=str, default="./vocab.txt") parser.add_argument("--vocab_file", type=str, default="./vocab.txt")
parser.add_argument("--encoding", type=str, default="utf8") parser.add_argument("--encoding", type=str, default="utf8")
parser.add_argument("--task", type=str, default="link_predict", choices=["link_predict", "node_classification"])
parser.add_argument("-o", "--outpath", type=str, default=None) parser.add_argument("-o", "--outpath", type=str, default=None)
args = parser.parse_args() args = parser.parse_args()
dump_graph(args) dump_graph(args)
......
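After dump_graph.py runs, graph_work_path ("./workdir" in the configs) holds the artifacts the training and inference scripts read back: train_data.npy (for this task, (node, label) pairs), terms.txt (the id-to-text mapping used when writing predictions), alias.npy/events.npy, and the node count that the inference script loads as num_nodes.npy. A quick sanity-check sketch, assuming those files already exist in ./workdir:

import os
import numpy as np

workdir = "./workdir"  # graph_work_path from config.yaml
train_data = np.load(os.path.join(workdir, "train_data.npy"), allow_pickle=True)
num_nodes = int(np.load(os.path.join(workdir, "num_nodes.npy")))
print(train_data.shape)  # (num_labeled_nodes, 2): column 0 = node id, column 1 = label
print(num_nodes)         # total nodes; PredictData in the infer script iterates over them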
#!/bin/bash
set -x
config=${1:-"./config.yaml"}
unset http_proxy https_proxy

function parse_yaml {
    local prefix=$2
    local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
    sed -ne "s|^\($s\):|\1|" \
        -e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \
        -e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $1 |
    awk -F$fs '{
        indent = length($1)/2;
        vname[indent] = $2;
        for (i in vname) {if (i > indent) {delete vname[i]}}
        if (length($3) > 0) {
            vn=""; for (i=0; i<indent; i++) {vn=(vn)(vname[i])("_")}
            printf("%s%s%s=\"%s\"\n", "'$prefix'",vn, $2, $3);
        }
    }'
}

transpiler_local_train(){
    export PADDLE_TRAINERS_NUM=1
    export PADDLE_PSERVERS_NUM=1
    export PADDLE_PORT=6206
    export PADDLE_PSERVERS="127.0.0.1"
    export BASE="./local_dir"
    echo `which python`
    if [ -d ${BASE} ]; then
        rm -rf ${BASE}
    fi
    mkdir ${BASE}
    rm job_id
    for((i=0;i<${PADDLE_PSERVERS_NUM};i++))
    do
        echo "start ps server: ${i}"
        TRAINING_ROLE="PSERVER" PADDLE_TRAINER_ID=${i} python ./link_predict_train.py --conf $config \
            &> $BASE/pserver.$i.log &
        echo $! >> job_id
    done
    sleep 3s
    for((j=0;j<${PADDLE_TRAINERS_NUM};j++))
    do
        echo "start ps work: ${j}"
        TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} python ./link_predict_train.py --conf $config
        TRAINING_ROLE="TRAINER" PADDLE_TRAINER_ID=${j} python ./link_predict_infer.py --conf $config
    done
}

collective_local_train(){
    echo `which python`
    python -m paddle.distributed.launch link_predict_train.py --conf $config
    python -m paddle.distributed.launch link_predict_infer.py --conf $config
}

eval $(parse_yaml $config)

python ./preprocessing/dump_graph.py -i $train_data -g $graph_data -o $graph_work_path \
    --encoding $encoding -l $max_seqlen --vocab_file $ernie_vocab_file --task $task

if [[ $learner_type == "cpu" ]];then
    transpiler_local_train
fi
if [[ $learner_type == "gpu" ]];then
    collective_local_train
fi
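parse_yaml flattens the top-level keys of config.yaml into shell assignments (for example, task: "link_predict" becomes task="link_predict"), and the eval line turns them into the $task, $train_data, $graph_data, $graph_work_path, $learner_type and related variables consumed by the dump_graph.py invocation and the cpu/gpu branch at the end of the script.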
@@ -51,13 +51,14 @@ transpiler_local_train(){
collective_local_train(){
    echo `which python`
-   python -m paddle.distributed.launch train.py --conf $config
-   python -m paddle.distributed.launch infer.py --conf $config
+   python -m paddle.distributed.launch node_classification_train.py --conf $config
+   python -m paddle.distributed.launch node_classification_infer.py --conf $config
}

eval $(parse_yaml $config)

-python ./preprocessing/dump_graph.py -i $train_data -g $graph_data -o $graph_work_path --encoding $encoding -l $max_seqlen --vocab_file $ernie_vocab_file
+python ./preprocessing/dump_graph.py -i $train_data -g $graph_data -o $graph_work_path \
+    --encoding $encoding -l $max_seqlen --vocab_file $ernie_vocab_file --task $task

if [[ $learner_type == "cpu" ]];then
    transpiler_local_train
...