# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
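
"""Distributed ImageNet training with PaddlePaddle Fluid.

Trains an image classification network (default: DistResNet) with one of
three update methods, selected by --update_method: "local" single-node
training, "pserver" parameter-server mode, or "nccl2" collective mode.

Example launch (a hypothetical local run; the flags are defined in
parse_args below, and the values are only illustrative):

    python dist_train.py --model DistResNet --update_method local \
        --batch_size 256 --data_dir ./data/ILSVRC2012
"""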

import argparse
import time
import os
import traceback
import functools
import subprocess

import numpy as np

import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import six
import sys
sys.path.append("..")
import models
import utils
from reader import train, val
from utility import add_arguments, print_arguments
from batch_merge import copyback_repeat_bn_params, append_bn_repeat_init_op
from dist_utils import pserver_prepare, nccl2_prepare
from env import dist_env

def parse_args():
    parser = argparse.ArgumentParser(description=__doc__)
    add_arg = functools.partial(add_arguments, argparser=parser)
    # yapf: disable
    add_arg('batch_size',       int,   256,                  "Minibatch size.")
    add_arg('use_gpu',          bool,  True,                 "Whether to use GPU or not.")
    add_arg('total_images',     int,   1281167,              "Training image number.")
    add_arg('num_epochs',       int,   120,                  "Number of epochs.")
    add_arg('class_dim',        int,   1000,                 "Class number.")
    add_arg('image_shape',      str,   "3,224,224",          "Input image size.")
    add_arg('model_save_dir',   str,   "output",             "Model save directory.")
    add_arg('with_mem_opt',     bool,  False,                "Whether to use memory optimization or not.")
    add_arg('pretrained_model', str,   None,                 "Whether to use pretrained model.")
    add_arg('checkpoint',       str,   None,                 "Whether to resume checkpoint.")
    add_arg('lr',               float, 0.1,                  "Set the learning rate.")
    add_arg('lr_strategy',      str,   "piecewise_decay",    "Set the learning rate decay strategy.")
    add_arg('model',            str,   "DistResNet",         "Set the network to use.")
    add_arg('enable_ce',        bool,  False,                "If set True, enable continuous evaluation job.")
    add_arg('data_dir',         str,   "./data/ILSVRC2012",  "The ImageNet dataset root dir.")
    add_arg('model_category',   str,   "models",             "Which model package to use, valid values: 'models', 'models_name'.")
    add_arg('fp16',             bool,  False,                "Enable half precision training with fp16.")
    add_arg('scale_loss',       float, 1.0,                  "Scale loss for fp16.")
    add_arg('reduce_master_grad', bool, False,               "Whether to allreduce fp32 gradients.")
    # for distributed
    add_arg('update_method',      str,  "local",            "Can be local, pserver, nccl2.")
    add_arg('multi_batch_repeat', int,  1,                  "Batch merge repeats.")
    add_arg('start_test_pass',    int,  0,                  "Start test after x passes.")
    add_arg('num_threads',        int,  8,                  "Use num_threads to run the fluid program.")
    add_arg('split_var',          bool, True,               "Split params on pserver.")
    add_arg('async_mode',         bool, False,              "Async distributed training, only for pserver mode.")
    add_arg('reduce_strategy',    str,  "allreduce",        "Choose from reduce or allreduce.")
    add_arg('skip_unbalanced_data', bool, False,            "Skip the tail of a pass when data is not balanced across nodes.")
    add_arg('enable_sequential_execution', bool, False,     "Enable sequential execution of program ops.")
    # yapf: enable
    args = parser.parse_args()
    return args

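
# Devices used by this worker: CPU_NUM for CPU training, otherwise the GPUs
# visible to this process (CUDA_VISIBLE_DEVICES, or all GPUs per nvidia-smi).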
def get_device_num():
    if os.getenv("CPU_NUM"):
        return int(os.getenv("CPU_NUM"))
    visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
    if visible_device:
        device_num = len(visible_device.split(','))
    else:
        device_num = subprocess.check_output(['nvidia-smi', '-L']).decode().count('\n')
    return device_num

def prepare_reader(is_train, pyreader, args, pass_id=0):
    if is_train:
        reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id)
    else:
        reader = val(data_dir=args.data_dir)
    if is_train:
        # args.batch_size is the per-trainer batch size; split it evenly
        # across this worker's devices (integer division so the result is
        # a valid int batch size under Python 3 as well).
        bs = args.batch_size // get_device_num()
    else:
        bs = 16
    pyreader.decorate_paddle_reader(
        paddle.batch(
            reader,
            batch_size=bs))

def build_program(is_train, main_prog, startup_prog, args):
    pyreader = None
    class_dim = args.class_dim
    image_shape = [int(m) for m in args.image_shape.split(",")]

    trainer_count = args.dist_env["num_trainers"]
    device_num_per_worker = get_device_num()
    with fluid.program_guard(main_prog, startup_prog):
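        # py_reader runs a background feeding thread; double buffering
        # overlaps host-to-device transfer with computation.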
        pyreader = fluid.layers.py_reader(
            capacity=16,
            shapes=([-1] + image_shape, (-1, 1)),
            dtypes=('float32', 'int64'),
            name="train_reader" if is_train else "test_reader",
            use_double_buffer=True)
        with fluid.unique_name.guard():
            image, label = fluid.layers.read_file(pyreader)
            if args.fp16:
                image = fluid.layers.cast(image, "float16")
            model_def = models.__dict__[args.model](layers=50, is_train=is_train)
            predict = model_def.net(image, class_dim=class_dim)
            cost, pred = fluid.layers.softmax_with_cross_entropy(predict, label, return_softmax=True)
            if args.scale_loss > 1:
                avg_cost = fluid.layers.mean(x=cost) * float(args.scale_loss)
            else:
                avg_cost = fluid.layers.mean(x=cost)

            batch_acc1 = fluid.layers.accuracy(input=pred, label=label, k=1)
            batch_acc5 = fluid.layers.accuracy(input=pred, label=label, k=5)

            optimizer = None
            if is_train:
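                # Scale the target learning rate linearly with the number of
                # trainers (and batch-merge repeats): the effective global
                # batch size grows by the same factor. The warmup schedule
                # below ramps from start_lr to end_lr over 5 passes.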
                start_lr = args.lr
                end_lr = args.lr * trainer_count * args.multi_batch_repeat
                if os.getenv("FLAGS_selected_gpus"):
                    # In multi-process mode (one process per GPU),
                    # "trainer_count" already counts every device in the
                    # cluster, so divide by the per-worker device count to
                    # scale only by the number of nodes.
                    end_lr /= device_num_per_worker

                total_images = args.total_images // trainer_count
                step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
                warmup_steps = step * 5  # warmup 5 passes
                epochs = [30, 60, 80]
                bd = [step * e for e in epochs]
                base_lr = end_lr
                lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
                print("start lr: %s, end lr: %s, decay boundaries: %s" % (
                    start_lr,
                    end_lr,
                    bd
                ))

                # NOTE: we put weight decay in layers config, and remove
                # weight decay on bn layers, so don't add weight decay in
                # optimizer config.
                optimizer = fluid.optimizer.Momentum(
                    learning_rate=utils.learning_rate.lr_warmup(
                        fluid.layers.piecewise_decay(
                            boundaries=bd, values=lr),
                        warmup_steps, start_lr, end_lr),
                    momentum=0.9)
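                # Mixed-precision training: keep fp32 master copies of the
                # fp16 parameters, scale the loss so small gradients survive
                # fp16's limited range, apply updates to the master weights,
                # then copy them back to the fp16 training parameters.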
                if args.fp16:
                    params_grads = optimizer.backward(avg_cost)
                    master_params_grads = utils.create_master_params_grads(
                        params_grads, main_prog, startup_prog, args.scale_loss,
                        reduce_master_grad=args.reduce_master_grad)
                    optimizer.apply_gradients(master_params_grads)
                    utils.master_param_to_train_param(master_params_grads, params_grads, main_prog)
                else:
                    optimizer.minimize(avg_cost)

    # prepare reader for current program
    prepare_reader(is_train, pyreader, args)

    return pyreader, avg_cost, batch_acc1, batch_acc5


def test_single(exe, test_prog, args, pyreader, fetch_list):
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(program=test_prog, fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())

def test_parallel(exe, test_prog, args, pyreader, fetch_list):
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())


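# A parameter server executes its transpiled slice of the program on CPU;
# the second run() blocks while serving parameter updates to trainers.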
def run_pserver(train_prog, startup_prog):
    server_exe = fluid.Executor(fluid.CPUPlace())
    server_exe.run(startup_prog)
    server_exe.run(train_prog)

def train_parallel(args):
    train_prog = fluid.Program()
    test_prog = fluid.Program()
    startup_prog = fluid.Program()

    train_pyreader, train_cost, train_acc1, train_acc5 = build_program(True, train_prog, startup_prog, args)
    test_pyreader, test_cost, test_acc1, test_acc5 = build_program(False, test_prog, startup_prog, args)

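    # Rewrite the programs for the chosen distributed mode: pserver mode
    # transpiles separate trainer/server programs, while nccl2 mode only
    # appends communication-init ops to the startup program.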
    if args.update_method == "pserver":
        train_prog, startup_prog = pserver_prepare(args, train_prog, startup_prog)
    elif args.update_method == "nccl2":
        nccl2_prepare(args, startup_prog)

    if args.dist_env["training_role"] == "PSERVER":
        run_pserver(train_prog, startup_prog)
        sys.exit(0)

    if args.use_gpu:
        # NOTE: in multi-process mode there is one process per GPU device.
        gpu_id = 0
        if os.getenv("FLAGS_selected_gpus"):
            gpu_id = int(os.getenv("FLAGS_selected_gpus"))
    place = core.CUDAPlace(gpu_id) if args.use_gpu else core.CPUPlace()

    startup_exe = fluid.Executor(place)
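    # Batch merging keeps per-repeat copies of batch-norm statistics; add
    # init ops for those replicated variables before running startup.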
    if args.multi_batch_repeat > 1:
        append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
    startup_exe.run(startup_prog)

    if args.checkpoint:
        fluid.io.load_persistables(startup_exe, args.checkpoint, main_program=train_prog)

    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = args.num_threads
    build_strategy = fluid.BuildStrategy()
    build_strategy.enable_inplace = False
    build_strategy.memory_optimize = False
    build_strategy.enable_sequential_execution = bool(args.enable_sequential_execution)

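    # "reduce" shards gradient aggregation (each device aggregates a subset
    # of parameters, then broadcasts); "allreduce" aggregates every gradient
    # on all devices.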
    if args.reduce_strategy == "reduce":
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
    else:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

    if args.update_method == "pserver" or args.update_method == "local":
        # parameter server mode distributed training, merge
        # gradients on local server, do not initialize
        # ParallelExecutor with multi server all-reduce mode.
        num_trainers = 1
        trainer_id = 0
    else:
        num_trainers = args.dist_env["num_trainers"]
        trainer_id = args.dist_env["trainer_id"]
        # Set these so that build_strategy adds "allreduce_deps_pass" automatically.
        build_strategy.num_trainers = num_trainers
        build_strategy.trainer_id = trainer_id

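    # Insert multi_batch_merge_pass so that several forward/backward passes
    # are accumulated before each parameter update, simulating a larger
    # global batch size.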
    if args.multi_batch_repeat > 1:
        pass_builder = build_strategy._finalize_strategy_and_create_passes()
        mypass = pass_builder.insert_pass(
            len(pass_builder.all_passes()) - 4, "multi_batch_merge_pass")
        mypass.set("num_repeats", args.multi_batch_repeat)

    exe = fluid.ParallelExecutor(
        use_cuda=args.use_gpu,
        loss_name=train_cost.name,
        main_program=train_prog,
        exec_strategy=strategy,
        build_strategy=build_strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    # Uncomment below lines to use ParallelExecutor to run test.
    # test_exe = fluid.ParallelExecutor(
    #     True,
    #     main_program=test_prog,
    #     share_vars_from=exe,
    #     scope=fluid.global_scope().new_scope()
    # )

    over_all_start = time.time()
    fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
    steps_per_pass = args.total_images // (args.batch_size * args.dist_env["num_trainers"])
    for pass_id in range(args.num_epochs):
        num_samples = 0
        start_time = time.time()
        batch_id = 1
        # use pass_id+1 as per pass global shuffle for distributed training
        prepare_reader(True, train_pyreader, args, pass_id + 1)
        train_pyreader.start()
        while True:
            try:
                if batch_id % 30 == 0:
                    fetch_ret = exe.run(fetch_list)
                    fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                    print("Pass [%d/%d], batch [%d/%d], loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
                        (pass_id, args.num_epochs, batch_id, steps_per_pass, fetched_data[0], fetched_data[1],
                         fetched_data[2], (time.time()-start_time) / batch_id))
                else:
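                    # Skip fetching outputs to avoid extra device-to-host
                    # synchronization on non-logging steps.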
                    fetch_ret = exe.run([])
            except fluid.core.EOFException:
                break
            except fluid.core.EnforceNotMet:
                traceback.print_exc()
                break
            num_samples += args.batch_size
            batch_id += 1
            if args.skip_unbalanced_data and batch_id >= steps_per_pass:
                break

        print_train_time(start_time, time.time(), num_samples)
        train_pyreader.reset()
        if pass_id >= args.start_test_pass:
            if args.multi_batch_repeat > 1:
                copyback_repeat_bn_params(train_prog)
            test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
            test_ret = test_single(startup_exe, test_prog, args, test_pyreader, test_fetch_list)
            # NOTE: switch to the line below if you use ParallelExecutor to run test.
            # test_ret = test_parallel(test_exe, test_prog, args, test_pyreader, test_fetch_list)
            print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
                  (pass_id, test_ret[0], test_ret[1], test_ret[2]))
            model_path = os.path.join(args.model_save_dir, args.model,
                                      str(pass_id))
            print("saving model to ", model_path)
            if not os.path.isdir(model_path):
                os.makedirs(model_path)
            fluid.io.save_persistables(startup_exe, model_path, main_program=train_prog)
    startup_exe.close()
    print("total train time: ", time.time() - over_all_start)


def print_train_time(start_time, end_time, num_samples):
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
          (num_samples, train_elapsed, examples_per_sec))


def print_paddle_envs():
    print('----------- Configuration envs -----------')
    for k in os.environ:
        if "PADDLE_" in k:
            print("ENV %s:%s" % (k, os.environ[k]))
    print('------------------------------------------------')


def main():
    args = parse_args()
    print_arguments(args)
    print_paddle_envs()
    args.dist_env = dist_env()
    train_parallel(args)

if __name__ == "__main__":
    main()