未验证 提交 f5caf9c5 编写于 作者: Y Yuang Liu 提交者: GitHub

[fleet_executor] Add retry to the message bus's send. Use unique_lock instead...

[fleet_executor] Add retry to the message bus's send. Use unique_lock instead of calling lock(). (#37087)

* use unique lock, add retry

* bug fix
上级 b4e25436
......@@ -42,9 +42,8 @@ bool Interceptor::EnqueueRemoteInterceptorMessage(
// Called by Carrier, enqueue an InterceptorMessage to remote mailbox
VLOG(3) << "Enqueue message: " << interceptor_message.message_type()
<< " into " << interceptor_id_ << "'s remote mailbox.";
remote_mailbox_mutex_.lock();
std::unique_lock<std::mutex> lock(remote_mailbox_mutex_);
remote_mailbox_.push(interceptor_message);
remote_mailbox_mutex_.unlock();
return true;
}
......
......@@ -48,15 +48,25 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) {
int64_t src_id = interceptor_message.src_id();
int64_t dst_id = interceptor_message.dst_id();
if (IsSameRank(src_id, dst_id)) {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " within a same rank.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are same ranks.";
return SendIntraRank(interceptor_message);
} else {
VLOG(3) << "Send a message from: " << src_id << " to " << dst_id
<< " between different ranks.";
VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id
<< ", which are different ranks.";
#if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \
!defined(PADDLE_WITH_ASCEND_CL)
return SendInterRank(interceptor_message);
int retry_time = 0; // message bus will retry sending for 10 times
while (retry_time < 10) {
++retry_time;
if (SendInterRank(interceptor_message)) {
VLOG(3) << "Message bus sends inter rank successfully with "
<< retry_time << " times retries.";
return true;
}
}
VLOG(3) << "Message bus sends inter rank fail after 10 times retries.";
return false;
#else
PADDLE_THROW(platform::errors::Unavailable(
"Fleet executor does not support sending message between different "
......@@ -134,6 +144,7 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "baidu_std";
options.connect_timeout_ms = 1000;
options.timeout_ms = 1000;
options.max_retry = 5;
PADDLE_ENFORCE_EQ(
......@@ -149,11 +160,11 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) {
VLOG(3) << "Message bus: brpc sends success.";
return true;
} else {
VLOG(3) << "Message bus: InterceptorMessageService error.";
VLOG(4) << "Message bus: InterceptorMessageService error.";
return false;
}
} else {
VLOG(3) << "Message bus: brpc sends failed with error text: "
VLOG(4) << "Message bus: brpc sends failed with error text: "
<< ctrl.ErrorText();
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册