diff --git a/paddle/fluid/operators/collective/c_allreduce_op.h b/paddle/fluid/operators/collective/c_allreduce_op.h index a04935d43eb2d8ab36749f0d2a35b09552001e7c..7e5120cd2b392b1eb0698727ccebac485193f6d9 100644 --- a/paddle/fluid/operators/collective/c_allreduce_op.h +++ b/paddle/fluid/operators/collective/c_allreduce_op.h @@ -23,8 +23,9 @@ limitations under the License. */ #include "paddle/fluid/memory/memory.h" #include "paddle/fluid/platform/device/npu/npu_op_runner.h" -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ - defined(PADDLE_WITH_ASCEND_CL) || defined(PADDLE_WITH_XPU_BKCL) +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ + defined(PADDLE_WITH_ASCEND_CL) || defined(PADDLE_WITH_XPU_BKCL) || \ + defined(PADDLE_WITH_CNCL) #include "paddle/fluid/platform/collective_helper.h" #endif @@ -45,6 +46,10 @@ limitations under the License. */ #include "paddle/fluid/platform/device/npu/hccl_helper.h" #endif +#if defined(PADDLE_WITH_CNCL) +#include "paddle/fluid/platform/device/mlu/cncl_helper.h" +#endif + #if defined(PADDLE_WITH_ASCEND_CL) DECLARE_bool(hccl_check_nan); #endif @@ -398,6 +403,65 @@ class CAllReduceOpCUDAKernel : public framework::OpKernel { } }; +template +class CAllReduceOpMLUKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override { +#if defined(PADDLE_WITH_CNCL) + auto in = ctx.Input("X"); + auto out = ctx.Output("Out"); + + auto place = ctx.GetPlace(); + cnclDataType_t dtype = + platform::ToCNCLDataType(framework::TransToProtoVarType(in->type())); + int64_t numel = in->numel(); + const void* sendbuff = in->data(); + out->Resize(in->dims()); + void* recvbuff = out->mutable_data(place); + + int rid = ctx.Attr("ring_id"); + auto comm = platform::CNCLCommContext::Instance().Get(rid, place); + + mluStream stream = nullptr; + if (ctx.Attr("use_calc_stream")) { + auto dev_ctx = platform::DeviceContextPool::Instance().Get(place); + stream = static_cast(dev_ctx)->stream(); + } else { + stream = comm->stream(); + } + + cnclReduceOp_t cncl_red_type = cnclSum; + switch (red_type) { + case kRedSum: + cncl_red_type = cnclSum; + break; + + case kRedMax: + cncl_red_type = cnclMax; + break; + + case kRedMin: + cncl_red_type = cnclMin; + break; + + case kRedProd: + cncl_red_type = cnclProd; + break; + + default: + PADDLE_THROW(platform::errors::InvalidArgument( + "Invalid reduce type: %d", red_type)); + } + + PADDLE_ENFORCE_MLU_SUCCESS(cnclAllReduce( + sendbuff, recvbuff, numel, dtype, cncl_red_type, comm->comm(), stream)); +#else + PADDLE_THROW(platform::errors::PreconditionNotMet( + "PaddlePaddle should compile with MLU.")); +#endif + } +}; + class CAllReduceOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() { diff --git a/paddle/fluid/operators/collective/c_allreduce_sum_op_mlu.cc b/paddle/fluid/operators/collective/c_allreduce_sum_op_mlu.cc new file mode 100644 index 0000000000000000000000000000000000000000..4879696b3f47032dd30e35b2ffba05af8fa2f609 --- /dev/null +++ b/paddle/fluid/operators/collective/c_allreduce_sum_op_mlu.cc @@ -0,0 +1,26 @@ +/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/operators/collective/c_allreduce_op.h" + +namespace ops = paddle::operators; +namespace plat = paddle::platform; + +REGISTER_OP_MLU_KERNEL(c_allreduce_sum, + ops::CAllReduceOpMLUKernel, + ops::CAllReduceOpMLUKernel, + ops::CAllReduceOpMLUKernel, + ops::CAllReduceOpMLUKernel, + ops::CAllReduceOpMLUKernel, + ops::CAllReduceOpMLUKernel) diff --git a/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc b/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc index 123fb2aafb524d89901959e57d378838acfdf0af..d315f211709e4f76c2d5c685721961a91c2102fe 100644 --- a/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc +++ b/paddle/fluid/operators/collective/c_broadcast_op_mlu.cc @@ -30,7 +30,8 @@ class CBroadcastOPMLUKernel : public framework::OpKernel { auto x = ctx.Input("X"); auto out = ctx.Output("Out"); int numel = x->numel(); - cnclDataType_t dtype = platform::ToCNCLDataType(x->type()); + cnclDataType_t dtype = + platform::ToCNCLDataType(framework::TransToProtoVarType(x->type())); int rid = ctx.Attr("ring_id"); auto place = ctx.GetPlace(); diff --git a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt index 2e588355ce7939674fcddba58f477b19eac4dde2..41f3a31017e7fef78ed335ad3df934e9274cecea 100644 --- a/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/mlu/CMakeLists.txt @@ -6,4 +6,5 @@ if (WITH_MLU) py_test_modules(${TEST_OP} MODULES ${TEST_OP}) endforeach(TEST_OP) set_tests_properties(test_collective_broadcast PROPERTIES TIMEOUT 120) + set_tests_properties(test_collective_allreduce PROPERTIES TIMEOUT 120) endif() diff --git a/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py new file mode 100644 index 0000000000000000000000000000000000000000..0371e1bbb24061340209194dc720f64d1b3c39e3 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/collective_allreduce_op.py @@ -0,0 +1,70 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import numpy as np +import argparse +import os +import sys +import signal +import time +import socket +from contextlib import closing +from six import string_types +import math +import paddle +import paddle.fluid as fluid +import paddle.fluid.profiler as profiler +import paddle.fluid.unique_name as nameGen +from paddle.fluid import core +import unittest +from multiprocessing import Process +import paddle.fluid.layers as layers +from functools import reduce +from test_collective_base_mlu import TestCollectiveRunnerBase, runtime_main + +paddle.enable_static() + + +class TestCollectiveAllreduce(TestCollectiveRunnerBase): + def __init__(self): + self.global_ring_id = 0 + + def get_model(self, main_prog, startup_program): + ring_id = 0 + with fluid.program_guard(main_prog, startup_program): + tindata = layers.data( + name="tindata", shape=[10, 1000], dtype='float32') + toutdata = main_prog.current_block().create_var( + name="outofallreduce", + dtype='float32', + type=core.VarDesc.VarType.LOD_TENSOR, + persistable=False, + stop_gradient=False) + main_prog.global_block().append_op( + type="c_allreduce_sum", + inputs={'X': tindata}, + attrs={'ring_id': ring_id}, + outputs={'Out': toutdata}) + main_prog.global_block().append_op( + type="c_sync_comm_stream", + inputs={'X': toutdata}, + outputs={'Out': toutdata}, + attrs={'ring_id': ring_id}) + return toutdata + + +if __name__ == "__main__": + runtime_main(TestCollectiveAllreduce, "allreduce", 0) diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py new file mode 100644 index 0000000000000000000000000000000000000000..5fd5db7a604d56fe427792be04d804250642dda0 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_allreduce.py @@ -0,0 +1,55 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import sys +import unittest +import numpy as np +import paddle + +from test_collective_base_mlu import TestDistBase + +paddle.enable_static() + + +class TestCAllreduceOp(TestDistBase): + def _setup_config(self): + pass + + def test_allreduce_fp32(self): + self.check_with_place("collective_allreduce_op.py", "allreduce", + "float32") + + def test_allreduce_fp16(self): + self.check_with_place("collective_allreduce_op.py", "allreduce", + "float16") + + def test_allreduce_int32(self): + self.check_with_place("collective_allreduce_op.py", "allreduce", + "int32") + + def test_allreduce_int16(self): + self.check_with_place("collective_allreduce_op.py", "allreduce", + "int16") + + def test_allreduce_int8(self): + self.check_with_place("collective_allreduce_op.py", "allreduce", "int8") + + def test_allreduce_uint8(self): + self.check_with_place("collective_allreduce_op.py", "allreduce", + "uint8") + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py index 2a7c64fe48972330b37ad8ab965f63f070e0ce65..4692c893d00b4595c1927f01c7e1b55dd6935c70 100644 --- a/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py +++ b/python/paddle/fluid/tests/unittests/mlu/test_collective_base_mlu.py @@ -262,5 +262,13 @@ class TestDistBase(unittest.TestCase): need_result = input2 self.assertTrue(np.allclose(tr0_out, need_result)) self.assertTrue(np.allclose(tr1_out, need_result)) + elif col_type == "allreduce": + need_result = input1 + input2 + self.assertTrue( + np.allclose( + tr0_out, need_result, rtol=1e-05, atol=1e-05)) + self.assertTrue( + np.allclose( + tr1_out, need_result, rtol=1e-05, atol=1e-05)) else: pass