From 5ff640d9b8abae318cf5b1c3570ee279923ccfc5 Mon Sep 17 00:00:00 2001 From: qianchanger Date: Mon, 18 Apr 2022 10:28:40 +0800 Subject: [PATCH] Fix px infinite retry loop in PL. && Clean up interrupt status at the beginning of the thread --- src/observer/ob_inner_sql_connection.cpp | 7 +++++++ src/observer/omt/ob_tenant.cpp | 4 +++- src/observer/omt/ob_th_worker.cpp | 1 + src/share/interrupt/ob_global_interrupt_call.cpp | 8 ++++++++ src/share/interrupt/ob_global_interrupt_call.h | 14 ++++++++++++-- src/sql/engine/px/ob_px_admission.cpp | 3 +-- src/sql/ob_result_set.cpp | 8 +------- 7 files changed, 33 insertions(+), 12 deletions(-) diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index 451fe1dd7f..9c939e5caa 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -474,6 +474,13 @@ int ObInnerSQLConnection::process_retry( THIS_WORKER.sched_wait(); usleep(static_cast(sleep_time_us)); THIS_WORKER.sched_run(); + } else if (repeatable_stmt && is_scheduler_thread_not_enough_err(last_ret)) { + const int64_t sleep_time_us = 10 * 1000; // 10ms + need_retry = true; + LOG_WARN("scheduler thread not enough, need retry", K(ret), K(last_ret), K(retry_cnt)); + THIS_WORKER.sched_wait(); + usleep(static_cast(sleep_time_us)); + THIS_WORKER.sched_run(); } if (get_session().is_nested_session()) { /** diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index dbd999f801..07fc9ae22a 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -20,6 +20,7 @@ #include "lib/stat/ob_session_stat.h" #include "share/config/ob_server_config.h" #include "sql/engine/px/ob_px_admission.h" +#include "share/interrupt/ob_global_interrupt_call.h" #include "ob_th_worker.h" #include "ob_worker_pool.h" #include "ob_multi_tenant.h" @@ -133,8 +134,9 @@ void ObPxPool::run1() if (OB_LIKELY(nullptr != pm)) { pm->set_tenant_ctx(tenant_id_, common::ObCtxIds::WORK_AREA); } + CLEAR_INTERRUPTABLE(); ObCgroupCtrl* cgroup_ctrl = GCTX.cgroup_ctrl_; - LOG_INFO("XXXXX: run px pool", K(group_id_), K(tenant_id_)); + LOG_INFO("run px pool", K(group_id_), K(tenant_id_)); if (nullptr != cgroup_ctrl && OB_LIKELY(cgroup_ctrl->is_valid())) { pid_t pid = static_cast(syscall(__NR_gettid)); cgroup_ctrl->add_thread_to_cgroup(tenant_id_, group_id_, pid); diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp index 0d516e8690..a054777089 100644 --- a/src/observer/omt/ob_th_worker.cpp +++ b/src/observer/omt/ob_th_worker.cpp @@ -392,6 +392,7 @@ void ObThWorker::worker(int64_t& tenant_id, int64_t& req_recv_timestamp, int32_t ret = pm->set_tenant_ctx(tenant_->id(), ObCtxIds::DEFAULT_CTX_ID); } } + CLEAR_INTERRUPTABLE(); set_th_worker_thread_name(tenant_->id()); lib::ContextTLOptGuard guard(true); lib::ContextParam param; diff --git a/src/share/interrupt/ob_global_interrupt_call.cpp b/src/share/interrupt/ob_global_interrupt_call.cpp index f4c4019a79..4fc0b777b4 100644 --- a/src/share/interrupt/ob_global_interrupt_call.cpp +++ b/src/share/interrupt/ob_global_interrupt_call.cpp @@ -97,6 +97,14 @@ ObGlobalInterruptManager* ObGlobalInterruptManager::getInstance() return instance_; } +void ObInterruptChecker::clear_interrupt_status() +{ + if (ref_count_ > 0) { + LIB_LOG(ERROR, "invlid interrupt ref count"); + } + interrupted_ = false; +} + int ObGlobalInterruptManager::init(const common::ObAddr& local, ObInterruptRpcProxy* rpc_proxy) { int ret = OB_SUCCESS; diff --git a/src/share/interrupt/ob_global_interrupt_call.h b/src/share/interrupt/ob_global_interrupt_call.h index 9c965e1aaf..94b0611eb5 100644 --- a/src/share/interrupt/ob_global_interrupt_call.h +++ b/src/share/interrupt/ob_global_interrupt_call.h @@ -124,6 +124,7 @@ public: void clear_status(); void interrupt(ObInterruptCode& interrupt_code); + void clear_interrupt_status(); private: /* @@ -339,7 +340,16 @@ OB_INLINE void UNSET_INTERRUPTABLE(const ObInterruptibleTaskID& tid) } } -} // end namespace common -} // end namespace oceanbase +OB_INLINE void CLEAR_INTERRUPTABLE() +{ + if (OB_ISNULL(get_checker())) { + LIB_LOG(ERROR, "interrupt checker may not be set correctly"); + } else { + get_checker()->clear_interrupt_status(); + } +} + +} // end namespace common +} // end namespace oceanbase #endif // OCEANBASE_OBSERVER_OB_REMOTE_INTERRUPT_CALL_H_ diff --git a/src/sql/engine/px/ob_px_admission.cpp b/src/sql/engine/px/ob_px_admission.cpp index 4ac0cd3dac..1d27d2a72e 100644 --- a/src/sql/engine/px/ob_px_admission.cpp +++ b/src/sql/engine/px/ob_px_admission.cpp @@ -70,8 +70,7 @@ int ObPxAdmission::enter_query_admission( LOG_WARN("fail check query status", K(ret)); } else if (!ObPxAdmission::admit(req_worker_count, admit_worker_count)) { plan.inc_delayed_px_querys(); - THIS_WORKER.set_retry_flag(); - ret = OB_EAGAIN; + ret = OB_ERR_SCHEDULER_THREAD_NOT_ENOUGH; LOG_INFO("It's a px query, out of px worker resource, " "need delay, do not need disconnect", K(req_worker_count), diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 360879530a..d0dcb31f28 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -101,12 +101,7 @@ OB_INLINE int ObResultSet::open_plan() if (OB_FAIL(ObPxAdmission::enter_query_admission( my_session_, get_exec_context(), *get_physical_plan(), worker_count_))) { // query is not admitted to run - if (OB_EAGAIN == ret) { - ret = OB_ERR_SCHEDULER_THREAD_NOT_ENOUGH; - LOG_DEBUG("Query is not admitted to run, try again", K(ret)); - } else { - LOG_WARN("Fail to get admission to use px", K(ret)); - } + LOG_DEBUG("Query is not admitted to run, try again", K(ret)); } else if (THIS_WORKER.is_timeout()) { ret = OB_TIMEOUT; LOG_WARN("query is timeout", @@ -1108,7 +1103,6 @@ void ObResultSet::refresh_location_cache(ObTaskExecutorCtx &task_exec_ctx, bool } } -// obmp_query中重试整个SQL之前,可能需要调用本接口来刷新Location,以避免总是发给了错误的服务器 int ObResultSet::refresh_location_cache(bool is_nonblock) { return ObTaskExecutorCtxUtil::refresh_location_cache(get_exec_context().get_task_exec_ctx(), is_nonblock); -- GitLab