train.py 7.0 KB
Newer Older
Q
Qiao Longfei 已提交
1
from __future__ import print_function
Q
Qiao Longfei 已提交
2

Q
Qiao Longfei 已提交
3
import argparse
Q
Qiao Longfei 已提交
4
import logging
Q
Qiao Longfei 已提交
5
import os
C
ccmeteorljh 已提交
6
import time
Q
Qiao Longfei 已提交
7

8 9 10
# This example trains on CPU only: hide every CUDA device from the process.
# Done before `import paddle` below so the setting is already in place when
# the library is loaded.
os.environ.update(CUDA_VISIBLE_DEVICES="")

Q
Qiao Longfei 已提交
11
import paddle
Q
Qiao Longfei 已提交
12 13 14
import paddle.fluid as fluid

import reader
Q
Qiao Longfei 已提交
15 16
from network_conf import ctr_dnn_model

Q
Qiao Longfei 已提交
17 18 19 20
# Every record emitted through the root handler shows time, level and message.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# Named logger used by this script; INFO level so the per-batch training
# metrics logged with logger.info are emitted.
logger = logging.getLogger("fluid")
logger.setLevel("INFO")
logging.basicConfig(format=_LOG_FORMAT)
Q
Qiao Longfei 已提交
21 22 23


def parse_args(argv=None):
    """Parse the command-line options of the CTR training script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, exactly as before.

    Returns:
        argparse.Namespace holding every option defined below.
    """
    parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/raw/train.txt',
        help="The path of training dataset")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/raw/valid.txt',
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1000,
        help="The size of mini-batch (default:1000)")
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=10,
        help="The size for embedding layer (default:10)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help='Size of the hashing space used to index sparse features '
        '(default: 1000001)')
    parser.add_argument(
        '--is_local',
        type=int,
        default=1,
        help='1 for local training, 0 for distributed training (default: 1)')
    parser.add_argument(
        '--cloud_train',
        type=int,
        default=0,
        help='1 to take the distributed-training settings from the '
        'PaddleCloud environment variables (default: 0)')
    parser.add_argument(
        '--async_mode',
        action='store_true',
        default=False,
        help='Run the parameter servers in asynchronous mode (ASGD)')
    parser.add_argument(
        '--no_split_var',
        action='store_true',
        default=False,
        help='Do not split variables into blocks across the parameter '
        'servers (default: variables are split)')
    # The following arguments are only used for distributed training: set
    # them when --is_local is 0.
    parser.add_argument(
        '--role',
        type=str,
        default='pserver',  # trainer or pserver
        help='The role of this process: pserver or trainer '
        '(default: pserver)')
    parser.add_argument(
        '--endpoints',
        type=str,
        default='127.0.0.1:6000',
        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
    parser.add_argument(
        '--current_endpoint',
        type=str,
        default='127.0.0.1:6000',
        help='The endpoint (ip:port) served by this pserver '
        '(default: 127.0.0.1:6000)')
    parser.add_argument(
        '--trainer_id',
        type=int,
        default=0,
        help='The id of this trainer, starting from 0; only trainer 0 '
        'saves models (default: 0)')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: 1)')

    return parser.parse_args(argv)
Q
Qiao Longfei 已提交
108 109


D
dongdaxiang 已提交
110 111
def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
               trainer_num, trainer_id):
    """Run ``args.num_passes`` training passes and save inference models.

    Args:
        args: Parsed command-line options; reads ``sparse_feature_dim``,
            ``train_data_path``, ``batch_size``, ``num_passes`` and
            ``model_output_dir``.
        train_program: The fluid program executed for every mini-batch.
        data_list: Input variables fed through a ``fluid.DataFeeder``; their
            names are also the feed names of the saved inference models.
        loss: Loss variable, fetched every batch and saved as a model target.
        auc_var: AUC variable, fetched every batch and saved as a model target.
        batch_auc_var: Per-batch AUC variable, fetched every batch.
        trainer_num: Total number of trainers, passed to
            ``reader.CriteoDataset.train`` (presumably to shard the data —
            TODO confirm in reader.py).
        trainer_id: Id of this trainer. Only trainer 0 writes models.
    """
    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_num, trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)
    place = fluid.CPUPlace()

    feeder = fluid.DataFeeder(feed_list=data_list, place=place)
    data_name_list = [var.name for var in data_list]
    fetch_list = [loss, auc_var, batch_auc_var]
    # Gate on this function's own trainer_id (not args.trainer_id): local
    # training passes trainer_id=0 regardless of the --trainer_id flag.
    # Only one trainer writes, since all of them share model_output_dir.
    is_chief = trainer_id == 0

    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())
    for pass_id in range(args.num_passes):
        pass_start = time.time()
        for batch_id, data in enumerate(train_reader()):
            loss_val, auc_val, batch_auc_val = exe.run(
                train_program,
                feed=feeder.feed(data),
                fetch_list=fetch_list)
            # NOTE(review): dividing by batch_size assumes `loss` is summed
            # over the batch; confirm against network_conf.ctr_dnn_model.
            logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                        .format(pass_id, batch_id, loss_val / args.batch_size,
                                auc_val, batch_auc_val))
            # Intermediate checkpoint every 1000 batches. The directory name
            # does not include pass_id, so later passes overwrite it.
            if is_chief and batch_id % 1000 == 0 and batch_id != 0:
                model_dir = os.path.join(args.model_output_dir,
                                         'batch-' + str(batch_id))
                fluid.io.save_inference_model(model_dir, data_name_list,
                                              [loss, auc_var], exe)
        print("pass_id: %d, pass_time_cost: %f" % (pass_id, time.time() - pass_start))
        if is_chief:
            model_dir = os.path.join(args.model_output_dir, 'pass-' + str(pass_id))
            fluid.io.save_inference_model(model_dir, data_name_list,
                                          [loss, auc_var], exe)
