fluid_benchmark.py 13.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import cProfile
import time
import os

import numpy as np

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler

27
from args import *
28 29


X
Xin Pan 已提交
30 31
def append_nccl2_prepare(trainer_id):
    if trainer_id >= 0:
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
        # append gen_nccl_id at the end of startup program
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
        port = os.getenv("PADDLE_PSERVER_PORT")
        worker_ips = os.getenv("PADDLE_TRAINER_IPS")
        worker_endpoints = []
        for ip in worker_ips.split(","):
            worker_endpoints.append(':'.join([ip, port]))
        num_trainers = len(worker_endpoints)
        current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
        worker_endpoints.remove(current_endpoint)

        nccl_id_var = fluid.default_startup_program().global_block().create_var(
            name="NCCLID",
            persistable=True,
            type=fluid.core.VarDesc.VarType.RAW)
        fluid.default_startup_program().global_block().append_op(
            type="gen_nccl_id",
            inputs={},
            outputs={"NCCLID": nccl_id_var},
            attrs={
                "endpoint": current_endpoint,
                "endpoint_list": worker_endpoints,
                "trainer_id": trainer_id
            })
        return nccl_id_var, num_trainers, trainer_id
    else:
X
Xin Pan 已提交
58 59
        raise Exception("must set positive PADDLE_TRAINER_ID env variables for "
                        "nccl-based dist train.")
60 61


62
def dist_transpile(trainer_id, args):
X
Xin Pan 已提交
63
    if trainer_id < 0:
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
        return None, None

    # the port of all pservers, needed by both trainer and pserver
    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
    # comma separated ips of all pservers, needed by trainer and
    # pserver
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
    eplist = []
    for ip in pserver_ips.split(","):
        eplist.append(':'.join([ip, port]))
    pserver_endpoints = ",".join(eplist)
    # total number of workers/trainers in the job, needed by
    # trainer and pserver
    trainers = int(os.getenv("PADDLE_TRAINERS"))
    # the IP of the local machine, needed by pserver only
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
    # the role, should be either PSERVER or TRAINER
    training_role = os.getenv("PADDLE_TRAINING_ROLE")

    t = distribute_transpiler.DistributeTranspiler()
84 85 86 87 88 89
    t.transpile(
        trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=not args.async_mode,
        slice_var_up=not args.no_split_var)
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
    if training_role == "PSERVER":
        pserver_program = t.get_pserver_program(current_endpoint)
        pserver_startup_program = t.get_startup_program(current_endpoint,
                                                        pserver_program)
        return pserver_program, pserver_startup_program
    elif training_role == "TRAINER":
        train_program = t.get_trainer_program()
        return train_program, fluid.default_startup_program()
    else:
        raise ValueError(
            'TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
        )


def test(exe, inference_program, test_reader, feeder, batch_acc):
    accuracy_evaluator = fluid.metrics.Accuracy()
    for batch_id, data in enumerate(test_reader()):
        acc = exe.run(inference_program,
                      feed=feeder.feed(data),
                      fetch_list=[batch_acc])
        accuracy_evaluator.update(value=np.array(acc), weight=len(data))

    return accuracy_evaluator.eval()


# TODO(wuyi): replace train, train_parallel, test functions with new trainer
# API once it is ready.
def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
          args, train_prog, startup_prog):
    if os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
        place = core.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(train_prog)
        return

126 127 128 129
    if args.use_fake_data:
        raise Exception(
            "fake data is not supported in single GPU test for now.")

130 131 132
    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
    exe = fluid.Executor(place)
    exe.run(startup_prog)
Y
yi.wu 已提交
133 134 135 136 137 138 139

    if not args.use_reader_op:
        feed_var_list = [
            var for var in train_prog.global_block().vars.itervalues()
            if var.is_data
        ]
        feeder = fluid.DataFeeder(feed_var_list, place)
140 141 142 143

    iters, num_samples, start_time = 0, 0, time.time()
    for pass_id in range(args.pass_num):
        train_losses = []
Y
yi.wu 已提交
144 145
        if not args.use_reader_op:
            reader_generator = train_reader()
146 147 148 149 150
        batch_id = 0
        data = None
        while True:
            if not args.use_reader_op:
                data = next(reader_generator, None)
