diff --git a/distributed.py b/distributed.py
index e8a424584e4ce45d5dbe448dcb2f03eb2eaaa74e..c83f8093020b3ac4397c180e57577f02e4963447 100644
--- a/distributed.py
+++ b/distributed.py
@@ -36,6 +36,7 @@ from paddle.fluid.layers.collective import _c_allreduce, _c_allgather, _c_broadcast, \
     _c_sync_comm_stream, _c_sync_calc_stream
 from paddle.fluid.io import BatchSampler, DataLoader
 
+__parallel_context_init = False
 
 class DistributedBatchSampler(BatchSampler):
     """Sampler that restricts data loading to a subset of the dataset.
@@ -109,53 +110,6 @@ class DistributedBatchSampler(BatchSampler):
 
         return num_samples // self.batch_size
 
-
-@contextlib.contextmanager
-def null_guard():
-    yield
-
-
-def to_numpy(var):
-    assert isinstance(var, (Variable, fluid.core.VarBase)), "not a variable"
-    if isinstance(var, fluid.core.VarBase):
-        return var.numpy()
-    t = global_scope().find_var(var.name).get_tensor()
-    return np.array(t)
-
-
-def all_gather(input):
-    place = fluid.CUDAPlace(Env().dev_id) \
-        if Env().nranks > 1 else fluid.CUDAPlace(0)
-    guard = null_guard() if fluid.in_dygraph_mode() else fluid.dygraph.guard(place)
-
-    with guard:
-        input = to_variable(input)
-        output = _all_gather(input, Env().nranks)
-        return to_numpy(output)
-
-
-def _all_reduce(x, out=None, reduce_type="sum", sync_mode=True):
-    out = _c_allreduce(x, out, reduce_type)
-    if sync_mode:
-        return _c_sync_calc_stream(out)
-
-
-def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
-    return _c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)
-
-
-def _bradcast(x, root=0, ring_id=0, use_calc_stream=True):
-    return _c_broadcast(x, root, ring_id, use_calc_stream)
-
-
-def _sync_comm_stream(x, ring_id):
-    return _c_sync_comm_stream(x, ring_id)
-
-
-def barrier():
-    pass
-
-
 def get_local_rank():
     return Env().local_rank
 
@@ -224,24 +178,45 @@ def init_communicator(program, rank, nranks, wait_port,
         })
 
 
-def prepare_context(place):
-
+def prepare_distributed_context(place=None):
+    if place is None:
+        place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 \
+            else fluid.CUDAPlace(0)
+
     strategy = ParallelStrategy()
     strategy.nranks = Env().nranks
     strategy.local_rank = Env().local_rank
     strategy.trainer_endpoints = Env().trainer_endpoints
     strategy.current_endpoint = Env().current_endpoint
+
     if strategy.nranks < 2:
         return
 
-    if isinstance(place, core.CUDAPlace):
-        communicator_prog = framework.Program()
-        init_communicator(communicator_prog, strategy.local_rank, strategy.nranks, True,
-                          strategy.current_endpoint, strategy.trainer_endpoints)
-        exe = fluid.Executor(place)
-        exe.run(communicator_prog)
+    global __parallel_context_init
+
+    if not __parallel_context_init and isinstance(place, core.CUDAPlace):
+        def _init_context():
+            communicator_prog = framework.Program()
+            init_communicator(communicator_prog, strategy.local_rank, strategy.nranks,
+                              True, strategy.current_endpoint, strategy.trainer_endpoints)
+            exe = fluid.Executor(place)
+            exe.run(communicator_prog)
+
+        # init_communicator builds a static Program and runs it with an Executor,
+        # so temporarily leave dygraph mode if it is enabled
+        if fluid.in_dygraph_mode():
+            fluid.disable_dygraph()
+            _init_context()
+            fluid.enable_dygraph(place)
+        else:
+            _init_context()
+
     else:
         assert ("Only support CUDAPlace for now.")
+
+    __parallel_context_init = True
     return strategy
diff --git a/mnist.py b/mnist.py
index 1b392c0a199db31b5d4d226bcd6fc3f505dfea8e..7beac484dc1ff8bbc015d5ac56a8199e15e25ec5 100644
--- a/mnist.py
+++ b/mnist.py
@@ -21,15 +21,14 @@ import os
 
 import numpy as np
 
-import paddle
 from paddle import fluid
 from paddle.fluid.optimizer import Momentum
 from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
+from paddle.fluid.io import MNIST as MnistDataset
 
 from model import Model, CrossEntropy, Input
 from metrics import Accuracy
-from distributed import prepare_context, Env, get_nranks, DistributedBatchSampler
-from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset
+
 
 class SimpleImgConvPool(fluid.dygraph.Layer):
     def __init__(self,
@@ -106,71 +105,28 @@ class MNIST(Model):
         return x
 
 
-class CustromMnistDataset(MnistDataset):
-    def __init__(self,
-                 image_filename=None,
-                 label_filename=None,
-                 mode='train',
-                 download=True):
-        super(CustromMnistDataset, self).__init__(image_filename,
-                                                  label_filename, mode, download)
-
-
-    def __getitem__(self, idx):
-        return self.images[idx], [self.labels[idx]]
-
-
 def main():
-    @contextlib.contextmanager
-    def null_guard():
-        yield
-
     place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \
         if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0)
-    guard = fluid.dygraph.guard(place) if FLAGS.dynamic else null_guard()
-    if fluid.dygraph.parallel.Env().nranks > 1:
-        prepare_context(place)
-
-
-    if not os.path.exists('mnist_checkpoints'):
-        os.mkdir('mnist_checkpoints')
-
-    with guard:
-        train_dataset = CustromMnistDataset(mode='train')
-        val_dataset = CustromMnistDataset(mode='test')
-
-        inputs = [Input([None, 784], 'float32', name='image')]
-        labels = [Input([None, 1], 'int64', name='label')]
-
-        if fluid.in_dygraph_mode():
-            feed_list = None
-        else:
-            feed_list = [x.forward() for x in inputs + labels]
-
-        if get_nranks() > 1:
-            train_sampler = DistributedBatchSampler(train_dataset, batch_size=FLAGS.batch_size, shuffle=True)
-            train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
-                                      feed_list=feed_list, num_workers=4, return_list=True)
-            val_sampler = DistributedBatchSampler(val_dataset, batch_size=FLAGS.batch_size)
-            val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
-                                    feed_list=feed_list, num_workers=4, return_list=True)
-        else:
-            train_loader = DataLoader(train_dataset, batch_size=FLAGS.batch_size, places=place,
-                                      feed_list=feed_list, num_workers=4, return_list=True)
-            val_loader = DataLoader(val_dataset, batch_size=FLAGS.batch_size, places=place,
-                                    feed_list=feed_list, num_workers=4, return_list=True)
-
-        model = MNIST()
-        optim = Momentum(
-            learning_rate=FLAGS.lr,
-            momentum=.9,
-            parameter_list=model.parameters())
-
-        model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
-        if FLAGS.resume is not None:
-            model.load(FLAGS.resume)
-
-        model.fit(train_loader, val_loader, epochs=FLAGS.epoch)
+    fluid.enable_dygraph(place) if FLAGS.dynamic else None
+
+    train_dataset = MnistDataset(mode='train')
+    val_dataset = MnistDataset(mode='test')
+
+    inputs = [Input([None, 784], 'float32', name='image')]
+    labels = [Input([None, 1], 'int64', name='label')]
+
+    model = MNIST()
+    optim = Momentum(
+        learning_rate=FLAGS.lr,
+        momentum=.9,
+        parameter_list=model.parameters())
+
+    model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)
+    if FLAGS.resume is not None:
+        model.load(FLAGS.resume)
+
+    model.fit(train_dataset, val_dataset, epochs=FLAGS.epoch, batch_size=FLAGS.batch_size)
 
 
 if __name__ == '__main__':
@@ -178,7 +134,7 @@ if __name__ == '__main__':
     parser.add_argument(
         "-d", "--dynamic", action='store_true', help="enable dygraph mode")
     parser.add_argument(
-        "-e", "--epoch", default=100, type=int, help="number of epoch")
+        "-e", "--epoch", default=2, type=int, help="number of epoch")
     parser.add_argument(
         '--lr', '--learning-rate',
diff --git a/model.py b/model.py
index 3d023bbc29f056618873cc5eef0a4dea750b9f4b..01f452fd4d213999238ac90d40519102196f160b 100644
--- a/model.py
+++ b/model.py
@@ -33,7 +33,8 @@ from paddle.fluid.dygraph.base import to_variable
 from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
 import paddle.fluid.incubate.fleet.base.role_maker as role_maker
 import distributed
-
+from distributed import DistributedBatchSampler
+from paddle.fluid.io import DataLoader
 from metrics import Metric
 from callbacks import config_callbacks
@@ -348,6 +349,7 @@ class StaticGraphAdapter(object):
         for metric, state in zip(self.model._metrics, metric_states):
             # cut off padding size
             if self.mode != 'train' and self.model._test_dataloader is not None \
+                    and isinstance(self.model._test_dataloader, DataLoader) \
                     and self._nranks > 1:
                 total_size = len(self.model._test_dataloader.dataset)
                 # TODO: fixme if have better way to get batch size
@@ -417,7 +419,8 @@ class StaticGraphAdapter(object):
                 strategy=dist_strategy)
 
         self.model._optimizer.minimize(self._loss_endpoint)
-        if self._nranks > 1 and mode != 'train' and self.model._test_dataloader is not None:
+        if self._nranks > 1 and mode != 'train' and self.model._test_dataloader is not None \
+                and isinstance(self.model._test_dataloader, DataLoader):
             outputs = [distributed._all_gather(o, self._nranks) for o in outputs]
             if mode != 'test':
                 labels = [distributed._all_gather(l, self._nranks) for l in labels]
@@ -457,7 +460,7 @@ class StaticGraphAdapter(object):
         # therefore startup program only needs to run once
         if self._executor is None:
             if self._nranks > 1 and device.lower() == 'gpu':
-                gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
+                gpu_id = int(distributed.Env().dev_id)
                 place = fluid.CUDAPlace(gpu_id) if device.lower() == 'gpu' else fluid.CPUPlace()
             else:
                 place = places[0]
@@ -529,6 +532,7 @@ class DynamicGraphAdapter(object):
         losses = self.model._loss_function(outputs, labels)
         final_loss = fluid.layers.sum(losses)
         final_loss.backward()
+        self.model._optimizer.minimize(final_loss)
         self.model.clear_gradients()
 
         metrics = []
@@ -536,6 +540,7 @@ class DynamicGraphAdapter(object):
             metric_outs = metric.add_metric_op(to_list(outputs), to_list(labels))
             m = metric.update(*[to_numpy(m) for m in to_list(metric_outs)])
             metrics.append(m)
+
         return ([to_numpy(l) for l in losses], metrics) \
             if len(metrics) > 0 else [to_numpy(l) for l in losses]
 
@@ -667,10 +672,16 @@ class Model(fluid.dygraph.Layer):
         self._device = None
         self._device_ids = None
         self._optimizer = None
-        self._distributed_sampler = None
         self._test_dataloader = None
 
-        if in_dygraph_mode():
+        # init multiple gpus context
+        self._place = fluid.CUDAPlace(distributed.Env().dev_id) \
+            if distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
+        if distributed.get_nranks() > 1:
+            distributed.prepare_distributed_context(self._place)
+
+        # init backend
+        if fluid.in_dygraph_mode():
            self._adapter = DynamicGraphAdapter(self)
         else:
             self._adapter = StaticGraphAdapter(self)
@@ -799,6 +810,7 @@ class Model(fluid.dygraph.Layer):
                 the variable to the environment variable and set its value to 1.
                 The default is None.
""" + self._optimizer = optimizer if loss_function: if not isinstance(loss_function, Loss): @@ -830,13 +842,17 @@ class Model(fluid.dygraph.Layer): def fit( self, + train_dataset=None, + eval_dataset=None, train_loader=None, eval_loader=None, + batch_size=1, epochs=1, eval_freq=1, log_freq=10, save_freq=1, verbose=2, + num_workers=0, callbacks=None, ): """ FIXME: add more comments and usage @@ -853,6 +869,57 @@ class Model(fluid.dygraph.Layer): callbacks (Callback|None): list of `Callback` instances to apply during training. """ + + assert train_dataset is not None or train_loader is not None, \ + "train_dataset or train_loader must be given" + + assert (train_loader is not None and train_dataset is None) or \ + (train_loader is None and train_dataset is not None), \ + "train_dataset should not be set when train_loader is given" + + if fluid.in_dygraph_mode(): + feed_list = None + else: + feed_list = [x.forward() for x in self._inputs + self._labels] + + if train_loader is None: + if distributed.get_nranks() > 1: + train_sampler = DistributedBatchSampler(train_dataset, + batch_size=batch_size, + shuffle=True) + train_loader = DataLoader(train_dataset, + batch_sampler=train_sampler, + places=self._place, + feed_list=feed_list, + num_workers=num_workers, + return_list=True) + + else: + train_loader = DataLoader(train_dataset, + batch_size=batch_size, + places=self._place, + feed_list=feed_list, + num_workers=4, + return_list=True) + + if eval_loader is None and eval_dataset is not None: + if distributed.get_nranks() > 1: + eval_sampler = DistributedBatchSampler(eval_dataset, + batch_size=batch_size) + eval_loader = DataLoader(eval_dataset, + batch_sampler=eval_sampler, + places=self._place, + feed_list=feed_list, + num_workers=num_workers, + return_list=True) + else: + eval_loader = DataLoader(eval_dataset, + batch_size=batch_size, + places=self._place, + feed_list=feed_list, + num_workers=num_workers, + return_list=True) + do_eval = eval_loader is not None self._test_dataloader = eval_loader metrics_name = self._metrics_name() diff --git a/tests/test_model.py b/tests/test_model.py index 02e7bd3589996d98fbbc2f3813cb7dce2cded882..f3829becb40e34775ea02d4347a95ecf34d87cc1 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -31,8 +31,9 @@ from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear from model import Model, CrossEntropy, Input, Loss from metrics import Accuracy from callbacks import ProgBarLogger -from paddle.fluid.io import BatchSampler, DataLoader, MnistDataset -from distributed import * +from paddle.fluid.io import BatchSampler, DataLoader +from paddle.fluid.io import MNIST as MnistDataset + class SimpleImgConvPool(fluid.dygraph.Layer): def __init__(self, @@ -143,65 +144,30 @@ class MyCrossEntropy(Loss): return [loss1, loss2] -class CustromMnistDataset(MnistDataset): - def __init__(self, - image_filename=None, - label_filename=None, - mode='train', - download=True): - super(CustromMnistDataset, self).__init__(image_filename, label_filename, mode, download) - - - def __getitem__(self, idx): - return self.images[idx], [self.labels[idx]] - - - class TestModel(unittest.TestCase): def fit(self, dynamic, is_mlp=False): im_shape = (-1, 784) - guard = fluid.dygraph.guard() if dynamic else null_guard() batch_size = 128 + place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id) \ if fluid.dygraph.parallel.Env().nranks > 1 else fluid.CUDAPlace(0) - guard = fluid.dygraph.guard(place) if dynamic else null_guard() - if fluid.dygraph.parallel.Env().nranks > 1: - 
-            prepare_context(place)
-
-        with guard:
-            inputs = [Input(im_shape, 'float32', name='image')]
-            labels = [Input([None, 1], 'int64', name='label')]
-
-            if fluid.in_dygraph_mode():
-                feed_list = None
-            else:
-                feed_list = [x.forward() for x in inputs + labels]
-            train_dataset = CustromMnistDataset(mode='train')
-            val_dataset = CustromMnistDataset(mode='test')
-
-            if get_nranks() > 1:
-                train_sampler = DistributedBatchSampler(train_dataset, batch_size=batch_size, shuffle=True)
-                train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, places=place,
-                                          feed_list=feed_list, num_workers=4, return_list=True)
-                val_sampler = DistributedBatchSampler(val_dataset, batch_size=batch_size)
-                val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, places=place,
-                                        feed_list=feed_list, num_workers=4, return_list=True)
-            else:
-                train_loader = DataLoader(train_dataset, batch_size=batch_size, places=place,
-                                          feed_list=feed_list, num_workers=4, return_list=True)
-                val_loader = DataLoader(val_dataset, batch_size=batch_size, places=place,
-                                        feed_list=feed_list, num_workers=4, return_list=True)
-
-
-            model = MNIST() if not is_mlp else MLP()
-            optim = fluid.optimizer.Momentum(
-                learning_rate=0.01,
-                momentum=.9,
-                parameter_list=model.parameters())
-            loss = CrossEntropy() if not is_mlp else MyCrossEntropy()
-            model.prepare(optim, loss, Accuracy(), inputs, labels)
-            cbk = ProgBarLogger(50)
-            model.fit(train_loader, val_loader, epochs=2, callbacks=cbk)
+        fluid.enable_dygraph(place) if dynamic else None
+
+        inputs = [Input(im_shape, 'float32', name='image')]
+        labels = [Input([None, 1], 'int64', name='label')]
+
+        train_dataset = MnistDataset(mode='train')
+        val_dataset = MnistDataset(mode='test')
+
+        model = MNIST() if not is_mlp else MLP()
+        optim = fluid.optimizer.Momentum(
+            learning_rate=0.01,
+            momentum=.9,
+            parameter_list=model.parameters())
+        loss = CrossEntropy() if not is_mlp else MyCrossEntropy()
+        model.prepare(optim, loss, Accuracy(), inputs, labels)
+        cbk = ProgBarLogger(50)
+        model.fit(train_dataset, val_dataset, epochs=2, batch_size=batch_size, callbacks=cbk)
 
     def test_fit_static(self):
         self.fit(False)
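
Usage sketch (editor's note, not part of the patch): a minimal single-GPU driver for the reworked high-level API, mirroring the updated mnist.py above. It assumes the MNIST Model subclass from mnist.py and the Model/Input/CrossEntropy/Accuracy definitions from model.py and metrics.py are importable as shown in the diffs; the place, learning rate, batch size, and epoch count are illustrative.

    from paddle import fluid
    from paddle.fluid.optimizer import Momentum
    from paddle.fluid.io import MNIST as MnistDataset

    from mnist import MNIST               # LeNet-style Model subclass from mnist.py
    from model import Input, CrossEntropy
    from metrics import Accuracy

    # single-GPU dygraph run; the multi-GPU context, if any, is now prepared
    # inside Model.__init__ (see the model.py hunk at @@ -667 above)
    place = fluid.CUDAPlace(0)
    fluid.enable_dygraph(place)

    model = MNIST()
    optim = Momentum(learning_rate=0.001, momentum=0.9,
                     parameter_list=model.parameters())

    inputs = [Input([None, 784], 'float32', name='image')]
    labels = [Input([None, 1], 'int64', name='label')]
    model.prepare(optim, CrossEntropy(), Accuracy(topk=(1, 2)), inputs, labels)

    # datasets are passed straight to fit(); the DataLoader/DistributedBatchSampler
    # wiring happens inside Model.fit as added in the model.py hunk at @@ -853
    model.fit(MnistDataset(mode='train'), MnistDataset(mode='test'),
              epochs=2, batch_size=128)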