# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import cProfile
import time
import os
import traceback

import numpy as np

import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler

from args import *


def append_nccl2_prepare(trainer_id, startup_prog):
    """Prepare *startup_prog* for NCCL2-based distributed training.

    Reads the worker topology from PADDLE_* environment variables and
    appends a ``gen_nccl_id`` op to the startup program so every trainer
    can obtain the shared NCCL communicator id.

    Args:
        trainer_id: non-negative trainer index; a negative value means the
            required environment is missing and is treated as an error.
        startup_prog: the fluid startup Program to append ops to.

    Returns:
        Tuple of (nccl_id_var, num_trainers, trainer_id).

    Raises:
        Exception: if *trainer_id* is negative.
    """
    if trainer_id < 0:
        raise Exception("must set positive PADDLE_TRAINER_ID env variables for "
                        "nccl-based dist train.")

    # Re-read the trainer id from the environment (authoritative source).
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
    port = os.getenv("PADDLE_PSERVER_PORT")
    worker_ips = os.getenv("PADDLE_TRAINER_IPS")
    worker_endpoints = [':'.join([ip, port]) for ip in worker_ips.split(",")]
    num_trainers = len(worker_endpoints)
    current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
    # The endpoint list passed to gen_nccl_id holds only the *peers*.
    worker_endpoints.remove(current_endpoint)

    startup_block = startup_prog.global_block()
    nccl_id_var = startup_block.create_var(
        name="NCCLID",
        persistable=True,
        type=fluid.core.VarDesc.VarType.RAW)
    startup_block.append_op(
        type="gen_nccl_id",
        inputs={},
        outputs={"NCCLID": nccl_id_var},
        attrs={
            "endpoint": current_endpoint,
            "endpoint_list": worker_endpoints,
            "trainer_id": trainer_id
        })
    return nccl_id_var, num_trainers, trainer_id


def dist_transpile(trainer_id, args, train_prog, startup_prog):
    """Transpile programs for parameter-server distributed training.

    Reads the cluster layout from PADDLE_* environment variables and runs
    the DistributeTranspiler over *train_prog*.

    Args:
        trainer_id: trainer index; negative means "not distributed".
        args: parsed benchmark CLI arguments (uses no_split_var, async_mode).
        train_prog: the training fluid.Program to transpile.
        startup_prog: the startup fluid.Program.

    Returns:
        (main_program, startup_program) for the current role, or
        (None, None) when *trainer_id* is negative.

    Raises:
        ValueError: if PADDLE_TRAINING_ROLE is neither TRAINER nor PSERVER.
    """
    if trainer_id < 0:
        return None, None

    # the port of all pservers, needed by both trainer and pserver
    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
    # comma separated ips of all pservers, needed by trainer and pserver
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
    pserver_endpoints = ",".join(
        ':'.join([ip, port]) for ip in pserver_ips.split(","))
    # total number of workers/trainers in the job, needed by
    # trainer and pserver
    trainers = int(os.getenv("PADDLE_TRAINERS"))
    # the IP of the local machine, needed by pserver only
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
    # the role, should be either PSERVER or TRAINER
    training_role = os.getenv("PADDLE_TRAINING_ROLE")

    transpiler_config = distribute_transpiler.DistributeTranspilerConfig()
    transpiler_config.slice_var_up = not args.no_split_var
    transpiler = distribute_transpiler.DistributeTranspiler(
        config=transpiler_config)
    # NOTE: *MUST* use train_prog, for we are using with guard to
    # generate different program for train and test.
    transpiler.transpile(
        trainer_id,
        program=train_prog,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=not args.async_mode)

    if training_role == "PSERVER":
        pserver_program = transpiler.get_pserver_program(current_endpoint)
        pserver_startup_program = transpiler.get_startup_program(
            current_endpoint, pserver_program, startup_program=startup_prog)
        return pserver_program, pserver_startup_program
    if training_role == "TRAINER":
        return transpiler.get_trainer_program(), startup_prog
    raise ValueError(
        'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
    )


def test_parallel(exe, test_args, args, test_prog, feeder):
    """Run one pass over the test data and accumulate accuracy metrics.

    Args:
        exe: fluid.ParallelExecutor built for the test program.
        test_args: model tuple; test_args[2] holds the accuracy variables,
            test_args[3] the batch reader, test_args[4] the in-graph reader.
        args: parsed benchmark CLI arguments (use_reader_op, batch_size).
        test_prog: the test fluid.Program (kept for interface symmetry).
        feeder: fluid.DataFeeder; required when not using a reader op.

    Returns:
        List with one accumulated accuracy result per metric variable.
    """
    # One streaming Accuracy metric per accuracy variable. A comprehension
    # replaces the original Python-2-only ``xrange`` index loop, which
    # raises NameError on Python 3.
    acc_evaluators = [fluid.metrics.Accuracy() for _ in test_args[2]]

    to_fetch = [v.name for v in test_args[2]]
    if args.use_reader_op:
        # Data comes from the in-graph reader: run until it signals EOF.
        test_args[4].start()
        while True:
            try:
                acc_rets = exe.run(fetch_list=to_fetch)
                for i, e in enumerate(acc_evaluators):
                    e.update(
                        value=np.array(acc_rets[i]), weight=args.batch_size)
            except fluid.core.EOFException:
                # Reset the reader so it can be reused on the next pass.
                test_args[4].reset()
                break
    else:
        # Feed Python-side batches through the DataFeeder.
        for batch_id, data in enumerate(test_args[3]()):
            acc_rets = exe.run(feed=feeder.feed(data), fetch_list=to_fetch)
            for i, e in enumerate(acc_evaluators):
                e.update(value=np.array(acc_rets[i]), weight=len(data))

    return [e.eval() for e in acc_evaluators]


