提交 c392189b 编写于 作者: C chinaxing 提交者: wangzelin.wzl

[FIX] pg partition map handle pg remove corrupted after globalize

上级 c29d0907
......@@ -4249,6 +4249,8 @@ int ObPartitionService::inner_del_partition_impl(const ObPartitionKey& pkey, con
STORAGE_LOG(WARN, "remove election from election mgr error", K(ret), K(pkey));
} else if (OB_FAIL(pg_mgr_.del_pg(pkey, file_id))) {
STORAGE_LOG(WARN, "pg mgr remove partition group error", K(ret), K(pkey));
} else if (OB_FAIL(pg->get_pg_storage().post_del_pg())) {
STORAGE_LOG(WARN, "failed to call post del_pg", K(ret), K(pkey));
} else if (OB_FAIL(pg->get_pg_storage().remove_all_pg_index())) {
STORAGE_LOG(WARN, "failed to remove all pg index", K(ret), K(pkey));
} else {
......
......@@ -181,21 +181,31 @@ static const int64_t SHRINK_THRESHOLD = 128;
typedef common::ObLinkHashMap<common::ObPartitionKey, ObPGPartition, PGPartitionInfoAlloc> ObPGPartitionMap;
// thread safe
class ObPartitionKeyList {
class ObPGPartitionList {
private:
class Node : public ObDLinkBase<Node> {
public:
Node(const common::ObPartitionKey& key) : key_(key)
Node(const common::ObPartitionKey &key) : key_(key), part_(NULL)
{}
const common::ObPartitionKey key_;
// no-NULL when PartitionGroup removed
ObPGPartition *part_;
TO_STRING_KV(K(key_), KP(part_));
};
typedef common::ObDList<Node> List;
public:
ObPartitionKeyList() : lock_(), list_()
ObPGPartitionList() : lock_(), list_(), contains_part_info_(false)
{}
void reset()
{
TCWLockGuard guard(lock_);
STORAGE_LOG(INFO, "reset partition list", K(this), K_(list), K_(contains_part_info), K(lbt()));
list_.reset();
contains_part_info_ = false;
}
inline int size()
inline int size() const
{
return list_.get_size();
}
......@@ -224,8 +234,7 @@ public:
// NOTICE : _assumption_ : no concurrent remove of same `pkey`
bool remove_latest(const common::ObPartitionKey& pkey)
{
Node* n = NULL;
Node *n = NULL;
{
TCRLockGuard guard(lock_);
DLIST_FOREACH_BACKWARD_X(curr, list_, NULL == n)
......@@ -235,7 +244,6 @@ public:
}
}
}
if (NULL != n) {
{
TCWLockGuard guard(lock_);
......@@ -243,7 +251,6 @@ public:
}
ob_free(n);
}
return NULL != n;
}
......@@ -266,14 +273,54 @@ public:
DLIST_REMOVE_ALL_NORET(cur, list_)
{
cur->unlink();
fn(cur->key_);
fn(cur->key_, cur->part_);
ob_free(cur);
}
}
template <typename Fn>
int set_partition_info(Fn &fn)
{
int ret = OB_SUCCESS;
TCWLockGuard guard(lock_);
DLIST_FOREACH(cur, list_)
{
ret = fn(cur->key_, cur->part_);
}
if (OB_SUCC(ret)) {
contains_part_info_ = true;
}
STORAGE_LOG(INFO, "set_partition_info OK", K(this), K_(list), K_(contains_part_info));
return ret;
}
int get_part_info(const ObPartitionKey &pkey, ObPGPartition *&part) const
{
int ret = OB_ENTRY_NOT_EXIST;
bool found_pkey = false;
TCRLockGuard guard(lock_);
if (contains_part_info_) {
DLIST_FOREACH_NORET(cur, list_)
{
if (pkey == cur->key_) {
found_pkey = true;
part = cur->part_;
if (OB_NOT_NULL(part)) {
ret = OB_SUCCESS;
}
break;
}
}
}
if (OB_SUCCESS != ret) {
STORAGE_LOG(
WARN, "get_part_info fail", K(ret), K(pkey), K(found_pkey), K(this), K_(list), K_(contains_part_info));
}
return ret;
}
private:
TCRWLock lock_;
List list_;
bool contains_part_info_;
};
class ObPGPartitionGuard {
......@@ -285,10 +332,30 @@ public:
(void)set_pg_partition(pkey, map);
}
int set_pg_partition(const common::ObPartitionKey& pkey, const ObPGPartitionMap& map)
ObPGPartitionGuard(const common::ObPartitionKey &pkey, const ObPGPartitionList &part_list)
: pg_partition_(NULL), map_(NULL)
{
(void)set_pg_partition(pkey, part_list);
}
int set_pg_partition(const common::ObPartitionKey &pkey, const ObPGPartitionList &part_list)
{
int ret = OB_SUCCESS;
if (!pkey.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(pkey));
} else if (OB_FAIL(part_list.get_part_info(pkey, pg_partition_))) {
STORAGE_LOG(WARN, "get pg part from partition list fail", K(ret), K(pkey), KP(&part_list));
} else {
pkey_ = pkey;
map_ = NULL;
}
return ret;
}
int set_pg_partition(const common::ObPartitionKey &pkey, const ObPGPartitionMap &map)
{
int ret = OB_SUCCESS;
if (!pkey.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(pkey));
......@@ -298,7 +365,6 @@ public:
pkey_ = pkey;
map_ = &const_cast<ObPGPartitionMap&>(map);
}
return ret;
}
......
......@@ -96,6 +96,7 @@ ObPGStorage::ObPGStorage()
pg_partition_(NULL),
schema_service_(NULL),
partition_list_(),
part_list_contains_part_info_(false),
lock_(ObLatchIds::PARTITION_GROUP_LOCK),
meta_(nullptr),
log_seq_num_(0),
......@@ -180,6 +181,8 @@ void ObPGStorage::clear()
RemovePGPartitionFunctor functor(*(pg_->get_pg_partition_map()));
partition_list_.remove_all(functor);
}
partition_list_.reset();
part_list_contains_part_info_ = false;
pg_ = NULL;
pg_partition_ = NULL;
schema_service_ = NULL;
......@@ -193,7 +196,11 @@ void ObPGStorage::clear()
bucket_lock_.destroy();
}
int ObPGStorage::alloc_meta_(ObPartitionGroupMeta*& meta)
#define PG_PARTITION_GUARD(g, k) \
ObPGPartitionGuard g; \
get_pg_partition(k, g);
int ObPGStorage::alloc_meta_(ObPartitionGroupMeta *&meta)
{
int ret = OB_SUCCESS;
void* buf = nullptr;
......@@ -271,7 +278,7 @@ int ObPGStorage::serialize_impl(
ObPGPartition* pg_partition = nullptr;
const ObPartitionKey& pkey = meta_->partitions_.at(i);
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(guard, pkey)
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pg partition info is null, unexpected error", K(ret), K(pkey));
......@@ -661,19 +668,18 @@ int ObPGStorage::deserialize_before_2270_(
return ret;
}
int ObPGStorage::get_pg_partition(const common::ObPartitionKey& pkey, ObPGPartitionGuard& guard)
int ObPGStorage::get_pg_partition(const common::ObPartitionKey &pkey, ObPGPartitionGuard &guard) const
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!pkey.is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(pkey));
} else if (OB_FAIL(guard.set_pg_partition(pkey, *(pg_->get_pg_partition_map())))) {
STORAGE_LOG(WARN, "get pg partition error", K(ret), K(pkey));
if (ATOMIC_LOAD(&part_list_contains_part_info_)) {
ret = guard.set_pg_partition(pkey, partition_list_);
} else {
// do nothing
if (OB_FAIL(guard.set_pg_partition(pkey, *pg_->get_pg_partition_map()))) {
if (ATOMIC_LOAD(&part_list_contains_part_info_)) {
ret = guard.set_pg_partition(pkey, partition_list_);
}
}
}
return ret;
}
......@@ -685,7 +691,7 @@ int ObPGStorage::check_pg_partition_exist(const ObPartitionKey& pkey, bool& exis
if (!pkey.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(pkey));
} else if (OB_FAIL(pg_partition_guard.set_pg_partition(pkey, *(pg_->get_pg_partition_map())))) {
} else if (OB_FAIL(get_pg_partition(pkey, pg_partition_guard))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
exist = false;
......@@ -795,8 +801,8 @@ int ObPGStorage::check_can_replay_add_partition_to_pg_log(const common::ObPartit
STORAGE_LOG(INFO, "partition group is garbaging, no need to replay", K(pkey), K(log_id));
can_replay = false;
} else {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
can_replay = true;
const int64_t publish_version = get_publish_version();
......@@ -861,8 +867,8 @@ int ObPGStorage::check_can_replay_remove_partition_from_pg_log(
STORAGE_LOG(INFO, "partition group is garbaging, no need to replay", K(pkey), K(log_id));
can_replay = false;
} else {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
can_replay = false;
} else {
......@@ -1391,9 +1397,9 @@ int ObPGStorage::get_create_schema_version(const ObPartitionKey& pkey, int64_t&
ret = OB_NOT_INIT;
LOG_WARN("pg storage is not inited", K(ret));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -1756,8 +1762,8 @@ int ObPGStorage::check_pg_partition_offline(const ObPartitionKey& pkey, bool& of
STORAGE_LOG(WARN, "invalid argument", K(ret), K(pkey));
// offline pg
} else {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkey_), K(pkey));
......@@ -2011,10 +2017,10 @@ int ObPGStorage::retire_warmup_store(const bool is_disk_full)
LOG_WARN("pg is removed", K(ret), K(*meta_));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < meta_->partitions_.count(); ++i) {
const ObPartitionKey& pkey = meta_->partitions_.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = meta_->partitions_.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -2121,10 +2127,10 @@ int ObPGStorage::enable_write_log(const bool is_replay_old)
STORAGE_LOG(WARN, "get all pg partition keys error", K(ret), K(pkey_), K(pkeys));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
const ObPartitionKey& pkey = pkeys.at(i);
ObPGPartition* pg_partition = nullptr;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = pkeys.at(i);
ObPGPartition *pg_partition = nullptr;
PG_PARTITION_GUARD(guard, pkey)
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey), K(pkey));
......@@ -2179,7 +2185,7 @@ int ObPGStorage::check_can_migrate(bool& can_migrate)
} else {
can_migrate = false;
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); i++) {
ObPGPartitionGuard guard(pkeys.at(i), *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(guard, pkeys.at(i))
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkeys.at(i)));
......@@ -2209,8 +2215,8 @@ int ObPGStorage::do_warm_up_request(const ObIWarmUpRequest* request)
STORAGE_LOG(WARN, "fail to get pg partition keys", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkeys.at(i), *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkeys.at(i))
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -2315,9 +2321,9 @@ int ObPGStorage::set_pg_replica_type(const ObReplicaType& replica_type, const bo
if (OB_SUCC(ret)) {
for (int64_t i = 0; i < pkeys.count(); ++i) {
ObPGPartition* pg_partition = nullptr;
ObPGPartitionGuard guard(pkeys.at(i), *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
ObPGPartition *pg_partition = nullptr;
PG_PARTITION_GUARD(guard, pkeys.at(i))
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_ERROR("pg partition info is null, unexpected error", K(tmp_ret), K_(pkey));
......@@ -2471,8 +2477,8 @@ int ObPGStorage::append_local_sort_data(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("param is invalid", K(ret), K(pkey), K(param));
} else {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkey), K(pkey_));
......@@ -2496,8 +2502,8 @@ int ObPGStorage::append_sstable(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("param is invalid", K(ret), K(param));
} else {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
ObTableHandle sstable_handle;
ObSSTable* sstable = nullptr;
const int64_t max_kept_major_version_number = 1;
......@@ -2549,8 +2555,8 @@ int ObPGStorage::append_sstable(
int ObPGStorage::check_single_replica_major_sstable_exist(const ObPartitionKey& pkey, const uint64_t index_table_id)
{
int ret = OB_SUCCESS;
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
bool has_major = false;
if (NULL == (pg_partition = guard.get_pg_partition())) {
......@@ -2574,8 +2580,8 @@ int ObPGStorage::get_table_stat(const common::ObPartitionKey& pkey, ObTableStat&
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(pkey), K_(pkey));
} else {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ENTRY_NOT_EXIST;
STORAGE_LOG(WARN, "pg partition info not exist", K(ret), K_(pkey));
......@@ -2867,8 +2873,8 @@ int ObPGStorage::get_all_table_ids(const ObPartitionKey& pkey, ObIArray<uint64_t
{
int ret = OB_SUCCESS;
index_tables.reset();
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkey));
......@@ -2885,8 +2891,8 @@ int ObPGStorage::get_all_table_ids(const ObPartitionKey& pkey, ObIArray<uint64_t
int ObPGStorage::get_reference_tables(const ObPartitionKey& pkey, const int64_t index_id, ObTablesHandle& handle)
{
int ret = OB_SUCCESS;
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -2966,9 +2972,9 @@ int ObPGStorage::fill_pg_partition_replica(const ObPartitionKey& pkey, ObReportS
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(INFO, "invalid argument", K(ret), K(pkey));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pg_partition is null", K(ret), K(pkey));
......@@ -3004,9 +3010,9 @@ int ObPGStorage::fill_replica(share::ObPartitionReplica& replica)
}
// for compatibility
if (OB_SUCC(ret) && !pkey_.is_pg()) {
ObPGPartitionGuard pg_partition_guard(pkey_, *(pg_->get_pg_partition_map()));
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey_)
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
ObReportStatus report_status;
if (OB_ISNULL(pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
......@@ -3037,8 +3043,8 @@ int ObPGStorage::replay_schema_log(const char* buf, const int64_t size, const in
if (OB_FAIL(pkey.deserialize(buf, size, pos))) {
STORAGE_LOG(WARN, "fail to deserialize pkey", K(pkey_));
} else {
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
......@@ -3080,8 +3086,8 @@ int ObPGStorage::table_scan(ObTableScanParam& param, ObNewRowIterator*& result)
ret = OB_REPLICA_NOT_READABLE;
STORAGE_LOG(WARN, "replica is not readable", K(ret), "this", *this);
} else if (is_pg || NULL == pg_partition_) {
const common::ObPartitionKey& pkey = (is_pg ? param.pkey_ : pkey_);
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const common::ObPartitionKey &pkey = (is_pg ? param.pkey_ : pkey_);
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -3129,8 +3135,8 @@ int ObPGStorage::table_scan(ObTableScanParam& param, ObNewIterIterator*& result)
ret = OB_REPLICA_NOT_READABLE;
STORAGE_LOG(WARN, "replica is not readable", K(ret), "this", *this);
} else if (is_pg || NULL == pg_partition_) {
const common::ObPartitionKey& pkey = (is_pg ? param.pkey_ : pkey_);
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const common::ObPartitionKey &pkey = (is_pg ? param.pkey_ : pkey_);
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -3176,7 +3182,7 @@ int ObPGStorage::join_mv_scan(ObTableScanParam& left_param, ObTableScanParam& ri
const common::ObPartitionKey& lpkey = (pkey_.is_pg() ? left_param.pkey_ : pkey_);
const common::ObPartitionKey& rpkey = (rp.is_pg() ? right_param.pkey_ : rp.get_partition_key());
ObPGPartitionGuard lguard(lpkey, *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(lguard, lpkey)
ObPGPartitionGuard rguard;
if (!ObReplicaTypeCheck::is_readable_replica(get_replica_type_())) {
......@@ -3219,9 +3225,9 @@ int ObPGStorage::delete_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_pa
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3242,9 +3248,9 @@ int ObPGStorage::delete_row(
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3264,9 +3270,9 @@ int ObPGStorage::put_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_param
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3286,9 +3292,9 @@ int ObPGStorage::insert_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_pa
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3308,9 +3314,9 @@ int ObPGStorage::insert_row(
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3332,9 +3338,9 @@ int ObPGStorage::insert_row(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_par
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3358,9 +3364,9 @@ int ObPGStorage::fetch_conflict_rows(const ObStoreCtx& ctx, const ObDMLBaseParam
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3384,9 +3390,9 @@ int ObPGStorage::revert_insert_iter(const common::ObPartitionKey& pkey, ObNewRow
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(pkey), KP(iter));
} else {
const common::ObPartitionKey& key = (pkey_.is_pg() ? pkey : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(key, *(pg_->get_pg_partition_map()));
const common::ObPartitionKey &key = (pkey_.is_pg() ? pkey : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, key)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkey), K_(pkey));
......@@ -3403,9 +3409,9 @@ int ObPGStorage::update_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_pa
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3427,9 +3433,9 @@ int ObPGStorage::update_row(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_par
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3449,9 +3455,9 @@ int ObPGStorage::lock_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_para
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3471,9 +3477,9 @@ int ObPGStorage::lock_rows(const ObStoreCtx& ctx, const ObDMLBaseParam& dml_para
{
int ret = OB_SUCCESS;
const ObPartitionKey& pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = (pkey_.is_pg() ? ctx.cur_pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (!ObReplicaTypeCheck::is_writable_replica(get_replica_type_())) {
ret = OB_ERR_READ_ONLY;
STORAGE_LOG(ERROR, "replica is not writable", K(ret), "this", *this);
......@@ -3561,10 +3567,10 @@ int ObPGStorage::check_can_release_pg_memtable_(ObTablesHandle& memtable_merged,
bool part_can_release = false;
schema_version = std::max(schema_version, memtable->get_max_schema_version());
for (int64_t i = 0; OB_SUCC(ret) && (pg_all_merged || pg_can_release) && i < partitions.count(); ++i) {
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
const ObPartitionKey& pkey = partitions.at(i);
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
const ObPartitionKey &pkey = partitions.at(i);
PG_PARTITION_GUARD(guard, pkey)
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition is null", K(ret), K(pkey));
......@@ -4140,8 +4146,8 @@ int ObPGStorage::try_update_report_status(
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
bool is_part_finish = false;
bool part_need_report = false;
const ObPartitionKey& pkey = pkeys.at(i);
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = pkeys.at(i);
PG_PARTITION_GUARD(guard, pkey)
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition is null", K(ret), K(pkey));
......@@ -4241,9 +4247,9 @@ int ObPGStorage::get_pg_partition_store_meta(const ObPartitionKey& pkey, ObPGPar
STORAGE_LOG(WARN, "get partition store meta get invalid argument", K(ret), K(pkey));
} else {
TCRLockGuard lock_guard(lock_);
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -4282,10 +4288,10 @@ int ObPGStorage::update_report_status_(
int64_t data_version = INT64_MAX;
int64_t snapshot_version = INT64_MAX;
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
const ObPartitionKey& pkey = pkeys.at(i);
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
const ObPartitionKey &pkey = pkeys.at(i);
PG_PARTITION_GUARD(guard, pkey)
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition is null", K(ret), K(pkey));
......@@ -4357,9 +4363,9 @@ int ObPGStorage::get_migrate_table_ids(const ObPartitionKey& pkey, ObIArray<uint
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "get partition store meta get invalid argument", K(ret), K(pkey));
} else {
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -4387,9 +4393,9 @@ int ObPGStorage::get_partition_tables(
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "get partition store meta get invalid argument", K(ret), K(pkey), K(table_id));
} else {
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -4418,9 +4424,9 @@ int ObPGStorage::get_partition_gc_tables(
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "get partition store meta get invalid argument", K(ret), K(pkey), K(table_id));
} else {
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -4570,9 +4576,9 @@ int ObPGStorage::replay_pg_partition_store(ObPGPartitionStoreMeta& meta)
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(meta));
} else {
ObPGPartitionGuard pg_partition_guard(meta.pkey_, *(pg_->get_pg_partition_map()));
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, meta.pkey_)
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
meta.replica_type_ = meta_->replica_type_;
if (OB_ISNULL(pg_partition = pg_partition_guard.get_pg_partition())) {
FLOG_INFO("pg partition not exist when replay_pg_partition_store", K(meta));
......@@ -4614,9 +4620,9 @@ int ObPGStorage::replay_pg_partition_meta(ObPGPartitionStoreMeta& meta)
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(meta));
} else {
ObPGPartitionGuard pg_partition_guard(meta.pkey_, *(pg_->get_pg_partition_map()));
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, meta.pkey_)
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
meta.replica_type_ = meta_->replica_type_;
if (OB_ISNULL(pg_partition = pg_partition_guard.get_pg_partition())) {
FLOG_INFO("pg partition not exist when replay pg partition meta", K(ret), K(meta));
......@@ -4646,9 +4652,9 @@ int ObPGStorage::update_multi_version_start(const ObPartitionKey& pkey, const in
} else {
ObBucketWLockAllGuard bucket_guard(bucket_lock_);
TCWLockGuard guard(lock_);
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
STORAGE_LOG(INFO, "pg partition not exist, no need update multi version start", K(ret), K(pkey_), K(pkey));
} else if (OB_UNLIKELY(is_removed_)) {
......@@ -4675,9 +4681,9 @@ int ObPGStorage::get_last_all_major_sstable(ObTablesHandle& handle)
STORAGE_LOG(WARN, "failed to get all pg partition keys", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
ObPGPartition* pg_partition = NULL;
const ObPartitionKey& pkey = pkeys.at(i);
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
const ObPartitionKey &pkey = pkeys.at(i);
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition is null, unexpected error", K(ret), K(pkey));
......@@ -4908,7 +4914,7 @@ int ObPGStorage::physical_flashback(const int64_t flashback_scn)
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < partitions.count(); i++) {
bool need_remove = false;
ObPGPartitionGuard pg_partition_guard(partitions.at(i), *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(pg_partition_guard, partitions.at(i))
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
// may pg has create but partition do not create
// do nothing
......@@ -5014,9 +5020,9 @@ int ObPGStorage::get_max_major_sstable_snapshot(int64_t& sstable_ts)
STORAGE_LOG(WARN, "pg storage has not been inited", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < meta_->partitions_.count(); ++i) {
const ObPartitionKey& pkey = meta_->partitions_.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = meta_->partitions_.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get pg partition", K(ret), K(pkey));
......@@ -5046,9 +5052,9 @@ int ObPGStorage::get_min_max_major_version(int64_t& min_version, int64_t& max_ve
STORAGE_LOG(WARN, "pg storage has not been inited", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < meta_->partitions_.count(); ++i) {
const ObPartitionKey& pkey = meta_->partitions_.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = meta_->partitions_.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get pg partition", K(ret), K(pkey));
......@@ -5074,9 +5080,9 @@ int ObPGStorage::set_partition_removed_(const ObPartitionKey& pkey)
ret = OB_NOT_INIT;
LOG_WARN("pg storage is not init", K(ret));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -5108,9 +5114,9 @@ int ObPGStorage::create_index_table_store(
ret = OB_ERR_SYS;
LOG_ERROR("tenant id not match", K(ret), K(pkey), K(pkey_));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -5144,9 +5150,9 @@ int ObPGStorage::add_sstable(const ObPartitionKey& pkey, storage::ObSSTable* tab
ret = OB_ERR_SYS;
LOG_ERROR("tenant id not match", K(ret), K(pkey), K(pkey_));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
const bool is_in_dest_split = is_dest_split(static_cast<ObPartitionSplitStateEnum>(meta_->saved_split_state_));
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
......@@ -5179,9 +5185,9 @@ int ObPGStorage::add_sstable_for_merge(const ObPartitionKey& pkey, storage::ObSS
ret = OB_ERR_SYS;
LOG_ERROR("tenant id not match", K(ret), K(pkey), K(pkey_));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -5215,10 +5221,10 @@ int ObPGStorage::halt_prewarm()
LOG_WARN("pg is removed", K(ret), K(*meta_));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < meta_->partitions_.count(); ++i) {
const ObPartitionKey& pkey = meta_->partitions_.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = meta_->partitions_.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -5260,7 +5266,7 @@ int ObPGStorage::remove_uncontinues_inc_tables(const ObPartitionKey& pkey, const
const int64_t bucket_idx = pkey.hash() % BUCKET_LOCK_BUCKET_CNT;
ObBucketWLockGuard bucket_guard(bucket_lock_, bucket_idx);
ObPGPartitionGuard pg_guard(pkey, *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(pg_guard, pkey)
if (is_removed_) {
ret = OB_PG_IS_REMOVED;
LOG_WARN("pg is removed", K(ret), K_(pkey));
......@@ -5296,9 +5302,9 @@ int ObPGStorage::replay_modify_table_store(const ObModifyTableStoreLogEntry& log
ret = OB_PG_IS_REMOVED;
LOG_WARN("pg is removed", K(ret), K(*meta_));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
FLOG_INFO("pg partition not exist when replay modify table store", K(ret), K(pkey), K(*meta_));
ret = OB_SUCCESS;
......@@ -5322,9 +5328,9 @@ int ObPGStorage::set_reference_tables(const ObPartitionKey& pkey, const int64_t
} else if (OB_UNLIKELY(is_removed_)) {
LOG_INFO("pg is removed", K(ret), K(*meta_));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -5346,8 +5352,8 @@ int ObPGStorage::get_partition_store_info(const ObPartitionKey& pkey, ObPartitio
ret = OB_NOT_INIT;
LOG_WARN("pg storage is not inited", K(ret));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get pg partition", K(ret), K(pkey));
......@@ -5380,9 +5386,9 @@ int ObPGStorage::replay_drop_index(const ObPartitionKey& pkey, const uint64_t ta
ret = OB_PG_IS_REMOVED;
LOG_WARN("pg is removed", K(ret), K(*meta_));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
FLOG_INFO("pg partition not exist when replay drop index", K(pkey), K(*meta_));
ret = OB_SUCCESS;
......@@ -5426,10 +5432,10 @@ int ObPGStorage::try_drop_unneed_index(const int64_t latest_schema_version /*OB_
ObSEArray<TableStoreStat, OB_MAX_INDEX_PER_TABLE> index_tables;
const bool write_slog = true;
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
const ObPartitionKey& pkey = pkeys.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = pkeys.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey));
......@@ -5594,7 +5600,7 @@ int ObPGStorage::check_table_store_empty(bool& is_empty)
STORAGE_LOG(WARN, "failed to get all pg partition keys", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < partitions.count(); i++) {
ObPGPartitionGuard pg_partition_guard(partitions.at(i), *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(pg_partition_guard, partitions.at(i))
if (OB_ISNULL(pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get pg partition", K(ret), K(pkey_));
......@@ -5627,8 +5633,8 @@ int ObPGStorage::update_dest_split_state_after_merge_(int64_t& split_state)
LOG_WARN("failed to get all pg partition keys", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
const ObPartitionKey& pkey = pkeys.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = pkeys.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey));
......@@ -5650,6 +5656,33 @@ int ObPGStorage::update_dest_split_state_after_merge_(int64_t& split_state)
return ret;
}
// remove pg's partition from global pg_partition_map
// and bookkeepingg on pg_storage
int ObPGStorage::post_del_pg()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "Partition object not initialized", K(ret), K(is_inited_));
} else {
STORAGE_LOG(INFO, "start to localize pg partitions", KP(this), K(pkey_), K(&partition_list_));
LocalizePGPartitionFunctor functor(*pg_->get_pg_partition_map());
if (OB_FAIL(partition_list_.set_partition_info(functor))) {
STORAGE_LOG(ERROR, "localize pg partition fail", K(ret), K(pkey_));
} else {
ATOMIC_STORE(&part_list_contains_part_info_, true); // commit
RemovePGPartitionFunctor functor(*pg_->get_pg_partition_map());
if (OB_FAIL(partition_list_.for_each(functor))) {
STORAGE_LOG(ERROR, "clean pg partition global map fail", K(ret), K(pkey_));
}
}
}
if (OB_SUCC(ret)) {
STORAGE_LOG(INFO, "post del pg succeed: localize pg_partition done.", K(pkey_));
}
return ret;
}
int ObPGStorage::remove_all_pg_index()
{
int ret = OB_SUCCESS;
......@@ -5714,9 +5747,9 @@ int ObPGStorage::update_split_table_store(
ret = OB_PG_IS_REMOVED;
LOG_WARN("pg is removed", K(ret), K_(pkey));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
ret = OB_PARTITION_NOT_EXIST;
LOG_WARN("pg partition not exist", K(ret), K(pkey), K(*meta_));
......@@ -5750,9 +5783,9 @@ int ObPGStorage::get_min_sstable_version_(int64_t& min_sstable_snapshot_version)
LOG_WARN("failed to get all pg partition keys", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
const ObPartitionKey& pkey = pkeys.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = pkeys.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get pg partition", K(ret), K(pkey));
......@@ -5806,9 +5839,9 @@ int ObPGStorage::get_table_store_cnt(int64_t& table_store_cnt) const
}
TCRLockGuard lock_guard(lock_);
for (int64_t i = 0; OB_SUCC(ret) && i < meta_->partitions_.count(); ++i) {
const ObPartitionKey& pkey = meta_->partitions_.at(i);
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
const ObPartitionKey &pkey = meta_->partitions_.at(i);
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get pg partition", K(ret), K(pkey));
......@@ -5831,8 +5864,8 @@ int ObPGStorage::get_partition_access_stat(const ObPartitionKey& pkey, ObPartiti
ret = OB_NOT_INIT;
LOG_WARN("pg storage is not inited", K(ret));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get pg partition", K(ret), K(pkey));
......@@ -5850,9 +5883,9 @@ int ObPGStorage::get_partition_access_stat(const ObPartitionKey& pkey, ObPartiti
int ObPGStorage::feedback_scan_access_stat(const ObTableScanParam& param)
{
int ret = OB_SUCCESS;
const common::ObPartitionKey& pkey = (pkey_.is_pg() ? param.pkey_ : pkey_);
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
const common::ObPartitionKey &pkey = (pkey_.is_pg() ? param.pkey_ : pkey_);
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (OB_ISNULL(pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -6421,7 +6454,7 @@ int ObPGStorage::check_complete(bool& is_complete)
STORAGE_LOG(WARN, "get all pg partition keys failed", K(ret), K_(pkey));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); i++) {
ObPGPartitionGuard guard(pkeys.at(i), *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(guard, pkeys.at(i))
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkeys.at(i)));
......@@ -6630,9 +6663,9 @@ int ObPGStorage::update_upper_trans_version_(const ObPartitionKey& pkey, ObTrans
const int64_t last_replay_log_ts, ObTablesHandle& updated_tables)
{
int ret = OB_SUCCESS;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
ObTablesHandle handle;
if (OB_ISNULL(pg_partition = pg_partition_guard.get_pg_partition())) {
LOG_INFO("pg partition may be deleted", K(ret), K(pkey));
......@@ -6740,9 +6773,9 @@ int ObPGStorage::restore_mem_trans_table()
} else if (OB_FAIL(trans_table_pkey.generate_trans_table_pkey(pkey_))) {
LOG_WARN("failed to generate trans_table_pkey", K(ret), K_(pkey));
} else {
ObPGPartitionGuard pg_partition_guard(trans_table_pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, trans_table_pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
if (0 != get_partition_cnt()) {
ret = OB_PARTITION_NOT_EXIST;
......@@ -6820,8 +6853,8 @@ int ObPGStorage::restore_mem_trans_table(ObSSTable& trans_sstable)
iter_param.rowkey_cnt_ = 1;
iter_param.out_cols_ = &columns;
ObPGPartitionGuard guard(trans_table_pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
PG_PARTITION_GUARD(guard, trans_table_pkey)
ObPGPartition *partition = nullptr;
if (OB_ISNULL(partition = guard.get_pg_partition())) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(trans_sstable.scan(iter_param, access_context, whole_range, row_iter))) {
......@@ -6908,9 +6941,9 @@ int ObPGStorage::remove_old_table_(const ObPartitionKey& pkey, const int64_t fro
ret = OB_PG_IS_REMOVED;
LOG_WARN("pg is removed", K(ret), K(pkey));
} else {
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* pg_partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
ObPGPartition *pg_partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(pg_partition = pg_partition_guard.get_pg_partition())) {
LOG_INFO("pg partition may be deleted", K(ret), K(pkey));
} else if (OB_ISNULL(storage = static_cast<ObPartitionStorage*>(pg_partition->get_storage()))) {
......@@ -6976,7 +7009,7 @@ int ObPGStorage::get_min_schema_version(int64_t& min_schema_version)
STORAGE_LOG(WARN, "get all pg partition keys failed", K(ret), K_(pkey));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); i++) {
ObPGPartitionGuard guard(pkeys.at(i), *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(guard, pkeys.at(i))
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkeys.at(i)));
......@@ -7057,9 +7090,9 @@ int ObPGStorage::get_trans_table_end_log_ts_and_timestamp_(int64_t& end_log_ts,
} else if (OB_FAIL(trans_table_pkey.generate_trans_table_pkey(pkey_))) {
LOG_WARN("failed to generate trans_table_pkey", K(ret), K_(pkey));
} else {
ObPGPartitionGuard pg_partition_guard(trans_table_pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, trans_table_pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = pg_partition_guard.get_pg_partition())) {
if (0 != get_partition_cnt()) {
ret = OB_PARTITION_NOT_EXIST;
......@@ -7115,7 +7148,7 @@ int ObPGStorage::clear_all_complement_minor_sstable_()
} else {
ObPGPartition* pg_partition = nullptr;
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
ObPGPartitionGuard guard(pkeys.at(i), *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(guard, pkeys.at(i))
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K(pkeys.at(i)));
......@@ -7169,7 +7202,7 @@ int ObPGStorage::check_can_physical_flashback(const int64_t flashback_scn)
need_skip_check = true;
for (int64_t i = 0; OB_SUCC(ret) && i < partitions.count(); i++) {
bool need_remove = false;
ObPGPartitionGuard pg_partition_guard(partitions.at(i), *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(pg_partition_guard, partitions.at(i))
int64_t publish_version = 0;
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
// may pg has create but partition do not create
......@@ -7234,8 +7267,8 @@ int ObPGStorage::check_can_physical_flashback(const int64_t flashback_scn)
int ObPGStorage::get_latest_table_count(const ObPartitionKey& pkey, const int64_t index_id, int64_t& table_count)
{
int ret = OB_SUCCESS;
ObPGPartition* pg_partition = NULL;
ObPGPartitionGuard guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
PG_PARTITION_GUARD(guard, pkey)
if (NULL == (pg_partition = guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -7351,8 +7384,8 @@ int ObPGStorage::get_trans_table_status(ObTransTableStatus& status)
if (OB_FAIL(trans_table_pkey.generate_trans_table_pkey(pkey_))) {
LOG_WARN("failed to generate trans_table_pkey", K(ret), K_(pkey));
} else {
ObPGPartitionGuard guard(trans_table_pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
PG_PARTITION_GUARD(guard, trans_table_pkey)
ObPGPartition *partition = nullptr;
if (OB_ISNULL(partition = guard.get_pg_partition())) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(static_cast<ObPartitionStorage*>(partition->get_storage())
......@@ -7445,9 +7478,9 @@ int ObPGStorage::prepare_partition_store_map_(
const ObPartitionMigrateCtx& ctx, ObPartitionStore::TableStoreMap*& store_map)
{
int ret = OB_SUCCESS;
ObPGPartitionGuard part_guard(ctx.copy_info_.meta_.pkey_, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(part_guard, ctx.copy_info_.meta_.pkey_)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
const int64_t max_kept_major_version_number = GCONF.max_kept_major_version_number;
store_map = nullptr;
if (ctx.handle_.empty()) {
......@@ -7476,7 +7509,7 @@ int ObPGStorage::remove_unneed_table_store_within_trans(
for (int64_t i = 0; OB_SUCC(ret) && i < part_ctx_array.count(); ++i) {
const ObPartitionMigrateCtx &ctx = part_ctx_array.at(i);
if (OB_NOT_NULL(store_map = store_maps[i])) {
ObPGPartitionGuard part_guard(ctx.copy_info_.meta_.pkey_, *(pg_->get_pg_partition_map()));
PG_PARTITION_GUARD(part_guard, ctx.copy_info_.meta_.pkey_)
if (OB_ISNULL(partition = part_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get pg partition failed", K(ret), K(ctx));
......@@ -7502,9 +7535,9 @@ int ObPGStorage::do_replace_store_map_(
const ObPartitionMigrateCtx& ctx = part_ctx_array.at(i);
ObPartitionStore::TableStoreMap* store_map = store_maps[i];
if (store_map != nullptr) {
ObPGPartitionGuard part_guard(ctx.copy_info_.meta_.pkey_, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(part_guard, ctx.copy_info_.meta_.pkey_)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (OB_ISNULL(partition = part_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get pg partition failed", K(ret), K(ctx));
......@@ -7563,9 +7596,9 @@ int ObPGStorage::add_trans_sstable(const int64_t old_trans_table_seq, ObSSTable*
LOG_WARN("failed to generate trans_table_pkey", K(ret), K_(pkey));
} else {
ObBucketWLockGuard bucket_guard(bucket_lock_, get_bucket_idx_(trans_table_pkey));
ObPGPartitionGuard pg_partition_guard(trans_table_pkey, *(pg_->get_pg_partition_map()));
ObPGPartition* partition = nullptr;
ObPartitionStorage* storage = nullptr;
PG_PARTITION_GUARD(pg_partition_guard, trans_table_pkey)
ObPGPartition *partition = nullptr;
ObPartitionStorage *storage = nullptr;
if (is_paused_) {
ret = OB_EAGAIN;
LOG_WARN("pg is paused, can not add trans table now", K(ret), K_(pkey), K(trans_table_pkey));
......@@ -7936,9 +7969,9 @@ int ObPGStorage::get_partition_migrate_tables(
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "get partition store meta get invalid argument", K(ret), K(pkey), K(table_id));
} else {
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -8247,10 +8280,10 @@ int ObPGStorage::get_restore_point_tables_(const int64_t snapshot_version, const
} else {
for (int64_t i = 0; OB_SUCC(ret) && is_tmp_ready && i < pg_meta.partitions_.count(); ++i) {
meta.reset();
const ObPartitionKey& pkey = pg_meta.partitions_.at(i);
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = pg_meta.partitions_.at(i);
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "pg partition info is null, unexpected error", K(ret), K_(pkey));
......@@ -8318,10 +8351,10 @@ int ObPGStorage::get_restore_point_max_schema_version_(
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < partitions.count(); ++i) {
tables_handle.reset();
const ObPartitionKey& pkey = partitions.at(i);
ObPGPartition* pg_partition = NULL;
ObPartitionStorage* partition_storage = NULL;
ObPGPartitionGuard pg_partition_guard(pkey, *(pg_->get_pg_partition_map()));
const ObPartitionKey &pkey = partitions.at(i);
ObPGPartition *pg_partition = NULL;
ObPartitionStorage *partition_storage = NULL;
PG_PARTITION_GUARD(pg_partition_guard, pkey)
if (pkey.is_trans_table()) {
// do nothing
} else if (NULL == (pg_partition = pg_partition_guard.get_pg_partition())) {
......
......@@ -168,9 +168,8 @@ public:
int set_emergency_release();
int get_active_memtable(ObTableHandle& handle);
// PG, pg partition
int get_pg_partition(const common::ObPartitionKey& pkey, ObPGPartitionGuard& guard);
const common::ObPartitionKey& get_partition_key() const
int get_pg_partition(const common::ObPartitionKey &pkey, ObPGPartitionGuard &guard) const;
const common::ObPartitionKey &get_partition_key() const
{
return pkey_;
}
......@@ -308,8 +307,9 @@ public:
{
return pg_memtable_mgr_.get_readable_info();
}
int get_pkey_for_table(const int64_t table_id, ObPartitionKey& pkey);
int update_split_state_after_merge(int64_t& split_state);
int get_pkey_for_table(const int64_t table_id, ObPartitionKey &pkey);
int update_split_state_after_merge(int64_t &split_state);
int post_del_pg();
int remove_all_pg_index();
int update_split_table_store(
const common::ObPartitionKey& pkey, int64_t table_id, bool is_major_split, ObTablesHandle& handle);
......@@ -448,6 +448,29 @@ private:
private:
ObPartitionArray& pkeys_;
};
class LocalizePGPartitionFunctor {
public:
explicit LocalizePGPartitionFunctor(ObPGPartitionMap &map) : map_(map)
{}
~LocalizePGPartitionFunctor()
{}
int operator()(const common::ObPartitionKey &pkey, ObPGPartition *&part)
{
int ret = OB_SUCCESS;
if (!pkey.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(ERROR, "invalid pkey", K(pkey));
} else if (OB_FAIL(map_.get(pkey, part))) {
STORAGE_LOG(ERROR, "fail to get part", K(ret), K(pkey));
} else {
STORAGE_LOG(INFO, "localize part OK", K(pkey), K(*part));
}
return ret;
}
private:
ObPGPartitionMap &map_;
};
class RemovePGIndexFunctor {
public:
explicit RemovePGIndexFunctor(ObPartitionGroupIndex& pg_index) : pg_index_(pg_index)
......@@ -462,6 +485,8 @@ private:
STORAGE_LOG(ERROR, "invalid pkey", K(pkey));
} else if (OB_FAIL(pg_index_.remove_partition(pkey))) {
STORAGE_LOG(ERROR, "failed to remove pg index", K(ret), K(pkey));
} else {
STORAGE_LOG(INFO, "remove part from index OK", K(pkey));
}
return ret;
}
......@@ -503,13 +528,20 @@ private:
public:
RemovePGPartitionFunctor(ObPGPartitionMap& map) : map_(map)
{}
void operator()(const common::ObPartitionKey& pkey)
int operator()(const common::ObPartitionKey &pkey, ObPGPartition *part = NULL)
{
int ret = OB_SUCCESS;
if (!pkey.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid pkey", K(pkey));
} else {
map_.del(pkey);
if (part != NULL) {
map_.revert(part);
} else if (OB_FAIL(map_.del(pkey))) {
STORAGE_LOG(WARN, "del pg partition from map fail", K(pkey));
}
}
return ret;
}
private:
......@@ -634,13 +666,14 @@ private:
bool is_inited_;
bool is_removed_;
common::ObPartitionKey pkey_;
ObIPartitionComponentFactory* cp_fty_;
transaction::ObTransService* txs_;
clog::ObIPartitionLogService* pls_;
ObIPartitionGroup* pg_;
ObPGPartition* pg_partition_;
share::schema::ObMultiVersionSchemaService* schema_service_;
ObPartitionKeyList partition_list_;
ObIPartitionComponentFactory *cp_fty_;
transaction::ObTransService *txs_;
clog::ObIPartitionLogService *pls_;
ObIPartitionGroup *pg_;
ObPGPartition *pg_partition_;
share::schema::ObMultiVersionSchemaService *schema_service_;
ObPGPartitionList partition_list_;
bool part_list_contains_part_info_; // true when pg removed
// pg meta and memstore
common::ObBucketLock bucket_lock_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册