diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc
index c28f86146d3040c6a26cabfb795eff67375d4b76..0bd76b3f6c01bd176ef539f9d1121a396bcfd08a 100644
--- a/paddle/fluid/operators/distributed/grpc_client.cc
+++ b/paddle/fluid/operators/distributed/grpc_client.cc
@@ -22,6 +22,8 @@ limitations under the License. */
 #include "paddle/fluid/operators/distributed/request_handler.h"
 #include "paddle/fluid/platform/profiler.h"
 
+DECLARE_bool(rpc_disable_reuse_port);
+
 namespace paddle {
 namespace operators {
 namespace distributed {
@@ -383,6 +385,9 @@ std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) {
   // Channel configurations:
   grpc::ChannelArguments args;
   args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);
+  if (FLAGS_rpc_disable_reuse_port) {
+    args.SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
+  }
   args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
   args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
   args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
diff --git a/paddle/fluid/operators/distributed/grpc_server.cc b/paddle/fluid/operators/distributed/grpc_server.cc
index ffd2b1707bea6c9379dc09c629fa4c920dac8ed0..77bf67be25780b1fe18c0b2c56680b549637d547 100644
--- a/paddle/fluid/operators/distributed/grpc_server.cc
+++ b/paddle/fluid/operators/distributed/grpc_server.cc
@@ -20,6 +20,8 @@ limitations under the License. */
 
 using ::grpc::ServerAsyncResponseWriter;
 
+DECLARE_bool(rpc_disable_reuse_port);
+
 namespace paddle {
 namespace operators {
 namespace distributed {
@@ -252,6 +254,20 @@ void AsyncGRPCServer::WaitServerReady() {
   VLOG(40) << "AsyncGRPCServer WaitSeverReady";
 }
 
+// Define an option subclass in order to disable SO_REUSEPORT for the
+// server socket.
+// Come from:
+// https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
+class NoReusePortOption : public ::grpc::ServerBuilderOption {
+ public:
+  void UpdateArguments(::grpc::ChannelArguments* args) override {
+    args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
+  }
+
+  void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
+                         plugins) override {}
+};
+
 void AsyncGRPCServer::StartServer() {
   ::grpc::ServerBuilder builder;
   builder.AddListeningPort(bind_address_, ::grpc::InsecureServerCredentials(),
@@ -259,6 +275,10 @@ void AsyncGRPCServer::StartServer() {
 
   builder.SetMaxSendMessageSize(std::numeric_limits<int>::max());
   builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
+  if (FLAGS_rpc_disable_reuse_port) {
+    builder.SetOption(
+        std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption));
+  }
   builder.RegisterService(&service_);
 
   for (auto t : rpc_call_map_) {
diff --git a/paddle/fluid/operators/distributed/sendrecvop_utils.cc b/paddle/fluid/operators/distributed/sendrecvop_utils.cc
index 374fa680e3681d2e4b1d7513a9522810a15fe485..df5af3476fc159414f21668f9f192028e2994b21 100644
--- a/paddle/fluid/operators/distributed/sendrecvop_utils.cc
+++ b/paddle/fluid/operators/distributed/sendrecvop_utils.cc
@@ -22,6 +22,8 @@ limitations under the License. */
 #include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
 #include "paddle/fluid/operators/distributed/variable_response.h"
 
+DEFINE_bool(rpc_disable_reuse_port, false, "Disable SO_REUSEPORT or not.");
+
 namespace paddle {
 namespace operators {
 namespace distributed {
diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py
index 3c092dee3438c0399d216450f076a2e38456d9ab..d851b9dfaa9fb5eeefc5aa1e03e30a56e323f758 100644
--- a/python/paddle/fluid/__init__.py
+++ b/python/paddle/fluid/__init__.py
@@ -129,6 +129,7 @@ def __bootstrap__():
         read_env_flags.append('rpc_send_thread_num')
         read_env_flags.append('rpc_get_thread_num')
         read_env_flags.append('rpc_prefetch_thread_num')
+        read_env_flags.append('rpc_disable_reuse_port')
 
     if core.is_compiled_with_cuda():
         read_env_flags += [
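
Because the new flag is appended to read_env_flags, it should be settable the same way as the other rpc_* flags: through a FLAGS_-prefixed environment variable that fluid's __bootstrap__() picks up once, at import time. The launcher sketch below illustrates that assumption; it is not part of this patch, and the placement of the assignment (before the paddle.fluid import) is the only essential detail.

# Sketch: turn off SO_REUSEPORT for the gRPC server/client started by this
# process, assuming the usual FLAGS_* environment-variable convention that
# __bootstrap__() reads at import time.
import os

# Must be set before paddle.fluid is imported, since the flags are read only
# once during bootstrap.
os.environ['FLAGS_rpc_disable_reuse_port'] = 'true'

import paddle.fluid as fluid  # noqa: E402  -- imported after setting the flag

# From here on, any parameter server or distributed trainer launched through
# fluid in this process should create its gRPC endpoints with
# GRPC_ARG_ALLOW_REUSEPORT set to 0, per the C++ changes above.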