From a4dd153b7d74aac0c4ada6cd5095472fc96698b4 Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Fri, 22 Feb 2019 16:43:33 +0800
Subject: [PATCH] update

---
 .../image_classification/fast_resnet/env.py   | 50 ++++++++++
 .../fast_resnet/torchvision_reader.py         | 42 +++++---
 .../image_classification/fast_resnet/train.py | 96 ++++++++++---------
 .../models/fast_resnet.py                     |  2 +-
 4 files changed, 129 insertions(+), 61 deletions(-)
 create mode 100644 fluid/PaddleCV/image_classification/fast_resnet/env.py

diff --git a/fluid/PaddleCV/image_classification/fast_resnet/env.py b/fluid/PaddleCV/image_classification/fast_resnet/env.py
new file mode 100644
index 00000000..54242022
--- /dev/null
+++ b/fluid/PaddleCV/image_classification/fast_resnet/env.py
@@ -0,0 +1,50 @@
+import os
+
+
+def dist_env():
+    """
+    Return a dict of all variables that distributed training may use.
+    NOTE: you may rewrite this function to suit your cluster environments.
+    """
+    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
+    num_trainers = 1
+    training_role = os.getenv("TRAINING_ROLE", "TRAINER")
+    assert(training_role == "PSERVER" or training_role == "TRAINER")
+
+    # - PADDLE_TRAINER_ENDPOINTS means nccl2 mode.
+    # - PADDLE_PSERVER_ENDPOINTS means pserver mode.
+    # - PADDLE_CURRENT_ENDPOINT means current process endpoint.
+    worker_endpoints = []
+    port = os.getenv("PADDLE_PORT", "8701")
+    if os.getenv("PADDLE_TRAINER_ENDPOINTS"):
+        trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS")
+    else:  # for paddlecloud
+        worker_ips = os.getenv("PADDLE_TRAINERS", "")
+        for ip in worker_ips.split(","):
+            worker_endpoints.append(':'.join([ip, port]))
+        trainer_endpoints = ",".join(worker_endpoints)
+
+    pserver_ips = os.getenv("PADDLE_PSERVERS", "")
+    eplist = []
+    for ip in pserver_ips.split(","):
+        eplist.append(':'.join([ip, port]))
+    pserver_endpoints = ",".join(eplist)
+
+    if os.getenv("PADDLE_CURRENT_ENDPOINT"):
+        current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
+    else:  # for paddlecloud
+        current_endpoint = os.getenv("POD_IP", "") + ":" + port
+    if trainer_endpoints:
+        trainer_endpoints = trainer_endpoints.split(",")
+        num_trainers = len(trainer_endpoints)
+    elif pserver_endpoints:
+        num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
+
+    return {
+        "trainer_id": trainer_id,
+        "num_trainers": num_trainers,
+        "current_endpoint": current_endpoint,
+        "training_role": training_role,
+        "pserver_endpoints": pserver_endpoints,
+        "trainer_endpoints": trainer_endpoints
+    }
diff --git a/fluid/PaddleCV/image_classification/fast_resnet/torchvision_reader.py b/fluid/PaddleCV/image_classification/fast_resnet/torchvision_reader.py
index 1d4f3a78..823e7b8c 100644
--- a/fluid/PaddleCV/image_classification/fast_resnet/torchvision_reader.py
+++ b/fluid/PaddleCV/image_classification/fast_resnet/torchvision_reader.py
@@ -12,18 +12,32 @@ from tqdm import tqdm
 import time
 import multiprocessing
 
-TRAINER_NUMS = int(os.getenv("PADDLE_TRAINER_NUM", "1"))
+TRAINER_NUMS = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
 TRAINER_ID = int(os.getenv("PADDLE_TRAINER_ID", "0"))
 epoch = 0
 
 FINISH_EVENT = "FINISH_EVENT"
 class PaddleDataLoader(object):
-    def __init__(self, torch_dataset, indices=None, concurrent=16, queue_size=3072):
+    def __init__(self, torch_dataset, indices=None, concurrent=4, queue_size=1024, shuffle_seed=None, is_train=True):
         self.torch_dataset = torch_dataset
         self.data_queue = multiprocessing.Queue(queue_size)
         self.indices = indices
         self.concurrent = concurrent
-
+        self.shuffle_seed = shuffle_seed
+        self.is_train = is_train
+
+    def _shuffle_worker_indices(self, indices, shuffle_seed=None):
+        import copy
+        shuffled_indices = copy.deepcopy(indices)
+        random.seed(time.time() if shuffle_seed is None else shuffle_seed)
+        random.shuffle(shuffled_indices)
+        samples_per_worker = len(shuffled_indices) / TRAINER_NUMS
+        start = TRAINER_ID * samples_per_worker
+        end = (TRAINER_ID + 1) * samples_per_worker
+        ret = shuffled_indices[start:end]
+        print("shuffling worker indices trainer_id: [%d], num_trainers:[%d], len: [%d], start: [%d], end: [%d]" % (TRAINER_ID, TRAINER_NUMS, len(ret), start, end))
+        return ret
+
     def _worker_loop(self, dataset, worker_indices, worker_id):
         cnt = 0
         for idx in worker_indices:
@@ -41,14 +55,16 @@ class PaddleDataLoader(object):
             print("total image: ", total_img)
             if self.indices is None:
                 self.indices = [i for i in xrange(total_img)]
-            random.seed(time.time())
-            random.shuffle(self.indices)
-            print("shuffle indices: %s ..." % self.indices[:10])
+            if self.is_train:
+                print("shuffle indices by seed: ", self.shuffle_seed)
+                self.indices = self._shuffle_worker_indices(self.indices, self.shuffle_seed)
+            print("samples: %d shuffled indices: %s ..." % (len(self.indices), self.indices[:10]))
 
-            imgs_per_worker = int(math.ceil(total_img / self.concurrent))
+            imgs_per_worker = int(math.ceil(len(self.indices) / self.concurrent))
             for i in xrange(self.concurrent):
                 start = i * imgs_per_worker
-                end = (i + 1) * imgs_per_worker if i != self.concurrent - 1 else None
+                end = (i + 1) * imgs_per_worker if i != self.concurrent - 1 else -1
+                print("loader thread: [%d] start idx: [%d], end idx: [%d]" % (i, start, end))
                 sliced_indices = self.indices[start:end]
                 w = multiprocessing.Process(
                     target=self._worker_loop,
@@ -68,13 +84,13 @@ class PaddleDataLoader(object):
         return _reader_creator
 
 
-def train(traindir, sz, min_scale=0.08):
+def train(traindir, sz, min_scale=0.08, shuffle_seed=None):
     train_tfms = [
         transforms.RandomResizedCrop(sz, scale=(min_scale, 1.0)),
         transforms.RandomHorizontalFlip()
     ]
     train_dataset = datasets.ImageFolder(traindir, transforms.Compose(train_tfms))
-    return PaddleDataLoader(train_dataset).reader()
+    return PaddleDataLoader(train_dataset, shuffle_seed=shuffle_seed)
 
 def test(valdir, bs, sz, rect_val=False):
     if rect_val:
@@ -84,12 +100,12 @@
 
         ar_tfms = [transforms.Resize(int(sz* 1.14)), CropArTfm(idx2ar, sz)]
         val_dataset = ValDataset(valdir, transform=ar_tfms)
-        return PaddleDataLoader(val_dataset, concurrent=1, indices=idx_sorted).reader()
+        return PaddleDataLoader(val_dataset, concurrent=1, indices=idx_sorted, is_train=False)
 
     val_tfms = [transforms.Resize(int(sz* 1.14)), transforms.CenterCrop(sz)]
     val_dataset = datasets.ImageFolder(valdir, transforms.Compose(val_tfms))
-    return PaddleDataLoader(val_dataset).reader()
+    return PaddleDataLoader(val_dataset, is_train=False)
 
 
 class ValDataset(datasets.ImageFolder):
@@ -162,7 +178,7 @@ if __name__ == "__main__":
     import time
     test_reader = test(valdir="/data/imagenet/validation", bs=50, sz=288, rect_val=True)
     start_ts = time.time()
-    for idx, data in enumerate(test_reader()):
+    for idx, data in enumerate(test_reader.reader()):
         print(idx, data[0].shape, data[1])
         if idx == 10:
             break
diff --git a/fluid/PaddleCV/image_classification/fast_resnet/train.py b/fluid/PaddleCV/image_classification/fast_resnet/train.py
index c252d417..9ef19134 100644
--- a/fluid/PaddleCV/image_classification/fast_resnet/train.py
+++ b/fluid/PaddleCV/image_classification/fast_resnet/train.py
@@ -33,20 +33,34 @@ from utility import add_arguments, print_arguments
 import functools
 import models
 import utils
+from env import dist_env
+
+def is_mp_mode():
+    return True if os.getenv("FLAGS_selected_gpus") else False
+
+def nccl2_prepare(args, startup_prog):
+    config = fluid.DistributeTranspilerConfig()
+    config.mode = "nccl2"
+    t = fluid.DistributeTranspiler(config=config)
+
+    envs = args.dist_env
+
+    t.transpile(envs["trainer_id"],
+        trainers=','.join(envs["trainer_endpoints"]),
+        current_endpoint=envs["current_endpoint"],
+        startup_program=startup_prog)
 
 DEBUG_PROG = bool(os.getenv("DEBUG_PROG", "0"))
 
 def parse_args():
     parser = argparse.ArgumentParser(description=__doc__)
     add_arg = functools.partial(add_arguments, argparser=parser)
     # yapf: disable
-    add_arg('batch_size', int, 256, "Minibatch size.")
     add_arg('use_gpu', bool, True, "Whether to use GPU or not.")
     add_arg('total_images', int, 1281167, "Training image number.")
     add_arg('num_epochs', int, 120, "number of epochs.")
     add_arg('class_dim', int, 1000, "Class number.")
     add_arg('image_shape', str, "3,224,224", "input image size")
     add_arg('model_save_dir', str, "output", "model save directory")
-    add_arg('with_mem_opt', bool, False, "Whether to use memory optimization or not.")
     add_arg('pretrained_model', str, None, "Whether to use pretrained model.")
     add_arg('checkpoint', str, None, "Whether to resume checkpoint.")
     add_arg('lr', float, 0.1, "set learning rate.")
@@ -60,14 +74,14 @@ def parse_args():
     add_arg('start_test_pass', int, 0, "Start test after x passes.")
     add_arg('num_threads', int, 8, "Use num_threads to run the fluid program.")
     add_arg('reduce_strategy', str, "allreduce", "Choose from reduce or allreduce.")
-    add_arg('log_period', int, 5, "Print period, defualt is 5.")
-    add_arg('init_conv2d_kaiming', bool, False, "Whether to initliaze conv2d weight by kaiming.")
+    add_arg('log_period', int, 30, "Print period, default is 30.")
     add_arg('memory_optimize', bool, True, "Whether to enable memory optimize.")
     # yapf: enable
     args = parser.parse_args()
     return args
 
 def get_device_num():
+    return 8
     import subprocess
     visible_device = os.getenv('CUDA_VISIBLE_DEVICES')
     if visible_device:
@@ -78,9 +92,6 @@ def get_device_num():
     return device_num
 
 def linear_lr_decay(lr_values, epochs, bs_values, total_images):
-    """Applies cosine decay to the learning rate.
-    lr = 0.05 * (math.cos(epoch * (math.pi / 120)) + 1)
-    """
     from paddle.fluid.layers.learning_rate_scheduler import _decay_step_counter
     import paddle.fluid.layers.tensor as tensor
     import math
@@ -101,9 +112,9 @@
                 linear_epoch = end_epoch - start_epoch
                 start_lr, end_lr = lr_values[idx]
                 linear_lr = end_lr - start_lr
-                steps = last_steps + math.ceil(total_images * 1.0 / bs_values[idx]) * linear_epoch
+                steps = last_steps + math.ceil(total_images * 1.0 / bs_values[idx]) * linear_epoch + 1
                 with switch.case(global_step < steps):
-                    decayed_lr = start_lr + linear_lr * ((global_step - last_steps) * 1.0/steps)
+                    decayed_lr = start_lr + linear_lr * ((global_step - last_steps) * 1.0/(steps - last_steps))
                     last_steps = steps
                     fluid.layers.tensor.assign(decayed_lr, lr)
         last_value_var = tensor.fill_constant(
@@ -134,16 +145,16 @@
             e.update(
                 value=np.array(acc_rets[i]), weight=bs)
     num_samples = batch_id * bs * get_device_num()
-    print_train_time(start_ts, time.time(), num_samples)
+    print_train_time(start_ts, time.time(), num_samples, "Test")
     return [e.eval() for e in acc_evaluators]
 
 
 def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_prog, img_size, trn_dir, batch_size, min_scale, rect_val):
     if is_train:
-        reader = torchvision_reader.train(traindir=os.path.join(args.data_dir, trn_dir, "train"), sz=img_size, min_scale=min_scale)
+        dataloader = torchvision_reader.train(traindir=os.path.join(args.data_dir, trn_dir, "train"), sz=img_size, min_scale=min_scale)
     else:
-        reader = torchvision_reader.test(valdir=os.path.join(args.data_dir, trn_dir, "validation"), bs=batch_size * get_device_num(), sz=img_size, rect_val=rect_val)
+        dataloader = torchvision_reader.test(valdir=os.path.join(args.data_dir, trn_dir, "validation"), bs=batch_size if is_mp_mode() else batch_size * get_device_num(), sz=img_size, rect_val=rect_val)
 
     dshape = [3, img_size, img_size]
     class_dim = 1000
@@ -160,17 +171,17 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
         with fluid.program_guard(main_prog, py_reader_startup_prog):
             with fluid.unique_name.guard():
                 pyreader = fluid.layers.py_reader(
-                    capacity=batch_size * get_device_num(),
+                    capacity=batch_size * 2 if is_mp_mode() else batch_size * get_device_num(),
                     shapes=([-1] + dshape, (-1, 1)),
                     dtypes=('uint8', 'int64'),
-                    name="train_reader_" + str(img_size) if is_train else "test_reader_" + str(img_size),
+                    name="train_reader_" + str(img_size),
                     use_double_buffer=True)
                 input, label = fluid.layers.read_file(pyreader)
-                pyreader.decorate_paddle_reader(paddle.batch(reader, batch_size=batch_size))
+                pyreader.decorate_paddle_reader(paddle.batch(dataloader.reader(), batch_size=batch_size))
     else:
         input = fluid.layers.data(name="image", shape=[3, 244, 244], dtype="uint8")
         label = fluid.layers.data(name="label", shape=[1], dtype="int64")
-        batched_reader = paddle.batch(reader, batch_size=batch_size * get_device_num())
+        batched_reader = paddle.batch(dataloader.reader(), batch_size=batch_size if is_mp_mode() else batch_size * get_device_num())
     cast_img_type = "float16" if args.fp16 else "float32"
     cast = fluid.layers.cast(input, cast_img_type)
     img_mean = fluid.layers.create_global_var([3, 1, 1], 0.0, cast_img_type, name="img_mean", persistable=True)
@@ -192,13 +203,10 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
     # configure optimize
     optimizer = None
     if is_train:
-        #total_images = 1281167 / trainer_count
        epochs = [(0,7), (7,13), (13, 22), (22, 25), (25, 28)]
        bs_epoch = [x * get_device_num() for x in [224, 224, 96, 96, 50]]
        lrs = [(1.0, 2.0), (2.0, 0.25), (0.42857142857142855, 0.04285714285714286), (0.04285714285714286, 0.004285714285714286), (0.0022321428571428575, 0.00022321428571428573), 0.00022321428571428573]
 
-        #boundaries, values = lr_decay(lrs, epochs, bs_epoch, total_images)
-        #print("lr linear decay boundaries: ", boundaries, " \nvalues: ", values)
        optimizer = fluid.optimizer.Momentum(
            learning_rate=linear_lr_decay(lrs, epochs, bs_epoch, args.total_images),
            momentum=0.9,
@@ -216,37 +224,29 @@ def build_program(args, is_train, main_prog, startup_prog, py_reader_startup_pro
         fluid.memory_optimize(main_prog, skip_grads=True)
 
     return avg_cost, optimizer, [batch_acc1,
-                batch_acc5], batched_reader, pyreader, py_reader_startup_prog
+                batch_acc5], batched_reader, pyreader, py_reader_startup_prog, dataloader
 
 def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog=False, min_scale=0.08, rect_val=False):
     print('program changed: epoch: [%d], image size: [%d], trn_dir: [%s], batch_size:[%d]' % (epoch, sz, trn_dir, bs))
     train_prog = fluid.Program()
     test_prog = fluid.Program()
     startup_prog = fluid.Program()
     py_reader_startup_prog = fluid.Program()
+    num_trainers = args.dist_env["num_trainers"]
+    trainer_id = args.dist_env["trainer_id"]
 
     train_args = build_program(args, True, train_prog, startup_prog, py_reader_startup_prog, sz, trn_dir, bs, min_scale, False)
     test_args = build_program(args, False, test_prog, startup_prog, py_reader_startup_prog, sz, trn_dir, val_bs, min_scale, rect_val)
-
-    place = core.CUDAPlace(0)
+    gpu_id = int(os.getenv("FLAGS_selected_gpus")) if is_mp_mode() else 0
+    place = core.CUDAPlace(gpu_id)
     startup_exe = fluid.Executor(place)
     print("execute py_reader startup program")
     startup_exe.run(py_reader_startup_prog)
 
     if need_update_start_prog:
         print("execute startup program")
+        if is_mp_mode():
+            nccl2_prepare(args, startup_prog)
         startup_exe.run(startup_prog)
-        if args.init_conv2d_kaiming:
-            import torch
-            conv2d_w_vars = [var for var in startup_prog.global_block().vars.values() if var.name.startswith('conv2d_')]
-            for var in conv2d_w_vars:
-                torch_w = torch.empty(var.shape)
-                kaiming_np = torch.nn.init.kaiming_normal_(torch_w, mode='fan_out', nonlinearity='relu').numpy()
-                tensor = fluid.global_scope().find_var(var.name).get_tensor()
-                if args.fp16:
-                    tensor.set(np.array(kaiming_np, dtype="float16").view(np.uint16), place)
-                else:
-                    tensor.set(np.array(kaiming_np, dtype="float32"), place)
 
     np_tensors = {}
     np_tensors["img_mean"] = np.array([0.485 * 255.0, 0.456 * 255.0, 0.406 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1))
     np_tensors["img_std"] = np.array([0.229 * 255.0, 0.224 * 255.0, 0.225 * 255.0]).astype("float16" if args.fp16 else "float32").reshape((3, 1, 1))
@@ -258,27 +258,26 @@ def refresh_program(args, epoch, sz, trn_dir, bs, val_bs, need_update_start_prog
         var.get_tensor().set(np_tensor, place)
 
-    if DEBUG_PROG:
-        with open('/tmp/train_prog_pass%d' % epoch, 'w') as f: f.write(train_prog.to_string(True))
-        with open('/tmp/test_prog_pass%d' % epoch, 'w') as f: f.write(test_prog.to_string(True))
-        with open('/tmp/startup_prog_pass%d' % epoch, 'w') as f: f.write(startup_prog.to_string(True))
-        with open('/tmp/py_reader_startup_prog_pass%d' % epoch, 'w') as f: f.write(py_reader_startup_prog.to_string(True))
-
     strategy = fluid.ExecutionStrategy()
     strategy.num_threads = args.num_threads
     strategy.allow_op_delay = False
+    strategy.num_iteration_per_drop_scope = 30
     build_strategy = fluid.BuildStrategy()
     build_strategy.reduce_strategy = fluid.BuildStrategy().ReduceStrategy.AllReduce
-
+
+    avg_loss = train_args[0]
     train_exe = fluid.ParallelExecutor(
         True,
         avg_loss.name,
         main_program=train_prog,
         exec_strategy=strategy,
-        build_strategy=build_strategy)
+        build_strategy=build_strategy,
+        num_trainers=num_trainers,
+        trainer_id=trainer_id)
+
+    test_scope = fluid.global_scope().new_scope()
     test_exe = fluid.ParallelExecutor(
-        True, main_program=test_prog, share_vars_from=train_exe)
+        True, main_program=test_prog, share_vars_from=train_exe, scope=test_scope)
 
     return train_args, test_args, test_prog, train_exe, test_exe
@@ -311,6 +310,8 @@
         num_samples = 0
         iters = 0
         start_time = time.time()
+        dataloader = train_args[6] # Paddle DataLoader
+        dataloader.shuffle_seed = pass_id + 1
         train_args[4].start() # start pyreader
         while True:
             fetch_list = [avg_loss.name]
@@ -344,7 +345,7 @@
                     (pass_id, iters, fetched_data[0], fetched_data[1:-1], fetched_data[-1], train_args[4].queue.size()))
             iters += 1
 
-        print_train_time(start_time, time.time(), num_samples)
+        print_train_time(start_time, time.time(), num_samples, "Train")
         feed_list = [test_prog.global_block().var(varname) for varname in ("image", "label")]
         test_feeder = fluid.DataFeeder(feed_list=feed_list, place=fluid.CUDAPlace(0))
         test_ret = test_parallel(test_exe, test_args, args, test_prog, test_feeder, bs)
@@ -353,11 +354,11 @@
     print("total train time: ", time.time() - over_all_start)
 
 
-def print_train_time(start_time, end_time, num_samples):
+def print_train_time(start_time, end_time, num_samples, prefix_text=""):
     train_elapsed = end_time - start_time
     examples_per_sec = num_samples / train_elapsed
-    print('\nTotal examples: %d, total time: %.5f, %.5f examples/sed\n' %
-          (num_samples, train_elapsed, examples_per_sec))
+    print('\n%s Total examples: %d, total time: %.5f, %.5f examples/sec\n' %
+          (prefix_text, num_samples, train_elapsed, examples_per_sec))
 
 
 def print_paddle_envs():
@@ -370,6 +371,7 @@ def print_paddle_envs():
 
 def main():
     args = parse_args()
+    args.dist_env = dist_env()
     print_arguments(args)
     print_paddle_envs()
     train_parallel(args)
diff --git a/fluid/PaddleCV/image_classification/models/fast_resnet.py b/fluid/PaddleCV/image_classification/models/fast_resnet.py
index 7e109eed..067c3af9 100644
--- a/fluid/PaddleCV/image_classification/models/fast_resnet.py
+++ b/fluid/PaddleCV/image_classification/models/fast_resnet.py
@@ -70,7 +70,7 @@
                 stride=2 if i == 0 and block != 0 else 1)
         pool_size = int(img_size / 32)
         pool = fluid.layers.pool2d(
-            input=conv, pool_size=pool_size, pool_type='avg', global_pooling=True)
+            input=conv, pool_size=0, pool_type='avg', global_pooling=True)
         out = fluid.layers.fc(input=pool,
                               size=class_dim,
                               act=None,
--
GitLab
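
A minimal sketch of how dist_env() in the new env.py picks up its configuration; it is driven entirely by environment variables. The endpoint addresses and the two-trainer layout below are illustrative assumptions, not values required by the patch, and FLAGS_selected_gpus is the variable that is_mp_mode() in train.py checks to enable the one-process-per-GPU (nccl2) path.

import os

# Hypothetical single-node layout with two trainer processes; replace the
# addresses and ports with real cluster values. These are the variables
# that env.py reads.
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:8701,127.0.0.1:8702"
os.environ["PADDLE_CURRENT_ENDPOINT"] = "127.0.0.1:8701"
os.environ["FLAGS_selected_gpus"] = "0"  # makes is_mp_mode() in train.py return True

from env import dist_env  # assumes the fast_resnet directory is on sys.path

print(dist_env())
# expected result (key order may differ):
# {'trainer_id': 0, 'num_trainers': 2,
#  'current_endpoint': '127.0.0.1:8701', 'training_role': 'TRAINER',
#  'pserver_endpoints': ':8701',
#  'trainer_endpoints': ['127.0.0.1:8701', '127.0.0.1:8702']}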
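
The learning-rate schedule that linear_lr_decay() in train.py assembles out of fluid switch/case ops can also be read as plain Python. The sketch below re-expresses the same piecewise-linear rule (including the "+ 1" added to each step boundary and the trailing scalar in lrs that acts as the final constant rate); it is an illustration, not code the patch uses, and the 8-GPU multiplier mirrors the hard-coded get_device_num().

import math

def linear_lr_at_step(global_step, lr_values, epochs, bs_values, total_images):
    # Mirror of linear_lr_decay() in train.py: within each epoch range the LR
    # moves linearly from start_lr to end_lr over the steps that range covers.
    last_steps = 0
    for idx, (start_epoch, end_epoch) in enumerate(epochs):
        start_lr, end_lr = lr_values[idx]
        linear_epoch = end_epoch - start_epoch
        steps = last_steps + math.ceil(total_images * 1.0 / bs_values[idx]) * linear_epoch + 1
        if global_step < steps:
            return start_lr + (end_lr - start_lr) * ((global_step - last_steps) * 1.0 / (steps - last_steps))
        last_steps = steps
    return lr_values[-1]  # trailing scalar in lrs: constant LR after the last boundary

# Example with the values hard-coded in build_program(), assuming 8 GPUs per node:
epochs = [(0, 7), (7, 13), (13, 22), (22, 25), (25, 28)]
bs_epoch = [x * 8 for x in [224, 224, 96, 96, 50]]
lrs = [(1.0, 2.0), (2.0, 0.25), (0.42857142857142855, 0.04285714285714286),
       (0.04285714285714286, 0.004285714285714286),
       (0.0022321428571428575, 0.00022321428571428573), 0.00022321428571428573]
print(linear_lr_at_step(0, lrs, epochs, bs_epoch, 1281167))      # 1.0 at the first step
print(linear_lr_at_step(20000, lrs, epochs, bs_epoch, 1281167))  # a value inside the third range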