From 81ddf033084cd6eed52d5560813a4a2742721af1 Mon Sep 17 00:00:00 2001 From: obdev Date: Tue, 5 Jul 2022 16:12:52 +0800 Subject: [PATCH] fix ttl issue 42742997 & 42787173 & 42954103 & 42920788 & 42876827 & 42869248 & 42868078 & 42867515 & 42767501 --- .../table/ob_htable_filter_operator.cpp | 2 +- src/observer/table/ob_table_service.cpp | 2 +- src/observer/table/ob_table_ttl_manager.cpp | 784 ++++++++++++------ src/observer/table/ob_table_ttl_manager.h | 77 +- src/observer/table/ob_table_ttl_task.cpp | 6 +- src/rootserver/ob_ttl_scheduler.cpp | 142 ++-- src/rootserver/ob_ttl_scheduler.h | 8 +- src/share/table/ob_ttl_util.cpp | 109 ++- src/share/table/ob_ttl_util.h | 22 +- src/storage/ob_partition_service.cpp | 5 +- 10 files changed, 800 insertions(+), 357 deletions(-) diff --git a/src/observer/table/ob_htable_filter_operator.cpp b/src/observer/table/ob_htable_filter_operator.cpp index 1c6d7d984e..fffd9d4e51 100644 --- a/src/observer/table/ob_htable_filter_operator.cpp +++ b/src/observer/table/ob_htable_filter_operator.cpp @@ -32,7 +32,7 @@ int ObHColumnDescriptor::from_string(const common::ObString &str) } else if (OB_FAIL(json_parser.init(&allocator))) { LOG_WARN("failed to init json parser", K(ret)); } else if (OB_FAIL(json_parser.parse(str.ptr(), str.length(), ast))) { - LOG_WARN("failed to parse", K(ret), K(str)); + LOG_DEBUG("failed to parse", K(ret), K(str)); ret = OB_SUCCESS; } else if (NULL != ast && ast->get_type() == json::JT_OBJECT diff --git a/src/observer/table/ob_table_service.cpp b/src/observer/table/ob_table_service.cpp index bcca6e0ac4..40dbd9f167 100644 --- a/src/observer/table/ob_table_service.cpp +++ b/src/observer/table/ob_table_service.cpp @@ -1736,7 +1736,7 @@ int ObTableService::fill_query_table_param(uint64_t table_id, } else if (OB_FAIL(schema_guard.get_table_schema(table_id, table_schema))) { LOG_WARN("get table schema failed", K(table_id), K(ret)); } else if (OB_ISNULL(table_schema)) { - ret = OB_ERR_UNEXPECTED; + ret = 
OB_TABLE_NOT_EXIST; LOG_ERROR("NULL ptr", K(ret), K(table_schema)); } else if (OB_FAIL(get_index_id_by_name(schema_guard, table_id, index_name, index_id, rowkey_columns_type, index_schema))) { diff --git a/src/observer/table/ob_table_ttl_manager.cpp b/src/observer/table/ob_table_ttl_manager.cpp index 0bc4f644aa..892a78f833 100644 --- a/src/observer/table/ob_table_ttl_manager.cpp +++ b/src/observer/table/ob_table_ttl_manager.cpp @@ -54,6 +54,7 @@ int ObTTLManager::start() { int ret = OB_SUCCESS; if (!is_init_) { + ret = OB_NOT_INIT; LOG_WARN("ttl manager not init", K(ret)); } else if (OB_FAIL(ttl_timer_.schedule(periodic_task_, periodic_delay_, true))) { LOG_WARN("fail to schedule periodic task", K(ret)); @@ -111,24 +112,23 @@ int ObTTLManager::scan_all_tenanat_handle_event() } else { common::ObSpinLockGuard guard(lock_); for (ttl_tenants_iterator iter = ttl_tenant_parts_map_.begin(); - iter != ttl_tenant_parts_map_.end(); ++iter) { + iter != ttl_tenant_parts_map_.end() && OB_SUCC(ret); ++iter) { tenant_id = iter->first; tenant_info = iter->second; - if (tenant_info->need_check_ && - OB_TTL_TASK_CANCEL > tenant_info->state_ && - OB_FAIL(check_tenants.push_back(tenant_id))) { + if (tenant_info->need_check_ && OB_FAIL(check_tenants.push_back(tenant_id))) { + // after observer restart, need check tenant even when cancel and move state LOG_WARN("fail to push back check tenants", K(ret)); - } + } - if (tenant_info->is_dirty_ && OB_FAIL(dirty_tenants.push_back(tenant_id))) { + if (OB_SUCC(ret) && tenant_info->is_dirty_ && OB_FAIL(dirty_tenants.push_back(tenant_id))) { LOG_WARN("fail to push back dirty tenants", K(ret)); } else if (OB_TTL_TASK_MOVING == tenant_info->state_ && OB_FAIL(need_move_tenants.push_back(tenant_id))) { LOG_WARN("fail to push back move operation", K(tenant_id)); } - if ((tenant_info->is_droped_ || tenant_info->rsp_time_ != OB_INVALID_ID) && - OB_FAIL(need_rsp_tenants.push_back(tenant_id))) { + if (OB_SUCC(ret) && (tenant_info->is_droped_ || 
tenant_info->rsp_time_ != OB_INVALID_ID) && + OB_FAIL(need_rsp_tenants.push_back(tenant_id))) { // todo: remove is_droped_ LOG_WARN("fail to push back rsp operation", K(tenant_id)); } } @@ -142,7 +142,7 @@ int ObTTLManager::scan_all_tenanat_handle_event() for (int i = 0; i < dirty_tenants.count() && OB_SUCC(ret); ++i) { if (OB_FAIL(inner_handle_single_tenant_event(dirty_tenants.at(i), - sync_sys_partitions))) { + sync_sys_partitions))) { LOG_WARN("fail to handle tenant event in timer", K(tenant_id)); } @@ -163,7 +163,7 @@ int ObTTLManager::scan_all_tenanat_handle_event() /*do moving*/ for (int i = 0; i < need_move_tenants.count() && OB_SUCC(ret); ++i) { if (OB_FAIL(move_record_to_history_table(need_move_tenants.at(i)))) { - LOG_WARN("fail to do check and response", K(ret)); + LOG_WARN("fail to move record to history table", K(ret)); } } @@ -174,6 +174,12 @@ int ObTTLManager::scan_all_tenanat_handle_event() } } + if (OB_TENANT_NOT_EXIST == ret) { + LOG_INFO("begin to check and reset dropped tenant", K(ret)); + if (OB_FAIL(check_and_reset_droped_tenant())) { + LOG_WARN("fail to check and reset dropped tenant", K(ret)); + } + } return ret; } @@ -191,23 +197,29 @@ void ObTTLManager::check_ttl_tenant_state(uint64_t tenant_id) LOG_WARN("tenant info is null", K(tenant_id), K(ret)); } else { for (ttl_parts_iterator iter = tenant_info->part_task_map_.begin(); - !tenant_dirty && iter != tenant_info->part_task_map_.end(); ++iter) { + !tenant_dirty && iter != tenant_info->part_task_map_.end(); ++iter) { ctx = iter->second; if (OB_ISNULL(ctx)) { LOG_WARN("fatal err, ttl ctx in map is null", K(tenant_info->tenant_id_)); } else if (ctx->is_dirty_) { tenant_dirty = true; } else if (ctx->task_status_ != OB_TTL_TASK_CANCEL && - ctx->task_status_ != OB_TTL_TASK_FINISH) { + ctx->task_status_ != OB_TTL_TASK_FINISH) { tenant_finish = false; + tenant_info->is_finished_ = false; } } } if (OB_SUCC(ret) && !tenant_dirty) { tenant_info->is_dirty_ = false; - if (tenant_finish && 
tenant_info->state_ < OB_TTL_TASK_MOVING) { - tenant_info->state_ = (tenant_info->state_ == OB_TTL_TASK_CANCEL) ? OB_TTL_TASK_CANCEL :OB_TTL_TASK_FINISH; + if (tenant_finish) { + if (tenant_info->state_ == OB_TTL_TASK_CANCEL || tenant_info->state_ == OB_TTL_TASK_RUNNING) { + // all task already in cancel or runing status + tenant_info->is_finished_ = true; + } else { + ret = OB_ERR_UNEXPECTED; + } } } LOG_DEBUG("check ttl tenant dirty", K(tenant_info->is_dirty_), K(tenant_info->state_), K(ret), K(tenant_id)); @@ -240,26 +252,24 @@ void ObTTLManager::mark_tenant_rsp(uint64_t tenant_id, int64_t rsp_time) int ObTTLManager::check_and_do_rsp(uint64_t tenant_id) { int ret = OB_SUCCESS; - bool can_rsp = true; + bool can_rsp = false; int64_t rsp_time = OB_INVALID_ID; ObTTLTaskStatus rsp_status; ObTTLTenantInfo* tenant_info = NULL; ObTTLTaskCtx* ctx = NULL; - bool tenant_is_droped = false; { common::ObSpinLockGuard guard(lock_); tenant_info = get_tenant_info(tenant_id, false); if (OB_ISNULL(tenant_info)) { - can_rsp = false; ret = OB_INVALID_ARGUMENT; LOG_WARN("tenant info is null", K(tenant_id)); - } else if ((tenant_is_droped = tenant_info->is_droped_)) { - // do nothing if tenant is droped } else if (tenant_info->rsp_time_ != OB_INVALID_ID) { + can_rsp = true; rsp_status = tenant_info->state_; rsp_time = tenant_info->rsp_time_; for (ttl_parts_iterator iter = tenant_info->part_task_map_.begin(); - can_rsp && iter != tenant_info->part_task_map_.end(); ++iter) { + can_rsp && iter != tenant_info->part_task_map_.end(); + ++iter) { ctx = iter->second; if (OB_ISNULL(ctx)) { can_rsp = false; @@ -283,7 +293,7 @@ int ObTTLManager::check_and_do_rsp(uint64_t tenant_id) LOG_WARN("fail to response ttl task to rs", K(ret), K(tenant_info->task_id_), K(rsp_status)); } - if (OB_SUCC(ret) && (can_rsp || tenant_is_droped)) { + if (OB_SUCC(ret) && (can_rsp)) { mark_tenant_rsp(tenant_id, rsp_time); } } @@ -291,32 +301,60 @@ int ObTTLManager::check_and_do_rsp(uint64_t tenant_id) return ret; } 
-int ObTTLManager::check_cmd_state_valid(common::ObTTLTaskStatus current_state, common::ObTTLTaskStatus incoming_state) +/* transformation of state of tenant info (most be driven by rs): + * invalid -> running -> moving + * -> canceling -> moving + * -> pending -> running + * -> canceling + * -> moving + * -> canceling + * -> pending + */ + +int ObTTLManager::check_cmd_state_valid(const common::ObTTLTaskStatus current_state, + const common::ObTTLTaskStatus incoming_state) { int ret = OB_SUCCESS; - if ((incoming_state < OB_TTL_TASK_RUNNING || incoming_state > OB_TTL_TASK_CANCEL) && - (incoming_state != OB_TTL_TASK_MOVING)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("fatal error, invalid state type", K(incoming_state)); - } else if (incoming_state == OB_TTL_TASK_MOVING) { - if (current_state != OB_TTL_TASK_FINISH && current_state != OB_TTL_TASK_CANCEL) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("receive a move cmd, current task state is wrong", K(current_state)); + switch (incoming_state) { + case OB_TTL_TASK_RUNNING: { + if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_INVALID && + current_state != OB_TTL_TASK_RUNNING) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("receive rs cmd, but current tenant state is unmatached", + K(ret), K(current_state), K(incoming_state)); + } + break; } - } else if (OB_TTL_TASK_RUNNING == incoming_state) { - if (current_state >= OB_TTL_TASK_CANCEL && current_state != OB_TTL_TASK_INVALID) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("receive a cmd, current task state is wrong", K(current_state)); + case OB_TTL_TASK_MOVING: { + if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_CANCEL && + current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_MOVING) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("receive a move cmd, current task state is unmatached", K(current_state)); + } + break; } - } else if (OB_TTL_TASK_PENDING == incoming_state) { - if (current_state >= OB_TTL_TASK_CANCEL) { - ret = 
OB_INVALID_ARGUMENT; - LOG_WARN("receive a cmd, current task state is wrong", K(current_state)); + case OB_TTL_TASK_PENDING: { + if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_INVALID && + current_state != OB_TTL_TASK_PENDING) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("receive rs cmd, but current tenant state is unmatached", + K(ret), K(current_state), K(incoming_state)); + } + break; } - } else if (OB_TTL_TASK_CANCEL == incoming_state) { - if (current_state > OB_TTL_TASK_FINISH) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("receive a cancel cmd, current task state is wrong", K(current_state)); + case OB_TTL_TASK_CANCEL: { + if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_RUNNING && + current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_CANCEL) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("receive rs cmd, but current tenant state is unmatached", + K(ret), K(current_state), K(incoming_state)); + } + break; + } + default: { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid incoming status", K(ret), K(incoming_state)); + break; } } return ret; @@ -341,6 +379,34 @@ int ObTTLManager::transform_cmd_to_state(const ObTTLRequestArg::TTLRequestType& return ret; } +// get cmd type from rs state +ObTTLRequestArg::TTLRequestType ObTTLManager::transform_state_to_cmd(const int64_t state) +{ + ObTTLRequestArg::TTLRequestType task_type = ObTTLRequestArg::TTL_INVALID_TYPE; + switch (state) { + case static_cast(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE): { + task_type = ObTTLRequestArg::TTL_TRIGGER_TYPE; + break; + } + case static_cast(ObTTLTaskStatus::OB_RS_TTL_TASK_SUSPEND): { + task_type = ObTTLRequestArg::TTL_SUSPEND_TYPE; + break; + } + case static_cast(ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL): { + task_type = ObTTLRequestArg::TTL_CANCEL_TYPE; + break; + } + case static_cast(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE): { + task_type = ObTTLRequestArg::TTL_MOVE_TYPE; + break; + } + default: { + break; + } + } + return task_type; +} 
+ // RS TTL message entrance int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id, bool is_usr_trigger, ObTTLRequestArg::TTLRequestType cmd) @@ -348,51 +414,61 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id, int ret = OB_SUCCESS; ObTTLTenantInfo* tenant_info = NULL; common::ObTTLTaskStatus expected_state; - bool need_create_tenant_info = false; + const bool create_if_not_exists = true; - common::ObSpinLockGuard guard(lock_); - if (OB_FAIL(transform_cmd_to_state(cmd, expected_state))) { + if (!is_init_) { + ret = OB_NOT_INIT; + LOG_WARN("ttl manager not init", K(tenant_id), K(task_id), K(is_usr_trigger)); + } else if (task_id == OB_INVALID_ID) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected invalid task id", K(tenant_id), K(ret)); + } else if (OB_FAIL(transform_cmd_to_state(cmd, expected_state))) { LOG_WARN("invalid cmd type", K(tenant_id), K(task_id), K(is_usr_trigger), K(cmd)); - } else { - need_create_tenant_info = (OB_TTL_TASK_RUNNING == expected_state) ? 
true : false; - } + } else {} - if (OB_FAIL(ret)) { - } else if (!is_init_) { - ret = OB_ENTRY_NOT_EXIST; - LOG_WARN("ttl manager not init", K(tenant_id), K(task_id), K(is_usr_trigger)); - } else if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, need_create_tenant_info))) { - ret = OB_ENTRY_NOT_EXIST; - LOG_WARN("fail to get ttl tenant info", K(tenant_id)); - } else if (OB_FAIL(check_cmd_state_valid(tenant_info->state_, expected_state))) { - LOG_WARN("ttl cmd state machine is wrong", K(ret), K(tenant_id), K(task_id), K(is_usr_trigger)); - } else { - tenant_info->cmd_type_ = cmd; - if (OB_INVALID_ID == tenant_info->task_id_) { - if (OB_TTL_TASK_RUNNING == expected_state) { - //new ttl tenant + if (OB_SUCC(ret)) { + common::ObSpinLockGuard guard(lock_); + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, create_if_not_exists))) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("fail to get ttl tenant info", K(tenant_id), K(create_if_not_exists)); + } else if (tenant_info->need_check_) { + ret = OB_EAGAIN; + LOG_INFO("tenant info need check, please resend message later", KPC(tenant_info), K(expected_state)); + } else if (OB_UNLIKELY(tenant_info->task_id_ != OB_INVALID_ID && tenant_info->task_id_ != task_id)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ttl task id is wrong", K(ret), K(tenant_id), K(tenant_info->task_id_), K(task_id)); + } else if (OB_FAIL(check_cmd_state_valid(tenant_info->state_, expected_state))) { + LOG_WARN("ttl cmd state machine is wrong", K(ret), K(tenant_id), K(task_id), K(is_usr_trigger)); + } else { + tenant_info->cmd_type_ = cmd; + if (OB_INVALID_ID == tenant_info->task_id_) { + // new ttl tenant info tenant_info->task_id_ = task_id; tenant_info->is_usr_trigger_ = is_usr_trigger; - tenant_info->state_ = OB_TTL_TASK_RUNNING; + tenant_info->state_ = expected_state; tenant_info->need_check_ = true; tenant_info->is_dirty_ = true; - LOG_INFO("new tenent info", K(ret), K(tenant_id), 
K(tenant_info->task_id_)); - } else if (OB_INVALID_ID == tenant_info->task_id_) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid task id for current state", K(ret), K(expected_state), K(tenant_id), - K(task_id), K(tenant_info->task_id_)); + if (OB_TTL_TASK_MOVING == expected_state) { + // after restart, rs send moving means all tasks was finished or canceled + tenant_info->is_finished_ = true; + } + LOG_INFO("new tenent info", K(ret), K(tenant_id), KPC(tenant_info)); + } + + if (OB_TTL_TASK_MOVING == expected_state && !tenant_info->is_finished_) { + ret = OB_EAGAIN; + LOG_WARN("can not move a unfinished task", K(ret), K(tenant_id), KPC(tenant_info)); + } else { + tenant_info->state_ = expected_state; + tenant_info->is_dirty_ = true; } - } else if (tenant_info->state_ == expected_state) { - //duplicate msg - LOG_INFO("tenant state is duplicated", K(ret), K(expected_state)); - } else { - tenant_info->state_ = expected_state; - tenant_info->is_dirty_ = true; - } - if (OB_SUCC(ret)) { - //receive the msg, need to rsp - tenant_info->rsp_time_ = ObTimeUtility::current_time(); + if (OB_SUCC(ret)) { + //receive the msg, need to rsp + tenant_info->rsp_time_ = ObTimeUtility::current_time(); + } } } LOG_INFO("finish process rs cmd", K(ret), K(tenant_id), K(task_id), K(expected_state)); @@ -413,24 +489,25 @@ void ObTTLManager::mark_tenant_need_check(uint64_t tenant_id) LOG_DEBUG("finsh mark tenant need check", K(ret)); } -void ObTTLManager::on_leader_active(ObIPartitionGroup* partition) +void ObTTLManager::on_leader_active(const ObPartitionKey& pkey) { int ret = OB_SUCCESS; ObTTLPara para; bool can_ttl = false; + uint64_t tenant_id = pkey.get_tenant_id(); if (!is_init_) { ret = OB_NOT_INIT; LOG_WARN("ttl manager not init"); - } else if (OB_ISNULL(partition)) { - LOG_WARN("invalid partition on leader active"); - } else if (!partition->is_valid()) { + } else if (!pkey.is_valid()) { + // do nothing + } else if(OB_SYS_TENANT_ID == tenant_id) { // do nothing - } else if 
(!common::ObTTLUtil::check_can_process_tenant_tasks(partition->get_partition_key().get_tenant_id())) { + } else if (!common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id)) { //do nothing - } else if (OB_FAIL(check_partition_can_gen_ttl(partition, para, can_ttl))) { - LOG_WARN("fail to check partition can ddl", K(ret), K(partition->get_partition_key())); + } else if (OB_FAIL(check_partition_can_gen_ttl(pkey, para, can_ttl))) { + LOG_WARN("fail to check partition can ttl", K(ret), K(pkey), K(para)); } else if (can_ttl) { - mark_tenant_need_check(partition->get_partition_key().get_tenant_id()); + mark_tenant_need_check(tenant_id); } } @@ -483,8 +560,9 @@ int ObTTLManager::report_task_status(ObTTLTaskInfo& task_info, ObTTLPara& task_p } else if (OB_ITER_END == task_info.err_code_) { ctx->task_status_ = OB_TTL_TASK_FINISH; ctx->task_info_.err_code_ = OB_SUCCESS; - } else if (OB_NOT_MASTER == task_info.err_code_ && - OB_PARTITION_NOT_EXIST == task_info.err_code_) { + } else if (OB_NOT_MASTER == task_info.err_code_ || + OB_PARTITION_NOT_EXIST == task_info.err_code_ || + OB_TABLE_NOT_EXIST == task_info.err_code_) { LOG_INFO("Cancel current task since partition state change", K(task_info.err_code_), K(task_info.pkey_)); ctx->task_status_ = OB_TTL_TASK_CANCEL; @@ -497,7 +575,7 @@ int ObTTLManager::report_task_status(ObTTLTaskInfo& task_info, ObTTLPara& task_p } //schedule task - if (is_stop && OB_FAIL(try_schedule_remaining_tasks(tenant_info))) { + if (is_stop && OB_FAIL(try_schedule_remaining_tasks(tenant_info, ctx))) { LOG_WARN("fail to try schedule task", K(ret)); } return ret; @@ -559,28 +637,37 @@ int ObTTLManager::generate_tenant_tasks(uint64_t tenant_id) { int ret = OB_SUCCESS; ObIPartitionArrayGuard partitions; - ObPartitionState state; ObIPartitionGroup *partition = NULL; bool can_ttl = false; if (OB_FAIL(ObPartitionService::get_instance().get_all_partitions(partitions))) { - LOG_WARN("fail to get all partition", K(ret)); + LOG_WARN("fail to get all 
partition", K(ret), K(tenant_id)); } else { //filter the partition for (int64_t i = 0; OB_SUCC(ret) && i < partitions.count(); ++i) { - partition = partitions.at(i); - ObPartitionKey pk = partition->get_partition_key(); - ObTTLPara para; - if ((tenant_id != partition->get_partition_key().get_tenant_id())) { - //do nothing - } else if (OB_FAIL(check_partition_can_gen_ttl(partition, para, can_ttl))) { - LOG_WARN("fail to check partition can get ttl", K(ret)); + ObPartitionArray pkeys; + ObPartitionState state; + state = partitions.at(i)->get_partition_state(); + if (!is_leader_state(state)) { + // do nothing, the partition should be a leader + } else if (OB_FAIL(partitions.at(i)->get_all_pg_partition_keys(pkeys))) { + LOG_WARN("fail to get all pg partition keys", "pg_key", partitions.at(i)->get_partition_key(), K(pkeys)); } else { - if (can_ttl) { - ObTTLTaskInfo task_info; - task_info.pkey_ = pk; - if (OB_FAIL(generate_one_partition_task(task_info, para))) { - LOG_WARN("fail to generate task", K(ret), K(pk)); + for (int pkey_index = 0; OB_SUCC(ret) && pkey_index < pkeys.count(); ++pkey_index) { + ObTTLPara para; + ObPartitionKey pkey = pkeys.at(pkey_index); + if ((tenant_id != pkey.get_tenant_id())) { + //do nothing + } else if (OB_FAIL(check_partition_can_gen_ttl(pkey, para, can_ttl))) { + LOG_WARN("fail to check partition can get ttl", K(ret), K(pkey), K(para)); + } else { + if (can_ttl) { + ObTTLTaskInfo task_info; + task_info.pkey_ = pkey; + if (OB_FAIL(generate_one_partition_task(task_info, para))) { + LOG_WARN("fail to generate task", K(ret), K(task_info), K(para)); + } + } } } } @@ -716,10 +803,9 @@ int ObTTLManager::generate_ttl_dag(ObTTLTaskInfo& task_info, ObTTLPara& para) LOG_WARN("fail to add ttl prepare task to dag", K(ret)); } else if (OB_FAIL(dag_scheduler.add_dag(dag))) { if (OB_EAGAIN == ret) { - ret = OB_SUCCESS; - LOG_INFO("ttl dag already exists, no need to schedule once again"); + LOG_DEBUG("ttl dag already exists, no need to schedule once 
again", K(ret)); } else if (OB_SIZE_OVERFLOW == ret) { - ret = OB_EAGAIN; + LOG_DEBUG("dag is full", K(ret)); } else { LOG_WARN("fail to add dag to queue", K(ret)); } @@ -747,8 +833,7 @@ int ObTTLManager::inner_handle_single_tenant_event(uint64_t tenant_id, common::O ret = OB_ERR_NULL_VALUE; LOG_WARN("fatal err, ttl ctx in map is null", K(tenant_info->tenant_id_), K(ret)); } else if (OB_FAIL(inner_handle_one_partition_event(tenant_info, ctx))) { - LOG_WARN("fail to handle one partition event", - K(ret), K(ctx->task_info_.pkey_)); + LOG_WARN("fail to handle one partition event", K(ret), K(ctx->task_info_.pkey_)); } else if (ctx->is_dirty_ && OB_FAIL(parts_array.push_back(ctx->task_info_.pkey_))) { LOG_WARN("fail to pushback ttl pk", K(ret), K(ctx->task_info_.pkey_)); } @@ -772,25 +857,49 @@ int ObTTLManager::inner_handle_one_partition_event(ObTTLTenantInfo* tenant_info, if (OB_TTL_TASK_RUNNING == tenant_info->state_) { if (OB_TTL_TASK_PENDING == ctx->task_status_) { try_schedule = true; - } + } else if (OB_TTL_TASK_PREPARE == ctx->task_status_ || + OB_TTL_TASK_FINISH == ctx->task_status_ || + OB_TTL_TASK_CANCEL == ctx->task_status_) { + // do nothing + } else { + LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx)); + } } else if (OB_TTL_TASK_PENDING == tenant_info->state_) { - if (OB_TTL_TASK_RUNNING == ctx->task_status_) { + if (OB_TTL_TASK_RUNNING == ctx->task_status_ || + OB_TTL_TASK_PREPARE == ctx->task_status_) { ctx->task_status_ = tenant_info->state_; mark_ttl_ctx_dirty(tenant_info, ctx); + } else if (OB_TTL_TASK_FINISH == tenant_info->state_){ + // do nothing, no need schedule finish task again + } else { + LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx)); } } else if (OB_TTL_TASK_CANCEL == tenant_info->state_) { - if (OB_TTL_TASK_PENDING >= ctx->task_status_) { + if (OB_TTL_TASK_PREPARE == ctx->task_status_ || + OB_TTL_TASK_RUNNING == ctx->task_status_ || + OB_TTL_TASK_PENDING == ctx->task_status_ || + OB_TTL_TASK_FINISH == 
ctx->task_status_) { ctx->task_status_ = tenant_info->state_; mark_ttl_ctx_dirty(tenant_info, ctx); + } else { + LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx)); } } else if (OB_TTL_TASK_MOVING == tenant_info->state_) { - //do nothing + if (OB_TTL_TASK_PREPARE == ctx->task_status_) { + ctx->task_status_ = OB_TTL_TASK_FINISH; // will refresh real status from task table + mark_ttl_ctx_dirty(tenant_info, ctx); + } else if (OB_TTL_TASK_FINISH == ctx->task_status_ || + OB_TTL_TASK_CANCEL == ctx->task_status_) { + // do nothing, normal partition task + } else { + LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx)); + } } else { LOG_WARN("invalid ttl tenant task state", K(tenant_info->state_)); } if (try_schedule && OB_FAIL(try_schedule_task(tenant_info, ctx))) { - if (OB_EAGAIN != ret) { + if (OB_SIZE_OVERFLOW != ret) { LOG_WARN("fail to try schedule dag task", K(ret), K(ctx->task_info_.pkey_)); } else { ret = OB_SUCCESS; @@ -895,35 +1004,32 @@ int ObTTLManager::get_ttl_para_from_schema(const schema::ObTableSchema *table_sc return ret; } -int ObTTLManager::check_partition_can_gen_ttl(ObIPartitionGroup *partition, +int ObTTLManager::check_partition_can_gen_ttl(const ObPartitionKey& pkey, ObTTLPara ¶, bool& can_ttl) { int ret = OB_SUCCESS; const schema::ObTableSchema *table_schema = NULL; schema::ObSchemaGetterGuard schema_guard; ObTTLTaskCtx* ttl_ctx = NULL; - ObPartitionState state; can_ttl = false; - if (OB_ISNULL(partition)) { + if (OB_UNLIKELY(!pkey.is_valid() || pkey.is_pg())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("partition is null", K(ret)); + LOG_WARN("invalid argument", K(ret), K(pkey), K(pkey.is_pg())); } else { - ObPartitionKey pkey = partition->get_partition_key(); - state = partition->get_partition_state(); - if (!is_leader_state(state)) { - //do nothing, the partition should be a leader - } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(pkey.get_tenant_id(), schema_guard))) { + uint64_t tenant_id = 
pkey.get_tenant_id(); + uint64_t table_id = pkey.get_table_id(); + if (OB_FAIL(schema_service_->get_schema_guard(schema_guard))) { LOG_WARN("failed to get schema guard", K(ret)); - } else if (OB_FAIL(schema_guard.get_table_schema(pkey.get_table_id(), table_schema))) { - LOG_WARN("get table schema failed", K(pkey.get_table_id()), K(ret)); + } else if (OB_FAIL(schema_guard.get_table_schema(table_id, table_schema))) { + LOG_WARN("get table schema failed", K(table_id), K(ret)); } else if (OB_ISNULL(table_schema)) { ret = OB_SCHEMA_ERROR; LOG_WARN("null table schema", K(ret)); } else if (table_schema->is_in_recyclebin()) { // do nothing } else if (table_schema->get_comment_str().empty()) { - //do nothing + // do nothing } else if (OB_NOT_NULL(ttl_ctx = get_one_partition_ctx(pkey))) { LOG_DEBUG("partition task exist", K(pkey)); } else if (OB_FAIL(get_ttl_para_from_schema(table_schema, para, can_ttl))) { @@ -947,7 +1053,7 @@ int ObTTLManager::try_schedule_prepare_task(ObPartitionKey& pkey) // do nothing } else if (FALSE_IT(ctx->task_status_ = OB_TTL_TASK_PENDING)) { } else if (OB_FAIL(try_schedule_task(tenant_info, ctx))) { - if (OB_EAGAIN != ret) { + if (OB_SIZE_OVERFLOW != ret) { LOG_WARN("fail to schedule task", K(ret)); } else { ret = OB_SUCCESS; @@ -960,36 +1066,62 @@ int ObTTLManager::try_schedule_prepare_task(ObPartitionKey& pkey) int ObTTLManager::sync_sys_table(ObPartitionKey& pkey) { int ret = OB_SUCCESS; - ObTTLTaskCtx cp_ctx; ObArenaAllocator allocator(lib::ObLabel("TTLStatusRecord")); - uint64_t tenant_id = OB_INVALID_ID; + uint64_t tenant_id = pkey.get_tenant_id(); + ObTTLTaskCtx* ctx = NULL; if (OB_UNLIKELY(!pkey.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(pkey)); } else { - tenant_id = pkey.get_tenant_id(); common::ObSpinLockGuard guard(lock_); - ObTTLTaskCtx* ctx = get_one_partition_ctx(pkey); + ctx = get_one_partition_ctx(pkey); if (OB_ISNULL(ctx)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("ctx is null", K(ret)); } else 
if (OB_UNLIKELY(!ctx->is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid partition task ctx", K(ret), KPC(ctx)); - } else { - cp_ctx = *ctx; - if (ctx->task_info_.row_key_.empty()) { - //do nothing - } else if (OB_FAIL(ob_write_string(allocator, ctx->task_info_.row_key_, cp_ctx.task_info_.row_key_)) ) { - LOG_ERROR("fail to deep copy first key", K(ret)); + } else {} + } + + if (OB_SUCC(ret) && OB_UNLIKELY(ctx->need_refresh_)) { + switch (ctx->task_status_) { + case OB_TTL_TASK_FINISH: // tenant_info must be in moving status + case OB_TTL_TASK_PREPARE: { + if (OB_FAIL(refresh_partition_task(*ctx, true /*refresh_status*/, true))) { + LOG_WARN("fail to refresh partition task from task table", K(ret)); + } else { + if (ctx->task_info_.err_code_ == OB_NOT_MASTER || + (ctx->task_status_ != OB_TTL_TASK_FINISH && ctx->task_status_ != OB_TTL_TASK_CANCEL)) { + ctx->task_status_ = OB_TTL_TASK_PREPARE; + } + ctx->need_refresh_ = false; + ctx->task_info_.err_code_ = OB_SUCCESS; + } + break; + } + case OB_TTL_TASK_RUNNING: + case OB_TTL_TASK_PENDING: + case OB_TTL_TASK_CANCEL: { + if (OB_FAIL(refresh_partition_task(*ctx, false /*refresh_status*/))) { + LOG_WARN("fail to refresh partition task from task table", K(ret)); + } else { + ctx->need_refresh_ = false; + } + break; + } + default: { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected ttl task status", K(ret)); + break; } } } if (OB_SUCC(ret)) { common::ObTTLStatus ttl_record; - switch (cp_ctx.task_status_) { + switch (ctx->task_status_) { case OB_TTL_TASK_PREPARE: { ObMySQLTransaction trans; ObTTLStatusFieldArray filters; @@ -997,61 +1129,100 @@ int ObTTLManager::sync_sys_table(ObPartitionKey& pkey) ObTTLStatusFieldArray filter; bool commit = false; int tmp_ret = OB_SUCCESS; - if (OB_FAIL(construct_task_record_filter(cp_ctx.task_info_.task_id_, - cp_ctx.task_info_.pkey_.get_table_id(), - cp_ctx.task_info_.pkey_.get_partition_id(), + if (OB_FAIL(construct_task_record_filter(ctx->task_info_.task_id_, + 
ctx->task_info_.pkey_.get_table_id(), + ctx->task_info_.pkey_.get_partition_id(), filters))) { LOG_WARN("fail to construct task record filter", K(ret)); } else if (OB_FAIL(trans.start(get_sql_proxy(), tenant_id))) { - LOG_WARN("fail to start transation", K(ret)); + LOG_WARN("fail to start transation", K(ret), K(tenant_id)); } else if (OB_FAIL(ObTTLUtil::read_ttl_tasks(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, - trans, filters, ttl_records, true, &allocator))) { + trans, filters, ttl_records, true, &allocator))) { LOG_WARN("fail to get ttl tasks", K(ret)); } else { if (ttl_records.empty()) { - if (OB_FAIL(construct_sys_table_record(&cp_ctx, ttl_record))) { + if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { LOG_WARN("fail to construct sys table record", K(ret)); } else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, - trans, ttl_record))) { + trans, ttl_record))) { LOG_WARN("fail to insert ttl task", K(ret)); } } else { if (ttl_records.count() != 1) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpect ttl records count", K(ret), K(ttl_records.count())); - } else if (OB_FAIL(from_ttl_record(pkey, ttl_records.at(0), allocator))) { - LOG_WARN("fail to convert from ttl record", K(ret)); - } + } else { /* do nothing, prepare task only sync once with true need_refrsh flag */ } } } - //do commit - commit = (OB_SUCCESS == ret); - tmp_ret = ret; - if (OB_FAIL(trans.end(commit))) { - LOG_WARN("fail to end transaction", K(ret), K(commit)); - } - ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret; + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = ret; + if (OB_FAIL(trans.end(commit))) { + LOG_WARN("faile to end trans", "commit", commit, K(ret)); + } + ret = tmp_ret == OB_SUCCESS ? 
ret : tmp_ret; + } - //change prepare state to running/pending - if (OB_SUCC(ret) && OB_FAIL(try_schedule_prepare_task(pkey))) { + // change prepare state to running/pending + if (OB_SUCC(ret) && OB_FAIL(try_schedule_prepare_task(pkey))) { LOG_WARN("fail to schedule prepare task", K(ret)); } break; } + case OB_TTL_TASK_FINISH: case OB_TTL_TASK_RUNNING: case OB_TTL_TASK_PENDING: - case OB_TTL_TASK_CANCEL: - case OB_TTL_TASK_FINISH: { - if (OB_FAIL(construct_sys_table_record(&cp_ctx, ttl_record))) { - LOG_WARN("fail to construct sys table record", K(ret)); - } else if (OB_FAIL(ObTTLUtil::update_ttl_task_all_fields(tenant_id, + case OB_TTL_TASK_CANCEL: { + ObMySQLTransaction trans; + ObTTLStatusFieldArray filters; + common::ObTTLStatusArray ttl_records; + ObTTLStatusFieldArray filter; + bool commit = false; + int tmp_ret = OB_SUCCESS; + if (OB_FAIL(construct_task_record_filter(ctx->task_info_.task_id_, + ctx->task_info_.pkey_.get_table_id(), + ctx->task_info_.pkey_.get_partition_id(), + filters))) { + LOG_WARN("fail to construct task record filter", K(ret)); + } else if (OB_FAIL(trans.start(get_sql_proxy(), tenant_id))) { + LOG_WARN("fail to start transation", K(ret)); + } else if (OB_FAIL(ObTTLUtil::read_ttl_tasks(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, + trans, filters, ttl_records, true, &allocator))) { + LOG_WARN("fail to get ttl tasks", K(ret)); + } else { + if (ttl_records.empty()) { + if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { + LOG_WARN("fail to construct sys table record", K(ret)); + } else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, + trans, ttl_record))) { + LOG_WARN("fail to insert ttl task", K(ret)); + } + } else { + if (ttl_records.count() != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect ttl records count", K(ret), K(ttl_records.count())); + } else if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) { + LOG_WARN("fail to construct sys table record", K(ret)); + } else if 
(OB_FAIL(ObTTLUtil::update_ttl_task_all_fields(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, - *get_sql_proxy(), ttl_record))) { - LOG_WARN("fail to update ttl task in sys table", K(ret), K(ttl_record)); + trans, ttl_record))) { + LOG_WARN("fail to update ttl task in sys table", K(ret), K(ttl_record)); + } + } + } + + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = ret; + if (OB_FAIL(trans.end(commit))) { + LOG_WARN("faile to end trans", "commit", commit, K(ret)); + } + ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret; } break; } + default: { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected ttl task status", K(ret)); @@ -1135,16 +1306,12 @@ int ObTTLManager::construct_task_record_filter(const uint64_t& task_id, return ret; } -// deprecated -int ObTTLManager::deep_copy_all_tenant_ctxs(common::ObSArray& ctx_array, - common::ObArenaAllocator& allocator, - uint64_t tenant_id) +int ObTTLManager::copy_all_tenant_ctxs(common::ObSArray& ctx_array, uint64_t tenant_id) { int ret = OB_SUCCESS; common::ObSpinLockGuard guard(lock_); ObTTLTenantInfo* tenant_info = get_tenant_info(tenant_id, false); ObTTLTaskCtx* ctx = NULL; - ObTTLTaskCtx cp_ctx; if (OB_ISNULL(tenant_info)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("fail to get ttl tenant info", K(ret)); @@ -1155,14 +1322,8 @@ int ObTTLManager::deep_copy_all_tenant_ctxs(common::ObSArray& ctx_ if (OB_ISNULL(ctx)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("fatal err, ttl ctx in map is null", K(tenant_info->tenant_id_), K(ret)); - } else { - cp_ctx = *ctx; - if (!ctx->task_info_.row_key_.empty() && - OB_FAIL(ob_write_string(allocator, ctx->task_info_.row_key_, cp_ctx.task_info_.row_key_))) { - LOG_WARN("fail to deep copy first key", K(ret)); - } else if (OB_FAIL(ctx_array.push_back(cp_ctx))) { - LOG_WARN("fail to push back ctx array", K(ret)); - } + } else if (OB_FAIL(ctx_array.push_back(ctx))) { + LOG_WARN("fail to push back ctx array", K(ret)); } } } @@ -1173,40 +1334,67 @@ int 
ObTTLManager::move_record_to_history_table(uint64_t tenant_id) { int ret = OB_SUCCESS; common::ObArenaAllocator allocator; - common::ObSArray need_move_ctxs; + common::ObSArray need_move_ctxs; + uint64_t move_rows_cnt = 0; + ObTTLTenantInfo* tenant_info = get_tenant_info(tenant_id, false); - if (OB_FAIL(deep_copy_all_tenant_ctxs(need_move_ctxs, allocator, tenant_id))) { + if (OB_ISNULL(tenant_info)) { + ret = OB_ERR_NULL_VALUE; + LOG_WARN("fail to get ttl tenant info", K(ret)); + } else if (!tenant_info->is_finished_) { + LOG_INFO("new partition leader on , cannot move right now", K(ret), KPC(tenant_info)); + } else if (OB_FAIL(copy_all_tenant_ctxs(need_move_ctxs, tenant_id))) { LOG_WARN("fail to deep copy ctx", K(ret)); } else { for (int i = 0; i < need_move_ctxs.count() && OB_SUCC(ret); ++i) { - ObMySQLTransaction trans; - common::ObTTLStatus ttl_record; - if (OB_FAIL(construct_sys_table_record(&(need_move_ctxs.at(i)), ttl_record))) { - LOG_WARN("fail to construct sys table record", K(ret), K(need_move_ctxs.at(i))); - } else if (OB_FAIL(trans.start(get_sql_proxy(), ttl_record.tenant_id_))) { - LOG_WARN("failt to start trans", K(ret), K(ttl_record.tenant_id_)); - } else { - ObTTLStatusKey key(ttl_record.tenant_id_, ttl_record.table_id_, - ttl_record.partition_id_, ttl_record.task_id_); - if (OB_FAIL(ObTTLUtil::insert_ttl_task(ttl_record.tenant_id_, - share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, - trans, ttl_record))) { - LOG_WARN("fail to insert ttl task into __all_ttl_task_status_history.", K(ret)); - } else if (OB_FAIL(common::ObTTLUtil::delete_ttl_task(ttl_record.tenant_id_, - share::OB_ALL_KV_TTL_TASK_TNAME, - trans, key))) { - LOG_WARN("fail to delete ttl tasks status", K(ret)); - } - bool commit = (OB_SUCCESS == ret); - int tmp_ret = ret; - if (OB_FAIL(trans.end(commit))) { - LOG_WARN("fail to end transaction", K(ret), K(commit)); + if (!need_move_ctxs.at(i)->is_moved_) { + ObMySQLTransaction trans; + common::ObTTLStatus ttl_record; + if 
(OB_FAIL(construct_sys_table_record(need_move_ctxs.at(i), ttl_record))) { + LOG_WARN("fail to construct sys table record", K(ret), KPC(need_move_ctxs.at(i))); + } else if (OB_FAIL(trans.start(get_sql_proxy(), ttl_record.tenant_id_))) { + LOG_WARN("failt to start trans", K(ret), K(ttl_record.tenant_id_)); + } else { + ObTTLStatusKey key(ttl_record.tenant_id_, ttl_record.table_id_, + ttl_record.partition_id_, ttl_record.task_id_); + int64_t affected_rows = 0; + if (OB_FAIL(common::ObTTLUtil::delete_ttl_task(ttl_record.tenant_id_, + share::OB_ALL_KV_TTL_TASK_TNAME, + trans, key, affected_rows))) { + LOG_WARN("fail to delete ttl record in __all_kv_ttl_task", K(ret)); + } else { + if (affected_rows == 1) { + // NOTE: use replace instead of insert , because when old partition leader + // move record to history table first, new partition leader may write partition task + // into task table and move, it will cause OB_ERR_PRIMARY_KEY_DUPLICATE + if (OB_FAIL(ObTTLUtil::replace_ttl_task(ttl_record.tenant_id_, + share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, + trans, ttl_record))) { + LOG_WARN("fail to replace into ttl task into __all_kv_ttl_task_history.", K(ret)); + } + } else if (affected_rows == 0) { + LOG_INFO("delete affecte 0 row, record maybe moved by other observer", K(ret)); + } else { + LOG_WARN("unexpected affected rows", K(ret), K(affected_rows)); + } + } + + bool commit = (OB_SUCCESS == ret); + int tmp_ret = ret; + if (OB_FAIL(trans.end(commit))) { + LOG_WARN("fail to end transaction", K(ret), K(commit)); + } + ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret; + + if (OB_SUCC(ret)) { + need_move_ctxs.at(i)->is_moved_ = true; + move_rows_cnt += affected_rows; + } } - ret = OB_SUCCESS == tmp_ret ? 
ret : tmp_ret; - } + } } } - LOG_DEBUG("finish move record to history table", K(ret), K(tenant_id), K(need_move_ctxs.count())); + LOG_INFO("finish move record to history table", K(ret), K(tenant_id), K(need_move_ctxs), K(move_rows_cnt)); return ret; } @@ -1233,7 +1421,7 @@ int ObTTLManager::response_ttl_cmd(const uint64_t& tenant_id, const uint64_t& ta ret = OB_ERR_SYS; LOG_WARN("innner system error, rootserver rpc proxy or rs mgr must not be NULL", K(ret), K(GCTX)); } else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) { - LOG_WARN("fail to get rootservice address", K(ret)); + LOG_WARN("fail to get rootservice address", K(ret), K(rs_addr)); } else if (OB_FAIL(GCTX.rs_rpc_proxy_->to(rs_addr).ttl_response(arg))) { if (OB_TENANT_NOT_EXIST == ret && OB_FAIL(mark_tenant_droped(tenant_id))) { LOG_WARN("fail to mark tenant droped", K(ret), K(tenant_id)); @@ -1254,43 +1442,113 @@ int ObTTLManager::mark_tenant_droped(const uint64_t& tenant_id) if (OB_ISNULL(tenant_info)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("tenant info is null", K(tenant_id)); - } else if (tenant_info->state_ == OB_TTL_TASK_FINISH || tenant_info->state_ == OB_TTL_TASK_CANCEL) { - LOG_INFO("mark tenant droped", K(tenant_info->state_)); - tenant_info->state_ == OB_TTL_TASK_MOVING; - tenant_info->rsp_time_ == OB_INVALID_ID; - tenant_info->is_dirty_ = true; - tenant_info->is_droped_ = true; + } else { + bool tenant_not_exist = false; + schema::ObSchemaGetterGuard schema_guard; + if (OB_FAIL(schema::ObMultiVersionSchemaService::get_instance().get_schema_guard(schema_guard))) { // double check + LOG_WARN("fail to get schema guard", K(ret)); + } else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(tenant_id, tenant_not_exist))) { + LOG_WARN("fail to check tenant exists", K(ret), K(tenant_id)); + } else if (tenant_not_exist) { + // tenant_info->is_dirty_ = false; // no need to scan and sync sys table + // tenant_info->need_check_ = false; // no need to check partitions + // 
tenant_info->rsp_time_ == OB_INVALID_ID; // no need response + // tenant_info->state_ = OB_TTL_TASK_INVALID; + // tenant_info->is_droped_ = true; + // tenant_info->part_task_map_.reuse(); + if (OB_FAIL(ttl_tenant_parts_map_.erase_refactored(tenant_id))) { + LOG_WARN("fail to erase tenant info", K(tenant_id)); + } else { + tenant_info->destory(); + allocator_.free(tenant_info); + } + LOG_INFO("tenant is droped, drop tenant info directly", K(tenant_id)); + } } return ret; } -int ObTTLManager::from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record, - common::ObArenaAllocator& allocator) +int ObTTLManager::check_and_reset_droped_tenant() { int ret = OB_SUCCESS; common::ObSpinLockGuard guard(lock_); - ObTTLTaskCtx* ctx = get_one_partition_ctx(pkey); - char *rowkey_buf = NULL; - if (OB_ISNULL(ctx)) { - ret = OB_ERR_NULL_VALUE; - LOG_WARN("unexpected null tenant info", K(ret)); - } else if (pkey.get_tenant_id() != record.tenant_id_ || - pkey.get_partition_id() != record.partition_id_ || - pkey.get_table_id() != record.table_id_) { + for (ttl_tenants_iterator iter = ttl_tenant_parts_map_.begin(); + iter != ttl_tenant_parts_map_.end() && OB_SUCC(ret); ++iter) { + uint64_t tenant_id = iter->first; + ObTTLTenantInfo *tenant_info = iter->second; + if (OB_ISNULL(tenant_info)) { + ret = OB_ERR_NULL_VALUE; + LOG_WARN("tenant info is null", K(ret), K(tenant_id)); + } else { + bool tenant_not_exist = false; + schema::ObSchemaGetterGuard schema_guard; + if (OB_FAIL(schema::ObMultiVersionSchemaService::get_instance().get_schema_guard(schema_guard))) { + LOG_WARN("fail to get schema guard", K(ret)); + } else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(tenant_id, tenant_not_exist))) { + LOG_WARN("fail to check tenant exists", K(ret), K(tenant_id)); + } else if (tenant_not_exist) { + tenant_info->is_dirty_ = false; // no need to scan and sync sys table + tenant_info->need_check_ = false; // no need to check partitions + tenant_info->rsp_time_ = OB_INVALID_ID; 
// no need response + tenant_info->state_ = OB_TTL_TASK_INVALID; + tenant_info->is_droped_ = true; + tenant_info->part_task_map_.reuse(); + LOG_INFO("tenant is droped, reset partition task map", K(tenant_id), KPC(tenant_info)); + } + } + } + return ret; +} + +int ObTTLManager::from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record, bool with_status /*true*/, bool with_err_code /*true*/) +{ + int ret = OB_SUCCESS; + if (!pkey.is_valid()) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("fatel error, record not match pkey", K(pkey), K(record)); + LOG_WARN("invalid arguments", K(ret)); } else { - ctx->task_info_.err_code_ = record.ret_code_.compare("OB_SUCCESS") == 0 ? OB_SUCCESS : OB_INVALID_ERROR; - ctx->task_info_.ttl_del_cnt_ = record.ttl_del_cnt_; - ctx->task_info_.max_version_del_cnt_ = record.max_version_del_cnt_; - ctx->task_info_.scan_cnt_ = record.scan_cnt_; - char *rowkey_buf = static_cast(allocator.alloc(record.row_key_.length())); - if (OB_ISNULL(rowkey_buf)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc memory", K(ret)); + common::ObSpinLockGuard guard(lock_); + ObTTLTenantInfo *tenant_info = get_tenant_info(pkey.get_tenant_id(), false); + ObTTLTaskCtx* ctx = get_one_partition_ctx(pkey); + if (OB_ISNULL(ctx) || OB_ISNULL(tenant_info)) { + ret = OB_ERR_NULL_VALUE; + LOG_WARN("unexpected null value", K(ret), KP(ctx), KP(tenant_info)); + } else if (pkey.get_tenant_id() != record.tenant_id_ || + pkey.get_partition_id() != record.partition_id_ || + pkey.get_table_id() != record.table_id_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fatel error, record not match pkey", K(pkey), K(record)); } else { - MEMCPY(rowkey_buf, record.row_key_.ptr(), record.row_key_.length()); - ctx->task_info_.row_key_.assign(rowkey_buf, record.row_key_.length()); + ctx->task_info_.pkey_ = pkey; + ctx->task_info_.task_id_ = record.task_id_; + ctx->task_start_time_ = record.task_start_time_; + ctx->task_end_time_ = record.task_update_time_; + 
ctx->task_info_.is_user_trigger_ = record.trigger_type_ == TRIGGER_TYPE::USER_TRIGGER; + ctx->task_info_.ttl_del_cnt_ = record.ttl_del_cnt_; + ctx->task_info_.max_version_del_cnt_ = record.max_version_del_cnt_; + ctx->task_info_.scan_cnt_ = record.scan_cnt_; + if (with_err_code) { + if (record.ret_code_.compare("OB_SUCCESS") == 0) { + ctx->task_info_.err_code_ = OB_SUCCESS; + } else if (record.ret_code_.compare("OB_NOT_MASTER") == 0) { + ctx->task_info_.err_code_ = OB_NOT_MASTER; + } else { + ctx->task_info_.err_code_ = OB_INVALID_ERROR; + } + } + if (with_status) { + ctx->task_status_ = static_cast(record.status_); + } + if (!record.row_key_.empty()) { + char *rowkey_buf = static_cast(tenant_info->allocator_.alloc(record.row_key_.length())); + if (OB_ISNULL(rowkey_buf)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc memory", K(ret)); + } else { + MEMCPY(rowkey_buf, record.row_key_.ptr(), record.row_key_.length()); + ctx->task_info_.row_key_.assign(rowkey_buf, record.row_key_.length()); + } + } } } LOG_DEBUG("finish from ttl record", K(ret), K(pkey)); @@ -1304,10 +1562,10 @@ bool ObTTLManager::can_schedule_tenant(const ObTTLTenantInfo &tenant_info) bool ObTTLManager::can_schedule_task(const ObTTLTaskCtx &ttl_task) { - return ttl_task.task_status_ == OB_TTL_TASK_PENDING || ttl_task.task_status_ == OB_TTL_TASK_PREPARE; + return ttl_task.task_status_ == OB_TTL_TASK_PENDING; } -int ObTTLManager::try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info) +int ObTTLManager::try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info, const ObTTLTaskCtx *current_ctx) { int ret = OB_SUCCESS; if (OB_ISNULL(tenant_info)) { @@ -1322,22 +1580,24 @@ int ObTTLManager::try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info) if (OB_ISNULL(ctx)) { ret = OB_ERR_NULL_VALUE; LOG_ERROR("fatal err, ttl ctx in map is null", K(ret), K(tenant_info->tenant_id_)); + } else if (current_ctx == ctx) { + // do nothing } else if (can_schedule_task(*ctx)) { if 
(OB_FAIL(try_schedule_task(tenant_info, ctx))) { - if (OB_EAGAIN != ret) { + if (OB_SIZE_OVERFLOW != ret) { LOG_WARN("fail to schedule task", K(ret)); } } } } - if (OB_EAGAIN == ret) { + if (OB_SIZE_OVERFLOW == ret) { ret = OB_SUCCESS; } } return ret; } -// try schedule partition task, reutrn OB_EAGAIN if dag scheduler is full +// try schedule partition task, reutrn OB_SIZE_OVERFLOW if dag scheduler is full int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx) { int ret = OB_SUCCESS; @@ -1345,7 +1605,11 @@ int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* LOG_WARN("invalid argument", K(ret), KP(tenant_info), KP(ctx)); } else if (can_schedule_tenant(*tenant_info) && can_schedule_task(*ctx)) { if (OB_FAIL(generate_ttl_dag(ctx->task_info_, ctx->ttl_para_))) { - if (OB_EAGAIN != ret) { + if (OB_EAGAIN == ret) { + ret = OB_SUCCESS; + } else if (OB_SIZE_OVERFLOW == ret) { + // do noting + } else { LOG_WARN("fail to generate dag task", K(ret)); } } else { @@ -1371,3 +1635,49 @@ void ObTTLManager::mark_ttl_ctx_dirty(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx } } +int ObTTLManager::refresh_partition_task(ObTTLTaskCtx &ttl_task, bool refresh_status, bool refresh_retcode /*false*/) +{ + int ret = OB_SUCCESS; + ObMySQLTransaction trans; + ObTTLStatusFieldArray filters; + common::ObTTLStatusArray ttl_records; + ObTTLStatusFieldArray filter; + ObPartitionKey pkey = ttl_task.task_info_.pkey_; + uint64_t tenant_id = pkey.get_tenant_id(); + ObTTLTenantInfo *tenant_info = get_tenant_info(pkey.get_tenant_id(), false); + if (!ttl_task.is_valid() || OB_ISNULL(tenant_info)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(ttl_task), KP(tenant_info)); + } else if (OB_FAIL(construct_task_record_filter(ttl_task.task_info_.task_id_, + pkey.get_table_id(), + pkey.get_partition_id(), + filters))) { + LOG_WARN("fail to construct task record filter", K(ret), K(ttl_task)); + } else if 
(OB_FAIL(trans.start(get_sql_proxy(), tenant_id))) { + LOG_WARN("fail to start transation", K(ret)); + } else if (OB_FAIL(ObTTLUtil::read_ttl_tasks(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, + trans, filters, ttl_records, true, &tenant_info->allocator_))) { + LOG_WARN("fail to get ttl tasks", K(ret), K(tenant_id), K(filters)); + } else { + if (ttl_records.empty()) { + // do nothing + } else { + if (ttl_records.count() != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect ttl records count", K(ret), K(ttl_records.count())); + } else if (OB_FAIL(from_ttl_record(pkey, ttl_records.at(0), refresh_status, refresh_retcode))) { + LOG_WARN("fail to convert from ttl record", K(ret), K(pkey), K(refresh_status)); + } + } + } + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = ret; + if (OB_FAIL(trans.end(commit))) { + LOG_WARN("faile to end trans", "commit", commit, K(ret)); + } + ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret; + } + + return ret; +} diff --git a/src/observer/table/ob_table_ttl_manager.h b/src/observer/table/ob_table_ttl_manager.h index 5cd86c796a..9c1bd6ed93 100644 --- a/src/observer/table/ob_table_ttl_manager.h +++ b/src/observer/table/ob_table_ttl_manager.h @@ -35,7 +35,9 @@ public : task_end_time_(OB_INVALID_ID), failure_times_(0), rsp_time_(OB_INVALID_ID), - is_dirty_(false) {} + is_dirty_(false), + is_moved_(false), + need_refresh_(true) {} bool is_valid() { return task_info_.is_valid() && ttl_para_.is_valid(); @@ -43,10 +45,9 @@ public : TO_STRING_KV(K_(task_info), K_(task_status), K_(ttl_para), K_(task_start_time), K_(last_modify_time), K_(task_end_time), K_(failure_times), - K_(rsp_time), K_(is_dirty)); + K_(rsp_time), K_(is_dirty), K_(is_moved), K_(need_refresh)); public: - ObTTLTaskInfo task_info_; common::ObTTLTaskStatus task_status_; ObTTLPara ttl_para_; @@ -57,7 +58,9 @@ public: int64_t rsp_time_; bool is_invalid_; - bool is_dirty_; + bool is_dirty_; // should sync sys table for tasks + bool is_moved_; + bool 
need_refresh_; // should refresh task from task table }; class OBTTLTimerPeriodicTask : public common::ObTimerTask { @@ -80,7 +83,7 @@ public: bool is_usr_trigger, obrpc::ObTTLRequestArg::TTLRequestType cmd); int report_task_status(ObTTLTaskInfo& task_info, ObTTLPara& task_para, bool& is_stop); - void on_leader_active(storage::ObIPartitionGroup* partition); + void on_leader_active(const ObPartitionKey& pkey); void on_schema_changed(uint64_t schema_changed_tenant_id); /*timer handle function*/ @@ -105,26 +108,40 @@ private: cmd_type_(obrpc::ObTTLRequestArg::TTL_INVALID_TYPE), rsp_time_(OB_INVALID_ID), state_(common::ObTTLTaskStatus::OB_TTL_TASK_INVALID), - is_droped_(false) {} - PartTasksMap part_task_map_; - common::ObArenaAllocator allocator_; - uint64_t tenant_id_; - uint64_t task_id_; - bool is_usr_trigger_; - bool need_check_; /*need scan partition & check*/ - bool is_dirty_; /*need check the current ctx task*/ - bool ttl_continue_; - obrpc::ObTTLRequestArg::TTLRequestType cmd_type_; - int64_t rsp_time_; - common::ObTTLTaskStatus state_; - bool is_droped_; - - public: + is_droped_(false), + is_finished_(false) + {} void destory() { part_task_map_.destroy(); allocator_.reset(); } + TO_STRING_KV(K_(tenant_id), + K_(task_id), + K_(is_usr_trigger), + K_(need_check), + K_(is_dirty), + K_(ttl_continue), + K_(cmd_type), + K_(rsp_time), + K_(state), + K_(is_droped), + K_(is_finished)); + + public: + PartTasksMap part_task_map_; + common::ObArenaAllocator allocator_; + uint64_t tenant_id_; + uint64_t task_id_; + bool is_usr_trigger_; + bool need_check_; /*need scan partition & check*/ + bool is_dirty_; /*need check the current ctx task*/ + bool ttl_continue_; + obrpc::ObTTLRequestArg::TTLRequestType cmd_type_; + int64_t rsp_time_; // OB_INVALID_ID means no need response + common::ObTTLTaskStatus state_; + bool is_droped_; // tenant is droped + bool is_finished_; // all delete task is finished (or canceled) }; typedef common::hash::ObHashMap TenantPartsMap; @@ -142,7 
+159,7 @@ private: int generate_one_partition_task(ObTTLTaskInfo& task_info, ObTTLPara& para); int get_ttl_para_from_schema(const share::schema::ObTableSchema *table_schema, ObTTLPara& para, bool& is_tableapi_schema); - int check_partition_can_gen_ttl(storage::ObIPartitionGroup *partition, + int check_partition_can_gen_ttl(const ObPartitionKey& pkey, ObTTLPara ¶, bool& can_ttl); int check_and_do_rsp(uint64_t tenant_id); void mark_tenant_need_check(uint64_t tenant_id); @@ -163,20 +180,22 @@ private: int sync_sys_table(ObPartitionKey& pkey); int construct_sys_table_record(ObTTLTaskCtx* ctx, common::ObTTLStatus& ttl_record); int try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx); - int try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info); + int try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info, const ObTTLTaskCtx *current_ctx); bool can_schedule_tenant(const ObTTLTenantInfo &tenant_info); bool can_schedule_task(const ObTTLTaskCtx &ttl_task); - int check_cmd_state_valid(common::ObTTLTaskStatus current_state, common::ObTTLTaskStatus incoming_state); - int deep_copy_all_tenant_ctxs(common::ObSArray& ctx_array, common::ObArenaAllocator& allocator, - uint64_t tenant_id); - int from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record, - common::ObArenaAllocator& allocator); + int check_cmd_state_valid(const common::ObTTLTaskStatus current_state, + const common::ObTTLTaskStatus incoming_state); + int copy_all_tenant_ctxs(common::ObSArray& ctx_array, uint64_t tenant_id); + int from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record, bool with_status = true, bool with_err_code = true); void mark_ttl_ctx_dirty(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx); void check_ttl_tenant_state(uint64_t tenant_id); int transform_cmd_to_state(const obrpc::ObTTLRequestArg::TTLRequestType& cmd, common::ObTTLTaskStatus& state); int try_schedule_prepare_task(ObPartitionKey& pkey); void mark_tenant_checked(uint64_t tenant_id); int 
mark_tenant_droped(const uint64_t& tenant_id); + int check_and_reset_droped_tenant(); + obrpc::ObTTLRequestArg::TTLRequestType transform_state_to_cmd(const int64_t state); + int refresh_partition_task(ObTTLTaskCtx &ttl_task, bool refresh_status, bool refresh_retcode = false); private: static const int64_t DEFAULT_TTL_BUCKET_NUM = 100; @@ -192,6 +211,7 @@ private: common::ObTimer ttl_timer_; OBTTLTimerPeriodicTask periodic_task_; common::ObSpinLock lock_; + bool is_first_cmd_; // recovery tenant info after restart private: void stop(); @@ -203,7 +223,8 @@ private: periodic_delay_(TTL_PERIODIC_DELAY), ttl_timer_(), periodic_task_(), - lock_() + lock_(), + is_first_cmd_(true) {} ~ObTTLManager() {} }; diff --git a/src/observer/table/ob_table_ttl_task.cpp b/src/observer/table/ob_table_ttl_task.cpp index 4c3a285d0a..a9cc6f2afb 100644 --- a/src/observer/table/ob_table_ttl_task.cpp +++ b/src/observer/table/ob_table_ttl_task.cpp @@ -55,10 +55,10 @@ int ObTableTTLDeleteTask::init(const ObTTLPara &ttl_para, ObTTLTaskInfo &ttl_inf ObRowkey rowkey; int64_t pos = 0; if (OB_FAIL(rowkey.deserialize(allocator_, ttl_info.row_key_.ptr(), ttl_info.row_key_.length(), pos))) { - LOG_WARN("fail to deserialize rowkey", K(ret)); + LOG_WARN("fail to deserialize rowkey", K(ret), K(ttl_info.row_key_)); } else if (rowkey.get_obj_cnt() != 2) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(rowkey)); + LOG_WARN("invalid argument", K(ret), K(rowkey), K(ttl_info.row_key_)); } else { first_key_ = rowkey.get_obj_ptr()[0].get_string(); second_key_ = rowkey.get_obj_ptr()[1].get_string(); @@ -326,7 +326,7 @@ int ObTableTTLDeleteTask::process_one() char *buf = static_cast(allocator_.alloc(buf_len)); int64_t pos = 0; if (OB_FAIL(row_key.serialize(buf, buf_len, pos))) { - LOG_WARN("fail to deserialize", K(ret)); + LOG_WARN("fail to serialize", K(ret)); } else { result_key.assign_ptr(buf, buf_len); } diff --git a/src/rootserver/ob_ttl_scheduler.cpp b/src/rootserver/ob_ttl_scheduler.cpp 
index da642f031b..ebfd8227ba 100644 --- a/src/rootserver/ob_ttl_scheduler.cpp +++ b/src/rootserver/ob_ttl_scheduler.cpp @@ -23,16 +23,6 @@ namespace oceanbase namespace rootserver { -#define OB_TTL_RESPONSE_MASK (1 << 5) -#define OB_TTL_STATUS_MASK (OB_TTL_RESPONSE_MASK - 1) - -#define SET_TASK_PURE_STATUS(status, state) ((status) = ((state) & OB_TTL_STATUS_MASK) + ((status & OB_TTL_RESPONSE_MASK))) -#define SET_TASK_RESPONSE(status, state) ((status) |= (((state) & 1) << 5)) -#define SET_TASK_STATUS(status, pure_status, is_responsed) { SET_TASK_PURE_STATUS(status, pure_status), SET_TASK_RESPONSE(status, is_responsed); } - -#define EVAL_TASK_RESPONSE(status) (((status) & OB_TTL_RESPONSE_MASK) >> 5) -#define EVAL_TASK_PURE_STATUS(status) (static_cast((status) & OB_TTL_STATUS_MASK)) - ObClearTTLStatusHistoryTask::ObClearTTLStatusHistoryTask(ObRootService& rs) : root_service_(rs) { @@ -158,6 +148,8 @@ int ObTTLTenantTaskMgr::add_tenant(uint64_t tenant_id) tenant_task.tenant_id_ = tenant_id; if (OB_FAIL(ten_task_arr_.push_back(tenant_task))) { LOG_WARN("fail to store tenant task", K(ret)); + } else { + LOG_INFO("add tenant to tenant task array", K(tenant_id), K(tenant_task)); } } return ret; @@ -170,6 +162,7 @@ void ObTTLTenantTaskMgr::delete_tenant(uint64_t tenant_id) if (ten_task.tenant_id_ == tenant_id) { ten_task.reset(); ten_task_arr_.remove(i); + LOG_INFO("remove tennat task in tenant task array", K(tenant_id), K(ten_task_arr_)); break; } } @@ -306,7 +299,7 @@ int ObTTLTenantTaskMgr::add_ttl_task(uint64_t tenant_id, } if (curr_state != next_state) { - if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, static_cast(next_state)))) { + if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, static_cast(next_state), *GCTX.sql_proxy_))) { LOG_WARN("fail to update ttl tasks", K(ret)); } else { // update memory status only @@ -348,7 +341,7 @@ int ObTTLTenantTaskMgr::add_ttl_task(uint64_t tenant_id, } else if (next_state != curr_state) 
{ // update status if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, - static_cast(next_state)))) { + static_cast(next_state), *GCTX.sql_proxy_))) { LOG_WARN("fail to update ttl tasks", K(ret)); } else { rs_task.all_responsed_ = false; @@ -382,6 +375,12 @@ int ObTTLTenantTaskMgr::add_ttl_task(uint64_t tenant_id, } } + if (ret == OB_EAGAIN) { + // it's a success cmd, cannot return OB_EAGAIN to user + LOG_INFO("reset OB_EAGAIN to OB_SUCCESS, because ttl scheduler will resend later"); + ret = OB_SUCCESS; + } + return ret; } @@ -398,7 +397,7 @@ int ObTTLTenantTaskMgr::add_ttl_task_internal(uint64_t tenant_id, if (OB_FAIL(get_tenant_ptr(tenant_id, tenant_ptr))) { LOG_WARN("fail to get tenant task ptr", K(ret)); } else if (OB_FAIL(in_active_time(tenant_id, is_active_time))) { - LOG_WARN("fail to eval active time", K(ret)); + LOG_WARN("fail to eval active time", K(ret)); } else { bool enable_ttl = is_enable_ttl(tenant_id); ObTTLTenantTask& tenant_ref = *tenant_ptr; @@ -437,7 +436,7 @@ bool ObTTLTenantTaskMgr::need_retry_task(RsTenantTask& rs_task) { bool bool_ret = false; int64_t cur_time = ObTimeUtility::current_time(); - bool_ret = (cur_time - rs_task.ttl_status_.task_update_time_ < OB_TTL_TASK_RETRY_INTERVAL) || + bool_ret = (cur_time - rs_task.ttl_status_.task_update_time_ >= OB_TTL_TASK_RETRY_INTERVAL) || (rs_task.server_infos_.count() == 0); return bool_ret; } @@ -485,7 +484,7 @@ int ObTTLTenantTaskMgr::process_tenant_tasks(uint64_t tenant_id) ObTTLTaskType ttl_task_type = ObTTLTaskType::OB_TTL_INVALID; LOG_INFO("process_tenant_tasks begin", K(tenant_id), K(task_count), K(status_responsed), - K(curr_state), K(task_count)); + K(curr_state), K(task_count)); if (task_count > 1) { if (!(curr_state == ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL || @@ -510,8 +509,8 @@ int ObTTLTenantTaskMgr::process_tenant_tasks(uint64_t tenant_id) } } } else { - LOG_INFO("process_tenant_tasks begin", K(tenant_id), K(task_count), - K(curr_state), K(task_count)); + 
LOG_INFO("process_tenant_tasks begin", K(tenant_id), K(task_count), K(status_responsed), + K(curr_state), K(task_count)); // task_count == 1 if (status_responsed) { next_state = next_status(curr_state); @@ -551,7 +550,7 @@ int ObTTLTenantTaskMgr::process_tenant_tasks(uint64_t tenant_id) if (OB_SUCC(ret) && curr_state != next_state) { if (OB_FAIL(update_task_status(tenant_id, cur_task.ttl_status_.task_id_, - static_cast(next_state)))) { + static_cast(next_state), *GCTX.sql_proxy_))) { LOG_WARN("fail to update ttl tasks", K(ret)); } else { cur_task.all_responsed_ = false; @@ -589,7 +588,8 @@ int ObTTLTenantTaskMgr::insert_tenant_task(ObTTLStatus& ttl_task) int ObTTLTenantTaskMgr::update_task_status(uint64_t tenant_id, uint64_t task_id, - int64_t status) + int64_t status, + common::ObISQLClient& proxy) { int ret = OB_SUCCESS; ObTTLStatusKey key(tenant_id, OB_INVALID_ID, OB_INVALID_ID, task_id); @@ -611,7 +611,7 @@ int ObTTLTenantTaskMgr::update_task_status(uint64_t tenant_id, } else { if (OB_FAIL(ObTTLUtil::update_ttl_task(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, - *GCTX.sql_proxy_, + proxy, key, update_fields))) { LOG_WARN("fail to update ttl task status.", K(ret), K(tenant_id), K(task_id), K(status)); @@ -627,12 +627,13 @@ int ObTTLTenantTaskMgr::delete_task(uint64_t tenant_id, uint64_t task_id) { int ret = OB_SUCCESS; ObTTLStatusKey key(tenant_id, OB_INVALID_ID, OB_INVALID_ID, task_id); + int64_t affected_rows = 0; if (OB_FAIL(ObTTLUtil::delete_ttl_task(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME, - *GCTX.sql_proxy_, key))) { + *GCTX.sql_proxy_, key, affected_rows))) { LOG_WARN("fail to delete ttl tasks status", K(ret), K(tenant_id), K(task_id)); } else { - LOG_DEBUG("success to delete ttl tasks status", K(ret), K(tenant_id), K(task_id)); + LOG_DEBUG("success to delete ttl tasks status", K(ret), K(tenant_id), K(task_id), K(affected_rows)); } return ret; @@ -724,6 +725,8 @@ int ObTTLTenantTaskMgr::refresh_tenant(uint64_t tenant_id) } else if 
(OB_FAIL(update_tenant_tasks(tenant_id, ttl_tasks))) { LOG_WARN("fail to update tenant tasks", K(ret), K(tenant_id)); } + + LOG_INFO("refresh tenant task from system table", K(tenant_id)); } return ret; } @@ -731,25 +734,19 @@ int ObTTLTenantTaskMgr::refresh_tenant(uint64_t tenant_id) int ObTTLTenantTaskMgr::refresh_all() { int ret = OB_SUCCESS; - if (!need_refresh_) { + ObArray tenant_ids; + if (OB_FAIL(get_tenant_ids(tenant_ids))) { + LOG_WARN("fail to get tenant ids", K(ret)); } else { - ObArray tenant_ids; - - if (OB_FAIL(get_tenant_ids(tenant_ids))) { - LOG_WARN("fail to get tenant ids", K(ret)); - } else { - for (size_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) { - uint64_t tenant_id = tenant_ids.at(i); - if (OB_FAIL(refresh_tenant(tenant_id))) { - LOG_WARN("fail to refresh tenant", K(ret), K(tenant_id)); - } + LOG_INFO("get all tenant ids", K(tenant_ids)); + for (size_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) { + uint64_t tenant_id = tenant_ids.at(i); + if (OB_FAIL(refresh_tenant(tenant_id))) { + LOG_WARN("fail to refresh tenant", K(ret), K(tenant_id)); } } - - if (OB_SUCC(ret) && tenant_ids.count() > 0) { - need_refresh_ = false; - } } + return ret; } @@ -803,8 +800,7 @@ int ObTTLTenantTaskMgr::alter_status_and_add_ttl_task(uint64_t tenant_id) if (OB_FAIL(in_active_time(tenant_id, is_active_time))) { LOG_WARN("fail to eval active time", K(ret)); } else if (OB_FAIL(get_tenant_ptr(tenant_id, tenant_ptr))) { - need_refresh_ = true; - LOG_WARN("fail to get tenant task ptr", K(tenant_id), K(ret)); + LOG_WARN("fail to get tenant task ptr, need refresh", K(tenant_id), K(ret)); } else { ObTTLTenantTask& tenant_ref = *tenant_ptr; size_t task_count = tenant_ref.tasks_.count(); @@ -824,7 +820,7 @@ int ObTTLTenantTaskMgr::alter_status_and_add_ttl_task(uint64_t tenant_id) if (!status_responsed) { int64_t tmp_status = 0; SET_TASK_STATUS(tmp_status, cur_state, 1); - if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, tmp_status))) 
{ + if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, tmp_status, *GCTX.sql_proxy_))) { LOG_WARN("fail to update ttl tasks", K(ret)); } else { rs_task.set_servers_not_responsed(); @@ -862,7 +858,7 @@ int ObTTLTenantTaskMgr::alter_status_and_add_ttl_task(uint64_t tenant_id) * send move to servers, update status */ LOG_INFO("alter status and add ttl task", K(next_state)); - if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, next_state))) { + if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, next_state, *GCTX.sql_proxy_))) { LOG_WARN("fail to update ttl tasks", K(ret)); } else { LOG_INFO("alter status and add ttl task", K(next_state)); @@ -1098,7 +1094,7 @@ int ObTTLTenantTaskMgr::dispatch_ttl_request(const TTLServerInfos& server_infos, } } - LOG_INFO("send ttl server ttl request", K(ret), K(arg), K(send_cnt), K(server_infos.count())); + LOG_INFO("send ttl server ttl request", K(ret), K(arg), K(send_cnt), K(server_infos.count()), K(server_infos)); return ret; } @@ -1210,8 +1206,12 @@ void ObTTLTenantTaskMgr::refresh_deleted_tenants() exist = (del_ten_arr_.at(k) == tenant_id); } - if (!exist && OB_FAIL(del_ten_arr_.push_back(tenant_id))) { - LOG_WARN("fail to store deleted tenant id", K(ret)); + if (!exist) { + if (OB_FAIL(del_ten_arr_.push_back(tenant_id))) { + LOG_WARN("fail to store deleted tenant id", K(ret)); + } else { + LOG_INFO("add tennat id to del tenant array", K(ret), K(tenant_id), K(del_ten_arr_)); + } } } } @@ -1248,6 +1248,7 @@ int ObTTLTenantTaskMgr::process_tenant_task_rsp(uint64_t tenant_id, const ObAddr& server_addr) { int ret = OB_SUCCESS; + TTLMGR.refresh_all(); lib::ObMutexGuard guard(mutex_); RsTenantTask* rs_task_ptr = NULL; @@ -1255,8 +1256,10 @@ int ObTTLTenantTaskMgr::process_tenant_task_rsp(uint64_t tenant_id, LOG_WARN("fail to get tasks ptr", K(ret), K(tenant_id), K(task_id)); } else { RsTenantTask& rs_task = *rs_task_ptr; + if (OB_FAIL(rsp_task_status(static_cast(task_type), 
EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_)))) { - LOG_WARN("response task type incorrect", K(ret), K(tenant_id), K(task_id), K(task_type)); + LOG_WARN("response task type incorrect", + K(ret), K(tenant_id), K(task_id), K(task_type), K(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_))); } else if (OB_FAIL(rs_task.set_server_responsed(server_addr))) { LOG_WARN("fail to set server responsed", K(ret), K(tenant_id), K(task_id)); } else if (!EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_) && @@ -1282,22 +1285,49 @@ int ObTTLTenantTaskMgr::update_task_on_all_responsed(RsTenantTask& task) ObTTLTaskStatus next_state, cur_state; cur_state = EVAL_TASK_PURE_STATUS(task.ttl_status_.status_); next_state = next_status(cur_state); + bool is_move = false; int64_t task_status = static_cast(next_state); if (next_state == cur_state) { // move or suspend SET_TASK_RESPONSE(task_status, 1); + if (cur_state == OB_RS_TTL_TASK_MOVE) { + is_move = true; + } } if (task.ttl_status_.status_ == task_status) { // SUSPEND or MOVED - } else if (OB_FAIL(update_task_status(tenant_id, task.ttl_status_.task_id_, task_status))) { - LOG_WARN("fail to update ttl tasks", K(ret), K(task.ttl_status_.task_id_), K(cur_state), K(next_state), K(task_status)); } else { - // update stauts and update time - task.ttl_status_.status_ = task_status; - task.all_responsed_ = false; - task.set_servers_not_responsed(); + ObMySQLTransaction trans; + uint64_t task_id = task.ttl_status_.task_id_; + if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null GCTX.sql_proxy_", K(ret)); + } else if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id))) { + LOG_WARN("fail to start transation", K(ret), K(tenant_id)); + } else if (OB_FAIL(update_task_status(tenant_id, task_id, task_status, *GCTX.sql_proxy_))) { + LOG_WARN("fail to update ttl tasks", K(ret), K(task_id), K(cur_state), K(next_state), K(task_status)); + } else if (is_move && 
OB_FAIL(ObTTLUtil::remove_all_task_to_history_table(tenant_id, task_id, trans))) { + // NOTE: if parition was removed and observer restart, task won't be moved by observer itself + LOG_WARN("fail to move task to history table", K(tenant_id), K(task_id)); + } else {} + + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = ret; + if (OB_FAIL(trans.end(commit))) { + LOG_WARN("faile to end trans", "commit", commit, K(ret)); + } + ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret; + } + + if (OB_SUCC(ret)) { + // update stauts and update time + task.ttl_status_.status_ = task_status; + task.all_responsed_ = false; + task.set_servers_not_responsed(); + } } return ret; @@ -1389,17 +1419,23 @@ void ObTTLScheduler::runTimerTask() int RsTenantTask::set_server_responsed(const ObAddr& server_addr) { int ret = OB_SUCCESS; + bool find_server = false; TTLServerInfos& server_infos = server_infos_; if (OB_UNLIKELY(!server_addr.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(server_addr)); } else { - for (int64_t i = 0; i < server_infos.count(); ++i) { + for (int64_t i = 0; i < server_infos.count() && !find_server; ++i) { if (server_addr == server_infos.at(i).addr_) { server_infos.at(i).is_responsed_ = true; - break; + find_server = true; } } + + if (!find_server) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("cannot find addr in sever infos", K(ret), K(server_addr), K(server_infos)); + } } return ret; } diff --git a/src/rootserver/ob_ttl_scheduler.h b/src/rootserver/ob_ttl_scheduler.h index 1a0aae41ac..caf94d9dcc 100644 --- a/src/rootserver/ob_ttl_scheduler.h +++ b/src/rootserver/ob_ttl_scheduler.h @@ -163,7 +163,6 @@ private: : mutex_(), ten_task_arr_(), del_ten_arr_(), - need_refresh_(true), is_inited_(false) {} int update_task_on_all_responsed(RsTenantTask& task); @@ -181,7 +180,8 @@ private: virtual int update_task_status(uint64_t tenant_id, uint64_t task_id, - int64_t rs_new_status); + int64_t rs_new_status, + 
common::ObISQLClient& proxy); bool tenant_exist(uint64_t tenant_id); @@ -226,11 +226,9 @@ private: lib::ObMutex mutex_; // lib::ObMutexGuard guard(mutex_); ObArray ten_task_arr_; ObArray del_ten_arr_; - bool need_refresh_; bool is_inited_; - - const int64_t OB_TTL_TASK_RETRY_INTERVAL = 60*1000*1000; // 3min + const int64_t OB_TTL_TASK_RETRY_INTERVAL = 15*1000*1000; // 15s }; #define TTLMGR ObTTLTenantTaskMgr::get_instance() diff --git a/src/share/table/ob_ttl_util.cpp b/src/share/table/ob_ttl_util.cpp index 7ad4c32377..d8fed22661 100644 --- a/src/share/table/ob_ttl_util.cpp +++ b/src/share/table/ob_ttl_util.cpp @@ -29,10 +29,11 @@ bool ObTTLTime::is_same_day(int64_t ttl_time1, int64_t ttl_time2) time_t param1 = static_cast(ttl_time1 / 1000000l); time_t param2 = static_cast(ttl_time2 / 1000000l); - struct tm *t1 = localtime(¶m1); - struct tm *t2 = localtime(¶m1); + struct tm tm1, tm2; + ::localtime_r(¶m1, &tm1); + ::localtime_r(¶m2, &tm2); - return (t1 && t2 && t1->tm_mday == t2->tm_mday); + return (tm1.tm_yday == tm2.tm_yday); } bool ObTTLUtil::extract_val(const char* ptr, uint64_t len, int& val) @@ -135,12 +136,13 @@ int ObTTLUtil::insert_ttl_task(uint64_t tenant_id, " VALUE " "(now(), now(), %ld, %ld, %ld," " %ld, %ld, %ld, %ld, %ld, " - " %ld, %ld, %ld,'%s', '%s')", // 12 + " %ld, %ld, %ld,'%.*s', '%.*s')", // 12 tname, // 0 tenant_id, task.table_id_, task.partition_id_, task.task_id_, task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_, task.ttl_del_cnt_, task.max_version_del_cnt_, - task.scan_cnt_, task.row_key_.ptr(), task.ret_code_.ptr()))) { + task.scan_cnt_, task.row_key_.length(), task.row_key_.ptr(), + task.ret_code_.length(), task.ret_code_.ptr()))) { LOG_WARN("sql assign fmt failed", K(ret)); } else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) { LOG_WARN("fail to execute sql", K(ret), K(sql)); @@ -167,12 +169,13 @@ int ObTTLUtil::update_ttl_task_all_fields(uint64_t tenant_id, if 
(OB_FAIL(sql.assign_fmt("UPDATE %s SET " "task_start_time = %ld, task_update_time = %ld, trigger_type = %ld, status = %ld," - " ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%s', ret_code = '%s'" - " WHERE " - "tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND task_id = %ld ", + " ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%*.s', ret_code = '%*.s'" + " WHERE tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND task_id = %ld ", tname, // 0 task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_, - task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_, task.row_key_.ptr(), task.ret_code_.ptr(), + task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_, + task.row_key_.length(), task.row_key_.ptr(), + task.ret_code_.length(), task.ret_code_.ptr(), tenant_id, task.table_id_, key.partition_id_, key.task_id_))) { LOG_WARN("sql assign fmt failed", K(ret)); } else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) { @@ -260,12 +263,13 @@ int ObTTLUtil::update_ttl_task_all_fields(uint64_t tenant_id, if (OB_FAIL(sql.assign_fmt("UPDATE %s SET " "task_start_time = %ld, task_update_time = %ld, trigger_type = %ld, status = %ld," - " ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%s', ret_code = '%s'" + " ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%*.s', ret_code = '%*.s'" " WHERE " "tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND task_id = %ld ", tname, // 0 task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_, - task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_, task.row_key_.ptr(), task.ret_code_.ptr(), + task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_, + task.row_key_.length(), task.row_key_.ptr(), task.ret_code_.length(), task.ret_code_.ptr(), tenant_id, task.table_id_, task.partition_id_, task.task_id_))) { LOG_WARN("sql assign fmt 
failed", K(ret)); } else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) { @@ -280,11 +284,11 @@ int ObTTLUtil::update_ttl_task_all_fields(uint64_t tenant_id, int ObTTLUtil::delete_ttl_task(uint64_t tenant_id, const char* tname, common::ObISQLClient& proxy, - ObTTLStatusKey& key) + ObTTLStatusKey& key, + int64_t &affect_rows) { int ret = OB_SUCCESS; ObSqlString sql; - int64_t affect_rows = 0; if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE " "tenant_id = %ld AND table_id = %ld " @@ -396,11 +400,13 @@ int ObTTLUtil::read_ttl_tasks(uint64_t tenant_id, ObString rowkey; char *rowkey_buf = nullptr; EXTRACT_VARCHAR_FIELD_MYSQL(*result, "row_key", rowkey); - if (OB_ISNULL(rowkey_buf = static_cast(allocator->alloc(rowkey.length())))) { - LOG_WARN("failt to allocate memory", K(ret)); - } else { - MEMCPY(rowkey_buf, rowkey.ptr(), rowkey.length()); - result_arr.at(idx).row_key_.assign(rowkey_buf, rowkey.length()); + if (OB_SUCC(ret) && !rowkey.empty()) { + if (OB_ISNULL(rowkey_buf = static_cast(allocator->alloc(rowkey.length())))) { + LOG_WARN("failt to allocate memory", K(ret), K(rowkey)); + } else { + MEMCPY(rowkey_buf, rowkey.ptr(), rowkey.length()); + result_arr.at(idx).row_key_.assign(rowkey_buf, rowkey.length()); + } } } @@ -408,13 +414,15 @@ int ObTTLUtil::read_ttl_tasks(uint64_t tenant_id, ObString err_msg; char *err_buf = nullptr; EXTRACT_VARCHAR_FIELD_MYSQL(*result, "ret_code", err_msg); - if (OB_ISNULL(err_buf = static_cast(allocator->alloc(err_msg.length())))) { - LOG_WARN("failt to allocate memory", K(ret), K(err_msg.length())); - } else { - MEMCPY(err_buf, err_msg.ptr(), err_msg.length()); - result_arr.at(idx).ret_code_.assign(err_buf, err_msg.length()); + if (OB_SUCC(ret) && !err_msg.empty()) { + if (OB_ISNULL(err_buf = static_cast(allocator->alloc(err_msg.length())))) { + LOG_WARN("failt to allocate memory", K(ret), K(err_msg)); + } else { + MEMCPY(err_buf, err_msg.ptr(), err_msg.length()); + result_arr.at(idx).ret_code_.assign(err_buf, 
err_msg.length()); + } } - } + } } } } @@ -457,5 +465,58 @@ bool ObTTLUtil::check_can_process_tenant_tasks(uint64_t tenant_id) return bret; } +int ObTTLUtil::remove_all_task_to_history_table(uint64_t tenant_id, uint64_t task_id, common::ObISQLClient& proxy) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affect_rows = 0; + if (OB_FAIL(sql.assign_fmt("insert into %s select * from %s " + " where task_id = %ld and partition_id != -1 and table_id != -1", + share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, + share::OB_ALL_KV_TTL_TASK_TNAME, + task_id))) { + LOG_WARN("sql assign fmt failed", K(ret)); + } else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) { + LOG_WARN("fail to execute sql", K(ret), K(sql), K(tenant_id)); + } else { + LOG_INFO("success to execute sql", K(ret), K(tenant_id), K(sql), K(affect_rows)); + } + + return ret; +} + +int ObTTLUtil::replace_ttl_task(uint64_t tenant_id, + const char* tname, + common::ObISQLClient& proxy, + ObTTLStatus& task) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affect_rows = 0; + + if (OB_FAIL(sql.assign_fmt("REPLACE INTO %s " + "(gmt_create, gmt_modified, tenant_id, table_id, partition_id, " + "task_id, task_start_time, task_update_time, trigger_type, status," + " ttl_del_cnt, max_version_del_cnt, scan_cnt, row_key, ret_code)" + " VALUE " + "(now(), now(), %ld, %ld, %ld," + " %ld, %ld, %ld, %ld, %ld, " + " %ld, %ld, %ld,'%.*s', '%.*s')", // 12 + tname, // 0 + tenant_id, task.table_id_, task.partition_id_, + task.task_id_, task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_, + task.ttl_del_cnt_, task.max_version_del_cnt_, + task.scan_cnt_, task.row_key_.length(), task.row_key_.ptr(), + task.ret_code_.length(), task.ret_code_.ptr()))) { + LOG_WARN("sql assign fmt failed", K(ret)); + } else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) { + LOG_WARN("fail to execute sql", K(ret), K(sql)); + } else { + LOG_INFO("success to execute sql", K(ret), K(sql)); + } + + 
return ret; +} + } // end namespace rootserver } // end namespace oceanbase \ No newline at end of file diff --git a/src/share/table/ob_ttl_util.h b/src/share/table/ob_ttl_util.h index 87640dac6e..e86f73d9ff 100644 --- a/src/share/table/ob_ttl_util.h +++ b/src/share/table/ob_ttl_util.h @@ -24,6 +24,17 @@ namespace oceanbase namespace common { +#define OB_TTL_RESPONSE_MASK (1 << 5) +#define OB_TTL_STATUS_MASK (OB_TTL_RESPONSE_MASK - 1) + +#define SET_TASK_PURE_STATUS(status, state) ((status) = ((state) & OB_TTL_STATUS_MASK) + ((status & OB_TTL_RESPONSE_MASK))) +#define SET_TASK_RESPONSE(status, state) ((status) |= (((state) & 1) << 5)) +#define SET_TASK_STATUS(status, pure_status, is_responsed) { SET_TASK_PURE_STATUS(status, pure_status), SET_TASK_RESPONSE(status, is_responsed); } + +#define EVAL_TASK_RESPONSE(status) (((status) & OB_TTL_RESPONSE_MASK) >> 5) +#define EVAL_TASK_PURE_STATUS(status) (static_cast((status) & OB_TTL_STATUS_MASK)) + + enum TRIGGER_TYPE { PERIODIC_TRIGGER = 0, @@ -106,6 +117,7 @@ typedef struct ObTTLStatus { K_(ttl_del_cnt), K_(max_version_del_cnt), K_(scan_cnt), + K_(row_key), K_(ret_code)); } ObTTLStatus; @@ -210,6 +222,11 @@ public: common::ObISQLClient& proxy, ObTTLStatus& task); + static int replace_ttl_task(uint64_t tenant_id, + const char* tname, + common::ObISQLClient& proxy, + ObTTLStatus& task); + static int update_ttl_task(uint64_t tenant_id, const char* tname, common::ObISQLClient& proxy, @@ -230,7 +247,8 @@ public: static int delete_ttl_task(uint64_t tenant_id, const char* tname, common::ObISQLClient& proxy, - ObTTLStatusKey& key); + ObTTLStatusKey& key, + int64_t &affect_rows); static int read_ttl_tasks(uint64_t tenant_id, const char* tname, @@ -240,6 +258,8 @@ public: bool for_update = false, common::ObIAllocator *allocator = NULL); + static int remove_all_task_to_history_table(uint64_t tenant_id, uint64_t task_id, common::ObISQLClient& proxy); + static bool check_can_do_work(); static bool 
check_can_process_tenant_tasks(uint64_t tenant_id); diff --git a/src/storage/ob_partition_service.cpp b/src/storage/ob_partition_service.cpp index 221687837a..05821ef6b4 100644 --- a/src/storage/ob_partition_service.cpp +++ b/src/storage/ob_partition_service.cpp @@ -10369,12 +10369,9 @@ int ObPartitionService::internal_leader_active(const ObCbTask& active_task) const bool is_normal_pg = !(guard.get_partition_group()->get_pg_storage().is_restore()); if ((OB_SYS_TENANT_ID != pkey.get_tenant_id()) && is_normal_pg) { (void)clog_mgr_->add_pg_archive_task(partition); + observer::ObTTLManager::get_instance().on_leader_active(pkey); } } - - if (OB_SUCC(ret)) { - observer::ObTTLManager::get_instance().on_leader_active(partition); - } } if (OB_FAIL(ret) && OB_NOT_NULL(partition)) { -- GitLab