diff --git a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp index 2bbaffca63d74cf009df87adb40a1569aa4a02ee..638edc779130bc39e47cb87523e3e7b5e647907d 100644 --- a/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp +++ b/src/sql/engine/px/datahub/components/ob_dh_range_dist_wf.cpp @@ -298,8 +298,6 @@ int ObRDWFPieceMsgCtx::send_whole_msg(common::ObIArray &sqcs) void ObRDWFPieceMsgCtx::reset_resource() { received_ = 0; - infos_.reset(); - arena_alloc_.reset(); } int ObRDWFWholeMsg::assign(const ObRDWFWholeMsg &msg) diff --git a/src/sql/engine/px/datahub/ob_dh_msg_provider.h b/src/sql/engine/px/datahub/ob_dh_msg_provider.h index 4639eb8d35299c128c6fcbe99cc537c8e670403c..5df3d8684615f7f80b22ff13cec1dcd741d8af50 100644 --- a/src/sql/engine/px/datahub/ob_dh_msg_provider.h +++ b/src/sql/engine/px/datahub/ob_dh_msg_provider.h @@ -25,26 +25,20 @@ namespace sql class ObPxDatahubDataProvider { public: - ObPxDatahubDataProvider() - : op_id_(-1), msg_type_(dtl::TESTING), send_msg_cnt_(0), msg_set_(false) - { - } virtual int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) = 0; - virtual void reset() { msg_set_ = false; } + virtual void reset() {} TO_STRING_KV(K_(op_id), K_(msg_type)); uint64_t op_id_; // 注册本 provider 的算子 id,用于 provder 数组里寻址对应 provider dtl::ObDtlMsgType msg_type_; - volatile int64_t send_msg_cnt_; - bool msg_set_; }; template class ObWholeMsgProvider : public ObPxDatahubDataProvider { public: - ObWholeMsgProvider() {} + ObWholeMsgProvider() : msg_set_(false) {} virtual ~ObWholeMsgProvider() = default; - virtual void reset() override { msg_.reset(); ObPxDatahubDataProvider::reset(); } + virtual void reset() override { msg_.reset(); msg_set_ = false; } int get_msg_nonblock(const dtl::ObDtlMsg *&msg, int64_t timeout_ts) { int ret = OB_SUCCESS; @@ -89,6 +83,7 @@ private: return ret; } private: + bool msg_set_; T msg_; common::ObThreadCond msg_ready_cond_; }; diff --git a/src/sql/engine/px/ob_px_sqc_proxy.h b/src/sql/engine/px/ob_px_sqc_proxy.h index a9ae75448fac3a22ba8865b5ccffb87944e3dea2..c63f30fc4f001d183d7a6a84b0de2fdb693cc399 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.h +++ b/src/sql/engine/px/ob_px_sqc_proxy.h @@ -225,19 +225,6 @@ int ObPxSQCProxy::get_dh_msg( SQL_LOG(WARN, "fail push data to channel", K(ret)); } else if (OB_FAIL(ch->flush())) { SQL_LOG(WARN, "fail flush dtl data", K(ret)); - } else { - // The whole message should be reset in next rescan, we reset it after last piece msg - // send in sending piece msg and receiving whole msg scenario (need send && wait whole msg). - if (need_wait_whole_msg) { - const int64_t task_cnt = get_task_count(); - if (provider->send_msg_cnt_ % task_cnt == 0) { - provider->msg_set_ = false; - } - provider->send_msg_cnt_ += 1; - if (provider->send_msg_cnt_ % task_cnt == 0) { - provider->reset(); // reset whole message - } - } } }