From ef5687d00cd79cf5b19a04344ebc88ccb29db8f0 Mon Sep 17 00:00:00 2001 From: zhang wenhui Date: Wed, 18 Sep 2019 13:14:13 +0800 Subject: [PATCH] move ctr/dnn (#3327) --- PaddleRec/ctr/README.md | 97 +---------- PaddleRec/ctr/{ => dnn}/.run_ce.sh | 0 PaddleRec/ctr/{ => dnn}/README.cn.md | 2 +- PaddleRec/ctr/dnn/README.md | 96 +++++++++++ PaddleRec/ctr/{ => dnn}/__init__.py | 0 PaddleRec/ctr/{ => dnn}/_ce.py | 45 ++--- PaddleRec/ctr/{ => dnn}/cloud.py | 18 +- PaddleRec/ctr/{ => dnn}/cluster_train.sh | 0 PaddleRec/ctr/{ => dnn}/infer.py | 0 PaddleRec/ctr/dnn/network_conf.py | 210 +++++++++++++++++++++++ PaddleRec/ctr/{ => dnn}/preprocess.py | 0 PaddleRec/ctr/{ => dnn}/reader.py | 19 +- PaddleRec/ctr/{ => dnn}/requirements.txt | 0 PaddleRec/ctr/{ => dnn}/train.py | 63 ++++--- PaddleRec/ctr/network_conf.py | 163 ------------------ 15 files changed, 399 insertions(+), 314 deletions(-) rename PaddleRec/ctr/{ => dnn}/.run_ce.sh (100%) rename PaddleRec/ctr/{ => dnn}/README.cn.md (99%) create mode 100644 PaddleRec/ctr/dnn/README.md rename PaddleRec/ctr/{ => dnn}/__init__.py (100%) rename PaddleRec/ctr/{ => dnn}/_ce.py (58%) rename PaddleRec/ctr/{ => dnn}/cloud.py (90%) rename PaddleRec/ctr/{ => dnn}/cluster_train.sh (100%) rename PaddleRec/ctr/{ => dnn}/infer.py (100%) create mode 100644 PaddleRec/ctr/dnn/network_conf.py rename PaddleRec/ctr/{ => dnn}/preprocess.py (100%) rename PaddleRec/ctr/{ => dnn}/reader.py (78%) rename PaddleRec/ctr/{ => dnn}/requirements.txt (100%) rename PaddleRec/ctr/{ => dnn}/train.py (81%) delete mode 100644 PaddleRec/ctr/network_conf.py diff --git a/PaddleRec/ctr/README.md b/PaddleRec/ctr/README.md index e29e2e1e..fac2d11e 100644 --- a/PaddleRec/ctr/README.md +++ b/PaddleRec/ctr/README.md @@ -1,96 +1,5 @@ -# DNN for Click-Through Rate prediction +# Click-Through Rate prediction -## Introduction -This model implements the DNN part proposed in the following paper: - -```text -@inproceedings{guo2017deepfm, - title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction}, - author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He}, - booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)}, - pages={1725--1731}, - year={2017} -} -``` - -The DeepFm combines factorization machine and deep neural networks to model -both low order and high order feature interactions. For details of the -factorization machines, please refer to the paper [factorization -machines](https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf) - -## Environment -You should install PaddlePaddle Fluid first, and run: - -```shell -pip install -r requirements.txt -``` - -## Dataset -This example uses Criteo dataset which was used for the [Display Advertising -Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/) -hosted by Kaggle. - -Each row is the features for an ad display and the first column is a label -indicating whether this ad has been clicked or not. There are 39 features in -total. 13 features take integer values and the other 26 features are -categorical features. For the test dataset, the labels are omitted. - -Download dataset: -```bash -cd data && ./download.sh && cd .. -``` - -## Model -This Demo only implement the DNN part of the model described in DeepFM paper. -DeepFM model will be provided in other model. - - -## Data Preprocessing method -To preprocess the raw dataset, the integer features are clipped then min-max -normalized to [0, 1] and the categorical features are one-hot encoded. The raw -training dataset are splited such that 90% are used for training and the other -10% are used for validation during training. In reader.py, training data is the first -90% of data in train.txt, and validation data is the left. - -## Train -The command line options for training can be listed by `python train.py -h`. - -### Local Train: -```bash -python train.py \ - --train_data_path data/raw/train.txt \ - 2>&1 | tee train.log -``` - -After training pass 1 batch 40000, the testing AUC is `0.801178` and the testing -cost is `0.445196`. - -### Distributed Train -Run a 2 pserver 2 trainer distribute training on a single machine. -In distributed training setting, training data is splited by trainer_id, so that training data - do not overlap among trainers - -```bash -sh cluster_train.sh -``` - -## Infer -The command line options for infering can be listed by `python infer.py -h`. - -To make inference for the test dataset: -```bash -python infer.py \ - --model_path models/ \ - --data_path data/raw/train.txt -``` -Note: The AUC value in the last log info is the total AUC for all test dataset. Here, train.txt is splited inside the reader.py so that validation data does not have overlap with training data. - -## Train on Baidu Cloud -1. Please prepare some CPU machines on Baidu Cloud following the steps in [train_on_baidu_cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst) -1. Prepare dataset using preprocess.py. -1. Split the train.txt to trainer_num parts and put them on the machines. -1. Run training with the cluster train using the command in `Distributed Train` above. - -## Train on Paddle Cloud -If you want to run this training on PaddleCloud, you can use the script ```cloud.py```, you can change the arguments in ```trian.py``` through environments in PaddleCloud. \ No newline at end of file +## 简介 +我们提供了常见的ctr任务中使用的模型,包括[dnn](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/dnn)、[deepfm](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/deepfm)、[xdeepfm](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/xdeepfm)和[dcn](https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/ctr/dcn)。 diff --git a/PaddleRec/ctr/.run_ce.sh b/PaddleRec/ctr/dnn/.run_ce.sh similarity index 100% rename from PaddleRec/ctr/.run_ce.sh rename to PaddleRec/ctr/dnn/.run_ce.sh diff --git a/PaddleRec/ctr/README.cn.md b/PaddleRec/ctr/dnn/README.cn.md similarity index 99% rename from PaddleRec/ctr/README.cn.md rename to PaddleRec/ctr/dnn/README.cn.md index 05d1653e..47d3b220 100644 --- a/PaddleRec/ctr/README.cn.md +++ b/PaddleRec/ctr/dnn/README.cn.md @@ -76,4 +76,4 @@ python infer.py \ 1. 用上面的 `分布式训练` 中的命令行启动分布式训练任务. ## 在PaddleCloud上运行集群训练 -如果你正在使用PaddleCloud做集群训练,你可以使用```cloud.py```这个文件来帮助你提交任务,```trian.py```中所需要的参数可以通过PaddleCloud的环境变量来提交。 \ No newline at end of file +如果你正在使用PaddleCloud做集群训练,你可以使用```cloud.py```这个文件来帮助你提交任务,```trian.py```中所需要的参数可以通过PaddleCloud的环境变量来提交。 diff --git a/PaddleRec/ctr/dnn/README.md b/PaddleRec/ctr/dnn/README.md new file mode 100644 index 00000000..022fa47c --- /dev/null +++ b/PaddleRec/ctr/dnn/README.md @@ -0,0 +1,96 @@ + +# DNN for Click-Through Rate prediction + +## Introduction +This model implements the DNN part proposed in the following paper: + +```text +@inproceedings{guo2017deepfm, + title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction}, + author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He}, + booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)}, + pages={1725--1731}, + year={2017} +} +``` + +The DeepFm combines factorization machine and deep neural networks to model +both low order and high order feature interactions. For details of the +factorization machines, please refer to the paper [factorization +machines](https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf) + +## Environment +You should install PaddlePaddle Fluid first, and run: + +```shell +pip install -r requirements.txt +``` + +## Dataset +This example uses Criteo dataset which was used for the [Display Advertising +Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/) +hosted by Kaggle. + +Each row is the features for an ad display and the first column is a label +indicating whether this ad has been clicked or not. There are 39 features in +total. 13 features take integer values and the other 26 features are +categorical features. For the test dataset, the labels are omitted. + +Download dataset: +```bash +cd data && ./download.sh && cd .. +``` + +## Model +This Demo only implement the DNN part of the model described in DeepFM paper. +DeepFM model will be provided in other model. + + +## Data Preprocessing method +To preprocess the raw dataset, the integer features are clipped then min-max +normalized to [0, 1] and the categorical features are one-hot encoded. The raw +training dataset are splited such that 90% are used for training and the other +10% are used for validation during training. In reader.py, training data is the first +90% of data in train.txt, and validation data is the left. + +## Train +The command line options for training can be listed by `python train.py -h`. + +### Local Train: +```bash +python train.py \ + --train_data_path data/raw/train.txt \ + 2>&1 | tee train.log +``` + +After training pass 1 batch 40000, the testing AUC is `0.801178` and the testing +cost is `0.445196`. + +### Distributed Train +Run a 2 pserver 2 trainer distribute training on a single machine. +In distributed training setting, training data is splited by trainer_id, so that training data + do not overlap among trainers + +```bash +sh cluster_train.sh +``` + +## Infer +The command line options for infering can be listed by `python infer.py -h`. + +To make inference for the test dataset: +```bash +python infer.py \ + --model_path models/ \ + --data_path data/raw/train.txt +``` +Note: The AUC value in the last log info is the total AUC for all test dataset. Here, train.txt is splited inside the reader.py so that validation data does not have overlap with training data. + +## Train on Baidu Cloud +1. Please prepare some CPU machines on Baidu Cloud following the steps in [train_on_baidu_cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst) +1. Prepare dataset using preprocess.py. +1. Split the train.txt to trainer_num parts and put them on the machines. +1. Run training with the cluster train using the command in `Distributed Train` above. + +## Train on Paddle Cloud +If you want to run this training on PaddleCloud, you can use the script ```cloud.py```, you can change the arguments in ```trian.py``` through environments in PaddleCloud. diff --git a/PaddleRec/ctr/__init__.py b/PaddleRec/ctr/dnn/__init__.py similarity index 100% rename from PaddleRec/ctr/__init__.py rename to PaddleRec/ctr/dnn/__init__.py diff --git a/PaddleRec/ctr/_ce.py b/PaddleRec/ctr/dnn/_ce.py similarity index 58% rename from PaddleRec/ctr/_ce.py rename to PaddleRec/ctr/dnn/_ce.py index 91867d03..7f8860e8 100644 --- a/PaddleRec/ctr/_ce.py +++ b/PaddleRec/ctr/dnn/_ce.py @@ -7,34 +7,39 @@ from kpi import CostKpi from kpi import DurationKpi from kpi import AccKpi - -each_pass_duration_cpu1_thread1_kpi = DurationKpi('each_pass_duration_cpu1_thread1', 0.08, 0, actived=True) +each_pass_duration_cpu1_thread1_kpi = DurationKpi( + 'each_pass_duration_cpu1_thread1', 0.08, 0, actived=True) train_loss_cpu1_thread1_kpi = CostKpi('train_loss_cpu1_thread1', 0.08, 0) train_auc_val_cpu1_thread1_kpi = AccKpi('train_auc_val_cpu1_thread1', 0.08, 0) -train_batch_auc_val_cpu1_thread1_kpi = AccKpi('train_batch_auc_val_cpu1_thread1', 0.08, 0) -each_pass_duration_cpu1_thread8_kpi = DurationKpi('each_pass_duration_cpu1_thread8', 0.08, 0, actived=True) +train_batch_auc_val_cpu1_thread1_kpi = AccKpi( + 'train_batch_auc_val_cpu1_thread1', 0.08, 0) +each_pass_duration_cpu1_thread8_kpi = DurationKpi( + 'each_pass_duration_cpu1_thread8', 0.08, 0, actived=True) train_loss_cpu1_thread8_kpi = CostKpi('train_loss_cpu1_thread8', 0.08, 0) train_auc_val_cpu1_thread8_kpi = AccKpi('train_auc_val_cpu1_thread8', 0.08, 0) -train_batch_auc_val_cpu1_thread8_kpi = AccKpi('train_batch_auc_val_cpu1_thread8', 0.08, 0) -each_pass_duration_cpu8_thread8_kpi = DurationKpi('each_pass_duration_cpu8_thread8', 0.08, 0, actived=True) +train_batch_auc_val_cpu1_thread8_kpi = AccKpi( + 'train_batch_auc_val_cpu1_thread8', 0.08, 0) +each_pass_duration_cpu8_thread8_kpi = DurationKpi( + 'each_pass_duration_cpu8_thread8', 0.08, 0, actived=True) train_loss_cpu8_thread8_kpi = CostKpi('train_loss_cpu8_thread8', 0.08, 0) train_auc_val_cpu8_thread8_kpi = AccKpi('train_auc_val_cpu8_thread8', 0.08, 0) -train_batch_auc_val_cpu8_thread8_kpi = AccKpi('train_batch_auc_val_cpu8_thread8', 0.08, 0) +train_batch_auc_val_cpu8_thread8_kpi = AccKpi( + 'train_batch_auc_val_cpu8_thread8', 0.08, 0) tracking_kpis = [ - each_pass_duration_cpu1_thread1_kpi, - train_loss_cpu1_thread1_kpi, - train_auc_val_cpu1_thread1_kpi, - train_batch_auc_val_cpu1_thread1_kpi, - each_pass_duration_cpu1_thread8_kpi, - train_loss_cpu1_thread8_kpi, - train_auc_val_cpu1_thread8_kpi, - train_batch_auc_val_cpu1_thread8_kpi, - each_pass_duration_cpu8_thread8_kpi, - train_loss_cpu8_thread8_kpi, - train_auc_val_cpu8_thread8_kpi, - train_batch_auc_val_cpu8_thread8_kpi, - ] + each_pass_duration_cpu1_thread1_kpi, + train_loss_cpu1_thread1_kpi, + train_auc_val_cpu1_thread1_kpi, + train_batch_auc_val_cpu1_thread1_kpi, + each_pass_duration_cpu1_thread8_kpi, + train_loss_cpu1_thread8_kpi, + train_auc_val_cpu1_thread8_kpi, + train_batch_auc_val_cpu1_thread8_kpi, + each_pass_duration_cpu8_thread8_kpi, + train_loss_cpu8_thread8_kpi, + train_auc_val_cpu8_thread8_kpi, + train_batch_auc_val_cpu8_thread8_kpi, +] def parse_log(log): diff --git a/PaddleRec/ctr/cloud.py b/PaddleRec/ctr/dnn/cloud.py similarity index 90% rename from PaddleRec/ctr/cloud.py rename to PaddleRec/ctr/dnn/cloud.py index c5388c6a..fb14a1b8 100644 --- a/PaddleRec/ctr/cloud.py +++ b/PaddleRec/ctr/dnn/cloud.py @@ -13,8 +13,7 @@ import logging import paddle.fluid.contrib.utils.hdfs_utils as hdfs_utils -logging.basicConfig( - format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("cloud") logger.setLevel(logging.INFO) @@ -93,7 +92,13 @@ def download(): local_train_data_dir = os.getenv("TRAIN_DATA_LOCAL", "data") hdfs_train_data_dir = os.getenv("TRAIN_DATA_HDFS", "") - downloads = hdfs_utils.multi_download(client, hdfs_train_data_dir, local_train_data_dir, 0, 1, multi_processes=1) + downloads = hdfs_utils.multi_download( + client, + hdfs_train_data_dir, + local_train_data_dir, + 0, + 1, + multi_processes=1) print(downloads) for d in downloads: @@ -111,7 +116,8 @@ def download(): def env_declar(): logging.info("******** Rename Cluster Env to PaddleFluid Env ********") - if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ["PADDLE_IS_LOCAL"] == "0": + if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ[ + "PADDLE_IS_LOCAL"] == "0": os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"] os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"] os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"] @@ -137,7 +143,9 @@ if __name__ == '__main__': if os.environ["PADDLE_TRAINING_ROLE"] == "PSERVER": logging.info("PSERVER do not need to download datas") else: - logging.info("NEED_CUSTOM_DOWNLOAD is True, will download train data with hdfs_utils") + logging.info( + "NEED_CUSTOM_DOWNLOAD is True, will download train data with hdfs_utils" + ) download() run() diff --git a/PaddleRec/ctr/cluster_train.sh b/PaddleRec/ctr/dnn/cluster_train.sh similarity index 100% rename from PaddleRec/ctr/cluster_train.sh rename to PaddleRec/ctr/dnn/cluster_train.sh diff --git a/PaddleRec/ctr/infer.py b/PaddleRec/ctr/dnn/infer.py similarity index 100% rename from PaddleRec/ctr/infer.py rename to PaddleRec/ctr/dnn/infer.py diff --git a/PaddleRec/ctr/dnn/network_conf.py b/PaddleRec/ctr/dnn/network_conf.py new file mode 100644 index 00000000..bb23d484 --- /dev/null +++ b/PaddleRec/ctr/dnn/network_conf.py @@ -0,0 +1,210 @@ +import paddle.fluid as fluid +import math + +dense_feature_dim = 13 + + +def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim, + sparse_input): + def dense_fm_layer(input, emb_dict_size, factor_size, fm_param_attr): + """ + dense_fm_layer + """ + first_order = fluid.layers.fc(input=input, size=1) + emb_table = fluid.layers.create_parameter( + shape=[emb_dict_size, factor_size], + dtype='float32', + attr=fm_param_attr) + + input_mul_factor = fluid.layers.matmul(input, emb_table) + input_mul_factor_square = fluid.layers.square(input_mul_factor) + input_square = fluid.layers.square(input) + factor_square = fluid.layers.square(emb_table) + input_square_mul_factor_square = fluid.layers.matmul(input_square, + factor_square) + + second_order = 0.5 * ( + input_mul_factor_square - input_square_mul_factor_square) + return first_order, second_order + + def sparse_fm_layer(input, emb_dict_size, factor_size, fm_param_attr): + """ + sparse_fm_layer + """ + first_embeddings = fluid.layers.embedding( + input=input, + dtype='float32', + size=[emb_dict_size, 1], + is_sparse=True) + first_order = fluid.layers.sequence_pool( + input=first_embeddings, pool_type='sum') + + nonzero_embeddings = fluid.layers.embedding( + input=input, + dtype='float32', + size=[emb_dict_size, factor_size], + param_attr=fm_param_attr, + is_sparse=True) + summed_features_emb = fluid.layers.sequence_pool( + input=nonzero_embeddings, pool_type='sum') + summed_features_emb_square = fluid.layers.square(summed_features_emb) + + squared_features_emb = fluid.layers.square(nonzero_embeddings) + squared_sum_features_emb = fluid.layers.sequence_pool( + input=squared_features_emb, pool_type='sum') + + second_order = 0.5 * ( + summed_features_emb_square - squared_sum_features_emb) + return first_order, second_order + + dense_input = fluid.layers.data( + name="dense_input", shape=[dense_feature_dim], dtype='float32') + + sparse_input_ids = [ + fluid.layers.data( + name="C" + str(i), shape=[1], lod_level=1, dtype='int64') + for i in range(1, 27) + ] + + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + datas = [dense_input] + sparse_input_ids + [label] + + py_reader = fluid.layers.create_py_reader_by_data( + capacity=64, feed_list=datas, name='py_reader', use_double_buffer=True) + words = fluid.layers.read_file(py_reader) + + sparse_fm_param_attr = fluid.param_attr.ParamAttr( + name="SparseFeatFactors", + initializer=fluid.initializer.Normal(scale=1 / + math.sqrt(sparse_feature_dim))) + dense_fm_param_attr = fluid.param_attr.ParamAttr( + name="DenseFeatFactors", + initializer=fluid.initializer.Normal(scale=1 / + math.sqrt(dense_feature_dim))) + + sparse_fm_first, sparse_fm_second = sparse_fm_layer( + sparse_input, sparse_feature_dim, factor_size, sparse_fm_param_attr) + dense_fm_first, dense_fm_second = dense_fm_layer( + dense_input, dense_feature_dim, factor_size, dense_fm_param_attr) + + def embedding_layer(input): + """embedding_layer""" + emb = fluid.layers.embedding( + input=input, + dtype='float32', + size=[sparse_feature_dim, factor_size], + param_attr=sparse_fm_param_attr, + is_sparse=True) + return fluid.layers.sequence_pool(input=emb, pool_type='average') + + sparse_embed_seq = list(map(embedding_layer, sparse_input_ids)) + concated = fluid.layers.concat(sparse_embed_seq + [dense_input], axis=1) + fc1 = fluid.layers.fc(input=concated, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(concated.shape[1])))) + fc2 = fluid.layers.fc(input=fc1, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc1.shape[1])))) + fc3 = fluid.layers.fc(input=fc2, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc2.shape[1])))) + predict = fluid.layers.fc(input=[ + sparse_fm_first, sparse_fm_second, dense_fm_first, dense_fm_second, fc3 + ], + size=2, + act="softmax", + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc3.shape[1])))) + + cost = fluid.layers.cross_entropy(input=predict, label=words[-1]) + avg_cost = fluid.layers.reduce_sum(cost) + accuracy = fluid.layers.accuracy(input=predict, label=words[-1]) + auc_var, batch_auc_var, auc_states = \ + fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20) + + return avg_cost, auc_var, batch_auc_var, py_reader + + +def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True): + def embedding_layer(input): + """embedding_layer""" + emb = fluid.layers.embedding( + input=input, + is_sparse=True, + # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190 + # if you want to set is_distributed to True + is_distributed=False, + size=[sparse_feature_dim, embedding_size], + param_attr=fluid.ParamAttr( + name="SparseFeatFactors", + initializer=fluid.initializer.Uniform())) + return fluid.layers.sequence_pool(input=emb, pool_type='average') + + dense_input = fluid.layers.data( + name="dense_input", shape=[dense_feature_dim], dtype='float32') + + sparse_input_ids = [ + fluid.layers.data( + name="C" + str(i), shape=[1], lod_level=1, dtype='int64') + for i in range(1, 27) + ] + + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + words = [dense_input] + sparse_input_ids + [label] + + py_reader = None + if use_py_reader: + py_reader = fluid.layers.create_py_reader_by_data( + capacity=64, + feed_list=words, + name='py_reader', + use_double_buffer=True) + words = fluid.layers.read_file(py_reader) + + sparse_embed_seq = list(map(embedding_layer, words[1:-1])) + concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1) + + fc1 = fluid.layers.fc(input=concated, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(concated.shape[1])))) + fc2 = fluid.layers.fc(input=fc1, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc1.shape[1])))) + fc3 = fluid.layers.fc(input=fc2, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc2.shape[1])))) + predict = fluid.layers.fc(input=fc3, + size=2, + act='softmax', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc3.shape[1])))) + + cost = fluid.layers.cross_entropy(input=predict, label=words[-1]) + avg_cost = fluid.layers.reduce_sum(cost) + accuracy = fluid.layers.accuracy(input=predict, label=words[-1]) + auc_var, batch_auc_var, auc_states = \ + fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20) + + return avg_cost, auc_var, batch_auc_var, py_reader, words diff --git a/PaddleRec/ctr/preprocess.py b/PaddleRec/ctr/dnn/preprocess.py similarity index 100% rename from PaddleRec/ctr/preprocess.py rename to PaddleRec/ctr/dnn/preprocess.py diff --git a/PaddleRec/ctr/reader.py b/PaddleRec/ctr/dnn/reader.py similarity index 78% rename from PaddleRec/ctr/reader.py rename to PaddleRec/ctr/dnn/reader.py index e6bcc11d..d6eb1a77 100644 --- a/PaddleRec/ctr/reader.py +++ b/PaddleRec/ctr/dnn/reader.py @@ -2,11 +2,16 @@ class Dataset: def __init__(self): pass + class CriteoDataset(Dataset): def __init__(self, sparse_feature_dim): self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - self.cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] - self.cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] + self.cont_max_ = [ + 20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] + self.cont_diff_ = [ + 20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] self.hash_dim_ = sparse_feature_dim # here, training data are lines with line_index < train_idx_ self.train_idx_ = 41256555 @@ -33,13 +38,17 @@ class CriteoDataset(Dataset): if features[idx] == '': dense_feature.append(0.0) else: - dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / self.cont_diff_[idx - 1]) + dense_feature.append((float(features[idx]) - + self.cont_min_[idx - 1]) / + self.cont_diff_[idx - 1]) for idx in self.categorical_range_: - sparse_feature.append([hash(str(idx) + features[idx]) % self.hash_dim_]) + sparse_feature.append([ + hash(str(idx) + features[idx]) % self.hash_dim_ + ]) label = [int(features[0])] yield [dense_feature] + sparse_feature + [label] - + return reader def train(self, file_list, trainer_num, trainer_id): diff --git a/PaddleRec/ctr/requirements.txt b/PaddleRec/ctr/dnn/requirements.txt similarity index 100% rename from PaddleRec/ctr/requirements.txt rename to PaddleRec/ctr/dnn/requirements.txt diff --git a/PaddleRec/ctr/train.py b/PaddleRec/ctr/dnn/train.py similarity index 81% rename from PaddleRec/ctr/train.py rename to PaddleRec/ctr/dnn/train.py index 58fb988c..69e51b9d 100644 --- a/PaddleRec/ctr/train.py +++ b/PaddleRec/ctr/dnn/train.py @@ -14,12 +14,10 @@ import reader from network_conf import ctr_dnn_model from multiprocessing import cpu_count - # disable gpu training for this example os.environ["CUDA_VISIBLE_DEVICES"] = "" -logging.basicConfig( - format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) @@ -85,7 +83,7 @@ def parse_args(): parser.add_argument( '--role', type=str, - default='pserver', # trainer or pserver + default='pserver', # trainer or pserver help='The path for model to store (default: models)') parser.add_argument( '--endpoints', @@ -117,7 +115,7 @@ def parse_args(): def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var, trainer_num, trainer_id): - + if args.enable_ce: SEED = 102 train_program.random_seed = SEED @@ -163,43 +161,52 @@ def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var, try: while True: - loss_val, auc_val, batch_auc_val = pe.run(fetch_list=[loss.name, auc_var.name, batch_auc_var.name]) + loss_val, auc_val, batch_auc_val = pe.run( + fetch_list=[loss.name, auc_var.name, batch_auc_var.name]) loss_val = np.mean(loss_val) auc_val = np.mean(auc_val) batch_auc_val = np.mean(batch_auc_val) - logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}" - .format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val)) + logger.info( + "TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}" + .format(pass_id, batch_id, loss_val / args.batch_size, + auc_val, batch_auc_val)) if batch_id % 1000 == 0 and batch_id != 0: - model_dir = args.model_output_dir + '/batch-' + str(batch_id) + model_dir = args.model_output_dir + '/batch-' + str( + batch_id) if args.trainer_id == 0: - fluid.io.save_persistables(executor=exe, dirname=model_dir, - main_program=fluid.default_main_program()) + fluid.io.save_persistables( + executor=exe, + dirname=model_dir, + main_program=fluid.default_main_program()) batch_id += 1 except fluid.core.EOFException: py_reader.reset() - print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start)) + print("pass_id: %d, pass_time_cost: %f" % + (pass_id, time.time() - pass_start)) total_time += time.time() - pass_start model_dir = args.model_output_dir + '/pass-' + str(pass_id) if args.trainer_id == 0: - fluid.io.save_persistables(executor=exe, dirname=model_dir, - main_program=fluid.default_main_program()) + fluid.io.save_persistables( + executor=exe, + dirname=model_dir, + main_program=fluid.default_main_program()) # only for ce if args.enable_ce: threads_num, cpu_num = get_cards(args) - epoch_idx = args.num_passes + epoch_idx = args.num_passes print("kpis\teach_pass_duration_cpu%s_thread%s\t%s" % - (cpu_num, threads_num, total_time / epoch_idx)) + (cpu_num, threads_num, total_time / epoch_idx)) print("kpis\ttrain_loss_cpu%s_thread%s\t%s" % - (cpu_num, threads_num, loss_val/args.batch_size)) + (cpu_num, threads_num, loss_val / args.batch_size)) print("kpis\ttrain_auc_val_cpu%s_thread%s\t%s" % - (cpu_num, threads_num, auc_val)) + (cpu_num, threads_num, auc_val)) print("kpis\ttrain_batch_auc_val_cpu%s_thread%s\t%s" % - (cpu_num, threads_num, batch_auc_val)) - + (cpu_num, threads_num, batch_auc_val)) + def train(): args = parse_args() @@ -207,7 +214,8 @@ def train(): if not os.path.isdir(args.model_output_dir): os.mkdir(args.model_output_dir) - loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim) + loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model( + args.embedding_size, args.sparse_feature_dim) optimizer = fluid.optimizer.Adam(learning_rate=1e-4) optimizer.minimize(loss) if args.cloud_train: @@ -228,23 +236,26 @@ def train(): if args.is_local: logger.info("run local training") main_program = fluid.default_main_program() - train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var, 1, 0) + train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var, + 1, 0) else: logger.info("run dist training") t = fluid.DistributeTranspiler() - t.transpile(args.trainer_id, pservers=args.endpoints, trainers=args.trainers) + t.transpile( + args.trainer_id, pservers=args.endpoints, trainers=args.trainers) if args.role == "pserver" or args.role == "PSERVER": logger.info("run pserver") prog = t.get_pserver_program(args.current_endpoint) - startup = t.get_startup_program(args.current_endpoint, pserver_program=prog) + startup = t.get_startup_program( + args.current_endpoint, pserver_program=prog) exe = fluid.Executor(fluid.CPUPlace()) exe.run(startup) exe.run(prog) elif args.role == "trainer" or args.role == "TRAINER": logger.info("run trainer") train_prog = t.get_trainer_program() - train_loop(args, train_prog, py_reader, loss, auc_var, batch_auc_var, - args.trainers, args.trainer_id) + train_loop(args, train_prog, py_reader, loss, auc_var, + batch_auc_var, args.trainers, args.trainer_id) else: raise ValueError( 'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER' diff --git a/PaddleRec/ctr/network_conf.py b/PaddleRec/ctr/network_conf.py deleted file mode 100644 index a0de1ae2..00000000 --- a/PaddleRec/ctr/network_conf.py +++ /dev/null @@ -1,163 +0,0 @@ -import paddle.fluid as fluid -import math - -dense_feature_dim = 13 - - -def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim, sparse_input): - def dense_fm_layer(input, emb_dict_size, factor_size, fm_param_attr): - """ - dense_fm_layer - """ - first_order = fluid.layers.fc(input=input, size=1) - emb_table = fluid.layers.create_parameter(shape=[emb_dict_size, factor_size], - dtype='float32', attr=fm_param_attr) - - input_mul_factor = fluid.layers.matmul(input, emb_table) - input_mul_factor_square = fluid.layers.square(input_mul_factor) - input_square = fluid.layers.square(input) - factor_square = fluid.layers.square(emb_table) - input_square_mul_factor_square = fluid.layers.matmul(input_square, factor_square) - - second_order = 0.5 * (input_mul_factor_square - input_square_mul_factor_square) - return first_order, second_order - - def sparse_fm_layer(input, emb_dict_size, factor_size, fm_param_attr): - """ - sparse_fm_layer - """ - first_embeddings = fluid.layers.embedding( - input=input, dtype='float32', size=[emb_dict_size, 1], is_sparse=True) - first_order = fluid.layers.sequence_pool(input=first_embeddings, pool_type='sum') - - nonzero_embeddings = fluid.layers.embedding( - input=input, dtype='float32', size=[emb_dict_size, factor_size], - param_attr=fm_param_attr, is_sparse=True) - summed_features_emb = fluid.layers.sequence_pool(input=nonzero_embeddings, pool_type='sum') - summed_features_emb_square = fluid.layers.square(summed_features_emb) - - squared_features_emb = fluid.layers.square(nonzero_embeddings) - squared_sum_features_emb = fluid.layers.sequence_pool( - input=squared_features_emb, pool_type='sum') - - second_order = 0.5 * (summed_features_emb_square - squared_sum_features_emb) - return first_order, second_order - - dense_input = fluid.layers.data(name="dense_input", shape=[dense_feature_dim], dtype='float32') - - sparse_input_ids = [ - fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64') - for i in range(1, 27)] - - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - datas = [dense_input] + sparse_input_ids + [label] - - py_reader = fluid.layers.create_py_reader_by_data(capacity=64, - feed_list=datas, - name='py_reader', - use_double_buffer=True) - words = fluid.layers.read_file(py_reader) - - sparse_fm_param_attr = fluid.param_attr.ParamAttr(name="SparseFeatFactors", - initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(sparse_feature_dim))) - dense_fm_param_attr = fluid.param_attr.ParamAttr(name="DenseFeatFactors", - initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(dense_feature_dim))) - - sparse_fm_first, sparse_fm_second = sparse_fm_layer( - sparse_input, sparse_feature_dim, factor_size, sparse_fm_param_attr) - dense_fm_first, dense_fm_second = dense_fm_layer( - dense_input, dense_feature_dim, factor_size, dense_fm_param_attr) - - def embedding_layer(input): - """embedding_layer""" - emb = fluid.layers.embedding( - input=input, dtype='float32', size=[sparse_feature_dim, factor_size], - param_attr=sparse_fm_param_attr, is_sparse=True) - return fluid.layers.sequence_pool(input=emb, pool_type='average') - - sparse_embed_seq = list(map(embedding_layer, sparse_input_ids)) - concated = fluid.layers.concat(sparse_embed_seq + [dense_input], axis=1) - fc1 = fluid.layers.fc(input=concated, size=400, act='relu', - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(concated.shape[1])))) - fc2 = fluid.layers.fc(input=fc1, size=400, act='relu', - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(fc1.shape[1])))) - fc3 = fluid.layers.fc(input=fc2, size=400, act='relu', - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(fc2.shape[1])))) - predict = fluid.layers.fc( - input=[sparse_fm_first, sparse_fm_second, dense_fm_first, dense_fm_second, fc3], - size=2, - act="softmax", - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1 / math.sqrt(fc3.shape[1])))) - - cost = fluid.layers.cross_entropy(input=predict, label=words[-1]) - avg_cost = fluid.layers.reduce_sum(cost) - accuracy = fluid.layers.accuracy(input=predict, label=words[-1]) - auc_var, batch_auc_var, auc_states = \ - fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20) - - return avg_cost, auc_var, batch_auc_var, py_reader - - -def ctr_dnn_model(embedding_size, sparse_feature_dim, use_py_reader=True): - - def embedding_layer(input): - """embedding_layer""" - emb = fluid.layers.embedding( - input=input, - is_sparse=True, - # you need to patch https://github.com/PaddlePaddle/Paddle/pull/14190 - # if you want to set is_distributed to True - is_distributed=False, - size=[sparse_feature_dim, embedding_size], - param_attr=fluid.ParamAttr(name="SparseFeatFactors", - initializer=fluid.initializer.Uniform())) - return fluid.layers.sequence_pool(input=emb, pool_type='average') - - dense_input = fluid.layers.data( - name="dense_input", shape=[dense_feature_dim], dtype='float32') - - sparse_input_ids = [ - fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64') - for i in range(1, 27)] - - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - words = [dense_input] + sparse_input_ids + [label] - - py_reader = None - if use_py_reader: - py_reader = fluid.layers.create_py_reader_by_data(capacity=64, - feed_list=words, - name='py_reader', - use_double_buffer=True) - words = fluid.layers.read_file(py_reader) - - sparse_embed_seq = list(map(embedding_layer, words[1:-1])) - concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1) - - fc1 = fluid.layers.fc(input=concated, size=400, act='relu', - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(concated.shape[1])))) - fc2 = fluid.layers.fc(input=fc1, size=400, act='relu', - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(fc1.shape[1])))) - fc3 = fluid.layers.fc(input=fc2, size=400, act='relu', - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(fc2.shape[1])))) - predict = fluid.layers.fc(input=fc3, size=2, act='softmax', - param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal( - scale=1 / math.sqrt(fc3.shape[1])))) - - cost = fluid.layers.cross_entropy(input=predict, label=words[-1]) - avg_cost = fluid.layers.reduce_sum(cost) - accuracy = fluid.layers.accuracy(input=predict, label=words[-1]) - auc_var, batch_auc_var, auc_states = \ - fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20) - - return avg_cost, auc_var, batch_auc_var, py_reader, words -- GitLab