Commit b3943336 authored by Qiao Longfei

init word2vec

Parent 4f244555
# CTR prediction model based on a DNN
## Introduction
This example implements the DNN part of the model proposed in the following paper:
```text
@inproceedings{guo2017deepfm,
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
pages={1725--1731},
year={2017}
}
```
## Environment
Install PaddlePaddle Fluid first, then run:
```shell
pip install -r requirements.txt
```
## Dataset
This example uses the Criteo dataset from the [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/) hosted by Kaggle.
Each row describes one ad impression. The first column is a label indicating whether the ad was clicked. There are 39 features in total: 13 take integer values and the other 26 are categorical. The test set has no labels.
Download the dataset:
```bash
cd data && ./download.sh && cd ..
```
## Model
This example only implements the DNN part of the model described in the DeepFM paper; the full DeepFM model is provided in a separate example.
## Data preparation
To preprocess the raw dataset, the integer features are min-max normalized to [0, 1] and the categorical features are one-hot encoded. The raw dataset is split into two parts: 90% for training and the remaining 10% for validation during training.
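A minimal sketch of the min-max step (illustrative values; the real per-feature clip thresholds and statistics are computed in preprocess.py):
```python
# Illustrative min-max normalization of one integer feature column.
# The sample values and the clip threshold are made up for this example.
def min_max_normalize(values, clip=100):
    clipped = [min(v, clip) for v in values]
    lo, hi = min(clipped), max(clipped)
    return [(v - lo) / float(hi - lo) for v in clipped]

print(min_max_normalize([3, 50, 250]))  # [0.0, 0.4845..., 1.0]
```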
## Train
The command line options for training can be listed with `python train.py -h`.
### Local train:
```bash
python train.py \
--train_data_path data/raw/train.txt \
2>&1 | tee train.log
```
After batch 40000 of pass 1, the test AUC is 0.801178 and the test cost is 0.445196.
### Distributed train
Launch a distributed training job with 2 trainers and 2 pservers on the local machine. In the distributed setting, the training data is sharded by trainer id so that shards do not overlap among trainers, which improves training efficiency.
```bash
sh cluster_train.sh
```
## Infer
The command line options for inference can be listed with `python infer.py -h`.
Run inference on the test set:
```bash
python infer.py \
--model_path models/pass-0/ \
--data_path data/raw/valid.txt
```
Note: the AUC printed at the end of infer.py is the overall AUC for the whole inference file.
## Run cluster training on Baidu Cloud
1. Set up a CPU cluster on Baidu Cloud following [Launch Fluid distributed training on Baidu Cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst).
1. Preprocess the training data with preprocess.py to generate train.txt.
1. Split train.txt into as many parts as there are machines and put one part on each machine.
1. Launch the distributed training job with the command in the `Distributed train` section above.
# DNN for Click-Through Rate prediction
## Introduction
This model implements the DNN part proposed in the following paper:
```text
@inproceedings{guo2017deepfm,
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
pages={1725--1731},
year={2017}
}
```
DeepFM combines factorization machines and deep neural networks to model
both low-order and high-order feature interactions. For details of
factorization machines, please refer to the paper [factorization
machines](https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf).
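For reference, the FM component (omitted in this DNN-only example) models pairwise feature interactions with learned latent vectors:

$$\hat{y}_{FM}(x) = w_0 + \sum_{i=1}^{d} w_i x_i + \sum_{i=1}^{d} \sum_{j=i+1}^{d} \langle \mathbf{v}_i, \mathbf{v}_j \rangle \, x_i x_j$$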
## Environment
You should install PaddlePaddle Fluid first, and run:
```shell
pip install -r requirements.txt
```
## Dataset
This example uses the Criteo dataset, which was used for the [Display Advertising
Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/)
hosted by Kaggle.
Each row holds the features of one ad impression; the first column is a label
indicating whether the ad was clicked. There are 39 features in total: 13 take
integer values and the other 26 are categorical. The labels are omitted from the
test dataset.
Download dataset:
```bash
cd data && ./download.sh && cd ..
```
## Model
This example only implements the DNN part of the model described in the DeepFM
paper; the full DeepFM model is provided in another example.
## Data Preprocessing
To preprocess the raw dataset, the integer features are clipped and then min-max
normalized to [0, 1], and the categorical features are one-hot encoded. The raw
training dataset is split so that 90% is used for training and the other
10% is used for validation during training. In reader.py, the training data is the
first 90% of the data in train.txt and the validation data is the rest.
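A minimal sketch of how the categorical columns can be mapped into one shared sparse id space (the dictionaries below are hypothetical; preprocess.py builds the real ones from train.txt with a frequency cutoff):
```python
# Hypothetical per-feature dictionaries; preprocess.py derives the real ones from train.txt.
dicts = [{'<unk>': 0, 'a1': 1, 'b1': 2}, {'<unk>': 0, 'x2': 1}]
sizes = [len(d) for d in dicts]      # [3, 2]
offsets = [0]
for s in sizes[:-1]:
    offsets.append(offsets[-1] + s)  # [0, 3]

row = ['b1', 'x2']                   # one sample's categorical values
sparse_ids = [dicts[i].get(v, 0) + offsets[i] for i, v in enumerate(row)]
print(sparse_ids)                    # [2, 4] -> indices in one global sparse space
```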
## Train
The command line options for training can be listed by `python train.py -h`.
### Local Train:
```bash
python train.py \
--train_data_path data/raw/train.txt \
2>&1 | tee train.log
```
After batch 40000 of pass 1, the test AUC is `0.801178` and the test
cost is `0.445196`.
### Distributed Train
Run a distributed training job with 2 pservers and 2 trainers on a single machine.
In the distributed setting, the training data is sharded by trainer_id so that the
shards do not overlap among trainers; a minimal sharding sketch follows the command below.
```bash
sh cluster_train.sh
```
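A minimal sharding sketch, assuming every trainer reads the same train.txt (the helper below is illustrative, not the actual reader used in this example):
```python
# Sketch: round-robin sharding of a line-based file by trainer id.
# Only every trainers-th line (offset by trainer_id) is consumed,
# so the shards of different trainers never overlap.
def sharded_lines(path, trainer_id, trainers):
    with open(path) as f:
        for i, line in enumerate(f):
            if i % trainers == trainer_id:
                yield line

# Usage: trainer 0 of 2 reads lines 0, 2, 4, ...
# for line in sharded_lines('data/raw/train.txt', trainer_id=0, trainers=2): ...
```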
## Infer
The command line options for inference can be listed by `python infer.py -h`.
To run inference on the test dataset:
```bash
python infer.py \
--model_path models/ \
--data_path data/raw/train.txt
```
Note: the AUC value in the last log line is the overall AUC for the whole test dataset. Here, train.txt is split inside reader.py so that the validation data does not overlap with the training data.
## Train on Baidu Cloud
1. Please prepare some CPU machines on Baidu Cloud following the steps in [train_on_baidu_cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst)
1. Prepare dataset using preprocess.py.
1. Split train.txt into trainer_num parts and put one part on each machine.
1. Launch the cluster training job with the command in the `Distributed Train` section above.
#!/bin/bash
# start pserver0
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6000 \
--trainers 2 \
> pserver0.log 2>&1 &
# start pserver1
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6001 \
--trainers 2 \
> pserver1.log 2>&1 &
# start trainer0
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 0 \
> trainer0.log 2>&1 &
# start trainer1
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 1 \
> trainer1.log 2>&1 &
import argparse
import logging
import numpy as np
# disable gpu training for this example
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import paddle
import paddle.fluid as fluid
import reader
from network_conf import ctr_dnn_model
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle DeepFM example")
parser.add_argument(
'--model_path',
type=str,
required=True,
help="The path of model parameters gz file")
parser.add_argument(
'--data_path',
type=str,
required=True,
help="The path of the dataset to infer")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--sparse_feature_dim',
type=int,
default=1000001,
help="The size for embedding layer (default:1000001)")
parser.add_argument(
'--batch_size',
type=int,
default=1000,
help="The size of mini-batch (default:1000)")
return parser.parse_args()
def infer():
args = parse_args()
place = fluid.CPUPlace()
inference_scope = fluid.core.Scope()
dataset = reader.CriteoDataset(args.sparse_feature_dim)
test_reader = paddle.batch(
dataset.test([args.data_path]), batch_size=args.batch_size)
startup_program = fluid.framework.Program()
test_program = fluid.framework.Program()
with fluid.framework.program_guard(test_program, startup_program):
loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(
args.embedding_size, args.sparse_feature_dim)
exe = fluid.Executor(place)
feeder = fluid.DataFeeder(feed_list=data_list, place=place)
with fluid.scope_guard(inference_scope):
[inference_program, _, fetch_targets] = fluid.io.load_inference_model(
args.model_path, exe)
def set_zero(var_name):
param = inference_scope.var(var_name).get_tensor()
param_array = np.zeros(param._get_dims()).astype("int64")
param.set(param_array, place)
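# The AUC op accumulates its state in auto-generated variables; zero them so the
# reported AUC covers only this inference run.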
auc_states_names = ['_generated_var_2', '_generated_var_3']
for name in auc_states_names:
set_zero(name)
for batch_id, data in enumerate(test_reader()):
loss_val, auc_val = exe.run(inference_program,
feed=feeder.feed(data),
fetch_list=fetch_targets)
if batch_id % 100 == 0:
logger.info("TEST --> batch: {} loss: {} auc: {}".format(
batch_id, loss_val / args.batch_size, auc_val))
if __name__ == '__main__':
infer()
import paddle.fluid as fluid
import math
def skip_gram_word2vec(dict_size, embedding_size):
input_word = fluid.layers.data(name="input_word", shape=[1], dtype='int64')
emb = fluid.layers.embedding(
input=input_word,
size=[dict_size, embedding_size],
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(dict_size))))
predict_word = fluid.layers.data(
name='predict_word', shape=[1], dtype='int64')
data_list = [input_word, predict_word]
w_param_name = "nce_w"
fluid.default_main_program().global_block().create_parameter(
shape=[dict_size, embedding_size], dtype='float32', name=w_param_name)
b_param_name = "nce_b"
fluid.default_main_program().global_block().create_parameter(
shape=[dict_size, 1], dtype='float32', name=b_param_name)
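# NCE loss: for each (input word, predict word) pair, sample num_neg_samples negative
# classes and reuse the nce_w / nce_b parameters created above.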
cost = fluid.layers.nce(input=emb,
label=predict_word,
num_total_classes=dict_size,
param_attr=fluid.ParamAttr(name=w_param_name),
bias_attr=fluid.ParamAttr(name=b_param_name),
num_neg_samples=5)
avg_cost = fluid.layers.reduce_mean(cost)
return avg_cost, data_list
"""
Preprocess Criteo dataset. This dataset was used for the Display Advertising
Challenge (https://www.kaggle.com/c/criteo-display-ad-challenge).
"""
import os
import sys
import click
import random
import collections
# There are 13 integer features and 26 categorical features
continous_features = range(1, 14)
categorial_features = range(14, 40)
# Clip integer features. The clip point for each integer feature
# is derived from the 95% quantile of the total values in each feature
continous_clip = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
class CategoryDictGenerator:
"""
Generate dictionary for each of the categorical features
"""
def __init__(self, num_feature):
self.dicts = []
self.num_feature = num_feature
for i in range(0, num_feature):
self.dicts.append(collections.defaultdict(int))
def build(self, datafile, categorial_features, cutoff=0):
with open(datafile, 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
for i in range(0, self.num_feature):
if features[categorial_features[i]] != '':
self.dicts[i][features[categorial_features[i]]] += 1
for i in range(0, self.num_feature):
self.dicts[i] = filter(lambda x: x[1] >= cutoff,
self.dicts[i].items())
self.dicts[i] = sorted(self.dicts[i], key=lambda x: (-x[1], x[0]))
vocabs, _ = list(zip(*self.dicts[i]))
self.dicts[i] = dict(zip(vocabs, range(1, len(vocabs) + 1)))
self.dicts[i]['<unk>'] = 0
def gen(self, idx, key):
if key not in self.dicts[idx]:
res = self.dicts[idx]['<unk>']
else:
res = self.dicts[idx][key]
return res
def dicts_sizes(self):
return map(len, self.dicts)
class ContinuousFeatureGenerator:
"""
Normalize the integer features to [0, 1] by min-max normalization
"""
def __init__(self, num_feature):
self.num_feature = num_feature
self.min = [sys.maxint] * num_feature
self.max = [-sys.maxint] * num_feature
def build(self, datafile, continous_features):
with open(datafile, 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
for i in range(0, self.num_feature):
val = features[continous_features[i]]
if val != '':
val = int(val)
if val > continous_clip[i]:
val = continous_clip[i]
self.min[i] = min(self.min[i], val)
self.max[i] = max(self.max[i], val)
def gen(self, idx, val):
if val == '':
return 0.0
val = float(val)
return (val - self.min[idx]) / (self.max[idx] - self.min[idx])
@click.command("preprocess")
@click.option("--datadir", type=str, help="Path to raw criteo dataset")
@click.option("--outdir", type=str, help="Path to save the processed data")
def preprocess(datadir, outdir):
"""
All 13 integer features are normalized to continuous values and these continuous
features are combined into one vector with dimension of 13.
Each of the 26 categorical features are one-hot encoded and all the one-hot
vectors are combined into one sparse binary vector.
"""
dists = ContinuousFeatureGenerator(len(continous_features))
dists.build(os.path.join(datadir, 'train.txt'), continous_features)
dicts = CategoryDictGenerator(len(categorial_features))
dicts.build(
os.path.join(datadir, 'train.txt'), categorial_features, cutoff=200)
dict_sizes = dicts.dicts_sizes()
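# Offset i is the sum of the dictionary sizes of features 0..i-1, so each categorical
# feature's local ids map into one global sparse id space.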
categorial_feature_offset = [0]
for i in range(1, len(categorial_features)):
offset = categorial_feature_offset[i - 1] + dict_sizes[i - 1]
categorial_feature_offset.append(offset)
random.seed(0)
# 90% of the data are used for training, and 10% of the data are used
# for validation.
with open(os.path.join(outdir, 'train.txt'), 'w') as out_train:
with open(os.path.join(outdir, 'valid.txt'), 'w') as out_valid:
with open(os.path.join(datadir, 'train.txt'), 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
continous_vals = []
for i in range(0, len(continous_features)):
val = dists.gen(i, features[continous_features[i]])
continous_vals.append("{0:.6f}".format(val).rstrip('0')
.rstrip('.'))
categorial_vals = []
for i in range(0, len(categorial_features)):
val = dicts.gen(i, features[categorial_features[
i]]) + categorial_feature_offset[i]
categorial_vals.append(str(val))
continous_vals = ','.join(continous_vals)
categorial_vals = ','.join(categorial_vals)
label = features[0]
if random.randint(0, 9999) % 10 != 0:
out_train.write('\t'.join(
[continous_vals, categorial_vals, label]) + '\n')
else:
out_valid.write('\t'.join(
[continous_vals, categorial_vals, label]) + '\n')
with open(os.path.join(outdir, 'test.txt'), 'w') as out:
with open(os.path.join(datadir, 'test.txt'), 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
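# The test file has no label column, so feature indices are shifted left by one
# compared with train.txt.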
continous_vals = []
for i in range(0, len(continous_features)):
val = dists.gen(i, features[continous_features[i] - 1])
continous_vals.append("{0:.6f}".format(val).rstrip('0')
.rstrip('.'))
categorial_vals = []
for i in range(0, len(categorial_features)):
val = dicts.gen(i, features[categorial_features[
i] - 1]) + categorial_feature_offset[i]
categorial_vals.append(str(val))
continous_vals = ','.join(continous_vals)
categorial_vals = ','.join(categorial_vals)
out.write('\t'.join([continous_vals, categorial_vals]) + '\n')
if __name__ == "__main__":
preprocess()
# -*- coding: utf-8 -*-
import time
import numpy as np
import random
from collections import Counter
"""
refs: https://github.com/NELSONZHAO/zhihu/blob/master/skip_gram/Skip-Gram-English-Corpus.ipynb
"""
with open('data/text8.txt') as f:
text = f.read()
# Define a function to preprocess the raw text
def preprocess(text, freq=5):
'''
Preprocess the raw text.
Arguments
---
text: the text data
freq: word-frequency threshold
'''
# Replace punctuation with special tokens
text = text.lower()
text = text.replace('.', ' <PERIOD> ')
text = text.replace(',', ' <COMMA> ')
text = text.replace('"', ' <QUOTATION_MARK> ')
text = text.replace(';', ' <SEMICOLON> ')
text = text.replace('!', ' <EXCLAMATION_MARK> ')
text = text.replace('?', ' <QUESTION_MARK> ')
text = text.replace('(', ' <LEFT_PAREN> ')
text = text.replace(')', ' <RIGHT_PAREN> ')
text = text.replace('--', ' <HYPHENS> ')
text = text.replace('?', ' <QUESTION_MARK> ')
# text = text.replace('\n', ' <NEW_LINE> ')
text = text.replace(':', ' <COLON> ')
words = text.split()
# Drop low-frequency words to reduce noise
word_counts = Counter(words)
trimmed_words = [word for word in words if word_counts[word] > freq]
return trimmed_words
# Clean the text and split it into words
words = preprocess(text)
print(words[:20])
# Build the word <-> id mapping tables
vocab = set(words)
vocab_to_int = {w: c for c, w in enumerate(vocab)}
int_to_vocab = {c: w for c, w in enumerate(vocab)}
dict_size = len(set(words))
print("total words: {}".format(len(words)))
print("unique words: {}".format(dict_size))
# Convert the original text from words to integer ids
int_words = [vocab_to_int[w] for w in words]
t = 1e-5  # subsampling threshold t
threshold = 0.8  # drop-probability threshold
# # Count word occurrences
# int_word_counts = Counter(int_words)
# total_count = len(int_words)
# # Compute word frequencies
# word_freqs = {w: c/total_count for w, c in int_word_counts.items()}
# # Compute the probability of dropping each word
# prob_drop = {w: 1 - np.sqrt(t / word_freqs[w]) for w in int_word_counts}
# # Subsample the words
# train_words = [w for w in int_words if prob_drop[w] < threshold]
train_words = int_words
len(train_words)
def get_targets(words, idx, window_size=5):
'''
Return the list of context words for a given input word.
Arguments
---
words: list of word ids
idx: index of the input word
window_size: window size
'''
target_window = np.random.randint(1, window_size + 1)
# Handle the case where there are not enough words before the input word
start_point = idx - target_window if (idx - target_window) > 0 else 0
end_point = idx + target_window
# output words (the context words inside the window)
targets = set(words[start_point:idx] + words[idx + 1:end_point + 1])
return list(targets)
def get_batches(words, batch_size, window_size=5):
def _reader():
'''
Build a generator that yields (input word, context word) samples for batching
'''
n_batches = len(words) // batch_size
# keep only full batches
new_words = words[:n_batches * batch_size]
for idx in range(0, len(new_words), batch_size):
x, y = [], []
batch = new_words[idx:idx + batch_size]
for i in range(len(batch)):
batch_x = batch[i]
batch_y = get_targets(batch, i, window_size)
# One input word maps to several output words, so repeat x to match the length of y
x.extend([batch_x] * len(batch_y))
y.extend(batch_y)
for i in range(len(batch_y)):
yield [x[i]], [y[i]]
return _reader
if __name__ == "__main__":
epochs = 10  # number of epochs
batch_size = 1000  # batch size
window_size = 10  # window size
batches = get_batches(train_words, batch_size, window_size)
i = 0
for x, y in batches():
print("x: " + str(x))
print("y: " + str(y))
print("\n")
if i == 10:
exit(0)
i += 1
from __future__ import print_function
import argparse
import logging
import os
# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import paddle
import paddle.fluid as fluid
import reader
from network_conf import skip_gram_word2vec
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
parser.add_argument(
'--train_data_path',
type=str,
default='./data/raw/train.txt',
help="The path of training dataset")
parser.add_argument(
'--test_data_path',
type=str,
default='./data/raw/valid.txt',
help="The path of testing dataset")
parser.add_argument(
'--batch_size',
type=int,
default=100,
help="The size of mini-batch (default:1000)")
parser.add_argument(
'--num_passes',
type=int,
default=10,
help="The number of passes to train (default: 10)")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help='The path for model to store (default: models)')
parser.add_argument(
'--embedding_size',
type=int,
default=64,
help='The size of the embedding layer (default: 64)')
parser.add_argument(
'--is_local',
type=int,
default=1,
help='Local train or distributed train (default: 1)')
# The following arguments are used for distributed training; set them when is_local == 0.
parser.add_argument(
'--role',
type=str,
default='pserver', # trainer or pserver
help='The role of the current node: trainer or pserver (default: pserver)')
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The endpoint of the current pserver (default: 127.0.0.1:6000)')
parser.add_argument(
'--trainer_id',
type=int,
default=0,
help='The id of the current trainer (default: 0)')
parser.add_argument(
'--trainers',
type=int,
default=1,
help='The number of trainers (default: 1)')
return parser.parse_args()
def train_loop(args, train_program, data_list, loss, trainer_num, trainer_id):
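# reader.get_batches yields single (center word, context word) id pairs
# (chunk size 5, window size 5); paddle.batch below regroups them into
# mini-batches of args.batch_size.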
dataset = reader.get_batches(reader.train_words, 5, 5)
train_reader = paddle.batch(
paddle.reader.shuffle(
dataset, buf_size=args.batch_size * 100),
batch_size=args.batch_size)
place = fluid.CPUPlace()
feeder = fluid.DataFeeder(feed_list=data_list, place=place)
data_name_list = [var.name for var in data_list]
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for pass_id in range(args.num_passes):
for batch_id, data in enumerate(train_reader()):
loss_val = exe.run(train_program,
feed=feeder.feed(data),
fetch_list=[loss])
if batch_id % 10 == 0:
logger.info("TRAIN --> pass: {} batch: {} loss: {}".format(
pass_id, batch_id, loss_val[0] / args.batch_size))
if batch_id % 1000 == 0 and batch_id != 0:
model_dir = args.model_output_dir + '/batch-' + str(batch_id)
if args.trainer_id == 0:
fluid.io.save_inference_model(model_dir, data_name_list,
[loss], exe)
model_dir = args.model_output_dir + '/pass-' + str(pass_id)
if args.trainer_id == 0:
fluid.io.save_inference_model(model_dir, data_name_list, [loss],
exe)
def train():
args = parse_args()
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
loss, data_list = skip_gram_word2vec(reader.dict_size, args.embedding_size)
optimizer = fluid.optimizer.Adam(learning_rate=1e-3)
optimizer.minimize(loss)
if args.is_local:
logger.info("run local training")
main_program = fluid.default_main_program()
train_loop(args, main_program, data_list, loss, 1, -1)
else:
logger.info("run dist training")
t = fluid.DistributeTranspiler()
t.transpile(
args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
if args.role == "pserver":
logger.info("run pserver")
prog = t.get_pserver_program(args.current_endpoint)
startup = t.get_startup_program(
args.current_endpoint, pserver_program=prog)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup)
exe.run(prog)
elif args.role == "trainer":
logger.info("run trainer")
train_prog = t.get_trainer_program()
train_loop(args, train_prog, data_list, loss, args.trainers,
args.trainer_id + 1)
if __name__ == '__main__':
train()