Commit b90e02d6 authored by D dongdaxiang

Merge branch 'new_develop' into ssr

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
  rev: v1.2.3
  hooks:
  - id: trailing-whitespace
# Multi-view Simnet for Personalized Recommendation
## Introduction
In personalized recommendation scenarios, the list of items a recommender system presents to a user is usually produced by a personalized matching model. In the real world, a user has features from many views, such as the user id, age, and item click history. An item, for example a news article, likewise has features from multiple views, such as its title and category. The multi-view Simnet model fuses multiple views of user and item features into a single model that learns personalized matching. Models of this kind are widely used in industry, for example in Baidu's Feed products.
## Dataset
For now, this project uses a machine-generated synthetic dataset to illustrate the multi-view Simnet model; real-world datasets will be added over time to validate the model's effectiveness.
## Model
This project provides a personalized matching model built with Paddle. The multi-view Simnet model consists of several encoder modules, one for each feature view. Currently a Bag-of-Embedding encoder, a Temporal-Convolutional encoder, and a Gated-Recurrent-Unit encoder are provided; encoders suited to sparse features will be added gradually. Training currently follows the pairwise ranking paradigm: for each correlated user-item pair, a random item is sampled as a negative example and the model learns to rank the positive item above it, as illustrated in the sketch below.
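As an illustration of the multi-view idea, here is a minimal numpy sketch, not the Paddle implementation; the real model in `nets.py` below uses trainable embeddings and adds a fully-connected projection before the cosine:

```python
import numpy as np

rng = np.random.default_rng(0)

def encode_views(view_seqs, encoders):
    # one encoder per feature view; per-view codes are concatenated
    return np.concatenate([enc(v) for enc, v in zip(encoders, view_seqs)])

# toy bag-of-embeddings encoder: sum the embedding rows of one view
bow = lambda seq: seq.sum(axis=0)

# two user views and two item views, each a (seq_len, emb_dim) lookup result
user_views = [rng.normal(size=(5, 8)), rng.normal(size=(3, 8))]
item_views = [rng.normal(size=(4, 8)), rng.normal(size=(6, 8))]

u = encode_views(user_views, [bow, bow])  # shape (16,)
v = encode_views(item_views, [bow, bow])  # shape (16,)
score = u @ v / (np.linalg.norm(u) * np.linalg.norm(v))  # cosine match score
print(score)
```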
## Train
The command line options of the training tool can be listed with `python train.py -h`. To train with the default settings:
```bash
python train.py
```
## Future work
- Multiple pairwise loss functions will be added, so that the user-item matching relations of different feature views can be jointly optimized with different losses; the whole model will then be validated on real-world data.
- An inference tool will be added.
- A Parallel Executor option will be added.
- Distributed training support will be added.
# Multi-view Simnet for Personalized Recommendation
## Introduction
In a personalized recommendation scenario, a user is typically presented with a list of items produced by a personalized interest-matching model. In real-world applications, a user may have multiple views of features, such as user id, age, item click history, and search queries. An item, e.g. a news article, may also have multiple views of features, such as its title, category, and images. Multi-view Simnet is a matching model that combines users' and items' multiple views of features in one unified model. The model can be used in many industrial products, such as Baidu's news feed. It is adapted from "A Multi-View Deep Learning Approach for Cross Domain User Modeling in Recommendation Systems" (MV-DNN), WWW 2015; the difference between our model and MV-DNN is that we also consider multiple feature views of users.
## Dataset
Currently, a synthetic dataset is provided as a proof of concept, and we aim to add real-world datasets to this project in the future. A sketch of the sample format follows.
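A minimal sketch of what one synthetic training sample looks like (it mirrors `generate_ids` in `reader.py`; the slot counts and hash space below are the `train.py` defaults):

```python
import random

def generate_ids(num, space):
    # one slot = a list of `num` random feature ids hashed into `space`
    return [random.randint(0, space - 1) for _ in range(num)]

# one training sample: 1 query slot + 1 pos-title slot + 1 neg-title slot,
# 10 ids per slot, hash space 1000001 (defaults in train.py/reader.py)
sample = [generate_ids(10, 1000001) for _ in range(3)]
print(len(sample), len(sample[0]))  # -> 3 10
```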
## Model
This project aims to provide a practical usage of Paddle in a personalized matching scenario. The model provides several encoder modules for different views of features. Currently, a Bag-of-Embedding encoder, a Temporal-Convolutional encoder, and a Gated-Recurrent-Unit encoder are provided; more encoders that are practical for the sparse features common in recommender systems will be added. The training algorithm is pairwise ranking: given a positive user-item pair, a negative item with multiple views is sampled, and the model is trained to score the positive pair above the negative one by a margin, as sketched below.
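A minimal numpy sketch of the pairwise hinge loss used here (it mirrors the `fill_constant`/`elementwise` ops in `MultiviewSimnet.train_net` below; `margin=0.1` matches `MultiviewSimnet.margin`):

```python
import numpy as np

def pairwise_hinge_loss(cos_pos, cos_neg, margin=0.1):
    # max(0, margin - cos(q, pos) + cos(q, neg)), averaged over the batch
    return np.maximum(0.0, margin - cos_pos + cos_neg).mean()

# toy batch: only the middle pair violates the margin and incurs loss
cos_pos = np.array([0.9, 0.4, 0.7])
cos_neg = np.array([0.2, 0.5, 0.1])
print(pairwise_hinge_loss(cos_pos, cos_neg))  # ~0.0667
```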
## Train
The command line options for training can be listed with `python train.py -h`. To train with the default settings:
```bash
python train.py
```
## Future work
- Multiple types of pairwise loss will be added to this project. For the different views of features between a user and an item, multiple losses will be supported and jointly optimized. The model will be verified on real-world datasets.
- An inference tool will be added.
- Parallel Executor support will be added to this project.
- Distributed training will be added.
#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import paddle.fluid.layers.nn as nn
import paddle.fluid.layers.tensor as tensor
import paddle.fluid.layers.control_flow as cf
import paddle.fluid.layers.io as io
class BowEncoder(object):
""" bow-encoder """
def __init__(self):
self.param_name = ""
def forward(self, emb):
return nn.sequence_pool(input=emb, pool_type='sum')
class CNNEncoder(object):
""" cnn-encoder"""
def __init__(self,
param_name="cnn.w",
win_size=3,
ksize=128,
act='tanh',
pool_type='max'):
self.param_name = param_name
self.win_size = win_size
self.ksize = ksize
self.act = act
self.pool_type = pool_type
    def forward(self, emb):
        return fluid.nets.sequence_conv_pool(
            input=emb,
            num_filters=self.ksize,
            filter_size=self.win_size,
            act=self.act,
            pool_type=self.pool_type,
            param_attr=self.param_name)
class GrnnEncoder(object):
""" grnn-encoder """
    def __init__(self, param_name="grnn.w", hidden_size=128):
        self.param_name = param_name
        self.hidden_size = hidden_size
    def forward(self, emb):
        # project to 3 * hidden_size first: dynamic_gru expects its input
        # dimension to be three times its hidden size
        fc0 = nn.fc(input=emb, size=self.hidden_size * 3)
        gru_h = nn.dynamic_gru(
            input=fc0,
            size=self.hidden_size,
            is_reverse=False,
            param_attr=self.param_name)
        return nn.sequence_pool(input=gru_h, pool_type='max')
class SimpleEncoderFactory(object):
    """A very simple encoder factory; most default argument values are used."""

    def __init__(self):
        pass

    def create(self, enc_type, enc_hid_size):
        """Create an encoder of the given type and hidden size."""
        if enc_type == "bow":
            return BowEncoder()
        elif enc_type == "cnn":
            return CNNEncoder(ksize=enc_hid_size)
        elif enc_type == "gru":
            return GrnnEncoder(hidden_size=enc_hid_size)
        raise ValueError("unknown encoder type: %s" % enc_type)
class MultiviewSimnet(object):
""" multi-view simnet """
def __init__(self, embedding_size, embedding_dim, hidden_size):
self.embedding_size = embedding_size
self.embedding_dim = embedding_dim
self.emb_shape = [self.embedding_size, self.embedding_dim]
self.hidden_size = hidden_size
self.margin = 0.1
def set_query_encoder(self, encoders):
self.query_encoders = encoders
def set_title_encoder(self, encoders):
self.title_encoders = encoders
    def get_correct(self, x, y):
        """Count instances in the batch where x < y."""
        less = tensor.cast(cf.less_than(x, y), dtype='float32')
        correct = nn.reduce_sum(less)
        return correct
def train_net(self):
# input fields for query, pos_title, neg_title
q_slots = [
io.data(
name="q%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.query_encoders))
]
pt_slots = [
io.data(
name="pt%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.title_encoders))
]
nt_slots = [
io.data(
name="nt%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.title_encoders))
]
# lookup embedding for each slot
q_embs = [
nn.embedding(
input=query, size=self.emb_shape, param_attr="emb.w")
for query in q_slots
]
pt_embs = [
nn.embedding(
input=title, size=self.emb_shape, param_attr="emb.w")
for title in pt_slots
]
nt_embs = [
nn.embedding(
input=title, size=self.emb_shape, param_attr="emb.w")
for title in nt_slots
]
# encode each embedding field with encoder
q_encodes = [
self.query_encoders[i].forward(emb) for i, emb in enumerate(q_embs)
]
pt_encodes = [
self.title_encoders[i].forward(emb) for i, emb in enumerate(pt_embs)
]
nt_encodes = [
self.title_encoders[i].forward(emb) for i, emb in enumerate(nt_embs)
]
# concat multi view for query, pos_title, neg_title
q_concat = nn.concat(q_encodes)
pt_concat = nn.concat(pt_encodes)
nt_concat = nn.concat(nt_encodes)
# projection of hidden layer
q_hid = nn.fc(q_concat, size=self.hidden_size, param_attr='q_fc.w')
pt_hid = nn.fc(pt_concat, size=self.hidden_size, param_attr='t_fc.w')
nt_hid = nn.fc(nt_concat, size=self.hidden_size, param_attr='t_fc.w')
# cosine of hidden layers
cos_pos = nn.cos_sim(q_hid, pt_hid)
cos_neg = nn.cos_sim(q_hid, nt_hid)
# pairwise hinge_loss
loss_part1 = nn.elementwise_sub(
tensor.fill_constant_batch_size_like(
input=cos_pos,
shape=[-1, 1],
value=self.margin,
dtype='float32'),
cos_pos)
loss_part2 = nn.elementwise_add(loss_part1, cos_neg)
loss_part3 = nn.elementwise_max(
tensor.fill_constant_batch_size_like(
input=loss_part2, shape=[-1, 1], value=0.0, dtype='float32'),
loss_part2)
avg_cost = nn.mean(loss_part3)
        # a pair is correctly ranked when cos_neg < cos_pos
        correct = self.get_correct(cos_neg, cos_pos)
return q_slots + pt_slots + nt_slots, avg_cost, correct
def pred_net(self, query_fields, pos_title_fields, neg_title_fields):
q_slots = [
io.data(
name="q%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.query_encoders))
]
pt_slots = [
io.data(
name="pt%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.title_encoders))
]
# lookup embedding for each slot
q_embs = [
nn.embedding(
input=query, size=self.emb_shape, param_attr="emb.w")
for query in q_slots
]
pt_embs = [
nn.embedding(
input=title, size=self.emb_shape, param_attr="emb.w")
for title in pt_slots
]
# encode each embedding field with encoder
q_encodes = [
            self.query_encoders[i].forward(emb) for i, emb in enumerate(q_embs)
]
pt_encodes = [
self.title_encoders[i].forward(emb) for i, emb in enumerate(pt_embs)
]
# concat multi view for query, pos_title, neg_title
q_concat = nn.concat(q_encodes)
pt_concat = nn.concat(pt_encodes)
# projection of hidden layer
q_hid = nn.fc(q_concat, size=self.hidden_size, param_attr='q_fc.w')
pt_hid = nn.fc(pt_concat, size=self.hidden_size, param_attr='t_fc.w')
# cosine of hidden layers
cos = nn.cos_sim(q_hid, pt_hid)
return cos
#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
class Dataset:
def __init__(self):
pass
class SyntheticDataset(Dataset):
def __init__(self, sparse_feature_dim, query_slot_num, title_slot_num):
# ids are randomly generated
self.ids_per_slot = 10
self.sparse_feature_dim = sparse_feature_dim
self.query_slot_num = query_slot_num
self.title_slot_num = title_slot_num
self.dataset_size = 10000
def _reader_creator(self, is_train):
def generate_ids(num, space):
return [random.randint(0, space - 1) for i in range(num)]
def reader():
            for _ in range(self.dataset_size):
query_slots = []
pos_title_slots = []
neg_title_slots = []
for i in range(self.query_slot_num):
qslot = generate_ids(self.ids_per_slot,
self.sparse_feature_dim)
query_slots.append(qslot)
for i in range(self.title_slot_num):
pt_slot = generate_ids(self.ids_per_slot,
self.sparse_feature_dim)
pos_title_slots.append(pt_slot)
if is_train:
for i in range(self.title_slot_num):
nt_slot = generate_ids(self.ids_per_slot,
self.sparse_feature_dim)
neg_title_slots.append(nt_slot)
yield query_slots + pos_title_slots + neg_title_slots
else:
yield query_slots + pos_title_slots
return reader
def train(self):
return self._reader_creator(True)
    def valid(self):
        # validation keeps the training format (with negative titles) so the
        # same network inputs can be fed
        return self._reader_creator(True)
def test(self):
return self._reader_creator(False)
#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import six
import numpy as np
import math
import argparse
import logging
import paddle.fluid as fluid
import paddle
import reader
from nets import MultiviewSimnet, SimpleEncoderFactory
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser("multi-view simnet")
parser.add_argument("--train_file", type=str, help="Training file")
parser.add_argument("--valid_file", type=str, help="Validation file")
parser.add_argument(
"--epochs", type=int, default=10, help="Number of epochs for training")
parser.add_argument(
"--model_output_dir",
type=str,
default='model_output',
help="Model output folder")
parser.add_argument(
"--query_slots", type=int, default=1, help="Number of query slots")
parser.add_argument(
"--title_slots", type=int, default=1, help="Number of title slots")
parser.add_argument(
"--query_encoder",
type=str,
default="bow",
help="Encoder module for slot encoding")
parser.add_argument(
"--title_encoder",
type=str,
default="bow",
help="Encoder module for slot encoding")
parser.add_argument(
"--query_encode_dim",
type=int,
default=128,
help="Dimension of query encoder output")
parser.add_argument(
"--title_encode_dim",
type=int,
default=128,
help="Dimension of title encoder output")
parser.add_argument(
"--batch_size", type=int, default=128, help="Batch size for training")
parser.add_argument(
"--embedding_dim",
type=int,
default=128,
help="Default Dimension of Embedding")
parser.add_argument(
"--sparse_feature_dim",
type=int,
default=1000001,
        help="Sparse feature hashing space "
        "for index processing")
parser.add_argument(
"--hidden_size", type=int, default=128, help="Hidden dim")
return parser.parse_args()
def start_train(args):
dataset = reader.SyntheticDataset(args.sparse_feature_dim, args.query_slots,
args.title_slots)
train_reader = paddle.batch(
paddle.reader.shuffle(
dataset.train(), buf_size=args.batch_size * 100),
batch_size=args.batch_size)
place = fluid.CPUPlace()
factory = SimpleEncoderFactory()
query_encoders = [
factory.create(args.query_encoder, args.query_encode_dim)
for i in range(args.query_slots)
]
title_encoders = [
factory.create(args.title_encoder, args.title_encode_dim)
for i in range(args.title_slots)
]
m_simnet = MultiviewSimnet(args.sparse_feature_dim, args.embedding_dim,
args.hidden_size)
m_simnet.set_query_encoder(query_encoders)
m_simnet.set_title_encoder(title_encoders)
all_slots, avg_cost, correct = m_simnet.train_net()
optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
optimizer.minimize(avg_cost)
startup_program = fluid.default_startup_program()
loop_program = fluid.default_main_program()
feeder = fluid.DataFeeder(feed_list=all_slots, place=place)
exe = fluid.Executor(place)
exe.run(startup_program)
for pass_id in range(args.epochs):
for batch_id, data in enumerate(train_reader()):
loss_val, correct_val = exe.run(loop_program,
feed=feeder.feed(data),
fetch_list=[avg_cost, correct])
logger.info("TRAIN --> pass: {} batch_id: {} avg_cost: {}, acc: {}"
.format(pass_id, batch_id, loss_val,
float(correct_val) / args.batch_size))
    fluid.io.save_inference_model(args.model_output_dir,
                                  [var.name for var in all_slots],
                                  [avg_cost, correct], exe)
def main():
args = parse_args()
start_train(args)
if __name__ == "__main__":
main()
#!/bin/bash
set -x
unset http_proxy
unset https_proxy
#pserver
export TRAINING_ROLE=PSERVER
export PADDLE_PORT=30134
export PADDLE_PSERVERS=127.0.0.1
export PADDLE_IS_LOCAL=0
export PADDLE_INIT_TRAINER_COUNT=1
export POD_IP=127.0.0.1
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/:/usr/local/lib/:/workspace/brpc
export PYTHONPATH=$PYTHONPATH:/paddle/build/build_reader_RelWithDebInfo_gpu/python
#GLOG_v=7 GLOG_logtostderr=1
CUDA_VISIBLE_DEVICES=4,5,6,7 python -u train.py \
--src_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--trg_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--special_token '<s>' '<e>' '<unk>' \
--token_delimiter '\x01' \
--train_file_pattern 'cluster_test_data_en_fr/train/train.wordpiece.en-fr.0' \
--val_file_pattern 'cluster_test_data_en_fr/thirdparty/newstest2014.wordpiece.en-fr' \
--use_token_batch True \
--batch_size 3200 \
--sort_type pool \
--pool_size 200000 \
--local False > pserver.log 2>&1 &
pserver_pid=$!
echo $pserver_pid
sleep 30s
#trainer
export TRAINING_ROLE=TRAINER
export PADDLE_PORT=30134
export PADDLE_PSERVERS=127.0.0.1
export PADDLE_IS_LOCAL=0
export PADDLE_INIT_TRAINER_COUNT=1
export POD_IP=127.0.0.1
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=1
CUDA_VISIBLE_DEVICES=4,5,6,7 python -u train.py \
--src_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--trg_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--special_token '<s>' '<e>' '<unk>' \
--token_delimiter '\x01' \
--train_file_pattern 'cluster_test_data_en_fr/train/train.wordpiece.en-fr.0' \
--val_file_pattern 'cluster_test_data_en_fr/thirdparty/newstest2014.wordpiece.en-fr' \
--use_token_batch True \
--batch_size 3200 \
--sort_type pool \
--pool_size 200000 \
--local False > trainer.log 2>&1 &
#sleep 80
#kill -9 $pserver_pid
@@ -643,7 +643,7 @@ def train(args):
     if args.sync:
         lr_decay = fluid.layers.learning_rate_scheduler.noam_decay(
             ModelHyperParams.d_model, TrainTaskConfig.warmup_steps)
-        print("before adam")
+        logging.info("before adam")
         with fluid.default_main_program()._lr_schedule_guard():
             learning_rate = lr_decay * TrainTaskConfig.learning_rate
@@ -661,7 +661,7 @@ def train(args):
     fluid.memory_optimize(train_prog)
     if args.local:
-        print("local start_up:")
+        logging.info("local start_up:")
         train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
                    token_num, predict, pyreader)
     else:
@@ -677,9 +677,9 @@ def train(args):
         if trainer_id == 0:
             logging.info("train_id == 0, sleep 60s")
             time.sleep(60)
-        print("trainers_num:", trainers_num)
-        print("worker_endpoints:", worker_endpoints)
-        print("current_endpoint:", current_endpoint)
+        logging.info("trainers_num:{}".format(trainers_num))
+        logging.info("worker_endpoints:{}".format(worker_endpoints))
+        logging.info("current_endpoint:{}".format(current_endpoint))
         append_nccl2_prepare(trainer_id, worker_endpoints, current_endpoint)
         train_loop(exe,
                    fluid.default_main_program(), dev_count, sum_cost,
@@ -696,11 +696,11 @@ def train(args):
         current_endpoint = os.getenv("POD_IP") + ":" + port
         trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
-        print("pserver_endpoints", pserver_endpoints)
-        print("current_endpoint", current_endpoint)
-        print("trainer_id", trainer_id)
-        print("pserver_ips", pserver_ips)
-        print("port", port)
+        logging.info("pserver_endpoints:{}".format(pserver_endpoints))
+        logging.info("current_endpoint:{}".format(current_endpoint))
+        logging.info("trainer_id:{}".format(trainer_id))
+        logging.info("pserver_ips:{}".format(pserver_ips))
+        logging.info("port:{}".format(port))
         t = fluid.DistributeTranspiler()
         t.transpile(
@@ -715,30 +715,17 @@ def train(args):
             current_endpoint = os.getenv("POD_IP") + ":" + os.getenv(
                 "PADDLE_PORT")
             if not current_endpoint:
-                print("need env SERVER_ENDPOINT")
+                logging.critical("need env SERVER_ENDPOINT")
                 exit(1)
             pserver_prog = t.get_pserver_program(current_endpoint)
             pserver_startup = t.get_startup_program(current_endpoint,
                                                     pserver_prog)
-            print("pserver start:")
-            program_to_code(pserver_startup)
-            print("pserver train:")
-            program_to_code(pserver_prog)
-            #sys.exit(0)
             exe.run(pserver_startup)
             exe.run(pserver_prog)
         elif training_role == "TRAINER":
             logging.info("distributed: trainer started")
             trainer_prog = t.get_trainer_program()
-            '''
-            print("trainer start:")
-            program_to_code(pserver_startup)
-            print("trainer train:")
-            program_to_code(trainer_prog)
-            sys.exit(0)
-            '''
             train_loop(exe, train_prog, startup_prog, dev_count, sum_cost,
                        avg_cost, token_num, predict, pyreader)
@@ -87,8 +87,8 @@ def evaluate(epoch_id, exe, inference_program, dev_reader, test_reader, fetch_list
 def train_and_evaluate(train_reader,
-                       test_reader,
                        dev_reader,
+                       test_reader,
                        network,
                        optimizer,
                        global_config,
@@ -246,7 +246,10 @@ def main():
     # use cuda or not
     if not global_config.has_member('use_cuda'):
-        global_config.use_cuda = 'CUDA_VISIBLE_DEVICES' in os.environ
+        if 'CUDA_VISIBLE_DEVICES' in os.environ and os.environ['CUDA_VISIBLE_DEVICES'] != '':
+            global_config.use_cuda = True
+        else:
+            global_config.use_cuda = False
     global_config.list_config()