Y
yi.wu 已提交
151 152 153
                if data == None:
                    break
            if iters == args.iterations:
154
                break
155 156 157
            if iters == args.skip_batch_num:
                start_time = time.time()
                num_samples = 0
158

S
commit  
sneaxiy 已提交
159 160 161 162 163
            if arg.profile and pass_id == 0 and batch_id == 5:
                profiler.start_profiler("All")
            elif args.profile and pass_id == 0 and batch_id == 10:
                profiler.stop_profiler("total", "/tmp/profile")

Y
yi.wu 已提交
164
            if args.use_reader_op:
Y
yi.wu 已提交
165
                try:
S
commit  
sneaxiy 已提交
166 167 168
                    loss = exe.run(train_prog,
                                   fetch_list=[avg_loss],
                                   use_program_cache=True)
Y
yi.wu 已提交
169 170
                except fluid.core.EnforceNotMet as ex:
                    break
Y
yi.wu 已提交
171 172 173
            else:
                loss = exe.run(train_prog,
                               feed=feeder.feed(data),
S
commit  
sneaxiy 已提交
174 175
                               fetch_list=[avg_loss],
                               use_program_cache=True)
176
            iters += 1
177
            batch_id += 1
Y
yi.wu 已提交
178 179 180
            # FIXME(wuyi): For use_reader_op, if the current
            # pass is not the last, the last batch of this pass
            # is also equal to args.batch_size.
Y
update  
yi.wu 已提交
181
            if args.use_reader_op:
Y
yi.wu 已提交
182
                num_samples += args.batch_size * args.gpus
Y
update  
yi.wu 已提交
183 184
            else:
                num_samples += len(data)
185 186 187
            train_losses.append(loss)
            print("Pass: %d, Iter: %d, Loss: %f\n" %
                  (pass_id, iters, np.mean(train_losses)))
L
Luo Tao 已提交
188
        print_train_time(start_time, time.time(), num_samples)
L
Luo Tao 已提交
189
        print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))),
190
        # evaluation
L
Luo Tao 已提交
191
        if not args.no_test and batch_acc:
192 193 194 195 196 197 198 199 200 201 202 203 204
            pass_test_acc = test(exe, infer_prog, test_reader, feeder,
                                 batch_acc)
            print(", Test Accuracy: %f" % pass_test_acc)
        print("\n")
        # TODO(wuyi): add warmup passes to get better perf data.
        exit(0)


# TODO(wuyi): replace train, train_parallel, test functions with new trainer
# API once it is ready.
def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                   batch_acc, args, train_prog, startup_prog, nccl_id_var,
                   num_trainers, trainer_id):
Y
yi.wu 已提交
205 206 207 208 209 210 211 212
    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
    if not args.use_reader_op:
        feed_var_list = [
            var for var in train_prog.global_block().vars.itervalues()
            if var.is_data
        ]
        feeder = fluid.DataFeeder(feed_var_list, place)

213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
    # generate fake:
    if args.use_fake_data:
        for var in feed_var_list:
            v = startup_prog.global_block().clone_variable(var)
            var.persistable = True
            v.persistable = True

            real_shape = list(var.shape)
            real_shape[0] = args.batch_size / args.gpus
            startup_prog.global_block().append_op(
                outputs={"Out": v},
                type="fill_constant",
                attrs={"shape": real_shape,
                       "value": 1.0,
                       "dtype": var.dtype})

    if nccl_id_var and trainer_id == 0:
        #FIXME(wuyi): wait other trainer to start listening
        time.sleep(30)

233 234 235 236 237 238 239 240 241 242 243
    startup_exe = fluid.Executor(place)
    startup_exe.run(startup_prog)
    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = 1
    strategy.allow_op_delay = False
    exe = fluid.ParallelExecutor(
        True,
        avg_loss.name,
        exec_strategy=strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)
244

245 246 247 248
    for pass_id in range(args.pass_num):
        num_samples = 0
        iters = 0
        start_time = time.time()
Y
yi.wu 已提交
249 250
        if not args.use_reader_op:
            reader_generator = train_reader()
251 252 253 254 255
        batch_id = 0
        data = None
        while True:
            if not args.use_reader_op:
                data = next(reader_generator, None)
