From 93f20a0766dd647d508d0160d6d7d24f87e8b2d4 Mon Sep 17 00:00:00 2001 From: LiYuRio <63526175+LiYuRio@users.noreply.github.com> Date: Fri, 13 Jan 2023 14:41:30 +0800 Subject: [PATCH] remove ps_core dependency (#49716) --- cmake/third_party.cmake | 12 ++++++++++ paddle/fluid/distributed/CMakeLists.txt | 3 +-- .../distributed/fleet_executor/CMakeLists.txt | 2 +- .../distributed/fleet_executor/message_bus.cc | 8 +++---- .../distributed/fleet_executor/message_bus.h | 6 ++--- .../fleet_executor/message_service.cc | 2 +- .../fleet_executor/message_service.h | 2 +- .../fleet_executor/test/CMakeLists.txt | 4 +--- paddle/fluid/framework/CMakeLists.txt | 22 ++++++++++--------- 9 files changed, 36 insertions(+), 25 deletions(-) diff --git a/cmake/third_party.cmake b/cmake/third_party.cmake index 66568037ac0..6fac4d2c640 100755 --- a/cmake/third_party.cmake +++ b/cmake/third_party.cmake @@ -444,6 +444,18 @@ if(WITH_RPC list(APPEND third_party_deps extern_brpc) endif() +if(WITH_DISTRIBUTE + AND NOT WITH_PSLIB + AND NOT WITH_PSCORE) + include(external/snappy) + list(APPEND third_party_deps extern_snappy) + + include(external/leveldb) + list(APPEND third_party_deps extern_leveldb) + include(external/brpc) + list(APPEND third_party_deps extern_brpc) +endif() + if(WITH_XBYAK) include(external/xbyak) # download, build, install xbyak list(APPEND third_party_deps extern_xbyak) diff --git a/paddle/fluid/distributed/CMakeLists.txt b/paddle/fluid/distributed/CMakeLists.txt index 34d480de9ee..97e44772962 100755 --- a/paddle/fluid/distributed/CMakeLists.txt +++ b/paddle/fluid/distributed/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(auto_parallel) add_subdirectory(collective) +add_subdirectory(fleet_executor) if(WITH_PYTHON) py_proto_compile(ps_py_proto SRCS the_one_ps.proto) add_custom_target( @@ -32,7 +33,6 @@ if(WITH_RPC) endif() if(NOT WITH_PSCORE) - add_subdirectory(fleet_executor) return() endif() @@ -49,4 +49,3 @@ add_subdirectory(common) add_subdirectory(ps) add_subdirectory(test) add_subdirectory(index_dataset) -add_subdirectory(fleet_executor) diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index 3cafb0bdb5f..cc5ed287e95 100755 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -6,7 +6,7 @@ proto_library(interceptor_message_proto SRCS interceptor_message.proto) if(WITH_ARM_BRPC) set(BRPC_DEPS arm_brpc snappy gflags glog) -elseif(WITH_DISTRIBUTE AND WITH_PSCORE) +elseif(WITH_DISTRIBUTE) set(BRPC_DEPS brpc ssl diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index b73ee060a17..d1a23cc5752 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -73,7 +73,7 @@ bool MessageBus::IsInit() const { return is_init_; } MessageBus::~MessageBus() { VLOG(3) << "Message bus releases resource."; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) server_.Stop(1000); server_.Join(); #endif @@ -94,7 +94,7 @@ bool MessageBus::Send(int64_t dst_rank, true, platform::errors::PreconditionNotMet( "Using message bus since it has not been initialized.")); -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) int retry_time = 0; // message bus will retry sending for 10 times while (retry_time < 10) { ++retry_time; @@ -179,7 +179,7 @@ void MessageBus::ListenPort() { LOG(INFO) << "No need listen to port since training on single card."; return; } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) // function keep listen the port and handle the message PADDLE_ENFORCE_EQ( server_.AddService(&message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), @@ -209,7 +209,7 @@ void MessageBus::ListenPort() { #endif } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) bool MessageBus::SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message) { const auto& dst_addr = GetAddr(dst_rank); diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index dfd65fdbc00..481a64b71c7 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -20,7 +20,7 @@ #include #include -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) #include "brpc/channel.h" #include "brpc/server.h" #include "paddle/fluid/distributed/fleet_executor/message_service.h" @@ -63,7 +63,7 @@ class MessageBus final { const std::string& GetAddr(int64_t rank) const; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) // send the message inter rank (dst is different rank with src) bool SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message); @@ -79,7 +79,7 @@ class MessageBus final { // the ip needs to be listened std::string addr_; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) MessageServiceImpl message_service_; // brpc server brpc::Server server_; diff --git a/paddle/fluid/distributed/fleet_executor/message_service.cc b/paddle/fluid/distributed/fleet_executor/message_service.cc index 390024b67ab..5a1f3bf34d9 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.cc +++ b/paddle/fluid/distributed/fleet_executor/message_service.cc @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) #include "paddle/fluid/distributed/fleet_executor/message_service.h" #include "brpc/server.h" diff --git a/paddle/fluid/distributed/fleet_executor/message_service.h b/paddle/fluid/distributed/fleet_executor/message_service.h index 54ce0b6c1c4..115732ea08f 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.h +++ b/paddle/fluid/distributed/fleet_executor/message_service.h @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) #pragma once #include "brpc/server.h" diff --git a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt index a0078c675d5..466071e3be3 100644 --- a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt @@ -51,9 +51,7 @@ cc_test_old( scope device_context) -if(WITH_DISTRIBUTE - AND WITH_PSCORE - AND NOT (WITH_ASCEND OR WITH_ASCEND_CL)) +if(WITH_DISTRIBUTE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL)) set_source_files_properties( interceptor_ping_pong_with_brpc_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 71288a44c09..a7dfa61f7ef 100755 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -630,16 +630,18 @@ if(WITH_PYTHON) ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto COMMENT "Copy generated python proto into directory paddle/fluid/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) - add_custom_target( - fleet_executor_proto_init ALL - DEPENDS fleet_proto_init fleet_executor_desc_py_proto - COMMAND - cp - ${PADDLE_BINARY_DIR}/paddle/fluid/distributed/fleet_executor/fleet_executor_*.py - ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto - COMMENT - "Copy generated python proto into directory paddle/distributed/fleet/proto." - ) + if(NOT WITH_ROCM) + add_custom_target( + fleet_executor_proto_init ALL + DEPENDS fleet_proto_init fleet_executor_desc_py_proto + COMMAND + cp + ${PADDLE_BINARY_DIR}/paddle/fluid/distributed/fleet_executor/fleet_executor_*.py + ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto + COMMENT + "Copy generated python proto into directory paddle/distributed/fleet/proto." + ) + endif() else() string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/") -- GitLab