未验证 提交 c1bf9664 编写于 作者: G gongweibao 提交者: GitHub

Add options to disable SO_REUSEPORT of grpc. (#14269)

上级 60d25bfe
...@@ -22,6 +22,8 @@ limitations under the License. */ ...@@ -22,6 +22,8 @@ limitations under the License. */
#include "paddle/fluid/operators/distributed/request_handler.h" #include "paddle/fluid/operators/distributed/request_handler.h"
#include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/profiler.h"
DECLARE_bool(rpc_disable_reuse_port);
namespace paddle { namespace paddle {
namespace operators { namespace operators {
namespace distributed { namespace distributed {
...@@ -383,6 +385,9 @@ std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) { ...@@ -383,6 +385,9 @@ std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) {
// Channel configurations: // Channel configurations:
grpc::ChannelArguments args; grpc::ChannelArguments args;
args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000); args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);
if (FLAGS_rpc_disable_reuse_port) {
args.SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
}
args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE); args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
args.SetMaxSendMessageSize(std::numeric_limits<int>::max()); args.SetMaxSendMessageSize(std::numeric_limits<int>::max());
args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max()); args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
......
...@@ -20,6 +20,8 @@ limitations under the License. */ ...@@ -20,6 +20,8 @@ limitations under the License. */
using ::grpc::ServerAsyncResponseWriter; using ::grpc::ServerAsyncResponseWriter;
DECLARE_bool(rpc_disable_reuse_port);
namespace paddle { namespace paddle {
namespace operators { namespace operators {
namespace distributed { namespace distributed {
...@@ -252,6 +254,20 @@ void AsyncGRPCServer::WaitServerReady() { ...@@ -252,6 +254,20 @@ void AsyncGRPCServer::WaitServerReady() {
VLOG(40) << "AsyncGRPCServer WaitSeverReady"; VLOG(40) << "AsyncGRPCServer WaitSeverReady";
} }
// Define an option subclass in order to disable SO_REUSEPORT for the
// server socket.
// Come from:
// https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
class NoReusePortOption : public ::grpc::ServerBuilderOption {
public:
void UpdateArguments(::grpc::ChannelArguments* args) override {
args->SetInt(GRPC_ARG_ALLOW_REUSEPORT, 0);
}
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
plugins) override {}
};
void AsyncGRPCServer::StartServer() { void AsyncGRPCServer::StartServer() {
::grpc::ServerBuilder builder; ::grpc::ServerBuilder builder;
builder.AddListeningPort(bind_address_, ::grpc::InsecureServerCredentials(), builder.AddListeningPort(bind_address_, ::grpc::InsecureServerCredentials(),
...@@ -259,6 +275,10 @@ void AsyncGRPCServer::StartServer() { ...@@ -259,6 +275,10 @@ void AsyncGRPCServer::StartServer() {
builder.SetMaxSendMessageSize(std::numeric_limits<int>::max()); builder.SetMaxSendMessageSize(std::numeric_limits<int>::max());
builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max()); builder.SetMaxReceiveMessageSize(std::numeric_limits<int>::max());
if (FLAGS_rpc_disable_reuse_port) {
builder.SetOption(
std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption));
}
builder.RegisterService(&service_); builder.RegisterService(&service_);
for (auto t : rpc_call_map_) { for (auto t : rpc_call_map_) {
......
...@@ -22,6 +22,8 @@ limitations under the License. */ ...@@ -22,6 +22,8 @@ limitations under the License. */
#include "paddle/fluid/operators/distributed/sendrecvop_utils.h" #include "paddle/fluid/operators/distributed/sendrecvop_utils.h"
#include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/operators/distributed/variable_response.h"
DEFINE_bool(rpc_disable_reuse_port, false, "Disable SO_REUSEPORT or not.");
namespace paddle { namespace paddle {
namespace operators { namespace operators {
namespace distributed { namespace distributed {
......
...@@ -129,6 +129,7 @@ def __bootstrap__(): ...@@ -129,6 +129,7 @@ def __bootstrap__():
read_env_flags.append('rpc_send_thread_num') read_env_flags.append('rpc_send_thread_num')
read_env_flags.append('rpc_get_thread_num') read_env_flags.append('rpc_get_thread_num')
read_env_flags.append('rpc_prefetch_thread_num') read_env_flags.append('rpc_prefetch_thread_num')
read_env_flags.append('rpc_disable_reuse_port')
if core.is_compiled_with_cuda(): if core.is_compiled_with_cuda():
read_env_flags += [ read_env_flags += [
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册