提交 e6af47ad 编写于 作者: Y yangzhifeng 提交者: ob-robot

refactor: clean the code of lock module

上级 e01e4207
......@@ -114,6 +114,7 @@ ob_set_subtarget(oblib_lib lock
lock/ob_thread_cond.cpp
lock/ob_rwlock.cpp
lock/ob_futex.cpp
lock/ob_qsync_lock.cpp
)
ob_set_subtarget(oblib_lib mysqlclient
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -18,6 +18,13 @@
namespace obutil
{
template<class T> class ObMonitor;
/**
* A wrapper class of pthread condition that implements a condition variable.
* @note The condition variable itself is not thread safe and should be protected
* by a mutex.
* See also ObThreadCond which is suitable for most situations.
*/
class ObUtilMutex;
typedef ObUtilMutex Mutex;
class Cond
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -18,6 +18,13 @@
namespace obutil
{
/**
* A wrapper class of pthread mutex.
* This class is intended to be used in other low level construct only.
* In most situations, you should use ObLatch or ObMutex.
*
*/
class ObUtilMutex
{
public:
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -18,6 +18,11 @@
namespace obutil
{
/**
* A monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to
* wait (block) for a certain condition to become false. Monitors also have a mechanism for signaling other threads
* that their condition has been met.
*/
template <class T>
class ObMonitor
{
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -10,11 +10,11 @@
* See the Mulan PubL v2 for more details.
*/
#include "share/lock/ob_qsync_lock.h"
#include "ob_qsync_lock.h"
namespace oceanbase
{
namespace share
namespace common
{
int ObQSyncLock::init(const lib::ObMemAttr &mem_attr)
{
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -17,7 +17,7 @@
namespace oceanbase
{
namespace share
namespace common
{
class ObQSyncLock
......
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef THREAD_COND_H_
#define THREAD_COND_H_
#include "lib/lock/ob_thread_cond.h"
namespace oceanbase {
namespace obsys {
class ThreadCond {
public:
ThreadCond() {
cond_.init();
}
~ThreadCond() {
cond_.destroy();
}
int lock() {
return cond_.lock();
}
int unlock() {
return cond_.unlock();
}
bool wait(int milliseconds = 0) {
return oblib::OB_SUCCESS == cond_.wait(milliseconds);
}
void signal() {
cond_.signal();
}
void broadcast() {
cond_.broadcast();
}
private:
oceanbase::common::ObThreadCond cond_;
};
}
}
#endif /*THREAD_COND_H_*/
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef THREAD_MUTEX_H_
#define THREAD_MUTEX_H_
#include <assert.h>
#include <pthread.h>
namespace oceanbase {
namespace obsys {
class ThreadMutex {
public:
ThreadMutex() {
//assert(pthread_mutex_init(&mutex_, NULL) == 0);
const int iRet = pthread_mutex_init(&mutex_, NULL);
(void) iRet;
assert( iRet == 0 );
}
~ThreadMutex() {
pthread_mutex_destroy(&mutex_);
}
void lock () {
pthread_mutex_lock(&mutex_);
}
int trylock () {
return pthread_mutex_trylock(&mutex_);
}
void unlock() {
pthread_mutex_unlock(&mutex_);
}
protected:
pthread_mutex_t mutex_;
};
class ThreadGuard
{
public:
[[nodiscard]] ThreadGuard(ThreadMutex *mutex)
{
mutex_ = NULL;
if (mutex) {
mutex_ = mutex;
mutex_->lock();
}
}
~ThreadGuard()
{
if (mutex_) {
mutex_->unlock();
}
}
private:
ThreadMutex *mutex_;
};
}
}
#endif /*THREAD_MUTEX_H_*/
......@@ -64,6 +64,7 @@ oblib_addtest(lock/test_bucket_lock.cpp)
oblib_addtest(lock/test_latch.cpp)
#oblib_addtest(lock/test_scond.cpp)
oblib_addtest(lock/test_thread_cond.cpp)
oblib_addtest(lock/test_qsync_lock.cpp)
oblib_addtest(metrics/test_ema_v2.cpp)
oblib_addtest(metrics/test_ob_accumulator.cpp)
oblib_addtest(net/test_ob_addr.cpp)
......
......@@ -14,7 +14,7 @@
#include <thread>
#include <vector>
#include "share/lock/ob_qsync_lock.h"
#include "lib/lock/ob_qsync_lock.h"
#include "lib/time/ob_time_utility.h"
namespace oceanbase
......
......@@ -234,7 +234,6 @@ ob_set_subtarget(ob_share common_mixed
system_variable/ob_system_variable_init.cpp
table/ob_table.cpp
table/ob_table_rpc_struct.cpp
lock/ob_qsync_lock.cpp
)
ob_set_subtarget(ob_share tablet
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -32,7 +32,7 @@ int ObLSLocationMap::init()
ret = OB_INIT_TWICE;
LOG_WARN("ObLSLocationMap init twice", KR(ret));
} else if (OB_ISNULL(buckets_lock_ =
(share::ObQSyncLock*)ob_malloc(sizeof(share::ObQSyncLock) * BUCKETS_CNT, mem_attr))) {
(common::ObQSyncLock*)ob_malloc(sizeof(common::ObQSyncLock) * BUCKETS_CNT, mem_attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Fail to allocate ObQSyncLock memory, ", KR(ret), LITERAL_K(BUCKETS_CNT));
} else if (OB_ISNULL(buf = ob_malloc(sizeof(ObLSLocation*) * BUCKETS_CNT, mem_attr))) {
......@@ -40,7 +40,7 @@ int ObLSLocationMap::init()
LOG_WARN("Fail to allocate ObLSLocation memory, ", KR(ret), LITERAL_K(BUCKETS_CNT));
} else {
for (int64_t i = 0 ; i < BUCKETS_CNT; ++i) {
new(buckets_lock_ + i) share::ObQSyncLock();
new(buckets_lock_ + i) common::ObQSyncLock();
if (OB_FAIL((buckets_lock_ + i)->init(mem_attr))) {
LOG_WARN("buckets_lock_ init fail", K(ret), K(OB_SERVER_TENANT_ID));
for (int64_t j = 0 ; j <= i; ++j) {
......@@ -70,7 +70,7 @@ void ObLSLocationMap::destroy()
ObLSLocation *ls_location = nullptr;
ObLSLocation *next_ls = nullptr;
for (int64_t i = 0; i < BUCKETS_CNT; ++i) {
share::ObQSyncLockWriteGuard guard(buckets_lock_[i]);
ObQSyncLockWriteGuard guard(buckets_lock_[i]);
ls_location = ls_buckets_[i];
while (OB_NOT_NULL(ls_location)) {
next_ls = (ObLSLocation*)ls_location->next_;
......@@ -105,7 +105,7 @@ int ObLSLocationMap::update(
LOG_WARN("ObLSLocationMap not init", KR(ret), K(ls_location));
} else {
int64_t pos = key.hash() % BUCKETS_CNT;
share::ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
curr = ls_buckets_[pos];
while (OB_NOT_NULL(curr)) {
if (curr->get_cache_key() == key) {
......@@ -152,7 +152,7 @@ int ObLSLocationMap::get(
LOG_WARN("ObLSLocationMap not init", KR(ret), K(key));
} else {
pos = key.hash() % BUCKETS_CNT;
share::ObQSyncLockReadGuard bucket_guard(buckets_lock_[pos]);
ObQSyncLockReadGuard bucket_guard(buckets_lock_[pos]);
ls_location = ls_buckets_[pos];
while (OB_NOT_NULL(ls_location)) {
if (ls_location->get_cache_key() == key) {
......@@ -188,7 +188,7 @@ int ObLSLocationMap::del(const ObLSLocationCacheKey &key)
} else {
int64_t pos = key.hash() % BUCKETS_CNT;
//remove ls from map
share::ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
ls_location = ls_buckets_[pos];
while (OB_NOT_NULL(ls_location)) {
if (ls_location->get_cache_key() == key) {
......@@ -224,7 +224,7 @@ int ObLSLocationMap::check_and_generate_dead_cache(ObLSLocationArray &arr)
// ignore ret code
for (int64_t i = 0; i < BUCKETS_CNT; ++i) {
ls_location = ls_buckets_[i];
share::ObQSyncLockWriteGuard guard(buckets_lock_[i]);
ObQSyncLockWriteGuard guard(buckets_lock_[i]);
// foreach bucket
while (OB_NOT_NULL(ls_location)) {
if (common::ObClockGenerator::getClock() - ls_location->get_last_access_ts()
......@@ -245,7 +245,7 @@ int ObLSLocationMap::get_all(ObLSLocationArray &arr)
ObLSLocation *ls_location = NULL;
for (int64_t i = 0; i < BUCKETS_CNT; ++i) {
ls_location = ls_buckets_[i];
share::ObQSyncLockReadGuard guard(buckets_lock_[i]);
ObQSyncLockReadGuard guard(buckets_lock_[i]);
// foreach bucket
while (OB_NOT_NULL(ls_location) && OB_SUCC(ret)) {
if (OB_FAIL(arr.push_back(*ls_location))) {
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -14,7 +14,7 @@
#define OCEANBASE_SHARE_OB_LS_LOCATION_MAP
#include "share/location_cache/ob_location_struct.h"
#include "share/lock/ob_qsync_lock.h"
#include "lib/lock/ob_qsync_lock.h"
namespace oceanbase
{
......@@ -79,7 +79,7 @@ private:
int64_t size_;
ObLSLocation **ls_buckets_;
const static int64_t BUCKETS_CNT = 1 << 8;
share::ObQSyncLock *buckets_lock_;
common::ObQSyncLock *buckets_lock_;
};
......
......@@ -13,7 +13,7 @@
#define USING_LOG_PREFIX SHARE_LOCATION
#include "share/location_cache/ob_tablet_ls_map.h"
#include "share/lock/ob_qsync_lock.h" // ObQSyncLock
#include "lib/lock/ob_qsync_lock.h" // ObQSyncLock
#include "common/ob_clock_generator.h" // ObClockGenerator
#include "lib/objectpool/ob_concurrency_objpool.h" // op_alloc, op_free
......
......@@ -17,10 +17,13 @@
namespace oceanbase
{
namespace common
{
class ObQSyncLock;
}
namespace share
{
class ObTabletLSService;
class ObQSyncLock;
class ObTabletLSMap
{
......@@ -57,7 +60,7 @@ private:
bool is_inited_;
int64_t size_;
ObTabletLSCache **ls_buckets_;
share::ObQSyncLock *buckets_lock_;
common::ObQSyncLock *buckets_lock_;
};
} // end namespace share
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -23,7 +23,7 @@
#include "lib/container/ob_iarray.h"
#include "share/ob_errno.h"
#include "share/ob_thread_pool.h"
#include "share/lock/ob_qsync_lock.h"
#include "lib/lock/ob_qsync_lock.h"
#include "ob_gts_source.h"
#include "ob_gts_define.h"
#include "ob_ts_worker.h"
......@@ -174,7 +174,7 @@ private:
ObITsSource *ts_source_[MAX_TS_SOURCE];
int cur_ts_type_;
ObGtsSource gts_source_;
mutable share::ObQSyncLock rwlock_;
mutable common::ObQSyncLock rwlock_;
int64_t last_access_ts_ CACHE_ALIGNED;
};
......@@ -408,7 +408,7 @@ private:
ObLocationAdapter *location_adapter_;
ObLocationAdapter location_adapter_def_;
ObTsWorker ts_worker_;
share::ObQSyncLock lock_;
common::ObQSyncLock lock_;
ObTsSourceInfo *ts_source_infos_[TS_SOURCE_INFO_CACHE_NUM];
};
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -80,7 +80,7 @@ int ObLSIterator::get_next(ObLS *&ls)
ret = OB_ITER_END;
} else {
if (OB_NOT_NULL(ls_map_->ls_buckets_[bucket_pos_])) {
share::ObQSyncLockReadGuard guard(ls_map_->buckets_lock_[bucket_pos_]);
ObQSyncLockReadGuard guard(ls_map_->buckets_lock_[bucket_pos_]);
ls = ls_map_->ls_buckets_[bucket_pos_];
while (OB_NOT_NULL(ls) && OB_SUCC(ret)) {
......@@ -149,14 +149,14 @@ int ObLSMap::init(const int64_t tenant_id, ObIAllocator *ls_allocator)
} else if (OB_ISNULL(ls_allocator)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else if (OB_ISNULL(buckets_lock_ = (share::ObQSyncLock*)ob_malloc(sizeof(share::ObQSyncLock) * BUCKETS_CNT, mem_attr))) {
} else if (OB_ISNULL(buckets_lock_ = (common::ObQSyncLock*)ob_malloc(sizeof(common::ObQSyncLock) * BUCKETS_CNT, mem_attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_ISNULL(buf = ob_malloc(sizeof(ObLS*) * BUCKETS_CNT, mem_attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("Fail to allocate memory, ", K(ret), LITERAL_K(BUCKETS_CNT));
} else {
for (int64_t i = 0 ; i < BUCKETS_CNT; ++i) {
new(buckets_lock_ + i) share::ObQSyncLock();
new(buckets_lock_ + i) common::ObQSyncLock();
if (OB_FAIL((buckets_lock_ + i)->init(mem_attr))) {
LOG_WARN("buckets_lock_ init fail", K(ret), K(tenant_id));
for (int64_t j = 0 ; j <= i; ++j) {
......@@ -201,7 +201,7 @@ int ObLSMap::add_ls(
LOG_WARN("ObLSMap not init", K(ret), K(ls_id));
} else {
int64_t pos = ls_id.hash() % BUCKETS_CNT;
share::ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
curr = ls_buckets_[pos];
while (OB_NOT_NULL(curr)) {
if (curr->get_ls_id() == ls_id) {
......@@ -248,7 +248,7 @@ int ObLSMap::del_ls(const share::ObLSID &ls_id)
} else {
int64_t pos = ls_id.hash() % BUCKETS_CNT;
//remove ls from map
share::ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
ObQSyncLockWriteGuard guard(buckets_lock_[pos]);
ls = ls_buckets_[pos];
while (OB_NOT_NULL(ls)) {
if (ls->get_ls_id() == ls_id) {
......@@ -302,7 +302,7 @@ int ObLSMap::get_ls(const share::ObLSID &ls_id,
LOG_WARN("ObLSMap not init", K(ret), K(ls_id));
} else {
pos = ls_id.hash() % BUCKETS_CNT;
share::ObQSyncLockReadGuard bucket_guard(buckets_lock_[pos]);
ObQSyncLockReadGuard bucket_guard(buckets_lock_[pos]);
ls = ls_buckets_[pos];
while (OB_NOT_NULL(ls)) {
if (ls->get_ls_id() == ls_id) {
......@@ -329,7 +329,7 @@ int ObLSMap::remove_duplicate_ls()
LOG_WARN("ObLSMap has not been inited", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < BUCKETS_CNT; ++i) {
share::ObQSyncLockWriteGuard bucket_guard(buckets_lock_[i]);
ObQSyncLockWriteGuard bucket_guard(buckets_lock_[i]);
if (nullptr != ls_buckets_[i]) {
if (OB_FAIL(remove_duplicate_ls_in_linklist(ls_buckets_[i]))) {
LOG_WARN("fail to remove same ls in linklist", K(ret));
......
/**
* Copyright (c) 2021 OceanBase
* Copyright (c) 2021, 2022 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
......@@ -16,7 +16,7 @@
#include "lib/oblog/ob_log_module.h"
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
#include "lib/container/ob_iarray.h"
#include "share/lock/ob_qsync_lock.h"
#include "lib/lock/ob_qsync_lock.h"
#include "storage/ls/ob_ls.h"
#include "share/leak_checker/obj_leak_checker.h"
......@@ -77,7 +77,7 @@ private:
int64_t ls_cnt_;
ObLS **ls_buckets_;
const static int64_t BUCKETS_CNT = 1 << 8;
share::ObQSyncLock *buckets_lock_;
common::ObQSyncLock *buckets_lock_;
};
//iterate all lss
......@@ -129,7 +129,7 @@ int ObLSMap::operate_ls(const share::ObLSID &ls_id,
} else {
const int64_t pos = ls_id.hash() % BUCKETS_CNT;
ObLS *ls = ls_buckets_[pos];
share::ObQSyncLockReadGuard bucket_guard(buckets_lock_[pos]);
common::ObQSyncLockReadGuard bucket_guard(buckets_lock_[pos]);
while (OB_NOT_NULL(ls)) {
if (ls->get_ls_id() == ls_id) {
break;
......
......@@ -52,7 +52,6 @@ storage_unittest(test_ob_occam_timer)
storage_unittest(test_ob_occam_thread_pool)
#ob_unittest(test_tenant_tablet_checksum_iterator)
ob_unittest(test_ob_future)
ob_unittest(test_qsync_lock lock/test_qsync_lock.cpp)
ob_unittest(test_ob_occam_time_guard)
ob_unittest(test_cluster_version)
ob_unittest(test_scn)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册