diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index 451fe1dd7f53d291f762d7f01904493a3cddb16e..9c939e5caaf5abfbea71e6ce402c945bf023e8b2 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -474,6 +474,13 @@ int ObInnerSQLConnection::process_retry( THIS_WORKER.sched_wait(); usleep(static_cast<useconds_t>(sleep_time_us)); THIS_WORKER.sched_run(); + } else if (repeatable_stmt && is_scheduler_thread_not_enough_err(last_ret)) { + const int64_t sleep_time_us = 10 * 1000; // 10ms + need_retry = true; + LOG_WARN("scheduler thread not enough, need retry", K(ret), K(last_ret), K(retry_cnt)); + THIS_WORKER.sched_wait(); + usleep(static_cast<useconds_t>(sleep_time_us)); + THIS_WORKER.sched_run(); } if (get_session().is_nested_session()) { /** diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index dbd999f80107f7ede0cb2f604bf373e3fb13aed2..07fc9ae22a872b08460e7824ad7260b577455057 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -20,6 +20,7 @@ #include "lib/stat/ob_session_stat.h" #include "share/config/ob_server_config.h" #include "sql/engine/px/ob_px_admission.h" +#include "share/interrupt/ob_global_interrupt_call.h" #include "ob_th_worker.h" #include "ob_worker_pool.h" #include "ob_multi_tenant.h" @@ -133,8 +134,9 @@ void ObPxPool::run1() if (OB_LIKELY(nullptr != pm)) { pm->set_tenant_ctx(tenant_id_, common::ObCtxIds::WORK_AREA); } + CLEAR_INTERRUPTABLE(); ObCgroupCtrl* cgroup_ctrl = GCTX.cgroup_ctrl_; - LOG_INFO("XXXXX: run px pool", K(group_id_), K(tenant_id_)); + LOG_INFO("run px pool", K(group_id_), K(tenant_id_)); if (nullptr != cgroup_ctrl && OB_LIKELY(cgroup_ctrl->is_valid())) { pid_t pid = static_cast<pid_t>(syscall(__NR_gettid)); cgroup_ctrl->add_thread_to_cgroup(tenant_id_, group_id_, pid); diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp index 
0d516e86901770b2f49fbb11449a4e3af28138ea..a054777089aa3c30258606d49b602741c2c2e428 100644 --- a/src/observer/omt/ob_th_worker.cpp +++ b/src/observer/omt/ob_th_worker.cpp @@ -392,6 +392,7 @@ void ObThWorker::worker(int64_t& tenant_id, int64_t& req_recv_timestamp, int32_t ret = pm->set_tenant_ctx(tenant_->id(), ObCtxIds::DEFAULT_CTX_ID); } } + CLEAR_INTERRUPTABLE(); set_th_worker_thread_name(tenant_->id()); lib::ContextTLOptGuard guard(true); lib::ContextParam param; diff --git a/src/share/interrupt/ob_global_interrupt_call.cpp b/src/share/interrupt/ob_global_interrupt_call.cpp index f4c4019a794aaa275fb9e0d579dad8f09bfe37db..4fc0b777b4f601d34cc542f3595aad67ab4536b2 100644 --- a/src/share/interrupt/ob_global_interrupt_call.cpp +++ b/src/share/interrupt/ob_global_interrupt_call.cpp @@ -97,6 +97,14 @@ ObGlobalInterruptManager* ObGlobalInterruptManager::getInstance() return instance_; } +void ObInterruptChecker::clear_interrupt_status() +{ + if (ref_count_ > 0) { + LIB_LOG(ERROR, "invalid interrupt ref count"); + } + interrupted_ = false; +} + int ObGlobalInterruptManager::init(const common::ObAddr& local, ObInterruptRpcProxy* rpc_proxy) { int ret = OB_SUCCESS; diff --git a/src/share/interrupt/ob_global_interrupt_call.h b/src/share/interrupt/ob_global_interrupt_call.h index 9c965e1aaf630229160e76cc49637c0b8bd25d02..94b0611eb56faf2af5820977ee974a9339955189 100644 --- a/src/share/interrupt/ob_global_interrupt_call.h +++ b/src/share/interrupt/ob_global_interrupt_call.h @@ -124,6 +124,7 @@ public: void clear_status(); void interrupt(ObInterruptCode& interrupt_code); + void clear_interrupt_status(); private: /* @@ -339,7 +340,16 @@ OB_INLINE void UNSET_INTERRUPTABLE(const ObInterruptibleTaskID& tid) } } -} // end namespace common -} // end namespace oceanbase +OB_INLINE void CLEAR_INTERRUPTABLE() +{ + if (OB_ISNULL(get_checker())) { + LIB_LOG(ERROR, "interrupt checker may not be set correctly"); + } else { + get_checker()->clear_interrupt_status(); + } +} + +} // end namespace 
common +} // end namespace oceanbase #endif // OCEANBASE_OBSERVER_OB_REMOTE_INTERRUPT_CALL_H_ diff --git a/src/sql/engine/px/ob_px_admission.cpp b/src/sql/engine/px/ob_px_admission.cpp index 4ac0cd3dac41b1d3afb990dd7bcd8362303310e0..1d27d2a72e0a88e57f28ff0620305a7450c96790 100644 --- a/src/sql/engine/px/ob_px_admission.cpp +++ b/src/sql/engine/px/ob_px_admission.cpp @@ -70,8 +70,7 @@ int ObPxAdmission::enter_query_admission( LOG_WARN("fail check query status", K(ret)); } else if (!ObPxAdmission::admit(req_worker_count, admit_worker_count)) { plan.inc_delayed_px_querys(); - THIS_WORKER.set_retry_flag(); - ret = OB_EAGAIN; + ret = OB_ERR_SCHEDULER_THREAD_NOT_ENOUGH; LOG_INFO("It's a px query, out of px worker resource, " "need delay, do not need disconnect", K(req_worker_count), diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 360879530a91fad10520c506145be82c0913b738..d0dcb31f288fdc5be51226e193d29bc69be8ca5c 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -101,12 +101,7 @@ OB_INLINE int ObResultSet::open_plan() if (OB_FAIL(ObPxAdmission::enter_query_admission( my_session_, get_exec_context(), *get_physical_plan(), worker_count_))) { // query is not admitted to run - if (OB_EAGAIN == ret) { - ret = OB_ERR_SCHEDULER_THREAD_NOT_ENOUGH; - LOG_DEBUG("Query is not admitted to run, try again", K(ret)); - } else { - LOG_WARN("Fail to get admission to use px", K(ret)); - } + LOG_DEBUG("Query is not admitted to run, try again", K(ret)); } else if (THIS_WORKER.is_timeout()) { ret = OB_TIMEOUT; LOG_WARN("query is timeout", @@ -1108,7 +1103,6 @@ void ObResultSet::refresh_location_cache(ObTaskExecutorCtx &task_exec_ctx, bool } } -// obmp_query中重试整个SQL之前,可能需要调用本接口来刷新Location,以避免总是发给了错误的服务器 int ObResultSet::refresh_location_cache(bool is_nonblock) { return ObTaskExecutorCtxUtil::refresh_location_cache(get_exec_context().get_task_exec_ctx(), is_nonblock);