From 39f946aa19884389a6815e777e2d9f0b86a9ae6b Mon Sep 17 00:00:00 2001
From: shippingwang
Date: Thu, 18 Jun 2020 07:28:56 +0000
Subject: [PATCH] add dali

---
 configs/high_performance/dali.yaml |  78 +++++++
 tools/dali.py                      | 325 +++++++++++++++++++++++++++++
 tools/program.py                   |  14 +-
 tools/run_dali.sh                  |  13 ++
 tools/train.py                     |  29 ++-
 5 files changed, 447 insertions(+), 12 deletions(-)
 create mode 100644 configs/high_performance/dali.yaml
 create mode 100644 tools/dali.py
 create mode 100755 tools/run_dali.sh

diff --git a/configs/high_performance/dali.yaml b/configs/high_performance/dali.yaml
new file mode 100644
index 00000000..799748e5
--- /dev/null
+++ b/configs/high_performance/dali.yaml
@@ -0,0 +1,78 @@
+mode: 'train'
+ARCHITECTURE:
+    name: 'ResNet50_vd'
+
+pretrained_model: ""
+model_save_dir: "./output/"
+classes_num: 1000
+total_images: 1281167
+save_interval: 1
+validate: True
+valid_interval: 1
+epochs: 200
+topk: 5
+image_shape: [3, 224, 224]
+
+use_mix: True
+ls_epsilon: 0.1
+use_dali: True
+
+LEARNING_RATE:
+    function: 'Cosine'
+    params:
+        lr: 0.1
+
+OPTIMIZER:
+    function: 'Momentum'
+    params:
+        momentum: 0.9
+    regularizer:
+        function: 'L2'
+        factor: 0.000070
+
+TRAIN:
+    batch_size: 256
+    num_workers: 4
+    file_list: "./dataset/ILSVRC2012/train_list.txt"
+    data_dir: "./dataset/ILSVRC2012/"
+    shuffle_seed: 0
+    transforms:
+        - DecodeImage:
+            to_rgb: True
+            to_np: False
+            channel_first: False
+        - RandCropImage:
+            size: 224
+        - RandFlipImage:
+            flip_code: 1
+        - NormalizeImage:
+            scale: 1./255.
+            mean: [0.485, 0.456, 0.406]
+            std: [0.229, 0.224, 0.225]
+            order: ''
+        - ToCHWImage:
+    mix:
+        - MixupOperator:
+            alpha: 0.2
+
+VALID:
+    batch_size: 64
+    num_workers: 4
+    file_list: "./dataset/ILSVRC2012/val_list.txt"
+    data_dir: "./dataset/ILSVRC2012/"
+    shuffle_seed: 0
+    transforms:
+        - DecodeImage:
+            to_rgb: True
+            to_np: False
+            channel_first: False
+        - ResizeImage:
+            resize_short: 256
+        - CropImage:
+            size: 224
+        - NormalizeImage:
+            scale: 1.0/255.0
+            mean: [0.485, 0.456, 0.406]
+            std: [0.229, 0.224, 0.225]
+            order: ''
+        - ToCHWImage:
diff --git a/tools/dali.py b/tools/dali.py
new file mode 100644
index 00000000..6776392d
--- /dev/null
+++ b/tools/dali.py
@@ -0,0 +1,325 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division
+
+import os
+
+import numpy as np
+from nvidia.dali.pipeline import Pipeline
+import nvidia.dali.ops as ops
+import nvidia.dali.types as types
+from nvidia.dali.plugin.paddle import DALIGenericIterator
+
+import paddle
+from paddle import fluid
+
+
+class HybridTrainPipe(Pipeline):
+    def __init__(self,
+                 file_root,
+                 file_list,
+                 batch_size,
+                 resize_shorter,
+                 crop,
+                 min_area,
+                 lower,
+                 upper,
+                 interp,
+                 mean,
+                 std,
+                 device_id,
+                 shard_id=0,
+                 num_shards=1,
+                 random_shuffle=True,
+                 num_threads=4,
+                 seed=42):
+        super(HybridTrainPipe, self).__init__(
+            batch_size, num_threads, device_id, seed=seed)
+        self.input = ops.FileReader(
+            file_root=file_root,
+            file_list=file_list,
+            shard_id=shard_id,
+            num_shards=num_shards,
+            random_shuffle=random_shuffle)
+        # set internal nvJPEG buffers size to handle full-sized ImageNet images
+        # without additional reallocations
+        device_memory_padding = 211025920
+        host_memory_padding = 140544512
+        self.decode = ops.ImageDecoderRandomCrop(
+            device='mixed',
+            output_type=types.RGB,
+            device_memory_padding=device_memory_padding,
+            host_memory_padding=host_memory_padding,
+            random_aspect_ratio=[lower, upper],
+            random_area=[min_area, 1.0],
+            num_attempts=100)
+        self.res = ops.Resize(
+            device='gpu', resize_x=crop, resize_y=crop, interp_type=interp)
+        self.cmnp = ops.CropMirrorNormalize(
+            device="gpu",
+            output_dtype=types.FLOAT,
+            output_layout=types.NCHW,
+            crop=(crop, crop),
+            image_type=types.RGB,
+            mean=mean,
+            std=std)
+        self.coin = ops.CoinFlip(probability=0.5)
+        self.to_int64 = ops.Cast(dtype=types.INT64, device="gpu")
+
+    def define_graph(self):
+        rng = self.coin()
+        jpegs, labels = self.input(name="Reader")
+        images = self.decode(jpegs)
+        images = self.res(images)
+        output = self.cmnp(images.gpu(), mirror=rng)
+        return [output, self.to_int64(labels.gpu())]
+
+    def __len__(self):
+        return self.epoch_size("Reader")
+
+
+class HybridValPipe(Pipeline):
+    def __init__(self,
+                 file_root,
+                 file_list,
+                 batch_size,
+                 resize_shorter,
+                 crop,
+                 interp,
+                 mean,
+                 std,
+                 device_id,
+                 shard_id=0,
+                 num_shards=1,
+                 random_shuffle=False,
+                 num_threads=4,
+                 seed=42):
+        super(HybridValPipe, self).__init__(
+            batch_size, num_threads, device_id, seed=seed)
+        self.input = ops.FileReader(
+            file_root=file_root,
+            file_list=file_list,
+            shard_id=shard_id,
+            num_shards=num_shards,
+            random_shuffle=random_shuffle)
+        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
+        self.res = ops.Resize(
+            device="gpu", resize_shorter=resize_shorter, interp_type=interp)
+        self.cmnp = ops.CropMirrorNormalize(
+            device="gpu",
+            output_dtype=types.FLOAT,
+            output_layout=types.NCHW,
+            crop=(crop, crop),
+            image_type=types.RGB,
+            mean=mean,
+            std=std)
+        self.to_int64 = ops.Cast(dtype=types.INT64, device="gpu")
+
+    def define_graph(self):
+        jpegs, labels = self.input(name="Reader")
+        images = self.decode(jpegs)
+        images = self.res(images)
+        output = self.cmnp(images)
+        return [output, self.to_int64(labels.gpu())]
+
+    def __len__(self):
+        return self.epoch_size("Reader")
+
+
+def build(settings, mode='train'):
+    env = os.environ
+    assert settings.get('use_gpu',
+                        True) == True, "gpu training is required for DALI"
+    #assert not settings.get('use_mix'), "mixup is not supported by DALI reader"
+    assert not settings.get(
+        'use_aa'), "auto augment is not supported by DALI reader"
+    assert float(env.get('FLAGS_fraction_of_gpu_memory_to_use', 0.92)) < 0.9, \
+        "Please leave enough GPU memory for DALI workspace, e.g., by setting" \
+        " `export FLAGS_fraction_of_gpu_memory_to_use=0.8`"
+
+    file_root = settings.TRAIN.data_dir
+    bs = settings.TRAIN.batch_size if mode == 'train' else settings.VALID.batch_size
+    print(bs, paddle.fluid.core.get_cuda_device_count())
+    assert bs % paddle.fluid.core.get_cuda_device_count() == 0, \
+        "batch size must be multiple of number of devices"
+    batch_size = bs // paddle.fluid.core.get_cuda_device_count()
+
+    image_mean = [0.485, 0.456, 0.406]
+    image_std = [0.229, 0.224, 0.225]
+    mean = [v * 255 for v in image_mean]
+    std = [v * 255 for v in image_std]
+
+    crop = 224  # settings.crop_size
+    resize_shorter = 256  # settings.resize_short_size
+    min_area = 0.08  # settings.lower_scale
+    lower = 3. / 4.  # settings.lower_ratio
+    upper = 4. / 3.  # settings.upper_ratio
+
+    interp = 1  # settings.interpolation or 1  # default to linear
+    interp_map = {
+        0: types.INTERP_NN,  # cv2.INTER_NEAREST
+        1: types.INTERP_LINEAR,  # cv2.INTER_LINEAR
+        2: types.INTERP_CUBIC,  # cv2.INTER_CUBIC
+        4: types.INTERP_LANCZOS3,  # XXX use LANCZOS3 for cv2.INTER_LANCZOS4
+    }
+    assert interp in interp_map, "interpolation method not supported by DALI"
+    interp = interp_map[interp]
+
+    if mode != 'train':
+        p = fluid.framework.cuda_places()[0]
+        place = fluid.core.Place()
+        place.set_place(p)
+        device_id = place.gpu_device_id()
+        file_list = os.path.join(file_root, 'val_list.txt')
+        if not os.path.exists(file_list):
+            file_list = None
+            file_root = os.path.join(file_root, 'val')
+        pipe = HybridValPipe(
+            file_root,
+            file_list,
+            batch_size,
+            resize_shorter,
+            crop,
+            interp,
+            mean,
+            std,
+            device_id=device_id)
+        pipe.build()
+        return DALIGenericIterator(
+            pipe, ['feed_image', 'feed_label'],
+            size=len(pipe),
+            dynamic_shape=True,
+            fill_last_batch=False,
+            last_batch_padded=True)
+
+    file_list = os.path.join(file_root, 'train_list.txt')
+    if not os.path.exists(file_list):
+        file_list = None
+        file_root = os.path.join(file_root, 'train')
+
+    if 'PADDLE_TRAINER_ID' in env and 'PADDLE_TRAINERS_NUM' in env:
+        shard_id = int(env['PADDLE_TRAINER_ID'])
+        num_shards = int(env['PADDLE_TRAINERS_NUM'])
+        device_id = int(env['FLAGS_selected_gpus'])
+        pipe = HybridTrainPipe(
+            file_root,
+            file_list,
+            batch_size,
+            resize_shorter,
+            crop,
+            min_area,
+            lower,
+            upper,
+            interp,
+            mean,
+            std,
+            device_id,
+            shard_id,
+            num_shards,
+            seed=42 + shard_id)
+        pipe.build()
+        pipelines = [pipe]
+        sample_per_shard = len(pipe) // num_shards
+    else:
+        pipelines = []
+        places = fluid.framework.cuda_places()
+        num_shards = len(places)
+        for idx, p in enumerate(places):
+            place = fluid.core.Place()
+            place.set_place(p)
+            device_id = place.gpu_device_id()
+            pipe = HybridTrainPipe(
+                file_root,
+                file_list,
+                batch_size,
+                resize_shorter,
+                crop,
+                min_area,
+                lower,
+                upper,
+                interp,
+                mean,
+                std,
+                device_id,
+                idx,
+                num_shards,
+                seed=42 + idx)
+            pipe.build()
+            pipelines.append(pipe)
+        sample_per_shard = len(pipelines[0])
+
+    return DALIGenericIterator(
+        pipelines, ['feed_image', 'feed_label'], size=sample_per_shard)
+
+
+def train(settings):
+    return build(settings, 'train')
+
+
+def val(settings):
+    return build(settings, 'val')
+
+
+def _to_Tensor(lod_tensor, dtype):
+    data_tensor = fluid.layers.create_tensor(dtype=dtype)
+    data = np.array(lod_tensor).astype(dtype)
+    fluid.layers.assign(data, data_tensor)
+    return data_tensor
+
+
+def post_mix(settings, batch):
+    batch_size = settings.TRAIN.batch_size // paddle.fluid.core.get_cuda_device_count(
+    )
+
+    batch_imgs = _to_Tensor(batch[0]['feed_image'], 'float32')
+    batch_label = _to_Tensor(batch[0]['feed_label'], 'int64')
+    alpha = 0.2
+    idx = _to_Tensor(np.random.permutation(batch_size), 'int32')
+    lam = np.random.beta(alpha, alpha)
+
+    batch_imgs = lam * batch_imgs + (1 - lam) * paddle.fluid.layers.gather(
+        batch_imgs, idx)
+
+    # print(type(batch_label))
+    feed = [{
+        'feed_image': batch_imgs,
+        'feed_y_a': batch_label,
+        'feed_y_b': paddle.fluid.layers.gather(batch_label, idx),
+        'feed_lam': _to_Tensor([lam] * batch_size, 'float32')
+    }]
+
+    return feed
+
+
+def post_mix_numpy(settings, batch):
+    batch_size = settings.TRAIN.batch_size // paddle.fluid.core.get_cuda_device_count(
+    )
+
+    batch_imgs = np.array(batch[0]['feed_image'])
+    batch_label = np.array(batch[0]['feed_label'])
+    alpha = 0.2
+    idx = np.random.permutation(batch_size)
+    lam = np.random.beta(alpha, alpha)
+
+    batch_imgs = lam * batch_imgs + (1 - lam) * batch_imgs[idx]
+
+    feed = [{
+        'feed_image': batch_imgs,
+        'feed_y_a': batch_label,
+        'feed_y_b': batch_label[idx],
+        'feed_lam': np.array([lam] * batch_size).astype('float32')
+    }]
+
+    return feed
diff --git a/tools/program.py b/tools/program.py
index f70cb2dc..9db992a9 100644
--- a/tools/program.py
+++ b/tools/program.py
@@ -338,7 +338,8 @@ def build(config, main_prog, startup_prog, is_train=True):
             use_mix = config.get('use_mix') and is_train
             use_distillation = config.get('use_distillation')
             feeds = create_feeds(config.image_shape, use_mix=use_mix)
