From 9261dff44cead82bbfa7199fc81bd6471aef198f Mon Sep 17 00:00:00 2001
From: zn <96479180+kangna-qi@users.noreply.github.com>
Date: Fri, 25 Mar 2022 21:33:41 +0800
Subject: [PATCH] [MLU]add allreduce max/prod/min mlu kernel (#40792)

---
 .../collective/c_allreduce_max_op_mlu.cc      | 26 +++++++++
 .../collective/c_allreduce_min_op_mlu.cc      | 26 +++++++++
 .../collective/c_allreduce_prod_op_mlu.cc     | 26 +++++++++
 .../fluid/tests/unittests/mlu/CMakeLists.txt  |  5 +-
 .../unittests/mlu/collective_allgather_op.py  |  6 +-
 .../unittests/mlu/collective_allreduce_op.py  |  8 +--
 .../unittests/mlu/collective_broadcast_op.py  |  4 +-
 ...ce.py => test_collective_allreduce_max.py} | 25 +++++++++----
 .../mlu/test_collective_allreduce_min.py      | 56 +++++++++++++++++++
 .../mlu/test_collective_allreduce_prod.py     | 56 +++++++++++++++++++
 .../mlu/test_collective_allreduce_sum.py      | 56 +++++++++++++++++++
 .../unittests/mlu/test_collective_base_mlu.py | 39 ++++++++++---
 12 files changed, 304 insertions(+), 29 deletions(-)
 create mode 100644 paddle/fluid/operators/collective/c_allreduce_max_op_mlu.cc
 create mode 100644 paddle/fluid/operators/collective/c_allreduce_min_op_mlu.cc
 create mode 100644 paddle/fluid/operators/collective/c_allreduce_prod_op_mlu.cc
 mode change 100644 => 100755 python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py
 rename python/paddle/fluid/tests/unittests/mlu/{test_collective_allreduce.py => test_collective_allreduce_max.py} (81%)
 create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_min.py
 create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_prod.py
 create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_sum.py

diff --git a/paddle/fluid/operators/collective/c_allreduce_max_op_mlu.cc b/paddle/fluid/operators/collective/c_allreduce_max_op_mlu.cc
new file mode 100644
index 0000000000..4265f7a677
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_allreduce_max_op_mlu.cc
@@ -0,0 +1,26 @@
+/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_allreduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_MLU_KERNEL(c_allreduce_max,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, float>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, plat::float16>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, int>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, int16_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, int8_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMax, uint8_t>)
diff --git a/paddle/fluid/operators/collective/c_allreduce_min_op_mlu.cc b/paddle/fluid/operators/collective/c_allreduce_min_op_mlu.cc
new file mode 100644
index 0000000000..da94e18a2f
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_allreduce_min_op_mlu.cc
@@ -0,0 +1,26 @@
+/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_allreduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_MLU_KERNEL(c_allreduce_min,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, float>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, plat::float16>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, int>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, int16_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, int8_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedMin, uint8_t>)
diff --git a/paddle/fluid/operators/collective/c_allreduce_prod_op_mlu.cc b/paddle/fluid/operators/collective/c_allreduce_prod_op_mlu.cc
new file mode 100644
index 0000000000..a3af972a95
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_allreduce_prod_op_mlu.cc
@@ -0,0 +1,26 @@
+/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_allreduce_op.h"
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_MLU_KERNEL(c_allreduce_prod,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, float>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, plat::float16>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, int>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, int16_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, int8_t>,
+                       ops::CAllReduceOpMLUKernel<ops::kRedProd, uint8_t>)
diff --git a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
index 3403782252..51fc8b3307 100644
--- a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
@@ -21,7 +21,10 @@ if (WITH_MLU)
     bash_test_modules(test_launch_nproc_mlu START_BASH test_launch_nproc_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     bash_test_modules(test_c_comm_init_op_mlu START_BASH test_c_comm_init_op_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     set_tests_properties(test_collective_broadcast PROPERTIES TIMEOUT 120)
-    set_tests_properties(test_collective_allreduce PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_sum PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_max PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_min PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allreduce_prod PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_allgather PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_broadcast_api_mlu PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_allreduce_api_mlu PROPERTIES TIMEOUT 120)
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
index 1058514f9c..f67b3fbcc6 100755
--- a/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
@@ -41,14 +41,14 @@ class TestCollectiveAllgather(TestCollectiveRunnerBase):
     def __init__(self):
         self.global_ring_id = 0
 
-    def get_model(self, main_prog, startup_program):
+    def get_model(self, main_prog, startup_program, col_type):
         ring_id = 0
         nranks = 2
         with fluid.program_guard(main_prog, startup_program):
             tindata = layers.data(
                 name="tindata", shape=[10, 1000], dtype='float32')
             toutdata = main_prog.current_block().create_var(
-                name="outofgather",
+                name="outofallgather",
                 dtype='float32',
                 type=core.VarDesc.VarType.LOD_TENSOR,
                 persistable=False,
@@ -68,4 +68,4 @@ class TestCollectiveAllgather(TestCollectiveRunnerBase):
 
 
 if __name__ == "__main__":
-    runtime_main(TestCollectiveAllgather, "allgather", 0)
+    runtime_main(TestCollectiveAllgather)
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py
index 0371e1bbb2..404ed1235d 100644
--- a/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py
@@ -42,19 +42,19 @@ class TestCollectiveAllreduce(TestCollectiveRunnerBase):
     def __init__(self):
         self.global_ring_id = 0
 
-    def get_model(self, main_prog, startup_program):
+    def get_model(self, main_prog, startup_program, col_type):
         ring_id = 0
         with fluid.program_guard(main_prog, startup_program):
             tindata = layers.data(
                 name="tindata", shape=[10, 1000], dtype='float32')
             toutdata = main_prog.current_block().create_var(
-                name="outofallreduce",
+                name="outof" + col_type,
                 dtype='float32',
                 type=core.VarDesc.VarType.LOD_TENSOR,
                 persistable=False,
                 stop_gradient=False)
             main_prog.global_block().append_op(
-                type="c_allreduce_sum",
+                type="c_" + col_type,
                 inputs={'X': tindata},
                 attrs={'ring_id': ring_id},
                 outputs={'Out': toutdata})
@@ -67,4 +67,4 @@
 
 
 if __name__ == "__main__":
-    runtime_main(TestCollectiveAllreduce, "allreduce", 0)
+    runtime_main(TestCollectiveAllreduce)
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py
old mode 100644
new mode 100755
index d4f32b5f52..49bc6a6c4b
--- a/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_broadcast_op.py
@@ -42,7 +42,7 @@ class TestCollectiveBroadcast(TestCollectiveRunnerBase):
     def __init__(self):
         self.global_ring_id = 0
 
-    def get_model(self, main_prog, startup_program):
+    def get_model(self, main_prog, startup_program, col_type):
         ring_id = 0
         rootid = 1
         with fluid.program_guard(main_prog, startup_program):
@@ -69,4 +69,4 @@ class TestCollectiveBroadcast(TestCollectiveRunnerBase):
 
 
 if __name__ == "__main__":
-    runtime_main(TestCollectiveBroadcast, "broadcast", 0)
+    runtime_main(TestCollectiveBroadcast)
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_max.py
similarity index 81%
rename from python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py
rename to python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_max.py
index 5fd5db7a60..bd04e6e2dc 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_max.py
@@ -27,27 +27,28 @@ class TestCAllreduceOp(TestDistBase):
     def _setup_config(self):
         pass
 
-    def test_allreduce_fp32(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "float32")
 
-    def test_allreduce_fp16(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "float16")
 
-    def test_allreduce_int32(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "int32")
 
-    def test_allreduce_int16(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "int16")
 
-    def test_allreduce_int8(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce", "int8")
+    def test_allreduce_max_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
+                              "int8")
 
-    def test_allreduce_uint8(self):
-        self.check_with_place("collective_allreduce_op.py", "allreduce",
+    def test_allreduce_max_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_max",
                               "uint8")
 
 
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_min.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_min.py
new file mode 100644
index 0000000000..4b16146e2e
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_min.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCAllreduceOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allreduce_min_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "float32")
+
+    def test_allreduce_min_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "float16")
+
+    def test_allreduce_min_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "int32")
+
+    def test_allreduce_min_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "int16")
+
+    def test_allreduce_min_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "int8")
+
+    def test_allreduce_min_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_min",
+                              "uint8")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_prod.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_prod.py
new file mode 100644
index 0000000000..0c6ea566cf
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_prod.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCAllreduceOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allreduce_prod_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "float32")
+
+    def test_allreduce_prod_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "float16")
+
+    def test_allreduce_prod_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "int32")
+
+    def test_allreduce_prod_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "int16")
+
+    def test_allreduce_prod_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "int8")
+
+    def test_allreduce_prod_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_prod",
+                              "uint8")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_sum.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_sum.py
new file mode 100644
index 0000000000..a7a3984f4e
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce_sum.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCAllreduceOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allreduce_sum_fp32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "float32")
+
+    def test_allreduce_sum_fp16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "float16")
+
+    def test_allreduce_sum_int32(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "int32")
+
+    def test_allreduce_sum_int16(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "int16")
+
+    def test_allreduce_sum_int8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "int8")
+
+    def test_allreduce_sum_uint8(self):
+        self.check_with_place("collective_allreduce_op.py", "allreduce_sum",
+                              "uint8")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
index 9c2e2205eb..5295296897 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
@@ -53,7 +53,7 @@ def DataTypeCast(date_type):
 
 
 class TestCollectiveRunnerBase(object):
-    def get_model(self, train_prog, startup_prog):
+    def get_model(self, train_prog, startup_prog, col_type):
         raise NotImplementedError(
             "get model should be implemented by child class.")
 
@@ -129,7 +129,7 @@
         self.initCommunicator(startup_prog, rank, nranks, True,
                               current_endpoint, endpoints)
         self.rank = rank
-        result = self.get_model(train_prog, startup_prog)
+        result = self.get_model(train_prog, startup_prog, args["col_type"])
         device_id = int(os.getenv("FLAGS_selected_mlus", "0"))
         place = fluid.MLUPlace(device_id)
         exe = fluid.Executor(place)
@@ -143,7 +143,7 @@
         sys.stdout.buffer.write(pickle.dumps(out))
 
 
-def runtime_main(test_class, col_type, sub_type):
+def runtime_main(test_class):
     args = {}
     model = test_class()
     args["deviceid"] = os.getenv("FLAGS_selected_mlus")
@@ -151,7 +151,7 @@
     args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
    args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
     args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT")
-    args["col_type"] = col_type
+    args["col_type"] = os.getenv("COL_TYPE")
     args["data_type"] = os.getenv("DATA_TYPE")
     model.run_trainer(args)
 
@@ -185,7 +185,7 @@ class TestDistBase(unittest.TestCase):
     def _run_cluster(self, model_file, envs):
         worker_endpoints = self._ps_endpoints.split(",")
         w0_ep, w1_ep = worker_endpoints
-        #print("w0_ep:",w0_ep," w1_ep:",w1_ep)
+
         env0 = {
             "FLAGS_selected_mlus": "0",
             "PADDLE_TRAINER_ID": "0",
@@ -209,7 +209,7 @@ class TestDistBase(unittest.TestCase):
         tr1_cmd = tr_cmd % (self._python_interp, model_file)
         tr0_pipe = open("/tmp/tr0_err.log", "wb")
         tr1_pipe = open("/tmp/tr1_err.log", "wb")
-        #print(tr0_cmd)
+
         tr0_proc = subprocess.Popen(
             tr0_cmd.strip().split(),
             stdout=subprocess.PIPE,
@@ -246,6 +246,7 @@ class TestDistBase(unittest.TestCase):
             "LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
             "GLOG_v": "3",
             "DATA_TYPE": data_type,
+            "COL_TYPE": col_type,
         }
         required_envs.update(need_envs)
         if check_error_log:
@@ -262,7 +263,7 @@ class TestDistBase(unittest.TestCase):
             need_result = input2
             self.assertTrue(np.allclose(tr0_out, need_result))
             self.assertTrue(np.allclose(tr1_out, need_result))
-        elif col_type == "allreduce":
+        elif col_type == "allreduce_sum":
             need_result = input1 + input2
             self.assertTrue(
                 np.allclose(
@@ -270,6 +271,30 @@ class TestDistBase(unittest.TestCase):
             self.assertTrue(
                 np.allclose(
                     tr1_out, need_result, rtol=1e-05, atol=1e-05))
+        elif col_type == "allreduce_prod":
+            need_result = input1 * input2
+            self.assertTrue(
+                np.allclose(
+                    tr0_out, need_result, rtol=1e-05, atol=1e-05))
+            self.assertTrue(
+                np.allclose(
+                    tr1_out, need_result, rtol=1e-05, atol=1e-05))
+        elif col_type == "allreduce_max":
+            need_result = np.maximum(input1, input2)
+            self.assertTrue(
+                np.allclose(
+                    tr0_out, need_result, rtol=1e-05, atol=1e-05))
+            self.assertTrue(
+                np.allclose(
+                    tr1_out, need_result, rtol=1e-05, atol=1e-05))
+        elif col_type == "allreduce_min":
+            need_result = np.minimum(input1, input2)
+            self.assertTrue(
+                np.allclose(
+                    tr0_out, need_result, rtol=1e-05, atol=1e-05))
+            self.assertTrue(
+                np.allclose(
+                    tr1_out, need_result, rtol=1e-05, atol=1e-05))
         elif col_type == "allgather":
             need_result = np.vstack((input1, input2))
             self.assertTrue(np.allclose(tr0_out, need_result))
-- 
GitLab
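
Usage note (illustrative sketch, not part of the applyable patch; it sits below
the mail signature delimiter, which "git am" ignores): after this change the
collective under test is selected by the COL_TYPE environment variable read in
runtime_main(), so the single runner collective_allreduce_op.py covers all four
reductions by building the op name as "c_" + col_type. A minimal driver along
the lines of TestDistBase._run_cluster, assuming the two-trainer MLU environment
(PADDLE_TRAINER_ID, PADDLE_TRAINERS_NUM, PADDLE_TRAINER_ENDPOINTS,
PADDLE_CURRENT_ENDPOINT, FLAGS_selected_mlus) is already exported, might look
like this:

    import os
    import subprocess

    # COL_TYPE selects the collective ("allreduce_max" -> op "c_allreduce_max"
    # in the runner script); DATA_TYPE selects the dtype that the base class
    # verifies against np.maximum(input1, input2).
    env = dict(os.environ, COL_TYPE="allreduce_max", DATA_TYPE="float32")

    # Launch one trainer process; a real run starts one such process per rank,
    # as TestDistBase does for tr0 and tr1.
    subprocess.run(
        ["python", "collective_allreduce_op.py"], env=env, check=True)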