未验证 提交 b6b5326f 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge pull request #235 from guru4elephant/refine_ctr_example

refine ctr example
......@@ -3,83 +3,88 @@ import argparse
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
parser.add_argument(
'--train_data_path',
type=str,
default='./data/raw/train.txt',
help="The path of training dataset")
parser.add_argument(
'--test_data_path',
type=str,
default='./data/raw/valid.txt',
help="The path of testing dataset")
parser.add_argument(
'--batch_size',
type=int,
default=1000,
help="The size of mini-batch (default:1000)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_passes',
type=int,
default=10,
help="The number of passes to train (default: 10)")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help='The path for model to store (default: models)')
parser.add_argument(
'--sparse_feature_dim',
type=int,
default=1000001,
help='sparse feature hashing space for index processing')
parser.add_argument(
'--is_local',
type=int,
default=1,
help='Local train or distributed train (default: 1)')
parser.add_argument(
'--cloud_train',
type=int,
default=0,
help='Local train or distributed train on paddlecloud (default: 0)')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether start pserver in async mode to support ASGD')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether split variables into blocks when update_method is pserver')
parser.add_argument(
'--role',
type=str,
default='pserver', # trainer or pserver
help='The path for model to store (default: models)')
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The path for model to store (default: 127.0.0.1:6000)')
parser.add_argument(
'--trainer_id',
type=int,
default=0,
help='The path for model to store (default: models)')
parser.add_argument(
'--trainers',
type=int,
default=1,
help='The num of trianers, (default: 1)')
'--train_data_path',
type=str,
default='./data/raw/train.txt',
help="The path of training dataset")
parser.add_argument(
'--sparse_only',
type=bool,
default=False,
help="Whether we use sparse features only")
parser.add_argument(
'--test_data_path',
type=str,
default='./data/raw/valid.txt',
help="The path of testing dataset")
parser.add_argument(
'--batch_size',
type=int,
default=1000,
help="The size of mini-batch (default:1000)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_passes',
type=int,
default=10,
help="The number of passes to train (default: 10)")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help='The path for model to store (default: models)')
parser.add_argument(
'--sparse_feature_dim',
type=int,
default=1000001,
help='sparse feature hashing space for index processing')
parser.add_argument(
'--is_local',
type=int,
default=1,
help='Local train or distributed train (default: 1)')
parser.add_argument(
'--cloud_train',
type=int,
default=0,
help='Local train or distributed train on paddlecloud (default: 0)')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether start pserver in async mode to support ASGD')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether split variables into blocks when update_method is pserver')
parser.add_argument(
'--role',
type=str,
default='pserver', # trainer or pserver
help='The path for model to store (default: models)')
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The path for model to store (default: 127.0.0.1:6000)')
parser.add_argument(
'--trainer_id',
type=int,
default=0,
help='The path for model to store (default: models)')
parser.add_argument(
'--trainers',
type=int,
default=1,
help='The num of trianers, (default: 1)')
return parser.parse_args()
wget https://paddle-serving.bj.bcebos.com/data%2Fctr_prediction%2Fctr_data.tar.gz
tar -zxvf ctr_data.tar.gz
tar -zxvf *ctr_data.tar.gz
......@@ -4,15 +4,16 @@ from args import parse_args
import os
import paddle.fluid as fluid
import sys
from network_conf import ctr_dnn_model_dataset
from network_conf import dnn_model
dense_feature_dim = 13
def train():
args = parse_args()
sparse_only = args.sparse_only
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
dense_input = fluid.layers.data(
name="dense_input", shape=[dense_feature_dim], dtype='float32')
sparse_input_ids = [
......@@ -20,8 +21,10 @@ def train():
for i in range(1, 27)]
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
predict_y, loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
dense_input, sparse_input_ids, label,
#nn_input = None if sparse_only else dense_input
nn_input = dense_input
predict_y, loss, auc_var, batch_auc_var = dnn_model(
nn_input, sparse_input_ids, label,
args.embedding_size, args.sparse_feature_dim)
optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
......@@ -31,29 +34,33 @@ def train():
exe.run(fluid.default_startup_program())
dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
dataset.set_use_var([dense_input] + sparse_input_ids + [label])
python_executable = "python"
pipe_command = "{} criteo_reader.py {}".format(python_executable, args.sparse_feature_dim)
pipe_command = "{} criteo_reader.py {}".format(
python_executable, args.sparse_feature_dim)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(128)
thread_num = 10
dataset.set_thread(thread_num)
whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))]
#dataset.set_filelist(whole_filelist[:(len(whole_filelist)-thread_num)])
whole_filelist = ["raw_data/part-%d" % x for x in
range(len(os.listdir("raw_data")))]
dataset.set_filelist(whole_filelist[:thread_num])
dataset.load_into_memory()
epochs = 1
for i in range(epochs):
exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
debug=True)
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=dataset, debug=True)
print("epoch {} finished".format(i))
import paddle_serving_client.io as server_io
feed_var_dict = {}
for i, sparse in enumerate(sparse_input_ids):
feed_var_dict["sparse_{}".format(i)] = sparse
feed_var_dict["dense_0"] = dense_input
fetch_var_dict = {"prob": predict_y}
server_io.save_model(
......
import paddle.fluid as fluid
import math
dense_feature_dim = 13
def dnn_model(dense_input, sparse_inputs, label,
embedding_size, sparse_feature_dim):
def ctr_dnn_model_dataset(dense_input, sparse_inputs, label,
embedding_size, sparse_feature_dim):
def embedding_layer(input):
emb = fluid.layers.embedding(
input=input,
......@@ -15,20 +14,30 @@ def ctr_dnn_model_dataset(dense_input, sparse_inputs, label,
initializer=fluid.initializer.Uniform()))
return fluid.layers.sequence_pool(input=emb, pool_type='sum')
sparse_embed_seq = list(map(embedding_layer, sparse_inputs))
concated = fluid.layers.concat(sparse_embed_seq, axis=1)
fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(concated.shape[1]))))
fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc2.shape[1]))))
predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
def mlp_input_tensor(emb_sums, dense_tensor):
if isinstance(dense_tensor, fluid.Variable):
return fluid.layers.concat(emb_sums, axis=1)
else:
return fluid.layers.concat(emb_sums + [dense_tensor], axis=1)
def mlp(mlp_input):
fc1 = fluid.layers.fc(input=mlp_input, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(mlp_input.shape[1]))))
fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc2.shape[1]))))
pre = fluid.layers.fc(input=fc3, size=2, act='softmax',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(fc3.shape[1]))))
return pre
emb_sums = list(map(embedding_layer, sparse_inputs))
mlp_in = mlp_input_tensor(emb_sums, dense_input)
predict = mlp(mlp_in)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=label)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册