提交 a4dd153b 编写于 作者: Y Yancey1989

update

上级 51bc0a90
import os
def dist_env():
"""
Return a dict of all variable that distributed training may use.
NOTE: you may rewrite this function to suit your cluster environments.
"""
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
num_trainers = 1
training_role = os.getenv("TRAINING_ROLE", "TRAINER")
assert(training_role == "PSERVER" or training_role == "TRAINER")
# - PADDLE_TRAINER_ENDPOINTS means nccl2 mode.
# - PADDLE_PSERVER_ENDPOINTS means pserver mode.
# - PADDLE_CURRENT_ENDPOINT means current process endpoint.
worker_endpoints = []
port = os.getenv("PADDLE_PORT", "8701")
if os.getenv("PADDLE_TRAINER_ENDPOINTS"):
trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
else:# for paddlecloud
worker_ips = os.getenv("PADDLE_TRAINERS", "")
for ip in worker_ips.split(","):
worker_endpoints.append(':'.join([ip, port]))
trainer_endpoints = ",".join(worker_endpoints)
pserver_ips = os.getenv("PADDLE_PSERVERS", "")
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
if os.getenv("PADDLE_CURRENT_ENDPOINT"):
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
else:# for paddlecloud
current_endpoint = os.getenv("POD_IP", "") + ":" + port
if trainer_endpoints:
trainer_endpoints = trainer_endpoints.split(",")
num_trainers = len(trainer_endpoints)
elif pserver_endpoints:
num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
return {
"trainer_id": trainer_id,
"num_trainers": num_trainers,
"current_endpoint": current_endpoint,
"training_role": training_role,
"pserver_endpoints": pserver_endpoints,
"trainer_endpoints": trainer_endpoints
}
...@@ -12,18 +12,32 @@ from tqdm import tqdm ...@@ -12,18 +12,32 @@ from tqdm import tqdm
import time import time
import multiprocessing import multiprocessing
TRAINER_NUMS = int(os.getenv("PADDLE_TRAINER_NUM", "1")) TRAINER_NUMS = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
TRAINER_ID = int(os.getenv("PADDLE_TRAINER_ID", "0")) TRAINER_ID = int(os.getenv("PADDLE_TRAINER_ID", "0"))
epoch = 0 epoch = 0
FINISH_EVENT = "FINISH_EVENT" FINISH_EVENT = "FINISH_EVENT"
class PaddleDataLoader(object): class PaddleDataLoader(object):
def __init__(self, torch_dataset, indices=None, concurrent=16, queue_size=3072): def __init__(self, torch_dataset, indices=None, concurrent=4, queue_size=1024, shuffle_seed=None, is_train=True):
self.torch_dataset = torch_dataset self.torch_dataset = torch_dataset
self.data_queue = multiprocessing.Queue(queue_size) self.data_queue = multiprocessing.Queue(queue_size)
self.indices = indices self.indices = indices
self.concurrent = concurrent self.concurrent = concurrent
self.shuffle_seed = shuffle_seed
self.is_train = is_train
def _shuffle_worker_indices(self, indices, shuffle_seed = None):
import copy
shuffled_indices = copy.deepcopy(indices)
random.seed(time.time() if shuffle_seed is None else shuffle_seed)
random.shuffle(shuffled_indices)
sampels_per_worker = len(shuffled_indices) / TRAINER_NUMS
start = TRAINER_ID * sampels_per_worker
end = (TRAINER_ID + 1) * sampels_per_worker
ret = shuffled_indices[start:end]
print("shuffling worker indices trainer_id: [%d], num_trainers:[%d], len: [%d], start: [%d], end: [%d]" % (TRAINER_ID, TRAINER_NUMS, len(ret), start, end))
return ret
def _worker_loop(self, dataset, worker_indices, worker_id): def _worker_loop(self, dataset, worker_indices, worker_id):
cnt = 0 cnt = 0
for idx in worker_indices: for idx in worker_indices:
...@@ -41,14 +55,16 @@ class PaddleDataLoader(object): ...@@ -41,14 +55,16 @@ class PaddleDataLoader(object):
print("total image: ", total_img) print("total image: ", total_img)
if self.indices is None: if self.indices is None:
self.indices = [i for i in xrange(total_img)] self.indices = [i for i in xrange(total_img)]
random.seed(time.time()) if self.is_train:
random.shuffle(self.indices) print("shuffle indices by seed: ", self.shuffle_seed)
print("shuffle indices: %s ..." % self.indices[:10]) self.indices = self._shuffle_worker_indices(self.indices, self.shuffle_seed)
print("samples: %d shuffled indices: %s ..." % (len(self.indices), self.indices[:10]))
imgs_per_worker = int(math.ceil(total_img / self.concurrent)) imgs_per_worker = int(math.ceil(len(self.indices) / self.concurrent))
for i in xrange(self.concurrent): for i in xrange(self.concurrent):
start = i * imgs_per_worker start = i * imgs_per_worker
end = (i + 1) * imgs_per_worker if i != self.concurrent - 1 else None end = (i + 1) * imgs_per_worker if i != self.concurrent - 1 else -1
print("loader thread: [%d] start idx: [%d], end idx: [%d]" % (i, start, end))
sliced_indices = self.indices[start:end] sliced_indices = self.indices[start:end]
w = multiprocessing.Process( w = multiprocessing.Process(
target=self._worker_loop, target=self._worker_loop,
...@@ -68,13 +84,13 @@ class PaddleDataLoader(object): ...@@ -68,13 +84,13 @@ class PaddleDataLoader(object):
return _reader_creator return _reader_creator
def train(traindir, sz, min_scale=0.08): def train(traindir, sz, min_scale=0.08, shuffle_seed=None):
train_tfms = [ train_tfms = [
transforms.RandomResizedCrop(sz, scale=(min_scale, 1.0)), transforms.RandomResizedCrop(sz, scale=(min_scale, 1.0)),
transforms.RandomHorizontalFlip() transforms.RandomHorizontalFlip()
] ]
train_dataset = datasets.ImageFolder(traindir, transforms.Compose(train_tfms)) train_dataset = datasets.ImageFolder(traindir, transforms.Compose(train_tfms))
return PaddleDataLoader(train_dataset).reader() return PaddleDataLoader(train_dataset, shuffle_seed=shuffle_seed)
def test(valdir, bs, sz, rect_val=False): def test(valdir, bs, sz, rect_val=False):
if rect_val: if rect_val:
...@@ -84,12 +100,12 @@ def test(valdir, bs, sz, rect_val=False): ...@@ -84,12 +100,12 @@ def test(valdir, bs, sz, rect_val=False):
ar_tfms = [transforms.Resize(int(sz* 1.14)), CropArTfm(idx2ar, sz)] ar_tfms = [transforms.Resize(int(sz* 1.14)), CropArTfm(idx2ar, sz)]
val_dataset = ValDataset(valdir, transform=ar_tfms) val_dataset = ValDataset(valdir, transform=ar_tfms)
return PaddleDataLoader(val_dataset, concurrent=1, indices=idx_sorted).reader() return PaddleDataLoader(val_dataset, concurrent=1, indices=idx_sorted, is_train=False)
val_tfms = [transforms.Resize(int(sz* 1.14)), transforms.CenterCrop(sz)] val_tfms = [transforms.Resize(int(sz* 1.14)), transforms.CenterCrop(sz)]
val_dataset = datasets.ImageFolder(valdir, transforms.Compose(val_tfms)) val_dataset = datasets.ImageFolder(valdir, transforms.Compose(val_tfms))
return PaddleDataLoader(val_dataset).reader() return PaddleDataLoader(val_dataset, is_train=False)
class ValDataset(datasets.ImageFolder): class ValDataset(datasets.ImageFolder):
...@@ -162,7 +178,7 @@ if __name__ == "__main__": ...@@ -162,7 +178,7 @@ if __name__ == "__main__":
import time import time
test_reader = test(valdir="/data/imagenet/validation", bs=50, sz=288, rect_val=True) test_reader = test(valdir="/data/imagenet/validation", bs=50, sz=288, rect_val=True)
start_ts = time.time() start_ts = time.time()
for idx, data in enumerate(test_reader()): for idx, data in enumerate(test_reader.reader()):
print(idx, data[0].shape, data[1]) print(idx, data[0].shape, data[1])
if idx == 10: if idx == 10:
break break
......
...@@ -33,20 +33,34 @@ from utility import add_arguments, print_arguments ...@@ -33,20 +33,34 @@ from utility import add_arguments, print_arguments
import functools import functools
import models import models
import utils import utils
from env import dist_env
def is_mp_mode():
return True if os.getenv("FLAGS_selected_gpus") else False
def nccl2_prepare(args, startup_prog):
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
t = fluid.DistributeTranspiler(config=config)
envs = args.dist_env
t.transpile(envs["trainer_id"],
trainers=','.join(envs["trainer_endpoints"]),
current_endpoint=envs["current_endpoint"],
startup_program=startup_prog)
DEBUG_PROG = bool(os.getenv("DEBUG_PROG", "0")) DEBUG_PROG = bool(os.getenv("DEBUG_PROG", "0"))
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description=__doc__) parser = argparse.ArgumentParser(description=__doc__)
add_arg = functools.partial(add_arguments, argparser=parser) add_arg = functools.partial(add_arguments, argparser=parser)
# yapf: disable # yapf: disable
add_arg('batch_size', int, 256, "Minibatch size.")
add_arg('use_gpu', bool, True, "Whether to use GPU or not.") add_arg('use_gpu', bool, True, "Whether to use GPU or not.")
add_arg('total_images', int, 1281167, "Training image number.") add_arg('total_images', int, 1281167, "Training image number.")
add_arg('num_epochs', int, 120, "number of epochs.") add_arg('num_epochs', int, 120, "number of epochs.")
add_arg('class_dim', int, 1000, "Class number.") add_arg('class_dim', int, 1000, "Class number.")
add_arg('image_shape', str, "3,224,224", "input image size") add_arg('image_shape', str, "3,224,224", "input image size")
add_arg('model_save_dir', str, "output", "model save directory") add_arg('model_save_dir', str, "output", "model save directory")
add_arg('with_mem_opt', bool, False, "Whether to use memory optimization or not.")
add_arg('pretrained_model', str, None, "Whether to use pretrained model.") add_arg('pretrained_model', str, None, "Whether to use pretrained model.")
add_arg('checkpoint', str, None, "Whether to resume checkpoint.") add_arg('checkpoint', str, None, "Whether to resume checkpoint.")
add_arg('lr', float, 0.1, "set learning rate.") add_arg('lr', float, 0.1, "set learning rate.")
...@@ -60,14 +74,14 @@ def parse_args(): ...@@ -60,14 +74,14 @@ def parse_args():
add_arg('start_test_pass', int, 0, "Start test after x passes.") add_arg('start_test_pass', int, 0, "Start test after x passes.")
add_arg('num_threads', int, 8, "Use num_threads to run the fluid program.") add_arg('num_threads', int, 8, "Use num_threads to run the fluid program.")
add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.") add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.")
add_arg('log_period', int, 5, "Print period, defualt is 5.") add_arg('log_period', int, 30, "Print period, defualt is 5.")
add_arg('init_conv2d_kaiming', bool, False, "Whether to initliaze conv2d weight by kaiming.")
add_arg('memory_optimize', bool, True, "Whether to enable memory optimize.") add_arg('memory_optimize', bool, True, "Whether to enable memory optimize.")
# yapf: enable # yapf: enable
args = parser.parse_args() args = parser.parse_args()
return args return args
def get_device_num(): def get_device_num():
return 8
import subprocess import subprocess
visible_device = os.getenv('CUDA_VISIBLE_DEVICES') visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
if visible_device: if visible_device:
...@@ -78,9 +92,6 @@ def get_device_num(): ...@@ -78,9 +92,6 @@ def get_device_num():
return device_num return device_num
def linear_lr_decay(lr_values, epochs, bs_values, total_images): def linear_lr_decay(lr_values, epochs, bs_values, total_images):
"""Applies cosine decay to the learning rate.
lr = 0.05 * (math.cos(epoch * (math.pi / 120)) + 1)
"""
from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter
import paddle.fluid.layers.tensor as tensor import paddle.fluid.layers.tensor as tensor
import math import math
...@@ -101,9 +112,9 @@ def linear_lr_decay(lr_values, epochs, bs_values, total_images): ...@@ -101,9 +112,9 @@ def linear_lr_decay(lr_values, epochs, bs_values, total_images):
linear_epoch = end_epoch - start_epoch linear_epoch = end_epoch - start_epoch
start_lr, end_lr = lr_values[idx] start_lr, end_lr = lr_values[idx]
linear_lr = end_lr - start_lr linear_lr = end_lr - start_lr
steps = last_steps + math.ceil(total_images * 1.0 / bs_values[idx]) * linear_epoch steps = last_steps + math.ceil(total_images * 1.0 / bs_values[idx]) * linear_epoch + 1
with switch.case(global_step < steps): with switch.case(global_step < steps):
decayed_lr = start_lr + linear_lr * ((global_step - last_steps) * 1.0/steps) decayed_lr = start_lr + linear_lr * ((global_step - last_steps)* 1.0/(steps - last_steps))
last_steps = steps last_steps = steps
fluid.layers.tensor.assign(decayed_lr, lr) fluid.layers.tensor.assign(decayed_lr, lr)
last_value_var = tensor.fill_constant( last_value_var = tensor.fill_constant(
...@@ -134,16 +145,16 @@ def test_parallel(exe, test_args, args, test_prog, feeder, bs): ...@@ -134,16 +145,16 @@ def test_parallel(exe, test_args, args, test_prog, feeder, bs):
e.update( e.update(
value=np.array(acc_rets[i]), weight=bs) value=np.array(acc_rets[i]), weight=bs)
num_samples = batch_id * bs * get_device_num() num_samples = batch_id * bs * get_device_num()
print_train_time(start_ts, time.time(), num_samples) print_train_time(start_ts, time.time(), num_samples, "Test")
return [e.eval() for e in acc_evaluators] return [e.eval() for e in acc_evaluators]
def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_prog, img_size, trn_dir, batch_size, min_scale, rect_val): def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_prog, img_size, trn_dir, batch_size, min_scale, rect_val):
if is_train: if is_train:
reader = torchvision_reader.train(traindir=os.path.join(args.data_dir, trn_dir, "train"), sz=img_size, min_scale=min_scale) dataloader = torchvision_reader.train(traindir=os.path.join(args.data_dir, trn_dir, "train"), sz=img_size, min_scale=min_scale)
else: else:
reader = torchvision_reader.test(valdir=os.path.join(args.data_dir, trn_dir, "validation"), bs=batch_size * get_device_num(), sz=img_size, rect_val=rect_val) dataloader = torchvision_reader.test(valdir=os.path.join(args.data_dir, trn_dir, "validation"), bs=batch_size if is_mp_mode() else batch_size * get_device_num(), sz=img_size, rect_val=rect_val)
dshape = [3, img_size, img_size] dshape = [3, img_size, img_size]
class_dim = 1000 class_dim = 1000
...@@ -160,17 +171,17 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro ...@@ -160,17 +171,17 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
with fluid.program_guard(main_prog, py_reader_startup_prog): with fluid.program_guard(main_prog, py_reader_startup_prog):
with fluid.unique_name.guard(): with fluid.unique_name.guard():
pyreader = fluid.layers.py_reader( pyreader = fluid.layers.py_reader(
capacity=batch_size * get_device_num(), capacity=batch_size * 2 if is_mp_mode() else batch_size * get_device_num(),
shapes=([-1] + dshape, (-1, 1)), shapes=([-1] + dshape, (-1, 1)),
dtypes=('uint8', 'int64'), dtypes=('uint8', 'int64'),
name="train_reader_" + str(img_size) if is_train else "test_reader_" + str(img_size), name="train_reader_" + str(img_size),
use_double_buffer=True) use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader) input, label = fluid.layers.read_file(pyreader)
pyreader.decorate_paddle_reader(paddle.batch(reader, batch_size=batch_size)) pyreader.decorate_paddle_reader(paddle.batch(dataloader.reader(), batch_size=batch_size))
else: else:
input = fluid.layers.data(name="image", shape=[3, 244, 244], dtype="uint8") input = fluid.layers.data(name="image", shape=[3, 244, 244], dtype="uint8")
label = fluid.layers.data(name="label", shape=[1], dtype="int64") label = fluid.layers.data(name="label", shape=[1], dtype="int64")
batched_reader = paddle.batch(reader, batch_size=batch_size * get_device_num()) batched_reader = paddle.batch(dataloader.reader(), batch_size=batch_size if is_mp_mode() else batch_size * get_device_num())
cast_img_type = "float16" if args.fp16 else "float32" cast_img_type = "float16" if args.fp16 else "float32"
cast = fluid.layers.cast(input, cast_img_type) cast = fluid.layers.cast(input, cast_img_type)
img_mean = fluid.layers.create_global_var([3, 1, 1], 0.0, cast_img_type, name="img_mean", persistable=True) img_mean = fluid.layers.create_global_var([3, 1, 1], 0.0, cast_img_type, name="img_mean", persistable=True)
...@@ -192,13 +203,10 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro ...@@ -192,13 +203,10 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
# configure optimize # configure optimize
optimizer = None optimizer = None
if is_train: if is_train:
#total_images = 1281167 / trainer_count
epochs = [(0,7), (7,13), (13, 22), (22, 25), (25, 28)] epochs = [(0,7), (7,13), (13, 22), (22, 25), (25, 28)]
bs_epoch = [x * get_device_num() for x in [224, 224, 96, 96, 50]] bs_epoch = [x * get_device_num() for x in [224, 224, 96, 96, 50]]
lrs = [(1.0, 2.0), (2.0, 0.25), (0.42857142857142855, 0.04285714285714286), (0.04285714285714286, 0.004285714285714286), (0.0022321428571428575, 0.00022321428571428573), 0.00022321428571428573] lrs = [(1.0, 2.0), (2.0, 0.25), (0.42857142857142855, 0.04285714285714286), (0.04285714285714286, 0.004285714285714286), (0.0022321428571428575, 0.00022321428571428573), 0.00022321428571428573]
#boundaries, values = lr_decay(lrs, epochs, bs_epoch, total_images)
#print("lr linear decay boundaries: ", boundaries, " \nvalues: ", values)
optimizer = fluid.optimizer.Momentum( optimizer = fluid.optimizer.Momentum(
learning_rate=linear_lr_decay(lrs, epochs, bs_epoch, args.total_images), learning_rate=linear_lr_decay(lrs, epochs, bs_epoch, args.total_images),
momentum=0.9, momentum=0.9,
...@@ -216,37 +224,29 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro ...@@ -216,37 +224,29 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
fluid.memory_optimize(main_prog, skip_grads=True) fluid.memory_optimize(main_prog, skip_grads=True)
return avg_cost, optimizer, [batch_acc1, return avg_cost, optimizer, [batch_acc1,
batch_acc5], batched_reader, pyreader, py_reader_startup_prog batch_acc5], batched_reader, pyreader, py_reader_startup_prog, dataloader
def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog=False, min_scale=0.08, rect_val=False): def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog=False, min_scale=0.08, rect_val=False):
print('program changed: epoch: [%d], image size: [%d], trn_dir: [%s], batch_size:[%d]' % (epoch, sz, trn_dir, bs)) print('program changed: epoch: [%d], image size: [%d], trn_dir: [%s], batch_size:[%d]' % (epoch, sz, trn_dir, bs))
train_prog = fluid.Program() train_prog = fluid.Program()
test_prog = fluid.Program() test_prog = fluid.Program()
startup_prog = fluid.Program() startup_prog = fluid.Program()
py_reader_startup_prog = fluid.Program() py_reader_startup_prog = fluid.Program()
num_trainers = args.dist_env["num_trainers"]
trainer_id = args.dist_env["trainer_id"]
train_args = build_program(args, True, train_prog, startup_prog, py_reader_startup_prog, sz, trn_dir, bs, min_scale, False) train_args = build_program(args, True, train_prog, startup_prog, py_reader_startup_prog, sz, trn_dir, bs, min_scale, False)
test_args = build_program(args, False, test_prog, startup_prog, py_reader_startup_prog, sz, trn_dir, val_bs, min_scale, rect_val) test_args = build_program(args, False, test_prog, startup_prog, py_reader_startup_prog, sz, trn_dir, val_bs, min_scale, rect_val)
gpu_id = int(os.getenv("FLAGS_selected_gpus")) if is_mp_mode() else 0
place = core.CUDAPlace(0) place = core.CUDAPlace(gpu_id)
startup_exe = fluid.Executor(place) startup_exe = fluid.Executor(place)
print("execute py_reader startup program") print("execute py_reader startup program")
startup_exe.run(py_reader_startup_prog) startup_exe.run(py_reader_startup_prog)
if need_update_start_prog: if need_update_start_prog:
print("execute startup program") print("execute startup program")
if is_mp_mode():
nccl2_prepare(args, startup_prog)
startup_exe.run(startup_prog) startup_exe.run(startup_prog)
if args.init_conv2d_kaiming:
import torch
conv2d_w_vars = [var for var in startup_prog.global_block().vars.values() if var.name.startswith('conv2d_')]
for var in conv2d_w_vars:
torch_w = torch.empty(var.shape)
kaiming_np = torch.nn.init.kaiming_normal_(torch_w, mode='fan_out', nonlinearity='relu').numpy()
tensor = fluid.global_scope().find_var(var.name).get_tensor()
if args.fp16:
tensor.set(np.array(kaiming_np, dtype="float16").view(np.uint16), place)
else:
tensor.set(np.array(kaiming_np, dtype="float32"), place)
np_tensors = {} np_tensors = {}
np_tensors["img_mean"] = np.array([0.485 * 255.0, 0.456 * 255.0, 0.406 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1)) np_tensors["img_mean"] = np.array([0.485 * 255.0, 0.456 * 255.0, 0.406 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1))
np_tensors["img_std"] = np.array([0.229 * 255.0, 0.224 * 255.0, 0.225 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1)) np_tensors["img_std"] = np.array([0.229 * 255.0, 0.224 * 255.0, 0.225 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1))
...@@ -258,27 +258,26 @@ def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog ...@@ -258,27 +258,26 @@ def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog
var.get_tensor().set(np_tensor, place) var.get_tensor().set(np_tensor, place)
if DEBUG_PROG:
with open('/tmp/train_prog_pass%d' % epoch, 'w') as f: f.write(train_prog.to_string(True))
with open('/tmp/test_prog_pass%d' % epoch, 'w') as f: f.write(test_prog.to_string(True))
with open('/tmp/startup_prog_pass%d' % epoch, 'w') as f: f.write(startup_prog.to_string(True))
with open('/tmp/py_reader_startup_prog_pass%d' % epoch, 'w') as f: f.write(py_reader_startup_prog.to_string(True))
strategy = fluid.ExecutionStrategy() strategy = fluid.ExecutionStrategy()
strategy.num_threads = args.num_threads strategy.num_threads = args.num_threads
strategy.allow_op_delay = False strategy.allow_op_delay = False
strategy.num_iteration_per_drop_scope = 30
build_strategy = fluid.BuildStrategy() build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy().ReduceStrategy.AllReduce build_strategy.reduce_strategy = fluid.BuildStrategy().ReduceStrategy.AllReduce
avg_loss = train_args[0] avg_loss = train_args[0]
train_exe = fluid.ParallelExecutor( train_exe = fluid.ParallelExecutor(
True, True,
avg_loss.name, avg_loss.name,
main_program=train_prog, main_program=train_prog,
exec_strategy=strategy, exec_strategy=strategy,
build_strategy=build_strategy) build_strategy=build_strategy,
num_trainers=num_trainers,
trainer_id=trainer_id)
test_scope = fluid.global_scope().new_scope()
test_exe = fluid.ParallelExecutor( test_exe = fluid.ParallelExecutor(
True, main_program=test_prog, share_vars_from=train_exe) True, main_program=test_prog, share_vars_from=train_exe, scope=test_scope)
return train_args, test_args, test_prog, train_exe, test_exe return train_args, test_args, test_prog, train_exe, test_exe
...@@ -311,6 +310,8 @@ def train_parallel(args): ...@@ -311,6 +310,8 @@ def train_parallel(args):
num_samples = 0 num_samples = 0
iters = 0 iters = 0
start_time = time.time() start_time = time.time()
dataloader = train_args[6] # Paddle DataLoader
dataloader.shuffle_seed = pass_id + 1
train_args[4].start() # start pyreader train_args[4].start() # start pyreader
while True: while True:
fetch_list = [avg_loss.name] fetch_list = [avg_loss.name]
...@@ -344,7 +345,7 @@ def train_parallel(args): ...@@ -344,7 +345,7 @@ def train_parallel(args):
(pass_id, iters, fetched_data[0], fetched_data[1:-1], fetched_data[-1], train_args[4].queue.size())) (pass_id, iters, fetched_data[0], fetched_data[1:-1], fetched_data[-1], train_args[4].queue.size()))
iters += 1 iters += 1
print_train_time(start_time, time.time(), num_samples) print_train_time(start_time, time.time(), num_samples, "Train")
feed_list = [test_prog.global_block().var(varname) for varname in ("image", "label")] feed_list = [test_prog.global_block().var(varname) for varname in ("image", "label")]
test_feeder = fluid.DataFeeder(feed_list=feed_list, place=fluid.CUDAPlace(0)) test_feeder = fluid.DataFeeder(feed_list=feed_list, place=fluid.CUDAPlace(0))
test_ret = test_parallel(test_exe, test_args, args, test_prog, test_feeder, bs) test_ret = test_parallel(test_exe, test_args, args, test_prog, test_feeder, bs)
...@@ -353,11 +354,11 @@ def train_parallel(args): ...@@ -353,11 +354,11 @@ def train_parallel(args):
print("total train time: ", time.time() - over_all_start) print("total train time: ", time.time() - over_all_start)
def print_train_time(start_time, end_time, num_samples): def print_train_time(start_time, end_time, num_samples, prefix_text=""):
train_elapsed = end_time - start_time train_elapsed = end_time - start_time
examples_per_sec = num_samples / train_elapsed examples_per_sec = num_samples / train_elapsed
print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' % print('\n%s Total examples: %d, total time: %.5f, %.5f examples/sed\n' %
(num_samples, train_elapsed, examples_per_sec)) (prefix_text, num_samples, train_elapsed, examples_per_sec))
def print_paddle_envs(): def print_paddle_envs():
...@@ -370,6 +371,7 @@ def print_paddle_envs(): ...@@ -370,6 +371,7 @@ def print_paddle_envs():
def main(): def main():
args = parse_args() args = parse_args()
args.dist_env = dist_env()
print_arguments(args) print_arguments(args)
print_paddle_envs() print_paddle_envs()
train_parallel(args) train_parallel(args)
......
...@@ -70,7 +70,7 @@ class FastResNet(): ...@@ -70,7 +70,7 @@ class FastResNet():
stride=2 if i == 0 and block != 0 else 1) stride=2 if i == 0 and block != 0 else 1)
pool_size = int(img_size / 32) pool_size = int(img_size / 32)
pool = fluid.layers.pool2d( pool = fluid.layers.pool2d(
input=conv, pool_size=pool_size, pool_type='avg', global_pooling=True) input=conv, pool_size=0, pool_type='avg', global_pooling=True)
out = fluid.layers.fc(input=pool, out = fluid.layers.fc(input=pool,
size=class_dim, size=class_dim,
act=None, act=None,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册