diff --git a/benchmark/fluid/Dockerfile b/benchmark/fluid/Dockerfile index 46140a9d1be01a50cd74dab2799e3731e8d3debd..5d98a9b3c42a9c0ee4be45d80f43a00d060f61aa 100644 --- a/benchmark/fluid/Dockerfile +++ b/benchmark/fluid/Dockerfile @@ -19,4 +19,4 @@ ADD *.whl / RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s ENV LD_LIBRARY_PATH=/usr/local/lib -ADD fluid_benchmark.py dataset.py models/ /workspace/ +ADD fluid_benchmark.py recordio_converter.py models/ /workspace/ diff --git a/benchmark/fluid/README.md b/benchmark/fluid/README.md index 1b0c7dce8bd6faab0c4c59caa1cbe337483cbd16..c17cab999b5f706599fce6a8547aa44f2536e73e 100644 --- a/benchmark/fluid/README.md +++ b/benchmark/fluid/README.md @@ -42,6 +42,15 @@ Currently supported `--model` argument include: PADDLE_PSERVER_PORT=7164 PADDLE_TRAINER_IPS=192.168.0.2,192.168.0.3 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method nccl2 ``` +## Prepare the RecordIO file to Achieve Better Performance + +Run the following command will generate RecordIO files like "mnist.recordio" under the path +and batch_size you choose: + +```bash +python -c 'from recordio_converter import *; prepare_mnist("data", 32)' +``` + ## Run Distributed Benchmark on Kubernetes Cluster You may need to build a Docker image before submitting a cluster job onto Kubernetes, or you will diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py index c1d458970a58bfac2a3369e8964eb100568b28f2..9dce7d564784dd5beae4dc89e5b4e02480f4ae40 100644 --- a/benchmark/fluid/fluid_benchmark.py +++ b/benchmark/fluid/fluid_benchmark.py @@ -44,7 +44,6 @@ def parse_args(): type=float, default=0.001, help='The minibatch size.') - # TODO(wuyi): add "--use_fake_data" option back. parser.add_argument( '--skip_batch_num', type=int, @@ -106,6 +105,16 @@ def parse_args(): default='local', choices=['local', 'pserver', 'nccl2'], help='Choose parameter update method, can be local, pserver, nccl2.') + parser.add_argument( + '--use_reader_op', + action='store_true', + help='Whether to use reader op, and must specify the data path if set this to true.' + ) + parser.add_argument( + '--data_path', + type=str, + default="", + help='Directory that contains all the training recordio files.') args = parser.parse_args() return args @@ -208,11 +217,13 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) exe = fluid.Executor(place) exe.run(startup_prog) - feed_var_list = [ - var for var in train_prog.global_block().vars.itervalues() - if var.is_data - ] - feeder = fluid.DataFeeder(feed_var_list, place) + + if not args.use_reader_op: + feed_var_list = [ + var for var in train_prog.global_block().vars.itervalues() + if var.is_data + ] + feeder = fluid.DataFeeder(feed_var_list, place) iters, num_samples, start_time = 0, 0, time.time() for pass_id in range(args.pass_num): @@ -223,9 +234,12 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, num_samples = 0 if iters == args.iterations: break - loss = exe.run(train_prog, - feed=feeder.feed(data), - fetch_list=[avg_loss]) + if args.use_reader_op: + loss = exe.run(train_prog, fetch_list=[avg_loss]) + else: + loss = exe.run(train_prog, + feed=feeder.feed(data), + fetch_list=[avg_loss]) iters += 1 num_samples += len(data) train_losses.append(loss) @@ -251,10 +265,14 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc, args, train_prog, startup_prog, nccl_id_var, num_trainers, trainer_id): - feed_var_list = [ - var for var in train_prog.global_block().vars.itervalues() - if var.is_data - ] + place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) + if not args.use_reader_op: + feed_var_list = [ + var for var in train_prog.global_block().vars.itervalues() + if var.is_data + ] + feeder = fluid.DataFeeder(feed_var_list, place) + # generate fake: if args.use_fake_data: for var in feed_var_list: @@ -271,7 +289,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, "value": 1.0, "dtype": var.dtype}) - place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) if nccl_id_var and trainer_id == 0: #FIXME(wuyi): wait other trainer to start listening time.sleep(30) @@ -288,7 +305,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, num_trainers=num_trainers, trainer_id=trainer_id) - feeder = fluid.DataFeeder(feed_var_list, place) for pass_id in range(args.pass_num): num_samples = 0 iters = 0 @@ -304,7 +320,10 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, num_samples = 0 if iters == args.iterations: break - if args.use_fake_data: + # NOTE: if use reader ops, the input data is not splited to multiple cards + if args.use_reader_op and iters >= args.iterations / args.gpus: + break + if args.use_fake_data or args.use_reader_op: loss, = exe.run([avg_loss.name]) else: loss, = exe.run([avg_loss.name], feed=feeder.feed(data)) @@ -316,6 +335,8 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, print("Pass %d, batch %d, loss %s" % (pass_id, batch_id, np.array(loss))) train_elapsed = time.time() - start_time + if args.use_reader_op: + num_samples = num_samples * args.gpus examples_per_sec = num_samples / train_elapsed print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' % (num_samples, train_elapsed, examples_per_sec)) @@ -342,7 +363,7 @@ def main(): # the unique trainer id, starting from 0, needed by trainer # only nccl_id_var, num_trainers, trainer_id = ( - None, 1, int(os.getenv("PADDLE_TRAINER_ID", "-1"))) + None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0"))) if args.use_cprof: pr = cProfile.Profile() diff --git a/benchmark/fluid/models/machine_translation.py b/benchmark/fluid/models/machine_translation.py index 635b3373dd27b21f83afae10b1d24833b81d57eb..3024882725dd8b02ca0991f3d70f13bf1d9808e1 100644 --- a/benchmark/fluid/models/machine_translation.py +++ b/benchmark/fluid/models/machine_translation.py @@ -197,6 +197,8 @@ def lodtensor_to_ndarray(lod_tensor): def get_model(args): + if args.use_reader_op: + raise Exception("machine_translation do not support reader op for now.") embedding_dim = 512 encoder_size = 512 decoder_size = 512 diff --git a/benchmark/fluid/models/mnist.py b/benchmark/fluid/models/mnist.py index d264bfc12bdb159c06dae81db4949b9ee17268e2..5d3da68dafa6c0cea6aafd678c5425f2f6a361aa 100644 --- a/benchmark/fluid/models/mnist.py +++ b/benchmark/fluid/models/mnist.py @@ -20,6 +20,7 @@ import numpy as np import argparse import time import cProfile +import os import paddle import paddle.fluid as fluid @@ -65,9 +66,23 @@ def cnn_model(data): def get_model(args): - # Input data - images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) - label = fluid.layers.data(name='label', shape=[1], dtype='int64') + if args.use_reader_op: + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + data_file = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1, 1, 28, 28], (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=args.gpus) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + images, label = fluid.layers.read_file(data_file) + else: + images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) + label = fluid.layers.data(name='label', shape=[1], dtype='int64') # Train program predict = cnn_model(images) diff --git a/benchmark/fluid/models/resnet.py b/benchmark/fluid/models/resnet.py index 9dec8911ed64e09285fb461c4a12adb601535316..35daad6664bd51484faf1b99d7fbba86e057b58f 100644 --- a/benchmark/fluid/models/resnet.py +++ b/benchmark/fluid/models/resnet.py @@ -19,6 +19,7 @@ from __future__ import print_function import functools import numpy as np import time +import os import cProfile, pstats, StringIO @@ -129,9 +130,24 @@ def get_model(args): else: dshape = [224, 224, 3] model = resnet_imagenet + if args.use_reader_op: + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + data_file = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1] + dshape, (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=args.gpus) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + input, label = fluid.layers.read_file(data_file) + else: + input = fluid.layers.data(name='data', shape=dshape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') - input = fluid.layers.data(name='data', shape=dshape, dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') predict = model(input, class_dim) cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(x=cost) diff --git a/benchmark/fluid/models/stacked_dynamic_lstm.py b/benchmark/fluid/models/stacked_dynamic_lstm.py index 81a28b5f3aed0c325398b909d700c23df545824a..e2a8cf45ac3f2cb5c4023dba176d9d1670799ed3 100644 --- a/benchmark/fluid/models/stacked_dynamic_lstm.py +++ b/benchmark/fluid/models/stacked_dynamic_lstm.py @@ -44,6 +44,9 @@ def crop_sentence(reader, crop_size): def get_model(args): + if args.use_reader_op: + raise Exception( + "stacked_dynamic_lstm do not support reader op for now.") lstm_size = 512 emb_dim = 512 crop_size = 1500 diff --git a/benchmark/fluid/models/vgg.py b/benchmark/fluid/models/vgg.py index 53856c5f7acd3a4e1476ec57154a880bb6f984c9..b84e118a88e53ebef0cfe26b21c0a4edc5107d20 100644 --- a/benchmark/fluid/models/vgg.py +++ b/benchmark/fluid/models/vgg.py @@ -22,6 +22,7 @@ import paddle.fluid as fluid import paddle.fluid.core as core import argparse import functools +import os def vgg16_bn_drop(input): @@ -65,9 +66,23 @@ def get_model(args): else: data_shape = [224, 224, 3] - # Input data - images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') + if args.use_reader_op: + filelist = [ + os.path.join(args.data_path, f) for f in os.listdir(args.data_path) + ] + data_file = fluid.layers.open_files( + filenames=filelist, + shapes=[[-1] + data_shape, (-1, 1)], + lod_levels=[0, 0], + dtypes=["float32", "int64"], + thread_num=args.gpus) + data_file = fluid.layers.double_buffer( + fluid.layers.batch( + data_file, batch_size=args.batch_size)) + images, label = fluid.layers.read_file(data_file) + else: + images = fluid.layers.data(name='data', shape=dshape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') # Train program net = vgg16_bn_drop(images) diff --git a/benchmark/fluid/recordio_converter.py b/benchmark/fluid/recordio_converter.py new file mode 100644 index 0000000000000000000000000000000000000000..c69062c4a1f576503c82971de8fa035382950268 --- /dev/null +++ b/benchmark/fluid/recordio_converter.py @@ -0,0 +1,133 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import paddle +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.dataset import mnist, cifar, flowers, image + + +def convert_2_recordio(py_reader, outfilepath, batch_size, shape_data, + shape_label): + num_batches = 0 + with fluid.program_guard(fluid.Program(), fluid.Program()): + reader = paddle.batch(py_reader(), batch_size=batch_size) + feeder = fluid.DataFeeder( + feed_list=[ # order is image and label + fluid.layers.data( + name='image', shape=shape_data), + fluid.layers.data( + name='label', shape=shape_label, dtype='int64'), + ], + place=fluid.CPUPlace()) + num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( + outfilepath, reader, feeder) + return num_batches + + +def prepare_mnist(outpath, batch_size): + outfilepath = os.path.join(outpath, "mnist.recordio") + convert_2_recordio(mnist.train, outfilepath, batch_size, [784], [1]) + + +def prepare_cifar10(outpath, batch_size): + outfilepath = os.path.join(outpath, "cifar.recordio") + convert_2_recordio(cifar.train10, outfilepath, batch_size, [3, 32, 32], [1]) + + +def prepare_flowers(outpath, batch_size): + outfilepath = os.path.join(outpath, "flowers.recordio") + convert_2_recordio(flowers.train, outfilepath, batch_size, [3, 224, 224], + [1]) + + +def imagenet_train(data_dir): + contents = os.listdir(data_dir) + if set(contents) != set( + ["train", "train.txt", "val", "val_set", "val.txt", "unzip.sh"]): + raise Exception("Imagenet data contents error!") + img2label = dict() + imgfilelist = [] + with open(os.path.join(data_dir, "train.txt")) as fn: + while 1: + l = fn.readline() + if not l: + break + img, lbl = l[:-1].split(" ") + img2label[img] = int(lbl) + imgfilelist.append(img) + + def train_reader(): + for idx, imgfile in enumerate(imgfilelist): + data = image.load_image( + os.path.join(data_dir, "train", imgfile.lower())) + label = [img2label[imgfile], ] + yield [data, label] + + def default_mapper(sample): + img, label = sample + img = image.simple_transform( + img, 256, 224, True, mean=[103.94, 116.78, 123.68]) + return img.flatten().astype('float32'), label + + return paddle.reader.map_readers(default_mapper, train_reader) + + +# FIXME(wuyi): delete this when https://github.com/PaddlePaddle/Paddle/pull/11066 is merged +def convert_reader_to_recordio_files( + filename, + batch_per_file, + reader_creator, + feeder, + compressor=core.RecordIOWriter.Compressor.Snappy, + max_num_records=1000, + feed_order=None): + if feed_order is None: + feed_order = feeder.feed_names + f_name, f_ext = os.path.splitext(filename) + assert (f_ext == ".recordio") + + lines = [] + f_idx = 0 + counter = 0 + for idx, batch in enumerate(reader_creator()): + lines.append(batch) + if idx >= batch_per_file and idx % batch_per_file == 0: + filename = "%s-%05d%s" % (f_name, f_idx, f_ext) + with fluid.recordio_writer.create_recordio_writer( + filename, compressor, max_num_records) as writer: + for l in lines: + res = feeder.feed(l) + for each in feed_order: + writer.append_tensor(res[each]) + writer.complete_append_tensor() + counter += 1 + lines = [] + f_idx += 1 + print("written file: ", filename) + return counter + + +def prepare_imagenet(inpath, outpath, batch_size): + r = paddle.batch(imagenet_train(inpath), batch_size=batch_size) + feeder = fluid.DataFeeder( + feed_list=[ + fluid.layers.data( + name="image", shape=[3, 224, 224]), fluid.layers.data( + name="label", shape=[1], dtype='int64') + ], + place=fluid.CPUPlace()) + outpath = os.path.join(outpath, "imagenet.recordio") + convert_reader_to_recordio_files(outpath, 10000, r, feeder) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 8758ac9f94ab91b5be5fc70917c64db38997d1c1..a56f3ea9db6b9fabf9d78f102d394a0817a44a98 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -434,7 +434,7 @@ def open_files(filenames, shapes, lod_levels, dtypes, - thread_num, + thread_num=1, buffer_size=None, pass_num=1, for_parallel=True):