From e1e186ad8869727fc1ee559ef90ea5f6cd543f05 Mon Sep 17 00:00:00 2001
From: chenguowei01
Date: Tue, 16 Jun 2020 19:47:59 +0800
Subject: [PATCH] add parallel training

---
 dygraph/datasets/optic_disc_seg.py |   2 +-
 dygraph/train.py                   |  46 +++++-----
 dygraph/utils/logging.py           |  18 ++--
 dygraph/val.py                     | 133 ++++++++++++++++-------
 4 files changed, 114 insertions(+), 85 deletions(-)

diff --git a/dygraph/datasets/optic_disc_seg.py b/dygraph/datasets/optic_disc_seg.py
index 0730bfd5..9e278f1b 100644
--- a/dygraph/datasets/optic_disc_seg.py
+++ b/dygraph/datasets/optic_disc_seg.py
@@ -57,7 +57,7 @@ class OpticDiscSeg(Dataset):
             if mode == 'train':
                 file_list = os.path.join(self.data_dir, 'train_list.txt')
             elif mode == 'eval':
-                file_list = os.paht.join(self.data_dir, 'val_list.txt')
+                file_list = os.path.join(self.data_dir, 'val_list.txt')
             else:
                 file_list = os.path.join(self.data_dir, 'test_list.txt')
         else:
diff --git a/dygraph/train.py b/dygraph/train.py
index b21b80dd..abc55179 100644
--- a/dygraph/train.py
+++ b/dygraph/train.py
@@ -132,12 +132,19 @@ def train(model,
           save_interval_epochs=1,
           num_classes=None,
           num_workers=8):
+    ignore_index = model.ignore_index
+    nranks = ParallelEnv().nranks
+
+    load_pretrained_model(model, pretrained_model)
+
     if not os.path.isdir(save_dir):
         if os.path.exists(save_dir):
             os.remove(save_dir)
         os.makedirs(save_dir)
 
-    load_pretrained_model(model, pretrained_model)
+    if nranks > 1:
+        strategy = fluid.dygraph.prepare_context()
+        model_parallel = fluid.dygraph.DataParallel(model, strategy)
 
     batch_sampler = DistributedBatchSampler(
         train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
@@ -155,32 +162,39 @@ def train(model,
         for step, data in enumerate(loader):
             images = data[0]
             labels = data[1].astype('int64')
-            loss = model(images, labels, mode='train')
-            loss.backward()
+            if nranks > 1:
+                loss = model_parallel(images, labels, mode='train')
+                loss = model_parallel.scale_loss(loss)
+                loss.backward()
+                model_parallel.apply_collective_grads()
+            else:
+                loss = model(images, labels, mode='train')
+                loss.backward()
             optimizer.minimize(loss)
+            model_parallel.clear_gradients()
             logging.info("[TRAIN] Epoch={}/{}, Step={}/{}, loss={}".format(
                 epoch + 1, num_epochs, step + 1, num_steps_each_epoch,
                 loss.numpy()))
 
-        if (
-                epoch + 1
-        ) % save_interval_epochs == 0 or num_steps_each_epoch == num_epochs - 1:
+        if ((epoch + 1) % save_interval_epochs == 0
+                or num_steps_each_epoch == num_epochs - 1
+            ) and ParallelEnv().local_rank == 0:
             current_save_dir = os.path.join(save_dir,
                                             "epoch_{}".format(epoch + 1))
             if not os.path.isdir(current_save_dir):
                 os.makedirs(current_save_dir)
-            fluid.save_dygraph(model.state_dict(),
+            fluid.save_dygraph(model_parallel.state_dict(),
                                os.path.join(current_save_dir, 'model'))
 
             if eval_dataset is not None:
-                model.eval()
                 evaluate(
                     model,
                     eval_dataset,
+                    places=places,
                     model_dir=current_save_dir,
                     num_classes=num_classes,
                     batch_size=batch_size,
-                    ignore_index=model.ignore_index,
+                    ignore_index=ignore_index,
                     epoch_id=epoch + 1)
                 model.train()
@@ -188,7 +202,7 @@ def train(model,
 def main(args):
     env_info = get_environ_info()
     places = fluid.CUDAPlace(ParallelEnv().dev_id) \
-        if env_info['place'] == 'gpu' and fluid.is_compiled_with_cuda() \
+        if env_info['place'] == 'cuda' and fluid.is_compiled_with_cuda() \
         else fluid.CPUPlace()
 
     with fluid.dygraph.guard(places):
@@ -200,18 +214,13 @@ def main(args):
         ])
         train_dataset = OpticDiscSeg(transforms=train_transforms, mode='train')
 
+        eval_dataset = None
         if args.val_list is not None:
             eval_transforms = T.Compose(
                 [T.Resize(args.input_size), T.Normalize()])
-            eval_dataset = Dataset(
-                data_dir=args.data_dir,
-                file_list=args.val_list,
-                transforms=eval_transforms,
-                num_workers='auto',
-                buffer_size=100,
-                parallel_method='thread',
-                shuffle=False)
+            eval_dataset = OpticDiscSeg(
+                transforms=train_transforms, mode='eval')
 
         if args.model_name == 'UNet':
             model = models.UNet(num_classes=args.num_classes, ignore_index=255)
@@ -244,5 +253,4 @@ def main(args):
 
 if __name__ == '__main__':
     args = parse_args()
-    print(args)
     main(args)
diff --git a/dygraph/utils/logging.py b/dygraph/utils/logging.py
index 5b38b1b9..015948f6 100644
--- a/dygraph/utils/logging.py
+++ b/dygraph/utils/logging.py
@@ -16,18 +16,22 @@ import time
 import os
 import sys
 
+from paddle.fluid.dygraph.parallel import ParallelEnv
+
 levels = {0: 'ERROR', 1: 'WARNING', 2: 'INFO', 3: 'DEBUG'}
 log_level = 2
 
 
 def log(level=2, message=""):
-    current_time = time.time()
-    time_array = time.localtime(current_time)
-    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
-    if log_level >= level:
-        print("{} [{}]\t{}".format(current_time, levels[level],
-                                   message).encode("utf-8").decode("latin1"))
-        sys.stdout.flush()
+    if ParallelEnv().local_rank == 0:
+        current_time = time.time()
+        time_array = time.localtime(current_time)
+        current_time = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
+        if log_level >= level:
+            print(
+                "{} [{}]\t{}".format(current_time, levels[level],
+                                     message).encode("utf-8").decode("latin1"))
+            sys.stdout.flush()
 
 
 def debug(message=""):
diff --git a/dygraph/val.py b/dygraph/val.py
index 78ae845c..1076af4d 100644
--- a/dygraph/val.py
+++ b/dygraph/val.py
@@ -19,6 +19,7 @@ import math
 from paddle.fluid.dygraph.base import to_variable
 import numpy as np
 import paddle.fluid as fluid
+from paddle.fluid.io import DataLoader
 
 from datasets import Dataset
 import transforms as T
@@ -26,57 +27,66 @@ import models
 import utils.logging as logging
 from utils import get_environ_info
 from utils import ConfusionMatrix
+from utils import DistributedBatchSampler
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description='Model training')
     # params of model
-    parser.add_argument('--model_name',
-                        dest='model_name',
-                        help="Model type for traing, which is one of ('UNet')",
-                        type=str,
-                        default='UNet')
+    parser.add_argument(
+        '--model_name',
+        dest='model_name',
+        help="Model type for traing, which is one of ('UNet')",
+        type=str,
+        default='UNet')
 
     # params of dataset
-    parser.add_argument('--data_dir',
-                        dest='data_dir',
-                        help='The root directory of dataset',
-                        type=str)
-    parser.add_argument('--val_list',
-                        dest='val_list',
-                        help='Val list file of dataset',
-                        type=str,
-                        default=None)
-    parser.add_argument('--num_classes',
-                        dest='num_classes',
-                        help='Number of classes',
-                        type=int,
-                        default=2)
+    parser.add_argument(
+        '--data_dir',
+        dest='data_dir',
+        help='The root directory of dataset',
+        type=str)
+    parser.add_argument(
+        '--val_list',
+        dest='val_list',
+        help='Val list file of dataset',
+        type=str,
+        default=None)
+    parser.add_argument(
+        '--num_classes',
+        dest='num_classes',
+        help='Number of classes',
+        type=int,
+        default=2)
 
     # params of evaluate
-    parser.add_argument("--input_size",
-                        dest="input_size",
-                        help="The image size for net inputs.",
-                        nargs=2,
-                        default=[512, 512],
-                        type=int)
-    parser.add_argument('--batch_size',
-                        dest='batch_size',
-                        help='Mini batch size',
-                        type=int,
-                        default=2)
-    parser.add_argument('--model_dir',
-                        dest='model_dir',
-                        help='The path of model for evaluation',
-                        type=str,
-                        default=None)
+    parser.add_argument(
+        "--input_size",
+        dest="input_size",
+        help="The image size for net inputs.",
+        nargs=2,
+        default=[512, 512],
+        type=int)
+    parser.add_argument(
+        '--batch_size',
+        dest='batch_size',
+        help='Mini batch size',
+        type=int,
+        default=2)
+    parser.add_argument(
+        '--model_dir',
+        dest='model_dir',
+        help='The path of model for evaluation',
+        type=str,
+        default=None)
 
     return parser.parse_args()
 
 
 def evaluate(model,
              eval_dataset=None,
+             places=None,
             model_dir=None,
             num_classes=None,
             batch_size=2,
@@ -87,18 +97,23 @@ def evaluate(model,
         model.set_dict(para_state_dict)
     model.eval()
 
-    data_generator = eval_dataset.generator(batch_size=batch_size,
-                                            drop_last=True)
+    batch_sampler = DistributedBatchSampler(
+        eval_dataset, batch_size=batch_size, shuffle=True, drop_last=False)
+    loader = DataLoader(
+        eval_dataset,
+        batch_sampler=batch_sampler,
+        places=places,
+        return_list=True,
+    )
     total_steps = math.ceil(eval_dataset.num_samples * 1.0 / batch_size)
     conf_mat = ConfusionMatrix(num_classes, streaming=True)
     logging.info(
         "Start to evaluating(total_samples={}, total_steps={})...".format(
             eval_dataset.num_samples, total_steps))
 
-    for step, data in enumerate(data_generator()):
-        images = np.array([d[0] for d in data])
-        labels = np.array([d[2] for d in data]).astype('int64')
-        images = to_variable(images)
+    for step, data in enumerate(loader):
+        images = data[0]
+        labels = data[1].astype('int64')
         pred, _ = model(images, labels, mode='eval')
 
         pred = pred.numpy()
@@ -120,31 +135,33 @@ def evaluate(model,
 
 
 def main(args):
+    env_info = get_environ_info()
+    if env_info['place'] == 'cpu':
+        places = fluid.CPUPlace()
+    else:
+        places = fluid.CUDAPlace(0)
     with fluid.dygraph.guard(places):
         eval_transforms = T.Compose(
             [T.Resize(args.input_size), T.Normalize()])
-        eval_dataset = Dataset(data_dir=args.data_dir,
-                               file_list=args.val_list,
-                               transforms=eval_transforms,
-                               num_workers='auto',
-                               buffer_size=100,
-                               parallel_method='thread',
-                               shuffle=False)
+        eval_dataset = Dataset(
+            data_dir=args.data_dir,
+            file_list=args.val_list,
+            transforms=eval_transforms,
+            num_workers='auto',
+            buffer_size=100,
+            parallel_method='thread',
+            shuffle=False)
 
         if args.model_name == 'UNet':
             model = models.UNet(num_classes=args.num_classes)
-        evaluate(model,
-                 eval_dataset,
-                 model_dir=args.model_dir,
-                 num_classes=args.num_classes,
-                 batch_size=args.batch_size)
+        evaluate(
+            model,
+            eval_dataset,
+            model_dir=args.model_dir,
+            num_classes=args.num_classes,
+            batch_size=args.batch_size)
 
 
 if __name__ == '__main__':
     args = parse_args()
-    env_info = get_environ_info()
-    if env_info['place'] == 'cpu':
-        places = fluid.CPUPlace()
-    else:
-        places = fluid.CUDAPlace(0)
     main(args)
--
GitLab
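
Note: below is a minimal standalone sketch of the dygraph data-parallel pattern
that the train.py changes above adopt (prepare_context plus DataParallel, loss
scaling and gradient all-reduce before the optimizer step, rank-0-only
checkpointing). It is not part of the commit; the Linear model, random data,
and hyper-parameters are invented stand-ins for models.UNet and the
OpticDiscSeg loader. With Paddle 1.8-era fluid it would typically be launched
with python -m paddle.distributed.launch so that nranks > 1.

    # parallel_demo.py: toy data-parallel training loop (assumed names/data).
    import numpy as np
    import paddle.fluid as fluid
    from paddle.fluid.dygraph.parallel import ParallelEnv


    def main():
        # Each launched process selects its own GPU via ParallelEnv().dev_id.
        place = fluid.CUDAPlace(ParallelEnv().dev_id) \
            if fluid.is_compiled_with_cuda() else fluid.CPUPlace()
        with fluid.dygraph.guard(place):
            model = fluid.dygraph.Linear(16, 1)  # stand-in for models.UNet
            optimizer = fluid.optimizer.AdamOptimizer(
                learning_rate=0.01, parameter_list=model.parameters())

            nranks = ParallelEnv().nranks
            if nranks > 1:
                # Same two steps the patch adds to train().
                strategy = fluid.dygraph.prepare_context()
                model = fluid.dygraph.DataParallel(model, strategy)

            for step in range(10):
                x = fluid.dygraph.to_variable(
                    np.random.rand(4, 16).astype('float32'))
                y = fluid.dygraph.to_variable(
                    np.random.rand(4, 1).astype('float32'))
                loss = fluid.layers.mean(
                    fluid.layers.square_error_cost(model(x), y))

                if nranks > 1:
                    # Scale the loss and all-reduce gradients across ranks
                    # before the optimizer step, as in the patched train().
                    loss = model.scale_loss(loss)
                    loss.backward()
                    model.apply_collective_grads()
                else:
                    loss.backward()
                optimizer.minimize(loss)
                model.clear_gradients()

            # Only rank 0 writes the checkpoint, mirroring the patch.
            if ParallelEnv().local_rank == 0:
                fluid.save_dygraph(model.state_dict(), 'parallel_demo')


    if __name__ == '__main__':
        main()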