Commit 81ddf033, authored by obdev, committed by wangzelin.wzl


fix ttl issue 42742997 & 42787173 & 42954103 & 42920788 & 42876827 & 42869248 & 42868078 & 42867515 & 42767501
Parent dce23aca
......@@ -32,7 +32,7 @@ int ObHColumnDescriptor::from_string(const common::ObString &str)
} else if (OB_FAIL(json_parser.init(&allocator))) {
LOG_WARN("failed to init json parser", K(ret));
} else if (OB_FAIL(json_parser.parse(str.ptr(), str.length(), ast))) {
LOG_WARN("failed to parse", K(ret), K(str));
LOG_DEBUG("failed to parse", K(ret), K(str));
ret = OB_SUCCESS;
} else if (NULL != ast
&& ast->get_type() == json::JT_OBJECT
......
......@@ -1736,7 +1736,7 @@ int ObTableService::fill_query_table_param(uint64_t table_id,
} else if (OB_FAIL(schema_guard.get_table_schema(table_id, table_schema))) {
LOG_WARN("get table schema failed", K(table_id), K(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
ret = OB_TABLE_NOT_EXIST;
LOG_ERROR("NULL ptr", K(ret), K(table_schema));
} else if (OB_FAIL(get_index_id_by_name(schema_guard, table_id, index_name, index_id,
rowkey_columns_type, index_schema))) {
......
......@@ -54,6 +54,7 @@ int ObTTLManager::start()
{
int ret = OB_SUCCESS;
if (!is_init_) {
ret = OB_NOT_INIT;
LOG_WARN("ttl manager not init", K(ret));
} else if (OB_FAIL(ttl_timer_.schedule(periodic_task_, periodic_delay_, true))) {
LOG_WARN("fail to schedule periodic task", K(ret));
......@@ -111,24 +112,23 @@ int ObTTLManager::scan_all_tenanat_handle_event()
} else {
common::ObSpinLockGuard guard(lock_);
for (ttl_tenants_iterator iter = ttl_tenant_parts_map_.begin();
iter != ttl_tenant_parts_map_.end(); ++iter) {
iter != ttl_tenant_parts_map_.end() && OB_SUCC(ret); ++iter) {
tenant_id = iter->first;
tenant_info = iter->second;
if (tenant_info->need_check_ &&
OB_TTL_TASK_CANCEL > tenant_info->state_ &&
OB_FAIL(check_tenants.push_back(tenant_id))) {
if (tenant_info->need_check_ && OB_FAIL(check_tenants.push_back(tenant_id))) {
// after observer restart, the tenant still needs to be checked even in cancel or move state
LOG_WARN("fail to push back check tenants", K(ret));
}
if (tenant_info->is_dirty_ && OB_FAIL(dirty_tenants.push_back(tenant_id))) {
if (OB_SUCC(ret) && tenant_info->is_dirty_ && OB_FAIL(dirty_tenants.push_back(tenant_id))) {
LOG_WARN("fail to push back dirty tenants", K(ret));
} else if (OB_TTL_TASK_MOVING == tenant_info->state_ &&
OB_FAIL(need_move_tenants.push_back(tenant_id))) {
LOG_WARN("fail to push back move operation", K(tenant_id));
}
if ((tenant_info->is_droped_ || tenant_info->rsp_time_ != OB_INVALID_ID) &&
OB_FAIL(need_rsp_tenants.push_back(tenant_id))) {
if (OB_SUCC(ret) && (tenant_info->is_droped_ || tenant_info->rsp_time_ != OB_INVALID_ID) &&
OB_FAIL(need_rsp_tenants.push_back(tenant_id))) { // todo: remove is_droped_
LOG_WARN("fail to push back rsp operation", K(tenant_id));
}
}
......@@ -163,7 +163,7 @@ int ObTTLManager::scan_all_tenanat_handle_event()
/*do moving*/
for (int i = 0; i < need_move_tenants.count() && OB_SUCC(ret); ++i) {
if (OB_FAIL(move_record_to_history_table(need_move_tenants.at(i)))) {
LOG_WARN("fail to do check and response", K(ret));
LOG_WARN("fail to move record to history table", K(ret));
}
}
......@@ -174,6 +174,12 @@ int ObTTLManager::scan_all_tenanat_handle_event()
}
}
if (OB_TENANT_NOT_EXIST == ret) {
LOG_INFO("begin to check and reset dropped tenant", K(ret));
if (OB_FAIL(check_and_reset_droped_tenant())) {
LOG_WARN("fail to check and reset dropped tenant", K(ret));
}
}
return ret;
}
......@@ -200,14 +206,20 @@ void ObTTLManager::check_ttl_tenant_state(uint64_t tenant_id)
} else if (ctx->task_status_ != OB_TTL_TASK_CANCEL &&
ctx->task_status_ != OB_TTL_TASK_FINISH) {
tenant_finish = false;
tenant_info->is_finished_ = false;
}
}
}
if (OB_SUCC(ret) && !tenant_dirty) {
tenant_info->is_dirty_ = false;
if (tenant_finish && tenant_info->state_ < OB_TTL_TASK_MOVING) {
tenant_info->state_ = (tenant_info->state_ == OB_TTL_TASK_CANCEL) ? OB_TTL_TASK_CANCEL :OB_TTL_TASK_FINISH;
if (tenant_finish) {
if (tenant_info->state_ == OB_TTL_TASK_CANCEL || tenant_info->state_ == OB_TTL_TASK_RUNNING) {
// all tasks are already in cancel or running status
tenant_info->is_finished_ = true;
} else {
ret = OB_ERR_UNEXPECTED;
}
}
}
LOG_DEBUG("check ttl tenant dirty", K(tenant_info->is_dirty_), K(tenant_info->state_), K(ret), K(tenant_id));
......@@ -240,26 +252,24 @@ void ObTTLManager::mark_tenant_rsp(uint64_t tenant_id, int64_t rsp_time)
int ObTTLManager::check_and_do_rsp(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
bool can_rsp = true;
bool can_rsp = false;
int64_t rsp_time = OB_INVALID_ID;
ObTTLTaskStatus rsp_status;
ObTTLTenantInfo* tenant_info = NULL;
ObTTLTaskCtx* ctx = NULL;
bool tenant_is_droped = false;
{
common::ObSpinLockGuard guard(lock_);
tenant_info = get_tenant_info(tenant_id, false);
if (OB_ISNULL(tenant_info)) {
can_rsp = false;
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant info is null", K(tenant_id));
} else if ((tenant_is_droped = tenant_info->is_droped_)) {
// do nothing if tenant is droped
} else if (tenant_info->rsp_time_ != OB_INVALID_ID) {
can_rsp = true;
rsp_status = tenant_info->state_;
rsp_time = tenant_info->rsp_time_;
for (ttl_parts_iterator iter = tenant_info->part_task_map_.begin();
can_rsp && iter != tenant_info->part_task_map_.end(); ++iter) {
can_rsp && iter != tenant_info->part_task_map_.end();
++iter) {
ctx = iter->second;
if (OB_ISNULL(ctx)) {
can_rsp = false;
......@@ -283,7 +293,7 @@ int ObTTLManager::check_and_do_rsp(uint64_t tenant_id)
LOG_WARN("fail to response ttl task to rs", K(ret), K(tenant_info->task_id_), K(rsp_status));
}
if (OB_SUCC(ret) && (can_rsp || tenant_is_droped)) {
if (OB_SUCC(ret) && (can_rsp)) {
mark_tenant_rsp(tenant_id, rsp_time);
}
}
......@@ -291,32 +301,60 @@ int ObTTLManager::check_and_do_rsp(uint64_t tenant_id)
return ret;
}
int ObTTLManager::check_cmd_state_valid(common::ObTTLTaskStatus current_state, common::ObTTLTaskStatus incoming_state)
/* state transitions of tenant info (must be driven by rs):
 * invalid -> running -> moving
 *                    -> canceling -> moving
 *                    -> pending   -> running
 *                                 -> canceling
 *         -> moving
 *         -> canceling
 *         -> pending
 */
int ObTTLManager::check_cmd_state_valid(const common::ObTTLTaskStatus current_state,
const common::ObTTLTaskStatus incoming_state)
{
int ret = OB_SUCCESS;
if ((incoming_state < OB_TTL_TASK_RUNNING || incoming_state > OB_TTL_TASK_CANCEL) &&
(incoming_state != OB_TTL_TASK_MOVING)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fatal error, invalid state type", K(incoming_state));
} else if (incoming_state == OB_TTL_TASK_MOVING) {
if (current_state != OB_TTL_TASK_FINISH && current_state != OB_TTL_TASK_CANCEL) {
switch (incoming_state) {
case OB_TTL_TASK_RUNNING: {
if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_INVALID &&
current_state != OB_TTL_TASK_RUNNING) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive a move cmd, current task state is wrong", K(current_state));
LOG_WARN("receive rs cmd, but current tenant state is unmatached",
K(ret), K(current_state), K(incoming_state));
}
} else if (OB_TTL_TASK_RUNNING == incoming_state) {
if (current_state >= OB_TTL_TASK_CANCEL && current_state != OB_TTL_TASK_INVALID) {
break;
}
case OB_TTL_TASK_MOVING: {
if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_CANCEL &&
current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_MOVING) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive a cmd, current task state is wrong", K(current_state));
LOG_WARN("receive a move cmd, current task state is unmatached", K(current_state));
}
break;
}
} else if (OB_TTL_TASK_PENDING == incoming_state) {
if (current_state >= OB_TTL_TASK_CANCEL) {
case OB_TTL_TASK_PENDING: {
if (current_state != OB_TTL_TASK_RUNNING && current_state != OB_TTL_TASK_INVALID &&
current_state != OB_TTL_TASK_PENDING) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive a cmd, current task state is wrong", K(current_state));
LOG_WARN("receive rs cmd, but current tenant state is unmatached",
K(ret), K(current_state), K(incoming_state));
}
break;
}
} else if (OB_TTL_TASK_CANCEL == incoming_state) {
if (current_state > OB_TTL_TASK_FINISH) {
case OB_TTL_TASK_CANCEL: {
if (current_state != OB_TTL_TASK_PENDING && current_state != OB_TTL_TASK_RUNNING &&
current_state != OB_TTL_TASK_INVALID && current_state != OB_TTL_TASK_CANCEL) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("receive a cancel cmd, current task state is wrong", K(current_state));
LOG_WARN("receive rs cmd, but current tenant state is unmatached",
K(ret), K(current_state), K(incoming_state));
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid incoming status", K(ret), K(incoming_state));
break;
}
}
return ret;
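
The transitions enforced by the switch above can also be read as a small lookup table. Below is a minimal standalone C++ sketch of the same rule set; the simplified enum and the helper name is_valid_transition are illustrative only and not part of the patch:

#include <map>
#include <set>

// Simplified stand-ins for the common::ObTTLTaskStatus values used above (illustrative only).
enum class TTLState { INVALID, RUNNING, PENDING, CANCEL, MOVING };

// Returns true when `incoming` may be applied while the tenant is in `current`,
// mirroring the switch in check_cmd_state_valid.
bool is_valid_transition(TTLState current, TTLState incoming) {
  static const std::map<TTLState, std::set<TTLState>> allowed_from = {
    {TTLState::RUNNING, {TTLState::INVALID, TTLState::PENDING, TTLState::RUNNING}},
    {TTLState::PENDING, {TTLState::INVALID, TTLState::RUNNING, TTLState::PENDING}},
    {TTLState::CANCEL,  {TTLState::INVALID, TTLState::PENDING, TTLState::RUNNING, TTLState::CANCEL}},
    {TTLState::MOVING,  {TTLState::INVALID, TTLState::RUNNING, TTLState::CANCEL, TTLState::MOVING}},
  };
  const auto it = allowed_from.find(incoming);
  return it != allowed_from.end() && it->second.count(current) > 0;
}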
......@@ -341,6 +379,34 @@ int ObTTLManager::transform_cmd_to_state(const ObTTLRequestArg::TTLRequestType&
return ret;
}
// get cmd type from rs state
ObTTLRequestArg::TTLRequestType ObTTLManager::transform_state_to_cmd(const int64_t state)
{
ObTTLRequestArg::TTLRequestType task_type = ObTTLRequestArg::TTL_INVALID_TYPE;
switch (state) {
case static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CREATE): {
task_type = ObTTLRequestArg::TTL_TRIGGER_TYPE;
break;
}
case static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_SUSPEND): {
task_type = ObTTLRequestArg::TTL_SUSPEND_TYPE;
break;
}
case static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_CANCEL): {
task_type = ObTTLRequestArg::TTL_CANCEL_TYPE;
break;
}
case static_cast<int64_t>(ObTTLTaskStatus::OB_RS_TTL_TASK_MOVE): {
task_type = ObTTLRequestArg::TTL_MOVE_TYPE;
break;
}
default: {
break;
}
}
return task_type;
}
// RS TTL message entrance
int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
bool is_usr_trigger, ObTTLRequestArg::TTLRequestType cmd)
......@@ -348,43 +414,52 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
int ret = OB_SUCCESS;
ObTTLTenantInfo* tenant_info = NULL;
common::ObTTLTaskStatus expected_state;
bool need_create_tenant_info = false;
const bool create_if_not_exists = true;
common::ObSpinLockGuard guard(lock_);
if (OB_FAIL(transform_cmd_to_state(cmd, expected_state))) {
if (!is_init_) {
ret = OB_NOT_INIT;
LOG_WARN("ttl manager not init", K(tenant_id), K(task_id), K(is_usr_trigger));
} else if (task_id == OB_INVALID_ID) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected invalid task id", K(tenant_id), K(ret));
} else if (OB_FAIL(transform_cmd_to_state(cmd, expected_state))) {
LOG_WARN("invalid cmd type", K(tenant_id), K(task_id), K(is_usr_trigger), K(cmd));
} else {
need_create_tenant_info = (OB_TTL_TASK_RUNNING == expected_state) ? true : false;
}
} else {}
if (OB_SUCC(ret)) {
common::ObSpinLockGuard guard(lock_);
if (OB_FAIL(ret)) {
} else if (!is_init_) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("ttl manager not init", K(tenant_id), K(task_id), K(is_usr_trigger));
} else if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, need_create_tenant_info))) {
// do nothing
} else if (OB_ISNULL(tenant_info = get_tenant_info(tenant_id, create_if_not_exists))) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("fail to get ttl tenant info", K(tenant_id));
LOG_WARN("fail to get ttl tenant info", K(tenant_id), K(create_if_not_exists));
} else if (tenant_info->need_check_) {
ret = OB_EAGAIN;
LOG_INFO("tenant info need check, please resend message later", KPC(tenant_info), K(expected_state));
} else if (OB_UNLIKELY(tenant_info->task_id_ != OB_INVALID_ID && tenant_info->task_id_ != task_id)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ttl task id is wrong", K(ret), K(tenant_id), K(tenant_info->task_id_), K(task_id));
} else if (OB_FAIL(check_cmd_state_valid(tenant_info->state_, expected_state))) {
LOG_WARN("ttl cmd state machine is wrong", K(ret), K(tenant_id), K(task_id), K(is_usr_trigger));
} else {
tenant_info->cmd_type_ = cmd;
if (OB_INVALID_ID == tenant_info->task_id_) {
if (OB_TTL_TASK_RUNNING == expected_state) {
//new ttl tenant
// new ttl tenant info
tenant_info->task_id_ = task_id;
tenant_info->is_usr_trigger_ = is_usr_trigger;
tenant_info->state_ = OB_TTL_TASK_RUNNING;
tenant_info->state_ = expected_state;
tenant_info->need_check_ = true;
tenant_info->is_dirty_ = true;
LOG_INFO("new tenent info", K(ret), K(tenant_id), K(tenant_info->task_id_));
} else if (OB_INVALID_ID == tenant_info->task_id_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid task id for current state", K(ret), K(expected_state), K(tenant_id),
K(task_id), K(tenant_info->task_id_));
if (OB_TTL_TASK_MOVING == expected_state) {
// after restart, rs sending moving means all tasks were finished or canceled
tenant_info->is_finished_ = true;
}
LOG_INFO("new tenent info", K(ret), K(tenant_id), KPC(tenant_info));
}
} else if (tenant_info->state_ == expected_state) {
//duplicate msg
LOG_INFO("tenant state is duplicated", K(ret), K(expected_state));
if (OB_TTL_TASK_MOVING == expected_state && !tenant_info->is_finished_) {
ret = OB_EAGAIN;
LOG_WARN("can not move a unfinished task", K(ret), K(tenant_id), KPC(tenant_info));
} else {
tenant_info->state_ = expected_state;
tenant_info->is_dirty_ = true;
......@@ -395,6 +470,7 @@ int ObTTLManager::proc_rs_cmd(uint64_t tenant_id, uint64_t task_id,
tenant_info->rsp_time_ = ObTimeUtility::current_time();
}
}
}
LOG_INFO("finish process rs cmd", K(ret), K(tenant_id), K(task_id), K(expected_state));
return ret;
}
......@@ -413,24 +489,25 @@ void ObTTLManager::mark_tenant_need_check(uint64_t tenant_id)
LOG_DEBUG("finsh mark tenant need check", K(ret));
}
void ObTTLManager::on_leader_active(ObIPartitionGroup* partition)
void ObTTLManager::on_leader_active(const ObPartitionKey& pkey)
{
int ret = OB_SUCCESS;
ObTTLPara para;
bool can_ttl = false;
uint64_t tenant_id = pkey.get_tenant_id();
if (!is_init_) {
ret = OB_NOT_INIT;
LOG_WARN("ttl manager not init");
} else if (OB_ISNULL(partition)) {
LOG_WARN("invalid partition on leader active");
} else if (!partition->is_valid()) {
} else if (!pkey.is_valid()) {
// do nothing
} else if (!common::ObTTLUtil::check_can_process_tenant_tasks(partition->get_partition_key().get_tenant_id())) {
} else if(OB_SYS_TENANT_ID == tenant_id) {
// do nothing
} else if (!common::ObTTLUtil::check_can_process_tenant_tasks(tenant_id)) {
//do nothing
} else if (OB_FAIL(check_partition_can_gen_ttl(partition, para, can_ttl))) {
LOG_WARN("fail to check partition can ddl", K(ret), K(partition->get_partition_key()));
} else if (OB_FAIL(check_partition_can_gen_ttl(pkey, para, can_ttl))) {
LOG_WARN("fail to check partition can ttl", K(ret), K(pkey), K(para));
} else if (can_ttl) {
mark_tenant_need_check(partition->get_partition_key().get_tenant_id());
mark_tenant_need_check(tenant_id);
}
}
......@@ -483,8 +560,9 @@ int ObTTLManager::report_task_status(ObTTLTaskInfo& task_info, ObTTLPara& task_p
} else if (OB_ITER_END == task_info.err_code_) {
ctx->task_status_ = OB_TTL_TASK_FINISH;
ctx->task_info_.err_code_ = OB_SUCCESS;
} else if (OB_NOT_MASTER == task_info.err_code_ &&
OB_PARTITION_NOT_EXIST == task_info.err_code_) {
} else if (OB_NOT_MASTER == task_info.err_code_ ||
OB_PARTITION_NOT_EXIST == task_info.err_code_ ||
OB_TABLE_NOT_EXIST == task_info.err_code_) {
LOG_INFO("Cancel current task since partition state change",
K(task_info.err_code_), K(task_info.pkey_));
ctx->task_status_ = OB_TTL_TASK_CANCEL;
......@@ -497,7 +575,7 @@ int ObTTLManager::report_task_status(ObTTLTaskInfo& task_info, ObTTLPara& task_p
}
//schedule task
if (is_stop && OB_FAIL(try_schedule_remaining_tasks(tenant_info))) {
if (is_stop && OB_FAIL(try_schedule_remaining_tasks(tenant_info, ctx))) {
LOG_WARN("fail to try schedule task", K(ret));
}
return ret;
......@@ -559,28 +637,37 @@ int ObTTLManager::generate_tenant_tasks(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObIPartitionArrayGuard partitions;
ObPartitionState state;
ObIPartitionGroup *partition = NULL;
bool can_ttl = false;
if (OB_FAIL(ObPartitionService::get_instance().get_all_partitions(partitions))) {
LOG_WARN("fail to get all partition", K(ret));
LOG_WARN("fail to get all partition", K(ret), K(tenant_id));
} else {
//filter the partition
for (int64_t i = 0; OB_SUCC(ret) && i < partitions.count(); ++i) {
partition = partitions.at(i);
ObPartitionKey pk = partition->get_partition_key();
ObPartitionArray pkeys;
ObPartitionState state;
state = partitions.at(i)->get_partition_state();
if (!is_leader_state(state)) {
// do nothing, the partition should be a leader
} else if (OB_FAIL(partitions.at(i)->get_all_pg_partition_keys(pkeys))) {
LOG_WARN("fail to get all pg partition keys", "pg_key", partitions.at(i)->get_partition_key(), K(pkeys));
} else {
for (int pkey_index = 0; OB_SUCC(ret) && pkey_index < pkeys.count(); ++pkey_index) {
ObTTLPara para;
if ((tenant_id != partition->get_partition_key().get_tenant_id())) {
ObPartitionKey pkey = pkeys.at(pkey_index);
if ((tenant_id != pkey.get_tenant_id())) {
//do nothing
} else if (OB_FAIL(check_partition_can_gen_ttl(partition, para, can_ttl))) {
LOG_WARN("fail to check partition can get ttl", K(ret));
} else if (OB_FAIL(check_partition_can_gen_ttl(pkey, para, can_ttl))) {
LOG_WARN("fail to check partition can get ttl", K(ret), K(pkey), K(para));
} else {
if (can_ttl) {
ObTTLTaskInfo task_info;
task_info.pkey_ = pk;
task_info.pkey_ = pkey;
if (OB_FAIL(generate_one_partition_task(task_info, para))) {
LOG_WARN("fail to generate task", K(ret), K(pk));
LOG_WARN("fail to generate task", K(ret), K(task_info), K(para));
}
}
}
}
}
......@@ -716,10 +803,9 @@ int ObTTLManager::generate_ttl_dag(ObTTLTaskInfo& task_info, ObTTLPara& para)
LOG_WARN("fail to add ttl prepare task to dag", K(ret));
} else if (OB_FAIL(dag_scheduler.add_dag(dag))) {
if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
LOG_INFO("ttl dag already exists, no need to schedule once again");
LOG_DEBUG("ttl dag already exists, no need to schedule once again", K(ret));
} else if (OB_SIZE_OVERFLOW == ret) {
ret = OB_EAGAIN;
LOG_DEBUG("dag is full", K(ret));
} else {
LOG_WARN("fail to add dag to queue", K(ret));
}
......@@ -747,8 +833,7 @@ int ObTTLManager::inner_handle_single_tenant_event(uint64_t tenant_id, common::O
ret = OB_ERR_NULL_VALUE;
LOG_WARN("fatal err, ttl ctx in map is null", K(tenant_info->tenant_id_), K(ret));
} else if (OB_FAIL(inner_handle_one_partition_event(tenant_info, ctx))) {
LOG_WARN("fail to handle one partition event",
K(ret), K(ctx->task_info_.pkey_));
LOG_WARN("fail to handle one partition event", K(ret), K(ctx->task_info_.pkey_));
} else if (ctx->is_dirty_ && OB_FAIL(parts_array.push_back(ctx->task_info_.pkey_))) {
LOG_WARN("fail to pushback ttl pk", K(ret), K(ctx->task_info_.pkey_));
}
......@@ -772,25 +857,49 @@ int ObTTLManager::inner_handle_one_partition_event(ObTTLTenantInfo* tenant_info,
if (OB_TTL_TASK_RUNNING == tenant_info->state_) {
if (OB_TTL_TASK_PENDING == ctx->task_status_) {
try_schedule = true;
} else if (OB_TTL_TASK_PREPARE == ctx->task_status_ ||
OB_TTL_TASK_FINISH == ctx->task_status_ ||
OB_TTL_TASK_CANCEL == ctx->task_status_) {
// do nothing
} else {
LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx));
}
} else if (OB_TTL_TASK_PENDING == tenant_info->state_) {
if (OB_TTL_TASK_RUNNING == ctx->task_status_) {
if (OB_TTL_TASK_RUNNING == ctx->task_status_ ||
OB_TTL_TASK_PREPARE == ctx->task_status_) {
ctx->task_status_ = tenant_info->state_;
mark_ttl_ctx_dirty(tenant_info, ctx);
} else if (OB_TTL_TASK_FINISH == tenant_info->state_){
// do nothing, no need schedule finish task again
} else {
LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx));
}
} else if (OB_TTL_TASK_CANCEL == tenant_info->state_) {
if (OB_TTL_TASK_PENDING >= ctx->task_status_) {
if (OB_TTL_TASK_PREPARE == ctx->task_status_ ||
OB_TTL_TASK_RUNNING == ctx->task_status_ ||
OB_TTL_TASK_PENDING == ctx->task_status_ ||
OB_TTL_TASK_FINISH == ctx->task_status_) {
ctx->task_status_ = tenant_info->state_;
mark_ttl_ctx_dirty(tenant_info, ctx);
} else {
LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx));
}
} else if (OB_TTL_TASK_MOVING == tenant_info->state_) {
//do nothing
if (OB_TTL_TASK_PREPARE == ctx->task_status_) {
ctx->task_status_ = OB_TTL_TASK_FINISH; // will refresh real status from task table
mark_ttl_ctx_dirty(tenant_info, ctx);
} else if (OB_TTL_TASK_FINISH == ctx->task_status_ ||
OB_TTL_TASK_CANCEL == ctx->task_status_) {
// do nothing, normal partition task
} else {
LOG_WARN("no expected task status", KPC(tenant_info), KPC(ctx));
}
} else {
LOG_WARN("invalid ttl tenant task state", K(tenant_info->state_));
}
if (try_schedule && OB_FAIL(try_schedule_task(tenant_info, ctx))) {
if (OB_EAGAIN != ret) {
if (OB_SIZE_OVERFLOW != ret) {
LOG_WARN("fail to try schedule dag task", K(ret), K(ctx->task_info_.pkey_));
} else {
ret = OB_SUCCESS;
......@@ -895,35 +1004,32 @@ int ObTTLManager::get_ttl_para_from_schema(const schema::ObTableSchema *table_sc
return ret;
}
int ObTTLManager::check_partition_can_gen_ttl(ObIPartitionGroup *partition,
int ObTTLManager::check_partition_can_gen_ttl(const ObPartitionKey& pkey,
ObTTLPara &para, bool& can_ttl)
{
int ret = OB_SUCCESS;
const schema::ObTableSchema *table_schema = NULL;
schema::ObSchemaGetterGuard schema_guard;
ObTTLTaskCtx* ttl_ctx = NULL;
ObPartitionState state;
can_ttl = false;
if (OB_ISNULL(partition)) {
if (OB_UNLIKELY(!pkey.is_valid() || pkey.is_pg())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("partition is null", K(ret));
LOG_WARN("invalid argument", K(ret), K(pkey), K(pkey.is_pg()));
} else {
ObPartitionKey pkey = partition->get_partition_key();
state = partition->get_partition_state();
if (!is_leader_state(state)) {
//do nothing, the partition should be a leader
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(pkey.get_tenant_id(), schema_guard))) {
uint64_t tenant_id = pkey.get_tenant_id();
uint64_t table_id = pkey.get_table_id();
if (OB_FAIL(schema_service_->get_schema_guard(schema_guard))) {
LOG_WARN("failed to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(pkey.get_table_id(), table_schema))) {
LOG_WARN("get table schema failed", K(pkey.get_table_id()), K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(table_id, table_schema))) {
LOG_WARN("get table schema failed", K(table_id), K(ret));
} else if (OB_ISNULL(table_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("null table schema", K(ret));
} else if (table_schema->is_in_recyclebin()) {
// do nothing
} else if (table_schema->get_comment_str().empty()) {
//do nothing
// do nothing
} else if (OB_NOT_NULL(ttl_ctx = get_one_partition_ctx(pkey))) {
LOG_DEBUG("partition task exist", K(pkey));
} else if (OB_FAIL(get_ttl_para_from_schema(table_schema, para, can_ttl))) {
......@@ -947,7 +1053,7 @@ int ObTTLManager::try_schedule_prepare_task(ObPartitionKey& pkey)
// do nothing
} else if (FALSE_IT(ctx->task_status_ = OB_TTL_TASK_PENDING)) {
} else if (OB_FAIL(try_schedule_task(tenant_info, ctx))) {
if (OB_EAGAIN != ret) {
if (OB_SIZE_OVERFLOW != ret) {
LOG_WARN("fail to schedule task", K(ret));
} else {
ret = OB_SUCCESS;
......@@ -960,36 +1066,62 @@ int ObTTLManager::try_schedule_prepare_task(ObPartitionKey& pkey)
int ObTTLManager::sync_sys_table(ObPartitionKey& pkey)
{
int ret = OB_SUCCESS;
ObTTLTaskCtx cp_ctx;
ObArenaAllocator allocator(lib::ObLabel("TTLStatusRecord"));
uint64_t tenant_id = OB_INVALID_ID;
uint64_t tenant_id = pkey.get_tenant_id();
ObTTLTaskCtx* ctx = NULL;
if (OB_UNLIKELY(!pkey.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(pkey));
} else {
tenant_id = pkey.get_tenant_id();
common::ObSpinLockGuard guard(lock_);
ObTTLTaskCtx* ctx = get_one_partition_ctx(pkey);
ctx = get_one_partition_ctx(pkey);
if (OB_ISNULL(ctx)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("ctx is null", K(ret));
} else if (OB_UNLIKELY(!ctx->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid partition task ctx", K(ret), KPC(ctx));
} else {}
}
if (OB_SUCC(ret) && OB_UNLIKELY(ctx->need_refresh_)) {
switch (ctx->task_status_) {
case OB_TTL_TASK_FINISH: // tenant_info must be in moving status
case OB_TTL_TASK_PREPARE: {
if (OB_FAIL(refresh_partition_task(*ctx, true /*refresh_status*/, true))) {
LOG_WARN("fail to refresh partition task from task table", K(ret));
} else {
cp_ctx = *ctx;
if (ctx->task_info_.row_key_.empty()) {
//do nothing
} else if (OB_FAIL(ob_write_string(allocator, ctx->task_info_.row_key_, cp_ctx.task_info_.row_key_)) ) {
LOG_ERROR("fail to deep copy first key", K(ret));
if (ctx->task_info_.err_code_ == OB_NOT_MASTER ||
(ctx->task_status_ != OB_TTL_TASK_FINISH && ctx->task_status_ != OB_TTL_TASK_CANCEL)) {
ctx->task_status_ = OB_TTL_TASK_PREPARE;
}
ctx->need_refresh_ = false;
ctx->task_info_.err_code_ = OB_SUCCESS;
}
break;
}
case OB_TTL_TASK_RUNNING:
case OB_TTL_TASK_PENDING:
case OB_TTL_TASK_CANCEL: {
if (OB_FAIL(refresh_partition_task(*ctx, false /*refresh_status*/))) {
LOG_WARN("fail to refresh partition task from task table", K(ret));
} else {
ctx->need_refresh_ = false;
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ttl task status", K(ret));
break;
}
}
}
if (OB_SUCC(ret)) {
common::ObTTLStatus ttl_record;
switch (cp_ctx.task_status_) {
switch (ctx->task_status_) {
case OB_TTL_TASK_PREPARE: {
ObMySQLTransaction trans;
ObTTLStatusFieldArray filters;
......@@ -997,19 +1129,19 @@ int ObTTLManager::sync_sys_table(ObPartitionKey& pkey)
ObTTLStatusFieldArray filter;
bool commit = false;
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(construct_task_record_filter(cp_ctx.task_info_.task_id_,
cp_ctx.task_info_.pkey_.get_table_id(),
cp_ctx.task_info_.pkey_.get_partition_id(),
if (OB_FAIL(construct_task_record_filter(ctx->task_info_.task_id_,
ctx->task_info_.pkey_.get_table_id(),
ctx->task_info_.pkey_.get_partition_id(),
filters))) {
LOG_WARN("fail to construct task record filter", K(ret));
} else if (OB_FAIL(trans.start(get_sql_proxy(), tenant_id))) {
LOG_WARN("fail to start transation", K(ret));
LOG_WARN("fail to start transation", K(ret), K(tenant_id));
} else if (OB_FAIL(ObTTLUtil::read_ttl_tasks(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, filters, ttl_records, true, &allocator))) {
LOG_WARN("fail to get ttl tasks", K(ret));
} else {
if (ttl_records.empty()) {
if (OB_FAIL(construct_sys_table_record(&cp_ctx, ttl_record))) {
if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", K(ret));
} else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, ttl_record))) {
......@@ -1019,39 +1151,78 @@ int ObTTLManager::sync_sys_table(ObPartitionKey& pkey)
if (ttl_records.count() != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect ttl records count", K(ret), K(ttl_records.count()));
} else if (OB_FAIL(from_ttl_record(pkey, ttl_records.at(0), allocator))) {
LOG_WARN("fail to convert from ttl record", K(ret));
}
} else { /* do nothing, prepare task only syncs once with the need_refresh flag true */ }
}
}
//do commit
commit = (OB_SUCCESS == ret);
tmp_ret = ret;
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("fail to end transaction", K(ret), K(commit));
LOG_WARN("faile to end trans", "commit", commit, K(ret));
}
ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret;
}
ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret;
//change prepare state to running/pending
// change prepare state to running/pending
if (OB_SUCC(ret) && OB_FAIL(try_schedule_prepare_task(pkey))) {
LOG_WARN("fail to schedule prepare task", K(ret));
}
break;
}
case OB_TTL_TASK_FINISH:
case OB_TTL_TASK_RUNNING:
case OB_TTL_TASK_PENDING:
case OB_TTL_TASK_CANCEL:
case OB_TTL_TASK_FINISH: {
if (OB_FAIL(construct_sys_table_record(&cp_ctx, ttl_record))) {
case OB_TTL_TASK_CANCEL: {
ObMySQLTransaction trans;
ObTTLStatusFieldArray filters;
common::ObTTLStatusArray ttl_records;
ObTTLStatusFieldArray filter;
bool commit = false;
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(construct_task_record_filter(ctx->task_info_.task_id_,
ctx->task_info_.pkey_.get_table_id(),
ctx->task_info_.pkey_.get_partition_id(),
filters))) {
LOG_WARN("fail to construct task record filter", K(ret));
} else if (OB_FAIL(trans.start(get_sql_proxy(), tenant_id))) {
LOG_WARN("fail to start transation", K(ret));
} else if (OB_FAIL(ObTTLUtil::read_ttl_tasks(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, filters, ttl_records, true, &allocator))) {
LOG_WARN("fail to get ttl tasks", K(ret));
} else {
if (ttl_records.empty()) {
if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", K(ret));
} else if (OB_FAIL(ObTTLUtil::insert_ttl_task(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, ttl_record))) {
LOG_WARN("fail to insert ttl task", K(ret));
}
} else {
if (ttl_records.count() != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect ttl records count", K(ret), K(ttl_records.count()));
} else if (OB_FAIL(construct_sys_table_record(ctx, ttl_record))) {
LOG_WARN("fail to construct sys table record", K(ret));
} else if (OB_FAIL(ObTTLUtil::update_ttl_task_all_fields(tenant_id,
share::OB_ALL_KV_TTL_TASK_TNAME,
*get_sql_proxy(), ttl_record))) {
trans, ttl_record))) {
LOG_WARN("fail to update ttl task in sys table", K(ret), K(ttl_record));
}
}
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("faile to end trans", "commit", commit, K(ret));
}
ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret;
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected ttl task status", K(ret));
......@@ -1135,16 +1306,12 @@ int ObTTLManager::construct_task_record_filter(const uint64_t& task_id,
return ret;
}
// deprecated
int ObTTLManager::deep_copy_all_tenant_ctxs(common::ObSArray<ObTTLTaskCtx>& ctx_array,
common::ObArenaAllocator& allocator,
uint64_t tenant_id)
int ObTTLManager::copy_all_tenant_ctxs(common::ObSArray<ObTTLTaskCtx *>& ctx_array, uint64_t tenant_id)
{
int ret = OB_SUCCESS;
common::ObSpinLockGuard guard(lock_);
ObTTLTenantInfo* tenant_info = get_tenant_info(tenant_id, false);
ObTTLTaskCtx* ctx = NULL;
ObTTLTaskCtx cp_ctx;
if (OB_ISNULL(tenant_info)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("fail to get ttl tenant info", K(ret));
......@@ -1155,17 +1322,11 @@ int ObTTLManager::deep_copy_all_tenant_ctxs(common::ObSArray<ObTTLTaskCtx>& ctx_
if (OB_ISNULL(ctx)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("fatal err, ttl ctx in map is null", K(tenant_info->tenant_id_), K(ret));
} else {
cp_ctx = *ctx;
if (!ctx->task_info_.row_key_.empty() &&
OB_FAIL(ob_write_string(allocator, ctx->task_info_.row_key_, cp_ctx.task_info_.row_key_))) {
LOG_WARN("fail to deep copy first key", K(ret));
} else if (OB_FAIL(ctx_array.push_back(cp_ctx))) {
} else if (OB_FAIL(ctx_array.push_back(ctx))) {
LOG_WARN("fail to push back ctx array", K(ret));
}
}
}
}
return ret;
}
......@@ -1173,40 +1334,67 @@ int ObTTLManager::move_record_to_history_table(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
common::ObArenaAllocator allocator;
common::ObSArray<ObTTLTaskCtx> need_move_ctxs;
common::ObSArray<ObTTLTaskCtx *> need_move_ctxs;
uint64_t move_rows_cnt = 0;
ObTTLTenantInfo* tenant_info = get_tenant_info(tenant_id, false);
if (OB_FAIL(deep_copy_all_tenant_ctxs(need_move_ctxs, allocator, tenant_id))) {
if (OB_ISNULL(tenant_info)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("fail to get ttl tenant info", K(ret));
} else if (!tenant_info->is_finished_) {
LOG_INFO("new partition leader on , cannot move right now", K(ret), KPC(tenant_info));
} else if (OB_FAIL(copy_all_tenant_ctxs(need_move_ctxs, tenant_id))) {
LOG_WARN("fail to deep copy ctx", K(ret));
} else {
for (int i = 0; i < need_move_ctxs.count() && OB_SUCC(ret); ++i) {
if (!need_move_ctxs.at(i)->is_moved_) {
ObMySQLTransaction trans;
common::ObTTLStatus ttl_record;
if (OB_FAIL(construct_sys_table_record(&(need_move_ctxs.at(i)), ttl_record))) {
LOG_WARN("fail to construct sys table record", K(ret), K(need_move_ctxs.at(i)));
if (OB_FAIL(construct_sys_table_record(need_move_ctxs.at(i), ttl_record))) {
LOG_WARN("fail to construct sys table record", K(ret), KPC(need_move_ctxs.at(i)));
} else if (OB_FAIL(trans.start(get_sql_proxy(), ttl_record.tenant_id_))) {
LOG_WARN("failt to start trans", K(ret), K(ttl_record.tenant_id_));
} else {
ObTTLStatusKey key(ttl_record.tenant_id_, ttl_record.table_id_,
ttl_record.partition_id_, ttl_record.task_id_);
if (OB_FAIL(ObTTLUtil::insert_ttl_task(ttl_record.tenant_id_,
int64_t affected_rows = 0;
if (OB_FAIL(common::ObTTLUtil::delete_ttl_task(ttl_record.tenant_id_,
share::OB_ALL_KV_TTL_TASK_TNAME,
trans, key, affected_rows))) {
LOG_WARN("fail to delete ttl record in __all_kv_ttl_task", K(ret));
} else {
if (affected_rows == 1) {
// NOTE: use replace instead of insert, because when the old partition leader
// moves the record to the history table first, the new partition leader may write the
// partition task into the task table and move it again, which would cause OB_ERR_PRIMARY_KEY_DUPLICATE
if (OB_FAIL(ObTTLUtil::replace_ttl_task(ttl_record.tenant_id_,
share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME,
trans, ttl_record))) {
LOG_WARN("fail to insert ttl task into __all_ttl_task_status_history.", K(ret));
} else if (OB_FAIL(common::ObTTLUtil::delete_ttl_task(ttl_record.tenant_id_,
share::OB_ALL_KV_TTL_TASK_TNAME,
trans, key))) {
LOG_WARN("fail to delete ttl tasks status", K(ret));
LOG_WARN("fail to replace into ttl task into __all_kv_ttl_task_history.", K(ret));
}
} else if (affected_rows == 0) {
LOG_INFO("delete affecte 0 row, record maybe moved by other observer", K(ret));
} else {
LOG_WARN("unexpected affected rows", K(ret), K(affected_rows));
}
}
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("fail to end transaction", K(ret), K(commit));
}
ret = OB_SUCCESS == tmp_ret ? ret : tmp_ret;
if (OB_SUCC(ret)) {
need_move_ctxs.at(i)->is_moved_ = true;
move_rows_cnt += affected_rows;
}
}
}
}
LOG_DEBUG("finish move record to history table", K(ret), K(tenant_id), K(need_move_ctxs.count()));
}
LOG_INFO("finish move record to history table", K(ret), K(tenant_id), K(need_move_ctxs), K(move_rows_cnt));
return ret;
}
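
Condensed, the per-record move above performs two steps inside one transaction: delete the row from the task table, and only when exactly one row was deleted replace it into the history table. The sketch below reuses the ObTTLUtil helpers visible in this patch; the wrapper function name move_one_record is hypothetical:

// Sketch only: move a single record, assuming `trans` was already started for rec.tenant_id_.
int move_one_record(ObMySQLTransaction &trans, const common::ObTTLStatus &rec)
{
  int ret = OB_SUCCESS;
  int64_t affected_rows = 0;
  ObTTLStatusKey key(rec.tenant_id_, rec.table_id_, rec.partition_id_, rec.task_id_);
  if (OB_FAIL(common::ObTTLUtil::delete_ttl_task(rec.tenant_id_,
      share::OB_ALL_KV_TTL_TASK_TNAME, trans, key, affected_rows))) {
    LOG_WARN("fail to delete ttl record from task table", K(ret));
  } else if (1 == affected_rows &&
             OB_FAIL(ObTTLUtil::replace_ttl_task(rec.tenant_id_,
                 share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME, trans, rec))) {
    // replace (not insert) so a concurrent move by a new leader cannot
    // fail with OB_ERR_PRIMARY_KEY_DUPLICATE
    LOG_WARN("fail to replace ttl record into history table", K(ret));
  } // affected_rows == 0 means another observer already moved this record
  return ret;
}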
......@@ -1233,7 +1421,7 @@ int ObTTLManager::response_ttl_cmd(const uint64_t& tenant_id, const uint64_t& ta
ret = OB_ERR_SYS;
LOG_WARN("innner system error, rootserver rpc proxy or rs mgr must not be NULL", K(ret), K(GCTX));
} else if (OB_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_addr))) {
LOG_WARN("fail to get rootservice address", K(ret));
LOG_WARN("fail to get rootservice address", K(ret), K(rs_addr));
} else if (OB_FAIL(GCTX.rs_rpc_proxy_->to(rs_addr).ttl_response(arg))) {
if (OB_TENANT_NOT_EXIST == ret && OB_FAIL(mark_tenant_droped(tenant_id))) {
LOG_WARN("fail to mark tenant droped", K(ret), K(tenant_id));
......@@ -1254,37 +1442,105 @@ int ObTTLManager::mark_tenant_droped(const uint64_t& tenant_id)
if (OB_ISNULL(tenant_info)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("tenant info is null", K(tenant_id));
} else if (tenant_info->state_ == OB_TTL_TASK_FINISH || tenant_info->state_ == OB_TTL_TASK_CANCEL) {
LOG_INFO("mark tenant droped", K(tenant_info->state_));
tenant_info->state_ == OB_TTL_TASK_MOVING;
tenant_info->rsp_time_ == OB_INVALID_ID;
tenant_info->is_dirty_ = true;
} else {
bool tenant_not_exist = false;
schema::ObSchemaGetterGuard schema_guard;
if (OB_FAIL(schema::ObMultiVersionSchemaService::get_instance().get_schema_guard(schema_guard))) { // double check
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(tenant_id, tenant_not_exist))) {
LOG_WARN("fail to check tenant exists", K(ret), K(tenant_id));
} else if (tenant_not_exist) {
// tenant_info->is_dirty_ = false; // no need to scan and sync sys table
// tenant_info->need_check_ = false; // no need to check partitions
// tenant_info->rsp_time_ == OB_INVALID_ID; // no need response
// tenant_info->state_ = OB_TTL_TASK_INVALID;
// tenant_info->is_droped_ = true;
// tenant_info->part_task_map_.reuse();
if (OB_FAIL(ttl_tenant_parts_map_.erase_refactored(tenant_id))) {
LOG_WARN("fail to erase tenant info", K(tenant_id));
} else {
tenant_info->destory();
allocator_.free(tenant_info);
}
LOG_INFO("tenant is droped, drop tenant info directly", K(tenant_id));
}
}
return ret;
}
int ObTTLManager::check_and_reset_droped_tenant()
{
int ret = OB_SUCCESS;
common::ObSpinLockGuard guard(lock_);
for (ttl_tenants_iterator iter = ttl_tenant_parts_map_.begin();
iter != ttl_tenant_parts_map_.end() && OB_SUCC(ret); ++iter) {
uint64_t tenant_id = iter->first;
ObTTLTenantInfo *tenant_info = iter->second;
if (OB_ISNULL(tenant_info)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("tenant info is null", K(ret), K(tenant_id));
} else {
bool tenant_not_exist = false;
schema::ObSchemaGetterGuard schema_guard;
if (OB_FAIL(schema::ObMultiVersionSchemaService::get_instance().get_schema_guard(schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.check_if_tenant_has_been_dropped(tenant_id, tenant_not_exist))) {
LOG_WARN("fail to check tenant exists", K(ret), K(tenant_id));
} else if (tenant_not_exist) {
tenant_info->is_dirty_ = false; // no need to scan and sync sys table
tenant_info->need_check_ = false; // no need to check partitions
tenant_info->rsp_time_ = OB_INVALID_ID; // no need to respond
tenant_info->state_ = OB_TTL_TASK_INVALID;
tenant_info->is_droped_ = true;
tenant_info->part_task_map_.reuse();
LOG_INFO("tenant is dropped, reset partition task map", K(tenant_id), KPC(tenant_info));
}
}
}
return ret;
}
int ObTTLManager::from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record,
common::ObArenaAllocator& allocator)
int ObTTLManager::from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record, bool with_status /*true*/, bool with_err_code /*true*/)
{
int ret = OB_SUCCESS;
if (!pkey.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret));
} else {
common::ObSpinLockGuard guard(lock_);
ObTTLTenantInfo *tenant_info = get_tenant_info(pkey.get_tenant_id(), false);
ObTTLTaskCtx* ctx = get_one_partition_ctx(pkey);
char *rowkey_buf = NULL;
if (OB_ISNULL(ctx)) {
if (OB_ISNULL(ctx) || OB_ISNULL(tenant_info)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("unexpected null tenant info", K(ret));
LOG_WARN("unexpected null value", K(ret), KP(ctx), KP(tenant_info));
} else if (pkey.get_tenant_id() != record.tenant_id_ ||
pkey.get_partition_id() != record.partition_id_ ||
pkey.get_table_id() != record.table_id_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fatel error, record not match pkey", K(pkey), K(record));
} else {
ctx->task_info_.err_code_ = record.ret_code_.compare("OB_SUCCESS") == 0 ? OB_SUCCESS : OB_INVALID_ERROR;
ctx->task_info_.pkey_ = pkey;
ctx->task_info_.task_id_ = record.task_id_;
ctx->task_start_time_ = record.task_start_time_;
ctx->task_end_time_ = record.task_update_time_;
ctx->task_info_.is_user_trigger_ = record.trigger_type_ == TRIGGER_TYPE::USER_TRIGGER;
ctx->task_info_.ttl_del_cnt_ = record.ttl_del_cnt_;
ctx->task_info_.max_version_del_cnt_ = record.max_version_del_cnt_;
ctx->task_info_.scan_cnt_ = record.scan_cnt_;
char *rowkey_buf = static_cast<char *>(allocator.alloc(record.row_key_.length()));
if (with_err_code) {
if (record.ret_code_.compare("OB_SUCCESS") == 0) {
ctx->task_info_.err_code_ = OB_SUCCESS;
} else if (record.ret_code_.compare("OB_NOT_MASTER") == 0) {
ctx->task_info_.err_code_ = OB_NOT_MASTER;
} else {
ctx->task_info_.err_code_ = OB_INVALID_ERROR;
}
}
if (with_status) {
ctx->task_status_ = static_cast<ObTTLTaskStatus>(record.status_);
}
if (!record.row_key_.empty()) {
char *rowkey_buf = static_cast<char *>(tenant_info->allocator_.alloc(record.row_key_.length()));
if (OB_ISNULL(rowkey_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", K(ret));
......@@ -1293,6 +1549,8 @@ int ObTTLManager::from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& rec
ctx->task_info_.row_key_.assign(rowkey_buf, record.row_key_.length());
}
}
}
}
LOG_DEBUG("finish from ttl record", K(ret), K(pkey));
return ret;
}
......@@ -1304,10 +1562,10 @@ bool ObTTLManager::can_schedule_tenant(const ObTTLTenantInfo &tenant_info)
bool ObTTLManager::can_schedule_task(const ObTTLTaskCtx &ttl_task)
{
return ttl_task.task_status_ == OB_TTL_TASK_PENDING || ttl_task.task_status_ == OB_TTL_TASK_PREPARE;
return ttl_task.task_status_ == OB_TTL_TASK_PENDING;
}
int ObTTLManager::try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info)
int ObTTLManager::try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info, const ObTTLTaskCtx *current_ctx)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(tenant_info)) {
......@@ -1322,22 +1580,24 @@ int ObTTLManager::try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info)
if (OB_ISNULL(ctx)) {
ret = OB_ERR_NULL_VALUE;
LOG_ERROR("fatal err, ttl ctx in map is null", K(ret), K(tenant_info->tenant_id_));
} else if (current_ctx == ctx) {
// do nothing
} else if (can_schedule_task(*ctx)) {
if (OB_FAIL(try_schedule_task(tenant_info, ctx))) {
if (OB_EAGAIN != ret) {
if (OB_SIZE_OVERFLOW != ret) {
LOG_WARN("fail to schedule task", K(ret));
}
}
}
}
if (OB_EAGAIN == ret) {
if (OB_SIZE_OVERFLOW == ret) {
ret = OB_SUCCESS;
}
}
return ret;
}
// try schedule partition task, return OB_EAGAIN if dag scheduler is full
// try schedule partition task, return OB_SIZE_OVERFLOW if dag scheduler is full
int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx)
{
int ret = OB_SUCCESS;
......@@ -1345,7 +1605,11 @@ int ObTTLManager::try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx*
LOG_WARN("invalid argument", K(ret), KP(tenant_info), KP(ctx));
} else if (can_schedule_tenant(*tenant_info) && can_schedule_task(*ctx)) {
if (OB_FAIL(generate_ttl_dag(ctx->task_info_, ctx->ttl_para_))) {
if (OB_EAGAIN != ret) {
if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
} else if (OB_SIZE_OVERFLOW == ret) {
// do noting
} else {
LOG_WARN("fail to generate dag task", K(ret));
}
} else {
......@@ -1371,3 +1635,49 @@ void ObTTLManager::mark_ttl_ctx_dirty(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx
}
}
int ObTTLManager::refresh_partition_task(ObTTLTaskCtx &ttl_task, bool refresh_status, bool refresh_retcode /*false*/)
{
int ret = OB_SUCCESS;
ObMySQLTransaction trans;
ObTTLStatusFieldArray filters;
common::ObTTLStatusArray ttl_records;
ObTTLStatusFieldArray filter;
ObPartitionKey pkey = ttl_task.task_info_.pkey_;
uint64_t tenant_id = pkey.get_tenant_id();
ObTTLTenantInfo *tenant_info = get_tenant_info(pkey.get_tenant_id(), false);
if (!ttl_task.is_valid() || OB_ISNULL(tenant_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ttl_task), KP(tenant_info));
} else if (OB_FAIL(construct_task_record_filter(ttl_task.task_info_.task_id_,
pkey.get_table_id(),
pkey.get_partition_id(),
filters))) {
LOG_WARN("fail to construct task record filter", K(ret), K(ttl_task));
} else if (OB_FAIL(trans.start(get_sql_proxy(), tenant_id))) {
LOG_WARN("fail to start transation", K(ret));
} else if (OB_FAIL(ObTTLUtil::read_ttl_tasks(tenant_id, share::OB_ALL_KV_TTL_TASK_TNAME,
trans, filters, ttl_records, true, &tenant_info->allocator_))) {
LOG_WARN("fail to get ttl tasks", K(ret), K(tenant_id), K(filters));
} else {
if (ttl_records.empty()) {
// do nothing
} else {
if (ttl_records.count() != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect ttl records count", K(ret), K(ttl_records.count()));
} else if (OB_FAIL(from_ttl_record(pkey, ttl_records.at(0), refresh_status, refresh_retcode))) {
LOG_WARN("fail to convert from ttl record", K(ret), K(pkey), K(refresh_status));
}
}
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("faile to end trans", "commit", commit, K(ret));
}
ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret;
}
return ret;
}
......@@ -35,7 +35,9 @@ public :
task_end_time_(OB_INVALID_ID),
failure_times_(0),
rsp_time_(OB_INVALID_ID),
is_dirty_(false) {}
is_dirty_(false),
is_moved_(false),
need_refresh_(true) {}
bool is_valid()
{
return task_info_.is_valid() && ttl_para_.is_valid();
......@@ -43,10 +45,9 @@ public :
TO_STRING_KV(K_(task_info), K_(task_status), K_(ttl_para), K_(task_start_time),
K_(last_modify_time), K_(task_end_time), K_(failure_times),
K_(rsp_time), K_(is_dirty));
K_(rsp_time), K_(is_dirty), K_(is_moved), K_(need_refresh));
public:
ObTTLTaskInfo task_info_;
common::ObTTLTaskStatus task_status_;
ObTTLPara ttl_para_;
......@@ -57,7 +58,9 @@ public:
int64_t rsp_time_;
bool is_invalid_;
bool is_dirty_;
bool is_dirty_; // should sync sys table for tasks
bool is_moved_;
bool need_refresh_; // should refresh task from task table
};
class OBTTLTimerPeriodicTask : public common::ObTimerTask {
......@@ -80,7 +83,7 @@ public:
bool is_usr_trigger, obrpc::ObTTLRequestArg::TTLRequestType cmd);
int report_task_status(ObTTLTaskInfo& task_info,
ObTTLPara& task_para, bool& is_stop);
void on_leader_active(storage::ObIPartitionGroup* partition);
void on_leader_active(const ObPartitionKey& pkey);
void on_schema_changed(uint64_t schema_changed_tenant_id);
/*timer handle function*/
......@@ -105,7 +108,27 @@ private:
cmd_type_(obrpc::ObTTLRequestArg::TTL_INVALID_TYPE),
rsp_time_(OB_INVALID_ID),
state_(common::ObTTLTaskStatus::OB_TTL_TASK_INVALID),
is_droped_(false) {}
is_droped_(false),
is_finished_(false)
{}
void destory()
{
part_task_map_.destroy();
allocator_.reset();
}
TO_STRING_KV(K_(tenant_id),
K_(task_id),
K_(is_usr_trigger),
K_(need_check),
K_(is_dirty),
K_(ttl_continue),
K_(cmd_type),
K_(rsp_time),
K_(state),
K_(is_droped),
K_(is_finished));
public:
PartTasksMap part_task_map_;
common::ObArenaAllocator allocator_;
uint64_t tenant_id_;
......@@ -115,16 +138,10 @@ private:
bool is_dirty_; /*need check the current ctx task*/
bool ttl_continue_;
obrpc::ObTTLRequestArg::TTLRequestType cmd_type_;
int64_t rsp_time_;
int64_t rsp_time_; // OB_INVALID_ID means no need response
common::ObTTLTaskStatus state_;
bool is_droped_;
public:
void destory()
{
part_task_map_.destroy();
allocator_.reset();
}
bool is_droped_; // tenant is dropped
bool is_finished_; // all delete tasks are finished (or canceled)
};
typedef common::hash::ObHashMap<int64_t, ObTTLTenantInfo*> TenantPartsMap;
......@@ -142,7 +159,7 @@ private:
int generate_one_partition_task(ObTTLTaskInfo& task_info, ObTTLPara& para);
int get_ttl_para_from_schema(const share::schema::ObTableSchema *table_schema,
ObTTLPara& para, bool& is_tableapi_schema);
int check_partition_can_gen_ttl(storage::ObIPartitionGroup *partition,
int check_partition_can_gen_ttl(const ObPartitionKey& pkey,
ObTTLPara &para, bool& can_ttl);
int check_and_do_rsp(uint64_t tenant_id);
void mark_tenant_need_check(uint64_t tenant_id);
......@@ -163,20 +180,22 @@ private:
int sync_sys_table(ObPartitionKey& pkey);
int construct_sys_table_record(ObTTLTaskCtx* ctx, common::ObTTLStatus& ttl_record);
int try_schedule_task(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx);
int try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info);
int try_schedule_remaining_tasks(ObTTLTenantInfo* tenant_info, const ObTTLTaskCtx *current_ctx);
bool can_schedule_tenant(const ObTTLTenantInfo &tenant_info);
bool can_schedule_task(const ObTTLTaskCtx &ttl_task);
int check_cmd_state_valid(common::ObTTLTaskStatus current_state, common::ObTTLTaskStatus incoming_state);
int deep_copy_all_tenant_ctxs(common::ObSArray<ObTTLTaskCtx>& ctx_array, common::ObArenaAllocator& allocator,
uint64_t tenant_id);
int from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record,
common::ObArenaAllocator& allocator);
int check_cmd_state_valid(const common::ObTTLTaskStatus current_state,
const common::ObTTLTaskStatus incoming_state);
int copy_all_tenant_ctxs(common::ObSArray<ObTTLTaskCtx *>& ctx_array, uint64_t tenant_id);
int from_ttl_record(ObPartitionKey& pkey, common::ObTTLStatus& record, bool with_status = true, bool with_err_code = true);
void mark_ttl_ctx_dirty(ObTTLTenantInfo* tenant_info, ObTTLTaskCtx* ctx);
void check_ttl_tenant_state(uint64_t tenant_id);
int transform_cmd_to_state(const obrpc::ObTTLRequestArg::TTLRequestType& cmd, common::ObTTLTaskStatus& state);
int try_schedule_prepare_task(ObPartitionKey& pkey);
void mark_tenant_checked(uint64_t tenant_id);
int mark_tenant_droped(const uint64_t& tenant_id);
int check_and_reset_droped_tenant();
obrpc::ObTTLRequestArg::TTLRequestType transform_state_to_cmd(const int64_t state);
int refresh_partition_task(ObTTLTaskCtx &ttl_task, bool refresh_status, bool refresh_retcode = false);
private:
static const int64_t DEFAULT_TTL_BUCKET_NUM = 100;
......@@ -192,6 +211,7 @@ private:
common::ObTimer ttl_timer_;
OBTTLTimerPeriodicTask periodic_task_;
common::ObSpinLock lock_;
bool is_first_cmd_; // recover tenant info after restart
private:
void stop();
......@@ -203,7 +223,8 @@ private:
periodic_delay_(TTL_PERIODIC_DELAY),
ttl_timer_(),
periodic_task_(),
lock_()
lock_(),
is_first_cmd_(true)
{}
~ObTTLManager() {}
};
......
......@@ -55,10 +55,10 @@ int ObTableTTLDeleteTask::init(const ObTTLPara &ttl_para, ObTTLTaskInfo &ttl_inf
ObRowkey rowkey;
int64_t pos = 0;
if (OB_FAIL(rowkey.deserialize(allocator_, ttl_info.row_key_.ptr(), ttl_info.row_key_.length(), pos))) {
LOG_WARN("fail to deserialize rowkey", K(ret));
LOG_WARN("fail to deserialize rowkey", K(ret), K(ttl_info.row_key_));
} else if (rowkey.get_obj_cnt() != 2) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(rowkey));
LOG_WARN("invalid argument", K(ret), K(rowkey), K(ttl_info.row_key_));
} else {
first_key_ = rowkey.get_obj_ptr()[0].get_string();
second_key_ = rowkey.get_obj_ptr()[1].get_string();
......@@ -326,7 +326,7 @@ int ObTableTTLDeleteTask::process_one()
char *buf = static_cast<char *>(allocator_.alloc(buf_len));
int64_t pos = 0;
if (OB_FAIL(row_key.serialize(buf, buf_len, pos))) {
LOG_WARN("fail to deserialize", K(ret));
LOG_WARN("fail to serialize", K(ret));
} else {
result_key.assign_ptr(buf, buf_len);
}
......
......@@ -23,16 +23,6 @@ namespace oceanbase
namespace rootserver
{
#define OB_TTL_RESPONSE_MASK (1 << 5)
#define OB_TTL_STATUS_MASK (OB_TTL_RESPONSE_MASK - 1)
#define SET_TASK_PURE_STATUS(status, state) ((status) = ((state) & OB_TTL_STATUS_MASK) + ((status & OB_TTL_RESPONSE_MASK)))
#define SET_TASK_RESPONSE(status, state) ((status) |= (((state) & 1) << 5))
#define SET_TASK_STATUS(status, pure_status, is_responsed) { SET_TASK_PURE_STATUS(status, pure_status), SET_TASK_RESPONSE(status, is_responsed); }
#define EVAL_TASK_RESPONSE(status) (((status) & OB_TTL_RESPONSE_MASK) >> 5)
#define EVAL_TASK_PURE_STATUS(status) (static_cast<ObTTLTaskStatus>((status) & OB_TTL_STATUS_MASK))
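
The macros removed here (still used by the code later in this diff) pack the RS-side task state into the low five bits of the status column, with a "responsed" flag in bit 5. A short illustration of how they compose; the function and variable names are illustrative only:

// Illustration only: packing and unpacking the status column with the macros above.
void pack_and_unpack_example()
{
  int64_t status = 0;
  SET_TASK_STATUS(status, OB_RS_TTL_TASK_MOVE, 1);        // low 5 bits = MOVE, bit 5 = responsed
  ObTTLTaskStatus pure = EVAL_TASK_PURE_STATUS(status);   // == OB_RS_TTL_TASK_MOVE
  bool all_responsed = (EVAL_TASK_RESPONSE(status) != 0); // == true
  (void)pure;
  (void)all_responsed;
}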
ObClearTTLStatusHistoryTask::ObClearTTLStatusHistoryTask(ObRootService& rs)
: root_service_(rs)
{
......@@ -158,6 +148,8 @@ int ObTTLTenantTaskMgr::add_tenant(uint64_t tenant_id)
tenant_task.tenant_id_ = tenant_id;
if (OB_FAIL(ten_task_arr_.push_back(tenant_task))) {
LOG_WARN("fail to store tenant task", K(ret));
} else {
LOG_INFO("add tenant to tenant task array", K(tenant_id), K(tenant_task));
}
}
return ret;
......@@ -170,6 +162,7 @@ void ObTTLTenantTaskMgr::delete_tenant(uint64_t tenant_id)
if (ten_task.tenant_id_ == tenant_id) {
ten_task.reset();
ten_task_arr_.remove(i);
LOG_INFO("remove tennat task in tenant task array", K(tenant_id), K(ten_task_arr_));
break;
}
}
......@@ -306,7 +299,7 @@ int ObTTLTenantTaskMgr::add_ttl_task(uint64_t tenant_id,
}
if (curr_state != next_state) {
if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, static_cast<int64_t>(next_state)))) {
if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, static_cast<int64_t>(next_state), *GCTX.sql_proxy_))) {
LOG_WARN("fail to update ttl tasks", K(ret));
} else {
// update memory status only
......@@ -348,7 +341,7 @@ int ObTTLTenantTaskMgr::add_ttl_task(uint64_t tenant_id,
} else if (next_state != curr_state) {
// update status
if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_,
static_cast<int64_t>(next_state)))) {
static_cast<int64_t>(next_state), *GCTX.sql_proxy_))) {
LOG_WARN("fail to update ttl tasks", K(ret));
} else {
rs_task.all_responsed_ = false;
......@@ -382,6 +375,12 @@ int ObTTLTenantTaskMgr::add_ttl_task(uint64_t tenant_id,
}
}
if (ret == OB_EAGAIN) {
// it's a success cmd, cannot return OB_EAGAIN to user
LOG_INFO("reset OB_EAGAIN to OB_SUCCESS, because ttl scheduler will resend later");
ret = OB_SUCCESS;
}
return ret;
}
......@@ -437,7 +436,7 @@ bool ObTTLTenantTaskMgr::need_retry_task(RsTenantTask& rs_task)
{
bool bool_ret = false;
int64_t cur_time = ObTimeUtility::current_time();
bool_ret = (cur_time - rs_task.ttl_status_.task_update_time_ < OB_TTL_TASK_RETRY_INTERVAL) ||
bool_ret = (cur_time - rs_task.ttl_status_.task_update_time_ >= OB_TTL_TASK_RETRY_INTERVAL) ||
(rs_task.server_infos_.count() == 0);
return bool_ret;
}
......@@ -510,7 +509,7 @@ int ObTTLTenantTaskMgr::process_tenant_tasks(uint64_t tenant_id)
}
}
} else {
LOG_INFO("process_tenant_tasks begin", K(tenant_id), K(task_count),
LOG_INFO("process_tenant_tasks begin", K(tenant_id), K(task_count), K(status_responsed),
K(curr_state), K(task_count));
// task_count == 1
if (status_responsed) {
......@@ -551,7 +550,7 @@ int ObTTLTenantTaskMgr::process_tenant_tasks(uint64_t tenant_id)
if (OB_SUCC(ret) && curr_state != next_state) {
if (OB_FAIL(update_task_status(tenant_id, cur_task.ttl_status_.task_id_,
static_cast<int64_t>(next_state)))) {
static_cast<int64_t>(next_state), *GCTX.sql_proxy_))) {
LOG_WARN("fail to update ttl tasks", K(ret));
} else {
cur_task.all_responsed_ = false;
......@@ -589,7 +588,8 @@ int ObTTLTenantTaskMgr::insert_tenant_task(ObTTLStatus& ttl_task)
int ObTTLTenantTaskMgr::update_task_status(uint64_t tenant_id,
uint64_t task_id,
int64_t status)
int64_t status,
common::ObISQLClient& proxy)
{
int ret = OB_SUCCESS;
ObTTLStatusKey key(tenant_id, OB_INVALID_ID, OB_INVALID_ID, task_id);
......@@ -611,7 +611,7 @@ int ObTTLTenantTaskMgr::update_task_status(uint64_t tenant_id,
} else {
if (OB_FAIL(ObTTLUtil::update_ttl_task(tenant_id,
share::OB_ALL_KV_TTL_TASK_TNAME,
*GCTX.sql_proxy_,
proxy,
key,
update_fields))) {
LOG_WARN("fail to update ttl task status.", K(ret), K(tenant_id), K(task_id), K(status));
......@@ -627,12 +627,13 @@ int ObTTLTenantTaskMgr::delete_task(uint64_t tenant_id, uint64_t task_id)
{
int ret = OB_SUCCESS;
ObTTLStatusKey key(tenant_id, OB_INVALID_ID, OB_INVALID_ID, task_id);
int64_t affected_rows = 0;
if (OB_FAIL(ObTTLUtil::delete_ttl_task(tenant_id,
share::OB_ALL_KV_TTL_TASK_TNAME,
*GCTX.sql_proxy_, key))) {
*GCTX.sql_proxy_, key, affected_rows))) {
LOG_WARN("fail to delete ttl tasks status", K(ret), K(tenant_id), K(task_id));
} else {
LOG_DEBUG("success to delete ttl tasks status", K(ret), K(tenant_id), K(task_id));
LOG_DEBUG("success to delete ttl tasks status", K(ret), K(tenant_id), K(task_id), K(affected_rows));
}
return ret;
......@@ -724,6 +725,8 @@ int ObTTLTenantTaskMgr::refresh_tenant(uint64_t tenant_id)
} else if (OB_FAIL(update_tenant_tasks(tenant_id, ttl_tasks))) {
LOG_WARN("fail to update tenant tasks", K(ret), K(tenant_id));
}
LOG_INFO("refresh tenant task from system table", K(tenant_id));
}
return ret;
}
......@@ -731,13 +734,11 @@ int ObTTLTenantTaskMgr::refresh_tenant(uint64_t tenant_id)
int ObTTLTenantTaskMgr::refresh_all()
{
int ret = OB_SUCCESS;
if (!need_refresh_) {
} else {
ObArray<uint64_t> tenant_ids;
if (OB_FAIL(get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant ids", K(ret));
} else {
LOG_INFO("get all tenant ids", K(tenant_ids));
for (size_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
uint64_t tenant_id = tenant_ids.at(i);
if (OB_FAIL(refresh_tenant(tenant_id))) {
......@@ -746,10 +747,6 @@ int ObTTLTenantTaskMgr::refresh_all()
}
}
if (OB_SUCC(ret) && tenant_ids.count() > 0) {
need_refresh_ = false;
}
}
return ret;
}
......@@ -803,8 +800,7 @@ int ObTTLTenantTaskMgr::alter_status_and_add_ttl_task(uint64_t tenant_id)
if (OB_FAIL(in_active_time(tenant_id, is_active_time))) {
LOG_WARN("fail to eval active time", K(ret));
} else if (OB_FAIL(get_tenant_ptr(tenant_id, tenant_ptr))) {
need_refresh_ = true;
LOG_WARN("fail to get tenant task ptr", K(tenant_id), K(ret));
LOG_WARN("fail to get tenant task ptr, need refresh", K(tenant_id), K(ret));
} else {
ObTTLTenantTask& tenant_ref = *tenant_ptr;
size_t task_count = tenant_ref.tasks_.count();
......@@ -824,7 +820,7 @@ int ObTTLTenantTaskMgr::alter_status_and_add_ttl_task(uint64_t tenant_id)
if (!status_responsed) {
int64_t tmp_status = 0;
SET_TASK_STATUS(tmp_status, cur_state, 1);
if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, tmp_status))) {
if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, tmp_status, *GCTX.sql_proxy_))) {
LOG_WARN("fail to update ttl tasks", K(ret));
} else {
rs_task.set_servers_not_responsed();
......@@ -862,7 +858,7 @@ int ObTTLTenantTaskMgr::alter_status_and_add_ttl_task(uint64_t tenant_id)
* send move to servers, update status
*/
LOG_INFO("alter status and add ttl task", K(next_state));
if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, next_state))) {
if (OB_FAIL(update_task_status(tenant_id, rs_task.ttl_status_.task_id_, next_state, *GCTX.sql_proxy_))) {
LOG_WARN("fail to update ttl tasks", K(ret));
} else {
LOG_INFO("alter status and add ttl task", K(next_state));
......@@ -1098,7 +1094,7 @@ int ObTTLTenantTaskMgr::dispatch_ttl_request(const TTLServerInfos& server_infos,
}
}
LOG_INFO("send ttl server ttl request", K(ret), K(arg), K(send_cnt), K(server_infos.count()));
LOG_INFO("send ttl server ttl request", K(ret), K(arg), K(send_cnt), K(server_infos.count()), K(server_infos));
return ret;
}
......@@ -1210,8 +1206,12 @@ void ObTTLTenantTaskMgr::refresh_deleted_tenants()
exist = (del_ten_arr_.at(k) == tenant_id);
}
if (!exist && OB_FAIL(del_ten_arr_.push_back(tenant_id))) {
if (!exist) {
if (OB_FAIL(del_ten_arr_.push_back(tenant_id))) {
LOG_WARN("fail to store deleted tenant id", K(ret));
} else {
LOG_INFO("add tennat id to del tenant array", K(ret), K(tenant_id), K(del_ten_arr_));
}
}
}
}
......@@ -1248,6 +1248,7 @@ int ObTTLTenantTaskMgr::process_tenant_task_rsp(uint64_t tenant_id,
const ObAddr& server_addr)
{
int ret = OB_SUCCESS;
TTLMGR.refresh_all();
lib::ObMutexGuard guard(mutex_);
RsTenantTask* rs_task_ptr = NULL;
......@@ -1255,8 +1256,10 @@ int ObTTLTenantTaskMgr::process_tenant_task_rsp(uint64_t tenant_id,
LOG_WARN("fail to get tasks ptr", K(ret), K(tenant_id), K(task_id));
} else {
RsTenantTask& rs_task = *rs_task_ptr;
if (OB_FAIL(rsp_task_status(static_cast<ObTTLTaskType>(task_type), EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_)))) {
LOG_WARN("response task type incorrect", K(ret), K(tenant_id), K(task_id), K(task_type));
LOG_WARN("response task type incorrect",
K(ret), K(tenant_id), K(task_id), K(task_type), K(EVAL_TASK_PURE_STATUS(rs_task.ttl_status_.status_)));
} else if (OB_FAIL(rs_task.set_server_responsed(server_addr))) {
LOG_WARN("fail to set server responsed", K(ret), K(tenant_id), K(task_id));
} else if (!EVAL_TASK_RESPONSE(rs_task.ttl_status_.status_) &&
......@@ -1282,23 +1285,50 @@ int ObTTLTenantTaskMgr::update_task_on_all_responsed(RsTenantTask& task)
ObTTLTaskStatus next_state, cur_state;
cur_state = EVAL_TASK_PURE_STATUS(task.ttl_status_.status_);
next_state = next_status(cur_state);
bool is_move = false;
int64_t task_status = static_cast<int64_t>(next_state);
if (next_state == cur_state) {
// move or suspend
SET_TASK_RESPONSE(task_status, 1);
if (cur_state == OB_RS_TTL_TASK_MOVE) {
is_move = true;
}
}
if (task.ttl_status_.status_ == task_status) {
// SUSPEND or MOVED
} else if (OB_FAIL(update_task_status(tenant_id, task.ttl_status_.task_id_, task_status))) {
LOG_WARN("fail to update ttl tasks", K(ret), K(task.ttl_status_.task_id_), K(cur_state), K(next_state), K(task_status));
} else {
ObMySQLTransaction trans;
uint64_t task_id = task.ttl_status_.task_id_;
if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null GCTX.sql_proxy_", K(ret));
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id))) {
LOG_WARN("fail to start transation", K(ret), K(tenant_id));
} else if (OB_FAIL(update_task_status(tenant_id, task_id, task_status, trans))) {
LOG_WARN("fail to update ttl tasks", K(ret), K(task_id), K(cur_state), K(next_state), K(task_status));
} else if (is_move && OB_FAIL(ObTTLUtil::remove_all_task_to_history_table(tenant_id, task_id, trans))) {
// NOTE: if the partition was removed and the observer restarts, the task won't be moved by the observer itself
LOG_WARN("fail to move task to history table", K(ret), K(tenant_id), K(task_id));
} else {}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = ret;
if (OB_FAIL(trans.end(commit))) {
LOG_WARN("faile to end trans", "commit", commit, K(ret));
}
ret = tmp_ret == OB_SUCCESS ? ret : tmp_ret;
}
if (OB_SUCC(ret)) {
// update status and update time
task.ttl_status_.status_ = task_status;
task.all_responsed_ = false;
task.set_servers_not_responsed();
}
}
return ret;
}
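The block above follows the usual start / write / end(commit) shape. A minimal standalone sketch of that pattern, with do_ttl_updates() as a hypothetical stand-in for the two writes issued through the transaction:
int update_in_trans(common::ObMySQLProxy *sql_proxy, uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  ObMySQLTransaction trans;
  if (OB_ISNULL(sql_proxy)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("unexpected null sql proxy", K(ret));
  } else if (OB_FAIL(trans.start(sql_proxy, tenant_id))) {
    LOG_WARN("fail to start trans", K(ret), K(tenant_id));
  } else if (OB_FAIL(do_ttl_updates(trans))) { // hypothetical helper issuing both writes via `trans`
    LOG_WARN("fail to execute ttl updates", K(ret), K(tenant_id));
  }
  if (trans.is_started()) {
    const bool commit = (OB_SUCCESS == ret);
    int tmp_ret = trans.end(commit);            // commit on success, otherwise roll back
    ret = (OB_SUCCESS == ret) ? tmp_ret : ret;  // keep the first error
  }
  return ret;
}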
......@@ -1389,17 +1419,23 @@ void ObTTLScheduler::runTimerTask()
int RsTenantTask::set_server_responsed(const ObAddr& server_addr)
{
int ret = OB_SUCCESS;
bool find_server = false;
TTLServerInfos& server_infos = server_infos_;
if (OB_UNLIKELY(!server_addr.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(server_addr));
} else {
for (int64_t i = 0; i < server_infos.count(); ++i) {
for (int64_t i = 0; i < server_infos.count() && !find_server; ++i) {
if (server_addr == server_infos.at(i).addr_) {
server_infos.at(i).is_responsed_ = true;
break;
find_server = true;
}
}
if (!find_server) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("cannot find addr in sever infos", K(ret), K(server_addr), K(server_infos));
}
}
return ret;
}
......
......@@ -163,7 +163,6 @@ private:
: mutex_(),
ten_task_arr_(),
del_ten_arr_(),
need_refresh_(true),
is_inited_(false) {}
int update_task_on_all_responsed(RsTenantTask& task);
......@@ -181,7 +180,8 @@ private:
virtual int update_task_status(uint64_t tenant_id,
uint64_t task_id,
int64_t rs_new_status);
int64_t rs_new_status,
common::ObISQLClient& proxy);
bool tenant_exist(uint64_t tenant_id);
......@@ -226,11 +226,9 @@ private:
lib::ObMutex mutex_; // lib::ObMutexGuard guard(mutex_);
ObArray<ObTTLTenantTask> ten_task_arr_;
ObArray<uint64_t> del_ten_arr_;
bool need_refresh_;
bool is_inited_;
const int64_t OB_TTL_TASK_RETRY_INTERVAL = 60*1000*1000; // 3min
const int64_t OB_TTL_TASK_RETRY_INTERVAL = 15*1000*1000; // 15s
};
#define TTLMGR ObTTLTenantTaskMgr::get_instance()
......
......@@ -29,10 +29,11 @@ bool ObTTLTime::is_same_day(int64_t ttl_time1, int64_t ttl_time2)
time_t param1 = static_cast<time_t>(ttl_time1 / 1000000l);
time_t param2 = static_cast<time_t>(ttl_time2 / 1000000l);
struct tm *t1 = localtime(&param1);
struct tm *t2 = localtime(&param1);
struct tm tm1, tm2;
::localtime_r(&param1, &tm1);
::localtime_r(&param2, &tm2);
return (t1 && t2 && t1->tm_mday == t2->tm_mday);
return (tm1.tm_year == tm2.tm_year && tm1.tm_yday == tm2.tm_yday);
}
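The rewrite above matters because localtime() returns a pointer to one shared static buffer, so the old t1 and t2 always aliased the same struct (and both were even derived from param1). A minimal illustration of the reentrant form, assuming only POSIX localtime_r:
#include <ctime>
// Sketch: two independent tm buffers, so the second conversion cannot clobber the first.
static bool same_calendar_day(time_t a, time_t b)
{
  struct tm tm_a, tm_b;
  ::localtime_r(&a, &tm_a);
  ::localtime_r(&b, &tm_b);
  return tm_a.tm_year == tm_b.tm_year && tm_a.tm_yday == tm_b.tm_yday;
}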
bool ObTTLUtil::extract_val(const char* ptr, uint64_t len, int& val)
......@@ -135,12 +136,13 @@ int ObTTLUtil::insert_ttl_task(uint64_t tenant_id,
" VALUE "
"(now(), now(), %ld, %ld, %ld,"
" %ld, %ld, %ld, %ld, %ld, "
" %ld, %ld, %ld,'%s', '%s')", // 12
" %ld, %ld, %ld,'%.*s', '%.*s')", // 12
tname, // 0
tenant_id, task.table_id_, task.partition_id_,
task.task_id_, task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_,
task.ttl_del_cnt_, task.max_version_del_cnt_,
task.scan_cnt_, task.row_key_.ptr(), task.ret_code_.ptr()))) {
task.scan_cnt_, task.row_key_.length(), task.row_key_.ptr(),
task.ret_code_.length(), task.ret_code_.ptr()))) {
LOG_WARN("sql assign fmt failed", K(ret));
} else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) {
LOG_WARN("fail to execute sql", K(ret), K(sql));
......@@ -167,12 +169,13 @@ int ObTTLUtil::update_ttl_task_all_fields(uint64_t tenant_id,
if (OB_FAIL(sql.assign_fmt("UPDATE %s SET "
"task_start_time = %ld, task_update_time = %ld, trigger_type = %ld, status = %ld,"
" ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%s', ret_code = '%s'"
" WHERE "
"tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND task_id = %ld ",
" ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%*.s', ret_code = '%*.s'"
" WHERE tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND task_id = %ld ",
tname, // 0
task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_,
task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_, task.row_key_.ptr(), task.ret_code_.ptr(),
task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_,
task.row_key_.length(), task.row_key_.ptr(),
task.ret_code_.length(), task.ret_code_.ptr(),
tenant_id, task.table_id_, key.partition_id_, key.task_id_))) {
LOG_WARN("sql assign fmt failed", K(ret));
} else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) {
......@@ -260,12 +263,13 @@ int ObTTLUtil::update_ttl_task_all_fields(uint64_t tenant_id,
if (OB_FAIL(sql.assign_fmt("UPDATE %s SET "
"task_start_time = %ld, task_update_time = %ld, trigger_type = %ld, status = %ld,"
" ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%s', ret_code = '%s'"
" ttl_del_cnt = %ld, max_version_del_cnt = %ld, scan_cnt = %ld, row_key = '%*.s', ret_code = '%*.s'"
" WHERE "
"tenant_id = %ld AND table_id = %ld AND partition_id = %ld AND task_id = %ld ",
tname, // 0
task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_,
task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_, task.row_key_.ptr(), task.ret_code_.ptr(),
task.ttl_del_cnt_, task.max_version_del_cnt_, task.scan_cnt_,
task.row_key_.length(), task.row_key_.ptr(), task.ret_code_.length(), task.ret_code_.ptr(),
tenant_id, task.table_id_, task.partition_id_, task.task_id_))) {
LOG_WARN("sql assign fmt failed", K(ret));
} else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) {
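The move from '%s' to '%.*s' in these statements is what keeps them safe for ObString values, which carry an explicit length and are not guaranteed to be NUL-terminated. A minimal sketch using only standard snprintf semantics:
#include <cstdio>
// Sketch: "%.*s" consumes (length, pointer) and reads at most `len` bytes;
// a bare "%s" would keep scanning for a NUL terminator that may not exist.
static int render_row_key(char *buf, size_t buf_len, const char *row_key, int len)
{
  return snprintf(buf, buf_len, "row_key = '%.*s'", len, row_key);
}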
......@@ -280,11 +284,11 @@ int ObTTLUtil::update_ttl_task_all_fields(uint64_t tenant_id,
int ObTTLUtil::delete_ttl_task(uint64_t tenant_id,
const char* tname,
common::ObISQLClient& proxy,
ObTTLStatusKey& key)
ObTTLStatusKey& key,
int64_t &affect_rows)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t affect_rows = 0;
if (OB_FAIL(sql.assign_fmt("DELETE FROM %s WHERE "
"tenant_id = %ld AND table_id = %ld "
......@@ -396,20 +400,23 @@ int ObTTLUtil::read_ttl_tasks(uint64_t tenant_id,
ObString rowkey;
char *rowkey_buf = nullptr;
EXTRACT_VARCHAR_FIELD_MYSQL(*result, "row_key", rowkey);
if (OB_SUCC(ret) && !rowkey.empty()) {
if (OB_ISNULL(rowkey_buf = static_cast<char *>(allocator->alloc(rowkey.length())))) {
LOG_WARN("failt to allocate memory", K(ret));
LOG_WARN("failt to allocate memory", K(ret), K(rowkey));
} else {
MEMCPY(rowkey_buf, rowkey.ptr(), rowkey.length());
result_arr.at(idx).row_key_.assign(rowkey_buf, rowkey.length());
}
}
}
if (OB_NOT_NULL(allocator)) {
ObString err_msg;
char *err_buf = nullptr;
EXTRACT_VARCHAR_FIELD_MYSQL(*result, "ret_code", err_msg);
if (OB_SUCC(ret) && !err_msg.empty()) {
if (OB_ISNULL(err_buf = static_cast<char *>(allocator->alloc(err_msg.length())))) {
LOG_WARN("failt to allocate memory", K(ret), K(err_msg.length()));
LOG_WARN("failt to allocate memory", K(ret), K(err_msg));
} else {
MEMCPY(err_buf, err_msg.ptr(), err_msg.length());
result_arr.at(idx).ret_code_.assign(err_buf, err_msg.length());
......@@ -421,6 +428,7 @@ int ObTTLUtil::read_ttl_tasks(uint64_t tenant_id,
}
}
}
}
return ret;
}
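The allocator handling above exists because EXTRACT_VARCHAR_FIELD_MYSQL yields an ObString that points into the current result row, so the bytes must be copied into caller-owned memory before the row is released. A hedged sketch of that deep copy in isolation (the error code is an assumption, matching the usual allocation-failure convention):
// Sketch: copy a row-scoped string into memory owned by the caller's allocator.
static int deep_copy_string(common::ObIAllocator &allocator,
                            const common::ObString &src,
                            common::ObString &dst)
{
  int ret = OB_SUCCESS;
  char *buf = nullptr;
  if (src.empty()) {
    dst.reset();
  } else if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(src.length())))) {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_WARN("fail to allocate memory", K(ret), "len", src.length());
  } else {
    MEMCPY(buf, src.ptr(), src.length());
    dst.assign(buf, src.length());
  }
  return ret;
}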
......@@ -457,5 +465,58 @@ bool ObTTLUtil::check_can_process_tenant_tasks(uint64_t tenant_id)
return bret;
}
int ObTTLUtil::remove_all_task_to_history_table(uint64_t tenant_id, uint64_t task_id, common::ObISQLClient& proxy)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t affect_rows = 0;
if (OB_FAIL(sql.assign_fmt("insert into %s select * from %s "
" where task_id = %ld and partition_id != -1 and table_id != -1",
share::OB_ALL_KV_TTL_TASK_HISTORY_TNAME,
share::OB_ALL_KV_TTL_TASK_TNAME,
task_id))) {
LOG_WARN("sql assign fmt failed", K(ret));
} else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) {
LOG_WARN("fail to execute sql", K(ret), K(sql), K(tenant_id));
} else {
LOG_INFO("success to execute sql", K(ret), K(tenant_id), K(sql), K(affect_rows));
}
return ret;
}
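For orientation, a hedged sketch of the intended call site inside the rootserver transaction above; the task_id value is illustrative and the table names assume the usual expansion of the TNAME constants:
// Renders roughly as:
//   insert into __all_kv_ttl_task_history select * from __all_kv_ttl_task
//     where task_id = 7 and partition_id != -1 and table_id != -1
// so every partition-level row of the finished task is copied in one statement.
if (OB_FAIL(ObTTLUtil::remove_all_task_to_history_table(tenant_id, task_id, trans))) {
  LOG_WARN("fail to copy ttl task rows to history table", K(ret), K(tenant_id), K(task_id));
}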
int ObTTLUtil::replace_ttl_task(uint64_t tenant_id,
const char* tname,
common::ObISQLClient& proxy,
ObTTLStatus& task)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t affect_rows = 0;
if (OB_FAIL(sql.assign_fmt("REPLACE INTO %s "
"(gmt_create, gmt_modified, tenant_id, table_id, partition_id, "
"task_id, task_start_time, task_update_time, trigger_type, status,"
" ttl_del_cnt, max_version_del_cnt, scan_cnt, row_key, ret_code)"
" VALUE "
"(now(), now(), %ld, %ld, %ld,"
" %ld, %ld, %ld, %ld, %ld, "
" %ld, %ld, %ld,'%.*s', '%.*s')", // 12
tname, // 0
tenant_id, task.table_id_, task.partition_id_,
task.task_id_, task.task_start_time_, task.task_update_time_, task.trigger_type_, task.status_,
task.ttl_del_cnt_, task.max_version_del_cnt_,
task.scan_cnt_, task.row_key_.length(), task.row_key_.ptr(),
task.ret_code_.length(), task.ret_code_.ptr()))) {
LOG_WARN("sql assign fmt failed", K(ret));
} else if (OB_FAIL(proxy.write(tenant_id, sql.ptr(), affect_rows))) {
LOG_WARN("fail to execute sql", K(ret), K(sql));
} else {
LOG_INFO("success to execute sql", K(ret), K(sql));
}
return ret;
}
} // end namespace rootserver
} // end namespace oceanbase
\ No newline at end of file
......@@ -24,6 +24,17 @@ namespace oceanbase
namespace common
{
#define OB_TTL_RESPONSE_MASK (1 << 5)
#define OB_TTL_STATUS_MASK (OB_TTL_RESPONSE_MASK - 1)
#define SET_TASK_PURE_STATUS(status, state) ((status) = ((state) & OB_TTL_STATUS_MASK) + ((status & OB_TTL_RESPONSE_MASK)))
#define SET_TASK_RESPONSE(status, state) ((status) |= (((state) & 1) << 5))
#define SET_TASK_STATUS(status, pure_status, is_responsed) { SET_TASK_PURE_STATUS(status, pure_status), SET_TASK_RESPONSE(status, is_responsed); }
#define EVAL_TASK_RESPONSE(status) (((status) & OB_TTL_RESPONSE_MASK) >> 5)
#define EVAL_TASK_PURE_STATUS(status) (static_cast<ObTTLTaskStatus>((status) & OB_TTL_STATUS_MASK))
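A small worked example of how these macros pack one int64 status word: the low five bits carry the pure task state and bit 5 the "all servers responded" flag (the pure value 2 below is illustrative, not a real ObTTLTaskStatus enumerator):
int64_t status = 0;
SET_TASK_STATUS(status, 2, 1);                   // pure state 2 plus the response bit
// status == (2 & OB_TTL_STATUS_MASK) | OB_TTL_RESPONSE_MASK == 0x22
int64_t resp = EVAL_TASK_RESPONSE(status);       // 1: every server has responded
int64_t pure = status & OB_TTL_STATUS_MASK;      // 2: what EVAL_TASK_PURE_STATUS casts to ObTTLTaskStatus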
enum TRIGGER_TYPE
{
PERIODIC_TRIGGER = 0,
......@@ -106,6 +117,7 @@ typedef struct ObTTLStatus {
K_(ttl_del_cnt),
K_(max_version_del_cnt),
K_(scan_cnt),
K_(row_key),
K_(ret_code));
} ObTTLStatus;
......@@ -210,6 +222,11 @@ public:
common::ObISQLClient& proxy,
ObTTLStatus& task);
static int replace_ttl_task(uint64_t tenant_id,
const char* tname,
common::ObISQLClient& proxy,
ObTTLStatus& task);
static int update_ttl_task(uint64_t tenant_id,
const char* tname,
common::ObISQLClient& proxy,
......@@ -230,7 +247,8 @@ public:
static int delete_ttl_task(uint64_t tenant_id,
const char* tname,
common::ObISQLClient& proxy,
ObTTLStatusKey& key);
ObTTLStatusKey& key,
int64_t &affect_rows);
static int read_ttl_tasks(uint64_t tenant_id,
const char* tname,
......@@ -240,6 +258,8 @@ public:
bool for_update = false,
common::ObIAllocator *allocator = NULL);
static int remove_all_task_to_history_table(uint64_t tenant_id, uint64_t task_id, common::ObISQLClient& proxy);
static bool check_can_do_work();
static bool check_can_process_tenant_tasks(uint64_t tenant_id);
......
......@@ -10369,12 +10369,9 @@ int ObPartitionService::internal_leader_active(const ObCbTask& active_task)
const bool is_normal_pg = !(guard.get_partition_group()->get_pg_storage().is_restore());
if ((OB_SYS_TENANT_ID != pkey.get_tenant_id()) && is_normal_pg) {
(void)clog_mgr_->add_pg_archive_task(partition);
observer::ObTTLManager::get_instance().on_leader_active(pkey);
}
}
if (OB_SUCC(ret)) {
observer::ObTTLManager::get_instance().on_leader_active(partition);
}
}
if (OB_FAIL(ret) && OB_NOT_NULL(partition)) {
......