From 450790300931343c0060298d0fbe54567db78e4d Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 19 Sep 2022 08:03:00 +0000 Subject: [PATCH] [CP] [PHYSICAL RESTORE] Remove dependence of schema fallback --- .../restore/ob_restore_scheduler.cpp | 256 ++++++++++++++---- src/rootserver/restore/ob_restore_scheduler.h | 19 +- src/share/backup/ob_backup_info_mgr.cpp | 103 +++++++ src/share/backup/ob_backup_info_mgr.h | 7 + 4 files changed, 323 insertions(+), 62 deletions(-) diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp index 7eb771445..c3c552ae2 100644 --- a/src/rootserver/restore/ob_restore_scheduler.cpp +++ b/src/rootserver/restore/ob_restore_scheduler.cpp @@ -455,10 +455,10 @@ int ObRestoreScheduler::assign_pool_list(const char *str, common::ObIArray member_list_map; DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_SET_MEMBER_LIST); if (!inited_) { @@ -897,8 +897,9 @@ int ObRestoreScheduler::set_member_list(const ObPhysicalRestoreJob &job_info, co LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(check_stop())) { LOG_WARN("restore scheduler stopped", K(ret)); - } else if (OB_FAIL(member_list_map.create( - bucket_num, ObModIds::OB_RESTORE_SET_MEMBER_LIST, ObModIds::OB_RESTORE_SET_MEMBER_LIST))) { + } else if (OB_FAIL(member_list_map.create(hash::cal_next_prime(BUCKET_NUM), + ObModIds::OB_RESTORE_SET_MEMBER_LIST, + ObModIds::OB_RESTORE_SET_MEMBER_LIST))) { LOG_WARN("fail to create hashmap", K(ret), K(tenant_id)); } else if (OB_FAIL(build_member_list_map(tenant_id, member_list_map))) { LOG_WARN("fail to build member_list_map", K(ret), K(tenant_id)); @@ -1723,10 +1724,9 @@ int ObRestoreScheduler::filter_schema(const ObPhysicalRestoreJob &job_info) } else { common::hash::ObHashSet table_white_list; common::hash::ObHashSet tablegroup_white_list; - const int64_t BUCKET_NUM = 1024; - if (OB_FAIL(table_white_list.create(BUCKET_NUM))) { + if (OB_FAIL(table_white_list.create(hash::cal_next_prime(BUCKET_NUM)))) { LOG_WARN("fail to init table_white_list", KR(ret)); - } else if (OB_FAIL(tablegroup_white_list.create(BUCKET_NUM))) { + } else if (OB_FAIL(tablegroup_white_list.create(hash::cal_next_prime(BUCKET_NUM)))) { LOG_WARN("fail to init tablegroup_white_list", KR(ret)); } else if (OB_FAIL(gen_white_list(job_info, table_items, table_white_list, tablegroup_white_list))) { LOG_WARN("fail to gen white list", KR(ret), K(table_items)); @@ -2450,11 +2450,10 @@ int ObRestoreScheduler::convert_table_options(const uint64_t tenant_id) int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_info) { int ret = OB_SUCCESS; - ObSchemaGetterGuard base_guard; // schema_guard with schema_version using by data backup ObSchemaGetterGuard schema_guard; // schema_guard with local latest schema version ObArray error_index_ids; + ObArray avaliable_index_ids; ObArray unavaliable_index_ids; - ObMultiVersionSchemaService::RefreshSchemaMode mode = ObMultiVersionSchemaService::FORCE_FALLBACK; uint64_t tenant_id = job_info.tenant_id_; if (!inited_) { ret = OB_NOT_INIT; @@ -2464,9 +2463,6 @@ int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_inf LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); } else if (OB_FAIL(check_stop())) { LOG_WARN("restore scheduler stopped", KR(ret)); - } else if (OB_FAIL(schema_service_->get_tenant_schema_guard( - tenant_id, base_guard, job_info.schema_version_, OB_INVALID_VERSION /*latest*/, mode))) { - LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id), "schema_version", job_info.schema_version_); } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) { LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); } else { @@ -2479,37 +2475,29 @@ int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_inf if (OB_ISNULL(table_schema)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, table schema is NULL", KR(ret)); + } else if (is_inner_table(table_schema->get_table_id())) { + // inner_table's index won't rebuild, just skip } else if (table_schema->is_index_table()) { const uint64_t index_id = table_schema->get_table_id(); const ObIndexStatus index_status = table_schema->get_index_status(); - if (INDEX_STATUS_UNAVAILABLE == index_status || INDEX_STATUS_RESTORE_INDEX_ERROR == index_status) { // case 1 + if (INDEX_STATUS_UNAVAILABLE == index_status || INDEX_STATUS_RESTORE_INDEX_ERROR == index_status) { + // case 1 if (OB_FAIL(error_index_ids.push_back(index_id))) { LOG_WARN("fail to push back index id", KR(ret), K(index_id)); } } else if (INDEX_STATUS_INDEX_ERROR == index_status || INDEX_STATUS_UNUSABLE == index_status) { // case 2, just skip } else if (INDEX_STATUS_AVAILABLE == index_status) { - const ObSimpleTableSchemaV2 *index_schema = NULL; - if (OB_FAIL(base_guard.get_table_schema(index_id, index_schema))) { - LOG_WARN("fail to get index schema", KR(ret), K(index_id)); - } else if (OB_ISNULL(index_schema)) { - // case 3.2 index is created when clog backup. - if (OB_FAIL(unavaliable_index_ids.push_back(index_id))) { - LOG_WARN("fail to push back index id", KR(ret), K(index_id)); - } - } else if (INDEX_STATUS_AVAILABLE != index_schema->get_index_status()) { - // case 3.2 index is avaliable when clog backup. - if (OB_FAIL(unavaliable_index_ids.push_back(index_id))) { - LOG_WARN("fail to push back index id", KR(ret), K(index_id)); - } - } else { - // case 3.1, just skip + if (OB_FAIL(avaliable_index_ids.push_back(index_id))) { + LOG_WARN("fail to push back index id", KR(ret), K(index_id)); } } } } } - if (FAILEDx(update_index_status(error_index_ids, INDEX_STATUS_INDEX_ERROR))) { + if (FAILEDx(generate_unavaliable_index_ids_(job_info, avaliable_index_ids, unavaliable_index_ids))) { // case 3 + LOG_WARN("fail to generate unavaliable_index_ids", KR(ret), K(tenant_id)); + } else if (OB_FAIL(update_index_status(error_index_ids, INDEX_STATUS_INDEX_ERROR))) { LOG_WARN("fail to update index status", KR(ret), K(tenant_id)); } else if (OB_FAIL(update_index_status(unavaliable_index_ids, INDEX_STATUS_UNAVAILABLE))) { LOG_WARN("fail to update index status", KR(ret), K(tenant_id)); @@ -2518,6 +2506,123 @@ int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_inf return ret; } +int ObRestoreScheduler::generate_unavaliable_index_ids_(const ObPhysicalRestoreJob &job_info, + const ObIArray &avaliable_index_ids, ObIArray &unavaliable_index_ids) +{ + int ret = OB_SUCCESS; + const int64_t index_cnt = avaliable_index_ids.count(); + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", KR(ret)); + } else if (OB_FAIL(check_stop())) { + LOG_WARN("restore scheduler stopped", KR(ret)); + } else if (index_cnt <= 0) { + // skip + } else { + const uint64_t tenant_id = job_info.tenant_id_; + const int64_t schema_version = job_info.schema_version_; + common::hash::ObHashSet base_avaliable_index_ids; + if (OB_FAIL(base_avaliable_index_ids.create(hash::cal_next_prime(BUCKET_NUM), "BaseIdxes", "BaseIdxes"))) { + LOG_WARN("failed to create base_avaliable_index_ids", KR(ret)); + } + + const int64_t BATCH_FETCH_CNT = 1000; + int64_t start_idx = 0; + int64_t end_idx = min(start_idx + BATCH_FETCH_CNT, index_cnt); + while (OB_SUCC(ret) && end_idx <= index_cnt && start_idx < end_idx) { + if (OB_FAIL(check_stop())) { + LOG_WARN("restore scheduler stopped", KR(ret)); + } else if (OB_FAIL(batch_fetch_base_avaliable_index_ids_( + tenant_id, schema_version, avaliable_index_ids, start_idx, end_idx, base_avaliable_index_ids))) { + LOG_WARN("fail to fetch base avaliable index tids", KR(ret), K(start_idx), K(end_idx)); + } else { + start_idx = end_idx; + end_idx = min(start_idx + BATCH_FETCH_CNT, index_cnt); + } + } // end while + + for (int64_t i = 0; OB_SUCC(ret) && i < index_cnt; i++) { + const uint64_t &index_id = avaliable_index_ids.at(i); + int64_t hash_ret = base_avaliable_index_ids.exist_refactored(index_id); + if (OB_HASH_EXIST == hash_ret) { + // index is avaliable in base schema version, skip rebuild + } else if (OB_HASH_NOT_EXIST == hash_ret) { + // index not exist or not avaliable in base schema version + if (OB_FAIL(unavaliable_index_ids.push_back(index_id))) { + LOG_WARN("fail to push back index_id", KR(ret), K(index_id)); + } + } else { + ret = OB_SUCCESS == hash_ret ? OB_ERR_UNEXPECTED : hash_ret; + LOG_WARN("fail to check index_id exist", KR(ret), K(hash_ret), K(index_id)); + } + } // end for + } + return ret; +} +// [start_idx, end_idx) +int ObRestoreScheduler::batch_fetch_base_avaliable_index_ids_(const uint64_t tenant_id, const int64_t schema_version, + const ObIArray &avaliable_index_ids, const int64_t start_idx, const int64_t end_idx, + common::hash::ObHashSet &base_avaliable_index_ids) +{ + int ret = OB_SUCCESS; + const int64_t index_cnt = avaliable_index_ids.count(); + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret)); + } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_SYS_TENANT_ID == tenant_id || schema_version <= 0 || + start_idx >= end_idx || end_idx > index_cnt)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(schema_version), K(start_idx), K(end_idx), K(index_cnt)); + } else { + ObSqlString sql; + HEAP_VAR(ObMySQLProxy::MySQLResult, res) + { + common::sqlclient::ObMySQLResult *result = NULL; + if (OB_FAIL(sql.assign_fmt("SELECT table_id FROM ( " + "SELECT table_id, is_deleted, index_status, " + "ROW_NUMBER() OVER (PARTITION BY table_id ORDER BY schema_version DESC) AS RN " + "FROM %s WHERE tenant_id = 0 AND schema_version <= %ld and table_id in (", + OB_ALL_TABLE_V2_HISTORY_TNAME, + schema_version))) { + LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(schema_version)); + } + for (int64_t i = start_idx; OB_SUCC(ret) && i < end_idx; i++) { + if (OB_FAIL(sql.append_fmt("%s %ld", + i == start_idx ? "" : ",", + ObSchemaUtils::get_extract_schema_id(tenant_id, avaliable_index_ids.at(i))))) { + LOG_WARN("fail to append sql", KR(ret), K(i), "index_id", avaliable_index_ids.at(i)); + } + } // end for + if (FAILEDx(sql.append_fmt(")) AS A WHERE A.rn = 1 AND is_deleted = 0 AND index_status = %ld", + static_cast(INDEX_STATUS_AVAILABLE)))) { + LOG_WARN("fail to append sql", KR(ret)); + } else if (OB_FAIL(sql_proxy_->read(res, tenant_id, sql.ptr()))) { + LOG_WARN("fail to execute read", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get result", KR(ret), K(tenant_id), K(sql)); + } else { + uint64_t index_id = OB_INVALID_ID; + while (OB_SUCC(ret) && OB_SUCC(result->next())) { + EXTRACT_INT_FIELD_MYSQL_WITH_TENANT_ID(*result, "table_id", index_id, tenant_id); + if (FAILEDx(base_avaliable_index_ids.set_refactored(index_id))) { // overwrite + LOG_WARN("fail to set index_id", KR(ret), K(index_id)); + } else { + LOG_TRACE("get base avaliable index id", KR(ret), K(index_id)); + } + } // end while + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } else { + ret = OB_SUCC(ret) ? OB_ERR_UNEXPECTED : ret; + LOG_WARN("iter failed", KR(ret)); + } + } + } // end HEAP_VAR + } + return ret; +} + int ObRestoreScheduler::update_index_status(const common::ObIArray &index_ids, ObIndexStatus index_status) { int ret = OB_SUCCESS; @@ -2635,11 +2740,10 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i int ret = OB_SUCCESS; uint64_t tenant_id = job_info.tenant_id_; ObSchemaGetterGuard latest_guard; - ObSchemaGetterGuard base_guard; int64_t local_schema_version = OB_INVALID_VERSION; - ObMultiVersionSchemaService::RefreshSchemaMode mode = ObMultiVersionSchemaService::FORCE_FALLBACK; ObArray tables; ObArray tablegroups; + common::hash::ObHashSet pg_key_set; DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_USER_PARTITIONS); if (!inited_) { ret = OB_NOT_INIT; @@ -2652,6 +2756,8 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i } else if (job_info.schema_version_ <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("schema version is invalid", KR(ret), K(job_info)); + } else if (OB_FAIL(pg_key_set.create(hash::cal_next_prime(BUCKET_NUM), "ResDataPGKeys", "ResDataPGKeys"))) { + LOG_WARN("failed to create pg_key set", KR(ret)); } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, latest_guard))) { LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); } else if (OB_FAIL(latest_guard.get_schema_version(tenant_id, local_schema_version))) { @@ -2659,9 +2765,8 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i } else if (job_info.schema_version_ > local_schema_version) { ret = OB_EAGAIN; LOG_WARN("local schema is old, try again", KR(ret), K(local_schema_version), K(job_info)); - } else if (OB_FAIL(schema_service_->get_tenant_schema_guard( - tenant_id, base_guard, job_info.schema_version_, OB_INVALID_VERSION /*latest*/, mode))) { - LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id), "schema_version", job_info.schema_version_); + } else if (OB_FAIL(get_pg_keys_for_physical_restore_data_(job_info, pg_key_set))) { + LOG_WARN("fail to get pg_key set", KR(ret), K(job_info)); } else if (OB_FAIL(latest_guard.get_user_table_schemas_in_tenant(tenant_id, tables))) { LOG_WARN("get tenant table schemas failed", KR(ret), K(tenant_id)); } else if (OB_FAIL(latest_guard.get_tablegroup_schemas_in_tenant(tenant_id, tablegroups))) { @@ -2674,7 +2779,6 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i const int64_t PARTITION_CNT_PER_RPC = 5; for (int64_t i = 0; i < tablegroups.count() && OB_SUCC(ret); ++i) { const ObTablegroupSchema *tablegroup = tablegroups.at(i); - const ObTablegroupSchema *base_tablegroup = NULL; uint64_t tablegroup_id = OB_INVALID_ID; ObRestorePartitionsArg arg; if (OB_FAIL(check_stop())) { @@ -2685,9 +2789,7 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i } else if (FALSE_IT(tablegroup_id = tablegroup->get_tablegroup_id())) { } else if (!tablegroup->has_self_partition()) { // bypass - } else if (OB_FAIL(base_guard.get_tablegroup_schema(tablegroup_id, base_tablegroup))) { - LOG_WARN("fail to get base tablegroup schema", KR(ret), K(tablegroup_id)); - } else if (OB_FAIL(fill_restore_partition_arg(tablegroup_id, base_tablegroup, arg))) { + } else if (OB_FAIL(fill_restore_partition_arg_(*tablegroup, pg_key_set, arg))) { LOG_WARN("fail to fill restore partition arg", KR(ret), K(tablegroup_id)); } else { int64_t timeout = (tablegroup->get_all_part_num() / PARTITION_CNT_PER_RPC) * TIMEOUT_PER_RPC; @@ -2701,7 +2803,6 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i } for (int64_t i = 0; i < tables.count() && OB_SUCC(ret); ++i) { const ObSimpleTableSchemaV2 *table = tables.at(i); - const ObSimpleTableSchemaV2 *base_table = NULL; uint64_t table_id = OB_INVALID_ID; ObRestorePartitionsArg arg; if (OB_FAIL(check_stop())) { @@ -2715,9 +2816,7 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i } else if (is_inner_table(table_id)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("should not be inner table", KR(ret), K(table_id)); - } else if (OB_FAIL(base_guard.get_table_schema(table_id, base_table))) { - LOG_WARN("fail to get base table schema", KR(ret), K(table_id)); - } else if (OB_FAIL(fill_restore_partition_arg(table_id, base_table, arg))) { + } else if (OB_FAIL(fill_restore_partition_arg_(*table, pg_key_set, arg))) { LOG_WARN("fail to fill restore partition arg", KR(ret), K(table_id)); } else { int64_t timeout = (table->get_all_part_num() / PARTITION_CNT_PER_RPC) * TIMEOUT_PER_RPC; @@ -2738,32 +2837,71 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i return ret; } -int ObRestoreScheduler::fill_restore_partition_arg( - const uint64_t schema_id, const ObPartitionSchema *schema, obrpc::ObRestorePartitionsArg &arg) +int ObRestoreScheduler::get_pg_keys_for_physical_restore_data_( + const ObPhysicalRestoreJob &job_info, common::hash::ObHashSet &pg_key_set) +{ + int ret = OB_SUCCESS; + ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam param; + ObArray normal_pg_keys; + const uint64_t tenant_id = job_info.tenant_id_; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", KR(ret)); + } else if (OB_FAIL(check_stop())) { + LOG_WARN("restore scheduler stopped", KR(ret)); + } else if (OB_FAIL(fill_restore_backup_info_param(job_info, param))) { + LOG_WARN("fail to fill restore backup info param", KR(ret), K(job_info)); + } else if (OB_FAIL(ObRestoreBackupInfoUtil::get_restore_normal_pg_keys(param, normal_pg_keys))) { + LOG_WARN("fail to get restore normal pg keys", KR(ret), K(job_info)); + } else { + ObPGKey new_pg_key; + for (int64_t i = 0; OB_SUCC(ret) && i < normal_pg_keys.count(); i++) { + const ObPGKey &pg_key = normal_pg_keys.at(i); + // convert table_id with new tenant_id + if (OB_FAIL(new_pg_key.init( + combine_id(tenant_id, pg_key.get_table_id()), pg_key.get_partition_id(), pg_key.get_partition_cnt()))) { + LOG_WARN("fail to init new pg_key", KR(ret), K(tenant_id), K(pg_key)); + } else if (OB_FAIL(pg_key_set.set_refactored(new_pg_key))) { // overwrite + LOG_WARN("fail to set pg_key", KR(ret), K(new_pg_key), K(pg_key)); + } else { + LOG_TRACE("get base restore pg_key", KR(ret), K(new_pg_key), K(pg_key)); + } + } // end for + } + return ret; +} + +int ObRestoreScheduler::fill_restore_partition_arg_(const ObPartitionSchema &schema, + const common::hash::ObHashSet &pg_key_set, obrpc::ObRestorePartitionsArg &arg) { int ret = OB_SUCCESS; - arg.schema_id_ = schema_id; + arg.schema_id_ = schema.get_table_id(); arg.mode_ = OB_CREATE_TABLE_MODE_PHYSICAL_RESTORE; bool skip = false; - if (OB_ISNULL(schema)) { - // schema doesn't exist for data restore, and log restore is needed. - skip = true; - } else if (!is_new_tablegroup_id(arg.schema_id_)) { - // unavaliable index for data restore - const ObTableSchema *table = static_cast(schema); - if (table->has_self_partition() && table->is_global_index_table() && - (table->is_dropped_schema() || INDEX_STATUS_AVAILABLE != table->get_index_status())) { + if (!is_new_tablegroup_id(arg.schema_id_)) { + const ObTableSchema &table = static_cast(schema); + if (table.has_self_partition() && table.is_global_index_table() && + (table.is_dropped_schema() || INDEX_STATUS_AVAILABLE != table.get_index_status())) { skip = true; } } if (OB_SUCC(ret) && !skip) { bool check_dropped_schema = false; - ObPartitionKeyIter iter(arg.schema_id_, *schema, check_dropped_schema); - int64_t partition_id = OB_INVALID_ID; - while (OB_SUCC(ret) && OB_SUCC(iter.next_partition_id_v2(partition_id))) { - if (OB_FAIL(arg.partition_ids_.push_back(partition_id))) { - LOG_WARN("fail to push back partition_id", K(ret), K(schema_id), K(partition_id)); + ObPartitionKeyIter iter(arg.schema_id_, schema, check_dropped_schema); + ObPGKey pg_key; + while (OB_SUCC(ret) && OB_SUCC(iter.next_partition_key_v2(pg_key))) { + int hash_ret = pg_key_set.exist_refactored(pg_key); + if (OB_HASH_NOT_EXIST == hash_ret) { + // skip + LOG_TRACE("pg_key not exist in base version, just skip", KR(ret), K(pg_key)); + } else if (OB_HASH_EXIST == hash_ret) { + if (OB_FAIL(arg.partition_ids_.push_back(pg_key.get_partition_id()))) { + LOG_WARN("fail to push back partition_id", K(ret), K(pg_key)); + } + } else { + ret = OB_SUCCESS == hash_ret ? OB_ERR_UNEXPECTED : hash_ret; + LOG_WARN("fail to check pg key exist", KR(ret), K(hash_ret), K(pg_key)); } } if (OB_ITER_END == ret) { diff --git a/src/rootserver/restore/ob_restore_scheduler.h b/src/rootserver/restore/ob_restore_scheduler.h index ec3572fb2..b9c8864c9 100755 --- a/src/rootserver/restore/ob_restore_scheduler.h +++ b/src/rootserver/restore/ob_restore_scheduler.h @@ -95,7 +95,7 @@ private: /* restore tenant related */ int fill_job_info(share::ObPhysicalRestoreJob& job, obrpc::ObCreateTenantArg& arg); int fill_restore_backup_info_param( - share::ObPhysicalRestoreJob& job, share::ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam& param); + const share::ObPhysicalRestoreJob& job, share::ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam& param); int fill_backup_info(share::ObPhysicalRestoreJob& job, obrpc::ObCreateTenantArg& arg); int fill_pkeys_for_physical_restore_log(const share::ObPhysicalRestoreJob& job, obrpc::ObCreateTenantArg& arg); int fill_rs_info(share::ObPhysicalRestoreJob& job); @@ -119,8 +119,20 @@ private: int convert_parameters(const share::ObPhysicalRestoreJob& job_info); int log_nop_operation(const share::ObPhysicalRestoreJob& job_info); int convert_column_statistic(const uint64_t tenant_id); + int generate_unavaliable_index_ids_(const share::ObPhysicalRestoreJob &job_info, + const common::ObIArray &avaliable_index_ids, common::ObIArray &unavaliable_index_ids); + int batch_fetch_base_avaliable_index_ids_(const uint64_t tenant_id, const int64_t schema_version, + const common::ObIArray &avaliable_index_ids, const int64_t start_idx, const int64_t end_idx, + common::hash::ObHashSet &base_avaliable_index_ids); /*------------------------*/ + /*---create user partitions---*/ + int get_pg_keys_for_physical_restore_data_( + const share::ObPhysicalRestoreJob &job_info, common::hash::ObHashSet &pg_key_set); + int fill_restore_partition_arg_(const share::schema::ObPartitionSchema &schema, + const common::hash::ObHashSet &pg_key_set, obrpc::ObRestorePartitionsArg &arg); + /*----------------------------*/ + /* filter schema */ int gen_white_list(const share::ObPhysicalRestoreJob& job_info, const common::ObIArray& table_items, common::hash::ObHashSet& table_white_list, @@ -170,8 +182,6 @@ private: const PhysicalRestorePartition& partition, const ObPhysicalRestoreStat& stat, share::ObDMLSqlSplicer& dml); int clear_member_list_table(const uint64_t tenant_id); - int fill_restore_partition_arg( - const uint64_t schema_id, const share::schema::ObPartitionSchema* schema, obrpc::ObRestorePartitionsArg& arg); /*------------------------*/ /* upgrade related */ @@ -191,6 +201,9 @@ private: private: int drop_tenant_force_if_necessary(const share::ObPhysicalRestoreJob& job_info); + +private: + static const int64_t BUCKET_NUM = 1024; bool inited_; mutable ObRestoreIdling idling_; share::schema::ObMultiVersionSchemaService* schema_service_; diff --git a/src/share/backup/ob_backup_info_mgr.cpp b/src/share/backup/ob_backup_info_mgr.cpp index 1e4613d18..e9b8153a4 100644 --- a/src/share/backup/ob_backup_info_mgr.cpp +++ b/src/share/backup/ob_backup_info_mgr.cpp @@ -1293,6 +1293,109 @@ int ObRestoreBackupInfoUtil::get_restore_sys_table_ids( return OB_SUCCESS; } +int ObRestoreBackupInfoUtil::get_restore_normal_pg_keys( + const GetRestoreBackupInfoParam ¶m, common::ObIArray &pkey_list) +{ + int ret = OB_SUCCESS; + bool is_cluster_level = param.backup_set_path_list_.empty(); + if (is_cluster_level) { + if (OB_FAIL(get_restore_normal_pg_keys_v1_(param, pkey_list))) { + LOG_WARN("failed to get restore normal pg keys v1", KR(ret)); + } + } else { + if (OB_FAIL(get_restore_normal_pg_keys_v2_(param, pkey_list))) { + LOG_WARN("failed to get restore normal pg keys v2", KR(ret)); + } + } + return ret; +} + +int ObRestoreBackupInfoUtil::get_restore_normal_pg_keys_v1_( + const GetRestoreBackupInfoParam ¶m, common::ObIArray &pkey_list) +{ + int ret = OB_SUCCESS; + ObClusterBackupDest dest; + ObExternBackupInfoMgr backup_info_mgr; + ObExternTenantInfoMgr tenant_info_mgr; + ObExternTenantInfo tenant_info; + ObExternBackupInfo backup_info; + const int64_t cluster_version = ObClusterVersion::get_instance().get_cluster_version(); + ObFakeBackupLeaseService fake_backup_lease; + ObTenantNameSimpleMgr tenant_name_mgr; + ObExternPGListMgr pg_list_mgr; + uint64_t backup_tenant_id = 0; + const char *backup_dest = param.backup_dest_; + const char *backup_cluster_name = param.backup_cluster_name_; + const int64_t cluster_id = param.cluster_id_; + const int64_t incarnation = param.incarnation_; + const char *backup_tenant_name = param.backup_tenant_name_; + const int64_t restore_timestamp = param.restore_timestamp_; + const char *passwd_array = param.passwd_array_; + + if (OB_ISNULL(backup_dest) || OB_ISNULL(backup_cluster_name) || cluster_id <= 0 || incarnation < 0 || + OB_ISNULL(backup_tenant_name) || restore_timestamp <= 0 || OB_ISNULL(passwd_array)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get restore backup info get invalid argument", + K(ret), + KP(backup_dest), + KP(backup_cluster_name), + K(cluster_id), + KP(backup_tenant_name), + K(restore_timestamp), + KP(passwd_array)); + } else if (strlen(backup_dest) >= share::OB_MAX_BACKUP_DEST_LENGTH || + strlen(backup_cluster_name) >= OB_MAX_CLUSTER_NAME_LENGTH) { + ret = OB_SIZE_OVERFLOW; + LOG_WARN("backup dest or backup cluster name size over flow", K(ret), K(backup_dest), K(backup_cluster_name)); + } else if (OB_FAIL(dest.set(backup_dest, backup_cluster_name, cluster_id, incarnation))) { + LOG_WARN( + "failed to set backup dest", K(ret), K(backup_dest), K(backup_cluster_name), K(cluster_id), K(incarnation)); + } else if (OB_FAIL(tenant_name_mgr.init())) { + LOG_WARN("faiuiled to init tenant_name mgr", K(ret)); + } else if (OB_FAIL(tenant_name_mgr.read_backup_file(dest))) { + LOG_WARN("failed to read backup tenant name mgr", K(ret), K(dest)); + } else if (OB_FAIL(tenant_name_mgr.get_tenant_id(backup_tenant_name, restore_timestamp, backup_tenant_id))) { + LOG_WARN("failed to backup tenant id", K(ret), K(backup_tenant_name), K(restore_timestamp)); + } else if (OB_FAIL(tenant_info_mgr.init(dest, fake_backup_lease))) { + LOG_WARN("failed to init tenant info mgr", K(ret), K(dest)); + } else if (OB_FAIL(tenant_info_mgr.find_tenant_info(backup_tenant_id, tenant_info))) { + LOG_WARN("failed to find tenant info", K(ret), K(backup_tenant_id)); + } else if (OB_FAIL(backup_info_mgr.init(tenant_info.tenant_id_, dest, fake_backup_lease))) { + LOG_WARN("failed to init backup info mgr", K(ret), K(dest), K(tenant_info)); + } else if (OB_FAIL(backup_info_mgr.find_backup_info(restore_timestamp, passwd_array, backup_info))) { + LOG_WARN("failed to find backup info", K(ret), K(restore_timestamp), K(tenant_info)); + } else if (OB_FAIL(pg_list_mgr.init(backup_tenant_id, + backup_info.full_backup_set_id_, + backup_info.inc_backup_set_id_, + dest, + backup_info.date_, + backup_info.compatible_, + fake_backup_lease))) { + LOG_WARN("failed to init pg list mgr", K(ret), K(backup_info), K(dest)); + } else if (OB_FAIL(pg_list_mgr.get_normal_pg_list(pkey_list))) { + LOG_WARN("failed to get sys pg list", K(ret), K(backup_info), K(dest)); + } + return ret; +} + +int ObRestoreBackupInfoUtil::get_restore_normal_pg_keys_v2_( + const GetRestoreBackupInfoParam ¶m, common::ObIArray &pkey_list) +{ + int ret = OB_SUCCESS; + ObFakeBackupLeaseService fake_backup_lease; + ObExternPGListMgr pg_list_mgr; + ObSimpleBackupSetPath simple_set_path; // largest backup set path + + if (OB_FAIL(param.get_largest_backup_set_path(simple_set_path))) { + LOG_WARN("failed to get smallest largest backup set path", K(ret)); + } else if (OB_FAIL(pg_list_mgr.init(simple_set_path, fake_backup_lease))) { + LOG_WARN("failed to get sys pg list", K(ret), K(simple_set_path)); + } else if (OB_FAIL(pg_list_mgr.get_normal_pg_list(pkey_list))) { + LOG_WARN("failed to get sys pg list", K(ret), K(simple_set_path)); + } + return ret; +} + int ObRestoreBackupInfoUtil::check_is_snapshot_restore(const int64_t backup_snapshot, const int64_t restore_timestamp, const uint64_t cluster_version, bool &is_snapshot_restore) { diff --git a/src/share/backup/ob_backup_info_mgr.h b/src/share/backup/ob_backup_info_mgr.h index 96ce9c226..1ba731320 100644 --- a/src/share/backup/ob_backup_info_mgr.h +++ b/src/share/backup/ob_backup_info_mgr.h @@ -166,6 +166,9 @@ public: static int get_restore_sys_table_ids( const ObPhysicalRestoreInfo &info, common::ObIArray &pkey_list); + static int get_restore_normal_pg_keys( + const GetRestoreBackupInfoParam ¶m, common::ObIArray &pkey_list); + static int check_is_snapshot_restore(const int64_t backup_snapshot, const int64_t restore_timestamp, const uint64_t cluster_version, bool &is_snapshot_restore); @@ -174,6 +177,10 @@ private: static int get_restore_backup_info_v1_(const GetRestoreBackupInfoParam ¶m, ObRestoreBackupInfo &info); // get info from simple path level static int get_restore_backup_info_v2_(const GetRestoreBackupInfoParam ¶m, ObRestoreBackupInfo &info); + static int get_restore_normal_pg_keys_v1_( + const GetRestoreBackupInfoParam ¶m, common::ObIArray &pkey_list); + static int get_restore_normal_pg_keys_v2_( + const GetRestoreBackupInfoParam ¶m, common::ObIArray &pkey_list); static int inner_get_restore_backup_set_info_(const GetRestoreBackupInfoParam ¶m, ObBackupSetFileInfo &backup_set_info, ObExternTenantLocalityInfo &tenant_locality_info, common::ObIArray &sys_pg_keys); -- GitLab