提交 69facd62 编写于 作者: G godyangfight 提交者: LINGuanRen

Fix weak read get tenant schema version using wrong backup snapshot version

上级 a26f56ec
......@@ -447,6 +447,8 @@ public:
EN_BACKUP_CHECK_BACKUP_POINT_EXIST = 185,
EN_STOP_TENANT_LOG_ARCHIVE_BACKUP = 186,
EN_BACKUP_SERVER_DISK_IS_FULL = 188,
EN_BACKUP_DATA_CLEAN_ERROR = 189,
EN_BACKUP_SCHEDULER_WEAK_READ = 191,
EN_CHECK_STANDBY_CLUSTER_SCHEMA_CONDITION = 201,
EN_ALLOCATE_LOB_BUF_FAILED = 202,
EN_ALLOCATE_DESERIALIZE_LOB_BUF_FAILED = 203,
......
......@@ -155,12 +155,22 @@ int ObBackupScheduler::check_backup_schema_version_(const uint64_t tenant_id, co
int ret = OB_SUCCESS;
int hash_ret = OB_SUCCESS;
int64_t schema_version = 0;
int64_t weak_read_schema_version = 0;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("backup scheduler do not init", K(ret));
} else if (tenant_id == OB_SYS_TENANT_ID) {
// skip check sys tenant
} else if (OB_FAIL(fetch_schema_version(tenant_id, weak_read_schema_version))) {
LOG_WARN("failed to fetch schema version", K(ret), K(tenant_id));
} else if (backup_schema_version != weak_read_schema_version) {
ret = OB_NOT_SUPPORTED;
FLOG_ERROR("cannot backup tenant with weak read schema version bigger than backup schema version",
K(ret),
K(tenant_id),
K(backup_schema_version),
K(weak_read_schema_version));
} else if (OB_SUCCESS != (hash_ret = frozen_schema_version_map_.get_refactored(tenant_id, schema_version))) {
if (OB_HASH_NOT_EXIST == hash_ret) {
LOG_INFO("tenant not freeze, skip check schema version", K(ret), K(tenant_id));
......@@ -211,7 +221,9 @@ int ObBackupScheduler::get_tenant_ids(ObIArray<uint64_t>& tenant_ids)
LOG_WARN("backup scheduler do not init", K(ret));
} else {
if (is_cluster_backup_) {
if (OB_FAIL(get_tenant_schema_version(OB_SYS_TENANT_ID, backup_schema_version))) {
if (OB_FAIL(prepare_tenant_schema_version_(OB_SYS_TENANT_ID))) {
LOG_WARN("failed to prepare tenant schema version", K(ret));
} else if (OB_FAIL(get_tenant_schema_version(OB_SYS_TENANT_ID, backup_schema_version))) {
LOG_WARN("failed to get tenant schem version", K(ret));
} else if (OB_FAIL(ObBackupUtils::retry_get_tenant_schema_guard(
OB_SYS_TENANT_ID, *schema_service_, backup_schema_version, guard))) {
......@@ -222,10 +234,17 @@ int ObBackupScheduler::get_tenant_ids(ObIArray<uint64_t>& tenant_ids)
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_tenant_ids.count(); ++i) {
const uint64_t tenant_id = tmp_tenant_ids.at(i);
bool can_backup = false;
if (OB_FAIL(get_tenant_schema_version(tenant_id, backup_schema_version))) {
if (OB_SYS_TENANT_ID == tenant_id) {
can_backup = true;
} else if (OB_FAIL(prepare_tenant_schema_version_(tenant_id))) {
LOG_WARN("failed to prepare tenant schema version", K(ret), K(tenant_id));
} else if (OB_FAIL(get_tenant_schema_version(tenant_id, backup_schema_version))) {
LOG_WARN("failed to get tenant schem version", K(ret), K(tenant_id));
} else if (OB_FAIL(check_tenant_can_backup(tenant_id, backup_schema_version, can_backup))) {
LOG_WARN("failed to check tenant can backup", K(ret), K(tenant_id), K(backup_schema_version));
}
if (OB_FAIL(ret)) {
} else if (!can_backup) {
// do nothing
} else if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
......@@ -520,17 +539,8 @@ int ObBackupScheduler::get_tenant_schema_version(const uint64_t tenant_id, int64
} else if (OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get tenant schema version get invalid argument", K(ret), K(tenant_id));
} else {
hash_ret = schema_version_map_.get_refactored(tenant_id, backup_schema_version);
if (OB_SUCCESS == hash_ret) {
} else if (OB_HASH_NOT_EXIST != hash_ret) {
ret = hash_ret;
LOG_WARN("get schema version map error", K(ret), K(hash_ret), K(tenant_id));
} else if (OB_FAIL(fetch_schema_version(tenant_id, backup_schema_version))) {
LOG_WARN("failed to fetch schema version", K(ret), K(tenant_id));
} else if (OB_FAIL(schema_version_map_.set_refactored(tenant_id, backup_schema_version))) {
LOG_WARN("failed to set backup schema version into map", K(ret), K(tenant_id), K(backup_schema_version));
}
} else if (OB_FAIL(schema_version_map_.get_refactored(tenant_id, backup_schema_version))) {
LOG_WARN("failed to get tenant schema version", K(ret), K(tenant_id));
}
return ret;
}
......@@ -540,41 +550,57 @@ int ObBackupScheduler::fetch_schema_version(const uint64_t tenant_id, int64_t& b
// TODO() backup weak read without snapshot_version is wrong, fix it later
int ret = OB_SUCCESS;
backup_schema_version = 0;
SMART_VAR(ObMySQLProxy::MySQLResult, res)
{
ObMySQLResult* result = NULL;
ObSqlString sql;
bool did_use_weak = true;
bool did_use_retry = false;
bool check_sys_variable = true;
ObSQLClientRetryWeak sql_client_retry_weak(
proxy_, did_use_weak, did_use_retry, backup_snapshot_version_, check_sys_variable);
if (OB_FAIL(sql.append_fmt("SELECT max(schema_version) as version FROM %s", OB_ALL_DDL_OPERATION_TNAME))) {
LOG_WARN("append sql failed", K(ret), K(tenant_id));
} else if (OB_FAIL(sql_client_retry_weak.read(res, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(sql));
} else if (OB_UNLIKELY(NULL == (result = res.get_result()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get result", K(ret));
} else if (OB_FAIL(result->next())) {
if (ret == OB_ITER_END) { // no record
ret = OB_EMPTY_RESULT;
LOG_WARN("select max(schema_version) return no row", K(ret));
} else {
LOG_WARN("fail to get schema version. iter quit. ", K(ret));
if (backup_snapshot_version_ <= 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("backup snapshot version should not smaller than 0", K(ret), K(backup_snapshot_version_));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, res)
{
ObMySQLResult *result = NULL;
ObSqlString sql;
bool did_use_weak = true;
bool did_use_retry = false;
bool check_sys_variable = true;
#ifdef ERRSIM
ret = E(EventTable::EN_BACKUP_SCHEDULER_WEAK_READ) OB_SUCCESS;
LOG_INFO("before set backup snapshot", K(ret), K(backup_snapshot_version_));
if (OB_SUCCESS != ret) {
backup_snapshot_version_ = 0;
LOG_INFO("after set backup snapshot", K(ret), K(backup_snapshot_version_));
ret = OB_SUCCESS;
}
} else {
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(*result, "version", backup_schema_version, int64_t);
if (OB_FAIL(ret)) {
LOG_WARN("fail to get backup schema_version: ", K(ret));
#endif
ObSQLClientRetryWeak sql_client_retry_weak(
proxy_, did_use_weak, did_use_retry, backup_snapshot_version_, check_sys_variable);
if (OB_FAIL(sql.append_fmt("SELECT max(schema_version) as version FROM %s", OB_ALL_DDL_OPERATION_TNAME))) {
LOG_WARN("append sql failed", K(ret), K(tenant_id));
} else if (OB_FAIL(sql_client_retry_weak.read(res, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(sql), K(backup_snapshot_version_));
} else if (OB_UNLIKELY(NULL == (result = res.get_result()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get result", K(ret));
} else if (OB_FAIL(result->next())) {
if (ret == OB_ITER_END) { // no record
ret = OB_EMPTY_RESULT;
LOG_WARN("select max(schema_version) return no row", K(ret));
} else {
LOG_WARN("fail to get schema version. iter quit. ", K(ret));
}
} else {
LOG_INFO("get backup schema version", K(tenant_id), K(backup_schema_version));
// check if this is only one
if (OB_ITER_END != (ret = result->next())) {
LOG_WARN("fail to get all table schema. iter quit. ", K(ret));
ret = OB_ERR_UNEXPECTED;
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(*result, "version", backup_schema_version, int64_t);
if (OB_FAIL(ret)) {
LOG_WARN("fail to get backup schema_version: ", K(ret));
} else {
ret = OB_SUCCESS;
LOG_INFO("get backup schema version", K(tenant_id), K(backup_schema_version));
// check if this is only one
if (OB_ITER_END != (ret = result->next())) {
LOG_WARN("fail to get all table schema. iter quit. ", K(ret));
ret = OB_ERR_UNEXPECTED;
} else {
ret = OB_SUCCESS;
}
}
}
}
......@@ -942,5 +968,29 @@ int ObBackupScheduler::prepare_backup_point_(
return ret;
}
int ObBackupScheduler::prepare_tenant_schema_version_(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard guard;
int64_t schema_version = 0;
const int64_t latest_schema_version = -1;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("backup scheduler do not init", K(ret));
} else if (OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("prepare tenant schema version get invalid argument", K(ret), K(tenant_id));
} else if (OB_FAIL(ObBackupUtils::retry_get_tenant_schema_guard(
tenant_id, *schema_service_, latest_schema_version, guard))) {
LOG_WARN("failed to get tenant schema guard", K(ret), K(latest_schema_version));
} else if (OB_FAIL(guard.get_schema_version(tenant_id, schema_version))) {
LOG_WARN("failed to get schema version", K(ret), K(tenant_id));
} else if (OB_FAIL(schema_version_map_.set_refactored(tenant_id, schema_version))) {
LOG_WARN("failed to set tenant schema version into map", K(ret), K(tenant_id), K(schema_version));
}
return ret;
}
} // namespace share
} // namespace oceanbase
......@@ -58,6 +58,7 @@ private:
int check_log_archive_status();
int check_tenant_backup_data_version(const uint64_t tenant_id, ObBackupInfoManager& info_manager, bool& can_backup);
int prepare_backup_point_(const common::ObIArray<uint64_t> &tenant_ids, ObBackupInfoManager &info_manager);
int prepare_tenant_schema_version_(const uint64_t tenant_id);
private:
static const int64_t MAX_TENANT_BUCKET = 1024;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册