diff --git a/paddle/fluid/distributed/service/CMakeLists.txt b/paddle/fluid/distributed/service/CMakeLists.txt index 6d16ec1dda96e681c15b91cf7fa058225b7a4e2b..bb3f6f1174da9d49a8407ec8db16a5a2aa2a8336 100644 --- a/paddle/fluid/distributed/service/CMakeLists.txt +++ b/paddle/fluid/distributed/service/CMakeLists.txt @@ -25,9 +25,10 @@ set_source_files_properties(client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMP set_source_files_properties(ps_client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(server.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) +cc_library(brpc_utils SRCS brpc_utils.cc DEPS tensor device_context ${COMMON_DEPS} ${RPC_DEPS}) -cc_library(downpour_server SRCS brpc_ps_server.cc DEPS boost eigen3 table ${RPC_DEPS}) -cc_library(downpour_client SRCS brpc_ps_client.cc DEPS boost eigen3 table ${RPC_DEPS}) +cc_library(downpour_server SRCS brpc_ps_server.cc DEPS boost eigen3 table brpc_utils ${RPC_DEPS}) +cc_library(downpour_client SRCS brpc_ps_client.cc DEPS boost eigen3 table brpc_utils ${RPC_DEPS}) cc_library(client SRCS ps_client.cc DEPS downpour_client boost ${RPC_DEPS}) cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS}) @@ -35,6 +36,5 @@ cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS}) cc_library(communicator SRCS communicator.cc DEPS scope client boost table math_function selected_rows_functor ${RPC_DEPS}) cc_library(ps_service SRCS service.cc DEPS communicator client server boost ${RPC_DEPS}) -cc_library(brpc_utils SRCS brpc_utils.cc DEPS tensor device_context ${COMMON_DEPS} ${RPC_DEPS}) cc_library(heter_server SRCS heter_server.cc DEPS brpc_utils ${COMMON_DEPS} ${RPC_DEPS}) cc_library(heter_client SRCS heter_client.cc DEPS brpc_utils ${COMMON_DEPS} ${RPC_DEPS}) diff --git a/paddle/fluid/distributed/service/brpc_ps_client.cc b/paddle/fluid/distributed/service/brpc_ps_client.cc index 4a07c54375ae1a94ddc104132c414deba997088c..e781cc4bcf485ddd5f2b1af96c75d77ecd1a1b2e 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/service/brpc_ps_client.cc @@ -134,8 +134,15 @@ int32_t BrpcPsClient::create_client2client_connection( server_ip_port.append(std::to_string(client_list[i].port)); _client_channels[i].reset(new brpc::Channel()); if (_client_channels[i]->Init(server_ip_port.c_str(), "", &options) != 0) { - LOG(ERROR) << "psclient connect to client:" << server_ip_port - << " Failed!"; + VLOG(0) << "BrpcPSClient connect to Client:" << server_ip_port + << " Failed! Try again."; + std::string int_ip_port = + GetIntTypeEndpoint(client_list[i].ip, client_list[i].port); + if (_client_channels[i]->Init(int_ip_port.c_str(), "", &options) != 0) { + LOG(ERROR) << "BrpcPSClient connect to Client:" << int_ip_port + << " Failed!"; + return -1; + } } os << server_ip_port << ","; } @@ -168,9 +175,16 @@ int32_t BrpcPsClient::initialize() { _server_channels[i][j].reset(new brpc::Channel()); if (_server_channels[i][j]->Init(server_ip_port.c_str(), "", &options) != 0) { - LOG(ERROR) << "psclient connect to server:" << server_ip_port - << " Failed!"; - return -1; + VLOG(0) << "BrpcPSclient connect to Server:" << server_ip_port + << " Failed! Try again."; + std::string int_ip_port = + GetIntTypeEndpoint(server_list[i].ip, server_list[i].port); + if (_server_channels[i][j]->Init(int_ip_port.c_str(), "", &options) != + 0) { + LOG(ERROR) << "BrpcPSclient connect to Server:" << int_ip_port + << " Failed!"; + return -1; + } } } os << server_ip_port << ","; diff --git a/paddle/fluid/distributed/service/brpc_ps_client.h b/paddle/fluid/distributed/service/brpc_ps_client.h index 17a5d53e229dcb3937d93fffb3a0abf1b2678dc1..82f772c2d5adeeb4b941f82e5a755d9a3701c64b 100644 --- a/paddle/fluid/distributed/service/brpc_ps_client.h +++ b/paddle/fluid/distributed/service/brpc_ps_client.h @@ -21,6 +21,7 @@ #include "brpc/channel.h" #include "brpc/controller.h" #include "brpc/server.h" +#include "paddle/fluid/distributed/service/brpc_utils.h" #include "paddle/fluid/distributed/service/ps_client.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" diff --git a/paddle/fluid/distributed/service/brpc_ps_server.cc b/paddle/fluid/distributed/service/brpc_ps_server.cc index b9afff8c4390620f9033b057d5bc96466f99eeff..ef497d3222aa42eb8ce6b3bb8c365e84e5394b31 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.cc +++ b/paddle/fluid/distributed/service/brpc_ps_server.cc @@ -13,7 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/service/brpc_ps_server.h" - +#include #include // NOLINT #include "Eigen/Dense" #include "butil/endpoint.h" @@ -65,9 +65,17 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) { options.num_threads = trainers > num_threads ? trainers : num_threads; if (_server.Start(ip_port.c_str(), &options) != 0) { - LOG(ERROR) << "BrpcPsServer start failed, ip_port=" << ip_port; - return 0; + VLOG(0) << "BrpcPsServer start failed, ip_port= " << ip_port + << " , Try Again."; + + std::string int_ip_port = GetIntTypeEndpoint(ip, port); + + if (_server.Start(int_ip_port.c_str(), &options) != 0) { + LOG(ERROR) << "BrpcPsServer start failed, ip_port= " << int_ip_port; + return 0; + } } + VLOG(0) << "BrpcPsServer::start registe_ps_server"; _environment->registe_ps_server(ip, port, _rank); VLOG(0) << "BrpcPsServer::start wait"; diff --git a/paddle/fluid/distributed/service/brpc_ps_server.h b/paddle/fluid/distributed/service/brpc_ps_server.h index c2d0641743a95ed728958024c3b923b5a3253cef..8262640152772b12790810894158368263c39d60 100644 --- a/paddle/fluid/distributed/service/brpc_ps_server.h +++ b/paddle/fluid/distributed/service/brpc_ps_server.h @@ -20,6 +20,7 @@ #include #include +#include "paddle/fluid/distributed/service/brpc_utils.h" #include "paddle/fluid/distributed/service/server.h" namespace paddle { @@ -43,7 +44,6 @@ class BrpcPsServer : public PSServer { private: virtual int32_t initialize(); - mutable std::mutex mutex_; std::condition_variable cv_; bool stoped_ = false; diff --git a/paddle/fluid/distributed/service/brpc_utils.cc b/paddle/fluid/distributed/service/brpc_utils.cc index 82ec10b327197d1d9d86f212f724a2130edef5fc..2822c2faa204051330ebabe68d04fc70f23af27e 100644 --- a/paddle/fluid/distributed/service/brpc_utils.cc +++ b/paddle/fluid/distributed/service/brpc_utils.cc @@ -13,6 +13,9 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/fluid/distributed/service/brpc_utils.h" +#include +#include +#include #include #include #include "paddle/fluid/platform/enforce.h" @@ -310,5 +313,32 @@ void DeserializeSelectedRows(framework::Variable* var, const VarMsg& msg, } } +std::string GetIntTypeEndpoint(const std::string& ip, const uint32_t& port) { + // There are usually two forms of IP address: ip(int) / ip (hostname) + // If there're some problem with DNS, or ip triggers the bug of Brpc + // We will try to get the IP address of the domain name manually again + std::string ip_port = ip + ":" + std::to_string(port); + struct hostent* hp = NULL; + hp = gethostbyname(ip.c_str()); + + if (NULL == hp) { + LOG(ERROR) << "Brpc Start failed, ip_port= " << ip_port + << " , Error infomation: " << hstrerror(h_errno); + } + + int i = 0; + char* int_ip = NULL; + + while (hp->h_addr_list[i] != NULL) { + int_ip = inet_ntoa(*(struct in_addr*)hp->h_addr_list[i]); + VLOG(0) << "Brpc Get host by name, host:" << ip << " -> ip: " << int_ip; + break; + } + + std::string str_ip = int_ip; + std::string int_ip_port = str_ip + ":" + std::to_string(port); + return int_ip_port; +} + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/service/brpc_utils.h b/paddle/fluid/distributed/service/brpc_utils.h index 6f00adb94a9ddcaa48125a9194291fe0351b1198..779b765304c4d97b8a39debf5175b14b3e8c6e41 100644 --- a/paddle/fluid/distributed/service/brpc_utils.h +++ b/paddle/fluid/distributed/service/brpc_utils.h @@ -14,10 +14,10 @@ limitations under the License. */ #pragma once +#include #include #include #include - #include "brpc/channel.h" #include "paddle/fluid/distributed/service/sendrecv.pb.h" #include "paddle/fluid/framework/data_type.h" @@ -82,5 +82,7 @@ void DeserializeSelectedRows(framework::Variable* var, const VarMsg& msg, butil::IOBufBytesIterator& iobuf, const platform::DeviceContext& ctx); +std::string GetIntTypeEndpoint(const std::string& ip, const uint32_t& port); + } // namespace distributed } // namespace paddle diff --git a/paddle/fluid/distributed/service/communicator.cc b/paddle/fluid/distributed/service/communicator.cc index f0322a0cbe8f52bcb2c663a7dbf470e862c579d3..577686972990f4ae482ac9e9b8c6cd1053d0ead9 100644 --- a/paddle/fluid/distributed/service/communicator.cc +++ b/paddle/fluid/distributed/service/communicator.cc @@ -290,7 +290,7 @@ void Communicator::RpcSendSparse(const std::string &var_name, int table_id, auto dim = tensor->value().dims()[1]; std::transform(tensor->rows().begin(), tensor->rows().end(), std::back_inserter(sparse_push_keys), - [&](int id) { return static_cast(id); }); + [&](int64_t id) { return static_cast(id); }); for (auto i = 0; i < static_cast(sparse_push_keys.size()); ++i) { push_g_vec.push_back(tensor->mutable_value()->data() + i * dim); diff --git a/paddle/fluid/distributed/service/heter_client.cc b/paddle/fluid/distributed/service/heter_client.cc index 99def0aef8eeed8e48ea618fbdd516bb690e77c8..87c71979ee6bcc222f6637b66ddd4fa3c035f841 100644 --- a/paddle/fluid/distributed/service/heter_client.cc +++ b/paddle/fluid/distributed/service/heter_client.cc @@ -22,6 +22,7 @@ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/platform/profiler.h" #include "paddle/fluid/platform/timer.h" +#include "paddle/fluid/string/split.h" DECLARE_int32(rpc_deadline); DECLARE_int32(pserver_timeout_ms); @@ -96,7 +97,14 @@ void HeterClient::CreateClient2XpuConnection() { for (size_t i = 0; i < xpu_list_.size(); ++i) { xpu_channels_[i].reset(new brpc::Channel()); if (xpu_channels_[i]->Init(xpu_list_[i].c_str(), "", &options) != 0) { - VLOG(0) << "HeterServer channel init fail"; + VLOG(0) << "HeterClient channel init fail. Try Again"; + auto ip_port = paddle::string::Split(xpu_list_[i], ':'); + std::string ip = ip_port[0]; + int port = std::stoi(ip_port[1]); + std::string int_ip_port = GetIntTypeEndpoint(ip, port); + if (xpu_channels_[i]->Init(int_ip_port.c_str(), "", &options) != 0) { + LOG(ERROR) << "BrpcPsServer start failed, ip_port= " << int_ip_port; + } } } } diff --git a/paddle/fluid/distributed/service/heter_server.cc b/paddle/fluid/distributed/service/heter_server.cc index bfdac348008d8b4864c8a92623b52e72ec46110f..ea2ca09545a49d10e269656a52abce526c1945a1 100644 --- a/paddle/fluid/distributed/service/heter_server.cc +++ b/paddle/fluid/distributed/service/heter_server.cc @@ -19,6 +19,7 @@ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/platform/timer.h" +#include "paddle/fluid/string/split.h" namespace paddle { namespace distributed { @@ -34,7 +35,14 @@ void HeterServer::StartHeterService() { server_.AddService(&service_, brpc::SERVER_DOESNT_OWN_SERVICE); brpc::ServerOptions options; if (server_.Start(endpoint_.c_str(), &options) != 0) { - VLOG(0) << "heter server start fail"; + VLOG(0) << "HeterServer start fail. Try again."; + auto ip_port = paddle::string::Split(endpoint_, ':'); + std::string ip = ip_port[0]; + int port = std::stoi(ip_port[1]); + std::string int_ip_port = GetIntTypeEndpoint(ip, port); + if (server_.Start(endpoint_.c_str(), &options) != 0) { + LOG(ERROR) << "HeterServer start failed, ip_port= " << int_ip_port; + } } else { VLOG(0) << "heter server start success! listen on " << endpoint_; }