diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index b426320e477b220c713324a29efe7e3489af4f40..b207c32a79b346a202af70c4d861d6404320ed41 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -729,7 +729,7 @@ int ObSql::handle_ps_prepare(const ObString &stmt, ObSqlCtx &context, ObResultSe stmt_info->set_is_expired(); ps_sql_key.set_db_id(stmt_info->get_db_id()); ps_sql_key.set_ps_sql(stmt_info->get_ps_sql()); - if (OB_FAIL(ps_cache->erase_stmt_item(ps_sql_key))) { + if (OB_FAIL(ps_cache->erase_stmt_item(inner_stmt_id, ps_sql_key))) { LOG_WARN("fail to erase stmt item", K(ret), K(*stmt_info)); } need_do_real_prepare = true; diff --git a/src/sql/plan_cache/ob_prepare_stmt_struct.cpp b/src/sql/plan_cache/ob_prepare_stmt_struct.cpp index bf878edbd2b1854fc8561573304ca4006b170735..c6642186580ed2397499423d519559b7f2f1af21 100644 --- a/src/sql/plan_cache/ob_prepare_stmt_struct.cpp +++ b/src/sql/plan_cache/ob_prepare_stmt_struct.cpp @@ -61,12 +61,19 @@ ObPsStmtItem::ObPsStmtItem() db_id_(OB_INVALID_ID), ps_sql_(), stmt_id_(OB_INVALID_STMT_ID), + is_expired_evicted_(false), allocator_(NULL), external_allocator_(NULL) {} ObPsStmtItem::ObPsStmtItem(const ObPsStmtId ps_stmt_id) - : ref_count_(1), db_id_(OB_INVALID_ID), ps_sql_(), stmt_id_(ps_stmt_id), allocator_(NULL), external_allocator_(NULL) + : ref_count_(1), + db_id_(OB_INVALID_ID), + ps_sql_(), + stmt_id_(ps_stmt_id), + is_expired_evicted_(false), + allocator_(NULL), + external_allocator_(NULL) {} ObPsStmtItem::ObPsStmtItem(ObIAllocator* inner_allocator, ObIAllocator* external_allocator) @@ -74,6 +81,7 @@ ObPsStmtItem::ObPsStmtItem(ObIAllocator* inner_allocator, ObIAllocator* external db_id_(OB_INVALID_ID), ps_sql_(), stmt_id_(OB_INVALID_STMT_ID), + is_expired_evicted_(false), allocator_(inner_allocator), external_allocator_(external_allocator) {} @@ -102,6 +110,7 @@ ObPsStmtItem& ObPsStmtItem::operator=(const ObPsStmtItem& other) db_id_ = other.get_db_id(); ps_sql_ = other.get_ps_sql(); stmt_id_ = other.get_ps_stmt_id(); + is_expired_evicted_ = other.is_expired_evicted_; } return *this; } diff --git a/src/sql/plan_cache/ob_prepare_stmt_struct.h b/src/sql/plan_cache/ob_prepare_stmt_struct.h index 68e3ff630a7283754823b5b8726add3e0fca619f..d6321982a277dddbbae48ca4224cd15d1148afcc 100644 --- a/src/sql/plan_cache/ob_prepare_stmt_struct.h +++ b/src/sql/plan_cache/ob_prepare_stmt_struct.h @@ -121,19 +121,24 @@ public: { return external_allocator_; } + bool *get_is_expired_evicted_ptr() + { + return &is_expired_evicted_; + } - TO_STRING_KV(K_(ref_count), K_(db_id), K_(ps_sql), K_(stmt_id)); + TO_STRING_KV(K_(ref_count), K_(db_id), K_(ps_sql), K_(stmt_id), K_(is_expired_evicted)); private: volatile int64_t ref_count_; uint64_t db_id_; common::ObString ps_sql_; ObPsStmtId stmt_id_; + bool is_expired_evicted_; // ObDataBuffer is used to use the internal memory of ObPsStmtItem. // The memory essentially comes from inner_allocator_ in ObPsPlancache - common::ObIAllocator* allocator_; + common::ObIAllocator *allocator_; // Point to inner_allocator_ in ObPsPlancache, used to release the memory of the entire ObPsStmtItem - common::ObIAllocator* external_allocator_; + common::ObIAllocator *external_allocator_; }; struct ObPsSqlMeta { diff --git a/src/sql/plan_cache/ob_ps_cache.cpp b/src/sql/plan_cache/ob_ps_cache.cpp index 140a9aaa14165effd03f8181be3d96023284fc4d..5b6ab2efd4cf092a6e70685d3b2df5d3b1995e03 100644 --- a/src/sql/plan_cache/ob_ps_cache.cpp +++ b/src/sql/plan_cache/ob_ps_cache.cpp @@ -127,7 +127,7 @@ int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item /*=false* ps_sql_key.set_ps_sql(ps_info->get_ps_sql()); int tmp_ret = OB_SUCCESS; if (erase_item) { // dec cached ref - if (OB_FAIL(erase_stmt_item(ps_sql_key))) { + if (OB_FAIL(erase_stmt_item(stmt_id, ps_sql_key))) { LOG_WARN("fail to erase stmt", K(ret)); } } else { // dec session ref @@ -420,21 +420,48 @@ int ObPsCache::get_or_add_stmt_info(const ObResultSet& result, int64_t param_cnt } // may parallel execute by multi_thread -int ObPsCache::erase_stmt_item(ObPsSqlKey& ps_key) +int ObPsCache::erase_stmt_item(ObPsStmtId stmt_id, ObPsSqlKey &ps_key) { int ret = OB_SUCCESS; - ObPsStmtItem* ps_item = NULL; - if (OB_FAIL(stmt_id_map_.erase_refactored(ps_key, &ps_item))) { + ObPsStmtItem *ps_item = NULL; + /* + * Thread A Thread B + * Get stmt_item successfully Get stmt_item successfully + * + * Check stmt_info expired + * Check stmt_info expired + * + * Delete stmt_item by key(db_id, ps_sql) + * + * + * + * Add stmt_item with (db_id, ps_sql) as key + * + * Delete stmt_item by key(db_id, ps_sql) + * The item added in the previous step of thread A is deleted + * + * + */ + ObPsStmtItemEraseAtomicOp op(stmt_id); + if (OB_FAIL(stmt_id_map_.read_atomic(ps_key, op))) { if (OB_HASH_NOT_EXIST == ret) { LOG_INFO("erased by others", K(ret), K(ps_key)); ret = OB_SUCCESS; } else { - LOG_WARN("fail to erase stmt info", K(ps_key), K(ret)); + LOG_WARN("failed to get ps stmt item", K(ps_key), K(ret)); + } + } else if (op.need_erase()) { + if (OB_FAIL(stmt_id_map_.erase_refactored(ps_key, &ps_item))) { + if (OB_HASH_NOT_EXIST == ret) { + LOG_INFO("erased by others", K(ret), K(ps_key)); + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to erase stmt info", K(ps_key), K(ret)); + } + } else { + ps_item->dec_ref_count_check_erase(); } - } else { - ps_item->dec_ref_count_check_erase(); } - return ret; } diff --git a/src/sql/plan_cache/ob_ps_cache.h b/src/sql/plan_cache/ob_ps_cache.h index ad0861ae9466d1a2bf6929fd3ccb8dc75187e021..9a8fa31efe84aad2c81d566b6e8684c0e8fe7076 100644 --- a/src/sql/plan_cache/ob_ps_cache.h +++ b/src/sql/plan_cache/ob_ps_cache.h @@ -109,9 +109,9 @@ public: return stmt_info_map_.size(); } - int get_all_stmt_id(common::ObIArray* id_array); - int check_schema_version(ObSchemaGetterGuard& schema_guard, ObPsStmtInfo& stmt_info, bool& is_expired); - int erase_stmt_item(ObPsSqlKey& ps_key); + int get_all_stmt_id(common::ObIArray *id_array); + int check_schema_version(ObSchemaGetterGuard &schema_guard, ObPsStmtInfo &stmt_info, bool &is_expired); + int erase_stmt_item(ObPsStmtId stmt_id, ObPsSqlKey &ps_key); private: int inner_cache_evict(bool is_evict_all); diff --git a/src/sql/plan_cache/ob_ps_cache_callback.cpp b/src/sql/plan_cache/ob_ps_cache_callback.cpp index 45ff639ea6789a5f482fe0d149eecda471cbfd00..a29804995f33dfc2807b8955b08cb84e91bd460a 100644 --- a/src/sql/plan_cache/ob_ps_cache_callback.cpp +++ b/src/sql/plan_cache/ob_ps_cache_callback.cpp @@ -56,7 +56,19 @@ void ObPsStmtItemDerefAtomicOp::operator()(const PsStmtIdKV& entry) } } -void ObPsStmtInfoRefAtomicOp::operator()(const PsStmtInfoKV& entry) +void ObPsStmtItemEraseAtomicOp::operator()(const PsStmtIdKV &entry) +{ + if (OB_ISNULL(entry.second)) { + ret_ = OB_HASH_NOT_EXIST; + LOG_WARN("entry not exist", K_(ret)); + } else if (entry.second->get_ps_stmt_id() == stmt_id_) { + if (ATOMIC_BCAS(entry.second->get_is_expired_evicted_ptr(), false, true)) { + need_erase_ = true; + } + } +} + +void ObPsStmtInfoRefAtomicOp::operator()(const PsStmtInfoKV &entry) { if (NULL != entry.second) { if (entry.second->check_erase_inc_ref_count()) { diff --git a/src/sql/plan_cache/ob_ps_cache_callback.h b/src/sql/plan_cache/ob_ps_cache_callback.h index 39a49e6de70b78b877734e3003c3185b8c67e561..9ac05500e80d5406103679a2b5a0d382653d5f04 100644 --- a/src/sql/plan_cache/ob_ps_cache_callback.h +++ b/src/sql/plan_cache/ob_ps_cache_callback.h @@ -152,6 +152,31 @@ private: DISALLOW_COPY_AND_ASSIGN(ObPsStmtItemDerefAtomicOp); }; +class ObPsStmtItemEraseAtomicOp { + typedef common::hash::HashMapPair PsStmtIdKV; + +public: + ObPsStmtItemEraseAtomicOp(ObPsStmtId id) : stmt_id_(id), ret_(common::OB_SUCCESS), need_erase_(false) + {} + virtual ~ObPsStmtItemEraseAtomicOp() + {} + void operator()(const PsStmtIdKV &entry); + int get_ret() const + { + return ret_; + } + bool need_erase() const + { + return need_erase_; + } + +private: + ObPsStmtId stmt_id_; + int ret_; + bool need_erase_; + DISALLOW_COPY_AND_ASSIGN(ObPsStmtItemEraseAtomicOp); +}; + class ObPsStmtInfoRefAtomicOp { typedef common::hash::HashMapPair PsStmtInfoKV;