提交 26d446e4 编写于 作者: Z zx0 提交者: wangzelin.wzl

Optimize query performance of __all_virtual_trans_stat and bugfix

上级 0d4cad40
......@@ -51,6 +51,7 @@ void ObGVTransStat::destroy()
int ObGVTransStat::prepare_start_to_read_()
{
int ret = OB_SUCCESS;
trans_stat_iter_.reset();
if (NULL == allocator_ || NULL == trans_service_) {
SERVER_LOG(WARN,
"invalid argument, allocator_ or trans_service_ is null",
......@@ -59,12 +60,9 @@ int ObGVTransStat::prepare_start_to_read_()
"trans_service",
OB_P(trans_service_));
ret = OB_INVALID_ARGUMENT;
} else if (OB_SUCCESS != (ret = trans_service_->iterate_partition(partition_iter_))) {
TRANS_LOG(WARN, "iterate partition error", K(ret));
} else if (!partition_iter_.is_ready()) {
TRANS_LOG(WARN, "ObPartitionIterator is not ready");
ret = OB_ERR_UNEXPECTED;
} else if (OB_SUCCESS != (ret = trans_stat_iter_.set_ready())) { // set ready for the first count
} else if (OB_SUCCESS != (ret = trans_service_->iterate_trans_stat_without_partition(trans_stat_iter_))) {
TRANS_LOG(WARN, "iterate transaction stat error", K(ret));
} else if (OB_SUCCESS != (ret = trans_stat_iter_.set_ready())) { // set ready for the first count
TRANS_LOG(WARN, "ObTransStatIterator set ready error", K(ret));
} else {
start_to_read_ = true;
......@@ -75,27 +73,9 @@ int ObGVTransStat::prepare_start_to_read_()
int ObGVTransStat::get_next_trans_info_(ObTransStat& trans_stat)
{
int ret = OB_SUCCESS;
ObTransStat tmp_trans_stat;
ObPartitionKey partition;
bool bool_ret = true;
while (bool_ret && OB_SUCCESS == ret) {
if (OB_ITER_END == (ret = trans_stat_iter_.get_next(tmp_trans_stat))) {
if (OB_SUCCESS != (ret = partition_iter_.get_next(partition))) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "ObPartitionIterator get next partition error", K(ret));
}
} else {
trans_stat_iter_.reset();
if (OB_SUCCESS != (ret = trans_service_->iterate_trans_stat(partition, trans_stat_iter_))) {
TRANS_LOG(WARN, "iterate transaction stat error", K(ret), K(partition));
}
}
} else {
bool_ret = false;
}
}
int ret = trans_stat_iter_.get_next(tmp_trans_stat);
if (OB_SUCC(ret)) {
trans_stat = tmp_trans_stat;
......
......@@ -101,7 +101,6 @@ private:
private:
transaction::ObTransService* trans_service_;
transaction::ObPartitionIterator partition_iter_;
transaction::ObTransStatIterator trans_stat_iter_;
private:
......
......@@ -77,7 +77,9 @@ int ObMvccValueIterator::init(const ObIMvccCtx& ctx, const ObTransSnapInfo& snap
TRANS_LOG(WARN, "fail to find start pos for iterator", K(ret));
} else {
if (GCONF.enable_sql_audit) {
mark_trans_node_for_elr(snapshot_info.get_snapshot_version(), query_flag.is_prewarm());
if (OB_SUCCESS != (ret = mark_trans_node_for_elr(snapshot_info.get_snapshot_version(), query_flag.is_prewarm()))) {
//do nothing
}
}
// set has_read_relocated_row flag true when reading relocated row
if (value->is_has_relocated()) {
......@@ -91,24 +93,29 @@ int ObMvccValueIterator::init(const ObIMvccCtx& ctx, const ObTransSnapInfo& snap
return ret;
}
void ObMvccValueIterator::mark_trans_node_for_elr(const int64_t read_snapshot, const bool is_prewarm)
int ObMvccValueIterator::mark_trans_node_for_elr(const int64_t read_snapshot, const bool is_prewarm)
{
int ret = OB_SUCCESS;
if (NULL != version_iter_) {
ObMemtableCtx *curr_mt_ctx = static_cast<ObMemtableCtx *>(const_cast<ObIMvccCtx *>(ctx_));
transaction::ObTransCtx *trans_ctx = curr_mt_ctx->get_trans_ctx();
if (NULL != trans_ctx) {
if (!trans_ctx->is_bounded_staleness_read() && curr_mt_ctx->is_for_replay()) {
TRANS_LOG(WARN, "strong consistent read follower", K(*trans_ctx), K(ctx_));
ret = OB_NOT_MASTER;
}
}
// do not set barrier info when transaction node is ELR node
if (!is_prewarm && !version_iter_->is_elr()) {
version_iter_->clear_safe_read_barrier();
ObMemtableCtx* curr_mt_ctx = static_cast<ObMemtableCtx*>(const_cast<ObIMvccCtx*>(ctx_));
transaction::ObTransCtx* trans_ctx = curr_mt_ctx->get_trans_ctx();
if (NULL != trans_ctx) {
if (!trans_ctx->is_bounded_staleness_read() && curr_mt_ctx->is_for_replay()) {
TRANS_LOG(WARN, "strong consistent read follower", K(*trans_ctx), K(ctx_));
}
version_iter_->set_safe_read_barrier(trans_ctx->is_bounded_staleness_read());
version_iter_->set_inc_num(trans_ctx->get_trans_id().get_inc_num());
}
version_iter_->set_snapshot_version_barrier(read_snapshot);
}
}
return ret;
}
int ObMvccValueIterator::find_start_pos(const int64_t read_snapshot)
......
......@@ -123,7 +123,7 @@ private:
int find_trans_node_below_version(const int64_t read_snapshot, const bool is_safe_read);
int check_trans_node_readable(const int64_t read_snapshot);
void print_conflict_trace_log();
void mark_trans_node_for_elr(const int64_t read_snapshot, const bool is_prewarm);
int mark_trans_node_for_elr(const int64_t read_snapshot, const bool is_prewarm);
void move_to_next_node();
......
......@@ -593,7 +593,7 @@ public:
class ObDistTransCtx : public ObTransCtx {
friend class CtxLock;
friend class IterateTransStatFunctor;
friend class IterateTransStatForKeyFunctor;
public:
explicit ObDistTransCtx(const char* ctx_type_str, const int64_t ctx_type)
......
......@@ -1121,25 +1121,6 @@ void ObPartitionTransCtxMgr::reset_elr_statistic()
ATOMIC_STORE(&end_trans_by_self_count_, 0);
}
int ObPartitionTransCtxMgr::iterate_trans_stat(ObTransStatIterator& trans_stat_iter)
{
int ret = OB_SUCCESS;
RLockGuard guard(rwlock_);
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObPartitionTransCtxMgr not inited");
ret = OB_NOT_INIT;
} else {
IterateTransStatFunctor fn(trans_stat_iter);
if (OB_FAIL(ctx_map_mgr_.foreach_ctx(fn))) {
TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
}
}
return ret;
}
int ObPartitionTransCtxMgr::set_last_restore_log_id(const uint64_t last_restore_log_id)
{
int ret = OB_SUCCESS;
......@@ -4139,26 +4120,35 @@ int ObPartTransCtxMgr::check_ctx_create_timestamp_elapsed(const ObPartitionKey&
return ret;
}
int ObPartTransCtxMgr::iterate_trans_stat(const ObPartitionKey& partition, ObTransStatIterator& trans_stat_iter)
//iterate_trans_stat_without_partition achieves complete transaction information at the server level without partition
int ObPartTransCtxMgr::iterate_trans_stat_without_partition(ObTransStatIterator& trans_stat_iter)
{
int ret = OB_SUCCESS;
ObPartitionTransCtxMgr* ctx_mgr = NULL;
ObTimeGuard tg("ObPartTransCtxMgr iterate_trans_stat_without_partition", 5000000);
DRWLock::RDLockGuard guard(rwlock_);
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "ObPartTransCtxMgr not inited");
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(!partition.is_valid())) {
TRANS_LOG(WARN, "invalid argument", K(partition));
ret = OB_INVALID_ARGUMENT;
} else if (OB_ISNULL(ctx_mgr = get_partition_trans_ctx_mgr(partition))) {
TRANS_LOG(WARN, "get partition transaction context manager error", K(partition));
} else if (OB_ISNULL(ctx_map_)) {
TRANS_LOG(WARN, "get partition transaction context manager error");
ret = OB_PARTITION_NOT_EXIST;
} else if (OB_FAIL(ctx_mgr->iterate_trans_stat(trans_stat_iter))) {
TRANS_LOG(WARN, "iterate transaction stat error", KR(ret), K(partition));
} else {
TRANS_LOG(DEBUG, "ObTransStatIterator set ready success", K(partition));
//Traverse 64 map memory
for (int i=0; i<CONTEXT_MAP_COUNT; i++) {
CtxMap *tmp_ctx = ctx_map_ + i;
if (OB_NOT_NULL(tmp_ctx)) {
IterateTransStatForKeyFunctor fn(trans_stat_iter);
if (OB_SUCCESS != (ret = tmp_ctx->for_each(fn))) {
TRANS_LOG(WARN, "iterate transaction stat for each error", KR(ret));
}
}
}
}
if (OB_SUCCESS == ret) {
tg.click();
TRANS_LOG(DEBUG, "ObTransStatIterator set ready success");
}
return ret;
......
......@@ -268,7 +268,6 @@ public:
return ATOMIC_LOAD(&end_trans_by_self_count_);
}
void reset_elr_statistic();
int iterate_trans_stat(ObTransStatIterator& trans_stat_iter);
int iterate_trans_lock_stat(ObTransLockStatIterator& trans_lock_stat_iter);
int iterate_trans_result_info_in_TRIM(ObTransResultInfoStatIterator& iter);
int iterate_trans_table(const uint64_t end_log_id, blocksstable::ObMacroBlockWriter& writer);
......@@ -757,6 +756,7 @@ private:
};
class ObTransCtxMgrImpl {
protected:
enum { CACHE_NUM = 17313, CONTEXT_MAP_COUNT = 1 << 6 };
public:
......@@ -926,8 +926,8 @@ public:
int iterate_partition(ObPartitionIterator& partition_iter);
int iterate_partition(ObELRStatSummary& elr_stat);
int iterate_partition_mgr_stat(ObTransPartitionMgrStatIterator& partition_mgr_stat_iter, const ObAddr& addr);
// get transaction stat iterator by partition
int iterate_trans_stat(const common::ObPartitionKey& partition, ObTransStatIterator& trans_stat_iter);
// get transaction stat iterator without partition
int iterate_trans_stat_without_partition(ObTransStatIterator& trans_stat_iter);
int print_all_trans_ctx(const common::ObPartitionKey& partition);
// get transaction lock stat iterator by partition
int iterate_trans_lock_stat(const common::ObPartitionKey& partition, ObTransLockStatIterator& trans_lock_stat_iter);
......
......@@ -671,17 +671,17 @@ public:
}
};
class IterateTransStatFunctor {
class IterateTransStatForKeyFunctor {
public:
explicit IterateTransStatFunctor(ObTransStatIterator& trans_stat_iter) : trans_stat_iter_(trans_stat_iter)
explicit IterateTransStatForKeyFunctor(ObTransStatIterator& trans_stat_iter) : trans_stat_iter_(trans_stat_iter)
{}
bool operator()(const ObTransID& trans_id, ObTransCtx* ctx_base)
bool operator()(const ObTransKey& trans_key, ObTransCtx* ctx_base)
{
int tmp_ret = common::OB_SUCCESS;
bool bool_ret = false;
if (!trans_id.is_valid() || OB_ISNULL(ctx_base)) {
TRANS_LOG(WARN, "invalid argument", K(trans_id), "ctx", OB_P(ctx_base));
if (!trans_key.is_valid() || OB_ISNULL(ctx_base)) {
TRANS_LOG(WARN, "invalid argument", K(trans_key), "ctx", OB_P(ctx_base));
tmp_ret = OB_INVALID_ARGUMENT;
// If you encounter a situation where part_ctx has not been init yet,
// skip it directly, there will be a background thread retry
......@@ -747,7 +747,7 @@ public:
if (OB_SUCCESS != (tmp_ret = part_ctx->get_participants_copy(participants_arr))) {
TRANS_LOG(WARN, "ObTransStat get participants copy error", K(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = trans_stat.init(part_ctx->addr_,
trans_id,
trans_key.get_trans_id(),
part_ctx->tenant_id_,
part_ctx->is_exiting_,
part_ctx->is_readonly_,
......@@ -777,7 +777,7 @@ public:
"ObTransStat init error",
K(tmp_ret),
K(has_decided),
K(trans_id),
K(trans_key.get_trans_id()),
"addr",
part_ctx->addr_,
"tenant_id",
......
......@@ -89,7 +89,7 @@ private:
// participant transaction context
class ObPartTransCtx : public ObDistTransCtx, public ObTsCbTask {
friend class IterateTransStatFunctor;
friend class IterateTransStatForKeyFunctor;
public:
ObPartTransCtx()
......
......@@ -6733,7 +6733,7 @@ int ObTransService::clear_all_ctx(const common::ObPartitionKey& partition)
return ret;
}
int ObTransService::iterate_trans_stat(const common::ObPartitionKey& partition, ObTransStatIterator& trans_stat_iter)
int ObTransService::iterate_trans_stat_without_partition(ObTransStatIterator& trans_stat_iter)
{
int ret = OB_SUCCESS;
const int64_t PRINT_SCHE_COUNT = 128;
......@@ -6744,17 +6744,10 @@ int ObTransService::iterate_trans_stat(const common::ObPartitionKey& partition,
} else if (OB_UNLIKELY(!is_running_)) {
TRANS_LOG(WARN, "ObTransService is not running");
ret = OB_NOT_RUNNING;
} else if (!partition.is_valid()) {
TRANS_LOG(WARN, "invalid argument", K(partition));
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(part_trans_ctx_mgr_.iterate_trans_stat(partition, trans_stat_iter))) {
TRANS_LOG(WARN, "iterate transaction stat error", KR(ret), K(partition));
} else if (OB_FAIL(slave_part_trans_ctx_mgr_.iterate_trans_stat(partition, trans_stat_iter))) {
TRANS_LOG(WARN, "iterate slave transaction stat error", KR(ret), K(partition));
} else if (OB_FAIL(trans_stat_iter.set_ready())) {
TRANS_LOG(WARN, "ObTransStatIterator set ready error", KR(ret), K(partition));
} else {
// do nothing
if (OB_FAIL(part_trans_ctx_mgr_.iterate_trans_stat_without_partition(trans_stat_iter))) {
TRANS_LOG(WARN, "iterate transaction stat error", KR(ret));
}
}
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
sche_trans_ctx_mgr_.print_all_ctx(PRINT_SCHE_COUNT);
......
......@@ -396,8 +396,8 @@ public:
// get partition iterator
int iterate_partition(ObPartitionIterator& partition_iter);
int iterate_partition_mgr_stat(ObTransPartitionMgrStatIterator& partition_mgr_stat_iter);
// get transaction stat iterator by partition
int iterate_trans_stat(const common::ObPartitionKey& partition, ObTransStatIterator& trans_stat_iter);
// get transaction stat iterator without partition
int iterate_trans_stat_without_partition(ObTransStatIterator& trans_stat_iter);
int print_all_trans_ctx(const common::ObPartitionKey& partition);
// get the memory used condition of transaction module
int iterate_trans_memory_stat(ObTransMemStatIterator& mem_stat_iter);
......
......@@ -40,7 +40,7 @@ class ObIPartitionGroupGuard;
namespace transaction {
// slave participant transaction context
class ObSlaveTransCtx : public ObDistTransCtx {
friend class IterateTransStatFunctor;
friend class IterateTransStatForKeyFunctor;
public:
ObSlaveTransCtx() : ObDistTransCtx("slave_participant", ObTransCtxType::SLAVE_PARTICIPANT), mt_ctx_()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册