train.py 9.4 KB
Newer Older
Q
Qiao Longfei 已提交
1
from __future__ import print_function
Q
Qiao Longfei 已提交
2

Q
Qiao Longfei 已提交
3
import argparse
Q
Qiao Longfei 已提交
4
import logging
Q
Qiao Longfei 已提交
5
import os
C
ccmeteorljh 已提交
6
import time
Q
Qiao Longfei 已提交
7

T
tangwei12 已提交
8
import numpy as np
9

Q
Qiao Longfei 已提交
10
import paddle
Q
Qiao Longfei 已提交
11 12 13
import paddle.fluid as fluid

import reader
Q
Qiao Longfei 已提交
14
from network_conf import ctr_dnn_model
T
tangwei12 已提交
15
from multiprocessing import cpu_count
16
import utils
T
tangwei12 已提交
17 18 19

# This example trains on CPU only: hide every CUDA device from Paddle.
os.environ.update({"CUDA_VISIBLE_DEVICES": ""})

# Root handler with timestamped records; the script's own logger is named
# "fluid" and emits INFO and above.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
Q
Qiao Longfei 已提交
24 25 26


def parse_args():
    """Parse the command-line options of the CTR training example.

    Returns:
        argparse.Namespace holding data paths, model hyper-parameters, and
        the local / distributed (parameter-server) training configuration.
    """
    parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/raw/train.txt',
        help="The path of training dataset")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/raw/valid.txt',
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1000,
        help="The size of mini-batch (default:1000)")
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=10,
        help="The size for embedding layer (default:10)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help='sparse feature hashing space for index processing')
    parser.add_argument(
        '--is_local',
        type=int,
        default=1,
        help='Local train or distributed train (default: 1)')
    parser.add_argument(
        '--cloud_train',
        type=int,
        default=0,
        help='Local train or distributed train on paddlecloud (default: 0)')
    parser.add_argument(
        '--async_mode',
        action='store_true',
        default=False,
        help='Whether to start pserver in async mode to support ASGD')
    parser.add_argument(
        '--no_split_var',
        action='store_true',
        default=False,
        help='If set, do not split variables into blocks when update_method '
        'is pserver')
    # The following arguments are used for distributed training; if
    # is_local == 0, you should set them.
    parser.add_argument(
        '--role',
        type=str,
        default='pserver',  # trainer or pserver
        help='The role of this process: trainer or pserver '
        '(default: pserver)')
    parser.add_argument(
        '--endpoints',
        type=str,
        default='127.0.0.1:6000',
        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
    parser.add_argument(
        '--current_endpoint',
        type=str,
        default='127.0.0.1:6000',
        help='The endpoint of the current pserver '
        '(default: 127.0.0.1:6000)')
    parser.add_argument(
        '--trainer_id',
        type=int,
        default=0,
        help='The id of the current trainer (default: 0)')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: 1)')
    parser.add_argument(
        '--enable_ce',
        action='store_true',
        help='If set, run the task with continuous evaluation logs.')

    return parser.parse_args()
Q
Qiao Longfei 已提交
115 116


T
tangwei12 已提交
117
def _save_persistables(exe, model_dir):
    """Save the persistable variables of the default main program to model_dir."""
    fluid.io.save_persistables(
        executor=exe,
        dirname=model_dir,
        main_program=fluid.default_main_program())


def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
               trainer_num, trainer_id):
    """Run ``args.num_passes`` training passes and checkpoint persistables.

    Args:
        args: parsed command-line options (see ``parse_args``).
        train_program: the program run by the ParallelExecutor (the default
            main program for local training, or the transpiler's trainer
            program for distributed training).
        py_reader: reader feeding ``train_program``; it is decorated here with
            the shuffled, batched Criteo training reader.
        loss, auc_var, batch_auc_var: variables fetched after every batch.
        trainer_num: forwarded to ``dataset.train`` together with trainer_id
            (presumably to shard the data among trainers -- confirm in reader).
        trainer_id: index of this trainer, forwarded to ``dataset.train``.

    Checkpoints go to ``<model_output_dir>/batch-<id>`` every 1000 batches
    and to ``<model_output_dir>/pass-<id>`` after each pass, written only
    when ``args.trainer_id == 0``.
    """
    if args.enable_ce:
        # Fixed seed so continuous-evaluation runs are reproducible.
        seed = 102
        train_program.random_seed = seed
        fluid.default_startup_program().random_seed = seed

    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    # Shuffle inside a buffer of 100 mini-batches, then group into batches.
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_num, trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)
    py_reader.decorate_paddle_reader(train_reader)

    exe = fluid.Executor(fluid.CPUPlace())

    exec_strategy = fluid.ExecutionStrategy()
    build_strategy = fluid.BuildStrategy()

    # A non-empty NUM_THREADS overrides the executor's thread count.
    num_threads = os.getenv("NUM_THREADS", "")
    if num_threads:
        exec_strategy.num_threads = int(num_threads)

    # Reduce when more than one CPU device is used, AllReduce otherwise.
    cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
    if cpu_num > 1:
        build_strategy.reduce_strategy = \
            fluid.BuildStrategy.ReduceStrategy.Reduce
    else:
        build_strategy.reduce_strategy = \
            fluid.BuildStrategy.ReduceStrategy.AllReduce

    exe.run(fluid.default_startup_program())
    pe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    total_time = 0
    # Pre-bind the metrics so the CE report below never reads an unbound
    # name when the reader yields no batch at all.
    loss_val = auc_val = batch_auc_val = float('nan')
    for pass_id in range(args.num_passes):
        pass_start = time.time()
        batch_id = 0
        py_reader.start()

        try:
            while True:
                loss_val, auc_val, batch_auc_val = pe.run(
                    fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
                # pe.run may return one value per device; average them.
                loss_val = np.mean(loss_val)
                auc_val = np.mean(auc_val)
                batch_auc_val = np.mean(batch_auc_val)

                logger.info(
                    "TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                    .format(pass_id, batch_id, loss_val / args.batch_size,
                            auc_val, batch_auc_val))
                if (batch_id % 1000 == 0 and batch_id != 0 and
                        args.trainer_id == 0):
                    _save_persistables(
                        exe,
                        os.path.join(args.model_output_dir,
                                     'batch-' + str(batch_id)))
                batch_id += 1
        except fluid.core.EOFException:
            # Reader exhausted: reset it so the next pass can start it again.
            py_reader.reset()

        pass_time = time.time() - pass_start
        print("pass_id: %d, pass_time_cost: %f" % (pass_id, pass_time))
        total_time += pass_time

        if args.trainer_id == 0:
            _save_persistables(
                exe,
                os.path.join(args.model_output_dir, 'pass-' + str(pass_id)))

    # only for ce
    if args.enable_ce:
        if args.num_passes <= 0:
            # Averaging over zero passes would divide by zero.
            logger.warning("enable_ce is set but num_passes is %d; "
                           "no kpis reported", args.num_passes)
            return
        threads_num, ce_cpu_num = get_cards(args)
        epoch_idx = args.num_passes
        print("kpis\teach_pass_duration_cpu%s_thread%s\t%s" %
              (ce_cpu_num, threads_num, total_time / epoch_idx))
        print("kpis\ttrain_loss_cpu%s_thread%s\t%s" %
              (ce_cpu_num, threads_num, loss_val / args.batch_size))
        print("kpis\ttrain_auc_val_cpu%s_thread%s\t%s" %
              (ce_cpu_num, threads_num, auc_val))
        print("kpis\ttrain_batch_auc_val_cpu%s_thread%s\t%s" %
              (ce_cpu_num, threads_num, batch_auc_val))

