Commit d6dac4df authored by Yancey1989

add visreader

Parent a4e5a149
@@ -86,6 +86,7 @@ The default resnet50 distributed training config is based on this paper: https:/
 we run forward and backward computation for several batches, then merge the gradients and send them to the pserver for
 optimization. We use different batch norm mean and variance variables in each repeat so that adding repeats
 behaves the same as adding more GPUs.
+- use [VisionTools DataReader](https://github.com/PaddlePaddle/VisionTools/tree/master/datareader): passing `--visreader` enables a higher-performance image processing pipeline.
 ### Performance
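A hedged illustration of how the new flag might be passed on the command line (the script name and data path are assumptions; only `--model`, `--batch_size`, `--data_dir` and `--visreader` appear in this diff):

```python
# Hypothetical invocation, not taken verbatim from the repository:
#   python dist_train.py --model DistResNet --batch_size 256 \
#       --data_dir /path/to/imagenet_seqfiles --visreader True
```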
@@ -66,6 +66,7 @@ def parse_args():
     add_arg('async_mode', bool, False, "Async distributed training, only for pserver mode.")
     add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.")
     add_arg('skip_unbalanced_data', bool, False, "Skip remaining data if data is not balanced across nodes.")
+    add_arg('visreader', bool, False, "Whether to use the high-performance VisionTools reader.")
     # yapf: enable
     args = parser.parse_args()
     return args
@@ -94,6 +95,40 @@ def prepare_reader(is_train, pyreader, args, pass_id=0):
             reader,
             batch_size=bs))
+
+def prepare_visreader(is_train, pyreader, args):
+    import datareader.example.imagenet_demo as imagenet
+
+    def _parse_kv(r):
+        """ parse a (key, value) record from an ImageNet sequence file
+        """
+        import cPickle
+        k, v = r
+        obj = cPickle.loads(v)
+        if len(obj['label']) >= 4:
+            label = int(obj['label'][3])  # class id
+        else:
+            label = int(obj['label'][2])
+        return obj['image'], label
+
+    pre_maps = [_parse_kv]
+    worker_args = {}
+    worker_args['cpp_xmap'] = True
+    worker_args['use_process'] = False
+    worker_args['worker_num'] = 16
+    imagenet.g_settings['worker_args'] = worker_args
+    imagenet.g_settings['part_id'] = args.dist_env['trainer_id']
+    imagenet.g_settings['part_num'] = args.dist_env['num_trainers']
+    if is_train:
+        reader = imagenet.train(os.path.join(args.data_dir, "train"), pre_maps=pre_maps)
+    else:
+        reader = imagenet.val(os.path.join(args.data_dir, "val"), pre_maps=pre_maps)
+    batch_size = args.batch_size
+    batch_size_per_gpu = batch_size / get_device_num()
+    assert batch_size_per_gpu * get_device_num() == batch_size, \
+        "invalid batch_size[%d] for multiple gpus[%d]" % (batch_size, get_device_num())
+    pyreader.decorate_paddle_reader(
+        paddle.batch(reader, batch_size=batch_size_per_gpu))
+
 def build_program(is_train, main_prog, startup_prog, args):
     pyreader = None
     class_dim = args.class_dim
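To make the record format concrete, here is a minimal self-contained sketch of the parsing step (the dict layout is inferred from `_parse_kv` above; the field values are fabricated, and Python 3's `pickle` stands in for `cPickle`):

```python
import pickle

# a fabricated (key, value) record shaped like one entry of a sequence file:
# 'image' holds encoded image bytes, 'label' is a sequence whose 4th
# (or, failing that, 3rd) element carries the class id
record = ("train_00000001",
          pickle.dumps({"image": b"<jpeg bytes>",
                        "label": ["n01440764", "tench", "0", "42"]}))

def parse_kv(r):
    k, v = r
    obj = pickle.loads(v)
    label = int(obj["label"][3]) if len(obj["label"]) >= 4 else int(obj["label"][2])
    return obj["image"], label

image_bytes, class_id = parse_kv(record)
assert class_id == 42
```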
@@ -103,17 +138,23 @@ def build_program(is_train, main_prog, startup_prog, args):
     device_num_per_worker = get_device_num()
     with fluid.program_guard(main_prog, startup_prog):
         pyreader = fluid.layers.py_reader(
-            capacity=16,
+            capacity=args.batch_size,
             shapes=([-1] + image_shape, (-1, 1)),
-            dtypes=('float32', 'int64'),
+            dtypes=('uint8', 'int64'),
             name="train_reader" if is_train else "test_reader",
             use_double_buffer=True)
         with fluid.unique_name.guard():
             image, label = fluid.layers.read_file(pyreader)
+            # normalizing the image on the GPU achieves better performance
+            cast = fluid.layers.cast(image, "float32")
+            img_mean = fluid.layers.create_global_var([3, 1, 1], 0.0, "float32", name="img_mean", persistable=True)
+            img_std = fluid.layers.create_global_var([3, 1, 1], 0.0, "float32", name="img_std", persistable=True)
+            t1 = fluid.layers.elementwise_sub(cast / 255.0, img_mean, axis=1)
+            t2 = fluid.layers.elementwise_div(t1, img_std, axis=1)
             if args.fp16:
-                image = fluid.layers.cast(image, "float16")
+                t2 = fluid.layers.cast(t2, "float16")
             model_def = models.__dict__[args.model](layers=50, is_train=is_train)
-            predict = model_def.net(image, class_dim=class_dim)
+            predict = model_def.net(t2, class_dim=class_dim)
             cost, pred = fluid.layers.softmax_with_cross_entropy(predict, label, return_softmax=True)
             if args.scale_loss > 1:
                 avg_cost = fluid.layers.mean(x=cost) * float(args.scale_loss)
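The block above moves input normalization onto the GPU: the reader now delivers raw `uint8` tensors, and the `(3, 1, 1)` mean/std variables broadcast over the channel axis (`axis=1` on the batched NCHW tensor). A minimal NumPy sketch of the same arithmetic, using the ImageNet statistics that `train_parallel` writes into the variables below:

```python
import numpy as np

img_mean = np.array([0.485, 0.456, 0.406], dtype="float32").reshape((3, 1, 1))
img_std = np.array([0.229, 0.224, 0.225], dtype="float32").reshape((3, 1, 1))

# a stand-in uint8 CHW image, as the uint8 py_reader would deliver (batch dim omitted)
image = np.random.randint(0, 256, size=(3, 224, 224), dtype="uint8")

# cast -> scale to [0, 1] -> subtract per-channel mean -> divide by per-channel std
normalized = (image.astype("float32") / 255.0 - img_mean) / img_std
```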
@@ -130,7 +171,7 @@ def build_program(is_train, main_prog, startup_prog, args):
         if os.getenv("FLAGS_selected_gpus"):
             # in multi process mode, "trainer_count" is the total number of
             # devices in the whole cluster, so scale end_lr back down by the
             # number of devices per worker
-            end_lr *= device_num_per_worker
+            end_lr /= device_num_per_worker
         total_images = args.total_images / trainer_count
         step = int(total_images / (args.batch_size * args.multi_batch_repeat) + 1)
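The step count above feeds the learning-rate schedule: each optimizer step consumes `batch_size * multi_batch_repeat` samples on each trainer. A quick sanity check of the arithmetic (all values assumed for illustration):

```python
total_images = 1281167      # ImageNet-1k train set size (assumed here)
trainer_count = 4           # number of trainer nodes, assumed
batch_size = 256
multi_batch_repeat = 2      # gradients merged over 2 local batches per step

images_per_trainer = total_images / trainer_count
steps_per_pass = int(images_per_trainer / (batch_size * multi_batch_repeat) + 1)
print(steps_per_pass)       # -> 626 optimizer steps per pass on each trainer
```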
@@ -165,7 +206,10 @@ def build_program(is_train, main_prog, startup_prog, args):
         optimizer.minimize(avg_cost)
     # prepare reader for current program
-    prepare_reader(is_train, pyreader, args)
+    if args.visreader:
+        prepare_visreader(is_train, pyreader, args)
+    else:
+        prepare_reader(is_train, pyreader, args)
     return pyreader, avg_cost, batch_acc1, batch_acc5
@@ -220,9 +264,16 @@ def train_parallel(args):
     if args.multi_batch_repeat > 1:
         append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
     startup_exe.run(startup_prog)
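+    # ImageNet per-channel mean/std; these fill the persistable img_mean/img_std
+    # variables created in build_program so that input normalization runs on the GPU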
+    img_mean_np = np.array([0.485, 0.456, 0.406]).astype("float32").reshape((3, 1, 1))
+    img_std_np = np.array([0.229, 0.224, 0.225]).astype("float32").reshape((3, 1, 1))
+    mean_var = fluid.global_scope().find_var("img_mean")
+    mean_var.get_tensor().set(img_mean_np, place)
+    std_var = fluid.global_scope().find_var("img_std")
+    std_var.get_tensor().set(img_std_np, place)
     strategy = fluid.ExecutionStrategy()
     strategy.num_threads = args.num_threads
+    strategy.num_iteration_per_drop_scope = 30
     build_strategy = fluid.BuildStrategy()
     if args.multi_batch_repeat > 1:
         pass_builder = build_strategy._finalize_strategy_and_create_passes()
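A side note on the added `num_iteration_per_drop_scope = 30`: it appears chosen to match the `batch_id % 30 == 0` fetch cadence in the training loop below, so cached execution scopes are dropped on the same 30-iteration interval at which metrics are fetched.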
@@ -261,11 +312,15 @@ def train_parallel(args):
     for pass_id in range(args.num_epochs):
         num_samples = 0
         start_time = time.time()
-        batch_id = 1
-        # use pass_id+1 as per pass global shuffle for distributed training
-        prepare_reader(True, train_pyreader, args, pass_id + 1)
-        train_pyreader.start()
-        while True:
+        if args.visreader:
+            # the visreader stream is unbounded, so the pyreader is started
+            # once on the first pass and never restarted
+            if pass_id == 0:
+                train_pyreader.start()
+        else:
+            # use pass_id + 1 as the per-pass global shuffle seed for distributed training
+            prepare_reader(True, train_pyreader, args, pass_id + 1)
+            train_pyreader.start()
+        for batch_id in range(1, steps_per_pass + 1):
             try:
                 if batch_id % 30 == 0:
                     fetch_ret = exe.run(fetch_list)
@@ -281,14 +336,12 @@ def train_parallel(args):
                 traceback.print_exc()
                 break
             num_samples += args.batch_size
-            batch_id += 1
-            if args.skip_unbalanced_data and batch_id >= steps_per_pass:
-                break
         print_train_time(start_time, time.time(), num_samples)
-        train_pyreader.reset()
+        if (args.visreader and pass_id == args.num_epochs - 1) or not args.visreader:
+            train_pyreader.reset()
-        if pass_id > args.start_test_pass:
+        if pass_id >= args.start_test_pass:
             if args.multi_batch_repeat > 1:
                 copyback_repeat_bn_params(train_prog)
             test_fetch_list = [test_cost.name, test_acc1.name, test_acc5.name]
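Since the visreader stream never ends on its own, the pass boundary is imposed by the bounded `for batch_id in range(1, steps_per_pass + 1)` loop rather than by reader exhaustion. A generic sketch of that pattern (plain generators, not the actual datareader API):

```python
import itertools

def endless_reader():
    # stands in for the unbounded visreader stream
    i = 0
    while True:
        yield i
        i += 1

steps_per_pass = 5
stream = endless_reader()
for pass_id in range(3):
    # consume exactly steps_per_pass items per pass; the stream itself never ends
    for batch in itertools.islice(stream, steps_per_pass):
        pass  # train on `batch` here
```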