提交 e8bb18cd 编写于 作者: T tino247 提交者: ob-robot

Fix core table full schema miss problem

上级 7c48e979
......@@ -5318,7 +5318,6 @@ int ObServerSchemaService::refresh_full_schema(
bool sys_schema_change = true;
int64_t local_schema_version = 0;
int64_t core_schema_version = 0;
int64_t new_core_schema_version = 0;
int64_t schema_version = 0;
if (OB_FAIL(schema_mgr_for_cache_map_.get_refactored(tenant_id, schema_mgr_for_cache))) {
LOG_WARN("fail to get schema_mgr_for_cache", KR(ret), K(schema_status));
......@@ -5343,6 +5342,10 @@ int ObServerSchemaService::refresh_full_schema(
if (OB_FAIL(schema_service_->get_core_version(
sql_client, schema_status, core_schema_version))) {
LOG_WARN("get_core_version failed", KR(ret), K(schema_status));
} else if (core_schema_version <= OB_CORE_SCHEMA_VERSION + 1) {
ret = OB_EAGAIN;
LOG_WARN("schema may be not persisted, try again",
KR(ret), K(schema_status), K(core_schema_version));
} else if (core_schema_version > local_schema_version) {
// for core table schema, we publis as core_temp_version
int64_t publish_version = 0;
......@@ -5362,6 +5365,21 @@ int ObServerSchemaService::refresh_full_schema(
if (OB_SUCC(ret) && !core_schema_change && sys_schema_change) {
if (OB_FAIL(get_schema_version_in_inner_table(sql_client, schema_status, schema_version))) {
LOG_WARN("fail to get schema version in inner table", KR(ret), K(schema_status));
} else if (schema_version <= OB_CORE_SCHEMA_VERSION + 1) {
ret = OB_EAGAIN;
LOG_WARN("schema may be not persisted, try again",
KR(ret), K(schema_status), K(schema_version));
} else if (core_schema_version > schema_version) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("schema version fallback, unexpected",
KR(ret), K(schema_status), K(core_schema_version), K(schema_version));
} else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status,
core_schema_version, core_schema_change))) {
LOG_WARN("fail to check core schema version change", KR(ret), K(schema_status), K(core_schema_version));
} else if (core_schema_change) {
sys_schema_change = true;
LOG_WARN("core schema version change, try again",
KR(ret), K(schema_status), K(core_schema_version), K(schema_version));
} else if (OB_FAIL(check_sys_schema_change(sql_client, schema_status,
local_schema_version, schema_version, sys_schema_change))) {
LOG_WARN("check_sys_schema_change failed", KR(ret), K(schema_status), K(schema_version));
......@@ -5384,14 +5402,13 @@ int ObServerSchemaService::refresh_full_schema(
if (OB_FAIL(ret)) {
// check whether failed because of core table schema change, go to suitable pos
int temp_ret = OB_SUCCESS;
if (OB_SUCCESS != (temp_ret = schema_service_->get_core_version(
sql_client, schema_status, new_core_schema_version))) {
LOG_WARN("get_core_version failed", K(temp_ret), K(schema_status));
} else if (new_core_schema_version != core_schema_version) {
core_schema_change = true;
if (OB_SUCCESS != (temp_ret = check_core_schema_change_(
sql_client, schema_status, core_schema_version, core_schema_change))) {
LOG_WARN("get_core_version failed", KR(ret), KR(temp_ret), K(schema_status), K(core_schema_version));
} else if (core_schema_change) {
sys_schema_change = true;
LOG_WARN("core schema change during refresh sys schema", KR(ret),
K(schema_status), K(core_schema_version), K(new_core_schema_version));
LOG_WARN("core schema version change, try again",
KR(ret), K(schema_status), K(core_schema_version), K(schema_version));
ret = OB_SUCCESS;
}
}
......@@ -5593,6 +5610,10 @@ int ObServerSchemaService::refresh_increment_schema(
if (OB_FAIL(schema_service_->get_core_version(
sql_client, schema_status, core_schema_version))) {
LOG_WARN("get_core_version failed", KR(ret), K(schema_status));
} else if (core_schema_version <= OB_CORE_SCHEMA_VERSION + 1) {
ret = OB_EAGAIN;
LOG_WARN("schema may be not persisted, try again",
KR(ret), K(schema_status), K(core_schema_version));
} else if (core_schema_version > local_schema_version) {
int64_t publish_version = OB_INVALID_INDEX;
if (OB_FAIL(ObSchemaService::gen_core_temp_version(
......@@ -5611,6 +5632,27 @@ int ObServerSchemaService::refresh_increment_schema(
if (OB_SUCC(ret) && !core_schema_change && sys_schema_change) {
if (OB_FAIL(get_schema_version_in_inner_table(sql_client, schema_status, schema_version))) {
LOG_WARN("fail to get schema version in inner table", KR(ret), K(schema_status));
} else if (schema_version < local_schema_version) {
if (local_schema_version <= OB_CORE_SCHEMA_VERSION + 1) {
ret = OB_EAGAIN;
LOG_WARN("schema may be not persisted, try again",
KR(ret), K(schema_status), K(schema_version), K(local_schema_version));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("schema version fallback, unexpected",
KR(ret), K(schema_status), K(schema_version), K(local_schema_version));
}
} else if (core_schema_version > schema_version) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("schema version fallback, unexpected",
KR(ret), K(schema_status), K(core_schema_version), K(schema_version));
} else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status,
core_schema_version, core_schema_change))) {
LOG_WARN("fail to check core schema version change", KR(ret), K(schema_status), K(core_schema_version));
} else if (core_schema_change) {
sys_schema_change = true;
LOG_WARN("core schema version change, try again",
KR(ret), K(schema_status), K(core_schema_version), K(schema_version));
} else if (OB_FAIL(check_sys_schema_change(sql_client, schema_status,
local_schema_version, schema_version, sys_schema_change))) {
LOG_WARN("check_sys_schema_change failed", KR(ret), K(schema_status), K(schema_version));
......@@ -5629,16 +5671,13 @@ int ObServerSchemaService::refresh_increment_schema(
if (OB_FAIL(ret)) {
// check whether failed because of core table schema change, go to suitable pos
int temp_ret = OB_SUCCESS;
int64_t new_core_schema_version = 0;
if (OB_SUCCESS != (temp_ret = schema_service_->get_core_version(
sql_client, schema_status, new_core_schema_version))) {
LOG_WARN("get_core_version failed, need retry",
KR(ret), K(temp_ret), K(schema_status));
} else if (new_core_schema_version != core_schema_version) {
core_schema_change = true;
if (OB_SUCCESS != (temp_ret = check_core_schema_change_(
sql_client, schema_status, core_schema_version, core_schema_change))) {
LOG_WARN("get_core_version failed", KR(ret), KR(temp_ret), K(schema_status), K(core_schema_version));
} else if (core_schema_change) {
sys_schema_change = true;
LOG_WARN("core schema change during refresh sys schema", KR(ret),
K(schema_status), K(core_schema_version), K(new_core_schema_version));
LOG_WARN("core schema version change, try again",
KR(ret), K(schema_status), K(core_schema_version), K(schema_version));
ret = OB_SUCCESS;
}
}
......@@ -5765,20 +5804,17 @@ int ObServerSchemaService::try_fetch_publish_core_schemas(
} else {
ObArray<ObTableSchema> core_schemas;
ObArray<uint64_t> core_table_ids;
int64_t new_core_schema_version = 0;
if (OB_FAIL(schema_service_->get_core_table_schemas(
sql_client, schema_status, core_schemas))) {
LOG_WARN("get_core_table_schemas failed", KR(ret), K(schema_status), K(core_table_ids));
} else if (OB_FAIL(schema_service_->get_core_version(
sql_client, schema_status, new_core_schema_version))) {
LOG_WARN("get_core_version failed", KR(ret), K(schema_status));
} else if (new_core_schema_version != core_schema_version) {
core_schema_change = true;
LOG_INFO("core schema change", K(schema_status),
K(core_schema_version), K(new_core_schema_version));
} else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status,
core_schema_version, core_schema_change))) {
LOG_WARN("fail to check core schema version change", KR(ret), K(schema_status), K(core_schema_version));
} else if (core_schema_change) {
LOG_WARN("core schema version change",
KR(ret), K(schema_status), K(core_schema_version));
} else {
// core schema don't change, publish core schemas
core_schema_change = false;
ObArray<ObTableSchema *> core_tables;
for (int64_t i = 0; i < core_schemas.count() && OB_SUCC(ret); ++i) {
if (OB_FAIL(core_tables.push_back(&core_schemas.at(i)))) {
......@@ -6326,7 +6362,6 @@ int ObServerSchemaService::check_core_or_sys_schema_change(
{
int ret = OB_SUCCESS;
int64_t new_schema_version = 0;
int64_t new_core_schema_version = 0;
// check whether failed because of sys table schema change, go to suitable pos
if (!check_inner_stat()) {
ret = OB_INNER_STAT_ERROR;
......@@ -6334,22 +6369,41 @@ int ObServerSchemaService::check_core_or_sys_schema_change(
} else if (OB_FAIL(get_schema_version_in_inner_table(
sql_client, schema_status, new_schema_version))) {
LOG_WARN("fail to get schema version in inner table", KR(ret), K(schema_status));
} else if (OB_FAIL(check_core_schema_change_(sql_client, schema_status,
core_schema_version, core_schema_change))) {
LOG_WARN("fail to check core schema change", KR(ret), K(schema_status), K(core_schema_version));
} else if (core_schema_change) {
sys_schema_change = true;
LOG_WARN("core schema change", KR(ret), K(schema_status), K(core_schema_version), K(new_schema_version));
} else if (OB_FAIL(check_sys_schema_change(sql_client, schema_status,
schema_version, new_schema_version, sys_schema_change))) {
LOG_WARN("sys schema change during refresh schema", KR(ret), K(schema_status),
K(schema_version), K(new_schema_version));
}
if (OB_SUCCESS != ret && OB_NOT_NULL(schema_service_)) {
// check whether failed because of core table schema schema
if (OB_FAIL(schema_service_->get_core_version(
sql_client, schema_status, new_core_schema_version))) {
LOG_WARN("get_core_version failed", KR(ret), K(schema_status));
} else if (new_core_schema_version != core_schema_version) {
ret = OB_SUCCESS;
core_schema_change = true;
LOG_WARN("core schema change during check whether failed because of sys schema change",
KR(ret), K(schema_status), K(core_schema_version), K(new_core_schema_version));
}
return ret;
}
int ObServerSchemaService::check_core_schema_change_(
ObISQLClient &sql_client,
const ObRefreshSchemaStatus &schema_status,
const int64_t core_schema_version,
bool &core_schema_change)
{
int ret = OB_SUCCESS;
int64_t new_core_schema_version = OB_INVALID_VERSION;
if (!check_inner_stat()) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("inner stat error", KR(ret), K(schema_status));
} else if (OB_FAIL(schema_service_->get_core_version(sql_client, schema_status, new_core_schema_version))) {
LOG_WARN("fail to get core schema version", KR(ret), K(schema_status));
} else if (core_schema_version != new_core_schema_version) {
core_schema_change = true;
LOG_WARN("core schema change during refresh sys schema", KR(ret),
K(schema_status), K(core_schema_version), K(new_core_schema_version));
} else {
core_schema_change = false;
LOG_INFO("core schema is not changed", KR(ret),
K(schema_status), K(core_schema_version), K(new_core_schema_version));
}
return ret;
}
......
......@@ -1110,6 +1110,12 @@ private:
const int64_t schema_version,
bool &core_schema_change,
bool &sys_schema_change);
int check_core_schema_change_(
ObISQLClient &sql_client,
const ObRefreshSchemaStatus &schema_status,
const int64_t core_schema_version,
bool &core_schema_change);
virtual int check_sys_schema_change(common::ObISQLClient &sql_client,
const ObRefreshSchemaStatus &schema_status,
const int64_t schema_version,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册