Commit c4ff2799 by zhang wenhui (committed via GitHub), parent afaf06e7

Delete PaddleRec model (#4872)

* update api 1.8

* fix paddlerec readme

* delete some model

* delete some model

* update
# Click-Through Rate prediction
## Introduction
We provide models commonly used in CTR tasks, including [dnn](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/dnn), [deepfm](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/deepfm), [xdeepfm](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/xdeepfm) and [dcn](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/dcn).
We also recommend the [IPython Notebook demo](https://aistudio.baidu.com/aistudio/projectDetail/124378).
# Deep & Cross Network
models/PaddleRec only provides Paddle implementations of classic recommendation algorithms. We have open-sourced the more capable toolkit [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec), which covers the full recommendation + distributed-training pipeline and offers high-level APIs that switch seamlessly between single-machine and distributed modes. New models and features will be released in the [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec) repo; models/PaddleRec is no longer maintained.

Below is a brief directory structure and description of this example:
```text
.
├── README.md             # documentation
├── local_train.py        # local training script
├── infer.py              # inference script
├── network.py            # network structure
├── config.py             # parameter configuration
├── reader.py             # data reading functions
├── utils.py              # utility functions
├── data/
    ├── download.sh       # data download script
    ├── preprocess.py     # data preprocessing script
├── dist_data/
    ├── dist_data_download.sh # downloads the small sample data used to simulate a multi-node setup on one machine
    ├── preprocess_dist.py    # preprocesses the small sample data
```
## Introduction
For details on the DCN model, please refer to the paper [Deep & Cross Network for Ad Click Predictions](https://arxiv.org/abs/1708.05123).
## Environment
- **All models in this model zoo currently require PaddlePaddle 1.6 or higher, or an appropriate develop build.**
## Download the data
We train and evaluate DCN on the Criteo dataset, which contains about 45 million records. In each row, the first column is the label indicating whether the ad was clicked, followed by 13 integer features (I1 - I13) and 26 categorical features (C1 - C26), as sketched below.
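Each record therefore consists of 1 + 13 + 26 = 40 tab-separated fields. The helper below is a minimal sketch of that layout (the function name is ours; preprocess.py and reader.py contain the real parsing code):
```python
def split_criteo_line(line):
    fields = line.rstrip('\n').split('\t')
    assert len(fields) == 40          # label + I1-I13 + C1-C26
    label = int(fields[0])
    int_feats = fields[1:14]          # I1 - I13 (may be empty strings)
    cat_feats = fields[14:40]         # C1 - C26 (hashed categorical values)
    return label, int_feats, cat_feats
```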
Download the data with:
```bash
cd data && python download.py
```
## Data preprocessing
- Following the paper, the first 6 days of data (about 41 million records) are used for training, and the 7th day is split evenly into validation and test sets. This amounts to roughly a 9 : 0.5 : 0.5 split, with the training data being the first 90% of the records; papers such as xDeepFM instead use an 8 : 1 : 1 split on fully shuffled data.
- The paper applies a log transform to the integer features. Only I2 has a minimum of -3 (the others have a minimum of 0), so I2 uses log(4 + value) while the other integer features use log(1 + value); see the sketch at the end of this section.
- The distinct feature ids of every categorical feature (C1 - C26) are counted, and many of them are low-frequency. Low-frequency feature ids are filtered out to shrink the embedding matrix; the default frequency threshold in the code is 10, which removes a large number of them.

Run preprocessing with:
```bash
python preprocess.py
```
After preprocessing, the training data is under train, the validation and test data are under test_valid, and vocab stores the feature ids of each categorical feature after low-frequency filtering. The minimum/maximum of each integer feature and the number of feature ids of each categorical feature are also recorded.
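A minimal Python sketch of the log transform described above (it mirrors the per-line logic in reader.py; the helper name is ours):
```python
import math

def log_transform(int_feats):
    """int_feats: the 13 raw integer feature strings I1 - I13 of one record."""
    out = []
    for i, v in enumerate(int_feats, start=1):
        if not v:                      # missing values are mapped to 0, as in reader.py
            out.append(0.0)
        elif i == 2:                   # I2 can be as small as -3
            out.append(math.log(4 + float(v)))
        else:
            out.append(math.log(1 + float(v)))
    return out
```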
## Local training
```bash
nohup python -u local_train.py > train.log &
```
During training, the current total loss (logloss + regularization), logloss and AUC are printed every fixed number of steps (100 by default); print_steps can be adjusted in args.py.
## Local inference
```bash
nohup python -u infer.py --test_epoch 2 > test.log &
```
Note: the AUC on the last line is computed over the whole test set.
## Results
The results below were obtained on a Linux CPU machine using the Dataset API with 20 training threads and a batch size of 512. After 150000 steps (~1.87 epochs), the evaluation results are:
```text
loss: [0.44703564] auc_val: [0.80654419]
```
## Distributed training
First download and preprocess the small sample dataset:
```bash
cd dist_data && python dist_download.py && cd ..
```
Run the command below to simulate a multi-node setup on a single machine. By default a 2 x 2 layout is used, i.e. 2 pservers and 2 trainers.
**Note: for distributed training we recommend Paddle 1.6 or above, or the [latest build](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/install/Tables.html#whl-dev).**
```bash
# This shell script does not support Windows
sh cluster_train.sh
```
Arguments of cluster_train.sh:
- train_data_dir: training data directory
- model_output_dir: directory in which models are saved
- is_local: whether to train locally on a single machine (set to 0 when simulating distributed training)
- is_sparse: whether the embedding uses sparse updates; defaults to False if not set
- role: process role (pserver or trainer)
- endpoints: addresses and ports of all pservers
- current_endpoint: address and port of the current pserver (when role is pserver)
- trainers: number of trainers

The remaining arguments are described in cluster_train.py; the sketch below shows how the role-related ones are typically used.
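A rough sketch, with assumed helper and variable names (the authoritative logic lives in cluster_train.sh / cluster_train.py), of how fluid's DistributeTranspiler consumes these arguments:
```python
import paddle.fluid as fluid

def build_dist_programs(role, trainer_id, endpoints, current_endpoint, trainers):
    # endpoints: comma-separated pserver list, e.g. "127.0.0.1:6000,127.0.0.1:6001"
    t = fluid.DistributeTranspiler()
    t.transpile(trainer_id, pservers=endpoints, trainers=trainers)
    if role == "pserver":
        main_prog = t.get_pserver_program(current_endpoint)
        startup_prog = t.get_startup_program(current_endpoint, main_prog)
    else:  # trainer
        main_prog = t.get_trainer_program()
        startup_prog = fluid.default_startup_program()
    return main_prog, startup_prog
```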
Inference:
```bash
python infer.py --model_output_dir cluster_model --test_epoch 10 --test_valid_data_dir dist_data/dist_test_valid_data --vocab_dir dist_data/vocab --cat_feat_num dist_data/cat_feature_num.txt
```
Notes:
- Local simulation requires disabling proxies, e.g. unset http_proxy, unset https_proxy
- Trainer 0 saves the model parameters
- After each training run the pserver processes must be stopped manually; list them with:
>ps -ef | grep python
- Data is loaded with the Dataset API, which is currently only supported on Linux; a minimal wiring sketch follows
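A sketch only, with assumed names (model construction follows network.py and the defaults in args.py; the real training code is local_train.py):
```python
import paddle.fluid as fluid
from network import DCN

# feature-id counts produced by data/preprocess.py
cat_feat_dims = dict((l.split()[0], int(l.split()[1]))
                     for l in open("data/cat_feature_num.txt"))
model = DCN(cross_num=6, dnn_hidden_units=[1024, 1024], l2_reg_cross=1e-5,
            dnn_use_bn=True, clip_by_norm=100.0,
            cat_feat_dims_dict=cat_feat_dims, is_sparse=False)
model.build_network()
model.backward(lr=1e-4)

dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(model.data_list)                     # label + I1-I13 + C1-C26
dataset.set_pipe_command("python reader.py data/vocab")  # reader.py consumes stdin
dataset.set_batch_size(512)
dataset.set_thread(20)
dataset.set_filelist(["data/train/part-0"])              # hypothetical single file

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
exe.train_from_dataset(program=fluid.default_main_program(), dataset=dataset)
```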
import argparse
"""
global params
"""
def boolean_string(s):
if s.lower() not in {'false', 'true'}:
raise ValueError('Not a valid boolean string')
return s.lower() == 'true'
def parse_args():
parser = argparse.ArgumentParser(description="PaddleFluid DCN demo")
parser.add_argument(
'--train_data_dir',
type=str,
default='data/train',
help='The path of train data')
parser.add_argument(
'--test_valid_data_dir',
type=str,
default='data/test_valid',
help='The path of test and valid data')
parser.add_argument(
'--vocab_dir',
type=str,
default='data/vocab',
help='The path of generated vocabs')
parser.add_argument(
'--cat_feat_num',
type=str,
default='data/cat_feature_num.txt',
help='The path of generated cat_feature_num.txt')
parser.add_argument(
'--batch_size', type=int, default=512, help="Batch size")
parser.add_argument(
'--steps',
type=int,
default=150000,
help="Early stop steps in training. If set, num_epoch will not work")
parser.add_argument('--num_epoch', type=int, default=2, help="train epoch")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help='The path for model to store')
parser.add_argument(
'--num_thread', type=int, default=20, help='The number of threads')
parser.add_argument('--test_epoch', type=str, default='1')
parser.add_argument(
'--dnn_hidden_units',
nargs='+',
type=int,
default=[1024, 1024],
help='DNN layers and hidden units')
parser.add_argument(
'--cross_num',
type=int,
default=6,
help='The number of Cross network layers')
parser.add_argument('--lr', type=float, default=1e-4, help='Learning rate')
parser.add_argument(
'--l2_reg_cross',
type=float,
default=1e-5,
help='Cross net l2 regularizer coefficient')
parser.add_argument(
'--use_bn',
type=boolean_string,
default=True,
help='Whether use batch norm in dnn part')
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
parser.add_argument(
'--clip_by_norm', type=float, default=100.0, help="gradient clip norm")
parser.add_argument('--print_steps', type=int, default=100)
parser.add_argument(
'--enable_ce', action='store_true', help='If set, run the task with continuous evaluation logs.')
return parser.parse_args()
import os
import sys
import io
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
trainfile = 'train.txt'
url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url)
print("download and extract finished")
count = 0
for _ in io.open(trainfile, 'r', encoding='utf-8'):
count += 1
print("total records: %d" % count)
print("done")
from __future__ import print_function, absolute_import, division
import os
import sys
from collections import Counter
import numpy as np
"""
preprocess Criteo train data, generate extra statistic files for model input.
"""
# input filename
FILENAME = 'train.txt'
# global vars
CAT_FEATURE_NUM = 'cat_feature_num.txt'
INT_FEATURE_MINMAX = 'int_feature_minmax.txt'
VOCAB_DIR = 'vocab'
TRAIN_DIR = 'train'
TEST_VALID_DIR = 'test_valid'
SPLIT_RATIO = 0.9
FREQ_THR = 10
INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)]
CAT_COLUMN_NAMES = ['C' + str(i) for i in range(1, 27)]
def check_statfiles():
"""
    check whether the statistic files of Criteo exist
:return:
"""
statsfiles = [CAT_FEATURE_NUM, INT_FEATURE_MINMAX] + [
os.path.join(VOCAB_DIR, cat_fn + '.txt') for cat_fn in CAT_COLUMN_NAMES
]
if all([os.path.exists(fn) for fn in statsfiles]):
return True
return False
def create_statfiles():
"""
create statistic files of Criteo, including:
    min/max of integer features
    counts of categorical features
    vocab of each categorical feature
:return:
"""
int_minmax_list = [[sys.maxsize, -sys.maxsize]
for _ in range(13)] # count integer feature min max
cat_ct_list = [Counter() for _ in range(26)] # count categorical features
for idx, line in enumerate(open(FILENAME)):
spls = line.rstrip('\n').split('\t')
assert len(spls) == 40
for i in range(13):
if not spls[1 + i]: continue
int_val = int(spls[1 + i])
int_minmax_list[i][0] = min(int_minmax_list[i][0], int_val)
int_minmax_list[i][1] = max(int_minmax_list[i][1], int_val)
for i in range(26):
cat_ct_list[i].update([spls[14 + i]])
# save min max of integer features
with open(INT_FEATURE_MINMAX, 'w') as f:
for name, minmax in zip(INT_COLUMN_NAMES, int_minmax_list):
print("{} {} {}".format(name, minmax[0], minmax[1]), file=f)
# remove '' from all cat_set[i] and filter low freq categorical value
cat_set_list = [set() for i in range(len(cat_ct_list))]
for i, ct in enumerate(cat_ct_list):
if '' in ct: del ct['']
for key in list(ct.keys()):
if ct[key] >= FREQ_THR:
cat_set_list[i].add(key)
del cat_ct_list
# create vocab dir
if not os.path.exists(VOCAB_DIR):
os.makedirs(VOCAB_DIR)
# write vocab file of categorical features
with open(CAT_FEATURE_NUM, 'w') as cat_feat_count_file:
for name, s in zip(CAT_COLUMN_NAMES, cat_set_list):
print('{} {}'.format(name, len(s)), file=cat_feat_count_file)
vocabfile = os.path.join(VOCAB_DIR, name + '.txt')
with open(vocabfile, 'w') as f:
for vocab_val in s:
print(vocab_val, file=f)
def split_data():
"""
split train.txt into train and test_valid files.
:return:
"""
if not os.path.exists(TRAIN_DIR):
os.makedirs(TRAIN_DIR)
if not os.path.exists(TEST_VALID_DIR):
os.makedirs(TEST_VALID_DIR)
fin = open('train.txt', 'r')
data_dir = TRAIN_DIR
fout = open(os.path.join(data_dir, 'part-0'), 'w')
split_idx = int(45840617 * SPLIT_RATIO)
for line_idx, line in enumerate(fin):
if line_idx == split_idx:
fout.close()
data_dir = TEST_VALID_DIR
cur_part_idx = int(line_idx / 200000)
fout = open(
os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w')
if line_idx % 200000 == 0 and line_idx != 0:
fout.close()
cur_part_idx = int(line_idx / 200000)
fout = open(
os.path.join(data_dir, 'part-' + str(cur_part_idx)), 'w')
fout.write(line)
fout.close()
fin.close()
if __name__ == '__main__':
if not check_statfiles():
        print('creating statistic files of Criteo...')
create_statfiles()
print('split train.txt...')
split_data()
print('done')
from __future__ import print_function
import os
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url, savename="dist_data_demo.tar.gz")
print("download and extract finished")
print("preprocessing...")
os.system("python dist_preprocess.py")
print("preprocess done")
from __future__ import print_function, absolute_import, division
import os
import sys
from collections import Counter
import numpy as np
"""
preprocess Criteo train data, generate extra statistic files for model input.
"""
# input filename
FILENAME = 'dist_data_demo.txt'
# global vars
CAT_FEATURE_NUM = 'cat_feature_num.txt'
INT_FEATURE_MINMAX = 'int_feature_minmax.txt'
VOCAB_DIR = 'vocab'
TRAIN_DIR = 'dist_train_data'
TEST_DIR = 'dist_test_valid_data'
TRAIN_FILE = os.path.join(TRAIN_DIR, 'tr')
TEST_FILE = os.path.join(TEST_DIR, 'ev')
SPLIT_RATIO = 0.9
FREQ_THR = 10
INT_COLUMN_NAMES = ['I' + str(i) for i in range(1, 14)]
CAT_COLUMN_NAMES = ['C' + str(i) for i in range(1, 27)]
def check_statfiles():
"""
    check whether the statistic files of Criteo exist
:return:
"""
statsfiles = [CAT_FEATURE_NUM, INT_FEATURE_MINMAX] + [
os.path.join(VOCAB_DIR, cat_fn + '.txt') for cat_fn in CAT_COLUMN_NAMES
]
if all([os.path.exists(fn) for fn in statsfiles]):
return True
return False
def create_statfiles():
"""
create statistic files of Criteo, including:
    min/max of integer features
    counts of categorical features
    vocab of each categorical feature
:return:
"""
int_minmax_list = [[sys.maxsize, -sys.maxsize]
for _ in range(13)] # count integer feature min max
cat_ct_list = [Counter() for _ in range(26)] # count categorical features
for idx, line in enumerate(open(FILENAME)):
spls = line.rstrip('\n').split('\t')
assert len(spls) == 40
for i in range(13):
if not spls[1 + i]: continue
int_val = int(spls[1 + i])
int_minmax_list[i][0] = min(int_minmax_list[i][0], int_val)
int_minmax_list[i][1] = max(int_minmax_list[i][1], int_val)
for i in range(26):
cat_ct_list[i].update([spls[14 + i]])
# save min max of integer features
with open(INT_FEATURE_MINMAX, 'w') as f:
for name, minmax in zip(INT_COLUMN_NAMES, int_minmax_list):
print("{} {} {}".format(name, minmax[0], minmax[1]), file=f)
# remove '' from all cat_set[i] and filter low freq categorical value
cat_set_list = [set() for i in range(len(cat_ct_list))]
for i, ct in enumerate(cat_ct_list):
if '' in ct: del ct['']
for key in list(ct.keys()):
if ct[key] >= FREQ_THR:
cat_set_list[i].add(key)
del cat_ct_list
# create vocab dir
if not os.path.exists(VOCAB_DIR):
os.makedirs(VOCAB_DIR)
# write vocab file of categorical features
with open(CAT_FEATURE_NUM, 'w') as cat_feat_count_file:
for name, s in zip(CAT_COLUMN_NAMES, cat_set_list):
print('{} {}'.format(name, len(s)), file=cat_feat_count_file)
vocabfile = os.path.join(VOCAB_DIR, name + '.txt')
with open(vocabfile, 'w') as f:
for vocab_val in s:
print(vocab_val, file=f)
def split_data():
"""
split train.txt into train and test_valid files.
:return:
"""
if not os.path.exists(TRAIN_DIR):
os.makedirs(TRAIN_DIR)
if not os.path.exists(TEST_DIR):
os.makedirs(TEST_DIR)
all_lines = []
for line in open(FILENAME):
all_lines.append(line)
split_line_idx = int(len(all_lines) * SPLIT_RATIO)
with open(TRAIN_FILE, 'w') as f:
f.writelines(all_lines[:split_line_idx])
with open(TEST_FILE, 'w') as f:
f.writelines(all_lines[split_line_idx:])
if __name__ == '__main__':
if not check_statfiles():
        print('creating statistic files of Criteo...')
create_statfiles()
print('split train.txt...')
split_data()
print('done')
from __future__ import print_function, absolute_import, division
import paddle.fluid as fluid
from collections import OrderedDict
"""
DCN network
"""
class DCN(object):
def __init__(self,
cross_num=2,
dnn_hidden_units=(128, 128),
l2_reg_cross=1e-5,
dnn_use_bn=False,
clip_by_norm=None,
cat_feat_dims_dict=None,
is_sparse=False):
self.cross_num = cross_num
self.dnn_hidden_units = dnn_hidden_units
self.l2_reg_cross = l2_reg_cross
self.dnn_use_bn = dnn_use_bn
self.clip_by_norm = clip_by_norm
self.cat_feat_dims_dict = cat_feat_dims_dict if cat_feat_dims_dict else OrderedDict(
)
self.is_sparse = is_sparse
self.dense_feat_names = ['I' + str(i) for i in range(1, 14)]
self.sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
target = ['label']
# {feat_name: dims}
self.feat_dims_dict = OrderedDict(
[(feat_name, 1) for feat_name in self.dense_feat_names])
self.feat_dims_dict.update(self.cat_feat_dims_dict)
self.net_input = None
self.loss = None
def build_network(self, is_test=False):
# data input
self.target_input = fluid.data(
name='label', shape=[None, 1], dtype='float32')
data_dict = OrderedDict()
for feat_name in self.feat_dims_dict:
data_dict[feat_name] = fluid.data(
name=feat_name, shape=[None, 1], dtype='float32')
self.net_input = self._create_embedding_input(data_dict)
deep_out = self._deep_net(self.net_input, self.dnn_hidden_units,
self.dnn_use_bn, is_test)
cross_out, l2_reg_cross_loss = self._cross_net(self.net_input,
self.cross_num)
last_out = fluid.layers.concat([deep_out, cross_out], axis=-1)
logit = fluid.layers.fc(last_out, 1)
self.prob = fluid.layers.sigmoid(logit)
self.data_list = [self.target_input] + [
data_dict[dense_name] for dense_name in self.dense_feat_names
] + [data_dict[sparse_name] for sparse_name in self.sparse_feat_names]
# auc
prob_2d = fluid.layers.concat([1 - self.prob, self.prob], 1)
label_int = fluid.layers.cast(self.target_input, 'int64')
auc_var, batch_auc_var, self.auc_states = fluid.layers.auc(
input=prob_2d, label=label_int, slide_steps=0)
self.auc_var = auc_var
# logloss
logloss = fluid.layers.log_loss(self.prob, self.target_input)
self.avg_logloss = fluid.layers.reduce_mean(logloss)
# reg_coeff * l2_reg_cross
l2_reg_cross_loss = self.l2_reg_cross * l2_reg_cross_loss
self.loss = self.avg_logloss + l2_reg_cross_loss
def backward(self, lr):
p_g_clip = fluid.backward.append_backward(loss=self.loss)
clip = fluid.clip.GradientClipByGlobalNorm(clip_norm=self.clip_by_norm)
p_g_clip = fluid.clip.append_gradient_clip_ops(p_g_clip)
optimizer = fluid.optimizer.Adam(learning_rate=lr, grad_clip=clip)
# params_grads = optimizer.backward(self.loss)
optimizer.apply_gradients(p_g_clip)
def _deep_net(self, input, hidden_units, use_bn=False, is_test=False):
for units in hidden_units:
input = fluid.layers.fc(input=input, size=units)
if use_bn:
input = fluid.layers.batch_norm(input, is_test=is_test)
input = fluid.layers.relu(input)
return input
def _cross_layer(self, x0, x, prefix):
input_dim = x0.shape[-1]
w = fluid.layers.create_parameter(
[input_dim], dtype='float32', name=prefix + "_w")
b = fluid.layers.create_parameter(
[input_dim], dtype='float32', name=prefix + "_b")
xw = fluid.layers.reduce_sum(x * w, dim=1, keep_dim=True) # (N, 1)
return x0 * xw + b + x, w
    def _cross_net(self, input, num_cross_layers):
        # stack cross layers: x_{l+1} = x0 * (x_l . w_l) + b_l + x_l
        x = x0 = input
        l2_reg_cross_list = []
        for i in range(num_cross_layers):
x, w = self._cross_layer(x0, x, "cross_layer_{}".format(i))
l2_reg_cross_list.append(self._l2_loss(w))
l2_reg_cross_loss = fluid.layers.reduce_sum(
fluid.layers.concat(
l2_reg_cross_list, axis=-1))
return x, l2_reg_cross_loss
def _l2_loss(self, w):
return fluid.layers.reduce_sum(fluid.layers.square(w))
def _create_embedding_input(self, data_dict):
# sparse embedding
sparse_emb_dict = OrderedDict((name, fluid.embedding(
input=fluid.layers.cast(
data_dict[name], dtype='int64'),
size=[
self.feat_dims_dict[name] + 1,
6 * int(pow(self.feat_dims_dict[name], 0.25))
],
is_sparse=self.is_sparse)) for name in self.sparse_feat_names)
# combine dense and sparse_emb
dense_input_list = [
data_dict[name] for name in data_dict if name.startswith('I')
]
sparse_emb_list = list(sparse_emb_dict.values())
sparse_input = fluid.layers.concat(sparse_emb_list, axis=-1)
sparse_input = fluid.layers.flatten(sparse_input)
dense_input = fluid.layers.concat(dense_input_list, axis=-1)
dense_input = fluid.layers.flatten(dense_input)
dense_input = fluid.layers.cast(dense_input, 'float32')
net_input = fluid.layers.concat([dense_input, sparse_input], axis=-1)
return net_input
"""
dataset and reader
"""
import math
import sys
import paddle.fluid.incubate.data_generator as dg
import pickle
from collections import Counter
import os
class CriteoDataset(dg.MultiSlotDataGenerator):
def setup(self, vocab_dir):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
5775, 257675, 65535, 969, 23159456, 431037, 56311, 6047, 29019, 11,
231, 4008, 7393
]
self.cont_diff_ = [
self.cont_max_[i] - self.cont_min_[i]
for i in range(len(self.cont_min_))
]
self.cont_idx_ = list(range(1, 14))
self.cat_idx_ = list(range(14, 40))
dense_feat_names = ['I' + str(i) for i in range(1, 14)]
sparse_feat_names = ['C' + str(i) for i in range(1, 27)]
target = ['label']
self.label_feat_names = target + dense_feat_names + sparse_feat_names
self.cat_feat_idx_dict_list = [{} for _ in range(26)]
for i in range(26):
lookup_idx = 1 # remain 0 for default value
for line in open(
os.path.join(vocab_dir, 'C' + str(i + 1) + '.txt')):
self.cat_feat_idx_dict_list[i][line.strip()] = lookup_idx
lookup_idx += 1
def _process_line(self, line):
features = line.rstrip('\n').split('\t')
label_feat_list = [[] for _ in range(40)]
for idx in self.cont_idx_:
if features[idx] == '':
label_feat_list[idx].append(0)
else:
# 0-1 minmax norm
# label_feat_list[idx].append((float(features[idx]) - self.cont_min_[idx - 1]) /
# self.cont_diff_[idx - 1])
# log transform
label_feat_list[idx].append(
math.log(4 + float(features[idx]))
if idx == 2 else math.log(1 + float(features[idx])))
for idx in self.cat_idx_:
if features[idx] == '' or features[
idx] not in self.cat_feat_idx_dict_list[idx - 14]:
label_feat_list[idx].append(0)
else:
label_feat_list[idx].append(self.cat_feat_idx_dict_list[
idx - 14][features[idx]])
label_feat_list[0].append(int(features[0]))
return label_feat_list
def test_reader(self, filelist, batch, buf_size):
print(filelist)
def local_iter():
for fname in filelist:
with open(fname.strip(), 'r') as fin:
for line in fin:
label_feat_list = self._process_line(line)
yield label_feat_list
        import paddle.fluid as fluid
        batch_iter = fluid.io.batch(
            fluid.io.buffered(
                local_iter, size=buf_size), batch_size=batch)
return batch_iter
def generate_sample(self, line):
def data_iter():
label_feat_list = self._process_line(line)
yield list(zip(self.label_feat_names, label_feat_list))
return data_iter
if __name__ == '__main__':
criteo_dataset = CriteoDataset()
if len(sys.argv) <= 1:
sys.stderr.write("feat_dict needed for criteo reader.")
exit(1)
criteo_dataset.setup(sys.argv[1])
criteo_dataset.run_from_stdin()
import sys
import paddle.fluid as fluid
import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
__all__ = ['check_version']
def check_version():
"""
Log error and exit when the installed version of paddlepaddle is
not satisfied.
"""
err = "PaddlePaddle version 1.6 or higher is required, " \
"or a suitable develop version is satisfied as well. \n" \
"Please make sure the version is good with your code." \
try:
fluid.require_version('1.6.0')
except Exception as e:
logger.error(err)
sys.exit(1)
# DeepFM for CTR Prediction
## Introduction
This model implementation reproduces the result of the paper "DeepFM: A Factorization-Machine based Neural Network for CTR Prediction" on the Criteo dataset.
```text
@inproceedings{guo2017deepfm,
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
pages={1725--1731},
year={2017}
}
```
## Environment
- **All models in PaddleRec now require PaddlePaddle 1.6 or higher, or a suitable develop version.**
## Download and preprocess data
We evaluate our DeepFM implementation on the Criteo dataset. The dataset was used for the [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/) hosted by Kaggle and includes 45 million users' click records. Each row contains the features of one ad impression; the first column is a label indicating whether the ad was clicked. There are 13 continuous features and 26 categorical ones.
To preprocess the raw dataset, we min-max normalize the continuous features to [0, 1] (a minimal sketch follows) and filter out categorical features that occur fewer than 10 times. The dataset is randomly split into two parts: 90% for training and the remaining 10% for testing.
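A minimal sketch of the min-max normalization step, assuming the per-feature minima and maxima were collected in a first pass over the data (helper and variable names are ours):
```python
def minmax_normalize(cont_feats, cont_min, cont_max):
    """cont_feats: the 13 raw continuous feature strings of one Criteo record."""
    out = []
    for v, lo, hi in zip(cont_feats, cont_min, cont_max):
        x = float(v) if v else 0.0                      # treat missing values as 0
        out.append((x - lo) / (hi - lo) if hi > lo else 0.0)
    return out                                          # every value now lies in [0, 1]
```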
Download and preprocess data:
```bash
cd data && python download_preprocess.py && cd ..
```
After executing these commands, three folders are generated: "train_data" (90% of the raw data), "test_data" (the remaining 10%) and "aid_data", which contains the generated feature dictionary "feat_dict_10.pkl2"; the dictionary can be inspected as sketched below.
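The feature dictionary is a plain pickled Python dict that maps feature keys to contiguous ids; it can be inspected like this (a sketch, assuming the default path from args.py):
```python
import pickle

with open('data/aid_data/feat_dict_10.pkl2', 'rb') as f:
    feat_dict = pickle.load(f)
print(len(feat_dict))    # roughly args.num_feat - 1 distinct feature ids
```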
## Local Train
```bash
nohup python local_train.py --model_output_dir models >> train_log 2>&1 &
```
## Local Infer
```bash
nohup python infer.py --model_output_dir models --test_epoch 1 >> infer_log 2>&1 &
```
Note: the last line of the log gives the total Logloss and AUC over all test data.
## Result
Reproducing this result requires training with the default hyperparameters, which are listed in `args.py`. With those defaults (10 threads, batch size 100, etc.), one pass over the training data takes about 1.8 hours on CPU.
After the 22nd training epoch, the test Logloss is `0.44797` and the test AUC is `0.8046`.
<p align="center">
<img src="./picture/deepfm_result.png" height=200 hspace='10'/> <br />
</p>
## Distributed Train
We emulate distributed training on a local machine. By default a 2 x 2 layout is used, i.e. 2 pservers and 2 trainers.
**Note: we suggest using Paddle >= 1.6 or [the latest Paddle](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/install/Tables.html#whl-dev) for distributed training.**
### Download and preprocess distributed demo dataset
This small demo dataset (a few lines from the Criteo dataset) is only meant to verify that distributed training runs.
```bash
cd dist_data && python dist_data_download.py && cd ..
```
### Distributed Train and Infer
Train
```bash
# This shell script does not support Windows
sh cluster_train.sh
```
Parameters of cluster_train.sh:
- train_data_dir: path of the training data
- model_output_dir: path where models are saved
- is_local: local or distributed training (set to 0 for distributed training)
- is_sparse: whether to use sparse updates in the embedding; defaults to False if not set
- role: role of the process (pserver or trainer)
- endpoints: ip:port of all pservers
- current_endpoint: ip:port of the current pserver (role should be pserver)
- trainers: the number of trainers

Other parameters are explained in cluster_train.py.
Infer
```bash
python infer.py --model_output_dir cluster_model --test_epoch 10 --num_feat 141443 --test_data_dir=dist_data/dist_test_data --feat_dict='dist_data/aid_data/feat_dict_10.pkl2'
```
Notes:
- **Proxies must be disabled**, e.g. unset http_proxy, unset https_proxy.
- The first trainer (with trainer_id 0) saves the model parameters.
- After each training run, the pserver processes should be stopped manually. You can list them with the command below:
>ps -ef | grep python
- We use the Dataset API to load data; it is currently only supported on Linux.
## Distributed Train with Fleet
Fleet is the high-level API for distributed training in PaddlePaddle. See the [DeepFM example](https://github.com/PaddlePaddle/Fleet/tree/develop/examples/deepFM) in the Fleet repo.
models/PaddleRec only provides Paddle implementations of classic recommendation algorithms. We have open-sourced the more capable toolkit [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec), which covers the full recommendation + distributed-training pipeline and offers high-level APIs that switch seamlessly between single-machine and distributed modes. New models and features will be released in the [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec) repo; models/PaddleRec is no longer maintained.
import argparse
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
parser.add_argument(
'--train_data_dir',
type=str,
default='data/train_data',
help='The path of train data (default: data/train_data)')
parser.add_argument(
'--test_data_dir',
type=str,
default='data/test_data',
        help='The path of test data (default: data/test_data)')
parser.add_argument(
'--feat_dict',
type=str,
default='data/aid_data/feat_dict_10.pkl2',
help='The path of feat_dict')
parser.add_argument(
'--batch_size',
type=int,
default=100,
help="The size of mini-batch (default:100)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_epoch',
type=int,
default=30,
help="The number of epochs to train (default: 10)")
parser.add_argument(
'--model_output_dir',
type=str,
required=True,
        help='The path for the model to be stored')
parser.add_argument(
'--num_thread',
type=int,
default=10,
help='The number of threads (default: 10)')
parser.add_argument('--test_epoch', type=str, default='1')
parser.add_argument(
'--layer_sizes',
nargs='+',
type=int,
default=[400, 400, 400],
help='The size of each layers (default: [400, 400, 400])')
parser.add_argument(
'--act',
type=str,
default='relu',
help='The activation of each layers (default: relu)')
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
        help='whether the embedding uses sparse updates (default: False)')
parser.add_argument(
'--lr', type=float, default=1e-4, help='Learning rate (default: 1e-4)')
parser.add_argument(
        '--reg', type=float, default=1e-4, help='Regularization coefficient (default: 1e-4)')
parser.add_argument('--num_field', type=int, default=39)
parser.add_argument('--num_feat', type=int, default=1086460) # 2090493
parser.add_argument(
'--enable_ce', action='store_true', help='If set, run the task with continuous evaluation logs.')
return parser.parse_args()
[143, 174, 214, 126, 27, 100, 74, 15, 83, 167, 87, 13, 90, 107, 1, 123, 76, 59, 44, 22, 203, 75, 216, 169, 101, 229, 63, 183, 112, 140, 91, 14, 115, 211, 227, 171, 51, 173, 137, 194, 223, 159, 168, 182, 208, 215, 7, 41, 120, 16, 77, 0, 220, 109, 166, 156, 29, 26, 95, 102, 196, 151, 98, 42, 163, 40, 114, 199, 35, 225, 179, 17, 62, 86, 149, 180, 133, 54, 170, 55, 68, 8, 99, 135, 181, 46, 134, 118, 201, 148, 210, 79, 25, 116, 38, 158, 141, 81, 37, 49, 39, 61, 34, 9, 150, 121, 65, 185, 213, 3, 11, 190, 20, 157, 108, 47, 24, 198, 104, 222, 127, 50, 4, 202, 142, 218, 48, 186, 32, 130, 85, 191, 53, 221, 224, 128, 33, 165, 172, 110, 69, 72, 152, 19, 88, 18, 119, 117, 111, 66, 177, 92, 106, 228, 212, 89, 195, 21, 113, 58, 43, 164, 138, 23, 70, 73, 178, 5, 122, 139, 97, 161, 162, 30, 136, 155, 93, 132, 52, 105, 80, 36, 10, 204, 45, 192, 125, 219, 209, 129, 124, 67, 176, 205, 154, 31, 60, 153, 146, 207, 56, 6, 71, 82, 217, 84, 226]
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress, download_file
if __name__ == '__main__':
url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
url2 = "https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2"
print("download and extract starting...")
download_file_and_uncompress(url)
download_file(url2, "./aid_data/feat_dict_10.pkl2", True)
print("download and extract finished")
print("preprocessing...")
os.system("python preprocess.py")
print("preprocess done")
shutil.rmtree("raw_data")
print("done")
import os
import numpy
from collections import Counter
import shutil
import pickle
def get_raw_data():
if not os.path.isdir('raw_data'):
os.mkdir('raw_data')
fin = open('train.txt', 'r')
fout = open('raw_data/part-0', 'w')
for line_idx, line in enumerate(fin):
if line_idx % 200000 == 0 and line_idx != 0:
fout.close()
cur_part_idx = int(line_idx / 200000)
fout = open('raw_data/part-' + str(cur_part_idx), 'w')
fout.write(line)
fout.close()
fin.close()
def split_data():
split_rate_ = 0.9
dir_train_file_idx_ = 'aid_data/train_file_idx.txt'
filelist_ = [
'raw_data/part-%d' % x for x in range(len(os.listdir('raw_data')))
]
if not os.path.exists(dir_train_file_idx_):
train_file_idx = list(
numpy.random.choice(
len(filelist_), int(len(filelist_) * split_rate_), False))
with open(dir_train_file_idx_, 'w') as fout:
fout.write(str(train_file_idx))
else:
with open(dir_train_file_idx_, 'r') as fin:
train_file_idx = eval(fin.read())
for idx in range(len(filelist_)):
if idx in train_file_idx:
shutil.move(filelist_[idx], 'train_data')
else:
shutil.move(filelist_[idx], 'test_data')
def get_feat_dict():
freq_ = 10
dir_feat_dict_ = 'aid_data/feat_dict_' + str(freq_) + '.pkl2'
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
if not os.path.exists(dir_feat_dict_):
# print('generate a feature dict')
# Count the number of occurrences of discrete features
feat_cnt = Counter()
with open('train.txt', 'r') as fin:
for line_idx, line in enumerate(fin):
if line_idx % 100000 == 0:
print('generating feature dict', line_idx / 45000000)
features = line.rstrip('\n').split('\t')
for idx in categorical_range_:
if features[idx] == '': continue
feat_cnt.update([features[idx]])
# Only retain discrete features with high frequency
dis_feat_set = set()
for feat, ot in feat_cnt.items():
if ot >= freq_:
dis_feat_set.add(feat)
# Create a dictionary for continuous and discrete features
feat_dict = {}
tc = 1
# Continuous features
for idx in continuous_range_:
feat_dict[idx] = tc
tc += 1
for feat in dis_feat_set:
feat_dict[feat] = tc
tc += 1
# Save dictionary
with open(dir_feat_dict_, 'wb') as fout:
pickle.dump(feat_dict, fout, protocol=2)
print('args.num_feat ', len(feat_dict) + 1)
if __name__ == '__main__':
if not os.path.isdir('train_data'):
os.mkdir('train_data')
if not os.path.isdir('test_data'):
os.mkdir('test_data')
if not os.path.isdir('aid_data'):
os.mkdir('aid_data')
get_raw_data()
split_data()
get_feat_dict()
print('Done!')
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress
if __name__ == '__main__':
url = "https://paddlerec.bj.bcebos.com/deepfm%2Fdist_data_demo.tar.gz"
print("download and extract starting...")
download_file_and_uncompress(url, savename="dist_data_demo.tar.gz")
print("download and extract finished")
print("preprocessing...")
os.system("python preprocess_dist.py")
print("preprocess done")
print("done")
import os
import numpy
from collections import Counter
import shutil
import pickle
SPLIT_RATIO = 0.9
INPUT_FILE = 'dist_data_demo.txt'
TRAIN_FILE = os.path.join('dist_train_data', 'tr')
TEST_FILE = os.path.join('dist_test_data', 'ev')
def split_data():
all_lines = []
for line in open(INPUT_FILE):
all_lines.append(line)
split_line_idx = int(len(all_lines) * SPLIT_RATIO)
with open(TRAIN_FILE, 'w') as f:
f.writelines(all_lines[:split_line_idx])
with open(TEST_FILE, 'w') as f:
f.writelines(all_lines[split_line_idx:])
def get_feat_dict():
freq_ = 10
dir_feat_dict_ = 'aid_data/feat_dict_' + str(freq_) + '.pkl2'
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
if not os.path.exists(dir_feat_dict_):
# print('generate a feature dict')
# Count the number of occurrences of discrete features
feat_cnt = Counter()
with open(INPUT_FILE, 'r') as fin:
for line_idx, line in enumerate(fin):
features = line.rstrip('\n').split('\t')
for idx in categorical_range_:
if features[idx] == '': continue
feat_cnt.update([features[idx]])
# Only retain discrete features with high frequency
# not filter low freq in small dataset
freq_ = 0
feat_set = set()
for feat, ot in feat_cnt.items():
if ot >= freq_:
feat_set.add(feat)
# Create a dictionary for continuous and discrete features
feat_dict = {}
tc = 1
# Continuous features
for idx in continuous_range_:
feat_dict[idx] = tc
tc += 1
for feat in feat_set:
feat_dict[feat] = tc
tc += 1
with open(dir_feat_dict_, 'wb') as fout:
pickle.dump(feat_dict, fout, protocol=2)
print('args.num_feat ', len(feat_dict) + 1)
if __name__ == '__main__':
if not os.path.isdir('dist_train_data'):
os.mkdir('dist_train_data')
if not os.path.isdir('dist_test_data'):
os.mkdir('dist_test_data')
if not os.path.isdir('aid_data'):
os.mkdir('aid_data')
split_data()
get_feat_dict()
print('Done!')
import paddle.fluid as fluid
import math
def ctr_deepfm_model(embedding_size,
num_field,
num_feat,
layer_sizes,
act,
reg,
is_sparse=False):
init_value_ = 0.1
raw_feat_idx = fluid.data(
name='feat_idx', shape=[None, num_field], dtype='int64')
raw_feat_value = fluid.data(
name='feat_value', shape=[None, num_field], dtype='float32')
label = fluid.data(
name='label', shape=[None, 1], dtype='float32') # None * 1
feat_idx = fluid.layers.reshape(raw_feat_idx,
[-1, 1]) # (None * num_field) * 1
feat_value = fluid.layers.reshape(
raw_feat_value, [-1, num_field, 1]) # None * num_field * 1
# -------------------- first order term --------------------
first_weights_re = fluid.embedding(
input=feat_idx,
is_sparse=is_sparse,
dtype='float32',
size=[num_feat + 1, 1],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_),
regularizer=fluid.regularizer.L1DecayRegularizer(reg)))
first_weights = fluid.layers.reshape(
first_weights_re, shape=[-1, num_field, 1]) # None * num_field * 1
y_first_order = fluid.layers.reduce_sum((first_weights * feat_value), 1)
# -------------------- second order term --------------------
feat_embeddings_re = fluid.embedding(
input=feat_idx,
is_sparse=is_sparse,
dtype='float32',
size=[num_feat + 1, embedding_size],
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_ / math.sqrt(float(embedding_size)))))
feat_embeddings = fluid.layers.reshape(
feat_embeddings_re,
shape=[-1, num_field,
embedding_size]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# sum_square part
summed_features_emb = fluid.layers.reduce_sum(feat_embeddings,
1) # None * embedding_size
summed_features_emb_square = fluid.layers.square(
summed_features_emb) # None * embedding_size
# square_sum part
squared_features_emb = fluid.layers.square(
feat_embeddings) # None * num_field * embedding_size
squared_sum_features_emb = fluid.layers.reduce_sum(
squared_features_emb, 1) # None * embedding_size
y_second_order = 0.5 * fluid.layers.reduce_sum(
summed_features_emb_square - squared_sum_features_emb, 1,
keep_dim=True) # None * 1
# -------------------- DNN --------------------
y_dnn = fluid.layers.reshape(feat_embeddings,
[-1, num_field * embedding_size])
for s in layer_sizes:
y_dnn = fluid.layers.fc(
input=y_dnn,
size=s,
act=act,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_ / math.sqrt(float(10)))),
bias_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)))
y_dnn = fluid.layers.fc(
input=y_dnn,
size=1,
act=None,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)),
bias_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
loc=0.0, scale=init_value_)))
# ------------------- DeepFM ------------------
predict = fluid.layers.sigmoid(y_first_order + y_second_order + y_dnn)
cost = fluid.layers.log_loss(input=predict, label=label)
batch_cost = fluid.layers.reduce_sum(cost)
# for auc
predict_2d = fluid.layers.concat([1 - predict, predict], 1)
label_int = fluid.layers.cast(label, 'int64')
auc_var, batch_auc_var, auc_states = fluid.layers.auc(input=predict_2d,
label=label_int,
slide_steps=0)
return batch_cost, auc_var, [raw_feat_idx, raw_feat_value,
label], auc_states
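# A minimal usage sketch (not part of the repo's training code): the values below are
# the defaults from args.py, and local_train.py remains the authoritative entry point.
#
#   import paddle.fluid as fluid
#   from network import ctr_deepfm_model
#
#   loss, auc_var, data_list, auc_states = ctr_deepfm_model(
#       embedding_size=10, num_field=39, num_feat=1086460,
#       layer_sizes=[400, 400, 400], act='relu', reg=1e-4)
#   fluid.optimizer.Adam(learning_rate=1e-4).minimize(loss)
#
#   exe = fluid.Executor(fluid.CPUPlace())
#   exe.run(fluid.default_startup_program())
#   # data_list defines the feed order: [raw_feat_idx, raw_feat_value, label]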
import sys
import paddle.fluid as fluid
import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
__all__ = ['check_version']
def check_version():
"""
Log error and exit when the installed version of paddlepaddle is
not satisfied.
"""
err = "PaddlePaddle version 1.6 or higher is required, " \
"or a suitable develop version is satisfied as well. \n" \
"Please make sure the version is good with your code." \
try:
fluid.require_version('1.6.0')
except Exception as e:
logger.error(err)
sys.exit(1)
[156, 51, 24, 103, 195, 35, 188, 16, 224, 173, 116, 3, 226, 11, 64, 94, 6, 70, 197, 164, 220, 77, 172, 194, 227, 12, 65, 129, 39, 38, 75, 210, 215, 36, 46, 185, 76, 222, 108, 78, 120, 71, 33, 189, 135, 97, 90, 219, 105, 205, 136, 167, 106, 29, 157, 125, 217, 121, 175, 143, 200, 45, 179, 37, 86, 140, 225, 47, 20, 228, 4, 209, 177, 178, 171, 58, 48, 118, 9, 149, 55, 192, 82, 17, 43, 54, 93, 96, 159, 216, 18, 206, 223, 104, 132, 182, 60, 109, 28, 180, 44, 166, 128, 27, 163, 141, 229, 102, 150, 7, 83, 198, 41, 191, 114, 117, 122, 161, 130, 174, 176, 160, 201, 49, 112, 69, 165, 95, 133, 92, 59, 110, 151, 203, 67, 169, 21, 66, 80, 22, 23, 152, 40, 127, 111, 186, 72, 26, 190, 42, 0, 63, 53, 124, 137, 85, 126, 196, 187, 208, 98, 25, 15, 170, 193, 168, 202, 31, 146, 147, 113, 32, 204, 131, 68, 84, 213, 19, 81, 79, 162, 199, 107, 50, 2, 207, 10, 181, 144, 139, 134, 62, 155, 142, 214, 212, 61, 52, 101, 99, 158, 145, 13, 153, 56, 184, 221]
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress, download_file
if __name__ == '__main__':
url = "https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz"
url2 = "https://paddlerec.bj.bcebos.com/deepfm%2Ffeat_dict_10.pkl2"
print("download and extract starting...")
download_file_and_uncompress(url)
if not os.path.exists("aid_data"):
os.makedirs("aid_data")
download_file(url2, "./aid_data/feat_dict_10.pkl2", True)
print("download and extract finished")
print("preprocessing...")
os.system("python preprocess.py")
print("preprocess done")
shutil.rmtree("raw_data")
print("done")
from __future__ import division
import os
import numpy
from collections import Counter
import shutil
import pickle
def get_raw_data(input_file, raw_data, ins_per_file):
if not os.path.isdir(raw_data):
os.mkdir(raw_data)
    fin = open(input_file, 'r')
fout = open(os.path.join(raw_data, 'part-0'), 'w')
for line_idx, line in enumerate(fin):
if line_idx % ins_per_file == 0 and line_idx != 0:
fout.close()
cur_part_idx = int(line_idx / ins_per_file)
fout = open(
os.path.join(raw_data, 'part-' + str(cur_part_idx)), 'w')
fout.write(line)
fout.close()
fin.close()
def split_data(raw_data, aid_data, train_data, test_data):
split_rate_ = 0.9
dir_train_file_idx_ = os.path.join(aid_data, 'train_file_idx.txt')
filelist_ = [
os.path.join(raw_data, 'part-%d' % x)
for x in range(len(os.listdir(raw_data)))
]
if not os.path.exists(dir_train_file_idx_):
train_file_idx = list(
numpy.random.choice(
len(filelist_), int(len(filelist_) * split_rate_), False))
with open(dir_train_file_idx_, 'w') as fout:
fout.write(str(train_file_idx))
else:
with open(dir_train_file_idx_, 'r') as fin:
train_file_idx = eval(fin.read())
for idx in range(len(filelist_)):
if idx in train_file_idx:
shutil.move(filelist_[idx], train_data)
else:
shutil.move(filelist_[idx], test_data)
def get_feat_dict(input_file, aid_data, print_freq=100000, total_ins=45000000):
freq_ = 10
dir_feat_dict_ = os.path.join(aid_data, 'feat_dict_' + str(freq_) + '.pkl2')
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
if not os.path.exists(dir_feat_dict_):
# print('generate a feature dict')
# Count the number of occurrences of discrete features
feat_cnt = Counter()
        with open(input_file, 'r') as fin:
for line_idx, line in enumerate(fin):
if line_idx % print_freq == 0:
print(r'generating feature dict {:.2f} %'.format((
line_idx / total_ins) * 100))
features = line.rstrip('\n').split('\t')
for idx in categorical_range_:
if features[idx] == '': continue
feat_cnt.update([features[idx]])
# Only retain discrete features with high frequency
dis_feat_set = set()
for feat, ot in feat_cnt.items():
if ot >= freq_:
dis_feat_set.add(feat)
# Create a dictionary for continuous and discrete features
feat_dict = {}
tc = 1
# Continuous features
for idx in continuous_range_:
feat_dict[idx] = tc
tc += 1
for feat in dis_feat_set:
feat_dict[feat] = tc
tc += 1
# Save dictionary
with open(dir_feat_dict_, 'wb') as fout:
pickle.dump(feat_dict, fout, protocol=2)
print('args.num_feat ', len(feat_dict) + 1)
def preprocess(input_file,
outdir,
ins_per_file,
total_ins=None,
print_freq=None):
train_data = os.path.join(outdir, "train_data")
test_data = os.path.join(outdir, "test_data")
aid_data = os.path.join(outdir, "aid_data")
raw_data = os.path.join(outdir, "raw_data")
if not os.path.isdir(train_data):
os.mkdir(train_data)
if not os.path.isdir(test_data):
os.mkdir(test_data)
if not os.path.isdir(aid_data):
os.mkdir(aid_data)
if print_freq is None:
print_freq = 10 * ins_per_file
get_raw_data(input_file, raw_data, ins_per_file)
split_data(raw_data, aid_data, train_data, test_data)
get_feat_dict(input_file, aid_data, print_freq, total_ins)
print('Done!')
if __name__ == '__main__':
preprocess('train.txt', './', 200000, 45000000)
import math
import paddle
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Linear, Embedding
class DeepFM(fluid.dygraph.Layer):
class DeepFM(paddle.nn.Layer):
def __init__(self, args):
super(DeepFM, self).__init__()
self.args = args
@@ -14,9 +13,9 @@ class DeepFM(fluid.dygraph.Layer):
self.dnn = DNN(args)
def forward(self, raw_feat_idx, raw_feat_value, label):
feat_idx = fluid.layers.reshape(raw_feat_idx,
feat_idx = paddle.fluid.layers.reshape(raw_feat_idx,
[-1, 1]) # (None * num_field) * 1
feat_value = fluid.layers.reshape(
feat_value = paddle.fluid.layers.reshape(
raw_feat_value,
[-1, self.args.num_field, 1]) # None * num_field * 1
@@ -24,31 +23,31 @@ class DeepFM(fluid.dygraph.Layer):
feat_value)
y_dnn = self.dnn(feat_embeddings)
predict = fluid.layers.sigmoid(y_first_order + y_second_order + y_dnn)
predict = paddle.nn.functional.sigmoid(y_first_order + y_second_order + y_dnn)
return predict
class FM(fluid.dygraph.Layer):
class FM(paddle.nn.Layer):
def __init__(self, args):
super(FM, self).__init__()
self.args = args
self.init_value_ = 0.1
self.embedding_w = Embedding(
self.embedding_w = paddle.fluid.dygraph.nn.Embedding(
size=[self.args.num_feat + 1, 1],
dtype='float32',
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
param_attr=paddle.ParamAttr(
initializer=paddle.nn.initializer.TruncatedNormal(
loc=0.0, scale=self.init_value_),
regularizer=fluid.regularizer.L1DecayRegularizer(
regularizer=paddle.fluid.regularizer.L1DecayRegularizer(
self.args.reg)))
self.embedding = Embedding(
self.embedding = paddle.fluid.dygraph.nn.Embedding(
size=[self.args.num_feat + 1, self.args.embedding_size],
dtype='float32',
padding_idx=0,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
param_attr=paddle.ParamAttr(
initializer=paddle.nn.initializer.TruncatedNormal(
loc=0.0,
scale=self.init_value_ /
math.sqrt(float(self.args.embedding_size)))))
@@ -56,32 +55,32 @@ class FM(fluid.dygraph.Layer):
def forward(self, feat_idx, feat_value):
# -------------------- first order term --------------------
first_weights_re = self.embedding_w(feat_idx)
first_weights = fluid.layers.reshape(
first_weights = paddle.fluid.layers.reshape(
first_weights_re,
shape=[-1, self.args.num_field, 1]) # None * num_field * 1
y_first_order = fluid.layers.reduce_sum(first_weights * feat_value, 1)
y_first_order = paddle.reduce_sum(first_weights * feat_value, 1)
# -------------------- second order term --------------------
feat_embeddings_re = self.embedding(feat_idx)
feat_embeddings = fluid.layers.reshape(
feat_embeddings = paddle.fluid.layers.reshape(
feat_embeddings_re,
shape=[-1, self.args.num_field, self.args.embedding_size
]) # None * num_field * embedding_size
feat_embeddings = feat_embeddings * feat_value # None * num_field * embedding_size
# sum_square part
summed_features_emb = fluid.layers.reduce_sum(
summed_features_emb = paddle.reduce_sum(
feat_embeddings, 1) # None * embedding_size
summed_features_emb_square = fluid.layers.square(
summed_features_emb_square = paddle.square(
summed_features_emb) # None * embedding_size
# square_sum part
squared_features_emb = fluid.layers.square(
squared_features_emb = paddle.square(
feat_embeddings) # None * num_field * embedding_size
squared_sum_features_emb = fluid.layers.reduce_sum(
squared_sum_features_emb = paddle.reduce_sum(
squared_features_emb, 1) # None * embedding_size
y_second_order = 0.5 * fluid.layers.reduce_sum(
y_second_order = 0.5 * paddle.reduce_sum(
summed_features_emb_square - squared_sum_features_emb,
1,
keep_dim=True) # None * 1
@@ -89,7 +88,7 @@ class FM(fluid.dygraph.Layer):
return y_first_order, y_second_order, feat_embeddings
class DNN(fluid.dygraph.Layer):
class DNN(paddle.nn.Layer):
def __init__(self, args):
super(DNN, self).__init__()
self.args = args
@@ -101,25 +100,29 @@ class DNN(fluid.dygraph.Layer):
self.init_value_ / math.sqrt(float(10))
for _ in range(len(self.args.layer_sizes))
] + [self.init_value_]
self.linears = []
self._layers = []
for i in range(len(self.args.layer_sizes) + 1):
linear = Linear(
sizes[i],
sizes[i + 1],
act=acts[i],
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
linear = paddle.nn.Linear(
in_features=sizes[i],
out_features=sizes[i + 1],
weight_attr=paddle.ParamAttr(
initializer=paddle.nn.initializer.TruncatedNormal(
loc=0.0, scale=w_scales[i])),
bias_attr=fluid.ParamAttr(
initializer=fluid.initializer.TruncatedNormalInitializer(
bias_attr=paddle.ParamAttr(
initializer=paddle.nn.initializer.TruncatedNormal(
loc=0.0, scale=self.init_value_)))
#linear = getattr(paddle.nn.functional, acts[i])(linear) if acts[i] else linear
if acts[i] == 'relu':
act = paddle.nn.ReLU()
self.add_sublayer('act_%d' % i, act)
self.add_sublayer('linear_%d' % i, linear)
self.linears.append(linear)
self._layers.append(linear)
self._layers.append(act)
def forward(self, feat_embeddings):
y_dnn = fluid.layers.reshape(
y_dnn = paddle.fluid.layers.reshape(
feat_embeddings,
[-1, self.args.num_field * self.args.embedding_size])
for linear in self.linears:
y_dnn = linear(y_dnn)
for n_layer in self._layers:
y_dnn = n_layer(y_dnn)
return y_dnn
@@ -3,9 +3,8 @@ from __future__ import print_function
import os
import numpy as np
import paddle.fluid as fluid
import paddle
import time
from paddle.fluid.dygraph.base import to_variable
import logging
import data_reader
@@ -19,134 +18,135 @@ logger = logging.getLogger(__name__)
def train(args):
if args.use_gpu:
place = fluid.CUDAPlace(0)
place = paddle.CUDAPlace(0)
else:
place = fluid.CPUPlace()
with fluid.dygraph.guard(place):
deepfm = DeepFM(args)
train_filelist = [
os.path.join(args.train_data_dir, x)
for x in os.listdir(args.train_data_dir)
]
test_filelist = [
os.path.join(args.test_data_dir, x)
for x in os.listdir(args.test_data_dir)
]
train_reader = data_reader.data_reader(
args.batch_size, train_filelist, args.feat_dict, data_type="train")
test_reader = data_reader.data_reader(
args.batch_size, test_filelist, args.feat_dict, data_type="test")
def eval(epoch):
deepfm.eval()
logger.info("start eval model.")
total_step = 0.0
auc_metric_test = fluid.metrics.Auc("ROC")
for data in test_reader():
total_step += 1
raw_feat_idx, raw_feat_value, label = zip(*data)
raw_feat_idx = np.array(raw_feat_idx, dtype=np.int64)
raw_feat_value = np.array(raw_feat_value, dtype=np.float32)
label = np.array(label, dtype=np.int64)
raw_feat_idx, raw_feat_value, label = [
to_variable(i)
for i in [raw_feat_idx, raw_feat_value, label]
]
predict = deepfm(raw_feat_idx, raw_feat_value, label)
place = paddle.CPUPlace()
paddle.disable_static(place)
deepfm = DeepFM(args)
train_filelist = [
os.path.join(args.train_data_dir, x)
for x in os.listdir(args.train_data_dir)
]
test_filelist = [
os.path.join(args.test_data_dir, x)
for x in os.listdir(args.test_data_dir)
]
train_reader = data_reader.data_reader(
args.batch_size, train_filelist, args.feat_dict, data_type="train")
test_reader = data_reader.data_reader(
args.batch_size, test_filelist, args.feat_dict, data_type="test")
def eval(epoch):
deepfm.eval()
logger.info("start eval model.")
total_step = 0.0
auc_metric_test = paddle.fluid.metrics.Auc("ROC")
for data in test_reader():
total_step += 1
raw_feat_idx, raw_feat_value, label = zip(*data)
raw_feat_idx = np.array(raw_feat_idx, dtype=np.int64)
raw_feat_value = np.array(raw_feat_value, dtype=np.float32)
label = np.array(label, dtype=np.int64)
raw_feat_idx, raw_feat_value, label = [
paddle.to_tensor(data=i, dtype=None, place=None, stop_gradient=True)
for i in [raw_feat_idx, raw_feat_value, label]
]
predict = deepfm(raw_feat_idx, raw_feat_value, label)
# for auc
predict_2d = fluid.layers.concat([1 - predict, predict], 1)
auc_metric_test.update(
preds=predict_2d.numpy(), labels=label.numpy())
predict_2d = paddle.concat(x=[1 - predict, predict], axis=1)
auc_metric_test.update(
preds=predict_2d.numpy(), labels=label.numpy())
logger.info("test auc of epoch %d is %.6f" %
(epoch, auc_metric_test.eval()))
logger.info("test auc of epoch %d is %.6f" %
(epoch, auc_metric_test.eval()))
optimizer = fluid.optimizer.Adam(
parameter_list=deepfm.parameters(),
regularization=fluid.regularizer.L2DecayRegularizer(args.reg))
optimizer = paddle.optimizer.Adam(
parameters=deepfm.parameters(),
weight_decay=paddle.fluid.regularizer.L2DecayRegularizer(args.reg))
# load model if exists
start_epoch = 0
if args.checkpoint:
model_dict, optimizer_dict = fluid.dygraph.load_dygraph(
args.checkpoint)
deepfm.set_dict(model_dict)
optimizer.set_dict(optimizer_dict)
start_epoch = int(
os.path.basename(args.checkpoint).split("_")[
-1]) + 1 # get next train epoch
logger.info("load model {} finished.".format(args.checkpoint))
for epoch in range(start_epoch, args.num_epoch):
begin = time.time()
batch_begin = time.time()
batch_id = 0
total_loss = 0.0
auc_metric = fluid.metrics.Auc("ROC")
logger.info("training epoch {} start.".format(epoch))
for data in train_reader():
raw_feat_idx, raw_feat_value, label = zip(*data)
raw_feat_idx = np.array(raw_feat_idx, dtype=np.int64)
raw_feat_value = np.array(raw_feat_value, dtype=np.float32)
label = np.array(label, dtype=np.int64)
raw_feat_idx, raw_feat_value, label = [
to_variable(i)
for i in [raw_feat_idx, raw_feat_value, label]
]
predict = deepfm(raw_feat_idx, raw_feat_value, label)
loss = fluid.layers.log_loss(
input=predict,
label=fluid.layers.cast(
label, dtype="float32"))
batch_loss = fluid.layers.reduce_sum(loss)
total_loss += batch_loss.numpy().item()
batch_loss.backward()
optimizer.minimize(batch_loss)
deepfm.clear_gradients()
start_epoch = 0
if args.checkpoint:
model_dict, optimizer_dict = paddle.fluid.dygraph.load_dygraph(
args.checkpoint)
deepfm.set_dict(model_dict)
optimizer.set_dict(optimizer_dict)
start_epoch = int(
os.path.basename(args.checkpoint).split("_")[
-1]) + 1 # get next train epoch
logger.info("load model {} finished.".format(args.checkpoint))
for epoch in range(start_epoch, args.num_epoch):
begin = time.time()
batch_begin = time.time()
batch_id = 0
total_loss = 0.0
auc_metric = paddle.fluid.metrics.Auc("ROC")
logger.info("training epoch {} start.".format(epoch))
for data in train_reader():
raw_feat_idx, raw_feat_value, label = zip(*data)
raw_feat_idx = np.array(raw_feat_idx, dtype=np.int64)
raw_feat_value = np.array(raw_feat_value, dtype=np.float32)
label = np.array(label, dtype=np.int64)
raw_feat_idx, raw_feat_value, label = [
paddle.to_tensor(data=i, dtype=None, place=None, stop_gradient=True)
for i in [raw_feat_idx, raw_feat_value, label]
]
predict = deepfm(raw_feat_idx, raw_feat_value, label)
loss = paddle.nn.functional.log_loss(
input=predict,
label=paddle.cast(
label, dtype="float32"))
batch_loss = paddle.reduce_sum(loss)
total_loss += batch_loss.numpy().item()
batch_loss.backward()
optimizer.minimize(batch_loss)
deepfm.clear_gradients()
# for auc
predict_2d = fluid.layers.concat([1 - predict, predict], 1)
auc_metric.update(
preds=predict_2d.numpy(), labels=label.numpy())
if batch_id > 0 and batch_id % 100 == 0:
logger.info(
"epoch: {}, batch_id: {}, loss: {:.6f}, auc: {:.6f}, speed: {:.2f} ins/s".
format(epoch, batch_id, total_loss / args.batch_size /
100,
auc_metric.eval(), 100 * args.batch_size / (
time.time() - batch_begin)))
batch_begin = time.time()
total_loss = 0.0
batch_id += 1
logger.info("epoch %d is finished and takes %f s" %
(epoch, time.time() - begin))
predict_2d = paddle.concat(x=[1 - predict, predict], axis=1)
auc_metric.update(
preds=predict_2d.numpy(), labels=label.numpy())
if batch_id > 0 and batch_id % 100 == 0:
logger.info(
"epoch: {}, batch_id: {}, loss: {:.6f}, auc: {:.6f}, speed: {:.2f} ins/s".
format(epoch, batch_id, total_loss / args.batch_size /
100,
auc_metric.eval(), 100 * args.batch_size / (
time.time() - batch_begin)))
batch_begin = time.time()
total_loss = 0.0
batch_id += 1
logger.info("epoch %d is finished and takes %f s" %
(epoch, time.time() - begin))
# save model and optimizer
logger.info("going to save epoch {} model and optimizer.".format(
epoch))
fluid.dygraph.save_dygraph(
deepfm.state_dict(),
model_path=os.path.join(args.model_output_dir,
"epoch_" + str(epoch)))
fluid.dygraph.save_dygraph(
optimizer.state_dict(),
model_path=os.path.join(args.model_output_dir,
"epoch_" + str(epoch)))
logger.info("save epoch {} finished.".format(epoch))
logger.info("going to save epoch {} model and optimizer.".format(
epoch))
paddle.fluid.dygraph.save_dygraph(
deepfm.state_dict(),
model_path=os.path.join(args.model_output_dir,
"epoch_" + str(epoch)))
paddle.fluid.dygraph.save_dygraph(
optimizer.state_dict(),
model_path=os.path.join(args.model_output_dir,
"epoch_" + str(epoch)))
logger.info("save epoch {} finished.".format(epoch))
# eval model
deepfm.eval()
eval(epoch)
deepfm.train()
deepfm.eval()
eval(epoch)
deepfm.train()
paddle.enable_static()
if __name__ == '__main__':
......
# DIN
Below is a brief directory structure and description of this example:
```text
.
├── README.md          # documentation
├── train.py           # training script
├── infer.py           # inference script
├── network.py         # network structure
├── reader.py          # data reading functions
├── data/
    ├── build_dataset.py # converts the text data into Paddle-format data
    ├── convert_pd.py    # converts the raw data into a pandas DataFrame
    ├── data_process.sh  # data preprocessing script
    ├── remap_id.py      # remaps category ids
```
models/PaddleRec only provides Paddle implementations of classic recommendation algorithms. We have open-sourced the more capable toolkit [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec), which covers the full recommendation + distributed-training pipeline and offers high-level APIs that switch seamlessly between single-machine and distributed modes. New models and features will be released in the [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec) repo; models/PaddleRec is no longer maintained.
## Introduction
......@@ -26,104 +11,3 @@ DIN通过一个兴趣激活模块(Activation Unit),用预估目标Candidate AD
权重高的历史行为表明这部分兴趣和当前广告相关,权重低的则是和广告无关的”兴趣噪声“。我们通过将激活的商品和激活权重相乘,然后累加起来作为当前预估目标ADs相关的兴趣状态表达。
最后我们将这相关的用户兴趣表达、用户静态特征和上下文相关特征,以及ad相关的特征拼接起来,输入到后续的多层DNN网络,最后预测得到用户对当前目标ADs的点击概率。
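For intuition, here is a minimal NumPy sketch of the activation-unit weighting described above (illustrative only: the function and argument names are hypothetical, and the actual implementation is the `din_attention` function in `network.py`):
```python
import numpy as np

def din_interest(hist_emb, target_emb, atten_mlp):
    # hist_emb:   [T, H] embeddings of the user's historically clicked items
    # target_emb: [H]    embedding of the candidate ad
    # atten_mlp:  any callable mapping [T, 4H] -> [T] unnormalized activation weights
    target = np.tile(target_emb, (hist_emb.shape[0], 1))   # repeat the target for every step
    feats = np.concatenate(
        [hist_emb, target, hist_emb - target, hist_emb * target], axis=1)
    weights = atten_mlp(feats)                              # one activation weight per item
    return (weights[:, None] * hist_emb).sum(axis=0)        # weighted sum = interest vector
```
The repository's `din_attention` additionally adds a large negative mask at padded positions and normalizes the scaled scores with a softmax before taking the weighted sum.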
## Data Download and Preprocessing
* Step 1: Run the following command to download the [Amazon Product dataset](http://jmcauley.ucsd.edu/data/amazon/) and preprocess it
```
cd data && sh data_process.sh && cd ..
```
If a package (for example pandas) is reported as missing during execution, install it with the following command.
```
pip install pandas
```
**On Windows, please download and extract the data manually. Download links: [reviews_Electronics](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Electronics_5.json.gz) and [meta_Electronics](http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Electronics.json.gz).**
* Step 2: Generate the training set, the test set and the config file
```
python build_dataset.py
```
After running, three files are generated under the data folder: config.txt, paddle_test.txt and paddle_train.txt.
An example of the data format:
```
3737 19450;288 196;18486;674;1
3647 4342 6855 3805;281 463 558 674;4206;463;1
1805 4309;87 87;21354;556;1
18209 20753;649 241;51924;610;0
13150;351;41455;792;1
35120 40418;157 714;52035;724;0
```
Each line is one sample consisting of five semicolon-separated fields. The first two fields are the sequence of historically interacted items and their categories, the third and fourth fields are the item to be predicted and its category, and the last field is the label indicating whether it was clicked.
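For illustration, one such line could be split into its fields as follows (a minimal sketch; the helper name `parse_sample` is hypothetical, and the real batching logic lives in `reader.py`):
```python
def parse_sample(line):
    hist, hist_cate, item, cate, label = line.strip().split(';')
    return (list(map(int, hist.split())),       # historical item ids
            list(map(int, hist_cate.split())),  # their category ids
            int(item), int(cate), int(label))   # target item, target category, click label

# parse_sample("3737 19450;288 196;18486;674;1")
# -> ([3737, 19450], [288, 196], 18486, 674, 1)
```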
## Training
The available command-line options can be listed with
```
python train.py -h
```
Single-machine single-GPU training
``` bash
CUDA_VISIBLE_DEVICES=1 python -u train.py --config_path 'data/config.txt' --train_dir 'data/paddle_train.txt' --batch_size 32 --epoch_num 100 --use_cuda 1 > log.txt 2>&1 &
```
Single-machine CPU training
``` bash
python -u train.py --config_path 'data/config.txt' --train_dir 'data/paddle_train.txt' --batch_size 32 --epoch_num 100 --use_cuda 0 > log.txt 2>&1 &
```
Note that the single-card training above can be accelerated with the Parallel Executor by adding --parallel 1.
Single-machine multi-GPU training
``` bash
CUDA_VISIBLE_DEVICES=0,1 python -u train.py --config_path 'data/config.txt' --train_dir 'data/paddle_train.txt' --batch_size 32 --epoch_num 100 --use_cuda 1 --parallel 1 --num_devices 2 > log.txt 2>&1 &
```
Single-machine multi-core CPU training
``` bash
CPU_NUM=10 python -u train.py --config_path 'data/config.txt' --train_dir 'data/paddle_train.txt' --batch_size 32 --epoch_num 100 --use_cuda 0 --parallel 1 --num_devices 10 > log.txt 2>&1 &
```
## Sample Training Output
The training log on a single Tesla K40m GPU is shown below (actual output may vary):
```text
2019-02-22 09:31:51,578 - INFO - reading data begins
2019-02-22 09:32:22,407 - INFO - reading data completes
W0222 09:32:24.151955 7221 device_context.cc:263] Please NOTE: device: 0, CUDA Capability: 35, Driver API Version: 9.0, Runtime API Version: 8.0
W0222 09:32:24.152046 7221 device_context.cc:271] device: 0, cuDNN Version: 7.0.
2019-02-22 09:32:27,797 - INFO - train begins
epoch: 1 global_step: 1000 train_loss: 0.6950 time: 14.64
epoch: 1 global_step: 2000 train_loss: 0.6854 time: 15.41
epoch: 1 global_step: 3000 train_loss: 0.6799 time: 14.84
...
model saved in din_amazon/global_step_50000
...
```
Tips:
* On a single machine with the default hyper-parameters, the global step that yields the best AUC is roughly between 440000 and 500000.
* Training beyond a certain number of epochs leads to slight overfitting.
## Inference
Start inference with a command like the following,
where model_path is the path to the saved model and test_path is the path to the test data.
```
CUDA_VISIBLE_DEVICES=3 python infer.py --model_path 'din_amazon/global_step_400000' --test_path 'data/paddle_test.txt' --use_cuda 1
```
## Sample Inference Output
```text
2019-02-22 11:22:58,804 - INFO - TEST --> loss: [0.47005194] auc:0.863794952818
```
# this file is only used for continuous evaluation test!
import os
import sys
sys.path.append(os.environ['ceroot'])
from kpi import CostKpi
from kpi import DurationKpi
each_pass_duration_card1_kpi = DurationKpi(
'each_pass_duration_card1', 0.08, 0, actived=True)
train_loss_card1_kpi = CostKpi('train_loss_card1', 0.08, 0)
each_pass_duration_card4_kpi = DurationKpi(
'each_pass_duration_card4', 0.08, 0, actived=True)
train_loss_card4_kpi = CostKpi('train_loss_card4', 0.08, 0)
tracking_kpis = [
each_pass_duration_card1_kpi,
train_loss_card1_kpi,
each_pass_duration_card4_kpi,
train_loss_card4_kpi,
]
def parse_log(log):
'''
This method should be implemented by model developers.
The suggestion:
each line in the log should be key, value, for example:
"
train_cost\t1.0
test_cost\t1.0
train_cost\t1.0
train_cost\t1.0
train_acc\t1.2
"
'''
for line in log.split('\n'):
fs = line.strip().split('\t')
print(fs)
if len(fs) == 3 and fs[0] == 'kpis':
kpi_name = fs[1]
kpi_value = float(fs[2])
yield kpi_name, kpi_value
def log_to_ce(log):
kpi_tracker = {}
for kpi in tracking_kpis:
kpi_tracker[kpi.name] = kpi
for (kpi_name, kpi_value) in parse_log(log):
print(kpi_name, kpi_value)
kpi_tracker[kpi_name].add_record(kpi_value)
kpi_tracker[kpi_name].persist()
if __name__ == '__main__':
log = sys.stdin.read()
log_to_ce(log)
from __future__ import print_function
import random
import pickle
random.seed(1234)
print("read and process data")
with open('./raw_data/remap.pkl', 'rb') as f:
reviews_df = pickle.load(f)
cate_list = pickle.load(f)
user_count, item_count, cate_count, example_count = pickle.load(f)
train_set = []
test_set = []
for reviewerID, hist in reviews_df.groupby('reviewerID'):
pos_list = hist['asin'].tolist()
def gen_neg():
neg = pos_list[0]
while neg in pos_list:
neg = random.randint(0, item_count - 1)
return neg
neg_list = [gen_neg() for i in range(len(pos_list))]
for i in range(1, len(pos_list)):
hist = pos_list[:i]
if i != len(pos_list) - 1:
train_set.append((reviewerID, hist, pos_list[i], 1))
train_set.append((reviewerID, hist, neg_list[i], 0))
else:
label = (pos_list[i], neg_list[i])
test_set.append((reviewerID, hist, label))
random.shuffle(train_set)
random.shuffle(test_set)
assert len(test_set) == user_count
def print_to_file(data, fout):
for i in range(len(data)):
fout.write(str(data[i]))
if i != len(data) - 1:
fout.write(' ')
else:
fout.write(';')
print("make train data")
with open("paddle_train.txt", "w") as fout:
for line in train_set:
history = line[1]
target = line[2]
label = line[3]
cate = [cate_list[x] for x in history]
print_to_file(history, fout)
print_to_file(cate, fout)
fout.write(str(target) + ";")
fout.write(str(cate_list[target]) + ";")
fout.write(str(label) + "\n")
print("make test data")
with open("paddle_test.txt", "w") as fout:
for line in test_set:
history = line[1]
target = line[2]
cate = [cate_list[x] for x in history]
print_to_file(history, fout)
print_to_file(cate, fout)
fout.write(str(target[0]) + ";")
fout.write(str(cate_list[target[0]]) + ";")
fout.write("1\n")
print_to_file(history, fout)
print_to_file(cate, fout)
fout.write(str(target[1]) + ";")
fout.write(str(cate_list[target[1]]) + ";")
fout.write("0\n")
print("make config data")
with open('config.txt', 'w') as f:
f.write(str(user_count) + "\n")
f.write(str(item_count) + "\n")
f.write(str(cate_count) + "\n")
from __future__ import print_function
import pickle
import pandas as pd
def to_df(file_path):
with open(file_path, 'r') as fin:
df = {}
i = 0
for line in fin:
df[i] = eval(line)
i += 1
df = pd.DataFrame.from_dict(df, orient='index')
return df
print("start to analyse reviews_Electronics_5.json")
reviews_df = to_df('./raw_data/reviews_Electronics_5.json')
with open('./raw_data/reviews.pkl', 'wb') as f:
pickle.dump(reviews_df, f, pickle.HIGHEST_PROTOCOL)
print("start to analyse meta_Electronics.json")
meta_df = to_df('./raw_data/meta_Electronics.json')
meta_df = meta_df[meta_df['asin'].isin(reviews_df['asin'].unique())]
meta_df = meta_df.reset_index(drop=True)
with open('./raw_data/meta.pkl', 'wb') as f:
pickle.dump(meta_df, f, pickle.HIGHEST_PROTOCOL)
#! /bin/bash
set -e
echo "begin download data"
mkdir raw_data
cd raw_data
wget -c http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Electronics_5.json.gz
gzip -d reviews_Electronics_5.json.gz
wget -c http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Electronics.json.gz
gzip -d meta_Electronics.json.gz
echo "download data successfully"
cd ..
python convert_pd.py
python remap_id.py
from __future__ import print_function
import random
import pickle
import numpy as np
random.seed(1234)
with open('./raw_data/reviews.pkl', 'rb') as f:
reviews_df = pickle.load(f)
reviews_df = reviews_df[['reviewerID', 'asin', 'unixReviewTime']]
with open('./raw_data/meta.pkl', 'rb') as f:
meta_df = pickle.load(f)
meta_df = meta_df[['asin', 'categories']]
meta_df['categories'] = meta_df['categories'].map(lambda x: x[-1][-1])
def build_map(df, col_name):
key = sorted(df[col_name].unique().tolist())
m = dict(zip(key, range(len(key))))
df[col_name] = df[col_name].map(lambda x: m[x])
return m, key
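# e.g. build_map(meta_df, 'asin') relabels every distinct asin in the column to a dense id in
# [0, num_unique) (in sorted order) and returns the mapping dict plus the sorted original keys.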
asin_map, asin_key = build_map(meta_df, 'asin')
cate_map, cate_key = build_map(meta_df, 'categories')
revi_map, revi_key = build_map(reviews_df, 'reviewerID')
user_count, item_count, cate_count, example_count =\
len(revi_map), len(asin_map), len(cate_map), reviews_df.shape[0]
print('user_count: %d\titem_count: %d\tcate_count: %d\texample_count: %d' %
(user_count, item_count, cate_count, example_count))
meta_df = meta_df.sort_values('asin')
meta_df = meta_df.reset_index(drop=True)
reviews_df['asin'] = reviews_df['asin'].map(lambda x: asin_map[x])
reviews_df = reviews_df.sort_values(['reviewerID', 'unixReviewTime'])
reviews_df = reviews_df.reset_index(drop=True)
reviews_df = reviews_df[['reviewerID', 'asin', 'unixReviewTime']]
cate_list = [meta_df['categories'][i] for i in range(len(asin_map))]
cate_list = np.array(cate_list, dtype=np.int32)
with open('./raw_data/remap.pkl', 'wb') as f:
pickle.dump(reviews_df, f, pickle.HIGHEST_PROTOCOL) # uid, iid
pickle.dump(cate_list, f, pickle.HIGHEST_PROTOCOL) # cid of iid line
pickle.dump((user_count, item_count, cate_count, example_count), f,
pickle.HIGHEST_PROTOCOL)
pickle.dump((asin_key, cate_key, revi_key), f, pickle.HIGHEST_PROTOCOL)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import paddle.fluid as fluid
def din_attention(hist, target_expand, mask):
"""activation weight"""
hidden_size = hist.shape[-1]
concat = fluid.layers.concat(
[hist, target_expand, hist - target_expand, hist * target_expand],
axis=2)
atten_fc1 = fluid.layers.fc(name="atten_fc1",
input=concat,
size=80,
act="sigmoid",
num_flatten_dims=2)
atten_fc2 = fluid.layers.fc(name="atten_fc2",
input=atten_fc1,
size=40,
act="sigmoid",
num_flatten_dims=2)
atten_fc3 = fluid.layers.fc(name="atten_fc3",
input=atten_fc2,
size=1,
num_flatten_dims=2)
atten_fc3 += mask
atten_fc3 = fluid.layers.transpose(x=atten_fc3, perm=[0, 2, 1])
atten_fc3 = fluid.layers.scale(x=atten_fc3, scale=hidden_size**-0.5)
weight = fluid.layers.softmax(atten_fc3)
out = fluid.layers.matmul(weight, hist)
out = fluid.layers.reshape(x=out, shape=[0, hidden_size])
return out
def network(item_count, cat_count):
"""network definition"""
seq_len = -1
item_emb_size = 64
cat_emb_size = 64
is_sparse = False
#significant for speeding up the training process
item_emb_attr = fluid.ParamAttr(name="item_emb")
cat_emb_attr = fluid.ParamAttr(name="cat_emb")
hist_item_seq = fluid.data(
name="hist_item_seq", shape=[None, seq_len], dtype="int64")
hist_cat_seq = fluid.data(
name="hist_cat_seq", shape=[None, seq_len], dtype="int64")
target_item = fluid.data(name="target_item", shape=[None], dtype="int64")
target_cat = fluid.data(name="target_cat", shape=[None], dtype="int64")
label = fluid.data(name="label", shape=[None, 1], dtype="float32")
mask = fluid.data(name="mask", shape=[None, seq_len, 1], dtype="float32")
target_item_seq = fluid.data(
name="target_item_seq", shape=[None, seq_len], dtype="int64")
target_cat_seq = fluid.data(
name="target_cat_seq", shape=[None, seq_len], dtype="int64")
hist_item_emb = fluid.embedding(
input=hist_item_seq,
size=[item_count, item_emb_size],
param_attr=item_emb_attr,
is_sparse=is_sparse)
hist_cat_emb = fluid.embedding(
input=hist_cat_seq,
size=[cat_count, cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=is_sparse)
target_item_emb = fluid.embedding(
input=target_item,
size=[item_count, item_emb_size],
param_attr=item_emb_attr,
is_sparse=is_sparse)
target_cat_emb = fluid.embedding(
input=target_cat,
size=[cat_count, cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=is_sparse)
target_item_seq_emb = fluid.embedding(
input=target_item_seq,
size=[item_count, item_emb_size],
param_attr=item_emb_attr,
is_sparse=is_sparse)
target_cat_seq_emb = fluid.embedding(
input=target_cat_seq,
size=[cat_count, cat_emb_size],
param_attr=cat_emb_attr,
is_sparse=is_sparse)
item_b = fluid.embedding(
input=target_item,
size=[item_count, 1],
param_attr=fluid.initializer.Constant(value=0.0))
hist_seq_concat = fluid.layers.concat([hist_item_emb, hist_cat_emb], axis=2)
target_seq_concat = fluid.layers.concat(
[target_item_seq_emb, target_cat_seq_emb], axis=2)
target_concat = fluid.layers.concat(
[target_item_emb, target_cat_emb], axis=1)
out = din_attention(hist_seq_concat, target_seq_concat, mask)
out_fc = fluid.layers.fc(name="out_fc",
input=out,
size=item_emb_size + cat_emb_size,
num_flatten_dims=1)
embedding_concat = fluid.layers.concat([out_fc, target_concat], axis=1)
fc1 = fluid.layers.fc(name="fc1",
input=embedding_concat,
size=80,
act="sigmoid")
fc2 = fluid.layers.fc(name="fc2", input=fc1, size=40, act="sigmoid")
fc3 = fluid.layers.fc(name="fc3", input=fc2, size=1)
logit = fc3 + item_b
loss = fluid.layers.sigmoid_cross_entropy_with_logits(x=logit, label=label)
avg_loss = fluid.layers.mean(loss)
return avg_loss, fluid.layers.sigmoid(logit), \
[hist_item_seq, hist_cat_seq, target_item, \
target_cat, label, mask, target_item_seq, target_cat_seq]
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os
import random
import numpy as np
import paddle
import pickle
def pad_batch_data(input, max_len):
res = np.array([x + [0] * (max_len - len(x)) for x in input])
res = res.astype("int64").reshape([-1, max_len])
return res
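# e.g. pad_batch_data([[1, 2], [3]], 3) -> int64 array [[1, 2, 0], [3, 0, 0]] (illustrative)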
def make_data(b):
max_len = max(len(x[0]) for x in b)
item = pad_batch_data([x[0] for x in b], max_len)
cat = pad_batch_data([x[1] for x in b], max_len)
len_array = [len(x[0]) for x in b]
mask = np.array(
[[0] * x + [-1e9] * (max_len - x) for x in len_array]).reshape(
[-1, max_len, 1])
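    # mask is 0.0 at valid positions and -1e9 at padded ones; it is added to the attention
    # logits in network.din_attention so padded steps get ~zero weight after the softmax.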
target_item_seq = np.array(
[[x[2]] * max_len for x in b]).astype("int64").reshape([-1, max_len])
target_cat_seq = np.array(
[[x[3]] * max_len for x in b]).astype("int64").reshape([-1, max_len])
res = []
for i in range(len(b)):
res.append([
item[i], cat[i], b[i][2], b[i][3], b[i][4], mask[i],
target_item_seq[i], target_cat_seq[i]
])
return res
def batch_reader(reader, batch_size, group_size):
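    # Collects group_size samples, sorts them by history length, then cuts the sorted group into
    # batches so that sequences of similar length are padded together (less wasted padding).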
def batch_reader():
bg = []
for line in reader:
bg.append(line)
if len(bg) == group_size:
sortb = sorted(bg, key=lambda x: len(x[0]), reverse=False)
bg = []
for i in range(0, group_size, batch_size):
b = sortb[i:i + batch_size]
yield make_data(b)
len_bg = len(bg)
if len_bg != 0:
sortb = sorted(bg, key=lambda x: len(x[0]), reverse=False)
bg = []
remain = len_bg % batch_size
for i in range(0, len_bg - remain, batch_size):
b = sortb[i:i + batch_size]
yield make_data(b)
return batch_reader
def base_read(file_dir):
res = []
max_len = 0
with open(file_dir, "r") as fin:
for line in fin:
line = line.strip().split(';')
hist = line[0].split()
cate = line[1].split()
max_len = max(max_len, len(hist))
res.append([hist, cate, line[2], line[3], float(line[4])])
return res, max_len
def prepare_reader(data_path, bs):
data_set, max_len = base_read(data_path)
random.shuffle(data_set)
return batch_reader(data_set, bs, bs * 20), max_len
def config_read(config_path):
with open(config_path, "r") as fin:
user_count = int(fin.readline().strip())
item_count = int(fin.readline().strip())
cat_count = int(fin.readline().strip())
return user_count, item_count, cat_count
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
"""
    CriteoDataset: inherits MultiSlotDataGenerator and implements data reading.
Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
features = line.rstrip('\n').split('\t')
dense_feature = []
sparse_feature = []
for idx in continuous_range_:
if features[idx] == "":
dense_feature.append(0.0)
else:
dense_feature.append(
(float(features[idx]) - cont_min_[idx - 1]) /
cont_diff_[idx - 1])
for idx in categorical_range_:
sparse_feature.append(
[hash(str(idx) + features[idx]) % hash_dim_])
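                # NOTE (assumption worth checking for your environment): Python 3 randomizes the
                # built-in str hash() per process unless PYTHONHASHSEED is fixed, so this feature
                # hashing is only stable within one run or with a fixed seed.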
label = [int(features[0])]
process_line = dense_feature, sparse_feature, label
feature_name = ["dense_feature"]
for idx in categorical_range_:
feature_name.append("C" + str(idx - 13))
feature_name.append("label")
yield zip(feature_name, [dense_feature] + sparse_feature + [label])
return reader
d = CriteoDataset()
d.run_from_stdin()
wget --no-check-certificate https://fleet.bj.bcebos.com/ctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
mv ./raw_data ./train_data_full
mkdir train_data && cd train_data
cp ../train_data_full/part-0 ../train_data_full/part-1 ./ && cd ..
mv ./test_data ./test_data_full
mkdir test_data && cd test_data
cp ../test_data_full/part-220 ./ && cd ..
echo "Complete data download."
echo "Full Train data stored in ./train_data_full "
echo "Full Test data stored in ./test_data_full "
echo "Rapid Verification train data stored in ./train_data "
echo "Rapid Verification test data stored in ./test_data "
#!/usr/bin/python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# There are 13 integer features and 26 categorical features
continous_features = range(1, 14)
categorial_features = range(14, 40)
continous_clip = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
class CriteoDataset(object):
def __init__(self, sparse_feature_dim):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
]
self.cont_diff_ = [
20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
]
self.hash_dim_ = sparse_feature_dim
# here, training data are lines with line_index < train_idx_
self.train_idx_ = 41256555
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
def _reader_creator(self, file_list, is_train, trainer_num, trainer_id):
def reader():
for file in file_list:
with open(file, 'r') as f:
line_idx = 0
for line in f:
line_idx += 1
features = line.rstrip('\n').split('\t')
dense_feature = []
sparse_feature = []
for idx in self.continuous_range_:
if features[idx] == '':
dense_feature.append(0.0)
else:
dense_feature.append(
(float(features[idx]) -
self.cont_min_[idx - 1]) /
self.cont_diff_[idx - 1])
for idx in self.categorical_range_:
sparse_feature.append([
hash(str(idx) + features[idx]) % self.hash_dim_
])
label = [int(features[0])]
yield [dense_feature] + sparse_feature + [label]
return reader
def train(self, file_list, trainer_num, trainer_id):
return self._reader_creator(file_list, True, trainer_num, trainer_id)
def test(self, file_list):
return self._reader_creator(file_list, False, 1, 0)
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
import time
import six
import numpy as np
import logging
import argparse
import paddle
import paddle.fluid as fluid
from network_conf import CTR
import feed_generator as generator
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle CTR-DNN example")
# -------------Data & Model Path-------------
parser.add_argument(
'--test_files_path',
type=str,
default='./test_data',
help="The path of testing dataset")
parser.add_argument(
'--model_path',
type=str,
default='models',
help='The path for model to store (default: models)')
# -------------Running parameter-------------
parser.add_argument(
'--batch_size',
type=int,
default=1000,
help="The size of mini-batch (default:1000)")
parser.add_argument(
'--infer_epoch',
type=int,
default=0,
help='Specify which epoch to run infer')
# -------------Network parameter-------------
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--sparse_feature_dim',
type=int,
default=1000001,
help='sparse feature hashing space for index processing')
parser.add_argument(
'--dense_feature_dim', type=int, default=13, help='dense feature shape')
# -------------device parameter-------------
parser.add_argument(
'--is_local',
type=int,
default=0,
help='Local train or distributed train (default: 1)')
parser.add_argument(
'--is_cloud',
type=int,
default=0,
help='Local train or distributed train on paddlecloud (default: 0)')
return parser.parse_args()
def print_arguments(args):
"""
print arguments
"""
logger.info('----------- Configuration Arguments -----------')
for arg, value in sorted(six.iteritems(vars(args))):
logger.info('%s: %s' % (arg, value))
logger.info('------------------------------------------------')
def run_infer(args, model_path):
place = fluid.CPUPlace()
train_generator = generator.CriteoDataset(args.sparse_feature_dim)
file_list = [
os.path.join(args.test_files_path, x)
for x in os.listdir(args.test_files_path)
]
test_reader = fluid.io.batch(
train_generator.test(file_list), batch_size=args.batch_size)
startup_program = fluid.framework.Program()
test_program = fluid.framework.Program()
ctr_model = CTR()
def set_zero():
auc_states_names = [
'_generated_var_0', '_generated_var_1', '_generated_var_2',
'_generated_var_3'
]
for name in auc_states_names:
param = fluid.global_scope().var(name).get_tensor()
if param:
param_array = np.zeros(param._get_dims()).astype("int64")
param.set(param_array, place)
with fluid.framework.program_guard(test_program, startup_program):
with fluid.unique_name.guard():
inputs = ctr_model.input_data(args)
loss, auc_var = ctr_model.net(inputs, args)
exe = fluid.Executor(place)
feeder = fluid.DataFeeder(feed_list=inputs, place=place)
if args.is_cloud:
fluid.io.load_persistables(
executor=exe,
dirname=model_path,
main_program=fluid.default_main_program())
elif args.is_local:
fluid.load(fluid.default_main_program(),
os.path.join(model_path, "checkpoint"), exe)
set_zero()
run_index = 0
infer_auc = 0
L = []
for batch_id, data in enumerate(test_reader()):
loss_val, auc_val = exe.run(test_program,
feed=feeder.feed(data),
fetch_list=[loss, auc_var])
run_index += 1
infer_auc = auc_val
L.append(loss_val / args.batch_size)
if batch_id % 100 == 0:
logger.info("TEST --> batch: {} loss: {} auc: {}".format(
batch_id, loss_val / args.batch_size, auc_val))
infer_loss = np.mean(L)
infer_result = {}
infer_result['loss'] = infer_loss
infer_result['auc'] = infer_auc
log_path = os.path.join(model_path, 'infer_result.log')
logger.info(str(infer_result))
with open(log_path, 'w+') as f:
f.write(str(infer_result))
logger.info("Inference complete")
return infer_result
if __name__ == "__main__":
import paddle
paddle.enable_static()
args = parse_args()
print_arguments(args)
model_list = []
for _, dir, _ in os.walk(args.model_path):
for model in dir:
if "epoch" in model and args.infer_epoch == int(
model.split('_')[-1]):
path = os.path.join(args.model_path, model)
model_list.append(path)
if len(model_list) == 0:
logger.info(
"There is no satisfactory model {} at path {}, please check your start command & env. ".
format(str("epoch_") + str(args.infer_epoch), args.model_path))
for model in model_list:
logger.info("Test model {}".format(model))
run_infer(args, model)
#!/bin/bash
echo "WARNING: This script only for run Paddle Paddle CTR distribute training locally"
if [ ! -d "./models" ]; then
mkdir ./models
echo "Create model folder for store infer model"
fi
if [ ! -d "./log" ]; then
mkdir ./log
echo "Create log floder for store running log"
fi
if [ ! -d "./output" ]; then
mkdir ./output
echo "Create output floder"
fi
# kill existing server process
ps -ef|grep python|awk '{print $2}'|xargs kill -9
# environment variables for fleet distribute training
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=2
export OUTPUT_PATH="output"
export FLAGS_communicator_thread_pool_size=10
export FLAGS_communicator_fake_rpc=0
export FLAGS_communicator_is_sgd_optimizer=0
# the following values should match cpu_num
export FLAGS_communicator_send_queue_size=2
export FLAGS_communicator_max_merge_var_num=2
export FLAGS_rpc_retry_times=3
export PADDLE_PSERVERS_IP_PORT_LIST="127.0.0.1:36011,127.0.0.1:36012"
export PADDLE_PSERVER_PORT_ARRAY=(36011 36012)
export PADDLE_PSERVER_NUMS=2
export PADDLE_TRAINERS=2
export TRAINING_ROLE=PSERVER
export GLOG_v=0
export GLOG_logtostderr=1
train_mode=$1
for((i=0;i<$PADDLE_PSERVER_NUMS;i++))
do
cur_port=${PADDLE_PSERVER_PORT_ARRAY[$i]}
echo "PADDLE WILL START PSERVER "$cur_port
export PADDLE_PORT=${cur_port}
export POD_IP=127.0.0.1
python -u train.py --save_model=1 --is_cloud=1 --cpu_num=2 &> ./log/pserver.$i.log &
done
export TRAINING_ROLE=TRAINER
export GLOG_v=0
export GLOG_logtostderr=1
for((i=0;i<$PADDLE_TRAINERS;i++))
do
echo "PADDLE WILL START Trainer "$i
PADDLE_TRAINER_ID=$i
python -u train.py --save_model=1 --is_cloud=1 --cpu_num=2 &> ./log/trainer.$i.log &
done
echo "Training log stored in ./log/"
#!/usr/bin/python
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import math
class CTR(object):
"""
DNN for Click-Through Rate prediction
"""
def input_data(self, args):
dense_input = fluid.data(name="dense_input",
shape=[-1, args.dense_feature_dim],
dtype="float32")
sparse_input_ids = [
fluid.data(name="C" + str(i),
shape=[-1, 1],
lod_level=1,
dtype="int64") for i in range(1, 27)
]
label = fluid.data(name="label", shape=[-1, 1], dtype="int64")
inputs = [dense_input] + sparse_input_ids + [label]
return inputs
def net(self, inputs, args):
def embedding_layer(input):
return fluid.layers.embedding(
input=input,
is_sparse=True,
size=[args.sparse_feature_dim, args.embedding_size],
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()),
)
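        # All 26 sparse slots reuse the parameter name "SparseFeatFactors", so they share a single
        # embedding table of shape [sparse_feature_dim, embedding_size].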
sparse_embed_seq = list(map(embedding_layer, inputs[1:-1]))
concated = fluid.layers.concat(sparse_embed_seq + inputs[0:1], axis=1)
fc1 = fluid.layers.fc(
input=concated,
size=400,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(concated.shape[1]))),
)
fc2 = fluid.layers.fc(
input=fc1,
size=400,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc1.shape[1]))),
)
fc3 = fluid.layers.fc(
input=fc2,
size=400,
act="relu",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc2.shape[1]))),
)
predict = fluid.layers.fc(
input=fc3,
size=2,
act="softmax",
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))),
)
cost = fluid.layers.cross_entropy(input=predict, label=inputs[-1])
avg_cost = fluid.layers.reduce_sum(cost)
auc_var, _, _ = fluid.layers.auc(input=predict,
label=inputs[-1],
num_thresholds=2**12,
slide_steps=20)
return avg_cost, auc_var
#!/usr/bin/python
# -*- coding=utf-8 -*-
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import argparse
import logging
import os
import six
import time
import random
import numpy as np
import paddle
import paddle.fluid as fluid
from network_conf import CTR
import utils
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(
description="PaddlePaddle CTR-DNN example")
# -------------Data & Model Path-------------
parser.add_argument(
'--train_files_path',
type=str,
default='./train_data',
help="The path of training dataset")
parser.add_argument(
'--test_files_path',
type=str,
default='./test_data',
help="The path of testing dataset")
parser.add_argument(
'--model_path',
type=str,
default='models',
help='The path for model to store (default: models)')
# -------------Training parameter-------------
parser.add_argument(
'--learning_rate',
type=float,
default=1e-4,
help="Initial learning rate for training")
parser.add_argument(
'--batch_size',
type=int,
default=1000,
help="The size of mini-batch (default:1000)")
parser.add_argument(
"--epochs",
type=int,
default=1,
help="Number of epochs for training.")
# -------------Network parameter-------------
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--sparse_feature_dim',
type=int,
default=1000001,
help='sparse feature hashing space for index processing')
parser.add_argument(
'--dense_feature_dim',
type=int,
default=13,
help='dense feature shape')
# -------------device parameter-------------
parser.add_argument(
'--is_local',
type=int,
default=0,
help='Local train or distributed train (default: 1)')
parser.add_argument(
'--is_cloud',
type=int,
default=0,
help='Local train or distributed train on paddlecloud (default: 0)')
parser.add_argument(
'--save_model',
type=int,
default=0,
help='Save training model or not')
parser.add_argument(
'--cpu_num',
type=int,
default=2,
help='threads for ctr training')
return parser.parse_args()
def print_arguments(args):
"""
print arguments
"""
logger.info('----------- Configuration Arguments -----------')
for arg, value in sorted(six.iteritems(vars(args))):
logger.info('%s: %s' % (arg, value))
logger.info('------------------------------------------------')
def get_dataset(inputs, args):
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command("python dataset_generator.py")
dataset.set_batch_size(args.batch_size)
thread_num = int(args.cpu_num)
dataset.set_thread(thread_num)
file_list = [
os.path.join(args.train_files_path, x) for x in os.listdir(args.train_files_path)
]
    # Make sure every training node holds a different set of training files.
    # When simulating distributed training with local multi-processing, each process needs its own files.
    # fleet.split_files conveniently assigns training files to nodes by node index.
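    # Illustrative example (an assumption, not taken from this repo): with files [part-0 .. part-3]
    # and two trainers, fleet.split_files might give trainer 0 [part-0, part-2] and trainer 1
    # [part-1, part-3]; the exact assignment depends on the Fleet version.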
if int(args.is_cloud):
file_list = fleet.split_files(file_list)
logger.info("file list: {}".format(file_list))
return dataset, file_list
def local_train(args):
    # build the model network
ctr_model = CTR()
inputs = ctr_model.input_data(args)
avg_cost, auc_var = ctr_model.net(inputs, args)
    # choose the optimizer for the backward update
optimizer = fluid.optimizer.Adam(args.learning_rate)
optimizer.minimize(avg_cost)
    # create the executor on CPU and initialize the parameters
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
    # create the training dataset and the training file list
dataset, file_list = get_dataset(inputs, args)
logger.info("Training Begin")
for epoch in range(args.epochs):
        # shuffle at file granularity
random.shuffle(file_list)
dataset.set_filelist(file_list)
        # use train_from_dataset for multi-threaded concurrent training
start_time = time.time()
exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=[auc_var],
fetch_info=["Epoch {} auc ".format(epoch)],
print_period=100,
debug=False)
end_time = time.time()
logger.info("epoch %d finished, use time=%d\n" %
((epoch), end_time - start_time))
if args.save_model:
model_path = os.path.join(
str(args.model_path), "epoch_" + str(epoch))
if not os.path.isdir(model_path):
os.mkdir(model_path)
fluid.save(fluid.default_main_program(),
os.path.join(model_path, "checkpoint"))
logger.info("Train Success!")
def distribute_train(args):
    # Determine the role this machine/process plays in distributed training from environment variables,
    # then initialize the node with the fleet API's init() method.
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
    # The distributed mode can be further configured via DistributeTranspilerConfig.
    # Below we enable asynchronous (async) training and split parameters across the pserver nodes.
strategy = DistributeTranspilerConfig()
strategy.sync_mode = False
strategy.runtime_split_send_recv = True
ctr_model = CTR()
inputs = ctr_model.input_data(args)
avg_cost, auc_var = ctr_model.net(inputs, args)
    # configure the distributed optimizer with the chosen strategy and build the program
optimizer = fluid.optimizer.Adam(args.learning_rate)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(avg_cost)
    # run different logic depending on the node role
if fleet.is_server():
        # initialize and run the parameter server node
fleet.init_server()
fleet.run_server()
elif fleet.is_worker():
        # initialize the worker node
fleet.init_worker()
exe = fluid.Executor(fluid.CPUPlace())
        # run fleet.startup_program, which contains the distributed startup logic
exe.run(fleet.startup_program)
dataset, file_list = get_dataset(inputs, args)
for epoch in range(args.epochs):
            # shuffle at file granularity
random.shuffle(file_list)
dataset.set_filelist(file_list)
            # the trainer runs fleet.main_program, which has been pruned for distributed training
start_time = time.time()
exe.train_from_dataset(program=fleet.main_program,
dataset=dataset,
fetch_list=[auc_var],
fetch_info=["Epoch {} auc ".format(epoch)],
print_period=100,
debug=False)
end_time = time.time()
logger.info("epoch %d finished, use time=%d\n" %
((epoch), end_time - start_time))
            # by default, worker 0 saves the model
if args.save_model and fleet.is_first_worker():
model_path = os.path.join(str(args.model_path), "epoch_" +
str(epoch))
fleet.save_persistables(executor=exe, dirname=model_path)
fleet.stop_worker()
logger.info("Distribute Train Success!")
def train():
args = parse_args()
if not os.path.isdir(args.model_path):
os.mkdir(args.model_path)
print_arguments(args)
if args.is_cloud:
logger.info("run cloud training")
distribute_train(args)
elif args.is_local:
logger.info("run local training")
local_train(args)
if __name__ == '__main__':
import paddle
paddle.enable_static()
utils.check_version()
train()
import sys
import paddle.fluid as fluid
import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
__all__ = ['check_version']
def check_version():
"""
Log error and exit when the installed version of paddlepaddle is
not satisfied.
"""
err = "PaddlePaddle version 1.6.1 or higher is required, " \
"or a suitable develop version is satisfied as well. \n" \
"Please make sure the version is good with your code." \
try:
fluid.require_version('1.6.1')
except Exception as e:
logger.error(err)
sys.exit(1)
# wide&deep
The following is a brief overview of this example's directory structure:
```
├── README.md            # documentation
├── requirements.txt     # required packages
├── net.py               # wide&deep network definition
├── utils.py             # common utilities
├── args.py              # argument parsing
├── create_data.sh       # script that generates the training data
├── data_preparation.py  # data preprocessing script
├── train.py             # training script
├── infer.py             # inference script
├── train_gpu.sh         # GPU training shell script
├── train_cpu.sh         # CPU training shell script
├── infer_gpu.sh         # GPU inference shell script
├── infer_cpu.sh         # CPU inference shell script
```
models/PaddleRec only provides Paddle implementations of classic recommendation algorithms. We have open-sourced the more powerful toolkit [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec), which covers the whole pipeline of recommendation algorithms plus distributed training and provides high-level APIs for switching seamlessly between single-machine and distributed training. New models and features will be released in the [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec) repo, and models/PaddleRec is no longer maintained.
## Introduction
......
1. Effectiveness: in online A/B tests on Google Play, the wide&deep model improved the app acquisition rate by +3.9% over a highly optimized wide-only shallow model, and it also outperformed a deep-only model.
2. Performance: by splitting the batch of candidate apps per request into smaller batches and scoring them in parallel with multiple threads, the per-request latency dropped from 31 ms to 14 ms.
This example implements wide&deep on PaddlePaddle and validates it on the open Census-income dataset; the average acc and auc on the test set are:
> mean_acc: 0.76195
>
> mean_auc: 0.90577
## Data Download and Preprocessing
Data links:
[adult.data](https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data)
[adult.test](https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test)
Set the file paths in the create_data.sh script and run it.
```sh
mkdir train_data
mkdir test_data
mkdir data
train_path="data/adult.data" #原始训练数据
test_path="data/adult.test" #原始测试数据
train_data_path="train_data/train_data.csv" #处理后的训练数据
test_data_path="test_data/train_data.csv" #处理后的测试数据
pip install -r requirements.txt #安装必需包
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
--train_data_path ${train_data_path}\
--test_data_path ${test_data_path}
```
## Environment
PaddlePaddle 1.7.0
python3.7
## Single-machine Training
GPU environment
Set the data paths and parameters in the train_gpu.sh script.
```sh
CUDA_VISIBLE_DEVICES=0 python train.py --epochs 40 \ # number of training epochs
                                       --batch_size 40 \ # batch size
                                       --use_gpu 1 \ # train on GPU
                                       --train_data_path 'train_data/train_data.csv' \ # training data
                                       --model_dir 'model_dir' \ # model save path
                                       --hidden1_units 75 \ # hidden layer sizes of the deep part
                                       --hidden2_units 50 \
                                       --hidden3_units 25
```
Make the script executable and run it
```sh
./train_gpu.sh
```
CPU environment
Set the data paths and parameters in the train_cpu.sh script.
```sh
python train.py --epochs 40 \ # number of training epochs
                --batch_size 40 \ # batch size
                --use_gpu 0 \ # train on CPU
                --train_data_path 'train_data/train_data.csv' \ # training data
                --model_dir 'model_dir' \ # model save path
                --hidden1_units 75 \ # hidden layer sizes of the deep part
                --hidden2_units 50 \
                --hidden3_units 25
```
Make the script executable and run it
```
./train_cpu.sh
```
## Single-machine Inference
GPU environment
Set the data paths and parameters in the infer_gpu.sh script.
```sh
CUDA_VISIBLE_DEVICES=0 python infer.py --batch_size 40 \ # batch size
                                       --use_gpu 1 \ # infer on GPU
                                       --test_epoch 39 \ # which epoch's model to load
                                       --test_data_path 'test_data/test_data.csv' \ # test data
                                       --model_dir 'model_dir' \ # model path
                                       --hidden1_units 75 \ # hidden layer sizes
                                       --hidden2_units 50 \
                                       --hidden3_units 25
```
Make the script executable and run it
```sh
./infer_gpu.sh
```
CPU environment
Set the data paths and parameters in the infer_cpu.sh script.
```sh
python infer.py --batch_size 40 \ # batch size
                --use_gpu 0 \ # infer on CPU
                --test_epoch 39 \ # which epoch's model to load
                --test_data_path 'test_data/test_data.csv' \ # test data
                --model_dir 'model_dir' \ # model path
                --hidden1_units 75 \ # hidden layer sizes
                --hidden2_units 50 \
                --hidden3_units 25
```
Make the script executable and run it
```
./infer_cpu.sh
```
## Results
Results on the test set:
```
W0422 11:44:50.891095 1573 device_context.cc:237] Please NOTE: device: 0, CUDA Capability: 70, Driver API Version: 9.2, Runtime API Version: 9.0
W0422 11:44:50.895593 1573 device_context.cc:245] device: 0, cuDNN Version: 7.3.
2020-04-22 11:44:52,236-INFO: batch_id: 0, batch_time: 0.00613s, acc: 0.72500, auc: 0.92790
2020-04-22 11:44:52,242-INFO: batch_id: 1, batch_time: 0.00467s, acc: 0.80000, auc: 0.93356
2020-04-22 11:44:52,247-INFO: batch_id: 2, batch_time: 0.00462s, acc: 0.82500, auc: 0.93372
2020-04-22 11:44:52,252-INFO: batch_id: 3, batch_time: 0.00445s, acc: 0.75000, auc: 0.94198
2020-04-22 11:44:52,257-INFO: batch_id: 4, batch_time: 0.00449s, acc: 0.67500, auc: 0.93222
2020-04-22 11:44:52,262-INFO: batch_id: 5, batch_time: 0.00444s, acc: 0.80000, auc: 0.92254
......
2020-04-22 11:44:54,439-INFO: batch_id: 400, batch_time: 0.00507s, acc: 0.80000, auc: 0.90650
2020-04-22 11:44:54,445-INFO: batch_id: 401, batch_time: 0.00512s, acc: 0.67500, auc: 0.90658
2020-04-22 11:44:54,452-INFO: batch_id: 402, batch_time: 0.00591s, acc: 0.72500, auc: 0.90638
2020-04-22 11:44:54,458-INFO: batch_id: 403, batch_time: 0.00518s, acc: 0.80000, auc: 0.90634
2020-04-22 11:44:54,464-INFO: batch_id: 404, batch_time: 0.00513s, acc: 0.72500, auc: 0.90619
2020-04-22 11:44:54,470-INFO: batch_id: 405, batch_time: 0.00497s, acc: 0.77500, auc: 0.90597
2020-04-22 11:44:54,476-INFO: batch_id: 406, batch_time: 0.00554s, acc: 0.77500, auc: 0.90606
2020-04-22 11:44:54,481-INFO: batch_id: 407, batch_time: 0.00471s, acc: 0.00000, auc: 0.90608
2020-04-22 11:44:54,481-INFO: mean_acc:0.76195, mean_auc:0.90577
```
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import distutils.util
import sys
def parse_args():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--epochs", type=int, default=40, help="epochs")
parser.add_argument("--batch_size", type=int, default=40, help="batch_size")
parser.add_argument('--use_gpu', type=int, default=0, help='whether using gpu')
parser.add_argument('--test_epoch', type=str, default='39',help='test_epoch')
parser.add_argument('--train_path', type=str, default='data/adult.data', help='train_path')
parser.add_argument('--test_path', type=str, default='data/adult.test', help='test_path')
parser.add_argument('--train_data_path', type=str, default='train_data/train_data.csv', help='train_data_path')
parser.add_argument('--test_data_path', type=str, default='test_data/test_data.csv', help='test_data_path')
parser.add_argument('--model_dir', type=str, default='model_dir', help='test_data_path')
parser.add_argument('--hidden1_units', type=int, default=75, help='hidden1_units')
parser.add_argument('--hidden2_units', type=int, default=50, help='hidden2_units')
parser.add_argument('--hidden3_units', type=int, default=25, help='hidden3_units')
args = parser.parse_args()
return args
mkdir train_data
mkdir test_data
mkdir data
train_path="data/adult.data"
test_path="data/adult.test"
train_data_path="train_data/train_data.csv"
test_data_path="test_data/test_data.csv"
pip install -r requirements.txt
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
wget -P data/ https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test
python data_preparation.py --train_path ${train_path} \
--test_path ${test_path} \
--train_data_path ${train_data_path}\
--test_data_path ${test_data_path}
import os
import io
import args
import pandas as pd
from sklearn import preprocessing
def _clean_file(source_path,target_path):
"""makes changes to match the CSV format."""
with io.open(source_path, 'r') as temp_eval_file:
with io.open(target_path, 'w') as eval_file:
for line in temp_eval_file:
line = line.strip()
line = line.replace(', ', ',')
if not line or ',' not in line:
continue
if line[-1] == '.':
line = line[:-1]
line += '\n'
eval_file.write(line)
def build_model_columns(train_data_path, test_data_path):
# The column names are from
# https://www2.1010data.com/documentationcenter/prod/Tutorials/MachineLearningExamples/CensusIncomeDataSet.html
column_names = [
'age', 'workclass', 'fnlwgt', 'education', 'education_num',
'marital_status', 'occupation', 'relationship', 'race', 'gender',
'capital_gain', 'capital_loss', 'hours_per_week', 'native_country',
'income_bracket'
]
# Load the dataset in Pandas
train_df = pd.read_csv(
train_data_path,
delimiter=',',
header=None,
index_col=None,
names=column_names)
test_df = pd.read_csv(
test_data_path,
delimiter=',',
header=None,
index_col=None,
names=column_names)
# First group of tasks according to the paper
#label_columns = ['income_50k', 'marital_stat']
categorical_columns = ['education','marital_status','relationship','workclass','occupation']
for col in categorical_columns:
label_train = preprocessing.LabelEncoder()
train_df[col]= label_train.fit_transform(train_df[col])
label_test = preprocessing.LabelEncoder()
test_df[col]= label_test.fit_transform(test_df[col])
bins = [18, 25, 30, 35, 40, 45, 50, 55, 60, 65]
train_df['age_buckets'] = pd.cut(train_df['age'].values.tolist(), bins,labels=False)
test_df['age_buckets'] = pd.cut(test_df['age'].values.tolist(), bins,labels=False)
base_columns = ['education', 'marital_status', 'relationship', 'workclass', 'occupation', 'age_buckets']
train_df['education_occupation'] = train_df['education'].astype(str) + '_' + train_df['occupation'].astype(str)
test_df['education_occupation'] = test_df['education'].astype(str) + '_' + test_df['occupation'].astype(str)
train_df['age_buckets_education_occupation'] = train_df['age_buckets'].astype(str) + '_' + train_df['education'].astype(str) + '_' + train_df['occupation'].astype(str)
test_df['age_buckets_education_occupation'] = test_df['age_buckets'].astype(str) + '_' + test_df['education'].astype(str) + '_' + test_df['occupation'].astype(str)
crossed_columns = ['education_occupation','age_buckets_education_occupation']
for col in crossed_columns:
label_train = preprocessing.LabelEncoder()
train_df[col]= label_train.fit_transform(train_df[col])
label_test = preprocessing.LabelEncoder()
test_df[col]= label_test.fit_transform(test_df[col])
wide_columns = base_columns + crossed_columns
train_df_temp = pd.get_dummies(train_df[categorical_columns],columns=categorical_columns)
test_df_temp = pd.get_dummies(test_df[categorical_columns], columns=categorical_columns)
train_df = train_df.join(train_df_temp)
test_df = test_df.join(test_df_temp)
deep_columns = list(train_df_temp.columns)+ ['age','education_num','capital_gain','capital_loss','hours_per_week']
train_df['label'] = train_df['income_bracket'].apply(lambda x : 1 if x == '>50K' else 0)
test_df['label'] = test_df['income_bracket'].apply(lambda x : 1 if x == '>50K' else 0)
with io.open('train_data/columns.txt','w') as f:
write_str = str(len(wide_columns)) + '\n' + str(len(deep_columns)) + '\n'
f.write(write_str)
f.close()
with io.open('test_data/columns.txt','w') as f:
write_str = str(len(wide_columns)) + '\n' + str(len(deep_columns)) + '\n'
f.write(write_str)
f.close()
train_df[wide_columns + deep_columns + ['label']].fillna(0).to_csv(train_data_path,index=False)
test_df[wide_columns + deep_columns + ['label']].fillna(0).to_csv(test_data_path,index=False)
def clean_file(train_path, test_path, train_data_path, test_data_path):
_clean_file(train_path, train_data_path)
_clean_file(test_path, test_data_path)
if __name__ == '__main__':
args = args.parse_args()
clean_file(args.train_path, args.test_path, args.train_data_path, args.test_data_path)
build_model_columns(args.train_data_path, args.test_data_path)
python infer.py --batch_size 40 \
--use_gpu 0 \
--test_epoch 39 \
--test_data_path 'test_data/test_data.csv' \
--model_dir 'model_dir' \
--hidden1_units 75 \
--hidden2_units 50 \
--hidden3_units 25
CUDA_VISIBLE_DEVICES=0 python infer.py --batch_size 40 \
--use_gpu 1 \
--test_epoch 39 \
--test_data_path 'test_data/test_data.csv' \
--model_dir 'model_dir' \
--hidden1_units 75 \
--hidden2_units 50 \
--hidden3_units 25
import paddle
import io
import math
import paddle.fluid as fluid
class wide_deep(object):
def wide_part(self, data):
out = fluid.layers.fc(input=data,
size=1,
param_attr=fluid.ParamAttr(initializer=fluid.initializer.TruncatedNormal(loc=0.0, scale=1.0 / math.sqrt(data.shape[1])),
regularizer=fluid.regularizer.L2DecayRegularizer(regularization_coeff=1e-4)),
act=None,
name='wide')
return out
def fc(self, data, hidden_units, active, tag):
output = fluid.layers.fc(input=data,
size=hidden_units,
param_attr=fluid.ParamAttr(initializer=fluid.initializer.TruncatedNormal(loc=0.0, scale=1.0 / math.sqrt(data.shape[1]))),
act=active,
name=tag)
return output
def deep_part(self, data, hidden1_units, hidden2_units, hidden3_units):
l1 = self.fc(data, hidden1_units, 'relu', 'l1')
l2 = self.fc(l1, hidden2_units, 'relu', 'l2')
l3 = self.fc(l2, hidden3_units, 'relu', 'l3')
return l3
def input_data(self):
wide_input = fluid.data(name='wide_input', shape=[None, 8], dtype='float32')
deep_input = fluid.data(name='deep_input', shape=[None, 58], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='float32')
inputs = [wide_input] + [deep_input] + [label]
return inputs
def model(self, inputs, hidden1_units, hidden2_units, hidden3_units):
wide_output = self.wide_part(inputs[0])
deep_output = self.deep_part(inputs[1], hidden1_units, hidden2_units, hidden3_units)
wide_model = fluid.layers.fc(input=wide_output,
size=1,
param_attr=fluid.ParamAttr(initializer=fluid.initializer.TruncatedNormal(loc=0.0, scale=1.0)),
act=None,
name='w_wide')
deep_model = fluid.layers.fc(input=deep_output,
size=1,
param_attr=fluid.ParamAttr(initializer=fluid.initializer.TruncatedNormal(loc=0.0, scale=1.0)),
act=None,
name='w_deep')
prediction = fluid.layers.elementwise_add(wide_model, deep_model)
pred = fluid.layers.sigmoid(fluid.layers.clip(prediction, min=-15.0, max=15.0), name="prediction")
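        # pred (clipped to [-15, 15] then passed through sigmoid) feeds the acc/auc metrics below and
        # keeps the sigmoid numerically stable, while the loss still uses the raw wide+deep logit via
        # sigmoid_cross_entropy_with_logits.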
num_seqs = fluid.layers.create_tensor(dtype='int64')
acc = fluid.layers.accuracy(input=pred, label=fluid.layers.cast(x=inputs[2], dtype='int64'), total=num_seqs)
auc_val, batch_auc, auc_states = fluid.layers.auc(input=pred, label=fluid.layers.cast(x=inputs[2], dtype='int64'))
cost = fluid.layers.sigmoid_cross_entropy_with_logits(x=prediction, label=inputs[2])
avg_cost = fluid.layers.mean(cost)
return avg_cost, acc,auc_val, batch_auc, auc_states
absl-py==0.8.1
aspy.yaml==1.3.0
attrs==19.2.0
audioread==2.1.8
backcall==0.1.0
bleach==3.1.0
cachetools==4.0.0
certifi==2019.9.11
cffi==1.14.0
cfgv==2.0.1
chardet==3.0.4
Click==7.0
cloudpickle==1.2.1
cma==2.7.0
colorlog==4.1.0
cycler==0.10.0
Cython==0.29
decorator==4.4.0
entrypoints==0.3
flake8==3.7.9
Flask==1.1.1
funcsigs==1.0.2
future==0.18.0
google-auth==1.10.0
google-auth-oauthlib==0.4.1
graphviz==0.13
grpcio==1.26.0
gunicorn==20.0.4
gym==0.12.1
h5py==2.9.0
identify==1.4.10
idna==2.8
imageio==2.6.1
imageio-ffmpeg==0.3.0
importlib-metadata==0.23
ipykernel==5.1.0
ipython==7.0.1
ipython-genutils==0.2.0
itsdangerous==1.1.0
jedi==0.15.1
jieba==0.42.1
Jinja2==2.10.1
joblib==0.14.1
jsonschema==3.1.1
jupyter-client==5.3.3
jupyter-core==4.5.0
kiwisolver==1.1.0
librosa==0.7.2
llvmlite==0.31.0
Markdown==3.1.1
MarkupSafe==1.1.1
matplotlib==2.2.3
mccabe==0.6.1
mistune==0.8.4
more-itertools==7.2.0
moviepy==1.0.1
nbconvert==5.3.1
nbformat==4.4.0
networkx==2.4
nltk==3.4.5
nodeenv==1.3.4
notebook==5.7.0
numba==0.48.0
numpy==1.16.4
oauthlib==3.1.0
objgraph==3.4.1
opencv-python==4.1.1.26
paddlehub==1.5.0
paddlepaddle-gpu==1.7.1.post97
pandas==0.23.4
pandocfilters==1.4.2
parl==1.1.2
parso==0.5.1
pexpect==4.7.0
pickleshare==0.7.5
Pillow==6.2.0
pre-commit==1.21.0
prettytable==0.7.2
proglog==0.1.9
prometheus-client==0.5.0
prompt-toolkit==2.0.10
protobuf==3.10.0
ptyprocess==0.6.0
pyarrow==0.13.0
pyasn1==0.4.8
pyasn1-modules==0.2.7
pycodestyle==2.5.0
pycparser==2.19
pyflakes==2.1.1
pyglet==1.4.5
Pygments==2.4.2
pyparsing==2.4.2
pyrsistent==0.15.4
python-dateutil==2.8.0
pytz==2019.3
PyYAML==5.1.2
pyzmq==18.0.1
rarfile==3.1
recordio==0.1.7
requests==2.22.0
requests-oauthlib==1.3.0
resampy==0.2.2
rsa==4.0
scikit-learn==0.20.0
scipy==1.3.0
seaborn==0.10.0
Send2Trash==1.5.0
sentencepiece==0.1.85
simplegeneric==0.8.1
six==1.12.0
sklearn==0.0
SoundFile==0.10.3.post1
tb-nightly==1.15.0a20190801
tb-paddle==0.3.6
tensorboard==2.1.0
tensorboardX==1.8
termcolor==1.1.0
terminado==0.8.2
testpath==0.4.2
toml==0.10.0
tornado==5.1.1
tqdm==4.36.1
traitlets==4.3.3
urllib3==1.25.6
virtualenv==16.7.9
visualdl==1.3.0
wcwidth==0.1.7
webencodings==0.5.1
Werkzeug==0.16.0
xgboost==1.0.1
yapf==0.26.0
zipp==0.6.0
python train.py --epochs 40 \
--batch_size 40 \
--use_gpu 0 \
--train_data_path 'train_data/train_data.csv' \
--model_dir 'model_dir' \
--hidden1_units 75 \
--hidden2_units 50 \
--hidden3_units 25
CUDA_VISIBLE_DEVICES=0 python train.py --epochs 40 \
--batch_size 40 \
--use_gpu 1 \
--train_data_path 'train_data/train_data.csv' \
--model_dir 'model_dir' \
--hidden1_units 75 \
--hidden2_units 50 \
--hidden3_units 25
import numpy as np
import os
import paddle.fluid as fluid
class Dataset(object):
def _reader_creator(self, file):
def reader():
with open(file, 'r') as f:
for i,line in enumerate(f):
if i == 0:
continue
line = line.strip().split(',')
features = list(map(float, line))
wide_feat = features[0:8]
deep_feat = features[8:58+8]
label = features[-1]
output = []
output.append(wide_feat)
output.append(deep_feat)
output.append([label])
yield output
return reader
def train(self, file):
return self._reader_creator(file)
def test(self, file):
return self._reader_creator(file)
# xDeepFM for CTR Prediction
models/PaddleRec only provides Paddle implementations of classic recommendation algorithms. We have open-sourced the more powerful toolkit [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec), which covers the whole pipeline of recommendation algorithms plus distributed training and provides high-level APIs for switching seamlessly between single-machine and distributed training. New models and features will be released in the [PaddlePaddle/PaddleRec](https://github.com/PaddlePaddle/PaddleRec) repo, and models/PaddleRec is no longer maintained.
## Introduction
This example reproduces the paper "xDeepFM: Combining Explicit and Implicit Feature Interactions for Recommender Systems" with PaddlePaddle. The paper's [open-source code](https://github.com/Leavingseason/xDeepFM) is also available.
## Dataset
A demo dataset is used; run the following command in the data directory to download it
```bash
python download.py
```
## Environment
- **PaddlePaddle 1.6 or later (or a suitable develop build) is required.**
## Single-machine Training
```bash
python local_train.py --model_output_dir models
```
During training, the current loss and auc are printed every fixed number of steps (50 by default); print_steps can be adjusted in args.py.
## Single-machine Inference
```bash
python infer.py --model_output_dir models --test_epoch 10
```
test_epoch selects the model saved after the 10th training epoch.
Note: the final log line reports the overall LogLoss and AUC on the test set.
## Single-machine Results
After 10 training epochs, the test set LogLoss is `0.48657` and the AUC is `0.7308`.
## Distributed Training
Run the command below to simulate a multi-machine setup locally; by default a 2 x 2 topology is used, i.e. 2 pservers and 2 trainers.
**Note: for distributed training, Paddle 1.6 or later (or the [latest build](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/install/Tables.html#whl-dev)) is recommended.**
Download the data with the same command as above.
```bash
# this script does not support Windows
sh cluster_train.sh
```
Parameter description:
- train_data_dir: training data directory
- model_output_dir: directory where models are saved
- is_local: whether to train locally on a single machine (set to 0 when simulating distributed training on one machine)
- is_sparse: whether the embedding uses sparse updates; defaults to False if not set
- role: process role (pserver or trainer)
- endpoints: addresses and ports of all pservers
- current_endpoint: address and port of the current pserver (when role is pserver)
- trainers: number of trainers
See cluster_train.py for the other parameters.
Inference
```bash
python infer.py --model_output_dir cluster_model --test_epoch 10 --use_gpu=0
```
Notes:
- Local simulation requires disabling proxies, e.g. unset http_proxy, unset https_proxy
- Trainer 0 saves the model parameters
- After each training run, the pserver processes must be stopped manually; list them with:
>ps -ef | grep python
- Data reading uses the dataset mode, which currently only runs on Linux
import argparse
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
parser.add_argument(
'--train_data_dir',
type=str,
default='data/train_data',
help='The path of train data (default: data/train_data)')
    parser.add_argument(
        '--test_data_dir',
        type=str,
        default='data/test_data',
        help='The path of test data (default: data/test_data)')
parser.add_argument(
'--batch_size',
type=int,
default=100,
help="The size of mini-batch (default:100)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_epoch',
type=int,
default=10,
help="The number of epochs to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        required=True,
        help='The path in which to store the model')
    parser.add_argument(
        '--num_thread',
        type=int,
        default=1,
        help='The number of threads (default: 1)')
parser.add_argument('--test_epoch', type=str, default='1')
    parser.add_argument(
        '--layer_sizes_dnn',
        nargs='+',
        type=int,
        default=[10, 10, 10],
        help='The size of each DNN layer (default: 10 10 10)')
    parser.add_argument(
        '--layer_sizes_cin',
        nargs='+',
        type=int,
        default=[10, 10],
        help='The size of each CIN layer (default: 10 10)')
    parser.add_argument(
        '--act',
        type=str,
        default='relu',
        help='The activation function of each layer (default: relu)')
    parser.add_argument(
        '--lr', type=float, default=1e-1, help='Learning rate (default: 1e-1)')
    parser.add_argument(
        '--reg', type=float, default=1e-4, help='Regularization coefficient (default: 1e-4)')
parser.add_argument('--num_field', type=int, default=39)
parser.add_argument('--num_feat', type=int, default=28651)
parser.add_argument(
'--model_name',
type=str,
default='ctr_xdeepfm_model',
help='The name of model (default: ctr_xdeepfm_model)')
parser.add_argument('--use_gpu', type=int, default=1)
parser.add_argument('--print_steps', type=int, default=50)
parser.add_argument(
'--is_sparse',
action='store_true',
required=False,
default=False,
help='embedding will use sparse or not, (default: False)')
parser.add_argument(
'--enable_ce', action='store_true', help='If set, run the task with continuous evaluation logs.')
return parser.parse_args()
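As a small illustration of how the nargs='+' layer-size flags are parsed, here is a hypothetical invocation (the command-line values below are made up for demonstration, not recommended settings):
```python
import sys

# Simulate a command line; only --model_output_dir is required.
sys.argv = ['local_train.py', '--model_output_dir', 'models',
            '--layer_sizes_dnn', '400', '400', '400',
            '--layer_sizes_cin', '200', '200',
            '--is_sparse']
args = parse_args()
assert args.layer_sizes_dnn == [400, 400, 400]  # nargs='+' collects a list of ints
assert args.layer_sizes_cin == [200, 200]
assert args.is_sparse is True                   # store_true flag
```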
import os
import shutil
import sys
LOCAL_PATH = os.path.dirname(os.path.abspath(__file__))
TOOLS_PATH = os.path.join(LOCAL_PATH, "..", "..", "tools")
sys.path.append(TOOLS_PATH)
from tools import download_file_and_uncompress, download_file
if __name__ == '__main__':
url_train = "https://paddlerec.bj.bcebos.com/xdeepfm%2Ftr"
url_test = "https://paddlerec.bj.bcebos.com/xdeepfm%2Fev"
train_dir = "train_data"
test_dir = "test_data"
if not os.path.exists(train_dir):
os.mkdir(train_dir)
if not os.path.exists(test_dir):
os.mkdir(test_dir)
print("download and extract starting...")
download_file(url_train, "./train_data/tr", True)
download_file(url_test, "./test_data/ev", True)
print("download and extract finished")
print("done")
import sys
import paddle.fluid as fluid
import logging
logging.basicConfig()
logger = logging.getLogger(__name__)
__all__ = ['check_version']
def check_version():
"""
Log error and exit when the installed version of paddlepaddle is
not satisfied.
"""
err = "PaddlePaddle version 1.6 or higher is required, " \
"or a suitable develop version is satisfied as well. \n" \
"Please make sure the version is good with your code." \
try:
fluid.require_version('1.6.0')
except Exception as e:
logger.error(err)
sys.exit(1)
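A hypothetical way this helper is wired into a training entry point (the repo's scripts may call it differently):
```python
if __name__ == '__main__':
    check_version()  # logs an error and exits if the installed Paddle is older than 1.6
    # ... build the program and start training ...
```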
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import distutils.util
def parse_args():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--batch_size", type=int, default=16, help="batch_size")
parser.add_argument('--use_gpu', type=int, default=0, help='whether using gpu')
parser.add_argument('--TRIGRAM_D', type=int, default=1000, help='TRIGRAM_D')
parser.add_argument('--L1_N', type=int, default=300, help='L1_N')
parser.add_argument('--L2_N', type=int, default=300, help='L2_N')
parser.add_argument('--L3_N', type=int, default=128, help='L3_N')
parser.add_argument('--Neg', type=int, default=4, help='Neg')
parser.add_argument('--base_lr', type=float, default=0.01, help='base_lr')
parser.add_argument('--model_dir', type=str, default="model_dir", help='model_dir')
args = parser.parse_args()
return args
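The flag names mirror the DSSM paper, so as a minimal sketch under that assumption (this is not the repo's dssm.py), TRIGRAM_D is the letter-trigram input dimension and L1_N, L2_N, L3_N are the fully connected layer sizes of one tower:
```python
import paddle.fluid as fluid

def dssm_tower(name, trigram_d=1000, l1_n=300, l2_n=300, l3_n=128):
    # A TRIGRAM_D-dimensional bag-of-letter-trigrams input followed by
    # three fully connected layers of sizes L1_N, L2_N and L3_N.
    x = fluid.data(name=name, shape=[None, trigram_d], dtype='float32')
    h1 = fluid.layers.fc(input=x, size=l1_n, act='tanh')
    h2 = fluid.layers.fc(input=h1, size=l2_n, act='tanh')
    vec = fluid.layers.fc(input=h2, size=l3_n, act='tanh')
    return x, vec

query_in, query_vec = dssm_tower('query')
doc_in, doc_vec = dssm_tower('doc')
# Relevance is the cosine similarity between the two L3_N-dimensional vectors;
# training contrasts the positive document against Neg sampled negatives.
sim = fluid.layers.cos_sim(query_vec, doc_vec)
```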
python infer.py --use_gpu 0 \
--model_dir 'model_dir'
CUDA_VISIBLE_DEVICES=0 python infer.py --use_gpu 1 \
--model_dir 'model_dir'
python dssm.py --use_gpu 0 \
--batch_size 16 \
--TRIGRAM_D 1000 \
--L1_N 300 \
--L2_N 300 \
--L3_N 128 \
--Neg 4 \
--base_lr 0.01 \
--model_dir 'model_dir'
CUDA_VISIBLE_DEVICES=0 python dssm.py --use_gpu 1 \
--batch_size 16 \
--TRIGRAM_D 1000 \
--L1_N 300 \
--L2_N 300 \
--L3_N 128 \
--Neg 4 \
--base_lr 0.01 \
--model_dir 'model_dir'
import requests
import shutil
import sys
import time
import os
lasttime = time.time()
FLUSH_INTERVAL = 0.1
def progress(msg, end=False):
    """Write a single-line progress message, flushing at most every FLUSH_INTERVAL seconds."""
    global lasttime
    if end:
        msg += "\n"
        lasttime = 0
    if time.time() - lasttime >= FLUSH_INTERVAL:
        sys.stdout.write("\r%s" % msg)
        lasttime = time.time()
        sys.stdout.flush()
def _download_file(url, savepath, print_progress):
r = requests.get(url, stream=True)
total_length = r.headers.get('content-length')
if total_length is None:
with open(savepath, 'wb') as f:
shutil.copyfileobj(r.raw, f)
else:
with open(savepath, 'wb') as f:
dl = 0
total_length = int(total_length)
starttime = time.time()
if print_progress:
print("Downloading %s" % os.path.basename(savepath))
for data in r.iter_content(chunk_size=4096):
dl += len(data)
f.write(data)
if print_progress:
done = int(50 * dl / total_length)
progress("[%-50s] %.2f%%" %
('=' * done, float(100 * dl) / total_length))
if print_progress:
progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True)
_download_file("https://sr-gnn.bj.bcebos.com/train-item-views.csv",
"./train-item-views.csv", True)