提交 2d1b1261 编写于 作者: O obdev 提交者: OB-robot

remove memtable_ctx factory and skip removing some retain ctx

上级 4bd3f4e0
......@@ -302,7 +302,7 @@ private:
};
class ObMemtable;
typedef ObMemtableCtxFactory::IDMap MemtableIDMap;
typedef common::ObIDMap<ObIMemtableCtx, uint32_t> MemtableIDMap;
class ObMemtableCtx final : public ObIMemtableCtx
{
using RWLock = common::SpinRWLock;
......
......@@ -20,124 +20,6 @@ namespace oceanbase
using namespace common;
namespace memtable
{
ObMemtableCtxFactory::ObMemtableCtxFactory()
: is_inited_(false),
mod_(ObModIds::OB_MEMTABLE_CTX_OBJECT),
ctx_obj_allocator_(OBJ_ALLOCATOR_PAGE, mod_),
ctx_dynamic_allocator_(),
malloc_allocator_(ObModIds::OB_MEMTABLE_CTX),
free_list_(),
alloc_count_(0),
free_count_(0)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ctx_dynamic_allocator_.init(
DYNAMIC_ALLOCATOR_PAGE,
ObModIds::OB_MEMTABLE_CTX,
common::OB_SERVER_TENANT_ID,
DYNAMIC_ALLOCATOR_PAGE_NUM))) {
TRANS_LOG(ERROR, "ctx dynamic allocator init fail", K(ret));
} else if (OB_FAIL(free_list_.init(MAX_CTX_HOLD_COUNT))) {
TRANS_LOG(ERROR, "free list init fail", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < MAX_CTX_HOLD_COUNT; ++i) {
ObIMemtableCtx *ctx = NULL;
void *ctx_buffer = ctx_obj_allocator_.alloc(sizeof(ObMemtableCtx));
if (NULL == ctx_buffer) {
TRANS_LOG(ERROR, "ctx obj allocator alloc fail");
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (NULL == (ctx = new(ctx_buffer) ObMemtableCtx())) {
TRANS_LOG(ERROR, "new ObMemtableCtx fail");
ret = OB_ALLOCATE_MEMORY_FAILED;
} else if (OB_FAIL(free_list_.push(ctx))) {
TRANS_LOG(ERROR, "free list push fail", K(ret));
} else {
ctx->set_alloc_type(CTX_ALLOC_FIX);
}
}
}
if (OB_SUCC(ret)) {
ObMemAttr attr;
attr.label_ = ObModIds::OB_MEMTABLE_CTX;
ctx_dynamic_allocator_.set_attr(attr);
is_inited_ = true;
}
}
ObMemtableCtxFactory::~ObMemtableCtxFactory()
{
is_inited_ = false;
ObIMemtableCtx *ctx = NULL;
while (OB_SUCCESS == free_list_.pop(ctx)) {
if (NULL != ctx) {
int alloc_type = ctx->get_alloc_type();
ctx->~ObIMemtableCtx();
if (CTX_ALLOC_VAR == alloc_type) {
ctx_dynamic_allocator_.free(ctx);
}
ctx = NULL;
}
}
free_list_.destroy();
ctx_dynamic_allocator_.destroy();
}
ObIMemtableCtx *ObMemtableCtxFactory::alloc(const uint64_t tenant_id/* OB_SERVER_TENANT_ID */)
{
ObIMemtableCtx *ctx = NULL;
int tmp_ret = free_list_.pop(ctx);
if (OB_SUCCESS != tmp_ret || NULL == ctx)
{
void *ctx_buffer = ctx_dynamic_allocator_.alloc(sizeof(ObMemtableCtx));
if (NULL == ctx_buffer) {
TRANS_LOG(WARN, "ctx obj allocator alloc fail");
} else if (NULL == (ctx = new(ctx_buffer) ObMemtableCtx())) {
TRANS_LOG(WARN, "new ObMemtableCtx fail");
} else {
ctx->set_alloc_type(CTX_ALLOC_VAR);
}
}
if (NULL != ctx) {
if (OB_SUCCESS != (tmp_ret = static_cast<ObMemtableCtx *>(ctx)->init(tenant_id))) {
TRANS_LOG(WARN, "memtable context init error", "ret", tmp_ret);
}
}
if (NULL != ctx) {
PC_ADD(MTCTX, 1);
(void)ATOMIC_AAF(&alloc_count_, 1);
if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
TRANS_LOG(INFO, "alloc memtable context", K_(alloc_count), K_(free_count));
}
GARL_ADD(&ctx->resource_link_, "memctx");
if(REACH_TIME_INTERVAL(10 * 1000 * 1000))
{
PC_REPORT();
}
}
return ctx;
}
void ObMemtableCtxFactory::free(ObIMemtableCtx *ctx)
{
if (NULL != ctx) {
GARL_DEL(&ctx->resource_link_);
ctx->reset();
if (CTX_ALLOC_VAR == ctx->get_alloc_type()) {
ctx->~ObIMemtableCtx();
ctx_dynamic_allocator_.free(ctx);
} else if (CTX_ALLOC_FIX == ctx->get_alloc_type()) {
int tmp_ret = free_list_.push(ctx);
if (OB_SUCCESS != tmp_ret) {
TRANS_LOG(ERROR, "free list push fail, ctx will leek", "ret", tmp_ret, KP(ctx));
}
} else {
TRANS_LOG(ERROR, "unknow ctx alloc type", "alloc_type", ctx->get_alloc_type());
}
ctx = NULL;
(void)ATOMIC_AAF(&free_count_, 1);
}
PC_ADD(MTCTX, -1);
}
////////////////////////////////////////////////////////////////////////////////////////////////////
......
......@@ -275,53 +275,6 @@ protected:
////////////////////////////////////////////////////////////////////////////////////////////////////
class ObIMemtableCtxFactory
{
public:
ObIMemtableCtxFactory() {}
virtual ~ObIMemtableCtxFactory() {}
public:
virtual ObIMemtableCtx *alloc(const uint64_t tenant_id = OB_SERVER_TENANT_ID) = 0;
virtual void free(ObIMemtableCtx *ctx) = 0;
};
class ObMemtableCtxFactory : public ObIMemtableCtxFactory
{
public:
enum
{
CTX_ALLOC_FIX = 1,
CTX_ALLOC_VAR = 2,
};
typedef common::ObFixedQueue<ObIMemtableCtx> FreeList;
typedef common::ObIDMap<ObIMemtableCtx, uint32_t> IDMap;
typedef common::ObLfFIFOAllocator DynamicAllocator;
static const int64_t OBJ_ALLOCATOR_PAGE = 1L<<22; //4MB
static const int64_t DYNAMIC_ALLOCATOR_PAGE = common::OB_MALLOC_NORMAL_BLOCK_SIZE * 8 - 1024; // 64k
static const int64_t DYNAMIC_ALLOCATOR_PAGE_NUM = common::OB_MAX_CPU_NUM;
static const int64_t MAX_CTX_HOLD_COUNT = 10000;
static const int64_t MAX_CTX_COUNT = 3000000;
public:
ObMemtableCtxFactory();
~ObMemtableCtxFactory();
public:
ObIMemtableCtx *alloc(const uint64_t tenant_id = OB_SERVER_TENANT_ID);
void free(ObIMemtableCtx *ctx);
DynamicAllocator &get_allocator() { return ctx_dynamic_allocator_; }
common::ObIAllocator &get_malloc_allocator() { return malloc_allocator_; }
private:
DISALLOW_COPY_AND_ASSIGN(ObMemtableCtxFactory);
private:
bool is_inited_;
common::ModulePageAllocator mod_;
common::ModuleArena ctx_obj_allocator_;
DynamicAllocator ctx_dynamic_allocator_;
common::ObMalloc malloc_allocator_;
FreeList free_list_;
int64_t alloc_count_;
int64_t free_count_;
};
class ObMemtableFactory
{
public:
......
......@@ -1315,7 +1315,8 @@ int ObPartTransCtx::recover_tx_ctx_table_info(const ObTxCtxTableInfo &ctx_info)
|| ObTxState::CLEAR == exec_info_.state_)) {
if (OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG,
exec_info_.max_applying_log_ts_,
exec_info_.max_durable_lsn_))) {
exec_info_.max_durable_lsn_,
false))) {
TRANS_LOG(WARN, "insert into retain ctx mgr failed", K(ret), KPC(this));
} else if ((exec_info_.trans_type_ == TransType::SP_TRANS
&& ObTxState::COMMIT == exec_info_.state_)
......@@ -1832,7 +1833,7 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
} else if (ObTxLogType::TX_COMMIT_LOG == log_type) {
tg.click();
if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn))) {
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn, false))) {
TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(log_cb), KPC(this));
}
if (is_local_tx_()) {
......@@ -1864,7 +1865,7 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
}
} else if (ObTxLogType::TX_ABORT_LOG == log_type) {
if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn))) {
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn, false))) {
TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(log_cb), KPC(this));
}
if (is_local_tx_() || sub_state_.is_force_abort()) {
......@@ -4095,7 +4096,7 @@ int ObPartTransCtx::replay_commit(const ObTxCommitLog &commit_log,
TRANS_LOG(WARN, "set incremental_participants error", K(ret), K(*this));
} else {
if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset))) {
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset, true))) {
TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(this));
}
if (is_local_tx_()) {
......@@ -4263,7 +4264,7 @@ int ObPartTransCtx::replay_abort(const ObTxAbortLog &abort_log,
TRANS_LOG(WARN, "update replaying log no failed", K(ret), K(timestamp), K(part_log_no));
} else {
if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset))) {
&& OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset, true))) {
TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(this));
}
if (is_local_tx_()) {
......@@ -5883,10 +5884,16 @@ int ObPartTransCtx::tx_keepalive_response_(const int64_t status)
}
int ObPartTransCtx::insert_into_retain_ctx_mgr_(RetainCause cause,
const SCN &log_ts, palf::LSN lsn)
const SCN &log_ts,
palf::LSN lsn,
bool for_replay)
{
int ret = OB_SUCCESS;
ObMDSRetainCtxFunctor *retain_func_ptr = nullptr;
int64_t retain_lock_timeout = INT64_MAX;
if (for_replay) {
retain_lock_timeout = 10 * 1000;
}
if (OB_ISNULL(ls_tx_ctx_mgr_) || RetainCause::UNKOWN == cause) {
ret = OB_INVALID_ARGUMENT;
......@@ -5902,7 +5909,7 @@ int ObPartTransCtx::insert_into_retain_ctx_mgr_(RetainCause cause,
} else if (OB_FALSE_IT(new (retain_func_ptr) ObMDSRetainCtxFunctor())) {
} else if (OB_FAIL(retain_func_ptr->init(this, cause, log_ts, lsn))) {
TRANS_LOG(WARN, "init retain ctx functor failed", K(ret), KPC(this));
} else if (OB_FAIL(retain_ctx_mgr.push_retain_ctx(retain_func_ptr))) {
} else if (OB_FAIL(retain_ctx_mgr.push_retain_ctx(retain_func_ptr, retain_lock_timeout))) {
TRANS_LOG(WARN, "push into retain_ctx_mgr failed", K(ret), KPC(this));
}
// if (OB_FAIL(retain_ctx_mgr.reset()))
......
......@@ -473,7 +473,10 @@ private:
ATOMIC_STORE(&retain_cause_, static_cast<int16_t>(RetainCause::UNKOWN));
}
int insert_into_retain_ctx_mgr_(RetainCause cause, const share::SCN &log_ts, palf::LSN lsn);
int insert_into_retain_ctx_mgr_(RetainCause cause,
const share::SCN &log_ts,
palf::LSN lsn,
bool for_replay);
int prepare_mul_data_source_tx_end_(bool is_commit);
......
......@@ -172,6 +172,7 @@ void ObTxRetainCtxMgr::reset()
max_wait_ckpt_ts_.reset();
last_push_gc_task_ts_ = ObTimeUtility::current_time();
retain_ctx_list_.reset();
skip_remove_cnt_ = 0;
reserve_allocator_.reset();
}
......@@ -179,21 +180,38 @@ void *ObTxRetainCtxMgr::alloc_object(const int64_t size) { return reserve_alloca
void ObTxRetainCtxMgr::free_object(void *ptr) { reserve_allocator_.free(ptr); }
int ObTxRetainCtxMgr::push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func)
int ObTxRetainCtxMgr::push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func, int64_t timeout_us)
{
int ret = OB_SUCCESS;
bool lock_succ = false;
ObTimeGuard tg(__func__, 1 * 1000 * 1000);
SpinWLockGuard guard(retain_ctx_lock_);
// SpinWLockGuard guard(retain_ctx_lock_);
if (OB_FAIL(retain_ctx_lock_.wrlock(timeout_us))) {
if (ret == OB_TIMEOUT) {
ret = OB_EAGAIN;
}
TRANS_LOG(WARN, "[RetainCtxMgr] lock retain_ctx_mgr failed", K(ret), K(timeout_us));
} else {
lock_succ = true;
}
tg.click();
if (!retain_func->is_valid()) {
if (OB_FAIL(ret)) {
} else if (!retain_func->is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "[RetainCtxMgr] invalid argument", K(ret), KPC(retain_func));
} else if (OB_FAIL(retain_ctx_list_.push_back(retain_func))) {
TRANS_LOG(WARN, "[RetainCtxMgr] push back retain func failed", K(ret));
}
if (lock_succ) {
int64_t tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(retain_ctx_lock_.unlock())) {
TRANS_LOG(WARN, "[RetainCtxMgr] unlock retain_ctx_mgr failed", K(tmp_ret));
}
}
return ret;
}
......@@ -301,10 +319,12 @@ int ObTxRetainCtxMgr::remove_ctx_func_(RetainCtxList::iterator remove_iter)
return ret;
}
int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage::ObLS *ls,
int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler,
storage::ObLS *ls,
const int64_t max_run_us)
{
int ret = OB_SUCCESS;
int64_t iter_count = 0;
int64_t remove_count = 0;
bool need_remove = false;
const int64_t start_ts = ObTimeUtility::current_time();
......@@ -315,8 +335,8 @@ int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage
if (last_remove_iter != retain_ctx_list_.end()) {
if (OB_FAIL(remove_ctx_func_(last_remove_iter))) {
TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret), KPC(*last_remove_iter),
KPC(this));
TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret),
KPC(*last_remove_iter), KPC(this));
} else {
remove_count++;
last_remove_iter = retain_ctx_list_.end();
......@@ -328,8 +348,11 @@ int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage
} else if (*iter == nullptr) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "[RetainCtxMgr] empty retain ctx functor", K(ret), KPC(this));
} else if (iter_count < skip_remove_cnt_) {
// do nothing
} else if (OB_FAIL((this->*remove_handler)(*iter, need_remove, ls))) {
TRANS_LOG(WARN, "[RetainCtxMgr] execute remove_handler failed", K(ret), KPC(*iter), KPC(this));
TRANS_LOG(WARN, "[RetainCtxMgr] execute remove_handler failed", K(ret), KPC(*iter),
KPC(this));
} else if (need_remove) {
last_remove_iter = iter;
}
......@@ -338,20 +361,28 @@ int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage
TRANS_LOG(WARN, "remove retain ctx use too much time", K(use_ts), K(remove_count));
break;
}
iter_count++;
}
if (OB_FAIL(ret)) {
// do nothing
} else if (last_remove_iter != retain_ctx_list_.end()) {
if (OB_FAIL(remove_ctx_func_(last_remove_iter))) {
TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret), KPC(*last_remove_iter),
KPC(this));
TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret),
KPC(*last_remove_iter), KPC(this));
} else {
remove_count++;
last_remove_iter = retain_ctx_list_.end();
}
}
if (OB_FAIL(ret) || retain_ctx_list_.end() == iter) {
skip_remove_cnt_ = 0;
} else {
skip_remove_cnt_ = iter_count - remove_count;
}
return ret;
}
......
......@@ -111,7 +111,7 @@ public:
void *alloc_object(const int64_t size);
void free_object(void *ptr);
int push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func);
int push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func, int64_t timeout_us);
int try_gc_retain_ctx(storage::ObLS *ls);
int print_retain_ctx_info(share::ObLSID ls_id);
int force_gc_retain_ctx();
......@@ -137,6 +137,8 @@ private:
share::SCN max_wait_ckpt_ts_;
int64_t last_push_gc_task_ts_;
int64_t skip_remove_cnt_;
TransModulePageAllocator reserve_allocator_;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册