Commit e1e186ad authored by chenguowei01

add parallel training

Parent 76fef60e
@@ -57,7 +57,7 @@ class OpticDiscSeg(Dataset):
         if mode == 'train':
             file_list = os.path.join(self.data_dir, 'train_list.txt')
         elif mode == 'eval':
-            file_list = os.paht.join(self.data_dir, 'val_list.txt')
+            file_list = os.path.join(self.data_dir, 'val_list.txt')
         else:
             file_list = os.path.join(self.data_dir, 'test_list.txt')
     else:
...
@@ -132,12 +132,19 @@ def train(model,
           save_interval_epochs=1,
           num_classes=None,
           num_workers=8):
+    ignore_index = model.ignore_index
+    nranks = ParallelEnv().nranks
+
+    load_pretrained_model(model, pretrained_model)
+
     if not os.path.isdir(save_dir):
         if os.path.exists(save_dir):
             os.remove(save_dir)
         os.makedirs(save_dir)
-    load_pretrained_model(model, pretrained_model)
+    if nranks > 1:
+        strategy = fluid.dygraph.prepare_context()
+        model_parallel = fluid.dygraph.DataParallel(model, strategy)

     batch_sampler = DistributedBatchSampler(
         train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
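For reference, the multi-card wiring added above is the stock fluid dygraph pattern: initialize inter-process communication with prepare_context(), then wrap the model in DataParallel. A minimal standalone sketch (the Linear layer is a placeholder model, not part of this repo):

```python
import paddle.fluid as fluid
from paddle.fluid.dygraph.parallel import ParallelEnv

# Minimal sketch of the dygraph data-parallel setup (Paddle 1.x fluid API).
# The Linear layer is a placeholder; the commit wraps its UNet the same way.
with fluid.dygraph.guard(fluid.CUDAPlace(ParallelEnv().dev_id)):
    model = fluid.dygraph.Linear(input_dim=16, output_dim=2)
    if ParallelEnv().nranks > 1:
        # prepare_context() sets up NCCL communication between the
        # trainer processes and returns the strategy DataParallel needs.
        strategy = fluid.dygraph.prepare_context()
        model = fluid.dygraph.DataParallel(model, strategy)
```

Note the diff keeps both `model` and the wrapped `model_parallel` alive so the single-card code path stays untouched.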
@@ -155,32 +162,39 @@ def train(model,
         for step, data in enumerate(loader):
             images = data[0]
             labels = data[1].astype('int64')
-            loss = model(images, labels, mode='train')
-            loss.backward()
+            if nranks > 1:
+                loss = model_parallel(images, labels, mode='train')
+                loss = model_parallel.scale_loss(loss)
+                loss.backward()
+                model_parallel.apply_collective_grads()
+            else:
+                loss = model(images, labels, mode='train')
+                loss.backward()
             optimizer.minimize(loss)
+            model_parallel.clear_gradients()
             logging.info("[TRAIN] Epoch={}/{}, Step={}/{}, loss={}".format(
                 epoch + 1, num_epochs, step + 1, num_steps_each_epoch,
                 loss.numpy()))
-        if (
-                epoch + 1
-        ) % save_interval_epochs == 0 or num_steps_each_epoch == num_epochs - 1:
+        if ((epoch + 1) % save_interval_epochs == 0
+                or num_steps_each_epoch == num_epochs - 1
+            ) and ParallelEnv().local_rank == 0:
             current_save_dir = os.path.join(save_dir,
                                             "epoch_{}".format(epoch + 1))
             if not os.path.isdir(current_save_dir):
                 os.makedirs(current_save_dir)
-            fluid.save_dygraph(model.state_dict(),
+            fluid.save_dygraph(model_parallel.state_dict(),
                                os.path.join(current_save_dir, 'model'))
             if eval_dataset is not None:
-                model.eval()
                 evaluate(
                     model,
                     eval_dataset,
+                    places=places,
                     model_dir=current_save_dir,
                     num_classes=num_classes,
                     batch_size=batch_size,
-                    ignore_index=model.ignore_index,
+                    ignore_index=ignore_index,
                     epoch_id=epoch + 1)
                 model.train()
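The loop above is the standard three-step dance for dygraph data parallelism: scale the loss by 1/nranks, backpropagate, then all-reduce gradients before the optimizer step. One caveat: as committed, `model_parallel.clear_gradients()` also runs when `nranks == 1`, where `model_parallel` is never defined. A minimal sketch of one training step with that call guarded (names and the `mode='train'` call signature follow the diff):

```python
# Sketch of one data-parallel training step in fluid dygraph, assuming
# `model`, `model_parallel`, and `optimizer` are set up as in the diff.
# clear_gradients() is guarded so the single-card path never touches the
# (undefined) parallel wrapper.
def train_step(model, model_parallel, optimizer, images, labels, nranks):
    if nranks > 1:
        loss = model_parallel(images, labels, mode='train')
        # Average the loss across ranks so gradient magnitudes match
        # the single-card case after the collective all-reduce.
        loss = model_parallel.scale_loss(loss)
        loss.backward()
        # All-reduce gradients across cards before the optimizer step.
        model_parallel.apply_collective_grads()
        optimizer.minimize(loss)
        model_parallel.clear_gradients()
    else:
        loss = model(images, labels, mode='train')
        loss.backward()
        optimizer.minimize(loss)
        model.clear_gradients()
    return loss
```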
@@ -188,7 +202,7 @@ def train(model,

 def main(args):
     env_info = get_environ_info()
     places = fluid.CUDAPlace(ParallelEnv().dev_id) \
-        if env_info['place'] == 'gpu' and fluid.is_compiled_with_cuda() \
+        if env_info['place'] == 'cuda' and fluid.is_compiled_with_cuda() \
         else fluid.CPUPlace()

     with fluid.dygraph.guard(places):
@@ -200,18 +214,13 @@ def main(args):
         ])
         train_dataset = OpticDiscSeg(transforms=train_transforms, mode='train')

+        eval_dataset = None
         if args.val_list is not None:
             eval_transforms = T.Compose(
                 [T.Resize(args.input_size),
                  T.Normalize()])
-            eval_dataset = Dataset(
-                data_dir=args.data_dir,
-                file_list=args.val_list,
-                transforms=eval_transforms,
-                num_workers='auto',
-                buffer_size=100,
-                parallel_method='thread',
-                shuffle=False)
+            eval_dataset = OpticDiscSeg(
+                transforms=train_transforms, mode='eval')

         if args.model_name == 'UNet':
             model = models.UNet(num_classes=args.num_classes, ignore_index=255)
@@ -244,5 +253,4 @@ def main(args):

 if __name__ == '__main__':
     args = parse_args()
-    print(args)
     main(args)
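Scripts written this way are normally started through Paddle's distributed launcher, e.g. `python -m paddle.distributed.launch --selected_gpus=0,1 train.py ...` (the script name is illustrative), which spawns one process per card and sets the environment variables that ParallelEnv() reads. A small sketch of the resulting per-process device binding used by main():

```python
import paddle.fluid as fluid
from paddle.fluid.dygraph.parallel import ParallelEnv

# Each launched process binds to the GPU the launcher assigned it
# (ParallelEnv().dev_id), falling back to CPU when CUDA is unavailable.
place = fluid.CUDAPlace(ParallelEnv().dev_id) \
    if fluid.is_compiled_with_cuda() else fluid.CPUPlace()

with fluid.dygraph.guard(place):
    print("rank {} of {} on {}".format(
        ParallelEnv().local_rank, ParallelEnv().nranks, place))
```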
@@ -16,18 +16,22 @@ import time
 import os
 import sys

+from paddle.fluid.dygraph.parallel import ParallelEnv
+
 levels = {0: 'ERROR', 1: 'WARNING', 2: 'INFO', 3: 'DEBUG'}
 log_level = 2


 def log(level=2, message=""):
-    current_time = time.time()
-    time_array = time.localtime(current_time)
-    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
-    if log_level >= level:
-        print("{} [{}]\t{}".format(current_time, levels[level],
-                                   message).encode("utf-8").decode("latin1"))
-        sys.stdout.flush()
+    if ParallelEnv().local_rank == 0:
+        current_time = time.time()
+        time_array = time.localtime(current_time)
+        current_time = time.strftime("%Y-%m-%d %H:%M:%S", time_array)
+        if log_level >= level:
+            print(
+                "{} [{}]\t{}".format(current_time, levels[level],
+                                     message).encode("utf-8").decode("latin1"))
+            sys.stdout.flush()


 def debug(message=""):
...
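The guard matters because under the launcher every rank executes the same script; without it each log line would be printed once per card. Stripped to its essence:

```python
from paddle.fluid.dygraph.parallel import ParallelEnv

# Only the first local process emits output; the other ranks stay silent.
def info(message):
    if ParallelEnv().local_rank == 0:
        print(message)
```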
@@ -19,6 +19,7 @@ import math
 from paddle.fluid.dygraph.base import to_variable
 import numpy as np
 import paddle.fluid as fluid
+from paddle.fluid.io import DataLoader
 from datasets import Dataset
 import transforms as T

@@ -26,57 +27,66 @@ import models
 import utils.logging as logging
 from utils import get_environ_info
 from utils import ConfusionMatrix
+from utils import DistributedBatchSampler

 def parse_args():
     parser = argparse.ArgumentParser(description='Model training')

     # params of model
-    parser.add_argument('--model_name',
-                        dest='model_name',
-                        help="Model type for traing, which is one of ('UNet')",
-                        type=str,
-                        default='UNet')
+    parser.add_argument(
+        '--model_name',
+        dest='model_name',
+        help="Model type for training, which is one of ('UNet')",
+        type=str,
+        default='UNet')

     # params of dataset
-    parser.add_argument('--data_dir',
-                        dest='data_dir',
-                        help='The root directory of dataset',
-                        type=str)
-    parser.add_argument('--val_list',
-                        dest='val_list',
-                        help='Val list file of dataset',
-                        type=str,
-                        default=None)
-    parser.add_argument('--num_classes',
-                        dest='num_classes',
-                        help='Number of classes',
-                        type=int,
-                        default=2)
+    parser.add_argument(
+        '--data_dir',
+        dest='data_dir',
+        help='The root directory of dataset',
+        type=str)
+    parser.add_argument(
+        '--val_list',
+        dest='val_list',
+        help='Val list file of dataset',
+        type=str,
+        default=None)
+    parser.add_argument(
+        '--num_classes',
+        dest='num_classes',
+        help='Number of classes',
+        type=int,
+        default=2)

     # params of evaluate
-    parser.add_argument("--input_size",
-                        dest="input_size",
-                        help="The image size for net inputs.",
-                        nargs=2,
-                        default=[512, 512],
-                        type=int)
-    parser.add_argument('--batch_size',
-                        dest='batch_size',
-                        help='Mini batch size',
-                        type=int,
-                        default=2)
-    parser.add_argument('--model_dir',
-                        dest='model_dir',
-                        help='The path of model for evaluation',
-                        type=str,
-                        default=None)
+    parser.add_argument(
+        "--input_size",
+        dest="input_size",
+        help="The image size for net inputs.",
+        nargs=2,
+        default=[512, 512],
+        type=int)
+    parser.add_argument(
+        '--batch_size',
+        dest='batch_size',
+        help='Mini batch size',
+        type=int,
+        default=2)
+    parser.add_argument(
+        '--model_dir',
+        dest='model_dir',
+        help='The path of model for evaluation',
+        type=str,
+        default=None)

     return parser.parse_args()
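As a quick illustration of these flags, parse_args() can be exercised by overriding sys.argv; the values below are made up:

```python
import sys

# Illustrative invocation only: flag names come from parse_args() above,
# the script name, paths, and values are invented for the example.
sys.argv = [
    'eval.py', '--model_name', 'UNet', '--data_dir', './optic_disc_seg',
    '--val_list', 'val_list.txt', '--model_dir', 'output/epoch_10',
    '--input_size', '512', '512', '--batch_size', '2'
]
args = parse_args()
print(args.input_size)  # [512, 512] -- nargs=2 collects two ints
```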
 def evaluate(model,
              eval_dataset=None,
+             places=None,
              model_dir=None,
              num_classes=None,
              batch_size=2,
@@ -87,18 +97,23 @@ def evaluate(model,
     model.set_dict(para_state_dict)
     model.eval()

-    data_generator = eval_dataset.generator(batch_size=batch_size,
-                                            drop_last=True)
+    batch_sampler = DistributedBatchSampler(
+        eval_dataset, batch_size=batch_size, shuffle=True, drop_last=False)
+    loader = DataLoader(
+        eval_dataset,
+        batch_sampler=batch_sampler,
+        places=places,
+        return_list=True,
+    )

     total_steps = math.ceil(eval_dataset.num_samples * 1.0 / batch_size)
     conf_mat = ConfusionMatrix(num_classes, streaming=True)
     logging.info(
         "Start evaluating (total_samples={}, total_steps={})...".format(
             eval_dataset.num_samples, total_steps))
-    for step, data in enumerate(data_generator()):
-        images = np.array([d[0] for d in data])
-        labels = np.array([d[2] for d in data]).astype('int64')
-        images = to_variable(images)
+    for step, data in enumerate(loader):
+        images = data[0]
+        labels = data[1].astype('int64')
         pred, _ = model(images, labels, mode='eval')

         pred = pred.numpy()
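A minimal sketch of the sampler/loader pairing introduced here, with a toy dataset standing in for eval_dataset and the project's own utils.DistributedBatchSampler assumed in scope: the sampler hands each rank a disjoint shard of indices, so every process evaluates only its slice of the data.

```python
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.io import DataLoader, Dataset

# Toy stand-in for eval_dataset; real samples are (image, label) pairs.
class ToyDataset(Dataset):
    def __len__(self):
        return 16

    def __getitem__(self, idx):
        image = np.zeros((3, 32, 32), dtype='float32')
        label = np.full((32, 32), idx % 2, dtype='int64')
        return image, label

with fluid.dygraph.guard(fluid.CPUPlace()):
    dataset = ToyDataset()
    # utils.DistributedBatchSampler shards indices across ranks;
    # single-process runs simply see all 16 samples.
    batch_sampler = DistributedBatchSampler(
        dataset, batch_size=4, shuffle=False, drop_last=False)
    loader = DataLoader(
        dataset,
        batch_sampler=batch_sampler,
        places=fluid.CPUPlace(),
        return_list=True)  # batches come back as [images, labels] lists
    for data in loader:
        images, labels = data[0], data[1]
```

For evaluation, shuffle=False would be the more usual choice; the commit passes shuffle=True, which does not change the metrics since the confusion matrix accumulates over all batches.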
@@ -120,31 +135,33 @@ def evaluate(model,

 def main(args):
-    env_info = get_environ_info()
-    if env_info['place'] == 'cpu':
-        places = fluid.CPUPlace()
-    else:
-        places = fluid.CUDAPlace(0)
     with fluid.dygraph.guard(places):
         eval_transforms = T.Compose([T.Resize(args.input_size), T.Normalize()])
-        eval_dataset = Dataset(data_dir=args.data_dir,
-                               file_list=args.val_list,
-                               transforms=eval_transforms,
-                               num_workers='auto',
-                               buffer_size=100,
-                               parallel_method='thread',
-                               shuffle=False)
+        eval_dataset = Dataset(
+            data_dir=args.data_dir,
+            file_list=args.val_list,
+            transforms=eval_transforms,
+            num_workers='auto',
+            buffer_size=100,
+            parallel_method='thread',
+            shuffle=False)

         if args.model_name == 'UNet':
             model = models.UNet(num_classes=args.num_classes)

-        evaluate(model,
-                 eval_dataset,
-                 model_dir=args.model_dir,
-                 num_classes=args.num_classes,
-                 batch_size=args.batch_size)
+        evaluate(
+            model,
+            eval_dataset,
+            model_dir=args.model_dir,
+            num_classes=args.num_classes,
+            batch_size=args.batch_size)


 if __name__ == '__main__':
     args = parse_args()
+    env_info = get_environ_info()
+    if env_info['place'] == 'cpu':
+        places = fluid.CPUPlace()
+    else:
+        places = fluid.CUDAPlace(0)
     main(args)