Unverified commit 514fab8b, authored by guru4elephant, committed by GitHub

Merge pull request #1390 from guru4elephant/develop

Remove the CriteoDataset preprocessing step, add automatic data splitting for single-node distributed training, and move models/fluid/recommendation to models/fluid/PaddleRec
@@ -38,17 +38,13 @@ cd data && ./download.sh && cd ..
## Data Preparation
Process the raw dataset: integer features are min-max normalized to [0, 1], and categorical features are one-hot encoded. The raw dataset is split into two parts: 90% for training and the remaining 10% for validation during training.
```bash
python preprocess.py --datadir ./data/raw --outdir ./data
```
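The preprocess.py command above is the offline preprocessing step that this commit removes; after the change the same min-max scaling happens on the fly inside reader.py. As an illustration only, the snippet below scales one integer feature with the same bound lists that the new CriteoDataset hard-codes (the helper name is made up for this sketch, it is not part of the repo):

```python
# Illustrative sketch of the min-max scaling described above; the bounds mirror
# the cont_min_ / cont_max_ lists used by CriteoDataset later in this commit.
cont_min = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]

def scale_dense_feature(raw_value, feature_idx):
    """Hypothetical helper: map one raw integer field (a string) into [0, 1]."""
    if raw_value == '':                       # missing values become 0.0, as in reader.py
        return 0.0
    lo, hi = cont_min[feature_idx], cont_max[feature_idx]
    return (float(raw_value) - lo) / (hi - lo)

print(scale_dense_feature('5', 0))   # 0.25
print(scale_dense_feature('', 3))    # 0.0
```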
## Training
The command-line options for training can be listed with `python train.py -h`.
### Local training:
```bash
python train.py \
    --train_data_path data/raw/train.txt \
    2>&1 | tee train.log
```
@@ -56,7 +52,7 @@ python train.py \
### Distributed training
Launch a distributed training job with 2 trainers and 2 pservers on the local machine. In the distributed setting, the training data is split by trainer id, which guarantees that the training data of different trainers does not overlap and improves training efficiency.
```bash
sh cluster_train.sh
@@ -69,7 +65,7 @@ sh cluster_train.sh
```bash
python infer.py \
    --model_path models/pass-0/ \
    --data_path data/raw/valid.txt
```
Note: the AUC printed at the very end of the infer.py run is the overall AUC over the whole prediction file.
@@ -77,4 +73,4 @@ python infer.py \
1. Follow the document [Launch Fluid distributed training on Baidu Cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst) to deploy a CPU cluster on Baidu Cloud.
1. Use preprocess.py to process the training data and generate train.txt.
1. Split train.txt into as many parts as there are machines in the cluster and place one part on each machine.
1. Launch the distributed training job with the command line from the `Distributed training` section above.
\ No newline at end of file
@@ -46,15 +46,12 @@ This Demo only implement the DNN part of the model described in DeepFM paper.
The DeepFM model will be provided in another model.
## Data Preprocessing Method
To preprocess the raw dataset, the integer features are clipped and then min-max normalized to [0, 1], and the categorical features are one-hot encoded. The raw training dataset is split so that 90% is used for training and the remaining 10% is used for validation during training. In reader.py, the training data is the first 90% of the lines in train.txt, and the validation data is the rest.
```bash
python preprocess.py --datadir ./data/raw --outdir ./data
```
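The preprocess.py invocation above is the offline step removed by this commit. For the categorical fields, the updated reader.py (included later in this diff) does not build an explicit one-hot table; it hashes each `column-index_value` string into a space of `--sparse_feature_dim` buckets. A minimal sketch of that mapping (the helper name here is made up for illustration):

```python
# Minimal sketch: map one categorical field to a sparse id by feature hashing,
# mirroring the hash("%d_%s") % hash_dim expression in CriteoDataset below.
SPARSE_FEATURE_DIM = 1000001      # default of --sparse_feature_dim in this commit

def categorical_to_id(column_idx, raw_value, hash_dim=SPARSE_FEATURE_DIM):
    """Hypothetical helper: bucket a categorical value into [0, hash_dim)."""
    # Note: in Python 3, str hashing is randomized per process unless
    # PYTHONHASHSEED is fixed, so the ids differ between runs.
    return hash("%d_%s" % (column_idx, raw_value)) % hash_dim

print(categorical_to_id(14, "68fd1e64"))   # some id in [0, 1000001)
```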
## Train
The command line options for training can be listed by `python train.py -h`.
@@ -62,7 +59,7 @@ The command line options for training can be listed by `python train.py -h`.
### Local Train:
```bash
python train.py \
    --train_data_path data/raw/train.txt \
    2>&1 | tee train.log
```
@@ -70,7 +67,9 @@ After training pass 1 batch 40000, the testing AUC is `0.801178` and the testing
cost is `0.445196`.
### Distributed Train
Run a distributed training job with 2 pservers and 2 trainers on a single machine.
In the distributed training setting, the training data is sharded by trainer_id so that the training data does not overlap among trainers (a minimal sketch of this sharding appears right after the launch command below).
```bash
sh cluster_train.sh
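As a rough illustration of the sharding described above (not the exact code path; cluster_train.sh and reader.py wire this up through `--trainers` and `--trainer_id`), each trainer keeps only the lines whose index falls into its own residue class. The helper below is hypothetical and uses 0-based trainer ids:

```python
# Minimal sketch of splitting one training file across trainers by line index,
# in the spirit of the trainer_id check inside CriteoDataset._reader_creator.
def sharded_lines(path, trainer_num, trainer_id):
    """Hypothetical helper: yield only the lines owned by this trainer (0-based id)."""
    with open(path) as f:
        for line_idx, line in enumerate(f):
            if line_idx % trainer_num == trainer_id:
                yield line

# Two trainers reading the same file see disjoint subsets of lines:
# trainer 0 -> lines 0, 2, 4, ...    trainer 1 -> lines 1, 3, 5, ...
```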
@@ -83,9 +82,9 @@ To make inference for the test dataset:
```bash
python infer.py \
    --model_path models/ \
    --data_path data/raw/train.txt
```
Note: The AUC value in the last log line is the total AUC over the whole test dataset. Here, train.txt is split inside reader.py, so the validation data does not overlap with the training data.
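The 90/10 boundary is implemented as a fixed line-count threshold (train_idx_ in reader.py). A small sketch of that rule, assuming the threshold value used in this commit (the function name is made up for illustration):

```python
# Sketch of the train/validation split rule used by CriteoDataset in this commit:
# lines up to train_idx_ feed the training reader, the rest feed the test reader.
TRAIN_IDX = 41256555   # value of self.train_idx_ in reader.py

def is_kept_line(line_idx, is_train, train_idx=TRAIN_IDX):
    """Return True if this 1-based line index should be yielded by the reader."""
    if is_train:
        return line_idx <= train_idx
    return line_idx > train_idx

assert is_kept_line(1, is_train=True)
assert not is_kept_line(TRAIN_IDX + 1, is_train=True)
assert is_kept_line(TRAIN_IDX + 1, is_train=False)
```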
## Train on Baidu Cloud
1. Please prepare some CPU machines on Baidu Cloud following the steps in [train_on_baidu_cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst)
@@ -2,6 +2,9 @@ import argparse
import logging
import numpy as np

# disable gpu training for this example
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import paddle
import paddle.fluid as fluid
@@ -32,6 +35,11 @@ def parse_args():
        type=int,
        default=10,
        help="The size for embedding layer (default:10)")
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help="The size for embedding layer (default:1000001)")
    parser.add_argument(
        '--batch_size',
        type=int,
@@ -46,19 +54,19 @@ def infer():
    place = fluid.CPUPlace()
    inference_scope = fluid.core.Scope()

    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    test_reader = paddle.batch(dataset.test([args.data_path]), batch_size=args.batch_size)

    startup_program = fluid.framework.Program()
    test_program = fluid.framework.Program()
    with fluid.framework.program_guard(test_program, startup_program):
        loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)

        exe = fluid.Executor(place)
        feeder = fluid.DataFeeder(feed_list=data_list, place=place)

        with fluid.scope_guard(inference_scope):
            [inference_program, _, fetch_targets] = fluid.io.load_inference_model(args.model_path, exe)
@@ -2,10 +2,8 @@ import paddle.fluid as fluid
import math

dense_feature_dim = 13
sparse_feature_dim = 117568
def ctr_dnn_model(embedding_size, sparse_feature_dim):
def ctr_dnn_model(embedding_size):
    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')
    sparse_input_ids = [
class Dataset:
    def __init__(self):
        pass


class CriteoDataset(Dataset):
    def __init__(self, sparse_feature_dim):
        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        self.cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
        self.cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
        self.hash_dim_ = sparse_feature_dim
        # here, training data are lines with line_index < train_idx_
        self.train_idx_ = 41256555
        self.continuous_range_ = range(1, 14)
        self.categorical_range_ = range(14, 40)

    def _reader_creator(self, file_list, is_train, trainer_num, trainer_id):
        def reader():
            for file in file_list:
                with open(file, 'r') as f:
                    line_idx = 0
                    for line in f:
                        line_idx += 1
                        # the first train_idx_ lines are training data, the rest are validation data
                        if is_train and line_idx > self.train_idx_:
                            continue
                        elif not is_train and line_idx <= self.train_idx_:
                            continue
                        # shard lines across trainers so their training data does not overlap
                        if trainer_id > 0 and line_idx % trainer_num != trainer_id:
                            continue
                        features = line.rstrip('\n').split('\t')
                        dense_feature = []
                        sparse_feature = []
                        for idx in self.continuous_range_:
                            if features[idx] == '':
                                dense_feature.append(0.0)
                            else:
                                # min-max normalize the integer features to [0, 1]
                                dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / self.cont_diff_[idx - 1])
                        for idx in self.categorical_range_:
                            # hash each categorical feature into a sparse id
                            sparse_feature.append([hash("%d_%s" % (idx, features[idx])) % self.hash_dim_])
                        label = [int(features[0])]
                        yield [dense_feature] + sparse_feature + [label]
        return reader

    def train(self, file_list, trainer_num, trainer_id):
        return self._reader_creator(file_list, True, trainer_num, trainer_id)

    def test(self, file_list):
        # trainer_id of -1 disables sharding so the whole validation set is read
        return self._reader_creator(file_list, False, 1, -1)

    def infer(self, file_list):
        return self._reader_creator(file_list, False, 1, -1)
@@ -4,6 +4,9 @@ import argparse
import logging
import os

# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import paddle
import paddle.fluid as fluid
@@ -21,12 +24,12 @@ def parse_args():
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/raw/train.txt',
        help="The path of training dataset")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/raw/valid.txt',
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
@@ -48,6 +51,11 @@ def parse_args():
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help='sparse feature hashing space for index processing')
    parser.add_argument(
        '--is_local',
@@ -84,11 +92,12 @@ def parse_args():
    return parser.parse_args()
def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
               trainer_num, trainer_id):
    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_num, trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)
    place = fluid.CPUPlace()
@@ -122,14 +131,14 @@ def train():
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
    optimizer.minimize(loss)

    if args.is_local:
        logger.info("run local training")
        main_program = fluid.default_main_program()
        # trainer_num=1, trainer_id=-1: the reader skips sharding and reads all training data
        train_loop(args, main_program, data_list, loss, auc_var, batch_auc_var, 1, -1)
    else:
        logger.info("run dist training")
        t = fluid.DistributeTranspiler()
@@ -144,7 +153,8 @@ def train():
        elif args.role == "trainer":
            logger.info("run trainer")
            train_prog = t.get_trainer_program()
            # pass a 1-based trainer id so the reader's sharding branch (trainer_id > 0) is active
            train_loop(args, train_prog, data_list, loss, auc_var, batch_auc_var,
                       args.trainers, args.trainer_id + 1)


if __name__ == '__main__':
# The previous Dataset reader, replaced in this commit: it consumed the
# offline-preprocessed, comma-separated output of preprocess.py.
class Dataset:
    def _reader_creator(self, file_list, is_infer):
        def reader():
            for file in file_list:
                with open(file, 'r') as f:
                    for line in f:
                        features = line.rstrip('\n').split('\t')
                        dense_feature = map(float, features[0].split(','))
                        sparse_feature = map(lambda x: [int(x)], features[1].split(','))
                        if not is_infer:
                            label = [float(features[2])]
                            yield [dense_feature] + sparse_feature + [label]
                        else:
                            yield [dense_feature] + sparse_feature
        return reader

    def train(self, file_list):
        return self._reader_creator(file_list, False)

    def test(self, file_list):
        return self._reader_creator(file_list, False)

    def infer(self, file_list):
        return self._reader_creator(file_list, True)