fluid_benchmark.py 13.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import cProfile
import time
import os

import numpy as np

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler

27
from args import *
28 29


X
Xin Pan 已提交
30 31
def append_nccl2_prepare(trainer_id):
    """Append a ``gen_nccl_id`` op to the default startup program.

    The cluster layout is read from the environment:

    * ``PADDLE_TRAINER_ID``  - id of this trainer; when set it overrides the
      ``trainer_id`` argument, otherwise the argument is used.
    * ``PADDLE_PSERVER_PORT`` - port shared by all trainer endpoints.
    * ``PADDLE_TRAINER_IPS``  - comma separated IPs of all trainers.
    * ``PADDLE_CURRENT_IP``   - IP of the local machine.

    Args:
        trainer_id: id of the current trainer; must be >= 0.

    Returns:
        A tuple ``(nccl_id_var, num_trainers, trainer_id)`` where
        ``nccl_id_var`` is the persistable RAW variable named ``NCCLID``
        created in the startup program and ``num_trainers`` is the number of
        entries in ``PADDLE_TRAINER_IPS``.

    Raises:
        Exception: if ``trainer_id`` is negative.
        ValueError: if a required environment variable is missing or the
            local endpoint is not one of the trainer endpoints.
    """
    if trainer_id < 0:
        raise Exception("must set non-negative PADDLE_TRAINER_ID env "
                        "variables for nccl-based dist train.")

    # The environment wins when present; fall back to the caller's value
    # instead of crashing on int(None) when the variable is unset.
    env_trainer_id = os.getenv("PADDLE_TRAINER_ID")
    if env_trainer_id is not None:
        trainer_id = int(env_trainer_id)

    required = ("PADDLE_PSERVER_PORT", "PADDLE_TRAINER_IPS",
                "PADDLE_CURRENT_IP")
    missing = [name for name in required if not os.getenv(name)]
    if missing:
        raise ValueError(
            "environment variables %s must be set for nccl-based dist train."
            % ", ".join(missing))

    port = os.getenv("PADDLE_PSERVER_PORT")
    worker_ips = os.getenv("PADDLE_TRAINER_IPS")
    worker_endpoints = [':'.join([ip, port]) for ip in worker_ips.split(",")]
    # Count every trainer, including this one, before removing ourselves.
    num_trainers = len(worker_endpoints)
    current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
    if current_endpoint not in worker_endpoints:
        raise ValueError(
            "current endpoint %s is not among the trainer endpoints %s "
            "(check PADDLE_CURRENT_IP and PADDLE_TRAINER_IPS)." %
            (current_endpoint, worker_endpoints))
    # The op receives only the *other* trainers in "endpoint_list".
    worker_endpoints.remove(current_endpoint)

    # append gen_nccl_id at the end of startup program
    startup_block = fluid.default_startup_program().global_block()
    nccl_id_var = startup_block.create_var(
        name="NCCLID",
        persistable=True,
        type=fluid.core.VarDesc.VarType.RAW)
    startup_block.append_op(
        type="gen_nccl_id",
        inputs={},
        outputs={"NCCLID": nccl_id_var},
        attrs={
            "endpoint": current_endpoint,
            "endpoint_list": worker_endpoints,
            "trainer_id": trainer_id
        })
    return nccl_id_var, num_trainers, trainer_id
60 61


62
def dist_transpile(trainer_id, args):
    """Transpile the default program for parameter-server training.

    Configuration is read from the environment:

    * ``PADDLE_TRAINING_ROLE`` - ``PSERVER`` or ``TRAINER`` (required).
    * ``PADDLE_TRAINERS``      - total number of trainers (required).
    * ``PADDLE_PSERVER_PORT``  - port of all pservers, default ``6174``.
    * ``PADDLE_PSERVER_IPS``   - comma separated pserver IPs, default ``""``.
    * ``PADDLE_CURRENT_IP``    - IP of the local machine, default ``""``;
      used to build the endpoint passed to the pserver program getters.

    Args:
        trainer_id: id of the current trainer; a negative value disables
            transpiling.
        args: parsed command-line arguments; ``async_mode`` and
            ``no_split_var`` are read.

    Returns:
        ``(None, None)`` when ``trainer_id`` is negative, otherwise a
        ``(main_program, startup_program)`` pair for the configured role.

    Raises:
        ValueError: if ``PADDLE_TRAINING_ROLE`` is neither ``PSERVER`` nor
            ``TRAINER``, or if ``PADDLE_TRAINERS`` is not set.
    """
    if trainer_id < 0:
        return None, None

    # Validate the role up front so a misconfigured job fails before the
    # transpile work is done, not after.
    training_role = os.getenv("PADDLE_TRAINING_ROLE")
    if training_role not in ("PSERVER", "TRAINER"):
        raise ValueError(
            'PADDLE_TRAINING_ROLE environment variable must be either '
            'TRAINER or PSERVER')

    # total number of workers/trainers in the job, needed by
    # trainer and pserver
    trainers_env = os.getenv("PADDLE_TRAINERS")
    if not trainers_env:
        raise ValueError(
            'PADDLE_TRAINERS environment variable must be set to the number '
            'of trainers')
    trainers = int(trainers_env)

    # the port of all pservers, needed by both trainer and pserver
    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
    # comma separated ips of all pservers, needed by trainer and
    # pserver
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
    pserver_endpoints = ",".join(
        ':'.join([ip, port]) for ip in pserver_ips.split(","))
    # the IP of the local machine, needed by pserver only
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

    t = distribute_transpiler.DistributeTranspiler()
    t.transpile(
        trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=not args.async_mode,
        slice_var_up=not args.no_split_var)

    if training_role == "PSERVER":
        pserver_program = t.get_pserver_program(current_endpoint)
        pserver_startup_program = t.get_startup_program(current_endpoint,
                                                        pserver_program)
        return pserver_program, pserver_startup_program

    # Only TRAINER is left after the validation above.
    train_program = t.get_trainer_program()
    return train_program, fluid.default_startup_program()


