diff --git a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp index 638edc779130bc39e47cb87523e3e7b5e647907d..2bbaffca63d74cf009df87adb40a1569aa4a02ee 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp @@ -298,6 +298,8 @@ int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) void ObRDWFPieceMsgCtx::reset_resource() { received_ = 0; + infos_.reset(); + arena_alloc_.reset(); } int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg) diff --git a/src/sql/engine/px/datahub/ob_dh_msg_provider.h b/src/sql/engine/px/datahub/ob_dh_msg_provider.h index 5df3d8684615f7f80b22ff13cec1dcd741d8af50..4639eb8d35299c128c6fcbe99cc537c8e670403c 100644 --- a/src/sql/engine/px/datahub/ob_dh_msg_provider.h +++ b/src/sql/engine/px/datahub/ob_dh_msg_provider.h @@ -25,20 +25,26 @@ namespace sql class ObPxDatahubDataProvider { public: + ObPxDatahubDataProvider() + : op_id_(-1), msg_type_(dtl::TESTING), send_msg_cnt_(0), msg_set_(false) + { + } virtual int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) = 0; - virtual void reset() {} + virtual void reset() { msg_set_ = false; } TO_STRING_KV(K_(op_id), K_(msg_type)); uint64_t op_id_; // 注册本 provider 的算子 id,用于 provder 数组里寻址对应 provider dtl::ObDtlMsgType msg_type_; + volatile int64_t send_msg_cnt_; + bool msg_set_; }; template class ObWholeMsgProvider : public ObPxDatahubDataProvider { public: - ObWholeMsgProvider() : msg_set_(false) {} + ObWholeMsgProvider() {} virtual ~ObWholeMsgProvider() = default; - virtual void reset() override { msg_.reset(); msg_set_ = false; } + virtual void reset() override { msg_.reset(); ObPxDatahubDataProvider::reset(); } int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) { int ret = OB_SUCCESS; @@ -83,7 +89,6 @@ private: return ret; } private: - bool msg_set_; T msg_; common::ObThreadCond msg_ready_cond_; }; diff --git a/src/sql/engine/px/ob_px_sqc_proxy.h b/src/sql/engine/px/ob_px_sqc_proxy.h index 20b44efa4e1036b790b17616412b211651d5fb86..a9ae75448fac3a22ba8865b5ccffb87944e3dea2 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.h +++ b/src/sql/engine/px/ob_px_sqc_proxy.h @@ -217,7 +217,6 @@ int ObPxSQCProxy::get_dh_msg( } else { if (send_piece) { ObLockGuard lock_guard(dtl_lock_); - // TODO: LOCK sqc channel dtl::ObDtlChannel *ch = sqc_arg_.sqc_.get_sqc_channel(); if (OB_ISNULL(ch)) { ret = common::OB_ERR_UNEXPECTED; @@ -226,6 +225,19 @@ int ObPxSQCProxy::get_dh_msg( SQL_LOG(WARN, "fail push data to channel", K(ret)); } else if (OB_FAIL(ch->flush())) { SQL_LOG(WARN, "fail flush dtl data", K(ret)); + } else { + // The whole message should be reset in next rescan, we reset it after last piece msg + // send in sending piece msg and receiving whole msg scenario (need send && wait whole msg). + if (need_wait_whole_msg) { + const int64_t task_cnt = get_task_count(); + if (provider->send_msg_cnt_ % task_cnt == 0) { + provider->msg_set_ = false; + } + provider->send_msg_cnt_ += 1; + if (provider->send_msg_cnt_ % task_cnt == 0) { + provider->reset(); // reset whole message + } + } } }