diff --git a/ClickThroughRate/WideDeepLearning/README.md b/ClickThroughRate/WideDeepLearning/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..cd64ac9a81386acea987c6b5a977d6e6db3bfb8f
--- /dev/null
+++ b/ClickThroughRate/WideDeepLearning/README.md
@@ -0,0 +1,45 @@
+The main difference between `wdl_train_eval.py` and `wdl_train_eval_test.py` is:
+`wdl_train_eval_test.py` is an end-to-end process: it trains for n epochs on the training dataset, evaluates on the full eval dataset after each epoch of training, and tests on the test dataset at the last stage. Its main training loop is over `epoch`.
+
+In `wdl_train_eval.py`, by contrast, the main training loop is over `iteration`. It evaluates only 20 batches at a time rather than the full eval dataset, and there is no test stage.
+
+## Run OneFlow-WDL with train and evaluation
+```
+EMBD_SIZE=1603616
+DATA_ROOT=/DATA/disk1/criteo_wdl/ofrecord
+python3 wdl_train_eval.py \
+  --train_data_dir $DATA_ROOT/train \
+  --train_data_part_num 256 \
+  --train_part_name_suffix_length=5 \
+  --eval_data_dir $DATA_ROOT/val \
+  --eval_data_part_num 256 \
+  --max_iter=300000 \
+  --loss_print_every_n_iter=1000 \
+  --eval_interval=1000 \
+  --batch_size=512 \
+  --wide_vocab_size=$EMBD_SIZE \
+  --deep_vocab_size=$EMBD_SIZE \
+  --gpu_num 1
+```
+
+## Run OneFlow-WDL with train, evaluation and test
+```
+EMBD_SIZE=1603616
+DATA_ROOT=/DATA/disk1/criteo_wdl/ofrecord
+python3 wdl_train_eval_test.py \
+  --train_data_dir $DATA_ROOT/train \
+  --train_data_part_num 256 \
+  --train_part_name_suffix_length=5 \
+  --eval_data_dir $DATA_ROOT/val \
+  --eval_data_part_num 256 \
+  --eval_part_name_suffix_length=5 \
+  --test_data_dir $DATA_ROOT/test \
+  --test_data_part_num 256 \
+  --test_part_name_suffix_length=5 \
+  --loss_print_every_n_iter=1000 \
+  --batch_size=16484 \
+  --wide_vocab_size=$EMBD_SIZE \
+  --deep_vocab_size=$EMBD_SIZE \
+  --gpu_num 1
+```
+
diff --git a/ClickThroughRate/WideDeepLearning/how_to_make_ofrecord_for_wdl.md b/ClickThroughRate/WideDeepLearning/how_to_make_ofrecord_for_wdl.md
new file mode 100644
index 0000000000000000000000000000000000000000..df1131ef940b4c098eed2dd1171d2c7a98adbc22
--- /dev/null
+++ b/ClickThroughRate/WideDeepLearning/how_to_make_ofrecord_for_wdl.md
@@ -0,0 +1,370 @@
+[TOC]
+
+[HugeCTR](https://github.com/NVIDIA/HugeCTR) is an efficient GPU framework provided by NVIDIA, designed for click-through-rate (CTR) estimation training.
+
+OneFlow has built a Wide and Deep Learning (WDL) network to benchmark against HugeCTR. This document describes how to prepare the dataset for that network.
+
+Note 1: The author does not have much Spark or Scala experience. Because most of the dataset processing is column-wise, out-of-memory errors occurred frequently. The original plan was to provide a complete tool, such as an executable jar, but in the end the work was done interactively in the Spark shell. This document lists those interactive steps; some redundant code in between has not been cleaned up yet and will be tidied later.
+Note 2: The peak memory consumption of the whole process is 170G; it is recommended to give Spark at least 192G of memory.
+
+## Dataset and preprocessing
+The data is provided by [CriteoLabs](http://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset/). The raw data consists of three parts: a label column `labels`, 13 integer features (the `I` columns) and 26 categorical features (the `C` columns). After processing:
+- the `I` columns are converted into `dense_fields`;
+- the `C` columns are converted into `deep_sparse_fields`;
+- the `C` column pairs `C1 C2` and `C3 C4` form cross features, which become `wide_sparse_fields`.
+
+The processed data is saved in `ofrecord` format with the following structure:
+```
+root
+ |-- deep_sparse_fields: array (nullable = true)
+ |    |-- element: integer (containsNull = true)
+ |-- dense_fields: array (nullable = true)
+ |    |-- element: float (containsNull = true)
+ |-- labels: integer (nullable = true)
+ |-- wide_sparse_fields: array (nullable = true)
+ |    |-- element: integer (containsNull = true)
+
+```
+## step0 Preparation
+This step imports the required libraries and creates a temporary directory. Many of the later steps deliberately save their intermediate results into this temporary directory, which saves memory and makes it easy to resume after an interruption.
+```
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions.{when, _}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, MinMaxScaler}
+import org.apache.spark.ml.linalg._
+
+import java.nio.file.{Files, Paths}
+val tmp_dir = "/DATA/disk1/xuan/wdl_tmp"
+Files.createDirectories(Paths.get(tmp_dir))
+```
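+Because every step below persists its result under `tmp_dir`, an interrupted session can be resumed by re-reading the last saved parquet instead of recomputing it. A minimal sketch (the `filled_features` directory is the one written at the end of step1 below):
+```
+// resume from a previously saved intermediate result instead of recomputing it
+val features = spark.read.parquet(tmp_dir ++ "/filled_features")
+features.printSchema
+```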
+## step1 Loading the data
+This step reads the raw dataset and performs the following operations:
+1. name the columns [label, I1,...,I13, C1,...,C26];
+2. add an `id` to every row; all later table joins are based on this `id`;
+3. cast the `I` columns to integers;
+4. fill the blanks in the `I` and `C` columns with "missing" placeholders (`Int.MinValue` for the `I` columns and the string `"80000000"` for the `C` columns);
+5. `features` is the DataFrame that will be used repeatedly later.
+```
+// load input file
+var input = spark.read.options(Map("delimiter"->"\t")).csv("file:///DATA/disk1/xuan/train.shuf.bak")
+// var input = spark.read.options(Map("delimiter"->"\t")).csv("file:///DATA/disk1/xuan/train.shuf.txt")
+
+// rename columns [label, I1,...,I13, C1,...,C26]
+val NUM_INTEGER_COLUMNS = 13
+val NUM_CATEGORICAL_COLUMNS = 26
+
+// val integer_cols = (1 to NUM_INTEGER_COLUMNS).map{id=>s"I$id"}
+val integer_cols = (1 to NUM_INTEGER_COLUMNS).map{id=>$"I$id"} // note
+val categorical_cols = (1 to NUM_CATEGORICAL_COLUMNS).map{id=>s"C$id"}
+val feature_cols = integer_cols.map{c=>c.toString} ++ categorical_cols
+val all_cols = (Seq(s"labels") ++ feature_cols)
+input = input.toDF(all_cols: _*).withColumn("id", monotonically_increasing_id())
+
+input = input.withColumn("labels", col("labels").cast(IntegerType))
+// cast integer columns to int
+for(i <- 1 to NUM_INTEGER_COLUMNS) {
+  val col_name = s"I$i"
+  input = input.withColumn(col_name, col(col_name).cast(IntegerType))
+}
+
+// replace `null` with `NaN`
+val features = input.na.fill(Int.MinValue, integer_cols.map{c=>c.toString}).na.fill("80000000", categorical_cols)
+
+// dump features as parquet format
+val features_dir = tmp_dir ++ "/filled_features"
+features.write.mode("overwrite").parquet(features_dir)
+```
+Mem: 52.6G
+duration: 1 min
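+Before moving on, an optional quick sanity check that the renaming, casting and filling behaved as expected (a small sketch; `features` is the DataFrame built above):
+```
+// optional sanity check on the filled feature table
+features.printSchema
+features.groupBy("labels").count().show()        // label distribution
+features.select("id", "I1", "C1").show(5, false) // a few filled feature values
+```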
+## step2 Processing the integer features into `dense_fields`
+This takes two steps:
+1. process each `I` column in a loop, encode it and save it to the temporary directory;
+2. read the columns back from the temporary directory, convert them into `dense_fields` and save the result.
+### Index-encoding the `I` columns
+For each integer feature:
+- count the frequency of every feature value;
+- replace values whose frequency is less than 6 with the "missing" placeholder;
+- index-encode the feature values;
+- apply a normalize operation, or just add 1;
+- save the column to the temporary directory.
+```
+val features_dir = tmp_dir ++ "/filled_features"
+val features = spark.read.parquet(features_dir)
+
+// integer features
+println("create integer feature cols")
+val normalize_dense = 1
+val nanValue = Int.MinValue
+val getItem = udf((v: Vector, i: Int) => v(i).toFloat)
+for(column_name <- integer_cols) {
+  val col_name = column_name.toString
+  println(col_name)
+  val col_index = col_name ++ "_index"
+  val uniqueValueCounts = features.groupBy(col_name).count()
+  val df = features.join(uniqueValueCounts, Seq(col_name))
+    .withColumn(col_name, when(col("count") >= 6, col(col_name)).otherwise(nanValue))
+    .select("id", col_name)
+  val indexedDf = new StringIndexer().setInputCol(col_name)
+    .setOutputCol(col_index)
+    .fit(df).transform(df)
+    .drop(col_name) // trick: drop col_name here and will be reused later
+
+  var scaledDf = spark.emptyDataFrame
+  if (normalize_dense > 0) {
+    val assembler = new VectorAssembler().setInputCols(Array(col_index)).setOutputCol("vVec")
+    val df = assembler.transform(indexedDf)
+    scaledDf = new MinMaxScaler().setInputCol("vVec")
+      .setOutputCol(col_name)
+      .fit(df).transform(df)
+      .select("id", col_name)
+  } else {
+    scaledDf = indexedDf.withColumn(col_name, col(col_index) + lit(1)) // trick: reuse col_name
+      .select("id", col_name)
+      //.withColumn(col_name, col(col_index).cast(IntegerType))
+  }
+  val col_dir = tmp_dir ++ "/" ++ col_name
+  scaledDf = scaledDf.withColumn(col_name, getItem(column_name, lit(0)))
+  scaledDf.write.mode("overwrite").parquet(col_dir)
+  scaledDf.printSchema
+}
+```
+Mem: 58.6G
+duration: 3*13 ~= 40 min
+### Merging all `I` columns into `dense_fields`
+- read each column back from the temporary directory and join them into one DataFrame `df`;
+- combine the `I` columns in `df` into `dense_fields`;
+- save `dense_fields` to the temporary directory.
+```
+val integer_cols = (1 to NUM_INTEGER_COLUMNS).map{id=>s"I$id"}
+var df = features.select("id")
+for(col_name <- integer_cols) {
+  println(col_name)
+  val df_col = spark.read.parquet(tmp_dir ++ "/" ++ col_name)
+  df = df.join(df_col, Seq("id"))
+}
+df = df.select($"id", array(integer_cols map col: _*).as("dense_fields"))
+val parquet_dir = tmp_dir ++ "/parquet_dense"
+df.write.mode("overwrite").parquet(parquet_dir)
+```
+Mem: 110G
+Duration: 3mins
+## step3 Processing the categorical and cross features into `deep_sparse_fields` and `wide_sparse_fields`
+### Processing the categorical features
+For each categorical feature:
+- count the frequency of every feature value;
+- replace values whose frequency is less than 6 with the "missing" placeholder;
+- index-encode the feature values;
+- add the offset to the encoded values;
+- save the column to the temporary directory.
+
+Note that the offset starts at 1, grows as the `C` columns are processed, and the final offset is then carried over into the cross features.
+```
+println("create categorical feature cols")
+val nanValue = "80000000"
+var offset: Long = 1
+for(col_name <- categorical_cols) {
+  println(col_name)
+  val col_index = col_name ++ "_index"
+  val uniqueValueCounts = features.groupBy(col_name).count()
+  val df = features.join(uniqueValueCounts, Seq(col_name))
+    .withColumn(col_name, when(col("count") >= 6, col(col_name)).otherwise(nanValue))
+    .select("id", col_name)
+  val indexedDf = new StringIndexer().setInputCol(col_name)
+    .setOutputCol(col_index)
+    .fit(df).transform(df).drop(col_name)
+
+  val scaledDf = indexedDf.withColumn(col_name, col(col_index) + lit(offset))
+    .withColumn(col_name, col(col_name).cast(IntegerType))
+    .drop(col_index)
+  val hfCount = indexedDf.select(col_index).distinct.count()
+  println(offset, hfCount)
+  offset += hfCount
+  val col_dir = tmp_dir ++ "/" ++ col_name
+  scaledDf.write.mode("overwrite").parquet(col_dir)
+}
+```
+Mem: 110G
+Time: 1 hour
+Below are the offset and the number of distinct values of each `C` column, taken from the output. The following block is program output, not a script.
+```
+C1
+(1,1460)
+C2
+(1461,558)
+C3
+(2019,335378)
+C4
+(337397,211710)
+C5
+(549107,305)
+C6
+(549412,20)
+C7
+(549432,12136)
+C8
+(561568,633)
+C9
+(562201,3)
+C10
+(562204,51298)
+C11
+(613502,5302)
+C12
+(618804,332600)
+C13
+(951404,3179)
+C14
+(954583,27)
+C15
+(954610,12191)
+C16
+(966801,301211)
+C17
+(1268012,10)
+C18
+(1268022,4841)
+C19
+(1272863,2086)
+C20
+(1274949,4)
+C21
+(1274953,324273)
+C22
+(1599226,17)
+C23
+(1599243,15)
+C24
+(1599258,79734)
+C25
+(1678992,96)
+C26
+(1679088,58622)
+```
+### Generating the cross features
+Note first that this step depends on the final offset value from above.
+- A cross feature is built from two categorical features, combined as: col0 * col1_width + col1.
+- The combined values are then index-encoded.
+- The encoded column is saved to the temporary directory.
+```
+import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, MinMaxScaler}
+var offset: Long = 1737710
+println("create cross cols")
+val feature_pairs = Array(Array("C1", "C2"), Array("C3", "C4"))
+for(feature_pair <- feature_pairs) {
+  val col0 = feature_pair(0)
+  val col1 = feature_pair(1)
+  val df_col0 = spark.read.parquet(tmp_dir ++ "/" ++ col0)
+  val df_col1 = spark.read.parquet(tmp_dir ++ "/" ++ col1)
+
+  val cross_col = col0 ++ "_" ++ col1
+  val cross_col_index = cross_col ++ "_index"
+  println(cross_col)
+
+  val min_max = df_col1.agg(min(col1), max(col1)).head()
+  val col1_width = min_max.getInt(1) - min_max.getInt(0) + 1
+  val df = df_col0.withColumn(col0, col(col0) * lit(col1_width))
+    .join(df_col1, Seq("id"))
+    .withColumn(cross_col, col(col0) + col(col1))
+    .select("id", cross_col)
+  val indexedDf = new StringIndexer().setInputCol(cross_col)
+    .setOutputCol(cross_col_index)
+    .fit(df).transform(df).drop(cross_col)
+
+  val scaledDf = indexedDf.withColumn(cross_col, col(cross_col_index).cast(IntegerType))
+    .withColumn(cross_col, col(cross_col) + lit(offset))
+    .drop(cross_col_index)
+    .withColumn(cross_col, col(cross_col).cast(IntegerType))
+
+  val hfCount = indexedDf.select(cross_col_index).distinct.count()
+  println(offset, hfCount)
+  offset += hfCount
+  val col_dir = tmp_dir ++ "/" ++ cross_col
+  scaledDf.write.mode("overwrite").parquet(col_dir)
+  scaledDf.show
+  scaledDf.printSchema
+}
+```
+Mem: 110G
+Duration: 2mins
+```
+(1737710,144108)
+(1881818,447097)
+```
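+As a concrete illustration of the `col0 * col1_width + col1` combination: from the output above, the encoded `C2` ids occupy `[1461, 2018]`, so `col1_width = 558`. The sketch below (with hypothetical encoded ids) shows the raw crossed value for one `(C1, C2)` pair before it is re-indexed by `StringIndexer` and shifted by the running `offset`:
+```
+// toy check of the crossing arithmetic; 7 and 1500 are hypothetical encoded C1/C2 ids
+val col1_width = 2018 - 1461 + 1      // = 558, the width of the encoded C2 id range
+val crossed = 7 * col1_width + 1500   // = 5406; distinct (C1, C2) pairs always yield distinct values
+```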
+### Generating `deep_sparse_fields`
+This operation is similar to the way `dense_fields` was built, so the code is redundant. It joins 26 columns at once, which consumes a huge amount of memory (170G), although it is not the slowest step. For a larger dataset the columns could be merged one at a time instead; the earlier `dense_fields` step could also be done that way. Left as a `TODO`.
+```
+val tmp_dir = "/DATA/disk1/xuan/wdl_tmp"
+val features_dir = tmp_dir ++ "/filled_features"
+val features = spark.read.parquet(features_dir)
+
+val NUM_CATEGORICAL_COLUMNS = 26
+val categorical_cols = (1 to NUM_CATEGORICAL_COLUMNS).map{id=>s"C$id"}
+
+var df = features.select("id")
+for(col_name <- categorical_cols) {
+  println(col_name)
+  val df_col = spark.read.parquet(tmp_dir ++ "/" ++ col_name)
+  df = df.join(df_col, Seq("id"))
+}
+df = df.select($"id", array(categorical_cols map col: _*).as("deep_sparse_fields"))
+val parquet_dir = tmp_dir ++ "/parquet_deep_sparse"
+df.write.mode("overwrite").parquet(parquet_dir)
+```
+Mem: 170G
+Duration: 5min
+
+### Generating `wide_sparse_fields`
+This operation is also similar to the way `dense_fields` was built, so the code is redundant.
+```
+val cross_pairs = Array("C1_C2", "C3_C4")
+var df = features.select("id")
+for(cross_pair <- cross_pairs) {
+  val df_col = spark.read.parquet(tmp_dir ++ "/" ++ cross_pair)
+  df = df.join(df_col, Seq("id"))
+}
+df = df.select($"id", array(cross_pairs map col: _*).as("wide_sparse_fields"))
+val parquet_dir = tmp_dir ++ "/parquet_wide_sparse"
+df.write.mode("overwrite").parquet(parquet_dir)
+```
+Duration: 1min
+## step4 Merging all fields
+```
+val fields = Array("dense", "deep_sparse", "wide_sparse")
+var df = features.select("id", "labels")
+for(field <- fields) {
+  val df_col = spark.read.parquet(tmp_dir ++ "/parquet_" ++ field)
+  df = df.join(df_col, Seq("id"))
+}
+val parquet_dir = tmp_dir ++ "/parquet_all"
+df.write.mode("overwrite").parquet(parquet_dir)
+```
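+At this point `parquet_all` holds one row per sample with `labels`, `dense_fields`, `deep_sparse_fields` and `wide_sparse_fields` (plus the temporary `id`, which is dropped in step5). A quick optional check, as a sketch, that the merged table matches the schema shown at the top of this document:
+```
+// optional check of the merged table before writing ofrecord
+val all = spark.read.parquet(tmp_dir ++ "/parquet_all")
+all.printSchema
+println(all.count)
+```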
+## step5 Writing ofrecord
+```
+val tmp_dir = "/DATA/disk1/xuan/wdl_tmp"
+import org.oneflow.spark.functions._
+val parquet_dir = tmp_dir ++ "/parquet_all"
+val df = spark.read.parquet(parquet_dir)
+
+val dfs = df.drop("id").randomSplit(Array(0.8, 0.1, 0.1))
+
+val ofrecord_dir = tmp_dir ++ "/ofrecord/train"
+dfs(0).repartition(256).write.mode("overwrite").ofrecord(ofrecord_dir)
+dfs(0).count
+sc.formatFilenameAsOneflowStyle(ofrecord_dir)
+
+val ofrecord_dir = tmp_dir ++ "/ofrecord/val"
+dfs(1).repartition(256).write.mode("overwrite").ofrecord(ofrecord_dir)
+dfs(1).count
+sc.formatFilenameAsOneflowStyle(ofrecord_dir)
+
+val ofrecord_dir = tmp_dir ++ "/ofrecord/test"
+dfs(2).repartition(256).write.mode("overwrite").ofrecord(ofrecord_dir)
+dfs(2).count
+sc.formatFilenameAsOneflowStyle(ofrecord_dir)
+```
+
diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
new file mode 100644
index 0000000000000000000000000000000000000000..f862c28d09f7f1f16c833ece4c2f6ea411de958a
--- /dev/null
+++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
@@ -0,0 +1,222 @@
+import argparse
+import oneflow as flow
+import datetime
+import os
+import glob
+from sklearn.metrics import roc_auc_score
+import numpy as np
+import time
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--train_data_dir', type=str, required=True)
+parser.add_argument('--train_data_part_num', type=int, required=True)
+parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
+parser.add_argument('--eval_data_dir', type=str, required=True)
+parser.add_argument('--eval_data_part_num', type=int, required=True)
+parser.add_argument('--eval_part_name_suffix_length', type=int, default=-1)
+parser.add_argument('--eval_batchs', type=int, default=20)
+parser.add_argument('--eval_interval', type=int, default=1000)
+parser.add_argument('--batch_size', type=int, default=16384)
+parser.add_argument('--learning_rate', type=float, default=1e-3)
+parser.add_argument('--wide_vocab_size', type=int, default=3200000)
+parser.add_argument('--deep_vocab_size', type=int, default=3200000)
+parser.add_argument('--deep_embedding_vec_size', type=int, default=16)
+parser.add_argument('--deep_dropout_rate', type=float, default=0.5)
+parser.add_argument('--num_dense_fields', type=int, default=13)
+parser.add_argument('--num_wide_sparse_fields', type=int, default=2)
+parser.add_argument('--num_deep_sparse_fields', type=int, default=26)
+parser.add_argument('--max_iter', type=int, default=30000)
+parser.add_argument('--loss_print_every_n_iter', type=int, default=100)
+parser.add_argument('--gpu_num', type=int, default=8)
+parser.add_argument('--hidden_units_num', type=int, default=7)
+parser.add_argument('--hidden_size', type=int, default=1024)
+
+FLAGS = parser.parse_args()
+
+#DEEP_HIDDEN_UNITS = [1024, 1024]#, 1024, 1024, 1024, 1024, 1024]
+DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
+print(DEEP_HIDDEN_UNITS)
+
+
+def _raw_blob_conf(name, shape, data_type):
+    return flow.data.BlobConf(name=name, shape=shape, dtype=data_type, codec=flow.data.RawCodec())
+
+
+def _data_loader(data_dir, data_part_num, batch_size):
+    blob_conf = [
+        _raw_blob_conf('labels', (1,), flow.int32),
+        _raw_blob_conf('dense_fields',
(FLAGS.num_dense_fields,), flow.float), + _raw_blob_conf('wide_sparse_fields', (FLAGS.num_wide_sparse_fields,), flow.int32), + _raw_blob_conf('deep_sparse_fields', (FLAGS.num_deep_sparse_fields,), flow.int32) + ] + blobs = flow.data.decode_ofrecord( + data_dir, + blobs=blob_conf, + batch_size=batch_size, + name="decode", + data_part_num=data_part_num, + part_name_suffix_length=FLAGS.train_part_name_suffix_length, + ) + # copy to gpu + blobs = tuple(map(lambda blob: flow.identity(blob), blobs)) + return blobs + +def _data_loader_ofrecord_new(data_dir, data_part_num, batch_size, shuffle=True): + ofrecord = flow.data.ofrecord_reader(data_dir, + batch_size=batch_size, + data_part_num=data_part_num, + part_name_suffix_length=FLAGS.train_part_name_suffix_length, + random_shuffle=shuffle, + shuffle_after_epoch=shuffle) + labels = flow.data.OFRecordRawDecoder(ofrecord, "labels", shape=(1,), dtype=flow.int32) + dense_fields = flow.data.OFRecordRawDecoder(ofrecord, "dense_fields", shape=(FLAGS.num_dense_fields,), dtype=flow.float) + wide_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "wide_sparse_fields", shape=(FLAGS.num_wide_sparse_fields,), dtype=flow.int32) + deep_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "deep_sparse_fields", shape=(FLAGS.num_deep_sparse_fields,), dtype=flow.int32) + return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields]) + + +def _data_loader_onerec(data_dir, data_part_num, batch_size): + files = glob.glob(os.path.join(data_dir, '*.onerec')) + readdata = flow.data.onerec_reader(files=files, batch_size=batch_size) + labels = flow.data.onerec_decoder(readdata, key='labels', dtype=flow.int32, shape=(1,)) + dense_fields = flow.data.onerec_decoder(readdata, key='dense_fields', dtype=flow.float, shape=(FLAGS.num_dense_fields,)) + wide_sparse_fields = flow.data.onerec_decoder(readdata, key='wide_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_wide_sparse_fields,)) + deep_sparse_fields = flow.data.onerec_decoder(readdata, key='deep_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_deep_sparse_fields,)) + return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields]) + + +def _model(dense_fields, wide_sparse_fields, deep_sparse_fields): + wide_sparse_fields = flow.parallel_cast(wide_sparse_fields, distribute=flow.distribute.broadcast()) + wide_embedding_table = flow.get_variable( + name='wide_embedding', + shape=(FLAGS.wide_vocab_size, 1), + initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05), + distribute=flow.distribute.split(0), + ) + wide_embedding = flow.gather(params=wide_embedding_table, indices=wide_sparse_fields) + wide_embedding = flow.reshape(wide_embedding, shape=(-1, wide_embedding.shape[-1] * wide_embedding.shape[-2])) + wide_scores = flow.math.reduce_sum(wide_embedding, axis=[1], keepdims=True) + wide_scores = flow.parallel_cast(wide_scores, distribute=flow.distribute.split(0), + gradient_distribute=flow.distribute.broadcast()) + + deep_sparse_fields = flow.parallel_cast(deep_sparse_fields, distribute=flow.distribute.broadcast()) + deep_embedding_table = flow.get_variable( + name='deep_embedding', + shape=(FLAGS.deep_vocab_size, FLAGS.deep_embedding_vec_size), + initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05), + distribute=flow.distribute.split(1), + ) + deep_embedding = flow.gather(params=deep_embedding_table, indices=deep_sparse_fields) + deep_embedding = flow.parallel_cast(deep_embedding, distribute=flow.distribute.split(0), + 
gradient_distribute=flow.distribute.split(2)) + deep_embedding = flow.reshape(deep_embedding, shape=(-1, deep_embedding.shape[-1] * deep_embedding.shape[-2])) + deep_features = flow.concat([deep_embedding, dense_fields], axis=1) + for idx, units in enumerate(DEEP_HIDDEN_UNITS): + deep_features = flow.layers.dense( + deep_features, + units=units, + kernel_initializer=flow.glorot_uniform_initializer(), + bias_initializer=flow.constant_initializer(0.0), + activation=flow.math.relu, + name='fc' + str(idx + 1) + ) + deep_features = flow.nn.dropout(deep_features, rate=FLAGS.deep_dropout_rate) + deep_scores = flow.layers.dense( + deep_features, + units=1, + kernel_initializer=flow.glorot_uniform_initializer(), + bias_initializer=flow.constant_initializer(0.0), + name='fc' + str(len(DEEP_HIDDEN_UNITS) + 1) + ) + + scores = wide_scores + deep_scores + return scores + + +def _get_train_conf(): + train_conf = flow.FunctionConfig() + train_conf.default_data_type(flow.float) + train_conf.train.primary_lr(FLAGS.learning_rate) + train_conf.train.model_update_conf({ + 'lazy_adam_conf': { + } + }) + train_conf.use_boxing_v2(True) + train_conf.default_distribute_strategy(flow.distribute.consistent_strategy()) + train_conf.indexed_slices_optimizer_conf(dict(include_op_names=dict(op_name=['wide_embedding', 'deep_embedding']))) + return train_conf + +def _get_eval_conf(): + eval_conf = flow.FunctionConfig() + eval_conf.default_data_type(flow.float) + eval_conf.default_distribute_strategy(flow.distribute.consistent_strategy()) + return eval_conf + + +global_loss = 0.0 +def _create_train_callback(step): + def nop(loss): + global global_loss + global_loss += loss.mean() + pass + + def print_loss(loss): + global global_loss + global_loss += loss.mean() + print(step+1, 'time', datetime.datetime.now(), 'loss', global_loss/FLAGS.loss_print_every_n_iter) + global_loss = 0.0 + + if (step + 1) % FLAGS.loss_print_every_n_iter == 0: + return print_loss + else: + return nop + +@flow.global_function(_get_train_conf()) +def train_job(): + labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \ + _data_loader_ofrecord_new(data_dir=FLAGS.train_data_dir, + data_part_num=FLAGS.train_data_part_num, + batch_size=FLAGS.batch_size) + logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields) + loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits) + flow.losses.add_loss(loss) + return loss + + +@flow.global_function(_get_eval_conf()) +def eval_job(): + labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \ + _data_loader_ofrecord_new(data_dir=FLAGS.eval_data_dir, + data_part_num=FLAGS.eval_data_part_num, + batch_size=FLAGS.batch_size, + shuffle=False) + logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields) + loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits) + predict = flow.math.sigmoid(logits) + return loss, predict, labels + +def main(): + flow.config.gpu_device_num(FLAGS.gpu_num) + #flow.config.enable_numa_aware_cuda_malloc_host(True) + #flow.config.collective_boxing.enable_fusion(False) + check_point = flow.train.CheckPoint() + check_point.init() + for i in range(FLAGS.max_iter): + train_job().async_get(_create_train_callback(i)) + if (i + 1 ) % FLAGS.eval_interval == 0: + labels = np.array([[0]]) + preds = np.array([[0]]) + cur_time = time.time() + eval_loss = 0.0 + for j in range(FLAGS.eval_batchs): + loss, pred, ref = eval_job().get() + label_ = ref.ndarray().astype(np.float32) + labels = np.concatenate((labels, label_), axis=0) 
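+                # accumulate predictions alongside the labels; the dummy first rows created above are dropped via [1:] before computing AUC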
+ preds = np.concatenate((preds, pred.ndarray()), axis=0) + eval_loss += loss.mean() + auc = roc_auc_score(labels[1:], preds[1:]) + print(i+1, "eval_loss", eval_loss/FLAGS.eval_batchs, "eval_auc", auc) + + +if __name__ == '__main__': + main() diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval_test.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval_test.py new file mode 100644 index 0000000000000000000000000000000000000000..19973f07b84040ce2c79cfab26cd52548c0519aa --- /dev/null +++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval_test.py @@ -0,0 +1,261 @@ +import argparse +import oneflow as flow +import datetime +import os +import glob +from sklearn.metrics import roc_auc_score +import numpy as np +import time + +parser = argparse.ArgumentParser() +parser.add_argument('--train_data_dir', type=str, required=True) +parser.add_argument('--train_data_part_num', type=int, required=True) +parser.add_argument('--train_part_name_suffix_length', type=int, default=-1) +parser.add_argument('--train_data_num', type=int, default=36674623) +parser.add_argument('--eval_data_dir', type=str, required=True) +parser.add_argument('--eval_data_part_num', type=int, required=True) +parser.add_argument('--eval_part_name_suffix_length', type=int, default=-1) +parser.add_argument('--eval_data_num', type=int, default=4583478) +parser.add_argument('--test_data_dir', type=str, required=True) +parser.add_argument('--test_data_part_num', type=int, required=True) +parser.add_argument('--test_part_name_suffix_length', type=int, default=-1) +parser.add_argument('--test_data_num', type=int, default=4582516) +parser.add_argument('--batch_size', type=int, default=16384) +parser.add_argument('--learning_rate', type=float, default=1e-3) +parser.add_argument('--wide_vocab_size', type=int, default=3200000) +parser.add_argument('--deep_vocab_size', type=int, default=3200000) +parser.add_argument('--deep_embedding_vec_size', type=int, default=16) +parser.add_argument('--deep_dropout_rate', type=float, default=0.5) +parser.add_argument('--num_dense_fields', type=int, default=13) +parser.add_argument('--num_wide_sparse_fields', type=int, default=2) +parser.add_argument('--num_deep_sparse_fields', type=int, default=26) +parser.add_argument('--epoch_num', type=int, default=4) +parser.add_argument('--loss_print_every_n_iter', type=int, default=100) +parser.add_argument('--gpu_num', type=int, default=8) +parser.add_argument('--hidden_units_num', type=int, default=7) +parser.add_argument('--hidden_size', type=int, default=1024) + +FLAGS = parser.parse_args() + +#DEEP_HIDDEN_UNITS = [1024, 1024]#, 1024, 1024, 1024, 1024, 1024] +DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)] +print(DEEP_HIDDEN_UNITS) +train_epoch_size = FLAGS.train_data_num // FLAGS.batch_size + 1 +eval_epoch_size = FLAGS.eval_data_num // FLAGS.batch_size + 1 +test_epoch_size = FLAGS.test_data_num // FLAGS.batch_size + 1 + + +def _raw_blob_conf(name, shape, data_type): + return flow.data.BlobConf(name=name, shape=shape, dtype=data_type, codec=flow.data.RawCodec()) + + +def _data_loader(data_dir, data_part_num, batch_size): + blob_conf = [ + _raw_blob_conf('labels', (1,), flow.int32), + _raw_blob_conf('dense_fields', (FLAGS.num_dense_fields,), flow.float), + _raw_blob_conf('wide_sparse_fields', (FLAGS.num_wide_sparse_fields,), flow.int32), + _raw_blob_conf('deep_sparse_fields', (FLAGS.num_deep_sparse_fields,), flow.int32) + ] + blobs = flow.data.decode_ofrecord( + data_dir, + blobs=blob_conf, + batch_size=batch_size, + 
name="decode", + data_part_num=data_part_num, + part_name_suffix_length=FLAGS.train_part_name_suffix_length, + ) + # copy to gpu + blobs = tuple(map(lambda blob: flow.identity(blob), blobs)) + return blobs + +def _data_loader_ofrecord_new(data_dir, data_part_num, batch_size, part_name_suffix_length=-1, + shuffle=True): + ofrecord = flow.data.ofrecord_reader(data_dir, + batch_size=batch_size, + data_part_num=data_part_num, + part_name_suffix_length=part_name_suffix_length, + random_shuffle=shuffle, + shuffle_after_epoch=shuffle) + labels = flow.data.OFRecordRawDecoder(ofrecord, "labels", shape=(1,), dtype=flow.int32) + dense_fields = flow.data.OFRecordRawDecoder(ofrecord, "dense_fields", shape=(FLAGS.num_dense_fields,), dtype=flow.float) + wide_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "wide_sparse_fields", shape=(FLAGS.num_wide_sparse_fields,), dtype=flow.int32) + deep_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "deep_sparse_fields", shape=(FLAGS.num_deep_sparse_fields,), dtype=flow.int32) + return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields]) + + +def _data_loader_onerec(data_dir, data_part_num, batch_size): + files = glob.glob(os.path.join(data_dir, '*.onerec')) + readdata = flow.data.onerec_reader(files=files, batch_size=batch_size) + labels = flow.data.onerec_decoder(readdata, key='labels', dtype=flow.int32, shape=(1,)) + dense_fields = flow.data.onerec_decoder(readdata, key='dense_fields', dtype=flow.float, shape=(FLAGS.num_dense_fields,)) + wide_sparse_fields = flow.data.onerec_decoder(readdata, key='wide_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_wide_sparse_fields,)) + deep_sparse_fields = flow.data.onerec_decoder(readdata, key='deep_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_deep_sparse_fields,)) + return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields]) + + +def _model(dense_fields, wide_sparse_fields, deep_sparse_fields): + wide_sparse_fields = flow.parallel_cast(wide_sparse_fields, distribute=flow.distribute.broadcast()) + wide_embedding_table = flow.get_variable( + name='wide_embedding', + shape=(FLAGS.wide_vocab_size, 1), + initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05), + distribute=flow.distribute.split(0), + ) + wide_embedding = flow.gather(params=wide_embedding_table, indices=wide_sparse_fields) + wide_embedding = flow.reshape(wide_embedding, shape=(-1, wide_embedding.shape[-1] * wide_embedding.shape[-2])) + wide_scores = flow.math.reduce_sum(wide_embedding, axis=[1], keepdims=True) + wide_scores = flow.parallel_cast(wide_scores, distribute=flow.distribute.split(0), + gradient_distribute=flow.distribute.broadcast()) + + deep_sparse_fields = flow.parallel_cast(deep_sparse_fields, distribute=flow.distribute.broadcast()) + deep_embedding_table = flow.get_variable( + name='deep_embedding', + shape=(FLAGS.deep_vocab_size, FLAGS.deep_embedding_vec_size), + initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05), + distribute=flow.distribute.split(1), + ) + deep_embedding = flow.gather(params=deep_embedding_table, indices=deep_sparse_fields) + deep_embedding = flow.parallel_cast(deep_embedding, distribute=flow.distribute.split(0), + gradient_distribute=flow.distribute.split(2)) + deep_embedding = flow.reshape(deep_embedding, shape=(-1, deep_embedding.shape[-1] * deep_embedding.shape[-2])) + deep_features = flow.concat([deep_embedding, dense_fields], axis=1) + for idx, units in enumerate(DEEP_HIDDEN_UNITS): + deep_features = 
flow.layers.dense( + deep_features, + units=units, + kernel_initializer=flow.glorot_uniform_initializer(), + bias_initializer=flow.constant_initializer(0.0), + activation=flow.math.relu, + name='fc' + str(idx + 1) + ) + deep_features = flow.nn.dropout(deep_features, rate=FLAGS.deep_dropout_rate) + deep_scores = flow.layers.dense( + deep_features, + units=1, + kernel_initializer=flow.glorot_uniform_initializer(), + bias_initializer=flow.constant_initializer(0.0), + name='fc' + str(len(DEEP_HIDDEN_UNITS) + 1) + ) + + scores = wide_scores + deep_scores + return scores + + +def _get_train_conf(): + train_conf = flow.FunctionConfig() + train_conf.default_data_type(flow.float) + train_conf.train.primary_lr(FLAGS.learning_rate) + train_conf.train.model_update_conf({ + 'lazy_adam_conf': { + } + }) + train_conf.use_boxing_v2(True) + train_conf.default_distribute_strategy(flow.distribute.consistent_strategy()) + train_conf.indexed_slices_optimizer_conf(dict(include_op_names=dict(op_name=['wide_embedding', 'deep_embedding']))) + return train_conf + +def _get_eval_conf(): + eval_conf = flow.FunctionConfig() + eval_conf.default_data_type(flow.float) + eval_conf.default_distribute_strategy(flow.distribute.consistent_strategy()) + return eval_conf + + +global_loss = 0.0 +def _create_train_callback(epoch, step): + def nop(loss): + global global_loss + global_loss += loss.mean() + pass + + def print_loss(loss): + global global_loss + global_loss += loss.mean() + print(epoch, step+1, 'time', datetime.datetime.now(), 'loss', + global_loss/FLAGS.loss_print_every_n_iter) + global_loss = 0.0 + + if (step + 1) % FLAGS.loss_print_every_n_iter == 0: + return print_loss + else: + return nop + +@flow.global_function(_get_train_conf()) +def train_job(): + labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \ + _data_loader_ofrecord_new(data_dir=FLAGS.train_data_dir, + data_part_num=FLAGS.train_data_part_num, + batch_size=FLAGS.batch_size, + part_name_suffix_length=FLAGS.train_part_name_suffix_length, + shuffle=True) + logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields) + loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits) + flow.losses.add_loss(loss) + return loss + + +@flow.global_function(_get_eval_conf()) +def eval_job(): + labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \ + _data_loader_ofrecord_new(data_dir=FLAGS.eval_data_dir, + data_part_num=FLAGS.eval_data_part_num, + batch_size=FLAGS.batch_size, + part_name_suffix_length=FLAGS.eval_part_name_suffix_length, + shuffle=False) + logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields) + loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits) + predict = flow.math.sigmoid(logits) + return loss, predict, labels + +@flow.global_function(_get_eval_conf()) +def test_job(): + labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \ + _data_loader_ofrecord_new(data_dir=FLAGS.test_data_dir, + data_part_num=FLAGS.test_data_part_num, + batch_size=FLAGS.batch_size, + part_name_suffix_length=FLAGS.test_part_name_suffix_length, + shuffle=False) + logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields) + loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits) + predict = flow.math.sigmoid(logits) + return loss, predict, labels + +def main(): + flow.config.gpu_device_num(FLAGS.gpu_num) + #flow.config.enable_numa_aware_cuda_malloc_host(True) + #flow.config.collective_boxing.enable_fusion(False) + check_point = 
flow.train.CheckPoint() + check_point.init() + global global_loss + for epoch in range(FLAGS.epoch_num): + global_loss = 0.0 + for i in range(train_epoch_size): + train_job().async_get(_create_train_callback(epoch, i)) + + labels = np.array([[0]]) + preds = np.array([[0]]) + eval_loss = 0.0 + for i in range(eval_epoch_size): + loss, pred, ref = eval_job().get() + label_ = ref.ndarray().astype(np.float32) + labels = np.concatenate((labels, label_), axis=0) + preds = np.concatenate((preds, pred.ndarray()), axis=0) + eval_loss += loss.mean() + auc = roc_auc_score(labels[1:], preds[1:]) + print(epoch, "eval_loss", eval_loss/eval_epoch_size, "eval_auc", auc) + + labels = np.array([[0]]) + preds = np.array([[0]]) + eval_loss = 0.0 + for i in range(test_epoch_size): + loss, pred, ref = test_job().get() + label_ = ref.ndarray().astype(np.float32) + labels = np.concatenate((labels, label_), axis=0) + preds = np.concatenate((preds, pred.ndarray()), axis=0) + eval_loss += loss.mean() + auc = roc_auc_score(labels[1:], preds[1:]) + print("test_loss", eval_loss/test_epoch_size, "eval_auc", auc) + + +if __name__ == '__main__': + main()