def test(exe, inference_program, test_reader, feeder, batch_acc):
    """Evaluate ``inference_program`` over every batch of ``test_reader``.

    Args:
        exe: executor whose ``run`` method evaluates the program.
        inference_program: program to run for each batch.
        test_reader: callable returning an iterable of batches.
        feeder: object whose ``feed`` turns a batch into a feed value.
        batch_acc: variable fetched from each run as the batch accuracy.

    Returns:
        The result of ``fluid.metrics.Accuracy.eval()`` after every batch
        has been accumulated, each weighted by its number of samples.
    """
    evaluator = fluid.metrics.Accuracy()
    for batch in test_reader():
        fetched = exe.run(inference_program,
                          feed=feeder.feed(batch),
                          fetch_list=[batch_acc])
        # Weight by batch length so batches of different size count fairly.
        evaluator.update(value=np.array(fetched), weight=len(batch))
    return evaluator.eval()


# TODO(wuyi): replace train, train_parallel, test functions with new trainer
# API once it is ready.
def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
          args, train_prog, startup_prog):
    """Run the benchmark with a single ``fluid.Executor``.

    When ``PADDLE_TRAINING_ROLE`` is ``PSERVER`` the startup and main
    programs are run on CPU and the function returns. Otherwise the training
    loop is executed, timing and loss information is printed and, unless
    disabled, the test set is evaluated.

    NOTE: ``exit(0)`` is called at the end of the first pass, so the process
    terminates there and later passes never run.

    Args:
        avg_loss: loss variable fetched on every iteration.
        infer_prog: inference program handed to ``test``.
        optimizer: not used by this function; kept so the argument list
            matches ``train_parallel``.
        train_reader: callable returning an iterator over training batches;
            only used when ``args.use_reader_op`` is false.
        test_reader: reader handed to ``test``.
        batch_acc: accuracy variable; evaluation is skipped when falsy.
        args: parsed command-line arguments (``use_fake_data``, ``device``,
            ``use_reader_op``, ``pass_num``, ``iterations``,
            ``skip_batch_num``, ``batch_size``, ``gpus`` and ``no_test`` are
            read).
        train_prog: program executed on every iteration.
        startup_prog: program executed once before training.

    Raises:
        Exception: if ``args.use_fake_data`` is set (unsupported here).
    """
    if os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
        place = core.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(train_prog)
        return

    if args.use_fake_data:
        raise Exception(
            "fake data is not supported in single GPU test for now.")

    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
    exe = fluid.Executor(place)
    exe.run(startup_prog)

    if not args.use_reader_op:
        # .values() (not the Python-2-only .itervalues()) keeps this runnable
        # on both Python 2 and Python 3.
        feed_var_list = [
            var for var in train_prog.global_block().vars.values()
            if var.is_data
        ]
        feeder = fluid.DataFeeder(feed_var_list, place)

    iters, num_samples, start_time = 0, 0, time.time()
    for pass_id in range(args.pass_num):
        train_losses = []
        if not args.use_reader_op:
            reader_generator = train_reader()
        data = None
        while True:
            if not args.use_reader_op:
                data = next(reader_generator, None)
                # Identity check: `== None` can be hijacked by __eq__.
                if data is None:
                    break
            if iters == args.iterations:
                break
            # Restart the clock once the warm-up batches are done.
            if iters == args.skip_batch_num:
                start_time = time.time()
                num_samples = 0

            if args.use_reader_op:
                try:
                    loss = exe.run(train_prog, fetch_list=[avg_loss])
                except fluid.core.EnforceNotMet:
                    # Treated as the end of this pass's data.
                    break
            else:
                loss = exe.run(train_prog,
                               feed=feeder.feed(data),
                               fetch_list=[avg_loss])
            iters += 1
            # FIXME(wuyi): For use_reader_op, if the current
            # pass is not the last, the last batch of this pass
            # is also equal to args.batch_size.
            if args.use_reader_op:
                num_samples += args.batch_size * args.gpus
            else:
                num_samples += len(data)
            train_losses.append(loss)
            print("Pass: %d, Iter: %d, Loss: %f\n" %
                  (pass_id, iters, np.mean(train_losses)))
        print_train_time(start_time, time.time(), num_samples)
        # NOTE(review): the trailing comma looks like the Python 2 print
        # statement idiom for suppressing the newline - confirm before
        # changing; it is kept as-is here.
        print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))),
        # evaluation
        if not args.no_test and batch_acc and not args.use_reader_op:
            pass_test_acc = test(exe, infer_prog, test_reader, feeder,
                                 batch_acc)
            print(", Test Accuracy: %f" % pass_test_acc)
        print("\n")
        # TODO(wuyi): add warmup passes to get better perf data.
        # NOTE: this ends the process after the first pass.
        exit(0)


