diff --git a/python/examples/criteo_ctr/args.py b/python/examples/criteo_ctr/args.py
index eb2227a5865087f05fcd6124812724544ae5335d..a30398e33de4618282988bc2dc990a3a099a91b7 100644
--- a/python/examples/criteo_ctr/args.py
+++ b/python/examples/criteo_ctr/args.py
@@ -3,83 +3,88 @@ import argparse
 def parse_args():
     parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
     parser.add_argument(
-        '--train_data_path',
-        type=str,
-        default='./data/raw/train.txt',
-        help="The path of training dataset")
-    parser.add_argument(
-        '--test_data_path',
-        type=str,
-        default='./data/raw/valid.txt',
-        help="The path of testing dataset")
-    parser.add_argument(
-        '--batch_size',
-        type=int,
-        default=1000,
-        help="The size of mini-batch (default:1000)")
-    parser.add_argument(
-        '--embedding_size',
-        type=int,
-        default=10,
-        help="The size for embedding layer (default:10)")
-    parser.add_argument(
-        '--num_passes',
-        type=int,
-        default=10,
-        help="The number of passes to train (default: 10)")
-    parser.add_argument(
-        '--model_output_dir',
-        type=str,
-        default='models',
-        help='The path for model to store (default: models)')
-    parser.add_argument(
-        '--sparse_feature_dim',
-        type=int,
-        default=1000001,
-        help='sparse feature hashing space for index processing')
-    parser.add_argument(
-        '--is_local',
-        type=int,
-        default=1,
-        help='Local train or distributed train (default: 1)')
-    parser.add_argument(
-        '--cloud_train',
-        type=int,
-        default=0,
-        help='Local train or distributed train on paddlecloud (default: 0)')
-    parser.add_argument(
-        '--async_mode',
-        action='store_true',
-        default=False,
-        help='Whether start pserver in async mode to support ASGD')
-    parser.add_argument(
-        '--no_split_var',
-        action='store_true',
-        default=False,
-        help='Whether split variables into blocks when update_method is pserver')
-    parser.add_argument(
-        '--role',
-        type=str,
-        default='pserver',  # trainer or pserver
-        help='The path for model to store (default: models)')
-    parser.add_argument(
-        '--endpoints',
-        type=str,
-        default='127.0.0.1:6000',
-        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
-    parser.add_argument(
-        '--current_endpoint',
-        type=str,
-        default='127.0.0.1:6000',
-        help='The path for model to store (default: 127.0.0.1:6000)')
-    parser.add_argument(
-        '--trainer_id',
-        type=int,
-        default=0,
-        help='The path for model to store (default: models)')
-    parser.add_argument(
-        '--trainers',
-        type=int,
-        default=1,
-        help='The num of trianers, (default: 1)')
+        '--train_data_path',
+        type=str,
+        default='./data/raw/train.txt',
+        help="The path of training dataset")
+    parser.add_argument(
+        '--sparse_only',
+        action='store_true',
+        default=False,
+        help="Whether to use sparse features only")
+    parser.add_argument(
+        '--test_data_path',
+        type=str,
+        default='./data/raw/valid.txt',
+        help="The path of testing dataset")
+    parser.add_argument(
+        '--batch_size',
+        type=int,
+        default=1000,
+        help="The size of mini-batch (default:1000)")
+    parser.add_argument(
+        '--embedding_size',
+        type=int,
+        default=10,
+        help="The size for embedding layer (default:10)")
+    parser.add_argument(
+        '--num_passes',
+        type=int,
+        default=10,
+        help="The number of passes to train (default: 10)")
+    parser.add_argument(
+        '--model_output_dir',
+        type=str,
+        default='models',
+        help='The path for model to store (default: models)')
+    parser.add_argument(
+        '--sparse_feature_dim',
+        type=int,
+        default=1000001,
+        help='sparse feature hashing space for index processing')
+    parser.add_argument(
+        '--is_local',
+        type=int,
+        default=1,
+        help='Local train or distributed train (default: 1)')
+    parser.add_argument(
+        '--cloud_train',
+        type=int,
+        default=0,
+        help='Local train or distributed train on paddlecloud (default: 0)')
+    parser.add_argument(
+        '--async_mode',
+        action='store_true',
+        default=False,
+        help='Whether start pserver in async mode to support ASGD')
+    parser.add_argument(
+        '--no_split_var',
+        action='store_true',
+        default=False,
+        help='Whether split variables into blocks when update_method is pserver')
+    parser.add_argument(
+        '--role',
+        type=str,
+        default='pserver',  # trainer or pserver
+        help='The role of the current node (default: pserver)')
+    parser.add_argument(
+        '--endpoints',
+        type=str,
+        default='127.0.0.1:6000',
+        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
+    parser.add_argument(
+        '--current_endpoint',
+        type=str,
+        default='127.0.0.1:6000',
+        help='The endpoint of the current pserver (default: 127.0.0.1:6000)')
+    parser.add_argument(
+        '--trainer_id',
+        type=int,
+        default=0,
+        help='The id of the current trainer (default: 0)')
+    parser.add_argument(
+        '--trainers',
+        type=int,
+        default=1,
+        help='The number of trainers (default: 1)')
     return parser.parse_args()
diff --git a/python/examples/criteo_ctr/get_data.sh b/python/examples/criteo_ctr/get_data.sh
index 6a75a0b1db1c013e6e5bf7b6312f1ebebe2d737a..338b8d1fdfa6bc64421f559d41b10c004495cbdd 100644
--- a/python/examples/criteo_ctr/get_data.sh
+++ b/python/examples/criteo_ctr/get_data.sh
@@ -1,2 +1,2 @@
 wget https://paddle-serving.bj.bcebos.com/data%2Fctr_prediction%2Fctr_data.tar.gz
-tar -zxvf ctr_data.tar.gz
+tar -zxvf *ctr_data.tar.gz
diff --git a/python/examples/criteo_ctr/local_train.py b/python/examples/criteo_ctr/local_train.py
index ea8d3ad5948557a495bb680e7db8f19d8b0ce10c..a3cf57c8df68fa8b943b1e754141fdaa4c786eb7 100644
--- a/python/examples/criteo_ctr/local_train.py
+++ b/python/examples/criteo_ctr/local_train.py
@@ -4,15 +4,16 @@ from args import parse_args
 import os
 import paddle.fluid as fluid
 import sys
-from network_conf import ctr_dnn_model_dataset
+from network_conf import dnn_model
 
 dense_feature_dim = 13
 
+
 def train():
     args = parse_args()
+    sparse_only = args.sparse_only
     if not os.path.isdir(args.model_output_dir):
         os.mkdir(args.model_output_dir)
-
     dense_input = fluid.layers.data(
         name="dense_input", shape=[dense_feature_dim], dtype='float32')
     sparse_input_ids = [
@@ -20,8 +21,10 @@ def train():
         for i in range(1, 27)]
     label = fluid.layers.data(name='label', shape=[1], dtype='int64')
 
-    predict_y, loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
-        dense_input, sparse_input_ids, label,
+    # feed the dense features into the MLP unless --sparse_only is set
+    nn_input = None if sparse_only else dense_input
+    predict_y, loss, auc_var, batch_auc_var = dnn_model(
+        nn_input, sparse_input_ids, label,
         args.embedding_size, args.sparse_feature_dim)
 
     optimizer = fluid.optimizer.SGD(learning_rate=1e-4)
@@ -31,29 +34,35 @@
     exe.run(fluid.default_startup_program())
     dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
     dataset.set_use_var([dense_input] + sparse_input_ids + [label])
+
     python_executable = "python"
-    pipe_command = "{} criteo_reader.py {}".format(python_executable, args.sparse_feature_dim)
+    pipe_command = "{} criteo_reader.py {}".format(
+        python_executable, args.sparse_feature_dim)
+
     dataset.set_pipe_command(pipe_command)
     dataset.set_batch_size(128)
     thread_num = 10
     dataset.set_thread(thread_num)
-    whole_filelist = ["raw_data/part-%d" % x for x in range(len(os.listdir("raw_data")))]
-    #dataset.set_filelist(whole_filelist[:(len(whole_filelist)-thread_num)])
+
+    whole_filelist = ["raw_data/part-%d" % x for x in
+                      range(len(os.listdir("raw_data")))]
+    dataset.set_filelist(whole_filelist[:thread_num])
     dataset.load_into_memory()
 
     epochs = 1
     for i in range(epochs):
-        exe.train_from_dataset(program=fluid.default_main_program(),
-                               dataset=dataset,
-                               debug=True)
+        exe.train_from_dataset(
+            program=fluid.default_main_program(),
+            dataset=dataset, debug=True)
         print("epoch {} finished".format(i))
 
     import paddle_serving_client.io as server_io
 
     feed_var_dict = {}
     for i, sparse in enumerate(sparse_input_ids):
         feed_var_dict["sparse_{}".format(i)] = sparse
-    feed_var_dict["dense_0"] = dense_input
+    if not sparse_only:
+        feed_var_dict["dense_0"] = dense_input
     fetch_var_dict = {"prob": predict_y}
 
     server_io.save_model(
diff --git a/python/examples/criteo_ctr/network_conf.py b/python/examples/criteo_ctr/network_conf.py
index c2e2f1dd98a5b67098d4859a95104601627bcf49..429921da0f11edd2fe5296667271135ce53a5808 100644
--- a/python/examples/criteo_ctr/network_conf.py
+++ b/python/examples/criteo_ctr/network_conf.py
@@ -1,10 +1,9 @@
 import paddle.fluid as fluid
 import math
 
-dense_feature_dim = 13
+def dnn_model(dense_input, sparse_inputs, label,
+              embedding_size, sparse_feature_dim):
 
-def ctr_dnn_model_dataset(dense_input, sparse_inputs, label,
-                          embedding_size, sparse_feature_dim):
     def embedding_layer(input):
         emb = fluid.layers.embedding(
             input=input,
@@ -15,20 +14,32 @@ def ctr_dnn_model_dataset(dense_input, sparse_inputs, label,
                 initializer=fluid.initializer.Uniform()))
         return fluid.layers.sequence_pool(input=emb, pool_type='sum')
 
-    sparse_embed_seq = list(map(embedding_layer, sparse_inputs))
-    concated = fluid.layers.concat(sparse_embed_seq, axis=1)
-    fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
-            scale=1 / math.sqrt(concated.shape[1]))))
-    fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
-            scale=1 / math.sqrt(fc1.shape[1]))))
-    fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
-            scale=1 / math.sqrt(fc2.shape[1]))))
-    predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
+    def mlp_input_tensor(emb_sums, dense_tensor):
+        # concat the dense tensor when it is given; with --sparse_only the
+        # caller passes None and the embedding sums are used alone
+        if isinstance(dense_tensor, fluid.Variable):
+            return fluid.layers.concat(emb_sums + [dense_tensor], axis=1)
+        else:
+            return fluid.layers.concat(emb_sums, axis=1)
+
+    def mlp(mlp_input):
+        fc1 = fluid.layers.fc(input=mlp_input, size=400, act='relu',
+            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                scale=1 / math.sqrt(mlp_input.shape[1]))))
+        fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
+            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                scale=1 / math.sqrt(fc1.shape[1]))))
+        fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
+            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
+                scale=1 / math.sqrt(fc2.shape[1]))))
+        pre = fluid.layers.fc(input=fc3, size=2, act='softmax',
             param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
                 scale=1 / math.sqrt(fc3.shape[1]))))
+        return pre
+
+    emb_sums = list(map(embedding_layer, sparse_inputs))
+    mlp_in = mlp_input_tensor(emb_sums, dense_input)
+    predict = mlp(mlp_in)
     cost = fluid.layers.cross_entropy(input=predict, label=label)
     avg_cost = fluid.layers.reduce_sum(cost)
     accuracy = fluid.layers.accuracy(input=predict, label=label)
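Note: a minimal sketch of the input-selection logic that dnn_model now uses, with plain Python lists standing in for fluid Variables (illustrative only, not part of the diff):

    # mlp_input_tensor picks the MLP input: when a dense tensor is supplied
    # it is concatenated with the 26 sparse embedding sums; when it is
    # None (--sparse_only) the embedding sums are used alone.
    def mlp_input_tensor(emb_sums, dense_tensor):
        if dense_tensor is not None:  # the real code checks isinstance(dense_tensor, fluid.Variable)
            return emb_sums + [dense_tensor]  # stands in for fluid.layers.concat(..., axis=1)
        return emb_sums

    assert len(mlp_input_tensor(['emb'] * 26, 'dense')) == 27  # dense + sparse (default)
    assert len(mlp_input_tensor(['emb'] * 26, None)) == 26     # sparse only

Passing None to select the sparse-only path keeps a single dnn_model signature for both modes, so local_train.py only has to change what it passes as nn_input.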