diff --git a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py index c0f671e7e446bcb749ab9fec120b905eb570506e..0b7e1e59951370fa7d6c3e0e1df6a39cd2c3a3b9 100644 --- a/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py +++ b/python/paddle/distributed/fleet/meta_optimizers/dygraph_optimizer/hybrid_parallel_gradscaler.py @@ -30,8 +30,8 @@ class HybridParallelGradScaler: def __init__(self, scaler, hcg): self._scaler = scaler self._hcg = hcg - self._is_mp = ( - self._hcg.get_parallel_mode() == ParallelMode.TENSOR_PARALLEL) + self._use_dp_mode = ( + self._hcg.get_parallel_mode() == ParallelMode.DATA_PARALLEL) def scale(self, var): return self._scaler.scale(var) @@ -67,7 +67,7 @@ class HybridParallelGradScaler: core.ops.check_finite_and_unscale(param_grads, self._scale, param_grads, self._found_inf) # allreduce_max found_inf in check_group - if self._is_mp: + if not self._use_dp_mode: self._found_inf = paddle.cast(self._found_inf, dtype="int32") # TODO(shenliang03) Since the minimize call in the optimizer is # after the gradscaler, check_finite needs to synchronize global diff --git a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py index 343e6db04c24c4d4eb1b7c28b0171c4577f0610b..c30167bb7c52b999b105304cbe8c98be06e003bb 100644 --- a/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py +++ b/python/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py @@ -106,11 +106,12 @@ class PipelineParallel(MetaParallelBase): group=self.pp_group) return loss - def train_batch(self, data, optimizer, lr_scheduler=None): + def train_batch(self, data, optimizer, lr_scheduler=None, scaler=None): assert isinstance(optimizer, HybridParallelOptimizer), ( 'optimizer should be HybridParallelOptimizer subclass.') self.optimizer = optimizer self.lr_scheduler = lr_scheduler + self.scaler = scaler assert fluid.framework._dygraph_tracer()._has_grad, ( 'Please enable the generation of gradients.') @@ -143,8 +144,8 @@ class PipelineParallel(MetaParallelBase): self._layers.allreduce_shared_weight_gradients() # optimizer - self._step() self.train_loss = self._reduce_final_loss() + self._step() return self.train_loss def _forward(self, cache_id): @@ -192,7 +193,12 @@ class PipelineParallel(MetaParallelBase): def _backward(self, cache_id): if self.is_last_stage: - paddle.autograd.backward(self.caches['outputs'][cache_id]) + if self.scaler: + paddle.autograd.backward( + self.scaler.scale(self.caches['outputs'][cache_id])) + else: + paddle.autograd.backward(self.caches['outputs'][cache_id]) + self._send_gradients(cache_id) return self._recv_gradients(cache_id) @@ -441,7 +447,10 @@ class PipelineParallel(MetaParallelBase): p2p.recv(d, self.next_stage_id) def _step(self): - self.optimizer.step() + if self.scaler: + self.scaler.minimize(self.optimizer, self.train_loss) + else: + self.optimizer.step() self.optimizer.clear_grad() if self.lr_scheduler: self.lr_scheduler.step() diff --git a/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_amp.py b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_amp.py new file mode 100644 index 0000000000000000000000000000000000000000..33a04a5e7e1838eff169247fb810bff38868c45e --- /dev/null +++ b/python/paddle/fluid/tests/unittests/hybrid_parallel_pp_amp.py @@ -0,0 +1,126 @@ +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division +from __future__ import print_function + +import unittest +import paddle +import numpy as np +import random +import paddle +import paddle.distributed as dist +import paddle.distributed.fleet as fleet +from hybrid_parallel_pp_layer import AlexNetPipeDesc, AlexNet + + +def set_random_seed(seed, dp_id, rank_id): + """Set random seed for reproducability.""" + random.seed(seed) + np.random.seed(seed + dp_id) + paddle.seed(seed + dp_id) + + +batch_size = 4 +micro_batch_size = 2 + + +class TestDistPPTraning(unittest.TestCase): + def setUp(self): + strategy = fleet.DistributedStrategy() + self.model_parallel_size = 1 + self.data_parallel_size = 1 + self.pipeline_parallel_size = 2 + strategy.hybrid_configs = { + "dp_degree": self.data_parallel_size, + "mp_degree": self.model_parallel_size, + "pp_degree": self.pipeline_parallel_size, + } + strategy.pipeline_configs = { + "accumulate_steps": batch_size // micro_batch_size, + "micro_batch_size": micro_batch_size + } + fleet.init(is_collective=True, strategy=strategy) + + def test_pp_model(self): + hcg = fleet.get_hybrid_communicate_group() + word_size = hcg.get_model_parallel_world_size() + dp_id = hcg.get_data_parallel_rank() + pp_id = hcg.get_stage_id() + rank_id = dist.get_rank() + set_random_seed(1024, dp_id, rank_id) + + #construct model a + model_a = AlexNet(10) + scheduler_a = paddle.optimizer.lr.PiecewiseDecay( + boundaries=[2], values=[0.001, 0.002], verbose=True) + optimizer_a = paddle.optimizer.SGD(learning_rate=scheduler_a, + parameters=model_a.parameters()) + + scaler_a = paddle.amp.GradScaler(init_loss_scaling=2**5) + + param_len = len(model_a.parameters()) + parameters = [] + for param in model_a.parameters(): + parameters.append(param.numpy()) + + # construct model b + model_b = AlexNetPipeDesc(num_stages=self.pipeline_parallel_size) + scheduler_b = paddle.optimizer.lr.PiecewiseDecay( + boundaries=[2], values=[0.001, 0.002], verbose=True) + optimizer_b = paddle.optimizer.SGD(learning_rate=scheduler_b, + parameters=model_b.parameters()) + model_b = fleet.distributed_model(model_b) + optimizer_b = fleet.distributed_optimizer(optimizer_b) + scaler_b = paddle.amp.GradScaler(init_loss_scaling=2**5) + scaler_b = fleet.distributed_scaler(scaler_b) + + for idx, param in enumerate(model_b.parameters()): + param.set_value(parameters[idx + pp_id * (param_len // 2)]) + + # construct reader + train_reader = paddle.batch( + paddle.dataset.mnist.train(), batch_size=batch_size, drop_last=True) + + for step_id, data in enumerate(train_reader()): + x_data = np.array([x[0] for x in data]).astype('float32').reshape( + batch_size, 1, 28, 28) + y_data = np.array([x[1] for x in data]).astype('int64').reshape( + batch_size, 1) + img = paddle.to_tensor(x_data) + label = paddle.to_tensor(y_data) + img.stop_gradient = True + label.stop_gradient = True + + if step_id >= 5: + return True + + with paddle.amp.auto_cast(): + loss_a = model_a(img, label) + scaler_a.scale(loss_a).backward() + scaler_a.minimize(optimizer_a, loss_a) + optimizer_a.clear_grad() + scheduler_a.step() + + with paddle.amp.auto_cast(): + loss_b = model_b.train_batch( + [img, label], optimizer_b, scheduler_b, scaler=scaler_b) + + print("loss: ", loss_a.numpy(), loss_b.numpy()) + np.testing.assert_allclose( + loss_a.numpy(), loss_b.numpy(), rtol=5e-5) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py index ef8ee2e4ad4454d7cdef9350c53a19772a800f70..73967782aea2dae373fe260385e6963fa504b0b8 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_dygraph_pipeline_parallel.py @@ -30,6 +30,9 @@ class TestHybridPipeParallel(TestMultipleGpus): def test_hybrid_parallel_pp_tuple_inputs(self): self.run_mnist_2gpu('hybrid_parallel_shared_weight.py') + def test_pipeline_parallel(self): + self.run_mnist_2gpu('hybrid_parallel_pp_amp.py') + if __name__ == "__main__": unittest.main()