Q
Qiao Longfei 已提交
143 144


Q
Qiao Longfei 已提交
145 146 147 148 149 150
def _ensure_dir(path):
    """Create ``path`` (including missing parents) unless it already exists."""
    if os.path.isdir(path):
        return
    try:
        os.makedirs(path)
    except OSError:
        # Another process may have created it between the check and the call;
        # only re-raise if the directory still does not exist.
        if not os.path.isdir(path):
            raise


def _apply_cloud_env(args):
    """Overwrite the distributed-training options in ``args`` in place.

    Values come from the PaddleCloud environment variables PADDLE_PORT,
    PADDLE_PSERVERS, PADDLE_TRAINERS_NUM, POD_IP, TRAINING_ROLE,
    PADDLE_TRAINER_ID and PADDLE_IS_LOCAL.
    """
    # The port of all pservers, needed by both trainer and pserver.
    port = os.getenv("PADDLE_PORT", "6174")
    # Comma-separated IPs of all pservers, needed by both trainer and pserver.
    pserver_ips = os.getenv("PADDLE_PSERVERS", "")
    eplist = [':'.join([ip.strip(), port])
              for ip in pserver_ips.split(",") if ip.strip()]
    # An unset/empty PADDLE_PSERVERS would otherwise produce the unusable
    # endpoint ":<port>"; keep the --endpoints value in that case.
    if eplist:
        args.endpoints = ",".join(eplist)
    args.trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
    args.current_endpoint = os.getenv("POD_IP", "localhost") + ":" + port
    args.role = os.getenv("TRAINING_ROLE", "TRAINER")
    args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    args.is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", "0")))


def train():
    """Entry point: build the CTR network and run local or distributed training.

    Parses the command line, makes sure ``model_output_dir`` exists, builds
    the network with ``ctr_dnn_model`` and an Adam optimizer (learning rate
    1e-4), and then does one of the following:
    - runs ``train_loop`` locally when ``is_local`` is set;
    - otherwise transpiles the program and runs either a parameter server
      or a trainer, depending on the role.
    With ``--cloud_train`` the distributed settings are taken from
    environment variables.

    Raises:
        ValueError: if distributed training is requested and the role is
            neither "pserver" nor "trainer" (case-insensitive).
    """
    args = parse_args()

    _ensure_dir(args.model_output_dir)

    loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
    optimizer.minimize(loss)
    if args.cloud_train:
        _apply_cloud_env(args)

    if args.is_local:
        logger.info("run local training")
        main_program = fluid.default_main_program()
        train_loop(args, main_program, data_list, loss, auc_var, batch_auc_var, 1, 0)
        return

    logger.info("run dist training")
    if args.no_split_var:
        # NOTE(review): requires a Paddle release that provides
        # fluid.DistributeTranspilerConfig; confirm against the installed
        # version. The default path (flag unset) is unchanged.
        config = fluid.DistributeTranspilerConfig()
        config.slice_var_up = False
        t = fluid.DistributeTranspiler(config=config)
    else:
        t = fluid.DistributeTranspiler()
    # sync_mode=True (the default) keeps the original synchronous behaviour;
    # --async_mode switches the pservers to asynchronous updates.
    t.transpile(args.trainer_id, pservers=args.endpoints, trainers=args.trainers,
                sync_mode=not args.async_mode)

    role = args.role.lower()
    if role == "pserver":
        logger.info("run pserver")
        prog = t.get_pserver_program(args.current_endpoint)
        startup = t.get_startup_program(args.current_endpoint, pserver_program=prog)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup)
        exe.run(prog)
    elif role == "trainer":
        logger.info("run trainer")
        train_prog = t.get_trainer_program()
        train_loop(args, train_prog, data_list, loss, auc_var, batch_auc_var,
                   args.trainers, args.trainer_id)
    else:
        raise ValueError(
            "role must be either 'trainer' or 'pserver' (set via --role, or via "
            "the TRAINING_ROLE environment variable with --cloud_train), "
            "got %r" % args.role)
Q
Qiao Longfei 已提交
194 195


Q
Qiao Longfei 已提交
196 197
if __name__ == "__main__":
    # Start training only when run as a script, not when imported.
    train()