提交 6243275a 编写于 作者: R raywill 提交者: OB-robot

px thread auto recycling

上级 7b17fe21
......@@ -112,6 +112,48 @@ int Threads::inc_thread_count(int64_t inc)
return do_set_thread_count(n_threads);
}
int Threads::thread_recycle()
{
// check if any idle threads and notify them to exit
// idle defination: not working for more than N minutes
common::SpinWLockGuard g(lock_);
// int target = 10; // leave at most 10 threads as cached thread
return do_thread_recycle();
}
int Threads::do_thread_recycle()
{
int ret = OB_SUCCESS;
int n_threads = n_threads_;
// destroy all stopped threads
// px threads mark itself as stopped when it is idle for more than 10 minutes.
for (int i = 0; i < n_threads_; i++) {
if (nullptr != threads_[i]) {
if (threads_[i]->has_set_stop()) {
destroy_thread(threads_[i]);
threads_[i] = nullptr;
n_threads--;
LOG_INFO("recycle one thread", "total", n_threads_, "remain", n_threads);
}
}
}
// for simplicity, don't free threads_ buffer, only reduce n_threads_ size
if (n_threads != n_threads_) {
int from = 0;
int to = 0;
// find non-empty slot, set it to threads_[i]
while (from < n_threads_ && to < n_threads_) {
if (nullptr != threads_[from]) {
threads_[to] = threads_[from];
to++;
}
from++;
}
n_threads_ = n_threads;
}
return ret;
}
int Threads::init()
{
return OB_SUCCESS;
......
......@@ -81,6 +81,7 @@ public:
int do_set_thread_count(int64_t n_threads);
int set_thread_count(int64_t n_threads);
int inc_thread_count(int64_t inc = 1);
int thread_recycle();
int init();
// IRunWrapper 用于创建多租户线程时指定租户上下文
......@@ -121,6 +122,7 @@ private:
virtual void run(int64_t idx);
virtual void run1() {}
int do_thread_recycle();
/// \brief Create thread with start entry \c entry.
int create_thread(Thread *&thread, std::function<void()> entry);
......
......@@ -123,6 +123,30 @@ int ObPxPools::create_pool(int64_t group_id, ObPxPool *&pool)
return ret;
}
int ObPxPools::thread_recycle()
{
int ret = OB_SUCCESS;
common::SpinWLockGuard g(lock_);
ThreadRecyclePoolFunc recycle_pool_func;
if (OB_FAIL(pool_map_.foreach_refactored(recycle_pool_func))) {
LOG_WARN("failed to do foreach", K(ret));
}
return ret;
}
int ObPxPools::ThreadRecyclePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
{
int ret = OB_SUCCESS;
int64_t &group_id = kv.first;
ObPxPool *pool = kv.second;
if (NULL == pool) {
LOG_WARN("pool is null", K(group_id));
} else {
pool->thread_recycle();
}
return ret;
}
int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
{
int ret = OB_SUCCESS;
......@@ -214,12 +238,26 @@ void ObPxPool::run1()
}
ObLink *task = nullptr;
int64_t idle_time = 0;
while (!Thread::current().has_set_stop()) {
if (!is_inited_) {
ob_usleep(10 * 1000L);
} else {
if (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) {
handle(task);
idle_time = 0; // reset recycle timer
} else {
// recycle thread policy:
// 1. first N threads reserved for first 10min idle period
// 2. no thread reserved after 1h idle period
const int N = 10;
idle_time += QUEUE_WAIT_TIME;
// if idle for more than 10 min, exit thread
if (idle_time > 10LL * 60 * 1000 * 1000 && get_thread_idx() >= N) {
Thread::current().stop();
} else if (idle_time > 60LL * 60 * 1000 * 1000) {
Thread::current().stop();
}
}
}
}
......@@ -1798,6 +1836,7 @@ void ObTenant::periodically_check()
check_parallel_servers_target();
check_resource_manager_plan();
check_dtl();
check_px_thread_recycle();
}
}
......@@ -1852,7 +1891,7 @@ void ObTenant::check_parallel_servers_target()
{
int ret = OB_SUCCESS;
int64_t val = 0;
if (OB_SYS_TENANT_ID != id_ && OB_MAX_RESERVED_TENANT_ID >= id_) {
if (is_virtual_tenant_id(id_)) {
// Except for system rentals, internal tenants do not allocate px threads
} else if (OB_FAIL(ObSchemaUtils::get_tenant_int_variable(
id_,
......@@ -1863,3 +1902,20 @@ void ObTenant::check_parallel_servers_target()
LOG_WARN("set parallel_servers_target failed", K(ret), K(id_), K(val));
}
}
void ObTenant::check_px_thread_recycle()
{
int ret = OB_SUCCESS;
if (is_virtual_tenant_id(id_)) {
// Except for system rentals, internal tenants do not allocate px threads
} else {
ObTenantSwitchGuard guard(this);
auto px_pools = MTL(ObPxPools*);
if (OB_NOT_NULL(px_pools)) {
px_pools->thread_recycle();
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to switch to tenant", K(id_), K(ret));
}
}
}
......@@ -106,6 +106,14 @@ public:
virtual ~DeletePoolFunc() = default;
int operator()(common::hash::HashMapPair<int64_t, ObPxPool*> &kv);
};
class ThreadRecyclePoolFunc
{
public:
ThreadRecyclePoolFunc() {}
virtual ~ThreadRecyclePoolFunc() = default;
int operator()(common::hash::HashMapPair<int64_t, ObPxPool*> &kv);
};
public:
static int mtl_init(ObPxPools *&pools)
{
......@@ -132,6 +140,7 @@ public:
}
int init(uint64_t tenant_id);
int get_or_create(int64_t group_id, ObPxPool *&pool);
int thread_recycle();
private:
void destroy();
int create_pool(int64_t group_id, ObPxPool *&pool);
......@@ -566,6 +575,7 @@ private:
// read tenant variable PARALLEL_SERVERS_TARGET
void check_parallel_servers_target();
void check_px_thread_recycle();
// The update of the resource manager is applied to the cgroup
void check_resource_manager_plan();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册