提交 768112b4 编写于 作者: T tangwei12

update feeder to pyreader

上级 2dc53cb0
......@@ -3,15 +3,27 @@ import math
dense_feature_dim = 13
def ctr_dnn_model(embedding_size, sparse_feature_dim):
dense_input = fluid.layers.data(
name="dense_input", shape=[dense_feature_dim], dtype='float32')
sparse_input_ids = [
fluid.layers.data(
name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
for i in range(1, 27)
]
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
datas = [dense_input] + sparse_input_ids + [label]
py_reader = fluid.layers.create_py_reader_by_data(capacity=64,
feed_list=datas,
name='py_reader',
use_double_buffer=True)
words = fluid.layers.read_file(py_reader)
def embedding_layer(input):
return fluid.layers.embedding(
input=input,
......@@ -22,8 +34,8 @@ def ctr_dnn_model(embedding_size, sparse_feature_dim):
size=[sparse_feature_dim, embedding_size],
param_attr=fluid.ParamAttr(name="SparseFeatFactors", initializer=fluid.initializer.Uniform()))
sparse_embed_seq = map(embedding_layer, sparse_input_ids)
concated = fluid.layers.concat(sparse_embed_seq + [dense_input], axis=1)
sparse_embed_seq = map(embedding_layer, words[1:-1])
concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)
fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1/math.sqrt(concated.shape[1]))))
......@@ -34,13 +46,10 @@ def ctr_dnn_model(embedding_size, sparse_feature_dim):
predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1/math.sqrt(fc3.shape[1]))))
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
data_list = [dense_input] + sparse_input_ids + [label]
cost = fluid.layers.cross_entropy(input=predict, label=label)
cost = fluid.layers.cross_entropy(input=predict, label=words[-1:])
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=label)
auc_var, batch_auc_var, auc_states = fluid.layers.auc(input=predict, label=label, num_thresholds=2**12, slide_steps=20)
accuracy = fluid.layers.accuracy(input=predict, label=words[-1:])
auc_var, batch_auc_var, auc_states = \
fluid.layers.auc(input=predict, label=words[-1:], num_thresholds=2**12, slide_steps=20)
return avg_cost, data_list, auc_var, batch_auc_var
return avg_cost, auc_var, batch_auc_var, py_reader
......@@ -5,14 +5,18 @@ import logging
import os
import time
# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import numpy as np
import paddle
import paddle.fluid as fluid
import reader
from network_conf import ctr_dnn_model
from multiprocessing import cpu_count
# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s')
......@@ -107,7 +111,7 @@ def parse_args():
return parser.parse_args()
def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
trainer_num, trainer_id):
dataset = reader.CriteoDataset(args.sparse_feature_dim)
train_reader = paddle.batch(
......@@ -115,28 +119,56 @@ def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
dataset.train([args.train_data_path], trainer_num, trainer_id),
buf_size=args.batch_size * 100),
batch_size=args.batch_size)
place = fluid.CPUPlace()
feeder = fluid.DataFeeder(feed_list=data_list, place=place)
data_name_list = [var.name for var in data_list]
py_reader.decorate_paddle_reader(train_reader)
data_name_list = None
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exec_strategy = fluid.ExecutionStrategy()
build_strategy = fluid.BuildStrategy()
if os.getenv("NUM_THREADS", ""):
exec_strategy.num_threads = int(os.getenv("NUM_THREADS"))
cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
build_strategy.reduce_strategy = \
fluid.BuildStrategy.ReduceStrategy.Reduce if cpu_num > 1 \
else fluid.BuildStrategy.ReduceStrategy.AllReduce
pe = fluid.ParallelExecutor(
use_cuda=False,
loss_name=loss.name,
main_program=train_program,
build_strategy=build_strategy,
exec_strategy=exec_strategy)
exe.run(fluid.default_startup_program())
for pass_id in range(args.num_passes):
pass_start = time.time()
for batch_id, data in enumerate(train_reader()):
loss_val, auc_val, batch_auc_val = exe.run(
train_program,
feed=feeder.feed(data),
fetch_list=[loss, auc_var, batch_auc_var]
)
logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
batch_id = 0
py_reader.start()
try:
while True:
loss_val, auc_val, batch_auc_val = pe.run(fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
loss_val = np.mean(loss_val)
auc_val = np.mean(auc_val)
batch_auc_val = np.mean(batch_auc_val)
logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
.format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
if batch_id % 1000 == 0 and batch_id != 0:
model_dir = args.model_output_dir + '/batch-' + str(batch_id)
if args.trainer_id == 0:
fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
if batch_id % 1000 == 0 and batch_id != 0:
model_dir = args.model_output_dir + '/batch-' + str(batch_id)
if args.trainer_id == 0:
fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
batch_id += 1
except fluid.core.EOFException:
py_reader.reset()
print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))
model_dir = args.model_output_dir + '/pass-' + str(pass_id)
if args.trainer_id == 0:
fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
......@@ -148,7 +180,7 @@ def train():
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
loss, auc_var, batch_auc_var, py_reader = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
optimizer.minimize(loss)
if args.cloud_train:
......@@ -166,11 +198,10 @@ def train():
args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
args.is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", 0)))
if args.is_local:
logger.info("run local training")
main_program = fluid.default_main_program()
train_loop(args, main_program, data_list, loss, auc_var, batch_auc_var, 1, 0)
train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var, 1, 0)
else:
logger.info("run dist training")
t = fluid.DistributeTranspiler()
......@@ -185,7 +216,7 @@ def train():
elif args.role == "trainer" or args.role == "TRAINER":
logger.info("run trainer")
train_prog = t.get_trainer_program()
train_loop(args, train_prog, data_list, loss, auc_var, batch_auc_var,
train_loop(args, train_prog, py_reader, loss, auc_var, batch_auc_var,
args.trainers, args.trainer_id)
else:
raise ValueError(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册