diff --git a/cmake/third_party.cmake b/cmake/third_party.cmake
index 7cdbee1746a8ff4610911bc09f1ca882289fd299..7df70477682bb1f1c72ccf9214f995ae8bed07f4 100644
--- a/cmake/third_party.cmake
+++ b/cmake/third_party.cmake
@@ -331,7 +331,7 @@ if (WITH_PSCORE)
 
     include(external/libmct)     # download, build, install libmct
     list(APPEND third_party_deps extern_libmct)
-    
+
     if (WITH_HETERPS)
         include(external/rocksdb)     # download, build, install libmct
         list(APPEND third_party_deps extern_rocksdb)
diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt
index 0ff8d09703a3d8c8d643f407eb8a2a9e154955e4..3f8d5e6bfd34e5a7d512c32d2c5409bfbd0ea2fc 100644
--- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt
+++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt
@@ -1,6 +1,24 @@
 proto_library(fleet_executor_desc_proto SRCS fleet_executor_desc.proto)
-cc_library(fleet_executor SRCS fleet_executor.cc DEPS fleet_executor_desc_proto)
-
 if(WITH_PYTHON)
   py_proto_compile(fleet_executor_desc_py_proto SRCS fleet_executor_desc.proto)
 endif()
+proto_library(interceptor_message_proto SRCS interceptor_message.proto)
+
+if(WITH_DISTRIBUTE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL))  # brpc is a dependency only for distributed, non-Ascend builds
+  set(BRPC_DEPS brpc)
+else()
+  set(BRPC_DEPS "")  # every other configuration adds no brpc dependency
+endif()
+
+cc_library(fleet_executor SRCS fleet_executor.cc carrier.cc
+        interceptor.cc interceptor_message_service.cc message_bus.cc
+        DEPS fleet_executor_desc_proto interceptor_message_proto ${BRPC_DEPS})
+
+if(WITH_DISTRIBUTE)  # relax the non-virtual-destructor diagnostics for the files listed below
+  set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
+  set_source_files_properties(message_bus.h PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})  # NOTE(review): header entry; confirm a per-source flag on a header has any effect
+  set_source_files_properties(message_bus.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
+  set_source_files_properties(carrier.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
+  set_source_files_properties(interceptor_message_service.h PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})  # NOTE(review): header entry, same question as message_bus.h
+  set_source_files_properties(interceptor_message_service.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
+endif()
diff --git a/paddle/fluid/distributed/fleet_executor/carrier.cc b/paddle/fluid/distributed/fleet_executor/carrier.cc
new file mode 100644
index 0000000000000000000000000000000000000000..443310c9d78fe5ef0f94e1095e217af0481145ac
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/carrier.cc
@@ -0,0 +1,43 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/distributed/fleet_executor/carrier.h"
+#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
+#include "paddle/fluid/distributed/fleet_executor/interceptor_message_service.h"
+#include "paddle/fluid/distributed/fleet_executor/task_node.h"
+
+namespace paddle {
+namespace distributed {
+
+Carrier::Carrier(
+    const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node) {
+  // init (placeholder: the mapping is not stored or used yet)
+}
+
+Carrier::~Carrier() {
+  // destroy (placeholder: no teardown logic yet)
+}
+
+bool Carrier::EnqueueInterceptorMessage(
+    const InterceptorMessage& interceptor_message) {
+  // enqueue message to interceptor (placeholder: always reports success)
+  return true;
+}
+
+void Carrier::CreateInterceptors() {
+  // create each Interceptor (placeholder: nothing is created yet)
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/carrier.h b/paddle/fluid/distributed/fleet_executor/carrier.h
new file mode 100644
index 0000000000000000000000000000000000000000..b0b0922e7bad0f40b4c001867347c152986ac21d
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/carrier.h
@@ -0,0 +1,60 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+
+#include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h"
+#include "paddle/fluid/platform/macros.h"
+
+namespace paddle {
+namespace distributed {
+
+class Interceptor;
+class TaskNode;
+class InterceptorMessageServiceImpl;
+
+class Carrier final {
+ public:
+  Carrier() = delete;
+
+  Carrier(const std::unordered_map<int64_t, TaskNode*>& interceptor_id_to_node);
+
+  ~Carrier();
+
+  // Enqueue a message to the interceptor the message is addressed to
+  bool EnqueueInterceptorMessage(const InterceptorMessage& interceptor_message);
+
+  DISABLE_COPY_AND_ASSIGN(Carrier);
+
+ private:
+  // create each Interceptor
+  void CreateInterceptors();
+
+  // get interceptor by id (NOTE(review): carrier.cc has no definition yet)
+  Interceptor* GetInterceptor(int64_t interceptor_id);
+
+  // interceptor logic id to the Nodes info
+  std::unordered_map<int64_t, TaskNode*> interceptor_id_to_node_;
+
+  // interceptor logic id to the actual interceptor object
+  std::unordered_map<int64_t, std::unique_ptr<Interceptor>>
+      interceptor_idx_to_interceptor_;
+};
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
index 3373b26d8d69dabf97d617c7b61f55d0ff162724..b184ea8a7160192ce40633d76160c70acdf13fa3 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc
@@ -39,5 +39,15 @@ void FleetExecutor::Release() {
   // Release
 }
 
+std::shared_ptr<Carrier> FleetExecutor::GetCarrier() {
+  // get carrier (placeholder: always returns nullptr for now)
+  return nullptr;
+}
+
+std::shared_ptr<MessageBus> FleetExecutor::GetMessageBus() {
+  // get message bus (placeholder: always returns nullptr for now)
+  return nullptr;
+}
+
 }  // namespace distributed
 }  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
index 47b97fc833fce596fcb62aee39cd0921a5a9fac9..613dacf5496f77cc58a5fc3a27c9574a92168428 100644
--- a/paddle/fluid/distributed/fleet_executor/fleet_executor.h
+++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h
@@ -14,6 +14,7 @@
 
 #pragma once
 #include <memory>
+
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
 #include "paddle/fluid/platform/macros.h"
 
@@ -24,6 +25,8 @@ class ProgramDesc;
 
 namespace distributed {
 class RuntimeGraph;
+class Carrier;
+class MessageBus;
 
 class FleetExecutor final {
  public:
@@ -33,11 +36,15 @@ class FleetExecutor final {
   void Init(const paddle::framework::ProgramDesc& program_desc);
   void Run();
   void Release();
+  static std::shared_ptr<Carrier> GetCarrier();
+  static std::shared_ptr<MessageBus> GetMessageBus();
 
  private:
   DISABLE_COPY_AND_ASSIGN(FleetExecutor);
   FleetExecutorDesc exe_desc_;
   std::unique_ptr<RuntimeGraph> runtime_graph_;
+  static std::shared_ptr<Carrier> global_carrier_;  // NOTE(review): no definition in the .cc yet
+  static std::shared_ptr<MessageBus> global_message_bus_;  // NOTE(review): no definition in the .cc yet
 };
 
 }  // namespace distributed
diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.cc b/paddle/fluid/distributed/fleet_executor/interceptor.cc
new file mode 100644
index 0000000000000000000000000000000000000000..48b5eddc41095951f7d4e0b23284e912a0479808
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/interceptor.cc
@@ -0,0 +1,46 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/distributed/fleet_executor/interceptor.h"
+
+namespace paddle {
+namespace distributed {
+
+// Remember the id and the task node handed in by the upper layer.
+Interceptor::Interceptor(int64_t interceptor_id, TaskNode* node)
+    : interceptor_id_(interceptor_id), node_(node) {}
+
+int64_t Interceptor::GetInterceptorId() const {
+  // return the interceptor id given at construction
+  return interceptor_id_;
+}
+
+bool Interceptor::EnqueueRemoteInterceptorMessage(
+    const InterceptorMessage& interceptor_message) {
+  // Called by Carrier, enqueue an InterceptorMessage to remote mailbox
+  return true;
+}
+
+void Interceptor::PoolTheMailbox() {
+  // poll the local mailbox, parse the Message (placeholder: empty for now)
+}
+
+bool Interceptor::FetchRemoteMailbox() {
+  // fetch all Message from remote mailbox to local mailbox
+  // return true if remote mailbox not empty, otherwise return false
+  return true;
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.h b/paddle/fluid/distributed/fleet_executor/interceptor.h
new file mode 100644
index 0000000000000000000000000000000000000000..a2e25a591bf4fb6fea581ebab145d93ef6a56cf4
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/interceptor.h
@@ -0,0 +1,83 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+#include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h"
+#include "paddle/fluid/platform/macros.h"
+
+namespace paddle {
+namespace distributed {
+
+class TaskNode;
+
+class Interceptor {
+ public:
+  Interceptor() = delete;
+
+  Interceptor(int64_t interceptor_id, TaskNode* node);
+
+  virtual ~Interceptor() = default;
+
+  // return the interceptor id
+  int64_t GetInterceptorId() const;
+
+  // Called by Carrier, enqueue an InterceptorMessage to remote mailbox
+  bool EnqueueRemoteInterceptorMessage(
+      const InterceptorMessage& interceptor_message);
+
+  DISABLE_COPY_AND_ASSIGN(Interceptor);
+
+ private:
+  // poll the local mailbox, parse the Message
+  void PoolTheMailbox();
+
+  // fetch all Message from remote mailbox to local mailbox
+  // return true if remote mailbox not empty, otherwise return false
+  bool FetchRemoteMailbox();
+
+  // interceptor id, handed from above layer
+  int64_t interceptor_id_{0};
+
+  // node need to be handled by this interceptor
+  TaskNode* node_{nullptr};
+
+  // mutex to control read/write conflict for remote mailbox
+  std::mutex remote_mailbox_mutex_;
+
+  // interceptor runs PoolTheMailbox() function to poll local mailbox
+  std::thread interceptor_thread_;
+
+  // conditional variable for blocking the thread when
+  // fetch an empty remote mailbox
+  std::condition_variable cond_var_;
+
+  // remote mailbox, written by EnqueueRemoteInterceptorMessage()
+  // read by FetchRemoteMailbox()
+  std::queue<InterceptorMessage> remote_mailbox_;
+
+  // local mailbox, written by FetchRemoteMailbox()
+  // read by PoolTheMailbox()
+  std::queue<InterceptorMessage> local_mailbox_;
+};
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/interceptor_message.proto b/paddle/fluid/distributed/fleet_executor/interceptor_message.proto
new file mode 100644
index 0000000000000000000000000000000000000000..a2fe01cfe3822b83fe5ee4b723edc99e23fe5923
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/interceptor_message.proto
@@ -0,0 +1,40 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+package paddle.distributed;
+option cc_generic_services = true;
+option cc_enable_arenas = true;
+
+enum MessageType {
+  STOP = 1;            // STOP an Interceptor
+  DATA_IS_READY = 2;   // upstream data is ready
+  DATE_IS_USELESS = 3; // downstream has used the data (NOTE(review): "DATE" looks like a typo for "DATA")
+  ERROR = 4;           // current Interceptor encounters error
+  RESET = 5;           // reset the status
+}
+
+message InterceptorMessage {
+  optional int64 src_id = 1 [ default = 0 ];
+  optional int64 dst_id = 2 [ default = 0 ];
+  optional MessageType message_type = 3 [ default = RESET ];
+  optional bool ctrl_message = 4 [ default = false ];
+}
+
+message InterceptorResponse { optional bool rst = 1 [ default = false ]; }
+
+service TheInterceptorMessageService {
+  rpc InterceptorMessageService(InterceptorMessage)
+      returns (InterceptorResponse);
+}
diff --git a/paddle/fluid/distributed/fleet_executor/interceptor_message_service.cc b/paddle/fluid/distributed/fleet_executor/interceptor_message_service.cc
new file mode 100644
index 0000000000000000000000000000000000000000..c038e2333d29a21db65fa74ab38612f55eb85d5d
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/interceptor_message_service.cc
@@ -0,0 +1,31 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#ifndef PADDLE_WITH_ASCEND_CL
+#ifdef PADDLE_WITH_DISTRIBUTE
+#include "paddle/fluid/distributed/fleet_executor/interceptor_message_service.h"
+
+namespace paddle {
+namespace distributed {
+
+void InterceptorMessageServiceImpl::InterceptorMessageService(
+    google::protobuf::RpcController* control_base,
+    const InterceptorMessage* request, InterceptorResponse* response,
+    google::protobuf::Closure* done) {
+  brpc::ClosureGuard done_guard(done);  // runs done on return so the call is answered; message handling is still a placeholder
+}
+
+}  // namespace distributed
+}  // namespace paddle
+#endif
+#endif
diff --git a/paddle/fluid/distributed/fleet_executor/interceptor_message_service.h b/paddle/fluid/distributed/fleet_executor/interceptor_message_service.h
new file mode 100644
index 0000000000000000000000000000000000000000..77eda7816e468dd4ae0b860f33dde189184059ad
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/interceptor_message_service.h
@@ -0,0 +1,37 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#ifndef PADDLE_WITH_ASCEND_CL
+#ifdef PADDLE_WITH_DISTRIBUTE
+#pragma once
+
+#include "brpc/server.h"
+#include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h"
+
+namespace paddle {
+namespace distributed {
+
+class InterceptorMessageServiceImpl : public TheInterceptorMessageService {
+ public:
+  InterceptorMessageServiceImpl() {}
+  virtual ~InterceptorMessageServiceImpl() {}
+  void InterceptorMessageService(  // RPC entry point declared by the generated service base
+      google::protobuf::RpcController* control_base,
+      const InterceptorMessage* request, InterceptorResponse* response,
+      google::protobuf::Closure* done) override;
+};
+
+}  // namespace distributed
+}  // namespace paddle
+#endif
+#endif
diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc
new file mode 100644
index 0000000000000000000000000000000000000000..d529a0ba5fa26269115b83ebabc89d4c30592fba
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc
@@ -0,0 +1,54 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
+#include "paddle/fluid/distributed/fleet_executor/carrier.h"
+
+namespace paddle {
+namespace distributed {
+
+MessageBus::~MessageBus() {
+  // destroy (placeholder: no teardown logic yet)
+}
+
+bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
+  // called by Interceptor, send InterceptorMessage to dst
+  return true;
+}
+
+void MessageBus::ListenPort() {
+  // keeps listening on the port and handles the messages (placeholder: empty)
+}
+
+bool MessageBus::IsSameRank(int64_t src_id, int64_t dst_id) {
+  // check whether dst is on the same rank as src (placeholder: always true)
+  return true;
+}
+
+#ifndef PADDLE_WITH_ASCEND_CL
+#ifdef PADDLE_WITH_DISTRIBUTE
+bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
+  // send the message inter rank (dst is on a different rank than src)
+  return true;
+}
+#endif
+#endif
+
+bool MessageBus::SendIntraRank(const InterceptorMessage& interceptor_message) {
+  // send the message intra rank (dst is on the same rank as src)
+  return true;
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h
new file mode 100644
index 0000000000000000000000000000000000000000..f0f491b603061e6bcd9da96dd761068686e040fd
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/message_bus.h
@@ -0,0 +1,94 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <string>
+#include <thread>
+#include <unordered_map>
+
+#ifndef PADDLE_WITH_ASCEND_CL
+#ifdef PADDLE_WITH_DISTRIBUTE
+#include "brpc/channel.h"
+#include "brpc/server.h"
+#endif
+#endif
+
+#include "paddle/fluid/distributed/fleet_executor/interceptor_message.pb.h"
+#include "paddle/fluid/platform/macros.h"
+
+namespace paddle {
+namespace distributed {
+
+class Carrier;
+
+class MessageBus final {
+ public:
+  MessageBus() = delete;
+
+  explicit MessageBus(
+      const std::unordered_map<int64_t, int64_t>& interceptor_id_to_rank,
+      const std::unordered_map<int64_t, std::string>& rank_to_addr,
+      const std::string& addr)
+      : interceptor_id_to_rank_(interceptor_id_to_rank),
+        rank_to_addr_(rank_to_addr),
+        addr_(addr) {}
+
+  ~MessageBus();
+
+  // called by Interceptor, send InterceptorMessage to dst
+  bool Send(const InterceptorMessage& interceptor_message);
+
+  DISABLE_COPY_AND_ASSIGN(MessageBus);
+
+ private:
+  // keeps listening on the port and handles the messages
+  void ListenPort();
+
+  // check whether the dst is on the same rank as src or on a different one
+  bool IsSameRank(int64_t src_id, int64_t dst_id);
+
+#ifndef PADDLE_WITH_ASCEND_CL
+#ifdef PADDLE_WITH_DISTRIBUTE
+  // send the message inter rank (dst is on a different rank than src)
+  bool SendInterRank(const InterceptorMessage& interceptor_message);
+#endif
+#endif
+
+  // send the message intra rank (dst is on the same rank as src)
+  bool SendIntraRank(const InterceptorMessage& interceptor_message);
+
+  // handed by above layer, save the info mapping interceptor id to rank id
+  std::unordered_map<int64_t, int64_t> interceptor_id_to_rank_;
+
+  // handed by above layer, save the info mapping rank id to addr
+  std::unordered_map<int64_t, std::string> rank_to_addr_;
+
+  // the ip (address) to listen on
+  std::string addr_;
+
+#ifndef PADDLE_WITH_ASCEND_CL
+#ifdef PADDLE_WITH_DISTRIBUTE
+  // brpc server
+  brpc::Server server_;
+#endif
+#endif
+
+  // thread keeps listening to the port to receive remote message
+  // this thread runs ListenPort() function
+  std::thread listen_port_thread_;
+};
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/fleet_executor/task_node.h b/paddle/fluid/distributed/fleet_executor/task_node.h
new file mode 100644
index 0000000000000000000000000000000000000000..62fb9dfb011886ef8b8fffccf3919887d70217a4
--- /dev/null
+++ b/paddle/fluid/distributed/fleet_executor/task_node.h
@@ -0,0 +1,27 @@
+// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+namespace paddle {
+namespace distributed {
+
+class TaskNode final {  // empty placeholder type: no members yet
+ public:
+  TaskNode() = default;
+  ~TaskNode() = default;
+};
+
+}  // namespace distributed
+}  // namespace paddle