提交 4e81072c 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] [CP] Fix reference count leak when stmt_info expires

上级 649a2dc6
...@@ -729,7 +729,7 @@ int ObSql::handle_ps_prepare(const ObString &stmt, ObSqlCtx &context, ObResultSe ...@@ -729,7 +729,7 @@ int ObSql::handle_ps_prepare(const ObString &stmt, ObSqlCtx &context, ObResultSe
stmt_info->set_is_expired(); stmt_info->set_is_expired();
ps_sql_key.set_db_id(stmt_info->get_db_id()); ps_sql_key.set_db_id(stmt_info->get_db_id());
ps_sql_key.set_ps_sql(stmt_info->get_ps_sql()); ps_sql_key.set_ps_sql(stmt_info->get_ps_sql());
if (OB_FAIL(ps_cache->erase_stmt_item(ps_sql_key))) { if (OB_FAIL(ps_cache->erase_stmt_item(inner_stmt_id, ps_sql_key))) {
LOG_WARN("fail to erase stmt item", K(ret), K(*stmt_info)); LOG_WARN("fail to erase stmt item", K(ret), K(*stmt_info));
} }
need_do_real_prepare = true; need_do_real_prepare = true;
......
...@@ -61,12 +61,19 @@ ObPsStmtItem::ObPsStmtItem() ...@@ -61,12 +61,19 @@ ObPsStmtItem::ObPsStmtItem()
db_id_(OB_INVALID_ID), db_id_(OB_INVALID_ID),
ps_sql_(), ps_sql_(),
stmt_id_(OB_INVALID_STMT_ID), stmt_id_(OB_INVALID_STMT_ID),
is_expired_evicted_(false),
allocator_(NULL), allocator_(NULL),
external_allocator_(NULL) external_allocator_(NULL)
{} {}
ObPsStmtItem::ObPsStmtItem(const ObPsStmtId ps_stmt_id) ObPsStmtItem::ObPsStmtItem(const ObPsStmtId ps_stmt_id)
: ref_count_(1), db_id_(OB_INVALID_ID), ps_sql_(), stmt_id_(ps_stmt_id), allocator_(NULL), external_allocator_(NULL) : ref_count_(1),
db_id_(OB_INVALID_ID),
ps_sql_(),
stmt_id_(ps_stmt_id),
is_expired_evicted_(false),
allocator_(NULL),
external_allocator_(NULL)
{} {}
ObPsStmtItem::ObPsStmtItem(ObIAllocator* inner_allocator, ObIAllocator* external_allocator) ObPsStmtItem::ObPsStmtItem(ObIAllocator* inner_allocator, ObIAllocator* external_allocator)
...@@ -74,6 +81,7 @@ ObPsStmtItem::ObPsStmtItem(ObIAllocator* inner_allocator, ObIAllocator* external ...@@ -74,6 +81,7 @@ ObPsStmtItem::ObPsStmtItem(ObIAllocator* inner_allocator, ObIAllocator* external
db_id_(OB_INVALID_ID), db_id_(OB_INVALID_ID),
ps_sql_(), ps_sql_(),
stmt_id_(OB_INVALID_STMT_ID), stmt_id_(OB_INVALID_STMT_ID),
is_expired_evicted_(false),
allocator_(inner_allocator), allocator_(inner_allocator),
external_allocator_(external_allocator) external_allocator_(external_allocator)
{} {}
...@@ -102,6 +110,7 @@ ObPsStmtItem& ObPsStmtItem::operator=(const ObPsStmtItem& other) ...@@ -102,6 +110,7 @@ ObPsStmtItem& ObPsStmtItem::operator=(const ObPsStmtItem& other)
db_id_ = other.get_db_id(); db_id_ = other.get_db_id();
ps_sql_ = other.get_ps_sql(); ps_sql_ = other.get_ps_sql();
stmt_id_ = other.get_ps_stmt_id(); stmt_id_ = other.get_ps_stmt_id();
is_expired_evicted_ = other.is_expired_evicted_;
} }
return *this; return *this;
} }
......
...@@ -121,19 +121,24 @@ public: ...@@ -121,19 +121,24 @@ public:
{ {
return external_allocator_; return external_allocator_;
} }
bool *get_is_expired_evicted_ptr()
{
return &is_expired_evicted_;
}
TO_STRING_KV(K_(ref_count), K_(db_id), K_(ps_sql), K_(stmt_id)); TO_STRING_KV(K_(ref_count), K_(db_id), K_(ps_sql), K_(stmt_id), K_(is_expired_evicted));
private: private:
volatile int64_t ref_count_; volatile int64_t ref_count_;
uint64_t db_id_; uint64_t db_id_;
common::ObString ps_sql_; common::ObString ps_sql_;
ObPsStmtId stmt_id_; ObPsStmtId stmt_id_;
bool is_expired_evicted_;
// ObDataBuffer is used to use the internal memory of ObPsStmtItem. // ObDataBuffer is used to use the internal memory of ObPsStmtItem.
// The memory essentially comes from inner_allocator_ in ObPsPlancache // The memory essentially comes from inner_allocator_ in ObPsPlancache
common::ObIAllocator* allocator_; common::ObIAllocator *allocator_;
// Point to inner_allocator_ in ObPsPlancache, used to release the memory of the entire ObPsStmtItem // Point to inner_allocator_ in ObPsPlancache, used to release the memory of the entire ObPsStmtItem
common::ObIAllocator* external_allocator_; common::ObIAllocator *external_allocator_;
}; };
struct ObPsSqlMeta { struct ObPsSqlMeta {
......
...@@ -127,7 +127,7 @@ int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item /*=false* ...@@ -127,7 +127,7 @@ int ObPsCache::deref_ps_stmt(const ObPsStmtId stmt_id, bool erase_item /*=false*
ps_sql_key.set_ps_sql(ps_info->get_ps_sql()); ps_sql_key.set_ps_sql(ps_info->get_ps_sql());
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
if (erase_item) { // dec cached ref if (erase_item) { // dec cached ref
if (OB_FAIL(erase_stmt_item(ps_sql_key))) { if (OB_FAIL(erase_stmt_item(stmt_id, ps_sql_key))) {
LOG_WARN("fail to erase stmt", K(ret)); LOG_WARN("fail to erase stmt", K(ret));
} }
} else { // dec session ref } else { // dec session ref
...@@ -420,21 +420,48 @@ int ObPsCache::get_or_add_stmt_info(const ObResultSet& result, int64_t param_cnt ...@@ -420,21 +420,48 @@ int ObPsCache::get_or_add_stmt_info(const ObResultSet& result, int64_t param_cnt
} }
// may parallel execute by multi_thread // may parallel execute by multi_thread
int ObPsCache::erase_stmt_item(ObPsSqlKey& ps_key) int ObPsCache::erase_stmt_item(ObPsStmtId stmt_id, ObPsSqlKey &ps_key)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObPsStmtItem* ps_item = NULL; ObPsStmtItem *ps_item = NULL;
if (OB_FAIL(stmt_id_map_.erase_refactored(ps_key, &ps_item))) { /*
* Thread A Thread B
* Get stmt_item successfully Get stmt_item successfully
*
* Check stmt_info expired
* Check stmt_info expired
*
* Delete stmt_item by key(db_id, ps_sql)
*
*
*
* Add stmt_item with (db_id, ps_sql) as key
*
* Delete stmt_item by key(db_id, ps_sql)
* The item added in the previous step of thread A is deleted
*
*
*/
ObPsStmtItemEraseAtomicOp op(stmt_id);
if (OB_FAIL(stmt_id_map_.read_atomic(ps_key, op))) {
if (OB_HASH_NOT_EXIST == ret) { if (OB_HASH_NOT_EXIST == ret) {
LOG_INFO("erased by others", K(ret), K(ps_key)); LOG_INFO("erased by others", K(ret), K(ps_key));
ret = OB_SUCCESS; ret = OB_SUCCESS;
} else { } else {
LOG_WARN("fail to erase stmt info", K(ps_key), K(ret)); LOG_WARN("failed to get ps stmt item", K(ps_key), K(ret));
}
} else if (op.need_erase()) {
if (OB_FAIL(stmt_id_map_.erase_refactored(ps_key, &ps_item))) {
if (OB_HASH_NOT_EXIST == ret) {
LOG_INFO("erased by others", K(ret), K(ps_key));
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to erase stmt info", K(ps_key), K(ret));
}
} else {
ps_item->dec_ref_count_check_erase();
} }
} else {
ps_item->dec_ref_count_check_erase();
} }
return ret; return ret;
} }
......
...@@ -109,9 +109,9 @@ public: ...@@ -109,9 +109,9 @@ public:
return stmt_info_map_.size(); return stmt_info_map_.size();
} }
int get_all_stmt_id(common::ObIArray<ObPsStmtId>* id_array); int get_all_stmt_id(common::ObIArray<ObPsStmtId> *id_array);
int check_schema_version(ObSchemaGetterGuard& schema_guard, ObPsStmtInfo& stmt_info, bool& is_expired); int check_schema_version(ObSchemaGetterGuard &schema_guard, ObPsStmtInfo &stmt_info, bool &is_expired);
int erase_stmt_item(ObPsSqlKey& ps_key); int erase_stmt_item(ObPsStmtId stmt_id, ObPsSqlKey &ps_key);
private: private:
int inner_cache_evict(bool is_evict_all); int inner_cache_evict(bool is_evict_all);
......
...@@ -56,7 +56,19 @@ void ObPsStmtItemDerefAtomicOp::operator()(const PsStmtIdKV& entry) ...@@ -56,7 +56,19 @@ void ObPsStmtItemDerefAtomicOp::operator()(const PsStmtIdKV& entry)
} }
} }
void ObPsStmtInfoRefAtomicOp::operator()(const PsStmtInfoKV& entry) void ObPsStmtItemEraseAtomicOp::operator()(const PsStmtIdKV &entry)
{
if (OB_ISNULL(entry.second)) {
ret_ = OB_HASH_NOT_EXIST;
LOG_WARN("entry not exist", K_(ret));
} else if (entry.second->get_ps_stmt_id() == stmt_id_) {
if (ATOMIC_BCAS(entry.second->get_is_expired_evicted_ptr(), false, true)) {
need_erase_ = true;
}
}
}
void ObPsStmtInfoRefAtomicOp::operator()(const PsStmtInfoKV &entry)
{ {
if (NULL != entry.second) { if (NULL != entry.second) {
if (entry.second->check_erase_inc_ref_count()) { if (entry.second->check_erase_inc_ref_count()) {
......
...@@ -152,6 +152,31 @@ private: ...@@ -152,6 +152,31 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObPsStmtItemDerefAtomicOp); DISALLOW_COPY_AND_ASSIGN(ObPsStmtItemDerefAtomicOp);
}; };
class ObPsStmtItemEraseAtomicOp {
typedef common::hash::HashMapPair<ObPsSqlKey, ObPsStmtItem *> PsStmtIdKV;
public:
ObPsStmtItemEraseAtomicOp(ObPsStmtId id) : stmt_id_(id), ret_(common::OB_SUCCESS), need_erase_(false)
{}
virtual ~ObPsStmtItemEraseAtomicOp()
{}
void operator()(const PsStmtIdKV &entry);
int get_ret() const
{
return ret_;
}
bool need_erase() const
{
return need_erase_;
}
private:
ObPsStmtId stmt_id_;
int ret_;
bool need_erase_;
DISALLOW_COPY_AND_ASSIGN(ObPsStmtItemEraseAtomicOp);
};
class ObPsStmtInfoRefAtomicOp { class ObPsStmtInfoRefAtomicOp {
typedef common::hash::HashMapPair<ObPsStmtId, ObPsStmtInfo*> PsStmtInfoKV; typedef common::hash::HashMapPair<ObPsStmtId, ObPsStmtInfo*> PsStmtInfoKV;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册