diff --git a/ctr/README.md b/ctr/README.md
index 9332a8516e72d9df6c81ee0faf7c44264e43c0a6..c804dab6941357ad4505e98ed6f5429093a47765 100644
--- a/ctr/README.md
+++ b/ctr/README.md
@@ -1,5 +1,20 @@
 # 点击率预估
+以下是本例目录包含的文件以及对应说明:
+
+```
+├── README.md          # 本教程 markdown 文档
+├── dataset.md         # 数据集处理教程
+├── images             # 本教程图片目录
+│   ├── lr_vs_dnn.jpg
+│   └── wide_deep.png
+├── infer.py           # 预测脚本
+├── network_conf.py    # 模型网络配置
+├── reader.py          # data provider
+├── train.py           # 训练脚本
+└── utils.py           # helper functions
+```
+
 ## 背景介绍
 
 CTR(Click-Through Rate,点击率预估)\[[1](https://en.wikipedia.org/wiki/Click-through_rate)\]
 是用来表示用户点击一个特定链接的概率,
@@ -61,8 +76,40 @@ LR 对于 DNN 模型的优势是对大规模稀疏特征的容纳能力,包括
 
 我们使用 Kaggle 上 `Click-through rate prediction` 任务的数据集\[[2](https://www.kaggle.com/c/avazu-ctr-prediction/data)\]
 来演示模型。
 
-具体的特征处理方法参看 [data process](./dataset.md)
+具体的特征处理方法参看 [data process](./dataset.md)。
+
+本教程中演示模型的输入格式如下:
+
+```
+# <dnn input ids> \t <lr input sparse values> \t click
+1 23 190 \t 230:0.12 3421:0.9 23451:0.12 \t 0
+23 231 \t 1230:0.12 13421:0.9 \t 1
+```
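+
+下面给出一个仅作示意的最小解析示例(函数名 `parse_demo_line` 为本文虚构,假设输入行符合上述以 `\t` 分隔的三段格式):
+
+```python
+def parse_demo_line(line):
+    """把一行演示数据拆分为 dnn 特征、lr 稀疏特征和 click 标签(仅作示意)。"""
+    dnn_part, lr_part, click = line.strip().split('\t')
+    # dnn 输入:空格分隔的特征 id 列表
+    dnn_ids = [int(x) for x in dnn_part.split()]
+    # lr 输入:空格分隔的 "id:value" 稀疏特征
+    lr_feas = [(int(i), float(v))
+               for i, v in (kv.split(':') for kv in lr_part.split())]
+    return dnn_ids, lr_feas, int(click)
+
+# 例如:parse_demo_line("1 23 190\t230:0.12 3421:0.9 23451:0.12\t0")
+```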
+
+演示数据集\[[2](#参考文献)\] 可以使用 `avazu_data_processer.py` 脚本处理,具体使用方法参考如下说明:
+
+```
+usage: avazu_data_processer.py [-h] --data_path DATA_PATH --output_dir
+                               OUTPUT_DIR
+                               [--num_lines_to_detect NUM_LINES_TO_DETECT]
+                               [--test_set_size TEST_SET_SIZE]
+                               [--train_size TRAIN_SIZE]
+
+PaddlePaddle CTR example
+optional arguments:
+  -h, --help            show this help message and exit
+  --data_path DATA_PATH
+                        path of the Avazu dataset
+  --output_dir OUTPUT_DIR
+                        directory to output
+  --num_lines_to_detect NUM_LINES_TO_DETECT
+                        number of records to detect dataset's meta info
+  --test_set_size TEST_SET_SIZE
+                        size of the validation dataset (default: 10000)
+  --train_size TRAIN_SIZE
+                        size of the trainset (default: 100000)
+```
 
 ## Wide & Deep Learning Model
 
@@ -204,15 +251,17 @@ trainer.train(
 1. 下载训练数据,可以使用 Kaggle 上 CTR 比赛的数据\[[2](#参考文献)\]
    1. 从 [Kaggle CTR](https://www.kaggle.com/c/avazu-ctr-prediction/data) 下载 train.gz
    2. 解压 train.gz 得到 train.txt
-2. 执行 `python train.py --train_data_path train.txt` ,开始训练
+   3. `mkdir -p output; python avazu_data_processer.py --data_path train.txt --output_dir output --num_lines_to_detect 1000 --test_set_size 100` 生成演示数据
+2. 执行 `python train.py --train_data_path ./output/train.txt --test_data_path ./output/test.txt --data_meta_file ./output/data.meta.txt --model_type=0` 开始训练
 
 上面第2个步骤可以为 `train.py` 填充命令行参数来定制模型的训练过程,具体的命令行参数及用法如下
 
 ```
 usage: train.py [-h] --train_data_path TRAIN_DATA_PATH
-                [--batch_size BATCH_SIZE] [--test_set_size TEST_SET_SIZE]
+                [--test_data_path TEST_DATA_PATH] [--batch_size BATCH_SIZE]
                 [--num_passes NUM_PASSES]
-                [--num_lines_to_detact NUM_LINES_TO_DETACT]
+                [--model_output_prefix MODEL_OUTPUT_PREFIX] --data_meta_file
+                DATA_META_FILE --model_type MODEL_TYPE
 
 PaddlePaddle CTR example
 
@@ -220,16 +269,63 @@ optional arguments:
   -h, --help            show this help message and exit
   --train_data_path TRAIN_DATA_PATH
                         path of training dataset
+  --test_data_path TEST_DATA_PATH
+                        path of testing dataset
   --batch_size BATCH_SIZE
                         size of mini-batch (default:10000)
-  --test_set_size TEST_SET_SIZE
-                        size of the validation dataset(default: 10000)
   --num_passes NUM_PASSES
                         number of passes to train
-  --num_lines_to_detact NUM_LINES_TO_DETACT
-                        number of records to detect dataset's meta info
+  --model_output_prefix MODEL_OUTPUT_PREFIX
+                        prefix of path for model to store (default:
+                        ./ctr_models)
+  --data_meta_file DATA_META_FILE
+                        path of data meta info file
+  --model_type MODEL_TYPE
+                        model type, classification: 0, regression 1 (default
+                        classification)
+```
+
+## 用训好的模型做预测
+训好的模型可以用来预测新的数据,预测数据的格式为
+
+```
+# <dnn input ids> \t <lr input sparse values>
+1 23 190 \t 230:0.12 3421:0.9 23451:0.12
+23 231 \t 1230:0.12 13421:0.9
+```
+
+`infer.py` 的使用方法如下
+
+```
+usage: infer.py [-h] --model_gz_path MODEL_GZ_PATH --data_path DATA_PATH
+                --prediction_output_path PREDICTION_OUTPUT_PATH
+                [--data_meta_path DATA_META_PATH] --model_type MODEL_TYPE
+
+PaddlePaddle CTR example
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --model_gz_path MODEL_GZ_PATH
+                        path of model parameters gz file
+  --data_path DATA_PATH
+                        path of the dataset to infer
+  --prediction_output_path PREDICTION_OUTPUT_PATH
+                        path to output the prediction
+  --data_meta_path DATA_META_PATH
+                        path of trainset's meta info, default is ./data.meta
+  --model_type MODEL_TYPE
+                        model type, classification: 0, regression 1 (default
+                        classification)
+```
+
+示例数据可以用如下命令预测
+
+```
+python infer.py --model_gz_path <model_path> --data_path output/infer.txt --prediction_output_path predictions.txt --data_meta_path data.meta.txt
+```
+
+最终的预测结果位于 `predictions.txt`。
+
 ## 参考文献
 1. <https://en.wikipedia.org/wiki/Click-through_rate>
 2. <https://www.kaggle.com/c/avazu-ctr-prediction/data>
diff --git a/ctr/avazu_data_processer.py b/ctr/avazu_data_processer.py
new file mode 100644
index 0000000000000000000000000000000000000000..884e09d1978507e197e7e98c11ad708fb6859a90
--- /dev/null
+++ b/ctr/avazu_data_processer.py
@@ -0,0 +1,415 @@
+import os
+import sys
+import csv
+import cPickle
+import argparse
+import numpy as np
+
+from utils import logger, TaskMode
+
+parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
+parser.add_argument(
+    '--data_path', type=str, required=True, help="path of the Avazu dataset")
+parser.add_argument(
+    '--output_dir', type=str, required=True, help="directory to output")
+parser.add_argument(
+    '--num_lines_to_detect',
+    type=int,
+    default=500000,
+    help="number of records to detect dataset's meta info")
+parser.add_argument(
+    '--test_set_size',
+    type=int,
+    default=10000,
+    help="size of the validation dataset (default: 10000)")
+parser.add_argument(
+    '--train_size',
+    type=int,
+    default=100000,
+    help="size of the trainset (default: 100000)")
+args = parser.parse_args()
+'''
+The fields of the dataset are:
+
+    0. id: ad identifier
+    1. click: 0/1 for non-click/click
+    2. hour: format is YYMMDDHH, so 14091123 means 23:00 on Sept. 11, 2014 UTC.
+    3. C1 -- anonymized categorical variable
+    4. banner_pos
+    5. site_id
+    6. site_domain
+    7. site_category
+    8. app_id
+    9. app_domain
+    10. app_category
+    11. device_id
+    12. device_ip
+    13. device_model
+    14. device_type
+    15. device_conn_type
+    16. C14-C21 -- anonymized categorical variables
+
+We will treat following fields as categorical features:
+
+    - C1
+    - banner_pos
+    - site_category
+    - app_category
+    - device_type
+    - device_conn_type
+
+and some other features as id features:
+
+    - id
+    - site_id
+    - app_id
+    - device_id
+
+The `hour` field will be treated as a continuous feature and will be transformed
+to one-hot representation which has 24 bits.
+
+This script will output 3 files:
+
+1. train.txt
+2. test.txt
+3. infer.txt
+
+all the files are for demo.
+'''
+
+feature_dims = {}
+
+categorial_features = ('C1 banner_pos site_category app_category ' +
+                       'device_type device_conn_type').split()
+
+id_features = 'id site_id app_id device_id _device_id_cross_site_id'.split()
+
+
+def get_all_field_names(mode=0):
+    '''
+    @mode: int
+        0 for train, 1 for test
+    @return: list of str
+    '''
+    return categorial_features + ['hour'] + id_features + ['click'] \
+        if mode == 0 else []
+
+
+class CategoryFeatureGenerator(object):
+    '''
+    Generate category features.
+
+    Register all records by calling `register` first, then call `gen` to generate
+    one-hot representation for a record.
+    '''
+
+    def __init__(self):
+        self.dic = {'unk': 0}
+        self.counter = 1
+
+    def register(self, key):
+        '''
+        Register record.
+        '''
+        if key not in self.dic:
+            self.dic[key] = self.counter
+            self.counter += 1
+
+    def size(self):
+        return len(self.dic)
+
+    def gen(self, key):
+        '''
+        Generate one-hot representation for a record.
+        '''
+        if key not in self.dic:
+            res = self.dic['unk']
+        else:
+            res = self.dic[key]
+        return [res]
+
+    def __repr__(self):
+        return '<CategoryFeatureGenerator %d>' % len(self.dic)
+
+
+class IDfeatureGenerator(object):
+    def __init__(self, max_dim, cross_fea0=None, cross_fea1=None):
+        '''
+        @max_dim: int
+            Size of the id elements' space
+        '''
+        self.max_dim = max_dim
+        self.cross_fea0 = cross_fea0
+        self.cross_fea1 = cross_fea1
+
+    def gen(self, key):
+        '''
+        Generate one-hot representation for records
+        '''
+        return [hash(key) % self.max_dim]
+
+    def gen_cross_fea(self, fea1, fea2):
+        key = str(fea1) + str(fea2)
+        return self.gen(key)
+
+    def size(self):
+        return self.max_dim
+
+
+class ContinuousFeatureGenerator(object):
+    def __init__(self, n_intervals):
+        self.min = sys.maxint
+        self.max = -sys.maxint - 1
+        self.n_intervals = n_intervals
+
+    def register(self, val):
+        self.min = min(self.min, val)
+        self.max = max(self.max, val)
+
+    def gen(self, val):
+        self.len_part = (self.max - self.min) / self.n_intervals
+        return (val - self.min) / self.len_part
+
+
+# init all feature generators
+fields = {}
+for key in categorial_features:
+    fields[key] = CategoryFeatureGenerator()
+for key in id_features:
+    # for cross features
+    if 'cross' in key:
+        feas = key[1:].split('_cross_')
+        fields[key] = IDfeatureGenerator(10000000, *feas)
+    # for normal ID features
+    else:
+        fields[key] = IDfeatureGenerator(10000)
+
+# used as feed_dict in PaddlePaddle
+field_index = dict((key, id)
+                   for id, key in enumerate(['dnn_input', 'lr_input', 'click']))
+
+
+def detect_dataset(path, topn, id_fea_space=10000):
+    '''
+    Parse the first `topn` records to collect meta information of this dataset.
+
+    NOTE the records should be randomly shuffled first.
+    '''
+    # create categorical statistics objects.
+    logger.warning('detecting dataset')
+
+    with open(path, 'rb') as csvfile:
+        reader = csv.DictReader(csvfile)
+        for row_id, row in enumerate(reader):
+            if row_id > topn:
+                break
+
+            for key in categorial_features:
+                fields[key].register(row[key])
+
+    for key, item in fields.items():
+        feature_dims[key] = item.size()
+
+    feature_dims['hour'] = 24
+    feature_dims['click'] = 1
+
+    feature_dims['dnn_input'] = np.sum(
+        feature_dims[key] for key in categorial_features + ['hour']) + 1
+    feature_dims['lr_input'] = np.sum(feature_dims[key]
+                                      for key in id_features) + 1
+    # logger.warning("dump dataset's meta info to %s" % meta_out_path)
+    # cPickle.dump([feature_dims, fields], open(meta_out_path, 'wb'))
+
+    return feature_dims
+
+
+def load_data_meta(meta_path):
+    '''
+    Load dataset's meta information.
+    '''
+    feature_dims, fields = cPickle.load(open(meta_path, 'rb'))
+    return feature_dims, fields
+
+
+def concat_sparse_vectors(inputs, dims):
+    '''
+    Concatenate more than one sparse vectors into one.
+
+    @inputs: list
+        list of sparse vector
+    @dims: list of int
+        dimension of each sparse vector
+    '''
+    res = []
+    assert len(inputs) == len(dims)
+    start = 0
+    for no, vec in enumerate(inputs):
+        for v in vec:
+            res.append(v + start)
+        start += dims[no]
+    return res
+
+
+class AvazuDataset(object):
+    '''
+    Load AVAZU dataset as train set.
+    '''
+
+    def __init__(self,
+                 train_path,
+                 n_records_as_test=-1,
+                 fields=None,
+                 feature_dims=None):
+        self.train_path = train_path
+        self.n_records_as_test = n_records_as_test
+        self.fields = fields
+        # default is train mode.
+        self.mode = TaskMode.create_train()
+
+        self.categorial_dims = [
+            feature_dims[key] for key in categorial_features + ['hour']
+        ]
+        self.id_dims = [feature_dims[key] for key in id_features]
+
+    def train(self):
+        '''
+        Load trainset.
+        '''
+        logger.info("load trainset from %s" % self.train_path)
+        self.mode = TaskMode.create_train()
+        with open(self.train_path) as f:
+            reader = csv.DictReader(f)
+
+            for row_id, row in enumerate(reader):
+                # skip the top n lines, which are reserved for the test set
+                if self.n_records_as_test > 0 and row_id < self.n_records_as_test:
+                    continue
+
+                rcd = self._parse_record(row)
+                if rcd:
+                    yield rcd
+
+    def test(self):
+        '''
+        Load testset.
+        '''
+        logger.info("load testset from %s" % self.train_path)
+        self.mode = TaskMode.create_test()
+        with open(self.train_path) as f:
+            reader = csv.DictReader(f)
+
+            for row_id, row in enumerate(reader):
+                # the first n lines are used as the test set
+                if self.n_records_as_test > 0 and row_id > self.n_records_as_test:
+                    break
+
+                rcd = self._parse_record(row)
+                if rcd:
+                    yield rcd
+
+    def infer(self):
+        '''
+        Load inferset.
+        '''
+        logger.info("load inferset from %s" % self.train_path)
+        self.mode = TaskMode.create_infer()
+        with open(self.train_path) as f:
+            reader = csv.DictReader(f)
+
+            for row_id, row in enumerate(reader):
+                rcd = self._parse_record(row)
+                if rcd:
+                    yield rcd
+
+    def _parse_record(self, row):
+        '''
+        Parse a CSV row and get a record.
+        '''
+        record = []
+        for key in categorial_features:
+            record.append(self.fields[key].gen(row[key]))
+        record.append([int(row['hour'][-2:])])
+        dense_input = concat_sparse_vectors(record, self.categorial_dims)
+
+        record = []
+        for key in id_features:
+            if 'cross' not in key:
+                record.append(self.fields[key].gen(row[key]))
+            else:
+                fea0 = self.fields[key].cross_fea0
+                fea1 = self.fields[key].cross_fea1
+                record.append(
+                    self.fields[key].gen_cross_fea(row[fea0], row[fea1]))
+
+        sparse_input = concat_sparse_vectors(record, self.id_dims)
+
+        record = [dense_input, sparse_input]
+
+        if not self.mode.is_infer():
+            record.append(list((int(row['click']), )))
+        return record
+
+
+def ids2dense(vec, dim):
+    return vec
+
+
+def ids2sparse(vec):
+    return ["%d:1" % x for x in vec]
+
+
+detect_dataset(args.data_path, args.num_lines_to_detect)
+dataset = AvazuDataset(
+    args.data_path,
+    args.test_set_size,
+    fields=fields,
+    feature_dims=feature_dims)
+
+output_trainset_path = os.path.join(args.output_dir, 'train.txt')
+output_testset_path = os.path.join(args.output_dir, 'test.txt')
+output_infer_path = os.path.join(args.output_dir, 'infer.txt')
+output_meta_path = os.path.join(args.output_dir, 'data.meta.txt')
+
+with open(output_trainset_path, 'w') as f:
+    for id, record in enumerate(dataset.train()):
+        if id and id % 10000 == 0:
+            logger.info("load %d records" % id)
+        if id > args.train_size:
+            break
+        dnn_input, lr_input, click = record
+        dnn_input = ids2dense(dnn_input, feature_dims['dnn_input'])
+        lr_input = ids2sparse(lr_input)
+        line = "%s\t%s\t%d\n" % (' '.join(map(str, dnn_input)),
+                                 ' '.join(map(str, lr_input)), click[0])
+        f.write(line)
+    logger.info('write to %s' % output_trainset_path)
+
+with open(output_testset_path, 'w') as f:
+    for id, record in enumerate(dataset.test()):
+        dnn_input, lr_input, click = record
+        dnn_input = ids2dense(dnn_input, feature_dims['dnn_input'])
+        lr_input = ids2sparse(lr_input)
+        line = "%s\t%s\t%d\n" % (' '.join(map(str, dnn_input)),
+                                 ' '.join(map(str, lr_input)), click[0])
+        f.write(line)
+    logger.info('write to %s' % output_testset_path)
+
+with open(output_infer_path, 'w') as f:
+    for id, record in enumerate(dataset.infer()):
+        dnn_input, lr_input = record
+        dnn_input = ids2dense(dnn_input, feature_dims['dnn_input'])
+        lr_input = ids2sparse(lr_input)
+        line = "%s\t%s\n" % (' '.join(map(str, dnn_input)),
+                             ' '.join(map(str, lr_input)), )
+        f.write(line)
+        if id > args.test_set_size:
+            break
+    logger.info('write to %s' % output_infer_path)
+
+with open(output_meta_path, 'w') as f:
+    lines = [
+        "dnn_input_dim: %d" % feature_dims['dnn_input'],
+        "lr_input_dim: %d" % feature_dims['lr_input']
+    ]
+    f.write('\n'.join(lines))
+    logger.info('write data meta into %s' % output_meta_path)
diff --git a/ctr/data_provider.py b/ctr/data_provider.py
deleted file mode 100644
index f02d3d33e75163cf772921ef54729a3fc8da022b..0000000000000000000000000000000000000000
--- a/ctr/data_provider.py
+++ /dev/null
@@ -1,277 +0,0 @@
-import sys
-import csv
-import numpy as np
-'''
-The fields of the dataset are:
-
-    0. id: ad identifier
-    1. click: 0/1 for non-click/click
-    2. hour: format is YYMMDDHH, so 14091123 means 23:00 on Sept. 11, 2014 UTC.
-    3. C1 -- anonymized categorical variable
-    4. banner_pos
-    5. site_id
-    6. site_domain
-    7. site_category
-    8. app_id
-    9. app_domain
-    10. app_category
-    11. device_id
-    12. device_ip
-    13. device_model
-    14. device_type
-    15. device_conn_type
-    16. C14-C21 -- anonymized categorical variables
-
-We will treat following fields as categorical features:
-
-    - C1
-    - banner_pos
-    - site_category
-    - app_category
-    - device_type
-    - device_conn_type
-
-and some other features as id features:
-
-    - id
-    - site_id
-    - app_id
-    - device_id
-
-The `hour` field will be treated as a continuous feature and will be transformed
-to one-hot representation which has 24 bits.
-'''
-
-feature_dims = {}
-
-categorial_features = ('C1 banner_pos site_category app_category ' +
-                       'device_type device_conn_type').split()
-
-id_features = 'id site_id app_id device_id _device_id_cross_site_id'.split()
-
-
-def get_all_field_names(mode=0):
-    '''
-    @mode: int
-        0 for train, 1 for test
-    @return: list of str
-    '''
-    return categorial_features + ['hour'] + id_features + ['click'] \
-        if mode == 0 else []
-
-
-class CategoryFeatureGenerator(object):
-    '''
-    Generator category features.
-
-    Register all records by calling `register` first, then call `gen` to generate
-    one-hot representation for a record.
-    '''
-
-    def __init__(self):
-        self.dic = {'unk': 0}
-        self.counter = 1
-
-    def register(self, key):
-        '''
-        Register record.
-        '''
-        if key not in self.dic:
-            self.dic[key] = self.counter
-            self.counter += 1
-
-    def size(self):
-        return len(self.dic)
-
-    def gen(self, key):
-        '''
-        Generate one-hot representation for a record.
-        '''
-        if key not in self.dic:
-            res = self.dic['unk']
-        else:
-            res = self.dic[key]
-        return [res]
-
-    def __repr__(self):
-        return '' % len(self.dic)
-
-
-class IDfeatureGenerator(object):
-    def __init__(self, max_dim, cross_fea0=None, cross_fea1=None):
-        '''
-        @max_dim: int
-            Size of the id elements' space
-        '''
-        self.max_dim = max_dim
-        self.cross_fea0 = cross_fea0
-        self.cross_fea1 = cross_fea1
-
-    def gen(self, key):
-        '''
-        Generate one-hot representation for records
-        '''
-        return [hash(key) % self.max_dim]
-
-    def gen_cross_fea(self, fea1, fea2):
-        key = str(fea1) + str(fea2)
-        return self.gen(key)
-
-    def size(self):
-        return self.max_dim
-
-
-class ContinuousFeatureGenerator(object):
-    def __init__(self, n_intervals):
-        self.min = sys.maxint
-        self.max = sys.minint
-        self.n_intervals = n_intervals
-
-    def register(self, val):
-        self.min = min(self.minint, val)
-        self.max = max(self.maxint, val)
-
-    def gen(self, val):
-        self.len_part = (self.max - self.min) / self.n_intervals
-        return (val - self.min) / self.len_part
-
-
-# init all feature generators
-fields = {}
-for key in categorial_features:
-    fields[key] = CategoryFeatureGenerator()
-for key in id_features:
-    # for cross features
-    if 'cross' in key:
-        feas = key[1:].split('_cross_')
-        fields[key] = IDfeatureGenerator(10000000, *feas)
-    # for normal ID features
-    else:
-        fields[key] = IDfeatureGenerator(10000)
-
-# used as feed_dict in PaddlePaddle
-field_index = dict((key, id)
-                   for id, key in enumerate(['dnn_input', 'lr_input', 'click']))
-
-
-def detect_dataset(path, topn, id_fea_space=10000):
-    '''
-    Parse the first `topn` records to collect meta information of this dataset.
-
-    NOTE the records should be randomly shuffled first.
-    '''
-    # create categorical statis objects.
-
-    with open(path, 'rb') as csvfile:
-        reader = csv.DictReader(csvfile)
-        for row_id, row in enumerate(reader):
-            if row_id > topn:
-                break
-
-            for key in categorial_features:
-                fields[key].register(row[key])
-
-    for key, item in fields.items():
-        feature_dims[key] = item.size()
-
-    #for key in id_features:
-    #feature_dims[key] = id_fea_space
-
-    feature_dims['hour'] = 24
-    feature_dims['click'] = 1
-
-    feature_dims['dnn_input'] = np.sum(
-        feature_dims[key] for key in categorial_features + ['hour']) + 1
-    feature_dims['lr_input'] = np.sum(feature_dims[key]
-                                      for key in id_features) + 1
-
-    return feature_dims
-
-
-def concat_sparse_vectors(inputs, dims):
-    '''
-    Concaterate more than one sparse vectors into one.
-
-    @inputs: list
-        list of sparse vector
-    @dims: list of int
-        dimention of each sparse vector
-    '''
-    res = []
-    assert len(inputs) == len(dims)
-    start = 0
-    for no, vec in enumerate(inputs):
-        for v in vec:
-            res.append(v + start)
-        start += dims[no]
-    return res
-
-
-class AvazuDataset(object):
-    '''
-    Load AVAZU dataset as train set.
-    '''
-    TRAIN_MODE = 0
-    TEST_MODE = 1
-
-    def __init__(self, train_path, n_records_as_test=-1):
-        self.train_path = train_path
-        self.n_records_as_test = n_records_as_test
-        # task model: 0 train, 1 test
-        self.mode = 0
-
-    def train(self):
-        self.mode = self.TRAIN_MODE
-        return self._parse(self.train_path, skip_n_lines=self.n_records_as_test)
-
-    def test(self):
-        self.mode = self.TEST_MODE
-        return self._parse(self.train_path, top_n_lines=self.n_records_as_test)
-
-    def _parse(self, path, skip_n_lines=-1, top_n_lines=-1):
-        with open(path, 'rb') as csvfile:
-            reader = csv.DictReader(csvfile)
-
-            categorial_dims = [
-                feature_dims[key] for key in categorial_features + ['hour']
-            ]
-            id_dims = [feature_dims[key] for key in id_features]
-
-            for row_id, row in enumerate(reader):
-                if skip_n_lines > 0 and row_id < skip_n_lines:
-                    continue
-                if top_n_lines > 0 and row_id > top_n_lines:
-                    break
-
-                record = []
-                for key in categorial_features:
-                    record.append(fields[key].gen(row[key]))
-                record.append([int(row['hour'][-2:])])
-                dense_input = concat_sparse_vectors(record, categorial_dims)
-
-                record = []
-                for key in id_features:
-                    if 'cross' not in key:
-                        record.append(fields[key].gen(row[key]))
-                    else:
-                        fea0 = fields[key].cross_fea0
-                        fea1 = fields[key].cross_fea1
-                        record.append(
-                            fields[key].gen_cross_fea(row[fea0], row[fea1]))
-
-                sparse_input = concat_sparse_vectors(record, id_dims)
-
-                record = [dense_input, sparse_input]
-
-                record.append(list((int(row['click']), )))
-                yield record
-
-
-if __name__ == '__main__':
-    path = 'train.txt'
-    print detect_dataset(path, 400000)
-
-    filereader = AvazuDataset(path)
-    for no, rcd in enumerate(filereader.train()):
-        print no, rcd
-        if no > 1000: break
diff --git a/ctr/dataset.md b/ctr/dataset.md
index dd6443d56adaf548d6c39458900c711c7f274def..16c0f9784bf3409ac5bbe704f932a9b28680fbf8 100644
--- a/ctr/dataset.md
+++ b/ctr/dataset.md
@@ -1,6 +1,13 @@
 # 数据及处理
 
 ## 数据集介绍
+本教程演示使用 Kaggle 上 CTR 任务的数据集\[[3](#参考文献)\]的预处理方法,最终产生本模型需要的格式,详细的数据格式参考 [README.md](./README.md)。
+
+Wide & Deep Model\[[2](#参考文献)\]的优势是融合稠密特征和大规模稀疏特征,
+因此特征处理方面也针对稠密和稀疏两种特征作处理,
+其中 Deep 部分的稠密值全部转化为 ID 类特征,
+通过 embedding 转化为稠密的向量输入;Wide 部分主要通过 ID 的叉乘提升维度。
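+
+下面的 Python 代码仅作示意,对应本例 `avazu_data_processer.py` 中 `IDfeatureGenerator.gen_cross_fea` 的做法(函数名 `gen_cross_id` 为本文虚构):把两个 ID 拼接后哈希,再对稀疏特征空间大小取模,得到叉乘特征的 ID。
+
+```python
+def gen_cross_id(fea0, fea1, max_dim=10000000):
+    """把两个 ID 特征拼接成叉乘特征,并哈希到 [0, max_dim) 的稀疏 ID 空间。"""
+    key = str(fea0) + str(fea1)
+    return hash(key) % max_dim
+
+# 例如 device_id 与 site_id 的叉乘:gen_cross_id(row['device_id'], row['site_id'])
+```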
+
 数据集使用 `csv` 格式存储,其中各个字段内容如下:
 
 - `id` : ad identifier
diff --git a/ctr/index.html b/ctr/index.html
index ff0c5d9b19ec046b61f7f38d6eb9e70dff33e1ec..4c00a9959da036350995690419c6459116143915 100644
--- a/ctr/index.html
+++ b/ctr/index.html
@@ -42,6 +42,21 @@