From 94cb210b3d2271ef33eb045817947f620d59a198 Mon Sep 17 00:00:00 2001 From: Chengmo Date: Tue, 9 Feb 2021 16:54:15 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90Cherry-pick=E3=80=91Fix=20Parameter=20?= =?UTF-8?q?Server=20Bug=20(#30860)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 【Paddle.Fleet】Fix brpc get hostname (#30703) * fix Brpc get hostname * fix int64 bug (#30780) fix push sparse int64 bug --- .../fluid/distributed/service/CMakeLists.txt | 6 ++-- .../distributed/service/brpc_ps_client.cc | 24 +++++++++++---- .../distributed/service/brpc_ps_client.h | 1 + .../distributed/service/brpc_ps_server.cc | 14 +++++++-- .../distributed/service/brpc_ps_server.h | 2 +- .../fluid/distributed/service/brpc_utils.cc | 30 +++++++++++++++++++ paddle/fluid/distributed/service/brpc_utils.h | 4 ++- .../fluid/distributed/service/communicator.cc | 2 +- .../fluid/distributed/service/heter_client.cc | 10 ++++++- .../fluid/distributed/service/heter_server.cc | 10 ++++++- 10 files changed, 87 insertions(+), 16 deletions(-) diff --git a/paddle/fluid/distributed/service/CMakeLists.txt b/paddle/fluid/distributed/service/CMakeLists.txt index 6d16ec1dda9..bb3f6f1174d 100644 --- a/paddle/fluid/distributed/service/CMakeLists.txt +++ b/paddle/fluid/distributed/service/CMakeLists.txt @@ -25,9 +25,10 @@ set_source_files_properties(client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMP set_source_files_properties(ps_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(server.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_library(brpc_utils SRCS brpc_utils.cc DEPS tensor device_context ${COMMON_DEPS} ${RPC_DEPS}) -cc_library(downpour_server SRCS brpc_ps_server.cc DEPS boost eigen3 table ${RPC_DEPS}) -cc_library(downpour_client SRCS brpc_ps_client.cc DEPS boost eigen3 table ${RPC_DEPS}) +cc_library(downpour_server SRCS brpc_ps_server.cc DEPS boost eigen3 table brpc_utils ${RPC_DEPS}) +cc_library(downpour_client SRCS brpc_ps_client.cc DEPS boost eigen3 table brpc_utils ${RPC_DEPS}) cc_library(client SRCS ps_client.cc DEPS downpour_client boost ${RPC_DEPS}) cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS}) @@ -35,6 +36,5 @@ cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS}) cc_library(communicator SRCS communicator.cc DEPS scope client boost table math_function selected_rows_functor ${RPC_DEPS}) cc_library(ps_service SRCS service.cc DEPS communicator client server boost ${RPC_DEPS}) -cc_library(brpc_utils SRCS brpc_utils.cc DEPS tensor device_context ${COMMON_DEPS} ${RPC_DEPS}) cc_library(heter_server SRCS heter_server.cc DEPS brpc_utils ${COMMON_DEPS} ${RPC_DEPS}) cc_library(heter_client SRCS heter_client.cc DEPS brpc_utils ${COMMON_DEPS} ${RPC_DEPS}) diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc index 4a07c54375a..e781cc4bcf4 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/service/brpc_ps_client.cc @@ -134,8 +134,15 @@ int32_t BrpcPsClient::create_client2client_connection( server_ip_port.append(std::to_string(client_list[i].port)); _client_channels[i].reset(new brpc::Channel()); if (_client_channels[i]->Init(server_ip_port.c_str(), "", &options) != 0) { - LOG(ERROR) << "psclient connect to client:" << server_ip_port - << " Failed!"; + VLOG(0) << "BrpcPSClient connect to Client:" << server_ip_port + << " Failed! Try again."; + std::string int_ip_port = + GetIntTypeEndpoint(client_list[i].ip, client_list[i].port); + if (_client_channels[i]->Init(int_ip_port.c_str(), "", &options) != 0) { + LOG(ERROR) << "BrpcPSClient connect to Client:" << int_ip_port + << " Failed!"; + return -1; + } } os << server_ip_port << ","; } @@ -168,9 +175,16 @@ int32_t BrpcPsClient::initialize() { _server_channels[i][j].reset(new brpc::Channel()); if (_server_channels[i][j]->Init(server_ip_port.c_str(), "", &options) != 0) { - LOG(ERROR) << "psclient connect to server:" << server_ip_port - << " Failed!"; - return -1; + VLOG(0) << "BrpcPSclient connect to Server:" << server_ip_port + << " Failed! Try again."; + std::string int_ip_port = + GetIntTypeEndpoint(server_list[i].ip, server_list[i].port); + if (_server_channels[i][j]->Init(int_ip_port.c_str(), "", &options) != + 0) { + LOG(ERROR) << "BrpcPSclient connect to Server:" << int_ip_port + << " Failed!"; + return -1; + } } } os << server_ip_port << ","; diff --git a/paddle/fluid/distributed/service/brpc_ps_client.h b/paddle/fluid/distributed/service/brpc_ps_client.h index 17a5d53e229..82f772c2d5a 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.h +++ b/paddle/fluid/distributed/service/brpc_ps_client.h @@ -21,6 +21,7 @@ #include "brpc/channel.h" #include "brpc/controller.h" #include "brpc/server.h" +#include "paddle/fluid/distributed/service/brpc_utils.h" #include "paddle/fluid/distributed/service/ps_client.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" diff --git a/paddle/fluid/distributed/service/brpc_ps_server.cc b/paddle/fluid/distributed/service/brpc_ps_server.cc index b9afff8c439..ef497d3222a 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.cc +++ b/paddle/fluid/distributed/service/brpc_ps_server.cc @@ -13,7 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/service/brpc_ps_server.h" - +#include #include // NOLINT #include "Eigen/Dense" #include "butil/endpoint.h" @@ -65,9 +65,17 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) { options.num_threads = trainers > num_threads ? trainers : num_threads; if (_server.Start(ip_port.c_str(), &options) != 0) { - LOG(ERROR) << "BrpcPsServer start failed, ip_port=" << ip_port; - return 0; + VLOG(0) << "BrpcPsServer start failed, ip_port= " << ip_port + << " , Try Again."; + + std::string int_ip_port = GetIntTypeEndpoint(ip, port); + + if (_server.Start(int_ip_port.c_str(), &options) != 0) { + LOG(ERROR) << "BrpcPsServer start failed, ip_port= " << int_ip_port; + return 0; + } } + VLOG(0) << "BrpcPsServer::start registe_ps_server"; _environment->registe_ps_server(ip, port, _rank); VLOG(0) << "BrpcPsServer::start wait"; diff --git a/paddle/fluid/distributed/service/brpc_ps_server.h b/paddle/fluid/distributed/service/brpc_ps_server.h index c2d0641743a..82626401527 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.h +++ b/paddle/fluid/distributed/service/brpc_ps_server.h @@ -20,6 +20,7 @@ #include #include +#include "paddle/fluid/distributed/service/brpc_utils.h" #include "paddle/fluid/distributed/service/server.h" namespace paddle { @@ -43,7 +44,6 @@ class BrpcPsServer : public PSServer { private: virtual int32_t initialize(); - mutable std::mutex mutex_; std::condition_variable cv_; bool stoped_ = false; diff --git a/paddle/fluid/distributed/service/brpc_utils.cc b/paddle/fluid/distributed/service/brpc_utils.cc index 82ec10b3271..2822c2faa20 100644 --- a/paddle/fluid/distributed/service/brpc_utils.cc +++ b/paddle/fluid/distributed/service/brpc_utils.cc @@ -13,6 +13,9 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/distributed/service/brpc_utils.h" +#include +#include +#include #include #include #include "paddle/fluid/platform/enforce.h" @@ -310,5 +313,32 @@ void DeserializeSelectedRows(framework::Variable* var, const VarMsg& msg, } } +std::string GetIntTypeEndpoint(const std::string& ip, const uint32_t& port) { + // There are usually two forms of IP address: ip(int) / ip (hostname) + // If there're some problem with DNS, or ip triggers the bug of Brpc + // We will try to get the IP address of the domain name manually again + std::string ip_port = ip + ":" + std::to_string(port); + struct hostent* hp = NULL; + hp = gethostbyname(ip.c_str()); + + if (NULL == hp) { + LOG(ERROR) << "Brpc Start failed, ip_port= " << ip_port + << " , Error infomation: " << hstrerror(h_errno); + } + + int i = 0; + char* int_ip = NULL; + + while (hp->h_addr_list[i] != NULL) { + int_ip = inet_ntoa(*(struct in_addr*)hp->h_addr_list[i]); + VLOG(0) << "Brpc Get host by name, host:" << ip << " -> ip: " << int_ip; + break; + } + + std::string str_ip = int_ip; + std::string int_ip_port = str_ip + ":" + std::to_string(port); + return int_ip_port; +} + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/service/brpc_utils.h b/paddle/fluid/distributed/service/brpc_utils.h index 6f00adb94a9..779b765304c 100644 --- a/paddle/fluid/distributed/service/brpc_utils.h +++ b/paddle/fluid/distributed/service/brpc_utils.h @@ -14,10 +14,10 @@ limitations under the License. */ #pragma once +#include #include #include #include - #include "brpc/channel.h" #include "paddle/fluid/distributed/service/sendrecv.pb.h" #include "paddle/fluid/framework/data_type.h" @@ -82,5 +82,7 @@ void DeserializeSelectedRows(framework::Variable* var, const VarMsg& msg, butil::IOBufBytesIterator& iobuf, const platform::DeviceContext& ctx); +std::string GetIntTypeEndpoint(const std::string& ip, const uint32_t& port); + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc index f0322a0cbe8..57768697299 100644 --- a/paddle/fluid/distributed/service/communicator.cc +++ b/paddle/fluid/distributed/service/communicator.cc @@ -290,7 +290,7 @@ void Communicator::RpcSendSparse(const std::string &var_name, int table_id, auto dim = tensor->value().dims()[1]; std::transform(tensor->rows().begin(), tensor->rows().end(), std::back_inserter(sparse_push_keys), - [&](int id) { return static_cast(id); }); + [&](int64_t id) { return static_cast(id); }); for (auto i = 0; i < static_cast(sparse_push_keys.size()); ++i) { push_g_vec.push_back(tensor->mutable_value()->data() + i * dim); diff --git a/paddle/fluid/distributed/service/heter_client.cc b/paddle/fluid/distributed/service/heter_client.cc index 99def0aef8e..87c71979ee6 100644 --- a/paddle/fluid/distributed/service/heter_client.cc +++ b/paddle/fluid/distributed/service/heter_client.cc @@ -22,6 +22,7 @@ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/timer.h" +#include "paddle/fluid/string/split.h" DECLARE_int32(rpc_deadline); DECLARE_int32(pserver_timeout_ms); @@ -96,7 +97,14 @@ void HeterClient::CreateClient2XpuConnection() { for (size_t i = 0; i < xpu_list_.size(); ++i) { xpu_channels_[i].reset(new brpc::Channel()); if (xpu_channels_[i]->Init(xpu_list_[i].c_str(), "", &options) != 0) { - VLOG(0) << "HeterServer channel init fail"; + VLOG(0) << "HeterClient channel init fail. Try Again"; + auto ip_port = paddle::string::Split(xpu_list_[i], ':'); + std::string ip = ip_port[0]; + int port = std::stoi(ip_port[1]); + std::string int_ip_port = GetIntTypeEndpoint(ip, port); + if (xpu_channels_[i]->Init(int_ip_port.c_str(), "", &options) != 0) { + LOG(ERROR) << "BrpcPsServer start failed, ip_port= " << int_ip_port; + } } } } diff --git a/paddle/fluid/distributed/service/heter_server.cc b/paddle/fluid/distributed/service/heter_server.cc index bfdac348008..ea2ca09545a 100644 --- a/paddle/fluid/distributed/service/heter_server.cc +++ b/paddle/fluid/distributed/service/heter_server.cc @@ -19,6 +19,7 @@ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/platform/timer.h" +#include "paddle/fluid/string/split.h" namespace paddle { namespace distributed { @@ -34,7 +35,14 @@ void HeterServer::StartHeterService() { server_.AddService(&service_, brpc::SERVER_DOESNT_OWN_SERVICE); brpc::ServerOptions options; if (server_.Start(endpoint_.c_str(), &options) != 0) { - VLOG(0) << "heter server start fail"; + VLOG(0) << "HeterServer start fail. Try again."; + auto ip_port = paddle::string::Split(endpoint_, ':'); + std::string ip = ip_port[0]; + int port = std::stoi(ip_port[1]); + std::string int_ip_port = GetIntTypeEndpoint(ip, port); + if (server_.Start(endpoint_.c_str(), &options) != 0) { + LOG(ERROR) << "HeterServer start failed, ip_port= " << int_ip_port; + } } else { VLOG(0) << "heter server start success! listen on " << endpoint_; } -- GitLab