提交 85caf5ef 编写于 作者: J JiahuaChen 提交者: LINGuanRen

Fix pg not removed from tenant file map when rollback batch create partitions

上级 2f1c6922
......@@ -781,7 +781,9 @@ int ObPartitionMetaRedoModule::inner_del_partition_for_replay(const ObPartitionK
} else if (OB_FAIL(inner_del_partition_impl(pkey, OB_INVALID_DATA_FILE_ID == file_id ? nullptr : &file_id))) {
LOG_WARN("fail to inner del partition", K(ret));
} else if (OB_INVALID_DATA_FILE_ID == file_id) {
// replay slog before 3.1, need to remove pg from file mgr explicitly
// OB_INVALID_DATA_FILE_ID means the slog is written before 3.1,
// need to remove pg from file mgr explicitly
// Otherwise, pg would be removed from file mgr in tenant file slog replay
const ObIPartitionGroup& pg = *guard.get_partition_group();
const ObStorageFile* file = pg.get_storage_file();
if (OB_ISNULL(file)) {
......
......@@ -1438,8 +1438,6 @@ int ObPartitionService::add_new_partition(ObIPartitionGroupGuard& partition_guar
int ObPartitionService::add_partitions_to_mgr(common::ObIArray<ObIPartitionGroup *> &partitions)
{
int ret = OB_SUCCESS;
int64_t added_pg_count = 0;
int64_t added_pg_to_file_count = 0;
ObIPartitionGroup* partition = NULL;
int tmp_ret = OB_SUCCESS;
......@@ -1466,11 +1464,8 @@ int ObPartitionService::add_partitions_to_mgr(common::ObIArray<ObIPartitionGroup
STORAGE_LOG(WARN, "fail to inner add partition, ", K(ret));
} else {
ObTenantFileKey file_key(file->get_tenant_id(), file->get_file_id());
++added_pg_count;
if (OB_FAIL(file_mgr_->add_pg(file_key, partitions.at(i)->get_partition_key()))) {
LOG_WARN("fail to remove pg from file mgr", K(ret), K(file_key));
} else {
++added_pg_to_file_count;
}
}
}
......@@ -1491,29 +1486,6 @@ int ObPartitionService::add_partitions_to_mgr(common::ObIArray<ObIPartitionGroup
if (OB_SUCC(ret)) {
STORAGE_LOG(INFO, "add partition groups to partition service success", K(partitions));
} else {
STORAGE_LOG(WARN, "add partition groups to partition service fail", K(ret), K(partitions));
for (int64_t i = 0; i < added_pg_to_file_count; ++i) {
ObStorageFile* file = partitions.at(i)->get_storage_file();
ObTenantFileKey file_key(file->get_tenant_id(), file->get_file_id());
if (OB_SUCCESS != (tmp_ret = file_mgr_->remove_pg(file_key, partitions.at(i)->get_partition_key()))) {
LOG_WARN("failed to inner del partition", K(ret), K(i), K(file_key));
ob_abort();
}
}
for (int64_t i = 0; i < added_pg_count; ++i) {
const ObPartitionKey& pkey = partitions.at(i)->get_partition_key();
if (OB_SUCCESS != (tmp_ret = inner_del_partition(pkey))) {
LOG_WARN("failed to inner del partition", K(ret), K(i), K(pkey));
ob_abort();
}
}
for (int64_t i = added_pg_count; i < partitions.count(); ++i) {
partition = partitions.at(i);
cp_fty_->free(partition);
}
partitions.reuse();
}
#ifdef ERRSIM
if (OB_SUCC(ret)) {
......@@ -2288,11 +2260,6 @@ int ObPartitionService::create_batch_partition_groups(
if (OB_FAIL(add_partitions_to_mgr(partition_list))) {
STORAGE_LOG(WARN, "add partition to mgr failed.", K(ret));
}
} else {
free_partition_list(partition_list);
if (OB_SUCCESS != (tmp_ret = SLOGGER.abort())) {
STORAGE_LOG(WARN, "commit logger failed to abort", K(tmp_ret));
}
}
tg.click();
......@@ -2314,9 +2281,29 @@ int ObPartitionService::create_batch_partition_groups(
if (OB_FAIL(ret)) {
// do some rollback work
tmp_ret = OB_SUCCESS;
for (int64_t i = 0; i < batch_arg.count(); ++i) {
rollback_partition_register(batch_arg.at(i).partition_key_, txs_add_success, rp_eg_add_success);
inner_del_partition(batch_arg.at(i).partition_key_);
}
for (int64_t i = 0; i < partition_list.count(); ++i) {
ObIPartitionGroup* rb_pg = partition_list.at(i);
if (OB_ISNULL(rb_pg)) {
STORAGE_LOG(ERROR, "rollback pg is null", K(i));
ob_abort();
}
const ObPGKey &rb_pkey = rb_pg->get_partition_key();
tmp_ret = remove_pg_from_mgr(rb_pg, true/*write_slog*/);
if (OB_SUCCESS == tmp_ret) {
// partition object was released by partition service, do nothing
} else if (OB_PARTITION_NOT_EXIST == tmp_ret) {
// partition object hasn't been added to partition service, release it manually
if (NULL != cp_fty_) {
cp_fty_->free(rb_pg);
}
} else {
STORAGE_LOG(ERROR, "fail to rollback pg", K(tmp_ret), K(rb_pkey), K(rb_pg));
ob_abort();
}
}
}
tg.click();
......@@ -2365,17 +2352,6 @@ int ObPartitionService::create_batch_partition_groups(
return ret;
}
void ObPartitionService::free_partition_list(ObArray<ObIPartitionGroup*>& partitions)
{
for (int64_t i = 0; i < partitions.count(); ++i) {
ObIPartitionGroup* partition = partitions.at(i);
if (NULL != cp_fty_ && NULL != partition) {
cp_fty_->free(partition);
}
}
partitions.reuse();
}
int ObPartitionService::remove_duplicate_partitions(const ObIArray<ObCreatePartitionArg>& batch_arg)
{
int ret = OB_SUCCESS;
......@@ -2979,18 +2955,6 @@ int ObPartitionService::batch_register_election_mgr(const bool is_pg,
files_handle.at(index).reset();
}
} // end for loop
if (OB_FAIL(ret)) {
// do some rollback work
for (int64_t i = 0; i < partitions.count(); ++i) {
partition = partitions.at(i);
if (NULL != partition) {
cp_fty_->free(partition);
partition = NULL;
}
}
partitions.reuse();
}
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = E(EventTable::EN_CREATE_PG_AFTER_REGISTER_ELECTION_MGR) OB_SUCCESS;
......@@ -3132,13 +3096,13 @@ int ObPartitionService::remove_pg_from_mgr(const ObIPartitionGroup* partition, c
}
if (OB_SUCC(ret)) {
if (OB_FAIL(inner_del_partition(pkey))) {
// should not happen
STORAGE_LOG(ERROR, "Fail to inner del partition, ", K(ret), K(pkey));
ob_abort();
} else if (OB_FAIL(file_mgr_->remove_pg(file_key, pkey))) {
if (OB_FAIL(file_mgr_->remove_pg(file_key, pkey))) {
STORAGE_LOG(ERROR, "fail to remove pg from file mgr", K(ret));
ob_abort();
} else if (OB_FAIL(inner_del_partition(pkey))) {
//should not happen
STORAGE_LOG(ERROR, "Fail to inner del partition, ", K(ret), K(pkey));
ob_abort();
}
}
}
......
......@@ -1013,7 +1013,6 @@ private:
const int64_t schema_version, const uint64_t index_id, const int64_t timeout, uint64_t& log_id, int64_t& log_ts);
int check_partition_state_(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg,
common::ObIArray<obrpc::ObCreatePartitionArg>& target_batch_arg, common::ObIArray<int>& batch_res);
void free_partition_list(ObArray<ObIPartitionGroup*>& partition_list);
void submit_pt_update_task_(const ObPartitionKey& pkey, const bool need_report_checksum = true);
int submit_pg_pt_update_task_(const ObPartitionKey& pkey);
int try_inc_total_partition_cnt(const int64_t new_partition_cnt, const bool need_check);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册