提交 da8879f6 编写于 作者: O obdev 提交者: wangzelin.wzl

fix ttl issue 42550411 & 42561394 & 42635299

上级 68cc172f
...@@ -3758,7 +3758,6 @@ int ObService::broadcast_locations(const obrpc::ObPartitionBroadcastArg& arg, ob ...@@ -3758,7 +3758,6 @@ int ObService::broadcast_locations(const obrpc::ObPartitionBroadcastArg& arg, ob
int ObService::ttl_request(const obrpc::ObTTLRequestArg &arg, obrpc::ObTTLResult &result) int ObService::ttl_request(const obrpc::ObTTLRequestArg &arg, obrpc::ObTTLResult &result)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
LOG_INFO("ttl_request run", K(ret), K(arg)); // todo@dazhi: just for debug
if (OB_UNLIKELY(!inited_)) { if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("service do not init", KR(ret), K(arg)); LOG_WARN("service do not init", KR(ret), K(arg));
...@@ -3766,7 +3765,7 @@ int ObService::ttl_request(const obrpc::ObTTLRequestArg &arg, obrpc::ObTTLResult ...@@ -3766,7 +3765,7 @@ int ObService::ttl_request(const obrpc::ObTTLRequestArg &arg, obrpc::ObTTLResult
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(arg)); LOG_WARN("invalid argument", KR(ret), K(arg));
} else if (OB_FAIL(ObTTLManager::get_instance().proc_rs_cmd(arg.tenant_id_, arg.task_id_, } else if (OB_FAIL(ObTTLManager::get_instance().proc_rs_cmd(arg.tenant_id_, arg.task_id_,
ObTTLRequestArg::TTLTriggerType::USER_TRIGGER_TYPE == arg.trigger_type_, TRIGGER_TYPE::USER_TRIGGER == arg.trigger_type_,
static_cast<ObTTLRequestArg::TTLRequestType>(arg.cmd_code_)))) { static_cast<ObTTLRequestArg::TTLRequestType>(arg.cmd_code_)))) {
LOG_WARN("fail to process rs command", K(ret), K(arg)); LOG_WARN("fail to process rs command", K(ret), K(arg));
} }
......
...@@ -2295,12 +2295,12 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row) ...@@ -2295,12 +2295,12 @@ int ObTableTTLDeleteRowIterator::get_next_row(ObNewRow*& row)
} else { } else {
cur_version_++; cur_version_++;
} }
if (cur_version_ > max_version_) { if (max_version_ > 0 && cur_version_ > max_version_) {
max_version_cnt_++; max_version_cnt_++;
cur_del_rows_++; cur_del_rows_++;
is_last_row_ttl_ = false; is_last_row_ttl_ = false;
break; break;
} else if (cell_ts + time_to_live_ < common::ObTimeUtility::current_time()) { } else if (time_to_live_ > 0 && (cell_ts + time_to_live_ < ObTimeUtility::current_time())) {
ttl_cnt_++; ttl_cnt_++;
cur_del_rows_++; cur_del_rows_++;
is_last_row_ttl_ = true; is_last_row_ttl_ = true;
...@@ -2350,7 +2350,7 @@ int ObTableTTLDeleteRowIterator::init(const ObTableTTLOperation &ttl_operation) ...@@ -2350,7 +2350,7 @@ int ObTableTTLDeleteRowIterator::init(const ObTableTTLOperation &ttl_operation)
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_UNLIKELY(!ttl_operation.is_valid())) { if (OB_UNLIKELY(!ttl_operation.is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid ttl operation", K(ret)); LOG_WARN("invalid ttl operation", K(ret), K(ttl_operation));
} else { } else {
time_to_live_ = ttl_operation.time_to_live_; time_to_live_ = ttl_operation.time_to_live_;
max_version_ = ttl_operation.max_version_; max_version_ = ttl_operation.max_version_;
......
...@@ -20,6 +20,7 @@ namespace oceanbase ...@@ -20,6 +20,7 @@ namespace oceanbase
namespace observer namespace observer
{ {
// value not greater than 0 is invalid, which is ignored in ttl task
struct ObTTLPara final struct ObTTLPara final
{ {
public: public:
...@@ -27,7 +28,7 @@ public: ...@@ -27,7 +28,7 @@ public:
max_version_(0) {} max_version_(0) {}
bool is_valid() const bool is_valid() const
{ {
return ttl_ > 0 && max_version_ > 0; return ttl_ > 0 || max_version_ > 0;
} }
TO_STRING_KV(K_(ttl), K_(max_version)); TO_STRING_KV(K_(ttl), K_(max_version));
public: public:
......
...@@ -322,8 +322,8 @@ int ObTTLManager::check_cmd_state_valid(common::ObTTLTaskStatus current_state, c ...@@ -322,8 +322,8 @@ int ObTTLManager::check_cmd_state_valid(common::ObTTLTaskStatus current_state, c
return ret; return ret;
} }
int ObTTLManager::Transform_cmd_and_state(ObTTLRequestArg::TTLRequestType& cmd, int ObTTLManager::transform_cmd_to_state(const ObTTLRequestArg::TTLRequestType& cmd,
common::ObTTLTaskStatus& state) common::ObTTLTaskStatus& state)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (cmd == ObTTLRequestArg::TTL_TRIGGER_TYPE || cmd == ObTTLRequestArg::TTL_RESUME_TYPE) { if (cmd == ObTTLRequestArg::TTL_TRIGGER_TYPE || cmd == ObTTLRequestArg::TTL_RESUME_TYPE) {
...@@ -341,35 +341,35 @@ int ObTTLManager::Transform_cmd_and_state(ObTTLRequestArg::TTLRequestType& cmd, ...@@ -341,35 +341,35 @@ int ObTTLManager::Transform_cmd_and_state(ObTTLRequestArg::TTLRequestType& cmd,
return ret; return ret;
} }
/*rs msg call*/ // RS TTL message entrance
int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id, int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
bool is_usr_trigger, ObTTLRequestArg::TTLRequestType cmd) bool is_usr_trigger, ObTTLRequestArg::TTLRequestType cmd)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTTLTenantInfo* tenant_info = NULL; ObTTLTenantInfo* tenant_info = NULL;
common::ObTTLTaskStatus state; common::ObTTLTaskStatus expected_state;
bool is_create_tenant_task = false; bool need_create_tenant_info = false;
common::ObSpinLockGuard guard(lock_); common::ObSpinLockGuard guard(lock_);
if (OB_FAIL(Transform_cmd_and_state(cmd, state))) { if (OB_FAIL(transform_cmd_to_state(cmd, expected_state))) {
LOG_WARN("invalid cmd type", K(tenant_id), K(task_id), K(is_usr_trigger), K(cmd)); LOG_WARN("invalid cmd type", K(tenant_id), K(task_id), K(is_usr_trigger), K(cmd));
} else { } else {
is_create_tenant_task = (OB_TTL_TASK_RUNNING == state) ? true : false; need_create_tenant_info = (OB_TTL_TASK_RUNNING == expected_state) ? true : false;
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (!is_init_) { } else if (!is_init_) {
ret = OB_ENTRY_NOT_EXIST; ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("ttl manager not init", K(tenant_id), K(task_id), K(is_usr_trigger)); LOG_WARN("ttl manager not init", K(tenant_id), K(task_id), K(is_usr_trigger));
} else if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, is_create_tenant_task))) { } else if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, need_create_tenant_info))) {
ret = OB_ENTRY_NOT_EXIST; ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("fail to get ttl tenant info", K(tenant_id)); LOG_WARN("fail to get ttl tenant info", K(tenant_id));
} else if (OB_FAIL(check_cmd_state_valid(tenant_info->state_, state))) { } else if (OB_FAIL(check_cmd_state_valid(tenant_info->state_, expected_state))) {
LOG_WARN("ttl cmd state machine is wrong", K(ret), K(tenant_id), K(task_id), K(is_usr_trigger)); LOG_WARN("ttl cmd state machine is wrong", K(ret), K(tenant_id), K(task_id), K(is_usr_trigger));
} else { } else {
tenant_info->cmd_type_ = cmd; tenant_info->cmd_type_ = cmd;
if (OB_INVALID_ID == tenant_info->task_id_) { if (OB_INVALID_ID == tenant_info->task_id_) {
if (OB_TTL_TASK_RUNNING == state) { if (OB_TTL_TASK_RUNNING == expected_state) {
//new ttl tenant //new ttl tenant
tenant_info->task_id_ = task_id; tenant_info->task_id_ = task_id;
tenant_info->is_usr_trigger_ = is_usr_trigger; tenant_info->is_usr_trigger_ = is_usr_trigger;
...@@ -377,25 +377,25 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id, ...@@ -377,25 +377,25 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
tenant_info->need_check_ = true; tenant_info->need_check_ = true;
tenant_info->is_dirty_ = true; tenant_info->is_dirty_ = true;
LOG_INFO("new tenent info", K(ret), K(tenant_id), K(tenant_info->task_id_)); LOG_INFO("new tenent info", K(ret), K(tenant_id), K(tenant_info->task_id_));
} else { } else if (OB_INVALID_ID == tenant_info->task_id_) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid task id for current state", K(ret), K(state), K(tenant_id), LOG_WARN("invalid task id for current state", K(ret), K(expected_state), K(tenant_id),
K(task_id), K(tenant_info->task_id_)); K(task_id), K(tenant_info->task_id_));
} }
} else if (tenant_info->state_ == state) { } else if (tenant_info->state_ == expected_state) {
//duplicate //duplicate msg
LOG_INFO("tenant state is duplicated", K(ret), K(state)); LOG_INFO("tenant state is duplicated", K(ret), K(expected_state));
} else { } else {
tenant_info->state_ = state; tenant_info->state_ = expected_state;
tenant_info->is_dirty_ = true; tenant_info->is_dirty_ = true;
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
//receive the msg, need to rsp //receive the msg, need to rsp
tenant_info->rsp_time_ = ObTimeUtility::current_time(); tenant_info->rsp_time_ = ObTimeUtility::current_time();
} }
} }
LOG_INFO("finish process rs cmd", K(ret), K(tenant_id), K(task_id), K(state)); LOG_INFO("finish process rs cmd", K(ret), K(tenant_id), K(task_id), K(expected_state));
return ret; return ret;
} }
...@@ -405,10 +405,7 @@ void ObTTLManager::mark_tenant_need_check(uint64_t tenant_id) ...@@ -405,10 +405,7 @@ void ObTTLManager::mark_tenant_need_check(uint64_t tenant_id)
ObTTLTenantInfo* tenant_info = NULL; ObTTLTenantInfo* tenant_info = NULL;
if (common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id)) { if (common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id)) {
common::ObSpinLockGuard guard(lock_); common::ObSpinLockGuard guard(lock_);
if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, false))) { if (OB_NOT_NULL(tenant_info = get_tenant_info(tenant_id, false))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get ttl tenant info", K(ret));
} else if (!tenant_info->need_check_) {
tenant_info->need_check_ = true; tenant_info->need_check_ = true;
} }
} }
...@@ -500,7 +497,7 @@ int ObTTLManager::report_task_status(ObTTLTaskInfo& task_info, ObTTLPara& task_p ...@@ -500,7 +497,7 @@ int ObTTLManager::report_task_status(ObTTLTaskInfo& task_info, ObTTLPara& task_p
} }
//schedule task //schedule task
if (is_stop && OB_FAIL(try_schedule_next_task(tenant_info))) { if (is_stop && OB_FAIL(try_schedule_remaining_tasks(tenant_info))) {
LOG_WARN("fail to try schedule task", K(ret)); LOG_WARN("fail to try schedule task", K(ret));
} }
return ret; return ret;
...@@ -770,7 +767,7 @@ int ObTTLManager::inner_handle_one_partition_event(ObTTLTenantInfo* tenant_info, ...@@ -770,7 +767,7 @@ int ObTTLManager::inner_handle_one_partition_event(ObTTLTenantInfo* tenant_info,
if (OB_ISNULL(ctx) || OB_ISNULL(tenant_info)) { if (OB_ISNULL(ctx) || OB_ISNULL(tenant_info)) {
ret = OB_ERR_NULL_VALUE; ret = OB_ERR_NULL_VALUE;
LOG_WARN("tenant info ot ctx is null", K(ret), K(tenant_info), K(ctx)); LOG_WARN("tenant info or ctx is null", K(ret), K(tenant_info), K(ctx));
} else if (ctx->task_status_ != tenant_info->state_) { } else if (ctx->task_status_ != tenant_info->state_) {
if (OB_TTL_TASK_RUNNING == tenant_info->state_) { if (OB_TTL_TASK_RUNNING == tenant_info->state_) {
if (OB_TTL_TASK_PENDING == ctx->task_status_) { if (OB_TTL_TASK_PENDING == ctx->task_status_) {
...@@ -793,7 +790,11 @@ int ObTTLManager::inner_handle_one_partition_event(ObTTLTenantInfo* tenant_info, ...@@ -793,7 +790,11 @@ int ObTTLManager::inner_handle_one_partition_event(ObTTLTenantInfo* tenant_info,
} }
if (try_schedule && OB_FAIL(try_schedule_task(tenant_info, ctx))) { if (try_schedule && OB_FAIL(try_schedule_task(tenant_info, ctx))) {
LOG_WARN("fail to try schedule dag task", K(ret), K(ctx->task_info_.pkey_)); if (OB_EAGAIN != ret) {
LOG_WARN("fail to try schedule dag task", K(ret), K(ctx->task_info_.pkey_));
} else {
ret = OB_SUCCESS;
}
} }
LOG_DEBUG("handle one partition event", K(ret), K(ctx->task_status_), K(tenant_info->state_)); LOG_DEBUG("handle one partition event", K(ret), K(ctx->task_status_), K(tenant_info->state_));
} }
...@@ -880,8 +881,10 @@ int ObTTLManager::get_ttl_para_from_schema(const schema::ObTableSchema *table_sc ...@@ -880,8 +881,10 @@ int ObTTLManager::get_ttl_para_from_schema(const schema::ObTableSchema *table_sc
} else { } else {
para.ttl_ = hc_desc.get_time_to_live(); para.ttl_ = hc_desc.get_time_to_live();
para.max_version_ = hc_desc.get_max_version(); para.max_version_ = hc_desc.get_max_version();
can_ttl = true; if (OB_LIKELY(para.is_valid())) {
LOG_DEBUG("success to find a ttl partition", K(ret), K(para)); can_ttl = true;
LOG_DEBUG("success to find a ttl partition", K(ret), K(para));
}
} }
} else {} } else {}
return ret; return ret;
...@@ -939,7 +942,11 @@ int ObTTLManager::try_schedule_prepare_task(ObPartitionKey& pkey) ...@@ -939,7 +942,11 @@ int ObTTLManager::try_schedule_prepare_task(ObPartitionKey& pkey)
// do nothing // do nothing
} else if (FALSE_IT(ctx->task_status_ = OB_TTL_TASK_PENDING)) { } else if (FALSE_IT(ctx->task_status_ = OB_TTL_TASK_PENDING)) {
} else if (OB_FAIL(try_schedule_task(tenant_info, ctx))) { } else if (OB_FAIL(try_schedule_task(tenant_info, ctx))) {
LOG_WARN("fail to schedule task", K(ret)); if (OB_EAGAIN != ret) {
LOG_WARN("fail to schedule task", K(ret));
} else {
ret = OB_SUCCESS;
}
} }
LOG_DEBUG("try schedule prepare task", K(ret), K(pkey.get_tenant_id()), K(pkey.get_table_id())); LOG_DEBUG("try schedule prepare task", K(ret), K(pkey.get_tenant_id()), K(pkey.get_table_id()));
return ret; return ret;
...@@ -964,7 +971,7 @@ int ObTTLManager::sync_sys_table(ObPartitionKey& pkey) ...@@ -964,7 +971,7 @@ int ObTTLManager::sync_sys_table(ObPartitionKey& pkey)
LOG_WARN("ctx is null", K(ret)); LOG_WARN("ctx is null", K(ret));
} else if (OB_UNLIKELY(!ctx->is_valid())) { } else if (OB_UNLIKELY(!ctx->is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid partition task ctx", K(ret), K(ctx)); LOG_WARN("invalid partition task ctx", K(ret), KPC(ctx));
} else { } else {
cp_ctx = *ctx; cp_ctx = *ctx;
if (ctx->task_info_.row_key_.empty()) { if (ctx->task_info_.row_key_.empty()) {
...@@ -1123,6 +1130,7 @@ int ObTTLManager::construct_task_record_filter(const uint64_t& task_id, ...@@ -1123,6 +1130,7 @@ int ObTTLManager::construct_task_record_filter(const uint64_t& task_id,
return ret; return ret;
} }
// deprecated
int ObTTLManager::deep_copy_all_tenant_ctxs(common::ObSArray<ObTTLTaskCtx>& ctx_array, int ObTTLManager::deep_copy_all_tenant_ctxs(common::ObSArray<ObTTLTaskCtx>& ctx_array,
common::ObArenaAllocator& allocator, common::ObArenaAllocator& allocator,
uint64_t tenant_id) uint64_t tenant_id)
...@@ -1174,10 +1182,10 @@ int ObTTLManager::move_record_to_history_table(uint64_t tenant_id) ...@@ -1174,10 +1182,10 @@ int ObTTLManager::move_record_to_history_table(uint64_t tenant_id)
LOG_WARN("failt to start trans", K(ret), K(ttl_record.tenant_id_)); LOG_WARN("failt to start trans", K(ret), K(ttl_record.tenant_id_));
} else { } else {
ObTTLStatusKey key(ttl_record.tenant_id_, ttl_record.table_id_, ObTTLStatusKey key(ttl_record.tenant_id_, ttl_record.table_id_,
ttl_record.partition_id_, ttl_record.task_id_); ttl_record.partition_id_, ttl_record.task_id_);
if (OB_FAIL(ObTTLUtil::insert_ttl_task(ttl_record.tenant_id_, if (OB_FAIL(ObTTLUtil::insert_ttl_task(ttl_record.tenant_id_,
share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME,
trans, ttl_record))) { trans, ttl_record))) {
LOG_WARN("fail to insert ttl task into __all_ttl_task_status_history.", K(ret)); LOG_WARN("fail to insert ttl task into __all_ttl_task_status_history.", K(ret));
} else if (OB_FAIL(common::ObTTLUtil::delete_ttl_task(ttl_record.tenant_id_, } else if (OB_FAIL(common::ObTTLUtil::delete_ttl_task(ttl_record.tenant_id_,
share::OB_ALL_KV_TTL_TASK_TNAME, share::OB_ALL_KV_TTL_TASK_TNAME,
...@@ -1294,7 +1302,7 @@ bool ObTTLManager::can_schedule_task(const ObTTLTaskCtx &ttl_task) ...@@ -1294,7 +1302,7 @@ bool ObTTLManager::can_schedule_task(const ObTTLTaskCtx &ttl_task)
return ttl_task.task_status_ == OB_TTL_TASK_PENDING || ttl_task.task_status_ == OB_TTL_TASK_PREPARE; return ttl_task.task_status_ == OB_TTL_TASK_PENDING || ttl_task.task_status_ == OB_TTL_TASK_PREPARE;
} }
int ObTTLManager::try_schedule_next_task(ObTTLTenantInfo* tenant_info) int ObTTLManager::try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(tenant_info)) { if (OB_ISNULL(tenant_info)) {
...@@ -1311,16 +1319,20 @@ int ObTTLManager::try_schedule_next_task(ObTTLTenantInfo* tenant_info) ...@@ -1311,16 +1319,20 @@ int ObTTLManager::try_schedule_next_task(ObTTLTenantInfo* tenant_info)
LOG_ERROR("fatal err, ttl ctx in map is null", K(ret), K(tenant_info->tenant_id_)); LOG_ERROR("fatal err, ttl ctx in map is null", K(ret), K(tenant_info->tenant_id_));
} else if (can_schedule_task(*ctx)) { } else if (can_schedule_task(*ctx)) {
if (OB_FAIL(try_schedule_task(tenant_info, ctx))) { if (OB_FAIL(try_schedule_task(tenant_info, ctx))) {
LOG_WARN("fail to schedule task", K(ret)); if (OB_EAGAIN != ret) {
} else { LOG_WARN("fail to schedule task", K(ret));
ret = OB_ITER_END; }
} }
} }
} }
if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
}
} }
return ret; return ret;
} }
// try schedule partition task, reutrn OB_EAGAIN if dag scheduler is full
int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx) int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -1328,9 +1340,7 @@ int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ...@@ -1328,9 +1340,7 @@ int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx*
LOG_WARN("invalid argument", K(ret), KP(tenant_info), KP(ctx)); LOG_WARN("invalid argument", K(ret), KP(tenant_info), KP(ctx));
} else if (can_schedule_tenant(*tenant_info) && can_schedule_task(*ctx)) { } else if (can_schedule_tenant(*tenant_info) && can_schedule_task(*ctx)) {
if (OB_FAIL(generate_ttl_dag(ctx->task_info_, ctx->ttl_para_))) { if (OB_FAIL(generate_ttl_dag(ctx->task_info_, ctx->ttl_para_))) {
if (OB_SIZE_OVERFLOW != ret) { if (OB_EAGAIN != ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to generate dag task", K(ret)); LOG_WARN("fail to generate dag task", K(ret));
} }
} else { } else {
......
...@@ -24,9 +24,6 @@ namespace oceanbase ...@@ -24,9 +24,6 @@ namespace oceanbase
namespace observer namespace observer
{ {
using storage::ObIPartitionGroup;
struct ObTTLTaskCtx struct ObTTLTaskCtx
{ {
public : public :
...@@ -63,20 +60,6 @@ public: ...@@ -63,20 +60,6 @@ public:
bool is_dirty_; bool is_dirty_;
}; };
class ObTTLEventTask : public common::ObDLinkBase<ObTTLEventTask>
{
public:
ObTTLEventTask(): ttl_key_(),
partition_cnt_(OB_INVALID_INDEX),
next_free_slot_(OB_INVALID_INDEX)
{}
ObTTLStatusKey ttl_key_;
int64_t partition_cnt_;
uint64_t next_free_slot_;
uint64_t slot_id_;
};
class OBTTLTimerPeriodicTask : public common::ObTimerTask { class OBTTLTimerPeriodicTask : public common::ObTimerTask {
public: public:
OBTTLTimerPeriodicTask() {} OBTTLTimerPeriodicTask() {}
...@@ -96,8 +79,8 @@ public: ...@@ -96,8 +79,8 @@ public:
int proc_rs_cmd(uint64_t tenant_id, uint64_t task_id, int proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
bool is_usr_trigger, obrpc::ObTTLRequestArg::TTLRequestType cmd); bool is_usr_trigger, obrpc::ObTTLRequestArg::TTLRequestType cmd);
int report_task_status(ObTTLTaskInfo& task_info, int report_task_status(ObTTLTaskInfo& task_info,
ObTTLPara& task_para, bool& is_stop); ObTTLPara& task_para, bool& is_stop);
void on_leader_active(ObIPartitionGroup* partition); void on_leader_active(storage::ObIPartitionGroup* partition);
void on_schema_changed(uint64_t schema_changed_tenant_id); void on_schema_changed(uint64_t schema_changed_tenant_id);
/*timer handle function*/ /*timer handle function*/
...@@ -159,7 +142,7 @@ private: ...@@ -159,7 +142,7 @@ private:
int generate_one_partition_task(ObTTLTaskInfo& task_info, ObTTLPara& para); int generate_one_partition_task(ObTTLTaskInfo& task_info, ObTTLPara& para);
int get_ttl_para_from_schema(const share::schema::ObTableSchema *table_schema, int get_ttl_para_from_schema(const share::schema::ObTableSchema *table_schema,
ObTTLPara& para, bool& is_tableapi_schema); ObTTLPara& para, bool& is_tableapi_schema);
int check_partition_can_gen_ttl(ObIPartitionGroup *partition, int check_partition_can_gen_ttl(storage::ObIPartitionGroup *partition,
ObTTLPara &para, bool& can_ttl); ObTTLPara &para, bool& can_ttl);
int check_and_do_rsp(uint64_t tenant_id); int check_and_do_rsp(uint64_t tenant_id);
void mark_tenant_need_check(uint64_t tenant_id); void mark_tenant_need_check(uint64_t tenant_id);
...@@ -180,7 +163,7 @@ private: ...@@ -180,7 +163,7 @@ private:
int sync_sys_table(ObPartitionKey& pkey); int sync_sys_table(ObPartitionKey& pkey);
int construct_sys_table_record(ObTTLTaskCtx* ctx, common::ObTTLStatus& ttl_record); int construct_sys_table_record(ObTTLTaskCtx* ctx, common::ObTTLStatus& ttl_record);
int try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx); int try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx);
int try_schedule_next_task(ObTTLTenantInfo* tenant_info); int try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info);
bool can_schedule_tenant(const ObTTLTenantInfo &tenant_info); bool can_schedule_tenant(const ObTTLTenantInfo &tenant_info);
bool can_schedule_task(const ObTTLTaskCtx &ttl_task); bool can_schedule_task(const ObTTLTaskCtx &ttl_task);
int check_cmd_state_valid(common::ObTTLTaskStatus current_state, common::ObTTLTaskStatus incoming_state); int check_cmd_state_valid(common::ObTTLTaskStatus current_state, common::ObTTLTaskStatus incoming_state);
...@@ -190,7 +173,7 @@ private: ...@@ -190,7 +173,7 @@ private:
common::ObArenaAllocator& allocator); common::ObArenaAllocator& allocator);
void mark_ttl_ctx_dirty(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx); void mark_ttl_ctx_dirty(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx);
void check_ttl_tenant_state(uint64_t tenant_id); void check_ttl_tenant_state(uint64_t tenant_id);
int Transform_cmd_and_state(obrpc::ObTTLRequestArg::TTLRequestType& cmd, common::ObTTLTaskStatus& state); int transform_cmd_to_state(const obrpc::ObTTLRequestArg::TTLRequestType& cmd, common::ObTTLTaskStatus& state);
int try_schedule_prepare_task(ObPartitionKey& pkey); int try_schedule_prepare_task(ObPartitionKey& pkey);
void mark_tenant_checked(uint64_t tenant_id); void mark_tenant_checked(uint64_t tenant_id);
int mark_tenant_droped(const uint64_t& tenant_id); int mark_tenant_droped(const uint64_t& tenant_id);
......
...@@ -280,6 +280,7 @@ int ObTableTTLDeleteTask::process() ...@@ -280,6 +280,7 @@ int ObTableTTLDeleteTask::process()
int ObTableTTLDeleteTask::process_one() int ObTableTTLDeleteTask::process_one()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t start_time = ObTimeUtil::current_time();
ObPartitionKey pkey = info_->pkey_; ObPartitionKey pkey = info_->pkey_;
uint64_t tenant_id = pkey.get_tenant_id(); uint64_t tenant_id = pkey.get_tenant_id();
uint64_t table_id = pkey.get_table_id(); uint64_t table_id = pkey.get_table_id();
...@@ -339,7 +340,8 @@ int ObTableTTLDeleteTask::process_one() ...@@ -339,7 +340,8 @@ int ObTableTTLDeleteTask::process_one()
info_->err_code_ = ret; info_->err_code_ = ret;
LOG_DEBUG("finish delete", K(ret), KPC_(info)); LOG_DEBUG("finish delete", K(ret), KPC_(info));
} }
LOG_DEBUG("finish process one", K(ret), K(result.get_max_version_del_row()), K(result.get_ttl_del_row()), K(result.get_del_row())); int64_t cost = ObTimeUtil::current_time() - start_time;
LOG_DEBUG("finish process one", K(ret), K(cost), K(result.get_max_version_del_row()), K(result.get_ttl_del_row()), K(result.get_del_row()));
return ret; return ret;
} }
......
此差异已折叠。
...@@ -21,8 +21,9 @@ namespace rootserver ...@@ -21,8 +21,9 @@ namespace rootserver
{ {
class ObRootService; class ObRootService;
typedef common::ObArray<common::ObAddr> ServerList; class ObTTLServerInfo;
typedef common::hash::ObHashSet<common::ObAddr> ServerInfoSet; typedef common::ObArray<ObTTLServerInfo> TTLServerInfos;
typedef common::hash::ObHashSet<common::ObAddr> ServerSet;
/** /**
* the task for clear ttl history task in __all_ttl_task_status_history * the task for clear ttl history task in __all_ttl_task_status_history
...@@ -35,32 +36,41 @@ public: ...@@ -35,32 +36,41 @@ public:
virtual void runTimerTask() override; virtual void runTimerTask() override;
void destroy() {} void destroy() {}
static const int64_t OB_KV_TTL_GC_INTERVAL = 120 * 1000L * 1000L; // 120s
private: private:
ObRootService& root_service_; ObRootService& root_service_;
}; };
typedef struct RsTenantTask { struct ObTTLServerInfo
{
public:
ObTTLServerInfo() : addr_(), is_responsed_(false) {}
~ObTTLServerInfo() = default;
TO_STRING_KV(K_(addr), K_(is_responsed));
public:
common::ObAddr addr_;
bool is_responsed_;
};
struct RsTenantTask
{
public:
RsTenantTask()
: ttl_status_(), server_infos_(), all_responsed_(false)
{}
~RsTenantTask() = default;
int set_server_responsed(const ObAddr& server_addr);
void set_servers_not_responsed();
TO_STRING_KV(K_(ttl_status),
K_(server_infos),
K_(all_responsed));
public:
common::ObTTLStatus ttl_status_; common::ObTTLStatus ttl_status_;
ServerList send_servers_; common::ObArray<ObTTLServerInfo> server_infos_;
ServerList eliminate_servers_;
ServerList rsp_servers_;
bool all_responsed_; bool all_responsed_;
};
RsTenantTask() struct ObTTLTenantTask {
: ttl_status_(),
send_servers_(),
eliminate_servers_(),
rsp_servers_(),
all_responsed_(false) {}
TO_STRING_KV(K_(ttl_status),
K_(send_servers),
K_(eliminate_servers),
K_(rsp_servers),
K_(all_responsed));
} RsTenantTask;
typedef struct ObTTLTenantTask {
ObArray<RsTenantTask> tasks_; ObArray<RsTenantTask> tasks_;
bool need_refresh_; bool need_refresh_;
uint64_t tenant_id_; uint64_t tenant_id_;
...@@ -82,7 +92,7 @@ typedef struct ObTTLTenantTask { ...@@ -82,7 +92,7 @@ typedef struct ObTTLTenantTask {
K_(need_refresh), K_(need_refresh),
K_(tenant_id), K_(tenant_id),
K_(is_del)); K_(is_del));
} ObTTLTenantTask; };
/* /*
* the scheduler for all ttl and max version deletion tasks executed in root service * the scheduler for all ttl and max version deletion tasks executed in root service
...@@ -93,7 +103,7 @@ typedef struct ObTTLTenantTask { ...@@ -93,7 +103,7 @@ typedef struct ObTTLTenantTask {
class ObTTLScheduler : private common::ObTimerTask class ObTTLScheduler : private common::ObTimerTask
{ {
public: public:
static const int64_t SCHEDULE_PERIOD = 20 * 1000L * 1000L; // 20s static const int64_t SCHEDULE_PERIOD = 15 * 1000L * 1000L; // 15s
explicit ObTTLScheduler(ObRootService& rs) explicit ObTTLScheduler(ObRootService& rs)
: is_inited_(false), : is_inited_(false),
root_service_(rs), root_service_(rs),
...@@ -156,7 +166,7 @@ private: ...@@ -156,7 +166,7 @@ private:
need_refresh_(true), need_refresh_(true),
is_inited_(false) {} is_inited_(false) {}
int update_task_on_responsed(RsTenantTask& task); int update_task_on_all_responsed(RsTenantTask& task);
virtual bool is_enable_ttl(uint64_t tenant_id); virtual bool is_enable_ttl(uint64_t tenant_id);
...@@ -170,36 +180,29 @@ private: ...@@ -170,36 +180,29 @@ private:
virtual int insert_tenant_task(ObTTLStatus& ttl_task); virtual int insert_tenant_task(ObTTLStatus& ttl_task);
virtual int update_task_status(uint64_t tenant_id, virtual int update_task_status(uint64_t tenant_id,
uint64_t task_id, uint64_t task_id,
int64_t rs_new_status); int64_t rs_new_status);
bool tenant_exist(uint64_t tenant_id); bool tenant_exist(uint64_t tenant_id);
virtual int update_tenant_tasks(uint64_t tenant_id, common::ObTTLStatusArray& tasks); virtual int update_tenant_tasks(uint64_t tenant_id, common::ObTTLStatusArray& tasks);
virtual int get_alive_servers(uint64_t tenant_id, int get_server_infos(uint64_t tenant_id,
ServerList& server_infos); common::ObArray<ObTTLServerInfo>& server_infos);
/* variables */ /* variables */
virtual int fetch_ttl_task_id(uint64_t tenant_id, int64_t &new_task_id); virtual int fetch_ttl_task_id(uint64_t tenant_id, int64_t &new_task_id);
// RS-> observer ttl request // RS-> observer ttl request
virtual int dispatch_ttl_request(ServerList& addrs, virtual int dispatch_ttl_request(const common::ObArray<ObTTLServerInfo>& server_infos,
ServerList& eliminate_addrs, uint64_t tenant_id, int ttl_cmd,
uint64_t tenant_id, int trigger_type, int64_t task_id);
int ttl_cmd,
int trigger_type,
int64_t task_id);
virtual int get_valid_servers(ServerList& all_list, ServerList& remove_list, ServerList& ret_list);
int add_tenant(uint64_t tenant_id); int add_tenant(uint64_t tenant_id);
void delete_tenant(uint64_t tenant_id); void delete_tenant(uint64_t tenant_id);
bool need_refresh_tenant(uint64_t tenant_id); bool need_refresh_tenant(uint64_t tenant_id);
bool need_task_retry(RsTenantTask& rs_task); bool need_retry_task(RsTenantTask& rs_task);
// need lock // need lock
int get_tenant_tasks_ptr(uint64_t tenant_id, int get_tenant_ptr(uint64_t tenant_id, ObTTLTenantTask*& tasks_ptr);
ObTTLTenantTask*& tasks_ptr); int get_task_ptr(uint64_t tenant_id, uint64_t task_id, RsTenantTask*& ten_task);
int get_task_ptr(uint64_t tenant_id,
uint64_t task_id,
RsTenantTask*& ten_task);
int user_cmd_upon_task(ObTTLTaskType task_type, int user_cmd_upon_task(ObTTLTaskType task_type,
ObTTLTaskStatus curr_state, ObTTLTaskStatus curr_state,
ObTTLTaskStatus &next_state, ObTTLTaskStatus &next_state,
...@@ -219,8 +222,6 @@ private: ...@@ -219,8 +222,6 @@ private:
void refresh_deleted_tenants(); void refresh_deleted_tenants();
bool is_addr_exist(ServerList& addr_arr, const ObAddr& addr);
private: private:
lib::ObMutex mutex_; // lib::ObMutexGuard guard(mutex_); lib::ObMutex mutex_; // lib::ObMutexGuard guard(mutex_);
ObArray<ObTTLTenantTask> ten_task_arr_; ObArray<ObTTLTenantTask> ten_task_arr_;
...@@ -229,7 +230,7 @@ private: ...@@ -229,7 +230,7 @@ private:
bool is_inited_; bool is_inited_;
const int64_t OB_TTL_TASK_RETRY_INTERVAL = ObTTLScheduler::SCHEDULE_PERIOD * 15; // retry interval 300s const int64_t OB_TTL_TASK_RETRY_INTERVAL = 60*1000*1000; // 3min
}; };
#define TTLMGR ObTTLTenantTaskMgr::get_instance() #define TTLMGR ObTTLTenantTaskMgr::get_instance()
......
...@@ -8409,12 +8409,6 @@ public: ...@@ -8409,12 +8409,6 @@ public:
TTL_INVALID_TYPE = 5 TTL_INVALID_TYPE = 5
}; };
enum TTLTriggerType {
SYS_TRIGGER_TYPE = 1,
USER_TRIGGER_TYPE,
INVALID_TRIGGER_TYPE,
};
ObTTLRequestArg() ObTTLRequestArg()
: cmd_code_(-1), trigger_type_(-1), task_id_(OB_INVALID_ID), tenant_id_(OB_INVALID_ID) : cmd_code_(-1), trigger_type_(-1), task_id_(OB_INVALID_ID), tenant_id_(OB_INVALID_ID)
{} {}
......
...@@ -340,7 +340,7 @@ public: ...@@ -340,7 +340,7 @@ public:
bool is_valid() const bool is_valid() const
{ {
return OB_INVALID_TENANT_ID != tenant_id_ && OB_INVALID_ID != table_id_ && return OB_INVALID_TENANT_ID != tenant_id_ && OB_INVALID_ID != table_id_ &&
max_version_ > 0 && time_to_live_ > 0 && del_row_limit_ > 0; (max_version_ > 0 || time_to_live_ > 0) && del_row_limit_ > 0;
} }
TO_STRING_KV(K_(tenant_id), K_(table_id), K_(max_version), K_(time_to_live), K_(del_row_limit), K_(start_rowkey), K_(start_qualifier)); TO_STRING_KV(K_(tenant_id), K_(table_id), K_(max_version), K_(time_to_live), K_(del_row_limit), K_(start_rowkey), K_(start_qualifier));
public: public:
......
...@@ -444,8 +444,12 @@ bool ObTTLUtil::check_can_process_tenant_tasks(uint64_t tenant_id) ...@@ -444,8 +444,12 @@ bool ObTTLUtil::check_can_process_tenant_tasks(uint64_t tenant_id)
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool is_restore = true; bool is_restore = true;
if (OB_FAIL(share::schema::ObMultiVersionSchemaService::get_instance(). if (OB_FAIL(share::schema::ObMultiVersionSchemaService::get_instance().
check_tenant_is_restore(NULL, tenant_id, is_restore))) { check_tenant_is_restore(NULL, tenant_id, is_restore))) {
LOG_WARN("fail to check tenant is restore", KR(ret), K(tenant_id)); if (OB_TENANT_NOT_EXIST != ret) {
LOG_WARN("fail to check tenant is restore", KR(ret), K(tenant_id), K(common::lbt()));
} else {
ret = OB_SUCCESS;
}
} else { } else {
bret = !is_restore; bret = !is_restore;
} }
......
...@@ -26,8 +26,8 @@ namespace common ...@@ -26,8 +26,8 @@ namespace common
enum TRIGGER_TYPE enum TRIGGER_TYPE
{ {
PERIODIC_TRIGGER = 1, PERIODIC_TRIGGER = 0,
USER_TRIGGER USER_TRIGGER = 1,
}; };
enum ObTTLTaskType { enum ObTTLTaskType {
...@@ -41,20 +41,20 @@ enum ObTTLTaskType { ...@@ -41,20 +41,20 @@ enum ObTTLTaskType {
enum ObTTLTaskStatus enum ObTTLTaskStatus
{ {
// for obsever
OB_TTL_TASK_PREPARE = 0, //inner state OB_TTL_TASK_PREPARE = 0, //inner state
OB_TTL_TASK_RUNNING = 1, OB_TTL_TASK_RUNNING = 1,
OB_TTL_TASK_PENDING = 2, OB_TTL_TASK_PENDING = 2,
OB_TTL_TASK_CANCEL = 3, OB_TTL_TASK_CANCEL = 3,
OB_TTL_TASK_FINISH = 4, //inner state OB_TTL_TASK_FINISH = 4, //inner state
OB_TTL_TASK_MOVING = 5, OB_TTL_TASK_MOVING = 5,
// for rs
OB_RS_TTL_TASK_CREATE = 15, OB_RS_TTL_TASK_CREATE = 15,
OB_RS_TTL_TASK_SUSPEND, OB_RS_TTL_TASK_SUSPEND = 16,
OB_RS_TTL_TASK_CANCEL = 17,
OB_RS_TTL_TASK_CANCEL, OB_RS_TTL_TASK_MOVE = 18,
OB_RS_TTL_TASK_MOVE,
OB_TTL_TASK_INVALID OB_TTL_TASK_INVALID
}; };
typedef struct ObTTLStatus { typedef struct ObTTLStatus {
......
...@@ -22,8 +22,6 @@ ...@@ -22,8 +22,6 @@
#include "lib/json_type/ob_json_base.h" #include "lib/json_type/ob_json_base.h"
#include "lib/json_type/ob_json_parse.h" #include "lib/json_type/ob_json_parse.h"
using namespace oceanbase::common;
namespace oceanbase namespace oceanbase
{ {
namespace sql namespace sql
......
...@@ -675,6 +675,7 @@ TEST_F(TestTableTTL, dag_report) ...@@ -675,6 +675,7 @@ TEST_F(TestTableTTL, dag_report)
} }
#endif #endif
#if 0
class TestRsTTL : public::testing::Test { class TestRsTTL : public::testing::Test {
public: public:
TestRsTTL() {}; TestRsTTL() {};
...@@ -829,7 +830,6 @@ void TestRsTTL::TearDown() ...@@ -829,7 +830,6 @@ void TestRsTTL::TearDown()
* send msg * send msg
* insert into table * insert into table
*/ */
#if 0
TEST_F(TestRsTTL, ttl_basic) TEST_F(TestRsTTL, ttl_basic)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册