提交 3946d3a9 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] BUG FIX: make the create-partition-group process roll back successfully if it fails.

上级 829cfdec
......@@ -107,6 +107,7 @@ ObPartitionGroup::ObPartitionGroup()
// Destructor: records the destruction of this partition group in the log
// (object address and partition key), then releases its state via destroy().
ObPartitionGroup::~ObPartitionGroup()
{
// Log first so the key is captured before destroy() runs.
// NOTE(review): presumably destroy() clears pkey_ — confirm against its body.
FLOG_INFO("deconstruct ObPartitionGroup", K(this), K_(pkey));
destroy();
}
......
......@@ -181,6 +181,7 @@ public:
}
return ret;
}
TO_STRING_KV(KP_(pg_mgr), K_(partitions));
private:
const ObPGMgr* pg_mgr_;
......
......@@ -1435,7 +1435,7 @@ int ObPartitionService::add_new_partition(ObIPartitionGroupGuard& partition_guar
return ret;
}
int ObPartitionService::add_partitions_to_mgr(common::ObIArray<ObIPartitionGroup *> &partitions)
int ObPartitionService::add_partitions_to_mgr_(ObIPartitionArrayGuard &partitions)
{
int ret = OB_SUCCESS;
......@@ -1495,7 +1495,7 @@ int ObPartitionService::add_partitions_to_mgr(common::ObIArray<ObIPartitionGroup
return ret;
}
int ObPartitionService::add_partitions_to_replay_engine(const common::ObIArray<ObIPartitionGroup*>& partitions)
int ObPartitionService::add_partitions_to_replay_engine_(ObIPartitionArrayGuard &partitions)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
......@@ -2166,7 +2166,7 @@ int ObPartitionService::create_batch_partition_groups(
ObTimeGuard tg(__func__, 100L * 1000L);
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObArray<ObIPartitionGroup*> partition_list;
ObIPartitionArrayGuard partition_array_guard;
bool txs_add_success = false;
bool rp_eg_add_success = false;
ObArray<ObStorageFileHandle> files_handle;
......@@ -2196,8 +2196,9 @@ int ObPartitionService::create_batch_partition_groups(
ret = OB_TOO_MANY_TENANT_PARTITIONS_ERROR;
STORAGE_LOG(
WARN, "reach tenant partition count limit, cannot create new partitions", K(ret), "count", batch_arg.count());
} else if (OB_FAIL(partition_list.reserve(batch_arg.count()))) {
STORAGE_LOG(WARN, "reserve array failed", K(ret), "count", batch_arg.count());
} else if (FALSE_IT(partition_array_guard.set_pg_mgr(pg_mgr_))) {
} else if (OB_FAIL(partition_array_guard.reserve(batch_arg.count()))) {
STORAGE_LOG(WARN, "reserve partition guard array failed", K(ret), "count", batch_arg.count());
} else if (OB_FAIL(batch_res.reserve(batch_arg.count()))) {
STORAGE_LOG(WARN, "reserver res array failed, ", K(ret));
} else if (OB_FAIL(batch_prepare_splitting(batch_arg))) {
......@@ -2250,21 +2251,21 @@ int ObPartitionService::create_batch_partition_groups(
tg.click();
if (OB_SUCC(ret)) {
if (OB_FAIL(batch_register_election_mgr(is_pg, batch_arg, partition_list, files_handle))) {
if (OB_FAIL(batch_register_election_mgr_(is_pg, batch_arg, partition_array_guard, files_handle))) {
STORAGE_LOG(WARN, "fail to batch register election mgr, ", K(ret));
}
}
tg.click();
if (OB_SUCC(ret)) {
if (OB_FAIL(add_partitions_to_mgr(partition_list))) {
if (OB_FAIL(add_partitions_to_mgr_(partition_array_guard))) {
STORAGE_LOG(WARN, "add partition to mgr failed.", K(ret));
}
}
tg.click();
if (OB_SUCC(ret)) {
if (OB_FAIL(add_partitions_to_replay_engine(partition_list))) {
if (OB_FAIL(add_partitions_to_replay_engine_(partition_array_guard))) {
STORAGE_LOG(WARN, "add partition to replay engine failed.", K(ret));
} else {
rp_eg_add_success = true;
......@@ -2273,7 +2274,7 @@ int ObPartitionService::create_batch_partition_groups(
tg.click();
if (OB_SUCC(ret)) {
if (OB_FAIL(batch_start_partition_election(batch_arg, partition_list))) {
if (OB_FAIL(batch_start_partition_election_(batch_arg, partition_array_guard))) {
STORAGE_LOG(WARN, "batch start partition election failed", K(ret));
}
}
......@@ -2286,8 +2287,8 @@ int ObPartitionService::create_batch_partition_groups(
for (int64_t i = 0; i < batch_arg.count(); ++i) {
rollback_partition_register(batch_arg.at(i).partition_key_, txs_add_success, rp_eg_add_success);
}
for (int64_t i = 0; i < partition_list.count(); ++i) {
ObIPartitionGroup* rb_pg = partition_list.at(i);
for (int64_t i = 0; i < partition_array_guard.count(); ++i) {
ObIPartitionGroup *rb_pg = partition_array_guard.at(i);
if (OB_ISNULL(rb_pg)) {
STORAGE_LOG(ERROR, "rollback pg is null", K(i));
ob_abort();
......@@ -2296,11 +2297,6 @@ int ObPartitionService::create_batch_partition_groups(
tmp_ret = remove_pg_from_mgr(rb_pg, true/*write_slog*/);
if (OB_SUCCESS == tmp_ret) {
// partition object was released by partition service, do nothing
} else if (OB_PARTITION_NOT_EXIST == tmp_ret) {
// partition object hasn't been added to partition service, release it manually
if (NULL != cp_fty_) {
cp_fty_->free(rb_pg);
}
} else {
STORAGE_LOG(ERROR, "fail to rollback pg", K(tmp_ret), K(rb_pkey), K(rb_pg));
ob_abort();
......@@ -2386,7 +2382,14 @@ int ObPartitionService::remove_duplicate_partitions(const ObIArray<ObCreateParti
if (arg.ignore_member_list_ && (OB_ENTRY_EXIST == partition_map_.contains_key(pkey))) {
need_retry = true;
if (REACH_TIME_INTERVAL(100 * 1000)) {
STORAGE_LOG(WARN, "partition still exist, need retry. ", K(pkey));
int tmp_ret = OB_SUCCESS;
ObPGPartition *pg_partition = NULL;
if (OB_SUCCESS != (tmp_ret = partition_map_.get(pkey, pg_partition))) {
STORAGE_LOG(WARN, "partition still exist, need retry. but get partition failed", K(tmp_ret), K(pkey));
} else {
STORAGE_LOG(WARN, "partition still exist, need retry. the left partition is", K(pkey), K(pg_partition));
partition_map_.revert(pg_partition);
}
}
break;
}
......@@ -2758,9 +2761,9 @@ int ObPartitionService::standby_update_replica_protection_level()
return ret;
}
int ObPartitionService::batch_register_election_mgr(const bool is_pg,
const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg, common::ObIArray<ObIPartitionGroup*>& partitions,
ObIArray<ObStorageFileHandle>& files_handle)
int ObPartitionService::batch_register_election_mgr_(const bool is_pg,
const common::ObIArray<obrpc::ObCreatePartitionArg> &batch_arg, ObIPartitionArrayGuard &partitions,
ObIArray<ObStorageFileHandle> &files_handle)
{
int ret = OB_SUCCESS;
int64_t index = 0;
......@@ -2964,8 +2967,8 @@ int ObPartitionService::batch_register_election_mgr(const bool is_pg,
return ret;
}
int ObPartitionService::batch_start_partition_election(
const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg, common::ObIArray<ObIPartitionGroup*>& partitions)
int ObPartitionService::batch_start_partition_election_(
const common::ObIArray<obrpc::ObCreatePartitionArg> &batch_arg, ObIPartitionArrayGuard &partitions)
{
int ret = OB_SUCCESS;
int64_t index = 0;
......
......@@ -923,23 +923,23 @@ private:
int replace_restore_info_(const uint64_t cur_tenant_id, const share::ObReplicaRestoreStatus is_restore,
const int64_t create_frozen_version, ObCreatePartitionParam& create_param);
int prepare_all_partitions();
int remove_duplicate_partitions(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg);
int add_partitions_to_mgr(common::ObIArray<ObIPartitionGroup*>& partitions);
int add_partitions_to_replay_engine(const common::ObIArray<ObIPartitionGroup*>& partitions);
int batch_register_trans_service(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg);
int batch_register_election_mgr(const bool is_pg, const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg,
common::ObIArray<ObIPartitionGroup*>& partitions,
int remove_duplicate_partitions(const common::ObIArray<obrpc::ObCreatePartitionArg> &batch_arg);
int add_partitions_to_mgr_(ObIPartitionArrayGuard &partitions);
int add_partitions_to_replay_engine_(ObIPartitionArrayGuard &partitions);
int batch_register_trans_service(const common::ObIArray<obrpc::ObCreatePartitionArg> &batch_arg);
int batch_register_election_mgr_(const bool is_pg, const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg,
ObIPartitionArrayGuard &partitions,
common::ObIArray<blocksstable::ObStorageFileHandle>& files_handle);
int batch_start_partition_election(
const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg, common::ObIArray<ObIPartitionGroup*>& partitions);
int try_remove_from_member_list(ObIPartitionGroup& partition);
int batch_prepare_splitting(const common::ObIArray<obrpc::ObCreatePartitionArg>& batch_arg);
int try_add_to_member_list(const obrpc::ObMemberChangeArg& arg);
int check_active_member(common::ObAddr& leader, const common::ObMember& member, const common::ObPartitionKey& key);
int handle_add_replica_callback(const ObReplicaOpArg& arg, const int result);
int handle_migrate_replica_callback(const ObReplicaOpArg& arg, const int result, bool& could_retry);
int handle_rebuild_replica_callback(const ObReplicaOpArg& arg, const int result);
int handle_change_replica_callback(const ObReplicaOpArg& arg, const int result);
int batch_start_partition_election_(
const common::ObIArray<obrpc::ObCreatePartitionArg> &batch_arg, ObIPartitionArrayGuard &partitions);
int try_remove_from_member_list(ObIPartitionGroup &partition);
int batch_prepare_splitting(const common::ObIArray<obrpc::ObCreatePartitionArg> &batch_arg);
int try_add_to_member_list(const obrpc::ObMemberChangeArg &arg);
int check_active_member(common::ObAddr &leader, const common::ObMember &member, const common::ObPartitionKey &key);
int handle_add_replica_callback(const ObReplicaOpArg &arg, const int result);
int handle_migrate_replica_callback(const ObReplicaOpArg &arg, const int result, bool &could_retry);
int handle_rebuild_replica_callback(const ObReplicaOpArg &arg, const int result);
int handle_change_replica_callback(const ObReplicaOpArg &arg, const int result);
template <typename ResultT>
int get_operate_replica_res(const ObReplicaOpArg& arg, const int result, ResultT& res);
int retry_get_is_member_change_done(common::ObAddr& leader, obrpc::ObMCLogRpcInfo& mc_log_info);
......
......@@ -228,6 +228,7 @@ OB_INLINE void ObPGMgr::free_pg(ObIPartitionGroup* pg) const
cp_fty_->free(pg);
}
if (!can_free) {
STORAGE_LOG(WARN, "pg can not free now", K(pg), KPC(pg));
ObPGMemoryGarbageCollector::get_instance().add_pg(pg);
}
}
......
......@@ -54,11 +54,13 @@ ObPGPartition::ObPGPartition()
// Destructor: logs the object address and partition key, then delegates all
// cleanup to destroy().
ObPGPartition::~ObPGPartition()
{
// Must be logged before destroy(): destroy() resets pkey_, so logging
// afterwards would print an empty key.
FLOG_INFO("deconstruct ObPGPartition", K(this), K_(pkey));
destroy();
}
void ObPGPartition::destroy()
{
FLOG_INFO("destroy ObPGPartition", K(this), K_(pkey));
pkey_.reset();
if (NULL != cp_fty_) {
if (NULL != storage_) {
......@@ -123,6 +125,7 @@ int ObPGPartition::init(const common::ObPartitionKey& pkey, ObIPartitionComponen
if (OB_FAIL(ret)) {
destroy();
}
FLOG_INFO("ObPGPartition::init", K(ret), K(pkey), K(this));
return ret;
}
......
......@@ -114,6 +114,7 @@ ObPGStorage::ObPGStorage()
// Destructor: logs the object address and partition key, then delegates all
// cleanup to destroy().
ObPGStorage::~ObPGStorage()
{
// The address is logged so this line can be matched with the "pgstorage init"
// and "destroy pg storage" log lines, which also print the object pointer.
FLOG_INFO("deconstruct ObPGStorage", K(this), K_(pkey));
destroy();
}
......@@ -146,13 +147,13 @@ int ObPGStorage::init(const ObPartitionKey& key, ObIPartitionComponentFactory* c
pg_memtable_mgr_.set_pkey(key);
is_inited_ = true;
}
STORAGE_LOG(INFO, "partition init", K(ret), K(key));
STORAGE_LOG(INFO, "pgstorage init", K(ret), K(key), K(this));
return ret;
}
void ObPGStorage::destroy()
{
FLOG_INFO("destroy pg storage", K(*this), K(lbt()));
FLOG_INFO("destroy pg storage", K(this), K(*this), K(lbt()));
clear();
if (NULL != file_handle_.get_storage_file()) {
file_handle_.reset();
......@@ -161,7 +162,7 @@ void ObPGStorage::destroy()
void ObPGStorage::clear()
{
FLOG_INFO("clear pg storage", K(*this), K(lbt()));
FLOG_INFO("clear pg storage", K(this), K(*this), K(lbt()));
int tmp_ret = OB_SUCCESS;
......
......@@ -540,6 +540,9 @@ private:
} else if (OB_FAIL(map_.del(pkey))) {
STORAGE_LOG(WARN, "del pg partition from map fail", K(pkey));
}
if (OB_SUCC(ret)) {
STORAGE_LOG(INFO, "remove pg partition success", K(pkey), K(part));
}
}
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册