# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
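
"""Distributed ImageNet classification training with PaddlePaddle Fluid.

Supports local, parameter-server (pserver) and NCCL2 update methods,
optional FP16 training with loss scaling, batch merging (gradient
accumulation) and Deep Gradient Compression (DGC).
"""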

import argparse
import time
import os
import traceback
import functools
import subprocess

import numpy as np

import paddle
import paddle.fluid as fluid
import six
import sys
sys.path.append("..")
import models
import utils
from reader import train, val
from utility import add_arguments, print_arguments
from batch_merge import copyback_repeat_bn_params, append_bn_repeat_init_op
from dist_utils import pserver_prepare, nccl2_prepare
from env import dist_env


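# A typical launch might look like this (flags are defined below; the values
# are illustrative, not prescriptive):
#   python dist_train.py --model DistResNet --update_method nccl2 \
#       --batch_size 256 --data_dir ./data/ILSVRC2012
# Cluster role, endpoints and trainer id come from dist_env(); the related
# PADDLE_* environment variables are printed at startup by print_paddle_envs.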
def parse_args():
    parser = argparse.ArgumentParser(description=__doc__)
    add_arg = functools.partial(add_arguments, argparser=parser)
    # yapf: disable
    add_arg('batch_size',       int,   256,                  "Minibatch size.")
    add_arg('use_gpu',          bool,  True,                 "Whether to use GPU or not.")
    add_arg('total_images',     int,   1281167,              "Training image number.")
    add_arg('num_epochs',       int,   120,                  "Number of epochs.")
    add_arg('class_dim',        int,   1000,                 "Class number.")
    add_arg('image_shape',      str,   "3,224,224",          "Input image size.")
    add_arg('model_save_dir',   str,   "output",             "Model save directory.")
    add_arg('with_mem_opt',     bool,  False,                "Whether to use memory optimization or not.")
    add_arg('pretrained_model', str,   None,                 "Path to a pretrained model to load.")
    add_arg('checkpoint',       str,   None,                 "Path to a checkpoint to resume from.")
    add_arg('lr',               float, 0.1,                  "Initial learning rate.")
    add_arg('lr_strategy',      str,   "piecewise_decay",    "Set the learning rate decay strategy.")
    add_arg('model',            str,   "DistResNet",         "Set the network to use.")
    add_arg('enable_ce',        bool,  False,                "If set True, enable continuous evaluation job.")
    add_arg('data_dir',         str,   "./data/ILSVRC2012",  "The ImageNet dataset root dir.")
    add_arg('model_category',   str,   "models",             "Which model module to import, valid values: 'models', 'models_name'.")
    add_arg('fp16',             bool,  False,                "Enable half precision training with fp16.")
    add_arg('scale_loss',       float, 1.0,                  "Scale loss for fp16.")
    add_arg('reduce_master_grad', bool, False,               "Whether to allreduce fp32 gradients.")
    # for distributed
    add_arg('update_method',      str,  "local",            "Can be local, pserver, nccl2.")
    add_arg('multi_batch_repeat', int,  1,                  "Batch merge repeats.")
    add_arg('start_test_pass',    int,  0,                  "Start test after x passes.")
    add_arg('num_threads',        int,  8,                  "Use num_threads to run the fluid program.")
    add_arg('split_var',          bool, True,               "Split params on pserver.")
    add_arg('async_mode',         bool, False,              "Async distributed training, only for pserver mode.")
    add_arg('reduce_strategy',    str,  "allreduce",        "Choose from reduce or allreduce.")
    add_arg('enable_sequential_execution', bool, False,     "Enable sequential op execution (deterministic order; helps when data is not balanced across nodes).")
    # for dgc
    add_arg('enable_dgc',         bool, False,              "Enable Deep Gradient Compression (DGC).")
    add_arg('rampup_begin_step',  int,  5008,               "Step at which DGC sparse gradient exchange begins.")
    # yapf: enable
    args = parser.parse_args()
    return args


def get_device_num():
    if os.getenv("CPU_NUM"):
        return int(os.getenv("CPU_NUM"))
    visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
    if visible_device:
        device_num = len(visible_device.split(','))
    else:
        device_num = subprocess.check_output(
            ['nvidia-smi', '-L']).decode().count('\n')
    return device_num


def prepare_reader(is_train, pyreader, args, pass_id=1):
    # NOTE: always use infinite reader for dist training
    if is_train:
        reader = train(
            data_dir=args.data_dir, pass_id_as_seed=pass_id, infinite=True)
    else:
        reader = val(data_dir=args.data_dir)
    if is_train:
        # each device consumes an equal share of the global batch; integer
        # division keeps batch_size an int as paddle.batch expects
        bs = args.batch_size // get_device_num()
    else:
        bs = 16
    pyreader.decorate_paddle_reader(paddle.batch(reader, batch_size=bs))
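    # With an infinite train reader, a "pass" is delimited by counting
    # steps_per_pass batches in the training loop below, not by EOF.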


def build_program(is_train, main_prog, startup_prog, args):
    pyreader = None
    class_dim = args.class_dim
    image_shape = [int(m) for m in args.image_shape.split(",")]

    trainer_count = args.dist_env["num_trainers"]
    device_num_per_worker = get_device_num()
    with fluid.program_guard(main_prog, startup_prog):
        pyreader = fluid.layers.py_reader(
            capacity=16,
            shapes=([-1] + image_shape, (-1, 1)),
            dtypes=('float32', 'int64'),
            name="train_reader" if is_train else "test_reader",
            use_double_buffer=True)
        with fluid.unique_name.guard():
            image, label = fluid.layers.read_file(pyreader)
            if args.fp16:
                image = fluid.layers.cast(image, "float16")
            model_def = models.__dict__[args.model](layers=50,
                                                    is_train=is_train)
            predict = model_def.net(image, class_dim=class_dim)
            cost, pred = fluid.layers.softmax_with_cross_entropy(
                predict, label, return_softmax=True)
            if args.scale_loss > 1:
                avg_cost = fluid.layers.mean(x=cost) * float(args.scale_loss)
            else:
                avg_cost = fluid.layers.mean(x=cost)

            batch_acc1 = fluid.layers.accuracy(input=pred, label=label, k=1)
            batch_acc5 = fluid.layers.accuracy(input=pred, label=label, k=5)

            optimizer = None
            if is_train:
                start_lr = args.lr
                end_lr = args.lr * trainer_count * args.multi_batch_repeat
                if os.getenv("FLAGS_selected_gpus"):
                    # in multi-process mode, "trainer_count" counts every
                    # device in the whole cluster, so scale end_lr back down
                    # by the per-node device count.
                    end_lr /= device_num_per_worker

                total_images = args.total_images / trainer_count
                if os.getenv("FLAGS_selected_gpus"):
                    step = int(total_images /
                               (args.batch_size / device_num_per_worker *
                                args.multi_batch_repeat) + 1)
                else:
                    step = int(total_images / (args.batch_size *
                                               args.multi_batch_repeat) + 1)
                warmup_steps = step * 5  # warmup 5 passes
                epochs = [30, 60, 80]
                bd = [step * e for e in epochs]
                base_lr = end_lr
                lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
                print("start lr: %s, end lr: %s, decay boundaries: %s" %
                      (start_lr, end_lr, bd))
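                # Worked example (illustrative numbers): lr=0.1 with 4
                # trainers and multi_batch_repeat=1 gives start_lr=0.1 and
                # end_lr=0.4; warmup climbs to end_lr over the first 5
                # passes, then the rate is multiplied by 0.1 at passes
                # 30, 60 and 80.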

                # NOTE: we put weight decay in the layers config and remove
                # weight decay on bn layers, so don't add weight decay in
                # the optimizer config.
                optimizer = fluid.optimizer.Momentum(
                    learning_rate=utils.learning_rate.lr_warmup(
                        fluid.layers.piecewise_decay(
                            boundaries=bd, values=lr),
                        warmup_steps,
                        start_lr,
                        end_lr),
                    momentum=0.9)

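                # DGC keeps a locally accumulated gradient per parameter and,
                # once rampup_begin_step is reached, exchanges only the
                # largest values each step (sparsity=0.999 sends roughly 0.1%
                # of gradients); the rest stay local until large enough.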
                if args.enable_dgc:
                    optimizer = fluid.optimizer.DGCMomentumOptimizer(
                        learning_rate=utils.learning_rate.lr_warmup(
                            fluid.layers.piecewise_decay(
                                boundaries=bd, values=lr),
                            warmup_steps,
                            start_lr,
                            end_lr),
                        momentum=0.9,
                        sparsity=[0.999, 0.999],
                        rampup_begin_step=args.rampup_begin_step)

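                # FP16 path: compute fp16 gradients scaled by scale_loss to
                # avoid underflow, apply updates to fp32 master copies of the
                # parameters, then copy the master values back to the fp16
                # training parameters.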
                if args.fp16:
                    params_grads = optimizer.backward(avg_cost)
                    master_params_grads = utils.create_master_params_grads(
                        params_grads,
                        main_prog,
                        startup_prog,
                        args.scale_loss,
                        reduce_master_grad=args.reduce_master_grad)
                    optimizer.apply_gradients(master_params_grads)
                    utils.master_param_to_train_param(master_params_grads,
                                                      params_grads, main_prog)
                else:
                    optimizer.minimize(avg_cost)

    # prepare reader for current program
    prepare_reader(is_train, pyreader, args)

    return pyreader, avg_cost, batch_acc1, batch_acc5


def test_single(exe, test_prog, args, pyreader, fetch_list):
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(program=test_prog, fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())


def test_parallel(exe, test_prog, args, pyreader, fetch_list):
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())


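# A parameter-server process runs the pserver-side program (as prepared by
# pserver_prepare, typically a listen-and-serve loop) on CPU and blocks
# there, applying trainer-pushed updates, until the job is killed.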
def run_pserver(train_prog, startup_prog):
    server_exe = fluid.Executor(fluid.CPUPlace())
    server_exe.run(startup_prog)
    server_exe.run(train_prog)


def train_parallel(args):
    train_prog = fluid.Program()
    test_prog = fluid.Program()
    startup_prog = fluid.Program()

    train_pyreader, train_cost, train_acc1, train_acc5 = build_program(
        True, train_prog, startup_prog, args)
    test_pyreader, test_cost, test_acc1, test_acc5 = build_program(
        False, test_prog, startup_prog, args)

    if args.update_method == "pserver":
        train_prog, startup_prog = pserver_prepare(args, train_prog,
                                                   startup_prog)
    elif args.update_method == "nccl2":
        nccl2_prepare(args, startup_prog, main_prog=train_prog)

    if args.dist_env["training_role"] == "PSERVER":
        run_pserver(train_prog, startup_prog)
        exit(0)

    if args.use_gpu:
        # NOTE: in multi-process mode, each process drives one GPU device.
        gpu_id = 0
        if os.getenv("FLAGS_selected_gpus"):
            gpu_id = int(os.getenv("FLAGS_selected_gpus"))
    place = fluid.CUDAPlace(gpu_id) if args.use_gpu else fluid.CPUPlace()

    startup_exe = fluid.Executor(place)
    if args.multi_batch_repeat > 1:
        append_bn_repeat_init_op(train_prog, startup_prog,
                                 args.multi_batch_repeat)
    startup_exe.run(startup_prog)

    if args.checkpoint:
        fluid.io.load_persistables(
            startup_exe, args.checkpoint, main_program=train_prog)

    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = args.num_threads
    # num_iteration_per_drop_scope indicates how many iterations to run
    # before cleaning up the temp variables generated during execution.
    # It may make execution faster, because the temp variables' shapes
    # stay the same across iterations.
    strategy.num_iteration_per_drop_scope = 30

    build_strategy = fluid.BuildStrategy()
    build_strategy.enable_inplace = False
    build_strategy.memory_optimize = False
    build_strategy.enable_sequential_execution = bool(
        args.enable_sequential_execution)

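    # AllReduce averages each gradient on every device, while Reduce assigns
    # each parameter's update to a single device and broadcasts the result,
    # which can cut redundant computation and traffic for large models.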
    if args.reduce_strategy == "reduce":
        build_strategy.reduce_strategy = fluid.BuildStrategy(
        ).ReduceStrategy.Reduce
    else:
        build_strategy.reduce_strategy = fluid.BuildStrategy(
        ).ReduceStrategy.AllReduce

    if args.update_method == "pserver" or args.update_method == "local":
        # parameter server mode distributed training, merge
        # gradients on local server, do not initialize
        # ParallelExecutor with multi server all-reduce mode.
        num_trainers = 1
        trainer_id = 0
    else:
        num_trainers = args.dist_env["num_trainers"]
        trainer_id = args.dist_env["trainer_id"]
        # Set these to let build_strategy add "allreduce_deps_pass" automatically.
        build_strategy.num_trainers = num_trainers
        build_strategy.trainer_id = trainer_id

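    # Batch merging accumulates gradients over multi_batch_repeat mini-batches
    # before each update, emulating a larger batch without extra memory; BN
    # statistics are kept per repeat (append_bn_repeat_init_op above) and
    # copied back before testing (copyback_repeat_bn_params below).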
    if args.multi_batch_repeat > 1:
        pass_builder = build_strategy._finalize_strategy_and_create_passes()
        mypass = pass_builder.insert_pass(
            len(pass_builder.all_passes()) - 4, "multi_batch_merge_pass")
        mypass.set("num_repeats", args.multi_batch_repeat)

    exe = fluid.ParallelExecutor(
        True,
        train_cost.name,
        main_program=train_prog,
        exec_strategy=strategy,
        build_strategy=build_strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    # Uncomment below lines to use ParallelExecutor to run test.
    # test_exe = fluid.ParallelExecutor(
    #     True,
    #     main_program=test_prog,
    #     share_vars_from=exe,
    #     scope=fluid.global_scope().new_scope()
    # )

    over_all_start = time.time()
    fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
    # 1. multi-process (MP) mode: the per-process batch size is args.batch_size / GPUs
    # 2. single-process / parallel-graph (SP/PG) mode: each process keeps the full args.batch_size
    if os.getenv("FLAGS_selected_gpus"):
        steps_per_pass = args.total_images / (
            args.batch_size / get_device_num()) / args.dist_env["num_trainers"]
    else:
        steps_per_pass = args.total_images / args.batch_size / args.dist_env[
            "num_trainers"]

    for pass_id in range(args.num_epochs):
        num_samples = 0
        start_time = time.time()
        batch_id = 1
        if pass_id == 0:
            train_pyreader.start()
        while True:
            try:
                if batch_id % 30 == 0:
                    fetch_ret = exe.run(fetch_list)
                    fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                    print(
                        "Pass [%d/%d], batch [%d/%d], loss %s, acc1: %s, acc5: %s, avg batch time %.4f"
                        % (pass_id, args.num_epochs, batch_id, steps_per_pass,
                           fetched_data[0], fetched_data[1], fetched_data[2],
                           (time.time() - start_time) / batch_id))
                else:
                    fetch_ret = exe.run([])
            except fluid.core.EOFException:
                break
            except fluid.core.EnforceNotMet:
                traceback.print_exc()
                break
            num_samples += args.batch_size
            batch_id += 1
            if batch_id >= steps_per_pass:
                break

        print_train_time(start_time, time.time(), num_samples)
        if pass_id >= args.start_test_pass:
            if args.multi_batch_repeat > 1:
                copyback_repeat_bn_params(train_prog)
            test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
            test_ret = test_single(startup_exe, test_prog, args, test_pyreader,
                                   test_fetch_list)
            # NOTE: switch to the line below if you use ParallelExecutor to run test.
            # test_ret = test_parallel(test_exe, test_prog, args, test_pyreader, test_fetch_list)
            print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
                  (pass_id, test_ret[0], test_ret[1], test_ret[2]))
            model_path = os.path.join(args.model_save_dir + '/' + args.model,
                                      str(pass_id))
            print("saving model to ", model_path)
            if not os.path.isdir(model_path):
                os.makedirs(model_path)
            fluid.io.save_persistables(
                startup_exe, model_path, main_program=train_prog)
    train_pyreader.reset()
    startup_exe.close()
    print("total train time: ", time.time() - over_all_start)


def print_train_time(start_time, end_time, num_samples):
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
          (num_samples, train_elapsed, examples_per_sec))


def print_paddle_envs():
    print('----------- Configuration envs -----------')
    for k in os.environ:
        if "PADDLE_" in k:
            print("ENV %s:%s" % (k, os.environ[k]))
    print('------------------------------------------------')


def main():
    args = parse_args()
    print_arguments(args)
    print_paddle_envs()
    args.dist_env = dist_env()
    train_parallel(args)


if __name__ == "__main__":
    main()