提交 025f70b8 编写于 作者: O obdev 提交者: wangzelin.wzl

force mini minor to reduce mini sstable cnt

上级 c1a3a90f
...@@ -1320,6 +1320,18 @@ int ObTableStore::deal_with_minor_result( ...@@ -1320,6 +1320,18 @@ int ObTableStore::deal_with_minor_result(
return ret; return ret;
} }
void ObTableStore::adjust_minor_merge_boundary(int64_t &min_snapshot, int64_t &max_snapshot)
{
if (table_count_ >= OB_UNSAFE_TABLE_CNT) {
max_snapshot = INT64_MAX;
if (table_count_ >= OB_EMERGENCY_TABLE_CNT) {
min_snapshot = 0;
} else if (start_pos_ >= 0 && inc_pos_ > start_pos_) { // get last major sstable after start_pos_
min_snapshot = tables_[inc_pos_ - 1]->get_snapshot_version();
}
}
}
int ObTableStore::get_mini_minor_merge_tables( int ObTableStore::get_mini_minor_merge_tables(
const ObGetMergeTablesParam& param, const int64_t multi_version_start, ObGetMergeTablesResult& result) const ObGetMergeTablesParam& param, const int64_t multi_version_start, ObGetMergeTablesResult& result)
{ {
...@@ -1345,7 +1357,7 @@ int ObTableStore::get_mini_minor_merge_tables( ...@@ -1345,7 +1357,7 @@ int ObTableStore::get_mini_minor_merge_tables(
} else if (OB_FAIL(get_neighbour_freeze_info(merge_inc_base_version, freeze_info))) { // get freeze info } else if (OB_FAIL(get_neighbour_freeze_info(merge_inc_base_version, freeze_info))) { // get freeze info
LOG_WARN("failed to get freeze info", K(ret), K(merge_inc_base_version), K(PRETTY_TS(*this))); LOG_WARN("failed to get freeze info", K(ret), K(merge_inc_base_version), K(PRETTY_TS(*this)));
} else { } else {
const int64_t min_snapshot_version = freeze_info.prev.freeze_ts; int64_t min_snapshot_version = freeze_info.prev.freeze_ts;
// In the primary and standby database scenarios, // In the primary and standby database scenarios,
// the freeze_info of the standby library may not be refreshed for a long time, // the freeze_info of the standby library may not be refreshed for a long time,
// but then drag a minor from the primary database to update the major version. // but then drag a minor from the primary database to update the major version.
...@@ -1353,7 +1365,8 @@ int ObTableStore::get_mini_minor_merge_tables( ...@@ -1353,7 +1365,8 @@ int ObTableStore::get_mini_minor_merge_tables(
// If directly do mini minor merge, it will cause the loss of the previous major data, // If directly do mini minor merge, it will cause the loss of the previous major data,
// so the mini minor merge should also use snapshot_gc_ts as the boundary // so the mini minor merge should also use snapshot_gc_ts as the boundary
// Unless the multi_version exceeding snapshot_gc_ts is continuous // Unless the multi_version exceeding snapshot_gc_ts is continuous
const int64_t max_snapshot_version = freeze_info.next.freeze_version > 0 ? freeze_info.next.freeze_ts : INT64_MAX; int64_t max_snapshot_version = freeze_info.next.freeze_version > 0 ? freeze_info.next.freeze_ts : INT64_MAX;
adjust_minor_merge_boundary(min_snapshot_version, max_snapshot_version);
const int64_t expect_multi_version = MIN(freeze_info.next.freeze_ts, multi_version_start); const int64_t expect_multi_version = MIN(freeze_info.next.freeze_ts, multi_version_start);
if (OB_FAIL(find_mini_minor_merge_tables( if (OB_FAIL(find_mini_minor_merge_tables(
param, min_snapshot_version, max_snapshot_version, expect_multi_version, result))) { param, min_snapshot_version, max_snapshot_version, expect_multi_version, result))) {
...@@ -1400,6 +1413,7 @@ int ObTableStore::get_hist_minor_range(const ObIArray<ObFreezeInfoSnapshotMgr::F ...@@ -1400,6 +1413,7 @@ int ObTableStore::get_hist_minor_range(const ObIArray<ObFreezeInfoSnapshotMgr::F
int64_t max_cnt = 1; int64_t max_cnt = 1;
int64_t max_idx = 0; int64_t max_idx = 0;
ObITable* table = nullptr; ObITable* table = nullptr;
const bool is_strict_mode = table_count_ < OB_UNSAFE_TABLE_CNT;
for (int64_t pos = inc_pos_; OB_SUCC(ret) && idx < freeze_infos.count() && pos < table_count_; pos++) { for (int64_t pos = inc_pos_; OB_SUCC(ret) && idx < freeze_infos.count() && pos < table_count_; pos++) {
int64_t freeze_ts = freeze_infos.at(idx).freeze_ts; int64_t freeze_ts = freeze_infos.at(idx).freeze_ts;
if (OB_ISNULL(table = tables_[pos])) { if (OB_ISNULL(table = tables_[pos])) {
...@@ -1407,7 +1421,8 @@ int ObTableStore::get_hist_minor_range(const ObIArray<ObFreezeInfoSnapshotMgr::F ...@@ -1407,7 +1421,8 @@ int ObTableStore::get_hist_minor_range(const ObIArray<ObFreezeInfoSnapshotMgr::F
LOG_ERROR("Unexpected null table", K(ret), K(pos), K(PRETTY_TS(*this))); LOG_ERROR("Unexpected null table", K(ret), K(pos), K(PRETTY_TS(*this)));
} else if (table->get_base_version() < last_freeze_ts) { } else if (table->get_base_version() < last_freeze_ts) {
// skip small minor sstable // skip small minor sstable
} else if (table->get_max_merged_trans_version() > freeze_ts) { } else if (table->get_snapshot_version() > freeze_ts
|| (is_strict_mode && table->get_max_merged_trans_version() > freeze_ts)) {
if (cnt > max_cnt) { if (cnt > max_cnt) {
max_cnt = cnt; max_cnt = cnt;
max_idx = idx; max_idx = idx;
...@@ -1709,6 +1724,7 @@ int ObTableStore::find_mini_minor_merge_tables(const ObGetMergeTablesParam& para ...@@ -1709,6 +1724,7 @@ int ObTableStore::find_mini_minor_merge_tables(const ObGetMergeTablesParam& para
const int64_t inc_pos = inc_pos_ >= 0 ? inc_pos_ : 0; const int64_t inc_pos = inc_pos_ >= 0 ? inc_pos_ : 0;
const ObMergeType merge_type = param.merge_type_; const ObMergeType merge_type = param.merge_type_;
int64_t reserve_snapshot_for_major = INT64_MAX; int64_t reserve_snapshot_for_major = INT64_MAX;
const bool is_strict_mode = table_count_ < OB_UNSAFE_TABLE_CNT;
LOG_INFO("find_mini_minor_merge_tables", LOG_INFO("find_mini_minor_merge_tables",
K(ret), K(ret),
K(min_snapshot_version), K(min_snapshot_version),
...@@ -1760,7 +1776,8 @@ int ObTableStore::find_mini_minor_merge_tables(const ObGetMergeTablesParam& para ...@@ -1760,7 +1776,8 @@ int ObTableStore::find_mini_minor_merge_tables(const ObGetMergeTablesParam& para
result.handle_.reset(); result.handle_.reset();
result.version_range_.reset(); result.version_range_.reset();
result.log_ts_range_.reset(); result.log_ts_range_.reset();
} else if (table->get_max_merged_trans_version() > max_snapshot_version) { } else if (table->get_snapshot_version() > max_snapshot_version
|| (is_strict_mode && table->get_max_merged_trans_version() > max_snapshot_version)) {
// snapshot_version <= max_merged_trans_version <= upper_trans_version // snapshot_version <= max_merged_trans_version <= upper_trans_version
// upper_trans_version is more safe to keep the minor sstable do not // upper_trans_version is more safe to keep the minor sstable do not
// crossing the major freeze, but it's not always properly filled. // crossing the major freeze, but it's not always properly filled.
...@@ -2697,12 +2714,14 @@ int ObTableStore::check_need_mini_minor_merge(const bool using_remote_memstore, ...@@ -2697,12 +2714,14 @@ int ObTableStore::check_need_mini_minor_merge(const bool using_remote_memstore,
int64_t minor_sstable_count = 0; int64_t minor_sstable_count = 0;
int64_t need_merge_mini_count = 0; int64_t need_merge_mini_count = 0;
int64_t minor_check_snapshot_version = 0; int64_t minor_check_snapshot_version = 0;
int64_t min_snapshot_version = freeze_info.prev.freeze_ts;
int64_t max_snapshot_version = freeze_info.next.freeze_version > 0 ? freeze_info.next.freeze_ts : INT64_MAX;
adjust_minor_merge_boundary(min_snapshot_version, max_snapshot_version);
for (int64_t i = inc_pos_; OB_SUCC(ret) && i < table_count_; ++i) { for (int64_t i = inc_pos_; OB_SUCC(ret) && i < table_count_; ++i) {
if (!is_own_table(tables_[i])) { if (!is_own_table(tables_[i])) {
continue; continue;
} else if (tables_[i]->get_base_version() >= freeze_info.prev.freeze_ts) { } else if (tables_[i]->get_base_version() >= min_snapshot_version) {
if (freeze_info.next.freeze_version > 0 && if (tables_[i]->get_max_merged_trans_version() > max_snapshot_version) {
tables_[i]->get_max_merged_trans_version() > freeze_info.next.freeze_ts) {
break; break;
} }
minor_sstable_count++; minor_sstable_count++;
......
...@@ -163,6 +163,7 @@ public: ...@@ -163,6 +163,7 @@ public:
private: private:
// Common Section // Common Section
void adjust_minor_merge_boundary(int64_t &min_snapshot, int64_t &max_snapshot);
bool is_multi_version_break(const ObVersionRange& new_version_range, const int64_t last_snapshot_vesion); bool is_multi_version_break(const ObVersionRange& new_version_range, const int64_t last_snapshot_vesion);
int classify_tables(const ObTablesHandle& old_handle, common::ObArray<ObITable*>& major_tables, int classify_tables(const ObTablesHandle& old_handle, common::ObArray<ObITable*>& major_tables,
common::ObArray<ObITable*>& inc_tables); common::ObArray<ObITable*>& inc_tables);
...@@ -237,10 +238,13 @@ private: ...@@ -237,10 +238,13 @@ private:
const ObTablesHandle& old_handle, int64_t& first_reference_pos, int64_t& last_reference_pos); const ObTablesHandle& old_handle, int64_t& first_reference_pos, int64_t& last_reference_pos);
int get_major_split_table_pos(const ObTablesHandle& old_handle, int64_t& pos); int get_major_split_table_pos(const ObTablesHandle& old_handle, int64_t& pos);
protected: protected:
static const int64_t INVAID_TABLE_POS = -1; static const int64_t INVAID_TABLE_POS = -1;
static const int64_t DEFAULT_SSTABLE_CNT = 6; static const int64_t DEFAULT_SSTABLE_CNT = 6;
static const int64_t OB_HIST_MINOR_FACTOR = 3; static const int64_t OB_HIST_MINOR_FACTOR = 3;
static const int64_t OB_UNSAFE_TABLE_CNT = 48;
static const int64_t OB_EMERGENCY_TABLE_CNT = 52;
// No Need Persistence // No Need Persistence
bool is_inited_; bool is_inited_;
ObFreezeInfoSnapshotMgr* freeze_info_mgr_; ObFreezeInfoSnapshotMgr* freeze_info_mgr_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册