Commit 830601a4 authored by ShawnXuan, committed by GitHub

Merge pull request #29 from Oneflow-Inc/wdl_ctr

Wdl ctr
The main difference between `wdl_train_eval.py` and `wdl_train_eval_test.py` is:
`wdl_train_eval_test.py` is an end-to-end process: n epochs of training on the training dataset, evaluation on the full eval dataset after every epoch, and testing on the test dataset at the final stage. Its main training loop is over `epoch`.
By contrast, the main training loop in `wdl_train_eval.py` is over `iteration`. Each evaluation uses only 20 batches rather than the full eval dataset, and there is no test stage.
## Run OneFlow-WDL with training and evaluation
```
EMBD_SIZE=1603616
DATA_ROOT=/DATA/disk1/criteo_wdl/ofrecord
python3 wdl_train_eval.py \
--train_data_dir $DATA_ROOT/train \
--train_data_part_num 256 \
--train_part_name_suffix_length=5 \
--eval_data_dir $DATA_ROOT/val \
--eval_data_part_num 256 \
--max_iter=300000 \
--loss_print_every_n_iter=1000 \
--eval_interval=1000 \
--batch_size=512 \
--wide_vocab_size=$EMBD_SIZE \
--deep_vocab_size=$EMBD_SIZE \
--gpu_num 1
```
## Run OneFlow-WDL with training, evaluation and testing
```
EMBD_SIZE=1603616
DATA_ROOT=/DATA/disk1/criteo_wdl/ofrecord
python3 wdl_train_eval_test.py \
--train_data_dir $DATA_ROOT/train \
--train_data_part_num 256 \
--train_part_name_suffix_length=5 \
--eval_data_dir $DATA_ROOT/val \
--eval_data_part_num 256 \
--eval_part_name_suffix_length=5 \
--test_data_dir $DATA_ROOT/test \
--test_data_part_num 256 \
--test_part_name_suffix_length=5 \
--loss_print_every_n_iter=1000 \
--batch_size=16484 \
--wide_vocab_size=$EMBD_SIZE \
--deep_vocab_size=$EMBD_SIZE \
--gpu_num 1
```
[TOC]
[HugeCTR](https://github.com/NVIDIA/HugeCTR) is an efficient GPU framework provided by NVIDIA, designed for click-through-rate (CTR) estimation training.
OneFlow builds a Wide & Deep network benchmarked against HugeCTR; this document describes how to prepare the dataset for that network.
Wide and Deep Learning (WDL)
Note 1: The author does not have much Spark or Scala experience. Because most of the dataset processing is column-wise, out-of-memory errors occurred frequently. The original plan was to provide a complete tool, such as an executable jar, but in the end the work was done interactively in a Spark shell. This document lists those interactive steps; some redundant code remains and will be cleaned up later.
Note 2: Peak memory consumption of the whole pipeline is about 170 GB, so it is recommended to give Spark at least 192 GB of memory.
## Dataset and preprocessing
The data is provided by [CriteoLabs](http://labs.criteo.com/2014/02/kaggle-display-advertising-challenge-dataset/). The raw data has three parts: a label column `labels`, 13 integer features (the `I` columns), and 26 categorical features (the `C` columns). After processing:
- the `I` columns are converted into `dense_fields`;
- the `C` columns are converted into `deep_sparse_fields`;
- the pairs `C1 C2` and `C3 C4` from the `C` columns form cross features, which become `wide_sparse_fields`.
The processed data is saved in `ofrecord` format with the following schema:
```
root
|-- deep_sparse_fields: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- dense_fields: array (nullable = true)
| |-- element: float (containsNull = true)
|-- labels: integer (nullable = true)
|-- wide_sparse_fields: array (nullable = true)
| |-- element: integer (containsNull = true)
```
## step0 Preparation
This step imports the required libraries and creates a temporary directory. Many later steps deliberately save intermediate results into this directory, which saves memory and makes it easy to resume after an interruption.
```
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{when, _}
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, MinMaxScaler}
import org.apache.spark.ml.linalg._
import java.nio.file.{Files, Paths}
val tmp_dir = "/DATA/disk1/xuan/wdl_tmp"
Files.createDirectories(Paths.get(tmp_dir))
```
## step1 Load the data
This step reads the raw dataset and performs the following operations:
1. Name the columns `[labels, I1,...,I13, C1,...,C26]`;
2. Add an `id` to every record; all later joins are based on this `id`;
3. Cast the `I` columns to integers;
4. Fill missing values in the `I` and `C` columns with sentinel values (standing in for `NaN`);
5. `features` is the DataFrame that is reused in the later steps.
```
// load input file
var input = spark.read.options(Map("delimiter"->"\t")).csv("file:///DATA/disk1/xuan/train.shuf.bak")
// var input = spark.read.options(Map("delimiter"->"\t")).csv("file:///DATA/disk1/xuan/train.shuf.txt")
// rename columns [label, I1,...,I13, C1,...,C26]
val NUM_INTEGER_COLUMNS = 13
val NUM_CATEGORICAL_COLUMNS = 26
// val integer_cols = (1 to NUM_INTEGER_COLUMNS).map{id=>s"I$id"}
val integer_cols = (1 to NUM_INTEGER_COLUMNS).map{id=>$"I$id"} // note
val categorical_cols = (1 to NUM_CATEGORICAL_COLUMNS).map{id=>s"C$id"}
val feature_cols = integer_cols.map{c=>c.toString} ++ categorical_cols
val all_cols = (Seq(s"labels") ++ feature_cols)
input = input.toDF(all_cols: _*).withColumn("id", monotonically_increasing_id())
input = input.withColumn("labels", col("labels").cast(IntegerType))
// cast integer columns to int
for(i <- 1 to NUM_INTEGER_COLUMNS) {
val col_name = s"I$i"
input = input.withColumn(col_name, col(col_name).cast(IntegerType))
}
// fill missing values: Int.MinValue for integer columns, "80000000" for categorical columns
val features = input.na.fill(Int.MinValue, integer_cols.map{c=>c.toString}).na.fill("80000000", categorical_cols)
// dump features as parquet format
val features_dir = tmp_dir ++ "/filled_features"
features.write.mode("overwrite").parquet(features_dir)
```
Mem: 52.6G
duration: 1 min
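As a quick sanity check of step1 (a sketch reusing the paths defined above; not part of the original workflow), the dumped parquet can be read back to confirm the schema and row count:
```
// Read the filled features back and verify the expected columns:
// id, labels, I1..I13, C1..C26.
val features_check = spark.read.parquet(features_dir)
features_check.printSchema
println(features_check.count)
```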
## step2 Process the integer features into `dense_fields`
This takes two sub-steps:
1. loop over each `I` column, encode and map it, and save the result to the temporary directory;
2. read the columns back from the temporary directory, convert them into `dense_fields`, and save.
### Encode and map the `I` columns
For each integer feature:
- count the frequency of each feature value
- replace values whose frequency is less than 6 with the `NaN` sentinel
- encode the feature
- normalize it, or just add 1
- save the column to the temporary directory
```
val features_dir = tmp_dir ++ "/filled_features"
val features = spark.read.parquet(features_dir)
// integer features
println("create integer feature cols")
val normalize_dense = 1
val nanValue = Int.MinValue
val getItem = udf((v: Vector, i: Int) => v(i).toFloat)
for(column_name <- integer_cols) {
val col_name = column_name.toString
println(col_name)
val col_index = col_name ++ "_index"
val uniqueValueCounts = features.groupBy(col_name).count()
val df = features.join(uniqueValueCounts, Seq(col_name))
.withColumn(col_name, when(col("count") >= 6, col(col_name)).otherwise(nanValue))
.select("id", col_name)
val indexedDf = new StringIndexer().setInputCol(col_name)
.setOutputCol(col_index)
.fit(df).transform(df)
.drop(col_name) // trick: drop col_name here and will be reused later
var scaledDf = spark.emptyDataFrame
if (normalize_dense > 0) {
val assembler = new VectorAssembler().setInputCols(Array(col_index)).setOutputCol("vVec")
val df= assembler.transform(indexedDf)
scaledDf = new MinMaxScaler().setInputCol("vVec")
.setOutputCol(col_name)
.fit(df).transform(df)
.select("id", col_name)
} else {
scaledDf = indexedDf.withColumn(col_name, col(col_index) + lit(1)) // trick: reuse col_name
.select("id", col_name)
//.withColumn(col_name, col(col_index).cast(IntegerType))
}
val col_dir = tmp_dir ++ "/" ++ col_name
scaledDf = scaledDf.withColumn(col_name, getItem(column_name, lit(0))) // unwrap the 1-element Vector produced by MinMaxScaler into a float
scaledDf.write.mode("overwrite").parquet(col_dir)
scaledDf.printSchema
}
```
Mem: 58.6G
duration: 3*13 ~= 40 min
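An optional spot check (a sketch, assuming `normalize_dense > 0` above): after `MinMaxScaler`, every encoded `I` column should lie in [0, 1].
```
// Inspect one of the per-column dumps written by the loop above.
val i1_check = spark.read.parquet(tmp_dir ++ "/I1")
i1_check.agg(min("I1"), max("I1")).show()
```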
### Merge all `I` columns into `dense_fields`
- Read each column back from the temporary directory and join them into one DataFrame `df`;
- combine the `I` columns of `df` into `dense_fields`;
- save `dense_fields` to the temporary directory.
```
val integer_cols = (1 to NUM_INTEGER_COLUMNS).map{id=>s"I$id"}
var df = features.select("id")
for(col_name <- integer_cols) {
println(col_name)
val df_col = spark.read.parquet(tmp_dir ++ "/" ++ col_name)
df = df.join(df_col, Seq("id"))
}
df = df.select($"id", array(integer_cols map col: _*).as("dense_fields"))
val parquet_dir = tmp_dir ++ "/parquet_dense"
df.write.mode("overwrite").parquet(parquet_dir)
```
Mem: 110G
Duration: 3mins
## step3 Process the categorical and cross features into `deep_sparse_fields` and `wide_sparse_fields`
### Process the categorical features
For each categorical feature:
- count the frequency of each feature value
- replace values whose frequency is less than 6 with the `NaN` sentinel
- encode the feature
- add the current offset to the encoded values
- save the column to the temporary directory
Note that `offset` is initialized to 1, that it grows as the `C` columns are processed one after another, and that the final `offset` is reused for the cross features.
```
println("create categorical feature cols")
val nanValue = "80000000"
var offset: Long = 1
for(col_name <- categorical_cols) {
println(col_name)
val col_index = col_name ++ "_index"
val uniqueValueCounts = features.groupBy(col_name).count()
val df = features.join(uniqueValueCounts, Seq(col_name))
.withColumn(col_name, when(col("count") >= 6, col(col_name)).otherwise(nanValue))
.select("id", col_name)
val indexedDf = new StringIndexer().setInputCol(col_name)
.setOutputCol(col_index)
.fit(df).transform(df).drop(col_name)
val scaledDf = indexedDf.withColumn(col_name, col(col_index) + lit(offset))
.withColumn(col_name, col(col_name).cast(IntegerType))
.drop(col_index)
val hfCount = indexedDf.select(col_index).distinct.count()
println(offset, hfCount)
offset += hfCount
val col_dir = tmp_dir ++ "/" ++ col_name
scaledDf.write.mode("overwrite").parquet(col_dir)
}
```
Mem: 110G
Time: 1 hour
Below is the output: for each `C` column, its offset and its number of distinct values. This block is program output, not a script; a quick arithmetic check of these offsets follows it.
```
C1
(1,1460)
C2
(1461,558)
C3
(2019,335378)
C4
(337397,211710)
C5
(549107,305)
C6
(549412,20)
C7
(549432,12136)
C8
(561568,633)
C9
(562201,3)
C10
(562204,51298)
C11
(613502,5302)
C12
(618804,332600)
C13
(951404,3179)
C14
(954583,27)
C15
(954610,12191)
C16
(966801,301211)
C17
(1268012,10)
C18
(1268022,4841)
C19
(1272863,2086)
C20
(1274949,4)
C21
(1274953,324273)
C22
(1599226,17)
C23
(1599243,15)
C24
(1599258,79734)
C25
(1678992,96)
C26
(1679088,58622)
```
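A quick arithmetic check of the output above: `offset` starts at 1 and C1 has 1460 distinct values, so C2 starts at 1 + 1460 = 1461, matching its printed offset. Carrying this through all 26 columns, C26 starts at 1679088 with 58622 distinct values, so the next free id is 1679088 + 58622 = 1737710, which is exactly the `offset` hard-coded at the beginning of the cross-feature step below.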
### Generate the cross features
Note first that this step depends on the final `offset` value from the previous step.
- A cross feature combines two categorical features as: col0 * col1_width + col1.
- The combined values are then encoded.
- The encoded columns are saved to the temporary directory.
```
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, MinMaxScaler}
var offset: Long = 1737710
println("create cross cols")
val feature_pairs = Array(Array("C1", "C2"), Array("C3", "C4"))
for(feature_pair <- feature_pairs) {
val col0 = feature_pair(0)
val col1 = feature_pair(1)
val df_col0 = spark.read.parquet(tmp_dir ++ "/" ++ col0)
val df_col1 = spark.read.parquet(tmp_dir ++ "/" ++ col1)
val cross_col = col0 ++ "_" ++ col1
val cross_col_index = cross_col ++ "_index"
println(cross_col)
val min_max = df_col1.agg(min(col1), max(col1)).head()
val col1_width = min_max.getInt(1) - min_max.getInt(0) + 1
val df = df_col0.withColumn(col0, col(col0) * lit(col1_width))
.join(df_col1, Seq("id"))
.withColumn(cross_col, col(col0) + col(col1))
.select("id", cross_col)
val indexedDf = new StringIndexer().setInputCol(cross_col)
.setOutputCol(cross_col_index)
.fit(df).transform(df).drop(cross_col)
val scaledDf = indexedDf.withColumn(cross_col, col(cross_col_index).cast(IntegerType))
.withColumn(cross_col, col(cross_col) + lit(offset))
.drop(cross_col_index)
.withColumn(cross_col, col(cross_col).cast(IntegerType))
val hfCount = indexedDf.select(cross_col_index).distinct.count()
println(offset, hfCount)
offset += hfCount
val col_dir = tmp_dir ++ "/" ++ cross_col
scaledDf.write.mode("overwrite").parquet(col_dir)
scaledDf.show
scaledDf.printSchema
}
```
Mem: 110G
Duration: 2mins
```
(1737710,144108)
(1881818,447097)
```
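Reading this output the same way: C1_C2 occupies ids 1737710 through 1737710 + 144108 - 1, so C3_C4 starts at 1737710 + 144108 = 1881818, matching the second line. As a concrete instance of the col0 * col1_width + col1 combination: the step3 output shows that C2's encoded values span 1461 to 1461 + 558 - 1 = 2018, so `col1_width` is 558 and the raw key for `C1_C2` is `C1 * 558 + C2`, which is then re-indexed and shifted by `offset`.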
### Generate `deep_sparse_fields`
This step works much like the one that formed `dense_fields`; the code is redundant.
It joins 26 columns, so memory consumption is very high (170G), although it is not the slowest step. For a larger dataset the columns could be merged one at a time (a sketch of that idea follows this step's code); the earlier `dense_fields` step could use the same approach. Left as a `TODO`.
```
val tmp_dir = "/DATA/disk1/xuan/wdl_tmp"
val features_dir = tmp_dir ++ "/filled_features"
val features = spark.read.parquet(features_dir)
val NUM_CATEGORICAL_COLUMNS = 26
val categorical_cols = (1 to NUM_CATEGORICAL_COLUMNS).map{id=>s"C$id"}
var df = features.select("id")
for(col_name <- categorical_cols) {
println(col_name)
val df_col = spark.read.parquet(tmp_dir ++ "/" ++ col_name)
df = df.join(df_col, Seq("id"))
}
df = df.select($"id", array(categorical_cols map col: _*).as("deep_sparse_fields"))
val parquet_dir = tmp_dir ++ "/parquet_deep_sparse"
df.write.mode("overwrite").parquet(parquet_dir)
```
Mem: 170G
Duration: 5min
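As mentioned in the `TODO` above, the 26-way join could instead be done one column at a time to bound memory. Below is a minimal sketch of that idea (an assumption about a possible rework, not the script that was actually used), written against the same session variables (`features`, `categorical_cols`, `tmp_dir`):
```
// Sketch: merge one C column at a time, spilling the running result to disk
// after every join so only two columns are joined in memory at once.
var acc = features.select("id")
for ((col_name, i) <- categorical_cols.zipWithIndex) {
  val step_dir = tmp_dir ++ s"/acc_deep_sparse_$i"
  acc.join(spark.read.parquet(tmp_dir ++ "/" ++ col_name), Seq("id"))
     .write.mode("overwrite").parquet(step_dir)
  acc = spark.read.parquet(step_dir) // re-read so the next join starts from disk
}
acc.select($"id", array(categorical_cols map col: _*).as("deep_sparse_fields"))
   .write.mode("overwrite").parquet(tmp_dir ++ "/parquet_deep_sparse")
```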
### Generate `wide_sparse_fields`
This step works much like the one that formed `dense_fields`; the code is redundant.
```
val cross_pairs = Array("C1_C2", "C3_C4")
var df = features.select("id")
for(cross_pair <- cross_pairs) {
val df_col = spark.read.parquet(tmp_dir ++ "/" ++ cross_pair)
df = df.join(df_col, Seq("id"))
}
df = df.select($"id", array(cross_pairs map col: _*).as("wide_sparse_fields"))
val parquet_dir = tmp_dir ++ "/parquet_wide_sparse"
df.write.mode("overwrite").parquet(parquet_dir)
```
Duration: 1min
## step4 Merge all fields
```
val fields = Array("dense", "deep_sparse", "wide_sparse")
var df = features.select("id", "labels")
for(field <- fields) {
val df_col = spark.read.parquet(tmp_dir ++ "/parquet_" ++ field)
df = df.join(df_col, Seq("id"))
}
val parquet_dir = tmp_dir ++ "/parquet_all"
df.write.mode("overwrite").parquet(parquet_dir)
```
## step5 Write ofrecord files
```
val tmp_dir = "/DATA/disk1/xuan/wdl_tmp"
import org.oneflow.spark.functions._
val parquet_dir = tmp_dir ++ "/parquet_all"
val df = spark.read.parquet(parquet_dir)
val dfs = df.drop("id").randomSplit(Array(0.8, 0.1, 0.1))
val ofrecord_dir = tmp_dir ++ "/ofrecord/train"
dfs(0).repartition(256).write.mode("overwrite").ofrecord(ofrecord_dir)
dfs(0).count
sc.formatFilenameAsOneflowStyle(ofrecord_dir)
val ofrecord_dir = tmp_dir ++ "/ofrecord/val"
dfs(1).repartition(256).write.mode("overwrite").ofrecord(ofrecord_dir)
dfs(1).count
sc.formatFilenameAsOneflowStyle(ofrecord_dir)
val ofrecord_dir = tmp_dir ++ "/ofrecord/test"
dfs(2).repartition(256).write.mode("overwrite").ofrecord(ofrecord_dir)
dfs(2).count
sc.formatFilenameAsOneflowStyle(ofrecord_dir)
```
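The `repartition(256)` and the renaming above are what the run commands at the top of this document rely on: `--*_data_part_num 256` and `--*_part_name_suffix_length=5` assume 256 part files per split with a 5-digit numeric suffix (the exact naming produced by `sc.formatFilenameAsOneflowStyle` is an assumption here). A quick way to check:
```
// List a few of the written part files to confirm the count and naming.
import java.io.File
val train_files = new File(tmp_dir ++ "/ofrecord/train").listFiles.map(_.getName).sorted
println(train_files.length)
train_files.take(5).foreach(println)
```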
## wdl_train_eval.py
```
import argparse
import oneflow as flow
import datetime
import os
import glob
from sklearn.metrics import roc_auc_score
import numpy as np
import time
parser = argparse.ArgumentParser()
parser.add_argument('--train_data_dir', type=str, required=True)
parser.add_argument('--train_data_part_num', type=int, required=True)
parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
parser.add_argument('--eval_data_dir', type=str, required=True)
parser.add_argument('--eval_data_part_num', type=int, required=True)
parser.add_argument('--eval_part_name_suffix_length', type=int, default=-1)
parser.add_argument('--eval_batchs', type=int, default=20)
parser.add_argument('--eval_interval', type=int, default=1000)
parser.add_argument('--batch_size', type=int, default=16384)
parser.add_argument('--learning_rate', type=float, default=1e-3)
parser.add_argument('--wide_vocab_size', type=int, default=3200000)
parser.add_argument('--deep_vocab_size', type=int, default=3200000)
parser.add_argument('--deep_embedding_vec_size', type=int, default=16)
parser.add_argument('--deep_dropout_rate', type=float, default=0.5)
parser.add_argument('--num_dense_fields', type=int, default=13)
parser.add_argument('--num_wide_sparse_fields', type=int, default=2)
parser.add_argument('--num_deep_sparse_fields', type=int, default=26)
parser.add_argument('--max_iter', type=int, default=30000)
parser.add_argument('--loss_print_every_n_iter', type=int, default=100)
parser.add_argument('--gpu_num', type=int, default=8)
parser.add_argument('--hidden_units_num', type=int, default=7)
parser.add_argument('--hidden_size', type=int, default=1024)
FLAGS = parser.parse_args()
#DEEP_HIDDEN_UNITS = [1024, 1024]#, 1024, 1024, 1024, 1024, 1024]
DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
print(DEEP_HIDDEN_UNITS)
def _raw_blob_conf(name, shape, data_type):
return flow.data.BlobConf(name=name, shape=shape, dtype=data_type, codec=flow.data.RawCodec())
def _data_loader(data_dir, data_part_num, batch_size):
blob_conf = [
_raw_blob_conf('labels', (1,), flow.int32),
_raw_blob_conf('dense_fields', (FLAGS.num_dense_fields,), flow.float),
_raw_blob_conf('wide_sparse_fields', (FLAGS.num_wide_sparse_fields,), flow.int32),
_raw_blob_conf('deep_sparse_fields', (FLAGS.num_deep_sparse_fields,), flow.int32)
]
blobs = flow.data.decode_ofrecord(
data_dir,
blobs=blob_conf,
batch_size=batch_size,
name="decode",
data_part_num=data_part_num,
part_name_suffix_length=FLAGS.train_part_name_suffix_length,
)
# copy to gpu
blobs = tuple(map(lambda blob: flow.identity(blob), blobs))
return blobs
def _data_loader_ofrecord_new(data_dir, data_part_num, batch_size, shuffle=True):
ofrecord = flow.data.ofrecord_reader(data_dir,
batch_size=batch_size,
data_part_num=data_part_num,
part_name_suffix_length=FLAGS.train_part_name_suffix_length,
random_shuffle=shuffle,
shuffle_after_epoch=shuffle)
labels = flow.data.OFRecordRawDecoder(ofrecord, "labels", shape=(1,), dtype=flow.int32)
dense_fields = flow.data.OFRecordRawDecoder(ofrecord, "dense_fields", shape=(FLAGS.num_dense_fields,), dtype=flow.float)
wide_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "wide_sparse_fields", shape=(FLAGS.num_wide_sparse_fields,), dtype=flow.int32)
deep_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "deep_sparse_fields", shape=(FLAGS.num_deep_sparse_fields,), dtype=flow.int32)
return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
def _data_loader_onerec(data_dir, data_part_num, batch_size):
files = glob.glob(os.path.join(data_dir, '*.onerec'))
readdata = flow.data.onerec_reader(files=files, batch_size=batch_size)
labels = flow.data.onerec_decoder(readdata, key='labels', dtype=flow.int32, shape=(1,))
dense_fields = flow.data.onerec_decoder(readdata, key='dense_fields', dtype=flow.float, shape=(FLAGS.num_dense_fields,))
wide_sparse_fields = flow.data.onerec_decoder(readdata, key='wide_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_wide_sparse_fields,))
deep_sparse_fields = flow.data.onerec_decoder(readdata, key='deep_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_deep_sparse_fields,))
return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
def _model(dense_fields, wide_sparse_fields, deep_sparse_fields):
wide_sparse_fields = flow.parallel_cast(wide_sparse_fields, distribute=flow.distribute.broadcast())
wide_embedding_table = flow.get_variable(
name='wide_embedding',
shape=(FLAGS.wide_vocab_size, 1),
initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
distribute=flow.distribute.split(0),
)
wide_embedding = flow.gather(params=wide_embedding_table, indices=wide_sparse_fields)
wide_embedding = flow.reshape(wide_embedding, shape=(-1, wide_embedding.shape[-1] * wide_embedding.shape[-2]))
wide_scores = flow.math.reduce_sum(wide_embedding, axis=[1], keepdims=True)
wide_scores = flow.parallel_cast(wide_scores, distribute=flow.distribute.split(0),
gradient_distribute=flow.distribute.broadcast())
deep_sparse_fields = flow.parallel_cast(deep_sparse_fields, distribute=flow.distribute.broadcast())
deep_embedding_table = flow.get_variable(
name='deep_embedding',
shape=(FLAGS.deep_vocab_size, FLAGS.deep_embedding_vec_size),
initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
distribute=flow.distribute.split(1),
)
deep_embedding = flow.gather(params=deep_embedding_table, indices=deep_sparse_fields)
deep_embedding = flow.parallel_cast(deep_embedding, distribute=flow.distribute.split(0),
gradient_distribute=flow.distribute.split(2))
deep_embedding = flow.reshape(deep_embedding, shape=(-1, deep_embedding.shape[-1] * deep_embedding.shape[-2]))
deep_features = flow.concat([deep_embedding, dense_fields], axis=1)
for idx, units in enumerate(DEEP_HIDDEN_UNITS):
deep_features = flow.layers.dense(
deep_features,
units=units,
kernel_initializer=flow.glorot_uniform_initializer(),
bias_initializer=flow.constant_initializer(0.0),
activation=flow.math.relu,
name='fc' + str(idx + 1)
)
deep_features = flow.nn.dropout(deep_features, rate=FLAGS.deep_dropout_rate)
deep_scores = flow.layers.dense(
deep_features,
units=1,
kernel_initializer=flow.glorot_uniform_initializer(),
bias_initializer=flow.constant_initializer(0.0),
name='fc' + str(len(DEEP_HIDDEN_UNITS) + 1)
)
scores = wide_scores + deep_scores
return scores
def _get_train_conf():
train_conf = flow.FunctionConfig()
train_conf.default_data_type(flow.float)
train_conf.train.primary_lr(FLAGS.learning_rate)
train_conf.train.model_update_conf({
'lazy_adam_conf': {
}
})
train_conf.use_boxing_v2(True)
train_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
train_conf.indexed_slices_optimizer_conf(dict(include_op_names=dict(op_name=['wide_embedding', 'deep_embedding'])))
return train_conf
def _get_eval_conf():
eval_conf = flow.FunctionConfig()
eval_conf.default_data_type(flow.float)
eval_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
return eval_conf
global_loss = 0.0
def _create_train_callback(step):
def nop(loss):
global global_loss
global_loss += loss.mean()
pass
def print_loss(loss):
global global_loss
global_loss += loss.mean()
print(step+1, 'time', datetime.datetime.now(), 'loss', global_loss/FLAGS.loss_print_every_n_iter)
global_loss = 0.0
if (step + 1) % FLAGS.loss_print_every_n_iter == 0:
return print_loss
else:
return nop
@flow.global_function(_get_train_conf())
def train_job():
labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
_data_loader_ofrecord_new(data_dir=FLAGS.train_data_dir,
data_part_num=FLAGS.train_data_part_num,
batch_size=FLAGS.batch_size)
logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
flow.losses.add_loss(loss)
return loss
@flow.global_function(_get_eval_conf())
def eval_job():
labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
_data_loader_ofrecord_new(data_dir=FLAGS.eval_data_dir,
data_part_num=FLAGS.eval_data_part_num,
batch_size=FLAGS.batch_size,
shuffle=False)
logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
predict = flow.math.sigmoid(logits)
return loss, predict, labels
def main():
flow.config.gpu_device_num(FLAGS.gpu_num)
#flow.config.enable_numa_aware_cuda_malloc_host(True)
#flow.config.collective_boxing.enable_fusion(False)
check_point = flow.train.CheckPoint()
check_point.init()
for i in range(FLAGS.max_iter):
train_job().async_get(_create_train_callback(i))
if (i + 1 ) % FLAGS.eval_interval == 0:
labels = np.array([[0]])
preds = np.array([[0]])
cur_time = time.time()
eval_loss = 0.0
for j in range(FLAGS.eval_batchs):
loss, pred, ref = eval_job().get()
label_ = ref.ndarray().astype(np.float32)
labels = np.concatenate((labels, label_), axis=0)
preds = np.concatenate((preds, pred.ndarray()), axis=0)
eval_loss += loss.mean()
auc = roc_auc_score(labels[1:], preds[1:])
print(i+1, "eval_loss", eval_loss/FLAGS.eval_batchs, "eval_auc", auc)
if __name__ == '__main__':
main()
```
## wdl_train_eval_test.py
```
import argparse
import oneflow as flow
import datetime
import os
import glob
from sklearn.metrics import roc_auc_score
import numpy as np
import time
parser = argparse.ArgumentParser()
parser.add_argument('--train_data_dir', type=str, required=True)
parser.add_argument('--train_data_part_num', type=int, required=True)
parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
parser.add_argument('--train_data_num', type=int, default=36674623)
parser.add_argument('--eval_data_dir', type=str, required=True)
parser.add_argument('--eval_data_part_num', type=int, required=True)
parser.add_argument('--eval_part_name_suffix_length', type=int, default=-1)
parser.add_argument('--eval_data_num', type=int, default=4583478)
parser.add_argument('--test_data_dir', type=str, required=True)
parser.add_argument('--test_data_part_num', type=int, required=True)
parser.add_argument('--test_part_name_suffix_length', type=int, default=-1)
parser.add_argument('--test_data_num', type=int, default=4582516)
parser.add_argument('--batch_size', type=int, default=16384)
parser.add_argument('--learning_rate', type=float, default=1e-3)
parser.add_argument('--wide_vocab_size', type=int, default=3200000)
parser.add_argument('--deep_vocab_size', type=int, default=3200000)
parser.add_argument('--deep_embedding_vec_size', type=int, default=16)
parser.add_argument('--deep_dropout_rate', type=float, default=0.5)
parser.add_argument('--num_dense_fields', type=int, default=13)
parser.add_argument('--num_wide_sparse_fields', type=int, default=2)
parser.add_argument('--num_deep_sparse_fields', type=int, default=26)
parser.add_argument('--epoch_num', type=int, default=4)
parser.add_argument('--loss_print_every_n_iter', type=int, default=100)
parser.add_argument('--gpu_num', type=int, default=8)
parser.add_argument('--hidden_units_num', type=int, default=7)
parser.add_argument('--hidden_size', type=int, default=1024)
FLAGS = parser.parse_args()
#DEEP_HIDDEN_UNITS = [1024, 1024]#, 1024, 1024, 1024, 1024, 1024]
DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
print(DEEP_HIDDEN_UNITS)
train_epoch_size = FLAGS.train_data_num // FLAGS.batch_size + 1
eval_epoch_size = FLAGS.eval_data_num // FLAGS.batch_size + 1
test_epoch_size = FLAGS.test_data_num // FLAGS.batch_size + 1
def _raw_blob_conf(name, shape, data_type):
return flow.data.BlobConf(name=name, shape=shape, dtype=data_type, codec=flow.data.RawCodec())
def _data_loader(data_dir, data_part_num, batch_size):
blob_conf = [
_raw_blob_conf('labels', (1,), flow.int32),
_raw_blob_conf('dense_fields', (FLAGS.num_dense_fields,), flow.float),
_raw_blob_conf('wide_sparse_fields', (FLAGS.num_wide_sparse_fields,), flow.int32),
_raw_blob_conf('deep_sparse_fields', (FLAGS.num_deep_sparse_fields,), flow.int32)
]
blobs = flow.data.decode_ofrecord(
data_dir,
blobs=blob_conf,
batch_size=batch_size,
name="decode",
data_part_num=data_part_num,
part_name_suffix_length=FLAGS.train_part_name_suffix_length,
)
# copy to gpu
blobs = tuple(map(lambda blob: flow.identity(blob), blobs))
return blobs
def _data_loader_ofrecord_new(data_dir, data_part_num, batch_size, part_name_suffix_length=-1,
shuffle=True):
ofrecord = flow.data.ofrecord_reader(data_dir,
batch_size=batch_size,
data_part_num=data_part_num,
part_name_suffix_length=part_name_suffix_length,
random_shuffle=shuffle,
shuffle_after_epoch=shuffle)
labels = flow.data.OFRecordRawDecoder(ofrecord, "labels", shape=(1,), dtype=flow.int32)
dense_fields = flow.data.OFRecordRawDecoder(ofrecord, "dense_fields", shape=(FLAGS.num_dense_fields,), dtype=flow.float)
wide_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "wide_sparse_fields", shape=(FLAGS.num_wide_sparse_fields,), dtype=flow.int32)
deep_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "deep_sparse_fields", shape=(FLAGS.num_deep_sparse_fields,), dtype=flow.int32)
return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
def _data_loader_onerec(data_dir, data_part_num, batch_size):
files = glob.glob(os.path.join(data_dir, '*.onerec'))
readdata = flow.data.onerec_reader(files=files, batch_size=batch_size)
labels = flow.data.onerec_decoder(readdata, key='labels', dtype=flow.int32, shape=(1,))
dense_fields = flow.data.onerec_decoder(readdata, key='dense_fields', dtype=flow.float, shape=(FLAGS.num_dense_fields,))
wide_sparse_fields = flow.data.onerec_decoder(readdata, key='wide_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_wide_sparse_fields,))
deep_sparse_fields = flow.data.onerec_decoder(readdata, key='deep_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_deep_sparse_fields,))
return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
def _model(dense_fields, wide_sparse_fields, deep_sparse_fields):
wide_sparse_fields = flow.parallel_cast(wide_sparse_fields, distribute=flow.distribute.broadcast())
wide_embedding_table = flow.get_variable(
name='wide_embedding',
shape=(FLAGS.wide_vocab_size, 1),
initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
distribute=flow.distribute.split(0),
)
wide_embedding = flow.gather(params=wide_embedding_table, indices=wide_sparse_fields)
wide_embedding = flow.reshape(wide_embedding, shape=(-1, wide_embedding.shape[-1] * wide_embedding.shape[-2]))
wide_scores = flow.math.reduce_sum(wide_embedding, axis=[1], keepdims=True)
wide_scores = flow.parallel_cast(wide_scores, distribute=flow.distribute.split(0),
gradient_distribute=flow.distribute.broadcast())
deep_sparse_fields = flow.parallel_cast(deep_sparse_fields, distribute=flow.distribute.broadcast())
deep_embedding_table = flow.get_variable(
name='deep_embedding',
shape=(FLAGS.deep_vocab_size, FLAGS.deep_embedding_vec_size),
initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
distribute=flow.distribute.split(1),
)
deep_embedding = flow.gather(params=deep_embedding_table, indices=deep_sparse_fields)
deep_embedding = flow.parallel_cast(deep_embedding, distribute=flow.distribute.split(0),
gradient_distribute=flow.distribute.split(2))
deep_embedding = flow.reshape(deep_embedding, shape=(-1, deep_embedding.shape[-1] * deep_embedding.shape[-2]))
deep_features = flow.concat([deep_embedding, dense_fields], axis=1)
for idx, units in enumerate(DEEP_HIDDEN_UNITS):
deep_features = flow.layers.dense(
deep_features,
units=units,
kernel_initializer=flow.glorot_uniform_initializer(),
bias_initializer=flow.constant_initializer(0.0),
activation=flow.math.relu,
name='fc' + str(idx + 1)
)
deep_features = flow.nn.dropout(deep_features, rate=FLAGS.deep_dropout_rate)
deep_scores = flow.layers.dense(
deep_features,
units=1,
kernel_initializer=flow.glorot_uniform_initializer(),
bias_initializer=flow.constant_initializer(0.0),
name='fc' + str(len(DEEP_HIDDEN_UNITS) + 1)
)
scores = wide_scores + deep_scores
return scores
def _get_train_conf():
train_conf = flow.FunctionConfig()
train_conf.default_data_type(flow.float)
train_conf.train.primary_lr(FLAGS.learning_rate)
train_conf.train.model_update_conf({
'lazy_adam_conf': {
}
})
train_conf.use_boxing_v2(True)
train_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
train_conf.indexed_slices_optimizer_conf(dict(include_op_names=dict(op_name=['wide_embedding', 'deep_embedding'])))
return train_conf
def _get_eval_conf():
eval_conf = flow.FunctionConfig()
eval_conf.default_data_type(flow.float)
eval_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
return eval_conf
global_loss = 0.0
def _create_train_callback(epoch, step):
def nop(loss):
global global_loss
global_loss += loss.mean()
pass
def print_loss(loss):
global global_loss
global_loss += loss.mean()
print(epoch, step+1, 'time', datetime.datetime.now(), 'loss',
global_loss/FLAGS.loss_print_every_n_iter)
global_loss = 0.0
if (step + 1) % FLAGS.loss_print_every_n_iter == 0:
return print_loss
else:
return nop
@flow.global_function(_get_train_conf())
def train_job():
labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
_data_loader_ofrecord_new(data_dir=FLAGS.train_data_dir,
data_part_num=FLAGS.train_data_part_num,
batch_size=FLAGS.batch_size,
part_name_suffix_length=FLAGS.train_part_name_suffix_length,
shuffle=True)
logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
flow.losses.add_loss(loss)
return loss
@flow.global_function(_get_eval_conf())
def eval_job():
labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
_data_loader_ofrecord_new(data_dir=FLAGS.eval_data_dir,
data_part_num=FLAGS.eval_data_part_num,
batch_size=FLAGS.batch_size,
part_name_suffix_length=FLAGS.eval_part_name_suffix_length,
shuffle=False)
logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
predict = flow.math.sigmoid(logits)
return loss, predict, labels
@flow.global_function(_get_eval_conf())
def test_job():
labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
_data_loader_ofrecord_new(data_dir=FLAGS.test_data_dir,
data_part_num=FLAGS.test_data_part_num,
batch_size=FLAGS.batch_size,
part_name_suffix_length=FLAGS.test_part_name_suffix_length,
shuffle=False)
logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
predict = flow.math.sigmoid(logits)
return loss, predict, labels
def main():
flow.config.gpu_device_num(FLAGS.gpu_num)
#flow.config.enable_numa_aware_cuda_malloc_host(True)
#flow.config.collective_boxing.enable_fusion(False)
check_point = flow.train.CheckPoint()
check_point.init()
global global_loss
for epoch in range(FLAGS.epoch_num):
global_loss = 0.0
for i in range(train_epoch_size):
train_job().async_get(_create_train_callback(epoch, i))
labels = np.array([[0]])
preds = np.array([[0]])
eval_loss = 0.0
for i in range(eval_epoch_size):
loss, pred, ref = eval_job().get()
label_ = ref.ndarray().astype(np.float32)
labels = np.concatenate((labels, label_), axis=0)
preds = np.concatenate((preds, pred.ndarray()), axis=0)
eval_loss += loss.mean()
auc = roc_auc_score(labels[1:], preds[1:])
print(epoch, "eval_loss", eval_loss/eval_epoch_size, "eval_auc", auc)
labels = np.array([[0]])
preds = np.array([[0]])
eval_loss = 0.0
for i in range(test_epoch_size):
loss, pred, ref = test_job().get()
label_ = ref.ndarray().astype(np.float32)
labels = np.concatenate((labels, label_), axis=0)
preds = np.concatenate((preds, pred.ndarray()), axis=0)
eval_loss += loss.mean()
auc = roc_auc_score(labels[1:], preds[1:])
print("test_loss", eval_loss/test_epoch_size, "eval_auc", auc)
if __name__ == '__main__':
main()
```