From 1abd119e0803f67d0d7a23207bc9be69b75a4c5e Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 17 Mar 2023 03:11:04 +0000 Subject: [PATCH] revert 'Reset PX DataHub whole msg in rescan scenario' --- .../px/datahub/components/ob_dh_range_dist_wf.cpp | 2 -- src/sql/engine/px/datahub/ob_dh_msg_provider.h | 13 ++++--------- src/sql/engine/px/ob_px_sqc_proxy.h | 13 ------------- 3 files changed, 4 insertions(+), 24 deletions(-) diff --git a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp index 2bbaffca63..638edc7791 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp @@ -298,8 +298,6 @@ int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) void ObRDWFPieceMsgCtx::reset_resource() { received_ = 0; - infos_.reset(); - arena_alloc_.reset(); } int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg) diff --git a/src/sql/engine/px/datahub/ob_dh_msg_provider.h b/src/sql/engine/px/datahub/ob_dh_msg_provider.h index 4639eb8d35..5df3d86846 100644 --- a/src/sql/engine/px/datahub/ob_dh_msg_provider.h +++ b/src/sql/engine/px/datahub/ob_dh_msg_provider.h @@ -25,26 +25,20 @@ namespace sql class ObPxDatahubDataProvider { public: - ObPxDatahubDataProvider() - : op_id_(-1), msg_type_(dtl::TESTING), send_msg_cnt_(0), msg_set_(false) - { - } virtual int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) = 0; - virtual void reset() { msg_set_ = false; } + virtual void reset() {} TO_STRING_KV(K_(op_id), K_(msg_type)); uint64_t op_id_; // 注册本 provider 的算子 id,用于 provder 数组里寻址对应 provider dtl::ObDtlMsgType msg_type_; - volatile int64_t send_msg_cnt_; - bool msg_set_; }; template class ObWholeMsgProvider : public ObPxDatahubDataProvider { public: - ObWholeMsgProvider() {} + ObWholeMsgProvider() : msg_set_(false) {} virtual ~ObWholeMsgProvider() = default; - virtual void reset() override { msg_.reset(); ObPxDatahubDataProvider::reset(); } + virtual void reset() override { msg_.reset(); msg_set_ = false; } int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) { int ret = OB_SUCCESS; @@ -89,6 +83,7 @@ private: return ret; } private: + bool msg_set_; T msg_; common::ObThreadCond msg_ready_cond_; }; diff --git a/src/sql/engine/px/ob_px_sqc_proxy.h b/src/sql/engine/px/ob_px_sqc_proxy.h index a9ae75448f..c63f30fc4f 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.h +++ b/src/sql/engine/px/ob_px_sqc_proxy.h @@ -225,19 +225,6 @@ int ObPxSQCProxy::get_dh_msg( SQL_LOG(WARN, "fail push data to channel", K(ret)); } else if (OB_FAIL(ch->flush())) { SQL_LOG(WARN, "fail flush dtl data", K(ret)); - } else { - // The whole message should be reset in next rescan, we reset it after last piece msg - // send in sending piece msg and receiving whole msg scenario (need send && wait whole msg). - if (need_wait_whole_msg) { - const int64_t task_cnt = get_task_count(); - if (provider->send_msg_cnt_ % task_cnt == 0) { - provider->msg_set_ = false; - } - provider->send_msg_cnt_ += 1; - if (provider->send_msg_cnt_ % task_cnt == 0) { - provider->reset(); // reset whole message - } - } } } -- GitLab