diff --git a/cmake/third_party.cmake b/cmake/third_party.cmake index 66568037ac0b098ed39833c4144ba8cfdaa8f441..6fac4d2c64080d7af7d200696c4430dcf91f96f0 100755 --- a/cmake/third_party.cmake +++ b/cmake/third_party.cmake @@ -444,6 +444,18 @@ if(WITH_RPC list(APPEND third_party_deps extern_brpc) endif() +if(WITH_DISTRIBUTE + AND NOT WITH_PSLIB + AND NOT WITH_PSCORE) + include(external/snappy) + list(APPEND third_party_deps extern_snappy) + + include(external/leveldb) + list(APPEND third_party_deps extern_leveldb) + include(external/brpc) + list(APPEND third_party_deps extern_brpc) +endif() + if(WITH_XBYAK) include(external/xbyak) # download, build, install xbyak list(APPEND third_party_deps extern_xbyak) diff --git a/paddle/fluid/distributed/CMakeLists.txt b/paddle/fluid/distributed/CMakeLists.txt index 34d480de9ee780db13c7428e0e42bb95e894b5b6..97e44772962bbaf8442d132dc4e6a50d91e02f18 100755 --- a/paddle/fluid/distributed/CMakeLists.txt +++ b/paddle/fluid/distributed/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(auto_parallel) add_subdirectory(collective) +add_subdirectory(fleet_executor) if(WITH_PYTHON) py_proto_compile(ps_py_proto SRCS the_one_ps.proto) add_custom_target( @@ -32,7 +33,6 @@ if(WITH_RPC) endif() if(NOT WITH_PSCORE) - add_subdirectory(fleet_executor) return() endif() @@ -49,4 +49,3 @@ add_subdirectory(common) add_subdirectory(ps) add_subdirectory(test) add_subdirectory(index_dataset) -add_subdirectory(fleet_executor) diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index 3cafb0bdb5f9278c97e19261e545427ae12119d5..cc5ed287e954f67d9c2877a413333d72a4bde534 100755 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -6,7 +6,7 @@ proto_library(interceptor_message_proto SRCS interceptor_message.proto) if(WITH_ARM_BRPC) set(BRPC_DEPS arm_brpc snappy gflags glog) -elseif(WITH_DISTRIBUTE AND WITH_PSCORE) +elseif(WITH_DISTRIBUTE) set(BRPC_DEPS brpc ssl diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index b73ee060a1719e7a778c256af635cdb6dfa3537a..d1a23cc5752966d62a21c166337a6c1a646222f4 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -73,7 +73,7 @@ bool MessageBus::IsInit() const { return is_init_; } MessageBus::~MessageBus() { VLOG(3) << "Message bus releases resource."; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) server_.Stop(1000); server_.Join(); #endif @@ -94,7 +94,7 @@ bool MessageBus::Send(int64_t dst_rank, true, platform::errors::PreconditionNotMet( "Using message bus since it has not been initialized.")); -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) int retry_time = 0; // message bus will retry sending for 10 times while (retry_time < 10) { ++retry_time; @@ -179,7 +179,7 @@ void MessageBus::ListenPort() { LOG(INFO) << "No need listen to port since training on single card."; return; } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) // function keep listen the port and handle the message PADDLE_ENFORCE_EQ( server_.AddService(&message_service_, brpc::SERVER_DOESNT_OWN_SERVICE), @@ -209,7 +209,7 @@ void MessageBus::ListenPort() { #endif } -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) bool MessageBus::SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message) { const auto& dst_addr = GetAddr(dst_rank); diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index dfd65fdbc00d445a11f60f4e1cde4f4da77b80dc..481a64b71c7dd9138b270ab7fbfbf8791c13f557 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -20,7 +20,7 @@ #include #include -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) #include "brpc/channel.h" #include "brpc/server.h" #include "paddle/fluid/distributed/fleet_executor/message_service.h" @@ -63,7 +63,7 @@ class MessageBus final { const std::string& GetAddr(int64_t rank) const; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) // send the message inter rank (dst is different rank with src) bool SendInterRank(int64_t dst_rank, const InterceptorMessage& interceptor_message); @@ -79,7 +79,7 @@ class MessageBus final { // the ip needs to be listened std::string addr_; -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) MessageServiceImpl message_service_; // brpc server brpc::Server server_; diff --git a/paddle/fluid/distributed/fleet_executor/message_service.cc b/paddle/fluid/distributed/fleet_executor/message_service.cc index 390024b67ab6bf14895cbc72f6de93fc431b7f76..5a1f3bf34d9fb80d3d0452acde3fef5d7faae756 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.cc +++ b/paddle/fluid/distributed/fleet_executor/message_service.cc @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) #include "paddle/fluid/distributed/fleet_executor/message_service.h" #include "brpc/server.h" diff --git a/paddle/fluid/distributed/fleet_executor/message_service.h b/paddle/fluid/distributed/fleet_executor/message_service.h index 54ce0b6c1c41654fb09c356a91282deec98adb20..115732ea08f1248eca2cab4a7494557308e22f67 100644 --- a/paddle/fluid/distributed/fleet_executor/message_service.h +++ b/paddle/fluid/distributed/fleet_executor/message_service.h @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) +#if defined(PADDLE_WITH_DISTRIBUTE) #pragma once #include "brpc/server.h" diff --git a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt index a0078c675d5052b910cc0e65f4968f70894ab451..466071e3be38e4b8657dba693995f4621487af93 100644 --- a/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/test/CMakeLists.txt @@ -51,9 +51,7 @@ cc_test_old( scope device_context) -if(WITH_DISTRIBUTE - AND WITH_PSCORE - AND NOT (WITH_ASCEND OR WITH_ASCEND_CL)) +if(WITH_DISTRIBUTE AND NOT (WITH_ASCEND OR WITH_ASCEND_CL)) set_source_files_properties( interceptor_ping_pong_with_brpc_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 71288a44c096984491c8ecaccc32a5d7fdce7dea..a7dfa61f7ef4bd83a24b76d8b23ad6b961683e83 100755 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -630,16 +630,18 @@ if(WITH_PYTHON) ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto COMMENT "Copy generated python proto into directory paddle/fluid/proto." WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) - add_custom_target( - fleet_executor_proto_init ALL - DEPENDS fleet_proto_init fleet_executor_desc_py_proto - COMMAND - cp - ${PADDLE_BINARY_DIR}/paddle/fluid/distributed/fleet_executor/fleet_executor_*.py - ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto - COMMENT - "Copy generated python proto into directory paddle/distributed/fleet/proto." - ) + if(NOT WITH_ROCM) + add_custom_target( + fleet_executor_proto_init ALL + DEPENDS fleet_proto_init fleet_executor_desc_py_proto + COMMAND + cp + ${PADDLE_BINARY_DIR}/paddle/fluid/distributed/fleet_executor/fleet_executor_*.py + ${PADDLE_BINARY_DIR}/python/paddle/distributed/fleet/proto + COMMENT + "Copy generated python proto into directory paddle/distributed/fleet/proto." + ) + endif() else() string(REPLACE "/" "\\" proto_dstpath "${PADDLE_BINARY_DIR}/python/paddle/fluid/proto/")