diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index a25cba029dd8bac81d6b00c1d9fb710f421ce9d0..cdd94a1fa031471e97ee37a5da50ae6a51a9316c 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -57,6 +57,7 @@ if(NOT WITH_GPU OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_allreduce) LIST(REMOVE_ITEM TEST_OPS test_broadcast) LIST(REMOVE_ITEM TEST_OPS test_collective_reduce) + LIST(REMOVE_ITEM TEST_OPS test_collective_sendrecv) LIST(REMOVE_ITEM TEST_OPS test_collective_scatter) LIST(REMOVE_ITEM TEST_OPS test_collective_reduce_api) LIST(REMOVE_ITEM TEST_OPS test_collective_scatter_api) diff --git a/python/paddle/fluid/tests/unittests/collective_sendrecv_op.py b/python/paddle/fluid/tests/unittests/collective_sendrecv_op.py new file mode 100644 index 0000000000000000000000000000000000000000..822bd3c4293d7d483f8c98890acf7f507723d6e5 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/collective_sendrecv_op.py @@ -0,0 +1,69 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_base import TestCollectiveRunnerBase, runtime_main + + +class TestCollectiveScatter(TestCollectiveRunnerBase): + def __init__(self): + self.global_ring_id = 0 + + def get_model(self, main_prog, startup_program, rank=None): + ring_id = 0 + with fluid.program_guard(main_prog, startup_program): + tindata = layers.data( + name="tindata", shape=[10, 1000], dtype='float32') + if rank == 0: + main_prog.global_block().append_op( + type="c_recv", + inputs={'Out': tindata}, + attrs={'ring_id': ring_id, + 'peer': 1}) + else: + main_prog.global_block().append_op( + type="c_send", + inputs={'X': tindata}, + attrs={'ring_id': ring_id, + 'peer': 0}) + main_prog.global_block().append_op( + type="c_sync_comm_stream", + inputs={'X': toutdata}, + outputs={'Out': toutdata}, + attrs={'ring_id': ring_id}) + return tindata + + +if __name__ == "__main__": + runtime_main(TestCollectiveScatter, "scatter", 0) diff --git a/python/paddle/fluid/tests/unittests/test_collective_base.py b/python/paddle/fluid/tests/unittests/test_collective_base.py index 512b2967e02fd01e67f416c2fd9222ae8589d8d8..e97be6b7a557ee17fd48837f81a658edc70f0555 100644 --- a/python/paddle/fluid/tests/unittests/test_collective_base.py +++ b/python/paddle/fluid/tests/unittests/test_collective_base.py @@ -103,7 +103,7 @@ class TestCollectiveRunnerBase(object): nranks = 2 self.initCommunicator(startup_prog, rank, nranks, True, current_endpoint, endpoints) - result = self.get_model(train_prog, startup_prog) + result = self.get_model(train_prog, startup_prog, rank) device_id = int(os.getenv("FLAGS_selected_gpus", "0")) place = fluid.CUDAPlace( device_id) #if args.use_gpu else fluid.CPUPlace() @@ -258,6 +258,9 @@ class TestDistBase(unittest.TestCase): self.assertTrue( np.allclose( tr1_out, need_result, rtol=1e-05, atol=1e-05)) + elif col_type == "sendrecv": + need_result = input2 + self.assertTrue(np.allclose(tr0_out, need_result)) elif col_type == "reduce_scatter": tmp = input1 + input2 need_result1 = tmp[0:tmp.shape[0] // 2] diff --git a/python/paddle/fluid/tests/unittests/test_collective_sendrecv.py b/python/paddle/fluid/tests/unittests/test_collective_sendrecv.py new file mode 100644 index 0000000000000000000000000000000000000000..ed6596faa494ecd929765fbff0ecfa1f53313fd0 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_collective_sendrecv.py @@ -0,0 +1,31 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import unittest +import numpy as np + +from test_collective_base import TestDistBase + + +class TestCScatterOp(TestDistBase): + def _setup_config(self): + pass + + def test_sendrecv(self): + self.check_with_place("collective_sendrecv_op.py", "sendrecv") + + +if __name__ == '__main__': + unittest.main()