diff --git a/paddle/fluid/distributed/fleet_executor/interceptor.cc b/paddle/fluid/distributed/fleet_executor/interceptor.cc index 2bee99d183b996c3564eac14210968c666b56c0f..7a87e3e6a006de6d706ee5b3580066d303e219f2 100644 --- a/paddle/fluid/distributed/fleet_executor/interceptor.cc +++ b/paddle/fluid/distributed/fleet_executor/interceptor.cc @@ -42,9 +42,8 @@ bool Interceptor::EnqueueRemoteInterceptorMessage( // Called by Carrier, enqueue an InterceptorMessage to remote mailbox VLOG(3) << "Enqueue message: " << interceptor_message.message_type() << " into " << interceptor_id_ << "'s remote mailbox."; - remote_mailbox_mutex_.lock(); + std::unique_lock lock(remote_mailbox_mutex_); remote_mailbox_.push(interceptor_message); - remote_mailbox_mutex_.unlock(); return true; } diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 6853768c84839304d0fa536c2f2e229f61fc0584..0094dbd1f10a1acce1d31d94641e19fa416196c6 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -48,15 +48,25 @@ bool MessageBus::Send(const InterceptorMessage& interceptor_message) { int64_t src_id = interceptor_message.src_id(); int64_t dst_id = interceptor_message.dst_id(); if (IsSameRank(src_id, dst_id)) { - VLOG(3) << "Send a message from: " << src_id << " to " << dst_id - << " within a same rank."; + VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id + << ", which are same ranks."; return SendIntraRank(interceptor_message); } else { - VLOG(3) << "Send a message from: " << src_id << " to " << dst_id - << " between different ranks."; + VLOG(3) << "Send a message from rank " << src_id << " to rank " << dst_id + << ", which are different ranks."; #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) - return SendInterRank(interceptor_message); + int retry_time = 0; // message bus will retry sending for 10 times + while (retry_time < 10) { + ++retry_time; + if (SendInterRank(interceptor_message)) { + VLOG(3) << "Message bus sends inter rank successfully with " + << retry_time << " times retries."; + return true; + } + } + VLOG(3) << "Message bus sends inter rank fail after 10 times retries."; + return false; #else PADDLE_THROW(platform::errors::Unavailable( "Fleet executor does not support sending message between different " @@ -134,6 +144,7 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) { brpc::Channel channel; brpc::ChannelOptions options; options.protocol = "baidu_std"; + options.connect_timeout_ms = 1000; options.timeout_ms = 1000; options.max_retry = 5; PADDLE_ENFORCE_EQ( @@ -149,11 +160,11 @@ bool MessageBus::SendInterRank(const InterceptorMessage& interceptor_message) { VLOG(3) << "Message bus: brpc sends success."; return true; } else { - VLOG(3) << "Message bus: InterceptorMessageService error."; + VLOG(4) << "Message bus: InterceptorMessageService error."; return false; } } else { - VLOG(3) << "Message bus: brpc sends failed with error text: " + VLOG(4) << "Message bus: brpc sends failed with error text: " << ctrl.ErrorText(); return false; }