Commit c617a1a1 authored by zhoushiyu, committed by zhang wenhui

add xDeepFM, DCN ctr model and DeepFM data download (#3303)

add xDeepFM, DCN model in PaddleRec/ctr.
change DeepFM data download script.
Parent 1b5d1330
# Deep & Cross Network
The directory structure of this example is as follows:
```text
.
├── README.md # documentation
├── local_train.py # local training script
├── infer.py # inference script
├── network.py # network definition
├── config.py # hyperparameter configuration
├── reader.py # data reading utilities
├── data/
├── download.sh # data download script
├── preprocess.py # data preprocessing script
```
## Introduction
For details of the DCN model, please refer to the paper [Deep & Cross Network for Ad Click Predictions](https://arxiv.org/abs/1708.05123).
## Requirements
- PaddlePaddle 1.5.1
## Download the data
We train and evaluate DCN on the Criteo dataset, which contains about 45 million records. In each line, the first column is the label indicating whether the ad was clicked, followed by 13 integer features (I1 - I13) and 26 categorical features (C1 - C26).
Download the data with:
```bash
cd data && sh download.sh
```
## Data preprocessing
- Following the paper, the first 6 days of data (about 41 million records) are used for training, and the 7th day is split evenly into validation and test sets. This is roughly a 9:0.5:0.5 split, where the training data is the first 90% of the file. Note that papers such as xDeepFM instead use a fully shuffled 8:1:1 split.
- Following the paper, a log transform is applied to the integer features. Since only I2 has a minimum of -3 (all other minimums are 0), I2 is transformed with log(4 + I2_value) and the other features with log(1 + I*_value).
- The distinct feature ids of each categorical feature (C1 - C26) are counted, and many of them are low-frequency. Low-frequency feature ids are filtered out to shrink the embedding matrices; the default frequency threshold in the code is 10, which removes a large number of low-frequency ids. A short sketch of these rules follows this list.
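A minimal sketch of the preprocessing rules above (illustrative only; the actual implementation lives in preprocess.py and reader.py below):

```python
import math

SPLIT_RATIO = 0.9          # first 90% of train.txt -> train, the rest -> test_valid
FREQ_THR = 10              # categorical ids seen fewer than 10 times are dropped
TOTAL_RECORDS = 45840617   # number of lines in the Criteo train.txt
split_idx = int(TOTAL_RECORDS * SPLIT_RATIO)

def log_transform(name, value):
    # only I2 can be negative (its minimum is -3); all other integer features are >= 0
    return math.log(4 + value) if name == 'I2' else math.log(1 + value)
```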
Run the preprocessing script:
```bash
python preprocess.py
```
After preprocessing, the training data is in train/, the validation and test data in test_valid/, and vocab/ stores the feature ids of each categorical feature that survive low-frequency filtering. The min/max of each integer feature and the number of feature ids of each categorical feature are also recorded.
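For illustration, the generated statistic files look roughly like this (the I1/I2 ranges shown are the ones later hard-coded in reader.py; the other values depend on the data):
```text
int_feature_minmax.txt    # one line per integer feature: <name> <min> <max>
I1 0 5775
I2 -3 257675
...
cat_feature_num.txt       # one line per categorical feature: <name> <vocab size after filtering>
vocab/C1.txt ... vocab/C26.txt   # one retained feature id per line
```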
## Local training
```bash
nohup python -u local_train.py > train.log &
```
During training, the current total loss (logloss + regularization), logloss, and AUC are printed every fixed number of steps (100 by default); print_steps can be adjusted in config.py.
## Local inference
```bash
nohup python -u infer.py --test_epoch 2 > test.log &
```
Note: the AUC reported in the last line is the AUC over the entire test set.
## Results
The results below were obtained on a Linux CPU machine, training with the dataset API using 20 threads and a batch size of 512. After 150000 steps (~1.87 epochs), inference gives:
```text
loss: [0.44703564] auc_val: [0.80654419]
```
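As a sanity check on the numbers above: with a batch size of 512 and roughly 41 million training records, 150000 steps correspond to about 1.87 passes over the training data, which is also how local_train.py converts --steps into a per-epoch file list:

```python
steps, batch_size, train_records = 150000, 512, 41000000
epochs = steps * batch_size / float(train_records)
print(round(epochs, 2))  # ~1.87: one full pass plus ~87% of the training files
```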
#!/usr/bin/env python
# coding: utf-8
import argparse
"""
global params
"""
def parse_args():
parser = argparse.ArgumentParser(description="PaddleFluid DCN demo")
parser.add_argument(
'--train_data_dir',
type=str,
default='data/train',
help='The path of train data')
parser.add_argument(
'--test_valid_data_dir',
type=str,
default='data/test_valid',
help='The path of test and valid data')
parser.add_argument(
'--batch_size', type=int, default=512, help="Batch size")
parser.add_argument(
'--steps',
type=int,
default=150000,
help="Early stop steps in training. If set, num_epoch will not work")
parser.add_argument('--num_epoch', type=int, default=2, help="train epoch")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help='The directory to save the model')
parser.add_argument(
'--num_thread', type=int, default=20, help='The number of threads')
parser.add_argument('--test_epoch', type=str, default='1')
parser.add_argument(
'--dnn_hidden_units',
nargs='+',
type=int,
default=[1024, 1024],
help='DNN layers and hidden units')
parser.add_argument(
'--cross_num',
type=int,
default=6,
help='The number of Cross network layers')
parser.add_argument('--lr', type=float, default=1e-4, help='Learning rate')
parser.add_argument(
'--l2_reg_cross',
type=float,
default=1e-5,
help='Cross net l2 regularizer coefficient')
parser.add_argument(
'--use_bn',
type=bool,
default=True,
help='Whether use batch norm in dnn part')
parser.add_argument(
'--clip_by_norm', type=float, default=100.0, help="gradient clip norm")
parser.add_argument('--print_steps', type=int, default=100)
return parser.parse_args()
#!/bin/bash
workdir=$(cd $(dirname $0); pwd)
cd $workdir
trainfile='train.txt'
echo "data dir:" ${workdir}
cd $workdir
echo "download data starting..."
wget --no-check-certificate -c https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
echo "download finished"
echo "extracting ..."
tar xzvf dac.tar.gz
wc -l $trainfile | awk '{print $1}' > line_nums.log
echo "extract finished"
echo "total records: "`cat line_nums.log`
echo "done"
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, absolute_import, division
import os
import sys
from collections import Counter
import numpy as np
"""
preprocess Criteo train data, generate extra statistic files for model input.
"""
# input filename
FILENAME = 'train.txt'
# global vars
CAT_FEATURE_NUM = 'cat_feature_num.txt'
INT_FEATURE_MINMAX = 'int_feature_minmax.txt'
VOCAB_DIR = 'vocab'
TRAIN_DIR = 'train'
TEST_VALID_DIR = 'test_valid'
SPLIT_RATIO = 0.9
LINE_NUMS = "line_nums.log"
FREQ_THR = 10
INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)]
CAT_COLUMN_NAMES = ['C' + str(i) for i in range(1, 27)]
def check_statfiles():
"""
check whether the statistic files of Criteo exist
:return:
"""
statsfiles = [CAT_FEATURE_NUM, INT_FEATURE_MINMAX] + [
os.path.join(VOCAB_DIR, cat_fn + '.txt') for cat_fn in CAT_COLUMN_NAMES
]
if all([os.path.exists(fn) for fn in statsfiles]):
return True
return False
def create_statfiles():
"""
create statistic files of Criteo, including:
min/max of integer features
counts of categorical features
vocab of each categorical feature
:return:
"""
int_minmax_list = [[sys.maxsize, -sys.maxsize]
for _ in range(13)] # count integer feature min max
cat_ct_list = [Counter() for _ in range(26)] # count categorical features
for idx, line in enumerate(open(FILENAME)):
spls = line.rstrip('\n').split('\t')
assert len(spls) == 40
for i in range(13):
if not spls[1 + i]: continue
int_val = int(spls[1 + i])
int_minmax_list[i][0] = min(int_minmax_list[i][0], int_val)
int_minmax_list[i][1] = max(int_minmax_list[i][1], int_val)
for i in range(26):
cat_ct_list[i].update([spls[14 + i]])
# save min max of integer features
with open(INT_FEATURE_MINMAX, 'w') as f:
for name, minmax in zip(INT_COLUMN_NAMES, int_minmax_list):
print("{} {} {}".format(name, minmax[0], minmax[1]), file=f)
# remove '' from all cat_set[i] and filter low freq categorical value
cat_set_list = [set() for i in range(len(cat_ct_list))]
for i, ct in enumerate(cat_ct_list):
if '' in ct: del ct['']
for key in list(ct.keys()):
if ct[key] >= FREQ_THR:
cat_set_list[i].add(key)
del cat_ct_list
# create vocab dir
if not os.path.exists(VOCAB_DIR):
os.makedirs(VOCAB_DIR)
# write vocab file of categorical features
with open(CAT_FEATURE_NUM, 'w') as cat_feat_count_file:
for name, s in zip(CAT_COLUMN_NAMES, cat_set_list):
print('{} {}'.format(name, len(s)), file=cat_feat_count_file)
vocabfile = os.path.join(VOCAB_DIR, name + '.txt')
with open(vocabfile, 'w') as f:
for vocab_val in s:
print(vocab_val, file=f)
def split_data():
"""
split train.txt into train and test_valid files.
:return:
"""
if not os.path.exists(TRAIN_DIR):
os.makedirs(TRAIN_DIR)
if not os.path.exists(TEST_VALID_DIR):
os.makedirs(TEST_VALID_DIR)
fin = open('train.txt', 'r')
data_dir = TRAIN_DIR
fout = open(os.path.join(data_dir, 'part-0'), 'w')
split_idx = int(45840617 * SPLIT_RATIO)
for line_idx, line in enumerate(fin):
if line_idx == split_idx:
fout.close()
data_dir = TEST_VALID_DIR
cur_part_idx = int(line_idx / 200000)
fout = open(data_dir + '/part-' + str(cur_part_idx), 'w')
if line_idx % 200000 == 0 and line_idx != 0:
fout.close()
cur_part_idx = int(line_idx / 200000)
fout = open(data_dir + '/part-' + str(cur_part_idx), 'w')
fout.write(line)
fout.close()
fin.close()
if __name__ == '__main__':
if not check_statfiles():
print('create statstic files of Criteo...')
create_statfiles()
print('split train.txt...')
split_data()
print('done')
#!/usr/bin/env python
# coding: utf-8
import logging
import random
import numpy as np
import pickle
# disable gpu training for this example
import os
os.environ['CUDA_VISIBLE_DEVICES'] = ''
import paddle
import paddle.fluid as fluid
from config import parse_args
from reader import CriteoDataset
from network import DCN
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('fluid')
logger.setLevel(logging.INFO)
def infer():
args = parse_args()
print(args)
place = fluid.CPUPlace()
inference_scope = fluid.Scope()
test_files = [
os.path.join(args.test_valid_data_dir, fname)
for fname in next(os.walk(args.test_valid_data_dir))[2]
]
test_files = random.sample(test_files, int(len(test_files) * 0.5))
print('test files num {}'.format(len(test_files)))
criteo_dataset = CriteoDataset()
criteo_dataset.setup()
test_reader = criteo_dataset.test_reader(test_files, args.batch_size, 100)
startup_program = fluid.framework.Program()
test_program = fluid.framework.Program()
cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch
with fluid.scope_guard(inference_scope):
with fluid.framework.program_guard(test_program, startup_program):
dcn_model = DCN(args.cross_num, args.dnn_hidden_units,
args.l2_reg_cross, args.use_bn)
dcn_model.build_network(is_test=True)
exe = fluid.Executor(place)
feeder = fluid.DataFeeder(
feed_list=dcn_model.data_list, place=place)
fluid.io.load_persistables(
executor=exe,
dirname=cur_model_path,
main_program=fluid.default_main_program())
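# zero out the AUC accumulator states loaded from the checkpoint so that the
# reported AUC is computed over the test data only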
auc_states_names = ['_generated_var_2', '_generated_var_3']
for name in auc_states_names:
param = inference_scope.var(name).get_tensor()
param_array = np.zeros(param._get_dims()).astype("int64")
param.set(param_array, place)
loss_all = 0
num_ins = 0
for batch_id, data_test in enumerate(test_reader()):
loss_val, auc_val = exe.run(test_program,
feed=feeder.feed(data_test),
fetch_list=[
dcn_model.avg_logloss.name,
dcn_model.auc_var.name
])
# num_ins += len(data_test)
num_ins += 1
loss_all += loss_val
logger.info('TEST --> batch: {} loss: {} auc_val: {}'.format(
batch_id, loss_all / num_ins, auc_val))
print(
'The last log info is the total Logloss and AUC for all test data. '
)
if __name__ == '__main__':
infer()
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, absolute_import, division
import os
import random
import sys
import time
import paddle.fluid as fluid
from config import parse_args
from network import DCN
"""
train DCN model
"""
def train(args):
"""train and save DCN model
:param args: hyperparams of model
:return:
"""
dcn_model = DCN(args.cross_num, args.dnn_hidden_units, args.l2_reg_cross,
args.use_bn, args.clip_by_norm)
dcn_model.build_network()
dcn_model.backward(args.lr)
# config dataset
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(dcn_model.data_list)
pipe_command = 'python reader.py'
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(args.batch_size)
dataset.set_thread(args.num_thread)
train_filelist = [
os.path.join(args.train_data_dir, fname)
for fname in next(os.walk(args.train_data_dir))[2]
]
dataset.set_filelist(train_filelist)
num_epoch = args.num_epoch
if args.steps:
epoch = args.steps * args.batch_size / 41000000
full_epoch = int(epoch // 1)
last_epoch = epoch % 1
train_filelists = [train_filelist for _ in range(full_epoch)] + [
random.sample(train_filelist, int(
len(train_filelist) * last_epoch))
]
num_epoch = full_epoch + 1
print("train epoch: {}".format(num_epoch))
# Executor
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
for epoch_id in range(num_epoch):
start = time.time()
sys.stderr.write('\nepoch%d start ...\n' % (epoch_id + 1))
dataset.set_filelist(train_filelists[epoch_id])
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_list=[
dcn_model.loss, dcn_model.avg_logloss, dcn_model.auc_var
],
fetch_info=['total_loss', 'avg_logloss', 'auc'],
debug=False,
print_period=args.print_steps)
model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
sys.stderr.write('epoch%d is finished and takes %f s\n' % (
(epoch_id + 1), time.time() - start))
fluid.io.save_persistables(
executor=exe,
dirname=model_dir,
main_program=fluid.default_main_program())
if __name__ == '__main__':
args = parse_args()
print(args)
train(args)
#!/usr/bin/env python
# coding: utf-8
from __future__ import print_function, absolute_import, division
import paddle.fluid as fluid
from collections import OrderedDict
"""
DCN network
"""
class DCN(object):
def __init__(self,
cross_num=2,
dnn_hidden_units=(128, 128),
l2_reg_cross=1e-5,
dnn_use_bn=False,
clip_by_norm=None):
self.cross_num = cross_num
self.dnn_hidden_units = dnn_hidden_units
self.l2_reg_cross = l2_reg_cross
self.dnn_use_bn = dnn_use_bn
self.clip_by_norm = clip_by_norm
self.cat_feat_dims_dict = OrderedDict()
self.dense_feat_names = ['I' + str(i) for i in range(1, 14)]
self.sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
target = ['label']
for line in open('data/cat_feature_num.txt'):
spls = line.strip().split()
assert len(spls) == 2
self.cat_feat_dims_dict[spls[0]] = int(spls[1])
# {feat_name: dims}
self.feat_dims_dict = OrderedDict(
[(feat_name, 1) for feat_name in self.dense_feat_names])
self.feat_dims_dict.update(self.cat_feat_dims_dict)
self.net_input = None
self.loss = None
def build_network(self, is_test=False):
# data input
self.target_input = fluid.layers.data(
name='label', shape=[1], dtype='float32')
data_dict = OrderedDict()
for feat_name in self.feat_dims_dict:
data_dict[feat_name] = fluid.layers.data(
name=feat_name, shape=[1], dtype='float32')
self.net_input = self._create_embedding_input(data_dict)
deep_out = self._deep_net(self.net_input, self.dnn_hidden_units,
self.dnn_use_bn, is_test)
cross_out, l2_reg_cross_loss = self._cross_net(self.net_input,
self.cross_num)
last_out = fluid.layers.concat([deep_out, cross_out], axis=-1)
logit = fluid.layers.fc(last_out, 1)
self.prob = fluid.layers.sigmoid(logit)
self.data_list = [self.target_input] + [
data_dict[dense_name] for dense_name in self.dense_feat_names
] + [data_dict[sparse_name] for sparse_name in self.sparse_feat_names]
# auc
prob_2d = fluid.layers.concat([1 - self.prob, self.prob], 1)
label_int = fluid.layers.cast(self.target_input, 'int64')
auc_var, batch_auc_var, auc_states = fluid.layers.auc(input=prob_2d,
label=label_int,
slide_steps=0)
self.auc_var = auc_var
# logloss
logloss = fluid.layers.log_loss(self.prob, self.target_input)
self.avg_logloss = fluid.layers.reduce_mean(logloss)
# reg_coeff * l2_reg_cross
l2_reg_cross_loss = self.l2_reg_cross * l2_reg_cross_loss
self.loss = self.avg_logloss + l2_reg_cross_loss
def backward(self, lr):
p_g_clip = fluid.backward.append_backward(loss=self.loss)
fluid.clip.set_gradient_clip(
fluid.clip.GradientClipByGlobalNorm(clip_norm=self.clip_by_norm))
p_g_clip = fluid.clip.append_gradient_clip_ops(p_g_clip)
optimizer = fluid.optimizer.Adam(learning_rate=lr)
# params_grads = optimizer.backward(self.loss)
optimizer.apply_gradients(p_g_clip)
def _deep_net(self, input, hidden_units, use_bn=False, is_test=False):
for units in hidden_units:
input = fluid.layers.fc(input=input, size=units)
if use_bn:
input = fluid.layers.batch_norm(input, is_test=is_test)
input = fluid.layers.relu(input)
return input
def _cross_layer(self, x0, x, prefix):
input_dim = x0.shape[-1]
w = fluid.layers.create_parameter(
[input_dim], dtype='float32', name=prefix + "_w")
b = fluid.layers.create_parameter(
[input_dim], dtype='float32', name=prefix + "_b")
xw = fluid.layers.reduce_sum(x * w, dim=1, keep_dim=True) # (N, 1)
return x0 * xw + b + x, w
def _cross_net(self, input, num_corss_layers):
x = x0 = input
l2_reg_cross_list = []
for i in range(num_corss_layers):
x, w = self._cross_layer(x0, x, "cross_layer_{}".format(i))
l2_reg_cross_list.append(self._l2_loss(w))
l2_reg_cross_loss = fluid.layers.reduce_sum(
fluid.layers.concat(
l2_reg_cross_list, axis=-1))
return x, l2_reg_cross_loss
def _l2_loss(self, w):
return fluid.layers.reduce_sum(fluid.layers.square(w))
def _create_embedding_input(self, data_dict):
# sparse embedding
sparse_emb_dict = OrderedDict((
name, fluid.layers.embedding(
input=fluid.layers.cast(
data_dict[name], dtype='int64'),
size=[
self.feat_dims_dict[name] + 1,
6 * int(pow(self.feat_dims_dict[name], 0.25))
]))
for name in self.sparse_feat_names
)
# combine dense and sparse_emb
dense_input_list = [
data_dict[name] for name in data_dict if name.startswith('I')
]
sparse_emb_list = list(sparse_emb_dict.values())
sparse_input = fluid.layers.concat(sparse_emb_list, axis=-1)
sparse_input = fluid.layers.flatten(sparse_input)
dense_input = fluid.layers.concat(dense_input_list, axis=-1)
dense_input = fluid.layers.flatten(dense_input)
dense_input = fluid.layers.cast(dense_input, 'float32')
net_input = fluid.layers.concat([dense_input, sparse_input], axis=-1)
return net_input
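For reference, _cross_layer above implements the cross layer from the DCN paper, x_{l+1} = x0 * (x_l . w_l) + b_l + x_l. A minimal NumPy sketch of one cross layer for a single sample (illustrative only, not part of the model code):

```python
import numpy as np

def cross_layer(x0, x, w, b):
    # x0, x, w, b are 1-D vectors of the same length (the input dimension)
    xw = np.dot(x, w)          # scalar: x_l . w_l
    return x0 * xw + b + x     # x_{l+1} = x0 * (x_l . w_l) + b_l + x_l

dim = 4
x0 = np.random.rand(dim)
w = np.random.rand(dim)
b = np.zeros(dim)
x1 = cross_layer(x0, x0, w, b)   # first cross layer: x = x0
x2 = cross_layer(x0, x1, w, b)   # in the model, each layer learns its own w and b
print(x2.shape)                  # (4,)
```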
#!/usr/bin/env python
# coding: utf-8
"""
dataset and reader
"""
import math
import sys
import paddle.fluid.incubate.data_generator as dg
import pickle
from collections import Counter
import os
class CriteoDataset(dg.MultiSlotDataGenerator):
def setup(self):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
5775, 257675, 65535, 969, 23159456, 431037, 56311, 6047, 29019, 11,
231, 4008, 7393
]
self.cont_diff_ = [
self.cont_max_[i] - self.cont_min_[i]
for i in range(len(self.cont_min_))
]
self.cont_idx_ = list(range(1, 14))
self.cat_idx_ = list(range(14, 40))
dense_feat_names = ['I' + str(i) for i in range(1, 14)]
sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
target = ['label']
self.label_feat_names = target + dense_feat_names + sparse_feat_names
self.cat_feat_idx_dict_list = [{} for _ in range(26)]
for i in range(26):
lookup_idx = 1 # index 0 is reserved for unknown / missing values
for line in open(
os.path.join('data/vocab', 'C' + str(i + 1) + '.txt')):
self.cat_feat_idx_dict_list[i][line.strip()] = lookup_idx
lookup_idx += 1
def _process_line(self, line):
features = line.rstrip('\n').split('\t')
label_feat_list = [[] for _ in range(40)]
for idx in self.cont_idx_:
if features[idx] == '':
label_feat_list[idx].append(0)
else:
# 0-1 minmax norm
# label_feat_list[idx].append((float(features[idx]) - self.cont_min_[idx - 1]) /
# self.cont_diff_[idx - 1])
# log transform
label_feat_list[idx].append(
math.log(4 + float(features[idx]))
if idx == 2 else math.log(1 + float(features[idx])))
for idx in self.cat_idx_:
if features[idx] == '' or features[
idx] not in self.cat_feat_idx_dict_list[idx - 14]:
label_feat_list[idx].append(0)
else:
label_feat_list[idx].append(self.cat_feat_idx_dict_list[
idx - 14][features[idx]])
label_feat_list[0].append(int(features[0]))
return label_feat_list
def test_reader(self, filelist, batch, buf_size):
print(filelist)
def local_iter():
for fname in filelist:
with open(fname.strip(), 'r') as fin:
for line in fin:
label_feat_list = self._process_line(line)
yield label_feat_list
import paddle
batch_iter = paddle.batch(
paddle.reader.buffered(
local_iter, size=buf_size), batch_size=batch)
return batch_iter
def generate_sample(self, line):
def data_iter():
label_feat_list = self._process_line(line)
yield list(zip(self.label_feat_names, label_feat_list))
return data_iter
if __name__ == '__main__':
criteo_dataset = CriteoDataset()
criteo_dataset.setup()
criteo_dataset.run_from_stdin()
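A minimal way to exercise the reader locally (illustrative only; it assumes the vocab files produced by preprocess.py are present under data/vocab, and uses made-up feature values):

```python
# run from the directory that contains data/vocab/
line = '\t'.join(['1'] + ['5'] * 13 + ['dummy_id'] * 26) + '\n'
ds = CriteoDataset()
ds.setup()
feats = ds._process_line(line)
print(feats[0], feats[1], feats[14])  # label, log-transformed I1, C1 index (0 if unseen)
```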
#!/bin/bash
wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz
wget --no-check-certificate https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2 -O ./aid_data/feat_dict_10.pkl2 || rm -f ./aid_data/feat_dict_10.pkl2
tar zxf dac.tar.gz
rm -f dac.tar.gz
......
# xDeepFM for CTR Prediction
## Introduction
This example reproduces the paper "xDeepFM: Combining Explicit and Implicit Feature Interactions for Recommender Systems" with PaddlePaddle. The authors' [open-source code](https://github.com/Leavingseason/xDeepFM) is also available.
## Dataset
A demo dataset is used. Run the following command in the data directory to download it:
```bash
sh download.sh
```
## Requirements
- PaddlePaddle 1.5
## Single-machine training
```bash
python local_train.py --model_output_dir models
```
During training, the current loss and AUC are printed every fixed number of steps (50 by default); print_steps can be adjusted in args.py.
## Single-machine inference
```bash
python infer.py --model_output_dir models --test_epoch 10
```
test_epoch specifies which saved model to load; here the model saved after epoch 10 is used.
Note: the last log line reports the overall Logloss and AUC over the test set.
## Results
After training for 10 epochs, the test set LogLoss is `0.48657` and AUC is `0.7308`.
import argparse
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
parser.add_argument(
'--train_data_dir',
type=str,
default='data/train_data',
help='The path of train data (default: data/train_data)')
parser.add_argument(
'--test_data_dir',
type=str,
default='data/test_data',
help='The path of test data (default: data/test_data)')
parser.add_argument(
'--batch_size',
type=int,
default=100,
help="The size of mini-batch (default:100)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_epoch',
type=int,
default=10,
help="The number of epochs to train (default: 10)")
parser.add_argument(
'--model_output_dir',
type=str,
required=True,
help='The directory to save the model')
parser.add_argument(
'--num_thread',
type=int,
default=1,
help='The number of threads (default: 1)')
parser.add_argument('--test_epoch', type=str, default='1')
parser.add_argument(
'--layer_sizes_dnn',
nargs='+',
type=int,
default=[10, 10, 10],
help='The sizes of the DNN layers')
parser.add_argument(
'--layer_sizes_cin',
nargs='+',
type=int,
default=[10, 10],
help='The sizes of the CIN layers')
parser.add_argument(
'--act',
type=str,
default='relu',
help='The activation of the DNN layers (default: relu)')
parser.add_argument(
'--lr', type=float, default=1e-1, help='Learning rate (default: 1e-1)')
parser.add_argument(
'--reg', type=float, default=1e-4, help='L2 regularization coefficient (default: 1e-4)')
parser.add_argument('--num_field', type=int, default=39)
parser.add_argument('--num_feat', type=int, default=28651)
parser.add_argument(
'--model_name',
type=str,
default='ctr_xdeepfm_model',
help='The name of model (default: ctr_xdeepfm_model)')
parser.add_argument('--use_gpu', type=int, default=1)
parser.add_argument('--print_steps', type=int, default=50)
return parser.parse_args()
import sys
import paddle.fluid.incubate.data_generator as dg
import pickle
from collections import Counter
import os
class CriteoDataset(dg.MultiSlotDataGenerator):
def _process_line(self, line):
features = line.strip('\n').split('\t')
feat_idx = []
feat_value = []
for idx in range(1, 40):
feat_idx.append(int(features[idx]))
feat_value.append(1.0)
label = [int(features[0])]
return feat_idx, feat_value, label
def test(self, filelist):
def local_iter():
for fname in filelist:
with open(fname.strip(), 'r') as fin:
for line in fin:
feat_idx, feat_value, label = self._process_line(line)
yield [feat_idx, feat_value, label]
return local_iter
def generate_sample(self, line):
def data_iter():
feat_idx, feat_value, label = self._process_line(line)
feature_name = ['feat_idx', 'feat_value', 'label']
yield [('feat_idx', feat_idx), ('feat_value', feat_value), ('label',
label)]
return data_iter
if __name__ == '__main__':
criteo_dataset = CriteoDataset()
criteo_dataset.run_from_stdin()
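The downloaded demo files are expected to contain one sample per line: a 0/1 label followed by 39 integer feature ids, tab-separated. A minimal, illustrative check with a synthetic line:

```python
line = '\t'.join(['0'] + [str(i + 1) for i in range(39)]) + '\n'
ds = CriteoDataset()
feat_idx, feat_value, label = ds._process_line(line)
print(len(feat_idx), feat_value[0], label)  # 39 1.0 [0]
```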
#!/bin/bash
wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Fev -O ./test_data/ev
wget --no-check-certificate https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr -O ./train_data/tr
import logging
import numpy as np
import pickle
import os
import paddle
import paddle.fluid as fluid
from args import parse_args
from criteo_reader import CriteoDataset
import network_conf
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('fluid')
logger.setLevel(logging.INFO)
def infer():
args = parse_args()
print(args)
if args.use_gpu == 1:
place = fluid.CUDAPlace(0)
else:
place = fluid.CPUPlace()
inference_scope = fluid.Scope()
test_files = [
args.test_data_dir + '/' + x for x in os.listdir(args.test_data_dir)
]
criteo_dataset = CriteoDataset()
test_reader = paddle.batch(
criteo_dataset.test(test_files), batch_size=args.batch_size)
startup_program = fluid.framework.Program()
test_program = fluid.framework.Program()
cur_model_path = args.model_output_dir + '/epoch_' + args.test_epoch
with fluid.scope_guard(inference_scope):
with fluid.framework.program_guard(test_program, startup_program):
loss, auc, data_list = eval('network_conf.' + args.model_name)(
args.embedding_size, args.num_field, args.num_feat,
args.layer_sizes_dnn, args.act, args.reg, args.layer_sizes_cin)
exe = fluid.Executor(place)
feeder = fluid.DataFeeder(feed_list=data_list, place=place)
fluid.io.load_persistables(
executor=exe,
dirname=cur_model_path,
main_program=fluid.default_main_program())
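# zero out the AUC accumulator states loaded from the checkpoint so that the
# reported AUC is computed over the test data only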
auc_states_names = ['_generated_var_2', '_generated_var_3']
for name in auc_states_names:
param = inference_scope.var(name).get_tensor()
param_array = np.zeros(param._get_dims()).astype("int64")
param.set(param_array, place)
loss_all = 0
num_ins = 0
for batch_id, data_test in enumerate(test_reader()):
loss_val, auc_val = exe.run(test_program,
feed=feeder.feed(data_test),
fetch_list=[loss.name, auc.name])
num_ins += len(data_test)
loss_all += loss_val * len(data_test)
logger.info('TEST --> batch: {} loss: {} auc_val: {}'.format(
batch_id, loss_all / num_ins, auc_val))
print(
'The last log info is the total Logloss and AUC for all test data. '
)
if __name__ == '__main__':
infer()
from args import parse_args
import os
import paddle.fluid as fluid
import sys
import network_conf
import time
def train():
args = parse_args()
print(args)
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
loss, auc, data_list = eval('network_conf.' + args.model_name)(
args.embedding_size, args.num_field, args.num_feat,
args.layer_sizes_dnn, args.act, args.reg, args.layer_sizes_cin)
optimizer = fluid.optimizer.SGD(
learning_rate=args.lr,
regularization=fluid.regularizer.L2DecayRegularizer(args.reg))
optimizer.minimize(loss)
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(data_list)
dataset.set_pipe_command('python criteo_reader.py')
dataset.set_batch_size(args.batch_size)
dataset.set_filelist([
args.train_data_dir + '/' + x for x in os.listdir(args.train_data_dir)
])
if args.use_gpu == 1:
exe = fluid.Executor(fluid.CUDAPlace(0))
dataset.set_thread(1)
else:
exe = fluid.Executor(fluid.CPUPlace())
dataset.set_thread(args.num_thread)
exe.run(fluid.default_startup_program())
for epoch_id in range(args.num_epoch):
start = time.time()
sys.stderr.write('\nepoch%d start ...\n' % (epoch_id + 1))
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset,
fetch_list=[loss, auc],
fetch_info=['loss', 'auc'],
debug=False,
print_period=args.print_steps)
model_dir = args.model_output_dir + '/epoch_' + str(epoch_id + 1)
sys.stderr.write('epoch%d is finished and takes %f s\n' % (
(epoch_id + 1), time.time() - start))
fluid.io.save_persistables(
executor=exe,
dirname=model_dir,
main_program=fluid.default_main_program())
if __name__ == '__main__':
train()
import paddle.fluid as fluid
import math
def ctr_xdeepfm_model(embedding_size, num_field, num_feat, layer_sizes_dnn, act,
reg, layer_sizes_cin):
init_value_ = 0.1
initer = fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)
raw_feat_idx = fluid.layers.data(
name='feat_idx', shape=[num_field], dtype='int64')
raw_feat_value = fluid.layers.data(
name='feat_value', shape=[num_field], dtype='float32')
label = fluid.layers.data(
name='label', shape=[1], dtype='float32') # None * 1
feat_idx = fluid.layers.reshape(raw_feat_idx,
[-1, num_field, 1]) # None * num_field * 1
feat_value = fluid.layers.reshape(
raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
feat_embeddings = fluid.layers.embedding(
input=feat_idx,
dtype='float32',
size=[num_feat + 1, embedding_size],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=initer)) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# -------------------- linear --------------------
weights_linear = fluid.layers.embedding(
input=feat_idx,
dtype='float32',
size=[num_feat + 1, 1],
padding_idx=0,
param_attr=fluid.ParamAttr(initializer=initer)) # None * num_field * 1
b_linear = fluid.layers.create_parameter(
shape=[1],
dtype='float32',
default_initializer=fluid.initializer.ConstantInitializer(value=0))
y_linear = fluid.layers.reduce_sum(
(weights_linear * feat_value), 1) + b_linear
# -------------------- CIN --------------------
Xs = [feat_embeddings]
last_s = num_field
for s in layer_sizes_cin:
# calculate Z^(k+1) with X^k and X^0
X_0 = fluid.layers.reshape(
fluid.layers.transpose(Xs[0], [0, 2, 1]),
[-1, embedding_size, num_field,
1]) # None, embedding_size, num_field, 1
X_k = fluid.layers.reshape(
fluid.layers.transpose(Xs[-1], [0, 2, 1]),
[-1, embedding_size, 1, last_s]) # None, embedding_size, 1, last_s
Z_k_1 = fluid.layers.matmul(
X_0, X_k) # None, embedding_size, num_field, last_s
# compresses Z^(k+1) to X^(k+1)
Z_k_1 = fluid.layers.reshape(Z_k_1, [
-1, embedding_size, last_s * num_field
]) # None, embedding_size, last_s*num_field
Z_k_1 = fluid.layers.transpose(
Z_k_1, [0, 2, 1]) # None, s*num_field, embedding_size
Z_k_1 = fluid.layers.reshape(
Z_k_1, [-1, last_s * num_field, 1, embedding_size]
) # None, last_s*num_field, 1, embedding_size (None, channel_in, h, w)
X_k_1 = fluid.layers.conv2d(
Z_k_1,
num_filters=s,
filter_size=(1, 1),
act=None,
bias_attr=False,
param_attr=fluid.ParamAttr(
initializer=initer)) # None, s, 1, embedding_size
X_k_1 = fluid.layers.reshape(
X_k_1, [-1, s, embedding_size]) # None, s, embedding_size
Xs.append(X_k_1)
last_s = s
# sum pooling
y_cin = fluid.layers.concat(Xs[1:],
1) # None, sum(layer_sizes_cin), embedding_size
y_cin = fluid.layers.reduce_sum(y_cin, -1) # None, sum(layer_sizes_cin)
y_cin = fluid.layers.fc(input=y_cin,
size=1,
act=None,
param_attr=fluid.ParamAttr(initializer=initer),
bias_attr=None)
y_cin = fluid.layers.reduce_sum(y_cin, dim=-1, keep_dim=True)
# -------------------- DNN --------------------
y_dnn = fluid.layers.reshape(feat_embeddings,
[-1, num_field * embedding_size])
for s in layer_sizes_dnn:
y_dnn = fluid.layers.fc(input=y_dnn,
size=s,
act=act,
param_attr=fluid.ParamAttr(initializer=initer),
bias_attr=None)
y_dnn = fluid.layers.fc(input=y_dnn,
size=1,
act=None,
param_attr=fluid.ParamAttr(initializer=initer),
bias_attr=None)
# ------------------- xDeepFM ------------------
predict = fluid.layers.sigmoid(y_linear + y_cin + y_dnn)
cost = fluid.layers.log_loss(input=predict, label=label, epsilon=0.0000001)
batch_cost = fluid.layers.reduce_mean(cost)
# for auc
predict_2d = fluid.layers.concat([1 - predict, predict], 1)
label_int = fluid.layers.cast(label, 'int64')
auc_var, batch_auc_var, auc_states = fluid.layers.auc(input=predict_2d,
label=label_int,
slide_steps=0)
return batch_cost, auc_var, [raw_feat_idx, raw_feat_value, label]
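A minimal sketch of building this network outside of local_train.py, using the default hyperparameters from args.py (illustrative only; it mirrors the call made in local_train.py and infer.py):

```python
import paddle.fluid as fluid

loss, auc, data_list = ctr_xdeepfm_model(
    embedding_size=10, num_field=39, num_feat=28651,
    layer_sizes_dnn=[10, 10, 10], act='relu', reg=1e-4,
    layer_sizes_cin=[10, 10])
print(loss.name, auc.name, [var.name for var in data_list])
```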