from __future__ import print_function

import argparse
import logging
import os
import time

import numpy as np

import paddle
import paddle.fluid as fluid

import reader
from network_conf import ctr_dnn_model
from multiprocessing import cpu_count

# Disable GPU training for this example: expose no CUDA devices.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

# Log records carry a timestamp and level; the script logs through "fluid".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT)
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


def parse_args():
    """Parse the command-line arguments of the CTR training script.

    Returns:
        argparse.Namespace: the parsed options. Data/model options apply to
        every run; the options after ``--no_split_var`` are only meaningful
        for distributed training (``--is_local 0``).
    """
    parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/raw/train.txt',
        help="The path of training dataset")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/raw/valid.txt',
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1000,
        help="The size of mini-batch (default:1000)")
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=10,
        help="The size for embedding layer (default:10)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--sparse_feature_dim',
        type=int,
        default=1000001,
        help='sparse feature hashing space for index processing')
    parser.add_argument(
        '--is_local',
        type=int,
        default=1,
        help='Local train or distributed train (default: 1)')
    parser.add_argument(
        '--cloud_train',
        type=int,
        default=0,
        help='Local train or distributed train on paddlecloud (default: 0)')
    parser.add_argument(
        '--async_mode',
        action='store_true',
        default=False,
        help='Whether start pserver in async mode to support ASGD')
    parser.add_argument(
        '--no_split_var',
        action='store_true',
        default=False,
        help='If set, do not split variables into blocks across pservers')
    # The following arguments are used for distributed training; set them
    # when is_local is 0.
    parser.add_argument(
        '--role',
        type=str,
        default='pserver',  # trainer or pserver
        help='The training role of this process: trainer or pserver '
        '(default: pserver)')
    parser.add_argument(
        '--endpoints',
        type=str,
        default='127.0.0.1:6000',
        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
    parser.add_argument(
        '--current_endpoint',
        type=str,
        default='127.0.0.1:6000',
        help='The endpoint of the current pserver (default: 127.0.0.1:6000)')
    parser.add_argument(
        '--trainer_id',
        type=int,
        default=0,
        help='The id of the current trainer (default: 0)')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: 1)')
    parser.add_argument(
        '--enable_ce',
        action='store_true',
        help='If set, run the task with continuous evaluation logs.')

    return parser.parse_args()


def train_loop(args, train_program, py_reader, loss, auc_var, batch_auc_var,
               trainer_num, trainer_id):
    """Run all training passes on CPU and save persistable variables.

    Args:
        args: parsed command-line options (see ``parse_args``).
        train_program: the program executed by the ParallelExecutor.
        py_reader: reader object that is fed with the shuffled, batched
            Criteo training data and started/reset once per pass.
        loss: loss variable; its name is used as the executor's loss name
            and is fetched every batch.
        auc_var: AUC variable fetched every batch.
        batch_auc_var: per-batch AUC variable fetched every batch.
        trainer_num: number of trainers passed to ``dataset.train``.
        trainer_id: id of this trainer, passed to ``dataset.train``; only
            trainer 0 writes model snapshots.
    """
    if args.enable_ce:
        # Pin the random seeds of both programs for continuous evaluation.
        SEED = 102
        train_program.random_seed = SEED
        fluid.default_startup_program().random_seed = SEED

    dataset = reader.CriteoDataset(args.sparse_feature_dim)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            dataset.train([args.train_data_path], trainer_num, trainer_id),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)

    py_reader.decorate_paddle_reader(train_reader)

    place = fluid.CPUPlace()
    exe = fluid.Executor(place)

    exec_strategy = fluid.ExecutionStrategy()
    build_strategy = fluid.BuildStrategy()

    # An unset or empty NUM_THREADS keeps the executor's default.
    if os.getenv("NUM_THREADS", ""):
        exec_strategy.num_threads = int(os.getenv("NUM_THREADS"))

    # Reduce when CPU_NUM (default: all cores) is > 1, AllReduce otherwise.
    cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
    build_strategy.reduce_strategy = \
        fluid.BuildStrategy.ReduceStrategy.Reduce if cpu_num > 1 \
            else fluid.BuildStrategy.ReduceStrategy.AllReduce

    exe.run(fluid.default_startup_program())
    pe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    def save_snapshot(tag):
        """Save persistables under <model_output_dir>/<tag> (trainer 0 only)."""
        # Use this function's own trainer_id so a local run (always id 0)
        # saves regardless of what --trainer_id happens to hold.
        if trainer_id != 0:
            return
        fluid.io.save_persistables(
            executor=exe,
            dirname=os.path.join(args.model_output_dir, tag),
            main_program=fluid.default_main_program())

    # Metrics of the most recent batch; stay None if no batch ever ran.
    loss_val = auc_val = batch_auc_val = None
    total_time = 0
    for pass_id in range(args.num_passes):
        pass_start = time.time()
        batch_id = 0
        py_reader.start()

        try:
            while True:
                loss_val, auc_val, batch_auc_val = pe.run(
                    fetch_list=[loss.name, auc_var.name, batch_auc_var.name])
                # Average each fetched array down to one scalar.
                loss_val = np.mean(loss_val)
                auc_val = np.mean(auc_val)
                batch_auc_val = np.mean(batch_auc_val)

                logger.info(
                    "TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
                    .format(pass_id, batch_id, loss_val / args.batch_size,
                            auc_val, batch_auc_val))
                # Intermediate snapshot every 1000 batches (not at batch 0).
                if batch_id % 1000 == 0 and batch_id != 0:
                    save_snapshot('batch-' + str(batch_id))
                batch_id += 1
        except fluid.core.EOFException:
            # The reader is exhausted: this pass is over.
            py_reader.reset()

        pass_elapsed = time.time() - pass_start
        print("pass_id: %d, pass_time_cost: %f" % (pass_id, pass_elapsed))
        total_time += pass_elapsed

        save_snapshot('pass-' + str(pass_id))

    # only for ce
    if args.enable_ce:
        if loss_val is None:
            # No pass or no batch was executed, so there is nothing to report
            # (and dividing by the pass count could fail).
            logger.warning("no training batch was run, skip CE kpis")
            return
        threads_num, cpu_num = get_cards(args)
        epoch_idx = args.num_passes
        print("kpis\teach_pass_duration_cpu%s_thread%s\t%s" %
              (cpu_num, threads_num, total_time / epoch_idx))
        print("kpis\ttrain_loss_cpu%s_thread%s\t%s" %
              (cpu_num, threads_num, loss_val / args.batch_size))
        print("kpis\ttrain_auc_val_cpu%s_thread%s\t%s" %
              (cpu_num, threads_num, auc_val))
        print("kpis\ttrain_batch_auc_val_cpu%s_thread%s\t%s" %
              (cpu_num, threads_num, batch_auc_val))

