diff --git a/paddle/fluid/operators/collective/CMakeLists.txt b/paddle/fluid/operators/collective/CMakeLists.txt index bd88c8f9cd2b40d30a16b10e56aadb556fe91b06..89c573d2dcb717aa1f3b1d4e19cbd5828e390afb 100644 --- a/paddle/fluid/operators/collective/CMakeLists.txt +++ b/paddle/fluid/operators/collective/CMakeLists.txt @@ -11,7 +11,7 @@ foreach(src ${OPS}) set_source_files_properties(${src} PROPERTIES COMPILE_FLAGS ${COLLECTIVE_COMPILE_FLAGS}) endforeach() -register_operators(EXCLUDES c_gen_bkcl_id_op gen_bkcl_id_op c_gen_nccl_id_op gen_nccl_id_op c_gen_hccl_id_op gen_hccl_id_op DEPS ${COLLECTIVE_DEPS}) +register_operators(EXCLUDES c_gen_bkcl_id_op gen_bkcl_id_op c_gen_nccl_id_op gen_nccl_id_op c_gen_hccl_id_op gen_hccl_id_op c_gen_cncl_id_op DEPS ${COLLECTIVE_DEPS}) if(WITH_NCCL OR WITH_RCCL) set(COLLECTIVE_DEPS ${COLLECTIVE_DEPS} nccl_common collective_helper) @@ -29,6 +29,11 @@ if(WITH_XPU_BKCL) op_library(gen_bkcl_id_op DEPS ${COLLECTIVE_DEPS}) endif() +if(WITH_CNCL) + set(COLLECTIVE_DEPS ${COLLECTIVE_DEPS} collective_helper) + op_library(c_gen_cncl_id_op DEPS ${COLLECTIVE_DEPS}) +endif() + if(WITH_ASCEND_CL) cc_library(gen_hccl_id_op_helper SRCS gen_hccl_id_op_helper.cc DEPS dynload_warpctc dynamic_loader scope) set(COLLECTIVE_DEPS ${COLLECTIVE_DEPS} collective_helper gen_hccl_id_op_helper) diff --git a/paddle/fluid/operators/collective/c_gen_cncl_id_op.cc b/paddle/fluid/operators/collective/c_gen_cncl_id_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..7e65fba571800b4d48a931dbdaa4bacc5f534e81 --- /dev/null +++ b/paddle/fluid/operators/collective/c_gen_cncl_id_op.cc @@ -0,0 +1,119 @@ +/* Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#include +#include + +#include "paddle/fluid/framework/op_proto_maker.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/operator.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/var_type_traits.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/enforce.h" +#include "paddle/fluid/platform/place.h" + +#include "paddle/fluid/platform/gen_comm_id_helper.h" + +namespace paddle { +namespace operators { + +static void GenCNCLID(std::vector* cncl_ids) { + for (size_t i = 0; i < cncl_ids->size(); ++i) { + PADDLE_ENFORCE_MLU_SUCCESS(cnclGetCliqueId(&(*cncl_ids)[i])); + } +} + +static void CopyCNCLIDToVar(const std::vector& cncl_ids, + std::function func, + const framework::Scope& scope) { + for (size_t i = 0; i < cncl_ids.size(); ++i) { + std::string var_name = func(i); + auto var = scope.FindVar(var_name); + PADDLE_ENFORCE_NOT_NULL( + var, platform::errors::NotFound("Variable with name %s is not found", + var_name.c_str())); + auto cncl_id = var->GetMutable(); + memcpy(cncl_id, &cncl_ids[i], sizeof(cnclCliqueId)); + } +} + +class CGenCNCLIdOp : public framework::OperatorBase { + public: + CGenCNCLIdOp(const std::string& type, + const framework::VariableNameMap& inputs, + const framework::VariableNameMap& outputs, + const framework::AttributeMap& attrs) + : OperatorBase(type, inputs, outputs, attrs) {} + + void RunImpl(const framework::Scope& scope, + const platform::Place& dev_place) const override { + int rank = Attr("rank"); + int ring_id = Attr("ring_id"); + + std::function func = [&](size_t i) -> std::string { + return Output("Out"); + }; + + std::string endpoint = Attr("endpoint"); + int server_fd = platform::SocketServer::GetInstance(endpoint).socket(); + + std::vector cncl_ids; + cncl_ids.resize(1); + + if (rank == 0) { + GenCNCLID(&cncl_ids); + std::vector endpoint_list = + Attr>("other_endpoints"); + platform::SendBroadCastCommID(endpoint_list, &cncl_ids, ring_id); + } else { + platform::RecvBroadCastCommID(server_fd, endpoint, &cncl_ids, ring_id); + } + + CopyCNCLIDToVar(cncl_ids, func, scope); + } +}; + +class CGenCNCLIdOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddOutput("Out", "Raw variable contains a CNCL CliqueId instaces."); + AddComment(R"DOC( +CGenCNCLId operator + +For trainer 0: generate a new CliqueId and send it to all the other trainers. +For trainer 1~n: start a gRPC server to get the CliqueId, once got, stop the server. +)DOC"); + AddAttr("endpoint", + "(string), e.g. 127.0.0.1:6175 " + "current listen endpoint"); + AddAttr>( + "other_endpoints", + "['trainer1_ip:port', 'trainer2_ip:port', ...] " + "list of other trainer endpoints") + .SetDefault({}); + AddAttr("rank", + "(int default 0) " + "The rank of the trainer in distributed training.") + .SetDefault(0); + AddAttr("ring_id", "(int default 0) user specified ring id") + .SetDefault(0); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OPERATOR(c_gen_cncl_id, ops::CGenCNCLIdOp, ops::CGenCNCLIdOpMaker); diff --git a/paddle/fluid/platform/gen_comm_id_helper.cc b/paddle/fluid/platform/gen_comm_id_helper.cc index 1b77eb42837d4193cf8f7decbfd41634f16ac882..bbec743d26f3b7ccf9ed9f049f3100dc25f50bad 100644 --- a/paddle/fluid/platform/gen_comm_id_helper.cc +++ b/paddle/fluid/platform/gen_comm_id_helper.cc @@ -12,8 +12,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ - defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ + defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL) || \ + defined(PADDLE_WITH_CNCL) #include "paddle/fluid/platform/gen_comm_id_helper.h" #include @@ -37,6 +38,10 @@ limitations under the License. */ #include "paddle/fluid/platform/collective_helper.h" #endif +#if defined(PADDLE_WITH_CNCL) +#include +#endif + DECLARE_int32(get_host_by_name_time); namespace paddle { @@ -430,6 +435,9 @@ INSTANT_TEMPLATE(BKCLUniqueId) #ifdef PADDLE_WITH_ASCEND_CL INSTANT_TEMPLATE(HcclRootInfo) #endif +#ifdef PADDLE_WITH_CNCL +INSTANT_TEMPLATE(cnclCliqueId) +#endif } // namespace platform } // namespace paddle diff --git a/paddle/fluid/platform/gen_comm_id_helper.h b/paddle/fluid/platform/gen_comm_id_helper.h index 9bbbb1f424a74fe080b5dfcb7bfc0df9c7272356..952090010a40e3bd5f5a612d4652787aaf2d9fe4 100644 --- a/paddle/fluid/platform/gen_comm_id_helper.h +++ b/paddle/fluid/platform/gen_comm_id_helper.h @@ -14,8 +14,9 @@ limitations under the License. */ #pragma once -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ - defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL) +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) || \ + defined(PADDLE_WITH_XPU_BKCL) || defined(PADDLE_WITH_ASCEND_CL) || \ + defined(PADDLE_WITH_CNCL) #include #include #include diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 84d3f5547feb4b5c4fa6d3d9b88a57b9a1e52344..bb77f6031f7f99f85925cc805ee9b8ae57fc17df 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -2467,7 +2467,7 @@ class Operator(object): 'c_comm_init', 'c_sync_calc_stream', 'c_sync_comm_stream', 'queue_generator', 'dequeue', 'enqueue', 'heter_listen_and_serv', 'c_wait_comm', 'c_wait_compute', 'c_gen_hccl_id', 'c_comm_init_hccl', - 'copy_cross_scope' + 'copy_cross_scope', 'c_gen_cncl_id' } def __init__(self,