# PaddleRec 自定义数据集及Reader ## dataset数据读取 为了能高速运行CTR模型的训练,我们使用`dataset`API进行高性能的IO,dataset是为多线程及全异步方式量身打造的数据读取方式,每个数据读取线程会与一个训练线程耦合,形成了多生产者-多消费者的模式,会极大的加速我们的模型训练。 如何在我们的训练中引入dataset读取方式呢?无需变更数据格式,只需在我们的训练代码中加入以下内容,便可达到媲美二进制读取的高效率,以下是一个比较完整的流程: ### 引入dataset 1. 通过工厂类`fluid.DatasetFactory()`创建一个dataset对象。 2. 将我们定义好的数据输入格式传给dataset,通过`dataset.set_use_var(inputs)`实现。 3. 指定我们的数据读取方式,由`dataset_generator.py`实现数据读取的规则,后面将会介绍读取规则的实现。 4. 指定数据读取的batch_size。 5. 指定数据读取的线程数,该线程数和训练线程应保持一致,两者为耦合的关系。 6. 指定dataset读取的训练文件的列表。 ```python def get_dataset(inputs, args) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) dataset.set_pipe_command("python dataset_generator.py") dataset.set_batch_size(args.batch_size) dataset.set_thread(int(args.cpu_num)) file_list = [ str(args.train_files_path) + "/%s" % x for x in os.listdir(args.train_files_path) ] logger.info("file list: {}".format(file_list)) return dataset, file_list ``` ### 如何指定数据读取规则 在上文我们提到了由`dataset_generator.py`实现具体的数据读取规则,那么,怎样为dataset创建数据读取的规则呢? 以下是`dataset_generator.py`的全部代码,具体流程如下: 1. 首先我们需要引入dataset的库,位于`paddle.fluid.incubate.data_generator`。 2. 声明一些在数据读取中会用到的变量,如示例代码中的`cont_min_`、`categorical_range_`等。 3. 创建一个子类,继承dataset的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用`MultiSlotDataGenerator`;若已经完成了预处理并保存为数据文件,可以直接以`string`的方式进行读取,使用`MultiSlotStringDataGenerator`,能够进一步加速。在示例代码,我们继承并实现了名为`CriteoDataset`的dataset子类,使用`MultiSlotDataGenerator`方法。 4. 继承并实现基类中的`generate_sample`函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.) 5. 在这个可以迭代的函数中,如示例代码中的`def reader()`,我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。 6. 最后,我们需要将数据整理为特定的格式,才能够被dataset正确读取,并灌入的训练的网络中。简单来说,数据的输出顺序与我们在网络中创建的`inputs`必须是严格一一对应的,并转换为类似字典的形式。在示例代码中,我们使用`zip`的方法将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如`[('dense_feature',[value]),('C1',[value]),('C2',[value]),...,('C26',[value]),('label',[value])]` ```python import paddle.fluid.incubate.data_generator as dg cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] hash_dim_ = 1000001 continuous_range_ = range(1, 14) categorical_range_ = range(14, 40) class CriteoDataset(dg.MultiSlotDataGenerator): def generate_sample(self, line): def reader(): features = line.rstrip('\n').split('\t') dense_feature = [] sparse_feature = [] for idx in continuous_range_: if features[idx] == "": dense_feature.append(0.0) else: dense_feature.append( (float(features[idx]) - cont_min_[idx - 1]) / cont_diff_[idx - 1]) for idx in categorical_range_: sparse_feature.append( [hash(str(idx) + features[idx]) % hash_dim_]) label = [int(features[0])] process_line = dense_feature, sparse_feature, label feature_name = ["dense_feature"] for idx in categorical_range_: feature_name.append("C" + str(idx - 13)) feature_name.append("label") yield zip(feature_name, [dense_feature] + sparse_feature + [label]) return reader d = CriteoDataset() d.run_from_stdin() ``` ### 快速调试Dataset 我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令 `cat 数据文件 | python dataset读取python文件`进行dataset代码的调试: ```bash cat train_data/part-0 | python dataset_generator.py ``` 输出的数据格式如下: ` dense_input:size ; dense_input:value ; sparse_input:size ; sparse_input:value ; ... ; sparse_input:size ; sparse_input:value ; label:size ; label:value ` 理想的输出为(截取了一个片段): ```bash ... 13 0.05 0.00663349917081 0.05 0.0 0.02159375 0.008 0.15 0.04 0.362 0.1 0.2 0.0 0.04 1 715353 1 817085 1 851010 1 833725 1 286835 1 948614 1 881652 1 507110 1 27346 1 646986 1 643076 1 200960 1 18464 1 202774 1 532679 1 729573 1 342789 1 562805 1 880474 1 984402 1 666449 1 26235 1 700326 1 452909 1 884722 1 787527 1 0 ... ``` >使用Dataset的一些注意事项 > - Dataset的基本原理:将数据print到缓存,再由C++端的代码实现读取,因此,我们不能在dataset的读取代码中,加入与数据读取无关的print信息,会导致C++端拿到错误的数据信息。 > - dataset目前只支持在`unbuntu`及`CentOS`等标准Linux环境下使用,在`Windows`及`Mac`下使用时,会产生预料之外的错误,请知悉。