from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
# GPU memory garbage collection optimization flags
os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0"

import sys
import timeit
import argparse
import pprint
import shutil
import functools

import paddle
import numpy as np
import paddle.fluid as fluid

from src.utils.metrics import ConfusionMatrix
from src.utils.config import cfg
from src.utils.timer import Timer, calculate_eta
from src.utils import dist_utils
from src.datasets import build_dataset
from src.models.model_builder import build_model
from src.models.model_builder import ModelPhase
from src.models.model_builder import parse_shape_from_file
from eval import evaluate
from vis import visualize


def parse_args():
    parser = argparse.ArgumentParser(description='semseg-paddle')
    parser.add_argument(
        '--cfg',
        dest='cfg_file',
        help='Config file for training (and optionally testing)',
        default=None,
        type=str)
    parser.add_argument(
        '--use_gpu',
        dest='use_gpu',
        help='Use GPU or CPU',
        action='store_true',
        default=False)
    parser.add_argument(
        '--use_mpio',
        dest='use_mpio',
        help='Use multiprocess I/O or not',
        action='store_true',
        default=False)
    parser.add_argument(
        '--log_steps',
        dest='log_steps',
        help='Display logging information every log_steps steps',
        default=10,
        type=int)
    parser.add_argument(
        '--debug',
        dest='debug',
        help='Debug mode: display detailed information during training',
        action='store_true')
    parser.add_argument(
        '--use_tb',
        dest='use_tb',
        help='Whether to record training data to TensorBoard',
        action='store_true')
    parser.add_argument(
        '--tb_log_dir',
        dest='tb_log_dir',
        help='TensorBoard logging directory',
        default=None,
        type=str)
    parser.add_argument(
        '--do_eval',
        dest='do_eval',
        help='Evaluate the model on every new checkpoint',
        action='store_true')
    parser.add_argument(
        'opts',
        help='See utils/config.py for all options',
        default=None,
        nargs=argparse.REMAINDER)
    return parser.parse_args()


def save_checkpoint(exe, program, ckpt_name):
    """Save a checkpoint for evaluation or for resuming training."""
    filename = '{}_{}_{}_epoch_{}.pdparams'.format(
        str(cfg.MODEL.MODEL_NAME), str(cfg.MODEL.BACKBONE),
        str(cfg.DATASET.DATASET_NAME), ckpt_name)
    ckpt_dir = cfg.TRAIN.MODEL_SAVE_DIR
    print("Save model checkpoint to {}".format(ckpt_dir))
    if not os.path.isdir(ckpt_dir):
        os.makedirs(ckpt_dir)
    fluid.io.save_params(exe, ckpt_dir, program, filename)
    return ckpt_dir


def load_checkpoint(exe, program):
    """Load a checkpoint from the resume-model directory to resume training."""
    print('Resume model training from:', cfg.TRAIN.RESUME_MODEL_DIR)
    if not os.path.exists(cfg.TRAIN.RESUME_MODEL_DIR):
        raise ValueError("TRAIN.RESUME_MODEL_DIR {} does not exist!".format(
            cfg.TRAIN.RESUME_MODEL_DIR))
    fluid.io.load_persistables(
        exe, cfg.TRAIN.RESUME_MODEL_DIR, main_program=program)

    model_path = cfg.TRAIN.RESUME_MODEL_DIR
    # Strip a trailing path separator, if any
    if model_path[-1] == os.sep:
        model_path = model_path[0:-1]
    epoch_name = os.path.basename(model_path)
    # If the resume model is the final model
    if epoch_name == 'final':
        begin_epoch = cfg.SOLVER.NUM_EPOCHS
    # If the resume model path ends with a digit, restore the epoch counter
    elif epoch_name.isdigit():
        epoch = int(epoch_name)
        begin_epoch = epoch + 1
    else:
        raise ValueError("Resume model path is not valid!")

    print("Model checkpoint loaded successfully!")
    return begin_epoch
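
# Note (illustrative; directory names are hypothetical): load_checkpoint
# derives the starting epoch from the last path component of
# cfg.TRAIN.RESUME_MODEL_DIR, so valid resume directories look like:
#   TRAIN.RESUME_MODEL_DIR = "saved_models/10"    -> begin_epoch = 11
#   TRAIN.RESUME_MODEL_DIR = "saved_models/final" -> begin_epoch = NUM_EPOCHS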


def print_info(*msg):
    if cfg.TRAINER_ID == 0:
        print(*msg)


def train(cfg):
    startup_prog = fluid.Program()
    train_prog = fluid.Program()
    drop_last = True

    dataset = build_dataset(
        cfg.DATASET.DATASET_NAME,
        file_list=cfg.DATASET.TRAIN_FILE_LIST,
        mode=ModelPhase.TRAIN,
        shuffle=True,
        data_dir=cfg.DATASET.DATA_DIR,
        base_size=cfg.DATAAUG.BASE_SIZE,
        crop_size=cfg.DATAAUG.CROP_SIZE,
        rand_scale=True)

    def data_generator():
        if args.use_mpio:
            data_gen = dataset.multiprocess_generator(
                num_processes=cfg.DATALOADER.NUM_WORKERS,
                max_queue_size=cfg.DATALOADER.BUF_SIZE)
        else:
            data_gen = dataset.generator()

        batch_data = []
        for b in data_gen:
            batch_data.append(b)
            if len(batch_data) == (cfg.TRAIN_BATCH_SIZE // cfg.NUM_TRAINERS):
                for item in batch_data:
                    yield item[0], item[1], item[2]
                batch_data = []
        # If the sync batch norm strategy is used, drop the last batch when it
        # has fewer than cfg.TRAIN_BATCH_SIZE samples to avoid NCCL hangs;
        # otherwise yield the remainder as well
        if not cfg.TRAIN.SYNC_BATCH_NORM:
            for item in batch_data:
                yield item[0], item[1], item[2]

    # Get device environment
    gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
    place = fluid.CUDAPlace(gpu_id) if args.use_gpu else fluid.CPUPlace()
    places = fluid.cuda_places() if args.use_gpu else fluid.cpu_places()

    # Get number of devices
    dev_count = cfg.NUM_TRAINERS if cfg.NUM_TRAINERS > 1 else len(places)
    print_info("#device count: {}".format(dev_count))
    cfg.TRAIN_BATCH_SIZE = dev_count * int(cfg.TRAIN_BATCH_SIZE_PER_GPU)
    print_info("#train_batch_size: {}".format(cfg.TRAIN_BATCH_SIZE))
    print_info("#batch_size_per_dev: {}".format(cfg.TRAIN_BATCH_SIZE_PER_GPU))

    py_reader, avg_loss, lr, pred, grts, masks = build_model(
        train_prog, startup_prog, phase=ModelPhase.TRAIN)
    py_reader.decorate_sample_generator(
        data_generator,
        batch_size=cfg.TRAIN_BATCH_SIZE_PER_GPU,
        drop_last=drop_last)
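
    # Illustration of the batch-size arithmetic above (numbers hypothetical):
    # with 4 devices and TRAIN_BATCH_SIZE_PER_GPU = 2, the global
    # TRAIN_BATCH_SIZE becomes 8. data_generator yields samples in groups of
    # TRAIN_BATCH_SIZE // NUM_TRAINERS, and py_reader re-batches them into
    # per-device batches of TRAIN_BATCH_SIZE_PER_GPU.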

    exe = fluid.Executor(place)
    exe.run(startup_prog)

    exec_strategy = fluid.ExecutionStrategy()
    # Clear temporary variables every 100 iterations
    if args.use_gpu:
        exec_strategy.num_threads = fluid.core.get_cuda_device_count()
    exec_strategy.num_iteration_per_drop_scope = 100
    build_strategy = fluid.BuildStrategy()

    if cfg.NUM_TRAINERS > 1 and args.use_gpu:
        dist_utils.prepare_for_multi_process(exe, build_strategy, train_prog)
        exec_strategy.num_threads = 1

    if cfg.TRAIN.SYNC_BATCH_NORM and args.use_gpu:
        if dev_count > 1:
            # Apply the sync batch norm strategy
            print_info("Sync BatchNorm strategy is effective.")
            build_strategy.sync_batch_norm = True
        else:
            print_info(
                "Sync BatchNorm strategy will not be effective if GPU device"
                " count <= 1")

    compiled_train_prog = fluid.CompiledProgram(train_prog).with_data_parallel(
        loss_name=avg_loss.name,
        exec_strategy=exec_strategy,
        build_strategy=build_strategy)

    # Resume training
    begin_epoch = cfg.SOLVER.BEGIN_EPOCH
    if cfg.TRAIN.RESUME_MODEL_DIR:
        begin_epoch = load_checkpoint(exe, train_prog)
    # Load pretrained model
    elif os.path.exists(cfg.TRAIN.PRETRAINED_MODEL_DIR):
        print_info('Pretrained model dir: ', cfg.TRAIN.PRETRAINED_MODEL_DIR)
        load_vars = []
        load_fail_vars = []

        def var_shape_matched(var, shape):
            """Check whether a persistable variable's shape matches the
            current network."""
            var_exist = os.path.exists(
                os.path.join(cfg.TRAIN.PRETRAINED_MODEL_DIR, var.name))
            if var_exist:
                var_shape = parse_shape_from_file(
                    os.path.join(cfg.TRAIN.PRETRAINED_MODEL_DIR, var.name))
                return var_shape == shape
            return False

        for x in train_prog.list_vars():
            if isinstance(x, fluid.framework.Parameter):
                shape = tuple(fluid.global_scope().find_var(
                    x.name).get_tensor().shape())
                if var_shape_matched(x, shape):
                    load_vars.append(x)
                else:
                    load_fail_vars.append(x)

        fluid.io.load_vars(
            exe, dirname=cfg.TRAIN.PRETRAINED_MODEL_DIR, vars=load_vars)
        for var in load_vars:
            print_info("Parameter[{}] loaded successfully!".format(var.name))
        for var in load_fail_vars:
            print_info(
                "Parameter[{}] does not exist or its shape does not match the"
                " current network; skipping it.".format(var.name))
        print_info("{}/{} pretrained parameters loaded successfully!".format(
            len(load_vars),
            len(load_vars) + len(load_fail_vars)))
    else:
        print_info(
            'Pretrained model dir {} does not exist; training from scratch...'.
            format(cfg.TRAIN.PRETRAINED_MODEL_DIR))

    fetch_list = [avg_loss.name, lr.name]
    if args.debug:
        # Fetch more variables and use a streaming confusion matrix to
        # compute IoU results while in debug mode
        np.set_printoptions(
            precision=4, suppress=True, linewidth=160, floatmode="fixed")
        fetch_list.extend([pred.name, grts.name, masks.name])
        cm = ConfusionMatrix(cfg.DATASET.NUM_CLASSES, streaming=True)

    if args.use_tb:
        if not args.tb_log_dir:
            print_info("Please specify the log directory by --tb_log_dir.")
            sys.exit(1)
        from tb_paddle import SummaryWriter
        log_writer = SummaryWriter(args.tb_log_dir)

    global_step = 0
    all_step = cfg.DATASET.TRAIN_TOTAL_IMAGES // cfg.TRAIN_BATCH_SIZE
    if cfg.DATASET.TRAIN_TOTAL_IMAGES % cfg.TRAIN_BATCH_SIZE and not drop_last:
        all_step += 1
    all_step *= (cfg.SOLVER.NUM_EPOCHS - begin_epoch + 1)

    avg_loss = 0.0
    timer = Timer()
    timer.start()
    if begin_epoch > cfg.SOLVER.NUM_EPOCHS:
        raise ValueError(
            "begin_epoch[{}] is larger than cfg.SOLVER.NUM_EPOCHS[{}]".format(
                begin_epoch, cfg.SOLVER.NUM_EPOCHS))

    if args.use_mpio:
        print_info("Use multiprocess reader")
    else:
        print_info("Use multi-thread reader")
    for epoch in range(begin_epoch, cfg.SOLVER.NUM_EPOCHS + 1):
        py_reader.start()
        while True:
            try:
                if args.debug:
                    # Print per-category IoU and accuracy to check whether
                    # training is behaving as expected
                    loss, lr, pred, grts, masks = exe.run(
                        program=compiled_train_prog,
                        fetch_list=fetch_list,
                        return_numpy=True)
                    cm.calculate(pred, grts, masks)
                    avg_loss += np.mean(np.array(loss))
                    global_step += 1

                    if global_step % args.log_steps == 0:
                        speed = args.log_steps / timer.elapsed_time()
                        avg_loss /= args.log_steps
                        category_acc, mean_acc = cm.accuracy()
                        category_iou, mean_iou = cm.mean_iou()

                        print_info((
                            "epoch={}/{} step={}/{} lr={:.5f} loss={:.4f} "
                            "acc={:.5f} mIoU={:.5f} step/sec={:.3f} | ETA {}"
                        ).format(epoch, cfg.SOLVER.NUM_EPOCHS, global_step,
                                 all_step, lr[0], avg_loss, mean_acc,
                                 mean_iou, speed,
                                 calculate_eta(all_step - global_step, speed)))
                        print_info("Category IoU: ", category_iou)
                        print_info("Category Acc: ", category_acc)
                        if args.use_tb:
                            log_writer.add_scalar('Train/mean_iou', mean_iou,
                                                  global_step)
                            log_writer.add_scalar('Train/mean_acc', mean_acc,
                                                  global_step)
                            log_writer.add_scalar('Train/loss', avg_loss,
                                                  global_step)
                            log_writer.add_scalar('Train/lr', lr[0],
                                                  global_step)
                            log_writer.add_scalar('Train/step/sec', speed,
                                                  global_step)
                        sys.stdout.flush()
                        avg_loss = 0.0
                        cm.zero_matrix()
                        timer.restart()
                else:
                    # Outside debug mode, avoid unnecessary logging and
                    # metric computation
                    loss, lr = exe.run(
                        program=compiled_train_prog,
                        fetch_list=fetch_list,
                        return_numpy=True)
                    avg_loss += np.mean(np.array(loss))
                    global_step += 1

                    if global_step % args.log_steps == 0 and cfg.TRAINER_ID == 0:
                        avg_loss /= args.log_steps
                        speed = args.log_steps / timer.elapsed_time()
                        print((
                            "epoch={}/{} step={}/{} lr={:.5f} loss={:.4f} "
                            "step/sec={:.3f} | ETA {}"
                        ).format(epoch, cfg.SOLVER.NUM_EPOCHS, global_step,
                                 all_step, lr[0], avg_loss, speed,
                                 calculate_eta(all_step - global_step, speed)))
                        if args.use_tb:
                            log_writer.add_scalar('Train/loss', avg_loss,
                                                  global_step)
                            log_writer.add_scalar('Train/lr', lr[0],
                                                  global_step)
                            log_writer.add_scalar('Train/speed', speed,
                                                  global_step)
                        sys.stdout.flush()
                        avg_loss = 0.0
                        timer.restart()

            except fluid.core.EOFException:
                py_reader.reset()
                break
            except Exception as e:
                print(e)

        if epoch % cfg.TRAIN.SNAPSHOT_EPOCH == 0 and cfg.TRAINER_ID == 0:
            ckpt_dir = save_checkpoint(exe, train_prog, epoch)

            if args.do_eval:
                print("Evaluation start")
                _, mean_iou, _, mean_acc = evaluate(
                    cfg=cfg,
                    ckpt_dir=ckpt_dir,
                    use_gpu=args.use_gpu,
                    use_mpio=args.use_mpio)
                if args.use_tb:
                    log_writer.add_scalar('Evaluate/mean_iou', mean_iou,
                                          global_step)
                    log_writer.add_scalar('Evaluate/mean_acc', mean_acc,
                                          global_step)

            # Use TensorBoard to visualize results
            if args.use_tb and cfg.DATASET.VIS_FILE_LIST is not None:
                visualize(
                    cfg=cfg,
                    use_gpu=args.use_gpu,
                    vis_file_list=cfg.DATASET.VIS_FILE_LIST,
                    vis_dir="visual",
                    ckpt_dir=ckpt_dir,
                    log_writer=log_writer)

    # Save the final model
    if cfg.TRAINER_ID == 0:
        save_checkpoint(exe, train_prog, 'final')


def main(args):
    if args.cfg_file is not None:
        cfg.update_from_file(args.cfg_file)
    if args.opts:
        cfg.update_from_list(args.opts)
    cfg.TRAINER_ID = int(os.getenv("PADDLE_TRAINER_ID", 0))
    cfg.NUM_TRAINERS = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
    cfg.check_and_infer()
    print_info(pprint.pformat(cfg))
    train(cfg)


if __name__ == '__main__':
    args = parse_args()
    start = timeit.default_timer()
    main(args)
    end = timeit.default_timer()
    print("training time: {:.2f} h".format((end - start) / 3600))
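
# Example invocations (a sketch; assumes this script is saved as train.py and
# that the config path is hypothetical):
#   single GPU:
#     CUDA_VISIBLE_DEVICES=0 python train.py --cfg configs/your_config.yaml \
#         --use_gpu --do_eval
#   multi-GPU (the launcher sets PADDLE_TRAINER_ID / PADDLE_TRAINERS_NUM,
#   which main() reads into cfg.TRAINER_ID / cfg.NUM_TRAINERS):
#     python -m paddle.distributed.launch train.py \
#         --cfg configs/your_config.yaml --use_gpu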