Y
yi.wu 已提交
256 257 258
                if data == None:
                    break
            if iters == args.iterations:
259
                break
X
Xin Pan 已提交
260 261 262 263 264
            if args.profile and pass_id == 0 and batch_id == 5:
                profiler.start_profiler("All")
            elif args.profile and pass_id == 0 and batch_id == 10:
                profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id)

265 266 267
            if iters == args.skip_batch_num:
                start_time = time.time()
                num_samples = 0
Y
yi.wu 已提交
268
            if args.use_fake_data or args.use_reader_op:
Y
yi.wu 已提交
269 270 271 272
                try:
                    loss, = exe.run([avg_loss.name])
                except fluid.core.EnforceNotMet as ex:
                    break
273 274
            else:
                loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
275 276
            if args.update_method == "pserver":
                exe.bcast_params()
Y
update  
yi.wu 已提交
277
            if args.use_reader_op:
Y
yi.wu 已提交
278
                num_samples += args.batch_size * args.gpus
Y
update  
yi.wu 已提交
279 280
            else:
                num_samples += len(data)
281 282 283 284
            iters += 1
            if batch_id % 1 == 0:
                print("Pass %d, batch %d, loss %s" %
                      (pass_id, batch_id, np.array(loss)))
285
            batch_id += 1
Y
yi.wu 已提交
286

L
Luo Tao 已提交
287
        print_train_time(start_time, time.time(), num_samples)
L
Luo Tao 已提交
288
        if not args.no_test and batch_acc:
289 290 291 292 293 294 295 296 297
            test_acc = test(startup_exe, infer_prog, test_reader, feeder,
                            batch_acc)
            print("Pass: %d, Test Accuracy: %f\n" % (pass_id, test_acc))
        exit(0)


def print_arguments(args):
    vars(args)['use_nvprof'] = (vars(args)['use_nvprof'] and
                                vars(args)['device'] == 'GPU')
L
Luo Tao 已提交
298
    print('----------- Configuration Arguments -----------')
299 300 301 302 303
    for arg, value in sorted(vars(args).iteritems()):
        print('%s: %s' % (arg, value))
    print('------------------------------------------------')


L
Luo Tao 已提交
304 305 306 307 308 309 310
def print_train_time(start_time, end_time, num_samples):
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' %
          (num_samples, train_elapsed, examples_per_sec))


311 312 313
def main():
    args = parse_args()
    print_arguments(args)
X
Xin Pan 已提交
314 315 316 317

    # the unique trainer id, starting from 0, needed by trainer
    # only
    nccl_id_var, num_trainers, trainer_id = (
Y
yi.wu 已提交
318
        None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))
319 320 321 322 323 324 325 326 327 328 329 330 331

    if args.use_cprof:
        pr = cProfile.Profile()
        pr.enable()
    model_def = __import__("models.%s" % args.model, fromlist=["models"])
    train_args = list(model_def.get_model(args))
    train_args.append(args)
    # Run optimizer.minimize(avg_loss)
    train_args[2].minimize(train_args[0])
    if args.memory_optimize:
        fluid.memory_optimize(fluid.default_main_program())

    if args.update_method == "pserver":
332
        train_prog, startup_prog = dist_transpile(trainer_id, args)
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
        if not train_prog:
            raise Exception(
                "Must configure correct environments to run dist train.")
        train_args.extend([train_prog, startup_prog])
        if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER":
            train_args.extend([nccl_id_var, num_trainers, trainer_id])
            train_parallel(*train_args)
        train(*train_args)
        exit(0)

    # for other update methods, use default programs
    train_args.append(fluid.default_main_program())
    train_args.append(fluid.default_startup_program())

    if args.update_method == "nccl2":
X
Xin Pan 已提交
348
        nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(trainer_id)
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
    if args.gpus == 1:
        # NOTE: parallel executor use profiler interanlly
        if args.use_nvprof and args.device == 'GPU':
            with profiler.cuda_profiler("cuda_profiler.txt", 'csv') as nvprof:
                train(*train_args)
        else:
            train(*train_args)
    else:
        if args.device == "CPU":
            raise Exception("Only support GPU perf with parallel exe")
        train_args.extend([nccl_id_var, num_trainers, trainer_id])
        train_parallel(*train_args)


if __name__ == "__main__":
    main()