diff --git a/ClickThroughRate/WideDeepLearning/train_eval.sh b/ClickThroughRate/WideDeepLearning/train_eval.sh
new file mode 100755
index 0000000000000000000000000000000000000000..045bb5bde5775082fcce0430f6a9a2953e9ab59d
--- /dev/null
+++ b/ClickThroughRate/WideDeepLearning/train_eval.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+rm -rf core.*
+export ENABLE_USER_OP=True
+
+#EMBD_SIZE=3200000
+EMBD_SIZE=1603616
+DATA_ROOT=/DATA/disk1/criteo_wdl/ofrecord
+python3 wdl_train_eval.py \
+  --train_data_dir $DATA_ROOT/train \
+  --train_data_part_num 256 \
+  --train_part_name_suffix_length=5 \
+  --eval_data_dir $DATA_ROOT/val \
+  --eval_data_part_num 256 \
+  --eval_part_name_suffix_length=5 \
+  --max_iter=300000 \
+  --loss_print_every_n_iter=1000 \
+  --eval_interval=1000 \
+  --batch_size=512 \
+  --wide_vocab_size=$EMBD_SIZE \
+  --deep_vocab_size=$EMBD_SIZE \
+  --gpu_num 1
+
diff --git a/ClickThroughRate/WideDeepLearning/train_eval_test.sh b/ClickThroughRate/WideDeepLearning/train_eval_test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..27063eac9e30da23d4def19f3c641c590e7ad4d5
--- /dev/null
+++ b/ClickThroughRate/WideDeepLearning/train_eval_test.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+rm -rf core.*
+export ENABLE_USER_OP=True
+
+#EMBD_SIZE=3200000
+EMBD_SIZE=1603616
+DATA_ROOT=/DATA/disk1/criteo_wdl/ofrecord
+python3 wdl_train_eval_test.py \
+  --train_data_dir $DATA_ROOT/train \
+  --train_data_part_num 256 \
+  --train_part_name_suffix_length=5 \
+  --eval_data_dir $DATA_ROOT/val \
+  --eval_data_part_num 256 \
+  --eval_part_name_suffix_length=5 \
+  --test_data_dir $DATA_ROOT/test \
+  --test_data_part_num 256 \
+  --test_part_name_suffix_length=5 \
+  --loss_print_every_n_iter=1000 \
+  --batch_size=16384 \
+  --wide_vocab_size=$EMBD_SIZE \
+  --deep_vocab_size=$EMBD_SIZE \
+  --gpu_num 1
+
diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
new file mode 100644
index 0000000000000000000000000000000000000000..f862c28d09f7f1f16c833ece4c2f6ea411de958a
--- /dev/null
+++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval.py
@@ -0,0 +1,224 @@
+import argparse
+import oneflow as flow
+import datetime
+import os
+import glob
+from sklearn.metrics import roc_auc_score
+import numpy as np
+import time
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--train_data_dir', type=str, required=True)
+parser.add_argument('--train_data_part_num', type=int, required=True)
+parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
+parser.add_argument('--eval_data_dir', type=str, required=True)
+parser.add_argument('--eval_data_part_num', type=int, required=True)
+parser.add_argument('--eval_part_name_suffix_length', type=int, default=-1)
+parser.add_argument('--eval_batchs', type=int, default=20)
+parser.add_argument('--eval_interval', type=int, default=1000)
+parser.add_argument('--batch_size', type=int, default=16384)
+parser.add_argument('--learning_rate', type=float, default=1e-3)
+parser.add_argument('--wide_vocab_size', type=int, default=3200000)
+parser.add_argument('--deep_vocab_size', type=int, default=3200000)
+parser.add_argument('--deep_embedding_vec_size', type=int, default=16)
+parser.add_argument('--deep_dropout_rate', type=float, default=0.5)
+parser.add_argument('--num_dense_fields', type=int, default=13)
+parser.add_argument('--num_wide_sparse_fields', type=int, default=2)
+parser.add_argument('--num_deep_sparse_fields', type=int, default=26)
+parser.add_argument('--max_iter', type=int, default=30000)
+parser.add_argument('--loss_print_every_n_iter', type=int, default=100)
+parser.add_argument('--gpu_num', type=int, default=8)
+parser.add_argument('--hidden_units_num', type=int, default=7)
+parser.add_argument('--hidden_size', type=int, default=1024)
+
+FLAGS = parser.parse_args()
+
+#DEEP_HIDDEN_UNITS = [1024, 1024]#, 1024, 1024, 1024, 1024, 1024]
+DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
+print(DEEP_HIDDEN_UNITS)
+
+
+def _raw_blob_conf(name, shape, data_type):
+    return flow.data.BlobConf(name=name, shape=shape, dtype=data_type, codec=flow.data.RawCodec())
+
+
+def _data_loader(data_dir, data_part_num, batch_size):
+    blob_conf = [
+        _raw_blob_conf('labels', (1,), flow.int32),
+        _raw_blob_conf('dense_fields', (FLAGS.num_dense_fields,), flow.float),
+        _raw_blob_conf('wide_sparse_fields', (FLAGS.num_wide_sparse_fields,), flow.int32),
+        _raw_blob_conf('deep_sparse_fields', (FLAGS.num_deep_sparse_fields,), flow.int32)
+    ]
+    blobs = flow.data.decode_ofrecord(
+        data_dir,
+        blobs=blob_conf,
+        batch_size=batch_size,
+        name="decode",
+        data_part_num=data_part_num,
+        part_name_suffix_length=FLAGS.train_part_name_suffix_length,
+    )
+    # copy to gpu
+    blobs = tuple(map(lambda blob: flow.identity(blob), blobs))
+    return blobs
+
+def _data_loader_ofrecord_new(data_dir, data_part_num, batch_size, part_name_suffix_length=-1, shuffle=True):
+    ofrecord = flow.data.ofrecord_reader(data_dir,
+                                         batch_size=batch_size,
+                                         data_part_num=data_part_num,
+                                         part_name_suffix_length=part_name_suffix_length,
+                                         random_shuffle=shuffle,
+                                         shuffle_after_epoch=shuffle)
+    labels = flow.data.OFRecordRawDecoder(ofrecord, "labels", shape=(1,), dtype=flow.int32)
+    dense_fields = flow.data.OFRecordRawDecoder(ofrecord, "dense_fields", shape=(FLAGS.num_dense_fields,), dtype=flow.float)
+    wide_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "wide_sparse_fields", shape=(FLAGS.num_wide_sparse_fields,), dtype=flow.int32)
+    deep_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "deep_sparse_fields", shape=(FLAGS.num_deep_sparse_fields,), dtype=flow.int32)
+    return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
+
+
+def _data_loader_onerec(data_dir, data_part_num, batch_size):
+    files = glob.glob(os.path.join(data_dir, '*.onerec'))
+    readdata = flow.data.onerec_reader(files=files, batch_size=batch_size)
+    labels = flow.data.onerec_decoder(readdata, key='labels', dtype=flow.int32, shape=(1,))
+    dense_fields = flow.data.onerec_decoder(readdata, key='dense_fields', dtype=flow.float, shape=(FLAGS.num_dense_fields,))
+    wide_sparse_fields = flow.data.onerec_decoder(readdata, key='wide_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_wide_sparse_fields,))
+    deep_sparse_fields = flow.data.onerec_decoder(readdata, key='deep_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_deep_sparse_fields,))
+    return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
+
+
+def _model(dense_fields, wide_sparse_fields, deep_sparse_fields):
+    wide_sparse_fields = flow.parallel_cast(wide_sparse_fields, distribute=flow.distribute.broadcast())
+    wide_embedding_table = flow.get_variable(
+        name='wide_embedding',
+        shape=(FLAGS.wide_vocab_size, 1),
+        initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
+        distribute=flow.distribute.split(0),
+    )
+    wide_embedding = flow.gather(params=wide_embedding_table, indices=wide_sparse_fields)
+    wide_embedding = flow.reshape(wide_embedding, shape=(-1, wide_embedding.shape[-1] * wide_embedding.shape[-2]))
+    wide_scores = flow.math.reduce_sum(wide_embedding, axis=[1], keepdims=True)
+    wide_scores = flow.parallel_cast(wide_scores, distribute=flow.distribute.split(0),
+                                     gradient_distribute=flow.distribute.broadcast())
+
+    deep_sparse_fields = flow.parallel_cast(deep_sparse_fields, distribute=flow.distribute.broadcast())
+    deep_embedding_table = flow.get_variable(
+        name='deep_embedding',
+        shape=(FLAGS.deep_vocab_size, FLAGS.deep_embedding_vec_size),
+        initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
+        distribute=flow.distribute.split(1),
+    )
+    deep_embedding = flow.gather(params=deep_embedding_table, indices=deep_sparse_fields)
+    deep_embedding = flow.parallel_cast(deep_embedding, distribute=flow.distribute.split(0),
+                                        gradient_distribute=flow.distribute.split(2))
+    deep_embedding = flow.reshape(deep_embedding, shape=(-1, deep_embedding.shape[-1] * deep_embedding.shape[-2]))
+    deep_features = flow.concat([deep_embedding, dense_fields], axis=1)
+    for idx, units in enumerate(DEEP_HIDDEN_UNITS):
+        deep_features = flow.layers.dense(
+            deep_features,
+            units=units,
+            kernel_initializer=flow.glorot_uniform_initializer(),
+            bias_initializer=flow.constant_initializer(0.0),
+            activation=flow.math.relu,
+            name='fc' + str(idx + 1)
+        )
+        deep_features = flow.nn.dropout(deep_features, rate=FLAGS.deep_dropout_rate)
+    deep_scores = flow.layers.dense(
+        deep_features,
+        units=1,
+        kernel_initializer=flow.glorot_uniform_initializer(),
+        bias_initializer=flow.constant_initializer(0.0),
+        name='fc' + str(len(DEEP_HIDDEN_UNITS) + 1)
+    )
+
+    scores = wide_scores + deep_scores
+    return scores
+
+
+def _get_train_conf():
+    train_conf = flow.FunctionConfig()
+    train_conf.default_data_type(flow.float)
+    train_conf.train.primary_lr(FLAGS.learning_rate)
+    train_conf.train.model_update_conf({
+        'lazy_adam_conf': {
+        }
+    })
+    train_conf.use_boxing_v2(True)
+    train_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
+    train_conf.indexed_slices_optimizer_conf(dict(include_op_names=dict(op_name=['wide_embedding', 'deep_embedding'])))
+    return train_conf
+
+def _get_eval_conf():
+    eval_conf = flow.FunctionConfig()
+    eval_conf.default_data_type(flow.float)
+    eval_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
+    return eval_conf
+
+
+global_loss = 0.0
+def _create_train_callback(step):
+    def nop(loss):
+        global global_loss
+        global_loss += loss.mean()
+        pass
+
+    def print_loss(loss):
+        global global_loss
+        global_loss += loss.mean()
+        print(step+1, 'time', datetime.datetime.now(), 'loss', global_loss/FLAGS.loss_print_every_n_iter)
+        global_loss = 0.0
+
+    if (step + 1) % FLAGS.loss_print_every_n_iter == 0:
+        return print_loss
+    else:
+        return nop
+
+@flow.global_function(_get_train_conf())
+def train_job():
+    labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
+        _data_loader_ofrecord_new(data_dir=FLAGS.train_data_dir,
+                                  data_part_num=FLAGS.train_data_part_num,
+                                  batch_size=FLAGS.batch_size,
+                                  part_name_suffix_length=FLAGS.train_part_name_suffix_length)
+    logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
+    loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
+    flow.losses.add_loss(loss)
+    return loss
+
+
+@flow.global_function(_get_eval_conf())
+def eval_job():
+    labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
+        _data_loader_ofrecord_new(data_dir=FLAGS.eval_data_dir,
+                                  data_part_num=FLAGS.eval_data_part_num,
+                                  batch_size=FLAGS.batch_size,
+                                  part_name_suffix_length=FLAGS.eval_part_name_suffix_length,
+                                  shuffle=False)
+    logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
+    loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
+    predict = flow.math.sigmoid(logits)
+    return loss, predict, labels
+
+def main():
+    flow.config.gpu_device_num(FLAGS.gpu_num)
+    #flow.config.enable_numa_aware_cuda_malloc_host(True)
+    #flow.config.collective_boxing.enable_fusion(False)
+    check_point = flow.train.CheckPoint()
+    check_point.init()
+    for i in range(FLAGS.max_iter):
+        train_job().async_get(_create_train_callback(i))
+        if (i + 1) % FLAGS.eval_interval == 0:
+            labels = np.array([[0]])
+            preds = np.array([[0]])
+            cur_time = time.time()
+            eval_loss = 0.0
+            for j in range(FLAGS.eval_batchs):
+                loss, pred, ref = eval_job().get()
+                label_ = ref.ndarray().astype(np.float32)
+                labels = np.concatenate((labels, label_), axis=0)
+                preds = np.concatenate((preds, pred.ndarray()), axis=0)
+                eval_loss += loss.mean()
+            auc = roc_auc_score(labels[1:], preds[1:])
+            print(i+1, "eval_loss", eval_loss/FLAGS.eval_batchs, "eval_auc", auc)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/ClickThroughRate/WideDeepLearning/wdl_train_eval_test.py b/ClickThroughRate/WideDeepLearning/wdl_train_eval_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..19973f07b84040ce2c79cfab26cd52548c0519aa
--- /dev/null
+++ b/ClickThroughRate/WideDeepLearning/wdl_train_eval_test.py
@@ -0,0 +1,261 @@
+import argparse
+import oneflow as flow
+import datetime
+import os
+import glob
+from sklearn.metrics import roc_auc_score
+import numpy as np
+import time
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--train_data_dir', type=str, required=True)
+parser.add_argument('--train_data_part_num', type=int, required=True)
+parser.add_argument('--train_part_name_suffix_length', type=int, default=-1)
+parser.add_argument('--train_data_num', type=int, default=36674623)
+parser.add_argument('--eval_data_dir', type=str, required=True)
+parser.add_argument('--eval_data_part_num', type=int, required=True)
+parser.add_argument('--eval_part_name_suffix_length', type=int, default=-1)
+parser.add_argument('--eval_data_num', type=int, default=4583478)
+parser.add_argument('--test_data_dir', type=str, required=True)
+parser.add_argument('--test_data_part_num', type=int, required=True)
+parser.add_argument('--test_part_name_suffix_length', type=int, default=-1)
+parser.add_argument('--test_data_num', type=int, default=4582516)
+parser.add_argument('--batch_size', type=int, default=16384)
+parser.add_argument('--learning_rate', type=float, default=1e-3)
+parser.add_argument('--wide_vocab_size', type=int, default=3200000)
+parser.add_argument('--deep_vocab_size', type=int, default=3200000)
+parser.add_argument('--deep_embedding_vec_size', type=int, default=16)
+parser.add_argument('--deep_dropout_rate', type=float, default=0.5)
+parser.add_argument('--num_dense_fields', type=int, default=13)
+parser.add_argument('--num_wide_sparse_fields', type=int, default=2)
+parser.add_argument('--num_deep_sparse_fields', type=int, default=26)
+parser.add_argument('--epoch_num', type=int, default=4)
+parser.add_argument('--loss_print_every_n_iter', type=int, default=100)
+parser.add_argument('--gpu_num', type=int, default=8)
+parser.add_argument('--hidden_units_num', type=int, default=7)
+parser.add_argument('--hidden_size', type=int, default=1024)
+
+FLAGS = parser.parse_args()
+
+#DEEP_HIDDEN_UNITS = [1024, 1024]#, 1024, 1024, 1024, 1024, 1024]
+DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
+print(DEEP_HIDDEN_UNITS)
+train_epoch_size = FLAGS.train_data_num // FLAGS.batch_size + 1
+eval_epoch_size = FLAGS.eval_data_num // FLAGS.batch_size + 1
+test_epoch_size = FLAGS.test_data_num // FLAGS.batch_size + 1
+
+
+def _raw_blob_conf(name, shape, data_type):
+    return flow.data.BlobConf(name=name, shape=shape, dtype=data_type, codec=flow.data.RawCodec())
+
+
+def _data_loader(data_dir, data_part_num, batch_size):
+    blob_conf = [
+        _raw_blob_conf('labels', (1,), flow.int32),
+        _raw_blob_conf('dense_fields', (FLAGS.num_dense_fields,), flow.float),
+        _raw_blob_conf('wide_sparse_fields', (FLAGS.num_wide_sparse_fields,), flow.int32),
+        _raw_blob_conf('deep_sparse_fields', (FLAGS.num_deep_sparse_fields,), flow.int32)
+    ]
+    blobs = flow.data.decode_ofrecord(
+        data_dir,
+        blobs=blob_conf,
+        batch_size=batch_size,
+        name="decode",
+        data_part_num=data_part_num,
+        part_name_suffix_length=FLAGS.train_part_name_suffix_length,
+    )
+    # copy to gpu
+    blobs = tuple(map(lambda blob: flow.identity(blob), blobs))
+    return blobs
+
+def _data_loader_ofrecord_new(data_dir, data_part_num, batch_size, part_name_suffix_length=-1,
+                              shuffle=True):
+    ofrecord = flow.data.ofrecord_reader(data_dir,
+                                         batch_size=batch_size,
+                                         data_part_num=data_part_num,
+                                         part_name_suffix_length=part_name_suffix_length,
+                                         random_shuffle=shuffle,
+                                         shuffle_after_epoch=shuffle)
+    labels = flow.data.OFRecordRawDecoder(ofrecord, "labels", shape=(1,), dtype=flow.int32)
+    dense_fields = flow.data.OFRecordRawDecoder(ofrecord, "dense_fields", shape=(FLAGS.num_dense_fields,), dtype=flow.float)
+    wide_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "wide_sparse_fields", shape=(FLAGS.num_wide_sparse_fields,), dtype=flow.int32)
+    deep_sparse_fields = flow.data.OFRecordRawDecoder(ofrecord, "deep_sparse_fields", shape=(FLAGS.num_deep_sparse_fields,), dtype=flow.int32)
+    return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
+
+
+def _data_loader_onerec(data_dir, data_part_num, batch_size):
+    files = glob.glob(os.path.join(data_dir, '*.onerec'))
+    readdata = flow.data.onerec_reader(files=files, batch_size=batch_size)
+    labels = flow.data.onerec_decoder(readdata, key='labels', dtype=flow.int32, shape=(1,))
+    dense_fields = flow.data.onerec_decoder(readdata, key='dense_fields', dtype=flow.float, shape=(FLAGS.num_dense_fields,))
+    wide_sparse_fields = flow.data.onerec_decoder(readdata, key='wide_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_wide_sparse_fields,))
+    deep_sparse_fields = flow.data.onerec_decoder(readdata, key='deep_sparse_fields', dtype=flow.int32, shape=(FLAGS.num_deep_sparse_fields,))
+    return flow.identity_n([labels, dense_fields, wide_sparse_fields, deep_sparse_fields])
+
+
+def _model(dense_fields, wide_sparse_fields, deep_sparse_fields):
+    wide_sparse_fields = flow.parallel_cast(wide_sparse_fields, distribute=flow.distribute.broadcast())
+    wide_embedding_table = flow.get_variable(
+        name='wide_embedding',
+        shape=(FLAGS.wide_vocab_size, 1),
+        initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
+        distribute=flow.distribute.split(0),
+    )
+    wide_embedding = flow.gather(params=wide_embedding_table, indices=wide_sparse_fields)
+    wide_embedding = flow.reshape(wide_embedding, shape=(-1, wide_embedding.shape[-1] * wide_embedding.shape[-2]))
+    wide_scores = flow.math.reduce_sum(wide_embedding, axis=[1], keepdims=True)
+    wide_scores = flow.parallel_cast(wide_scores, distribute=flow.distribute.split(0),
+                                     gradient_distribute=flow.distribute.broadcast())
+
+    deep_sparse_fields = flow.parallel_cast(deep_sparse_fields, distribute=flow.distribute.broadcast())
+    deep_embedding_table = flow.get_variable(
+        name='deep_embedding',
+        shape=(FLAGS.deep_vocab_size, FLAGS.deep_embedding_vec_size),
+        initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
+        distribute=flow.distribute.split(1),
+    )
+    deep_embedding = flow.gather(params=deep_embedding_table, indices=deep_sparse_fields)
+    deep_embedding = flow.parallel_cast(deep_embedding, distribute=flow.distribute.split(0),
+                                        gradient_distribute=flow.distribute.split(2))
+    deep_embedding = flow.reshape(deep_embedding, shape=(-1, deep_embedding.shape[-1] * deep_embedding.shape[-2]))
+    deep_features = flow.concat([deep_embedding, dense_fields], axis=1)
+    for idx, units in enumerate(DEEP_HIDDEN_UNITS):
+        deep_features = flow.layers.dense(
+            deep_features,
+            units=units,
+            kernel_initializer=flow.glorot_uniform_initializer(),
+            bias_initializer=flow.constant_initializer(0.0),
+            activation=flow.math.relu,
+            name='fc' + str(idx + 1)
+        )
+        deep_features = flow.nn.dropout(deep_features, rate=FLAGS.deep_dropout_rate)
+    deep_scores = flow.layers.dense(
+        deep_features,
+        units=1,
+        kernel_initializer=flow.glorot_uniform_initializer(),
+        bias_initializer=flow.constant_initializer(0.0),
+        name='fc' + str(len(DEEP_HIDDEN_UNITS) + 1)
+    )
+
+    scores = wide_scores + deep_scores
+    return scores
+
+
+def _get_train_conf():
+    train_conf = flow.FunctionConfig()
+    train_conf.default_data_type(flow.float)
+    train_conf.train.primary_lr(FLAGS.learning_rate)
+    train_conf.train.model_update_conf({
+        'lazy_adam_conf': {
+        }
+    })
+    train_conf.use_boxing_v2(True)
+    train_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
+    train_conf.indexed_slices_optimizer_conf(dict(include_op_names=dict(op_name=['wide_embedding', 'deep_embedding'])))
+    return train_conf
+
+def _get_eval_conf():
+    eval_conf = flow.FunctionConfig()
+    eval_conf.default_data_type(flow.float)
+    eval_conf.default_distribute_strategy(flow.distribute.consistent_strategy())
+    return eval_conf
+
+
+global_loss = 0.0
+def _create_train_callback(epoch, step):
+    def nop(loss):
+        global global_loss
+        global_loss += loss.mean()
+        pass
+
+    def print_loss(loss):
+        global global_loss
+        global_loss += loss.mean()
+        print(epoch, step+1, 'time', datetime.datetime.now(), 'loss',
+              global_loss/FLAGS.loss_print_every_n_iter)
+        global_loss = 0.0
+
+    if (step + 1) % FLAGS.loss_print_every_n_iter == 0:
+        return print_loss
+    else:
+        return nop
+
+@flow.global_function(_get_train_conf())
+def train_job():
+    labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
+        _data_loader_ofrecord_new(data_dir=FLAGS.train_data_dir,
+                                  data_part_num=FLAGS.train_data_part_num,
+                                  batch_size=FLAGS.batch_size,
+                                  part_name_suffix_length=FLAGS.train_part_name_suffix_length,
+                                  shuffle=True)
+    logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
+    loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
+    flow.losses.add_loss(loss)
+    return loss
+
+
+@flow.global_function(_get_eval_conf())
+def eval_job():
+    labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
+        _data_loader_ofrecord_new(data_dir=FLAGS.eval_data_dir,
+                                  data_part_num=FLAGS.eval_data_part_num,
+                                  batch_size=FLAGS.batch_size,
+                                  part_name_suffix_length=FLAGS.eval_part_name_suffix_length,
+                                  shuffle=False)
+    logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
+    loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
+    predict = flow.math.sigmoid(logits)
+    return loss, predict, labels
+
+@flow.global_function(_get_eval_conf())
+def test_job():
+    labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
+        _data_loader_ofrecord_new(data_dir=FLAGS.test_data_dir,
+                                  data_part_num=FLAGS.test_data_part_num,
+                                  batch_size=FLAGS.batch_size,
+                                  part_name_suffix_length=FLAGS.test_part_name_suffix_length,
+                                  shuffle=False)
+    logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
+    loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
+    predict = flow.math.sigmoid(logits)
+    return loss, predict, labels
+
+def main():
+    flow.config.gpu_device_num(FLAGS.gpu_num)
+    #flow.config.enable_numa_aware_cuda_malloc_host(True)
+    #flow.config.collective_boxing.enable_fusion(False)
+    check_point = flow.train.CheckPoint()
+    check_point.init()
+    global global_loss
+    for epoch in range(FLAGS.epoch_num):
+        global_loss = 0.0
+        for i in range(train_epoch_size):
+            train_job().async_get(_create_train_callback(epoch, i))
+
+        labels = np.array([[0]])
+        preds = np.array([[0]])
+        eval_loss = 0.0
+        for i in range(eval_epoch_size):
+            loss, pred, ref = eval_job().get()
+            label_ = ref.ndarray().astype(np.float32)
+            labels = np.concatenate((labels, label_), axis=0)
+            preds = np.concatenate((preds, pred.ndarray()), axis=0)
+            eval_loss += loss.mean()
+        auc = roc_auc_score(labels[1:], preds[1:])
+        print(epoch, "eval_loss", eval_loss/eval_epoch_size, "eval_auc", auc)
+
+    labels = np.array([[0]])
+    preds = np.array([[0]])
+    eval_loss = 0.0
+    for i in range(test_epoch_size):
+        loss, pred, ref = test_job().get()
+        label_ = ref.ndarray().astype(np.float32)
+        labels = np.concatenate((labels, label_), axis=0)
+        preds = np.concatenate((preds, pred.ndarray()), axis=0)
+        eval_loss += loss.mean()
+    auc = roc_auc_score(labels[1:], preds[1:])
+    print("test_loss", eval_loss/test_epoch_size, "test_auc", auc)
+
+
+if __name__ == '__main__':
+    main()
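
For reference, a minimal NumPy sketch (not part of the diff, and not the OneFlow implementation itself) of what _model() computes for one batch: the wide branch gathers 1-dimensional embeddings for the 2 wide sparse fields and sums them into a single linear score; the deep branch flattens the 16-dimensional embeddings of the 26 deep sparse fields, concatenates them with the 13 dense features, and runs them through an MLP; the two scores are added to form the logits fed to sigmoid cross-entropy. The vocabulary size, batch size, and single hidden layer below are toy stand-ins for the real flags.

import numpy as np

rng = np.random.default_rng(0)
batch, vec = 4, 16                        # toy batch size and deep_embedding_vec_size
wide_vocab = deep_vocab = 100             # toy vocabularies (the real ones are ~1.6M rows)
num_dense, num_wide, num_deep = 13, 2, 26

wide_table = rng.uniform(-0.05, 0.05, size=(wide_vocab, 1))
deep_table = rng.uniform(-0.05, 0.05, size=(deep_vocab, vec))

dense_fields = rng.random((batch, num_dense))
wide_ids = rng.integers(0, wide_vocab, size=(batch, num_wide))
deep_ids = rng.integers(0, deep_vocab, size=(batch, num_deep))

# Wide branch: gather 1-d embeddings and sum them -> one linear score per example.
wide_scores = wide_table[wide_ids].reshape(batch, -1).sum(axis=1, keepdims=True)

# Deep branch: flatten the gathered 16-d embeddings, concat with dense features,
# then apply the MLP (a single ReLU layer here instead of seven 1024-unit layers).
deep_in = np.concatenate([deep_table[deep_ids].reshape(batch, -1), dense_fields], axis=1)
w1 = rng.standard_normal((deep_in.shape[1], 1024)) * 0.01
w2 = rng.standard_normal((1024, 1)) * 0.01
deep_scores = np.maximum(deep_in @ w1, 0.0) @ w2

logits = wide_scores + deep_scores        # fed to sigmoid_cross_entropy_with_logits
print(logits.shape)                       # (4, 1)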