提交 1c24d38f 编写于 作者: O obdev 提交者: ob-robot

optimize the performance of reporting tablet stat

上级 2c298301
......@@ -1432,7 +1432,10 @@ int ObAdaptiveMergePolicy::get_adaptive_merge_reason(
if (AdaptiveMergeReason::NONE == reason && OB_TMP_FAIL(check_ineffecient_read(tablet_stat, tablet, reason))) {
LOG_WARN("failed to check ineffecient read", K(tmp_ret), K(ls_id), K(tablet_id));
}
LOG_INFO("DanLing Check tablet adaptive merge reason", K(reason), K(tablet_stat)); // TODO tmp log, remove later
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000 /*10s*/)) {
LOG_INFO("Check tablet adaptive merge reason", K(reason), K(tablet_stat)); // TODO tmp log, remove later
}
}
return ret;
}
......
......@@ -1195,7 +1195,8 @@ int ObTabletMergeFinishTask::try_report_tablet_stat_after_mini(ObTabletMergeCtx
} else if (OB_FAIL(ObTableEstimator::estimate_row_count_for_scan(
base_input, ranges, part_estimate, records))) {
LOG_WARN("failed to estimate row counts", K(ret), K(part_estimate), K(records));
} else if (0 == part_estimate.logical_row_count_ && 0 == part_estimate.physical_row_count_) {
} else if (0 == part_estimate.logical_row_count_ &&
ObTabletStat::MERGE_REPORT_MIN_ROW_CNT >= part_estimate.physical_row_count_) {
} else {
ObTabletStat report_stat;
report_stat.ls_id_ = ctx.param_.ls_id_.id(),
......
......@@ -77,11 +77,20 @@ bool ObTabletStat::is_valid() const
return ls_id_ > 0 && tablet_id_ > 0;
}
bool ObTabletStat::is_empty_query() const
bool ObTabletStat::check_need_report() const
{
bool bret = false;
if (0 == scan_physical_row_cnt_ && 0 == scan_micro_block_cnt_) {
bret = true;
if (0 != query_cnt_) { // report by query
if (QUERY_REPORT_MIN_ROW_CNT <= scan_physical_row_cnt_ ||
QUERY_REPORT_MIN_MICRO_BLOCK_CNT <= scan_micro_block_cnt_ ||
QUERY_REPORT_MIN_SCAN_TABLE_CNT <= exist_row_total_table_cnt_) {
bret = true;
}
} else if (0 != merge_cnt_) { // report by compaction
bret = MERGE_REPORT_MIN_ROW_CNT <= merge_physical_row_cnt_;
} else { // invalid tablet stat
bret = false;
}
return bret;
}
......@@ -512,26 +521,20 @@ int ObTenantTabletStatMgr::report_stat(const ObTabletStat &stat)
} else if (!stat.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get invalid arguments", K(ret), K(stat));
} else if (!stat.check_need_report()) {
} else {
int64_t retry_cnt = 0;
while (retry_cnt < MAX_REPORT_RETRY_CNT) {
uint64_t pending_cur = ATOMIC_LOAD(&pending_cursor_);
uint64_t report_cur = ATOMIC_LOAD(&report_cursor_);
if (pending_cur - report_cur + 1 == DEFAULT_MAX_PENDING_CNT) { // full queue
uint64_t pending_cur = pending_cursor_;
if (pending_cur - report_cursor_ >= DEFAULT_MAX_PENDING_CNT) { // first check full queue with dirty read
if (REACH_TENANT_TIME_INTERVAL(10 * 1000L * 1000L/*10s*/)) {
LOG_INFO("report_queue is full, wait to process", K(report_cursor_), K(pending_cur), K(stat));
}
} else if (FALSE_IT(pending_cur = ATOMIC_FAA(&pending_cursor_, 1))) {
} else if (pending_cur - report_cursor_ >= DEFAULT_MAX_PENDING_CNT) { // double check
if (REACH_TENANT_TIME_INTERVAL(10 * 1000L * 1000L/*10s*/)) {
LOG_INFO("report_queue is full, wait to process", K(report_cur), K(pending_cur), K(stat));
}
break;
} else if (pending_cur != ATOMIC_CAS(&pending_cursor_, pending_cur, pending_cur + 1)) {
++retry_cnt;
} else {
report_queue_[pending_cur % DEFAULT_MAX_PENDING_CNT] = stat; // allow dirty write
break;
LOG_INFO("report_queue is full, wait to process", K(report_cursor_), K(pending_cur), K(stat));
}
}
if (retry_cnt == MAX_REPORT_RETRY_CNT) {
// pending cursor has been moved in other thread, ignore this tablet_stat
LOG_INFO("pending cursor has beed moved in other thread, ignore current stat", K(stat));
} else {
report_queue_[pending_cur % DEFAULT_MAX_PENDING_CNT] = stat;
}
}
return ret;
......@@ -687,8 +690,11 @@ void ObTenantTabletStatMgr::dump_tablet_stat_status()
void ObTenantTabletStatMgr::process_stats()
{
int tmp_ret = OB_SUCCESS;
uint64_t start_idx = ATOMIC_LOAD(&report_cursor_);
const uint64_t end_idx = ATOMIC_LOAD(&pending_cursor_);
const uint64_t start_idx = report_cursor_;
const uint64_t pending_cur = ATOMIC_LOAD(&pending_cursor_);
uint64_t end_idx = (pending_cur > start_idx + DEFAULT_MAX_PENDING_CNT)
? start_idx + DEFAULT_MAX_PENDING_CNT
: pending_cur;
if (start_idx == end_idx) { // empty queue
} else {
......@@ -700,7 +706,7 @@ void ObTenantTabletStatMgr::process_stats()
LOG_WARN("failed to update tablet stat", K(tmp_ret), K(cur_stat));
}
}
ATOMIC_STORE(&report_cursor_, end_idx);
report_cursor_ = pending_cur; // only TabletStatUpdater update this value.
}
}
......
......@@ -49,7 +49,7 @@ public:
~ObTabletStat() = default;
void reset() { MEMSET(this, 0, sizeof(ObTabletStat)); }
bool is_valid() const;
bool is_empty_query() const;
bool check_need_report() const;
ObTabletStat& operator=(const ObTabletStat &other);
ObTabletStat& operator+=(const ObTabletStat &other);
ObTabletStat& archive(int64_t factor);
......@@ -74,6 +74,10 @@ public:
static constexpr int64_t BASIC_TABLE_CNT_THRESHOLD = 5;
static constexpr int64_t BASIC_MICRO_BLOCK_CNT_THRESHOLD = 16;
static constexpr int64_t BASIC_ROW_CNT_THRESHOLD = 10000; // TODO(@Danling) make it a comfiguration item
static constexpr int64_t QUERY_REPORT_MIN_ROW_CNT = 100;
static constexpr int64_t QUERY_REPORT_MIN_MICRO_BLOCK_CNT = 10;
static constexpr int64_t QUERY_REPORT_MIN_SCAN_TABLE_CNT = 2;
static constexpr int64_t MERGE_REPORT_MIN_ROW_CNT = 100;
public:
int64_t ls_id_;
uint64_t tablet_id_;
......
......@@ -113,6 +113,7 @@ void TestTenantTabletStatMgr::batch_report_stat(int64_t report_num)
curr_stat.ls_id_ = 1;
curr_stat.tablet_id_ = 10001 + i;
curr_stat.query_cnt_ = 100 * (i + 1);
curr_stat.scan_physical_row_cnt_ = 10000 + i;
std::thread sub_report_thread(report, stat_mgr_, curr_stat);
if (sub_report_thread.joinable()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册