diff --git a/paddle/fluid/operators/collective/c_allreduce_max_op_mlu.cc b/paddle/fluid/operators/collective/c_allreduce_max_op_mlu.cc
new file mode 100644
index 0000000000000000000000000000000000000000..4265f7a67779b73a207664d4d80da31abab33fd7
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_allreduce_max_op_mlu.cc
@@ -0,0 +1,26 @@
+/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_allreduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_MLU_KERNEL(c_allreduce_max,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, float>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, plat::float16>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, int>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, int16_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, int8_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, uint8_t>)
diff --git a/paddle/fluid/operators/collective/c_allreduce_min_op_mlu.cc b/paddle/fluid/operators/collective/c_allreduce_min_op_mlu.cc
new file mode 100644
index 0000000000000000000000000000000000000000..da94e18a2f7b4e78c3378bf9e4073dd5811bc70b
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_allreduce_min_op_mlu.cc
@@ -0,0 +1,26 @@
+/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_allreduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_MLU_KERNEL(c_allreduce_min,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, float>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, plat::float16>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, int>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, int16_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, int8_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, uint8_t>)
diff --git a/paddle/fluid/operators/collective/c_allreduce_prod_op_mlu.cc b/paddle/fluid/operators/collective/c_allreduce_prod_op_mlu.cc
new file mode 100644
index 0000000000000000000000000000000000000000..a3af972a9529027ee7f9cfc3dbc30635e9514612
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_allreduce_prod_op_mlu.cc
@@ -0,0 +1,26 @@
+/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_allreduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_MLU_KERNEL(c_allreduce_prod,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, float>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, plat::float16>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, int>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, int16_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, int8_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, uint8_t>)
diff --git a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
index 340378225261ff829e4faa334a471cea8e88f4d9..51fc8b3307bd16e709cac6a843113bcef79448d8 100644
--- a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
@@ -21,7 +21,10 @@ if (WITH_MLU)
     bash_test_modules(test_launch_nproc_mlu START_BASH test_launch_nproc_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     bash_test_modules(test_c_comm_init_op_mlu START_BASH test_c_comm_init_op_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     set_tests_properties(test_collective_broadcast PROPERTIES TIMEOUT 120)
-    set_tests_properties(test_collective_allreduce PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_sum PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_max PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_min PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_prod PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_allgather PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_broadcast_api_mlu PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_allreduce_api_mlu PROPERTIES TIMEOUT 120)
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
index 1058514f9ca24c6c9be8e6e48499c81c158a82ca..f67b3fbcc6a80cfef4a4ad9c1438d80f7758b268 100755
--- a/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
@@ -41,14 +41,14 @@ class TestCollectiveAllgather(TestCollectiveRunnerBase):
     def __init__(self):
         self.global_ring_id = 0
 
-    def get_model(self, main_prog, startup_program):
+    def get_model(self, main_prog, startup_program, col_type):
         ring_id = 0
         nranks = 2
         with fluid.program_guard(main_prog, startup_program):
             tindata = layers.data(
                 name="tindata", shape=[10, 1000], dtype='float32')
             toutdata = main_prog.current_block().create_var(
-                name="outofgather",
+                name="outofallgather",
                 dtype='float32',
                 type=core.VarDesc.VarType.LOD_TENSOR,
                 persistable=False,
@@ -68,4 +68,4 @@ class TestCollectiveAllgather(TestCollectiveRunnerBase):
 
 
 if __name__ == "__main__":
-    runtime_main(TestCollectiveAllgather, "allgather", 0)
+    runtime_main(TestCollectiveAllgather)
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py
index 0371e1bbb24061340209194dc720f64d1b3c39e3..404ed1235d2aee6e75c9eccf3ab8c15b7ce1de5e 100644
--- a/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py
@@ -42,19 +42,19 @@ class TestCollectiveAllreduce(TestCollectiveRunnerBase):
     def __init__(self):
         self.global_ring_id = 0
 
-    def get_model(self, main_prog, startup_program):
+    def get_model(self, main_prog, startup_program, col_type):
         ring_id = 0
         with fluid.program_guard(main_prog, startup_program):
             tindata = layers.data(
                 name="tindata", shape=[10, 1000], dtype='float32')
             toutdata = main_prog.current_block().create_var(
-                name="outofallreduce",
+                name="outof" + col_type,
                 dtype='float32',
                 type=core.VarDesc.VarType.LOD_TENSOR,
                 persistable=False,
                 stop_gradient=False)
             main_prog.global_block().append_op(
-                type="c_allreduce_sum",
+                type="c_" + col_type,
                 inputs={'X': tindata},
                 attrs={'ring_id': ring_id},
                 outputs={'Out': toutdata})
@@ -67,4 +67,4 @@ class TestCollectiveAllreduce(TestCollectiveRunnerBase):
 
 
 if __name__ == "__main__":
-    runtime_main(TestCollectiveAllreduce, "allreduce", 0)
+    runtime_main(TestCollectiveAllreduce)
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py
old mode 100644
new mode 100755
index d4f32b5f5245d65b84f3459f9a6ea86ce2ae3c29..49bc6a6c4bb247c68f9abe78a3890ae9040172ee
--- a/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py
@@ -42,7 +42,7 @@ class TestCollectiveBroadcast(TestCollectiveRunnerBase):
     def __init__(self):
         self.global_ring_id = 0
 
-    def get_model(self, main_prog, startup_program):
+    def get_model(self, main_prog, startup_program, col_type):
         ring_id = 0
         rootid = 1
         with fluid.program_guard(main_prog, startup_program):
@@ -69,4 +69,4 @@ class TestCollectiveBroadcast(TestCollectiveRunnerBase):
 
 
 if __name__ == "__main__":
-    runtime_main(TestCollectiveBroadcast, "broadcast", 0)
+    runtime_main(TestCollectiveBroadcast)
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_max.py
similarity index 81%
rename from python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py
rename to python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_max.py
index 5fd5db7a604d56fe427792be04d804250642dda0..bd04e6e2dc6afa58341645ec3e13cb4643f80a11 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_max.py
@@ -27,27 +27,28 @@ class TestCAllreduceOp(TestDistBase):
     def _setup_config(self):
         pass
 
-    def test_allreduce_fp32(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "float32")
 
-    def test_allreduce_fp16(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "float16")
 
-    def test_allreduce_int32(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "int32")
 
-    def test_allreduce_int16(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "int16")
 
-    def test_allreduce_int8(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce", "int8")
+    def test_allreduce_max_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
+                              "int8")
 
-    def test_allreduce_uint8(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "uint8")
 
 
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_min.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_min.py
new file mode 100644
index 0000000000000000000000000000000000000000..4b16146e2ee2e8fc1ea3e8c0fe0e9e0d933dbae7
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_min.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCAllreduceOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allreduce_min_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "float32")
+
+    def test_allreduce_min_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "float16")
+
+    def test_allreduce_min_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "int32")
+
+    def test_allreduce_min_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "int16")
+
+    def test_allreduce_min_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "int8")
+
+    def test_allreduce_min_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "uint8")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_prod.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_prod.py
new file mode 100644
index 0000000000000000000000000000000000000000..0c6ea566cfa946ca53d1e3cecc2a06d548f99c1f
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_prod.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCAllreduceOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allreduce_prod_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "float32")
+
+    def test_allreduce_prod_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "float16")
+
+    def test_allreduce_prod_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "int32")
+
+    def test_allreduce_prod_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "int16")
+
+    def test_allreduce_prod_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "int8")
+
+    def test_allreduce_prod_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "uint8")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_sum.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_sum.py
new file mode 100644
index 0000000000000000000000000000000000000000..a7a3984f4e55e9283784127c0dded61c4439afce
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_sum.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCAllreduceOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allreduce_sum_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "float32")
+
+    def test_allreduce_sum_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "float16")
+
+    def test_allreduce_sum_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "int32")
+
+    def test_allreduce_sum_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "int16")
+
+    def test_allreduce_sum_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "int8")
+
+    def test_allreduce_sum_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "uint8")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
index 9c2e2205eb8761f93f4d16ab8934c9f1f256d92d..52952968977b022a5bf01adbc6bd3c7bda209a6f 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
@@ -53,7 +53,7 @@ def DataTypeCast(date_type):
 
 
 class TestCollectiveRunnerBase(object):
-    def get_model(self, train_prog, startup_prog):
+    def get_model(self, train_prog, startup_prog, col_type):
         raise NotImplementedError(
             "get model should be implemented by child class.")
 
@@ -129,7 +129,7 @@ class TestCollectiveRunnerBase(object):
         self.initCommunicator(startup_prog, rank, nranks, True,
                               current_endpoint, endpoints)
         self.rank = rank
-        result = self.get_model(train_prog, startup_prog)
+        result = self.get_model(train_prog, startup_prog, args["col_type"])
         device_id = int(os.getenv("FLAGS_selected_mlus", "0"))
         place = fluid.MLUPlace(device_id)
         exe = fluid.Executor(place)
@@ -143,7 +143,7 @@ class TestCollectiveRunnerBase(object):
         sys.stdout.buffer.write(pickle.dumps(out))
 
 
-def runtime_main(test_class, col_type, sub_type):
+def runtime_main(test_class):
     args = {}
     model = test_class()
     args["deviceid"] = os.getenv("FLAGS_selected_mlus")
@@ -151,7 +151,7 @@ def runtime_main(test_class, col_type, sub_type):
     args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
     args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
     args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT")
-    args["col_type"] = col_type
+    args["col_type"] = os.getenv("COL_TYPE")
     args["data_type"] = os.getenv("DATA_TYPE")
     model.run_trainer(args)
 
@@ -185,7 +185,7 @@ class TestDistBase(unittest.TestCase):
     def _run_cluster(self, model_file, envs):
         worker_endpoints = self._ps_endpoints.split(",")
         w0_ep, w1_ep = worker_endpoints
-        #print("w0_ep:",w0_ep," w1_ep:",w1_ep)
+
         env0 = {
             "FLAGS_selected_mlus": "0",
             "PADDLE_TRAINER_ID": "0",
@@ -209,7 +209,7 @@ class TestDistBase(unittest.TestCase):
         tr1_cmd = tr_cmd % (self._python_interp, model_file)
         tr0_pipe = open("/tmp/tr0_err.log", "wb")
         tr1_pipe = open("/tmp/tr1_err.log", "wb")
-        #print(tr0_cmd)
+
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(),
             stdout=subprocess.PIPE,
@@ -246,6 +246,7 @@ class TestDistBase(unittest.TestCase):
             "LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
             "GLOG_v": "3",
             "DATA_TYPE": data_type,
+            "COL_TYPE": col_type,
         }
         required_envs.update(need_envs)
        if check_error_log:
@@ -262,7 +263,7 @@ class TestDistBase(unittest.TestCase):
             need_result = input2
             self.assertTrue(np.allclose(tr0_out, need_result))
             self.assertTrue(np.allclose(tr1_out, need_result))
-        elif col_type == "allreduce":
+        elif col_type == "allreduce_sum":
             need_result = input1 + input2
             self.assertTrue(
                 np.allclose(
                     tr0_out, need_result, rtol=1e-05, atol=1e-05))
             self.assertTrue(
                 np.allclose(
                     tr1_out, need_result, rtol=1e-05, atol=1e-05))
@@ -270,6 +271,30 @@ class TestDistBase(unittest.TestCase):
+        elif col_type == "allreduce_prod":
+            need_result = input1 * input2
+            self.assertTrue(
+                np.allclose(
+                    tr0_out, need_result, rtol=1e-05, atol=1e-05))
+            self.assertTrue(
+                np.allclose(
+                    tr1_out, need_result, rtol=1e-05, atol=1e-05))
+        elif col_type == "allreduce_max":
+            need_result = np.maximum(input1, input2)
+            self.assertTrue(
+                np.allclose(
+                    tr0_out, need_result, rtol=1e-05, atol=1e-05))
+            self.assertTrue(
+                np.allclose(
+                    tr1_out, need_result, rtol=1e-05, atol=1e-05))
+        elif col_type == "allreduce_min":
+            need_result = np.minimum(input1, input2)
+            self.assertTrue(
+                np.allclose(
+                    tr0_out, need_result, rtol=1e-05, atol=1e-05))
+            self.assertTrue(
+                np.allclose(
+                    tr1_out, need_result, rtol=1e-05, atol=1e-05))
         elif col_type == "allgather":
             need_result = np.vstack((input1, input2))
             self.assertTrue(np.allclose(tr0_out, need_result))
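For readers tracing the test refactor above, here is a minimal sketch (not part of the patch) of how the collective type now reaches the trainer script: `check_with_place` exports it through the `COL_TYPE` environment variable alongside the existing `DATA_TYPE`, `runtime_main` reads it back from the environment, and `get_model` derives the operator and output-variable names from it. The concrete values below are illustrative only.

```python
# Illustrative only: mirrors the COL_TYPE/DATA_TYPE plumbing introduced in the diff above.
import os

# In the real tests, check_with_place() places these into the trainer's environment.
os.environ["COL_TYPE"] = "allreduce_max"
os.environ["DATA_TYPE"] = "float32"

# runtime_main() now recovers the collective type from the environment...
col_type = os.getenv("COL_TYPE")

# ...and get_model() uses it to pick the operator type and name the output variable.
op_type = "c_" + col_type        # "c_allreduce_max", one of the registered MLU kernels
var_name = "outof" + col_type    # "outofallreduce_max"
print(op_type, var_name)
```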