From 9ea292d68b2dbcc38aa807bc10054672092de6a3 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 16 Jul 2021 17:57:13 +0800 Subject: [PATCH] Fix bug that check response status when broadcast shuffle --- src/sql/dtl/ob_dtl_basic_channel.cpp | 26 +++++++++--------- src/sql/dtl/ob_dtl_channel_agent.cpp | 27 ++++++++++--------- src/sql/dtl/ob_dtl_channel_agent.h | 3 +-- src/sql/dtl/ob_dtl_local_channel.cpp | 20 +++++++------- src/sql/dtl/ob_dtl_rpc_channel.cpp | 12 ++------- src/sql/dtl/ob_dtl_utils.cpp | 2 +- .../engine/px/exchange/ob_px_transmit_op.cpp | 10 +++++-- 7 files changed, 49 insertions(+), 51 deletions(-) diff --git a/src/sql/dtl/ob_dtl_basic_channel.cpp b/src/sql/dtl/ob_dtl_basic_channel.cpp index 562038b942..52b86241d9 100644 --- a/src/sql/dtl/ob_dtl_basic_channel.cpp +++ b/src/sql/dtl/ob_dtl_basic_channel.cpp @@ -183,6 +183,13 @@ int ObDtlBasicChannel::wait_response() if (OB_FAIL(msg_response_.wait())) { LOG_WARN("send previous message fail", K(ret)); } + if (OB_HASH_NOT_EXIST == ret) { + if (is_drain()) { + ret = OB_SUCCESS; + } else { + ret = OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER; + } + } } return ret; } @@ -190,10 +197,8 @@ int ObDtlBasicChannel::wait_response() int ObDtlBasicChannel::clear_response_block() { int ret = OB_SUCCESS; - if (msg_response_.is_in_process()) { - if (OB_FAIL(msg_response_.wait())) { - LOG_WARN("send previous message fail", K(ret)); - } + if (OB_FAIL(wait_response())) { + LOG_WARN("failed to wait response", K(ret)); } msg_response_.reset_block(); return ret; @@ -684,7 +689,7 @@ int ObDtlBasicChannel::send1(std::function& proc, return ret; } -int ObDtlBasicChannel::flush(bool force_flush, bool wait_response) +int ObDtlBasicChannel::flush(bool force_flush, bool wait_resp) { int ret = OB_SUCCESS; if (force_flush == true) { @@ -724,18 +729,11 @@ int ObDtlBasicChannel::flush(bool force_flush, bool wait_response) } } while (OB_SUCC(ret)); } - if (OB_SUCC(ret) && force_flush && wait_response && msg_response_.is_in_process()) { - if 
(OB_FAIL(msg_response_.wait())) { + if (OB_SUCC(ret) && force_flush && wait_resp) { + if (OB_FAIL(wait_response())) { LOG_WARN("send previous message fail", K(ret), K(peer_), K(peer_id_), K(lbt())); } } - if (OB_HASH_NOT_EXIST == ret) { - if (is_drain()) { - ret = OB_SUCCESS; - } else { - ret = OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER; - } - } return ret; } diff --git a/src/sql/dtl/ob_dtl_channel_agent.cpp b/src/sql/dtl/ob_dtl_channel_agent.cpp index 36e970ee94..cc0be382c8 100644 --- a/src/sql/dtl/ob_dtl_channel_agent.cpp +++ b/src/sql/dtl/ob_dtl_channel_agent.cpp @@ -429,23 +429,26 @@ int ObDtlChanAgent::send_last_buffer(ObDtlLinkedBuffer*& last_buffer) return ret; } -void ObDtlChanAgent::destroy() +int ObDtlChanAgent::destroy() { + int ret = OB_SUCCESS; if (nullptr != bcast_channel_ && nullptr != current_buffer_) { dtl_buf_allocator_.free_buf(*bcast_channel_, current_buffer_); } - for (int64_t i = 0; i < bc_services_.count(); ++i) { - common::ObArray& resp = bc_services_.at(i)->resps_; - for (int64_t j = 0; j < resp.count(); ++j) { - if (OB_ISNULL(resp.at(j))) { - LOG_WARN("response is null"); - } else if (resp.at(j)->is_in_process()) { - int temp_ret = resp.at(j)->wait(); - if (OB_SUCCESS != temp_ret) { - LOG_WARN("send previous message fail", K(temp_ret)); - } - } + for (int64_t i = 0; i < local_channels_.count(); ++i) { + int temp_ret = local_channels_.at(i)->wait_response(); + if (OB_SUCCESS != temp_ret) { + ret = temp_ret; } + } + for (int64_t i = 0; i < rpc_channels_.count(); ++i) { + int temp_ret = rpc_channels_.at(i)->wait_response(); + if (OB_SUCCESS != temp_ret) { + ret = temp_ret; + } + } + for (int64_t i = 0; i < bc_services_.count(); ++i) { bc_services_.at(i)->~ObDtlBcastService(); } + return ret; } diff --git a/src/sql/dtl/ob_dtl_channel_agent.h b/src/sql/dtl/ob_dtl_channel_agent.h index 3b63a16261..1920b58e7c 100644 --- a/src/sql/dtl/ob_dtl_channel_agent.h +++ b/src/sql/dtl/ob_dtl_channel_agent.h @@ -147,8 +147,7 @@ public: int flush(); int 
init(dtl::ObDtlFlowControl& dfc, ObPxTaskChSet& task_ch_set, common::ObIArray& channels, int64_t tenant_id, int64_t timeout_ts); - void destroy(); - + int destroy(); private: int switch_buffer(int64_t need_size); int send_last_buffer(ObDtlLinkedBuffer*& last_buffer); diff --git a/src/sql/dtl/ob_dtl_local_channel.cpp b/src/sql/dtl/ob_dtl_local_channel.cpp index c1fe74cc23..f75099d390 100644 --- a/src/sql/dtl/ob_dtl_local_channel.cpp +++ b/src/sql/dtl/ob_dtl_local_channel.cpp @@ -169,16 +169,8 @@ int ObDtlLocalChannel::send_message(ObDtlLinkedBuffer*& buf) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret)); } else { - if (msg_response_.is_in_process()) { - if (OB_FAIL(msg_response_.wait())) { - LOG_WARN("send previous message fail", K(ret)); - } else if (OB_HASH_NOT_EXIST == ret) { - if (is_drain()) { - ret = OB_SUCCESS; - } else { - ret = OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER; - } - } + if (OB_FAIL(wait_response())) { + LOG_WARN("failed to wait response", K(ret)); } if (OB_SUCC(ret) && OB_FAIL(wait_unblocking_if_blocked())) { LOG_WARN("failed to block data flow", K(ret)); @@ -206,6 +198,14 @@ int ObDtlLocalChannel::send_message(ObDtlLinkedBuffer*& buf) set_eof(); } } + // send_message may return 4201 on a path that does not call wait_response + if (OB_HASH_NOT_EXIST == ret) { + if (is_drain()) { + ret = OB_SUCCESS; + } else { + ret = OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER; + } + } return ret; } diff --git a/src/sql/dtl/ob_dtl_rpc_channel.cpp b/src/sql/dtl/ob_dtl_rpc_channel.cpp index 94d29a8311..d08f974437 100644 --- a/src/sql/dtl/ob_dtl_rpc_channel.cpp +++ b/src/sql/dtl/ob_dtl_rpc_channel.cpp @@ -238,16 +238,8 @@ int ObDtlRpcChannel::send_message(ObDtlLinkedBuffer*& buf) is_first = buf->is_data_msg() && 1 == buf->seq_no(); is_eof = buf->is_eof(); - if (msg_response_.is_in_process()) { - if (OB_FAIL(msg_response_.wait())) { - LOG_WARN("send previous message fail", K(ret)); - } else if (OB_HASH_NOT_EXIST == ret) { - if (is_drain()) { - ret =
OB_SUCCESS; - } else { - ret = OB_ERR_SIGNALED_IN_PARALLEL_QUERY_SERVER; - } - } + if (OB_FAIL(wait_response())) { + LOG_WARN("failed to wait for response", K(ret)); } if (OB_SUCC(ret) && OB_FAIL(wait_unblocking_if_blocked())) { LOG_WARN("failed to block data flow", K(ret)); diff --git a/src/sql/dtl/ob_dtl_utils.cpp b/src/sql/dtl/ob_dtl_utils.cpp index e85030787c..f67224a71b 100644 --- a/src/sql/dtl/ob_dtl_utils.cpp +++ b/src/sql/dtl/ob_dtl_utils.cpp @@ -110,7 +110,7 @@ int ObDtlAsynSender::asyn_send() if (OB_FAIL(syn_send())) { LOG_WARN("failed to syn send message", K(ret)); } - LOG_ERROR("failed to calc batch buffer cnt", K(ret)); + LOG_TRACE("failed to calc batch buffer cnt", K(ret)); } else { dtl::ObDtlChannel* ch = NULL; int tmp_ret = OB_SUCCESS; diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp index 623b053dd1..6f677bf310 100644 --- a/src/sql/engine/px/exchange/ob_px_transmit_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_transmit_op.cpp @@ -252,7 +252,9 @@ int ObPxTransmitOp::inner_close() { int ret = OB_SUCCESS; /* we must release channel even if there is some error happen before */ - chs_agent_.destroy(); + if (OB_FAIL(chs_agent_.destroy())) { + LOG_WARN("failed to destroy ch agent", K(ret)); + } ObDtlBasicChannel *ch = nullptr; int64_t recv_cnt = 0; for (int i = 0; i < task_channels_.count(); ++i) { @@ -271,7 +273,11 @@ int ObPxTransmitOp::inner_close() if (release_channel_ret != common::OB_SUCCESS) { LOG_WARN("release dtl channel failed", K(release_channel_ret)); } - if (OB_FAIL(ObTransmitOp::inner_close())) { + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = ObTransmitOp::inner_close())) { + if (OB_SUCC(ret)) { + ret = tmp_ret; + } LOG_WARN("fail close op", K(ret)); } return ret; -- GitLab