# TODO(wuyi): replace train, train_parallel, test functions with new trainer
# API once it is ready.
def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                   batch_acc, args, train_prog, startup_prog, nccl_id_var,
                   num_trainers, trainer_id):
    """Run the benchmark with ``fluid.ParallelExecutor``.

    Args:
        avg_loss: loss variable; its ``name`` is fetched on every iteration.
        infer_prog: inference program handed to ``test``.
        optimizer: not used by this function.
        train_reader: callable returning an iterator over training batches;
            only used when ``args.use_reader_op`` is false.
        test_reader: reader handed to ``test``.
        batch_acc: accuracy variable; evaluation is skipped when falsy.
        args: parsed command-line arguments (``device``, ``use_reader_op``,
            ``use_fake_data``, ``batch_size``, ``gpus``, ``pass_num``,
            ``iterations``, ``profile``, ``skip_batch_num``,
            ``update_method`` and ``no_test`` are read).
        train_prog: program whose data variables are fed.
        startup_prog: program executed once before training.
        nccl_id_var: NCCL id variable, or a falsy value when unused.
        num_trainers: number of trainers passed to the parallel executor.
        trainer_id: id of this trainer.
    """
    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
    # Collected unconditionally: the fake-data branch below needs this list
    # even when use_reader_op is set (it used to raise NameError then).
    # .values() replaces the Python-2-only .itervalues().
    feed_var_list = [
        var for var in train_prog.global_block().vars.values()
        if var.is_data
    ]
    if not args.use_reader_op:
        feeder = fluid.DataFeeder(feed_var_list, place)

    # generate fake:
    if args.use_fake_data:
        for var in feed_var_list:
            v = startup_prog.global_block().clone_variable(var)
            var.persistable = True
            v.persistable = True

            real_shape = list(var.shape)
            # Floor division keeps the dimension an integer on Python 3 too.
            real_shape[0] = args.batch_size // args.gpus
            startup_prog.global_block().append_op(
                outputs={"Out": v},
                type="fill_constant",
                attrs={"shape": real_shape,
                       "value": 1.0,
                       "dtype": var.dtype})

    if nccl_id_var and trainer_id == 0:
        #FIXME(wuyi): wait other trainer to start listening
        time.sleep(30)

    startup_exe = fluid.Executor(place)
    startup_exe.run(startup_prog)
    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = 1
    strategy.allow_op_delay = False
    exe = fluid.ParallelExecutor(
        True,
        avg_loss.name,
        exec_strategy=strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    for pass_id in range(args.pass_num):
        num_samples = 0
        iters = 0
        start_time = time.time()
        if not args.use_reader_op:
            reader_generator = train_reader()
        batch_id = 0
        data = None
        while True:
            if not args.use_reader_op:
                data = next(reader_generator, None)
                # Identity check: `== None` can be hijacked by __eq__.
                if data is None:
                    break
            if iters == args.iterations:
                break
            # Profile batches 5 through 9 of the first pass only.
            if args.profile and pass_id == 0 and batch_id == 5:
                profiler.start_profiler("All")
            elif args.profile and pass_id == 0 and batch_id == 10:
                profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id)

            # Restart the clock once the warm-up batches are done.
            if iters == args.skip_batch_num:
                start_time = time.time()
                num_samples = 0
            if args.use_fake_data or args.use_reader_op:
                try:
                    loss, = exe.run([avg_loss.name])
                except fluid.core.EnforceNotMet:
                    # Treated as the end of this pass's data.
                    break
            else:
                loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
            if args.update_method == "pserver":
                exe.bcast_params()
            if args.use_reader_op:
                num_samples += args.batch_size * args.gpus
            else:
                num_samples += len(data)
            iters += 1
            print("Pass %d, batch %d, loss %s" %
                  (pass_id, batch_id, np.array(loss)))
            batch_id += 1

        print_train_time(start_time, time.time(), num_samples)
        if not args.no_test and batch_acc and not args.use_reader_op:
            # we have not implement record io for test
            # skip test when use args.use_reader_op
            test_acc = test(startup_exe, infer_prog, test_reader, feeder,
                            batch_acc)
            print("Pass: %d, Test Accuracy: %f\n" % (pass_id, test_acc))


