未验证 提交 f90c7865 编写于 作者: W Wu Yi 提交者: GitHub

Benchmark tool for imgnet (#12305)

* support test using executor without reader

* run imgnet

* update fluid benchmark

* wip

* update

* update all models

* support pyreader

* update

* clean up

* make profile batches contollable

* update API.spec

* update scripts

* clean dockerfile

* update

* clean comments

* add scope argument docstring

* use num_trainers to determine nccl init comms
上级 8a6b4640
...@@ -11,6 +11,7 @@ RUN ln -s /usr/lib/x86_64-linux-gnu/libcudnn.so.7 /usr/lib/libcudnn.so && ln -s ...@@ -11,6 +11,7 @@ RUN ln -s /usr/lib/x86_64-linux-gnu/libcudnn.so.7 /usr/lib/libcudnn.so && ln -s
# Add "ENV http_proxy=http://ip:port" if your download is slow, and don't forget to unset it at runtime. # Add "ENV http_proxy=http://ip:port" if your download is slow, and don't forget to unset it at runtime.
# exmaple: unset http_proxy && unset https_proxy && python fluid_benchmark.py ... # exmaple: unset http_proxy && unset https_proxy && python fluid_benchmark.py ...
RUN pip install -U pip RUN pip install -U pip
RUN pip install -U kubernetes paddlepaddle RUN pip install -U kubernetes paddlepaddle
...@@ -27,5 +28,6 @@ ADD *.whl / ...@@ -27,5 +28,6 @@ ADD *.whl /
RUN pip install /*.whl && rm -f /*.whl RUN pip install /*.whl && rm -f /*.whl
ENV LD_LIBRARY_PATH=/usr/local/lib ENV LD_LIBRARY_PATH=/usr/local/lib
ADD fluid_benchmark.py recordio_converter.py args.py recordio_converter.py run.sh run_fluid_benchmark.sh /workspace/ ADD fluid_benchmark.py recordio_converter.py args.py recordio_converter.py run.sh run_fluid_benchmark.sh imagenet_reader.py /workspace/
ADD models/ /workspace/models/ ADD models/ /workspace/models/
...@@ -17,7 +17,8 @@ import argparse ...@@ -17,7 +17,8 @@ import argparse
__all__ = ['parse_args', ] __all__ = ['parse_args', ]
BENCHMARK_MODELS = [ BENCHMARK_MODELS = [
"machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm" "machine_translation", "resnet", "se_resnext", "vgg", "mnist",
"stacked_dynamic_lstm", "resnet_with_preprocess"
] ]
...@@ -67,12 +68,12 @@ def parse_args(): ...@@ -67,12 +68,12 @@ def parse_args():
'--cpus', '--cpus',
type=int, type=int,
default=1, default=1,
help='If cpus > 1, will use ParallelDo to run, else use Executor.') help='If cpus > 1, will set ParallelExecutor to use multiple threads.')
parser.add_argument( parser.add_argument(
'--data_set', '--data_set',
type=str, type=str,
default='flowers', default='flowers',
choices=['cifar10', 'flowers'], choices=['cifar10', 'flowers', 'imagenet'],
help='Optional dataset for benchmark.') help='Optional dataset for benchmark.')
parser.add_argument( parser.add_argument(
'--infer_only', action='store_true', help='If set, run forward only.') '--infer_only', action='store_true', help='If set, run forward only.')
...@@ -122,6 +123,11 @@ def parse_args(): ...@@ -122,6 +123,11 @@ def parse_args():
type=str, type=str,
default="", default="",
help='Directory that contains all the training recordio files.') help='Directory that contains all the training recordio files.')
parser.add_argument(
'--test_data_path',
type=str,
default="",
help='Directory that contains all the test data (NOT recordio).')
parser.add_argument( parser.add_argument(
'--use_inference_transpiler', '--use_inference_transpiler',
action='store_true', action='store_true',
...@@ -130,5 +136,9 @@ def parse_args(): ...@@ -130,5 +136,9 @@ def parse_args():
'--no_random', '--no_random',
action='store_true', action='store_true',
help='If set, keep the random seed and do not shuffle the data.') help='If set, keep the random seed and do not shuffle the data.')
parser.add_argument(
'--use_lars',
action='store_true',
help='If set, use lars for optimizers, ONLY support resnet module.')
args = parser.parse_args() args = parser.parse_args()
return args return args
...@@ -16,6 +16,7 @@ import argparse ...@@ -16,6 +16,7 @@ import argparse
import cProfile import cProfile
import time import time
import os import os
import traceback
import numpy as np import numpy as np
...@@ -27,7 +28,7 @@ import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler ...@@ -27,7 +28,7 @@ import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler
from args import * from args import *
def append_nccl2_prepare(trainer_id): def append_nccl2_prepare(trainer_id, startup_prog):
if trainer_id >= 0: if trainer_id >= 0:
# append gen_nccl_id at the end of startup program # append gen_nccl_id at the end of startup program
trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
...@@ -40,11 +41,11 @@ def append_nccl2_prepare(trainer_id): ...@@ -40,11 +41,11 @@ def append_nccl2_prepare(trainer_id):
current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
worker_endpoints.remove(current_endpoint) worker_endpoints.remove(current_endpoint)
nccl_id_var = fluid.default_startup_program().global_block().create_var( nccl_id_var = startup_prog.global_block().create_var(
name="NCCLID", name="NCCLID",
persistable=True, persistable=True,
type=fluid.core.VarDesc.VarType.RAW) type=fluid.core.VarDesc.VarType.RAW)
fluid.default_startup_program().global_block().append_op( startup_prog.global_block().append_op(
type="gen_nccl_id", type="gen_nccl_id",
inputs={}, inputs={},
outputs={"NCCLID": nccl_id_var}, outputs={"NCCLID": nccl_id_var},
...@@ -59,7 +60,7 @@ def append_nccl2_prepare(trainer_id): ...@@ -59,7 +60,7 @@ def append_nccl2_prepare(trainer_id):
"nccl-based dist train.") "nccl-based dist train.")
def dist_transpile(trainer_id, args): def dist_transpile(trainer_id, args, train_prog, startup_prog):
if trainer_id < 0: if trainer_id < 0:
return None, None return None, None
...@@ -80,133 +81,69 @@ def dist_transpile(trainer_id, args): ...@@ -80,133 +81,69 @@ def dist_transpile(trainer_id, args):
# the role, should be either PSERVER or TRAINER # the role, should be either PSERVER or TRAINER
training_role = os.getenv("PADDLE_TRAINING_ROLE") training_role = os.getenv("PADDLE_TRAINING_ROLE")
t = distribute_transpiler.DistributeTranspiler() config = distribute_transpiler.DistributeTranspilerConfig()
config.slice_var_up = not args.no_split_var
t = distribute_transpiler.DistributeTranspiler(config=config)
t.transpile( t.transpile(
trainer_id, trainer_id,
# NOTE: *MUST* use train_prog, for we are using with guard to
# generate different program for train and test.
program=train_prog,
pservers=pserver_endpoints, pservers=pserver_endpoints,
trainers=trainers, trainers=trainers,
sync_mode=not args.async_mode) sync_mode=not args.async_mode)
if training_role == "PSERVER": if training_role == "PSERVER":
pserver_program = t.get_pserver_program(current_endpoint) pserver_program = t.get_pserver_program(current_endpoint)
pserver_startup_program = t.get_startup_program(current_endpoint, pserver_startup_program = t.get_startup_program(
pserver_program) current_endpoint, pserver_program, startup_program=startup_prog)
return pserver_program, pserver_startup_program return pserver_program, pserver_startup_program
elif training_role == "TRAINER": elif training_role == "TRAINER":
train_program = t.get_trainer_program() train_program = t.get_trainer_program()
return train_program, fluid.default_startup_program() return train_program, startup_prog
else: else:
raise ValueError( raise ValueError(
'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER' 'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
) )
def test(exe, inference_program, test_reader, feeder, batch_acc): def test_parallel(exe, test_args, args, test_prog, feeder):
accuracy_evaluator = fluid.metrics.Accuracy() acc_evaluators = []
for batch_id, data in enumerate(test_reader()): for i in xrange(len(test_args[2])):
acc = exe.run(inference_program, acc_evaluators.append(fluid.metrics.Accuracy())
feed=feeder.feed(data),
fetch_list=[batch_acc])
accuracy_evaluator.update(value=np.array(acc), weight=len(data))
return accuracy_evaluator.eval() to_fetch = [v.name for v in test_args[2]]
if args.use_reader_op:
test_args[4].start()
# TODO(wuyi): replace train, train_parallel, test functions with new trainer
# API once it is ready.
def train(avg_loss, infer_prog, optimizer, train_reader, test_reader, batch_acc,
args, train_prog, startup_prog):
if os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
place = core.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
exe.run(train_prog)
return
if args.use_fake_data:
raise Exception(
"fake data is not supported in single GPU test for now.")
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(startup_prog)
# Use inference_transpiler to speedup
if not args.use_reader_op:
feed_var_list = [
var for var in train_prog.global_block().vars.itervalues()
if var.is_data
]
feeder = fluid.DataFeeder(feed_var_list, place)
iters, num_samples, start_time = 0, 0, time.time()
for pass_id in range(args.pass_num):
train_losses = []
if not args.use_reader_op:
reader_generator = train_reader()
batch_id = 0
data = None
while True: while True:
if not args.use_reader_op: try:
data = next(reader_generator, None) acc_rets = exe.run(fetch_list=to_fetch)
if data == None: for i, e in enumerate(acc_evaluators):
break e.update(
if iters == args.iterations: value=np.array(acc_rets[i]), weight=args.batch_size)
reader_generator.close() except fluid.core.EOFException as eof:
test_args[4].reset()
break break
if iters == args.skip_batch_num: else:
start_time = time.time() for batch_id, data in enumerate(test_args[3]()):
num_samples = 0 acc_rets = exe.run(feed=feeder.feed(data), fetch_list=to_fetch)
for i, e in enumerate(acc_evaluators):
e.update(value=np.array(acc_rets[i]), weight=len(data))
if args.use_reader_op: return [e.eval() for e in acc_evaluators]
try:
loss = exe.run(train_prog, fetch_list=[avg_loss])
except fluid.core.EnforceNotMet as ex:
break
else:
loss = exe.run(train_prog,
feed=feeder.feed(data),
fetch_list=[avg_loss])
iters += 1
batch_id += 1
# FIXME(wuyi): For use_reader_op, if the current
# pass is not the last, the last batch of this pass
# is also equal to args.batch_size.
if args.use_reader_op:
num_samples += args.batch_size * args.gpus
else:
num_samples += len(data)
train_losses.append(loss)
print("Pass: %d, Iter: %d, Loss: %f\n" %
(pass_id, iters, np.mean(train_losses)))
print_train_time(start_time, time.time(), num_samples)
print("Pass: %d, Loss: %f" % (pass_id, np.mean(train_losses))),
# evaluation
if not args.no_test and batch_acc and not args.use_reader_op:
if args.use_inference_transpiler:
t = fluid.InferenceTranspiler()
t.transpile(infer_prog, place)
pass_test_acc = test(exe, infer_prog, test_reader, feeder,
batch_acc)
print(", Test Accuracy: %f" % pass_test_acc)
print("\n")
# TODO(wuyi): add warmup passes to get better perf data.
exit(0)
# TODO(wuyi): replace train, train_parallel, test functions with new trainer # NOTE: only need to benchmark using parallelexe
# API once it is ready. def train_parallel(train_args, test_args, args, train_prog, test_prog,
def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, startup_prog, nccl_id_var, num_trainers, trainer_id):
batch_acc, args, train_prog, startup_prog, nccl_id_var, over_all_start = time.time()
num_trainers, trainer_id):
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
feeder = None
if not args.use_reader_op: if not args.use_reader_op:
feed_var_list = [ feed_var_list = [
var for var in train_prog.global_block().vars.itervalues() var for var in train_prog.global_block().vars.itervalues()
if var.is_data if var.is_data
] ]
feeder = fluid.DataFeeder(feed_var_list, place) feeder = fluid.DataFeeder(feed_var_list, place)
# generate fake: # generate fake:
if args.use_fake_data: if args.use_fake_data:
for var in feed_var_list: for var in feed_var_list:
...@@ -230,63 +167,110 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader, ...@@ -230,63 +167,110 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
startup_exe = fluid.Executor(place) startup_exe = fluid.Executor(place)
startup_exe.run(startup_prog) startup_exe.run(startup_prog)
strategy = fluid.ExecutionStrategy() strategy = fluid.ExecutionStrategy()
strategy.num_threads = 1 strategy.num_threads = args.cpus
strategy.allow_op_delay = False strategy.allow_op_delay = False
avg_loss = train_args[0]
if args.update_method == "pserver":
# parameter server mode distributed training, merge
# gradients on local server, do not initialize
# ParallelExecutor with multi server all-reduce mode.
num_trainers = 1
trainer_id = 0
exe = fluid.ParallelExecutor( exe = fluid.ParallelExecutor(
True, True,
avg_loss.name, avg_loss.name,
main_program=train_prog,
exec_strategy=strategy, exec_strategy=strategy,
num_trainers=num_trainers, num_trainers=num_trainers,
trainer_id=trainer_id) trainer_id=trainer_id)
if not args.no_test:
if args.update_method == "pserver":
test_scope = None
else:
# NOTE: use an empty scope to avoid test exe using NCCLID
test_scope = fluid.Scope()
test_exe = fluid.ParallelExecutor(
True, main_program=test_prog, share_vars_from=exe)
for pass_id in range(args.pass_num): for pass_id in range(args.pass_num):
num_samples = 0 num_samples = 0
iters = 0 iters = 0
start_time = time.time() start_time = time.time()
if not args.use_reader_op: if not args.use_reader_op:
reader_generator = train_reader() reader_generator = train_args[3]() #train_reader
batch_id = 0 batch_id = 0
data = None data = None
if args.use_reader_op:
train_args[4].start()
while True: while True:
if not args.use_reader_op: if not args.use_reader_op:
data = next(reader_generator, None) data = next(reader_generator, None)
if data == None: if data == None:
break break
if args.profile and batch_id == 5:
profiler.start_profiler("All")
profiler.reset_profiler()
elif args.profile and batch_id == 10:
print("profiling total time: ", time.time() - start_time)
profiler.stop_profiler("total", "/tmp/profile_%d_pass%d" %
(trainer_id, pass_id))
if iters == args.iterations: if iters == args.iterations:
reader_generator.close() reader_generator.close()
break break
if args.profile and pass_id == 0 and batch_id == 5:
profiler.start_profiler("All")
elif args.profile and pass_id == 0 and batch_id == 10:
profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id)
if iters == args.skip_batch_num: if iters == args.skip_batch_num:
start_time = time.time() start_time = time.time()
num_samples = 0 num_samples = 0
fetch_list = [avg_loss.name]
acc_name_list = [v.name for v in train_args[2]]
fetch_list.extend(acc_name_list)
if args.use_fake_data or args.use_reader_op: if args.use_fake_data or args.use_reader_op:
try: try:
loss, = exe.run([avg_loss.name])
fetch_ret = exe.run(fetch_list)
except fluid.core.EOFException as eof:
break
except fluid.core.EnforceNotMet as ex: except fluid.core.EnforceNotMet as ex:
traceback.print_exc()
break break
else: else:
loss, = exe.run([avg_loss.name], feed=feeder.feed(data)) fetch_ret = exe.run(fetch_list, feed=feeder.feed(data))
if args.use_reader_op: if args.use_reader_op:
num_samples += args.batch_size * args.gpus num_samples += args.batch_size * args.gpus
else: else:
num_samples += len(data) num_samples += len(data)
iters += 1 iters += 1
if batch_id % 1 == 0: if batch_id % 1 == 0:
print("Pass %d, batch %d, loss %s" % fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
(pass_id, batch_id, np.array(loss))) print("Pass %d, batch %d, loss %s, accucacys: %s" %
(pass_id, batch_id, fetched_data[0], fetched_data[1:]))
batch_id += 1 batch_id += 1
print_train_time(start_time, time.time(), num_samples) print_train_time(start_time, time.time(), num_samples)
if not args.no_test and batch_acc and not args.use_reader_op: if args.use_reader_op:
# we have not implement record io for test train_args[4].reset() # reset reader handle
# skip test when use args.use_reader_op else:
test_acc = test(startup_exe, infer_prog, test_reader, feeder, del reader_generator
batch_acc)
print("Pass: %d, Test Accuracy: %f\n" % (pass_id, test_acc)) if not args.no_test and test_args[2]:
test_feeder = None
if not args.use_reader_op:
test_feed_var_list = [
var for var in test_prog.global_block().vars.itervalues()
if var.is_data
]
test_feeder = fluid.DataFeeder(test_feed_var_list, place)
test_ret = test_parallel(test_exe, test_args, args, test_prog,
test_feeder)
print("Pass: %d, Test Accuracy: %s\n" %
(pass_id, [np.mean(np.array(v)) for v in test_ret]))
print("total train time: ", time.time() - over_all_start)
def print_arguments(args): def print_arguments(args):
...@@ -328,44 +312,46 @@ def main(): ...@@ -328,44 +312,46 @@ def main():
if args.use_cprof: if args.use_cprof:
pr = cProfile.Profile() pr = cProfile.Profile()
pr.enable() pr.enable()
model_def = __import__("models.%s" % args.model, fromlist=["models"]) model_def = __import__("models.%s" % args.model, fromlist=["models"])
train_args = list(model_def.get_model(args))
train_args.append(args) train_prog = fluid.Program()
# Run optimizer.minimize(avg_loss) test_prog = fluid.Program()
train_args[2].minimize(train_args[0]) startup_prog = fluid.Program()
if args.memory_optimize:
fluid.memory_optimize(fluid.default_main_program()) train_args = list(model_def.get_model(args, True, train_prog, startup_prog))
test_args = list(model_def.get_model(args, False, test_prog, startup_prog))
all_args = [train_args, test_args, args]
if args.update_method == "pserver": if args.update_method == "pserver":
train_prog, startup_prog = dist_transpile(trainer_id, args) train_prog, startup_prog = dist_transpile(trainer_id, args, train_prog,
startup_prog)
if not train_prog: if not train_prog:
raise Exception( raise Exception(
"Must configure correct environments to run dist train.") "Must configure correct environments to run dist train.")
train_args.extend([train_prog, startup_prog]) all_args.extend([train_prog, test_prog, startup_prog])
if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER": if args.gpus > 1 and os.getenv("PADDLE_TRAINING_ROLE") == "TRAINER":
train_args.extend([nccl_id_var, num_trainers, trainer_id]) all_args.extend([nccl_id_var, num_trainers, trainer_id])
train_parallel(*train_args) train_parallel(*all_args)
train(*train_args) elif os.getenv("PADDLE_TRAINING_ROLE") == "PSERVER":
# start pserver with Executor
server_exe = fluid.Executor(fluid.CPUPlace())
server_exe.run(startup_prog)
server_exe.run(train_prog)
exit(0) exit(0)
# for other update methods, use default programs # for other update methods, use default programs
train_args.append(fluid.default_main_program()) all_args.extend([train_prog, test_prog, startup_prog])
train_args.append(fluid.default_startup_program())
if args.update_method == "nccl2": if args.update_method == "nccl2":
nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(trainer_id) nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(
if args.gpus == 1: trainer_id, startup_prog)
# NOTE: parallel executor use profiler interanlly
if args.use_nvprof and args.device == 'GPU': if args.device == "CPU":
with profiler.cuda_profiler("cuda_profiler.txt", 'csv') as nvprof: raise Exception("Only support GPU perf with parallel exe")
train(*train_args) all_args.extend([nccl_id_var, num_trainers, trainer_id])
else: train_parallel(*all_args)
train(*train_args)
else:
if args.device == "CPU":
raise Exception("Only support GPU perf with parallel exe")
train_args.extend([nccl_id_var, num_trainers, trainer_id])
train_parallel(*train_args)
if __name__ == "__main__": if __name__ == "__main__":
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import math
import random
import functools
import numpy as np
from threading import Thread
import subprocess
import time
from Queue import Queue
import paddle
from PIL import Image, ImageEnhance
random.seed(0)
DATA_DIM = 224
THREAD = int(os.getenv("PREPROCESS_THREADS", "10"))
BUF_SIZE = 5120
DATA_DIR = '/mnt/ImageNet'
TRAIN_LIST = '/mnt/ImageNet/train.txt'
TEST_LIST = '/mnt/ImageNet/val.txt'
img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))
def resize_short(img, target_size):
percent = float(target_size) / min(img.size[0], img.size[1])
resized_width = int(round(img.size[0] * percent))
resized_height = int(round(img.size[1] * percent))
img = img.resize((resized_width, resized_height), Image.LANCZOS)
return img
def crop_image(img, target_size, center):
width, height = img.size
size = target_size
if center == True:
w_start = (width - size) / 2
h_start = (height - size) / 2
else:
w_start = random.randint(0, width - size)
h_start = random.randint(0, height - size)
w_end = w_start + size
h_end = h_start + size
img = img.crop((w_start, h_start, w_end, h_end))
return img
def random_crop(img, size, scale=[0.08, 1.0], ratio=[3. / 4., 4. / 3.]):
aspect_ratio = math.sqrt(random.uniform(*ratio))
w = 1. * aspect_ratio
h = 1. / aspect_ratio
bound = min((float(img.size[0]) / img.size[1]) / (w**2),
(float(img.size[1]) / img.size[0]) / (h**2))
scale_max = min(scale[1], bound)
scale_min = min(scale[0], bound)
target_area = img.size[0] * img.size[1] * random.uniform(scale_min,
scale_max)
target_size = math.sqrt(target_area)
w = int(target_size * w)
h = int(target_size * h)
i = random.randint(0, img.size[0] - w)
j = random.randint(0, img.size[1] - h)
img = img.crop((i, j, i + w, j + h))
img = img.resize((size, size), Image.LANCZOS)
return img
def rotate_image(img):
angle = random.randint(-10, 10)
img = img.rotate(angle)
return img
def distort_color(img):
def random_brightness(img, lower=0.5, upper=1.5):
e = random.uniform(lower, upper)
return ImageEnhance.Brightness(img).enhance(e)
def random_contrast(img, lower=0.5, upper=1.5):
e = random.uniform(lower, upper)
return ImageEnhance.Contrast(img).enhance(e)
def random_color(img, lower=0.5, upper=1.5):
e = random.uniform(lower, upper)
return ImageEnhance.Color(img).enhance(e)
ops = [random_brightness, random_contrast, random_color]
random.shuffle(ops)
img = ops[0](img)
img = ops[1](img)
img = ops[2](img)
return img
def process_image(sample, mode, color_jitter, rotate):
img_path = sample[0]
img = Image.open(img_path)
if mode == 'train':
if rotate: img = rotate_image(img)
img = random_crop(img, DATA_DIM)
else:
img = resize_short(img, target_size=256)
img = crop_image(img, target_size=DATA_DIM, center=True)
if mode == 'train':
if color_jitter:
img = distort_color(img)
if random.randint(0, 1) == 1:
img = img.transpose(Image.FLIP_LEFT_RIGHT)
if img.mode != 'RGB':
img = img.convert('RGB')
img = np.array(img).astype('float32').transpose((2, 0, 1)) / 255
img -= img_mean
img /= img_std
if mode == 'train' or mode == 'val':
return img, sample[1]
elif mode == 'test':
return [img]
class XmapEndSignal():
pass
def xmap_readers(mapper,
reader,
process_num,
buffer_size,
order=False,
print_queue_state=True):
end = XmapEndSignal()
# define a worker to read samples from reader to in_queue
def read_worker(reader, in_queue):
for i in reader():
in_queue.put(i)
in_queue.put(end)
# define a worker to read samples from reader to in_queue with order flag
def order_read_worker(reader, in_queue, file_queue):
in_order = 0
for i in reader():
in_queue.put((in_order, i))
in_order += 1
in_queue.put(end)
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue
def handle_worker(in_queue, out_queue, mapper):
sample = in_queue.get()
while not isinstance(sample, XmapEndSignal):
r = mapper(sample)
out_queue.put(r)
sample = in_queue.get()
in_queue.put(end)
out_queue.put(end)
# define a worker to handle samples from in_queue by mapper
# and put mapped samples into out_queue by order
def order_handle_worker(in_queue, out_queue, mapper, out_order):
ins = in_queue.get()
while not isinstance(ins, XmapEndSignal):
order, sample = ins
r = mapper(sample)
while order != out_order[0]:
pass
out_queue.put(r)
out_order[0] += 1
ins = in_queue.get()
in_queue.put(end)
out_queue.put(end)
def xreader():
file_queue = Queue()
in_queue = Queue(buffer_size)
out_queue = Queue(buffer_size)
out_order = [0]
# start a read worker in a thread
target = order_read_worker if order else read_worker
t = Thread(target=target, args=(reader, in_queue))
t.daemon = True
t.start()
# start several handle_workers
target = order_handle_worker if order else handle_worker
args = (in_queue, out_queue, mapper, out_order) if order else (
in_queue, out_queue, mapper)
workers = []
for i in xrange(process_num):
worker = Thread(target=target, args=args)
worker.daemon = True
workers.append(worker)
for w in workers:
w.start()
sample = out_queue.get()
start_t = time.time()
while not isinstance(sample, XmapEndSignal):
yield sample
sample = out_queue.get()
if time.time() - start_t > 3:
if print_queue_state:
print("queue sizes: ", in_queue.qsize(), out_queue.qsize())
start_t = time.time()
finish = 1
while finish < process_num:
sample = out_queue.get()
if isinstance(sample, XmapEndSignal):
finish += 1
else:
yield sample
return xreader
def _reader_creator(file_list,
mode,
shuffle=False,
color_jitter=False,
rotate=False,
xmap=True):
def reader():
with open(file_list) as flist:
full_lines = [line.strip() for line in flist]
if shuffle:
random.shuffle(full_lines)
if mode == 'train':
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
trainer_count = int(os.getenv("PADDLE_TRAINERS"))
per_node_lines = len(full_lines) / trainer_count
lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1)
* per_node_lines]
print(
"read images from %d, length: %d, lines length: %d, total: %d"
% (trainer_id * per_node_lines, per_node_lines, len(lines),
len(full_lines)))
else:
lines = full_lines
for line in lines:
if mode == 'train':
img_path, label = line.split()
img_path = img_path.replace("JPEG", "jpeg")
img_path = os.path.join(DATA_DIR, "train", img_path)
yield (img_path, int(label))
elif mode == 'val':
img_path, label = line.split()
img_path = img_path.replace("JPEG", "jpeg")
img_path = os.path.join(DATA_DIR, "val", img_path)
yield (img_path, int(label))
elif mode == 'test':
img_path = os.path.join(DATA_DIR, line)
yield [img_path]
mapper = functools.partial(
process_image, mode=mode, color_jitter=color_jitter, rotate=rotate)
return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE)
def load_raw_image_uint8(sample):
img_arr = np.array(Image.open(sample[0])).astype('int64')
return img_arr, int(sample[1])
def train_raw(file_list=TRAIN_LIST, shuffle=True):
def reader():
with open(file_list) as flist:
full_lines = [line.strip() for line in flist]
if shuffle:
random.shuffle(full_lines)
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
trainer_count = int(os.getenv("PADDLE_TRAINERS"))
per_node_lines = len(full_lines) / trainer_count
lines = full_lines[trainer_id * per_node_lines:(trainer_id + 1) *
per_node_lines]
print("read images from %d, length: %d, lines length: %d, total: %d"
% (trainer_id * per_node_lines, per_node_lines, len(lines),
len(full_lines)))
for line in lines:
img_path, label = line.split()
img_path = img_path.replace("JPEG", "jpeg")
img_path = os.path.join(DATA_DIR, "train", img_path)
yield (img_path, int(label))
return paddle.reader.xmap_readers(load_raw_image_uint8, reader, THREAD,
BUF_SIZE)
def train(file_list=TRAIN_LIST, xmap=True):
return _reader_creator(
file_list,
'train',
shuffle=True,
color_jitter=False,
rotate=False,
xmap=xmap)
def val(file_list=TEST_LIST, xmap=True):
return _reader_creator(file_list, 'val', shuffle=False, xmap=xmap)
def test(file_list=TEST_LIST):
return _reader_creator(file_list, 'test', shuffle=False)
if __name__ == "__main__":
c = 0
start_t = time.time()
for d in train()():
c += 1
if c >= 10000:
break
spent = time.time() - start_t
print("read 10000 speed: ", 10000 / spent, spent)
...@@ -163,6 +163,19 @@ def gen_job(): ...@@ -163,6 +163,19 @@ def gen_job():
volumes.append({"name": "dshm", "emptyDir": {"medium": "Memory"}}) volumes.append({"name": "dshm", "emptyDir": {"medium": "Memory"}})
volumeMounts.append({"mountPath": "/dev/shm", "name": "dshm"}) volumeMounts.append({"mountPath": "/dev/shm", "name": "dshm"})
# add ceph volumes
volumes.append({
"name": "ceph-data",
"cephfs": {
"monitors": ["192.168.16.23:6789"],
"secretRef": {
"name": "ceph-secret"
},
"user": "admin",
}
})
volumeMounts.append({"mountPath": "/mnt/data", "name": "ceph-data"})
tn["spec"]["template"]["spec"]["volumes"] = volumes tn["spec"]["template"]["spec"]["volumes"] = volumes
tn_container["volumeMounts"] = volumeMounts tn_container["volumeMounts"] = volumeMounts
......
...@@ -13,5 +13,6 @@ ...@@ -13,5 +13,6 @@
# limitations under the License. # limitations under the License.
__all__ = [ __all__ = [
"machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm" "machine_translation", "resnet", "vgg", "mnist", "stacked_dynamic_lstm",
"resnet_with_preprocess"
] ]
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""seq2seq model for fluid.""" """seq2seq model for fluid."""
from __future__ import absolute_import from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
...@@ -181,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor): ...@@ -181,7 +182,7 @@ def lodtensor_to_ndarray(lod_tensor):
return ndarray return ndarray
def get_model(args): def get_model(args, is_train, main_prog, startup_prog):
if args.use_reader_op: if args.use_reader_op:
raise Exception("machine_translation do not support reader op for now.") raise Exception("machine_translation do not support reader op for now.")
embedding_dim = 512 embedding_dim = 512
...@@ -190,30 +191,27 @@ def get_model(args): ...@@ -190,30 +191,27 @@ def get_model(args):
dict_size = 30000 dict_size = 30000
beam_size = 3 beam_size = 3
max_length = 250 max_length = 250
avg_cost, feeding_list = seq_to_seq_net(
embedding_dim,
encoder_size,
decoder_size,
dict_size,
dict_size,
False,
beam_size=beam_size,
max_length=max_length)
# clone from default main program
inference_program = fluid.default_main_program().clone()
optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate)
train_batch_generator = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.wmt14.train(dict_size), buf_size=1000),
batch_size=args.batch_size * args.gpus)
test_batch_generator = paddle.batch( with fluid.program_guard(main_prog, startup_prog):
with fluid.unique_name.guard():
avg_cost, feeding_list = seq_to_seq_net(
embedding_dim,
encoder_size,
decoder_size,
dict_size,
dict_size,
False,
beam_size=beam_size,
max_length=max_length)
if is_train:
optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate)
optimizer.minimize(avg_cost)
batch_generator = paddle.batch(
paddle.reader.shuffle( paddle.reader.shuffle(
paddle.dataset.wmt14.test(dict_size), buf_size=1000), paddle.dataset.wmt14.train(dict_size)
batch_size=args.batch_size) if is_train else paddle.dataset.wmt14.test(dict_size),
buf_size=1000),
batch_size=args.batch_size * args.gpus)
return avg_cost, inference_program, optimizer, train_batch_generator, \ return avg_cost, optimizer, [], batch_generator, None
test_batch_generator, None
...@@ -65,61 +65,50 @@ def cnn_model(data): ...@@ -65,61 +65,50 @@ def cnn_model(data):
return predict return predict
def get_model(args): def get_model(args, is_train, main_prog, startup_prog):
if args.use_reader_op: # NOTE: mnist is small, we don't implement data sharding yet.
filelist = [ filelist = [
os.path.join(args.data_path, f) for f in os.listdir(args.data_path) os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
] ]
data_file = fluid.layers.open_files( with fluid.program_guard(main_prog, startup_prog):
filenames=filelist, if args.use_reader_op:
shapes=[[-1, 1, 28, 28], (-1, 1)], data_file_handle = fluid.layers.open_files(
lod_levels=[0, 0], filenames=filelist,
dtypes=["float32", "int64"], shapes=[[-1, 1, 28, 28], (-1, 1)],
thread_num=args.gpus, lod_levels=[0, 0],
pass_num=args.pass_num) dtypes=["float32", "int64"],
data_file = fluid.layers.double_buffer( thread_num=1,
fluid.layers.batch( pass_num=1)
data_file, batch_size=args.batch_size)) data_file = fluid.layers.double_buffer(
images, label = fluid.layers.read_file(data_file) fluid.layers.batch(
else: data_file_handle, batch_size=args.batch_size))
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype=DTYPE) with fluid.unique_name.guard():
label = fluid.layers.data(name='label', shape=[1], dtype='int64') if args.use_reader_op:
input, label = fluid.layers.read_file(data_file)
if args.device == 'CPU' and args.cpus > 1: else:
places = fluid.layers.get_places(args.cpus) images = fluid.layers.data(
pd = fluid.layers.ParallelDo(places) name='pixel', shape=[1, 28, 28], dtype='float32')
with pd.do(): label = fluid.layers.data(
predict = cnn_model(pd.read_input(images)) name='label', shape=[1], dtype='int64')
label = pd.read_input(label)
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label) cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost) avg_cost = fluid.layers.mean(x=cost)
# Evaluator
batch_acc = fluid.layers.accuracy(input=predict, label=label) batch_acc = fluid.layers.accuracy(input=predict, label=label)
# Optimization
pd.write_output(avg_cost) if is_train:
pd.write_output(batch_acc) opt = fluid.optimizer.AdamOptimizer(
learning_rate=0.001, beta1=0.9, beta2=0.999)
avg_cost, batch_acc = pd() opt.minimize()
avg_cost = fluid.layers.mean(avg_cost) if args.memory_optimize:
batch_acc = fluid.layers.mean(batch_acc) fluid.memory_optimize(main_prog)
else:
# Train program
predict = cnn_model(images)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
batch_acc = fluid.layers.accuracy(input=predict, label=label)
# inference program
inference_program = fluid.default_main_program().clone()
# Optimization
opt = fluid.optimizer.AdamOptimizer(
learning_rate=0.001, beta1=0.9, beta2=0.999)
# Reader # Reader
train_reader = paddle.batch( if is_train:
paddle.dataset.mnist.train(), batch_size=args.batch_size * args.gpus) reader = paddle.dataset.mnist.train()
test_reader = paddle.batch( else:
paddle.dataset.mnist.test(), batch_size=args.batch_size) reader = paddle.dataset.mnist.test()
return avg_cost, inference_program, opt, train_reader, test_reader, batch_acc batched_reader = paddle.batch(
reader, batch_size=args.batch_size * args.gpus)
return avg_cost, opt, [batch_acc], batched_reader, data_file_handle
...@@ -27,10 +27,17 @@ import paddle ...@@ -27,10 +27,17 @@ import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.core as core import paddle.fluid.core as core
import paddle.fluid.profiler as profiler import paddle.fluid.profiler as profiler
from recordio_converter import imagenet_train, imagenet_test # from recordio_converter import imagenet_train, imagenet_test
from imagenet_reader import train, val
def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): def conv_bn_layer(input,
ch_out,
filter_size,
stride,
padding,
act='relu',
is_train=True):
conv1 = fluid.layers.conv2d( conv1 = fluid.layers.conv2d(
input=input, input=input,
filter_size=filter_size, filter_size=filter_size,
...@@ -39,29 +46,31 @@ def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'): ...@@ -39,29 +46,31 @@ def conv_bn_layer(input, ch_out, filter_size, stride, padding, act='relu'):
padding=padding, padding=padding,
act=None, act=None,
bias_attr=False) bias_attr=False)
return fluid.layers.batch_norm(input=conv1, act=act) return fluid.layers.batch_norm(input=conv1, act=act, is_test=not is_train)
def shortcut(input, ch_out, stride): def shortcut(input, ch_out, stride, is_train=True):
ch_in = input.shape[1] # if args.data_format == 'NCHW' else input.shape[-1] ch_in = input.shape[1] # if args.data_format == 'NCHW' else input.shape[-1]
if ch_in != ch_out: if ch_in != ch_out:
return conv_bn_layer(input, ch_out, 1, stride, 0, None) return conv_bn_layer(
input, ch_out, 1, stride, 0, None, is_train=is_train)
else: else:
return input return input
def basicblock(input, ch_out, stride): def basicblock(input, ch_out, stride, is_train=True):
short = shortcut(input, ch_out, stride) short = shortcut(input, ch_out, stride, is_train=is_train)
conv1 = conv_bn_layer(input, ch_out, 3, stride, 1) conv1 = conv_bn_layer(input, ch_out, 3, stride, 1, is_train=is_train)
conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None) conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None, is_train=is_train)
return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
def bottleneck(input, ch_out, stride): def bottleneck(input, ch_out, stride, is_train=True):
short = shortcut(input, ch_out * 4, stride) short = shortcut(input, ch_out * 4, stride, is_train=is_train)
conv1 = conv_bn_layer(input, ch_out, 1, stride, 0) conv1 = conv_bn_layer(input, ch_out, 1, stride, 0, is_train=is_train)
conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1) conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, is_train=is_train)
conv3 = conv_bn_layer(conv2, ch_out * 4, 1, 1, 0, act=None) conv3 = conv_bn_layer(
conv2, ch_out * 4, 1, 1, 0, act=None, is_train=is_train)
return fluid.layers.elementwise_add(x=short, y=conv3, act='relu') return fluid.layers.elementwise_add(x=short, y=conv3, act='relu')
...@@ -72,7 +81,11 @@ def layer_warp(block_func, input, ch_out, count, stride): ...@@ -72,7 +81,11 @@ def layer_warp(block_func, input, ch_out, count, stride):
return res_out return res_out
def resnet_imagenet(input, class_dim, depth=50, data_format='NCHW'): def resnet_imagenet(input,
class_dim,
depth=50,
data_format='NCHW',
is_train=True):
cfg = { cfg = {
18: ([2, 2, 2, 1], basicblock), 18: ([2, 2, 2, 1], basicblock),
...@@ -115,8 +128,9 @@ def resnet_cifar10(input, class_dim, depth=32, data_format='NCHW'): ...@@ -115,8 +128,9 @@ def resnet_cifar10(input, class_dim, depth=32, data_format='NCHW'):
return out return out
def get_model(args): def _model_reader_dshape_classdim(args, is_train):
model = resnet_cifar10 model = resnet_cifar10
reader = None
if args.data_set == "cifar10": if args.data_set == "cifar10":
class_dim = 10 class_dim = 10
if args.data_format == 'NCHW': if args.data_format == 'NCHW':
...@@ -124,8 +138,10 @@ def get_model(args): ...@@ -124,8 +138,10 @@ def get_model(args):
else: else:
dshape = [32, 32, 3] dshape = [32, 32, 3]
model = resnet_cifar10 model = resnet_cifar10
train_reader = paddle.dataset.cifar.train10() if is_train:
test_reader = paddle.dataset.cifar.test10() reader = paddle.dataset.cifar.train10()
else:
reader = paddle.dataset.cifar.test10()
elif args.data_set == "flowers": elif args.data_set == "flowers":
class_dim = 102 class_dim = 102
if args.data_format == 'NCHW': if args.data_format == 'NCHW':
...@@ -133,8 +149,10 @@ def get_model(args): ...@@ -133,8 +149,10 @@ def get_model(args):
else: else:
dshape = [224, 224, 3] dshape = [224, 224, 3]
model = resnet_imagenet model = resnet_imagenet
train_reader = paddle.dataset.flowers.train() if is_train:
test_reader = paddle.dataset.flowers.test() reader = paddle.dataset.flowers.train()
else:
reader = paddle.dataset.flowers.test()
elif args.data_set == "imagenet": elif args.data_set == "imagenet":
class_dim = 1000 class_dim = 1000
if args.data_format == 'NCHW': if args.data_format == 'NCHW':
...@@ -145,64 +163,89 @@ def get_model(args): ...@@ -145,64 +163,89 @@ def get_model(args):
if not args.data_path: if not args.data_path:
raise Exception( raise Exception(
"Must specify --data_path when training with imagenet") "Must specify --data_path when training with imagenet")
train_reader = imagenet_train(args.data_path) if not args.use_reader_op:
test_reader = imagenet_test(args.data_path) if is_train:
reader = train()
if args.use_reader_op: else:
filelist = [ reader = val()
os.path.join(args.data_path, f) for f in os.listdir(args.data_path) else:
] if is_train:
data_file = fluid.layers.open_files( reader = train(xmap=False)
filenames=filelist, else:
shapes=[[-1] + dshape, (-1, 1)], reader = val(xmap=False)
lod_levels=[0, 0], return model, reader, dshape, class_dim
dtypes=["float32", "int64"],
thread_num=args.gpus,
pass_num=args.pass_num) def get_model(args, is_train, main_prog, startup_prog):
data_file = fluid.layers.double_buffer( model, reader, dshape, class_dim = _model_reader_dshape_classdim(args,
fluid.layers.batch( is_train)
data_file, batch_size=args.batch_size))
input, label = fluid.layers.read_file(data_file) pyreader = None
else: trainer_count = int(os.getenv("PADDLE_TRAINERS"))
input = fluid.layers.data(name='data', shape=dshape, dtype='float32') with fluid.program_guard(main_prog, startup_prog):
label = fluid.layers.data(name='label', shape=[1], dtype='int64') with fluid.unique_name.guard():
if args.use_reader_op:
if args.device == 'CPU' and args.cpus > 1: pyreader = fluid.layers.py_reader(
places = fluid.layers.get_places(args.cpus) capacity=args.batch_size * args.gpus,
pd = fluid.layers.ParallelDo(places) shapes=([-1] + dshape, (-1, 1)),
with pd.do(): dtypes=('float32', 'int64'),
predict = model(pd.read_input(input), class_dim) name="train_reader" if is_train else "test_reader",
label = pd.read_input(label) use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader)
else:
input = fluid.layers.data(
name='data', shape=dshape, dtype='float32')
label = fluid.layers.data(
name='label', shape=[1], dtype='int64')
predict = model(input, class_dim, is_train=is_train)
cost = fluid.layers.cross_entropy(input=predict, label=label) cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost) avg_cost = fluid.layers.mean(x=cost)
batch_acc = fluid.layers.accuracy(input=predict, label=label)
pd.write_output(avg_cost)
pd.write_output(batch_acc)
avg_cost, batch_acc = pd() batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1)
avg_cost = fluid.layers.mean(avg_cost) batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5)
batch_acc = fluid.layers.mean(batch_acc)
# configure optimize
optimizer = None
if is_train:
if args.use_lars:
lars_decay = 1.0
else:
lars_decay = 0.0
total_images = 1281167 / trainer_count
step = int(total_images / args.batch_size + 1)
epochs = [30, 60, 80, 90]
bd = [step * e for e in epochs]
base_lr = args.learning_rate
lr = []
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
learning_rate=base_lr,
#learning_rate=fluid.layers.piecewise_decay(
# boundaries=bd, values=lr),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)
if args.memory_optimize:
fluid.memory_optimize(main_prog)
# config readers
if not args.use_reader_op:
batched_reader = paddle.batch(
reader if args.no_random else paddle.reader.shuffle(
reader, buf_size=5120),
batch_size=args.batch_size * args.gpus,
drop_last=True)
else: else:
predict = model(input, class_dim) batched_reader = None
cost = fluid.layers.cross_entropy(input=predict, label=label) pyreader.decorate_paddle_reader(
avg_cost = fluid.layers.mean(x=cost) paddle.batch(
batch_acc = fluid.layers.accuracy(input=predict, label=label) reader if args.no_random else paddle.reader.shuffle(
reader, buf_size=5120),
inference_program = fluid.default_main_program().clone() batch_size=args.batch_size))
with fluid.program_guard(inference_program):
inference_program = fluid.io.get_inference_program( return avg_cost, optimizer, [batch_acc1,
target_vars=[batch_acc]) batch_acc5], batched_reader, pyreader
optimizer = fluid.optimizer.Momentum(learning_rate=0.01, momentum=0.9)
batched_train_reader = paddle.batch(
train_reader if args.no_random else paddle.reader.shuffle(
train_reader, buf_size=5120),
batch_size=args.batch_size * args.gpus,
drop_last=True)
batched_test_reader = paddle.batch(
test_reader, batch_size=args.batch_size, drop_last=True)
return avg_cost, inference_program, optimizer, batched_train_reader,\
batched_test_reader, batch_acc
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import functools
import numpy as np
import time
import os
import cProfile, pstats, StringIO
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
# from recordio_converter import imagenet_train, imagenet_test
from imagenet_reader import train_raw, val
def conv_bn_layer(input,
ch_out,
filter_size,
stride,
padding,
act='relu',
is_train=True):
conv1 = fluid.layers.conv2d(
input=input,
filter_size=filter_size,
num_filters=ch_out,
stride=stride,
padding=padding,
act=None,
bias_attr=False)
return fluid.layers.batch_norm(input=conv1, act=act, is_test=not is_train)
def shortcut(input, ch_out, stride, is_train=True):
ch_in = input.shape[1] # if args.data_format == 'NCHW' else input.shape[-1]
if ch_in != ch_out:
return conv_bn_layer(
input, ch_out, 1, stride, 0, None, is_train=is_train)
else:
return input
def basicblock(input, ch_out, stride, is_train=True):
short = shortcut(input, ch_out, stride, is_train=is_train)
conv1 = conv_bn_layer(input, ch_out, 3, stride, 1, is_train=is_train)
conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, act=None, is_train=is_train)
return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
def bottleneck(input, ch_out, stride, is_train=True):
short = shortcut(input, ch_out * 4, stride, is_train=is_train)
conv1 = conv_bn_layer(input, ch_out, 1, stride, 0, is_train=is_train)
conv2 = conv_bn_layer(conv1, ch_out, 3, 1, 1, is_train=is_train)
conv3 = conv_bn_layer(
conv2, ch_out * 4, 1, 1, 0, act=None, is_train=is_train)
return fluid.layers.elementwise_add(x=short, y=conv3, act='relu')
def layer_warp(block_func, input, ch_out, count, stride):
res_out = block_func(input, ch_out, stride)
for i in range(1, count):
res_out = block_func(res_out, ch_out, 1)
return res_out
def resnet_imagenet(input,
class_dim,
depth=50,
data_format='NCHW',
is_train=True):
cfg = {
18: ([2, 2, 2, 1], basicblock),
34: ([3, 4, 6, 3], basicblock),
50: ([3, 4, 6, 3], bottleneck),
101: ([3, 4, 23, 3], bottleneck),
152: ([3, 8, 36, 3], bottleneck)
}
stages, block_func = cfg[depth]
conv1 = conv_bn_layer(input, ch_out=64, filter_size=7, stride=2, padding=3)
pool1 = fluid.layers.pool2d(
input=conv1, pool_type='avg', pool_size=3, pool_stride=2)
res1 = layer_warp(block_func, pool1, 64, stages[0], 1)
res2 = layer_warp(block_func, res1, 128, stages[1], 2)
res3 = layer_warp(block_func, res2, 256, stages[2], 2)
res4 = layer_warp(block_func, res3, 512, stages[3], 2)
pool2 = fluid.layers.pool2d(
input=res4,
pool_size=7,
pool_type='avg',
pool_stride=1,
global_pooling=True)
out = fluid.layers.fc(input=pool2, size=class_dim, act='softmax')
return out
def resnet_cifar10(input, class_dim, depth=32, data_format='NCHW'):
assert (depth - 2) % 6 == 0
n = (depth - 2) // 6
conv1 = conv_bn_layer(
input=input, ch_out=16, filter_size=3, stride=1, padding=1)
res1 = layer_warp(basicblock, conv1, 16, n, 1)
res2 = layer_warp(basicblock, res1, 32, n, 2)
res3 = layer_warp(basicblock, res2, 64, n, 2)
pool = fluid.layers.pool2d(
input=res3, pool_size=8, pool_type='avg', pool_stride=1)
out = fluid.layers.fc(input=pool, size=class_dim, act='softmax')
return out
def _model_reader_dshape_classdim(args, is_train):
model = resnet_cifar10
reader = None
if args.data_set == "cifar10":
class_dim = 10
if args.data_format == 'NCHW':
dshape = [3, 32, 32]
else:
dshape = [32, 32, 3]
model = resnet_cifar10
if is_train:
reader = paddle.dataset.cifar.train10()
else:
reader = paddle.dataset.cifar.test10()
elif args.data_set == "flowers":
class_dim = 102
if args.data_format == 'NCHW':
dshape = [3, 224, 224]
else:
dshape = [224, 224, 3]
model = resnet_imagenet
if is_train:
reader = paddle.dataset.flowers.train()
else:
reader = paddle.dataset.flowers.test()
elif args.data_set == "imagenet":
class_dim = 1000
if args.data_format == 'NCHW':
dshape = [3, 224, 224]
else:
dshape = [224, 224, 3]
model = resnet_imagenet
if not args.data_path:
raise Exception(
"Must specify --data_path when training with imagenet")
if not args.use_reader_op:
if is_train:
reader = train_raw()
else:
reader = val()
else:
if is_train:
reader = train_raw()
else:
reader = val(xmap=False)
return model, reader, dshape, class_dim
def get_model(args, is_train, main_prog, startup_prog):
model, reader, dshape, class_dim = _model_reader_dshape_classdim(args,
is_train)
pyreader = None
trainer_count = int(os.getenv("PADDLE_TRAINERS"))
with fluid.program_guard(main_prog, startup_prog):
with fluid.unique_name.guard():
if args.use_reader_op:
pyreader = fluid.layers.py_reader(
capacity=args.batch_size * args.gpus,
shapes=([-1] + dshape, (-1, 1)),
dtypes=('uint8', 'int64'),
name="train_reader" if is_train else "test_reader",
use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader)
else:
input = fluid.layers.data(
name='data', shape=dshape, dtype='uint8')
label = fluid.layers.data(
name='label', shape=[1], dtype='int64')
# add imagenet preprocessors
random_crop = fluid.layers.random_crop(input, dshape)
casted = fluid.layers.cast(random_crop, 'float32')
# input is HWC
trans = fluid.layers.transpose(casted, [0, 3, 1, 2]) / 255.0
img_mean = fluid.layers.tensor.assign(
np.array([0.485, 0.456, 0.406]).astype('float32').reshape((3, 1,
1)))
img_std = fluid.layers.tensor.assign(
np.array([0.229, 0.224, 0.225]).astype('float32').reshape((3, 1,
1)))
h1 = fluid.layers.elementwise_sub(trans, img_mean, axis=1)
h2 = fluid.layers.elementwise_div(h1, img_std, axis=1)
# pre_out = (trans - img_mean) / img_std
predict = model(h2, class_dim, is_train=is_train)
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1)
batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5)
# configure optimize
optimizer = None
if is_train:
if args.use_lars:
lars_decay = 1.0
else:
lars_decay = 0.0
total_images = 1281167 / trainer_count
step = int(total_images / args.batch_size + 1)
epochs = [30, 60, 80, 90]
bd = [step * e for e in epochs]
base_lr = args.learning_rate
lr = []
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
learning_rate=base_lr,
#learning_rate=fluid.layers.piecewise_decay(
# boundaries=bd, values=lr),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)
if args.memory_optimize:
fluid.memory_optimize(main_prog)
# config readers
if not args.use_reader_op:
batched_reader = paddle.batch(
reader if args.no_random else paddle.reader.shuffle(
reader, buf_size=5120),
batch_size=args.batch_size * args.gpus,
drop_last=True)
else:
batched_reader = None
pyreader.decorate_paddle_reader(
paddle.batch(
# reader if args.no_random else paddle.reader.shuffle(
# reader, buf_size=5120),
reader,
batch_size=args.batch_size))
return avg_cost, optimizer, [batch_acc1,
batch_acc5], batched_reader, pyreader
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.fluid as fluid
import math
import os
from imagenet_reader import train, val
__all__ = [
"SE_ResNeXt", "SE_ResNeXt50_32x4d", "SE_ResNeXt101_32x4d",
"SE_ResNeXt152_32x4d", "get_model"
]
train_parameters = {
"input_size": [3, 224, 224],
"input_mean": [0.485, 0.456, 0.406],
"input_std": [0.229, 0.224, 0.225],
"learning_strategy": {
"name": "piecewise_decay",
"batch_size": 256,
"epochs": [30, 60, 90],
"steps": [0.1, 0.01, 0.001, 0.0001]
}
}
class SE_ResNeXt():
def __init__(self, layers=50, is_train=True):
self.params = train_parameters
self.layers = layers
self.is_train = is_train
def net(self, input, class_dim=1000):
layers = self.layers
supported_layers = [50, 101, 152]
assert layers in supported_layers, \
"supported layers are {} but input layer is {}".format(supported_layers, layers)
if layers == 50:
cardinality = 32
reduction_ratio = 16
depth = [3, 4, 6, 3]
num_filters = [128, 256, 512, 1024]
conv = self.conv_bn_layer(
input=input,
num_filters=64,
filter_size=7,
stride=2,
act='relu')
conv = fluid.layers.pool2d(
input=conv,
pool_size=3,
pool_stride=2,
pool_padding=1,
pool_type='max')
elif layers == 101:
cardinality = 32
reduction_ratio = 16
depth = [3, 4, 23, 3]
num_filters = [128, 256, 512, 1024]
conv = self.conv_bn_layer(
input=input,
num_filters=64,
filter_size=7,
stride=2,
act='relu')
conv = fluid.layers.pool2d(
input=conv,
pool_size=3,
pool_stride=2,
pool_padding=1,
pool_type='max')
elif layers == 152:
cardinality = 64
reduction_ratio = 16
depth = [3, 8, 36, 3]
num_filters = [128, 256, 512, 1024]
conv = self.conv_bn_layer(
input=input,
num_filters=64,
filter_size=3,
stride=2,
act='relu')
conv = self.conv_bn_layer(
input=conv, num_filters=64, filter_size=3, stride=1, act='relu')
conv = self.conv_bn_layer(
input=conv,
num_filters=128,
filter_size=3,
stride=1,
act='relu')
conv = fluid.layers.pool2d(
input=conv, pool_size=3, pool_stride=2, pool_padding=1, \
pool_type='max')
for block in range(len(depth)):
for i in range(depth[block]):
conv = self.bottleneck_block(
input=conv,
num_filters=num_filters[block],
stride=2 if i == 0 and block != 0 else 1,
cardinality=cardinality,
reduction_ratio=reduction_ratio)
pool = fluid.layers.pool2d(
input=conv, pool_size=7, pool_type='avg', global_pooling=True)
drop = fluid.layers.dropout(x=pool, dropout_prob=0.5)
stdv = 1.0 / math.sqrt(drop.shape[1] * 1.0)
out = fluid.layers.fc(input=drop,
size=class_dim,
act='softmax',
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv,
stdv)))
return out
def shortcut(self, input, ch_out, stride):
ch_in = input.shape[1]
if ch_in != ch_out or stride != 1:
filter_size = 1
return self.conv_bn_layer(input, ch_out, filter_size, stride)
else:
return input
def bottleneck_block(self, input, num_filters, stride, cardinality,
reduction_ratio):
conv0 = self.conv_bn_layer(
input=input, num_filters=num_filters, filter_size=1, act='relu')
conv1 = self.conv_bn_layer(
input=conv0,
num_filters=num_filters,
filter_size=3,
stride=stride,
groups=cardinality,
act='relu')
conv2 = self.conv_bn_layer(
input=conv1, num_filters=num_filters * 2, filter_size=1, act=None)
scale = self.squeeze_excitation(
input=conv2,
num_channels=num_filters * 2,
reduction_ratio=reduction_ratio)
short = self.shortcut(input, num_filters * 2, stride)
return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
def conv_bn_layer(self,
input,
num_filters,
filter_size,
stride=1,
groups=1,
act=None):
conv = fluid.layers.conv2d(
input=input,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size - 1) / 2,
groups=groups,
act=None,
bias_attr=False)
return fluid.layers.batch_norm(
input=conv, act=act, is_test=not self.is_train)
def squeeze_excitation(self, input, num_channels, reduction_ratio):
pool = fluid.layers.pool2d(
input=input, pool_size=0, pool_type='avg', global_pooling=True)
stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
squeeze = fluid.layers.fc(input=pool,
size=num_channels / reduction_ratio,
act='relu',
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(
-stdv, stdv)))
stdv = 1.0 / math.sqrt(squeeze.shape[1] * 1.0)
excitation = fluid.layers.fc(input=squeeze,
size=num_channels,
act='sigmoid',
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(
-stdv, stdv)))
scale = fluid.layers.elementwise_mul(x=input, y=excitation, axis=0)
return scale
def SE_ResNeXt50_32x4d():
model = SE_ResNeXt(layers=50)
return model
def SE_ResNeXt101_32x4d():
model = SE_ResNeXt(layers=101)
return model
def SE_ResNeXt152_32x4d():
model = SE_ResNeXt(layers=152)
return model
def get_model(args, is_train, main_prog, startup_prog):
model = SE_ResNeXt(layers=50)
batched_reader = None
pyreader = None
trainer_count = int(os.getenv("PADDLE_TRAINERS"))
dshape = train_parameters["input_size"]
with fluid.program_guard(main_prog, startup_prog):
with fluid.unique_name.guard():
if args.use_reader_op:
pyreader = fluid.layers.py_reader(
capacity=10,
shapes=([-1] + dshape, (-1, 1)),
dtypes=('float32', 'int64'),
name="train_reader" if is_train else "test_reader",
use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader)
else:
input = fluid.layers.data(
name='data', shape=dshape, dtype='float32')
label = fluid.layers.data(
name='label', shape=[1], dtype='int64')
out = model.net(input=input)
cost = fluid.layers.cross_entropy(input=out, label=label)
avg_cost = fluid.layers.mean(x=cost)
acc_top1 = fluid.layers.accuracy(input=out, label=label, k=1)
acc_top5 = fluid.layers.accuracy(input=out, label=label, k=5)
optimizer = None
if is_train:
if args.use_lars:
lars_decay = 1.0
else:
lars_decay = 0.0
total_images = 1281167 / trainer_count
step = int(total_images / args.batch_size + 1)
epochs = [40, 80, 100]
bd = [step * e for e in epochs]
base_lr = args.learning_rate
lr = []
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
# learning_rate=base_lr,
learning_rate=fluid.layers.piecewise_decay(
boundaries=bd, values=lr),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4),
LARS_weight_decay=lars_decay)
optimizer.minimize(avg_cost)
if args.memory_optimize:
fluid.memory_optimize(main_prog)
# config readers
if is_train:
reader = train()
else:
reader = val()
if not args.use_reader_op:
batched_reader = paddle.batch(
reader, batch_size=args.batch_size * args.gpus, drop_last=True)
else:
pyreader.decorate_paddle_reader(
paddle.batch(
reader, batch_size=args.batch_size))
return avg_cost, optimizer, [acc_top1, acc_top5], batched_reader, pyreader
...@@ -26,7 +26,6 @@ import numpy ...@@ -26,7 +26,6 @@ import numpy
import paddle import paddle
import paddle.dataset.imdb as imdb import paddle.dataset.imdb as imdb
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.batch as batch
import paddle.fluid.profiler as profiler import paddle.fluid.profiler as profiler
word_dict = imdb.word_dict() word_dict = imdb.word_dict()
...@@ -43,19 +42,7 @@ def crop_sentence(reader, crop_size): ...@@ -43,19 +42,7 @@ def crop_sentence(reader, crop_size):
return __impl__ return __impl__
def get_model(args): def lstm_net(sentence, lstm_size):
if args.use_reader_op:
raise Exception(
"stacked_dynamic_lstm do not support reader op for now.")
lstm_size = 512
emb_dim = 512
crop_size = 1500
data = fluid.layers.data(
name="words", shape=[1], lod_level=1, dtype='int64')
sentence = fluid.layers.embedding(
input=data, size=[len(word_dict), emb_dim])
sentence = fluid.layers.fc(input=sentence, size=lstm_size, act='tanh') sentence = fluid.layers.fc(input=sentence, size=lstm_size, act='tanh')
rnn = fluid.layers.DynamicRNN() rnn = fluid.layers.DynamicRNN()
...@@ -97,31 +84,47 @@ def get_model(args): ...@@ -97,31 +84,47 @@ def get_model(args):
last = fluid.layers.sequence_pool(rnn(), 'last') last = fluid.layers.sequence_pool(rnn(), 'last')
logit = fluid.layers.fc(input=last, size=2, act='softmax') logit = fluid.layers.fc(input=last, size=2, act='softmax')
loss = fluid.layers.cross_entropy( return logit
input=logit,
label=fluid.layers.data(
name='label', shape=[1], dtype='int64'))
loss = fluid.layers.mean(x=loss)
# add acc
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(input=logit, label=fluid.layers.data(name='label', \
shape=[1], dtype='int64'), total=batch_size_tensor)
inference_program = fluid.default_main_program().clone() def get_model(args, is_train, main_prog, startup_prog):
with fluid.program_guard(inference_program): if args.use_reader_op:
inference_program = fluid.io.get_inference_program( raise Exception(
target_vars=[batch_acc, batch_size_tensor]) "stacked_dynamic_lstm do not support reader op for now.")
lstm_size = 512
adam = fluid.optimizer.Adam() emb_dim = 512
crop_size = 1500
train_reader = batch( with fluid.program_guard(main_prog, startup_prog):
with fluid.unique_name.guard():
data = fluid.layers.data(
name="words", shape=[1], lod_level=1, dtype='int64')
sentence = fluid.layers.embedding(
input=data, size=[len(word_dict), emb_dim])
logit = lstm_net(sentence, lstm_size)
loss = fluid.layers.cross_entropy(
input=logit,
label=fluid.layers.data(
name='label', shape=[1], dtype='int64'))
loss = fluid.layers.mean(x=loss)
# add acc
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(input=logit, label=fluid.layers.data(name='label', \
shape=[1], dtype='int64'), total=batch_size_tensor)
if is_train:
adam = fluid.optimizer.Adam()
adam.minimize(loss)
if is_train:
reader = crop_sentence(imdb.train(word_dict), crop_size)
else:
reader = crop_sentence(imdb.test(word_dict), crop_size)
batched_reader = paddle.batch(
paddle.reader.shuffle( paddle.reader.shuffle(
crop_sentence(imdb.train(word_dict), crop_size), buf_size=25000), reader, buf_size=25000),
batch_size=args.batch_size * args.gpus) batch_size=args.batch_size * args.gpus)
test_reader = batch(
paddle.reader.shuffle(
crop_sentence(imdb.test(word_dict), crop_size), buf_size=25000),
batch_size=args.batch_size)
return loss, inference_program, adam, train_reader, test_reader, batch_acc return loss, adam, [batch_acc], batched_reader, None
...@@ -25,7 +25,7 @@ import functools ...@@ -25,7 +25,7 @@ import functools
import os import os
def vgg16_bn_drop(input): def vgg16_bn_drop(input, is_train=True):
def conv_block(input, num_filter, groups, dropouts): def conv_block(input, num_filter, groups, dropouts):
return fluid.nets.img_conv_group( return fluid.nets.img_conv_group(
input=input, input=input,
...@@ -46,13 +46,13 @@ def vgg16_bn_drop(input): ...@@ -46,13 +46,13 @@ def vgg16_bn_drop(input):
drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5) drop = fluid.layers.dropout(x=conv5, dropout_prob=0.5)
fc1 = fluid.layers.fc(input=drop, size=512, act=None) fc1 = fluid.layers.fc(input=drop, size=512, act=None)
bn = fluid.layers.batch_norm(input=fc1, act='relu') bn = fluid.layers.batch_norm(input=fc1, act='relu', is_test=not is_train)
drop2 = fluid.layers.dropout(x=bn, dropout_prob=0.5) drop2 = fluid.layers.dropout(x=bn, dropout_prob=0.5)
fc2 = fluid.layers.fc(input=drop2, size=512, act=None) fc2 = fluid.layers.fc(input=drop2, size=512, act=None)
return fc2 return fc2
def get_model(args): def get_model(args, is_train, main_prog, startup_prog):
if args.data_set == "cifar10": if args.data_set == "cifar10":
classdim = 10 classdim = 10
if args.data_format == 'NCHW': if args.data_format == 'NCHW':
...@@ -65,57 +65,56 @@ def get_model(args): ...@@ -65,57 +65,56 @@ def get_model(args):
data_shape = [3, 224, 224] data_shape = [3, 224, 224]
else: else:
data_shape = [224, 224, 3] data_shape = [224, 224, 3]
filelist = [
os.path.join(args.data_path, f) for f in os.listdir(args.data_path)
]
with fluid.program_guard(main_prog, startup_prog):
if args.use_reader_op:
data_file_handle = fluid.layers.open_files(
filenames=filelist,
shapes=[[-1] + data_shape, (-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int64"],
thread_num=1,
pass_num=1)
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file_handle, batch_size=args.batch_size))
with fluid.unique_name.guard():
if args.use_reader_op:
images, label = fluid.layers.read_file(data_file)
else:
images = fluid.layers.data(
name='data', shape=data_shape, dtype='float32')
label = fluid.layers.data(
name='label', shape=[1], dtype='int64')
# Train program
net = vgg16_bn_drop(images, is_train=is_train)
predict = fluid.layers.fc(input=net, size=classdim, act='softmax')
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
if args.use_reader_op: # Evaluator
filelist = [ batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
os.path.join(args.data_path, f) for f in os.listdir(args.data_path) batch_acc = fluid.layers.accuracy(
] input=predict, label=label, total=batch_size_tensor)
data_file = fluid.layers.open_files( # Optimization
filenames=filelist, if is_train:
shapes=[[-1] + data_shape, (-1, 1)], optimizer = fluid.optimizer.Adam(
lod_levels=[0, 0], learning_rate=args.learning_rate)
dtypes=["float32", "int64"], optimizer.minimize(avg_cost)
thread_num=args.gpus,
pass_num=args.pass_num)
data_file = fluid.layers.double_buffer(
fluid.layers.batch(
data_file, batch_size=args.batch_size))
images, label = fluid.layers.read_file(data_file)
else:
images = fluid.layers.data(
name='data', shape=data_shape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# Train program
net = vgg16_bn_drop(images)
predict = fluid.layers.fc(input=net, size=classdim, act='softmax')
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
# Evaluator
batch_size_tensor = fluid.layers.create_tensor(dtype='int64')
batch_acc = fluid.layers.accuracy(
input=predict, label=label, total=batch_size_tensor)
# inference program
inference_program = fluid.default_main_program().clone()
with fluid.program_guard(inference_program):
inference_program = fluid.io.get_inference_program(
target_vars=[batch_acc, batch_size_tensor])
# Optimization
optimizer = fluid.optimizer.Adam(learning_rate=args.learning_rate)
# data reader # data reader
train_reader = paddle.batch( if is_train:
reader = paddle.dataset.cifar.train10() \
if args.data_set == 'cifar10' else paddle.dataset.flowers.train()
else:
reader = paddle.dataset.cifar.test10() \
if args.data_set == 'cifar10' else paddle.dataset.flowers.test()
batched_reader = paddle.batch(
paddle.reader.shuffle( paddle.reader.shuffle(
paddle.dataset.cifar.train10() reader, buf_size=5120),
if args.data_set == 'cifar10' else paddle.dataset.flowers.train(),
buf_size=5120),
batch_size=args.batch_size * args.gpus) batch_size=args.batch_size * args.gpus)
test_reader = paddle.batch(
paddle.dataset.cifar.test10()
if args.data_set == 'cifar10' else paddle.dataset.flowers.test(),
batch_size=args.batch_size)
return avg_cost, inference_program, optimizer, train_reader, test_reader, batch_acc return avg_cost, optimizer, [batch_acc], batched_reader, data_file_handle
...@@ -66,7 +66,7 @@ paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'pla ...@@ -66,7 +66,7 @@ paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'pla
paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0)) paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0))
paddle.fluid.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.release_memory ArgSpec(args=['input_program', 'skip_opt_set'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.DistributeTranspilerConfig.__init__ paddle.fluid.DistributeTranspilerConfig.__init__
paddle.fluid.ParallelExecutor.__init__ ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id'], varargs=None, keywords='kwargs', defaults=(None, None, None, None, None, 1, 0)) paddle.fluid.ParallelExecutor.__init__ ArgSpec(args=['self', 'use_cuda', 'loss_name', 'main_program', 'share_vars_from', 'exec_strategy', 'build_strategy', 'num_trainers', 'trainer_id', 'scope'], varargs=None, keywords='kwargs', defaults=(None, None, None, None, None, 1, 0, None))
paddle.fluid.ParallelExecutor.run ArgSpec(args=['self', 'fetch_list', 'feed', 'feed_dict', 'return_numpy'], varargs=None, keywords=None, defaults=(None, None, True)) paddle.fluid.ParallelExecutor.run ArgSpec(args=['self', 'fetch_list', 'feed', 'feed_dict', 'return_numpy'], varargs=None, keywords=None, defaults=(None, None, True))
paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ExecutionStrategy) -> None paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ExecutionStrategy) -> None
paddle.fluid.BuildStrategy.GradientScaleStrategy.__init__ __init__(self: paddle.fluid.core.GradientScaleStrategy, arg0: int) -> None paddle.fluid.BuildStrategy.GradientScaleStrategy.__init__ __init__(self: paddle.fluid.core.GradientScaleStrategy, arg0: int) -> None
......
...@@ -100,14 +100,13 @@ struct NCCLContextMap { ...@@ -100,14 +100,13 @@ struct NCCLContextMap {
return; return;
} }
std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]); std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
// if pass nccl_id here, can assume we are doing multi node training // if num_trainers == 1, should create a new nccl id for local comms.
if (nccl_id == nullptr) { if (num_trainers == 1) {
std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex()); std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll( PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
comms.get(), static_cast<int>(order_.size()), order_.data())); comms.get(), static_cast<int>(order_.size()), order_.data()));
} else { } else {
PADDLE_ENFORCE_GT(num_trainers, 1); PADDLE_ENFORCE_NOT_NULL(nccl_id);
// TODO(wuyi): need to ensure each node have same number of GPUs
{ {
int nranks = num_trainers * order_.size(); int nranks = num_trainers * order_.size();
NCCLGroupGuard gurad; NCCLGroupGuard gurad;
......
...@@ -43,8 +43,9 @@ class ParallelExecutor(object): ...@@ -43,8 +43,9 @@ class ParallelExecutor(object):
num_trainers(int): If greater than 1, NCCL will be initialized with num_trainers(int): If greater than 1, NCCL will be initialized with
multiple rank of nodes, each node should have same number of GPUs. multiple rank of nodes, each node should have same number of GPUs.
Distributed training will be enabled then. Default 1. Distributed training will be enabled then. Default 1.
trainer_id(int: Must use together with num_trainers. trainer_id is the trainer_id(int): Must use together with num_trainers. trainer_id is the
"rank" of current node starts from 0. Default 0. "rank" of current node starts from 0. Default 0.
scope(Scope): scope to run with, default use fluid.global_scope().
Returns: Returns:
ParallelExecutor: The initialized ParallelExecutor object. ParallelExecutor: The initialized ParallelExecutor object.
...@@ -73,6 +74,7 @@ class ParallelExecutor(object): ...@@ -73,6 +74,7 @@ class ParallelExecutor(object):
build_strategy=None, build_strategy=None,
num_trainers=1, num_trainers=1,
trainer_id=0, trainer_id=0,
scope=None,
**kwargs): **kwargs):
if len(kwargs) != 0: if len(kwargs) != 0:
err_msg = "" err_msg = ""
...@@ -131,7 +133,8 @@ class ParallelExecutor(object): ...@@ -131,7 +133,8 @@ class ParallelExecutor(object):
main = main_program main = main_program
main = main if main else framework.default_main_program() main = main if main else framework.default_main_program()
scope = executor.global_scope() if scope == None:
scope = executor.global_scope()
# FIXME(Yancey1989): it's a temporary approach to determinate the distribute # FIXME(Yancey1989): it's a temporary approach to determinate the distribute
# train program, call self.bcast_param() at the end of each mini-batch. # train program, call self.bcast_param() at the end of each mini-batch.
self.is_dist = True if "recv" in [ self.is_dist = True if "recv" in [
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册