Commit f16d9b85 authored by guru4elephant

refine ctr example

Parent ef7c7a5a
@@ -3,83 +3,88 @@ import argparse

def parse_args():
    parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/raw/train.txt',
        help="The path of the training dataset")
    parser.add_argument(
        '--sparse_only',
        type=bool,
        default=False,
        help="Whether to use sparse features only")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/raw/valid.txt',
        help="The path of the testing dataset")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1000,
        help="The size of a mini-batch (default: 1000)")
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=10,
        help="The size of the embedding layer (default: 10)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The directory in which to store the model (default: models)')
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help='Sparse feature hashing space for index processing')
    parser.add_argument(
        '--is_local',
        type=int,
        default=1,
        help='Local train or distributed train (default: 1)')
    parser.add_argument(
        '--cloud_train',
        type=int,
        default=0,
        help='Local train or distributed train on PaddleCloud (default: 0)')
    parser.add_argument(
        '--async_mode',
        action='store_true',
        default=False,
        help='Whether to start pserver in async mode to support ASGD')
    parser.add_argument(
        '--no_split_var',
        action='store_true',
        default=False,
        help='Whether to split variables into blocks when update_method is pserver')
    parser.add_argument(
        '--role',
        type=str,
        default='pserver',  # trainer or pserver
        help='The role of this node: trainer or pserver (default: pserver)')
    parser.add_argument(
        '--endpoints',
        type=str,
        default='127.0.0.1:6000',
        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
    parser.add_argument(
        '--current_endpoint',
        type=str,
        default='127.0.0.1:6000',
        help='The endpoint of the current pserver (default: 127.0.0.1:6000)')
    parser.add_argument(
        '--trainer_id',
        type=int,
        default=0,
        help='The id of the current trainer (default: 0)')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: 1)')
    return parser.parse_args()
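One caveat in the new --sparse_only flag: argparse's type=bool does not parse strings the way one might expect, because bool() on any non-empty string, including "False", is truthy. A minimal standalone sketch of the pitfall (not part of the commit):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--sparse_only', type=bool, default=False)

print(parser.parse_args(['--sparse_only', 'False']).sparse_only)  # True
print(parser.parse_args([]).sparse_only)                          # False

An action='store_true' switch, as already used for --async_mode and --no_split_var, avoids this ambiguity.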
wget https://paddle-serving.bj.bcebos.com/data%2Fctr_prediction%2Fctr_data.tar.gz
tar -zxvf *ctr_data.tar.gz
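wget keeps the URL-encoded object name here (data%2Fctr_prediction%2Fctr_data.tar.gz), which is why the extraction step now globs *ctr_data.tar.gz instead of naming the archive exactly. For environments without wget, a Python equivalent of the fetch-and-extract step might look like this; the plain local file name is a choice, not part of the example:

import tarfile
import urllib.request

url = ("https://paddle-serving.bj.bcebos.com/"
       "data%2Fctr_prediction%2Fctr_data.tar.gz")
# Save under a plain name instead of the URL-encoded one wget would keep.
urllib.request.urlretrieve(url, "ctr_data.tar.gz")
with tarfile.open("ctr_data.tar.gz", "r:gz") as tar:
    tar.extractall()  # expected to yield the raw_data/ parts used by train()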
@@ -4,15 +4,16 @@ from args import parse_args
import os
import paddle.fluid as fluid
import sys
from network_conf import dnn_model

dense_feature_dim = 13

def train():
    args = parse_args()
    sparse_only = args.sparse_only
    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)
    dense_input = fluid.layers.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')
    sparse_input_ids = [
@@ -20,8 +21,10 @@ def train():
        for i in range(1, 27)]
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    #nn_input = None if sparse_only else dense_input
    nn_input = dense_input
    predict_y, loss, auc_var, batch_auc_var = dnn_model(
        nn_input, sparse_input_ids, label,
        args.embedding_size, args.sparse_feature_dim)

    optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
@@ -31,29 +34,33 @@ def train():
    exe.run(fluid.default_startup_program())
    dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
    dataset.set_use_var([dense_input] + sparse_input_ids + [label])
    python_executable = "python"
    pipe_command = "{} criteo_reader.py {}".format(
        python_executable, args.sparse_feature_dim)
    dataset.set_pipe_command(pipe_command)
    dataset.set_batch_size(128)
    thread_num = 10
    dataset.set_thread(thread_num)

    whole_filelist = ["raw_data/part-%d" % x for x in
                      range(len(os.listdir("raw_data")))]
    dataset.set_filelist(whole_filelist[:thread_num])
    dataset.load_into_memory()

    epochs = 1
    for i in range(epochs):
        exe.train_from_dataset(
            program=fluid.default_main_program(),
            dataset=dataset, debug=True)
        print("epoch {} finished".format(i))
    import paddle_serving_client.io as server_io
    feed_var_dict = {}
    for i, sparse in enumerate(sparse_input_ids):
        feed_var_dict["sparse_{}".format(i)] = sparse
    feed_var_dict["dense_0"] = dense_input
    fetch_var_dict = {"prob": predict_y}

    server_io.save_model(
...
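The truncated save_model call exports a server-side model plus a client-side config built from the feed/fetch dictionaries above. Here is a sketch of how a client might then query the served model with paddle_serving_client; the config path and endpoint are placeholders, and only the feed names sparse_0..sparse_25 and dense_0 and the fetch name prob come from this diff:

from paddle_serving_client import Client

client = Client()
# Placeholder path/endpoint; use whatever save_model and the server produce.
client.load_client_config("ctr_client_conf/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9292"])

feed = {"dense_0": [0.0] * 13}
for i in range(26):
    feed["sparse_{}".format(i)] = [1]  # one hashed id per sparse slot
fetch_map = client.predict(feed=feed, fetch=["prob"])
print(fetch_map["prob"])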
import paddle.fluid as fluid
import math

def dnn_model(dense_input, sparse_inputs, label,
              embedding_size, sparse_feature_dim):

    def embedding_layer(input):
        emb = fluid.layers.embedding(
            input=input,
@@ -15,20 +14,30 @@ def ctr_dnn_model_dataset(dense_input, sparse_inputs, label,
                initializer=fluid.initializer.Uniform()))
        return fluid.layers.sequence_pool(input=emb, pool_type='sum')

    def mlp_input_tensor(emb_sums, dense_tensor):
        # Fold the dense features in only when a dense tensor is given;
        # dense_tensor is None when training on sparse features only.
        if isinstance(dense_tensor, fluid.Variable):
            return fluid.layers.concat(emb_sums + [dense_tensor], axis=1)
        else:
            return fluid.layers.concat(emb_sums, axis=1)

    def mlp(mlp_input):
        fc1 = fluid.layers.fc(input=mlp_input, size=400, act='relu',
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(mlp_input.shape[1]))))
        fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(fc1.shape[1]))))
        fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(fc2.shape[1]))))
        pre = fluid.layers.fc(input=fc3, size=2, act='softmax',
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                scale=1 / math.sqrt(fc3.shape[1]))))
        return pre

    emb_sums = list(map(embedding_layer, sparse_inputs))
    mlp_in = mlp_input_tensor(emb_sums, dense_input)
    predict = mlp(mlp_in)
    cost = fluid.layers.cross_entropy(input=predict, label=label)
    avg_cost = fluid.layers.reduce_sum(cost)
    accuracy = fluid.layers.accuracy(input=predict, label=label)
...
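As a quick smoke test of the refactored network, the graph can be built standalone. The sparse slot declaration is elided in the diff above, so the names below (C1..C26) and the lod_level are assumptions, and the four-value return signature is taken from the dnn_model call in the trainer:

import paddle.fluid as fluid
from network_conf import dnn_model

dense_input = fluid.layers.data(
    name="dense_input", shape=[13], dtype='float32')
# Assumed slot names; lod_level=1 marks variable-length id sequences.
sparse_input_ids = [
    fluid.layers.data(name="C{}".format(i), shape=[1],
                      lod_level=1, dtype='int64')
    for i in range(1, 27)]
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

predict_y, loss, auc_var, batch_auc_var = dnn_model(
    dense_input, sparse_input_ids, label,
    embedding_size=10, sparse_feature_dim=1000001)
print(predict_y.shape)  # (-1, 2): probabilities for no-click / click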