# NOTE: only need to benchmark using parallelexe
def train_parallel(train_args, test_args, args, train_prog, test_prog,
                   startup_prog, nccl_id_var, num_trainers, trainer_id):
    """Run the benchmark training loop with fluid.ParallelExecutor.

    Args:
        train_args: model tuple; [0] avg_loss var, [2] accuracy vars,
            [3] batch reader factory, [4] in-graph reader.
        test_args: same layout, for the test program.
        args: parsed benchmark CLI arguments.
        train_prog/test_prog/startup_prog: the three fluid Programs.
        nccl_id_var: NCCLID var from append_nccl2_prepare, or None.
        num_trainers/trainer_id: distributed topology info.

    Side effects: runs startup, trains for args.pass_num passes, prints
    per-batch loss/accuracy and timing summaries.
    """
    over_all_start = time.time()
    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
    feeder = None
    if not args.use_reader_op:
        # Feed all data-layer variables of the train program by hand.
        feed_var_list = [
            var for var in train_prog.global_block().vars.itervalues()
            if var.is_data
        ]
        feeder = fluid.DataFeeder(feed_var_list, place)
    # generate fake:
    # NOTE(review): this branch uses feed_var_list, which is only defined
    # when not args.use_reader_op — presumably use_fake_data implies the
    # feeder path; confirm against the arg parser.
    if args.use_fake_data:
        for var in feed_var_list:
            v = startup_prog.global_block()._clone_variable(var)
            var.persistable = True
            v.persistable = True

            real_shape = list(var.shape)
            # NOTE(review): Python-2 integer division; under Python 3 this
            # would produce a float shape entry — confirm intended.
            real_shape[0] = args.batch_size / args.gpus
            # Fill the input variable once at startup with constant data.
            startup_prog.global_block().append_op(
                outputs={"Out": v},
                type="fill_constant",
                attrs={"shape": real_shape,
                       "value": 1.0,
                       "dtype": var.dtype})

    if nccl_id_var and trainer_id == 0:
        #FIXME(wuyi): wait other trainer to start listening
        time.sleep(30)

    # Run the startup program once with a plain Executor.
    startup_exe = fluid.Executor(place)
    startup_exe.run(startup_prog)
    strategy = fluid.ExecutionStrategy()
    strategy.num_threads = args.cpus
    strategy.allow_op_delay = False
    avg_loss = train_args[0]

    if args.update_method == "pserver":
        # parameter server mode distributed training, merge
        # gradients on local server, do not initialize
        # ParallelExecutor with multi server all-reduce mode.
        num_trainers = 1
        trainer_id = 0

    exe = fluid.ParallelExecutor(
        True,
        avg_loss.name,
        main_program=train_prog,
        exec_strategy=strategy,
        num_trainers=num_trainers,
        trainer_id=trainer_id)

    if not args.no_test:
        if args.update_method == "pserver":
            test_scope = None
        else:
            # NOTE: use an empty scope to avoid test exe using NCCLID
            test_scope = fluid.Scope()
        # NOTE(review): test_scope is computed but never passed to the
        # executor below — confirm whether scope= was meant to be used.
        test_exe = fluid.ParallelExecutor(
            True, main_program=test_prog, share_vars_from=exe)

    for pass_id in range(args.pass_num):
        num_samples = 0
        iters = 0
        start_time = time.time()
        if not args.use_reader_op:
            reader_generator = train_args[3]()  #train_reader
        batch_id = 0
        data = None
        if args.use_reader_op:
            train_args[4].start()
        while True:
            if not args.use_reader_op:
                data = next(reader_generator, None)
                # End of the reader: finish this pass.
                if data == None:
                    break
            # Profile a fixed window of batches [5, 10) when requested.
            if args.profile and batch_id == 5:
                profiler.start_profiler("All")
                profiler.reset_profiler()
            elif args.profile and batch_id == 10:
                print("profiling total time: ", time.time() - start_time)
                profiler.stop_profiler("total", "/tmp/profile_%d_pass%d" %
                                       (trainer_id, pass_id))
            if iters == args.iterations:
                # NOTE(review): reader_generator is undefined when
                # args.use_reader_op — this raises NameError if the
                # iteration cap is reached on that path; confirm.
                reader_generator.close()
                break

            # Restart timing after the warm-up batches.
            if iters == args.skip_batch_num:
                start_time = time.time()
                num_samples = 0
            fetch_list = [avg_loss.name]
            acc_name_list = [v.name for v in train_args[2]]
            fetch_list.extend(acc_name_list)

            if args.use_fake_data or args.use_reader_op:
                try:

                    fetch_ret = exe.run(fetch_list)
                except fluid.core.EOFException as eof:
                    break
                except fluid.core.EnforceNotMet as ex:
                    traceback.print_exc()
                    break
            else:
                fetch_ret = exe.run(fetch_list, feed=feeder.feed(data))
            if args.use_reader_op:
                # Reader op feeds batch_size per GPU card.
                num_samples += args.batch_size * args.gpus
            else:
                num_samples += len(data)

            iters += 1
            # NOTE(review): batch_id % 1 == 0 is always true, i.e. this
            # logs every batch — presumably a tunable log interval.
            if batch_id % 1 == 0:
                fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
                print("Pass %d, batch %d, loss %s, accucacys: %s" %
                      (pass_id, batch_id, fetched_data[0], fetched_data[1:]))
            batch_id += 1

        print_train_time(start_time, time.time(), num_samples)
        if args.use_reader_op:
            train_args[4].reset()  # reset reader handle
        else:
            del reader_generator

        # Optional test evaluation at the end of each pass.
        if not args.no_test and test_args[2]:
            test_feeder = None
            if not args.use_reader_op:
                test_feed_var_list = [
                    var for var in test_prog.global_block().vars.itervalues()
                    if var.is_data
                ]
                test_feeder = fluid.DataFeeder(test_feed_var_list, place)
            test_ret = test_parallel(test_exe, test_args, args, test_prog,
                                     test_feeder)
            print("Pass: %d, Test Accuracy: %s\n" %
                  (pass_id, [np.mean(np.array(v)) for v in test_ret]))

    print("total train time: ", time.time() - over_all_start)


