diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index f6a01f842e554ad59810fd769d52f65b64379862..e253bed45f9a8593743e8d956016c0761b0dda50 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ b/src/storage/memtable/ob_memtable_context.h @@ -302,7 +302,7 @@ private: }; class ObMemtable; -typedef ObMemtableCtxFactory::IDMap MemtableIDMap; +typedef common::ObIDMap MemtableIDMap; class ObMemtableCtx final : public ObIMemtableCtx { using RWLock = common::SpinRWLock; diff --git a/src/storage/memtable/ob_memtable_interface.cpp b/src/storage/memtable/ob_memtable_interface.cpp index b8add402653b3da85bb4bf1076002c39d838504b..afcbca47f1b1f1c1eca72cf0949be3fd2414eab4 100644 --- a/src/storage/memtable/ob_memtable_interface.cpp +++ b/src/storage/memtable/ob_memtable_interface.cpp @@ -20,124 +20,6 @@ namespace oceanbase using namespace common; namespace memtable { -ObMemtableCtxFactory::ObMemtableCtxFactory() - : is_inited_(false), - mod_(ObModIds::OB_MEMTABLE_CTX_OBJECT), - ctx_obj_allocator_(OBJ_ALLOCATOR_PAGE, mod_), - ctx_dynamic_allocator_(), - malloc_allocator_(ObModIds::OB_MEMTABLE_CTX), - free_list_(), - alloc_count_(0), - free_count_(0) -{ - int ret = OB_SUCCESS; - if (OB_FAIL(ctx_dynamic_allocator_.init( - DYNAMIC_ALLOCATOR_PAGE, - ObModIds::OB_MEMTABLE_CTX, - common::OB_SERVER_TENANT_ID, - DYNAMIC_ALLOCATOR_PAGE_NUM))) { - TRANS_LOG(ERROR, "ctx dynamic allocator init fail", K(ret)); - } else if (OB_FAIL(free_list_.init(MAX_CTX_HOLD_COUNT))) { - TRANS_LOG(ERROR, "free list init fail", K(ret)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < MAX_CTX_HOLD_COUNT; ++i) { - ObIMemtableCtx *ctx = NULL; - void *ctx_buffer = ctx_obj_allocator_.alloc(sizeof(ObMemtableCtx)); - if (NULL == ctx_buffer) { - TRANS_LOG(ERROR, "ctx obj allocator alloc fail"); - ret = OB_ALLOCATE_MEMORY_FAILED; - } else if (NULL == (ctx = new(ctx_buffer) ObMemtableCtx())) { - TRANS_LOG(ERROR, "new ObMemtableCtx fail"); - ret = OB_ALLOCATE_MEMORY_FAILED; - } else if (OB_FAIL(free_list_.push(ctx))) { - TRANS_LOG(ERROR, "free list push fail", K(ret)); - } else { - ctx->set_alloc_type(CTX_ALLOC_FIX); - } - } - } - if (OB_SUCC(ret)) { - ObMemAttr attr; - attr.label_ = ObModIds::OB_MEMTABLE_CTX; - ctx_dynamic_allocator_.set_attr(attr); - is_inited_ = true; - } -} - -ObMemtableCtxFactory::~ObMemtableCtxFactory() -{ - is_inited_ = false; - ObIMemtableCtx *ctx = NULL; - while (OB_SUCCESS == free_list_.pop(ctx)) { - if (NULL != ctx) { - int alloc_type = ctx->get_alloc_type(); - ctx->~ObIMemtableCtx(); - if (CTX_ALLOC_VAR == alloc_type) { - ctx_dynamic_allocator_.free(ctx); - } - ctx = NULL; - } - } - free_list_.destroy(); - ctx_dynamic_allocator_.destroy(); -} - -ObIMemtableCtx *ObMemtableCtxFactory::alloc(const uint64_t tenant_id/* OB_SERVER_TENANT_ID */) -{ - ObIMemtableCtx *ctx = NULL; - int tmp_ret = free_list_.pop(ctx); - if (OB_SUCCESS != tmp_ret || NULL == ctx) - { - void *ctx_buffer = ctx_dynamic_allocator_.alloc(sizeof(ObMemtableCtx)); - if (NULL == ctx_buffer) { - TRANS_LOG(WARN, "ctx obj allocator alloc fail"); - } else if (NULL == (ctx = new(ctx_buffer) ObMemtableCtx())) { - TRANS_LOG(WARN, "new ObMemtableCtx fail"); - } else { - ctx->set_alloc_type(CTX_ALLOC_VAR); - } - } - if (NULL != ctx) { - if (OB_SUCCESS != (tmp_ret = static_cast(ctx)->init(tenant_id))) { - TRANS_LOG(WARN, "memtable context init error", "ret", tmp_ret); - } - } - if (NULL != ctx) { - PC_ADD(MTCTX, 1); - (void)ATOMIC_AAF(&alloc_count_, 1); - if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) { - TRANS_LOG(INFO, "alloc memtable context", K_(alloc_count), K_(free_count)); - } - GARL_ADD(&ctx->resource_link_, "memctx"); - if(REACH_TIME_INTERVAL(10 * 1000 * 1000)) - { - PC_REPORT(); - } - } - return ctx; -} - -void ObMemtableCtxFactory::free(ObIMemtableCtx *ctx) -{ - if (NULL != ctx) { - GARL_DEL(&ctx->resource_link_); - ctx->reset(); - if (CTX_ALLOC_VAR == ctx->get_alloc_type()) { - ctx->~ObIMemtableCtx(); - ctx_dynamic_allocator_.free(ctx); - } else if (CTX_ALLOC_FIX == ctx->get_alloc_type()) { - int tmp_ret = free_list_.push(ctx); - if (OB_SUCCESS != tmp_ret) { - TRANS_LOG(ERROR, "free list push fail, ctx will leek", "ret", tmp_ret, KP(ctx)); - } - } else { - TRANS_LOG(ERROR, "unknow ctx alloc type", "alloc_type", ctx->get_alloc_type()); - } - ctx = NULL; - (void)ATOMIC_AAF(&free_count_, 1); - } - PC_ADD(MTCTX, -1); -} //////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/storage/memtable/ob_memtable_interface.h b/src/storage/memtable/ob_memtable_interface.h index 10bde3f0b28e403b5b7645a58ad8eebc8173e029..1f30f383fc9079c7101f6a3d6fbcb75677124240 100644 --- a/src/storage/memtable/ob_memtable_interface.h +++ b/src/storage/memtable/ob_memtable_interface.h @@ -275,53 +275,6 @@ protected: //////////////////////////////////////////////////////////////////////////////////////////////////// -class ObIMemtableCtxFactory -{ -public: - ObIMemtableCtxFactory() {} - virtual ~ObIMemtableCtxFactory() {} -public: - virtual ObIMemtableCtx *alloc(const uint64_t tenant_id = OB_SERVER_TENANT_ID) = 0; - virtual void free(ObIMemtableCtx *ctx) = 0; -}; - -class ObMemtableCtxFactory : public ObIMemtableCtxFactory -{ -public: - enum - { - CTX_ALLOC_FIX = 1, - CTX_ALLOC_VAR = 2, - }; - typedef common::ObFixedQueue FreeList; - typedef common::ObIDMap IDMap; - typedef common::ObLfFIFOAllocator DynamicAllocator; - static const int64_t OBJ_ALLOCATOR_PAGE = 1L<<22; //4MB - static const int64_t DYNAMIC_ALLOCATOR_PAGE = common::OB_MALLOC_NORMAL_BLOCK_SIZE * 8 - 1024; // 64k - static const int64_t DYNAMIC_ALLOCATOR_PAGE_NUM = common::OB_MAX_CPU_NUM; - static const int64_t MAX_CTX_HOLD_COUNT = 10000; - static const int64_t MAX_CTX_COUNT = 3000000; -public: - ObMemtableCtxFactory(); - ~ObMemtableCtxFactory(); -public: - ObIMemtableCtx *alloc(const uint64_t tenant_id = OB_SERVER_TENANT_ID); - void free(ObIMemtableCtx *ctx); - DynamicAllocator &get_allocator() { return ctx_dynamic_allocator_; } - common::ObIAllocator &get_malloc_allocator() { return malloc_allocator_; } -private: - DISALLOW_COPY_AND_ASSIGN(ObMemtableCtxFactory); -private: - bool is_inited_; - common::ModulePageAllocator mod_; - common::ModuleArena ctx_obj_allocator_; - DynamicAllocator ctx_dynamic_allocator_; - common::ObMalloc malloc_allocator_; - FreeList free_list_; - int64_t alloc_count_; - int64_t free_count_; -}; - class ObMemtableFactory { public: diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 8a069eefd5a0789824c31e250b05a40177bb39aa..45d1eb9f2d0adfd53f41809c438517fb4734d634 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1315,7 +1315,8 @@ int ObPartTransCtx::recover_tx_ctx_table_info(const ObTxCtxTableInfo &ctx_info) || ObTxState::CLEAR == exec_info_.state_)) { if (OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, exec_info_.max_applying_log_ts_, - exec_info_.max_durable_lsn_))) { + exec_info_.max_durable_lsn_, + false))) { TRANS_LOG(WARN, "insert into retain ctx mgr failed", K(ret), KPC(this)); } else if ((exec_info_.trans_type_ == TransType::SP_TRANS && ObTxState::COMMIT == exec_info_.state_) @@ -1832,7 +1833,7 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) } else if (ObTxLogType::TX_COMMIT_LOG == log_type) { tg.click(); if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN - && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn))) { + && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn, false))) { TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(log_cb), KPC(this)); } if (is_local_tx_()) { @@ -1864,7 +1865,7 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) } } else if (ObTxLogType::TX_ABORT_LOG == log_type) { if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN - && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn))) { + && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, log_ts, log_lsn, false))) { TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(log_cb), KPC(this)); } if (is_local_tx_() || sub_state_.is_force_abort()) { @@ -4095,7 +4096,7 @@ int ObPartTransCtx::replay_commit(const ObTxCommitLog &commit_log, TRANS_LOG(WARN, "set incremental_participants error", K(ret), K(*this)); } else { if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN - && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset))) { + && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset, true))) { TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(this)); } if (is_local_tx_()) { @@ -4263,7 +4264,7 @@ int ObPartTransCtx::replay_abort(const ObTxAbortLog &abort_log, TRANS_LOG(WARN, "update replaying log no failed", K(ret), K(timestamp), K(part_log_no)); } else { if (exec_info_.multi_data_source_.count() > 0 && get_retain_cause() == RetainCause::UNKOWN - && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset))) { + && OB_FAIL(insert_into_retain_ctx_mgr_(RetainCause::MDS_WAIT_GC_COMMIT_LOG, timestamp, offset, true))) { TRANS_LOG(WARN, "insert into retain_ctx_mgr failed", K(ret), KPC(this)); } if (is_local_tx_()) { @@ -5883,10 +5884,16 @@ int ObPartTransCtx::tx_keepalive_response_(const int64_t status) } int ObPartTransCtx::insert_into_retain_ctx_mgr_(RetainCause cause, - const SCN &log_ts, palf::LSN lsn) + const SCN &log_ts, + palf::LSN lsn, + bool for_replay) { int ret = OB_SUCCESS; ObMDSRetainCtxFunctor *retain_func_ptr = nullptr; + int64_t retain_lock_timeout = INT64_MAX; + if (for_replay) { + retain_lock_timeout = 10 * 1000; + } if (OB_ISNULL(ls_tx_ctx_mgr_) || RetainCause::UNKOWN == cause) { ret = OB_INVALID_ARGUMENT; @@ -5902,7 +5909,7 @@ int ObPartTransCtx::insert_into_retain_ctx_mgr_(RetainCause cause, } else if (OB_FALSE_IT(new (retain_func_ptr) ObMDSRetainCtxFunctor())) { } else if (OB_FAIL(retain_func_ptr->init(this, cause, log_ts, lsn))) { TRANS_LOG(WARN, "init retain ctx functor failed", K(ret), KPC(this)); - } else if (OB_FAIL(retain_ctx_mgr.push_retain_ctx(retain_func_ptr))) { + } else if (OB_FAIL(retain_ctx_mgr.push_retain_ctx(retain_func_ptr, retain_lock_timeout))) { TRANS_LOG(WARN, "push into retain_ctx_mgr failed", K(ret), KPC(this)); } // if (OB_FAIL(retain_ctx_mgr.reset())) diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 11e1f120389b0ff7261a952870a8ffa91cf0926d..27426cb24cf989a6dea511b53187f2d56907ef74 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -473,7 +473,10 @@ private: ATOMIC_STORE(&retain_cause_, static_cast(RetainCause::UNKOWN)); } - int insert_into_retain_ctx_mgr_(RetainCause cause, const share::SCN &log_ts, palf::LSN lsn); + int insert_into_retain_ctx_mgr_(RetainCause cause, + const share::SCN &log_ts, + palf::LSN lsn, + bool for_replay); int prepare_mul_data_source_tx_end_(bool is_commit); diff --git a/src/storage/tx/ob_tx_retain_ctx_mgr.cpp b/src/storage/tx/ob_tx_retain_ctx_mgr.cpp index 0a859349793ba00bc819c2042cf189c3be7f99a2..8e1cec23a33c2d823671cd910bdee612e5095851 100644 --- a/src/storage/tx/ob_tx_retain_ctx_mgr.cpp +++ b/src/storage/tx/ob_tx_retain_ctx_mgr.cpp @@ -172,6 +172,7 @@ void ObTxRetainCtxMgr::reset() max_wait_ckpt_ts_.reset(); last_push_gc_task_ts_ = ObTimeUtility::current_time(); retain_ctx_list_.reset(); + skip_remove_cnt_ = 0; reserve_allocator_.reset(); } @@ -179,21 +180,38 @@ void *ObTxRetainCtxMgr::alloc_object(const int64_t size) { return reserve_alloca void ObTxRetainCtxMgr::free_object(void *ptr) { reserve_allocator_.free(ptr); } -int ObTxRetainCtxMgr::push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func) +int ObTxRetainCtxMgr::push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func, int64_t timeout_us) { int ret = OB_SUCCESS; + bool lock_succ = false; ObTimeGuard tg(__func__, 1 * 1000 * 1000); - SpinWLockGuard guard(retain_ctx_lock_); + // SpinWLockGuard guard(retain_ctx_lock_); + if (OB_FAIL(retain_ctx_lock_.wrlock(timeout_us))) { + if (ret == OB_TIMEOUT) { + ret = OB_EAGAIN; + } + TRANS_LOG(WARN, "[RetainCtxMgr] lock retain_ctx_mgr failed", K(ret), K(timeout_us)); + } else { + lock_succ = true; + } tg.click(); - if (!retain_func->is_valid()) { + if (OB_FAIL(ret)) { + } else if (!retain_func->is_valid()) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "[RetainCtxMgr] invalid argument", K(ret), KPC(retain_func)); } else if (OB_FAIL(retain_ctx_list_.push_back(retain_func))) { TRANS_LOG(WARN, "[RetainCtxMgr] push back retain func failed", K(ret)); } + if (lock_succ) { + int64_t tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(retain_ctx_lock_.unlock())) { + TRANS_LOG(WARN, "[RetainCtxMgr] unlock retain_ctx_mgr failed", K(tmp_ret)); + } + } + return ret; } @@ -301,10 +319,12 @@ int ObTxRetainCtxMgr::remove_ctx_func_(RetainCtxList::iterator remove_iter) return ret; } -int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage::ObLS *ls, +int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, + storage::ObLS *ls, const int64_t max_run_us) { int ret = OB_SUCCESS; + int64_t iter_count = 0; int64_t remove_count = 0; bool need_remove = false; const int64_t start_ts = ObTimeUtility::current_time(); @@ -315,8 +335,8 @@ int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage if (last_remove_iter != retain_ctx_list_.end()) { if (OB_FAIL(remove_ctx_func_(last_remove_iter))) { - TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret), KPC(*last_remove_iter), - KPC(this)); + TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret), + KPC(*last_remove_iter), KPC(this)); } else { remove_count++; last_remove_iter = retain_ctx_list_.end(); @@ -328,8 +348,11 @@ int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage } else if (*iter == nullptr) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "[RetainCtxMgr] empty retain ctx functor", K(ret), KPC(this)); + } else if (iter_count < skip_remove_cnt_) { + // do nothing } else if (OB_FAIL((this->*remove_handler)(*iter, need_remove, ls))) { - TRANS_LOG(WARN, "[RetainCtxMgr] execute remove_handler failed", K(ret), KPC(*iter), KPC(this)); + TRANS_LOG(WARN, "[RetainCtxMgr] execute remove_handler failed", K(ret), KPC(*iter), + KPC(this)); } else if (need_remove) { last_remove_iter = iter; } @@ -338,20 +361,28 @@ int ObTxRetainCtxMgr::for_each_remove_(RetainFuncHandler remove_handler, storage TRANS_LOG(WARN, "remove retain ctx use too much time", K(use_ts), K(remove_count)); break; } + + iter_count++; } if (OB_FAIL(ret)) { // do nothing } else if (last_remove_iter != retain_ctx_list_.end()) { if (OB_FAIL(remove_ctx_func_(last_remove_iter))) { - TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret), KPC(*last_remove_iter), - KPC(this)); + TRANS_LOG(WARN, "[RetainCtxMgr] remove from retain_ctx_list_ failed", K(ret), + KPC(*last_remove_iter), KPC(this)); } else { remove_count++; last_remove_iter = retain_ctx_list_.end(); } } + if (OB_FAIL(ret) || retain_ctx_list_.end() == iter) { + skip_remove_cnt_ = 0; + } else { + skip_remove_cnt_ = iter_count - remove_count; + } + return ret; } diff --git a/src/storage/tx/ob_tx_retain_ctx_mgr.h b/src/storage/tx/ob_tx_retain_ctx_mgr.h index 4b90ceb9723f3694782a55896c5aec9d461a7050..92c0ddba0f3b40643f00b7b07c8d5eda258ae628 100644 --- a/src/storage/tx/ob_tx_retain_ctx_mgr.h +++ b/src/storage/tx/ob_tx_retain_ctx_mgr.h @@ -111,7 +111,7 @@ public: void *alloc_object(const int64_t size); void free_object(void *ptr); - int push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func); + int push_retain_ctx(ObIRetainCtxCheckFunctor *retain_func, int64_t timeout_us); int try_gc_retain_ctx(storage::ObLS *ls); int print_retain_ctx_info(share::ObLSID ls_id); int force_gc_retain_ctx(); @@ -137,6 +137,8 @@ private: share::SCN max_wait_ckpt_ts_; int64_t last_push_gc_task_ts_; + int64_t skip_remove_cnt_; + TransModulePageAllocator reserve_allocator_; };