# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import cProfile
import time
import os
import traceback

import numpy as np

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler

from args import *
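
# Example invocation (flag names here are illustrative; see args.py for the
# authoritative argument definitions):
#   python fluid_benchmark.py --model resnet --device GPU --update_method local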


def append_nccl2_prepare(trainer_id, startup_prog):
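    """Append gen_nccl_id ops to the startup program for multi-node NCCL2
    training.

    Trainer endpoints are read from the PADDLE_* environment variables.
    Returns (nccl_id_var, num_trainers, trainer_id).
    """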
    if trainer_id >= 0:
        # append gen_nccl_id at the end of startup program
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
        port = os.getenv("PADDLE_PSERVER_PORT")
        worker_ips = os.getenv("PADDLE_TRAINER_IPS")
        worker_endpoints = []
        for ip in worker_ips.split(","):
            worker_endpoints.append(':'.join([ip, port]))
        num_trainers = len(worker_endpoints)
        current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
        worker_endpoints.remove(current_endpoint)

        nccl_id_var = startup_prog.global_block().create_var(
            name="NCCLID",
            persistable=True,
            type=fluid.core.VarDesc.VarType.RAW)
        startup_prog.global_block().append_op(
            type="gen_nccl_id",
            inputs={},
            outputs={"NCCLID": nccl_id_var},
            attrs={
                "endpoint": current_endpoint,
                "endpoint_list": worker_endpoints,
                "trainer_id": trainer_id
            })
        return nccl_id_var, num_trainers, trainer_id
    else:
        raise Exception("must set a non-negative PADDLE_TRAINER_ID environment "
                        "variable for nccl-based distributed training.")


def dist_transpile(trainer_id, args, train_prog, startup_prog):
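    """Transpile the training program into parameter-server style distributed
    programs.

    The cluster layout is read from the PADDLE_* environment variables.
    Returns the (main, startup) programs for the current role (TRAINER or
    PSERVER), or (None, None) when trainer_id < 0.
    """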
    if trainer_id < 0:
        return None, None

    # the port of all pservers, needed by both trainer and pserver
    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
    # comma separated ips of all pservers, needed by trainer and
    # pserver
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
    eplist = []
    for ip in pserver_ips.split(","):
        eplist.append(':'.join([ip, port]))
    pserver_endpoints = ",".join(eplist)
    # total number of workers/trainers in the job, needed by
    # trainer and pserver
    trainers = int(os.getenv("PADDLE_TRAINERS"))
    # the IP of the local machine, needed by pserver only
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
    # the role, should be either PSERVER or TRAINER
    training_role = os.getenv("PADDLE_TRAINING_ROLE")

    config = distribute_transpiler.DistributeTranspilerConfig()
    config.slice_var_up = not args.no_split_var
    t = distribute_transpiler.DistributeTranspiler(config=config)
    t.transpile(
        trainer_id,
        # NOTE: *MUST* use train_prog here, because a `with` guard is used to
        # generate different programs for train and test.
        program=train_prog,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=not args.async_mode,
        startup_program=startup_prog)
    if training_role == "PSERVER":
        pserver_program = t.get_pserver_program(current_endpoint)
        pserver_startup_program = t.get_startup_program(
            current_endpoint, pserver_program, startup_program=startup_prog)
        return pserver_program, pserver_startup_program
    elif training_role == "TRAINER":
        train_program = t.get_trainer_program()
        return train_program, startup_prog
    else:
        raise ValueError(
            'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
        )


def test_parallel(exe, test_args, args, test_prog, feeder):
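    """Run one evaluation pass with the test executor and return the
    accumulated accuracy metrics."""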
    acc_evaluators = []
    for i in xrange(len(test_args[2])):
        acc_evaluators.append(fluid.metrics.Accuracy())

    to_fetch = [v.name for v in test_args[2]]
    if args.use_reader_op:
        test_args[4].start()
        while True:
            try:
                acc_rets = exe.run(fetch_list=to_fetch)
                for i, e in enumerate(acc_evaluators):
                    e.update(
                        value=np.array(acc_rets[i]), weight=args.batch_size)
            except fluid.core.EOFException as eof:
                test_args[4].reset()
                break
    else:
        for batch_id, data in enumerate(test_args[3]()):
            acc_rets = exe.run(feed=feeder.feed(data), fetch_list=to_fetch)
            for i, e in enumerate(acc_evaluators):
                e.update(value=np.array(acc_rets[i]), weight=len(data))

    return [e.eval() for e in acc_evaluators]


# NOTE: we only need to benchmark with ParallelExecutor.
def train_parallel(train_args, test_args, args, train_prog, test_prog,
                   startup_prog, nccl_id_var, num_trainers, trainer_id):
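    """Benchmark training with fluid.ParallelExecutor, optionally as one
    trainer of a distributed (pserver or nccl2) job, and print throughput
    after every pass."""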
    over_all_start = time.time()
    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
    feeder = None
    if not args.use_reader_op:
        feed_var_list = [
            var for var in train_prog.global_block().vars.itervalues()
            if var.is_data
        ]
        feeder = fluid.DataFeeder(feed_var_list, place)
    # generate fake input data (constant-filled) when args.use_fake_data is set:
    if args.use_fake_data:
        for var in feed_var_list:
            v = startup_prog.global_block()._clone_variable(var)
            var.persistable = True
            v.persistable = True

            real_shape = list(var.shape)
            real_shape[0] = args.batch_size // args.gpus  # samples per device
            startup_prog.global_block().append_op(
                outputs={"Out": v},
                type="fill_constant",
                attrs={"shape": real_shape,
                       "value": 1.0,
                       "dtype": var.dtype})

    if nccl_id_var and trainer_id == 0:
        # FIXME(wuyi): wait for other trainers to start listening
        time.sleep(30)

    startup_exe = fluid.Executor(place)
    startup_exe.run(startup_prog)
    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = args.cpus
    strategy.allow_op_delay = False
    build_strategy = fluid.BuildStrategy()
    if args.reduce_strategy == "reduce":
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
    else:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
    build_strategy.fuse_broadcast_op = args.fuse_broadcast_op

    avg_loss = train_args[0]

    if args.update_method == "pserver":
        # In parameter-server mode, gradients are merged on the local node,
        # so do not initialize ParallelExecutor in multi-server
        # all-reduce mode.
        num_trainers = 1
        trainer_id = 0

    exe = fluid.ParallelExecutor(
        True,  # use_cuda
        avg_loss.name,  # loss_name
        main_program=train_prog,
        exec_strategy=strategy,
        build_strategy=build_strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    if not args.no_test:
        if args.update_method == "pserver":
            test_scope = None
        else:
            # NOTE: use an empty scope to avoid test exe using NCCLID
            test_scope = fluid.Scope()
        test_exe = fluid.ParallelExecutor(
            True, main_program=test_prog, share_vars_from=exe)

    for pass_id in range(args.pass_num):
        num_samples = 0
        iters = 0
        start_time = time.time()
        if not args.use_reader_op:
            reader_generator = train_args[3]()  # train_reader
        batch_id = 0
        data = None
        if args.use_reader_op:
            train_args[4].start()
        while True:
            if not args.use_reader_op:
                data = next(reader_generator, None)
                if data is None:
                    break
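            # When args.profile is set, profile batches [5, 10) and write the
            # result to /tmp/profile_<trainer_id>_pass<pass_id>.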
            if args.profile and batch_id == 5:
                profiler.start_profiler("All")
                profiler.reset_profiler()
            elif args.profile and batch_id == 10:
                print("profiling total time: ", time.time() - start_time)
                profiler.stop_profiler("total", "/tmp/profile_%d_pass%d" %
                                       (trainer_id, pass_id))
            if iters == args.iterations:
                if not args.use_reader_op:
                    reader_generator.close()
                break

            if iters == args.skip_batch_num:
                start_time = time.time()
                num_samples = 0
            fetch_list = [avg_loss.name]
            acc_name_list = [v.name for v in train_args[2]]
            fetch_list.extend(acc_name_list)

            if args.use_fake_data or args.use_reader_op:
                try:
                    fetch_ret = exe.run(fetch_list)
                except fluid.core.EOFException as eof:
                    break
                except fluid.core.EnforceNotMet as ex:
                    traceback.print_exc()
                    break
            else:
                fetch_ret = exe.run(fetch_list, feed=feeder.feed(data))
            if args.use_reader_op:
                num_samples += args.batch_size * args.gpus
            else:
                num_samples += len(data)

            iters += 1
            if batch_id % 1 == 0:  # log every batch
                fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                print("Pass %d, batch %d, loss %s, accuracies: %s" %
                      (pass_id, batch_id, fetched_data[0], fetched_data[1:]))
            batch_id += 1

        print_train_time(start_time, time.time(), num_samples)
        if args.use_reader_op:
            train_args[4].reset()  # reset reader handle
        else:
            del reader_generator

        if not args.no_test and test_args[2]:
            test_feeder = None
            if not args.use_reader_op:
                test_feed_var_list = [
                    var for var in test_prog.global_block().vars.itervalues()
                    if var.is_data
                ]
                test_feeder = fluid.DataFeeder(test_feed_var_list, place)
            test_ret = test_parallel(test_exe, test_args, args, test_prog,
                                     test_feeder)
            print("Pass: %d, Test Accuracy: %s\n" %
                  (pass_id, [np.mean(np.array(v)) for v in test_ret]))

    print("total train time: ", time.time() - over_all_start)


def print_arguments(args):
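    """Force use_nvprof off on non-GPU devices, then print all parsed
    arguments."""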
    vars(args)['use_nvprof'] = (vars(args)['use_nvprof'] and
                                vars(args)['device'] == 'GPU')
    print('----------- Configuration Arguments -----------')
    for arg, value in sorted(vars(args).iteritems()):
        print('%s: %s' % (arg, value))
    print('------------------------------------------------')


def print_train_time(start_time, end_time, num_samples):
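    """Print total examples, elapsed time and throughput in examples/sec."""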
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
          (num_samples, train_elapsed, examples_per_sec))


def print_paddle_envs():
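    """Print every PADDLE_* environment variable visible to this process."""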
    print('----------- Configuration envs -----------')
    for k in os.environ:
        if "PADDLE_" in k:
            print("ENV %s:%s" % (k, os.environ[k]))
    print('------------------------------------------------')


def main():
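    """Parse arguments, build train/test programs from the selected model and
    dispatch to local, parameter-server or nccl2 training."""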
    args = parse_args()
    print_arguments(args)
    print_paddle_envs()
    if args.no_random:
        fluid.default_startup_program().random_seed = 1

    # the unique trainer id, starting from 0, needed by trainer
    # only
    nccl_id_var, num_trainers, trainer_id = (
        None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))

    if args.use_cprof:
        pr = cProfile.Profile()
        pr.enable()

    model_def = __import__("models.%s" % args.model, fromlist=["models"])

    train_prog = fluid.Program()
    test_prog = fluid.Program()
    startup_prog = fluid.Program()

    train_args = list(model_def.get_model(args, True, train_prog, startup_prog))
    test_args = list(model_def.get_model(args, False, test_prog, startup_prog))

    all_args = [train_args, test_args, args]

    if args.update_method == "pserver":
        train_prog, startup_prog = dist_transpile(trainer_id, args, train_prog,
                                                  startup_prog)
        if not train_prog:
            raise Exception(
                "Must set correct environment variables to run distributed training.")
        all_args.extend([train_prog, test_prog, startup_prog])
        if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER":
            all_args.extend([nccl_id_var, num_trainers, trainer_id])
            train_parallel(*all_args)
        elif os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
            # start pserver with Executor
            server_exe = fluid.Executor(fluid.CPUPlace())
            server_exe.run(startup_prog)
            server_exe.run(train_prog)
        exit(0)

    # for other update methods, use default programs
    all_args.extend([train_prog, test_prog, startup_prog])

    if args.update_method == "nccl2":
        nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(
            trainer_id, startup_prog)

    if args.device == "CPU":
        raise Exception("Only GPU benchmarking is supported with ParallelExecutor.")
    all_args.extend([nccl_id_var, num_trainers, trainer_id])
    train_parallel(*all_args)


if __name__ == "__main__":
    main()