Commit a6101b02 authored by Yancey1989

update by comment

Parent d6dac4df
......@@ -86,7 +86,9 @@ The default resnet50 distributed training config is based on this paper: https:/
we run forward and backward computation for several batches, then merge the gradients and send them to the pserver for
optimization; we use different batch norm mean and variance variables in each repeat so that adding repeats
behaves the same as adding more GPUs (see the sketch after this list).
- use [VisionTools DataReader](https://github.com/PaddlePaddle/VisionTools/tree/master/datareader) `--visreader` can achieve better performance to process images.
- use [VisionTools DataReader](https://github.com/PaddlePaddle/VisionTools/tree/master/datareader) `--use_visiontool True`
to achieve better image pre-processing performance. You need to install visiontool on your host before enabling it:
`pip install -r requirements.txt`.
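The `multi_batch_repeat` note above merges gradients across several forward/backward passes while keeping separate batch norm statistics per repeat. A minimal numpy sketch of that accumulation idea, with toy stand-ins for the model and optimizer (everything here is illustrative, not the benchmark's actual implementation):

```python
import numpy as np

def forward_backward(batch, stats):
    # hypothetical stand-in: update this repeat's private BN running
    # stats and return a toy "gradient" for a single parameter
    stats["mean"] = 0.9 * stats["mean"] + 0.1 * batch.mean()
    stats["var"] = 0.9 * stats["var"] + 0.1 * batch.var()
    return batch.mean()

repeats = 4
micro_batches = [np.random.rand(32) for _ in range(repeats)]
# one private set of BN stats per repeat, as the note above describes
bn_stats = [{"mean": 0.0, "var": 1.0} for _ in range(repeats)]

grad_sum = 0.0
for r in range(repeats):
    grad_sum += forward_backward(micro_batches[r], bn_stats[r])

merged_grad = grad_sum / repeats  # one merged update goes to the pserver
```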
### Performance
......
......@@ -65,12 +65,17 @@ def parse_args():
add_arg('split_var', bool, True, "Split params on pserver.")
add_arg('async_mode', bool, False, "Async distributed training, only for pserver mode.")
add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.")
add_arg('skip_unbalanced_data', bool, False, "Skip data not if data not balanced on nodes.")
add_arg('visreader', bool, False, "Whether to use high performance VisTool reader.")
add_arg('skip_unbalanced_data', bool, False, "Skip remaining data if data is not balanced across nodes.")
add_arg('use_visiontool', bool, False, "Whether to use the high performance VisionTool reader.")
add_arg('visiontool_workers', int, 16, "Number of visiontool reader workers.")
# yapf: enable
args = parser.parse_args()
return args
def is_mp_mode():
return bool(os.getenv("FLAGS_selected_gpus"))
def get_device_num():
if os.getenv("CPU_NUM"):
return int(os.getenv("CPU_NUM"))
......@@ -83,9 +88,9 @@ def get_device_num():
def prepare_reader(is_train, pyreader, args, pass_id=0):
if is_train:
reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id)
reader = train(data_dir=args.data_dir, pass_id_as_seed=pass_id, normalize=False)
else:
reader = val(data_dir=args.data_dir)
reader = val(data_dir=args.data_dir, normalize=False)
if is_train:
bs = args.batch_size / get_device_num()
else:
......@@ -112,7 +117,7 @@ def prepare_visreader(is_train, pyreader, args):
worker_args = {}
worker_args['cpp_xmap'] = True
worker_args['use_process'] = False
worker_args['worker_num'] = 16
worker_args['worker_num'] = args.visiontool_workers
imagenet.g_settings['worker_args'] = worker_args
imagenet.g_settings['part_id'] = args.dist_env['trainer_id']
imagenet.g_settings['part_num'] = args.dist_env['num_trainers']
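The `part_id`/`part_num` settings shard the dataset across trainers so each one reads a disjoint slice. A minimal sketch of that sharding idea over a plain file list (illustrative only, not the VisionTools implementation):

```python
def shard_lines(lines, part_id, part_num):
    # keep only the lines whose index maps to this trainer's part,
    # giving each of part_num trainers a disjoint slice of the data
    return [line for i, line in enumerate(lines) if i % part_num == part_id]

# trainer 1 of 4 sees lines 1, 5, 9, ...
lines = ["img_%d.jpg" % i for i in range(12)]
print(shard_lines(lines, part_id=1, part_num=4))
```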
......@@ -146,15 +151,16 @@ def build_program(is_train, main_prog, startup_prog, args):
with fluid.unique_name.guard():
image, label = fluid.layers.read_file(pyreader)
# normalizing images on the GPU achieves better performance
cast = fluid.layers.cast(image, "float32")
img_mean = fluid.layers.create_global_var([3, 1, 1], 0.0, "float32", name="img_mean", persistable=True)
img_std = fluid.layers.create_global_var([3, 1, 1], 0.0, "float32", name="img_std", persistable=True)
t1 = fluid.layers.elementwise_sub(cast / 255.0, img_mean, axis=1)
t2 = fluid.layers.elementwise_div(t1, img_std, axis=1)
if args.fp16:
t2 = fluid.layers.cast(t2, "float16")
img_type = "float16" if args.fp16 else "float32"
cast = fluid.layers.cast(image, img_type)
img_mean = fluid.layers.create_global_var([3, 1, 1], 0.0, img_type, name="img_mean", persistable=True)
img_std = fluid.layers.create_global_var([3, 1, 1], 0.0, img_type, name="img_std", persistable=True)
img_scale = fluid.layers.create_global_var([1], 0.0, img_type, name="img_scale", persistable=True)
# image = (image / 255.0 - mean) / std
t1 = fluid.layers.elementwise_sub(cast / img_scale, img_mean, axis=1)
input = fluid.layers.elementwise_div(t1, img_std, axis=1)
model_def = models.__dict__[args.model](layers=50, is_train=is_train)
predict = model_def.net(t2, class_dim=class_dim)
predict = model_def.net(input, class_dim=class_dim)
cost, pred = fluid.layers.softmax_with_cross_entropy(predict, label, return_softmax=True)
if args.scale_loss > 1:
avg_cost = fluid.layers.mean(x=cost) * float(args.scale_loss)
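The layers above implement `(image / scale - mean) / std` channel-wise on the GPU. A numpy equivalent of what those elementwise ops compute on an NCHW batch (shapes are illustrative):

```python
import numpy as np

batch = np.random.randint(0, 256, size=(2, 3, 224, 224)).astype(np.uint8)
img_mean = np.array([0.485, 0.456, 0.406], dtype=np.float32).reshape((3, 1, 1))
img_std = np.array([0.229, 0.224, 0.225], dtype=np.float32).reshape((3, 1, 1))

# axis=1 in elementwise_sub/div broadcasts the (3, 1, 1) statistics over
# the channel dimension of the NCHW batch, like this numpy broadcast
normalized = (batch.astype(np.float32) / 255.0 - img_mean) / img_std
```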
......@@ -206,7 +212,7 @@ def build_program(is_train, main_prog, startup_prog, args):
optimizer.minimize(avg_cost)
# prepare reader for current program
if args.visreader:
if args.use_visiontool:
prepare_visreader(is_train, pyreader, args)
else:
prepare_reader(is_train, pyreader, args)
......@@ -264,12 +270,18 @@ def train_parallel(args):
if args.multi_batch_repeat > 1:
append_bn_repeat_init_op(train_prog, startup_prog, args.multi_batch_repeat)
startup_exe.run(startup_prog)
img_mean_np = np.array([0.485, 0.456, 0.406]).astype("float32").reshape((3, 1, 1))
img_std_np = np.array([0.229, 0.224, 0.225]).astype("float32").reshape((3, 1, 1))
mean_var = fluid.global_scope().find_var("img_mean")
mean_var.get_tensor().set(img_mean_np, place)
std_var = fluid.global_scope().find_var("img_std")
std_var.get_tensor().set(img_std_np, place)
np_dtype = "float16" if args.fp16 else "float32"
np_tensors = {}
np_tensors["img_mean"] = np.array([0.485, 0.456, 0.406]).astype(np_dtype).reshape((3, 1, 1))
np_tensors["img_std"] = np.array([0.229, 0.224, 0.225]).astype(np_dtype).reshape((3, 1, 1))
np_tensors["img_scale"] = np.array([255.0]).astype(np_dtype)
for vname, np_tensor in np_tensors.items():
var = fluid.global_scope().find_var(vname)
if args.fp16:
var.get_tensor().set(np_tensor.view(np.uint16), place)
else:
var.get_tensor().set(np_tensor, place)
strategy = fluid.ExecutionStrategy()
strategy.num_threads = args.num_threads
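In the fp16 branch above, the float16 numpy array is reinterpreted as uint16 before being set into the tensor, presumably because this tensor `set` path has no native float16 overload; the raw half-precision bits are passed through unchanged. A small sketch of that reinterpretation:

```python
import numpy as np

mean_fp16 = np.array([0.485, 0.456, 0.406], dtype=np.float16)
bits = mean_fp16.view(np.uint16)  # same bytes reinterpreted, no copy
# the view round-trips losslessly back to float16
assert (bits.view(np.float16) == mean_fp16).all()
```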
......@@ -308,24 +320,30 @@ def train_parallel(args):
over_all_start = time.time()
fetch_list = [train_cost.name, train_acc1.name, train_acc5.name]
steps_per_pass = args.total_images / args.batch_size / args.dist_env["num_trainers"]
# in multi-process mode, there is one GPU device per worker;
# in single-process mode, one worker drives multiple GPU devices
batch_size_per_worker = args.batch_size // get_device_num() if is_mp_mode() else args.batch_size
steps_per_pass = args.total_images // args.dist_env["num_trainers"] // batch_size_per_worker
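A worked example of this step count with illustrative numbers (ImageNet-scale dataset, 4 trainers, single-process mode):

```python
total_images = 1281167          # e.g. the ImageNet-1k train set
num_trainers = 4
batch_size_per_worker = 32 * 8  # 8 devices, 32 images each

steps_per_pass = total_images // num_trainers // batch_size_per_worker
print(steps_per_pass)  # 1251
```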
for pass_id in range(args.num_epochs):
num_samples = 0
start_time = time.time()
if args.visreader and pass_id == 0:
# vistool reader is ulimited data, don't need restart pyreader
train_pyreader.start()
if args.use_visiontool:
# the visiontool reader yields unlimited data, so the pyreader only
# needs to be started once. Note that the visiontool reader does not
# support global shuffle.
if pass_id == 0: train_pyreader.start()
else:
# use pass_id+1 as the per-pass global shuffle seed for distributed training
prepare_reader(True, train_pyreader, args, pass_id + 1)
train_pyreader.start()
for batch_id in range(1, steps_per_pass+1):
batch_id = 0
while True:
batch_id += 1
try:
if batch_id % 30 == 0:
fetch_ret = exe.run(fetch_list)
fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
print("Pass %d, batch %d, loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
(pass_id, batch_id, fetched_data[0], fetched_data[1],
print("Pass %d, batch [%d/%d], loss %s, acc1: %s, acc5: %s, avg batch time %.4f" %
(pass_id, batch_id, steps_per_pass, fetched_data[0], fetched_data[1],
fetched_data[2], (time.time()-start_time) / batch_id))
else:
fetch_ret = exe.run([])
......@@ -335,9 +353,11 @@ def train_parallel(args):
traceback.print_exc()
break
num_samples += args.batch_size
if batch_id > steps_per_pass and args.use_visiontool:
break
print_train_time(start_time, time.time(), num_samples)
if (args.visreader and pass_id == args.num_epochs - 1) or not args.visreader:
if not args.use_visiontool or pass_id == args.num_epochs - 1:
train_pyreader.reset()
if pass_id >= args.start_test_pass:
......
......@@ -96,7 +96,7 @@ def distort_color(img):
return img
def process_image(sample, mode, color_jitter, rotate):
def process_image(sample, mode, color_jitter, rotate, normalize):
img_path = sample[0]
img = Image.open(img_path)
......@@ -114,10 +114,12 @@ def process_image(sample, mode, color_jitter, rotate):
if img.mode != 'RGB':
img = img.convert('RGB')
img = np.array(img).astype('float32').transpose((2, 0, 1)) / 255
img -= img_mean
img /= img_std
if normalize:
img = np.array(img).astype('float32').transpose((2, 0, 1)) / 255
img -= img_mean
img /= img_std
else:
img = np.array(img).astype('uint8').transpose((2, 0, 1))
if mode == 'train' or mode == 'val':
return img, sample[1]
......@@ -131,7 +133,8 @@ def _reader_creator(file_list,
color_jitter=False,
rotate=False,
data_dir=DATA_DIR,
pass_id_as_seed=0):
pass_id_as_seed=0,
normalize=True):
def reader():
with open(file_list) as flist:
full_lines = [line.strip() for line in flist]
......@@ -164,12 +167,12 @@ def _reader_creator(file_list,
yield [img_path]
mapper = functools.partial(
process_image, mode=mode, color_jitter=color_jitter, rotate=rotate)
process_image, mode=mode, color_jitter=color_jitter, rotate=rotate, normalize=normalize)
return paddle.reader.xmap_readers(mapper, reader, THREAD, BUF_SIZE)
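`paddle.reader.xmap_readers` fans the partial-bound mapper out over THREAD parallel workers with a BUF_SIZE output buffer. A toy usage sketch of that decorator, assuming the paddle version this repo targets (output order is not guaranteed by default):

```python
import functools
import paddle

def scale(sample, factor):
    # toy mapper standing in for process_image
    return sample * factor

def toy_reader():
    for i in range(8):
        yield i

mapper = functools.partial(scale, factor=0.5)
# 4 worker threads, buffer of 16 items, mirroring THREAD / BUF_SIZE above
parallel = paddle.reader.xmap_readers(mapper, toy_reader, 4, 16)
for item in parallel():
    print(item)
```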
def train(data_dir=DATA_DIR, pass_id_as_seed=0):
def train(data_dir=DATA_DIR, pass_id_as_seed=0, normalize=True):
file_list = os.path.join(data_dir, 'train_list.txt')
return _reader_creator(
file_list,
......@@ -178,14 +181,15 @@ def train(data_dir=DATA_DIR, pass_id_as_seed=0):
color_jitter=False,
rotate=False,
data_dir=data_dir,
pass_id_as_seed=pass_id_as_seed)
pass_id_as_seed=pass_id_as_seed,
normalize=normalize)
def val(data_dir=DATA_DIR):
def val(data_dir=DATA_DIR, normalize=True):
file_list = os.path.join(data_dir, 'val_list.txt')
return _reader_creator(file_list, 'val', shuffle=False, data_dir=data_dir)
return _reader_creator(file_list, 'val', shuffle=False, data_dir=data_dir, normalize=normalize)
def test(data_dir=DATA_DIR):
def test(data_dir=DATA_DIR, normalize=True):
file_list = os.path.join(data_dir, 'val_list.txt')
return _reader_creator(file_list, 'test', shuffle=False, data_dir=data_dir)
return _reader_creator(file_list, 'test', shuffle=False, data_dir=data_dir, normalize=normalize)
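With the new `normalize` flag, a trainer can request raw uint8 images and leave normalization to the GPU pass built in `build_program`. A hedged usage sketch (the data path is illustrative):

```python
import paddle

# raw uint8 CHW images; normalization happens later on the GPU
train_reader = train(data_dir="/path/to/ImageNet",
                     pass_id_as_seed=1,
                     normalize=False)
batched = paddle.batch(train_reader, batch_size=32)
```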