def print_arguments(args):
    """Normalise ``use_nvprof`` and print every argument, sorted by name.

    ``args.use_nvprof`` is overwritten in place: it stays truthy only when
    ``args.device`` is ``'GPU'``.

    Args:
        args: namespace object with at least ``use_nvprof`` and ``device``
            attributes; it is modified in place.
    """
    arg_dict = vars(args)
    arg_dict['use_nvprof'] = (arg_dict['use_nvprof'] and
                              arg_dict['device'] == 'GPU')
    print('----------- Configuration Arguments -----------')
    # .items() (not the Python-2-only .iteritems()) works on Python 2 and 3.
    for arg, value in sorted(arg_dict.items()):
        print('%s: %s' % (arg, value))
    print('------------------------------------------------')


L
Luo Tao 已提交
297 298 299 300 301 302 303
def print_train_time(start_time, end_time, num_samples):
    """Print the sample count, elapsed time and throughput of a run.

    Args:
        start_time: start timestamp in seconds.
        end_time: end timestamp in seconds.
        num_samples: number of examples processed between the timestamps.

    A zero elapsed time is reported as a throughput of 0.0 so that an empty
    or instantaneous run does not raise ``ZeroDivisionError``.
    """
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed if train_elapsed else 0.0
    # NOTE(review): "examples/sed" looks like a typo for "examples/sec"; the
    # text is left unchanged in case log parsers match on it - confirm.
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' %
          (num_samples, train_elapsed, examples_per_sec))


304 305 306
def main():
    """Parse the command line, print the configuration and run the benchmark.

    When ``args.use_cprof`` is set the run is profiled with ``cProfile`` and
    the statistics are printed when the run ends, including when it ends
    through ``exit()`` or an exception.
    """
    args = parse_args()
    print_arguments(args)

    pr = None
    if args.use_cprof:
        pr = cProfile.Profile()
        pr.enable()
    try:
        _run_benchmark(args)
    finally:
        # The profile used to be enabled but never reported; emit it here so
        # the flag has a visible effect. `finally` also covers the exit(0)
        # calls made further down the call chain.
        if pr is not None:
            pr.disable()
            pr.print_stats(sort="cumulative")


def _run_benchmark(args):
    """Build the model selected by ``args`` and dispatch to a training loop."""
    # the unique trainer id, starting from 0, needed by trainer
    # only
    nccl_id_var, num_trainers, trainer_id = (
        None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))

    model_def = __import__("models.%s" % args.model, fromlist=["models"])
    # train_args follows the positional order of train()/train_parallel():
    # index 0 is passed as avg_loss and index 2 as optimizer.
    train_args = list(model_def.get_model(args))
    train_args.append(args)
    # Run optimizer.minimize(avg_loss)
    train_args[2].minimize(train_args[0])
    if args.memory_optimize:
        fluid.memory_optimize(fluid.default_main_program())

    if args.update_method == "pserver":
        train_prog, startup_prog = dist_transpile(trainer_id, args)
        if not train_prog:
            raise Exception(
                "Must configure correct environments to run dist train.")
        train_args.extend([train_prog, startup_prog])
        if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER":
            train_args.extend([nccl_id_var, num_trainers, trainer_id])
            train_parallel(*train_args)
        else:
            # Must be exclusive with the branch above: train() does not
            # accept the three extra arguments appended for train_parallel().
            train(*train_args)
        exit(0)

    # for other update methods, use default programs
    train_args.append(fluid.default_main_program())
    train_args.append(fluid.default_startup_program())

    if args.update_method == "nccl2":
        nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(trainer_id)
    if args.gpus == 1:
        # NOTE: parallel executor use profiler internally
        if args.use_nvprof and args.device == 'GPU':
            with profiler.cuda_profiler("cuda_profiler.txt", 'csv'):
                train(*train_args)
        else:
            train(*train_args)
    else:
        if args.device == "CPU":
            raise Exception("Only support GPU perf with parallel exe")
        train_args.extend([nccl_id_var, num_trainers, trainer_id])
        train_parallel(*train_args)


# Run the benchmark only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()