diff --git a/deps/oblib/src/lib/thread/threads.cpp b/deps/oblib/src/lib/thread/threads.cpp index 2eba1449bae4605b5f3af29f88b982e48cc85dee..d0650b0d60b7a4e9bd91316e27bcf853bbcefa12 100644 --- a/deps/oblib/src/lib/thread/threads.cpp +++ b/deps/oblib/src/lib/thread/threads.cpp @@ -112,6 +112,48 @@ int Threads::inc_thread_count(int64_t inc) return do_set_thread_count(n_threads); } +int Threads::thread_recycle() +{ + // check if any idle threads and notify them to exit + // idle defination: not working for more than N minutes + common::SpinWLockGuard g(lock_); + // int target = 10; // leave at most 10 threads as cached thread + return do_thread_recycle(); +} + +int Threads::do_thread_recycle() +{ + int ret = OB_SUCCESS; + int n_threads = n_threads_; + // destroy all stopped threads + // px threads mark itself as stopped when it is idle for more than 10 minutes. + for (int i = 0; i < n_threads_; i++) { + if (nullptr != threads_[i]) { + if (threads_[i]->has_set_stop()) { + destroy_thread(threads_[i]); + threads_[i] = nullptr; + n_threads--; + LOG_INFO("recycle one thread", "total", n_threads_, "remain", n_threads); + } + } + } + // for simplicity, don't free threads_ buffer, only reduce n_threads_ size + if (n_threads != n_threads_) { + int from = 0; + int to = 0; + // find non-empty slot, set it to threads_[i] + while (from < n_threads_ && to < n_threads_) { + if (nullptr != threads_[from]) { + threads_[to] = threads_[from]; + to++; + } + from++; + } + n_threads_ = n_threads; + } + return ret; +} + int Threads::init() { return OB_SUCCESS; diff --git a/deps/oblib/src/lib/thread/threads.h b/deps/oblib/src/lib/thread/threads.h index dc77bba382b33e12572acd37d9ad1c206248ce40..37822fb47c4bee893b4750113cd17f42c7a1dca8 100644 --- a/deps/oblib/src/lib/thread/threads.h +++ b/deps/oblib/src/lib/thread/threads.h @@ -81,6 +81,7 @@ public: int do_set_thread_count(int64_t n_threads); int set_thread_count(int64_t n_threads); int inc_thread_count(int64_t inc = 1); + int thread_recycle(); int init(); // IRunWrapper 用于创建多租户线程时指定租户上下文 @@ -121,6 +122,7 @@ private: virtual void run(int64_t idx); virtual void run1() {} + int do_thread_recycle(); /// \brief Create thread with start entry \c entry. int create_thread(Thread *&thread, std::function entry); diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 713b0af11ef322958499f6552fb7fab9670827a0..29ab85d461ebcff730e05f74c391540ccf382f1c 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -123,6 +123,30 @@ int ObPxPools::create_pool(int64_t group_id, ObPxPool *&pool) return ret; } +int ObPxPools::thread_recycle() +{ + int ret = OB_SUCCESS; + common::SpinWLockGuard g(lock_); + ThreadRecyclePoolFunc recycle_pool_func; + if (OB_FAIL(pool_map_.foreach_refactored(recycle_pool_func))) { + LOG_WARN("failed to do foreach", K(ret)); + } + return ret; +} + +int ObPxPools::ThreadRecyclePoolFunc::operator() (common::hash::HashMapPair &kv) +{ + int ret = OB_SUCCESS; + int64_t &group_id = kv.first; + ObPxPool *pool = kv.second; + if (NULL == pool) { + LOG_WARN("pool is null", K(group_id)); + } else { + pool->thread_recycle(); + } + return ret; +} + int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair &kv) { int ret = OB_SUCCESS; @@ -214,12 +238,26 @@ void ObPxPool::run1() } ObLink *task = nullptr; + int64_t idle_time = 0; while (!Thread::current().has_set_stop()) { if (!is_inited_) { ob_usleep(10 * 1000L); } else { if (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) { handle(task); + idle_time = 0; // reset recycle timer + } else { + // recycle thread policy: + // 1. first N threads reserved for first 10min idle period + // 2. no thread reserved after 1h idle period + const int N = 10; + idle_time += QUEUE_WAIT_TIME; + // if idle for more than 10 min, exit thread + if (idle_time > 10LL * 60 * 1000 * 1000 && get_thread_idx() >= N) { + Thread::current().stop(); + } else if (idle_time > 60LL * 60 * 1000 * 1000) { + Thread::current().stop(); + } } } } @@ -1798,6 +1836,7 @@ void ObTenant::periodically_check() check_parallel_servers_target(); check_resource_manager_plan(); check_dtl(); + check_px_thread_recycle(); } } @@ -1852,7 +1891,7 @@ void ObTenant::check_parallel_servers_target() { int ret = OB_SUCCESS; int64_t val = 0; - if (OB_SYS_TENANT_ID != id_ && OB_MAX_RESERVED_TENANT_ID >= id_) { + if (is_virtual_tenant_id(id_)) { // Except for system rentals, internal tenants do not allocate px threads } else if (OB_FAIL(ObSchemaUtils::get_tenant_int_variable( id_, @@ -1863,3 +1902,20 @@ void ObTenant::check_parallel_servers_target() LOG_WARN("set parallel_servers_target failed", K(ret), K(id_), K(val)); } } + +void ObTenant::check_px_thread_recycle() +{ + int ret = OB_SUCCESS; + if (is_virtual_tenant_id(id_)) { + // Except for system rentals, internal tenants do not allocate px threads + } else { + ObTenantSwitchGuard guard(this); + auto px_pools = MTL(ObPxPools*); + if (OB_NOT_NULL(px_pools)) { + px_pools->thread_recycle(); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to switch to tenant", K(id_), K(ret)); + } + } +} diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 9761a3014ea714900410f1fa0968f0935b7fcdb7..36e919553af8be9bd6066404f27ccd71d6ebc646 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -106,6 +106,14 @@ public: virtual ~DeletePoolFunc() = default; int operator()(common::hash::HashMapPair &kv); }; + + class ThreadRecyclePoolFunc + { + public: + ThreadRecyclePoolFunc() {} + virtual ~ThreadRecyclePoolFunc() = default; + int operator()(common::hash::HashMapPair &kv); + }; public: static int mtl_init(ObPxPools *&pools) { @@ -132,6 +140,7 @@ public: } int init(uint64_t tenant_id); int get_or_create(int64_t group_id, ObPxPool *&pool); + int thread_recycle(); private: void destroy(); int create_pool(int64_t group_id, ObPxPool *&pool); @@ -566,6 +575,7 @@ private: // read tenant variable PARALLEL_SERVERS_TARGET void check_parallel_servers_target(); + void check_px_thread_recycle(); // The update of the resource manager is applied to the cgroup void check_resource_manager_plan();