dist_train.py 17.7 KB
Newer Older
Y
Yancey1989 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import time
import os
import traceback
T
typhoonzero 已提交
19 20
import functools
import subprocess
Y
Yancey1989 已提交
21 22 23 24 25 26

import numpy as np

import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
27
import six
Y
Yancey1989 已提交
28 29 30
import sys
sys.path.append("..")
import models
T
typhoonzero 已提交
31
import utils
Y
Yancey1989 已提交
32
from reader import train, val
T
typhoonzero 已提交
33 34 35 36
from utility import add_arguments, print_arguments
from batch_merge import copyback_repeat_bn_params, append_bn_repeat_init_op
from dist_utils import pserver_prepare, nccl2_prepare
from env import dist_env
Y
Yancey1989 已提交
37

T
typhoonzero 已提交
38
def parse_args():
    """Build the command-line parser for this script and parse the arguments.

    Every option is registered through the project helper ``add_arguments``
    (bound to the parser with ``functools.partial``) and is declared as
    ``add_arg(name, type, default, help)``.

    Returns:
        The namespace returned by ``parser.parse_args()``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    add_arg = functools.partial(add_arguments, argparser=parser)
    # yapf: disable
    add_arg('batch_size',       int,   256,                  "Minibatch size.")
    add_arg('use_gpu',          bool,  True,                 "Whether to use GPU or not.")
    add_arg('total_images',     int,   1281167,              "Training image number.")
    add_arg('num_epochs',       int,   120,                  "number of epochs.")
    add_arg('class_dim',        int,   1000,                 "Class number.")
    add_arg('image_shape',      str,   "3,224,224",          "input image size")
    add_arg('model_save_dir',   str,   "output",             "model save directory")
    add_arg('with_mem_opt',     bool,  False,                "Whether to use memory optimization or not.")
    add_arg('pretrained_model', str,   None,                 "Whether to use pretrained model.")
    add_arg('checkpoint',       str,   None,                 "Whether to resume checkpoint.")
    add_arg('lr',               float, 0.1,                  "set learning rate.")
    add_arg('lr_strategy',      str,   "piecewise_decay",    "Set the learning rate decay strategy.")
    add_arg('model',            str,   "DistResNet",         "Set the network to use.")
    add_arg('enable_ce',        bool,  False,                "If set True, enable continuous evaluation job.")
    add_arg('data_dir',         str,   "./data/ILSVRC2012",  "The ImageNet dataset root dir.")
    add_arg('model_category',   str,   "models",             "Whether to use models_name or not, valid value:'models','models_name'" )
    add_arg('fp16',             bool,  False,                "Enable half precision training with fp16." )
    add_arg('scale_loss',       float, 1.0,                  "Scale loss for fp16." )
    add_arg('reduce_master_grad', bool, False,               "Whether to allreduce fp32 gradients." )
    # for distributed
    add_arg('update_method',      str,  "local",            "Can be local, pserver, nccl2.")
    add_arg('multi_batch_repeat', int,  1,                  "Batch merge repeats.")
    add_arg('start_test_pass',    int,  0,                  "Start test after x passes.")
    add_arg('num_threads',        int,  8,                  "Use num_threads to run the fluid program.")
    add_arg('split_var',          bool, True,               "Split params on pserver.")
    add_arg('async_mode',         bool, False,              "Async distributed training, only for pserver mode.")
    add_arg('reduce_strategy',    str,  "allreduce",        "Choose from reduce or allreduce.")
    add_arg('skip_unbalanced_data', bool, False,            "Stop the pass early if data is not balanced across nodes.")
    add_arg('enable_sequential_execution', bool, False,     "Whether to enable sequential execution in the build strategy.")
    # for dgc
    add_arg('enable_dgc', bool, False,                      "Whether to use the DGC momentum optimizer.")
    add_arg('rampup_begin_step', int, 5008,                 "Step at which DGC ramp-up begins.")
    # yapf: enable
    args = parser.parse_args()
    return args

T
typhoonzero 已提交
78 79 80 81 82 83
def get_device_num():
    """Return the number of devices the current process should count on.

    Resolution order:
      1. ``CPU_NUM`` environment variable, when set and non-empty.
      2. Number of non-empty, comma-separated entries in
         ``CUDA_VISIBLE_DEVICES``.
      3. Number of lines printed by ``nvidia-smi -L``.

    Returns:
        int: the device count.

    Raises:
        ValueError: if ``CPU_NUM`` is set but is not an integer literal.
        OSError, subprocess.CalledProcessError: if ``nvidia-smi`` has to be
            queried and cannot be run successfully.
    """
    cpu_num = os.getenv("CPU_NUM")
    if cpu_num:
        return int(cpu_num)
    visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
    if visible_device:
        # Skip empty entries so a value such as "0,1," is counted as two
        # devices rather than three.
        device_ids = [d for d in visible_device.split(',') if d.strip()]
        if device_ids:
            return len(device_ids)
    # One line is printed per device, so count the newlines.
    return subprocess.check_output(['nvidia-smi', '-L']).decode().count('\n')

T
typhoonzero 已提交
88
def prepare_reader(is_train, pyreader, args, pass_id=0):
    """Attach a batched data reader to ``pyreader``.

    Args:
        is_train: when true, use the training reader; otherwise use the
            validation reader.
        pyreader: object exposing ``decorate_paddle_reader``; it receives the
            batched reader.
        args: parsed arguments; ``data_dir``, ``update_method`` and
            ``batch_size`` are read.
        pass_id: forwarded to the training reader as ``pass_id_as_seed``.
    """
    # NOTE: always set reader infinite when nccl2 mode to balance data
    # between ranks
    is_infinite = (args.update_method == "nccl2")
    if is_train:
        reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id,
                       infinite=is_infinite)
        # Floor division keeps the batch size an int on Python 3 as well
        # (plain "/" would produce a float there).
        bs = args.batch_size // get_device_num()
    else:
        reader = val(data_dir=args.data_dir)
        # Validation always uses a fixed batch size.
        bs = 16
    pyreader.decorate_paddle_reader(
        paddle.batch(
            reader,
            batch_size=bs))

def build_program(is_train, main_prog, startup_prog, args):
    """Build the network (and, for training, the optimizer) into ``main_prog``.

    The program reads ``(image, label)`` from a ``py_reader``, runs the model
    selected by ``args.model``, and computes a softmax cross-entropy loss plus
    top-1 / top-5 batch accuracy. When ``is_train`` is true, a Momentum
    optimizer (or the DGC momentum optimizer when ``args.enable_dgc`` is set)
    with a warmed-up piecewise-decay learning rate is added; with
    ``args.fp16`` the update goes through master parameters created by
    ``utils.create_master_params_grads``.

    Args:
        is_train: build the training program (with optimizer) when true,
            the evaluation program otherwise.
        main_prog: program that receives the network ops.
        startup_prog: program that receives the initialization ops.
        args: parsed arguments; ``args.dist_env`` must already be set.

    Returns:
        tuple: ``(pyreader, avg_cost, batch_acc1, batch_acc5)``.
    """
    pyreader = None
    class_dim = args.class_dim
    image_shape = [int(m) for m in args.image_shape.split(",")]

    trainer_count = args.dist_env["num_trainers"]
    device_num_per_worker = get_device_num()
    with fluid.program_guard(main_prog, startup_prog):
        pyreader = fluid.layers.py_reader(
            capacity=16,
            shapes=([-1] + image_shape, (-1, 1)),
            dtypes=('float32', 'int64'),
            name="train_reader" if is_train else "test_reader",
            use_double_buffer=True)
        with fluid.unique_name.guard():
            image, label = fluid.layers.read_file(pyreader)
            if args.fp16:
                image = fluid.layers.cast(image, "float16")
            model_def = models.__dict__[args.model](layers=50, is_train=is_train)
            predict = model_def.net(image, class_dim=class_dim)
            cost, pred = fluid.layers.softmax_with_cross_entropy(predict, label, return_softmax=True)
            # The loss is only multiplied by scale_loss when it is above 1.
            if args.scale_loss > 1:
                avg_cost = fluid.layers.mean(x=cost) * float(args.scale_loss)
            else:
                avg_cost = fluid.layers.mean(x=cost)

            batch_acc1 = fluid.layers.accuracy(input=pred, label=label, k=1)
            batch_acc5 = fluid.layers.accuracy(input=pred, label=label, k=5)

            optimizer = None
            if is_train:
                start_lr = args.lr
                end_lr = args.lr * trainer_count * args.multi_batch_repeat
                if os.getenv("FLAGS_selected_gpus"):
                    # in multi process mode, "trainer_count" will be total devices
                    # in the whole cluster, and we need to scale by the number
                    # of nodes instead.
                    end_lr /= device_num_per_worker

                total_images = args.total_images / trainer_count
                if os.getenv("FLAGS_selected_gpus"):
                    step = int(total_images / (args.batch_size / device_num_per_worker * args.multi_batch_repeat) + 1)
                else:
                    step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
                warmup_steps = step * 5  # warmup 5 passes
                epochs = [30, 60, 80]
                bd = [step * e for e in epochs]
                base_lr = end_lr
                # One value per interval: 10x decay at every boundary.
                lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
                print("start lr: %s, end lr: %s, decay boundaries: %s" % (
                    start_lr,
                    end_lr,
                    bd
                ))

                # NOTE: we put weight decay in layers config, and remove
                # weight decay on bn layers, so don't add weight decay in
                # optimizer config.
                optimizer = fluid.optimizer.Momentum(
                    learning_rate=utils.learning_rate.lr_warmup(
                        fluid.layers.piecewise_decay(
                            boundaries=bd, values=lr),
                        warmup_steps, start_lr, end_lr),
                    momentum=0.9)

                # NOTE(review): when DGC is enabled the Momentum optimizer
                # built above is discarded and its learning-rate schedule is
                # constructed a second time here; kept as-is to avoid
                # changing the generated program.
                if args.enable_dgc:
                    optimizer = fluid.optimizer.DGCMomentumOptimizer(
                        learning_rate=utils.learning_rate.lr_warmup(
                            fluid.layers.piecewise_decay(
                                boundaries=bd, values=lr),
                            warmup_steps, start_lr, end_lr),
                        momentum=0.9,
                        sparsity=[0.999, 0.999],
                        rampup_begin_step=args.rampup_begin_step)

                if args.fp16:
                    params_grads = optimizer.backward(avg_cost)
                    master_params_grads = utils.create_master_params_grads(
                        params_grads, main_prog, startup_prog, args.scale_loss,
                        reduce_master_grad = args.reduce_master_grad)
                    optimizer.apply_gradients(master_params_grads)
                    utils.master_param_to_train_param(master_params_grads, params_grads, main_prog)
                else:
                    optimizer.minimize(avg_cost)

    # prepare reader for current program
    prepare_reader(is_train, pyreader, args)

    return pyreader, avg_cost, batch_acc1, batch_acc5


def test_single(exe, test_prog, args, pyreader, fetch_list):
    """Run ``test_prog`` with ``exe`` until the reader is exhausted.

    Args:
        exe: executor whose ``run(program=..., fetch_list=...)`` is called.
        test_prog: the evaluation program.
        args: parsed arguments; ``batch_size`` is used as the metric weight.
        pyreader: reader feeding the program; started here and reset once
            ``EOFException`` is raised.
        fetch_list: names to fetch, in the order (loss, acc1, acc5).

    Returns:
        tuple: ``(mean loss, mean top-1 accuracy, mean top-5 accuracy)``.
    """
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(program=test_prog, fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            # NOTE(review): every batch is weighted by args.batch_size, not by
            # the number of samples actually in the batch - confirm this is
            # intended.
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            # End of data: reset so the reader can be started again later.
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())
Y
Yancey1989 已提交
213

T
typhoonzero 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
def test_parallel(exe, test_prog, args, pyreader, fetch_list):
    """Evaluate with an executor that already owns its program.

    Unlike ``exe.run(program=...)`` style evaluation, ``exe.run`` is called
    with only ``fetch_list``; ``test_prog`` is accepted but not used here.

    Args:
        exe: executor whose ``run(fetch_list=...)`` is called.
        test_prog: unused.
        args: parsed arguments; ``batch_size`` is used as the metric weight.
        pyreader: reader feeding the program; started here and reset once
            ``EOFException`` is raised.
        fetch_list: names to fetch, in the order (loss, acc1, acc5).

    Returns:
        tuple: ``(mean loss, mean top-1 accuracy, mean top-5 accuracy)``.
    """
    top1_metric = fluid.metrics.Accuracy()
    top5_metric = fluid.metrics.Accuracy()
    loss_history = []
    pyreader.start()
    try:
        # Keep pulling batches; the reader signals exhaustion by raising.
        while True:
            fetched = exe.run(fetch_list=fetch_list)
            loss_history.append(fetched[0])
            top1_metric.update(
                value=np.array(fetched[1]), weight=args.batch_size)
            top5_metric.update(
                value=np.array(fetched[2]), weight=args.batch_size)
    except fluid.core.EOFException:
        pyreader.reset()
    mean_loss = np.mean(np.array(loss_history))
    mean_top1 = np.mean(top1_metric.eval())
    mean_top5 = np.mean(top5_metric.eval())
    return mean_loss, mean_top1, mean_top5


T
typhoonzero 已提交
232 233 234 235
def run_pserver(train_prog, startup_prog):
    """Run the startup program, then the train program, on a CPU executor."""
    cpu_executor = fluid.Executor(fluid.CPUPlace())
    for program in (startup_prog, train_prog):
        cpu_executor.run(program)
Y
Yancey1989 已提交
236

T
typhoonzero 已提交
237 238 239 240
def train_parallel(args):
    """Build the train/test programs and run the full training loop.

    Steps, in order:
      1. Build the train and test programs (they share one startup program).
      2. Apply the ``pserver`` / ``nccl2`` preparation for the chosen
         ``args.update_method``.
      3. If this process has the ``PSERVER`` role, serve and exit.
      4. Run the startup program, optionally load a checkpoint, and create
         the ``ParallelExecutor`` used for training.
      5. For every epoch: train, then (from ``args.start_test_pass`` on)
         evaluate and save persistables under
         ``<model_save_dir>/<model>/<pass_id>``.

    Args:
        args: parsed arguments with ``dist_env`` already attached.
    """
    train_prog = fluid.Program()
    test_prog = fluid.Program()
    startup_prog = fluid.Program()

    train_pyreader, train_cost, train_acc1, train_acc5 = build_program(True, train_prog, startup_prog, args)
    test_pyreader, test_cost, test_acc1, test_acc5 = build_program(False, test_prog, startup_prog, args)

    if args.update_method == "pserver":
        train_prog, startup_prog = pserver_prepare(args, train_prog, startup_prog)
    elif args.update_method == "nccl2":
        nccl2_prepare(args, startup_prog, main_prog=train_prog)

    if args.dist_env["training_role"] == "PSERVER":
        run_pserver(train_prog, startup_prog)
        # sys.exit instead of the interactive-only ``exit`` helper, which is
        # injected by the ``site`` module and is not guaranteed to exist.
        sys.exit(0)

    if args.use_gpu:
        # NOTE: for multi process mode: one process per GPU device.
        gpu_id = 0
        if os.getenv("FLAGS_selected_gpus"):
            gpu_id = int(os.getenv("FLAGS_selected_gpus"))
    place = core.CUDAPlace(gpu_id) if args.use_gpu else core.CPUPlace()

    startup_exe = fluid.Executor(place)
    if args.multi_batch_repeat > 1:
        append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
    startup_exe.run(startup_prog)

    if args.checkpoint:
        fluid.io.load_persistables(startup_exe, args.checkpoint, main_program=train_prog)

    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = args.num_threads
    # num_iteration_per_drop_scope indicates how
    # many iterations to clean up the temp variables which
    # is generated during execution. It may make the execution faster,
    # because the temp variable's shape are the same between two iterations.
    strategy.num_iteration_per_drop_scope = 30

    build_strategy = fluid.BuildStrategy()
    build_strategy.enable_inplace = False
    build_strategy.memory_optimize = False
    build_strategy.enable_sequential_execution = bool(args.enable_sequential_execution)

    if args.reduce_strategy == "reduce":
        build_strategy.reduce_strategy = fluid.BuildStrategy(
        ).ReduceStrategy.Reduce
    else:
        build_strategy.reduce_strategy = fluid.BuildStrategy(
        ).ReduceStrategy.AllReduce

    if args.update_method == "pserver" or args.update_method == "local":
        # parameter server mode distributed training, merge
        # gradients on local server, do not initialize
        # ParallelExecutor with multi server all-reduce mode.
        num_trainers = 1
        trainer_id = 0
    else:
        num_trainers = args.dist_env["num_trainers"]
        trainer_id = args.dist_env["trainer_id"]
        # Set this to let build_strategy to add "allreduce_deps_pass" automatically
        build_strategy.num_trainers = num_trainers
        build_strategy.trainer_id = trainer_id

    if args.multi_batch_repeat > 1:
        pass_builder = build_strategy._finalize_strategy_and_create_passes()
        mypass = pass_builder.insert_pass(
            len(pass_builder.all_passes()) - 4, "multi_batch_merge_pass")
        mypass.set("num_repeats", args.multi_batch_repeat)

    exe = fluid.ParallelExecutor(
        True,
        train_cost.name,
        main_program=train_prog,
        exec_strategy=strategy,
        build_strategy=build_strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    # Uncomment below lines to use ParallelExecutor to run test.
    # test_exe = fluid.ParallelExecutor(
    #     True,
    #     main_program=test_prog,
    #     share_vars_from=exe,
    #     scope=fluid.global_scope().new_scope()
    # )

    over_all_start = time.time()
    fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
    # 1. MP mode, batch size for current process should be args.batch_size / GPUs
    # 2. SP/PG mode, batch size for each process should be original args.batch_size
    if os.getenv("FLAGS_selected_gpus"):
        steps_per_pass = args.total_images / (args.batch_size / get_device_num()) / args.dist_env["num_trainers"]
    else:
        steps_per_pass = args.total_images / args.batch_size / args.dist_env["num_trainers"]

    for pass_id in range(args.num_epochs):
        num_samples = 0
        start_time = time.time()
        batch_id = 1
        # use pass_id+1 as per pass global shuffle for distributed training
        prepare_reader(True, train_pyreader, args, pass_id + 1)
        train_pyreader.start()
        while True:
            try:
                # Fetching results is only done every 30th batch; the other
                # batches run with an empty fetch list.
                if batch_id % 30 == 0:
                    fetch_ret = exe.run(fetch_list)
                    fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                    print("Pass [%d/%d], batch [%d/%d], loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
                        (pass_id, args.num_epochs, batch_id, steps_per_pass, fetched_data[0], fetched_data[1],
                         fetched_data[2], (time.time()-start_time) / batch_id))
                else:
                    exe.run([])
            except fluid.core.EOFException:
                break
            except fluid.core.EnforceNotMet:
                # Best effort: report the failure and end this pass.
                traceback.print_exc()
                break
            num_samples += args.batch_size
            batch_id += 1
            if (args.skip_unbalanced_data or args.update_method == "nccl2") and batch_id >= steps_per_pass:
                break

        print_train_time(start_time, time.time(), num_samples)
        train_pyreader.reset()
        if pass_id >= args.start_test_pass:
            if args.multi_batch_repeat > 1:
                copyback_repeat_bn_params(train_prog)
            test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
            test_ret = test_single(startup_exe, test_prog, args, test_pyreader,test_fetch_list)
            # NOTE: switch to below line if you use ParallelExecutor to run test.
            # test_ret = test_parallel(test_exe, test_prog, args, test_pyreader,test_fetch_list)
            print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
                  (pass_id, test_ret[0], test_ret[1], test_ret[2]))
            # Join the components instead of gluing them with '/', so the
            # path is built with the platform separator.
            model_path = os.path.join(args.model_save_dir, args.model,
                                      str(pass_id))
            print("saving model to ", model_path)
            if not os.path.isdir(model_path):
                os.makedirs(model_path)
            fluid.io.save_persistables(startup_exe, model_path, main_program=train_prog)
    startup_exe.close()
    print("total train time: ", time.time() - over_all_start)


def print_train_time(start_time, end_time, num_samples):
    """Print the sample count, elapsed time and throughput of one pass.

    Args:
        start_time: pass start timestamp, in seconds.
        end_time: pass end timestamp, in seconds.
        num_samples: number of samples processed during the pass.
    """
    train_elapsed = end_time - start_time
    # Guard against a zero-length interval (e.g. coarse clock resolution or
    # a pass that stopped immediately); the rate is reported as 0 then.
    if train_elapsed > 0:
        examples_per_sec = num_samples / train_elapsed
    else:
        examples_per_sec = 0.0
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
          (num_samples, train_elapsed, examples_per_sec))


def print_paddle_envs():
    """Print every environment variable whose name contains ``PADDLE_``.

    The listing is wrapped between a header line and a footer line.
    """
    print('----------- Configuration envs -----------')
    for k in os.environ:
        if "PADDLE_" in k:
            print("ENV %s:%s" % (k, os.environ[k]))
    print('------------------------------------------------')


def main():
    """Script entry point: parse arguments, print the configuration, train."""
    args = parse_args()
    print_arguments(args)
    print_paddle_envs()
    # Attach the result of dist_env() so later code can read it through
    # args.dist_env (e.g. "num_trainers", "trainer_id", "training_role").
    args.dist_env = dist_env()
    train_parallel(args)
Y
Yancey1989 已提交
403 404 405

# Run training only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
T
typhoonzero 已提交
406