-            dataloader = create_dataloader(feeds.values())
+            dataloader = create_dataloader(feeds.values()) if not config.get(
+                'use_dali') else None
             out = create_model(config.ARCHITECTURE, feeds['image'],
                                config.classes_num, is_train)
             fetchs = create_fetchs(
@@ -405,6 +406,7 @@ def run(dataloader,
         fetchs,
         epoch=0,
         mode='train',
+        config=None,
         vdl_writer=None):
     """
     Feed data to the model and fetch the measures and loss
@@ -425,7 +427,13 @@
         m.reset()
     batch_time = AverageMeter('elapse', '.3f')
     tic = time.time()
-    for idx, batch in enumerate(dataloader()):
+    dataloader = dataloader if config.get('use_dali') else dataloader()
+
+    for idx, batch in enumerate(dataloader):
+        if config.get('use_dali'):
+            import dali
+            batch = dali.post_mix_numpy(config, batch)
+
         metrics = exe.run(program=program, feed=batch, fetch_list=fetch_list)
         batch_time.update(time.time() - tic)
         tic = time.time()
@@ -448,6 +456,8 @@
                 if idx == 0 else epoch_str,
                 logger.coloring(step_str, "PURPLE"),
                 logger.coloring(fetchs_str, 'OKGREEN')))
+    if config.get('use_dali'):
+        dataloader.reset()
 
     end_str = ''.join([str(m.mean) + ' '
                        for m in metric_list] + [batch_time.total]) + 's'
diff --git a/tools/run_dali.sh b/tools/run_dali.sh
new file mode 100755
index 00000000..af805605
--- /dev/null
+++ b/tools/run_dali.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+
+export PYTHONPATH=$PWD:$PYTHONPATH
+export FLAGS_fraction_of_gpu_memory_to_use=0.8
+
+
+python3 -m paddle.distributed.launch \
+    --selected_gpus="0,1" \
+    tools/train.py \
+        -c ./configs/high_performance/dali.yaml \
+        -o TRAIN.batch_size=4 \
+        -o use_mix=True \
+        -o use_dali=True
diff --git a/tools/train.py b/tools/train.py
index a5f765f0..c7ae7320 100644
--- a/tools/train.py
+++ b/tools/train.py
@@ -90,14 +90,23 @@ def main(args):
 
     # load model from 1. checkpoint to resume training, 2. pretrained model to finetune
     init_model(config, train_prog, exe)
+    if not config.get('use_dali'):
 
-    train_reader = Reader(config, 'train')()
-    train_dataloader.set_sample_list_generator(train_reader, place)
+        train_reader = Reader(config, 'train')()
+        train_dataloader.set_sample_list_generator(train_reader, place)
 
-    if config.validate:
-        valid_reader = Reader(config, 'valid')()
-        valid_dataloader.set_sample_list_generator(valid_reader, place)
-        compiled_valid_prog = program.compile(config, valid_prog)
+        if config.validate:
+            valid_reader = Reader(config, 'valid')()
+            valid_dataloader.set_sample_list_generator(valid_reader, place)
+            compiled_valid_prog = program.compile(config, valid_prog)
+
+    else:
+        import dali
+        train_dataloader = dali.train(settings=config)
+        if config.validate:
+            if int(os.getenv("PADDLE_TRAINER_ID", 0)) == 0:
+                valid_dataloader = dali.val(settings=config)
+                compiled_valid_prog = program.compile(config, valid_prog)
 
     compiled_train_prog = fleet.main_program
 
@@ -110,16 +119,16 @@
     for epoch_id in range(config.epochs):
         # 1. train with train dataset
         program.run(train_dataloader, exe, compiled_train_prog, train_fetchs,
-                    epoch_id, 'train', vdl_writer)
+                    epoch_id, 'train', config, vdl_writer)
         if int(os.getenv("PADDLE_TRAINER_ID", 0)) == 0:
             # 2. validate with validate dataset
             if config.validate and epoch_id % config.valid_interval == 0:
                 if config.get('use_ema'):
                     logger.info(logger.coloring("EMA validate start..."))
                     with ema.apply(exe):
-                        top1_acc = program.run(valid_dataloader, exe,
-                                               compiled_valid_prog,
-                                               valid_fetchs, epoch_id, 'valid')
+                        top1_acc = program.run(
+                            valid_dataloader, exe, compiled_valid_prog,
+                            valid_fetchs, epoch_id, 'valid', config)
                     logger.info(logger.coloring("EMA validate over!"))
 
                 top1_acc = program.run(valid_dataloader, exe,
--
GitLab