diff --git a/python/paddle/fluid/layers/collective.py b/python/paddle/fluid/layers/collective.py
index 43eb436f65e78114fe4a4c9bf7450faca0d87b38..0b4211cbb63dc709a53399813892c944e0cfce11 100644
--- a/python/paddle/fluid/layers/collective.py
+++ b/python/paddle/fluid/layers/collective.py
@@ -14,7 +14,9 @@
 from __future__ import print_function
 
 from ..layer_helper import LayerHelper, unique_name
-from ..framework import Variable
+from ..framework import Variable, in_dygraph_mode, _in_legacy_dygraph
+import paddle
+from paddle import _C_ops
 
 
 def _allreduce(x, out=None, reduce_type="sum", sync_mode=False):
@@ -107,6 +109,21 @@ def _c_broadcast(x, root=0, ring_id=0, use_calc_stream=False):
 
 def _c_allgather(x, nranks, ring_id=0, use_calc_stream=False):
     op_type = 'c_allgather'
+
+    if in_dygraph_mode():
+        group = paddle.distributed.collective._get_default_group()
+        tensor_shape = list(x.shape)
+        tensor_shape[0] *= nranks
+        out = paddle.empty(tensor_shape, x.dtype)
+        task = group.process_group.all_gather(x, out)
+        task.wait()
+        return out
+
+    if _in_legacy_dygraph():
+        attrs = ('nranks', nranks, 'ring_id', ring_id, 'use_calc_stream',
+                 use_calc_stream)
+        return _C_ops.c_allgather(x, *attrs)
+
     helper = LayerHelper(op_type, **locals())
     out_shape = list(x.shape[:])
     if out_shape[0] > 0:
diff --git a/python/paddle/hapi/model.py b/python/paddle/hapi/model.py
index 4b349a1957731b1249e98dd499ebbd6c70b2b5b6..a7a5e59f39409682f8b3b96c2cac10c43ccb9706 100644
--- a/python/paddle/hapi/model.py
+++ b/python/paddle/hapi/model.py
@@ -29,7 +29,7 @@ import contextlib
 import paddle
 from paddle import fluid
 from paddle.fluid import core
-from paddle.fluid.framework import _non_static_mode
+from paddle.fluid.framework import _non_static_mode, in_dygraph_mode
 from paddle.fluid.framework import Variable
 from paddle.fluid.framework import _get_paddle_place
 from paddle.fluid.framework import _current_expected_place as _get_device
@@ -761,6 +761,15 @@ class DynamicGraphAdapter(object):
         labels = [to_variable(l) for l in to_list(labels)]
 
         outputs = self.model.network.forward(*[to_variable(x) for x in inputs])
+
+        # Transform data to expected device
+        expected_device = paddle.device.get_device()
+        for o in to_list(outputs):
+            o._to(device=expected_device)
+
+        for l in labels:
+            l._to(device=expected_device)
+
         if self.model._loss:
             losses = self.model._loss(*(to_list(outputs) + labels))
             losses = to_list(losses)
@@ -2088,7 +2097,6 @@ class Model(object):
             callbacks.on_batch_begin(mode, step, logs)
 
             if mode != 'predict':
-
                 _inputs = [data[:len(self._inputs)], data[len(self._inputs):]]
                 if mode == 'train':
                     _inputs.append((step + 1) % self._accumulate == 0 or
diff --git a/python/paddle/tests/dist_hapi_mnist_dynamic.py b/python/paddle/tests/dist_hapi_mnist_dynamic.py
index eab34a6dafbc354a24aa51e93a9fec9efc3b3cee..de0518e229b0a54ac0c0161b4ce772087b5410cb 100644
--- a/python/paddle/tests/dist_hapi_mnist_dynamic.py
+++ b/python/paddle/tests/dist_hapi_mnist_dynamic.py
@@ -58,7 +58,7 @@ def compute_accuracy(pred, gt):
 @unittest.skipIf(not fluid.is_compiled_with_cuda(),
                  'CPU testing is not supported')
 class TestDistTraning(unittest.TestCase):
-    def test_static_multiple_gpus(self):
+    def test_dynamic_multiple_gpus(self):
         device = set_device('gpu')
 
         im_shape = (-1, 1, 28, 28)
diff --git a/python/paddle/tests/test_dist_hapi_model.py b/python/paddle/tests/test_dist_hapi_model.py
index 16788e4656192e43f17e09464d1d53ab6dda3ce7..006800d3caeee23cc1cbcdd4ea28c3a0beb0d477 100644
--- a/python/paddle/tests/test_dist_hapi_model.py
+++ b/python/paddle/tests/test_dist_hapi_model.py
@@ -52,6 +52,7 @@ def get_gpus(selected_gpus):
 def start_local_trainers(cluster,
                          pod,
                          training_script,
+                         eager_mode,
                          training_script_args,
                          log_dir=None):
     current_env = copy.copy(os.environ.copy())
@@ -72,6 +73,9 @@ def start_local_trainers(cluster,
             "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
         }
 
+        if not eager_mode:
+            proc_env["FLAGS_enable_eager_mode"] = "%d" % 0
+
         current_env.update(proc_env)
 
         print("trainer proc env:{}".format(current_env))
@@ -99,7 +103,7 @@ def start_local_trainers(cluster,
 
 
 class TestMultipleGpus(unittest.TestCase):
-    def run_mnist_2gpu(self, target_file_name):
+    def run_mnist_2gpu(self, target_file_name, eager_mode=True):
         if fluid.core.get_cuda_device_count() == 0:
             return
 
@@ -112,6 +116,7 @@ class TestMultipleGpus(unittest.TestCase):
         procs = start_local_trainers(
             cluster,
             pod,
+            eager_mode=eager_mode,
             training_script=target_file_name,
             training_script_args=[])
 
@@ -125,13 +130,17 @@ class TestMultipleGpus(unittest.TestCase):
 
     def test_hapi_multiple_gpus_static(self):
         self.run_mnist_2gpu('dist_hapi_mnist_static.py')
+        self.run_mnist_2gpu('dist_hapi_mnist_static.py', eager_mode=False)
 
     def test_hapi_multiple_gpus_dynamic(self):
         self.run_mnist_2gpu('dist_hapi_mnist_dynamic.py')
+        self.run_mnist_2gpu('dist_hapi_mnist_dynamic.py', eager_mode=False)
 
     def test_hapi_amp_static(self):
         self.run_mnist_2gpu('dist_hapi_pure_fp16_static.py')
+        self.run_mnist_2gpu('dist_hapi_pure_fp16_static.py', eager_mode=False)
 
 
 if __name__ == "__main__":
+    os.environ["FLAGS_enable_eager_mode"] = "1"
     unittest.main()
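
For context, a minimal sketch (not part of the patch) of how the new dygraph allgather path is typically exercised through the public API. It assumes two GPUs launched with paddle.distributed.launch; `_c_allgather` itself is a private helper, so `paddle.distributed.all_gather` is used here instead, and the concatenated result mirrors the `tensor_shape[0] *= nranks` growth added above.

import paddle
import paddle.distributed as dist

dist.init_parallel_env()
nranks = dist.get_world_size()

# Each rank contributes a [2, 3] tensor filled with its rank id.
x = paddle.full([2, 3], float(dist.get_rank()))

# Public equivalent of the private _c_allgather helper patched above.
tensor_list = []
dist.all_gather(tensor_list, x)

# Same first-dimension growth as the patched helper: [2 * nranks, 3].
out = paddle.concat(tensor_list, axis=0)
print(out.shape)

Whether the eager process-group branch or the legacy `_C_ops.c_allgather` branch runs is controlled by FLAGS_enable_eager_mode, which the updated test toggles per child trainer via `eager_mode=False`.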