Unverified Commit 8c32619e authored by Qiao Longfei, committed by GitHub

Merge pull request #1481 from seiriosPlus/update_ctr

update ctr use pyreader
...@@ -16,6 +16,8 @@ build_doc/
.cproject
.pydevproject
.settings/
*.pyc
CMakeSettings.json
Makefile
.test_env/
......
...@@ -74,3 +74,6 @@ python infer.py \
1. Preprocess the training data with preprocess.py to generate train.txt.
1. Split train.txt into as many parts as there are machines in the cluster and put one part on each machine.
1. Start the distributed training job using the command line from the `Distributed Training` section above.
## Run cluster training on PaddleCloud
If you are running cluster training on PaddleCloud, you can use the ```cloud.py``` script to submit the job; the arguments required by ```train.py``` can be passed in through PaddleCloud environment variables.
\ No newline at end of file
...@@ -91,3 +91,6 @@ Note: The AUC value in the last log info is the total AUC for all test dataset.
1. Prepare the dataset using preprocess.py.
1. Split train.txt into trainer_num parts and put one part on each machine (a hypothetical split helper is sketched after this list).
1. Start the cluster training job using the command in `Distributed Train` above.
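For illustration only, a round-robin split could look like the sketch below; the helper name and the output naming scheme are assumptions, not part of this repo:

```python
# split_train.py -- hypothetical helper, not part of this repo.
# Round-robin split of train.txt into trainer_num shards so each trainer
# reads a disjoint, roughly equal subset of the lines.
import sys


def split(path, trainer_num):
    outs = [open("%s.part-%d" % (path, i), "w") for i in range(trainer_num)]
    with open(path) as f:
        for line_no, line in enumerate(f):
            outs[line_no % trainer_num].write(line)
    for out in outs:
        out.close()


if __name__ == "__main__":
    # usage: python split_train.py train.txt 4
    split(sys.argv[1], int(sys.argv[2]))
```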
## Train on PaddleCloud
If you want to run this training on PaddleCloud, you can use the ```cloud.py``` script to submit the job; the arguments required by ```train.py``` can be set through PaddleCloud environment variables.
\ No newline at end of file
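As a rough illustration (the values are made up; the variable names are the ones ```cloud.py``` below actually reads), a job configuration might export:

```python
# Hypothetical PaddleCloud job settings; cloud.py forwards these to train.py.
import os

os.environ["BATCH_SIZE"] = "1000"             # becomes --batch_size
os.environ["EMBEDDING_SIZE"] = "10"           # becomes --embedding_size
os.environ["NUM_PASSES"] = "10"               # becomes --num_passes
os.environ["MODEL_OUTPUT_DIR"] = "models"     # becomes --model_output_dir
os.environ["SPARSE_FEATURE_DIM"] = "1000001"  # becomes --sparse_feature_dim
os.environ["PADDLE_IS_LOCAL"] = "0"           # train in distributed mode
```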
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ======================================================================
#
# Copyright (c) 2017 Baidu.com, Inc. All Rights Reserved
#
# ======================================================================
"""this file is only for PaddleCloud"""
import os
import logging
import paddle.fluid.contrib.utils.hdfs_utils as hdfs_utils
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("cloud")
logger.setLevel(logging.INFO)
def run():
cmd = "python -u train.py "
cmd += " --train_data_path %s " % "data/train.txt"
cmd += " --test_data_path %s " % "data/test.txt"
if os.getenv("BATCH_SIZE", ""):
cmd += " --batch_size %s " % os.getenv("BATCH_SIZE")
if os.getenv("EMBEDDING_SIZE", ""):
cmd += " --embedding_size %s " % os.getenv("EMBEDDING_SIZE")
if os.getenv("NUM_PASSES", ""):
cmd += " --num_passes %s " % os.getenv("NUM_PASSES")
if os.getenv("MODEL_OUTPUT_DIR", ""):
cmd += " --model_output_dir %s " % os.getenv("MODEL_OUTPUT_DIR")
if os.getenv("SPARSE_FEATURE_DIM", ""):
cmd += " --sparse_feature_dim %s " % os.getenv("SPARSE_FEATURE_DIM")
if os.getenv("ASYNC_MODE", ""):
cmd += " --async_mode "
if os.getenv("NO_SPLIT_VAR", ""):
cmd += " --no_split_var "
is_local = int(os.getenv("PADDLE_IS_LOCAL", "1"))
if is_local:
cmd += " --is_local 1 "
cmd += " --cloud_train 0 "
else:
cmd += " --is_local 0 "
cmd += " --cloud_train 1 "
trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
trainers = int(os.environ["PADDLE_TRAINERS"])
training_role = os.environ["PADDLE_TRAINING_ROLE"]
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
if training_role == "PSERVER":
cmd += " --role pserver "
else:
cmd += " --role trainer "
cmd += " --endpoints %s " % pserver_endpoints
cmd += " --current_endpoint %s " % current_endpoint
cmd += " --trainer_id %s " % trainer_id
cmd += " --trainers %s " % trainers
logging.info("run cluster commands: {}".format(cmd))
exit(os.system(cmd))
def download():
hadoop_home = os.getenv("HADOOP_HOME")
configs = {}
configs["fs.default.name"] = os.getenv("DATA_FS_NAME")
configs["hadoop.job.ugi"] = os.getenv("DATA_FS_UGI")
client = hdfs_utils.HDFSClient(hadoop_home, configs)
local_train_data_dir = os.getenv("TRAIN_DATA_LOCAL", "data")
hdfs_train_data_dir = os.getenv("TRAIN_DATA_HDFS", "")
    downloads = hdfs_utils.multi_download(
        client, hdfs_train_data_dir, local_train_data_dir, 0, 1, multi_processes=1)
    # untar each downloaded archive next to where it was saved
    for d in downloads:
        base_dir = os.path.dirname(d)
        tar_cmd = "tar -zxvf {} -C {}".format(d, base_dir)
        logger.info("DOWNLOAD DATA: {}, AND TAR IT: {}".format(d, tar_cmd))
        os.system(tar_cmd)
def env_declar():
logging.info("******** Rename Cluster Env to PaddleFluid Env ********")
if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ["PADDLE_IS_LOCAL"] == "0":
os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"]
os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"]
os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"]
os.environ["PADDLE_TRAINERS"] = os.environ["PADDLE_TRAINERS_NUM"]
os.environ["PADDLE_CURRENT_IP"] = os.environ["POD_IP"]
os.environ["PADDLE_TRAINER_ID"] = os.environ["PADDLE_TRAINER_ID"]
os.environ["CPU_NUM"] = os.getenv("CPU_NUM", "12")
os.environ["NUM_THREADS"] = os.getenv("NUM_THREADS", "12")
logging.info("Content-Type: text/plain\n\n")
for key in os.environ.keys():
logging.info("%30s %s \n" % (key, os.environ[key]))
logging.info("****** Rename Cluster Env to PaddleFluid Env END ******")
if __name__ == '__main__':
    env_declar()
    if os.getenv("NEED_CUSTOM_DOWNLOAD", ""):
        if os.environ["PADDLE_TRAINING_ROLE"] == "PSERVER":
            logger.info("PSERVER does not need to download the data")
        else:
            logger.info("NEED_CUSTOM_DOWNLOAD is True, "
                        "will download train data with hdfs_utils")
            download()
    run()
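In short, ```cloud.py``` renames PaddleCloud's cluster environment variables to the names PaddleFluid expects, optionally downloads the training data from HDFS (unless the job is a pserver), and then launches ```train.py``` with the assembled command line.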
...@@ -3,14 +3,108 @@ import math
dense_feature_dim = 13
def ctr_deepfm_model(factor_size, sparse_feature_dim, dense_feature_dim, sparse_input):
    def dense_fm_layer(input, emb_dict_size, factor_size, fm_param_attr):
        """
        Dense-input factorization machine: returns the first-order term
        (a linear fc) and the second-order term computed with the
        sum-square minus square-sum trick.
        """
        first_order = fluid.layers.fc(input=input, size=1)
        emb_table = fluid.layers.create_parameter(shape=[emb_dict_size, factor_size],
                                                  dtype='float32', attr=fm_param_attr)

        input_mul_factor = fluid.layers.matmul(input, emb_table)
        input_mul_factor_square = fluid.layers.square(input_mul_factor)
        input_square = fluid.layers.square(input)
        factor_square = fluid.layers.square(emb_table)
        input_square_mul_factor_square = fluid.layers.matmul(input_square, factor_square)

        second_order = 0.5 * (input_mul_factor_square - input_square_mul_factor_square)
        return first_order, second_order

    def sparse_fm_layer(input, emb_dict_size, factor_size, fm_param_attr):
        """
        Sparse-input factorization machine over LoD ids: first-order term
        from a size-1 embedding, second-order term from sum/square pooling
        of the factor embeddings.
        """
        first_embeddings = fluid.layers.embedding(
            input=input, dtype='float32', size=[emb_dict_size, 1], is_sparse=True)
        first_order = fluid.layers.sequence_pool(input=first_embeddings, pool_type='sum')

        nonzero_embeddings = fluid.layers.embedding(
            input=input, dtype='float32', size=[emb_dict_size, factor_size],
            param_attr=fm_param_attr, is_sparse=True)
        summed_features_emb = fluid.layers.sequence_pool(input=nonzero_embeddings, pool_type='sum')
        summed_features_emb_square = fluid.layers.square(summed_features_emb)

        squared_features_emb = fluid.layers.square(nonzero_embeddings)
        squared_sum_features_emb = fluid.layers.sequence_pool(
            input=squared_features_emb, pool_type='sum')

        second_order = 0.5 * (summed_features_emb_square - squared_sum_features_emb)
        return first_order, second_order

    dense_input = fluid.layers.data(name="dense_input", shape=[dense_feature_dim], dtype='float32')

    sparse_input_ids = [
        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
        for i in range(1, 27)]

    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    datas = [dense_input] + sparse_input_ids + [label]

    py_reader = fluid.layers.create_py_reader_by_data(capacity=64,
                                                      feed_list=datas,
                                                      name='py_reader',
                                                      use_double_buffer=True)
    words = fluid.layers.read_file(py_reader)

    sparse_fm_param_attr = fluid.param_attr.ParamAttr(name="SparseFeatFactors",
                                                      initializer=fluid.initializer.Normal(
                                                          scale=1 / math.sqrt(sparse_feature_dim)))
    dense_fm_param_attr = fluid.param_attr.ParamAttr(name="DenseFeatFactors",
                                                     initializer=fluid.initializer.Normal(
                                                         scale=1 / math.sqrt(dense_feature_dim)))

    sparse_fm_first, sparse_fm_second = sparse_fm_layer(
        sparse_input, sparse_feature_dim, factor_size, sparse_fm_param_attr)
    dense_fm_first, dense_fm_second = dense_fm_layer(
        dense_input, dense_feature_dim, factor_size, dense_fm_param_attr)

    def embedding_layer(input):
        """Average-pooled factor embedding for one sparse slot."""
        emb = fluid.layers.embedding(
            input=input, dtype='float32', size=[sparse_feature_dim, factor_size],
            param_attr=sparse_fm_param_attr, is_sparse=True)
        return fluid.layers.sequence_pool(input=emb, pool_type='average')

    sparse_embed_seq = map(embedding_layer, sparse_input_ids)
    concated = fluid.layers.concat(sparse_embed_seq + [dense_input], axis=1)

    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(concated.shape[1]))))
    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc1.shape[1]))))
    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc2.shape[1]))))
    predict = fluid.layers.fc(
        input=[sparse_fm_first, sparse_fm_second, dense_fm_first, dense_fm_second, fc3],
        size=2,
        act="softmax",
        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1 / math.sqrt(fc3.shape[1]))))

    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
    avg_cost = fluid.layers.reduce_sum(cost)
    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
    auc_var, batch_auc_var, auc_states = \
        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)

    return avg_cost, auc_var, batch_auc_var, py_reader
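# Note (added for clarity): both FM layers above use the standard
# factorization-machine identity for the pairwise interactions (Rendle, 2010):
#     sum_{i<j} <v_i, v_j> x_i x_j
#         = 0.5 * sum_k ( (sum_i v_ik x_i)^2 - sum_i v_ik^2 x_i^2 )
# Each layer returns the 0.5 * (...) term per latent dimension k; the final
# fc over [first_order, second_order, ..., fc3] mixes those k values with
# learned weights, so the explicit O(n^2) pairwise sum is never formed.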
def ctr_dnn_model(embedding_size, sparse_feature_dim):
    def embedding_layer(input):
        return fluid.layers.embedding(
...@@ -20,27 +114,46 @@ def ctr_dnn_model(embedding_size, sparse_feature_dim):
            # if you want to set is_distributed to True
            is_distributed=False,
            size=[sparse_feature_dim, embedding_size],
            param_attr=fluid.ParamAttr(name="SparseFeatFactors",
                                       initializer=fluid.initializer.Uniform()))

    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')

    sparse_input_ids = [
        fluid.layers.data(name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
        for i in range(1, 27)]

    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    datas = [dense_input] + sparse_input_ids + [label]

    py_reader = fluid.layers.create_py_reader_by_data(capacity=64,
                                                      feed_list=datas,
                                                      name='py_reader',
                                                      use_double_buffer=True)
    words = fluid.layers.read_file(py_reader)

    sparse_embed_seq = map(embedding_layer, words[1:-1])
    concated = fluid.layers.concat(sparse_embed_seq + words[0:1], axis=1)

    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(concated.shape[1]))))
    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc1.shape[1]))))
    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
                          param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                              scale=1 / math.sqrt(fc2.shape[1]))))
    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
                              param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                                  scale=1 / math.sqrt(fc3.shape[1]))))

    cost = fluid.layers.cross_entropy(input=predict, label=words[-1])
    avg_cost = fluid.layers.reduce_sum(cost)
    accuracy = fluid.layers.accuracy(input=predict, label=words[-1])
    auc_var, batch_auc_var, auc_states = \
        fluid.layers.auc(input=predict, label=words[-1], num_thresholds=2 ** 12, slide_steps=20)

    return avg_cost, auc_var, batch_auc_var, py_reader
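Note that both model functions now build their own ```py_reader``` and return it in place of the old ```data_list```; the ```read_file``` outputs (```words```) follow the ```feed_list``` order, so ```words[0]``` is the dense input, ```words[1:-1]``` are the 26 sparse slots C1..C26, and ```words[-1]``` is the label.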
...@@ -5,14 +5,18 @@ import logging
import os
import time

import numpy as np

import paddle
import paddle.fluid as fluid

import reader
from network_conf import ctr_dnn_model
from multiprocessing import cpu_count

# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""

logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s')
...@@ -107,7 +111,7 @@ def parse_args():
    return parser.parse_args()


def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
               trainer_num, trainer_id):
    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    train_reader = paddle.batch(
...@@ -115,28 +119,56 @@ def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_num, trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)
    py_reader.decorate_paddle_reader(train_reader)
    data_name_list = []

    place = fluid.CPUPlace()
    exe = fluid.Executor(place)

    exec_strategy = fluid.ExecutionStrategy()
    build_strategy = fluid.BuildStrategy()

    if os.getenv("NUM_THREADS", ""):
        exec_strategy.num_threads = int(os.getenv("NUM_THREADS"))

    cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
    build_strategy.reduce_strategy = \
        fluid.BuildStrategy.ReduceStrategy.Reduce if cpu_num > 1 \
        else fluid.BuildStrategy.ReduceStrategy.AllReduce

    pe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    exe.run(fluid.default_startup_program())

    for pass_id in range(args.num_passes):
        pass_start = time.time()
        batch_id = 0
        py_reader.start()

        try:
            while True:
                loss_val, auc_val, batch_auc_val = pe.run(
                    fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
                loss_val = np.mean(loss_val)
                auc_val = np.mean(auc_val)
                batch_auc_val = np.mean(batch_auc_val)

                logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                            .format(pass_id, batch_id, loss_val / args.batch_size,
                                    auc_val, batch_auc_val))
                if batch_id % 1000 == 0 and batch_id != 0:
                    model_dir = args.model_output_dir + '/batch-' + str(batch_id)
                    if args.trainer_id == 0:
                        fluid.io.save_inference_model(model_dir, data_name_list,
                                                      [loss, auc_var], exe)
                batch_id += 1
        except fluid.core.EOFException:
            py_reader.reset()

        print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))

        model_dir = args.model_output_dir + '/pass-' + str(pass_id)
        if args.trainer_id == 0:
            fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
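The loop above is the standard Fluid ```py_reader``` pattern: ```decorate_paddle_reader``` attaches the Python-side batch reader, ```start()``` begins feeding, each ```pe.run``` consumes one batch until ```fluid.core.EOFException``` signals the end of the pass, and ```reset()``` must be called before the reader is started again. Since ```ParallelExecutor``` returns one fetched value per place, the metrics are averaged with ```np.mean```.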
...@@ -148,7 +180,7 @@ def train():
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    loss, auc_var, batch_auc_var, py_reader = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
    optimizer.minimize(loss)

    if args.cloud_train:
...@@ -166,11 +198,10 @@ def train():
        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        args.is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", 0)))

    if args.is_local:
        logger.info("run local training")
        main_program = fluid.default_main_program()
        train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var, 1, 0)
    else:
        logger.info("run dist training")
        t = fluid.DistributeTranspiler()
...@@ -185,7 +216,7 @@ def train():
        elif args.role == "trainer" or args.role == "TRAINER":
            logger.info("run trainer")
            train_prog = t.get_trainer_program()
            train_loop(args, train_prog, py_reader, loss, auc_var, batch_auc_var,
                       args.trainers, args.trainer_id)
        else:
            raise ValueError(
......