提交 c371a17f 编写于 作者: Y YoungYang0820 提交者: LINGuanRen

Add retry for building index

上级 fa8b8384
......@@ -501,6 +501,11 @@ public:
EN_CLOG_ILOG_MEMSTORE_ALLOC_MEMORY_FAILURE = 268,
EN_PREVENT_SYNC_REPORT = 360,
EN_PREVENT_ASYNC_REPORT = 361,
// DDL related 500-550
EN_SUBMIT_INDEX_TASK_ERROR_BEFORE_STAT_RECORD = 503,
EN_SUBMIT_INDEX_TASK_ERROR_AFTER_STAT_RECORD = 504,
EVENT_TABLE_MAX = SIZE_OF_EVENT_TABLE
};
......
......@@ -51,6 +51,7 @@
#include "storage/ob_partition_scheduler.h"
#include "sql/optimizer/ob_opt_est_cost.h"
#include "sql/optimizer/ob_join_order.h"
#include "storage/ob_build_index_scheduler.h"
#include "rootserver/ob_bootstrap.h"
#include "observer/ob_server.h"
#include "observer/ob_dump_task_generator.h"
......@@ -3676,6 +3677,22 @@ int ObService::pre_process_server_reply(const obrpc::ObPreProcessServerReplyArg&
return ret;
}
int ObService::submit_retry_ghost_index_task(const uint64_t index_id)
{
int ret = OB_SUCCESS;
ObRetryGhostIndexScheduler &scheduler = ObRetryGhostIndexScheduler::get_instance();
ObRetryGhostIndexTask task;
if (OB_INVALID_ID == index_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid index id", K(ret), K(index_id));
} else if (OB_FAIL(task.init(index_id))) {
LOG_WARN("fail to init ObRetryGhostIndexTask", K(ret), K(index_id));
} else if (OB_FAIL(scheduler.push_task(task))) {
LOG_WARN("fail to push ObRetryGhostIndexTask to scheduler", K(ret), K(task));
}
return ret;
}
int ObService::broadcast_rs_list(const ObRsListArg& arg)
{
int ret = OB_SUCCESS;
......
......@@ -304,6 +304,7 @@ public:
int submit_async_refresh_schema_task(const uint64_t tenant_id, const int64_t schema_version);
int renew_in_zone_hb(const share::ObInZoneHbRequest& arg, share::ObInZoneHbResponse& result);
int pre_process_server_reply(const obrpc::ObPreProcessServerReplyArg& arg);
int submit_retry_ghost_index_task(const uint64_t index_id);
private:
int register_self();
......
......@@ -34,6 +34,7 @@
#include "share/config/ob_server_config.h"
#include "share/ob_index_builder_util.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_service.h"
#include "sql/resolver/ddl/ob_ddl_resolver.h"
#include "ob_server_manager.h"
#include "ob_zone_manager.h"
......@@ -1025,6 +1026,9 @@ int ObRSBuildIndexTask::generate_index_build_stat_record()
} else if (OB_FAIL(ddl_service_->get_sql_proxy().write(sql_string.ptr(), affected_rows))) {
LOG_WARN("fail to execute sql", K(ret));
}
#ifdef ERRSIM
ret = E(EventTable::EN_SUBMIT_INDEX_TASK_ERROR_AFTER_STAT_RECORD) OB_SUCCESS;
#endif
return ret;
}
......@@ -1077,7 +1081,12 @@ int ObRSBuildIndexScheduler::init(ObDDLService* ddl_service)
int ObRSBuildIndexScheduler::push_task(ObRSBuildIndexTask& task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
#ifdef ERRSIM
ret = E(EventTable::EN_SUBMIT_INDEX_TASK_ERROR_BEFORE_STAT_RECORD) OB_SUCCESS;
#endif
if (OB_SUCCESS != ret) {
LOG_INFO("errsim mock push local index task fail", K(ret));
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObRSBuildIndexScheduler has not been inited", K(ret));
} else if (is_stop_) {
......@@ -1323,8 +1332,20 @@ int ObIndexBuilder::submit_build_global_index_task(const ObTableSchema& index_sc
ret = OB_EAGAIN;
}
}
if (OB_FAIL(ret)) {
FORWARD_USER_ERROR(ret, "create global index failed, please drop and create another one");
// submit retry task if retryable, otherwise report error
if (OB_EAGAIN == ret || OB_ALLOCATE_MEMORY_FAILED == ret) {
int record_ret = ret;
if (OB_FAIL(GCTX.ob_service_->submit_retry_ghost_index_task(inner_index_schema->get_table_id()))) {
LOG_WARN("fail to submit retry ghost index task", K(ret));
ret = OB_TIMEOUT;
} else {
LOG_INFO("submit build global index task fail but fast retryable",
K(record_ret),
K(inner_index_schema->get_table_id()));
}
} else if (OB_FAIL(ret)) {
LOG_WARN("submit global index task fail, mark it as timeout", K(ret));
ret = OB_TIMEOUT;
}
}
return ret;
......@@ -1413,7 +1434,17 @@ int ObIndexBuilder::submit_build_local_index_task(const ObTableSchema& index_sch
LOG_WARN("fail to add task into ObRSBuildIndexScheduler", K(ret));
}
if (OB_FAIL(ret)) {
// submit retry task if retryable, otherwise report error
if (OB_EAGAIN == ret || OB_ALLOCATE_MEMORY_FAILED == ret) {
int record_ret = ret;
if (OB_FAIL(GCTX.ob_service_->submit_retry_ghost_index_task(index_schema.get_table_id()))) {
LOG_WARN("fail to submit retry ghost index task", K(ret));
ret = OB_TIMEOUT;
} else {
LOG_INFO(
"submit build local index task fail but fast retryable", K(record_ret), K(index_schema.get_table_id()));
}
} else if (OB_FAIL(ret)) {
obrpc::ObUpdateIndexStatusArg arg;
ObSchemaGetterGuard schema_guard;
const ObTableSchema* new_index_schema = NULL;
......@@ -1430,8 +1461,8 @@ int ObIndexBuilder::submit_build_local_index_task(const ObTableSchema& index_sch
} else if (OB_FAIL(schema_guard.get_table_schema(arg.index_table_id_, new_index_schema))) {
LOG_WARN("fail to get table schema", K(ret), K(arg.index_table_id_));
} else if (OB_ISNULL(new_index_schema)) {
ret = OB_SUCCESS;
LOG_WARN("can not find this index schema", K(ret), K(arg.index_table_id_));
ret = OB_SUCCESS;
} else {
LOG_INFO("update index status success", LITERAL_K(INDEX_STATUS_INDEX_ERROR), "index_schema", *new_index_schema);
}
......
......@@ -4918,7 +4918,25 @@ int ObRootService::rebuild_index(const obrpc::ObRebuildIndexArg& arg, obrpc::ObA
return ret;
}
int ObRootService::flashback_index(const ObFlashBackIndexArg& arg)
int ObRootService::submit_build_index_task(const share::schema::ObTableSchema *index_schema)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(index_schema) || !index_schema->is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KPC(index_schema));
} else if (index_schema->is_global_index_table() &&
OB_FAIL(global_index_builder_.submit_build_global_index_task(index_schema))) {
LOG_WARN("fail to submit build global index task", K(ret), K(*index_schema));
} else if (index_schema->is_index_local_storage()) {
ObIndexBuilder index_builder(ddl_service_);
if (OB_FAIL(index_builder.submit_build_local_index_task(*index_schema))) {
LOG_WARN("fail to submit build local index task", K(ret), K(*index_schema));
}
}
return ret;
}
int ObRootService::flashback_index(const ObFlashBackIndexArg &arg)
{
int ret = OB_SUCCESS;
if (!inited_) {
......
......@@ -964,6 +964,7 @@ public:
int drop_tablegroup(const obrpc::ObDropTablegroupArg& arg);
int drop_index(const obrpc::ObDropIndexArg& arg);
int rebuild_index(const obrpc::ObRebuildIndexArg& arg, obrpc::ObAlterTableRes& res);
int submit_build_index_task(const share::schema::ObTableSchema *index_schema);
// the interface only for switchover: execute skip check enable_ddl
int force_drop_index(const obrpc::ObDropIndexArg& arg);
int flashback_index(const obrpc::ObFlashBackIndexArg& arg);
......
......@@ -31,6 +31,7 @@ enum ObIDDLTaskType {
DDL_TASK_SCHEDULE_BUILD_INDEX = 1,
DDL_TASK_RS_BUILD_INDEX = 2,
DDL_TASK_REFRESH_MEMORY_PERCENTAGE = 3,
DDL_TASK_RETRY_GHOST_INDEX = 4,
};
class ObIDDLTask : public common::ObDLinkBase<ObIDDLTask> {
......
......@@ -292,6 +292,7 @@ class ObString;
ACT(BEFORE_CHECK_BACKUP_TASK_DATA_AVAILABLE, ) \
ACT(BACKUP_BACKUPPIECE_AFTER_SCHEDULE, ) \
ACT(FOLLOWER_BEFORE_UPDATE_RESTORE_FLAG_RESTORE_LOG, ) \
ACT(BEFORE_GLOBAL_INDEX_BUILDER_MOVE_TASK, ) \
ACT(MAX_DEBUG_SYNC_POINT, )
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
......
......@@ -72,6 +72,9 @@ TG_DEF(DDLTaskExecutor1, DDLTaskExecutor1, "", TG_STATIC, OB_THREAD_POOL,
TG_DEF(DDLTaskExecutor2, DDLTaskExecutor2, "", TG_STATIC, OB_THREAD_POOL,
ThreadCountPair(
storage::ObBuildIndexScheduler::DEFAULT_THREAD_CNT, storage::ObBuildIndexScheduler::MINI_MODE_THREAD_CNT))
TG_DEF(DDLTaskExecutor3, DDLTaskExecutor3, "", TG_STATIC, OB_THREAD_POOL,
ThreadCountPair(storage::ObRetryGhostIndexScheduler::DEFAULT_THREAD_CNT,
storage::ObRetryGhostIndexScheduler::DEFAULT_THREAD_CNT))
TG_DEF(FetchLogEngine, FetchLogEngine, "", TG_STATIC, QUEUE_THREAD,
ThreadCountPair(clog::CLOG_FETCH_LOG_THREAD_COUNT, clog::MINI_MODE_CLOG_FETCH_LOG_THREAD_COUNT),
clog::CLOG_FETCH_LOG_TASK_QUEUE_SIZE)
......@@ -137,4 +140,5 @@ TG_DEF(LogMysqlPool, LogMysqlPool, "", TG_STATIC, TIMER)
TG_DEF(TblCliSqlPool, TblCliSqlPool, "", TG_STATIC, TIMER)
TG_DEF(QueryExecCtxGC, QueryExecCtxGC, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(DtlDfc, DtlDfc, "", TG_STATIC, TIMER)
TG_DEF(DDLRetryGhostIndex, DDLRetryGhostIndex, "", TG_STATIC, TIMER)
#endif
......@@ -38,6 +38,7 @@
#include "observer/ob_service.h"
#include "observer/ob_server_struct.h"
#include "rootserver/ob_index_builder.h"
#include "rootserver/ob_root_service.h"
using namespace oceanbase::storage;
using namespace oceanbase::common;
......@@ -220,51 +221,8 @@ int ObBuildIndexBaseTask::check_partition_split_finish(const ObPartitionKey& pke
return ret;
}
ObTenantDDLCheckSchemaTask::ObTenantDDLCheckSchemaTask()
: ObBuildIndexBaseTask(DDL_TASK_CHECK_SCHEMA), base_version_(-1), refreshed_version_(-1), tenant_id_(OB_INVALID_ID)
{}
ObTenantDDLCheckSchemaTask::~ObTenantDDLCheckSchemaTask()
{}
int ObTenantDDLCheckSchemaTask::init(
const uint64_t tenant_id, const int64_t base_version, const int64_t refreshed_version)
{
int ret = OB_SUCCESS;
if (base_version < 0 || refreshed_version < 0 || OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(base_version), K(refreshed_version), K(tenant_id));
} else {
base_version_ = base_version;
refreshed_version_ = refreshed_version;
task_id_.init(GCTX.self_addr_);
tenant_id_ = tenant_id;
is_inited_ = true;
}
return ret;
}
bool ObTenantDDLCheckSchemaTask::operator==(const ObIDDLTask& other) const
{
bool is_equal = false;
if (get_type() == other.get_type()) {
const ObTenantDDLCheckSchemaTask& task = static_cast<const ObTenantDDLCheckSchemaTask&>(other);
is_equal = base_version_ == task.base_version_ && refreshed_version_ == task.refreshed_version_;
}
return is_equal;
}
int64_t ObTenantDDLCheckSchemaTask::hash() const
{
uint64_t hash_val = 0;
hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val);
hash_val = murmurhash(&base_version_, sizeof(base_version_), hash_val);
hash_val = murmurhash(&refreshed_version_, sizeof(refreshed_version_), hash_val);
return hash_val;
}
int ObTenantDDLCheckSchemaTask::find_build_index_partitions(
const ObTableSchema* index_schema, ObSchemaGetterGuard& guard, common::ObIArray<ObPartitionKey>& partition_keys)
int ObBuildIndexBaseTask::find_build_index_partitions(
const ObTableSchema *index_schema, ObSchemaGetterGuard &guard, common::ObIArray<ObPartitionKey> &partition_keys)
{
int ret = OB_SUCCESS;
const ObTableSchema* table_schema = NULL;
......@@ -336,8 +294,8 @@ int ObTenantDDLCheckSchemaTask::find_build_index_partitions(
return ret;
}
int ObTenantDDLCheckSchemaTask::create_index_partition_table_store(
const common::ObPartitionKey& pkey, const uint64_t index_id, const int64_t schema_version)
int ObBuildIndexBaseTask::create_index_partition_table_store(
const common::ObPartitionKey &pkey, const uint64_t index_id, const int64_t schema_version)
{
int ret = OB_SUCCESS;
ObIPartitionGroupGuard part_guard;
......@@ -368,7 +326,7 @@ int ObTenantDDLCheckSchemaTask::create_index_partition_table_store(
return ret;
}
int ObTenantDDLCheckSchemaTask::generate_schedule_index_task(const common::ObPartitionKey& pkey,
int ObBuildIndexBaseTask::generate_schedule_index_task(const common::ObPartitionKey &pkey,
const uint64_t index_id, const int64_t schema_version, const bool is_unique_index)
{
int ret = OB_SUCCESS;
......@@ -397,7 +355,50 @@ int ObTenantDDLCheckSchemaTask::generate_schedule_index_task(const common::ObPar
return ret;
}
int ObTenantDDLCheckSchemaTask::get_candidate_tables(ObIArray<uint64_t>& table_ids)
ObTenantDDLCheckSchemaTask::ObTenantDDLCheckSchemaTask()
: ObBuildIndexBaseTask(DDL_TASK_CHECK_SCHEMA), base_version_(-1), refreshed_version_(-1), tenant_id_(OB_INVALID_ID)
{}
ObTenantDDLCheckSchemaTask::~ObTenantDDLCheckSchemaTask()
{}
int ObTenantDDLCheckSchemaTask::init(
const uint64_t tenant_id, const int64_t base_version, const int64_t refreshed_version)
{
int ret = OB_SUCCESS;
if (base_version < 0 || refreshed_version < 0 || OB_INVALID_ID == tenant_id) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(base_version), K(refreshed_version), K(tenant_id));
} else {
base_version_ = base_version;
refreshed_version_ = refreshed_version;
task_id_.init(GCTX.self_addr_);
tenant_id_ = tenant_id;
is_inited_ = true;
}
return ret;
}
bool ObTenantDDLCheckSchemaTask::operator==(const ObIDDLTask &other) const
{
bool is_equal = false;
if (get_type() == other.get_type()) {
const ObTenantDDLCheckSchemaTask &task = static_cast<const ObTenantDDLCheckSchemaTask &>(other);
is_equal = base_version_ == task.base_version_ && refreshed_version_ == task.refreshed_version_;
}
return is_equal;
}
int64_t ObTenantDDLCheckSchemaTask::hash() const
{
uint64_t hash_val = 0;
hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val);
hash_val = murmurhash(&base_version_, sizeof(base_version_), hash_val);
hash_val = murmurhash(&refreshed_version_, sizeof(refreshed_version_), hash_val);
return hash_val;
}
int ObTenantDDLCheckSchemaTask::get_candidate_tables(ObIArray<uint64_t> &table_ids)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
......@@ -1730,3 +1731,227 @@ void ObBuildIndexScheduler::destroy()
task_executor_.destroy();
is_inited_ = false;
}
ObRetryGhostIndexTask::ObRetryGhostIndexTask()
: ObBuildIndexBaseTask(DDL_TASK_RETRY_GHOST_INDEX), index_id_(OB_INVALID_ID), last_log_timestamp_(0)
{}
ObRetryGhostIndexTask::~ObRetryGhostIndexTask()
{}
int ObRetryGhostIndexTask::init(const uint64_t index_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObRetryGhostIndexTask has already been inited", K(ret));
} else if (OB_UNLIKELY(OB_INVALID_ID == index_id)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(index_id));
} else {
index_id_ = index_id;
task_id_.init(GCONF.self_addr_);
is_inited_ = true;
}
return ret;
}
int64_t ObRetryGhostIndexTask::hash() const
{
return index_id_;
}
bool ObRetryGhostIndexTask::operator==(const ObIDDLTask &other) const
{
bool is_equal = false;
if (get_type() == other.get_type()) {
const ObRetryGhostIndexTask &other_task = static_cast<const ObRetryGhostIndexTask &>(other);
is_equal = index_id_ == other_task.index_id_;
}
return is_equal;
}
ObRetryGhostIndexTask *ObRetryGhostIndexTask::deep_copy(char *buf, const int64_t size) const
{
int ret = OB_SUCCESS;
ObRetryGhostIndexTask *task = NULL;
if (OB_ISNULL(buf) || size < sizeof(*this)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(buf), K(size));
} else {
task = new (buf) ObRetryGhostIndexTask();
*task = *this;
}
return task;
}
int ObRetryGhostIndexTask::process()
{
int ret = OB_SUCCESS;
const ObTableSchema *index_schema = nullptr;
ObSchemaGetterGuard schema_guard;
ObAddr rs_addr;
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(
extract_tenant_id(index_id_), schema_guard))) {
STORAGE_LOG(WARN, "fail to get schema guard", K(ret), K(index_id_));
} else if (OB_FAIL(schema_guard.get_table_schema(index_id_, index_schema))) {
STORAGE_LOG(WARN, "fail to get table schema", K(ret), K(index_id_));
} else if (OB_ISNULL(index_schema)) {
STORAGE_LOG(INFO, "index schema is deleted, skip it");
} else if (index_schema->is_index_local_storage() && OB_FAIL(retry_local_index(index_schema, schema_guard))) {
STORAGE_LOG(WARN, "fail to retry ghost local index", K(ret), K(*index_schema));
} else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {
STORAGE_LOG(WARN, "fail to get rootservice address", K(ret));
} else if (rs_addr != GCTX.self_addr_) {
STORAGE_LOG(INFO, "rs is not on this observer, skip");
} else if (NULL == GCTX.root_service_) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "root service is null", K(ret));
} else if (OB_FAIL(GCTX.root_service_->submit_build_index_task(index_schema))) {
STORAGE_LOG(WARN, "fail to submit build index task", K(ret));
}
return ret;
}
int ObRetryGhostIndexTask::retry_local_index(const ObTableSchema *index_schema, ObSchemaGetterGuard &schema_guard)
{
int ret = OB_SUCCESS;
const ObTableSchema *data_table_schema = nullptr;
ObArray<ObPartitionKey> partition_keys;
if (OB_FAIL(find_build_index_partitions(index_schema, schema_guard, partition_keys))) {
if (OB_EAGAIN != ret) {
STORAGE_LOG(WARN, "fail to check need build index", K(ret));
}
} else if (partition_keys.count() > 0) {
if (OB_FAIL(schema_guard.get_table_schema(index_schema->get_data_table_id(), data_table_schema))) {
STORAGE_LOG(WARN, "fail to get data table schema", K(ret));
} else if (OB_ISNULL(data_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
STORAGE_LOG(WARN, "schema error, data table not exist while index table exist", K(ret));
} else {
const int64_t schema_version =
std::max(index_schema->get_schema_version(), data_table_schema->get_schema_version());
for (int64_t i = 0; OB_SUCC(ret) && i < partition_keys.count(); ++i) {
if (OB_FAIL(generate_schedule_index_task(
partition_keys.at(i), index_schema->get_table_id(), schema_version, index_schema->is_unique_index()))) {
STORAGE_LOG(WARN, "fail to generate schedule build index task", K(ret));
}
}
}
}
return ret;
}
ObRetryGhostIndexScheduler::ObRetryGhostIndexScheduler()
: scan_ghost_index_task_(), is_inited_(false), task_executor_(), is_stop_(false)
{}
ObRetryGhostIndexScheduler::~ObRetryGhostIndexScheduler()
{}
int ObRetryGhostIndexScheduler::init()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObRetryGhostIndexScheduler has been inited twice", K(ret));
} else if (OB_FAIL(TG_START(lib::TGDefIDs::DDLRetryGhostIndex))) {
LOG_WARN("fail to init timer for DDLRetryGhostIndex", K(ret));
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::DDLRetryGhostIndex,
scan_ghost_index_task_,
DEFAULT_RETRY_GHOST_INDEX_INTERVAL_US,
true /*repeat*/))) {
LOG_WARN("fail to schedule scan_ghost_index_task", K(ret));
} else if (OB_FAIL(task_executor_.init(DEFAULT_BUCKET_NUM, lib::TGDefIDs::DDLTaskExecutor3))) {
LOG_WARN("fail to init task executor", K(ret));
} else {
is_inited_ = true;
}
return ret;
}
int ObRetryGhostIndexScheduler::push_task(ObRetryGhostIndexTask &task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObRetryGhostIndexScheduler has not been inited", K(ret));
} else if (is_stop_) {
// do nothing
} else if (OB_FAIL(task_executor_.push_task(task))) {
if (OB_LIKELY(OB_ENTRY_EXIST == ret)) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to push back task", K(ret));
}
}
return ret;
}
ObRetryGhostIndexScheduler &ObRetryGhostIndexScheduler::get_instance()
{
static ObRetryGhostIndexScheduler instance;
return instance;
}
void ObRetryGhostIndexScheduler::stop()
{
is_stop_ = true;
task_executor_.stop();
}
void ObRetryGhostIndexScheduler::wait()
{
task_executor_.wait();
}
void ObRetryGhostIndexScheduler::destroy()
{
is_inited_ = false;
stop();
wait();
task_executor_.destroy();
}
void ObScanGhostIndexTask::runTimerTask()
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
ObArray<uint64_t> tenant_ids;
ObRetryGhostIndexScheduler &scheduler = ObRetryGhostIndexScheduler::get_instance();
if (GCTX.is_standby_cluster()) {
// this retry task should not run on standby server
} else if (OB_FAIL(
ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant ids", K(ret));
} else {
ObArray<const ObSimpleTableSchemaV2 *> table_schemas;
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
const uint64_t tenant_id = tenant_ids.at(i);
if (OB_MAX_RESERVED_TENANT_ID >= tenant_id) {
// do nothing for reserved tenant
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(
tenant_id, schema_guard))) {
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schemas_in_tenant(tenant_id, table_schemas))) {
LOG_WARN("fail to get table schemas in tenant", K(ret));
} else {
for (int64_t j = 0; OB_SUCC(ret) && j < table_schemas.count(); ++j) {
const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(j);
if (simple_schema->is_index_table() && simple_schema->is_unavailable_index()) {
ObRetryGhostIndexTask task;
if (OB_FAIL(task.init(simple_schema->get_table_id()))) {
LOG_WARN("fail to init ObRetryGhostIndexTask", K(ret), K(*simple_schema));
} else if (OB_FAIL(scheduler.push_task(task))) {
LOG_WARN("fail to push ObRetryGhostIndexTask to scheduler", K(ret), K(task));
} else {
LOG_INFO("find unavailable index table", "index_table_id", simple_schema->get_table_id());
}
}
}
}
}
}
}
......@@ -39,11 +39,17 @@ public:
virtual ~ObBuildIndexBaseTask();
static int report_index_status(const uint64_t index_table_id, const int64_t partition_id,
const share::schema::ObIndexStatus index_status, const int build_index_ret, const ObRole role);
static int generate_schedule_index_task(const common::ObPartitionKey &pkey, const uint64_t index_id,
const int64_t schema_version, const bool is_unique_index);
protected:
int check_partition_need_build_index(const common::ObPartitionKey& pkey,
const share::schema::ObTableSchema& index_schema, const share::schema::ObTableSchema& data_table_schema,
storage::ObIPartitionGroupGuard& guard, bool& need_build);
int check_partition_need_build_index(const common::ObPartitionKey &pkey,
const share::schema::ObTableSchema &index_schema, const share::schema::ObTableSchema &data_table_schema,
storage::ObIPartitionGroupGuard &guard, bool &need_build);
int find_build_index_partitions(const share::schema::ObTableSchema *index_schema,
share::schema::ObSchemaGetterGuard &guard, common::ObIArray<common::ObPartitionKey> &partition_keys);
static int create_index_partition_table_store(
const common::ObPartitionKey &pkey, const uint64_t index_id, const int64_t schema_version);
private:
int check_partition_exist_in_current_server(const share::schema::ObTableSchema& index_schema,
......@@ -72,16 +78,10 @@ public:
}
virtual ObIDDLTask* deep_copy(char* buf, const int64_t size) const override;
TO_STRING_KV(K_(tenant_id), K_(base_version), K_(refreshed_version));
static int generate_schedule_index_task(const common::ObPartitionKey& pkey, const uint64_t index_id,
const int64_t schema_version, const bool is_unique_index);
private:
int find_build_index_partitions(const share::schema::ObTableSchema* index_schema,
share::schema::ObSchemaGetterGuard& guard, common::ObIArray<common::ObPartitionKey>& partition_keys);
int get_candidate_tables(common::ObIArray<uint64_t>& table_ids);
static int create_index_partition_table_store(
const common::ObPartitionKey& pkey, const uint64_t index_id, const int64_t schema_version);
int get_candidate_tenants(common::ObIArray<uint64_t>& tenant_ids);
int get_candidate_tables(common::ObIArray<uint64_t> &table_ids);
int get_candidate_tenants(common::ObIArray<uint64_t> &tenant_ids);
int process_schedule_build_index_task();
int process_tenant_memory_task();
......@@ -201,6 +201,70 @@ private:
bool is_stop_;
};
class ObRetryGhostIndexTask : public ObBuildIndexBaseTask {
public:
ObRetryGhostIndexTask();
virtual ~ObRetryGhostIndexTask();
int init(const uint64_t index_id);
virtual int64_t hash() const;
virtual int process();
virtual int64_t get_deep_copy_size() const
{
return sizeof(*this);
}
virtual ObRetryGhostIndexTask *deep_copy(char *buf, const int64_t size) const;
bool operator==(const ObIDDLTask &other) const;
TO_STRING_KV(K_(index_id));
int64_t get_tenant_id() const
{
return extract_tenant_id(index_id_);
}
private:
int retry_local_index(
const share::schema::ObTableSchema *index_schema, share::schema::ObSchemaGetterGuard &schema_guard);
private:
uint64_t index_id_;
int64_t last_log_timestamp_;
};
class ObScanGhostIndexTask : public ObTimerTask {
public:
ObScanGhostIndexTask() = default;
virtual ~ObScanGhostIndexTask() = default;
void runTimerTask();
};
class ObRetryGhostIndexScheduler {
public:
static const int64_t DEFAULT_THREAD_CNT = 1;
public:
int init();
static ObRetryGhostIndexScheduler &get_instance();
int push_task(ObRetryGhostIndexTask &task);
void destroy();
private:
ObRetryGhostIndexScheduler();
virtual ~ObRetryGhostIndexScheduler();
void stop();
void wait();
private:
#ifdef ERRSIM
static const int64_t DEFAULT_RETRY_GHOST_INDEX_INTERVAL_US = 1000L * 1000L * 10L; // 10s
#else
static const int64_t DEFAULT_RETRY_GHOST_INDEX_INTERVAL_US = 1000L * 1000L * 60L * 30L; // 30min
#endif
static const int64_t DEFAULT_BUCKET_NUM = 10000;
ObScanGhostIndexTask scan_ghost_index_task_;
bool is_inited_;
share::ObDDLTaskExecutor task_executor_;
bool is_stop_;
};
} // end namespace storage
} // end namespace oceanbase
......
......@@ -397,6 +397,8 @@ int ObPartitionService::init(const blocksstable::ObStorageEnv& env, const ObAddr
LOG_WARN("failed to init ObTableMgr", K(ret));
} else if (OB_FAIL(ObBuildIndexScheduler::get_instance().init())) {
STORAGE_LOG(WARN, "fail to init ObBuildIndexScheduler", K(ret));
} else if (OB_FAIL(ObRetryGhostIndexScheduler::get_instance().init())) {
LOG_WARN("fail to init ObRetryGhostIndexScheduler", K(ret));
} else if (OB_FAIL(ObFreezeInfoMgrWrapper::init(sql_proxy, remote_sql_proxy))) {
STORAGE_LOG(WARN, "fail to init ObFreezeInfoSnapshotMgr", K(ret));
} else if (OB_FAIL(garbage_collector_.init(this, txs_, schema_service, GCTX.srv_rpc_proxy_, &sql_proxy, self_addr))) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册