diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index d9db2877aec067a2f019c55f998e047d5e3d81cc..74ae66b28c4f8adc17c55f3d33428338fd52e94d 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -189,7 +189,7 @@ int ObTableLoadCoordinatorCtx::advance_status(ObTableLoadStatusType status) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(status)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status_)) { ret = error_code_; LOG_WARN("coordinator has error", KR(ret)); @@ -220,7 +220,7 @@ int ObTableLoadCoordinatorCtx::set_status_error(int error_code) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(error_code)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_UNLIKELY(status_ == ObTableLoadStatusType::ABORT)) { ret = OB_TRANS_KILLED; } else if (status_ != ObTableLoadStatusType::ERROR) { @@ -240,7 +240,7 @@ int ObTableLoadCoordinatorCtx::set_status_abort() ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_UNLIKELY(status_ != ObTableLoadStatusType::ABORT)) { status_ = ObTableLoadStatusType::ABORT; table_load_status_to_string(status_, ctx_->job_stat_->coordinator.status_); @@ -272,7 +272,7 @@ int ObTableLoadCoordinatorCtx::check_status(ObTableLoadStatusType status) const ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); ret = check_status_unlock(status); } return ret; @@ -342,7 +342,7 @@ int ObTableLoadCoordinatorCtx::start_trans(const ObTableLoadSegmentID &segment_i ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_FAIL(check_status_unlock(ObTableLoadStatusType::LOADING))) { LOG_WARN("fail to check status", KR(ret), K_(status)); } else { @@ -388,7 +388,7 @@ int ObTableLoadCoordinatorCtx::commit_trans(ObTableLoadCoordinatorTrans *trans) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(trans)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_; SegmentCtx *segment_ctx = nullptr; if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) { @@ -424,7 +424,7 @@ int ObTableLoadCoordinatorCtx::abort_trans(ObTableLoadCoordinatorTrans *trans) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(trans)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_; SegmentCtx *segment_ctx = nullptr; if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) { @@ -462,7 +462,7 @@ void ObTableLoadCoordinatorCtx::put_trans(ObTableLoadCoordinatorTrans *trans) ObTableLoadTransStatusType trans_status = trans_ctx->get_trans_status(); OB_ASSERT(ObTableLoadTransStatusType::COMMIT == trans_status || ObTableLoadTransStatusType::ABORT == trans_status); - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_FAIL(trans_map_.erase_refactored(trans->get_trans_id()))) { LOG_WARN("fail to erase_refactored", KR(ret)); } else { @@ -485,7 +485,7 @@ int ObTableLoadCoordinatorCtx::get_trans(const ObTableLoadTransId &trans_id, ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); if (OB_FAIL(trans_map_.get_refactored(trans_id, trans))) { if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { LOG_WARN("fail to get_refactored", KR(ret), K(trans_id)); @@ -507,7 +507,7 @@ int ObTableLoadCoordinatorCtx::get_trans_ctx(const ObTableLoadTransId &trans_id, ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); if (OB_FAIL(trans_ctx_map_.get_refactored(trans_id, trans_ctx))) { if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { LOG_WARN("fail to get trans ctx", KR(ret), K(trans_id)); @@ -527,7 +527,7 @@ int ObTableLoadCoordinatorCtx::get_segment_trans_ctx(const ObTableLoadSegmentID ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); SegmentCtx *segment_ctx = nullptr; if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) { if (OB_UNLIKELY(OB_ENTRY_NOT_EXIST != ret)) { @@ -556,7 +556,7 @@ int ObTableLoadCoordinatorCtx::get_active_trans_ids( ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); for (TransMap::const_iterator trans_iter = trans_map_.begin(); OB_SUCC(ret) && trans_iter != trans_map_.end(); ++trans_iter) { if (OB_FAIL(trans_id_array.push_back(trans_iter->first))) { @@ -576,7 +576,7 @@ int ObTableLoadCoordinatorCtx::get_committed_trans_ids( ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); if (OB_FAIL(trans_id_array.create(commited_trans_ctx_array_.count(), allocator))) { LOG_WARN("fail to create trans id array", KR(ret)); } else { @@ -596,7 +596,7 @@ int ObTableLoadCoordinatorCtx::check_exist_trans(bool &is_exist) const ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); is_exist = !trans_map_.empty(); } return ret; @@ -609,7 +609,7 @@ int ObTableLoadCoordinatorCtx::check_exist_committed_trans(bool &is_exist) const ret = OB_NOT_INIT; LOG_WARN("ObTableLoadCoordinatorCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); is_exist = !commited_trans_ctx_array_.empty(); } return ret; diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.h b/src/observer/table_load/ob_table_load_coordinator_ctx.h index e396e9ee591fa35df3b9f666ec0a5c7af9120454..133fd3c43d529b1d79364060dbb10a8b150d047a 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.h +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.h @@ -36,12 +36,12 @@ public: public: OB_INLINE table::ObTableLoadStatusType get_status() const { - lib::ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); return status_; } OB_INLINE int get_error_code() const { - lib::ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); return error_code_; } OB_INLINE int set_status_inited() @@ -133,7 +133,7 @@ private: ObTableLoadObjectAllocator trans_allocator_; // 多线程安全 uint64_t last_trans_gid_ CACHE_ALIGNED; uint64_t next_session_id_ CACHE_ALIGNED; - mutable lib::ObMutex mutex_; + mutable obsys::ObRWLock rwlock_; table::ObTableLoadStatusType status_; int error_code_; TransMap trans_map_; diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 0b1f6c1d526b13b60f85e2dc126586ad56d6ea8b..38b23a6197942487cebca76c346e8a3744472332 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -300,7 +300,7 @@ int ObTableLoadStoreCtx::advance_status(ObTableLoadStatusType status) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(status)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_UNLIKELY(ObTableLoadStatusType::ERROR == status_)) { ret = error_code_; LOG_WARN("store has error", KR(ret)); @@ -331,7 +331,7 @@ int ObTableLoadStoreCtx::set_status_error(int error_code) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(error_code)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_UNLIKELY(status_ == ObTableLoadStatusType::ABORT)) { ret = OB_TRANS_KILLED; } else if (status_ != ObTableLoadStatusType::ERROR) { @@ -351,7 +351,7 @@ int ObTableLoadStoreCtx::set_status_abort() ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_UNLIKELY(status_ != ObTableLoadStatusType::ABORT)) { status_ = ObTableLoadStatusType::ABORT; table_load_status_to_string(status_, ctx_->job_stat_->store.status_); @@ -383,7 +383,7 @@ int ObTableLoadStoreCtx::check_status(ObTableLoadStatusType status) const ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); ret = check_status_unlock(status); } return ret; @@ -630,7 +630,7 @@ int ObTableLoadStoreCtx::start_trans(const ObTableLoadTransId &trans_id, ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_FAIL(check_status_unlock(ObTableLoadStatusType::LOADING))) { LOG_WARN("fail to check status", KR(ret), K_(status)); } else { @@ -676,7 +676,7 @@ int ObTableLoadStoreCtx::commit_trans(ObTableLoadStoreTrans *trans) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(trans)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_; SegmentCtx *segment_ctx = nullptr; ObTableLoadTransStore *trans_store = nullptr; @@ -721,7 +721,7 @@ int ObTableLoadStoreCtx::abort_trans(ObTableLoadStoreTrans *trans) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(trans)); } else { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); const ObTableLoadSegmentID &segment_id = trans->get_trans_id().segment_id_; SegmentCtx *segment_ctx = nullptr; if (OB_FAIL(segment_ctx_map_.get(segment_id, segment_ctx))) { @@ -759,7 +759,7 @@ void ObTableLoadStoreCtx::put_trans(ObTableLoadStoreTrans *trans) ObTableLoadTransStatusType trans_status = trans_ctx->get_trans_status(); OB_ASSERT(ObTableLoadTransStatusType::COMMIT == trans_status || ObTableLoadTransStatusType::ABORT == trans_status); - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); if (OB_FAIL(trans_map_.erase_refactored(trans->get_trans_id()))) { LOG_WARN("fail to erase_refactored", KR(ret)); } else { @@ -782,7 +782,7 @@ int ObTableLoadStoreCtx::get_trans(const ObTableLoadTransId &trans_id, ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); if (OB_FAIL(trans_map_.get_refactored(trans_id, trans))) { if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { LOG_WARN("fail to get_refactored", KR(ret), K(trans_id)); @@ -804,7 +804,7 @@ int ObTableLoadStoreCtx::get_trans_ctx(const ObTableLoadTransId &trans_id, ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); if (OB_FAIL(trans_ctx_map_.get_refactored(trans_id, trans_ctx))) { if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { LOG_WARN("fail to get trans ctx", KR(ret), K(trans_id)); @@ -825,7 +825,7 @@ int ObTableLoadStoreCtx::get_active_trans_ids( ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); for (TransMap::const_iterator trans_iter = trans_map_.begin(); OB_SUCC(ret) && trans_iter != trans_map_.end(); ++trans_iter) { if (OB_FAIL(trans_id_array.push_back(trans_iter->first))) { @@ -844,7 +844,7 @@ int ObTableLoadStoreCtx::get_committed_trans_ids( ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); if (OB_FAIL(trans_id_array.create(committed_trans_store_array_.count(), allocator))) { LOG_WARN("fail to create trans id array", KR(ret)); } else { @@ -865,7 +865,7 @@ int ObTableLoadStoreCtx::get_committed_trans_stores( ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); if (OB_FAIL(trans_store_array.assign(committed_trans_store_array_))) { LOG_WARN("fail to assign trans store array", KR(ret)); } @@ -880,7 +880,7 @@ int ObTableLoadStoreCtx::check_exist_trans(bool &exist) const ret = OB_NOT_INIT; LOG_WARN("ObTableLoadStoreCtx not init", KR(ret)); } else { - ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); exist = !trans_map_.empty(); } return ret; @@ -888,7 +888,7 @@ int ObTableLoadStoreCtx::check_exist_trans(bool &exist) const void ObTableLoadStoreCtx::clear_committed_trans_stores() { - ObMutexGuard guard(mutex_); + obsys::ObWLockGuard guard(rwlock_); for (int64_t i = 0; i < committed_trans_store_array_.count(); ++i) { ObTableLoadTransStore *trans_store = committed_trans_store_array_.at(i); ObTableLoadTransCtx *trans_ctx = trans_store->trans_ctx_; diff --git a/src/observer/table_load/ob_table_load_store_ctx.h b/src/observer/table_load/ob_table_load_store_ctx.h index efc07ea918de48b10b32afb5f4c72699224e8091..632c21da8448ca559f574e55df4793dbe69ab327 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.h +++ b/src/observer/table_load/ob_table_load_store_ctx.h @@ -49,12 +49,12 @@ public: public: OB_INLINE table::ObTableLoadStatusType get_status() const { - lib::ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); return status_; } OB_INLINE int get_error_code() const { - lib::ObMutexGuard guard(mutex_); + obsys::ObRLockGuard guard(rwlock_); return error_code_; } OB_INLINE int set_status_inited() @@ -153,7 +153,7 @@ private: typedef common::ObLinkHashMap SegmentCtxMap; private: ObTableLoadObjectAllocator trans_allocator_; // 多线程安全 - mutable lib::ObMutex mutex_; + mutable obsys::ObRWLock rwlock_; common::ObArenaAllocator allocator_; table::ObTableLoadStatusType status_; int error_code_;