提交 1613b26c 编写于 作者: Y ym0 提交者: LINGuanRen

Support to get flag replica by ObPartitionTableOperator

上级 86370d23
...@@ -1487,6 +1487,9 @@ int ObGlobalIndexBuilder::drive_this_copy_multi_replica(const share::schema::ObT ...@@ -1487,6 +1487,9 @@ int ObGlobalIndexBuilder::drive_this_copy_multi_replica(const share::schema::ObT
} else { } else {
bool all_copy_finish = true; bool all_copy_finish = true;
int64_t major_sstable_exist_reply_ts = 0; int64_t major_sstable_exist_reply_ts = 0;
const bool need_fail_list = false;
const int64_t cluster_id = OB_INVALID_ID; // local cluster
const bool filter_flag_replica = false;
common::ObArray<PartitionSSTableBuildStat> partition_sstable_stat_array; common::ObArray<PartitionSSTableBuildStat> partition_sstable_stat_array;
{ {
SpinWLockGuard item_guard(task->lock_); SpinWLockGuard item_guard(task->lock_);
...@@ -1515,7 +1518,12 @@ int ObGlobalIndexBuilder::drive_this_copy_multi_replica(const share::schema::ObT ...@@ -1515,7 +1518,12 @@ int ObGlobalIndexBuilder::drive_this_copy_multi_replica(const share::schema::ObT
if (!pkey.is_valid()) { if (!pkey.is_valid()) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("pkey invalid", K(ret), K(pkey)); LOG_WARN("pkey invalid", K(ret), K(pkey));
} else if (OB_FAIL(pt_operator_->get(pkey.get_table_id(), pkey.get_partition_id(), info))) { } else if (OB_FAIL(pt_operator_->get(pkey.get_table_id(),
pkey.get_partition_id(),
info,
need_fail_list,
cluster_id,
filter_flag_replica))) {
LOG_WARN("fail to get partition info", K(ret), K(pkey)); LOG_WARN("fail to get partition info", K(ret), K(pkey));
} else if (OB_FAIL(filter.set_replica_status(REPLICA_STATUS_NORMAL))) { } else if (OB_FAIL(filter.set_replica_status(REPLICA_STATUS_NORMAL))) {
LOG_WARN("fail to set replica status", K(ret)); LOG_WARN("fail to set replica status", K(ret));
......
...@@ -720,6 +720,7 @@ int ObRSBuildIndexTask::wait_build_index_end(bool& is_end) ...@@ -720,6 +720,7 @@ int ObRSBuildIndexTask::wait_build_index_end(bool& is_end)
const ObTableSchema* index_schema = NULL; const ObTableSchema* index_schema = NULL;
const ObTableSchema* table_schema = NULL; const ObTableSchema* table_schema = NULL;
ObIndexBuildStatus all_status; ObIndexBuildStatus all_status;
const bool filter_flag_replica = false;
ObTablePartitionIterator iter; ObTablePartitionIterator iter;
ObReplicaFilterHolder filter; ObReplicaFilterHolder filter;
int64_t table_id = OB_INVALID_ID; int64_t table_id = OB_INVALID_ID;
...@@ -758,7 +759,7 @@ int ObRSBuildIndexTask::wait_build_index_end(bool& is_end) ...@@ -758,7 +759,7 @@ int ObRSBuildIndexTask::wait_build_index_end(bool& is_end)
} }
if (OB_FAIL(ret) || is_end) { if (OB_FAIL(ret) || is_end) {
} else if (OB_FAIL(iter.init(table_id, schema_guard, ddl_service_->get_pt_operator()))) { } else if (OB_FAIL(iter.init(table_id, schema_guard, ddl_service_->get_pt_operator(), filter_flag_replica))) {
LOG_WARN("fail to init partition table iterator", LOG_WARN("fail to init partition table iterator",
K(ret), K(ret),
"table_id", "table_id",
......
...@@ -53,7 +53,7 @@ void ObInMemoryPartitionTable::reuse() ...@@ -53,7 +53,7 @@ void ObInMemoryPartitionTable::reuse()
} }
int ObInMemoryPartitionTable::get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, int ObInMemoryPartitionTable::get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist, const int64_t cluster_id) const bool need_fetch_faillist, const int64_t cluster_id, const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
lib::ObMutexGuard guard(mutex_); lib::ObMutexGuard guard(mutex_);
...@@ -68,7 +68,6 @@ int ObInMemoryPartitionTable::get(const uint64_t table_id, const int64_t partiti ...@@ -68,7 +68,6 @@ int ObInMemoryPartitionTable::get(const uint64_t table_id, const int64_t partiti
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KT(table_id), K(partition_id)); LOG_WARN("invalid argument", K(ret), KT(table_id), K(partition_id));
} else { } else {
const bool filter_flag_replica = true;
if (OB_FAIL(inner_get(table_id, partition_id, filter_flag_replica, partition_info))) { if (OB_FAIL(inner_get(table_id, partition_id, filter_flag_replica, partition_info))) {
LOG_WARN("inner_get failed", KT(table_id), K(partition_id), K(filter_flag_replica), K(ret)); LOG_WARN("inner_get failed", KT(table_id), K(partition_id), K(filter_flag_replica), K(ret));
} }
...@@ -84,7 +83,8 @@ int ObInMemoryPartitionTable::get(const uint64_t table_id, const int64_t partiti ...@@ -84,7 +83,8 @@ int ObInMemoryPartitionTable::get(const uint64_t table_id, const int64_t partiti
} }
int ObInMemoryPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, int ObInMemoryPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist) const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist,
const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
lib::ObMutexGuard guard(mutex_); lib::ObMutexGuard guard(mutex_);
...@@ -96,7 +96,6 @@ int ObInMemoryPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, con ...@@ -96,7 +96,6 @@ int ObInMemoryPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, con
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", K(tenant_id), K(ret), K(start_table_id), K(start_partition_id)); LOG_WARN("invalid tenant_id", K(tenant_id), K(ret), K(start_table_id), K(start_partition_id));
} else { } else {
const bool filter_flag_replica = true;
const uint64_t table_id = combine_id(OB_SYS_TENANT_ID, OB_ALL_CORE_TABLE_TID); const uint64_t table_id = combine_id(OB_SYS_TENANT_ID, OB_ALL_CORE_TABLE_TID);
const int64_t partition_id = ALL_CORE_TABLE_PARTITION_ID; const int64_t partition_id = ALL_CORE_TABLE_PARTITION_ID;
ObPartitionInfo partition_info; ObPartitionInfo partition_info;
......
...@@ -46,11 +46,12 @@ public: ...@@ -46,11 +46,12 @@ public:
return inited_; return inited_;
} }
virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID) override; const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID,
const bool filter_flag_replica = true) override;
virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t start_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos, const int64_t start_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos,
const bool need_fetch_faillist = false) override; const bool need_fetch_faillist = false, const bool filter_flag_replica = true) override;
virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t start_partition_id, virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t start_partition_id,
common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum, common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum,
......
...@@ -79,7 +79,8 @@ public: ...@@ -79,7 +79,8 @@ public:
} }
virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID) = 0; const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID,
const bool filter_flag_replica = true) = 0;
virtual int batch_fetch_partition_infos(const common::ObIArray<common::ObPartitionKey>& keys, virtual int batch_fetch_partition_infos(const common::ObIArray<common::ObPartitionKey>& keys,
common::ObIAllocator& allocator, common::ObArray<ObPartitionInfo*>& partitions, common::ObIAllocator& allocator, common::ObArray<ObPartitionInfo*>& partitions,
...@@ -90,7 +91,8 @@ public: ...@@ -90,7 +91,8 @@ public:
virtual int batch_report_partition_role( virtual int batch_report_partition_role(
const common::ObIArray<share::ObPartitionReplica>& pkey_array, const common::ObRole new_role) = 0; const common::ObIArray<share::ObPartitionReplica>& pkey_array, const common::ObRole new_role) = 0;
virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id, virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id,
common::ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist = false) = 0; common::ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist = false,
const bool filter_flag_replica = true) = 0;
virtual int prefetch(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id, virtual int prefetch(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id,
common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum, common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum,
......
...@@ -167,9 +167,18 @@ int ObTablePartitionIterator::ObPrefetchInfo::prefetch() ...@@ -167,9 +167,18 @@ int ObTablePartitionIterator::ObPrefetchInfo::prefetch()
uint64_t tenant_id = extract_tenant_id(table_id_); uint64_t tenant_id = extract_tenant_id(table_id_);
prefetch_idx_ = 0; prefetch_idx_ = 0;
prefetch_partitions_.reuse(); prefetch_partitions_.reuse();
if (OB_FAIL(pt_operator_->prefetch_by_table_id( if (OB_FAIL(pt_operator_->prefetch_by_table_id(tenant_id,
tenant_id, table_id_, start_partition_id, prefetch_partitions_, need_fetch_faillist_))) { table_id_,
LOG_WARN("fail to prefetch partitions", K(ret), K(table_id_), K(start_partition_id)); start_partition_id,
prefetch_partitions_,
need_fetch_faillist_,
filter_flag_replica_))) {
LOG_WARN("fail to prefetch partitions",
K(ret),
K(table_id_),
K(start_partition_id),
K_(need_fetch_faillist),
K_(filter_flag_replica));
} else if (!first_prefetch) { } else if (!first_prefetch) {
prefetch_idx_++; // the first partition is duplicated, need to be filtered prefetch_idx_++; // the first partition is duplicated, need to be filtered
} }
...@@ -202,7 +211,8 @@ ObTablePartitionIterator::~ObTablePartitionIterator() ...@@ -202,7 +211,8 @@ ObTablePartitionIterator::~ObTablePartitionIterator()
// check if we need to access the tenant level meta table by the mode // check if we need to access the tenant level meta table by the mode
// when TablePartitionIterator::init is invoked // when TablePartitionIterator::init is invoked
int ObTablePartitionIterator::init( int ObTablePartitionIterator::init(
const uint64_t table_id, ObSchemaGetterGuard& schema_guard, ObPartitionTableOperator& pt_operator) const uint64_t table_id, ObSchemaGetterGuard& schema_guard, ObPartitionTableOperator& pt_operator,
const bool filter_flag_replica /* = true*/)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const uint64_t tenant_id = extract_tenant_id(table_id); const uint64_t tenant_id = extract_tenant_id(table_id);
...@@ -232,6 +242,7 @@ int ObTablePartitionIterator::init( ...@@ -232,6 +242,7 @@ int ObTablePartitionIterator::init(
part_level_ = partition_schema->get_part_level(); part_level_ = partition_schema->get_part_level();
prefetch_info_.reset(); prefetch_info_.reset();
prefetch_info_.set_need_fetch_faillist(need_fetch_faillist_); prefetch_info_.set_need_fetch_faillist(need_fetch_faillist_);
prefetch_info_.set_filter_flag_replica(filter_flag_replica);
allocator_.reuse(); allocator_.reuse();
inited_ = true; inited_ = true;
} }
......
...@@ -91,7 +91,8 @@ public: ...@@ -91,7 +91,8 @@ public:
pt_operator_(NULL), pt_operator_(NULL),
prefetch_iter_end_(false), prefetch_iter_end_(false),
prefetch_partitions_(), prefetch_partitions_(),
need_fetch_faillist_(false) need_fetch_faillist_(false),
filter_flag_replica_(true)
{} {}
~ObPrefetchInfo() ~ObPrefetchInfo()
{} {}
...@@ -101,6 +102,7 @@ public: ...@@ -101,6 +102,7 @@ public:
prefetch_iter_end_ = false; prefetch_iter_end_ = false;
prefetch_partitions_.reuse(); prefetch_partitions_.reuse();
need_fetch_faillist_ = false; need_fetch_faillist_ = false;
filter_flag_replica_ = true;
} }
int init(uint64_t table_id, ObPartitionTableOperator& pt_operator); int init(uint64_t table_id, ObPartitionTableOperator& pt_operator);
bool need_prefetch() const bool need_prefetch() const
...@@ -118,6 +120,10 @@ public: ...@@ -118,6 +120,10 @@ public:
{ {
need_fetch_faillist_ = need_fetch_faillist; need_fetch_faillist_ = need_fetch_faillist;
} }
void set_filter_flag_replica(const bool filter_flag_replica)
{
filter_flag_replica_ = filter_flag_replica;
}
private: private:
int64_t prefetch_idx_; int64_t prefetch_idx_;
...@@ -127,13 +133,15 @@ public: ...@@ -127,13 +133,15 @@ public:
common::ObArray<ObPartitionInfo> prefetch_partitions_; common::ObArray<ObPartitionInfo> prefetch_partitions_;
// false by defaulta,only set to true for load balance // false by defaulta,only set to true for load balance
bool need_fetch_faillist_; bool need_fetch_faillist_;
bool filter_flag_replica_;
}; };
ObTablePartitionIterator(); ObTablePartitionIterator();
virtual ~ObTablePartitionIterator(); virtual ~ObTablePartitionIterator();
// can be inited twice // can be inited twice
int init( int init(
const uint64_t table_id, share::schema::ObSchemaGetterGuard& schema_guard, ObPartitionTableOperator& pt_operator); const uint64_t table_id, share::schema::ObSchemaGetterGuard& schema_guard, ObPartitionTableOperator& pt_operator,
const bool filter_flag_replica = true);
bool is_inited() bool is_inited()
{ {
return inited_; return inited_;
......
...@@ -117,7 +117,7 @@ int ObPartitionTableOperator::set_use_rpc_table(ObCommonRpcProxy& rpc_proxy, ObR ...@@ -117,7 +117,7 @@ int ObPartitionTableOperator::set_use_rpc_table(ObCommonRpcProxy& rpc_proxy, ObR
} }
int ObPartitionTableOperator::get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, int ObPartitionTableOperator::get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist, const int64_t cluster_id) const bool need_fetch_faillist, const int64_t cluster_id, const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t start_time = ObTimeUtility::current_time(); int64_t start_time = ObTimeUtility::current_time();
...@@ -140,7 +140,8 @@ int ObPartitionTableOperator::get(const uint64_t table_id, const int64_t partiti ...@@ -140,7 +140,8 @@ int ObPartitionTableOperator::get(const uint64_t table_id, const int64_t partiti
ObTimeoutCtx ctx; ObTimeoutCtx ctx;
if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) { if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
LOG_WARN("fail to get timeout ctx", K(ret), K(ctx)); LOG_WARN("fail to get timeout ctx", K(ret), K(ctx));
} else if (OB_FAIL(pt->get(table_id, partition_id, partition_info, need_fetch_faillist, cluster_id))) { } else if (OB_FAIL(pt->get(
table_id, partition_id, partition_info, need_fetch_faillist, cluster_id, filter_flag_replica))) {
LOG_WARN("get partition info failed", K(ret), KT(table_id), K(partition_id)); LOG_WARN("get partition info failed", K(ret), KT(table_id), K(partition_id));
} }
} }
...@@ -153,7 +154,8 @@ int ObPartitionTableOperator::get(const uint64_t table_id, const int64_t partiti ...@@ -153,7 +154,8 @@ int ObPartitionTableOperator::get(const uint64_t table_id, const int64_t partiti
} }
int ObPartitionTableOperator::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, int ObPartitionTableOperator::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist) const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist,
const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObIPartitionTable* pt = NULL; ObIPartitionTable* pt = NULL;
...@@ -171,9 +173,19 @@ int ObPartitionTableOperator::prefetch_by_table_id(const uint64_t tenant_id, con ...@@ -171,9 +173,19 @@ int ObPartitionTableOperator::prefetch_by_table_id(const uint64_t tenant_id, con
} else if (OB_ISNULL(pt)) { } else if (OB_ISNULL(pt)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL partition table", K(ret)); LOG_WARN("NULL partition table", K(ret));
} else if (OB_FAIL(pt->prefetch_by_table_id( } else if (OB_FAIL(pt->prefetch_by_table_id(tenant_id,
tenant_id, start_table_id, start_partition_id, partition_infos, need_fetch_faillist))) { start_table_id,
LOG_WARN("partition_table prefetch failed", K(tenant_id), K(start_table_id), K(start_partition_id), K(ret)); start_partition_id,
partition_infos,
need_fetch_faillist,
filter_flag_replica))) {
LOG_WARN("partition_table prefetch failed",
K(tenant_id),
K(start_table_id),
K(start_partition_id),
K(need_fetch_faillist),
K(filter_flag_replica),
K(ret));
} }
LOG_DEBUG( LOG_DEBUG(
"prefetch by table_id", K(ret), K(tenant_id), K(start_table_id), K(start_partition_id), K(need_fetch_faillist)); "prefetch by table_id", K(ret), K(tenant_id), K(start_table_id), K(start_partition_id), K(need_fetch_faillist));
......
...@@ -45,11 +45,12 @@ public: ...@@ -45,11 +45,12 @@ public:
int set_callback_for_obs(obrpc::ObCommonRpcProxy& rpc_proxy, ObRsMgr& rs_mgr, common::ObServerConfig& config); int set_callback_for_obs(obrpc::ObCommonRpcProxy& rpc_proxy, ObRsMgr& rs_mgr, common::ObServerConfig& config);
virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID) override; const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID,
const bool filter_flag_replica = true) override;
virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t set_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos, const int64_t set_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos,
const bool need_fetch_faillist = false) override; const bool need_fetch_faillist = false, const bool filter_flag_replica = true) override;
virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t set_partition_id, virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t set_partition_id,
common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum, common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum,
......
...@@ -53,7 +53,8 @@ int ObPersistentPartitionTable::init(ObISQLClient& sql_proxy, ObServerConfig* co ...@@ -53,7 +53,8 @@ int ObPersistentPartitionTable::init(ObISQLClient& sql_proxy, ObServerConfig* co
} }
int ObPersistentPartitionTable::get(const uint64_t table_id, const int64_t partition_id, int ObPersistentPartitionTable::get(const uint64_t table_id, const int64_t partition_id,
ObPartitionInfo& partition_info, const bool need_fetch_faillist, const int64_t cluster_id) ObPartitionInfo& partition_info, const bool need_fetch_faillist, const int64_t cluster_id,
const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!is_inited()) { if (!is_inited()) {
...@@ -65,19 +66,16 @@ int ObPersistentPartitionTable::get(const uint64_t table_id, const int64_t parti ...@@ -65,19 +66,16 @@ int ObPersistentPartitionTable::get(const uint64_t table_id, const int64_t parti
} else if (NULL == partition_info.get_allocator()) { } else if (NULL == partition_info.get_allocator()) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("partition info's allocator must set", K(ret), K(partition_info.get_allocator())); LOG_WARN("partition info's allocator must set", K(ret), K(partition_info.get_allocator()));
} else { } else if (OB_FAIL(get_partition_info(
const bool filter_flag_replica = true; table_id, partition_id, filter_flag_replica, partition_info, need_fetch_faillist, cluster_id))) {
if (OB_FAIL(get_partition_info( LOG_WARN("get_partition_info failed", K(cluster_id), KT(table_id), K(partition_id), K(filter_flag_replica), K(ret));
table_id, partition_id, filter_flag_replica, partition_info, need_fetch_faillist, cluster_id))) {
LOG_WARN(
"get_partition_info failed", K(cluster_id), KT(table_id), K(partition_id), K(filter_flag_replica), K(ret));
}
} }
return ret; return ret;
} }
int ObPersistentPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, int ObPersistentPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist) const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist,
const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!is_inited()) { if (!is_inited()) {
...@@ -92,7 +90,6 @@ int ObPersistentPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, c ...@@ -92,7 +90,6 @@ int ObPersistentPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, c
} else { } else {
ObPartitionTableProxyFactory factory(*sql_proxy_, merge_error_cb_, config_); ObPartitionTableProxyFactory factory(*sql_proxy_, merge_error_cb_, config_);
ObPartitionTableProxy* proxy = NULL; ObPartitionTableProxy* proxy = NULL;
const bool filter_flag_replica = true;
int64_t fetch_count = GCONF.partition_table_scan_batch_count; int64_t fetch_count = GCONF.partition_table_scan_batch_count;
if (OB_FAIL(factory.get_proxy(start_table_id, proxy))) { if (OB_FAIL(factory.get_proxy(start_table_id, proxy))) {
LOG_WARN("get partition table proxy failed", K(ret), K(start_table_id)); LOG_WARN("get partition table proxy failed", K(ret), K(start_table_id));
......
...@@ -39,14 +39,15 @@ public: ...@@ -39,14 +39,15 @@ public:
} }
virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID) override; const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID,
const bool filter_flag_replica = true) override;
virtual int batch_fetch_partition_infos(const common::ObIArray<common::ObPartitionKey>& keys, virtual int batch_fetch_partition_infos(const common::ObIArray<common::ObPartitionKey>& keys,
common::ObIAllocator& allocator, common::ObArray<ObPartitionInfo*>& partitions, common::ObIAllocator& allocator, common::ObArray<ObPartitionInfo*>& partitions,
const int64_t cluster_id = common::OB_INVALID_ID) override; const int64_t cluster_id = common::OB_INVALID_ID) override;
virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t start_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos, const int64_t start_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos,
const bool need_fetch_faillist = false) override; const bool need_fetch_faillist = false, const bool filter_flag_replica = true) override;
virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t start_partition_id, virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t start_partition_id,
common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum, common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum,
......
...@@ -41,9 +41,10 @@ int ObRemotePartitionTableOperator::init( ...@@ -41,9 +41,10 @@ int ObRemotePartitionTableOperator::init(
} }
int ObRemotePartitionTableOperator::get(const uint64_t table_id, const int64_t partition_id, int ObRemotePartitionTableOperator::get(const uint64_t table_id, const int64_t partition_id,
ObPartitionInfo& partition_info, const bool need_fetch_faillist, const int64_t cluster_id) ObPartitionInfo& partition_info, const bool need_fetch_faillist, const int64_t cluster_id,
const bool filter_flag_replica)
{ {
UNUSEDx(table_id, partition_id, partition_info, need_fetch_faillist, cluster_id); UNUSEDx(table_id, partition_id, partition_info, need_fetch_faillist, cluster_id, filter_flag_replica);
return OB_NOT_SUPPORTED; return OB_NOT_SUPPORTED;
} }
...@@ -61,11 +62,12 @@ int ObRemotePartitionTableOperator::batch_execute(const common::ObIArray<ObParti ...@@ -61,11 +62,12 @@ int ObRemotePartitionTableOperator::batch_execute(const common::ObIArray<ObParti
} }
int ObRemotePartitionTableOperator::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t table_id, int ObRemotePartitionTableOperator::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t table_id,
const int64_t partition_id, common::ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist) const int64_t partition_id, common::ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist,
const bool filter_flag_replica)
{ {
UNUSEDx(tenant_id, table_id, partition_id); UNUSEDx(tenant_id, table_id, partition_id);
UNUSEDx(partition_infos, need_fetch_faillist); UNUSEDx(partition_infos, need_fetch_faillist, filter_flag_replica);
return OB_NOT_SUPPORTED; return OB_NOT_SUPPORTED;
} }
......
...@@ -37,7 +37,8 @@ public: ...@@ -37,7 +37,8 @@ public:
{} {}
int init(share::schema::ObMultiVersionSchemaService* schema_service, ObRemoteSqlProxy* remote_sql_proxy); int init(share::schema::ObMultiVersionSchemaService* schema_service, ObRemoteSqlProxy* remote_sql_proxy);
virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID) override; const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID,
const bool filter_flag_replica = true) override;
virtual int batch_fetch_partition_infos(const common::ObIArray<common::ObPartitionKey>& keys, virtual int batch_fetch_partition_infos(const common::ObIArray<common::ObPartitionKey>& keys,
common::ObIAllocator& allocator, common::ObArray<ObPartitionInfo*>& partitions, common::ObIAllocator& allocator, common::ObArray<ObPartitionInfo*>& partitions,
const int64_t cluster_id = common::OB_INVALID_ID) override; const int64_t cluster_id = common::OB_INVALID_ID) override;
...@@ -45,7 +46,8 @@ public: ...@@ -45,7 +46,8 @@ public:
// not supported interface // not supported interface
virtual int batch_execute(const common::ObIArray<ObPartitionReplica>& replicas) override; virtual int batch_execute(const common::ObIArray<ObPartitionReplica>& replicas) override;
virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id, virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id,
common::ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist = false) override; common::ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist = false,
const bool filter_flag_replica = true) override;
virtual int prefetch(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id, virtual int prefetch(const uint64_t tenant_id, const uint64_t table_id, const int64_t partition_id,
common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum, common::ObIArray<ObPartitionInfo>& partition_infos, bool ignore_row_checksum,
......
...@@ -50,10 +50,10 @@ int ObRpcPartitionTable::init(ObCommonRpcProxy& rpc_proxy, ObRsMgr& rs_mgr, ObSe ...@@ -50,10 +50,10 @@ int ObRpcPartitionTable::init(ObCommonRpcProxy& rpc_proxy, ObRsMgr& rs_mgr, ObSe
} }
int ObRpcPartitionTable::get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, int ObRpcPartitionTable::get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist, const int64_t cluster_id) const bool need_fetch_faillist, const int64_t cluster_id, const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
UNUSED(need_fetch_faillist); UNUSEDx(need_fetch_faillist, filter_flag_replica);
ObAddr rs_addr; ObAddr rs_addr;
partition_info.set_table_id(table_id); partition_info.set_table_id(table_id);
partition_info.set_partition_id(partition_id); partition_info.set_partition_id(partition_id);
...@@ -347,10 +347,11 @@ int ObRpcPartitionTable::fetch_root_partition_from_obs_v1( ...@@ -347,10 +347,11 @@ int ObRpcPartitionTable::fetch_root_partition_from_obs_v1(
} }
int ObRpcPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, int ObRpcPartitionTable::prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist) const int64_t start_partition_id, ObIArray<ObPartitionInfo>& partition_infos, const bool need_fetch_faillist,
const bool filter_flag_replica)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
UNUSED(need_fetch_faillist); UNUSEDx(need_fetch_faillist, filter_flag_replica);
if (!is_inited_) { if (!is_inited_) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret)); LOG_WARN("not init", K(ret));
......
...@@ -41,11 +41,12 @@ public: ...@@ -41,11 +41,12 @@ public:
} }
virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info, virtual int get(const uint64_t table_id, const int64_t partition_id, ObPartitionInfo& partition_info,
const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID) override; const bool need_fetch_faillist = false, const int64_t cluster_id = common::OB_INVALID_ID,
const bool filter_flag_replica = true) override;
virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id, virtual int prefetch_by_table_id(const uint64_t tenant_id, const uint64_t start_table_id,
const int64_t start_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos, const int64_t start_partition_id, common::ObIArray<ObPartitionInfo>& partition_infos,
const bool need_fetch_faillist = false) override; const bool need_fetch_faillist = false, const bool filter_flag_replica = true) override;
virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t start_partition_id, virtual int prefetch(const uint64_t tenant_id, const uint64_t start_table_id, const int64_t start_partition_id,
common::ObIArray<ObPartitionInfo>& partition_info, bool ignore_row_checksum, common::ObIArray<ObPartitionInfo>& partition_info, bool ignore_row_checksum,
......
...@@ -11602,6 +11602,9 @@ int ObPartitionService::check_all_replica_major_sstable_exist( ...@@ -11602,6 +11602,9 @@ int ObPartitionService::check_all_replica_major_sstable_exist(
common::ObMemberList member_list; common::ObMemberList member_list;
const uint64_t fetch_tenant_id = const uint64_t fetch_tenant_id =
is_inner_table(index_table_id) ? OB_SYS_TENANT_ID : extract_tenant_id(index_table_id); is_inner_table(index_table_id) ? OB_SYS_TENANT_ID : extract_tenant_id(index_table_id);
const bool need_fail_list = false;
const int64_t cluster_id = OB_INVALID_ID; // local cluster
const bool filter_flag_replica = false;
if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(fetch_tenant_id, schema_guard))) { if (OB_FAIL(schema_service_->get_tenant_full_schema_guard(fetch_tenant_id, schema_guard))) {
STORAGE_LOG(WARN, "fail to get schema guard", K(ret), K(fetch_tenant_id)); STORAGE_LOG(WARN, "fail to get schema guard", K(ret), K(fetch_tenant_id));
} else if (OB_FAIL(schema_guard.get_table_schema(index_table_id, index_schema))) { } else if (OB_FAIL(schema_guard.get_table_schema(index_table_id, index_schema))) {
...@@ -11623,7 +11626,12 @@ int ObPartitionService::check_all_replica_major_sstable_exist( ...@@ -11623,7 +11626,12 @@ int ObPartitionService::check_all_replica_major_sstable_exist(
} else if (OB_ISNULL(GCTX.pt_operator_)) { } else if (OB_ISNULL(GCTX.pt_operator_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "error unexpected, pt operator must not be NULL", K(ret)); STORAGE_LOG(WARN, "error unexpected, pt operator must not be NULL", K(ret));
} else if (OB_FAIL(GCTX.pt_operator_->get(pkey.get_table_id(), pkey.get_partition_id(), partition_info))) { } else if (OB_FAIL(GCTX.pt_operator_->get(pkey.get_table_id(),
pkey.get_partition_id(),
partition_info,
need_fail_list,
cluster_id,
filter_flag_replica))) {
STORAGE_LOG(WARN, "fail to get partition info", K(ret), K(pkey)); STORAGE_LOG(WARN, "fail to get partition info", K(ret), K(pkey));
} else if (OB_FAIL(partition_info.filter(filter))) { } else if (OB_FAIL(partition_info.filter(filter))) {
STORAGE_LOG(WARN, "fail to filter partition", K(ret)); STORAGE_LOG(WARN, "fail to filter partition", K(ret));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册