提交 fc36a9a0 编写于 作者: Y Yancey1989

update

上级 a4dd153b
......@@ -14,32 +14,22 @@ import multiprocessing
TRAINER_NUMS = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
TRAINER_ID = int(os.getenv("PADDLE_TRAINER_ID", "0"))
epoch = 0
FINISH_EVENT = "FINISH_EVENT"
class PaddleDataLoader(object):
def __init__(self, torch_dataset, indices=None, concurrent=4, queue_size=1024, shuffle_seed=None, is_train=True):
def __init__(self, torch_dataset, indices=None, concurrent=16, queue_size=1024, shuffle=True, batch_size=224, is_distributed=True):
self.torch_dataset = torch_dataset
self.data_queue = multiprocessing.Queue(queue_size)
self.indices = indices
self.concurrent = concurrent
self.shuffle_seed = shuffle_seed
self.is_train = is_train
def _shuffle_worker_indices(self, indices, shuffle_seed = None):
import copy
shuffled_indices = copy.deepcopy(indices)
random.seed(time.time() if shuffle_seed is None else shuffle_seed)
random.shuffle(shuffled_indices)
sampels_per_worker = len(shuffled_indices) / TRAINER_NUMS
start = TRAINER_ID * sampels_per_worker
end = (TRAINER_ID + 1) * sampels_per_worker
ret = shuffled_indices[start:end]
print("shuffling worker indices trainer_id: [%d], num_trainers:[%d], len: [%d], start: [%d], end: [%d]" % (TRAINER_ID, TRAINER_NUMS, len(ret), start, end))
return ret
self.shuffle_seed = 0
self.shuffle = shuffle
self.is_distributed = is_distributed
self.batch_size = batch_size
def _worker_loop(self, dataset, worker_indices, worker_id):
cnt = 0
print("worker [%d], len: [%d], indices: [%s]"%(worker_id, len(worker_indices), worker_indices[:10]))
for idx in worker_indices:
cnt += 1
img, label = self.torch_dataset[idx]
......@@ -55,20 +45,26 @@ class PaddleDataLoader(object):
print("total image: ", total_img)
if self.indices is None:
self.indices = [i for i in xrange(total_img)]
if self.is_train:
print("shuffle indices by seed: ", self.shuffle_seed)
self.indices = self._shuffle_worker_indices(self.indices, self.shuffle_seed)
print("samples: %d shuffled indices: %s ..." % (len(self.indices), self.indices[:10]))
imgs_per_worker = int(math.ceil(len(self.indices) / self.concurrent))
if self.shuffle:
random.seed(self.shuffle_seed)
random.shuffle(self.indices)
worker_indices = self.indices
if self.is_distributed:
cnt_per_node = len(self.indices) / TRAINER_NUMS
offset = TRAINER_ID * cnt_per_node
worker_indices = self.indices[offset: (offset + cnt_per_node)]
if len(worker_indices) % self.batch_size != 0:
worker_indices += worker_indices[:(self.batch_size - (len(worker_indices) % self.batch_size))]
print("shuffle: [%d], shuffle seed: [%d], worker indices len: [%d], %s" % (self.shuffle, self.shuffle_seed, len(worker_indices), worker_indices[:10]))
cnt_per_thread = int(math.ceil(len(worker_indices) / self.concurrent))
for i in xrange(self.concurrent):
start = i * imgs_per_worker
end = (i + 1) * imgs_per_worker if i != self.concurrent - 1 else -1
print("loader thread: [%d] start idx: [%d], end idx: [%d]" % (i, start, end))
sliced_indices = self.indices[start:end]
offset = i * cnt_per_thread
thread_incides = worker_indices[offset: (offset + cnt_per_thread)]
print("loader thread: [%d] start idx: [%d], end idx: [%d], len: [%d]" % (i, offset, (offset + cnt_per_thread), len(thread_incides)))
w = multiprocessing.Process(
target=self._worker_loop,
args=(self.torch_dataset, sliced_indices, i)
args=(self.torch_dataset, thread_incides, i)
)
w.daemon = True
w.start()
......@@ -84,13 +80,13 @@ class PaddleDataLoader(object):
return _reader_creator
def train(traindir, sz, min_scale=0.08, shuffle_seed=None):
def train(traindir, bs, sz, min_scale=0.08):
train_tfms = [
transforms.RandomResizedCrop(sz, scale=(min_scale, 1.0)),
transforms.RandomHorizontalFlip()
]
train_dataset = datasets.ImageFolder(traindir, transforms.Compose(train_tfms))
return PaddleDataLoader(train_dataset, shuffle_seed=shuffle_seed)
return PaddleDataLoader(train_dataset, batch_size=bs)
def test(valdir, bs, sz, rect_val=False):
if rect_val:
......@@ -100,12 +96,12 @@ def test(valdir, bs, sz, rect_val=False):
ar_tfms = [transforms.Resize(int(sz* 1.14)), CropArTfm(idx2ar, sz)]
val_dataset = ValDataset(valdir, transform=ar_tfms)
return PaddleDataLoader(val_dataset, concurrent=1, indices=idx_sorted, is_train=False)
return PaddleDataLoader(val_dataset, concurrent=1, indices=idx_sorted, shuffle=False, is_distributed=False)
val_tfms = [transforms.Resize(int(sz* 1.14)), transforms.CenterCrop(sz)]
val_dataset = datasets.ImageFolder(valdir, transforms.Compose(val_tfms))
return PaddleDataLoader(val_dataset, is_train=False)
return PaddleDataLoader(val_dataset, is_distributed=False)
class ValDataset(datasets.ImageFolder):
......@@ -170,19 +166,5 @@ def map_idx2ar(idx_ar_sorted, batch_size):
return idx2ar
if __name__ == "__main__":
#ds, sampler = create_validation_set("/data/imagenet/validation", 128, 288, True, True)
#for item in sampler:
# for idx in item:
# ds[idx]
import time
test_reader = test(valdir="/data/imagenet/validation", bs=50, sz=288, rect_val=True)
start_ts = time.time()
for idx, data in enumerate(test_reader.reader()):
print(idx, data[0].shape, data[1])
if idx == 10:
break
if (idx + 1) % 1000 == 0:
cost = (time.time() - start_ts)
print("%d samples per second" % (1000 / cost))
start_ts = time.time()
\ No newline at end of file
reader = test("/work/fast_resnet_data", 64, 128).reader()
print(next(reader()))
\ No newline at end of file
......@@ -19,6 +19,8 @@ import os
import traceback
import numpy as np
import torch
import torchvision_reader
import paddle
import paddle.fluid as fluid
......@@ -26,7 +28,6 @@ import paddle.fluid.core as core
import paddle.fluid.profiler as profiler
import paddle.fluid.transpiler.distribute_transpiler as distribute_transpiler
import torchvision_reader
import sys
sys.path.append("..")
from utility import add_arguments, print_arguments
......@@ -34,6 +35,7 @@ import functools
import models
import utils
from env import dist_env
import reader as imagenet_reader
def is_mp_mode():
return True if os.getenv("FLAGS_selected_gpus") else False
......@@ -50,7 +52,6 @@ def nccl2_prepare(args, startup_prog):
current_endpoint=envs["current_endpoint"],
startup_program=startup_prog)
DEBUG_PROG = bool(os.getenv("DEBUG_PROG", "0"))
def parse_args():
parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser)
......@@ -58,7 +59,6 @@ def parse_args():
add_arg('use_gpu', bool, True, "Whether to use GPU or not.")
add_arg('total_images', int, 1281167, "Training image number.")
add_arg('num_epochs', int, 120, "number of epochs.")
add_arg('class_dim', int, 1000, "Class number.")
add_arg('image_shape', str, "3,224,224", "input image size")
add_arg('model_save_dir', str, "output", "model save directory")
add_arg('pretrained_model', str, None, "Whether to use pretrained model.")
......@@ -81,7 +81,6 @@ def parse_args():
return args
def get_device_num():
return 8
import subprocess
visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
if visible_device:
......@@ -112,7 +111,7 @@ def linear_lr_decay(lr_values, epochs, bs_values, total_images):
linear_epoch = end_epoch - start_epoch
start_lr, end_lr = lr_values[idx]
linear_lr = end_lr - start_lr
steps = last_steps + math.ceil(total_images * 1.0 / bs_values[idx]) * linear_epoch + 1
steps = last_steps + linear_epoch * total_images / bs_values[idx]
with switch.case(global_step < steps):
decayed_lr = start_lr + linear_lr * ((global_step - last_steps)* 1.0/(steps - last_steps))
last_steps = steps
......@@ -125,9 +124,49 @@ def linear_lr_decay(lr_values, epochs, bs_values, total_images):
fluid.layers.tensor.assign(last_value_var, lr)
return lr
return decayed_lr
def linear_lr_decay_by_epoch(lr_values, epochs, bs_values, total_images):
from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter
import paddle.fluid.layers.tensor as tensor
import math
with paddle.fluid.default_main_program()._lr_schedule_guard():
global_step = _decay_step_counter()
lr = tensor.create_global_var(
shape=[1],
value=0.0,
dtype='float32',
persistable=True,
name="learning_rate")
with fluid.layers.control_flow.Switch() as switch:
last_steps = 0
for idx, epoch_bound in enumerate(epochs):
start_epoch, end_epoch = epoch_bound
linear_epoch = end_epoch - start_epoch
start_lr, end_lr = lr_values[idx]
linear_lr = end_lr - start_lr
for epoch_step in xrange(linear_epoch):
steps = last_steps + (1 + epoch_step) * total_images / bs_values[idx]
boundary_val = tensor.fill_constant(
shape=[1],
dtype='float32',
value=float(steps),
force_cpu=True)
decayed_lr = start_lr + epoch_step * linear_lr * 1.0 / linear_epoch
with switch.case(global_step < boundary_val):
value_var = tensor.fill_constant(shape=[1], dtype='float32', value=float(decayed_lr))
print("steps: [%d], epoch : [%d], decayed_lr: [%f]" % (steps, start_epoch + epoch_step, decayed_lr))
fluid.layers.tensor.assign(value_var, lr)
last_steps = steps
last_value_var = tensor.fill_constant(
shape=[1],
dtype='float32',
value=float(lr_values[-1]))
with switch.default():
fluid.layers.tensor.assign(last_value_var, lr)
return lr
def test_parallel(exe, test_args, args, test_prog, feeder, bs):
acc_evaluators = []
for i in xrange(len(test_args[2])):
......@@ -149,10 +188,28 @@ def test_parallel(exe, test_args, args, test_prog, feeder, bs):
return [e.eval() for e in acc_evaluators]
def test_single(exe, test_args, args, test_prog, feeder, bs):
test_reader = test_args[3]
to_fetch = [v.name for v in test_args[2]]
acc1 = fluid.metrics.Accuracy()
acc5 = fluid.metrics.Accuracy()
start_ts = time.time()
for batch_id, data in enumerate(test_reader()):
batch_size = len(data[0])
acc_rets = exe.run(test_prog, fetch_list=to_fetch, feed=feeder.feed(data))
acc1.update(value=np.array(acc_rets[0]), weight=batch_size)
acc5.update(value=np.array(acc_rets[1]), weight=batch_size)
if batch_id % 30 == 0:
print("Test batch: [%d], acc_rets: [%s]" % (batch_id, acc_rets))
num_samples = batch_id * bs
print_train_time(start_ts, time.time(), num_samples, "Test")
return np.mean(acc1.eval()), np.mean(acc5.eval())
def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_prog, img_size, trn_dir, batch_size, min_scale, rect_val):
dataloader = None
if is_train:
dataloader = torchvision_reader.train(traindir=os.path.join(args.data_dir, trn_dir, "train"), sz=img_size, min_scale=min_scale)
dataloader = torchvision_reader.train(traindir=os.path.join(args.data_dir, trn_dir, "train"), bs=batch_size if is_mp_mode() else batch_size * get_device_num(), sz=img_size, min_scale=min_scale)
else:
dataloader = torchvision_reader.test(valdir=os.path.join(args.data_dir, trn_dir, "validation"), bs=batch_size if is_mp_mode() else batch_size * get_device_num(), sz=img_size, rect_val=rect_val)
dshape = [3, img_size, img_size]
......@@ -171,22 +228,23 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
with fluid.program_guard(main_prog, py_reader_startup_prog):
with fluid.unique_name.guard():
pyreader = fluid.layers.py_reader(
capacity=batch_size * 2 if is_mp_mode() else batch_size * get_device_num(),
capacity=batch_size if is_mp_mode() else batch_size * get_device_num(),
shapes=([-1] + dshape, (-1, 1)),
dtypes=('uint8', 'int64'),
name="train_reader_" + str(img_size),
name="train_reader_" + str(img_size) if is_train else "test_reader_" + str(img_size),
use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader)
pyreader.decorate_paddle_reader(paddle.batch(dataloader.reader(), batch_size=batch_size))
#pyreader.decorate_paddle_reader(paddle.batch(imagenet_reader.train(os.path.join(args.data_dir, trn_dir, "train")), batch_size=batch_size))
#pyreader.decorate_paddle_reader(paddle.batch(dataloader.reader(), batch_size=batch_size))
else:
input = fluid.layers.data(name="image", shape=[3, 244, 244], dtype="uint8")
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
batched_reader = paddle.batch(dataloader.reader(), batch_size=batch_size if is_mp_mode() else batch_size * get_device_num())
#batched_reader = paddle.batch(dataloader.reader(), batch_size=batch_size)
cast_img_type = "float16" if args.fp16 else "float32"
cast = fluid.layers.cast(input, cast_img_type)
img_mean = fluid.layers.create_global_var([3, 1, 1], 0.0, cast_img_type, name="img_mean", persistable=True)
img_std = fluid.layers.create_global_var([3, 1, 1], 0.0, cast_img_type, name="img_std", persistable=True)
# image = (image - (mean * 255.0)) / (std * 255.0)
#image = (image - (mean * 255.0)) / (std * 255.0)
t1 = fluid.layers.elementwise_sub(cast, img_mean, axis=1)
t2 = fluid.layers.elementwise_div(t1, img_std, axis=1)
......@@ -204,11 +262,11 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
optimizer = None
if is_train:
epochs = [(0,7), (7,13), (13, 22), (22, 25), (25, 28)]
bs_epoch = [x * get_device_num() for x in [224, 224, 96, 96, 50]]
bs_epoch = [x if is_mp_mode() else x * get_device_num() for x in [224, 224, 96, 96, 50]]
lrs = [(1.0, 2.0), (2.0, 0.25), (0.42857142857142855, 0.04285714285714286), (0.04285714285714286, 0.004285714285714286), (0.0022321428571428575, 0.00022321428571428573), 0.00022321428571428573]
images_per_worker = args.total_images / get_device_num() if is_mp_mode() else args.total_images
optimizer = fluid.optimizer.Momentum(
learning_rate=linear_lr_decay(lrs, epochs, bs_epoch, args.total_images),
learning_rate=linear_lr_decay_by_epoch(lrs, epochs, bs_epoch, images_per_worker),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
if args.fp16:
......@@ -220,8 +278,12 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
else:
optimizer.minimize(avg_cost)
if args.memory_optimize:
fluid.memory_optimize(main_prog, skip_grads=True)
if args.memory_optimize:
fluid.memory_optimize(main_prog, skip_grads=True)
if is_train:
pyreader.decorate_paddle_reader(paddle.batch(dataloader.reader(), batch_size=batch_size, drop_last=True))
else:
batched_reader = paddle.batch(dataloader.reader(), batch_size=batch_size if is_mp_mode() else batch_size * get_device_num(), drop_last=True)
return avg_cost, optimizer, [batch_acc1,
batch_acc5], batched_reader, pyreader, py_reader_startup_prog, dataloader
......@@ -247,6 +309,17 @@ def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog
if is_mp_mode():
nccl2_prepare(args, startup_prog)
startup_exe.run(startup_prog)
conv2d_w_vars = [var for var in startup_prog.global_block().vars.values() if var.name.startswith('conv2d_')]
for var in conv2d_w_vars:
torch_w = torch.empty(var.shape)
#print("initialize %s, shape: %s, with kaiming normalization." % (var.name, var.shape))
kaiming_np = torch.nn.init.kaiming_normal_(torch_w, mode='fan_out', nonlinearity='relu').numpy()
tensor = fluid.global_scope().find_var(var.name).get_tensor()
if args.fp16:
tensor.set(np.array(kaiming_np, dtype="float16").view(np.uint16), place)
else:
tensor.set(np.array(kaiming_np, dtype="float32"), place)
np_tensors = {}
np_tensors["img_mean"] = np.array([0.485 * 255.0, 0.456 * 255.0, 0.406 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1))
np_tensors["img_std"] = np.array([0.229 * 255.0, 0.224 * 255.0, 0.225 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1))
......@@ -275,10 +348,11 @@ def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog
build_strategy=build_strategy,
num_trainers=num_trainers,
trainer_id=trainer_id)
test_scope = fluid.global_scope().new_scope()
test_exe = fluid.ParallelExecutor(
True, main_program=test_prog, share_vars_from=train_exe, scope=test_scope)
True, main_program=test_prog, share_vars_from=train_exe)
#return train_args, test_args, test_prog, train_exe, test_exe
return train_args, test_args, test_prog, train_exe, test_exe
# NOTE: only need to benchmark using parallelexe
......@@ -298,6 +372,7 @@ def train_parallel(args):
train_args, test_args, test_prog, exe, test_exe = refresh_program(args, pass_id, sz=128, trn_dir="sz/160/", bs=bs, val_bs=val_bs, need_update_start_prog=True)
elif pass_id == 13: #13
bs = 96
val_bs = 32
train_args, test_args, test_prog, exe, test_exe = refresh_program(args, pass_id, sz=224, trn_dir="sz/352/", bs=bs, val_bs=val_bs, min_scale=0.087)
elif pass_id == 25: #25
bs = 50
......@@ -310,15 +385,16 @@ def train_parallel(args):
num_samples = 0
iters = 0
start_time = time.time()
dataloader = train_args[6] # Paddle DataLoader
dataloader.shuffle_seed = pass_id + 1
train_dataloader = train_args[6] # Paddle DataLoader
train_dataloader.shuffle_seed = pass_id + 1
train_args[4].start() # start pyreader
batch_time_start = time.time()
while True:
fetch_list = [avg_loss.name]
acc_name_list = [v.name for v in train_args[2]]
fetch_list.extend(acc_name_list)
fetch_list.append("learning_rate")
if iters % args.log_period == 0:
if iters > 0 and iters % args.log_period == 0:
should_print = True
else:
should_print = False
......@@ -337,18 +413,21 @@ def train_parallel(args):
traceback.print_exc()
exit(1)
num_samples += bs * get_device_num()
num_samples += bs if is_mp_mode() else bs * get_device_num()
if should_print:
fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
print("Pass %d, batch %d, loss %s, accucacys: %s, learning_rate %s, py_reader queue_size: %d" %
(pass_id, iters, fetched_data[0], fetched_data[1:-1], fetched_data[-1], train_args[4].queue.size()))
print("Pass %d, batch %d, loss %s, accucacys: %s, learning_rate %s, py_reader queue_size: %d, avg batch time: %0.2f " %
(pass_id, iters, fetched_data[0], fetched_data[1:-1], fetched_data[-1], train_args[4].queue.size(), (time.time() - batch_time_start) * 1.0 / bs ))
batch_time_start = time.time()
iters += 1
print_train_time(start_time, time.time(), num_samples, "Train")
feed_list = [test_prog.global_block().var(varname) for varname in ("image", "label")]
test_feeder = fluid.DataFeeder(feed_list=feed_list, place=fluid.CUDAPlace(0))
test_ret = test_parallel(test_exe, test_args, args, test_prog, test_feeder, bs)
gpu_id = int(os.getenv("FLAGS_selected_gpus")) if is_mp_mode() else 0
test_feeder = fluid.DataFeeder(feed_list=feed_list, place=fluid.CUDAPlace(gpu_id))
#test_ret = test_single(test_exe, test_args, args, test_prog, test_feeder, val_bs)
test_ret = test_parallel(test_exe, test_args, args, test_prog, test_feeder, val_bs)
print("Pass: %d, Test Accuracy: %s, Spend %.2f hours\n" %
(pass_id, [np.mean(np.array(v)) for v in test_ret], (time.time() - over_all_start) / 3600))
......
......@@ -70,10 +70,9 @@ class FastResNet():
stride=2 if i == 0 and block != 0 else 1)
pool_size = int(img_size / 32)
pool = fluid.layers.pool2d(
input=conv, pool_size=0, pool_type='avg', global_pooling=True)
input=conv, pool_size=pool_size, pool_type='avg', global_pooling=True)
out = fluid.layers.fc(input=pool,
size=class_dim,
act=None,
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.NormalInitializer(0.0, 0.01),
regularizer=fluid.regularizer.L2Decay(1e-4)),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册