提交 05d1f586 编写于 作者: Y Yancey1989

add dist restnet

上级 36d3825b
# Distributed Image Classification Models Training
This folder contains implementations of **Image Classification Models**, they are designed to support
large-scaled distributed training with two distributed architecture: parameter server with RPC and ring-base with Nvidia NCCL2 library.
## Getting Started
1. The entrypoint file is `dist_train.py`, some important flags are as follows:
- `model`, the model to run with, such as `ResNet50`, `ResNet101` and etc..
- `batch_size`, the batch_size per device.
- `update_method`, specify the update method, local, pserver or nccl2.
- `device`, use CPU or GPU device.
- `gpus`, the GPU device count that the process used.
you can check out more details of the flags by `python dist_train.py --help`.
1. Runtime configurations
We use the environment variable to distinguish the different training role of a distributed training job.
- `PADDLE_TRAINING_ROLE`, the current training role, should be in [PSERVER, TRAINER].
- `PADDLE_TRAINERS`, the trainer count of a job.
- `PADDLE_CURRENT_IP`, the current instance IP.
- `PADDLE_PSERVER_IPS`, the parameter server IP list, separated by "," only be used with update_method is pserver.
- `PADDLE_TRAINER_ID`, the unique trainer ID of a job, the ranging is [0, PADDLE_TRAINERS).
- `PADDLE_PSERVER_PORT`, the port of the parameter pserver listened on.
- `PADDLE_TRAINER_IPS`, the trainer IP list, separated by ",", only be used with upadte_method is nccl2.
### Pserver Server with RPC
In this example, we launched 4 parameter server instances and 4 trainer instances in the cluster:
1. launch parameter server process
``` python
PADDLE_TRAINING_ROLE=PSERVER \
PADDLE_TRAINERS=4 \
PADDLE_PSERVER_IPS=192.168.0.100,192.168.0.101,192.168.0.102,192.168.0.103 \
PADDLE_CURRENT_IP=192.168.0.100 \
PADDLE_PSERVER_PORT=7164 \
python dist_train.py \
--model=ResNet50 \
--batch_size=32 \
--update_method=pserver \
--device=CPU
```
1. launch trainer process
``` python
PADDLE_TRAINING_ROLE=PSERVER \
PADDLE_TRAINERS=4 \
PADDLE_PSERVER_IPS=192.168.0.100,192.168.0.101,192.168.0.102,192.168.0.103 \
PADDLE_TRAINER_ID=0 \
PADDLE_PSERVER_PORT=7164 \
python dist_train.py \
--model=ResNet50 \
--batch_size=32 \
--update_method=pserver \
--device=GPU
```
### Ring-base with Nvidia NCCL2 library
1. launch trainer process
``` python
PADDLE_TRAINING_ROLE=TRAINER \
PADDLE_TRAINERS=4 \
PADDLE_TRAINER_IPS=192.168.0.100,192.168.0.101,192.168.0.102,192.168.0.103 \
PADDLE_TRAINER_ID=0 \
python dist_train.py \
--model=ResNet50 \
--batch_size=32 \
--update_method=pserver \
--device=GPU
```
### Training Curve
It's easy to draw the training curve accroding to the training logs, for example,
the logs of ResNet50 is as follows:
``` text
Pass 0, batch 0, loss 7.0336914, accucacys: [0.0, 0.00390625]
Pass 0, batch 1, loss 7.094781, accucacys: [0.0, 0.0]
Pass 0, batch 2, loss 7.007068, accucacys: [0.0, 0.0078125]
Pass 0, batch 3, loss 7.1056547, accucacys: [0.00390625, 0.00390625]
Pass 0, batch 4, loss 7.133543, accucacys: [0.0, 0.0078125]
Pass 0, batch 5, loss 7.3055463, accucacys: [0.0078125, 0.01171875]
Pass 0, batch 6, loss 7.341838, accucacys: [0.0078125, 0.01171875]
Pass 0, batch 7, loss 7.290557, accucacys: [0.0, 0.0]
Pass 0, batch 8, loss 7.264951, accucacys: [0.0, 0.00390625]
Pass 0, batch 9, loss 7.43522, accucacys: [0.00390625, 0.00390625]
```
The training accucacys top1 of local training, distributed training with NCCL2 and parameter server architecture on the ResNet50 model are shown in the below figure:
<p align="center">
<img src="../images/resnet50_32gpus-acc1.png" height=300 width=528 > <br/>
Training acc1 curves
</p>
### Performance
TBD
\ No newline at end of file
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
__all__ = ['parse_args', ]
BENCHMARK_MODELS = [
"ResNet50", "ResNet101", "ResNet152"
]
def parse_args():
parser = argparse.ArgumentParser('Fluid model benchmarks.')
parser.add_argument(
'--model',
type=str,
choices=BENCHMARK_MODELS,
default='resnet',
help='The model to run benchmark with.')
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size.')
# args related to learning rate
parser.add_argument(
'--learning_rate', type=float, default=0.001, help='The learning rate.')
# TODO(wuyi): add "--use_fake_data" option back.
parser.add_argument(
'--skip_batch_num',
type=int,
default=5,
help='The first num of minibatch num to skip, for better performance test'
)
parser.add_argument(
'--iterations', type=int, default=80, help='The number of minibatches.')
parser.add_argument(
'--pass_num', type=int, default=100, help='The number of passes.')
parser.add_argument(
'--data_format',
type=str,
default='NCHW',
choices=['NCHW', 'NHWC'],
help='The data data_format, now only support NCHW.')
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help='The device type.')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
# this option is available only for vgg and resnet.
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, will set ParallelExecutor to use multiple threads.')
parser.add_argument(
'--data_set',
type=str,
default='flowers',
choices=['cifar10', 'flowers', 'imagenet'],
help='Optional dataset for benchmark.')
parser.add_argument(
'--infer_only', action='store_true', help='If set, run forward only.')
parser.add_argument(
'--no_test',
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
help='If set, optimize runtime memory before start.')
parser.add_argument(
'--use_fake_data',
action='store_true',
help='If set ommit the actual read data operators.')
parser.add_argument(
'--update_method',
type=str,
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether split variables into blocks when update_method is pserver')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether start pserver in async mode to support ASGD')
parser.add_argument(
'--use_reader_op',
action='store_true',
help='Whether to use reader op, and must specify the data path if set this to true.'
)
parser.add_argument(
'--no_random',
action='store_true',
help='If set, keep the random seed and do not shuffle the data.')
parser.add_argument(
'--use_lars',
action='store_true',
help='If set, use lars for optimizers, ONLY support resnet module.')
parser.add_argument(
'--reduce_strategy',
type=str,
choices=['reduce', 'all_reduce'],
default='all_reduce',
help='Specify the reduce strategy, can be reduce, all_reduce')
args = parser.parse_args()
return args
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import time
import os
import traceback
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler
import sys
sys.path.append("..")
import models
from imagenet_reader import train, val
from args import *
def get_model(args, is_train, main_prog, startup_prog):
pyreader = None
class_dim = 1000
if args.data_format == 'NCHW':
dshape = [3, 224, 224]
else:
dshape = [224, 224, 3]
if is_train:
reader = train(xmap=False)
else:
reader = val(xmap=False)
trainer_count = int(os.getenv("PADDLE_TRAINERS", "1"))
with fluid.program_guard(main_prog, startup_prog):
with fluid.unique_name.guard():
pyreader = fluid.layers.py_reader(
capacity=args.batch_size * args.gpus,
shapes=([-1] + dshape, (-1, 1)),
dtypes=('float32', 'int64'),
name="train_reader" if is_train else "test_reader",
use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader)
model_def = models.__dict__[args.model]()
predict = model_def.net(input, class_dim=class_dim)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1)
batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5)
# configure optimize
optimizer = None
if is_train:
total_images = 1281167 / trainer_count
step = int(total_images / (args.batch_size * args.gpus) + 1)
epochs = [30, 60, 90]
bd = [step * e for e in epochs]
base_lr = args.learning_rate
lr = []
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
learning_rate=fluid.layers.piecewise_decay(
boundaries=bd, values=lr),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)
if args.memory_optimize:
fluid.memory_optimize(main_prog)
batched_reader = None
pyreader.decorate_paddle_reader(
paddle.batch(
reader if args.no_random else paddle.reader.shuffle(
reader, buf_size=5120),
batch_size=args.batch_size))
return avg_cost, optimizer, [batch_acc1,
batch_acc5], batched_reader, pyreader
def append_nccl2_prepare(trainer_id, startup_prog):
if trainer_id >= 0:
# append gen_nccl_id at the end of startup program
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
port = os.getenv("PADDLE_PSERVER_PORT")
worker_ips = os.getenv("PADDLE_TRAINER_IPS")
worker_endpoints = []
for ip in worker_ips.split(","):
worker_endpoints.append(':'.join([ip, port]))
num_trainers = len(worker_endpoints)
current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
worker_endpoints.remove(current_endpoint)
nccl_id_var = startup_prog.global_block().create_var(
name="NCCLID",
persistable=True,
type=fluid.core.VarDesc.VarType.RAW)
startup_prog.global_block().append_op(
type="gen_nccl_id",
inputs={},
outputs={"NCCLID": nccl_id_var},
attrs={
"endpoint": current_endpoint,
"endpoint_list": worker_endpoints,
"trainer_id": trainer_id
})
return nccl_id_var, num_trainers, trainer_id
else:
raise Exception("must set positive PADDLE_TRAINER_ID env variables for "
"nccl-based dist train.")
def dist_transpile(trainer_id, args, train_prog, startup_prog):
if trainer_id < 0:
return None, None
# the port of all pservers, needed by both trainer and pserver
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
# comma separated ips of all pservers, needed by trainer and
# pserver
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
# total number of workers/trainers in the job, needed by
# trainer and pserver
trainers = int(os.getenv("PADDLE_TRAINERS"))
# the IP of the local machine, needed by pserver only
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
# the role, should be either PSERVER or TRAINER
training_role = os.getenv("PADDLE_TRAINING_ROLE")
config = distribute_transpiler.DistributeTranspilerConfig()
config.slice_var_up = not args.no_split_var
t = distribute_transpiler.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
# NOTE: *MUST* use train_prog, for we are using with guard to
# generate different program for train and test.
program=train_prog,
pservers=pserver_endpoints,
trainers=trainers,
sync_mode=not args.async_mode,
startup_program=startup_prog)
if training_role == "PSERVER":
pserver_program = t.get_pserver_program(current_endpoint)
pserver_startup_program = t.get_startup_program(
current_endpoint, pserver_program, startup_program=startup_prog)
return pserver_program, pserver_startup_program
elif training_role == "TRAINER":
train_program = t.get_trainer_program()
return train_program, startup_prog
else:
raise ValueError(
'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
)
def test_parallel(exe, test_args, args, test_prog, feeder):
acc_evaluators = []
for i in xrange(len(test_args[2])):
acc_evaluators.append(fluid.metrics.Accuracy())
to_fetch = [v.name for v in test_args[2]]
test_args[4].start()
while True:
try:
acc_rets = exe.run(fetch_list=to_fetch)
for i, e in enumerate(acc_evaluators):
e.update(
value=np.array(acc_rets[i]), weight=args.batch_size)
except fluid.core.EOFException as eof:
test_args[4].reset()
break
return [e.eval() for e in acc_evaluators]
# NOTE: only need to benchmark using parallelexe
def train_parallel(train_args, test_args, args, train_prog, test_prog,
startup_prog, nccl_id_var, num_trainers, trainer_id):
over_all_start = time.time()
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
feeder = None
if nccl_id_var and trainer_id == 0:
#FIXME(wuyi): wait other trainer to start listening
time.sleep(30)
startup_exe = fluid.Executor(place)
startup_exe.run(startup_prog)
strategy = fluid.ExecutionStrategy()
strategy.num_threads = args.cpus
strategy.allow_op_delay = False
build_strategy = fluid.BuildStrategy()
if args.reduce_strategy == "reduce":
build_strategy.reduce_strategy = fluid.BuildStrategy(
).ReduceStrategy.Reduce
else:
build_strategy.reduce_strategy = fluid.BuildStrategy(
).ReduceStrategy.AllReduce
avg_loss = train_args[0]
if args.update_method == "pserver":
# parameter server mode distributed training, merge
# gradients on local server, do not initialize
# ParallelExecutor with multi server all-reduce mode.
num_trainers = 1
trainer_id = 0
exe = fluid.ParallelExecutor(
True,
avg_loss.name,
main_program=train_prog,
exec_strategy=strategy,
build_strategy=build_strategy,
num_trainers=num_trainers,
trainer_id=trainer_id)
if not args.no_test:
if args.update_method == "pserver":
test_scope = None
else:
# NOTE: use an empty scope to avoid test exe using NCCLID
test_scope = fluid.Scope()
test_exe = fluid.ParallelExecutor(
True, main_program=test_prog, share_vars_from=exe)
pyreader = train_args[4]
for pass_id in range(args.pass_num):
num_samples = 0
iters = 0
start_time = time.time()
batch_id = 0
pyreader.start()
while True:
if iters == args.iterations:
break
if iters == args.skip_batch_num:
start_time = time.time()
num_samples = 0
fetch_list = [avg_loss.name]
acc_name_list = [v.name for v in train_args[2]]
fetch_list.extend(acc_name_list)
try:
fetch_ret = exe.run(fetch_list)
except fluid.core.EOFException as eof:
break
except fluid.core.EnforceNotMet as ex:
traceback.print_exc()
break
num_samples += args.batch_size * args.gpus
iters += 1
if batch_id % 1 == 0:
fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
print("Pass %d, batch %d, loss %s, accucacys: %s" %
(pass_id, batch_id, fetched_data[0], fetched_data[1:]))
batch_id += 1
print_train_time(start_time, time.time(), num_samples)
pyreader.reset() # reset reader handle
if not args.no_test and test_args[2]:
test_feeder = None
test_ret = test_parallel(test_exe, test_args, args, test_prog,
test_feeder)
print("Pass: %d, Test Accuracy: %s\n" %
(pass_id, [np.mean(np.array(v)) for v in test_ret]))
startup_exe.close()
print("total train time: ", time.time() - over_all_start)
def print_arguments(args):
print('----------- Configuration Arguments -----------')
for arg, value in sorted(vars(args).iteritems()):
print('%s: %s' % (arg, value))
print('------------------------------------------------')
def print_train_time(start_time, end_time, num_samples):
train_elapsed = end_time - start_time
examples_per_sec = num_samples / train_elapsed
print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' %
(num_samples, train_elapsed, examples_per_sec))
def print_paddle_envs():
print('----------- Configuration envs -----------')
for k in os.environ:
if "PADDLE_" in k:
print "ENV %s:%s" % (k, os.environ[k])
print('------------------------------------------------')
def main():
args = parse_args()
print_arguments(args)
print_paddle_envs()
if args.no_random:
fluid.default_startup_program().random_seed = 1
# the unique trainer id, starting from 0, needed by trainer
# only
nccl_id_var, num_trainers, trainer_id = (
None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))
train_prog = fluid.Program()
test_prog = fluid.Program()
startup_prog = fluid.Program()
train_args = list(get_model(args, True, train_prog, startup_prog))
test_args = list(get_model(args, False, test_prog, startup_prog))
all_args = [train_args, test_args, args]
if args.update_method == "pserver":
train_prog, startup_prog = dist_transpile(trainer_id, args, train_prog,
startup_prog)
if not train_prog:
raise Exception(
"Must configure correct environments to run dist train.")
all_args.extend([train_prog, test_prog, startup_prog])
if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER":
all_args.extend([nccl_id_var, num_trainers, trainer_id])
train_parallel(*all_args)
elif os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
# start pserver with Executor
server_exe = fluid.Executor(fluid.CPUPlace())
server_exe.run(startup_prog)
server_exe.run(train_prog)
exit(0)
# for other update methods, use default programs
all_args.extend([train_prog, test_prog, startup_prog])
if args.update_method == "nccl2":
nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(
trainer_id, startup_prog)
all_args.extend([nccl_id_var, num_trainers, trainer_id])
train_parallel(*all_args)
if __name__ == "__main__":
main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import math
import random
import functools
import numpy as np
from threading import Thread
import subprocess
import time
from Queue import Queue
import paddle
from PIL import Image, ImageEnhance
random.seed(0)
DATA_DIM = 224
THREAD = int(os.getenv("PREPROCESS_THREADS", "10"))
BUF_SIZE = 5120
DATA_DIR = '../data/ILSVRC2012'
TRAIN_LIST = '../data/ILSVRC2012/train.txt'
TEST_LIST = '../data/ILSVRC2012/val.txt'
img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))
def resize_short(img, target_size):
percent = float(target_size) / min(img.size[0], img.size[1])
resized_width = int(round(img.size[0] * percent))
resized_height = int(round(img.size[1] * percent))
img = img.resize((resized_width, resized_height), Image.LANCZOS)
return img
def crop_image(img, target_size, center):
width, height = img.size
size = target_size
if center == True:
w_start = (width - size) / 2
h_start = (height - size) / 2
else:
w_start = random.randint(0, width - size)
h_start = random.randint(0, height - size)
w_end = w_start + size
h_end = h_start + size
img = img.crop((w_start, h_start, w_end, h_end))
return img
def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]):
aspect_ratio = math.sqrt(random.uniform(*ratio))
w = 1. * aspect_ratio
h = 1. / aspect_ratio
bound = min((float(img.size[0]) / img.size[1]) / (w**2),
(float(img.size[1]) / img.size[0]) / (h**2))
scale_max = min(scale[1], bound)
scale_min = min(scale[0], bound)
target_area = img.size[0] * img.size[1] * random.uniform(scale_min,
scale_max)
target_size = math.sqrt(target_area)
w = int(target_size * w)
h = int(target_size * h)
i = random.randint(0, img.size[0] - w)
j = random.randint(0, img.size[1] - h)
img = img.crop((i, j, i + w, j + h))
img = img.resize((size, size), Image.LANCZOS)
return img
def rotate_image(img):
angle = random.randint(-10, 10)
img = img.rotate(angle)
return img
def distort_color(img):
def random_brightness(img, lower=0.5, upper=1.5):
e = random.uniform(lower, upper)
return ImageEnhance.Brightness(img).enhance(e)
def random_contrast(img, lower=0.5, upper=1.5):
e = random.uniform(lower, upper)
return ImageEnhance.Contrast(img).enhance(e)
def random_color(img, lower=0.5, upper=1.5):
e = random.uniform(lower, upper)
return ImageEnhance.Color(img).enhance(e)
ops = [random_brightness, random_contrast, random_color]
random.shuffle(ops)
img = ops[0](img)
img = ops[1](img)
img = ops[2](img)
return img
def process_image(sample, mode, color_jitter, rotate):
img_path = sample[0]
img = Image.open(img_path)
if mode == 'train':
if rotate: img = rotate_image(img)
img = random_crop(img, DATA_DIM)
else:
img = resize_short(img, target_size=256)
img = crop_image(img, target_size=DATA_DIM, center=True)
if mode == 'train':
if color_jitter:
img = distort_color(img)
if random.randint(0, 1) == 1:
img = img.transpose(Image.FLIP_LEFT_RIGHT)
if img.mode != 'RGB':
img = img.convert('RGB')
img = np.array(img).astype('float32').transpose((2, 0, 1)) / 255
img -= img_mean
img /= img_std
if mode == 'train' or mode == 'val':
return img, sample[1]
elif mode == 'test':
return [img]
class XmapEndSignal():
pass
def xmap_readers(mapper,
reader,
process_num,
buffer_size,
order=False,
print_queue_state=True):
end = XmapEndSignal()
# define a worker to read samples from reader to in_queue
def read_worker(reader, in_queue):
for i in reader():
in_queue.put(i)
in_queue.put(end)
# define a worker to read samples from reader to in_queue with order flag
def order_read_worker(reader, in_queue, file_queue):
in_order = 0
for i in reader():
in_queue.put((in_order, i))
in_order += 1
in_queue.put(end)
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue
def handle_worker(in_queue, out_queue, mapper):
sample = in_queue.get()
while not isinstance(sample, XmapEndSignal):
r = mapper(sample)
out_queue.put(r)
sample = in_queue.get()
in_queue.put(end)
out_queue.put(end)
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue by order
def order_handle_worker(in_queue, out_queue, mapper, out_order):
ins = in_queue.get()
while not isinstance(ins, XmapEndSignal):
order, sample = ins
r = mapper(sample)
while order != out_order[0]:
pass
out_queue.put(r)
out_order[0] += 1
ins = in_queue.get()
in_queue.put(end)
out_queue.put(end)
def xreader():
file_queue = Queue()
in_queue = Queue(buffer_size)
out_queue = Queue(buffer_size)
out_order = [0]
# start a read worker in a thread
target = order_read_worker if order else read_worker
t = Thread(target=target, args=(reader, in_queue))
t.daemon = True
t.start()
# start several handle_workers
target = order_handle_worker if order else handle_worker
args = (in_queue, out_queue, mapper, out_order) if order else (
in_queue, out_queue, mapper)
workers = []
for i in xrange(process_num):
worker = Thread(target=target, args=args)
worker.daemon = True
workers.append(worker)
for w in workers:
w.start()
sample = out_queue.get()
start_t = time.time()
while not isinstance(sample, XmapEndSignal):
yield sample
sample = out_queue.get()
if time.time() - start_t > 3:
if print_queue_state:
print("queue sizes: ", in_queue.qsize(), out_queue.qsize())
start_t = time.time()
finish = 1
while finish < process_num:
sample = out_queue.get()
if isinstance(sample, XmapEndSignal):
finish += 1
else:
yield sample
return xreader
def _reader_creator(file_list,
mode,
shuffle=False,
color_jitter=False,
rotate=False,
xmap=True):
def reader():
with open(file_list) as flist:
full_lines = [line.strip() for line in flist]
if shuffle:
random.shuffle(full_lines)
if mode == 'train':
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS", "1"))
per_node_lines = len(full_lines) / trainer_count
lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1)
* per_node_lines]
print(
"read images from %d, length: %d, lines length: %d, total: %d"
% (trainer_id * per_node_lines, per_node_lines, len(lines),
len(full_lines)))
else:
lines = full_lines
for line in lines:
if mode == 'train':
img_path, label = line.split()
img_path = img_path.replace("JPEG", "jpeg")
img_path = os.path.join(DATA_DIR, "train", img_path)
yield (img_path, int(label))
elif mode == 'val':
img_path, label = line.split()
img_path = img_path.replace("JPEG", "jpeg")
img_path = os.path.join(DATA_DIR, "val", img_path)
yield (img_path, int(label))
elif mode == 'test':
img_path = os.path.join(DATA_DIR, line)
yield [img_path]
mapper = functools.partial(
process_image, mode=mode, color_jitter=color_jitter, rotate=rotate)
return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE)
def load_raw_image_uint8(sample):
img_arr = np.array(Image.open(sample[0])).astype('int64')
return img_arr, int(sample[1])
def train_raw(file_list=TRAIN_LIST, shuffle=True):
def reader():
with open(file_list) as flist:
full_lines = [line.strip() for line in flist]
if shuffle:
random.shuffle(full_lines)
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
trainer_count = int(os.getenv("PADDLE_TRAINERS"))
per_node_lines = len(full_lines) / trainer_count
lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) *
per_node_lines]
print("read images from %d, length: %d, lines length: %d, total: %d"
% (trainer_id * per_node_lines, per_node_lines, len(lines),
len(full_lines)))
for line in lines:
img_path, label = line.split()
img_path = img_path.replace("JPEG", "jpeg")
img_path = os.path.join(DATA_DIR, "train", img_path)
yield (img_path, int(label))
return paddle.reader.xmap_readers(load_raw_image_uint8, reader, THREAD,
BUF_SIZE)
def train(file_list=TRAIN_LIST, xmap=True):
return _reader_creator(
file_list,
'train',
shuffle=True,
color_jitter=False,
rotate=False,
xmap=xmap)
def val(file_list=TEST_LIST, xmap=True):
return _reader_creator(file_list, 'val', shuffle=False, xmap=xmap)
def test(file_list=TEST_LIST):
return _reader_creator(file_list, 'test', shuffle=False)
if __name__ == "__main__":
c = 0
start_t = time.time()
for d in train()():
c += 1
if c >= 10000:
break
spent = time.time() - start_t
print("read 10000 speed: ", 10000 / spent, spent)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册