diff --git a/fluid/image_classification/dist_train/README.md b/fluid/image_classification/dist_train/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..5e8cd4a7634ba13197e3dcc3b4c3cced1d78e393
--- /dev/null
+++ b/fluid/image_classification/dist_train/README.md
@@ -0,0 +1,108 @@
+# Distributed Image Classification Models Training
+
+This folder contains implementations of **Image Classification Models**. They are designed to support
+large-scale distributed training with two distributed architectures: parameter server with RPC, and ring-based with the NVIDIA NCCL2 library.
+
+## Getting Started
+
+1. The entry point file is `dist_train.py`; some important flags are as follows:
+
+    - `model`, the model to run, such as `ResNet50` or `ResNet101`.
+    - `batch_size`, the batch size per device.
+    - `update_method`, the parameter update method: `local`, `pserver` or `nccl2`.
+    - `device`, the device type to use, `CPU` or `GPU`.
+    - `gpus`, the number of GPU devices used by the process.
+
+    You can check more details of the flags with `python dist_train.py --help`; a minimal local run is sketched after this list.
+
+1. Runtime configurations
+
+    We use environment variables to distinguish the training roles of a distributed training job.
+
+    - `PADDLE_TRAINING_ROLE`, the role of the current process, either `PSERVER` or `TRAINER`.
+    - `PADDLE_TRAINERS`, the number of trainers in the job.
+    - `PADDLE_CURRENT_IP`, the IP address of the current instance.
+    - `PADDLE_PSERVER_IPS`, the parameter server IP list, separated by ","; only used when `update_method` is `pserver`.
+    - `PADDLE_TRAINER_ID`, the unique trainer ID within the job, in the range [0, PADDLE_TRAINERS).
+    - `PADDLE_PSERVER_PORT`, the port the parameter servers listen on.
+    - `PADDLE_TRAINER_IPS`, the trainer IP list, separated by ","; only used when `update_method` is `nccl2`.
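+
+Before launching a distributed job, you can verify the setup with a plain local run, which
+needs none of the `PADDLE_*` variables above. A minimal single-node run might look like the
+following (the flag values are only illustrative):
+
+``` bash
+# Local (non-distributed) sanity run; flag values are illustrative.
+python dist_train.py \
+    --model=ResNet50 \
+    --batch_size=32 \
+    --update_method=local \
+    --device=GPU \
+    --gpus=1
+```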
+
+### Parameter Server with RPC
+
+In this example, we launch 4 parameter server instances and 4 trainer instances in the cluster:
+
+1. Launch the parameter server processes:
+
+    ``` bash
+ PADDLE_TRAINING_ROLE=PSERVER \
+ PADDLE_TRAINERS=4 \
+ PADDLE_PSERVER_IPS=192.168.0.100,192.168.0.101,192.168.0.102,192.168.0.103 \
+ PADDLE_CURRENT_IP=192.168.0.100 \
+ PADDLE_PSERVER_PORT=7164 \
+ python dist_train.py \
+ --model=ResNet50 \
+ --batch_size=32 \
+ --update_method=pserver \
+ --device=CPU
+ ```
+
+1. Launch the trainer processes:
+
+    ``` bash
+    PADDLE_TRAINING_ROLE=TRAINER \
+    PADDLE_TRAINERS=4 \
+    PADDLE_PSERVER_IPS=192.168.0.100,192.168.0.101,192.168.0.102,192.168.0.103 \
+    PADDLE_TRAINER_ID=0 \
+    PADDLE_PSERVER_PORT=7164 \
+    python dist_train.py \
+        --model=ResNet50 \
+        --batch_size=32 \
+        --update_method=pserver \
+        --device=GPU
+    ```
+
+### Ring-Based with the NVIDIA NCCL2 Library
+
+1. Launch the trainer processes, one per node; each node sets its own `PADDLE_TRAINER_ID` and `PADDLE_CURRENT_IP`:
+
+    ``` bash
+    PADDLE_TRAINING_ROLE=TRAINER \
+    PADDLE_TRAINERS=4 \
+    PADDLE_TRAINER_IPS=192.168.0.100,192.168.0.101,192.168.0.102,192.168.0.103 \
+    PADDLE_TRAINER_ID=0 \
+    PADDLE_CURRENT_IP=192.168.0.100 \
+    PADDLE_PSERVER_PORT=7164 \
+    python dist_train.py \
+        --model=ResNet50 \
+        --batch_size=32 \
+        --update_method=nccl2 \
+        --device=GPU
+ ```
+
+### Training Curve
+
+It's easy to draw the training curve from the training logs. For example,
+the log of a ResNet50 run looks as follows:
+
+``` text
+Pass 0, batch 0, loss 7.0336914, accuracies: [0.0, 0.00390625]
+Pass 0, batch 1, loss 7.094781, accuracies: [0.0, 0.0]
+Pass 0, batch 2, loss 7.007068, accuracies: [0.0, 0.0078125]
+Pass 0, batch 3, loss 7.1056547, accuracies: [0.00390625, 0.00390625]
+Pass 0, batch 4, loss 7.133543, accuracies: [0.0, 0.0078125]
+Pass 0, batch 5, loss 7.3055463, accuracies: [0.0078125, 0.01171875]
+Pass 0, batch 6, loss 7.341838, accuracies: [0.0078125, 0.01171875]
+Pass 0, batch 7, loss 7.290557, accuracies: [0.0, 0.0]
+Pass 0, batch 8, loss 7.264951, accuracies: [0.0, 0.00390625]
+Pass 0, batch 9, loss 7.43522, accuracies: [0.00390625, 0.00390625]
+```
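+
+For instance, one quick way to pull the per-batch loss values out of such a log for plotting
+(assuming the output was saved to a hypothetical `train.log` file, and relying on the exact
+log format shown above) is:
+
+``` bash
+# $4 is the batch id and $6 is the loss value in the log format above;
+# train.log is a hypothetical file name.
+grep "^Pass " train.log | awk -F'[ ,]+' '{print $4, $6}' > loss_curve.txt
+```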
+
+The top-1 training accuracy curves of local training, distributed training with NCCL2, and the parameter server architecture on the ResNet50 model are shown in the figure below:
+
+![Training acc1 curves](../images/resnet50_32gpus-acc1.png)
+
+### Performance
+
+TBD
\ No newline at end of file
diff --git a/fluid/image_classification/dist_train/__init__.py b/fluid/image_classification/dist_train/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/fluid/image_classification/dist_train/args.py b/fluid/image_classification/dist_train/args.py
new file mode 100644
index 0000000000000000000000000000000000000000..b69362a08357839e698ecd3f9f84ac3111adf0ff
--- /dev/null
+++ b/fluid/image_classification/dist_train/args.py
@@ -0,0 +1,127 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+
+__all__ = ['parse_args', ]
+
+BENCHMARK_MODELS = [
+ "ResNet50", "ResNet101", "ResNet152"
+]
+
+
+def parse_args():
+ parser = argparse.ArgumentParser('Fluid model benchmarks.')
+ parser.add_argument(
+ '--model',
+ type=str,
+ choices=BENCHMARK_MODELS,
+        default='ResNet50',
+ help='The model to run benchmark with.')
+ parser.add_argument(
+ '--batch_size', type=int, default=32, help='The minibatch size.')
+ # args related to learning rate
+ parser.add_argument(
+ '--learning_rate', type=float, default=0.001, help='The learning rate.')
+ # TODO(wuyi): add "--use_fake_data" option back.
+ parser.add_argument(
+ '--skip_batch_num',
+ type=int,
+ default=5,
+        help='The number of minibatches to skip at the beginning, for a more accurate performance test'
+ )
+ parser.add_argument(
+ '--iterations', type=int, default=80, help='The number of minibatches.')
+ parser.add_argument(
+ '--pass_num', type=int, default=100, help='The number of passes.')
+ parser.add_argument(
+ '--data_format',
+ type=str,
+ default='NCHW',
+ choices=['NCHW', 'NHWC'],
+        help='The data format; currently only NCHW is supported.')
+ parser.add_argument(
+ '--device',
+ type=str,
+ default='GPU',
+ choices=['CPU', 'GPU'],
+ help='The device type.')
+ parser.add_argument(
+ '--gpus',
+ type=int,
+ default=1,
+ help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
+ # this option is available only for vgg and resnet.
+ parser.add_argument(
+ '--cpus',
+ type=int,
+ default=1,
+ help='If cpus > 1, will set ParallelExecutor to use multiple threads.')
+ parser.add_argument(
+ '--data_set',
+ type=str,
+ default='flowers',
+ choices=['cifar10', 'flowers', 'imagenet'],
+ help='Optional dataset for benchmark.')
+ parser.add_argument(
+ '--infer_only', action='store_true', help='If set, run forward only.')
+ parser.add_argument(
+ '--no_test',
+ action='store_true',
+ help='If set, do not test the testset during training.')
+ parser.add_argument(
+ '--memory_optimize',
+ action='store_true',
+ help='If set, optimize runtime memory before start.')
+ parser.add_argument(
+ '--use_fake_data',
+ action='store_true',
+        help='If set, omit the actual data reading operators.')
+ parser.add_argument(
+ '--update_method',
+ type=str,
+ default='local',
+ choices=['local', 'pserver', 'nccl2'],
+ help='Choose parameter update method, can be local, pserver, nccl2.')
+ parser.add_argument(
+ '--no_split_var',
+ action='store_true',
+ default=False,
+        help='Whether to split variables into blocks when update_method is pserver.')
+ parser.add_argument(
+ '--async_mode',
+ action='store_true',
+ default=False,
+        help='Whether to start the pserver in async mode to support ASGD.')
+ parser.add_argument(
+ '--use_reader_op',
+ action='store_true',
+        help='Whether to use the reader op; the data path must be specified when this is set.'
+ )
+ parser.add_argument(
+ '--no_random',
+ action='store_true',
+ help='If set, keep the random seed and do not shuffle the data.')
+ parser.add_argument(
+ '--use_lars',
+ action='store_true',
+        help='If set, use LARS for the optimizer; ONLY ResNet models are supported.')
+ parser.add_argument(
+ '--reduce_strategy',
+ type=str,
+ choices=['reduce', 'all_reduce'],
+ default='all_reduce',
+        help='Specify the reduce strategy: reduce or all_reduce.')
+ args = parser.parse_args()
+ return args
diff --git a/fluid/image_classification/dist_train/dist_train.py b/fluid/image_classification/dist_train/dist_train.py
new file mode 100644
index 0000000000000000000000000000000000000000..340e09dcdec37f73d169b2d38a778bd9ab05c5fe
--- /dev/null
+++ b/fluid/image_classification/dist_train/dist_train.py
@@ -0,0 +1,369 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import time
+import os
+import traceback
+
+import numpy as np
+
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.core as core
+import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler
+import sys
+sys.path.append("..")
+import models
+from imagenet_reader import train, val
+
+from args import *
+
+def get_model(args, is_train, main_prog, startup_prog):
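+    """Build the train or test program for `args.model` on ImageNet.
+
+    Returns (avg_cost, optimizer, [acc_top1, acc_top5], batched_reader, pyreader);
+    `optimizer` is None when `is_train` is False.
+    """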
+ pyreader = None
+ class_dim = 1000
+ if args.data_format == 'NCHW':
+ dshape = [3, 224, 224]
+ else:
+ dshape = [224, 224, 3]
+ if is_train:
+ reader = train(xmap=False)
+ else:
+ reader = val(xmap=False)
+
+ trainer_count = int(os.getenv("PADDLE_TRAINERS", "1"))
+ with fluid.program_guard(main_prog, startup_prog):
+ with fluid.unique_name.guard():
+ pyreader = fluid.layers.py_reader(
+ capacity=args.batch_size * args.gpus,
+ shapes=([-1] + dshape, (-1, 1)),
+ dtypes=('float32', 'int64'),
+ name="train_reader" if is_train else "test_reader",
+ use_double_buffer=True)
+ input, label = fluid.layers.read_file(pyreader)
+ model_def = models.__dict__[args.model]()
+ predict = model_def.net(input, class_dim=class_dim)
+
+ cost = fluid.layers.cross_entropy(input=predict, label=label)
+ avg_cost = fluid.layers.mean(x=cost)
+
+ batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1)
+ batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5)
+
+            # configure the optimizer
+ optimizer = None
+ if is_train:
+
+ total_images = 1281167 / trainer_count
+
+ step = int(total_images / (args.batch_size * args.gpus) + 1)
+ epochs = [30, 60, 90]
+ bd = [step * e for e in epochs]
+ base_lr = args.learning_rate
+                lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
+ optimizer = fluid.optimizer.Momentum(
+ learning_rate=fluid.layers.piecewise_decay(
+ boundaries=bd, values=lr),
+ momentum=0.9,
+ regularization=fluid.regularizer.L2Decay(1e-4))
+ optimizer.minimize(avg_cost)
+
+ if args.memory_optimize:
+ fluid.memory_optimize(main_prog)
+
+ batched_reader = None
+ pyreader.decorate_paddle_reader(
+ paddle.batch(
+ reader if args.no_random else paddle.reader.shuffle(
+ reader, buf_size=5120),
+ batch_size=args.batch_size))
+
+ return avg_cost, optimizer, [batch_acc1,
+ batch_acc5], batched_reader, pyreader
+
+def append_nccl2_prepare(trainer_id, startup_prog):
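+    """Append a gen_nccl_id op to `startup_prog` so that the trainers exchange
+    the NCCL unique ID before ring-based (NCCL2) training starts; returns
+    (nccl_id_var, num_trainers, trainer_id)."""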
+ if trainer_id >= 0:
+ # append gen_nccl_id at the end of startup program
+ trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
+ port = os.getenv("PADDLE_PSERVER_PORT")
+ worker_ips = os.getenv("PADDLE_TRAINER_IPS")
+ worker_endpoints = []
+ for ip in worker_ips.split(","):
+ worker_endpoints.append(':'.join([ip, port]))
+ num_trainers = len(worker_endpoints)
+ current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
+ worker_endpoints.remove(current_endpoint)
+
+ nccl_id_var = startup_prog.global_block().create_var(
+ name="NCCLID",
+ persistable=True,
+ type=fluid.core.VarDesc.VarType.RAW)
+ startup_prog.global_block().append_op(
+ type="gen_nccl_id",
+ inputs={},
+ outputs={"NCCLID": nccl_id_var},
+ attrs={
+ "endpoint": current_endpoint,
+ "endpoint_list": worker_endpoints,
+ "trainer_id": trainer_id
+ })
+ return nccl_id_var, num_trainers, trainer_id
+ else:
+        raise Exception("must set a non-negative PADDLE_TRAINER_ID env variable "
+                        "for nccl-based dist train.")
+
+
+def dist_transpile(trainer_id, args, train_prog, startup_prog):
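+    """Use DistributeTranspiler to convert the program for parameter-server
+    training, according to the PADDLE_* environment variables; returns the
+    (main, startup) programs for the current role."""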
+ if trainer_id < 0:
+ return None, None
+
+ # the port of all pservers, needed by both trainer and pserver
+ port = os.getenv("PADDLE_PSERVER_PORT", "6174")
+ # comma separated ips of all pservers, needed by trainer and
+ # pserver
+ pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
+ eplist = []
+ for ip in pserver_ips.split(","):
+ eplist.append(':'.join([ip, port]))
+ pserver_endpoints = ",".join(eplist)
+ # total number of workers/trainers in the job, needed by
+ # trainer and pserver
+ trainers = int(os.getenv("PADDLE_TRAINERS"))
+ # the IP of the local machine, needed by pserver only
+ current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
+ # the role, should be either PSERVER or TRAINER
+ training_role = os.getenv("PADDLE_TRAINING_ROLE")
+
+ config = distribute_transpiler.DistributeTranspilerConfig()
+ config.slice_var_up = not args.no_split_var
+ t = distribute_transpiler.DistributeTranspiler(config=config)
+ t.transpile(
+ trainer_id,
+ # NOTE: *MUST* use train_prog, for we are using with guard to
+ # generate different program for train and test.
+ program=train_prog,
+ pservers=pserver_endpoints,
+ trainers=trainers,
+ sync_mode=not args.async_mode,
+ startup_program=startup_prog)
+ if training_role == "PSERVER":
+ pserver_program = t.get_pserver_program(current_endpoint)
+ pserver_startup_program = t.get_startup_program(
+ current_endpoint, pserver_program, startup_program=startup_prog)
+ return pserver_program, pserver_startup_program
+ elif training_role == "TRAINER":
+ train_program = t.get_trainer_program()
+ return train_program, startup_prog
+ else:
+ raise ValueError(
+ 'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
+ )
+
+
+def test_parallel(exe, test_args, args, test_prog, feeder):
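+    """Run one pass over the test reader and return the accumulated
+    accuracy metrics (top-1 and top-5)."""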
+ acc_evaluators = []
+ for i in xrange(len(test_args[2])):
+ acc_evaluators.append(fluid.metrics.Accuracy())
+
+ to_fetch = [v.name for v in test_args[2]]
+ test_args[4].start()
+ while True:
+ try:
+ acc_rets = exe.run(fetch_list=to_fetch)
+ for i, e in enumerate(acc_evaluators):
+ e.update(
+ value=np.array(acc_rets[i]), weight=args.batch_size)
+ except fluid.core.EOFException as eof:
+ test_args[4].reset()
+ break
+
+ return [e.eval() for e in acc_evaluators]
+
+
+# NOTE: only need to benchmark using parallelexe
+def train_parallel(train_args, test_args, args, train_prog, test_prog,
+ startup_prog, nccl_id_var, num_trainers, trainer_id):
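+    """Train with fluid.ParallelExecutor for `args.pass_num` passes and,
+    unless --no_test is set, evaluate on the test set after each pass."""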
+ over_all_start = time.time()
+ place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
+ feeder = None
+
+ if nccl_id_var and trainer_id == 0:
+ #FIXME(wuyi): wait other trainer to start listening
+ time.sleep(30)
+
+ startup_exe = fluid.Executor(place)
+ startup_exe.run(startup_prog)
+ strategy = fluid.ExecutionStrategy()
+ strategy.num_threads = args.cpus
+ strategy.allow_op_delay = False
+ build_strategy = fluid.BuildStrategy()
+ if args.reduce_strategy == "reduce":
+ build_strategy.reduce_strategy = fluid.BuildStrategy(
+ ).ReduceStrategy.Reduce
+ else:
+ build_strategy.reduce_strategy = fluid.BuildStrategy(
+ ).ReduceStrategy.AllReduce
+
+ avg_loss = train_args[0]
+
+ if args.update_method == "pserver":
+ # parameter server mode distributed training, merge
+ # gradients on local server, do not initialize
+ # ParallelExecutor with multi server all-reduce mode.
+ num_trainers = 1
+ trainer_id = 0
+
+ exe = fluid.ParallelExecutor(
+ True,
+ avg_loss.name,
+ main_program=train_prog,
+ exec_strategy=strategy,
+ build_strategy=build_strategy,
+ num_trainers=num_trainers,
+ trainer_id=trainer_id)
+
+ if not args.no_test:
+ if args.update_method == "pserver":
+ test_scope = None
+ else:
+ # NOTE: use an empty scope to avoid test exe using NCCLID
+ test_scope = fluid.Scope()
+ test_exe = fluid.ParallelExecutor(
+ True, main_program=test_prog, share_vars_from=exe)
+
+ pyreader = train_args[4]
+ for pass_id in range(args.pass_num):
+ num_samples = 0
+ iters = 0
+ start_time = time.time()
+ batch_id = 0
+ pyreader.start()
+ while True:
+ if iters == args.iterations:
+ break
+
+ if iters == args.skip_batch_num:
+ start_time = time.time()
+ num_samples = 0
+ fetch_list = [avg_loss.name]
+ acc_name_list = [v.name for v in train_args[2]]
+ fetch_list.extend(acc_name_list)
+
+ try:
+ fetch_ret = exe.run(fetch_list)
+ except fluid.core.EOFException as eof:
+ break
+ except fluid.core.EnforceNotMet as ex:
+ traceback.print_exc()
+ break
+ num_samples += args.batch_size * args.gpus
+
+ iters += 1
+ if batch_id % 1 == 0:
+ fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
+                print("Pass %d, batch %d, loss %s, accuracies: %s" %
+ (pass_id, batch_id, fetched_data[0], fetched_data[1:]))
+ batch_id += 1
+
+ print_train_time(start_time, time.time(), num_samples)
+ pyreader.reset() # reset reader handle
+
+ if not args.no_test and test_args[2]:
+ test_feeder = None
+ test_ret = test_parallel(test_exe, test_args, args, test_prog,
+ test_feeder)
+ print("Pass: %d, Test Accuracy: %s\n" %
+ (pass_id, [np.mean(np.array(v)) for v in test_ret]))
+
+ startup_exe.close()
+ print("total train time: ", time.time() - over_all_start)
+
+
+def print_arguments(args):
+ print('----------- Configuration Arguments -----------')
+ for arg, value in sorted(vars(args).iteritems()):
+ print('%s: %s' % (arg, value))
+ print('------------------------------------------------')
+
+
+def print_train_time(start_time, end_time, num_samples):
+ train_elapsed = end_time - start_time
+ examples_per_sec = num_samples / train_elapsed
+    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sec\n' %
+ (num_samples, train_elapsed, examples_per_sec))
+
+
+def print_paddle_envs():
+ print('----------- Configuration envs -----------')
+ for k in os.environ:
+ if "PADDLE_" in k:
+            print("ENV %s:%s" % (k, os.environ[k]))
+ print('------------------------------------------------')
+
+
+def main():
+ args = parse_args()
+ print_arguments(args)
+ print_paddle_envs()
+ if args.no_random:
+ fluid.default_startup_program().random_seed = 1
+
+ # the unique trainer id, starting from 0, needed by trainer
+ # only
+ nccl_id_var, num_trainers, trainer_id = (
+ None, 1, int(os.getenv("PADDLE_TRAINER_ID", "0")))
+
+ train_prog = fluid.Program()
+ test_prog = fluid.Program()
+ startup_prog = fluid.Program()
+
+ train_args = list(get_model(args, True, train_prog, startup_prog))
+ test_args = list(get_model(args, False, test_prog, startup_prog))
+
+ all_args = [train_args, test_args, args]
+
+ if args.update_method == "pserver":
+ train_prog, startup_prog = dist_transpile(trainer_id, args, train_prog,
+ startup_prog)
+ if not train_prog:
+ raise Exception(
+ "Must configure correct environments to run dist train.")
+ all_args.extend([train_prog, test_prog, startup_prog])
+        if os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER":
+            all_args.extend([nccl_id_var, num_trainers, trainer_id])
+            train_parallel(*all_args)
+        elif os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
+            # start pserver with Executor
+            server_exe = fluid.Executor(fluid.CPUPlace())
+            server_exe.run(startup_prog)
+            server_exe.run(train_prog)
+        # pserver mode is fully handled above; do not fall through to the
+        # local/nccl2 code path below.
+        exit(0)
+
+ # for other update methods, use default programs
+ all_args.extend([train_prog, test_prog, startup_prog])
+
+ if args.update_method == "nccl2":
+ nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(
+ trainer_id, startup_prog)
+
+ all_args.extend([nccl_id_var, num_trainers, trainer_id])
+ train_parallel(*all_args)
+
+if __name__ == "__main__":
+ main()
diff --git a/fluid/image_classification/dist_train/imagenet_reader.py b/fluid/image_classification/dist_train/imagenet_reader.py
new file mode 100644
index 0000000000000000000000000000000000000000..5f26ab99560b923173c336c36f714238169c4a9f
--- /dev/null
+++ b/fluid/image_classification/dist_train/imagenet_reader.py
@@ -0,0 +1,344 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import math
+import random
+import functools
+import numpy as np
+from threading import Thread
+import subprocess
+import time
+
+from Queue import Queue
+import paddle
+from PIL import Image, ImageEnhance
+
+random.seed(0)
+
+DATA_DIM = 224
+
+THREAD = int(os.getenv("PREPROCESS_THREADS", "10"))
+BUF_SIZE = 5120
+
+DATA_DIR = '../data/ILSVRC2012'
+TRAIN_LIST = '../data/ILSVRC2012/train.txt'
+TEST_LIST = '../data/ILSVRC2012/val.txt'
+
+img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
+img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))
+
+
+def resize_short(img, target_size):
+ percent = float(target_size) / min(img.size[0], img.size[1])
+ resized_width = int(round(img.size[0] * percent))
+ resized_height = int(round(img.size[1] * percent))
+ img = img.resize((resized_width, resized_height), Image.LANCZOS)
+ return img
+
+
+def crop_image(img, target_size, center):
+ width, height = img.size
+ size = target_size
+ if center == True:
+ w_start = (width - size) / 2
+ h_start = (height - size) / 2
+ else:
+ w_start = random.randint(0, width - size)
+ h_start = random.randint(0, height - size)
+ w_end = w_start + size
+ h_end = h_start + size
+ img = img.crop((w_start, h_start, w_end, h_end))
+ return img
+
+
+def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]):
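+    """Randomly crop a region whose area fraction and aspect ratio are drawn
+    from `scale` and `ratio`, then resize it to `size` x `size`."""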
+ aspect_ratio = math.sqrt(random.uniform(*ratio))
+ w = 1. * aspect_ratio
+ h = 1. / aspect_ratio
+
+ bound = min((float(img.size[0]) / img.size[1]) / (w**2),
+ (float(img.size[1]) / img.size[0]) / (h**2))
+ scale_max = min(scale[1], bound)
+ scale_min = min(scale[0], bound)
+
+ target_area = img.size[0] * img.size[1] * random.uniform(scale_min,
+ scale_max)
+ target_size = math.sqrt(target_area)
+ w = int(target_size * w)
+ h = int(target_size * h)
+
+ i = random.randint(0, img.size[0] - w)
+ j = random.randint(0, img.size[1] - h)
+
+ img = img.crop((i, j, i + w, j + h))
+ img = img.resize((size, size), Image.LANCZOS)
+ return img
+
+
+def rotate_image(img):
+ angle = random.randint(-10, 10)
+ img = img.rotate(angle)
+ return img
+
+
+def distort_color(img):
+ def random_brightness(img, lower=0.5, upper=1.5):
+ e = random.uniform(lower, upper)
+ return ImageEnhance.Brightness(img).enhance(e)
+
+ def random_contrast(img, lower=0.5, upper=1.5):
+ e = random.uniform(lower, upper)
+ return ImageEnhance.Contrast(img).enhance(e)
+
+ def random_color(img, lower=0.5, upper=1.5):
+ e = random.uniform(lower, upper)
+ return ImageEnhance.Color(img).enhance(e)
+
+ ops = [random_brightness, random_contrast, random_color]
+ random.shuffle(ops)
+
+ img = ops[0](img)
+ img = ops[1](img)
+ img = ops[2](img)
+
+ return img
+
+
+def process_image(sample, mode, color_jitter, rotate):
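+    """Load one image and preprocess it: random crop / flip (plus optional
+    color jitter and rotation) in train mode, center crop otherwise, then
+    normalize with the ImageNet mean and std."""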
+ img_path = sample[0]
+
+ img = Image.open(img_path)
+ if mode == 'train':
+ if rotate: img = rotate_image(img)
+ img = random_crop(img, DATA_DIM)
+ else:
+ img = resize_short(img, target_size=256)
+ img = crop_image(img, target_size=DATA_DIM, center=True)
+ if mode == 'train':
+ if color_jitter:
+ img = distort_color(img)
+ if random.randint(0, 1) == 1:
+ img = img.transpose(Image.FLIP_LEFT_RIGHT)
+
+ if img.mode != 'RGB':
+ img = img.convert('RGB')
+
+ img = np.array(img).astype('float32').transpose((2, 0, 1)) / 255
+ img -= img_mean
+ img /= img_std
+
+ if mode == 'train' or mode == 'val':
+ return img, sample[1]
+ elif mode == 'test':
+ return [img]
+
+
+class XmapEndSignal():
+ pass
+
+
+def xmap_readers(mapper,
+ reader,
+ process_num,
+ buffer_size,
+ order=False,
+ print_queue_state=True):
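+    """Map `mapper` over samples from `reader` using `process_num` worker
+    threads, optionally preserving the original sample order."""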
+ end = XmapEndSignal()
+
+ # define a worker to read samples from reader to in_queue
+ def read_worker(reader, in_queue):
+ for i in reader():
+ in_queue.put(i)
+ in_queue.put(end)
+
+ # define a worker to read samples from reader to in_queue with order flag
+ def order_read_worker(reader, in_queue, file_queue):
+ in_order = 0
+ for i in reader():
+ in_queue.put((in_order, i))
+ in_order += 1
+ in_queue.put(end)
+
+ # define a worker to handle samples from in_queue by mapper
+ # and put mapped samples into out_queue
+ def handle_worker(in_queue, out_queue, mapper):
+ sample = in_queue.get()
+ while not isinstance(sample, XmapEndSignal):
+ r = mapper(sample)
+ out_queue.put(r)
+ sample = in_queue.get()
+ in_queue.put(end)
+ out_queue.put(end)
+
+ # define a worker to handle samples from in_queue by mapper
+ # and put mapped samples into out_queue by order
+ def order_handle_worker(in_queue, out_queue, mapper, out_order):
+ ins = in_queue.get()
+ while not isinstance(ins, XmapEndSignal):
+ order, sample = ins
+ r = mapper(sample)
+ while order != out_order[0]:
+ pass
+ out_queue.put(r)
+ out_order[0] += 1
+ ins = in_queue.get()
+ in_queue.put(end)
+ out_queue.put(end)
+
+ def xreader():
+ file_queue = Queue()
+ in_queue = Queue(buffer_size)
+ out_queue = Queue(buffer_size)
+ out_order = [0]
+ # start a read worker in a thread
+ target = order_read_worker if order else read_worker
+ t = Thread(target=target, args=(reader, in_queue))
+ t.daemon = True
+ t.start()
+ # start several handle_workers
+ target = order_handle_worker if order else handle_worker
+ args = (in_queue, out_queue, mapper, out_order) if order else (
+ in_queue, out_queue, mapper)
+ workers = []
+ for i in xrange(process_num):
+ worker = Thread(target=target, args=args)
+ worker.daemon = True
+ workers.append(worker)
+ for w in workers:
+ w.start()
+
+ sample = out_queue.get()
+ start_t = time.time()
+ while not isinstance(sample, XmapEndSignal):
+ yield sample
+ sample = out_queue.get()
+ if time.time() - start_t > 3:
+ if print_queue_state:
+ print("queue sizes: ", in_queue.qsize(), out_queue.qsize())
+ start_t = time.time()
+ finish = 1
+ while finish < process_num:
+ sample = out_queue.get()
+ if isinstance(sample, XmapEndSignal):
+ finish += 1
+ else:
+ yield sample
+
+ return xreader
+
+
+def _reader_creator(file_list,
+ mode,
+ shuffle=False,
+ color_jitter=False,
+ rotate=False,
+ xmap=True):
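+    """Create an ImageNet reader over `file_list`; in train mode each trainer
+    reads only its own shard of the file list, selected by PADDLE_TRAINER_ID
+    and PADDLE_TRAINERS."""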
+ def reader():
+ with open(file_list) as flist:
+ full_lines = [line.strip() for line in flist]
+ if shuffle:
+ random.shuffle(full_lines)
+ if mode == 'train':
+ trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+ trainer_count = int(os.getenv("PADDLE_TRAINERS", "1"))
+ per_node_lines = len(full_lines) / trainer_count
+ lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1)
+ * per_node_lines]
+ print(
+ "read images from %d, length: %d, lines length: %d, total: %d"
+ % (trainer_id * per_node_lines, per_node_lines, len(lines),
+ len(full_lines)))
+ else:
+ lines = full_lines
+
+ for line in lines:
+ if mode == 'train':
+ img_path, label = line.split()
+ img_path = img_path.replace("JPEG", "jpeg")
+ img_path = os.path.join(DATA_DIR, "train", img_path)
+ yield (img_path, int(label))
+ elif mode == 'val':
+ img_path, label = line.split()
+ img_path = img_path.replace("JPEG", "jpeg")
+ img_path = os.path.join(DATA_DIR, "val", img_path)
+ yield (img_path, int(label))
+ elif mode == 'test':
+ img_path = os.path.join(DATA_DIR, line)
+ yield [img_path]
+
+ mapper = functools.partial(
+ process_image, mode=mode, color_jitter=color_jitter, rotate=rotate)
+
+ return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE)
+
+
+def load_raw_image_uint8(sample):
+ img_arr = np.array(Image.open(sample[0])).astype('int64')
+ return img_arr, int(sample[1])
+
+
+def train_raw(file_list=TRAIN_LIST, shuffle=True):
+ def reader():
+ with open(file_list) as flist:
+ full_lines = [line.strip() for line in flist]
+ if shuffle:
+ random.shuffle(full_lines)
+
+ trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
+ trainer_count = int(os.getenv("PADDLE_TRAINERS"))
+ per_node_lines = len(full_lines) / trainer_count
+ lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) *
+ per_node_lines]
+ print("read images from %d, length: %d, lines length: %d, total: %d"
+ % (trainer_id * per_node_lines, per_node_lines, len(lines),
+ len(full_lines)))
+
+ for line in lines:
+ img_path, label = line.split()
+ img_path = img_path.replace("JPEG", "jpeg")
+ img_path = os.path.join(DATA_DIR, "train", img_path)
+ yield (img_path, int(label))
+
+ return paddle.reader.xmap_readers(load_raw_image_uint8, reader, THREAD,
+ BUF_SIZE)
+
+
+def train(file_list=TRAIN_LIST, xmap=True):
+ return _reader_creator(
+ file_list,
+ 'train',
+ shuffle=True,
+ color_jitter=False,
+ rotate=False,
+ xmap=xmap)
+
+
+def val(file_list=TEST_LIST, xmap=True):
+ return _reader_creator(file_list, 'val', shuffle=False, xmap=xmap)
+
+
+def test(file_list=TEST_LIST):
+ return _reader_creator(file_list, 'test', shuffle=False)
+
+
+if __name__ == "__main__":
+ c = 0
+ start_t = time.time()
+ for d in train()():
+ c += 1
+ if c >= 10000:
+ break
+ spent = time.time() - start_t
+ print("read 10000 speed: ", 10000 / spent, spent)
diff --git a/fluid/image_classification/images/resnet50_32gpus-acc1.png b/fluid/image_classification/images/resnet50_32gpus-acc1.png
new file mode 100644
index 0000000000000000000000000000000000000000..6d4c478743d0e5af0a9d727c76b433849c6a81dc
Binary files /dev/null and b/fluid/image_classification/images/resnet50_32gpus-acc1.png differ