# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import time
import os
import traceback
import functools
import subprocess

import numpy as np

import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import six
import sys
sys.path.append("..")
import models
import utils
from reader import train, val
from utility import add_arguments, print_arguments
from batch_merge import copyback_repeat_bn_params, append_bn_repeat_init_op
from dist_utils import pserver_prepare, nccl2_prepare
from env import dist_env

def parse_args():
    parser = argparse.ArgumentParser(description=__doc__)
    add_arg = functools.partial(add_arguments, argparser=parser)
    # yapf: disable
    add_arg('batch_size',       int,   256,                  "Minibatch size.")
    add_arg('use_gpu',          bool,  True,                 "Whether to use GPU or not.")
    add_arg('total_images',     int,   1281167,              "Number of training images.")
    add_arg('num_epochs',       int,   120,                  "Number of epochs.")
    add_arg('class_dim',        int,   1000,                 "Number of classes.")
    add_arg('image_shape',      str,   "3,224,224",          "Input image size.")
    add_arg('model_save_dir',   str,   "output",             "Model save directory.")
    add_arg('with_mem_opt',     bool,  False,                "Whether to use memory optimization or not.")
    add_arg('pretrained_model', str,   None,                 "Path of the pretrained model to load, if any.")
    add_arg('checkpoint',       str,   None,                 "Path of the checkpoint to resume from, if any.")
    add_arg('lr',               float, 0.1,                  "Initial learning rate.")
    add_arg('lr_strategy',      str,   "piecewise_decay",    "Learning rate decay strategy.")
    add_arg('model',            str,   "DistResNet",         "Network to use.")
    add_arg('enable_ce',        bool,  False,                "If set True, enable the continuous evaluation job.")
    add_arg('data_dir',         str,   "./data/ILSVRC2012",  "Root directory of the ImageNet dataset.")
    add_arg('model_category',   str,   "models",             "Model module to import from, valid values: 'models', 'models_name'.")
    add_arg('fp16',             bool,  False,                "Enable half precision training with fp16.")
    add_arg('scale_loss',       float, 1.0,                  "Loss scaling factor for fp16.")
    # for distributed
    add_arg('update_method',      str,  "local",            "Can be local, pserver, or nccl2.")
    add_arg('multi_batch_repeat', int,  1,                  "Batch merge repeats.")
    add_arg('start_test_pass',    int,  0,                  "Start testing after this many passes.")
    add_arg('num_threads',        int,  8,                  "Number of threads used to run the fluid program.")
    add_arg('split_var',          bool, True,               "Split parameters on pservers.")
    add_arg('async_mode',         bool, False,              "Async distributed training; only for pserver mode.")
    add_arg('reduce_strategy',    str,  "allreduce",        "Choose from reduce or allreduce.")
    add_arg('skip_unbalanced_data', bool, False,            "Skip the remaining batches in a pass when data is not balanced across nodes.")
    # yapf: enable
    args = parser.parse_args()
    return args

def get_device_num():
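    """Return the number of devices one worker uses: CPU_NUM for CPU
    training, otherwise the number of visible (or installed) GPUs."""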
    if os.getenv("CPU_NUM"):
        return int(os.getenv("CPU_NUM"))
    visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
    if visible_device:
        device_num = len(visible_device.split(','))
    else:
        device_num = subprocess.check_output(['nvidia-smi', '-L']).decode().count('\n')
    return device_num

def prepare_reader(is_train, pyreader, args, pass_id=0):
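    """Decorate `pyreader` with a batched train or val reader; for training,
    `pass_id` is used as the reader's shuffle seed."""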
    if is_train:
        reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id)
    else:
        reader = val(data_dir=args.data_dir)
    if is_train:
        # batch size per device; integer division keeps it an int on Python 3
        bs = args.batch_size // get_device_num()
    else:
        bs = 16
    pyreader.decorate_paddle_reader(
        paddle.batch(
            reader,
            batch_size=bs))

def build_program(is_train, main_prog, startup_prog, args):
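    """Build the train or test program inside `main_prog`/`startup_prog` and
    return (pyreader, avg_cost, batch_acc1, batch_acc5)."""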
    pyreader = None
    class_dim = args.class_dim
    image_shape = [int(m) for m in args.image_shape.split(",")]

    trainer_count = args.dist_env["num_trainers"]
    device_num_per_worker = get_device_num()
    with fluid.program_guard(main_prog, startup_prog):
        pyreader = fluid.layers.py_reader(
            capacity=16,
            shapes=([-1] + image_shape, (-1, 1)),
            dtypes=('float32', 'int64'),
            name="train_reader" if is_train else "test_reader",
            use_double_buffer=True)
        with fluid.unique_name.guard():
            image, label = fluid.layers.read_file(pyreader)
            if args.fp16:
                image = fluid.layers.cast(image, "float16")
            model_def = models.__dict__[args.model](layers=50, is_train=is_train)
            predict = model_def.net(image, class_dim=class_dim)
            cost, pred = fluid.layers.softmax_with_cross_entropy(
                predict, label, return_softmax=True)
            if args.scale_loss > 1:
                avg_cost = fluid.layers.mean(x=cost) * float(args.scale_loss)
            else:
                avg_cost = fluid.layers.mean(x=cost)

            batch_acc1 = fluid.layers.accuracy(input=pred, label=label, k=1)
            batch_acc5 = fluid.layers.accuracy(input=pred, label=label, k=5)

            optimizer = None
            if is_train:
                start_lr = args.lr
                end_lr = args.lr * trainer_count * args.multi_batch_repeat
                if os.getenv("FLAGS_selected_gpus"):
                    # In multi-process mode "trainer_count" counts every
                    # process in the whole cluster, so scale the end lr by
                    # the per-worker device count as well.
                    end_lr *= device_num_per_worker

                total_images = args.total_images / trainer_count
                step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
                warmup_steps = step * 5  # warmup 5 passes
                epochs = [30, 60, 80]
Y
Yancey1989 已提交
139
                bd = [step * e for e in epochs]
                base_lr = end_lr
                lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
                print("start lr: %s, end lr: %s, decay boundaries: %s" % (
                    start_lr,
                    end_lr,
                    bd
                ))
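                # Worked example (assuming 4 trainers and the default args:
                # lr=0.1, batch_size=256, multi_batch_repeat=1,
                # total_images=1281167): end_lr = 0.1 * 4 = 0.4,
                # step = int(1281167 / 4 / 256 + 1) = 1252,
                # warmup_steps = 6260, bd = [37560, 75120, 100160],
                # lr = [0.4, 0.04, 0.004, 0.0004].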

                # NOTE: weight decay is set in the layer configs (and
                # removed from bn layers), so do not add weight decay to
                # the optimizer config.
                optimizer = fluid.optimizer.Momentum(
                    learning_rate=utils.learning_rate.lr_warmup(
                        fluid.layers.piecewise_decay(
                            boundaries=bd, values=lr),
                        warmup_steps, start_lr, end_lr),
                    momentum=0.9)
                if args.fp16:
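                    # fp16 pattern: keep fp32 master copies of the fp16
                    # parameters, apply the (loss-scaled) gradients to the
                    # masters, then copy the result back to the fp16 weights.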
                    params_grads = optimizer.backward(avg_cost)
                    master_params_grads = utils.create_master_params_grads(
                        params_grads, main_prog, startup_prog, args.scale_loss)
                    optimizer.apply_gradients(master_params_grads)
                    utils.master_param_to_train_param(master_params_grads, params_grads, main_prog)
                else:
                    optimizer.minimize(avg_cost)

    # prepare reader for current program
    prepare_reader(is_train, pyreader, args)

    return pyreader, avg_cost, batch_acc1, batch_acc5


def test_single(exe, test_prog, args, pyreader, fetch_list):
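    """Run one full pass over the validation set and return
    (avg_loss, mean_top1_acc, mean_top5_acc)."""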
    acc1 = fluid.metrics.Accuracy()
    acc5 = fluid.metrics.Accuracy()
    test_losses = []
    pyreader.start()
    while True:
        try:
            acc_rets = exe.run(program=test_prog, fetch_list=fetch_list)
            test_losses.append(acc_rets[0])
            acc1.update(value=np.array(acc_rets[1]), weight=args.batch_size)
            acc5.update(value=np.array(acc_rets[2]), weight=args.batch_size)
        except fluid.core.EOFException:
            pyreader.reset()
            break
    test_avg_loss = np.mean(np.array(test_losses))
    return test_avg_loss, np.mean(acc1.eval()), np.mean(acc5.eval())

def run_pserver(train_prog, startup_prog):
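    """Run a parameter server on CPU: execute its startup program, then
    block on the transpiled server main program."""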
    server_exe = fluid.Executor(fluid.CPUPlace())
    server_exe.run(startup_prog)
    server_exe.run(train_prog)

def train_parallel(args):
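    """Build train/test programs, prepare them for the chosen update_method,
    and run the training loop (with periodic tests) on a ParallelExecutor."""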
    train_prog = fluid.Program()
    test_prog = fluid.Program()
    startup_prog = fluid.Program()

    train_pyreader, train_cost, train_acc1, train_acc5 = build_program(True, train_prog, startup_prog, args)
    test_pyreader, test_cost, test_acc1, test_acc5 = build_program(False, test_prog, startup_prog, args)

    if args.update_method == "pserver":
        train_prog, startup_prog = pserver_prepare(args, train_prog, startup_prog)
    elif args.update_method == "nccl2":
        nccl2_prepare(args, startup_prog)

    if args.dist_env["training_role"] == "PSERVER":
        run_pserver(train_prog, startup_prog)
        exit(0)

    if args.use_gpu:
        # NOTE: in multi-process mode, each process uses one GPU device.
        gpu_id = 0
        if os.getenv("FLAGS_selected_gpus"):
            gpu_id = int(os.getenv("FLAGS_selected_gpus"))
    place = core.CUDAPlace(gpu_id) if args.use_gpu else core.CPUPlace()

    startup_exe = fluid.Executor(place)
    if args.multi_batch_repeat > 1:
        append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
    startup_exe.run(startup_prog)

    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = args.num_threads
    build_strategy = fluid.BuildStrategy()
    if args.multi_batch_repeat > 1:
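        # Insert the batch-merge pass near the end of the build passes so
        # gradients of `num_repeats` sub-batches are accumulated before each
        # parameter update, simulating a larger per-device batch.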
        pass_builder = build_strategy._finalize_strategy_and_create_passes()
        mypass = pass_builder.insert_pass(
            len(pass_builder.all_passes()) - 2, "multi_batch_merge_pass")
        mypass.set_int("num_repeats", args.multi_batch_repeat)
    if args.reduce_strategy == "reduce":
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
    else:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

    if args.update_method == "pserver" or args.update_method == "local":
        # In parameter server (or local) mode, gradients are merged
        # locally, so do not initialize ParallelExecutor in
        # multi-trainer all-reduce mode.
        num_trainers = 1
        trainer_id = 0
    else:
        num_trainers = args.dist_env["num_trainers"]
        trainer_id = args.dist_env["trainer_id"]

    exe = fluid.ParallelExecutor(
        args.use_gpu,
        train_cost.name,
        main_program=train_prog,
        exec_strategy=strategy,
        build_strategy=build_strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    over_all_start = time.time()
    fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
    steps_per_pass = args.total_images // args.batch_size // args.dist_env["num_trainers"]
    for pass_id in range(args.num_epochs):
        num_samples = 0
        start_time = time.time()
        batch_id = 1
        # use pass_id + 1 as the reader seed to get a deterministic global
        # shuffle per pass in distributed training
        prepare_reader(True, train_pyreader, args, pass_id + 1)
        train_pyreader.start()
        while True:
            try:
                if batch_id % 30 == 0:
                    fetch_ret = exe.run(fetch_list)
                    fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                    print("Pass %d, batch %d, loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
                        (pass_id, batch_id, fetched_data[0], fetched_data[1],
                         fetched_data[2], (time.time()-start_time) / batch_id))
                else:
                    fetch_ret = exe.run([])
            except fluid.core.EOFException:
                break
            except fluid.core.EnforceNotMet:
                traceback.print_exc()
                break
            num_samples += args.batch_size
            batch_id += 1
            if args.skip_unbalanced_data and batch_id >= steps_per_pass:
                break

        print_train_time(start_time, time.time(), num_samples)
        train_pyreader.reset()

        if pass_id > args.start_test_pass:
            if args.multi_batch_repeat > 1:
                copyback_repeat_bn_params(train_prog)
            test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
            test_ret = test_single(startup_exe, test_prog, args, test_pyreader, test_fetch_list)
            print("Pass: %d, Test Loss %s, test acc1: %s, test acc5: %s\n" %
                  (pass_id, test_ret[0], test_ret[1], test_ret[2]))

    startup_exe.close()
    print("total train time: ", time.time() - over_all_start)


def print_train_time(start_time, end_time, num_samples):
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
          (num_samples, train_elapsed, examples_per_sec))


def print_paddle_envs():
    print('----------- Configuration envs -----------')
    for k in os.environ:
        if "PADDLE_" in k:
            print("ENV %s:%s" % (k, os.environ[k]))
    print('------------------------------------------------')


def main():
    args = parse_args()
    print_arguments(args)
    print_paddle_envs()
    args.dist_env = dist_env()
    train_parallel(args)

if __name__ == "__main__":
    main()
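
# Example launch commands -- a sketch, not taken from this repo's docs. The
# PADDLE_* variables are read by env.dist_env() (not shown here), so treat
# the exact variable names below as assumptions:
#
#   # Local multi-GPU training:
#   CUDA_VISIBLE_DEVICES=0,1,2,3 python dist_train.py --update_method local
#
#   # NCCL2 mode, launched once per trainer:
#   PADDLE_TRAINING_ROLE=TRAINER PADDLE_TRAINER_ID=0 PADDLE_TRAINERS=2 \
#       python dist_train.py --update_method nccl2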