diff --git a/src/sql/engine/px/ob_dfo_scheduler.cpp b/src/sql/engine/px/ob_dfo_scheduler.cpp index 472520caaa8ea7729e3144d5fd7a47cef463bac8..183fa5ec7c2e280ff33a2344686309d75329d71c 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.cpp +++ b/src/sql/engine/px/ob_dfo_scheduler.cpp @@ -980,6 +980,17 @@ int ObParallelDfoScheduler::dispatch_sqc( } else { LOG_WARN("fail to wait all async init sqc", K(ret), K(dfo), K(exec_ctx)); } + // 对于正确process的sqc, 是需要sqc report的, 否则在后续的wait_running_dfo逻辑中不会等待此sqc结束 + const ObSqcAsyncCB *cb = NULL; + const ObArray &callbacks = proxy.get_callbacks(); + for (int i = 0; i < callbacks.count(); ++i) { + cb = callbacks.at(i); + if (OB_NOT_NULL(cb) && cb->is_processed() && OB_SUCCESS == cb->get_ret_code().rcode_ && + OB_SUCCESS == cb->get_result().rc_) { + ObPxSqcMeta &sqc = *sqcs.at(i); + sqc.set_need_report(true); + } + } } else { const ObArray& callbacks = proxy.get_callbacks(); ARRAY_FOREACH(callbacks, idx) { diff --git a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp index 8c18d34bfca73e4e1feae753a1dedbaa4a184af2..153b88625047127a79c1df3620c1ab0c6db11503 100644 --- a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp @@ -206,19 +206,6 @@ int ObPxSqcAsyncProxy::wait_all() LOG_WARN("call rpc failed", K(ret), K(callback.get_ret_code())); } } - - if (callback.need_retry() && OB_SUCC(ret)) { - // need retry the task. - // reset: visit, eturn_cb_count_ - callback.set_visited(false); - return_cb_count_--; - if (check_for_retry(callback)) { - callback.reset(); - if (OB_FAIL(launch_one_rpc_request(idx, &callback))) { - LOG_WARN("retrying to send sqc rpc failed"); - } - } - } } } @@ -244,18 +231,6 @@ void ObPxSqcAsyncProxy::destroy() results_.reuse(); } -bool ObPxSqcAsyncProxy::check_for_retry(ObSqcAsyncCB& callback) -{ - bool retry = false; - int64_t timeout_us = phy_plan_ctx_->get_timeout_timestamp() - ObTimeUtility::current_time(); - int64_t send_duration = ObTimeUtility::current_time() - callback.get_send_ts(); - // avoid retry too mutch - if (timeout_us >= 100 * 1000L && send_duration >= 10 * 1000L) { - retry = true; - } - return retry; -} - void ObPxSqcAsyncProxy::fail_process() { LOG_WARN("async sqc fails, process the callbacks that have not yet got results", diff --git a/src/sql/engine/px/ob_px_sqc_async_proxy.h b/src/sql/engine/px/ob_px_sqc_async_proxy.h index 6f2044eb5365e3573fb55f24f4a31bab31c9de02..aea9bd381361747e1116264ee83a070a6a675676 100644 --- a/src/sql/engine/px/ob_px_sqc_async_proxy.h +++ b/src/sql/engine/px/ob_px_sqc_async_proxy.h @@ -57,7 +57,6 @@ public: is_timeout_ = false; is_invalid_ = false; is_visited_ = false; - need_retry_ = false; } void set_visited(bool value) { @@ -83,14 +82,6 @@ public: { return is_processed_; } - void set_retry(bool value) - { - need_retry_ = value; - } - bool need_retry() const - { - return need_retry_; - } const obrpc::ObRpcResultCode get_ret_code() const { return rcode_; @@ -113,8 +104,7 @@ private: bool is_timeout_; bool is_invalid_; bool is_visited_; - bool need_retry_; - ObThreadCond& cond_; + ObThreadCond &cond_; ObCurTraceId::TraceId trace_id_; }; @@ -160,7 +150,6 @@ private: void destroy(); // asynchronously request a single sqc rpc task int launch_one_rpc_request(int64_t idx, ObSqcAsyncCB* cb); - bool check_for_retry(ObSqcAsyncCB& callback); void fail_process(); private: