diff --git a/src/share/schema/ob_server_schema_service.cpp b/src/share/schema/ob_server_schema_service.cpp index 71f0263f2fc895173b537eecec6036745766e937..6dc7331e1475a9c7bfb28af8c8c6d26bed1269fd 100644 --- a/src/share/schema/ob_server_schema_service.cpp +++ b/src/share/schema/ob_server_schema_service.cpp @@ -5318,7 +5318,6 @@ int ObServerSchemaService::refresh_full_schema( bool sys_schema_change = true; int64_t local_schema_version = 0; int64_t core_schema_version = 0; - int64_t new_core_schema_version = 0; int64_t schema_version = 0; if (OB_FAIL(schema_mgr_for_cache_map_.get_refactored(tenant_id, schema_mgr_for_cache))) { LOG_WARN("fail to get schema_mgr_for_cache", KR(ret), K(schema_status)); @@ -5343,6 +5342,10 @@ int ObServerSchemaService::refresh_full_schema( if (OB_FAIL(schema_service_->get_core_version( sql_client, schema_status, core_schema_version))) { LOG_WARN("get_core_version failed", KR(ret), K(schema_status)); + } else if (core_schema_version <= OB_CORE_SCHEMA_VERSION + 1) { + ret = OB_EAGAIN; + LOG_WARN("schema may be not persisted, try again", + KR(ret), K(schema_status), K(core_schema_version)); } else if (core_schema_version > local_schema_version) { // for core table schema, we publis as core_temp_version int64_t publish_version = 0; @@ -5362,6 +5365,21 @@ int ObServerSchemaService::refresh_full_schema( if (OB_SUCC(ret) && !core_schema_change && sys_schema_change) { if (OB_FAIL(get_schema_version_in_inner_table(sql_client, schema_status, schema_version))) { LOG_WARN("fail to get schema version in inner table", KR(ret), K(schema_status)); + } else if (schema_version <= OB_CORE_SCHEMA_VERSION + 1) { + ret = OB_EAGAIN; + LOG_WARN("schema may be not persisted, try again", + KR(ret), K(schema_status), K(schema_version)); + } else if (core_schema_version > schema_version) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("schema version fallback, unexpected", + KR(ret), K(schema_status), K(core_schema_version), K(schema_version)); + } else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status, + core_schema_version, core_schema_change))) { + LOG_WARN("fail to check core schema version change", KR(ret), K(schema_status), K(core_schema_version)); + } else if (core_schema_change) { + sys_schema_change = true; + LOG_WARN("core schema version change, try again", + KR(ret), K(schema_status), K(core_schema_version), K(schema_version)); } else if (OB_FAIL(check_sys_schema_change(sql_client, schema_status, local_schema_version, schema_version, sys_schema_change))) { LOG_WARN("check_sys_schema_change failed", KR(ret), K(schema_status), K(schema_version)); @@ -5384,14 +5402,13 @@ int ObServerSchemaService::refresh_full_schema( if (OB_FAIL(ret)) { // check whether failed because of core table schema change, go to suitable pos int temp_ret = OB_SUCCESS; - if (OB_SUCCESS != (temp_ret = schema_service_->get_core_version( - sql_client, schema_status, new_core_schema_version))) { - LOG_WARN("get_core_version failed", K(temp_ret), K(schema_status)); - } else if (new_core_schema_version != core_schema_version) { - core_schema_change = true; + if (OB_SUCCESS != (temp_ret = check_core_schema_change_( + sql_client, schema_status, core_schema_version, core_schema_change))) { + LOG_WARN("get_core_version failed", KR(ret), KR(temp_ret), K(schema_status), K(core_schema_version)); + } else if (core_schema_change) { sys_schema_change = true; - LOG_WARN("core schema change during refresh sys schema", KR(ret), - K(schema_status), K(core_schema_version), K(new_core_schema_version)); + LOG_WARN("core schema version change, try again", + KR(ret), K(schema_status), K(core_schema_version), K(schema_version)); ret = OB_SUCCESS; } } @@ -5593,6 +5610,10 @@ int ObServerSchemaService::refresh_increment_schema( if (OB_FAIL(schema_service_->get_core_version( sql_client, schema_status, core_schema_version))) { LOG_WARN("get_core_version failed", KR(ret), K(schema_status)); + } else if (core_schema_version <= OB_CORE_SCHEMA_VERSION + 1) { + ret = OB_EAGAIN; + LOG_WARN("schema may be not persisted, try again", + KR(ret), K(schema_status), K(core_schema_version)); } else if (core_schema_version > local_schema_version) { int64_t publish_version = OB_INVALID_INDEX; if (OB_FAIL(ObSchemaService::gen_core_temp_version( @@ -5611,6 +5632,27 @@ int ObServerSchemaService::refresh_increment_schema( if (OB_SUCC(ret) && !core_schema_change && sys_schema_change) { if (OB_FAIL(get_schema_version_in_inner_table(sql_client, schema_status, schema_version))) { LOG_WARN("fail to get schema version in inner table", KR(ret), K(schema_status)); + } else if (schema_version < local_schema_version) { + if (local_schema_version <= OB_CORE_SCHEMA_VERSION + 1) { + ret = OB_EAGAIN; + LOG_WARN("schema may be not persisted, try again", + KR(ret), K(schema_status), K(schema_version), K(local_schema_version)); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("schema version fallback, unexpected", + KR(ret), K(schema_status), K(schema_version), K(local_schema_version)); + } + } else if (core_schema_version > schema_version) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("schema version fallback, unexpected", + KR(ret), K(schema_status), K(core_schema_version), K(schema_version)); + } else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status, + core_schema_version, core_schema_change))) { + LOG_WARN("fail to check core schema version change", KR(ret), K(schema_status), K(core_schema_version)); + } else if (core_schema_change) { + sys_schema_change = true; + LOG_WARN("core schema version change, try again", + KR(ret), K(schema_status), K(core_schema_version), K(schema_version)); } else if (OB_FAIL(check_sys_schema_change(sql_client, schema_status, local_schema_version, schema_version, sys_schema_change))) { LOG_WARN("check_sys_schema_change failed", KR(ret), K(schema_status), K(schema_version)); @@ -5629,16 +5671,13 @@ int ObServerSchemaService::refresh_increment_schema( if (OB_FAIL(ret)) { // check whether failed because of core table schema change, go to suitable pos int temp_ret = OB_SUCCESS; - int64_t new_core_schema_version = 0; - if (OB_SUCCESS != (temp_ret = schema_service_->get_core_version( - sql_client, schema_status, new_core_schema_version))) { - LOG_WARN("get_core_version failed, need retry", - KR(ret), K(temp_ret), K(schema_status)); - } else if (new_core_schema_version != core_schema_version) { - core_schema_change = true; + if (OB_SUCCESS != (temp_ret = check_core_schema_change_( + sql_client, schema_status, core_schema_version, core_schema_change))) { + LOG_WARN("get_core_version failed", KR(ret), KR(temp_ret), K(schema_status), K(core_schema_version)); + } else if (core_schema_change) { sys_schema_change = true; - LOG_WARN("core schema change during refresh sys schema", KR(ret), - K(schema_status), K(core_schema_version), K(new_core_schema_version)); + LOG_WARN("core schema version change, try again", + KR(ret), K(schema_status), K(core_schema_version), K(schema_version)); ret = OB_SUCCESS; } } @@ -5765,20 +5804,17 @@ int ObServerSchemaService::try_fetch_publish_core_schemas( } else { ObArray core_schemas; ObArray core_table_ids; - int64_t new_core_schema_version = 0; if (OB_FAIL(schema_service_->get_core_table_schemas( sql_client, schema_status, core_schemas))) { LOG_WARN("get_core_table_schemas failed", KR(ret), K(schema_status), K(core_table_ids)); - } else if (OB_FAIL(schema_service_->get_core_version( - sql_client, schema_status, new_core_schema_version))) { - LOG_WARN("get_core_version failed", KR(ret), K(schema_status)); - } else if (new_core_schema_version != core_schema_version) { - core_schema_change = true; - LOG_INFO("core schema change", K(schema_status), - K(core_schema_version), K(new_core_schema_version)); + } else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status, + core_schema_version, core_schema_change))) { + LOG_WARN("fail to check core schema version change", KR(ret), K(schema_status), K(core_schema_version)); + } else if (core_schema_change) { + LOG_WARN("core schema version change", + KR(ret), K(schema_status), K(core_schema_version)); } else { // core schema don't change, publish core schemas - core_schema_change = false; ObArray core_tables; for (int64_t i = 0; i < core_schemas.count() && OB_SUCC(ret); ++i) { if (OB_FAIL(core_tables.push_back(&core_schemas.at(i)))) { @@ -6326,7 +6362,6 @@ int ObServerSchemaService::check_core_or_sys_schema_change( { int ret = OB_SUCCESS; int64_t new_schema_version = 0; - int64_t new_core_schema_version = 0; // check whether failed because of sys table schema change, go to suitable pos if (!check_inner_stat()) { ret = OB_INNER_STAT_ERROR; @@ -6334,22 +6369,41 @@ int ObServerSchemaService::check_core_or_sys_schema_change( } else if (OB_FAIL(get_schema_version_in_inner_table( sql_client, schema_status, new_schema_version))) { LOG_WARN("fail to get schema version in inner table", KR(ret), K(schema_status)); + } else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status, + core_schema_version, core_schema_change))) { + LOG_WARN("fail to check core schema change", KR(ret), K(schema_status), K(core_schema_version)); + } else if (core_schema_change) { + sys_schema_change = true; + LOG_WARN("core schema change", KR(ret), K(schema_status), K(core_schema_version), K(new_schema_version)); } else if (OB_FAIL(check_sys_schema_change(sql_client, schema_status, schema_version, new_schema_version, sys_schema_change))) { LOG_WARN("sys schema change during refresh schema", KR(ret), K(schema_status), K(schema_version), K(new_schema_version)); } - if (OB_SUCCESS != ret && OB_NOT_NULL(schema_service_)) { - // check whether failed because of core table schema schema - if (OB_FAIL(schema_service_->get_core_version( - sql_client, schema_status, new_core_schema_version))) { - LOG_WARN("get_core_version failed", KR(ret), K(schema_status)); - } else if (new_core_schema_version != core_schema_version) { - ret = OB_SUCCESS; - core_schema_change = true; - LOG_WARN("core schema change during check whether failed because of sys schema change", - KR(ret), K(schema_status), K(core_schema_version), K(new_core_schema_version)); - } + return ret; +} + +int ObServerSchemaService::check_core_schema_change_( + ObISQLClient &sql_client, + const ObRefreshSchemaStatus &schema_status, + const int64_t core_schema_version, + bool &core_schema_change) +{ + int ret = OB_SUCCESS; + int64_t new_core_schema_version = OB_INVALID_VERSION; + if (!check_inner_stat()) { + ret = OB_INNER_STAT_ERROR; + LOG_WARN("inner stat error", KR(ret), K(schema_status)); + } else if (OB_FAIL(schema_service_->get_core_version(sql_client, schema_status, new_core_schema_version))) { + LOG_WARN("fail to get core schema version", KR(ret), K(schema_status)); + } else if (core_schema_version != new_core_schema_version) { + core_schema_change = true; + LOG_WARN("core schema change during refresh sys schema", KR(ret), + K(schema_status), K(core_schema_version), K(new_core_schema_version)); + } else { + core_schema_change = false; + LOG_INFO("core schema is not changed", KR(ret), + K(schema_status), K(core_schema_version), K(new_core_schema_version)); } return ret; } diff --git a/src/share/schema/ob_server_schema_service.h b/src/share/schema/ob_server_schema_service.h index 28d71fd5b2fe9e031c5bcd5b92e9603b7528a86d..71c10b510db0fc2b2e6105f51a82c1569f53a06d 100644 --- a/src/share/schema/ob_server_schema_service.h +++ b/src/share/schema/ob_server_schema_service.h @@ -1110,6 +1110,12 @@ private: const int64_t schema_version, bool &core_schema_change, bool &sys_schema_change); + int check_core_schema_change_( + ObISQLClient &sql_client, + const ObRefreshSchemaStatus &schema_status, + const int64_t core_schema_version, + bool &core_schema_change); + virtual int check_sys_schema_change(common::ObISQLClient &sql_client, const ObRefreshSchemaStatus &schema_status, const int64_t schema_version,