From d87eec46c9913bd197bd0aa9a64b5fc657f91a70 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 25 Oct 2022 17:56:58 +0800 Subject: [PATCH] fix stat monitor manager potential core caused by concurrency problem --- .../stat/ob_opt_stat_monitor_manager.cpp | 264 +++++++++--------- src/share/stat/ob_opt_stat_monitor_manager.h | 22 +- src/storage/tx_storage/ob_access_service.cpp | 6 +- 3 files changed, 155 insertions(+), 137 deletions(-) diff --git a/src/share/stat/ob_opt_stat_monitor_manager.cpp b/src/share/stat/ob_opt_stat_monitor_manager.cpp index 3499f6f2c9..2a997d5ee2 100644 --- a/src/share/stat/ob_opt_stat_monitor_manager.cpp +++ b/src/share/stat/ob_opt_stat_monitor_manager.cpp @@ -191,78 +191,6 @@ int ObOptStatMonitorManager::flush_database_monitoring_info(sql::ObExecContext & return ret; } -ColumnUsageMap *ObOptStatMonitorManager::get_or_create_column_usage_map(uint64_t tenant_id) -{ - ColumnUsageMap *col_map = NULL; - ColumnUsageMap *tmp_col_map = NULL; - int ret = column_usage_maps_.get_refactored(tenant_id, col_map); - if (OB_SUCC(ret)) { - // do nothing - } else if (OB_HASH_NOT_EXIST == ret) { - void *buff = ob_malloc(sizeof(ColumnUsageMap), "ColUsagHashMap"); - if (OB_ISNULL(buff)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("alloc memory failed", K(ret)); - } else if (OB_FALSE_IT(col_map = new(buff)ColumnUsageMap())) { - } else if (OB_FAIL(col_map->create(10000, "ColUsagHashMap", "ColUsagHashMap", tenant_id))) { - LOG_WARN("failed to create column usage map", K(ret)); - } else if (OB_FAIL(column_usage_maps_.set_refactored(tenant_id, col_map))) { - // set refacter failed, may created by other thread - if (OB_SUCCESS == column_usage_maps_.get_refactored(tenant_id, tmp_col_map)) { - LOG_TRACE("get column usage map succeed", K(tenant_id), K(tmp_col_map)); - } else { - LOG_TRACE("get column usage map failed", K(tenant_id), K(tmp_col_map)); - } - } - - if (OB_FAIL(ret) && OB_NOT_NULL(col_map)) { - col_map->~ColumnUsageMap(); - ob_free(buff); - buff = NULL; - col_map = tmp_col_map; - } - } else { - LOG_WARN("failed to get column usage map", K(ret)); - } - return col_map; -} - -DmlStatMap *ObOptStatMonitorManager::get_or_create_dml_stat_map(uint64_t tenant_id) -{ - DmlStatMap *dml_stat_map = NULL; - DmlStatMap *tmp_dml_stat_map = NULL; - int ret = dml_stat_maps_.get_refactored(tenant_id, dml_stat_map); - if (OB_SUCC(ret)) { - // do nothing - } else if (OB_HASH_NOT_EXIST == ret) { - void *buff = ob_malloc(sizeof(DmlStatMap), "DmlStatsHashMap"); - if (OB_ISNULL(buff)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("alloc memory failed", K(ret)); - } else if (OB_FALSE_IT(dml_stat_map = new(buff)DmlStatMap())) { - } else if (OB_FAIL(dml_stat_map->create(10000, "DmlStatsHashMap", "DmlStatsHashMap", tenant_id))) { - LOG_WARN("failed to create column usage map", K(ret)); - } else if (OB_FAIL(dml_stat_maps_.set_refactored(tenant_id, dml_stat_map))) { - // set refacter failed, may created by other thread - if (OB_SUCCESS == dml_stat_maps_.get_refactored(tenant_id, tmp_dml_stat_map)) { - LOG_TRACE("get dml stats succeed", K(tenant_id), K(tmp_dml_stat_map)); - } else { - LOG_WARN("get dml stats failed", K(tenant_id), K(tmp_dml_stat_map)); - } - } - - if (OB_FAIL(ret) && OB_NOT_NULL(dml_stat_map)) { - dml_stat_map->~DmlStatMap(); - ob_free(buff); - buff = NULL; - dml_stat_map = tmp_dml_stat_map; - } - } else { - LOG_WARN("failed to dml stats", K(ret), K(tenant_id)); - } - return dml_stat_map; -} - int ObOptStatMonitorManager::erase_opt_stat_monitoring_info_map(uint64_t tenant_id) { int ret = OB_SUCCESS; @@ -309,40 +237,42 @@ int ObOptStatMonitorManager::update_local_cache(uint64_t tenant_id, common::ObIArray &args) { int ret = OB_SUCCESS; + ReadMapAtomicOp atomic_op(&args); if (GCTX.is_standby_cluster()) { // standby cluster can't write __all_column_usage, so do not need to update local update - } else { - ColumnUsageMap *col_map = get_or_create_column_usage_map(tenant_id); - if (OB_NOT_NULL(col_map)) { - for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); ++i) { - ColumnUsageArg &arg = args.at(i); - StatKey col_key(arg.table_id_, arg.column_id_); - int64_t flags = 0; - if (OB_FAIL(col_map->get_refactored(col_key, flags))) { - if (OB_LIKELY(ret == OB_HASH_NOT_EXIST)) { - ret = OB_SUCCESS; - if (OB_FAIL(col_map->set_refactored(col_key, arg.flags_))) { - // other thread set the refactor, try update again - ret = OB_SUCCESS; - if (OB_FAIL(col_map->get_refactored(col_key, flags))) { - LOG_WARN("failed to get refactored", K(ret)); - } else if ((~flags) & arg.flags_) { - UpdateValueAtomicOp atomic_op(arg.flags_); - if (OB_FAIL(col_map->atomic_refactored(col_key, atomic_op))) { - LOG_WARN("failed to atomic refactored", K(ret)); - } - } - } - } else { - LOG_WARN("failed to get refactored", K(ret)); - } - } else if ((~flags) & arg.flags_) { - UpdateValueAtomicOp atomic_op(arg.flags_); - if (OB_FAIL(col_map->atomic_refactored(col_key, atomic_op))) { - LOG_WARN("failed to atomic refactored", K(ret)); - } + } else if (OB_FAIL(column_usage_maps_.read_atomic(tenant_id, atomic_op))) { + if (OB_HASH_NOT_EXIST == ret) {//not exists such tenant id map, need alloc new map + ColumnUsageMap *col_map = NULL; + ColumnUsageMap *tmp_col_map = NULL; + void *buff = ob_malloc(sizeof(ColumnUsageMap), "ColUsagHashMap"); + if (OB_ISNULL(buff)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory failed", K(ret)); + } else if (OB_FALSE_IT(col_map = new(buff)ColumnUsageMap())) { + } else if (OB_FAIL(col_map->create(10000, "ColUsagHashMap", "ColUsagHashMap", tenant_id))) { + LOG_WARN("failed to create column usage map", K(ret)); + } else if (OB_FAIL(column_usage_maps_.set_refactored(tenant_id, col_map))) { + // set refacter failed, may created by other thread + if (OB_SUCCESS == column_usage_maps_.get_refactored(tenant_id, tmp_col_map)) { + LOG_TRACE("get column usage map succeed", K(tenant_id), K(tmp_col_map)); + } else { + LOG_WARN("get column usage map failed", K(tenant_id), K(tmp_col_map)); + } + } + if (OB_FAIL(ret) && OB_NOT_NULL(col_map)) {//free unused memory + col_map->~ColumnUsageMap(); + ob_free(buff); + buff = NULL; + col_map = tmp_col_map; + } + if (OB_NOT_NULL(col_map)) { + //arrive at here, indicates get a valid dml_stat_map, we need reset error code, and atomic update values + if (OB_FAIL(column_usage_maps_.read_atomic(tenant_id, atomic_op))) { + LOG_WARN("failed to atomic refactored", K(ret), K(tenant_id)); } } + } else { + LOG_WARN("failed to atomic refactored", K(ret)); } } return ret; @@ -351,37 +281,43 @@ int ObOptStatMonitorManager::update_local_cache(uint64_t tenant_id, int ObOptStatMonitorManager::update_local_cache(uint64_t tenant_id, ObOptDmlStat &dml_stat) { int ret = OB_SUCCESS; + ReadMapAtomicOp atomic_op(dml_stat); if (GCTX.is_standby_cluster()) { // standby cluster can't write __all_monitor_modified, so do not need to update local update - } else { - DmlStatMap *dml_stat_map = get_or_create_dml_stat_map(tenant_id); - if (OB_NOT_NULL(dml_stat_map)) { - StatKey key(dml_stat.table_id_, dml_stat.tablet_id_); - ObOptDmlStat tmp_dml_stat; - if (OB_FAIL(dml_stat_map->get_refactored(key, tmp_dml_stat))) { - if (OB_LIKELY(ret == OB_HASH_NOT_EXIST)) { - ret = OB_SUCCESS; - if (OB_FAIL(dml_stat_map->set_refactored(key, dml_stat))) { - // other thread set the refactor, try update again - ret = OB_SUCCESS; - if (OB_FAIL(dml_stat_map->get_refactored(key, tmp_dml_stat))) { - LOG_WARN("failed to get refactored", K(ret)); - } else { - UpdateValueAtomicOp atomic_op(dml_stat); - if (OB_FAIL(dml_stat_map->atomic_refactored(key, atomic_op))) { - LOG_WARN("failed to atomic refactored", K(ret)); - } - } - } + } else if (OB_FAIL(dml_stat_maps_.read_atomic(tenant_id, atomic_op))) { + if (OB_LIKELY(OB_HASH_NOT_EXIST == ret)) {//not exists such tenant id map, need alloc new map + ret = OB_SUCCESS; + DmlStatMap *dml_stat_map = NULL; + DmlStatMap *tmp_dml_stat_map = NULL; + void *buff = ob_malloc(sizeof(DmlStatMap), "DmlStatsHashMap"); + if (OB_ISNULL(buff)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory failed", K(ret)); + } else if (OB_FALSE_IT(dml_stat_map = new(buff)DmlStatMap())) { + } else if (OB_FAIL(dml_stat_map->create(10000, "DmlStatsHashMap", "DmlStatsHashMap", tenant_id))) { + LOG_WARN("failed to create column usage map", K(ret)); + } else if (OB_FAIL(dml_stat_maps_.set_refactored(tenant_id, dml_stat_map))) { + // set refacter failed, may created by other thread + if (OB_SUCCESS == dml_stat_maps_.get_refactored(tenant_id, tmp_dml_stat_map)) { + LOG_TRACE("get dml stats succeed", K(tenant_id), K(tmp_dml_stat_map)); } else { - LOG_WARN("failed to get refactored", K(ret)); + LOG_WARN("get dml stats failed", K(tenant_id), K(tmp_dml_stat_map), K(ret)); + } + } + if (OB_FAIL(ret) && OB_NOT_NULL(dml_stat_map)) {//free unused memory + dml_stat_map->~DmlStatMap(); + ob_free(buff); + buff = NULL; + dml_stat_map = tmp_dml_stat_map; + } + if (OB_NOT_NULL(dml_stat_map)) { + //arrive at here, indicates get a valid dml_stat_map, we need reset error code, and atomic update values + if (OB_FAIL(dml_stat_maps_.read_atomic(tenant_id, atomic_op))) { + LOG_WARN("failed to atomic refactored", K(ret), K(tenant_id)); } - } else { - UpdateValueAtomicOp atomic_op(dml_stat); - if (OB_FAIL(dml_stat_map->atomic_refactored(key, atomic_op))) { - LOG_WARN("failed to atomic refactored", K(ret)); - } else {/*do nothing*/} } + } else { + LOG_WARN("failed to atomic refactored", K(ret)); } } return ret; @@ -858,6 +794,78 @@ int ObOptStatMonitorManager::UpdateValueAtomicOp::operator() (common::hash::Hash return ret; } +int ObOptStatMonitorManager::ReadMapAtomicOp::operator() (common::hash::HashMapPair &entry) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(entry.second) || OB_ISNULL(col_usage_args_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret), K(entry.second), K(col_usage_args_)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < col_usage_args_->count(); ++i) { + ColumnUsageArg &arg = col_usage_args_->at(i); + StatKey col_key(arg.table_id_, arg.column_id_); + int64_t flags = 0; + if (OB_FAIL(entry.second->get_refactored(col_key, flags))) { + if (OB_LIKELY(ret == OB_HASH_NOT_EXIST)) { + if (OB_FAIL(entry.second->set_refactored(col_key, arg.flags_))) { + // other thread set the refactor, try update again + if (OB_FAIL(entry.second->get_refactored(col_key, flags))) { + LOG_WARN("failed to get refactored", K(ret)); + } else if ((~flags) & arg.flags_) { + UpdateValueAtomicOp atomic_op(arg.flags_); + if (OB_FAIL(entry.second->atomic_refactored(col_key, atomic_op))) { + LOG_WARN("failed to atomic refactored", K(ret)); + } + } + } + } else { + LOG_WARN("failed to get refactored", K(ret)); + } + } else if ((~flags) & arg.flags_) { + UpdateValueAtomicOp atomic_op(arg.flags_); + if (OB_FAIL(entry.second->atomic_refactored(col_key, atomic_op))) { + LOG_WARN("failed to atomic refactored", K(ret)); + } + } + } + } + return ret; +} + +int ObOptStatMonitorManager::ReadMapAtomicOp::operator() (common::hash::HashMapPair &entry) +{ + int ret = OB_SUCCESS; + StatKey key(dml_stat_.table_id_, dml_stat_.tablet_id_); + UpdateValueAtomicOp atomic_op(dml_stat_); + ObOptDmlStat tmp_dml_stat; + if (OB_ISNULL(entry.second)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(entry.second->get_refactored(key, tmp_dml_stat))) { + if (OB_LIKELY(ret == OB_HASH_NOT_EXIST)) { + if (OB_FAIL(entry.second->set_refactored(key, dml_stat_))) { + // other thread set the refactor, try update again + if (OB_FAIL(entry.second->get_refactored(key, tmp_dml_stat))) { + LOG_WARN("failed to get refactored", K(ret)); + } else { + UpdateValueAtomicOp atomic_op(dml_stat_); + if (OB_FAIL(entry.second->atomic_refactored(key, atomic_op))) { + LOG_WARN("failed to atomic refactored", K(ret)); + } + } + } + } else { + LOG_WARN("failed to get refactored", K(ret)); + } + } else { + UpdateValueAtomicOp atomic_op(dml_stat_); + if (OB_FAIL(entry.second->atomic_refactored(key, atomic_op))) { + LOG_WARN("failed to atomic refactored", K(ret)); + } else {/*do nothing*/} + } + return ret; +} + int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(uint64_t tenant_id, ObSqlString &values_sql) { diff --git a/src/share/stat/ob_opt_stat_monitor_manager.h b/src/share/stat/ob_opt_stat_monitor_manager.h index 8f7a0a66d5..7ed0937093 100644 --- a/src/share/stat/ob_opt_stat_monitor_manager.h +++ b/src/share/stat/ob_opt_stat_monitor_manager.h @@ -91,7 +91,7 @@ public: bool with_check_; }; - // A callback struct used to get ColumnUsageMap and allocate a new one + // A callback struct used to get ColumnUsageMap or ObOptDmlStat, and allocate a new one struct SwapMapAtomicOp { public: @@ -107,7 +107,7 @@ public: DmlStatMap *dml_stat_map_; }; - // A callback struct used to update ColumnUsageMap value + // A callback struct used to update ColumnUsageMap value or DmlStatMap value struct UpdateValueAtomicOp { public: @@ -122,6 +122,22 @@ public: ObOptDmlStat dml_stat_; }; + // A callback struct used to read ColumnUsageMaps value or DmlStatMaps value + struct ReadMapAtomicOp + { + public: + ReadMapAtomicOp(common::ObIArray *col_usage_args) : + col_usage_args_(col_usage_args), dml_stat_() {}; + ReadMapAtomicOp(ObOptDmlStat &dml_stat) : col_usage_args_(NULL), dml_stat_(dml_stat) {}; + virtual ~ReadMapAtomicOp() {}; + int operator() (common::hash::HashMapPair &entry); + int operator() (common::hash::HashMapPair &entry); + private: + DISALLOW_COPY_AND_ASSIGN(ReadMapAtomicOp); + common::ObIArray *col_usage_args_; + ObOptDmlStat dml_stat_; + }; + public: ObOptStatMonitorManager() : inited_(false), mysql_proxy_(NULL) {} @@ -142,8 +158,6 @@ public: int update_dml_stat_info(const bool with_check); int update_tenant_column_usage_info(uint64_t tenant_id); int update_tenant_dml_stat_info(uint64_t tenant_id); - ColumnUsageMap *get_or_create_column_usage_map(uint64_t tenant_id); - DmlStatMap *get_or_create_dml_stat_map(uint64_t tenant_id); int erase_opt_stat_monitoring_info_map(uint64_t tenant_id); int erase_column_usage_map(uint64_t tenant_id); int erase_dml_stat_map(uint64_t tenant_id); diff --git a/src/storage/tx_storage/ob_access_service.cpp b/src/storage/tx_storage/ob_access_service.cpp index 7c875f63d9..87072c9fd7 100644 --- a/src/storage/tx_storage/ob_access_service.cpp +++ b/src/storage/tx_storage/ob_access_service.cpp @@ -1140,10 +1140,7 @@ int ObAccessService::audit_tablet_opt_dml_stat( const int64_t affected_rows) { int ret = OB_SUCCESS; - static __thread int64_t last_access_ts = 0; - if (ObClockGenerator::getClock() - last_access_ts < 1000000) { - // do nothing - } else if (OB_ISNULL(dml_param.table_param_)) { + if (OB_ISNULL(dml_param.table_param_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null", K(ret), K(dml_param.table_param_)); } else if (dml_stat_type == ObOptDmlStatType::TABLET_OPT_INSERT_STAT || @@ -1165,7 +1162,6 @@ int ObAccessService::audit_tablet_opt_dml_stat( } else { LOG_TRACE("succeed to update dml stat local cache", K(dml_stat)); } - last_access_ts = ObClockGenerator::getClock(); } return ret; } -- GitLab