Commit a22145fe authored by obdev, committed by ob-robot

fix: core dump when calling get_tid()

Parent commit: ed4e8ef8
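This commit removes the cached `pid_`/`tid_` members from `lib::Thread` and every `get_tid()` accessor built on them. A caller could read `tid_` while the owning thread was being torn down (`Thread::wait()` resets the members to zero), observing a stale or zeroed id and ultimately crashing. Every call site now asks the kernel for the calling thread's own id instead. A minimal sketch of the pattern, with an illustrative helper name that is not part of the OceanBase API:

```cpp
#include <sys/syscall.h>
#include <unistd.h>

// Query the kernel for the *calling* thread's id. Unlike a member cached
// on a Thread object, this can never observe another thread's
// half-destroyed state.
static inline pid_t current_tid()
{
    return static_cast<pid_t>(syscall(__NR_gettid));
}
```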
......@@ -50,8 +50,6 @@ Thread::Thread(int64_t stack_size)
Thread::Thread(Runnable runnable, int64_t stack_size)
: pth_(0),
pid_(0),
tid_(0),
runnable_(runnable),
#ifndef OB_USE_ASAN
stack_addr_(nullptr),
......@@ -152,8 +150,8 @@ void Thread::dump_pth() // for debug pthread join faileds
ssize_t size = 0;
char path[PATH_SIZE];
len = (char*)stack_addr_ + stack_size_ - (char*)pth_;
snprintf(path, PATH_SIZE, "log/dump_pth.%p.%d", (char*)pth_, (int)tid_);
LOG_WARN("dump pth start", K(path), K(pth_), K(tid_), K(len), K(stack_addr_), K(stack_size_));
snprintf(path, PATH_SIZE, "log/dump_pth.%p.%d", (char*)pth_, static_cast<pid_t>(syscall(__NR_gettid)));
LOG_WARN("dump pth start", K(path), K(pth_), K(len), K(stack_addr_), K(stack_size_));
if (NULL == (char*)pth_ || len >= stack_size_ || len <= 0) {
LOG_WARN("invalid member", K(pth_), K(stack_addr_), K(stack_size_));
} else if ((fd = ::open(path, O_WRONLY | O_CREAT | O_TRUNC,
......@@ -176,7 +174,7 @@ void Thread::dump_pth() // for debug pthread join faileds
ret = OB_IO_ERROR;
LOG_WARN("fail to close file fd", K(fd), KERRMSG, K(ret));
} else {
LOG_WARN("dump pth done", K(path), K(pth_), K(tid_), K(size));
LOG_WARN("dump pth done", K(path), K(pth_), K(size));
}
#endif
}
......@@ -197,8 +195,6 @@ void Thread::wait()
}
destroy_stack();
pth_ = 0;
pid_ = 0;
tid_ = 0;
runnable_ = nullptr;
if (1 <= ATOMIC_AAF(&join_concurrency_, -1)) {
abort();
......@@ -284,8 +280,6 @@ void* Thread::__th_start(void *arg)
.set_label("ThreadRoot")))) {
LOG_ERROR("create memory context failed", K(ret));
} else {
th->pid_ = getpid();
th->tid_ = static_cast<pid_t>(syscall(__NR_gettid));
WITH_CONTEXT(*mem_context) {
try {
in_try_stmt = true;
......
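If a cheap cached id were still wanted, a thread-local copy would also avoid the cross-thread lifetime problem, since each thread owns its own cache and no other thread's destruction can invalidate it. A hypothetical alternative, not part of this commit:

```cpp
#include <sys/syscall.h>
#include <unistd.h>

// Hypothetical alternative: cache the tid per thread on first use.
// The thread_local value lives and dies with the calling thread itself.
static pid_t cached_self_tid()
{
    static thread_local const pid_t tid =
        static_cast<pid_t>(syscall(__NR_gettid));
    return tid;
}
```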
......@@ -38,9 +38,6 @@ public:
void destroy();
void dump_pth();
pid_t get_pid() const;
pid_t get_tid() const;
/// \brief Get current thread object.
///
/// \warning It would encounter segment fault if current thread
......@@ -72,8 +69,6 @@ private:
static int64_t total_thread_count_;
private:
pthread_t pth_;
pid_t pid_;
pid_t tid_;
Runnable runnable_;
#ifndef OB_USE_ASAN
void *stack_addr_;
......@@ -85,16 +80,6 @@ private:
pid_t tid_before_stop_;
};
OB_INLINE pid_t Thread::get_pid() const
{
return pid_;
}
OB_INLINE pid_t Thread::get_tid() const
{
return tid_;
}
OB_INLINE bool Thread::has_set_stop() const
{
IGNORE_RETURN update_loop_ts();
......
......@@ -96,11 +96,6 @@ public:
virtual void wait();
void destroy();
pid_t get_tid() const
{
OB_ASSERT(n_threads_ > 0);
return threads_[0]->get_tid();
}
public:
template <class Functor>
int submit(const Functor &func)
......
......@@ -73,8 +73,7 @@ public:
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread &&
tenant_id_ != common::OB_INVALID_ID && nullptr != GCTX.cgroup_ctrl_ &&
OB_LIKELY(GCTX.cgroup_ctrl_->is_valid())) {
GCTX.cgroup_ctrl_->add_thread_to_cgroup(
static_cast<pid_t>(syscall(__NR_gettid)), tenant_id_,
GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_id_,
share::OBCG_MYSQL_LOGIN);
}
queue_.loop();
......
......@@ -2142,8 +2142,7 @@ int obmysql::sql_nio_add_cgroup(const uint64_t tenant_id)
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread &&
nullptr != GCTX.cgroup_ctrl_ &&
OB_LIKELY(GCTX.cgroup_ctrl_->is_valid())) {
ret = GCTX.cgroup_ctrl_->add_thread_to_cgroup(syscall(__NR_gettid),
tenant_id, OBCG_SQL_NIO);
ret = GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_id, OBCG_SQL_NIO);
}
return ret;
}
......
......@@ -255,9 +255,8 @@ void ObPxPool::run1()
ObCgroupCtrl *cgroup_ctrl = GCTX.cgroup_ctrl_;
LOG_INFO("run px pool", K(group_id_), K(tenant_id_), K_(active_threads));
if (nullptr != cgroup_ctrl && OB_LIKELY(cgroup_ctrl->is_valid())) {
pid_t pid = static_cast<pid_t>(syscall(__NR_gettid));
cgroup_ctrl->add_thread_to_cgroup(pid, tenant_id_, group_id_);
LOG_INFO("set pid to group succ", K(tenant_id_), K(group_id_), K(pid));
cgroup_ctrl->add_self_to_cgroup(tenant_id_, group_id_);
LOG_INFO("add thread to group succ", K(tenant_id_), K(group_id_));
}
if (!is_inited_) {
......@@ -409,7 +408,7 @@ void ObResourceGroup::check_worker_count(ObThWorker &w)
ATOMIC_INC(&token_cnt_);
ATOMIC_STORE(&token_change_ts_, now);
if (cgroup_ctrl_->is_valid()
&& OB_FAIL(cgroup_ctrl_->remove_thread_from_cgroup(w.get_tid(), tenant_->id()))) {
&& OB_FAIL(cgroup_ctrl_->remove_self_from_cgroup(tenant_->id()))) {
LOG_WARN("remove thread from cgroup failed", K(ret), "tenant:", tenant_->id(), K_(group_id));
}
}
......@@ -1369,7 +1368,7 @@ void ObTenant::check_worker_count(ObThWorker &w)
w.stop();
ATOMIC_INC(&token_cnt_);
ATOMIC_STORE(&token_change_ts_, now);
if (cgroup_ctrl_.is_valid() && OB_FAIL(cgroup_ctrl_.remove_thread_from_cgroup(w.get_tid(), id_))) {
if (cgroup_ctrl_.is_valid() && OB_FAIL(cgroup_ctrl_.remove_self_from_cgroup(id_))) {
LOG_WARN("remove thread from cgroup failed", K(ret), K_(id));
}
}
......@@ -1441,7 +1440,7 @@ void ObTenant::lq_end(ObThWorker &w)
{
int ret = OB_SUCCESS;
if (w.is_lq_yield()) {
if (OB_FAIL(cgroup_ctrl_.add_thread_to_cgroup(w.get_tid(), id_, w.get_group_id()))) {
if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, w.get_group_id()))) {
LOG_WARN("move thread from lq group failed", K(ret), K(id_));
} else {
w.set_lq_yield(false);
......@@ -1475,7 +1474,7 @@ int ObTenant::lq_yield(ObThWorker &w)
}
} else if (w.is_lq_yield()) {
// avoid duplicate change group
} else if (OB_FAIL(cgroup_ctrl_.add_thread_to_cgroup(w.get_tid(), id_, OBCG_LQ))) {
} else if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, OBCG_LQ))) {
LOG_WARN("move thread to lq group failed", K(ret), K(id_));
} else {
w.set_lq_yield();
......
......@@ -355,7 +355,7 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t
LOG_ERROR("invalid status, unexpected", K(tenant_));
} else {
if (nullptr != GCTX.cgroup_ctrl_ && nullptr != tenant_ && OB_LIKELY(GCTX.cgroup_ctrl_->is_valid()) && !has_add_to_cgroup_) {
if (OB_SUCC(GCTX.cgroup_ctrl_->add_thread_to_cgroup(gettid(), tenant_->id(), get_group_id()))) {
if (OB_SUCC(GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_->id(), get_group_id()))) {
has_add_to_cgroup_ = true;
}
}
......
......@@ -126,7 +126,7 @@ int64_t ObRsReentrantThread::get_last_run_timestamp() const
void ObRsReentrantThread::check_alert(const ObRsReentrantThread &thread)
{
if (thread.need_monitor_check()) {
const pid_t thread_id = thread.get_tid();
const pid_t thread_id = syscall(__NR_gettid); // only called by thread self
const char *thread_name = thread.get_thread_name();
int64_t last_run_timestamp = thread.get_last_run_timestamp();
int64_t last_run_interval = ObTimeUtility::current_time() - last_run_timestamp;
......
......@@ -270,7 +270,7 @@ int ObTenantBase::pre_run(lib::Threads *th)
ObTenantEnv::set_tenant(this);
ObCgroupCtrl *cgroup_ctrl = get_cgroup(th->get_cgroup());
if (cgroup_ctrl != nullptr) {
ret = cgroup_ctrl->add_thread_to_cgroup(static_cast<pid_t>(syscall(__NR_gettid)), id_);
ret = cgroup_ctrl->add_self_to_cgroup(id_);
}
ATOMIC_INC(&thread_count_);
LOG_INFO("tenant thread pre_run", K(MTL_ID()), K(ret), K(thread_count_), KP(th));
......@@ -283,7 +283,7 @@ int ObTenantBase::end_run(lib::Threads *th)
ObTenantEnv::set_tenant(nullptr);
ObCgroupCtrl *cgroup_ctrl = get_cgroup(th->get_cgroup());
if (cgroup_ctrl != nullptr) {
ret = cgroup_ctrl->remove_thread_from_cgroup(static_cast<pid_t>(syscall(__NR_gettid)), id_);
ret = cgroup_ctrl->remove_self_from_cgroup(id_);
}
ATOMIC_DEC(&thread_count_);
LOG_INFO("tenant thread end_run", K(id_), K(ret), K(thread_count_));
......
......@@ -368,7 +368,7 @@ int ObCgroupCtrl::create_user_tenant_group_dir(
return ret;
}
int ObCgroupCtrl::add_thread_to_cgroup(const pid_t tid, const uint64_t tenant_id, int64_t group_id)
int ObCgroupCtrl::add_self_to_cgroup(const uint64_t tenant_id, int64_t group_id)
{
int ret = OB_SUCCESS;
char group_path[PATH_BUFSIZE];
......@@ -383,7 +383,7 @@ int ObCgroupCtrl::add_thread_to_cgroup(const pid_t tid, const uint64_t tenant_id
} else {
char task_path[PATH_BUFSIZE];
char tid_value[VALUE_BUFSIZE];
snprintf(tid_value, VALUE_BUFSIZE, "%d", tid);
snprintf(tid_value, VALUE_BUFSIZE, "%ld", syscall(__NR_gettid));
snprintf(task_path, PATH_BUFSIZE, "%s/tasks", group_path);
if(OB_FAIL(write_string_to_file_(task_path, tid_value))) {
LOG_WARN("add tid to cgroup failed", K(ret), K(task_path), K(tid_value), K(tenant_id));
......@@ -409,14 +409,14 @@ int ObCgroupCtrl::get_group_info_by_group_id(const uint64_t tenant_id,
return ret;
}
int ObCgroupCtrl::remove_thread_from_cgroup(const pid_t tid, const uint64_t tenant_id)
int ObCgroupCtrl::remove_self_from_cgroup(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
char task_path[PATH_BUFSIZE];
char tid_value[VALUE_BUFSIZE];
// Writing this tid into the tasks file under other_cgroup removes it from any other group's tasks file
snprintf(task_path, PATH_BUFSIZE, "%s/tasks", other_cgroup_);
snprintf(tid_value, VALUE_BUFSIZE, "%d", tid);
snprintf(tid_value, VALUE_BUFSIZE, "%ld", syscall(__NR_gettid));
if(OB_FAIL(write_string_to_file_(task_path, tid_value))) {
LOG_WARN("remove tid from cgroup failed", K(ret), K(task_path), K(tid_value), K(tenant_id));
} else {
......@@ -425,18 +425,17 @@ int ObCgroupCtrl::remove_thread_from_cgroup(const pid_t tid, const uint64_t tena
return ret;
}
int ObCgroupCtrl::add_thread_to_group(const pid_t tid,
const uint64_t tenant_id,
int ObCgroupCtrl::add_self_to_group(const uint64_t tenant_id,
const uint64_t group_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_CONFIG;
LOG_WARN("invalid config", K(ret), K(tenant_id));
} else if (OB_FAIL(add_thread_to_cgroup(tid, tenant_id, group_id))) {
LOG_WARN("fail to add thread to group", K(ret), K(tid), K(group_id), K(tenant_id));
} else if (OB_FAIL(add_self_to_cgroup(tenant_id, group_id))) {
LOG_WARN("fail to add thread to group", K(ret), K(group_id), K(tenant_id));
} else {
LOG_INFO("set backup pid to group success", K(tenant_id), K(group_id), K(tid));
LOG_INFO("set backup pid to group success", K(tenant_id), K(group_id));
}
return ret;
}
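As the comment above notes, cgroup v1 migrates a thread by appending its tid to the destination group's tasks file; the kernel removes it from the previous group automatically. A standalone sketch of that mechanism, independent of the `ObCgroupCtrl` helpers, where `group_path` and the buffer sizes are assumptions:

```cpp
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <sys/syscall.h>
#include <unistd.h>

// Move the calling thread into a cgroup-v1 group by writing its tid to
// the group's tasks file. The kernel handles removal from the old group.
static int move_self_to_cgroup(const char *group_path)
{
    char task_path[256];
    char tid_value[32];
    snprintf(task_path, sizeof(task_path), "%s/tasks", group_path);
    snprintf(tid_value, sizeof(tid_value), "%ld", syscall(__NR_gettid));
    const int fd = ::open(task_path, O_WRONLY);
    if (fd < 0) {
        return -1;
    }
    const ssize_t len = static_cast<ssize_t>(strlen(tid_value));
    const ssize_t n = ::write(fd, tid_value, len);
    ::close(fd);
    return (n == len) ? 0 : -1;
}
```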
......
......@@ -115,14 +115,13 @@ public:
// Remove the tenant's cgroup rules
int remove_tenant_cgroup(const uint64_t tenant_id);
int add_thread_to_cgroup(const pid_t tid, const uint64_t tenant_id, int64_t group_id = INT64_MAX);
int add_self_to_cgroup(const uint64_t tenant_id, int64_t group_id = INT64_MAX);
// Remove the specified tid from the specified tenant's cgroup
int remove_thread_from_cgroup(const pid_t tid, const uint64_t tenant_id);
int remove_self_from_cgroup(const uint64_t tenant_id);
// Binding interface for background threads
int add_thread_to_group(const pid_t tid,
const uint64_t tenant_id,
int add_self_to_group(const uint64_t tenant_id,
const uint64_t group_id);
// Set cpu.shares of the specified tenant's cgroup
int set_cpu_shares(const int32_t cpu_shares, const uint64_t tenant_id, int64_t group_id = INT64_MAX);
......
......@@ -1380,7 +1380,7 @@ int ObTenantDagWorker::set_dag_resource()
LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(group_id));
} else if (group_id == group_id_) {
// group not change, do nothing
} else if (OB_FAIL(GCTX.cgroup_ctrl_->add_thread_to_group(static_cast<pid_t>(GETTID()), MTL_ID(), group_id))) {
} else if (OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_group(MTL_ID(), group_id))) {
LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id));
} else {
ATOMIC_SET(&group_id_, group_id);
......
......@@ -24,9 +24,9 @@ using namespace oceanbase::observer;
void *thread_func(void *args)
{
int *tid = static_cast<int *>(args);
*tid = static_cast<int>(syscall(SYS_gettid));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_self_to_cgroup(1001));
sleep(3);
ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_self_from_cgroup(1001));
return nullptr;
}
......@@ -37,29 +37,16 @@ TEST(TestCgroupCtrl, AddDelete)
ASSERT_EQ(OB_SUCCESS, cg_ctrl.init());
ASSERT_TRUE(cg_ctrl.is_valid());
int tids[4];
pthread_t ts[4];
for (int i = 0; i < 4; i++) {
pthread_create(&ts[i], nullptr, thread_func, &tids[i]);
pthread_create(&ts[i], nullptr, thread_func, NULL);
}
sleep(1);
const uint64_t tenant_id1 = 1001;
ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_thread_to_cgroup(tids[0],tenant_id1));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_thread_to_cgroup(tids[1],tenant_id1));
const uint64_t tenant_id2 = 1002;
ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_thread_to_cgroup(tids[2], tenant_id2));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_thread_to_cgroup(tids[3], tenant_id2));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_thread_from_cgroup(tids[2], tenant_id2));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_thread_from_cgroup(tids[3], tenant_id2));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_tenant_cgroup(tenant_id1));
for (int i = 0; i < 4; i++) {
pthread_join(ts[i], nullptr);
}
ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_tenant_cgroup(1001));
}
TEST(TestCgroupCtrl, SetGetValue)
......@@ -68,27 +55,22 @@ TEST(TestCgroupCtrl, SetGetValue)
ASSERT_EQ(OB_SUCCESS, cg_ctrl.init());
ASSERT_TRUE(cg_ctrl.is_valid());
int tids[4];
pthread_t ts[4];
for (int i = 0; i < 4; i++) {
pthread_create(&ts[i], nullptr, thread_func, &tids[i]);
pthread_create(&ts[i], nullptr, thread_func, NULL);
}
sleep(1);
const uint64_t tenant_id1 = 1001;
ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_thread_to_cgroup(tids[0],tenant_id1));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_thread_to_cgroup(tids[1],tenant_id1));
const int32_t cpu_shares = 2048;
int32_t cpu_shares_v = 0;
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_cpu_shares(cpu_shares, tenant_id1));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_shares(cpu_shares_v, tenant_id1));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_cpu_shares(cpu_shares, 1001));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_shares(cpu_shares_v, 1001));
ASSERT_EQ(cpu_shares, cpu_shares_v);
const int32_t cpu_cfs_quota = 80000;
int32_t cpu_cfs_quota_v = 0;
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_cpu_cfs_quota(cpu_cfs_quota, tenant_id1));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_cfs_quota(cpu_cfs_quota_v, tenant_id1));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.set_cpu_cfs_quota(cpu_cfs_quota, 1001));
ASSERT_EQ(OB_SUCCESS, cg_ctrl.get_cpu_cfs_quota(cpu_cfs_quota_v, 1001));
ASSERT_EQ(cpu_cfs_quota, cpu_cfs_quota_v);
for (int i = 0; i < 4; i++) {
......