def print_arguments(args):
    """Print parsed CLI arguments, one per line, sorted by name.

    Side effect: forces ``use_nvprof`` to False unless the device is
    'GPU', since nvprof profiling only applies to CUDA runs.

    Args:
        args: an argparse.Namespace with at least use_nvprof and device.
    """
    vars(args)['use_nvprof'] = (vars(args)['use_nvprof'] and
                                vars(args)['device'] == 'GPU')
    print('----------- Configuration Arguments -----------')
    # .items() instead of the Python-2-only .iteritems(): identical
    # output under sorted(), and compatible with Python 3.
    for arg, value in sorted(vars(args).items()):
        print('%s: %s' % (arg, value))
    print('------------------------------------------------')


def print_train_time(start_time, end_time, num_samples):
    """Print total examples, elapsed seconds, and throughput for a pass.

    Args:
        start_time: pass start timestamp (seconds, from time.time()).
        end_time: pass end timestamp (seconds).
        num_samples: number of examples processed between the two.
    """
    train_elapsed = end_time - start_time
    examples_per_sec = num_samples / train_elapsed
    # Fixed typo in the report line: "examples/sed" -> "examples/sec".
    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
          (num_samples, train_elapsed, examples_per_sec))


def print_paddle_envs():
    """Print every environment variable whose name contains "PADDLE_".

    Used at startup so distributed runs record their cluster topology
    (pserver/trainer endpoints, roles, ids) in the log.
    """
    print('----------- Configuration envs -----------')
    for k in os.environ:
        if "PADDLE_" in k:
            # print() call instead of the Python-2-only print statement:
            # same output, consistent with the rest of the file, and
            # valid syntax under Python 3.
            print("ENV %s:%s" % (k, os.environ[k]))
    print('------------------------------------------------')


def main():
    """Benchmark entry point.

    Parses CLI args, builds train/test/startup programs from the selected
    model, then dispatches to pserver-mode distributed training, nccl2
    training, or plain single-node ParallelExecutor training.
    """
    args = parse_args()
    print_arguments(args)
    print_paddle_envs()
    # Fix the random seed for reproducible benchmarking runs.
    if args.no_random:
        fluid.default_startup_program().random_seed = 1

    # the unique trainer id, starting from 0, needed by trainer
    # only
    nccl_id_var, num_trainers, trainer_id = (
        None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))

    if args.use_cprof:
        pr = cProfile.Profile()
        pr.enable()

    # Dynamically import models/<name>.py which supplies get_model().
    model_def = __import__("models.%s" % args.model, fromlist=["models"])

    train_prog = fluid.Program()
    test_prog = fluid.Program()
    startup_prog = fluid.Program()

    # get_model returns (loss, ..., acc vars, reader, py_reader) tuples;
    # True/False selects the train vs. test graph.
    train_args = list(model_def.get_model(args, True, train_prog, startup_prog))
    test_args = list(model_def.get_model(args, False, test_prog, startup_prog))

    all_args = [train_args, test_args, args]

    if args.update_method == "pserver":
        # Transpile for parameter-server training; returns role-specific
        # (main, startup) programs.
        train_prog, startup_prog = dist_transpile(trainer_id, args, train_prog,
                                                  startup_prog)
        if not train_prog:
            raise Exception(
                "Must configure correct environments to run dist train.")
        all_args.extend([train_prog, test_prog, startup_prog])
        if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER":
            all_args.extend([nccl_id_var, num_trainers, trainer_id])
            train_parallel(*all_args)
        elif os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
            # start pserver with Executor
            server_exe = fluid.Executor(fluid.CPUPlace())
            server_exe.run(startup_prog)
            # Blocks serving parameters until the job is torn down.
            server_exe.run(train_prog)
        exit(0)

    # for other update methods, use default programs
    all_args.extend([train_prog, test_prog, startup_prog])

    if args.update_method == "nccl2":
        nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(
            trainer_id, startup_prog)

    if args.device == "CPU":
        raise Exception("Only support GPU perf with parallel exe")
    all_args.extend([nccl_id_var, num_trainers, trainer_id])
    train_parallel(*all_args)


# Script entry point: run the benchmark only when executed directly.
if __name__ == "__main__":
    main()