Unverified commit ef5687d0 authored by zhang wenhui, committed by GitHub

move ctr/dnn (#3327)

Parent: 10de9c7f
# DNN for Click-Through Rate prediction
## Introduction
This model implements the DNN part proposed in the following paper:
```text
@inproceedings{guo2017deepfm,
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
pages={1725--1731},
year={2017}
}
```
DeepFM combines a factorization machine and a deep neural network to model
both low-order and high-order feature interactions. For details of
factorization machines, please refer to the paper [factorization
machines](https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf).
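For reference, the second-order (pairwise-interaction) part of a factorization machine can be rewritten so that it is computable in linear time; the per-factor form of this identity is what the `second_order` computation in `dense_fm_layer` and `sparse_fm_layer` of `network_conf.py` (shown in the diff below) corresponds to:
```text
\sum_{i=1}^{n} \sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle \, x_i x_j
    = \frac{1}{2} \sum_{f=1}^{k} \Big[ \Big( \sum_{i=1}^{n} v_{i,f} \, x_i \Big)^{2}
    - \sum_{i=1}^{n} v_{i,f}^{2} \, x_i^{2} \Big]
```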
## Environment
You should install PaddlePaddle Fluid first, and run:
```shell
pip install -r requirements.txt
```
## Dataset
This example uses the Criteo dataset, which was used for the [Display Advertising
Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/) hosted by Kaggle.
Each row contains the features for an ad display, and the first column is a label
indicating whether the ad was clicked. There are 39 features in total: 13 take
integer values and the other 26 are categorical. For the test dataset, the labels
are omitted.
Download dataset:
```bash
cd data && ./download.sh && cd ..
```
## Model
This demo implements only the DNN part of the model described in the DeepFM
paper; the full DeepFM model is provided as a separate model.
## Data preprocessing
To preprocess the raw dataset, the integer features are clipped and then min-max
normalized to [0, 1], and the categorical features are one-hot encoded. The raw
training dataset is split so that 90% is used for training and the remaining 10%
for validation during training. In reader.py, the training data is the first 90%
of the lines in train.txt, and the validation data is the rest (see the sketch below).
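As a rough, non-authoritative sketch of the per-line preprocessing that `reader.py` performs (the constants come from `CriteoDataset` in the diff below; note that the reader hashes each categorical value into a fixed-size id space rather than building an explicit one-hot vector, and `parse_line` is an illustrative helper name, not part of the repo):
```python
# Illustrative sketch only; the real logic lives in CriteoDataset (reader.py).
cont_min = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_diff = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]


def parse_line(line, hash_dim):
    features = line.rstrip('\n').split('\t')
    label = [int(features[0])]
    # 13 integer features -> min-max normalized; missing values become 0.0
    dense = [
        0.0 if features[i] == '' else
        (float(features[i]) - cont_min[i - 1]) / cont_diff[i - 1]
        for i in range(1, 14)
    ]
    # 26 categorical features -> hashed into [0, hash_dim) ids
    sparse = [[hash(str(i) + features[i]) % hash_dim] for i in range(14, 40)]
    return [dense] + sparse + [label]
```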
## Train
The command line options for training can be listed by `python train.py -h`.
### Local Train:
```bash
python train.py \
--train_data_path data/raw/train.txt \
2>&1 | tee train.log
```
After training to pass 1, batch 40000, the test AUC is `0.801178` and the test
cost is `0.445196`.
### Distributed Train
Run distributed training with 2 parameter servers and 2 trainers on a single machine.
In the distributed training setting, the training data is split by trainer_id so that
the training data does not overlap among trainers (see the sketch after the command below).
```bash
sh cluster_train.sh
```
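The sharding itself is driven from `CriteoDataset.train()` in reader.py (collapsed in the diff below); a minimal sketch of one common way to implement such a non-overlapping split, assuming a simple round-robin over line indices, is:
```python
# Assumption: round-robin sharding by line index; illustrative only,
# not necessarily the exact rule used by CriteoDataset.train().
def shard_lines(lines, trainer_num, trainer_id):
    for line_idx, line in enumerate(lines):
        if line_idx % trainer_num == trainer_id:
            yield line
```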
## Infer
The command line options for inference can be listed by `python infer.py -h`.
To run inference on the test dataset:
```bash
python infer.py \
--model_path models/ \
--data_path data/raw/train.txt
```
Note: The AUC value in the last log line is the overall AUC for the whole test dataset. Here, train.txt is split inside reader.py so that the validation data does not overlap with the training data (see the sketch below).
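The boundary between the two parts is the `train_idx_` attribute visible in `CriteoDataset` further down in the diff; a minimal sketch of the split rule, under that assumption, is:
```python
TRAIN_IDX = 41256555  # CriteoDataset.train_idx_: lines before this index are training data


def is_training_line(line_idx):
    # The training reader keeps lines with index < TRAIN_IDX; the
    # validation/test reader keeps the remaining lines, so they never overlap.
    return line_idx < TRAIN_IDX
```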
## Train on Baidu Cloud
1. Please prepare some CPU machines on Baidu Cloud following the steps in [train_on_baidu_cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst)
1. Prepare dataset using preprocess.py.
1. Split train.txt into trainer_num parts and put them on the machines.
1. Start the cluster training job using the command in `Distributed Train` above.
## Train on PaddleCloud
If you want to run this training on PaddleCloud, you can use the script ```cloud.py```; the arguments required by ```train.py``` can be set through PaddleCloud environment variables.
# Click-Through Rate prediction
## Introduction
We provide the models commonly used for CTR tasks, including [dnn](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/dnn), [deepfm](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/deepfm), [xdeepfm](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/xdeepfm) and [dcn](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/dcn).
@@ -76,4 +76,4 @@ python infer.py \
1. Start the distributed training job using the command from `Distributed Train` above.
## Run cluster training on PaddleCloud
If you are using PaddleCloud for cluster training, you can use ```cloud.py``` to help submit the job; the arguments required by ```train.py``` can be provided through PaddleCloud environment variables.
@@ -7,34 +7,39 @@ from kpi import CostKpi
from kpi import DurationKpi
from kpi import AccKpi
each_pass_duration_cpu1_thread1_kpi = DurationKpi(
    'each_pass_duration_cpu1_thread1', 0.08, 0, actived=True)
train_loss_cpu1_thread1_kpi = CostKpi('train_loss_cpu1_thread1', 0.08, 0)
train_auc_val_cpu1_thread1_kpi = AccKpi('train_auc_val_cpu1_thread1', 0.08, 0)
train_batch_auc_val_cpu1_thread1_kpi = AccKpi(
    'train_batch_auc_val_cpu1_thread1', 0.08, 0)
each_pass_duration_cpu1_thread8_kpi = DurationKpi(
    'each_pass_duration_cpu1_thread8', 0.08, 0, actived=True)
train_loss_cpu1_thread8_kpi = CostKpi('train_loss_cpu1_thread8', 0.08, 0)
train_auc_val_cpu1_thread8_kpi = AccKpi('train_auc_val_cpu1_thread8', 0.08, 0)
train_batch_auc_val_cpu1_thread8_kpi = AccKpi(
    'train_batch_auc_val_cpu1_thread8', 0.08, 0)
each_pass_duration_cpu8_thread8_kpi = DurationKpi(
    'each_pass_duration_cpu8_thread8', 0.08, 0, actived=True)
train_loss_cpu8_thread8_kpi = CostKpi('train_loss_cpu8_thread8', 0.08, 0)
train_auc_val_cpu8_thread8_kpi = AccKpi('train_auc_val_cpu8_thread8', 0.08, 0)
train_batch_auc_val_cpu8_thread8_kpi = AccKpi(
    'train_batch_auc_val_cpu8_thread8', 0.08, 0)
tracking_kpis = [
    each_pass_duration_cpu1_thread1_kpi,
    train_loss_cpu1_thread1_kpi,
    train_auc_val_cpu1_thread1_kpi,
    train_batch_auc_val_cpu1_thread1_kpi,
    each_pass_duration_cpu1_thread8_kpi,
    train_loss_cpu1_thread8_kpi,
    train_auc_val_cpu1_thread8_kpi,
    train_batch_auc_val_cpu1_thread8_kpi,
    each_pass_duration_cpu8_thread8_kpi,
    train_loss_cpu8_thread8_kpi,
    train_auc_val_cpu8_thread8_kpi,
    train_batch_auc_val_cpu8_thread8_kpi,
]
def parse_log(log):
@@ -13,8 +13,7 @@ import logging
import paddle.fluid.contrib.utils.hdfs_utils as hdfs_utils
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("cloud")
logger.setLevel(logging.INFO)
@@ -93,7 +92,13 @@ def download():
local_train_data_dir = os.getenv("TRAIN_DATA_LOCAL", "data")
hdfs_train_data_dir = os.getenv("TRAIN_DATA_HDFS", "")
downloads = hdfs_utils.multi_download(
    client,
    hdfs_train_data_dir,
    local_train_data_dir,
    0,
    1,
    multi_processes=1)
print(downloads)
for d in downloads:
@@ -111,7 +116,8 @@ def download():
def env_declar():
logging.info("******** Rename Cluster Env to PaddleFluid Env ********")
if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ["PADDLE_IS_LOCAL"] == "0":
if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ[
"PADDLE_IS_LOCAL"] == "0":
os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"]
os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"]
os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"]
@@ -137,7 +143,9 @@ if __name__ == '__main__':
if os.environ["PADDLE_TRAINING_ROLE"] == "PSERVER":
logging.info("PSERVER do not need to download datas")
else:
logging.info("NEED_CUSTOM_DOWNLOAD is True, will download train data with hdfs_utils")
logging.info(
"NEED_CUSTOM_DOWNLOAD is True, will download train data with hdfs_utils"
)
download()
run()
@@ -4,22 +4,27 @@ import math
dense_feature_dim = 13
def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim,
                     sparse_input):
def dense_fm_layer(input, emb_dict_size, factor_size, fm_param_attr):
"""
dense_fm_layer
"""
first_order = fluid.layers.fc(input=input, size=1)
emb_table = fluid.layers.create_parameter(
    shape=[emb_dict_size, factor_size], dtype='float32', attr=fm_param_attr)
input_mul_factor = fluid.layers.matmul(input, emb_table)
input_mul_factor_square = fluid.layers.square(input_mul_factor)
input_square = fluid.layers.square(input)
factor_square = fluid.layers.square(emb_table)
input_square_mul_factor_square = fluid.layers.matmul(input_square, factor_square)
# FM second-order term, per factor: 0.5 * ((sum_i x_i * v_i)^2 - sum_i x_i^2 * v_i^2)
second_order = 0.5 * (
    input_mul_factor_square - input_square_mul_factor_square)
return first_order, second_order
def sparse_fm_layer(input, emb_dict_size, factor_size, fm_param_attr):
@@ -27,44 +32,56 @@ def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim, sparse_
sparse_fm_layer
"""
first_embeddings = fluid.layers.embedding(
    input=input, dtype='float32', size=[emb_dict_size, 1], is_sparse=True)
first_order = fluid.layers.sequence_pool(
    input=first_embeddings, pool_type='sum')
nonzero_embeddings = fluid.layers.embedding(
    input=input,
    dtype='float32',
    size=[emb_dict_size, factor_size],
    param_attr=fm_param_attr,
    is_sparse=True)
summed_features_emb = fluid.layers.sequence_pool(
    input=nonzero_embeddings, pool_type='sum')
summed_features_emb_square = fluid.layers.square(summed_features_emb)
squared_features_emb = fluid.layers.square(nonzero_embeddings)
squared_sum_features_emb = fluid.layers.sequence_pool(
input=squared_features_emb, pool_type='sum')
second_order = 0.5 * (
    summed_features_emb_square - squared_sum_features_emb)
return first_order, second_order
dense_input = fluid.layers.data(name="dense_input", shape=[dense_feature_dim], dtype='float32')
dense_input = fluid.layers.data(
name="dense_input", shape=[dense_feature_dim], dtype='float32')
sparse_input_ids = [
    fluid.layers.data(
        name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
    for i in range(1, 27)
]
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
datas = [dense_input] + sparse_input_ids + [label]
py_reader = fluid.layers.create_py_reader_by_data(
    capacity=64, feed_list=datas, name='py_reader', use_double_buffer=True)
words = fluid.layers.read_file(py_reader)
sparse_fm_param_attr = fluid.param_attr.ParamAttr(name="SparseFeatFactors",
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(sparse_feature_dim)))
dense_fm_param_attr = fluid.param_attr.ParamAttr(name="DenseFeatFactors",
initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(dense_feature_dim)))
sparse_fm_param_attr = fluid.param_attr.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Normal(scale=1 /
math.sqrt(sparse_feature_dim)))
dense_fm_param_attr = fluid.param_attr.ParamAttr(
name="DenseFeatFactors",
initializer=fluid.initializer.Normal(scale=1 /
math.sqrt(dense_feature_dim)))
sparse_fm_first, sparse_fm_second = sparse_fm_layer(
sparse_input, sparse_feature_dim, factor_size, sparse_fm_param_attr)
@@ -74,26 +91,41 @@ def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim, sparse_
def embedding_layer(input):
"""embedding_layer"""
emb = fluid.layers.embedding(
    input=input,
    dtype='float32',
    size=[sparse_feature_dim, factor_size],
    param_attr=sparse_fm_param_attr,
    is_sparse=True)
return fluid.layers.sequence_pool(input=emb, pool_type='average')
sparse_embed_seq = list(map(embedding_layer, sparse_input_ids))
concated = fluid.layers.concat(sparse_embed_seq + [dense_input], axis=1)
fc1 = fluid.layers.fc(
    input=concated,
    size=400,
    act='relu',
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(concated.shape[1]))))
fc2 = fluid.layers.fc(
    input=fc1,
    size=400,
    act='relu',
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(
    input=fc2,
    size=400,
    act='relu',
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(fc2.shape[1]))))
predict = fluid.layers.fc(
    input=[sparse_fm_first, sparse_fm_second, dense_fm_first, dense_fm_second, fc3],
    size=2,
    act="softmax",
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(fc3.shape[1]))))
cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
avg_cost = fluid.layers.reduce_sum(cost)
@@ -105,7 +137,6 @@ def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim, sparse_
def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):
def embedding_layer(input):
"""embedding_layer"""
emb = fluid.layers.embedding(
@@ -115,16 +146,19 @@ def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):
# if you want to set is_distributed to True
is_distributed=False,
size=[sparse_feature_dim, embedding_size],
param_attr=fluid.ParamAttr(name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()))
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()))
return fluid.layers.sequence_pool(input=emb, pool_type='average')
dense_input = fluid.layers.data(
name="dense_input", shape=[dense_feature_dim], dtype='float32')
sparse_input_ids = [
    fluid.layers.data(
        name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
    for i in range(1, 27)
]
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
@@ -132,27 +166,40 @@ def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True):
py_reader = None
if use_py_reader:
py_reader = fluid.layers.create_py_reader_by_data(
    capacity=64,
    feed_list=words,
    name='py_reader',
    use_double_buffer=True)
words = fluid.layers.read_file(py_reader)
sparse_embed_seq = list(map(embedding_layer, words[1:-1]))
concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)
fc1 = fluid.layers.fc(
    input=concated,
    size=400,
    act='relu',
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(concated.shape[1]))))
fc2 = fluid.layers.fc(
    input=fc1,
    size=400,
    act='relu',
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(
    input=fc2,
    size=400,
    act='relu',
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(fc2.shape[1]))))
predict = fluid.layers.fc(
    input=fc3,
    size=2,
    act='softmax',
    param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
        scale=1 / math.sqrt(fc3.shape[1]))))
cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
avg_cost = fluid.layers.reduce_sum(cost)
@@ -2,11 +2,16 @@ class Dataset:
def __init__(self):
pass
class CriteoDataset(Dataset):
def __init__(self, sparse_feature_dim):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [
    20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
]
self.cont_diff_ = [
    20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
]
self.hash_dim_ = sparse_feature_dim
# here, training data are lines with line_index < train_idx_
self.train_idx_ = 41256555
@@ -33,13 +38,17 @@ class CriteoDataset(Dataset):
if features[idx] == '':
dense_feature.append(0.0)
else:
dense_feature.append((float(features[idx]) -
                      self.cont_min_[idx - 1]) /
                     self.cont_diff_[idx - 1])
for idx in self.categorical_range_:
sparse_feature.append(
    [hash(str(idx) + features[idx]) % self.hash_dim_])
label = [int(features[0])]
yield [dense_feature] + sparse_feature + [label]
return reader
def train(self, file_list, trainer_num, trainer_id):
@@ -14,12 +14,10 @@ import reader
from network_conf import ctr_dnn_model
from multiprocessing import cpu_count
# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
@@ -85,7 +83,7 @@ def parse_args():
parser.add_argument(
'--role',
type=str,
default='pserver',  # trainer or pserver
help='The role of the current process: trainer or pserver (default: pserver)')
parser.add_argument(
'--endpoints',
@@ -117,7 +115,7 @@ def parse_args():
def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
trainer_num, trainer_id):
if args.enable_ce:
SEED = 102
train_program.random_seed = SEED
@@ -163,43 +161,52 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
try:
while True:
loss_val, auc_val, batch_auc_val = pe.run(
    fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
loss_val = np.mean(loss_val)
auc_val = np.mean(auc_val)
batch_auc_val = np.mean(batch_auc_val)
logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
.format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
logger.info(
"TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
.format(pass_id, batch_id, loss_val / args.batch_size,
auc_val, batch_auc_val))
if batch_id % 1000 == 0 and batch_id != 0:
model_dir = args.model_output_dir + '/batch-' + str(batch_id)
if args.trainer_id == 0:
fluid.io.save_persistables(
    executor=exe,
    dirname=model_dir,
    main_program=fluid.default_main_program())
batch_id += 1
except fluid.core.EOFException:
py_reader.reset()
print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))
print("pass_id: %d, pass_time_cost: %f" %
(pass_id, time.time() - pass_start))
total_time += time.time() - pass_start
model_dir = args.model_output_dir + '/pass-' + str(pass_id)
if args.trainer_id == 0:
fluid.io.save_persistables(
    executor=exe,
    dirname=model_dir,
    main_program=fluid.default_main_program())
# only for ce
if args.enable_ce:
threads_num, cpu_num = get_cards(args)
epoch_idx = args.num_passes
print("kpis\teach_pass_duration_cpu%s_thread%s\t%s" %
(cpu_num, threads_num, total_time / epoch_idx))
(cpu_num, threads_num, total_time / epoch_idx))
print("kpis\ttrain_loss_cpu%s_thread%s\t%s" %
(cpu_num, threads_num, loss_val/args.batch_size))
(cpu_num, threads_num, loss_val / args.batch_size))
print("kpis\ttrain_auc_val_cpu%s_thread%s\t%s" %
(cpu_num, threads_num, auc_val))
(cpu_num, threads_num, auc_val))
print("kpis\ttrain_batch_auc_val_cpu%s_thread%s\t%s" %
(cpu_num, threads_num, batch_auc_val))
(cpu_num, threads_num, batch_auc_val))
def train():
args = parse_args()
@@ -207,7 +214,8 @@ def train():
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(
    args.embedding_size, args.sparse_feature_dim)
optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
optimizer.minimize(loss)
if args.cloud_train:
@@ -228,23 +236,26 @@ def train():
if args.is_local:
logger.info("run local training")
main_program = fluid.default_main_program()
train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var,
           1, 0)
else:
logger.info("run dist training")
t = fluid.DistributeTranspiler()
t.transpile(
    args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
if args.role == "pserver" or args.role == "PSERVER":
logger.info("run pserver")
prog = t.get_pserver_program(args.current_endpoint)
startup = t.get_startup_program(
    args.current_endpoint, pserver_program=prog)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup)
exe.run(prog)
elif args.role == "trainer" or args.role == "TRAINER":
logger.info("run trainer")
train_prog = t.get_trainer_program()
train_loop(args, train_prog, py_reader, loss, auc_var,
           batch_auc_var, args.trainers, args.trainer_id)
else:
raise ValueError(
'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'