From dc77382831103e25bc17cc8f2c3307041e348b73 Mon Sep 17 00:00:00 2001
From: zn <96479180+kangna-qi@users.noreply.github.com>
Date: Fri, 11 Mar 2022 16:38:47 +0800
Subject: [PATCH] [MLU]add allgather_op mlu kernel (#40356)

---
 .../collective/c_allgather_op_mlu.cc          | 81 +++++++++++++++++++
 .../operators/collective/c_allreduce_op.h     |  2 +-
 .../collective/c_broadcast_op_mlu.cc          |  2 +-
 .../fluid/tests/unittests/mlu/CMakeLists.txt  |  2 +
 .../unittests/mlu/collective_allgather_api.py | 55 +++++++++++++
 .../unittests/mlu/collective_allgather_op.py  | 71 ++++++++++++++++
 .../mlu/test_collective_allgather.py          | 55 +++++++++++++
 .../mlu/test_collective_allgather_api_mlu.py  | 43 ++++++++++
 .../mlu/test_collective_api_base_mlu.py       |  6 ++
 .../unittests/mlu/test_collective_base_mlu.py |  4 +
 10 files changed, 319 insertions(+), 2 deletions(-)
 create mode 100644 paddle/fluid/operators/collective/c_allgather_op_mlu.cc
 create mode 100755 python/paddle/fluid/tests/unittests/mlu/collective_allgather_api.py
 create mode 100755 python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
 create mode 100644 python/paddle/fluid/tests/unittests/mlu/test_collective_allgather.py
 create mode 100755 python/paddle/fluid/tests/unittests/mlu/test_collective_allgather_api_mlu.py

diff --git a/paddle/fluid/operators/collective/c_allgather_op_mlu.cc b/paddle/fluid/operators/collective/c_allgather_op_mlu.cc
new file mode 100644
index 0000000000..f29bc57c9a
--- /dev/null
+++ b/paddle/fluid/operators/collective/c_allgather_op_mlu.cc
@@ -0,0 +1,81 @@
+/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "paddle/fluid/operators/collective/c_allgather_op.h"
+
+#if defined(PADDLE_WITH_CNCL)
+#include "paddle/fluid/platform/collective_helper.h"
+#include "paddle/fluid/platform/device/mlu/cncl_helper.h"
+#endif
+#include "paddle/fluid/framework/convert_utils.h"
+
+namespace paddle {
+namespace operators {
+
+template <typename T>
+class CAllGatherOpMLUKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext& ctx) const override {
+#if defined(PADDLE_WITH_CNCL)
+    auto x = ctx.Input<framework::Tensor>("X");
+    auto out = ctx.Output<framework::Tensor>("Out");
+    cnclDataType_t dtype =
+        platform::ToCNCLDataType(framework::TransToProtoVarType(x->dtype()));
+
+    int nranks = ctx.Attr<int>("nranks");
+    int rid = ctx.Attr<int>("ring_id");
+    auto place = ctx.GetPlace();
+    auto comm = platform::CNCLCommContext::Instance().Get(rid, place);
+    PADDLE_ENFORCE_EQ(
+        nranks, comm->nranks(),
+        platform::errors::InvalidArgument("nranks: %s should equal to %s",
+                                          nranks, comm->nranks()));
+
+    framework::DDim out_dims = x->dims();
+    out_dims[0] *= nranks;
+    out->mutable_data<T>(out_dims, place);
+
+    uint32_t send_numel = x->numel();
+    void* send_buff = reinterpret_cast<void*>(const_cast<T*>(x->data<T>()));
+    void* recv_buff = reinterpret_cast<void*>(out->data<T>());
+
+    mluStream stream = nullptr;
+    if (ctx.Attr<bool>("use_calc_stream")) {
+      auto dev_ctx = platform::DeviceContextPool::Instance().Get(place);
+      stream = static_cast<platform::MLUDeviceContext*>(dev_ctx)->stream();
+    } else {
+      stream = comm->stream();
+    }
+
+    PADDLE_ENFORCE_MLU_SUCCESS(cnclAllGather(send_buff, recv_buff, send_numel,
+                                             dtype, comm->comm(), stream));
+#else
+    PADDLE_THROW(platform::errors::PreconditionNotMet(
+        "PaddlePaddle should compile with MLU."));
+#endif
+  }
+};
+
+}  // namespace operators
+}  // namespace paddle
+
+namespace ops = paddle::operators;
+namespace plat = paddle::platform;
+
+REGISTER_OP_MLU_KERNEL(c_allgather, ops::CAllGatherOpMLUKernel<float>,
+                       ops::CAllGatherOpMLUKernel<uint8_t>,
+                       ops::CAllGatherOpMLUKernel<int>,
+                       ops::CAllGatherOpMLUKernel<int8_t>,
+                       ops::CAllGatherOpMLUKernel<int16_t>,
+                       ops::CAllGatherOpMLUKernel<plat::float16>);
diff --git a/paddle/fluid/operators/collective/c_allreduce_op.h b/paddle/fluid/operators/collective/c_allreduce_op.h
index 7e5120cd2b..2c4e85400c 100644
--- a/paddle/fluid/operators/collective/c_allreduce_op.h
+++ b/paddle/fluid/operators/collective/c_allreduce_op.h
@@ -413,7 +413,7 @@ class CAllReduceOpMLUKernel : public framework::OpKernel<T> {
 
     auto place = ctx.GetPlace();
     cnclDataType_t dtype =
-        platform::ToCNCLDataType(framework::TransToProtoVarType(in->type()));
+        platform::ToCNCLDataType(framework::TransToProtoVarType(in->dtype()));
     int64_t numel = in->numel();
     const void* sendbuff = in->data<T>();
     out->Resize(in->dims());
diff --git a/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc b/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc
index d315f21170..d1e269fb5a 100644
--- a/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc
+++ b/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc
@@ -31,7 +31,7 @@ class CBroadcastOPMLUKernel : public framework::OpKernel<T> {
     auto out = ctx.Output<framework::LoDTensor>("Out");
     int numel = x->numel();
     cnclDataType_t dtype =
-        platform::ToCNCLDataType(framework::TransToProtoVarType(x->type()));
+        platform::ToCNCLDataType(framework::TransToProtoVarType(x->dtype()));
 
     int rid = ctx.Attr<int>("ring_id");
     auto place = ctx.GetPlace();
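The c_allgather MLU kernel added above implements a rank-ordered concatenation along dimension 0: every rank contributes its local tensor and receives an output whose first dimension is multiplied by nranks (out_dims[0] *= nranks). A minimal numpy sketch of that semantics, assuming the two-rank, [10, 1000]-per-rank setup used by the tests below (illustration only, not part of the patch):

    import numpy as np

    # Each of the two ranks holds a [10, 1000] tensor; allgather stacks them
    # in rank order along dim 0, so every rank ends up with [20, 1000].
    x_rank0 = np.random.rand(10, 1000).astype("float32")
    x_rank1 = np.random.rand(10, 1000).astype("float32")
    gathered = np.concatenate([x_rank0, x_rank1], axis=0)
    assert gathered.shape == (20, 1000)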
diff --git a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
index 17f5509bdb..3403782252 100644
--- a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt
@@ -22,8 +22,10 @@ if (WITH_MLU)
     bash_test_modules(test_c_comm_init_op_mlu START_BASH test_c_comm_init_op_mlu.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
     set_tests_properties(test_collective_broadcast PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_allreduce PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allgather PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_broadcast_api_mlu PROPERTIES TIMEOUT 120)
     set_tests_properties(test_collective_allreduce_api_mlu PROPERTIES TIMEOUT 120)
+    set_tests_properties(test_collective_allgather_api_mlu PROPERTIES TIMEOUT 120)
     set_tests_properties(test_c_comm_init_op_mlu PROPERTIES TIMEOUT 120)
   endif(WITH_CNCL)
 endif()
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allgather_api.py b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_api.py
new file mode 100755
index 0000000000..50ae6b1a16
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_api.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import numpy as np
+import argparse
+import os
+import sys
+import signal
+import time
+import socket
+from contextlib import closing
+from six import string_types
+import math
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+import paddle.fluid.unique_name as nameGen
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import paddle.fluid.layers as layers
+from functools import reduce
+from test_collective_api_base_mlu import TestCollectiveAPIRunnerBase, runtime_main
+
+paddle.enable_static()
+
+
+class TestCollectiveAllgatherAPI(TestCollectiveAPIRunnerBase):
+    def __init__(self):
+        self.global_ring_id = 0
+
+    def get_model(self, main_prog, startup_program, rank):
+        with fluid.program_guard(main_prog, startup_program):
+            tensor_list = []
+            tindata = layers.data(
+                name="tindata", shape=[10, 1000], dtype='float32')
+            paddle.distributed.all_gather(tensor_list, tindata)
+            return tensor_list
+
+
+if __name__ == "__main__":
+    runtime_main(TestCollectiveAllgatherAPI, "allgather")
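collective_allgather_api.py above exercises paddle.distributed.all_gather from a static-graph program, which is what the MLU API test launches across two processes. For comparison, a rough dynamic-graph sketch of the same call (hypothetical usage, assuming a multi-process launch on a device and backend where init_parallel_env and eager-mode collectives are available):

    import paddle
    import paddle.distributed as dist

    # Hypothetical eager-mode counterpart of the static-graph model above.
    dist.init_parallel_env()
    tindata = paddle.randn([10, 1000], dtype="float32")
    tensor_list = []
    dist.all_gather(tensor_list, tindata)
    # tensor_list now holds one [10, 1000] tensor per rank, in rank order.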
diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
new file mode 100755
index 0000000000..1058514f9c
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/collective_allgather_op.py
@@ -0,0 +1,71 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import numpy as np
+import argparse
+import os
+import sys
+import signal
+import time
+from contextlib import closing
+from six import string_types
+import math
+import paddle
+import paddle.fluid as fluid
+import paddle.fluid.profiler as profiler
+import paddle.fluid.unique_name as nameGen
+from paddle.fluid import core
+import unittest
+from multiprocessing import Process
+import paddle.fluid.layers as layers
+from functools import reduce
+from test_collective_base_mlu import TestCollectiveRunnerBase, runtime_main
+
+paddle.enable_static()
+
+
+class TestCollectiveAllgather(TestCollectiveRunnerBase):
+    def __init__(self):
+        self.global_ring_id = 0
+
+    def get_model(self, main_prog, startup_program):
+        ring_id = 0
+        nranks = 2
+        with fluid.program_guard(main_prog, startup_program):
+            tindata = layers.data(
+                name="tindata", shape=[10, 1000], dtype='float32')
+            toutdata = main_prog.current_block().create_var(
+                name="outofgather",
+                dtype='float32',
+                type=core.VarDesc.VarType.LOD_TENSOR,
+                persistable=False,
+                stop_gradient=False)
+            main_prog.global_block().append_op(
+                type="c_allgather",
+                inputs={'X': tindata},
+                attrs={'ring_id': ring_id,
+                       'nranks': nranks},
+                outputs={'Out': toutdata})
+            main_prog.global_block().append_op(
+                type="c_sync_comm_stream",
+                inputs={'X': toutdata},
+                outputs={'Out': toutdata},
+                attrs={'ring_id': ring_id})
+            return toutdata
+
+
+if __name__ == "__main__":
+    runtime_main(TestCollectiveAllgather, "allgather", 0)
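collective_allgather_op.py above appends the raw c_allgather op followed by c_sync_comm_stream, which matches the kernel's default path on the communicator stream. The kernel also reads a use_calc_stream attribute (the ctx.Attr<bool>("use_calc_stream") branch in the C++ diff); a hedged sketch of a hypothetical variant of the test model that sets it:

    # Hypothetical variant: run the collective on the calculation stream.
    main_prog.global_block().append_op(
        type="c_allgather",
        inputs={'X': tindata},
        outputs={'Out': toutdata},
        attrs={
            'ring_id': ring_id,
            'nranks': nranks,
            # Assumption: on the calc stream the separate c_sync_comm_stream
            # op used above should not be needed.
            'use_calc_stream': True,
        })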
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allgather.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allgather.py
new file mode 100644
index 0000000000..09166e15aa
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allgather.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import sys
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCAllgatherOp(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allgather_fp32(self):
+        self.check_with_place("collective_allgather_op.py", "allgather",
+                              "float32")
+
+    def test_allgather_fp16(self):
+        self.check_with_place("collective_allgather_op.py", "allgather",
+                              "float16")
+
+    def test_allgather_int32(self):
+        self.check_with_place("collective_allgather_op.py", "allgather",
+                              "int32")
+
+    def test_allgather_int16(self):
+        self.check_with_place("collective_allgather_op.py", "allgather",
+                              "int16")
+
+    def test_allgather_int8(self):
+        self.check_with_place("collective_allgather_op.py", "allgather", "int8")
+
+    def test_allgather_uint8(self):
+        self.check_with_place("collective_allgather_op.py", "allgather",
+                              "uint8")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allgather_api_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allgather_api_mlu.py
new file mode 100755
index 0000000000..576c310cc3
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allgather_api_mlu.py
@@ -0,0 +1,43 @@
+# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import unittest
+import numpy as np
+import paddle
+
+from test_collective_api_base_mlu import TestDistBase
+
+paddle.enable_static()
+
+
+class TestCollectiveAllgatherAPI(TestDistBase):
+    def _setup_config(self):
+        pass
+
+    def test_allgather_cncl_fp16(self):
+        self.check_with_place("collective_allgather_api.py", "allgather",
+                              "float16")
+
+    def test_allgather_cncl_fp32(self):
+        self.check_with_place("collective_allgather_api.py", "allgather",
+                              "float32")
+
+    def test_allgather_cncl_int32(self):
+        self.check_with_place("collective_allgather_api.py", "allgather",
+                              "int32")
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py
index 556fc6fcbb..3c1cf7d2d1 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_api_base_mlu.py
@@ -219,5 +219,11 @@ class TestDistBase(unittest.TestCase):
             self.assertTrue(
                 np.allclose(
                     tr1_out, need_result, rtol=1e-05, atol=1e-05))
+        elif col_type == "allgather":
+            need_result = np.vstack((input1, input2))
+            tr_out0 = np.vstack((tr0_out[0], tr0_out[1]))
+            tr_out1 = np.vstack((tr1_out[0], tr1_out[1]))
+            self.assertTrue(np.allclose(tr_out0, need_result))
+            self.assertTrue(np.allclose(tr_out1, need_result))
         else:
             pass
diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
index 4692c893d0..9c2e2205eb 100644
--- a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
+++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py
@@ -270,5 +270,9 @@ class TestDistBase(unittest.TestCase):
             self.assertTrue(
                 np.allclose(
                     tr1_out, need_result, rtol=1e-05, atol=1e-05))
+        elif col_type == "allgather":
+            need_result = np.vstack((input1, input2))
+            self.assertTrue(np.allclose(tr0_out, need_result))
+            self.assertTrue(np.allclose(tr1_out, need_result))
         else:
             pass
--
GitLab
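For reference, both test bases verify allgather the same way: the expected result is the two ranks' inputs stacked in rank order, and each rank's fetched output must match it. Condensed from the assertions added above (variable names as in the test bases; the API base fetches a per-rank list, the op base a single concatenated tensor):

    import numpy as np

    # input1, input2: the tensors fed to rank 0 and rank 1.
    need_result = np.vstack((input1, input2))

    # API test base: each rank fetches a list with one tensor per rank.
    assert np.allclose(np.vstack((tr0_out[0], tr0_out[1])), need_result)
    assert np.allclose(np.vstack((tr1_out[0], tr1_out[1])), need_result)

    # Op test base: each rank fetches the already-concatenated tensor.
    assert np.allclose(tr0_out, need_result)
    assert np.allclose(tr1_out, need_result)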