Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
机器未来
Paddle
提交
78d37c3f
P
Paddle
项目概览
机器未来
/
Paddle
与 Fork 源项目一致
Fork自
PaddlePaddle / Paddle
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
Paddle
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
78d37c3f
编写于
1月 28, 2021
作者:
C
Chengmo
提交者:
GitHub
1月 28, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
【Paddle.Fleet】Fix brpc get hostname (#30703)
* fix Brpc get hostname
上级
3491acfb
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
86 addition
and
15 deletion
+86
-15
paddle/fluid/distributed/service/CMakeLists.txt
paddle/fluid/distributed/service/CMakeLists.txt
+3
-3
paddle/fluid/distributed/service/brpc_ps_client.cc
paddle/fluid/distributed/service/brpc_ps_client.cc
+19
-5
paddle/fluid/distributed/service/brpc_ps_client.h
paddle/fluid/distributed/service/brpc_ps_client.h
+1
-0
paddle/fluid/distributed/service/brpc_ps_server.cc
paddle/fluid/distributed/service/brpc_ps_server.cc
+11
-3
paddle/fluid/distributed/service/brpc_ps_server.h
paddle/fluid/distributed/service/brpc_ps_server.h
+1
-1
paddle/fluid/distributed/service/brpc_utils.cc
paddle/fluid/distributed/service/brpc_utils.cc
+30
-0
paddle/fluid/distributed/service/brpc_utils.h
paddle/fluid/distributed/service/brpc_utils.h
+3
-1
paddle/fluid/distributed/service/heter_client.cc
paddle/fluid/distributed/service/heter_client.cc
+9
-1
paddle/fluid/distributed/service/heter_server.cc
paddle/fluid/distributed/service/heter_server.cc
+9
-1
未找到文件。
paddle/fluid/distributed/service/CMakeLists.txt
浏览文件 @
78d37c3f
...
...
@@ -25,9 +25,10 @@ set_source_files_properties(client.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMP
set_source_files_properties
(
ps_client.cc PROPERTIES COMPILE_FLAGS
${
DISTRIBUTE_COMPILE_FLAGS
}
)
set_source_files_properties
(
server.cc PROPERTIES COMPILE_FLAGS
${
DISTRIBUTE_COMPILE_FLAGS
}
)
cc_library
(
brpc_utils SRCS brpc_utils.cc DEPS tensor device_context
${
COMMON_DEPS
}
${
RPC_DEPS
}
)
cc_library
(
downpour_server SRCS brpc_ps_server.cc DEPS boost eigen3 table
${
RPC_DEPS
}
)
cc_library
(
downpour_client SRCS brpc_ps_client.cc DEPS boost eigen3 table
${
RPC_DEPS
}
)
cc_library
(
downpour_server SRCS brpc_ps_server.cc DEPS boost eigen3 table
brpc_utils
${
RPC_DEPS
}
)
cc_library
(
downpour_client SRCS brpc_ps_client.cc DEPS boost eigen3 table
brpc_utils
${
RPC_DEPS
}
)
cc_library
(
client SRCS ps_client.cc DEPS downpour_client boost
${
RPC_DEPS
}
)
cc_library
(
server SRCS server.cc DEPS downpour_server boost
${
RPC_DEPS
}
)
...
...
@@ -35,6 +36,5 @@ cc_library(server SRCS server.cc DEPS downpour_server boost ${RPC_DEPS})
cc_library
(
communicator SRCS communicator.cc DEPS scope client boost table math_function selected_rows_functor
${
RPC_DEPS
}
)
cc_library
(
ps_service SRCS service.cc DEPS communicator client server boost
${
RPC_DEPS
}
)
cc_library
(
brpc_utils SRCS brpc_utils.cc DEPS tensor device_context
${
COMMON_DEPS
}
${
RPC_DEPS
}
)
cc_library
(
heter_server SRCS heter_server.cc DEPS brpc_utils
${
COMMON_DEPS
}
${
RPC_DEPS
}
)
cc_library
(
heter_client SRCS heter_client.cc DEPS brpc_utils
${
COMMON_DEPS
}
${
RPC_DEPS
}
)
paddle/fluid/distributed/service/brpc_ps_client.cc
浏览文件 @
78d37c3f
...
...
@@ -134,8 +134,15 @@ int32_t BrpcPsClient::create_client2client_connection(
server_ip_port
.
append
(
std
::
to_string
(
client_list
[
i
].
port
));
_client_channels
[
i
].
reset
(
new
brpc
::
Channel
());
if
(
_client_channels
[
i
]
->
Init
(
server_ip_port
.
c_str
(),
""
,
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"psclient connect to client:"
<<
server_ip_port
VLOG
(
0
)
<<
"BrpcPSClient connect to Client:"
<<
server_ip_port
<<
" Failed! Try again."
;
std
::
string
int_ip_port
=
GetIntTypeEndpoint
(
client_list
[
i
].
ip
,
client_list
[
i
].
port
);
if
(
_client_channels
[
i
]
->
Init
(
int_ip_port
.
c_str
(),
""
,
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"BrpcPSClient connect to Client:"
<<
int_ip_port
<<
" Failed!"
;
return
-
1
;
}
}
os
<<
server_ip_port
<<
","
;
}
...
...
@@ -168,11 +175,18 @@ int32_t BrpcPsClient::initialize() {
_server_channels
[
i
][
j
].
reset
(
new
brpc
::
Channel
());
if
(
_server_channels
[
i
][
j
]
->
Init
(
server_ip_port
.
c_str
(),
""
,
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"psclient connect to server:"
<<
server_ip_port
VLOG
(
0
)
<<
"BrpcPSclient connect to Server:"
<<
server_ip_port
<<
" Failed! Try again."
;
std
::
string
int_ip_port
=
GetIntTypeEndpoint
(
server_list
[
i
].
ip
,
server_list
[
i
].
port
);
if
(
_server_channels
[
i
][
j
]
->
Init
(
int_ip_port
.
c_str
(),
""
,
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"BrpcPSclient connect to Server:"
<<
int_ip_port
<<
" Failed!"
;
return
-
1
;
}
}
}
os
<<
server_ip_port
<<
","
;
}
// 启动client探听接口, 并相互建立连接
...
...
paddle/fluid/distributed/service/brpc_ps_client.h
浏览文件 @
78d37c3f
...
...
@@ -21,6 +21,7 @@
#include "brpc/channel.h"
#include "brpc/controller.h"
#include "brpc/server.h"
#include "paddle/fluid/distributed/service/brpc_utils.h"
#include "paddle/fluid/distributed/service/ps_client.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
...
...
paddle/fluid/distributed/service/brpc_ps_server.cc
浏览文件 @
78d37c3f
...
...
@@ -13,7 +13,7 @@
// limitations under the License.
#include "paddle/fluid/distributed/service/brpc_ps_server.h"
#include <netdb.h>
#include <thread> // NOLINT
#include "Eigen/Dense"
#include "butil/endpoint.h"
...
...
@@ -65,9 +65,17 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) {
options
.
num_threads
=
trainers
>
num_threads
?
trainers
:
num_threads
;
if
(
_server
.
Start
(
ip_port
.
c_str
(),
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"BrpcPsServer start failed, ip_port="
<<
ip_port
;
VLOG
(
0
)
<<
"BrpcPsServer start failed, ip_port= "
<<
ip_port
<<
" , Try Again."
;
std
::
string
int_ip_port
=
GetIntTypeEndpoint
(
ip
,
port
);
if
(
_server
.
Start
(
int_ip_port
.
c_str
(),
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"BrpcPsServer start failed, ip_port= "
<<
int_ip_port
;
return
0
;
}
}
VLOG
(
0
)
<<
"BrpcPsServer::start registe_ps_server"
;
_environment
->
registe_ps_server
(
ip
,
port
,
_rank
);
VLOG
(
0
)
<<
"BrpcPsServer::start wait"
;
...
...
paddle/fluid/distributed/service/brpc_ps_server.h
浏览文件 @
78d37c3f
...
...
@@ -20,6 +20,7 @@
#include <memory>
#include <vector>
#include "paddle/fluid/distributed/service/brpc_utils.h"
#include "paddle/fluid/distributed/service/server.h"
namespace
paddle
{
...
...
@@ -43,7 +44,6 @@ class BrpcPsServer : public PSServer {
private:
virtual
int32_t
initialize
();
mutable
std
::
mutex
mutex_
;
std
::
condition_variable
cv_
;
bool
stoped_
=
false
;
...
...
paddle/fluid/distributed/service/brpc_utils.cc
浏览文件 @
78d37c3f
...
...
@@ -13,6 +13,9 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/distributed/service/brpc_utils.h"
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <limits>
#include <memory>
#include "paddle/fluid/platform/enforce.h"
...
...
@@ -310,5 +313,32 @@ void DeserializeSelectedRows(framework::Variable* var, const VarMsg& msg,
}
}
std
::
string
GetIntTypeEndpoint
(
const
std
::
string
&
ip
,
const
uint32_t
&
port
)
{
// There are usually two forms of IP address: ip(int) / ip (hostname)
// If there're some problem with DNS, or ip triggers the bug of Brpc
// We will try to get the IP address of the domain name manually again
std
::
string
ip_port
=
ip
+
":"
+
std
::
to_string
(
port
);
struct
hostent
*
hp
=
NULL
;
hp
=
gethostbyname
(
ip
.
c_str
());
if
(
NULL
==
hp
)
{
LOG
(
ERROR
)
<<
"Brpc Start failed, ip_port= "
<<
ip_port
<<
" , Error infomation: "
<<
hstrerror
(
h_errno
);
}
int
i
=
0
;
char
*
int_ip
=
NULL
;
while
(
hp
->
h_addr_list
[
i
]
!=
NULL
)
{
int_ip
=
inet_ntoa
(
*
(
struct
in_addr
*
)
hp
->
h_addr_list
[
i
]);
VLOG
(
0
)
<<
"Brpc Get host by name, host:"
<<
ip
<<
" -> ip: "
<<
int_ip
;
break
;
}
std
::
string
str_ip
=
int_ip
;
std
::
string
int_ip_port
=
str_ip
+
":"
+
std
::
to_string
(
port
);
return
int_ip_port
;
}
}
// namespace distributed
}
// namespace paddle
paddle/fluid/distributed/service/brpc_utils.h
浏览文件 @
78d37c3f
...
...
@@ -14,10 +14,10 @@ limitations under the License. */
#pragma once
#include <netdb.h>
#include <iostream>
#include <string>
#include <vector>
#include "brpc/channel.h"
#include "paddle/fluid/distributed/service/sendrecv.pb.h"
#include "paddle/fluid/framework/data_type.h"
...
...
@@ -82,5 +82,7 @@ void DeserializeSelectedRows(framework::Variable* var, const VarMsg& msg,
butil
::
IOBufBytesIterator
&
iobuf
,
const
platform
::
DeviceContext
&
ctx
);
std
::
string
GetIntTypeEndpoint
(
const
std
::
string
&
ip
,
const
uint32_t
&
port
);
}
// namespace distributed
}
// namespace paddle
paddle/fluid/distributed/service/heter_client.cc
浏览文件 @
78d37c3f
...
...
@@ -22,6 +22,7 @@
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/timer.h"
#include "paddle/fluid/string/split.h"
DECLARE_int32
(
rpc_deadline
);
DECLARE_int32
(
pserver_timeout_ms
);
...
...
@@ -96,7 +97,14 @@ void HeterClient::CreateClient2XpuConnection() {
for
(
size_t
i
=
0
;
i
<
xpu_list_
.
size
();
++
i
)
{
xpu_channels_
[
i
].
reset
(
new
brpc
::
Channel
());
if
(
xpu_channels_
[
i
]
->
Init
(
xpu_list_
[
i
].
c_str
(),
""
,
&
options
)
!=
0
)
{
VLOG
(
0
)
<<
"HeterServer channel init fail"
;
VLOG
(
0
)
<<
"HeterClient channel init fail. Try Again"
;
auto
ip_port
=
paddle
::
string
::
Split
(
xpu_list_
[
i
],
':'
);
std
::
string
ip
=
ip_port
[
0
];
int
port
=
std
::
stoi
(
ip_port
[
1
]);
std
::
string
int_ip_port
=
GetIntTypeEndpoint
(
ip
,
port
);
if
(
xpu_channels_
[
i
]
->
Init
(
int_ip_port
.
c_str
(),
""
,
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"BrpcPsServer start failed, ip_port= "
<<
int_ip_port
;
}
}
}
}
...
...
paddle/fluid/distributed/service/heter_server.cc
浏览文件 @
78d37c3f
...
...
@@ -19,6 +19,7 @@
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/timer.h"
#include "paddle/fluid/string/split.h"
namespace
paddle
{
namespace
distributed
{
...
...
@@ -34,7 +35,14 @@ void HeterServer::StartHeterService() {
server_
.
AddService
(
&
service_
,
brpc
::
SERVER_DOESNT_OWN_SERVICE
);
brpc
::
ServerOptions
options
;
if
(
server_
.
Start
(
endpoint_
.
c_str
(),
&
options
)
!=
0
)
{
VLOG
(
0
)
<<
"heter server start fail"
;
VLOG
(
0
)
<<
"HeterServer start fail. Try again."
;
auto
ip_port
=
paddle
::
string
::
Split
(
endpoint_
,
':'
);
std
::
string
ip
=
ip_port
[
0
];
int
port
=
std
::
stoi
(
ip_port
[
1
]);
std
::
string
int_ip_port
=
GetIntTypeEndpoint
(
ip
,
port
);
if
(
server_
.
Start
(
endpoint_
.
c_str
(),
&
options
)
!=
0
)
{
LOG
(
ERROR
)
<<
"HeterServer start failed, ip_port= "
<<
int_ip_port
;
}
}
else
{
VLOG
(
0
)
<<
"heter server start success! listen on "
<<
endpoint_
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录