# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

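"""Distributed image classification training with PaddlePaddle Fluid.

Supports local, parameter-server (pserver) and NCCL2 collective training,
with optional FP16 and Deep Gradient Compression (DGC).
"""
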
import argparse
import time
import os
import traceback
import functools
import subprocess

import numpy as np

import paddle
import paddle.fluid as fluid
import six
import sys
sys.path.append("..")
import models
import utils
from reader import train, val
from utility import add_arguments, print_arguments
from batch_merge import copyback_repeat_bn_params, append_bn_repeat_init_op
from dist_utils import pserver_prepare, nccl2_prepare
from env import dist_env


def parse_args():
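    """Build and parse the command-line arguments for distributed training."""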
    parser = argparse.ArgumentParser(description=__doc__)
    add_arg = functools.partial(add_arguments, argparser=parser)
    # yapf: disable
    add_arg('batch_size',       int,   256,                  "Minibatch size.")
    add_arg('use_gpu',          bool,  True,                 "Whether to use GPU or not.")
    add_arg('total_images',     int,   1281167,              "Training image number.")
    add_arg('num_epochs',       int,   120,                  "Number of epochs.")
    add_arg('class_dim',        int,   1000,                 "Class number.")
    add_arg('image_shape',      str,   "3,224,224",          "Input image size.")
    add_arg('model_save_dir',   str,   "output",             "Model save directory.")
    add_arg('with_mem_opt',     bool,  False,                "Whether to use memory optimization or not.")
    add_arg('pretrained_model', str,   None,                 "Path to a pretrained model.")
    add_arg('checkpoint',       str,   None,                 "Path to a checkpoint to resume from.")
    add_arg('lr',               float, 0.1,                  "Set the learning rate.")
    add_arg('lr_strategy',      str,   "piecewise_decay",    "Set the learning rate decay strategy.")
    add_arg('model',            str,   "DistResNet",         "Set the network to use.")
    add_arg('enable_ce',        bool,  False,                "If set True, enable continuous evaluation job.")
    add_arg('data_dir',         str,   "./data/ILSVRC2012",  "The ImageNet dataset root dir.")
    add_arg('model_category',   str,   "models",             "Which module to load models from, valid values: 'models', 'models_name'.")
    add_arg('fp16',             bool,  False,                "Enable half precision training with fp16.")
    add_arg('scale_loss',       float, 1.0,                  "Scale loss for fp16.")
    add_arg('reduce_master_grad', bool, False,               "Whether to allreduce fp32 gradients.")
    # for distributed
    add_arg('update_method',      str,  "local",            "Can be local, pserver, nccl2.")
    add_arg('multi_batch_repeat', int,  1,                  "Batch merge repeats.")
    add_arg('start_test_pass',    int,  0,                  "Start test after x passes.")
    add_arg('num_threads',        int,  8,                  "Use num_threads to run the fluid program.")
    add_arg('split_var',          bool, True,               "Split params on pserver.")
    add_arg('async_mode',         bool, False,              "Async distributed training, only for pserver mode.")
    add_arg('reduce_strategy',    str,  "allreduce",        "Choose from reduce or allreduce.")
    add_arg('enable_sequential_execution', bool, False,            "Enable sequential execution, executing ops in program order.")
    # for DGC (Deep Gradient Compression)
    add_arg('enable_dgc', bool, False,            "Enable Deep Gradient Compression (DGC).")
    add_arg('rampup_begin_step', int, 5008,            "The step at which DGC sparsity ramp-up begins.")
    # yapf: enable
    args = parser.parse_args()
    return args


def get_device_num():
    if os.getenv("CPU_NUM"):
        return int(os.getenv("CPU_NUM"))
    return fluid.core.get_cuda_device_count()


def prepare_reader(is_train, pyreader, args, pass_id=1):
    # NOTE: always use infinite reader for dist training
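    # An exhausted reader on one trainer would stall the synchronized
    # all-reduce on every other trainer, so the train reader loops forever
    # and each pass is bounded by steps_per_pass in the training loop.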
    if is_train:
        reader = train(
            data_dir=args.data_dir, pass_id_as_seed=pass_id, infinite=True)
    else:
        reader = val(data_dir=args.data_dir)
    if is_train:
        bs = args.batch_size // get_device_num()
    else:
        bs = 16
    pyreader.decorate_paddle_reader(fluid.io.batch(reader, batch_size=bs))


def build_program(is_train, main_prog, startup_prog, args):
    pyreader = None
    class_dim = args.class_dim
    image_shape = [int(m) for m in args.image_shape.split(",")]

    trainer_count = args.dist_env["num_trainers"]
    device_num_per_worker = get_device_num()
    with fluid.program_guard(main_prog, startup_prog):
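        # py_reader feeds data asynchronously; use_double_buffer pre-loads
        # the next batch so host-to-device copy overlaps with computation.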
        pyreader = fluid.layers.py_reader(
            capacity=16,
            shapes=([-1] + image_shape, (-1, 1)),
            dtypes=('float32', 'int64'),
            name="train_reader" if is_train else "test_reader",
            use_double_buffer=True)
        with fluid.unique_name.guard():
            image, label = fluid.layers.read_file(pyreader)
            if args.fp16:
                image = fluid.layers.cast(image, "float16")
            model_def = models.__dict__[args.model](layers=50,
                                                    is_train=is_train)
            predict = model_def.net(image, class_dim=class_dim)
            cost, pred = fluid.layers.softmax_with_cross_entropy(
                predict, label, return_softmax=True)
            if args.scale_loss > 1:
                avg_cost = fluid.layers.mean(x=cost) * float(args.scale_loss)
            else:
                avg_cost = fluid.layers.mean(x=cost)

            batch_acc1 = fluid.layers.accuracy(input=pred, label=label, k=1)
            batch_acc5 = fluid.layers.accuracy(input=pred, label=label, k=5)

            optimizer = None
            if is_train:
                start_lr = args.lr
                end_lr = args.lr * trainer_count * args.multi_batch_repeat
                if os.getenv("FLAGS_selected_gpus"):
                    # In multi-process mode, "trainer_count" is the total
                    # device count of the whole cluster, so scale the end
                    # learning rate back down by the device count per node.
                    end_lr /= device_num_per_worker

                total_images = args.total_images // trainer_count
                if os.getenv("FLAGS_selected_gpus"):
                    step = int(total_images /
                               (args.batch_size / device_num_per_worker *
                                args.multi_batch_repeat) + 1)
                else:
                    step = int(total_images / (args.batch_size *
                                               args.multi_batch_repeat) + 1)
                warmup_steps = step * 5  # warmup 5 passes
                epochs = [30, 60, 80]
                bd = [step * e for e in epochs]
                base_lr = end_lr
                lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
                print("start lr: %s, end lr: %s, decay boundaries: %s" %
                      (start_lr, end_lr, bd))
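                # Illustration: with the defaults (lr=0.1, repeat=1) on,
                # say, 4 trainers, the rate warms up from 0.1 to 0.4 over
                # the first 5 passes, then decays 10x at passes 30, 60 and
                # 80 (the linear scaling rule for large-batch SGD).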

                # NOTE: weight decay is configured per layer and removed
                # from the bn layers, so do not add weight decay in the
                # optimizer config here.
                optimizer = fluid.optimizer.Momentum(
                    learning_rate=utils.learning_rate.lr_warmup(
                        fluid.layers.piecewise_decay(
                            boundaries=bd, values=lr),
                        warmup_steps,
                        start_lr,
                        end_lr),
                    momentum=0.9)

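                # DGC sends only roughly the largest 0.1% of gradient
                # values each step (sparsity 0.999) once rampup_begin_step
                # is reached, trading gradient precision for much less
                # communication.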
                if args.enable_dgc:
                    optimizer = fluid.optimizer.DGCMomentumOptimizer(
                        learning_rate=utils.learning_rate.lr_warmup(
                            fluid.layers.piecewise_decay(
                                boundaries=bd, values=lr),
                            warmup_steps,
                            start_lr,
                            end_lr),
                        momentum=0.9,
                        sparsity=[0.999, 0.999],
                        rampup_begin_step=args.rampup_begin_step)

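                # FP16 keeps an FP32 master copy of each parameter:
                # gradients are computed in FP16, applied to the FP32
                # master weights, then copied back to the FP16 params.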
                if args.fp16:
                    params_grads = optimizer.backward(avg_cost)
                    master_params_grads = utils.create_master_params_grads(
                        params_grads,
                        main_prog,
                        startup_prog,
                        args.scale_loss,
                        reduce_master_grad=args.reduce_master_grad)
                    optimizer.apply_gradients(master_params_grads)
                    utils.master_param_to_train_param(master_params_grads,
                                                      params_grads, main_prog)
                else:
                    optimizer.minimize(avg_cost)

    # prepare reader for current program
    prepare_reader(is_train, pyreader, args)

    return pyreader, avg_cost, batch_acc1, batch_acc5


def test_single(exe, test_prog, args, pyreader, fetch_list):
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(program=test_prog, fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())


def test_parallel(exe, test_prog, args, pyreader, fetch_list):
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())


def run_pserver(train_prog, startup_prog):
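    # The parameter server runs the transpiled program on CPU and blocks
    # here serving parameter updates until the process is killed.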
    server_exe = fluid.Executor(fluid.CPUPlace())
    server_exe.run(startup_prog)
    server_exe.run(train_prog)


def train_parallel(args):
    train_prog = fluid.Program()
    test_prog = fluid.Program()
    startup_prog = fluid.Program()

    train_pyreader, train_cost, train_acc1, train_acc5 = build_program(
        True, train_prog, startup_prog, args)
    test_pyreader, test_cost, test_acc1, test_acc5 = build_program(
        False, test_prog, startup_prog, args)

    if args.update_method == "pserver":
        train_prog, startup_prog = pserver_prepare(args, train_prog,
                                                   startup_prog)
    elif args.update_method == "nccl2":
        nccl2_prepare(args, startup_prog, main_prog=train_prog)

    if args.dist_env["training_role"] == "PSERVER":
        run_pserver(train_prog, startup_prog)
        exit(0)

    if args.use_gpu:
        # NOTE: in multi-process mode, one process is launched per GPU device.
        gpu_id = 0
        if os.getenv("FLAGS_selected_gpus"):
            gpu_id = int(os.getenv("FLAGS_selected_gpus"))
    place = fluid.CUDAPlace(gpu_id) if args.use_gpu else fluid.CPUPlace()

    startup_exe = fluid.Executor(place)
    if args.multi_batch_repeat > 1:
        append_bn_repeat_init_op(train_prog, startup_prog,
                                 args.multi_batch_repeat)
    startup_exe.run(startup_prog)

    if args.checkpoint:
        fluid.io.load_persistables(
            startup_exe, args.checkpoint, main_program=train_prog)

    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = args.num_threads
    # num_iteration_per_drop_scope indicates how many iterations to run
    # before cleaning up the temporary variables generated during
    # execution. It may make execution faster, because the temporary
    # variables keep the same shapes between iterations.
    strategy.num_iteration_per_drop_scope = 30

    build_strategy = fluid.BuildStrategy()
    build_strategy.memory_optimize = False
    build_strategy.enable_sequential_execution = bool(
        args.enable_sequential_execution)

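    # "reduce" shards gradient aggregation: each device aggregates a slice
    # of the parameters and broadcasts the result, which can balance load
    # better than all-reduce on some models.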
    if args.reduce_strategy == "reduce":
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
    else:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

    if args.update_method == "pserver" or args.update_method == "local":
        # In parameter-server (or local) mode, gradients are merged
        # locally, so do not initialize ParallelExecutor in multi-server
        # all-reduce mode.
        num_trainers = 1
        trainer_id = 0
    else:
        num_trainers = args.dist_env["num_trainers"]
        trainer_id = args.dist_env["trainer_id"]
        # Set these so that build_strategy adds "allreduce_deps_pass" automatically.
        build_strategy.num_trainers = num_trainers
        build_strategy.trainer_id = trainer_id

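    # multi_batch_merge_pass rewrites the program so each device runs
    # num_repeats mini-batches and accumulates gradients before the
    # update, simulating a batch num_repeats times larger.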
    if args.multi_batch_repeat > 1:
        pass_builder = build_strategy._finalize_strategy_and_create_passes()
        mypass = pass_builder.insert_pass(
            len(pass_builder.all_passes()) - 4, "multi_batch_merge_pass")
        mypass.set("num_repeats", args.multi_batch_repeat)

    exe = fluid.ParallelExecutor(
        True,
        train_cost.name,
        main_program=train_prog,
        exec_strategy=strategy,
        build_strategy=build_strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    # Uncomment the lines below to run tests with ParallelExecutor.
    # test_exe = fluid.ParallelExecutor(
    #     True,
    #     main_program=test_prog,
    #     share_vars_from=exe,
    #     scope=fluid.global_scope().new_scope()
    # )

    over_all_start = time.time()
    fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
    # 1. In multi-process (MP) mode, the batch size for the current
    #    process is args.batch_size / number of GPUs.
    # 2. In single-process (SP/PG) mode, each process uses the original
    #    args.batch_size.
    if os.getenv("FLAGS_selected_gpus"):
        steps_per_pass = int(args.total_images / (
            args.batch_size / get_device_num()) / args.dist_env["num_trainers"])
    else:
        steps_per_pass = int(args.total_images / args.batch_size /
                             args.dist_env["num_trainers"])

    for pass_id in range(args.num_epochs):
        num_samples = 0
        start_time = time.time()
        batch_id = 1
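        # The infinite train reader is started once on the first pass and
        # only reset after all epochs finish.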
        if pass_id == 0:
            train_pyreader.start()
        while True:
            try:
                if batch_id % 30 == 0:
                    fetch_ret = exe.run(fetch_list)
                    fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                    print(
                        "Pass [%d/%d], batch [%d/%d], loss %s, acc1: %s, acc5: %s, avg batch time %.4f"
                        % (pass_id, args.num_epochs, batch_id, steps_per_pass,
                           fetched_data[0], fetched_data[1], fetched_data[2],
                           (time.time() - start_time) / batch_id))
                else:
                    fetch_ret = exe.run([])
            except fluid.core.EOFException:
                break
            except fluid.core.EnforceNotMet:
                traceback.print_exc()
                break
            num_samples += args.batch_size
            batch_id += 1
            if batch_id >= steps_per_pass:
                break

        print_train_time(start_time, time.time(), num_samples)
        if pass_id >= args.start_test_pass:
            if args.multi_batch_repeat > 1:
                copyback_repeat_bn_params(train_prog)
            test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
            test_ret = test_single(startup_exe, test_prog, args, test_pyreader,
                                   test_fetch_list)
            # NOTE: switch to the line below to run tests with ParallelExecutor.
            # test_ret = test_parallel(test_exe, test_prog, args,
            #                          test_pyreader, test_fetch_list)
            print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
                  (pass_id, test_ret[0], test_ret[1], test_ret[2]))
            model_path = os.path.join(args.model_save_dir, args.model,
                                      str(pass_id))
            print("saving model to ", model_path)
            if not os.path.isdir(model_path):
                os.makedirs(model_path)
            fluid.io.save_persistables(
                startup_exe, model_path, main_program=train_prog)
    train_pyreader.reset()
    startup_exe.close()
    print("total train time: ", time.time() - over_all_start)


def print_train_time(start_time, end_time, num_samples):
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
          (num_samples, train_elapsed, examples_per_sec))


def print_paddle_envs():
    print('----------- Configuration envs -----------')
    for k in os.environ:
        if "PADDLE_" in k:
            print("ENV %s:%s" % (k, os.environ[k]))
    print('------------------------------------------------')


def main():
    args = parse_args()
    print_arguments(args)
    print_paddle_envs()
    args.dist_env = dist_env()
    train_parallel(args)


if __name__ == "__main__":
    main()