Commit 2207ae95 authored by D dongdaxiang

Add feature hashing for training; users do not need to use preprocess.py in this mode.

Parent 439879ef
@@ -46,15 +46,12 @@ This Demo only implements the DNN part of the model described in the DeepFM paper.
The full DeepFM model will be provided in another model.
## Data Preprocessing Method
To preprocess the raw dataset, the integer features are clipped and then min-max
normalized to [0, 1], and the categorical features are one-hot encoded. The raw
training dataset is split such that 90% is used for training and the other
10% for validation during training. In reader.py, the training data is the first
90% of the lines in train.txt, and the validation data is the rest.
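The clipping and min-max normalization described above can be sketched as follows (a minimal illustration; the bounds come from the `cont_min_`/`cont_max_` tables in reader.py, not from the project's actual preprocessing script):

```python
def min_max_normalize(value, lo, hi):
    # clip value into [lo, hi], then rescale it linearly to [0, 1]
    clipped = max(lo, min(hi, value))
    return (clipped - lo) / float(hi - lo)

# e.g. the second integer feature uses bounds [-3, 600] (range 603) in reader.py
x = min_max_normalize(650, -3, 600)  # clipped to 600, then normalized to 1.0
```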
## Train
The command line options for training can be listed by `python train.py -h`.
@@ -62,7 +59,7 @@ The command line options for training can be listed by `python train.py -h`.
### Local Train:
```bash
python train.py \
        --train_data_path data/raw/train.txt \
        2>&1 | tee train.log
```
@@ -70,7 +67,9 @@ After training pass 1 batch 40000, the testing AUC is `0.801178` and the testing
cost is `0.445196`.
### Distributed Train
Run a distributed training job with 2 pservers and 2 trainers on a single machine.
In the distributed setting, the training data is split by trainer_id so that the
training data does not overlap among trainers.
```bash
sh cluster_train.sh
```
@@ -83,9 +82,9 @@ To make inference for the test dataset:
```bash
python infer.py \
        --model_path models/ \
        --data_path data/raw/train.txt
```
Note: The AUC value in the last log line is the overall AUC for the whole test dataset. Here, train.txt is split inside reader.py so that the validation data does not overlap with the training data.
## Train on Baidu Cloud
1. Please prepare some CPU machines on Baidu Cloud following the steps in [train_on_baidu_cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst)
...
@@ -2,6 +2,9 @@ import argparse
import logging
import numpy as np
# disable gpu training for this example
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import paddle
import paddle.fluid as fluid
@@ -32,6 +35,11 @@ def parse_args():
        type=int,
        default=10,
        help="The size for embedding layer (default:10)")
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help="The size of the sparse feature hashing space (default:1000001)")
    parser.add_argument(
        '--batch_size',
        type=int,
@@ -46,19 +54,19 @@ def infer():
    place = fluid.CPUPlace()
    inference_scope = fluid.core.Scope()

    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    test_reader = paddle.batch(dataset.test([args.data_path]), batch_size=args.batch_size)

    startup_program = fluid.framework.Program()
    test_program = fluid.framework.Program()
    with fluid.framework.program_guard(test_program, startup_program):
        loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)

        exe = fluid.Executor(place)
        feeder = fluid.DataFeeder(feed_list=data_list, place=place)

        with fluid.scope_guard(inference_scope):
            [inference_program, _, fetch_targets] = fluid.io.load_inference_model(args.model_path, exe)
...
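The `test_reader` above is built with `paddle.batch`, which wraps a sample-level reader into a batch-level one. Conceptually it behaves like this pure-Python sketch (an illustration, not PaddlePaddle's actual implementation):

```python
def batch(reader, batch_size):
    # group samples yielded by `reader` into lists of length batch_size;
    # in this sketch the final, possibly smaller, group is also yielded
    def batched_reader():
        buf = []
        for sample in reader():
            buf.append(sample)
            if len(buf) == batch_size:
                yield buf
                buf = []
        if buf:
            yield buf
    return batched_reader

# usage: a reader is a no-argument callable returning an iterable of samples
samples = lambda: iter(range(5))
batches = list(batch(samples, 2)())
```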
@@ -2,10 +2,8 @@ import paddle.fluid as fluid
import math

dense_feature_dim = 13

def ctr_dnn_model(embedding_size, sparse_feature_dim):
    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')

    sparse_input_ids = [
...
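`ctr_dnn_model` now takes `sparse_feature_dim` so that each sparse slot's embedding table has one row per possible hashed id. A toy pure-Python illustration of that sizing (names here are invented for the sketch; this is not the fluid implementation):

```python
class ToyEmbedding:
    """Toy embedding table for one sparse slot, sized by the hashing space."""

    def __init__(self, sparse_feature_dim, embedding_size):
        # one row per hashed id, so any id in [0, sparse_feature_dim) is valid
        self.table = [[0.0] * embedding_size for _ in range(sparse_feature_dim)]

    def lookup(self, hashed_id):
        return self.table[hashed_id]

emb = ToyEmbedding(sparse_feature_dim=1000, embedding_size=10)
vec = emb.lookup(hash("14_abc") % 1000)  # a hashed id always stays in range
```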
@@ -24,3 +24,51 @@ class Dataset:
    def infer(self, file_list):
        return self._reader_creator(file_list, True)
class CriteoDataset:
    def __init__(self, sparse_feature_dim):
        # min values and (max - min) ranges used to min-max normalize the
        # 13 integer features
        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        self.cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
        self.cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
        # size of the hashing space for the 26 categorical features
        self.hash_dim_ = sparse_feature_dim
        # the first train_idx_ lines of train.txt are the training split;
        # the remaining lines are the validation split
        self.train_idx_ = 41256555
        self.continuous_range_ = range(1, 14)
        self.categorical_range_ = range(14, 40)

    def _reader_creator(self, file_list, is_train, trainer_id):
        def reader():
            for file in file_list:
                with open(file, 'r') as f:
                    line_idx = 0
                    for line in f:
                        line_idx += 1
                        # keep only this split's lines
                        if is_train and line_idx > self.train_idx_:
                            continue
                        elif not is_train and line_idx <= self.train_idx_:
                            continue
                        # in distributed training, keep only this trainer's shard
                        if trainer_id > 0 and line_idx % trainer_id != 0:
                            continue
                        features = line.rstrip('\n').split('\t')
                        dense_feature = []
                        sparse_feature = []
                        for idx in self.continuous_range_:
                            if features[idx] == '':
                                dense_feature.append(0.0)
                            else:
                                # min-max normalize each integer feature
                                dense_feature.append(
                                    (float(features[idx]) - self.cont_min_[idx - 1]) /
                                    self.cont_diff_[idx - 1])
                        for idx in self.categorical_range_:
                            # feature hashing: map "<slot>_<value>" into [0, hash_dim_)
                            sparse_feature.append(
                                [hash("%d_%s" % (idx, features[idx])) % self.hash_dim_])
                        label = [int(features[0])]
                        yield [dense_feature] + sparse_feature + [label]
        return reader

    def train(self, file_list, trainer_id):
        return self._reader_creator(file_list, True, trainer_id)

    def test(self, file_list):
        return self._reader_creator(file_list, False, -1)

    def infer(self, file_list):
        return self._reader_creator(file_list, False, -1)
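The per-line parsing above can be exercised in isolation. This sketch repeats the same normalization and feature-hashing steps on one synthetic Criteo-style line (tab-separated: label, 13 integer features, 26 categorical features); the constants mirror reader.py:

```python
CONT_MIN = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
CONT_DIFF = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
HASH_DIM = 1000001

def parse_line(line):
    # mirrors CriteoDataset's reader: slot 0 is the label, slots 1-13 are
    # continuous features, slots 14-39 are categorical features
    features = line.rstrip('\n').split('\t')
    dense = []
    for idx in range(1, 14):
        if features[idx] == '':
            dense.append(0.0)
        else:
            dense.append((float(features[idx]) - CONT_MIN[idx - 1]) / CONT_DIFF[idx - 1])
    sparse = [hash("%d_%s" % (idx, features[idx])) % HASH_DIM
              for idx in range(14, 40)]
    label = int(features[0])
    return dense, sparse, label

# one synthetic line: label 1, 13 integers, 26 categorical strings
line = '\t'.join(['1'] + ['5'] * 13 + ['c%d' % i for i in range(26)])
dense, sparse, label = parse_line(line)
```

Note that the hashed ids are process-dependent in Python 3 (string hashing is randomized), but they always fall in `[0, HASH_DIM)`.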
@@ -4,6 +4,9 @@ import argparse
import logging
import os
# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import paddle
import paddle.fluid as fluid
@@ -21,12 +24,12 @@ def parse_args():
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/raw/train.txt',
        help="The path of training dataset")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/raw/valid.txt',
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
@@ -48,6 +51,11 @@ def parse_args():
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help='sparse feature hashing space for index processing')
    parser.add_argument(
        '--is_local',
@@ -84,11 +92,11 @@ def parse_args():
    return parser.parse_args()

def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var, trainer_id):
    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)
    place = fluid.CPUPlace()
@@ -122,14 +130,14 @@ def train():
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)
    loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
    optimizer.minimize(loss)

    if args.is_local:
        logger.info("run local training")
        main_program = fluid.default_main_program()
        train_loop(args, main_program, data_list, loss, auc_var, batch_auc_var, -1)
    else:
        logger.info("run dist training")
        t = fluid.DistributeTranspiler()
@@ -144,7 +152,8 @@ def train():
    elif args.role == "trainer":
        logger.info("run trainer")
        train_prog = t.get_trainer_program()
        train_loop(args, train_prog, data_list, loss, auc_var, batch_auc_var,
                   args.trainer_id + 1)

if __name__ == '__main__':
...
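In the trainer branch above, `train_loop` receives `args.trainer_id + 1`, and the dataset reader keeps only lines whose 1-based index is a multiple of that value (all lines are kept when the value is <= 0, as in local training). A self-contained sketch of that filter:

```python
def shard_lines(lines, trainer_id):
    # mirrors the filter in reader.py: with trainer_id <= 0 (local training)
    # every line is kept; otherwise only lines whose 1-based index is a
    # multiple of trainer_id are kept for this trainer
    kept = []
    for line_idx, line in enumerate(lines, start=1):
        if trainer_id > 0 and line_idx % trainer_id != 0:
            continue
        kept.append(line)
    return kept

data = ['a', 'b', 'c', 'd', 'e', 'f']
local = shard_lines(data, -1)    # local training: all lines kept
trainer2 = shard_lines(data, 2)  # trainer with args.trainer_id == 1
```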