Q
Qiao Longfei 已提交
211

Q
Qiao Longfei 已提交
212 213 214 215 216 217
def train():
    """Parse options, build the CTR network and run local or distributed training.

    With ``--cloud_train`` the distributed settings are taken from the
    PaddleCloud environment variables read below. Local training runs
    ``train_loop`` directly. Otherwise the program is transpiled for
    parameter-server training, and this process acts as a pserver or a
    trainer according to the role.

    Raises:
        ValueError: if the role is neither trainer nor pserver in
            distributed mode.
    """
    args = parse_args()

    # makedirs also creates missing parents; tolerate another process on the
    # same host creating the directory between the check and the call.
    if not os.path.isdir(args.model_output_dir):
        try:
            os.makedirs(args.model_output_dir)
        except OSError:
            if not os.path.isdir(args.model_output_dir):
                raise

    loss, auc_var, batch_auc_var, py_reader, _, auc_states = ctr_dnn_model(
        args.embedding_size, args.sparse_feature_dim)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
    optimizer.minimize(loss)

    if args.cloud_train:
        # the port of all pservers, needed by both trainer and pserver
        port = os.getenv("PADDLE_PORT", "6174")
        # comma separated ips of all pservers, needed by trainer and pserver;
        # blank entries (e.g. from a trailing comma) are skipped.
        pserver_ips = os.getenv("PADDLE_PSERVERS", "")
        args.endpoints = ",".join(
            ':'.join([ip.strip(), port])
            for ip in pserver_ips.split(",") if ip.strip())
        args.trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        args.current_endpoint = os.getenv("POD_IP", "localhost") + ":" + port
        args.role = os.getenv("TRAINING_ROLE", "TRAINER")
        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        args.is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", "0")))

    if args.is_local:
        logger.info("run local training")
        main_program = fluid.default_main_program()
        train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var,
                   1, 0)
        return

    logger.info("run dist training")
    t = fluid.DistributeTranspiler()
    t.transpile(
        args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
    # Accept the role case-insensitively ("pserver", "PSERVER", ...).
    role = args.role.lower()
    if role == "pserver":
        logger.info("run pserver")
        prog = t.get_pserver_program(args.current_endpoint)
        startup = t.get_startup_program(
            args.current_endpoint, pserver_program=prog)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup)
        exe.run(prog)
    elif role == "trainer":
        logger.info("run trainer")
        train_prog = t.get_trainer_program()
        train_loop(args, train_prog, py_reader, loss, auc_var,
                   batch_auc_var, args.trainers, args.trainer_id)
    else:
        raise ValueError(
            'role (--role, or the TRAINING_ROLE environment variable with '
            '--cloud_train) must be either TRAINER or PSERVER, got %r' %
            (args.role, ))
Q
Qiao Longfei 已提交
264 265


Z
zhengya01 已提交
266
def get_cards(args):
    """Return ``(threads_num, cpu_num)`` from the environment for CE kpi names.

    Args:
        args: unused; kept for interface compatibility.

    Returns:
        Tuple of ints parsed from NUM_THREADS and CPU_NUM. Each defaults to 1
        when the variable is unset or empty.
    """
    # An empty value counts as unset (as train_loop does for NUM_THREADS)
    # instead of crashing in int('').
    threads_num = os.environ.get('NUM_THREADS') or 1
    cpu_num = os.environ.get('CPU_NUM') or 1
    return int(threads_num), int(cpu_num)
Z
zhengya01 已提交
270 271


Q
Qiao Longfei 已提交
272
if __name__ == "__main__":
    # utils.check_version() presumably validates the installed PaddlePaddle
    # version; it runs before any program is built or trained.
    utils.check_version()
    train()