提交 d87eec46 编写于 作者: O obdev 提交者: wangzelin.wzl

Fix a potential core dump in the stat monitor manager caused by a concurrency problem

上级 c0e13393
......@@ -191,78 +191,6 @@ int ObOptStatMonitorManager::flush_database_monitoring_info(sql::ObExecContext &
return ret;
}
// Looks up the ColumnUsageMap registered for tenant_id in column_usage_maps_
// and, when the tenant has no entry yet, allocates, creates and registers one.
//
// @param tenant_id  key used for column_usage_maps_ and passed to create().
// @return the map registered for the tenant; NULL when allocation, create()
//         or registration fails and no existing entry can be read back.
ColumnUsageMap *ObOptStatMonitorManager::get_or_create_column_usage_map(uint64_t tenant_id)
{
  ColumnUsageMap *col_map = NULL;
  ColumnUsageMap *tmp_col_map = NULL;
  int ret = column_usage_maps_.get_refactored(tenant_id, col_map);
  if (OB_SUCC(ret)) {
    // an entry already exists for this tenant, return it as is
  } else if (OB_HASH_NOT_EXIST == ret) {
    void *buff = ob_malloc(sizeof(ColumnUsageMap), "ColUsagHashMap");
    if (OB_ISNULL(buff)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("alloc memory failed", K(ret));
    } else if (OB_FALSE_IT(col_map = new(buff)ColumnUsageMap())) {
    } else if (OB_FAIL(col_map->create(10000, "ColUsagHashMap", "ColUsagHashMap", tenant_id))) {
      LOG_WARN("failed to create column usage map", K(ret));
    } else if (OB_FAIL(column_usage_maps_.set_refactored(tenant_id, col_map))) {
      // set_refactored failed, the entry may have been created by another
      // thread in the meantime: try to read that entry back.
      if (OB_SUCCESS == column_usage_maps_.get_refactored(tenant_id, tmp_col_map)) {
        LOG_TRACE("get column usage map succeed", K(tenant_id), K(tmp_col_map));
      } else {
        // Neither our map nor an existing one is usable: this is a real
        // failure, so report it at WARN like get_or_create_dml_stat_map does.
        LOG_WARN("get column usage map failed", K(ret), K(tenant_id), K(tmp_col_map));
      }
    }
    if (OB_FAIL(ret) && OB_NOT_NULL(col_map)) {
      // Our freshly built map was not registered: destroy it, release its
      // buffer and fall back to whatever was read back (possibly NULL).
      col_map->~ColumnUsageMap();
      ob_free(buff);
      buff = NULL;
      col_map = tmp_col_map;
    }
  } else {
    LOG_WARN("failed to get column usage map", K(ret));
  }
  return col_map;
}
// Looks up the DmlStatMap registered for tenant_id in dml_stat_maps_ and,
// when the tenant has no entry yet, allocates, creates and registers one.
//
// @param tenant_id  key used for dml_stat_maps_ and passed to create().
// @return the map registered for the tenant; NULL when allocation, create()
//         or registration fails and no existing entry can be read back.
DmlStatMap *ObOptStatMonitorManager::get_or_create_dml_stat_map(uint64_t tenant_id)
{
  DmlStatMap *dml_stat_map = NULL;
  DmlStatMap *tmp_dml_stat_map = NULL;
  int ret = dml_stat_maps_.get_refactored(tenant_id, dml_stat_map);
  if (OB_SUCC(ret)) {
    // an entry already exists for this tenant, return it as is
  } else if (OB_HASH_NOT_EXIST == ret) {
    void *buff = ob_malloc(sizeof(DmlStatMap), "DmlStatsHashMap");
    if (OB_ISNULL(buff)) {
      ret = OB_ALLOCATE_MEMORY_FAILED;
      LOG_WARN("alloc memory failed", K(ret));
    } else if (OB_FALSE_IT(dml_stat_map = new(buff)DmlStatMap())) {
    } else if (OB_FAIL(dml_stat_map->create(10000, "DmlStatsHashMap", "DmlStatsHashMap", tenant_id))) {
      // message fixed: this path creates the dml stat map, not the column usage map
      LOG_WARN("failed to create dml stat map", K(ret));
    } else if (OB_FAIL(dml_stat_maps_.set_refactored(tenant_id, dml_stat_map))) {
      // set_refactored failed, the entry may have been created by another
      // thread in the meantime: try to read that entry back.
      if (OB_SUCCESS == dml_stat_maps_.get_refactored(tenant_id, tmp_dml_stat_map)) {
        LOG_TRACE("get dml stats succeed", K(tenant_id), K(tmp_dml_stat_map));
      } else {
        LOG_WARN("get dml stats failed", K(ret), K(tenant_id), K(tmp_dml_stat_map));
      }
    }
    if (OB_FAIL(ret) && OB_NOT_NULL(dml_stat_map)) {
      // Our freshly built map was not registered: destroy it, release its
      // buffer and fall back to whatever was read back (possibly NULL).
      dml_stat_map->~DmlStatMap();
      ob_free(buff);
      buff = NULL;
      dml_stat_map = tmp_dml_stat_map;
    }
  } else {
    LOG_WARN("failed to get dml stat map", K(ret), K(tenant_id));
  }
  return dml_stat_map;
}
int ObOptStatMonitorManager::erase_opt_stat_monitoring_info_map(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
......@@ -309,40 +237,42 @@ int ObOptStatMonitorManager::update_local_cache(uint64_t tenant_id,
common::ObIArray<ColumnUsageArg> &args)
{
int ret = OB_SUCCESS;
ReadMapAtomicOp atomic_op(&args);
if (GCTX.is_standby_cluster()) {
// standby cluster can't write __all_column_usage, so do not need to update local update
} else {
ColumnUsageMap *col_map = get_or_create_column_usage_map(tenant_id);
if (OB_NOT_NULL(col_map)) {
for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); ++i) {
ColumnUsageArg &arg = args.at(i);
StatKey col_key(arg.table_id_, arg.column_id_);
int64_t flags = 0;
if (OB_FAIL(col_map->get_refactored(col_key, flags))) {
if (OB_LIKELY(ret == OB_HASH_NOT_EXIST)) {
ret = OB_SUCCESS;
if (OB_FAIL(col_map->set_refactored(col_key, arg.flags_))) {
// other thread set the refactor, try update again
ret = OB_SUCCESS;
if (OB_FAIL(col_map->get_refactored(col_key, flags))) {
LOG_WARN("failed to get refactored", K(ret));
} else if ((~flags) & arg.flags_) {
UpdateValueAtomicOp atomic_op(arg.flags_);
if (OB_FAIL(col_map->atomic_refactored(col_key, atomic_op))) {
LOG_WARN("failed to atomic refactored", K(ret));
}
}
}
} else {
LOG_WARN("failed to get refactored", K(ret));
}
} else if ((~flags) & arg.flags_) {
UpdateValueAtomicOp atomic_op(arg.flags_);
if (OB_FAIL(col_map->atomic_refactored(col_key, atomic_op))) {
LOG_WARN("failed to atomic refactored", K(ret));
}
} else if (OB_FAIL(column_usage_maps_.read_atomic(tenant_id, atomic_op))) {
if (OB_HASH_NOT_EXIST == ret) {//not exists such tenant id map, need alloc new map
ColumnUsageMap *col_map = NULL;
ColumnUsageMap *tmp_col_map = NULL;
void *buff = ob_malloc(sizeof(ColumnUsageMap), "ColUsagHashMap");
if (OB_ISNULL(buff)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
} else if (OB_FALSE_IT(col_map = new(buff)ColumnUsageMap())) {
} else if (OB_FAIL(col_map->create(10000, "ColUsagHashMap", "ColUsagHashMap", tenant_id))) {
LOG_WARN("failed to create column usage map", K(ret));
} else if (OB_FAIL(column_usage_maps_.set_refactored(tenant_id, col_map))) {
// set refacter failed, may created by other thread
if (OB_SUCCESS == column_usage_maps_.get_refactored(tenant_id, tmp_col_map)) {
LOG_TRACE("get column usage map succeed", K(tenant_id), K(tmp_col_map));
} else {
LOG_WARN("get column usage map failed", K(tenant_id), K(tmp_col_map));
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(col_map)) {//free unused memory
col_map->~ColumnUsageMap();
ob_free(buff);
buff = NULL;
col_map = tmp_col_map;
}
if (OB_NOT_NULL(col_map)) {
//arrive at here, indicates get a valid dml_stat_map, we need reset error code, and atomic update values
if (OB_FAIL(column_usage_maps_.read_atomic(tenant_id, atomic_op))) {
LOG_WARN("failed to atomic refactored", K(ret), K(tenant_id));
}
}
} else {
LOG_WARN("failed to atomic refactored", K(ret));
}
}
return ret;
......@@ -351,37 +281,43 @@ int ObOptStatMonitorManager::update_local_cache(uint64_t tenant_id,
int ObOptStatMonitorManager::update_local_cache(uint64_t tenant_id, ObOptDmlStat &dml_stat)
{
int ret = OB_SUCCESS;
ReadMapAtomicOp atomic_op(dml_stat);
if (GCTX.is_standby_cluster()) {
// standby cluster can't write __all_monitor_modified, so do not need to update local update
} else {
DmlStatMap *dml_stat_map = get_or_create_dml_stat_map(tenant_id);
if (OB_NOT_NULL(dml_stat_map)) {
StatKey key(dml_stat.table_id_, dml_stat.tablet_id_);
ObOptDmlStat tmp_dml_stat;
if (OB_FAIL(dml_stat_map->get_refactored(key, tmp_dml_stat))) {
if (OB_LIKELY(ret == OB_HASH_NOT_EXIST)) {
ret = OB_SUCCESS;
if (OB_FAIL(dml_stat_map->set_refactored(key, dml_stat))) {
// other thread set the refactor, try update again
ret = OB_SUCCESS;
if (OB_FAIL(dml_stat_map->get_refactored(key, tmp_dml_stat))) {
LOG_WARN("failed to get refactored", K(ret));
} else {
UpdateValueAtomicOp atomic_op(dml_stat);
if (OB_FAIL(dml_stat_map->atomic_refactored(key, atomic_op))) {
LOG_WARN("failed to atomic refactored", K(ret));
}
}
}
} else if (OB_FAIL(dml_stat_maps_.read_atomic(tenant_id, atomic_op))) {
if (OB_LIKELY(OB_HASH_NOT_EXIST == ret)) {//not exists such tenant id map, need alloc new map
ret = OB_SUCCESS;
DmlStatMap *dml_stat_map = NULL;
DmlStatMap *tmp_dml_stat_map = NULL;
void *buff = ob_malloc(sizeof(DmlStatMap), "DmlStatsHashMap");
if (OB_ISNULL(buff)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
} else if (OB_FALSE_IT(dml_stat_map = new(buff)DmlStatMap())) {
} else if (OB_FAIL(dml_stat_map->create(10000, "DmlStatsHashMap", "DmlStatsHashMap", tenant_id))) {
LOG_WARN("failed to create column usage map", K(ret));
} else if (OB_FAIL(dml_stat_maps_.set_refactored(tenant_id, dml_stat_map))) {
// set refacter failed, may created by other thread
if (OB_SUCCESS == dml_stat_maps_.get_refactored(tenant_id, tmp_dml_stat_map)) {
LOG_TRACE("get dml stats succeed", K(tenant_id), K(tmp_dml_stat_map));
} else {
LOG_WARN("failed to get refactored", K(ret));
LOG_WARN("get dml stats failed", K(tenant_id), K(tmp_dml_stat_map), K(ret));
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(dml_stat_map)) {//free unused memory
dml_stat_map->~DmlStatMap();
ob_free(buff);
buff = NULL;
dml_stat_map = tmp_dml_stat_map;
}
if (OB_NOT_NULL(dml_stat_map)) {
//arrive at here, indicates get a valid dml_stat_map, we need reset error code, and atomic update values
if (OB_FAIL(dml_stat_maps_.read_atomic(tenant_id, atomic_op))) {
LOG_WARN("failed to atomic refactored", K(ret), K(tenant_id));
}
} else {
UpdateValueAtomicOp atomic_op(dml_stat);
if (OB_FAIL(dml_stat_map->atomic_refactored(key, atomic_op))) {
LOG_WARN("failed to atomic refactored", K(ret));
} else {/*do nothing*/}
}
} else {
LOG_WARN("failed to atomic refactored", K(ret));
}
}
return ret;
......@@ -858,6 +794,78 @@ int ObOptStatMonitorManager::UpdateValueAtomicOp::operator() (common::hash::Hash
return ret;
}
// Callback invoked on a (tenant id -> ColumnUsageMap *) entry. Walks
// col_usage_args_ and, for each argument, either inserts its flags under the
// (table_id_, column_id_) key or, when a value is already stored and lacks
// some of the argument's flag bits, applies an UpdateValueAtomicOp to it.
//
// @param entry  hash map pair whose second member is the tenant's map.
// @return OB_SUCCESS when every argument was processed; OB_ERR_UNEXPECTED when
//         the map pointer or col_usage_args_ is NULL; otherwise the first
//         error returned by the map, which also stops the iteration.
int ObOptStatMonitorManager::ReadMapAtomicOp::operator() (common::hash::HashMapPair<uint64_t, ColumnUsageMap *> &entry)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(entry.second) || OB_ISNULL(col_usage_args_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get unexpected null", K(ret), K(entry.second), K(col_usage_args_));
  } else {
    ColumnUsageMap &usage_map = *entry.second;
    int64_t idx = 0;
    while (OB_SUCC(ret) && idx < col_usage_args_->count()) {
      ColumnUsageArg &cur_arg = col_usage_args_->at(idx);
      StatKey stat_key(cur_arg.table_id_, cur_arg.column_id_);
      int64_t cached_flags = 0;
      // true once cached_flags holds a value read from the map
      bool has_cached = false;
      if (OB_FAIL(usage_map.get_refactored(stat_key, cached_flags))) {
        if (OB_HASH_NOT_EXIST != ret) {
          LOG_WARN("failed to get refactored", K(ret));
        } else if (OB_FAIL(usage_map.set_refactored(stat_key, cur_arg.flags_))) {
          // insert failed, presumably because another thread stored the key
          // first: read the stored value again and merge into it below
          if (OB_FAIL(usage_map.get_refactored(stat_key, cached_flags))) {
            LOG_WARN("failed to get refactored", K(ret));
          } else {
            has_cached = true;
          }
        }
      } else {
        has_cached = true;
      }
      // only touch the stored value when the argument carries bits it lacks
      if (has_cached && 0 != ((~cached_flags) & cur_arg.flags_)) {
        UpdateValueAtomicOp merge_op(cur_arg.flags_);
        if (OB_FAIL(usage_map.atomic_refactored(stat_key, merge_op))) {
          LOG_WARN("failed to atomic refactored", K(ret));
        }
      }
      ++idx;
    }
  }
  return ret;
}
// Callback invoked on a (tenant id -> DmlStatMap *) entry. Inserts dml_stat_
// under the (table_id_, tablet_id_) key when the key is absent; when a value
// is already stored, applies an UpdateValueAtomicOp built from dml_stat_ to it.
//
// @param entry  hash map pair whose second member is the tenant's map.
// @return OB_SUCCESS on success; OB_ERR_UNEXPECTED when the map pointer is
//         NULL; otherwise the error returned by the failing map operation.
int ObOptStatMonitorManager::ReadMapAtomicOp::operator() (common::hash::HashMapPair<uint64_t, DmlStatMap *> &entry)
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(entry.second)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("get unexpected null", K(ret));
  } else {
    StatKey key(dml_stat_.table_id_, dml_stat_.tablet_id_);
    ObOptDmlStat tmp_dml_stat;
    // true once the key is known to hold a value that must be merged into
    bool need_merge = false;
    if (OB_FAIL(entry.second->get_refactored(key, tmp_dml_stat))) {
      if (OB_HASH_NOT_EXIST != ret) {
        LOG_WARN("failed to get refactored", K(ret));
      } else if (OB_FAIL(entry.second->set_refactored(key, dml_stat_))) {
        // insert failed, presumably because another thread stored the key
        // first: confirm the key is readable, then merge into it
        if (OB_FAIL(entry.second->get_refactored(key, tmp_dml_stat))) {
          LOG_WARN("failed to get refactored", K(ret));
        } else {
          need_merge = true;
        }
      }
    } else {
      need_merge = true;
    }
    if (need_merge) {
      // The update op (and its copy of dml_stat_) is built only on the path
      // that uses it, instead of once up front plus a shadowing copy per branch.
      UpdateValueAtomicOp atomic_op(dml_stat_);
      if (OB_FAIL(entry.second->atomic_refactored(key, atomic_op))) {
        LOG_WARN("failed to atomic refactored", K(ret));
      }
    }
  }
  return ret;
}
int ObOptStatMonitorManager::exec_insert_monitor_modified_sql(uint64_t tenant_id,
ObSqlString &values_sql)
{
......
......@@ -91,7 +91,7 @@ public:
bool with_check_;
};
// A callback struct used to get ColumnUsageMap and allocate a new one
// A callback struct used to get ColumnUsageMap or ObOptDmlStat, and allocate a new one
struct SwapMapAtomicOp
{
public:
......@@ -107,7 +107,7 @@ public:
DmlStatMap *dml_stat_map_;
};
// A callback struct used to update ColumnUsageMap value
// A callback struct used to update ColumnUsageMap value or DmlStatMap value
struct UpdateValueAtomicOp
{
public:
......@@ -122,6 +122,22 @@ public:
ObOptDmlStat dml_stat_;
};
// A callback struct applied to one entry of the per-tenant maps
// (tenant id -> ColumnUsageMap * or tenant id -> DmlStatMap *).
// It carries the payload to merge into the tenant's map:
//   - col_usage_args_: pointer to the caller's array of column usage
//     arguments; only the pointer is stored, so the array must stay alive
//     while the callback runs. NULL when built from an ObOptDmlStat.
//   - dml_stat_: a copy of the DML stat passed to the constructor;
//     default-constructed when built from column usage arguments.
struct ReadMapAtomicOp
{
public:
  // Constructors are explicit so a bare pointer or ObOptDmlStat is never
  // silently converted into a callback object.
  explicit ReadMapAtomicOp(common::ObIArray<ColumnUsageArg> *col_usage_args) :
    col_usage_args_(col_usage_args), dml_stat_() {}
  explicit ReadMapAtomicOp(ObOptDmlStat &dml_stat) : col_usage_args_(NULL), dml_stat_(dml_stat) {}
  virtual ~ReadMapAtomicOp() {}
  // Merges col_usage_args_ into the ColumnUsageMap held by entry.
  int operator() (common::hash::HashMapPair<uint64_t, ColumnUsageMap *> &entry);
  // Merges dml_stat_ into the DmlStatMap held by entry.
  int operator() (common::hash::HashMapPair<uint64_t, DmlStatMap *> &entry);
private:
  DISALLOW_COPY_AND_ASSIGN(ReadMapAtomicOp);
  common::ObIArray<ColumnUsageArg> *col_usage_args_;
  ObOptDmlStat dml_stat_;
};
public:
// Default constructor: leaves the manager un-initialized (inited_ is false)
// with mysql_proxy_ set to NULL.
ObOptStatMonitorManager()
  : inited_(false),
    mysql_proxy_(NULL)
{}
......@@ -142,8 +158,6 @@ public:
int update_dml_stat_info(const bool with_check);
int update_tenant_column_usage_info(uint64_t tenant_id);
int update_tenant_dml_stat_info(uint64_t tenant_id);
ColumnUsageMap *get_or_create_column_usage_map(uint64_t tenant_id);
DmlStatMap *get_or_create_dml_stat_map(uint64_t tenant_id);
int erase_opt_stat_monitoring_info_map(uint64_t tenant_id);
int erase_column_usage_map(uint64_t tenant_id);
int erase_dml_stat_map(uint64_t tenant_id);
......
......@@ -1140,10 +1140,7 @@ int ObAccessService::audit_tablet_opt_dml_stat(
const int64_t affected_rows)
{
int ret = OB_SUCCESS;
static __thread int64_t last_access_ts = 0;
if (ObClockGenerator::getClock() - last_access_ts < 1000000) {
// do nothing
} else if (OB_ISNULL(dml_param.table_param_)) {
if (OB_ISNULL(dml_param.table_param_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(dml_param.table_param_));
} else if (dml_stat_type == ObOptDmlStatType::TABLET_OPT_INSERT_STAT ||
......@@ -1165,7 +1162,6 @@ int ObAccessService::audit_tablet_opt_dml_stat(
} else {
LOG_TRACE("succeed to update dml stat local cache", K(dml_stat));
}
last_access_ts = ObClockGenerator::getClock();
}
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册