Q
Qiao Longfei 已提交
210

Q
Qiao Longfei 已提交
211 212 213 214 215 216
def train():
    """Build the CTR model and run local or distributed training.

    Parses the command line, builds the network and its Adam optimizer, then
    either trains locally or transpiles the program and acts as a parameter
    server or a trainer depending on the configured role. With
    ``--cloud_train`` the distributed settings are read from environment
    variables instead of the command line.

    Raises:
        ValueError: if distributed training is requested with a role that is
            neither trainer nor pserver.
    """
    args = parse_args()

    # makedirs (not mkdir) so that a nested output path can be created too.
    if not os.path.isdir(args.model_output_dir):
        os.makedirs(args.model_output_dir)

    loss, auc_var, batch_auc_var, py_reader, _ = ctr_dnn_model(
        args.embedding_size, args.sparse_feature_dim)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
    optimizer.minimize(loss)
    if args.cloud_train:
        # the port of all pservers, needed by both trainer and pserver
        port = os.getenv("PADDLE_PORT", "6174")
        # comma separated ips of all pservers, needed by trainer and pserver
        pserver_ips = os.getenv("PADDLE_PSERVERS", "")
        args.endpoints = ",".join(
            [':'.join([ip, port]) for ip in pserver_ips.split(",")])
        args.trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        args.current_endpoint = os.getenv("POD_IP", "localhost") + ":" + port
        args.role = os.getenv("TRAINING_ROLE", "TRAINER")
        args.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        args.is_local = bool(int(os.getenv("PADDLE_IS_LOCAL", 0)))

    if args.is_local:
        logger.info("run local training")
        main_program = fluid.default_main_program()
        # A local run is a single trainer with id 0.
        train_loop(args, main_program, py_reader, loss, auc_var, batch_auc_var,
                   1, 0)
        return

    logger.info("run dist training")
    # Honour --no_split_var / --async_mode; with both flags left unset this
    # is equivalent to the transpiler defaults (split variables, sync mode).
    config = fluid.DistributeTranspilerConfig()
    config.slice_var_up = not args.no_split_var
    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        args.trainer_id,
        pservers=args.endpoints,
        trainers=args.trainers,
        sync_mode=not args.async_mode)

    role = args.role.upper()
    if role == "PSERVER":
        logger.info("run pserver")
        prog = t.get_pserver_program(args.current_endpoint)
        startup = t.get_startup_program(
            args.current_endpoint, pserver_program=prog)
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup)
        exe.run(prog)
    elif role == "TRAINER":
        logger.info("run trainer")
        train_prog = t.get_trainer_program()
        train_loop(args, train_prog, py_reader, loss, auc_var,
                   batch_auc_var, args.trainers, args.trainer_id)
    else:
        raise ValueError(
            'role must be either TRAINER or PSERVER (set it with --role or '
            'the TRAINING_ROLE environment variable), got: %r' % args.role)


def get_cards(args):
    """Return the thread and CPU counts used to label the CE kpi lines.

    Args:
        args: unused; kept so existing callers keep working.

    Returns:
        tuple: ``(threads_num, cpu_num)`` as ints, read from the
        ``NUM_THREADS`` and ``CPU_NUM`` environment variables. A variable
        that is unset or set to an empty string counts as 1.

    Raises:
        ValueError: if a variable holds a non-empty, non-integer value.
    """
    # `or 1` also covers a variable that is set but empty, which int()
    # would otherwise reject.
    threads_num = os.environ.get('NUM_THREADS') or 1
    cpu_num = os.environ.get('CPU_NUM') or 1
    return int(threads_num), int(cpu_num)


# Script entry point: run training only when this file is executed directly.
if __name__ == '__main__':
    train()