diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index a92ed3894348693093dd8c5d76746b585bbe819d..922126d483cc8e7128b23415bc4eca3f5d6286b1 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -501,6 +501,11 @@ public: EN_CLOG_ILOG_MEMSTORE_ALLOC_MEMORY_FAILURE = 268, EN_PREVENT_SYNC_REPORT = 360, EN_PREVENT_ASYNC_REPORT = 361, + + // DDL related 500-550 + EN_SUBMIT_INDEX_TASK_ERROR_BEFORE_STAT_RECORD = 503, + EN_SUBMIT_INDEX_TASK_ERROR_AFTER_STAT_RECORD = 504, + EVENT_TABLE_MAX = SIZE_OF_EVENT_TABLE }; diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 71acfbb3274ff0f62051df38da1c93753f226bd9..ced182b2a1d39b50471da020a381fe855915d21a 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -51,6 +51,7 @@ #include "storage/ob_partition_scheduler.h" #include "sql/optimizer/ob_opt_est_cost.h" #include "sql/optimizer/ob_join_order.h" +#include "storage/ob_build_index_scheduler.h" #include "rootserver/ob_bootstrap.h" #include "observer/ob_server.h" #include "observer/ob_dump_task_generator.h" @@ -3676,6 +3677,22 @@ int ObService::pre_process_server_reply(const obrpc::ObPreProcessServerReplyArg& return ret; } +int ObService::submit_retry_ghost_index_task(const uint64_t index_id) +{ + int ret = OB_SUCCESS; + ObRetryGhostIndexScheduler &scheduler = ObRetryGhostIndexScheduler::get_instance(); + ObRetryGhostIndexTask task; + if (OB_INVALID_ID == index_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid index id", K(ret), K(index_id)); + } else if (OB_FAIL(task.init(index_id))) { + LOG_WARN("fail to init ObRetryGhostIndexTask", K(ret), K(index_id)); + } else if (OB_FAIL(scheduler.push_task(task))) { + LOG_WARN("fail to push ObRetryGhostIndexTask to scheduler", K(ret), K(task)); + } + return ret; +} + int ObService::broadcast_rs_list(const ObRsListArg& arg) { int ret = OB_SUCCESS; diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index bead08ac30a32919a5cf0d66f39b5beb61199077..f85353e780c4acc5b1fb7892754505bf2d7f0bdf 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -304,6 +304,7 @@ public: int submit_async_refresh_schema_task(const uint64_t tenant_id, const int64_t schema_version); int renew_in_zone_hb(const share::ObInZoneHbRequest& arg, share::ObInZoneHbResponse& result); int pre_process_server_reply(const obrpc::ObPreProcessServerReplyArg& arg); + int submit_retry_ghost_index_task(const uint64_t index_id); private: int register_self(); diff --git a/src/rootserver/ob_global_index_builder.cpp b/src/rootserver/ob_global_index_builder.cpp index 211690bca5b6a6fc8322cf0ecf48d45f0097ec89..babe3526f2ca3a3c89c0cdce1cd4f2ea3d81c998 100644 --- a/src/rootserver/ob_global_index_builder.cpp +++ b/src/rootserver/ob_global_index_builder.cpp @@ -154,7 +154,6 @@ int ObGlobalIndexTask::get_partition_col_checksum_stat( ObGlobalIndexBuilder::ObGlobalIndexBuilder() : inited_(false), - loaded_(false), rpc_proxy_(NULL), mysql_proxy_(NULL), server_mgr_(NULL), @@ -313,8 +312,14 @@ int ObGlobalIndexBuilder::submit_build_global_index_task(const share::schema::Ob ObSqlString sql_string; const int64_t orig_snapshot = 0; int64_t affected_rows = 0; - ObGlobalIndexTask* task_ptr = NULL; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + ObGlobalIndexTask *task_ptr = NULL; + bool skip_set_task_map = false; +#ifdef ERRSIM + ret = E(EventTable::EN_SUBMIT_INDEX_TASK_ERROR_BEFORE_STAT_RECORD) OB_SUCCESS; +#endif + if (OB_SUCCESS != ret) { + LOG_INFO("errsim mock push global index task fail", K(ret)); + } else if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema)) { @@ -349,11 +354,31 @@ int ObGlobalIndexBuilder::submit_build_global_index_task(const share::schema::Ob } else if (OB_FAIL(trans.start(mysql_proxy_))) { LOG_WARN("fail to start trans", K(ret)); } else if (OB_FAIL(trans.write(OB_SYS_TENANT_ID, sql_string.ptr(), affected_rows))) { - LOG_WARN("fail to execute write sql", K(ret)); - } else { - if (1 != affected_rows) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("affected rows unexpected", K(ret), K(sql_string)); + ObGlobalIndexTask *tmp_task_ptr = NULL; + int tmp_ret = OB_SUCCESS; + // Ghost index: index schema is existed but somehow the build index task is not there. + // the following logic is for ghost index retry, there could be an index being built in progress + // while the DDL retry scheduler thinks the index is a ghost. we add a new index task when + // this happens though it could lead to repeated task. The repeated task can be detected + // in the task itself. + if (OB_ERR_PRIMARY_KEY_DUPLICATE != ret) { + LOG_WARN("fail to execute write sql", K(ret)); + } else { + ret = OB_SUCCESS; + if (OB_HASH_NOT_EXIST == (tmp_ret = task_map_.get_refactored(index_schema->get_table_id(), tmp_task_ptr))) { + LOG_INFO( + "global index record in __all_index_build_stat, but not in task_map, add it to avoid unexpected miss"); + } else { + skip_set_task_map = true; + } + } + } +#ifdef ERRSIM + ret = E(EventTable::EN_SUBMIT_INDEX_TASK_ERROR_AFTER_STAT_RECORD) OB_SUCCESS; +#endif + if (OB_SUCC(ret)) { + if (skip_set_task_map) { + LOG_INFO("task is already in task map, skip"); } else { task_ptr->tenant_id_ = index_schema->get_tenant_id(); task_ptr->data_table_id_ = index_schema->get_data_table_id(); @@ -518,7 +543,6 @@ int ObGlobalIndexBuilder::reload_building_indexes() ret = OB_SUCCESS; } if (OB_SUCCESS == ret) { - loaded_ = true; idling_.wakeup(); } } @@ -550,7 +574,7 @@ int ObGlobalIndexBuilder::check_and_get_index_schema(share::schema::ObSchemaGett const uint64_t index_table_id, const share::schema::ObTableSchema*& index_schema, bool& index_schema_exist) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(OB_INVALID_ID == index_table_id)) { @@ -578,7 +602,7 @@ int ObGlobalIndexBuilder::generate_original_table_partition_leader_array( { int ret = OB_SUCCESS; UNUSED(schema_guard); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == data_schema)) { @@ -658,7 +682,7 @@ int ObGlobalIndexBuilder::get_global_index_build_snapshot(ObGlobalIndexTask* tas common::ObArray partition_leader_array; const share::schema::ObTableSchema* data_schema = NULL; share::ObSimpleFrozenStatus frozen_status; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema || NULL == task)) { @@ -694,7 +718,7 @@ int ObGlobalIndexBuilder::init_build_snapshot_ctx(const common::ObIArray& invalid_snapshot_id_array, common::ObIArray& snapshot_array) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(partition_leader_array.count() <= 0)) { @@ -725,7 +749,7 @@ int ObGlobalIndexBuilder::do_get_associated_snapshot(PROXY& rpc_proxy, ARG& rpc_ // an array which is used to record the partition leader array offset // of all the invalid snapshots common::ObArray invalid_snapshot_id_array; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(all_part_num <= 0 || NULL == task || partition_leader_array.count() <= 0)) { @@ -801,7 +825,7 @@ int ObGlobalIndexBuilder::do_get_associated_snapshot(PROXY& rpc_proxy, ARG& rpc_ int ObGlobalIndexBuilder::switch_state(ObGlobalIndexTask* task, const GlobalIndexBuildStatus next_status) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -881,7 +905,7 @@ int ObGlobalIndexBuilder::switch_state(ObGlobalIndexTask* task, const GlobalInde int ObGlobalIndexBuilder::pick_build_snapshot(const common::ObIArray& snapshot_array, int64_t& snapshot) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(snapshot_array.count() <= 0)) { @@ -903,7 +927,7 @@ int ObGlobalIndexBuilder::update_partition_leader_array(common::ObIArray& ret_code_array, const common::ObIArray& invalid_snapshot_id_array) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(ret_code_array.count() != invalid_snapshot_id_array.count())) { @@ -973,7 +997,7 @@ int ObGlobalIndexBuilder::update_build_snapshot_ctx(PROXY& proxy, const common:: common::ObIArray& invalid_snapshot_id_array, common::ObIArray& snapshot_array) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (invalid_snapshot_id_array.count() != ret_code_array.count()) { @@ -1035,7 +1059,7 @@ int ObGlobalIndexBuilder::update_build_snapshot_ctx(PROXY& proxy, const common:: int ObGlobalIndexBuilder::update_task_global_index_build_snapshot(ObGlobalIndexTask* task, const int64_t snapshot) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -1086,7 +1110,7 @@ int ObGlobalIndexBuilder::drive_this_build_single_replica(const share::schema::O { int ret = OB_SUCCESS; UNUSED(schema_guard); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -1146,7 +1170,7 @@ int ObGlobalIndexBuilder::drive_this_build_single_replica(const share::schema::O int ObGlobalIndexBuilder::hold_snapshot(const ObGlobalIndexTask* task, const int64_t snapshot) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(nullptr == task || snapshot < 0)) { @@ -1199,7 +1223,7 @@ int ObGlobalIndexBuilder::launch_new_build_single_replica(const share::schema::O share::schema::ObSchemaGetterGuard& schema_guard, ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -1233,8 +1257,8 @@ int ObGlobalIndexBuilder::do_build_single_replica( ObGlobalIndexTask* task, const share::schema::ObTableSchema* index_schema, const int64_t snapshot) { int ret = OB_SUCCESS; - ObRootService* root_service = NULL; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + ObRootService *root_service = NULL; + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -1263,7 +1287,7 @@ int ObGlobalIndexBuilder::do_build_single_replica( int ObGlobalIndexBuilder::try_build_single_replica(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -1327,7 +1351,7 @@ int ObGlobalIndexBuilder::check_partition_copy_replica_stat(int64_t& major_sstab int ret = OB_SUCCESS; const ObPartitionReplica* leader_replica = NULL; ObPartitionKey pkey; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == rpc_proxy_)) { @@ -1395,7 +1419,7 @@ int ObGlobalIndexBuilder::build_replica_sstable_copy_task( PartitionSSTableBuildStat& part_sstable_build_stat, share::ObPartitionInfo& partition_info) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (CMRS_IDLE != part_sstable_build_stat.copy_multi_replica_stat_) { @@ -1480,7 +1504,7 @@ int ObGlobalIndexBuilder::drive_this_copy_multi_replica(const share::schema::ObT { int ret = OB_SUCCESS; UNUSED(schema_guard); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -1589,7 +1613,7 @@ int ObGlobalIndexBuilder::launch_new_copy_multi_replica(const share::schema::ObT { int ret = OB_SUCCESS; SpinWLockGuard item_guard(task->lock_); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema || NULL == task)) { @@ -1608,7 +1632,7 @@ int ObGlobalIndexBuilder::generate_task_partition_sstable_array(share::schema::O { int ret = OB_SUCCESS; UNUSED(schema_guard); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema || NULL == task)) { @@ -1662,7 +1686,7 @@ int ObGlobalIndexBuilder::build_task_partition_sstable_stat(share::schema::ObSch { int ret = OB_SUCCESS; common::ObArray partition_server_array; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema || NULL == task)) { @@ -1679,7 +1703,7 @@ int ObGlobalIndexBuilder::try_copy_multi_replica(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; DEBUG_SYNC(BEFORE_COPY_GLOBAL_INDEX); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -1751,7 +1775,7 @@ int ObGlobalIndexBuilder::send_check_unique_index_rpc(const share::schema::ObTab ObGlobalIndexTask* task, const common::ObPartitionKey& pkey, const share::ObPartitionReplica* replica) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema || NULL == task || !pkey.is_valid() || NULL == replica)) { @@ -1779,7 +1803,7 @@ int ObGlobalIndexBuilder::drive_this_unique_index_calc_checksum(const share::sch int ret = OB_SUCCESS; UNUSED(data_schema); UNUSED(schema_guard); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -1886,7 +1910,7 @@ int ObGlobalIndexBuilder::launch_new_unique_index_check(const share::schema::ObT { int ret = OB_SUCCESS; UNUSED(schema_guard); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -1907,7 +1931,7 @@ int ObGlobalIndexBuilder::drive_this_unique_index_check(const share::schema::ObT { int ret = OB_SUCCESS; UNUSED(schema_guard); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -1987,8 +2011,8 @@ int ObGlobalIndexBuilder::get_checksum_calculation_snapshot(ObGlobalIndexTask* t { int ret = OB_SUCCESS; common::ObArray partition_leader_array; - const share::schema::ObTableSchema* data_schema = NULL; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + const share::schema::ObTableSchema *data_schema = NULL; + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema || NULL == task)) { @@ -2023,7 +2047,7 @@ int ObGlobalIndexBuilder::build_task_partition_col_checksum_stat(const share::sc int ret = OB_SUCCESS; SpinWLockGuard item_guard(task->lock_); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema || NULL == data_schema)) { @@ -2163,7 +2187,7 @@ int ObGlobalIndexBuilder::send_col_checksum_calc_rpc(const share::schema::ObTabl const common::ObPartitionKey& pkey, const share::ObPartitionReplica* replica) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == index_schema || schema_version < 0 || checksum_snapshot < 0 || !pkey.is_valid() || @@ -2192,7 +2216,7 @@ int ObGlobalIndexBuilder::send_checksum_calculation_request( { int ret = OB_SUCCESS; uint64_t execution_id = OB_INVALID_ID; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -2277,7 +2301,7 @@ int ObGlobalIndexBuilder::do_checksum_calculation(ObGlobalIndexTask* task, const share::schema::ObTableSchema* index_schema, const share::schema::ObTableSchema* data_schema) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema || NULL == data_schema)) { @@ -2299,7 +2323,7 @@ int ObGlobalIndexBuilder::launch_new_unique_index_calc_checksum(const share::sch ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema || NULL == data_schema)) { @@ -2333,7 +2357,7 @@ int ObGlobalIndexBuilder::try_unique_index_calc_checksum(ObGlobalIndexTask* task { int ret = OB_SUCCESS; DEBUG_SYNC(BEFORE_CHECK_GLOBAL_UNIQUE_INDEX); - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -2410,7 +2434,7 @@ int ObGlobalIndexBuilder::try_unique_index_calc_checksum(ObGlobalIndexTask* task int ObGlobalIndexBuilder::try_unique_index_check(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -2487,7 +2511,7 @@ int ObGlobalIndexBuilder::pick_data_replica( share::schema::ObSchemaGetterGuard schema_guard; const share::schema::ObTableSchema* data_schema = nullptr; common::ObPartitionKey phy_part_key; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(!pkey.is_valid())) { @@ -2569,7 +2593,7 @@ int ObGlobalIndexBuilder::pick_index_replica( const common::ObPartitionKey& pkey, const common::ObIArray& previous, common::ObAddr& server) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(!pkey.is_valid())) { @@ -2641,7 +2665,7 @@ static bool is_ob_sql_errno(int err) int ObGlobalIndexBuilder::on_build_single_replica_reply(const uint64_t index_table_id, int64_t snapshot, int ret_code) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(index_table_id == OB_INVALID_ID || snapshot <= 0)) { @@ -2694,7 +2718,7 @@ int ObGlobalIndexBuilder::on_copy_multi_replica_reply(const ObRebalanceTask& reb { int ret = OB_SUCCESS; share::schema::ObSchemaGetterGuard schema_guard; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == schema_service_)) { @@ -2761,7 +2785,7 @@ int ObGlobalIndexBuilder::on_col_checksum_calculation_reply( const uint64_t index_table_id, const common::ObPartitionKey& pkey, const int ret_code) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(OB_INVALID_ID == index_table_id || !pkey.is_valid())) { @@ -2803,7 +2827,7 @@ int ObGlobalIndexBuilder::on_check_unique_index_reply( const ObPartitionKey& pkey, const int ret_code, const bool is_unique) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(!pkey.is_valid())) { @@ -2849,7 +2873,7 @@ int ObGlobalIndexBuilder::send_check_unique_index_request( const share::schema::ObTableSchema* index_schema, ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -2892,7 +2916,7 @@ int ObGlobalIndexBuilder::build_task_partition_unique_stat( const share::schema::ObTableSchema* schema, ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == schema || NULL == task)) { @@ -2936,7 +2960,7 @@ int ObGlobalIndexBuilder::build_task_partition_unique_stat( int ObGlobalIndexBuilder::clear_intermediate_result(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -3006,7 +3030,7 @@ int ObGlobalIndexBuilder::try_update_index_status_in_schema( const share::schema::ObTableSchema* index_schema, ObGlobalIndexTask* task, const ObIndexStatus new_status) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task || NULL == index_schema)) { @@ -3034,7 +3058,7 @@ int ObGlobalIndexBuilder::try_update_index_status_in_schema( int ObGlobalIndexBuilder::try_handle_index_build_take_effect(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -3106,7 +3130,7 @@ int ObGlobalIndexBuilder::try_handle_index_build_take_effect(ObGlobalIndexTask* int ObGlobalIndexBuilder::try_handle_index_build_failed(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -3184,7 +3208,7 @@ int ObGlobalIndexBuilder::try_handle_index_build_failed(ObGlobalIndexTask* task) int ObGlobalIndexBuilder::try_handle_index_build_finish(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -3271,7 +3295,7 @@ int ObGlobalIndexBuilder::try_handle_index_build_finish(ObGlobalIndexTask* task) int ObGlobalIndexBuilder::try_drive(ObGlobalIndexTask* task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); } else if (OB_UNLIKELY(NULL == task)) { @@ -3333,7 +3357,7 @@ int ObGlobalIndexBuilder::try_drive(ObGlobalIndexTask* task) int ObGlobalIndexBuilder::get_task_count_in_lock(int64_t& task_cnt) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; } else { SpinRLockGuard guard(task_map_lock_); @@ -3344,7 +3368,7 @@ int ObGlobalIndexBuilder::get_task_count_in_lock(int64_t& task_cnt) void ObGlobalIndexBuilder::run3() { - if (OB_UNLIKELY(!inited_ || !loaded_)) { + if (OB_UNLIKELY(!inited_)) { int ret = OB_NOT_INIT; LOG_WARN("ObGlobalIndexBuilder not init", K(ret)); idling_.idle(10 * 1000L * 1000L); diff --git a/src/rootserver/ob_index_builder.cpp b/src/rootserver/ob_index_builder.cpp index 7d1a63a1d207b756cf9b1124cdd9dcf83f0750d8..7ad269534fd8bcfda02894c1741bc2f3d8d412b1 100644 --- a/src/rootserver/ob_index_builder.cpp +++ b/src/rootserver/ob_index_builder.cpp @@ -34,6 +34,7 @@ #include "share/config/ob_server_config.h" #include "share/ob_index_builder_util.h" #include "observer/ob_server_struct.h" +#include "observer/ob_service.h" #include "sql/resolver/ddl/ob_ddl_resolver.h" #include "ob_server_manager.h" #include "ob_zone_manager.h" @@ -1025,6 +1026,9 @@ int ObRSBuildIndexTask::generate_index_build_stat_record() } else if (OB_FAIL(ddl_service_->get_sql_proxy().write(sql_string.ptr(), affected_rows))) { LOG_WARN("fail to execute sql", K(ret)); } +#ifdef ERRSIM + ret = E(EventTable::EN_SUBMIT_INDEX_TASK_ERROR_AFTER_STAT_RECORD) OB_SUCCESS; +#endif return ret; } @@ -1077,7 +1081,12 @@ int ObRSBuildIndexScheduler::init(ObDDLService* ddl_service) int ObRSBuildIndexScheduler::push_task(ObRSBuildIndexTask& task) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_inited_)) { +#ifdef ERRSIM + ret = E(EventTable::EN_SUBMIT_INDEX_TASK_ERROR_BEFORE_STAT_RECORD) OB_SUCCESS; +#endif + if (OB_SUCCESS != ret) { + LOG_INFO("errsim mock push local index task fail", K(ret)); + } else if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObRSBuildIndexScheduler has not been inited", K(ret)); } else if (is_stop_) { @@ -1323,8 +1332,20 @@ int ObIndexBuilder::submit_build_global_index_task(const ObTableSchema& index_sc ret = OB_EAGAIN; } } - if (OB_FAIL(ret)) { - FORWARD_USER_ERROR(ret, "create global index failed, please drop and create another one"); + // submit retry task if retryable, otherwise report error + if (OB_EAGAIN == ret || OB_ALLOCATE_MEMORY_FAILED == ret) { + int record_ret = ret; + if (OB_FAIL(GCTX.ob_service_->submit_retry_ghost_index_task(inner_index_schema->get_table_id()))) { + LOG_WARN("fail to submit retry ghost index task", K(ret)); + ret = OB_TIMEOUT; + } else { + LOG_INFO("submit build global index task fail but fast retryable", + K(record_ret), + K(inner_index_schema->get_table_id())); + } + } else if (OB_FAIL(ret)) { + LOG_WARN("submit global index task fail, mark it as timeout", K(ret)); + ret = OB_TIMEOUT; } } return ret; @@ -1413,7 +1434,17 @@ int ObIndexBuilder::submit_build_local_index_task(const ObTableSchema& index_sch LOG_WARN("fail to add task into ObRSBuildIndexScheduler", K(ret)); } - if (OB_FAIL(ret)) { + // submit retry task if retryable, otherwise report error + if (OB_EAGAIN == ret || OB_ALLOCATE_MEMORY_FAILED == ret) { + int record_ret = ret; + if (OB_FAIL(GCTX.ob_service_->submit_retry_ghost_index_task(index_schema.get_table_id()))) { + LOG_WARN("fail to submit retry ghost index task", K(ret)); + ret = OB_TIMEOUT; + } else { + LOG_INFO( + "submit build local index task fail but fast retryable", K(record_ret), K(index_schema.get_table_id())); + } + } else if (OB_FAIL(ret)) { obrpc::ObUpdateIndexStatusArg arg; ObSchemaGetterGuard schema_guard; const ObTableSchema* new_index_schema = NULL; @@ -1430,8 +1461,8 @@ int ObIndexBuilder::submit_build_local_index_task(const ObTableSchema& index_sch } else if (OB_FAIL(schema_guard.get_table_schema(arg.index_table_id_, new_index_schema))) { LOG_WARN("fail to get table schema", K(ret), K(arg.index_table_id_)); } else if (OB_ISNULL(new_index_schema)) { - ret = OB_SUCCESS; LOG_WARN("can not find this index schema", K(ret), K(arg.index_table_id_)); + ret = OB_SUCCESS; } else { LOG_INFO("update index status success", LITERAL_K(INDEX_STATUS_INDEX_ERROR), "index_schema", *new_index_schema); } diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index c46e8e69450559aa538fc0d6459580b96ae82e96..3dd4e6e2661f559e0a0c4b0326890abc0c9ca66e 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -4918,7 +4918,25 @@ int ObRootService::rebuild_index(const obrpc::ObRebuildIndexArg& arg, obrpc::ObA return ret; } -int ObRootService::flashback_index(const ObFlashBackIndexArg& arg) +int ObRootService::submit_build_index_task(const share::schema::ObTableSchema *index_schema) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(index_schema) || !index_schema->is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), KPC(index_schema)); + } else if (index_schema->is_global_index_table() && + OB_FAIL(global_index_builder_.submit_build_global_index_task(index_schema))) { + LOG_WARN("fail to submit build global index task", K(ret), K(*index_schema)); + } else if (index_schema->is_index_local_storage()) { + ObIndexBuilder index_builder(ddl_service_); + if (OB_FAIL(index_builder.submit_build_local_index_task(*index_schema))) { + LOG_WARN("fail to submit build local index task", K(ret), K(*index_schema)); + } + } + return ret; +} + +int ObRootService::flashback_index(const ObFlashBackIndexArg &arg) { int ret = OB_SUCCESS; if (!inited_) { diff --git a/src/rootserver/ob_root_service.h b/src/rootserver/ob_root_service.h index 250387dd0b265e653ee9f5bad8bafebc2cbc4ec4..2445800c0384e9e6f45661cf3e92605f1d1f151e 100644 --- a/src/rootserver/ob_root_service.h +++ b/src/rootserver/ob_root_service.h @@ -964,6 +964,7 @@ public: int drop_tablegroup(const obrpc::ObDropTablegroupArg& arg); int drop_index(const obrpc::ObDropIndexArg& arg); int rebuild_index(const obrpc::ObRebuildIndexArg& arg, obrpc::ObAlterTableRes& res); + int submit_build_index_task(const share::schema::ObTableSchema *index_schema); // the interface only for switchover: execute skip check enable_ddl int force_drop_index(const obrpc::ObDropIndexArg& arg); int flashback_index(const obrpc::ObFlashBackIndexArg& arg); diff --git a/src/share/ob_ddl_task_executor.h b/src/share/ob_ddl_task_executor.h index dcec292abc1204e480e528a456a116906c11ed31..633cf9317d25bf794ee731d3aaaa7eeba5723f1e 100644 --- a/src/share/ob_ddl_task_executor.h +++ b/src/share/ob_ddl_task_executor.h @@ -31,6 +31,7 @@ enum ObIDDLTaskType { DDL_TASK_SCHEDULE_BUILD_INDEX = 1, DDL_TASK_RS_BUILD_INDEX = 2, DDL_TASK_REFRESH_MEMORY_PERCENTAGE = 3, + DDL_TASK_RETRY_GHOST_INDEX = 4, }; class ObIDDLTask : public common::ObDLinkBase { diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index d409348a73bd9484b7992f5775d7eed71da733a0..fe4a6258430749bb60ddfedfe5f288ad60894645 100644 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -292,6 +292,7 @@ class ObString; ACT(BEFORE_CHECK_BACKUP_TASK_DATA_AVAILABLE, ) \ ACT(BACKUP_BACKUPPIECE_AFTER_SCHEDULE, ) \ ACT(FOLLOWER_BEFORE_UPDATE_RESTORE_FLAG_RESTORE_LOG, ) \ + ACT(BEFORE_GLOBAL_INDEX_BUILDER_MOVE_TASK, ) \ ACT(MAX_DEBUG_SYNC_POINT, ) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h index 31a59339d3af2ba1f1ff41d365a6a1496978d5bc..370cb2ebf857a5062c9cdda0b125f233feaba5d8 100644 --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -72,6 +72,9 @@ TG_DEF(DDLTaskExecutor1, DDLTaskExecutor1, "", TG_STATIC, OB_THREAD_POOL, TG_DEF(DDLTaskExecutor2, DDLTaskExecutor2, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair( storage::ObBuildIndexScheduler::DEFAULT_THREAD_CNT, storage::ObBuildIndexScheduler::MINI_MODE_THREAD_CNT)) +TG_DEF(DDLTaskExecutor3, DDLTaskExecutor3, "", TG_STATIC, OB_THREAD_POOL, + ThreadCountPair(storage::ObRetryGhostIndexScheduler::DEFAULT_THREAD_CNT, + storage::ObRetryGhostIndexScheduler::DEFAULT_THREAD_CNT)) TG_DEF(FetchLogEngine, FetchLogEngine, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(clog::CLOG_FETCH_LOG_THREAD_COUNT, clog::MINI_MODE_CLOG_FETCH_LOG_THREAD_COUNT), clog::CLOG_FETCH_LOG_TASK_QUEUE_SIZE) @@ -137,4 +140,5 @@ TG_DEF(LogMysqlPool, LogMysqlPool, "", TG_STATIC, TIMER) TG_DEF(TblCliSqlPool, TblCliSqlPool, "", TG_STATIC, TIMER) TG_DEF(QueryExecCtxGC, QueryExecCtxGC, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) TG_DEF(DtlDfc, DtlDfc, "", TG_STATIC, TIMER) +TG_DEF(DDLRetryGhostIndex, DDLRetryGhostIndex, "", TG_STATIC, TIMER) #endif diff --git a/src/storage/ob_build_index_scheduler.cpp b/src/storage/ob_build_index_scheduler.cpp index 0920011ef474455ff81d7cbbc35cc988e8b5df8f..d149fc0d9f021afa4654c72d2f5c8af2243eea0d 100644 --- a/src/storage/ob_build_index_scheduler.cpp +++ b/src/storage/ob_build_index_scheduler.cpp @@ -38,6 +38,7 @@ #include "observer/ob_service.h" #include "observer/ob_server_struct.h" #include "rootserver/ob_index_builder.h" +#include "rootserver/ob_root_service.h" using namespace oceanbase::storage; using namespace oceanbase::common; @@ -220,51 +221,8 @@ int ObBuildIndexBaseTask::check_partition_split_finish(const ObPartitionKey& pke return ret; } -ObTenantDDLCheckSchemaTask::ObTenantDDLCheckSchemaTask() - : ObBuildIndexBaseTask(DDL_TASK_CHECK_SCHEMA), base_version_(-1), refreshed_version_(-1), tenant_id_(OB_INVALID_ID) -{} - -ObTenantDDLCheckSchemaTask::~ObTenantDDLCheckSchemaTask() -{} - -int ObTenantDDLCheckSchemaTask::init( - const uint64_t tenant_id, const int64_t base_version, const int64_t refreshed_version) -{ - int ret = OB_SUCCESS; - if (base_version < 0 || refreshed_version < 0 || OB_INVALID_ID == tenant_id) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(base_version), K(refreshed_version), K(tenant_id)); - } else { - base_version_ = base_version; - refreshed_version_ = refreshed_version; - task_id_.init(GCTX.self_addr_); - tenant_id_ = tenant_id; - is_inited_ = true; - } - return ret; -} - -bool ObTenantDDLCheckSchemaTask::operator==(const ObIDDLTask& other) const -{ - bool is_equal = false; - if (get_type() == other.get_type()) { - const ObTenantDDLCheckSchemaTask& task = static_cast(other); - is_equal = base_version_ == task.base_version_ && refreshed_version_ == task.refreshed_version_; - } - return is_equal; -} - -int64_t ObTenantDDLCheckSchemaTask::hash() const -{ - uint64_t hash_val = 0; - hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val); - hash_val = murmurhash(&base_version_, sizeof(base_version_), hash_val); - hash_val = murmurhash(&refreshed_version_, sizeof(refreshed_version_), hash_val); - return hash_val; -} - -int ObTenantDDLCheckSchemaTask::find_build_index_partitions( - const ObTableSchema* index_schema, ObSchemaGetterGuard& guard, common::ObIArray& partition_keys) +int ObBuildIndexBaseTask::find_build_index_partitions( + const ObTableSchema *index_schema, ObSchemaGetterGuard &guard, common::ObIArray &partition_keys) { int ret = OB_SUCCESS; const ObTableSchema* table_schema = NULL; @@ -336,8 +294,8 @@ int ObTenantDDLCheckSchemaTask::find_build_index_partitions( return ret; } -int ObTenantDDLCheckSchemaTask::create_index_partition_table_store( - const common::ObPartitionKey& pkey, const uint64_t index_id, const int64_t schema_version) +int ObBuildIndexBaseTask::create_index_partition_table_store( + const common::ObPartitionKey &pkey, const uint64_t index_id, const int64_t schema_version) { int ret = OB_SUCCESS; ObIPartitionGroupGuard part_guard; @@ -368,7 +326,7 @@ int ObTenantDDLCheckSchemaTask::create_index_partition_table_store( return ret; } -int ObTenantDDLCheckSchemaTask::generate_schedule_index_task(const common::ObPartitionKey& pkey, +int ObBuildIndexBaseTask::generate_schedule_index_task(const common::ObPartitionKey &pkey, const uint64_t index_id, const int64_t schema_version, const bool is_unique_index) { int ret = OB_SUCCESS; @@ -397,7 +355,50 @@ int ObTenantDDLCheckSchemaTask::generate_schedule_index_task(const common::ObPar return ret; } -int ObTenantDDLCheckSchemaTask::get_candidate_tables(ObIArray& table_ids) +ObTenantDDLCheckSchemaTask::ObTenantDDLCheckSchemaTask() + : ObBuildIndexBaseTask(DDL_TASK_CHECK_SCHEMA), base_version_(-1), refreshed_version_(-1), tenant_id_(OB_INVALID_ID) +{} + +ObTenantDDLCheckSchemaTask::~ObTenantDDLCheckSchemaTask() +{} + +int ObTenantDDLCheckSchemaTask::init( + const uint64_t tenant_id, const int64_t base_version, const int64_t refreshed_version) +{ + int ret = OB_SUCCESS; + if (base_version < 0 || refreshed_version < 0 || OB_INVALID_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(base_version), K(refreshed_version), K(tenant_id)); + } else { + base_version_ = base_version; + refreshed_version_ = refreshed_version; + task_id_.init(GCTX.self_addr_); + tenant_id_ = tenant_id; + is_inited_ = true; + } + return ret; +} + +bool ObTenantDDLCheckSchemaTask::operator==(const ObIDDLTask &other) const +{ + bool is_equal = false; + if (get_type() == other.get_type()) { + const ObTenantDDLCheckSchemaTask &task = static_cast(other); + is_equal = base_version_ == task.base_version_ && refreshed_version_ == task.refreshed_version_; + } + return is_equal; +} + +int64_t ObTenantDDLCheckSchemaTask::hash() const +{ + uint64_t hash_val = 0; + hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val); + hash_val = murmurhash(&base_version_, sizeof(base_version_), hash_val); + hash_val = murmurhash(&refreshed_version_, sizeof(refreshed_version_), hash_val); + return hash_val; +} + +int ObTenantDDLCheckSchemaTask::get_candidate_tables(ObIArray &table_ids) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -1730,3 +1731,227 @@ void ObBuildIndexScheduler::destroy() task_executor_.destroy(); is_inited_ = false; } + +ObRetryGhostIndexTask::ObRetryGhostIndexTask() + : ObBuildIndexBaseTask(DDL_TASK_RETRY_GHOST_INDEX), index_id_(OB_INVALID_ID), last_log_timestamp_(0) +{} + +ObRetryGhostIndexTask::~ObRetryGhostIndexTask() +{} + +int ObRetryGhostIndexTask::init(const uint64_t index_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("ObRetryGhostIndexTask has already been inited", K(ret)); + } else if (OB_UNLIKELY(OB_INVALID_ID == index_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguments", K(ret), K(index_id)); + } else { + index_id_ = index_id; + task_id_.init(GCONF.self_addr_); + is_inited_ = true; + } + return ret; +} + +int64_t ObRetryGhostIndexTask::hash() const +{ + return index_id_; +} + +bool ObRetryGhostIndexTask::operator==(const ObIDDLTask &other) const +{ + bool is_equal = false; + if (get_type() == other.get_type()) { + const ObRetryGhostIndexTask &other_task = static_cast(other); + is_equal = index_id_ == other_task.index_id_; + } + return is_equal; +} + +ObRetryGhostIndexTask *ObRetryGhostIndexTask::deep_copy(char *buf, const int64_t size) const +{ + int ret = OB_SUCCESS; + ObRetryGhostIndexTask *task = NULL; + if (OB_ISNULL(buf) || size < sizeof(*this)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), KP(buf), K(size)); + } else { + task = new (buf) ObRetryGhostIndexTask(); + *task = *this; + } + return task; +} + +int ObRetryGhostIndexTask::process() +{ + int ret = OB_SUCCESS; + const ObTableSchema *index_schema = nullptr; + ObSchemaGetterGuard schema_guard; + ObAddr rs_addr; + if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard( + extract_tenant_id(index_id_), schema_guard))) { + STORAGE_LOG(WARN, "fail to get schema guard", K(ret), K(index_id_)); + } else if (OB_FAIL(schema_guard.get_table_schema(index_id_, index_schema))) { + STORAGE_LOG(WARN, "fail to get table schema", K(ret), K(index_id_)); + } else if (OB_ISNULL(index_schema)) { + STORAGE_LOG(INFO, "index schema is deleted, skip it"); + } else if (index_schema->is_index_local_storage() && OB_FAIL(retry_local_index(index_schema, schema_guard))) { + STORAGE_LOG(WARN, "fail to retry ghost local index", K(ret), K(*index_schema)); + } else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) { + STORAGE_LOG(WARN, "fail to get rootservice address", K(ret)); + } else if (rs_addr != GCTX.self_addr_) { + STORAGE_LOG(INFO, "rs is not on this observer, skip"); + } else if (NULL == GCTX.root_service_) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "root service is null", K(ret)); + } else if (OB_FAIL(GCTX.root_service_->submit_build_index_task(index_schema))) { + STORAGE_LOG(WARN, "fail to submit build index task", K(ret)); + } + return ret; +} + +int ObRetryGhostIndexTask::retry_local_index(const ObTableSchema *index_schema, ObSchemaGetterGuard &schema_guard) +{ + int ret = OB_SUCCESS; + const ObTableSchema *data_table_schema = nullptr; + ObArray partition_keys; + if (OB_FAIL(find_build_index_partitions(index_schema, schema_guard, partition_keys))) { + if (OB_EAGAIN != ret) { + STORAGE_LOG(WARN, "fail to check need build index", K(ret)); + } + } else if (partition_keys.count() > 0) { + if (OB_FAIL(schema_guard.get_table_schema(index_schema->get_data_table_id(), data_table_schema))) { + STORAGE_LOG(WARN, "fail to get data table schema", K(ret)); + } else if (OB_ISNULL(data_table_schema)) { + ret = OB_TABLE_NOT_EXIST; + STORAGE_LOG(WARN, "schema error, data table not exist while index table exist", K(ret)); + } else { + const int64_t schema_version = + std::max(index_schema->get_schema_version(), data_table_schema->get_schema_version()); + for (int64_t i = 0; OB_SUCC(ret) && i < partition_keys.count(); ++i) { + if (OB_FAIL(generate_schedule_index_task( + partition_keys.at(i), index_schema->get_table_id(), schema_version, index_schema->is_unique_index()))) { + STORAGE_LOG(WARN, "fail to generate schedule build index task", K(ret)); + } + } + } + } + return ret; +} + +ObRetryGhostIndexScheduler::ObRetryGhostIndexScheduler() + : scan_ghost_index_task_(), is_inited_(false), task_executor_(), is_stop_(false) +{} + +ObRetryGhostIndexScheduler::~ObRetryGhostIndexScheduler() +{} + +int ObRetryGhostIndexScheduler::init() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("ObRetryGhostIndexScheduler has been inited twice", K(ret)); + } else if (OB_FAIL(TG_START(lib::TGDefIDs::DDLRetryGhostIndex))) { + LOG_WARN("fail to init timer for DDLRetryGhostIndex", K(ret)); + } else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::DDLRetryGhostIndex, + scan_ghost_index_task_, + DEFAULT_RETRY_GHOST_INDEX_INTERVAL_US, + true /*repeat*/))) { + LOG_WARN("fail to schedule scan_ghost_index_task", K(ret)); + } else if (OB_FAIL(task_executor_.init(DEFAULT_BUCKET_NUM, lib::TGDefIDs::DDLTaskExecutor3))) { + LOG_WARN("fail to init task executor", K(ret)); + } else { + is_inited_ = true; + } + return ret; +} + +int ObRetryGhostIndexScheduler::push_task(ObRetryGhostIndexTask &task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("ObRetryGhostIndexScheduler has not been inited", K(ret)); + } else if (is_stop_) { + // do nothing + } else if (OB_FAIL(task_executor_.push_task(task))) { + if (OB_LIKELY(OB_ENTRY_EXIST == ret)) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to push back task", K(ret)); + } + } + return ret; +} + +ObRetryGhostIndexScheduler &ObRetryGhostIndexScheduler::get_instance() +{ + static ObRetryGhostIndexScheduler instance; + return instance; +} + +void ObRetryGhostIndexScheduler::stop() +{ + is_stop_ = true; + task_executor_.stop(); +} + +void ObRetryGhostIndexScheduler::wait() +{ + task_executor_.wait(); +} + +void ObRetryGhostIndexScheduler::destroy() +{ + is_inited_ = false; + stop(); + wait(); + task_executor_.destroy(); +} + +void ObScanGhostIndexTask::runTimerTask() +{ + int ret = OB_SUCCESS; + ObSchemaGetterGuard schema_guard; + ObArray tenant_ids; + ObRetryGhostIndexScheduler &scheduler = ObRetryGhostIndexScheduler::get_instance(); + if (GCTX.is_standby_cluster()) { + // this retry task should not run on standby server + } else if (OB_FAIL( + ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(OB_SYS_TENANT_ID, schema_guard))) { + LOG_WARN("fail to get schema guard", K(ret)); + } else if (OB_FAIL(schema_guard.get_tenant_ids(tenant_ids))) { + LOG_WARN("fail to get tenant ids", K(ret)); + } else { + ObArray table_schemas; + for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) { + const uint64_t tenant_id = tenant_ids.at(i); + if (OB_MAX_RESERVED_TENANT_ID >= tenant_id) { + // do nothing for reserved tenant + } else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard( + tenant_id, schema_guard))) { + LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id)); + } else if (OB_FAIL(schema_guard.get_table_schemas_in_tenant(tenant_id, table_schemas))) { + LOG_WARN("fail to get table schemas in tenant", K(ret)); + } else { + for (int64_t j = 0; OB_SUCC(ret) && j < table_schemas.count(); ++j) { + const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(j); + if (simple_schema->is_index_table() && simple_schema->is_unavailable_index()) { + ObRetryGhostIndexTask task; + if (OB_FAIL(task.init(simple_schema->get_table_id()))) { + LOG_WARN("fail to init ObRetryGhostIndexTask", K(ret), K(*simple_schema)); + } else if (OB_FAIL(scheduler.push_task(task))) { + LOG_WARN("fail to push ObRetryGhostIndexTask to scheduler", K(ret), K(task)); + } else { + LOG_INFO("find unavailable index table", "index_table_id", simple_schema->get_table_id()); + } + } + } + } + } + } +} diff --git a/src/storage/ob_build_index_scheduler.h b/src/storage/ob_build_index_scheduler.h index a9fb10ab8a49d6cc4048de4ac4bbad26ad959234..ff75ca6fca1041d978d5f0d1c68067a5134fae52 100644 --- a/src/storage/ob_build_index_scheduler.h +++ b/src/storage/ob_build_index_scheduler.h @@ -39,11 +39,17 @@ public: virtual ~ObBuildIndexBaseTask(); static int report_index_status(const uint64_t index_table_id, const int64_t partition_id, const share::schema::ObIndexStatus index_status, const int build_index_ret, const ObRole role); + static int generate_schedule_index_task(const common::ObPartitionKey &pkey, const uint64_t index_id, + const int64_t schema_version, const bool is_unique_index); protected: - int check_partition_need_build_index(const common::ObPartitionKey& pkey, - const share::schema::ObTableSchema& index_schema, const share::schema::ObTableSchema& data_table_schema, - storage::ObIPartitionGroupGuard& guard, bool& need_build); + int check_partition_need_build_index(const common::ObPartitionKey &pkey, + const share::schema::ObTableSchema &index_schema, const share::schema::ObTableSchema &data_table_schema, + storage::ObIPartitionGroupGuard &guard, bool &need_build); + int find_build_index_partitions(const share::schema::ObTableSchema *index_schema, + share::schema::ObSchemaGetterGuard &guard, common::ObIArray &partition_keys); + static int create_index_partition_table_store( + const common::ObPartitionKey &pkey, const uint64_t index_id, const int64_t schema_version); private: int check_partition_exist_in_current_server(const share::schema::ObTableSchema& index_schema, @@ -72,16 +78,10 @@ public: } virtual ObIDDLTask* deep_copy(char* buf, const int64_t size) const override; TO_STRING_KV(K_(tenant_id), K_(base_version), K_(refreshed_version)); - static int generate_schedule_index_task(const common::ObPartitionKey& pkey, const uint64_t index_id, - const int64_t schema_version, const bool is_unique_index); private: - int find_build_index_partitions(const share::schema::ObTableSchema* index_schema, - share::schema::ObSchemaGetterGuard& guard, common::ObIArray& partition_keys); - int get_candidate_tables(common::ObIArray& table_ids); - static int create_index_partition_table_store( - const common::ObPartitionKey& pkey, const uint64_t index_id, const int64_t schema_version); - int get_candidate_tenants(common::ObIArray& tenant_ids); + int get_candidate_tables(common::ObIArray &table_ids); + int get_candidate_tenants(common::ObIArray &tenant_ids); int process_schedule_build_index_task(); int process_tenant_memory_task(); @@ -201,6 +201,70 @@ private: bool is_stop_; }; +class ObRetryGhostIndexTask : public ObBuildIndexBaseTask { +public: + ObRetryGhostIndexTask(); + virtual ~ObRetryGhostIndexTask(); + int init(const uint64_t index_id); + virtual int64_t hash() const; + virtual int process(); + virtual int64_t get_deep_copy_size() const + { + return sizeof(*this); + } + virtual ObRetryGhostIndexTask *deep_copy(char *buf, const int64_t size) const; + bool operator==(const ObIDDLTask &other) const; + TO_STRING_KV(K_(index_id)); + int64_t get_tenant_id() const + { + return extract_tenant_id(index_id_); + } + +private: + int retry_local_index( + const share::schema::ObTableSchema *index_schema, share::schema::ObSchemaGetterGuard &schema_guard); + +private: + uint64_t index_id_; + int64_t last_log_timestamp_; +}; + +class ObScanGhostIndexTask : public ObTimerTask { +public: + ObScanGhostIndexTask() = default; + virtual ~ObScanGhostIndexTask() = default; + void runTimerTask(); +}; + +class ObRetryGhostIndexScheduler { +public: + static const int64_t DEFAULT_THREAD_CNT = 1; + +public: + int init(); + static ObRetryGhostIndexScheduler &get_instance(); + int push_task(ObRetryGhostIndexTask &task); + void destroy(); + +private: + ObRetryGhostIndexScheduler(); + virtual ~ObRetryGhostIndexScheduler(); + void stop(); + void wait(); + +private: +#ifdef ERRSIM + static const int64_t DEFAULT_RETRY_GHOST_INDEX_INTERVAL_US = 1000L * 1000L * 10L; // 10s +#else + static const int64_t DEFAULT_RETRY_GHOST_INDEX_INTERVAL_US = 1000L * 1000L * 60L * 30L; // 30min +#endif + static const int64_t DEFAULT_BUCKET_NUM = 10000; + ObScanGhostIndexTask scan_ghost_index_task_; + bool is_inited_; + share::ObDDLTaskExecutor task_executor_; + bool is_stop_; +}; + } // end namespace storage } // end namespace oceanbase diff --git a/src/storage/ob_partition_service.cpp b/src/storage/ob_partition_service.cpp index 170d59f52373f4bccac386f7dbfe588bb1fd16c8..2ae6b5b3a2d3f2407c4ab20ced7f2b57a62b532a 100644 --- a/src/storage/ob_partition_service.cpp +++ b/src/storage/ob_partition_service.cpp @@ -397,6 +397,8 @@ int ObPartitionService::init(const blocksstable::ObStorageEnv& env, const ObAddr LOG_WARN("failed to init ObTableMgr", K(ret)); } else if (OB_FAIL(ObBuildIndexScheduler::get_instance().init())) { STORAGE_LOG(WARN, "fail to init ObBuildIndexScheduler", K(ret)); + } else if (OB_FAIL(ObRetryGhostIndexScheduler::get_instance().init())) { + LOG_WARN("fail to init ObRetryGhostIndexScheduler", K(ret)); } else if (OB_FAIL(ObFreezeInfoMgrWrapper::init(sql_proxy, remote_sql_proxy))) { STORAGE_LOG(WARN, "fail to init ObFreezeInfoSnapshotMgr", K(ret)); } else if (OB_FAIL(garbage_collector_.init(this, txs_, schema_service, GCTX.srv_rpc_proxy_, &sql_proxy, self_addr))) {