Unverified · Commit 635099c1 · authored by Wu Yi · committed by GitHub

Merge pull request #11121 from typhoonzero/fluid_benchmark_support_recordioreader

Fluid benchmark support recordio reader
@@ -19,4 +19,4 @@ ADD *.whl /
 RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s
 ENV LD_LIBRARY_PATH=/usr/local/lib
-ADD fluid_benchmark.py dataset.py models/ /workspace/
+ADD fluid_benchmark.py recordio_converter.py models/ /workspace/
@@ -44,6 +44,16 @@ Currently supported `--model` argument include:
 PADDLE_PSERVER_PORT=7164 PADDLE_TRAINER_IPS=192.168.0.2,192.168.0.3 PADDLE_CURRENT_IP=127.0.0.1 PADDLE_TRAINER_ID=0 python fluid_benchmark.py --model mnist --device GPU --update_method nccl2
 ```
+## Prepare the RecordIO file to Achieve Better Performance
+Running the following command generates RecordIO files such as "mnist.recordio" under the path
+with the batch size you choose. You can use batch_size=1 so that a later reader can re-batch
+the records to any batch size at any time using `fluid.batch`.
+```bash
+python -c 'from recordio_converter import *; prepare_mnist("data", 1)'
+```
 ## Run Distributed Benchmark on Kubernetes Cluster
 You may need to build a Docker image before submitting a cluster job onto Kubernetes, or you will
...
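To see the round trip this README section describes end to end, here is a minimal sketch; it assumes the Fluid 0.x API used throughout this PR, an illustrative "data" directory, and that the README's `fluid.batch` corresponds to `fluid.layers.batch` in the model code below:

```python
# Sketch only: generate mnist.recordio with one sample per record, then
# re-batch it freely at read time. Paths are illustrative.
import paddle.fluid as fluid
from recordio_converter import prepare_mnist

prepare_mnist("data", 1)  # writes data/mnist.recordio

data_file = fluid.layers.open_files(
    filenames=["data/mnist.recordio"],
    shapes=[[-1, 1, 28, 28], (-1, 1)],  # mirrors the MNIST model in this PR
    lod_levels=[0, 0],
    dtypes=["float32", "int64"],
    thread_num=1)
# Records hold single samples, so any batch size can be chosen here.
data_file = fluid.layers.batch(data_file, batch_size=128)
images, label = fluid.layers.read_file(data_file)
```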
@@ -38,10 +38,12 @@ def parse_args():
         default='resnet',
         help='The model to run benchmark with.')
     parser.add_argument(
-        '--batch_size', type=int, default=32, help='The minibatch size.')
+        '--batch_size',
+        type=int,
+        default=32,
+        help='The batch size on each gpu.')
     parser.add_argument(
         '--learning_rate', type=float, default=0.001, help='The learning rate.')
-    # TODO(wuyi): add "--use_fake_data" option back.
     parser.add_argument(
         '--skip_batch_num',
         type=int,
@@ -49,7 +51,10 @@ def parse_args():
         help='The first num of minibatch num to skip, for better performance test'
     )
     parser.add_argument(
-        '--iterations', type=int, default=80, help='The number of minibatches.')
+        '--iterations',
+        type=int,
+        default=80,
+        help='The number of minibatches, set to -1 to run all batches.')
     parser.add_argument(
         '--pass_num', type=int, default=100, help='The number of passes.')
     parser.add_argument(
@@ -69,6 +74,7 @@ def parse_args():
         type=int,
         default=1,
         help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
+    # this option is available only for vgg and resnet.
     parser.add_argument(
         '--cpus',
         type=int,
@@ -78,7 +84,7 @@ def parse_args():
         '--data_set',
         type=str,
         default='flowers',
-        choices=['cifar10', 'flowers'],
+        choices=['cifar10', 'flowers', 'imagenet'],
         help='Optional dataset for benchmark.')
     parser.add_argument(
         '--infer_only', action='store_true', help='If set, run forward only.')
@@ -108,6 +114,16 @@ def parse_args():
         default='local',
         choices=['local', 'pserver', 'nccl2'],
         help='Choose parameter update method, can be local, pserver, nccl2.')
+    parser.add_argument(
+        '--use_reader_op',
+        action='store_true',
+        help='Whether to use reader op, and must specify the data path if set this to true.'
+    )
+    parser.add_argument(
+        '--data_path',
+        type=str,
+        default="",
+        help='Directory that contains all the training recordio files.')
     args = parser.parse_args()
     return args
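The `--use_reader_op` help text states that `--data_path` must accompany it, but the script itself does not enforce the pairing; a guard a caller could add might look like the sketch below (not part of the PR):

```python
# Sketch only: validation the benchmark script does not perform itself.
def validate_reader_args(args):
    # --use_reader_op builds the input pipeline from RecordIO files via
    # open_files, so it needs --data_path pointing at those files.
    if args.use_reader_op and not args.data_path:
        raise ValueError(
            "--use_reader_op requires --data_path to point at a directory "
            "of RecordIO files")
```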
@@ -210,26 +226,50 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
     place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
     exe = fluid.Executor(place)
     exe.run(startup_prog)
-    feed_var_list = [
-        var for var in train_prog.global_block().vars.itervalues()
-        if var.is_data
-    ]
-    feeder = fluid.DataFeeder(feed_var_list, place)
+    if not args.use_reader_op:
+        feed_var_list = [
+            var for var in train_prog.global_block().vars.itervalues()
+            if var.is_data
+        ]
+        feeder = fluid.DataFeeder(feed_var_list, place)
     iters, num_samples, start_time = 0, 0, time.time()
     for pass_id in range(args.pass_num):
         train_losses = []
-        for batch_id, data in enumerate(train_reader()):
+        if not args.use_reader_op:
+            reader_generator = train_reader()
+        batch_id = 0
+        data = None
+        while True:
+            if not args.use_reader_op:
+                data = next(reader_generator, None)
+                if data == None:
+                    break
+            if iters == args.iterations:
+                break
             if iters == args.skip_batch_num:
                 start_time = time.time()
                 num_samples = 0
-            if iters == args.iterations:
-                break
-            loss = exe.run(train_prog,
-                           feed=feeder.feed(data),
-                           fetch_list=[avg_loss])
+            if args.use_reader_op:
+                try:
+                    loss = exe.run(train_prog, fetch_list=[avg_loss])
+                except fluid.core.EnforceNotMet as ex:
+                    break
+            else:
+                loss = exe.run(train_prog,
+                               feed=feeder.feed(data),
+                               fetch_list=[avg_loss])
             iters += 1
-            num_samples += len(data)
+            batch_id += 1
+            # FIXME(wuyi): For use_reader_op, if the current
+            # pass is not the last, the last batch of this pass
+            # is also equal to args.batch_size.
+            if args.use_reader_op:
+                num_samples += args.batch_size * args.gpus
+            else:
+                num_samples += len(data)
             train_losses.append(loss)
             print("Pass: %d, Iter: %d, Loss: %f\n" %
                   (pass_id, iters, np.mean(train_losses)))
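One detail worth calling out in the loop above: with reader ops there is no Python-side generator to exhaust, so end-of-data is signalled by `exe.run` raising `EnforceNotMet` once the file queue drains, and the loop treats that exception as the end of the pass. Distilled to its core, the convention this diff relies on is:

```python
# Sketch of the end-of-data convention used above: when reader ops are in
# the program, a drained file queue surfaces as EnforceNotMet, not as
# StopIteration from a Python generator.
while True:
    try:
        loss = exe.run(train_prog, fetch_list=[avg_loss])
    except fluid.core.EnforceNotMet:
        break  # readers exhausted: the pass is over
```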
@@ -250,10 +290,14 @@ def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
 def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                    batch_acc, args, train_prog, startup_prog, nccl_id_var,
                    num_trainers, trainer_id):
-    feed_var_list = [
-        var for var in train_prog.global_block().vars.itervalues()
-        if var.is_data
-    ]
+    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
+    if not args.use_reader_op:
+        feed_var_list = [
+            var for var in train_prog.global_block().vars.itervalues()
+            if var.is_data
+        ]
+        feeder = fluid.DataFeeder(feed_var_list, place)
     # generate fake:
     if args.use_fake_data:
         for var in feed_var_list:
@@ -270,7 +314,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
                            "value": 1.0,
                            "dtype": var.dtype})
-    place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
     if nccl_id_var and trainer_id == 0:
         #FIXME(wuyi): wait other trainer to start listening
         time.sleep(30)
@@ -287,12 +330,21 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
             num_trainers=num_trainers,
             trainer_id=trainer_id)
-    feeder = fluid.DataFeeder(feed_var_list, place)
     for pass_id in range(args.pass_num):
         num_samples = 0
         iters = 0
         start_time = time.time()
-        for batch_id, data in enumerate(train_reader()):
+        if not args.use_reader_op:
+            reader_generator = train_reader()
+        batch_id = 0
+        data = None
+        while True:
+            if not args.use_reader_op:
+                data = next(reader_generator, None)
+                if data == None:
+                    break
+            if iters == args.iterations:
+                break
             if args.profile and pass_id == 0 and batch_id == 5:
                 profiler.start_profiler("All")
             elif args.profile and pass_id == 0 and batch_id == 10:
@@ -301,19 +353,26 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
             if iters == args.skip_batch_num:
                 start_time = time.time()
                 num_samples = 0
-            if iters == args.iterations:
-                break
-            if args.use_fake_data:
-                loss, = exe.run([avg_loss.name])
-            else:
-                loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
+            if args.use_fake_data or args.use_reader_op:
+                try:
+                    loss, = exe.run([avg_loss.name])
+                except fluid.core.EnforceNotMet as ex:
+                    break
+            else:
+                loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
             if args.update_method == "pserver":
                 exe.bcast_params()
-            num_samples += len(data)
+            if args.use_reader_op:
+                num_samples += args.batch_size * args.gpus
+            else:
+                num_samples += len(data)
             iters += 1
             if batch_id % 1 == 0:
                 print("Pass %d, batch %d, loss %s" %
                       (pass_id, batch_id, np.array(loss)))
+            batch_id += 1
+        if args.use_reader_op:
+            num_samples = num_samples * args.gpus
         print_train_time(start_time, time.time(), num_samples)
         if not args.no_test and batch_acc:
             test_acc = test(startup_exe, infer_prog, test_reader, feeder,
...
@@ -197,6 +197,8 @@ def lodtensor_to_ndarray(lod_tensor):
 def get_model(args):
+    if args.use_reader_op:
+        raise Exception("machine_translation do not support reader op for now.")
     embedding_dim = 512
     encoder_size = 512
     decoder_size = 512
@@ -221,7 +223,7 @@ def get_model(args):
     train_batch_generator = paddle.batch(
         paddle.reader.shuffle(
             paddle.dataset.wmt14.train(dict_size), buf_size=1000),
-        batch_size=args.batch_size)
+        batch_size=args.batch_size * args.gpus)
     test_batch_generator = paddle.batch(
         paddle.reader.shuffle(
...
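The `batch_size * args.gpus` pattern here, and in the mnist, resnet, stacked_dynamic_lstm, and vgg models below, reflects the batching convention this PR adopts: a feed-based reader yields one global batch that ParallelExecutor splits across devices, so each GPU still sees `batch_size` samples per step. A small illustration (the toy reader is ours, not the PR's):

```python
import paddle

# Toy sample-level reader, purely illustrative.
def sample_reader():
    for i in range(1024):
        yield [float(i)], i % 10

batch_size, gpus = 32, 4
# One fetched batch carries batch_size * gpus samples; ParallelExecutor
# then hands batch_size of them to each of the 4 GPUs.
train_reader = paddle.batch(sample_reader, batch_size=batch_size * gpus)
```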
@@ -20,6 +20,7 @@ import numpy as np
 import argparse
 import time
 import cProfile
+import os
 import paddle
 import paddle.fluid as fluid
@@ -65,9 +66,24 @@ def cnn_model(data):
 def get_model(args):
-    # Input data
-    images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
-    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+    if args.use_reader_op:
+        filelist = [
+            os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
+        ]
+        data_file = fluid.layers.open_files(
+            filenames=filelist,
+            shapes=[[-1, 1, 28, 28], (-1, 1)],
+            lod_levels=[0, 0],
+            dtypes=["float32", "int64"],
+            thread_num=args.gpus,
+            pass_num=args.pass_num)
+        data_file = fluid.layers.double_buffer(
+            fluid.layers.batch(
+                data_file, batch_size=args.batch_size))
+        images, label = fluid.layers.read_file(data_file)
+    else:
+        images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE)
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

     if args.device == 'CPU' and args.cpus > 1:
         places = fluid.layers.get_places(args.cpus)
@@ -103,7 +119,7 @@ def get_model(args):
     # Reader
     train_reader = paddle.batch(
-        paddle.dataset.mnist.train(), batch_size=args.batch_size)
+        paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus)
     test_reader = paddle.batch(
         paddle.dataset.mnist.test(), batch_size=args.batch_size)
     return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc
@@ -19,6 +19,7 @@ from __future__ import print_function
 import functools
 import numpy as np
 import time
+import os
 import cProfile, pstats, StringIO
@@ -26,6 +27,7 @@ import paddle
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 import paddle.fluid.profiler as profiler
+from recordio_converter import imagenet_train, imagenet_test

 def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'):
@@ -122,16 +124,48 @@ def get_model(args):
         else:
             dshape = [32, 32, 3]
         model = resnet_cifar10
-    else:
+        train_reader = paddle.dataset.cifar.train10()
+        test_reader = paddle.dataset.cifar.test10()
+    elif args.data_set == "flowers":
         class_dim = 102
         if args.data_format == 'NCHW':
             dshape = [3, 224, 224]
         else:
             dshape = [224, 224, 3]
         model = resnet_imagenet
+        train_reader = paddle.dataset.flowers.train()
+        test_reader = paddle.dataset.flowers.test()
+    elif args.data_set == "imagenet":
+        class_dim = 1000
+        if args.data_format == 'NCHW':
+            dshape = [3, 224, 224]
+        else:
+            dshape = [224, 224, 3]
+        model = resnet_imagenet
+        if not args.data_path:
+            raise Exception(
+                "Must specify --data_path when training with imagenet")
+        train_reader = imagenet_train(args.data_path)
+        test_reader = imagenet_test(args.data_path)

-    input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
-    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+    if args.use_reader_op:
+        filelist = [
+            os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
+        ]
+        data_file = fluid.layers.open_files(
+            filenames=filelist,
+            shapes=[[-1] + dshape, (-1, 1)],
+            lod_levels=[0, 0],
+            dtypes=["float32", "int64"],
+            thread_num=args.gpus,
+            pass_num=args.pass_num)
+        data_file = fluid.layers.double_buffer(
+            fluid.layers.batch(
+                data_file, batch_size=args.batch_size))
+        input, label = fluid.layers.read_file(data_file)
+    else:
+        input = fluid.layers.data(name='data', shape=dshape, dtype='float32')
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

     if args.device == 'CPU' and args.cpus > 1:
         places = fluid.layers.get_places(args.cpus)
@@ -162,15 +196,10 @@ def get_model(args):
     optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)

-    train_reader = paddle.batch(
-        paddle.reader.shuffle(
-            paddle.dataset.cifar.train10()
-            if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
-            buf_size=5120),
-        batch_size=args.batch_size)
-    test_reader = paddle.batch(
-        paddle.dataset.cifar.test10()
-        if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
-        batch_size=args.batch_size)
-    return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc
+    batched_train_reader = paddle.batch(
+        paddle.reader.shuffle(
+            train_reader, buf_size=5120),
+        batch_size=args.batch_size * args.gpus)
+    batched_test_reader = paddle.batch(train_reader, batch_size=args.batch_size)
+    return avg_cost, inference_program, optimizer, batched_train_reader, batched_test_reader, batch_acc
@@ -44,6 +44,9 @@ def crop_sentence(reader, crop_size):
 def get_model(args):
+    if args.use_reader_op:
+        raise Exception(
+            "stacked_dynamic_lstm do not support reader op for now.")
     lstm_size = 512
     emb_dim = 512
     crop_size = 1500
@@ -114,7 +117,7 @@ def get_model(args):
     train_reader = batch(
         paddle.reader.shuffle(
             crop_sentence(imdb.train(word_dict), crop_size), buf_size=25000),
-        batch_size=args.batch_size)
+        batch_size=args.batch_size * args.gpus)
     test_reader = batch(
         paddle.reader.shuffle(
             crop_sentence(imdb.test(word_dict), crop_size), buf_size=25000),
...
@@ -22,6 +22,7 @@ import paddle.fluid as fluid
 import paddle.fluid.core as core
 import argparse
 import functools
+import os

 def vgg16_bn_drop(input):
@@ -65,9 +66,24 @@ def get_model(args):
     else:
         data_shape = [224, 224, 3]

-    # Input data
-    images = fluid.layers.data(name='pixel', shape=data_shape, dtype='float32')
-    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+    if args.use_reader_op:
+        filelist = [
+            os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
+        ]
+        data_file = fluid.layers.open_files(
+            filenames=filelist,
+            shapes=[[-1] + data_shape, (-1, 1)],
+            lod_levels=[0, 0],
+            dtypes=["float32", "int64"],
+            thread_num=args.gpus,
+            pass_num=args.pass_num)
+        data_file = fluid.layers.double_buffer(
+            fluid.layers.batch(
+                data_file, batch_size=args.batch_size))
+        images, label = fluid.layers.read_file(data_file)
+    else:
+        images = fluid.layers.data(name='data', shape=dshape, dtype='float32')
+        label = fluid.layers.data(name='label', shape=[1], dtype='int64')

     # Train program
     net = vgg16_bn_drop(images)
@@ -95,7 +111,7 @@ def get_model(args):
             paddle.dataset.cifar.train10()
             if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
             buf_size=5120),
-        batch_size=args.batch_size)
+        batch_size=args.batch_size * args.gpus)
     test_reader = paddle.batch(
         paddle.dataset.cifar.test10()
         if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
...
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.dataset import mnist, cifar, flowers, image
def convert_2_recordio(py_reader, outfilepath, batch_size, shape_data,
shape_label):
num_batches = 0
with fluid.program_guard(fluid.Program(), fluid.Program()):
reader = paddle.batch(py_reader(), batch_size=batch_size)
feeder = fluid.DataFeeder(
feed_list=[ # order is image and label
fluid.layers.data(
name='image', shape=shape_data),
fluid.layers.data(
name='label', shape=shape_label, dtype='int64'),
],
place=fluid.CPUPlace())
num_batches = fluid.recordio_writer.convert_reader_to_recordio_file(
outfilepath, reader, feeder)
return num_batches
def prepare_mnist(outpath, batch_size):
outfilepath = os.path.join(outpath, "mnist.recordio")
convert_2_recordio(mnist.train, outfilepath, batch_size, [784], [1])
def prepare_cifar10(outpath, batch_size):
outfilepath = os.path.join(outpath, "cifar.recordio")
convert_2_recordio(cifar.train10, outfilepath, batch_size, [3, 32, 32], [1])
def prepare_flowers(outpath, batch_size):
outfilepath = os.path.join(outpath, "flowers.recordio")
convert_2_recordio(flowers.train, outfilepath, batch_size, [3, 224, 224],
[1])
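These three helpers are the intended entry points for the built-in datasets; a usage sketch follows (the "data" output directory is illustrative and is created first, since the converters expect it to exist; the flowers dataset may download on first use):

```python
# Sketch: write per-sample RecordIO files for the built-in datasets.
import os
from recordio_converter import prepare_mnist, prepare_cifar10, prepare_flowers

out_dir = "data"  # illustrative output directory
if not os.path.isdir(out_dir):
    os.makedirs(out_dir)

prepare_mnist(out_dir, 1)    # -> data/mnist.recordio
prepare_cifar10(out_dir, 1)  # -> data/cifar.recordio
prepare_flowers(out_dir, 1)  # -> data/flowers.recordio
```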
def default_mapper(sample):
img, label = sample
img = image.simple_transform(
img, 256, 224, True, mean=[103.94, 116.78, 123.68])
return img.flatten().astype('float32'), label
def imagenet_train(data_dir):
contents = os.listdir(data_dir)
if set(contents) != set(
["train", "train.txt", "val", "val_set", "val.txt", "unzip.sh"]):
raise Exception("Imagenet data contents error!")
img2label = dict()
imgfilelist = []
with open(os.path.join(data_dir, "train.txt")) as fn:
while 1:
l = fn.readline()
if not l:
break
img, lbl = l[:-1].split(" ")
img2label[img] = int(lbl)
imgfilelist.append(img)
# shuffle all, this is slow
random.shuffle(imgfilelist)
def train_reader():
for idx, imgfile in enumerate(imgfilelist):
data = image.load_image(
os.path.join(data_dir, "train", imgfile.lower()))
label = [img2label[imgfile], ]
yield [data, label]
return paddle.reader.map_readers(default_mapper, train_reader)
def imagenet_test(data_dir):
contents = os.listdir(data_dir)
if set(contents) != set(
["train", "train.txt", "val", "val_set", "val.txt", "unzip.sh"]):
raise Exception("Imagenet data contents error!")
img2label = dict()
imgfilelist = []
with open(os.path.join(data_dir, "val.txt")) as fn:
while 1:
l = fn.readline()
if not l:
break
img, lbl = l[:-1].split(" ")
img2label[img] = int(lbl)
imgfilelist.append(img)
def test_reader():
for idx, imgfile in enumerate(imgfilelist):
base_path = os.path.join(data_dir, "val", imgfile.split(".")[0])
image_path = ".".join([base_path, "jpeg"])
data = image.load_image(image_path)
label = [img2label[imgfile], ]
yield [data, label]
return paddle.reader.map_readers(default_mapper, test_reader)
# FIXME(wuyi): delete this when https://github.com/PaddlePaddle/Paddle/pull/11066 is merged
def convert_reader_to_recordio_files(
filename,
batch_per_file,
reader_creator,
feeder,
compressor=core.RecordIOWriter.Compressor.Snappy,
max_num_records=1000,
feed_order=None):
if feed_order is None:
feed_order = feeder.feed_names
f_name, f_ext = os.path.splitext(filename)
assert (f_ext == ".recordio")
lines = []
f_idx = 0
counter = 0
for idx, batch in enumerate(reader_creator()):
lines.append(batch)
if idx >= batch_per_file and idx % batch_per_file == 0:
filename = "%s-%05d%s" % (f_name, f_idx, f_ext)
with fluid.recordio_writer.create_recordio_writer(
filename, compressor, max_num_records) as writer:
for l in lines:
res = feeder.feed(l)
for each in feed_order:
writer.append_tensor(res[each])
writer.complete_append_tensor()
counter += 1
lines = []
f_idx += 1
print("written file: ", filename)
return counter
def prepare_imagenet(inpath, outpath, batch_size):
r = paddle.batch(imagenet_train(inpath), batch_size=batch_size)
feeder = fluid.DataFeeder(
feed_list=[
fluid.layers.data(
name="image", shape=[3, 224, 224]), fluid.layers.data(
name="label", shape=[1], dtype='int64')
],
place=fluid.CPUPlace())
outpath = os.path.join(outpath, "imagenet.recordio")
convert_reader_to_recordio_files(outpath, 10000, r, feeder)
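Unlike the single-file converters above, `prepare_imagenet` shards its output through `convert_reader_to_recordio_files`, writing a new file every 10000 batches. A hedged invocation sketch (paths are illustrative, and the input directory must match the train.txt/val layout checked by `imagenet_train`):

```python
# Sketch: shard an ImageNet-layout dataset into RecordIO files that
# --use_reader_op can consume; all paths here are illustrative.
from recordio_converter import prepare_imagenet

prepare_imagenet("/data/imagenet", "/data/imagenet_recordio", batch_size=1)
# writes /data/imagenet_recordio/imagenet-00000.recordio, -00001, ...
```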
@@ -434,7 +434,7 @@ def open_files(filenames,
                shapes,
                lod_levels,
                dtypes,
-               thread_num,
+               thread_num=1,
                buffer_size=None,
                pass_num=1,
                for_parallel=True):
...