提交 5643b8be 编写于 作者: S suz-yang 提交者: ob-robot

fix calculate inc ddl read tables

上级 b90cdf32
......@@ -1068,22 +1068,17 @@ int ObTabletTableStore::calculate_inc_ddl_read_tables(
const bool is_empty_inc_ddl_read_tables = inc_major_ddl_sstables_.empty()
&& inc_major_ddl_mem_sstables_.empty();
if (!is_empty_inc_ddl_read_tables) {
const SCN end_scn = inc_major_tables_.empty()
? SCN::min_scn()
: inc_major_tables_.get_boundary_table(true /*is_last*/)->get_end_scn();
const bool is_row_store = tablet.is_row_store();
if (!is_row_store) {
const int64_t data_format_version = tablet.get_tablet_meta().ddl_data_format_version_;
const bool need_rescan = ObDDLUtil::need_rescan_column_store(data_format_version);
if (OB_FAIL(inner_calculate_inc_ddl_column_read_tables(tablet,
snapshot_version,
// column_group_cnt,
// column_cnt,
// co_base_type,
need_rescan,
iterator))) {
LOG_WARN("fail to inner calculate inc ddl column read tables", KR(ret), K(snapshot_version), K(need_rescan));
if (OB_FAIL(inner_calculate_inc_ddl_column_read_tables(tablet, snapshot_version, end_scn, iterator))) {
LOG_WARN("fail to inner calculate inc ddl column read tables", KR(ret), K(snapshot_version), K(end_scn));
}
} else {
if (OB_FAIL(inner_calculate_inc_ddl_row_read_tables(snapshot_version, iterator))) {
LOG_WARN("fail to inner calculate inc ddl row read tables", KR(ret), K(snapshot_version));
if (OB_FAIL(inner_calculate_inc_ddl_row_read_tables(snapshot_version, end_scn, iterator))) {
LOG_WARN("fail to inner calculate inc ddl row read tables", KR(ret), K(snapshot_version), K(end_scn));
}
}
}
......@@ -1131,25 +1126,9 @@ int ObTabletTableStore::get_inc_major_cg_info(
return ret;
}
int ObTabletTableStore::check_scn_range_less_than_inc_major_right_border(const ObScnRange &scn_range, bool &is_less_than_right_border) const
{
int ret = OB_SUCCESS;
if (inc_major_tables_.count() == 0) {
is_less_than_right_border = false;
} else {
is_less_than_right_border = true;
if (scn_range.end_scn_ <= inc_major_tables_[inc_major_tables_.count() - 1]->get_end_scn()) {
is_less_than_right_border = true;
LOG_INFO("scn range is less than inc major right border", K(scn_range), K(inc_major_tables_[inc_major_tables_.count() - 1]->get_end_scn()));
} else {
is_less_than_right_border = false;
}
}
return ret;
}
int ObTabletTableStore::inner_calculate_inc_ddl_row_read_tables(
const int64_t snapshot_version,
const SCN &inc_major_end_scn,
ObTableStoreIterator &iterator) const
{
int ret = OB_SUCCESS;
......@@ -1157,36 +1136,29 @@ int ObTabletTableStore::inner_calculate_inc_ddl_row_read_tables(
ObDDLKV *ddlkv = nullptr;
for (int64_t i = 0; OB_SUCC(ret) && i < inc_major_ddl_sstables_.count(); ++i) {
sstable = inc_major_ddl_sstables_.at(i);
bool is_less_than_right_border = false;
if (OB_ISNULL(sstable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sstable is nullptr", KR(ret), K(i));
} else if (sstable->is_empty()) {
// skip empty inc major ddl dump sstables
continue;
} else if (OB_FAIL(check_scn_range_less_than_inc_major_right_border(sstable->get_scn_range(), is_less_than_right_border))) {
LOG_WARN("fail to check scn range is less than inc major right border", KR(ret));
} else if (is_less_than_right_border) {
LOG_INFO("scn range is less than inc major right border, skip", K(sstable->get_scn_range()));
continue;
} else if (sstable->get_end_scn() <= inc_major_end_scn) {
// inc major already merged, skip
LOG_INFO("scn range is less than inc major right border, skip", K(inc_major_end_scn), K(sstable->get_key()));
} else if (OB_FAIL(iterator.add_table(sstable))) {
LOG_WARN("fail to add table", KR(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < inc_major_ddl_mem_sstables_.count(); ++i) {
ddlkv = inc_major_ddl_mem_sstables_[i];
bool is_less_than_right_border = false;
if (OB_ISNULL(ddlkv)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddlkv is nullptr", KR(ret), K(i));
} else if (!ddlkv->is_freezed()) {
// Unfrozen ddlkv is not read to prevent concurrent reading and writing of ddl_memtable (inc_commit_log ensures that ddlkv is frozen)
break;
} else if (OB_FAIL(check_scn_range_less_than_inc_major_right_border(ddlkv->get_scn_range(), is_less_than_right_border))) {
LOG_WARN("fail to check scn range is less than inc major right border", KR(ret));
} else if (is_less_than_right_border) {
LOG_INFO("scn range is less than inc major right border, skip", K(ddlkv->get_scn_range()));
continue;
} else if (ddlkv->get_end_scn() <= inc_major_end_scn) {
// inc major already merged, skip
LOG_INFO("scn range is less than inc major right border, skip", K(inc_major_end_scn), K(ddlkv->get_key()));
} else {
ObIArray<ObDDLMemtable *> &ddl_memtables = ddlkv->get_ddl_memtables();
for (int64_t j = 0; OB_SUCC(ret) && j < ddl_memtables.count(); ++j) {
......@@ -1203,57 +1175,10 @@ int ObTabletTableStore::inner_calculate_inc_ddl_row_read_tables(
return ret;
}
int ObTabletTableStore::get_filtered_inc_major_ddl_sstables(
ObIArray<ObSSTable *> &filtered_inc_major_ddl_sstables) const
{
int ret = OB_SUCCESS;
filtered_inc_major_ddl_sstables.reset();
for (int64_t i = 0; OB_SUCC(ret) && i < inc_major_ddl_sstables_.count(); ++i) {
ObSSTable *sstable = inc_major_ddl_sstables_.at(i);
bool is_less_than_right_border = false;
if (OB_ISNULL(sstable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sstable is nullptr", KR(ret), K(i));
} else if (OB_FAIL(check_scn_range_less_than_inc_major_right_border(sstable->get_scn_range(), is_less_than_right_border))) {
LOG_WARN("fail to check scn range is less than inc major right border", KR(ret));
} else if (is_less_than_right_border) {
LOG_INFO("scn range is less than inc major right border, skip", K(sstable->get_scn_range()));
continue;
} else if (OB_FAIL(filtered_inc_major_ddl_sstables.push_back(sstable))) {
LOG_WARN("fail to push back sstable", KR(ret));
}
}
return ret;
}
int ObTabletTableStore::get_filtered_inc_major_ddl_mem_sstables(
ObIArray<ObDDLKV *> &filtered_inc_major_ddl_mem_sstables) const
{
int ret = OB_SUCCESS;
filtered_inc_major_ddl_mem_sstables.reset();
for (int64_t i = 0; OB_SUCC(ret) && i < inc_major_ddl_mem_sstables_.count(); ++i) {
ObDDLKV *ddlkv = inc_major_ddl_mem_sstables_[i];
bool is_less_than_right_border = false;
if (OB_ISNULL(ddlkv)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddlkv is nullptr", KR(ret), K(i));
} else if (OB_FAIL(check_scn_range_less_than_inc_major_right_border(ddlkv->get_scn_range(), is_less_than_right_border))) {
LOG_WARN("fail to check scn range is less than inc major right border", KR(ret));
} else if (is_less_than_right_border) {
LOG_INFO("scn range is less than inc major right border, skip", K(ddlkv->get_scn_range()));
continue;
} else if (OB_FAIL(filtered_inc_major_ddl_mem_sstables.push_back(ddlkv))) {
LOG_WARN("fail to push back ddlkv", KR(ret));
}
}
return ret;
}
int ObTabletTableStore::inner_calculate_inc_ddl_column_read_tables(
const ObTablet &tablet,
const int64_t snapshot_version,
const bool need_rescan,
const SCN &inc_major_end_scn,
ObTableStoreIterator &iterator) const
{
int ret = OB_SUCCESS;
......@@ -1268,6 +1193,28 @@ int ObTabletTableStore::inner_calculate_inc_ddl_column_read_tables(
ObTableHandleV2 table_handle;
ObIncMajorDDLAggregateCOSSTable *agg_sstable = nullptr;
ObTabletDDLCompleteMdsUserData user_data;
// locate sstable start idx and ddlkv start idx
while (cur_sstable_idx < inc_major_ddl_sstables_.count()) {
ObSSTable *sstable = inc_major_ddl_sstables_.at(cur_sstable_idx);
if (sstable->get_end_scn() <= inc_major_end_scn) {
LOG_INFO("scn range is less than inc major right border, skip", K(inc_major_end_scn), K(sstable->get_key()));
++cur_sstable_idx;
} else {
break;
}
++cur_sstable_idx;
}
while (cur_ddl_kv_idx < inc_major_ddl_mem_sstables_.count()) {
ObDDLKV *ddlkv = inc_major_ddl_mem_sstables_[cur_ddl_kv_idx];
if (ddlkv->get_end_scn() <= inc_major_end_scn) {
LOG_INFO("scn range is less than inc major right border, skip", K(inc_major_end_scn), K(ddlkv->get_key()));
++cur_ddl_kv_idx;
} else {
break;
}
}
while (OB_SUCC(ret) && (cur_sstable_idx < inc_major_ddl_sstables_.count()
|| cur_ddl_kv_idx < inc_major_ddl_mem_sstables_.count())) {
read_tables.reset();
......@@ -1280,12 +1227,11 @@ int ObTabletTableStore::inner_calculate_inc_ddl_column_read_tables(
int64_t column_group_cnt = 0;
int64_t column_cnt = 0;
ObCOSSTableBaseType co_base_type;
if (OB_FAIL(inner_calculate_inc_ddl_column_read_sstables(need_rescan,
cur_sstable_idx,
if (OB_FAIL(inner_calculate_inc_ddl_column_read_sstables(cur_sstable_idx,
tx_id,
read_tables))) {
LOG_WARN("fail to calculate inc ddl read sstables",
KR(ret), K(need_rescan), K(cur_sstable_idx), K(tx_id));
KR(ret), K(cur_sstable_idx), K(tx_id));
} else if (FALSE_IT(ddl_dump_cnt = read_tables.count())) {
} else if (OB_FAIL(inner_calculate_inc_ddl_column_read_memtables(cur_ddl_kv_idx,
tx_id,
......@@ -1349,7 +1295,6 @@ int ObTabletTableStore::inner_calculate_inc_ddl_column_read_tables(
}
int ObTabletTableStore::inner_calculate_inc_ddl_column_read_sstables(
const bool need_rescan,
int64_t &cur_sstable_idx,
int64_t &tx_id,
ObIArray<ObITable *> &read_tables) const
......@@ -1358,12 +1303,8 @@ int ObTabletTableStore::inner_calculate_inc_ddl_column_read_sstables(
ObSSTable *sstable = nullptr;
ObCOSSTableV2 *co_sstable = nullptr;
ObSSTableMetaHandle meta_handle;
ObArray<ObSSTable *> filtered_inc_major_ddl_sstables;
if (OB_FAIL(get_filtered_inc_major_ddl_sstables(filtered_inc_major_ddl_sstables))) {
LOG_WARN("fail to get filtered inc major ddl sstables", KR(ret));
}
while (OB_SUCC(ret) && cur_sstable_idx < filtered_inc_major_ddl_sstables.count()) {
sstable = filtered_inc_major_ddl_sstables.at(cur_sstable_idx);
while (OB_SUCC(ret) && cur_sstable_idx < inc_major_ddl_sstables_.count()) {
sstable = inc_major_ddl_sstables_.at(cur_sstable_idx);
if (OB_ISNULL(sstable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sstable is nullptr", KR(ret));
......@@ -1382,11 +1323,6 @@ int ObTabletTableStore::inner_calculate_inc_ddl_column_read_sstables(
if (OB_INVALID_ID == tx_id) {
// Get first ddl dump sstable for a specific inc major direct load.
tx_id = uncommit_info.tx_infos_[0].tx_id_;
} else if (need_rescan && tx_id == uncommit_info.tx_infos_[0].tx_id_) {
// Only one ddl dump sstable is allowed to exist when need rescan.
ret = OB_DATA_NOT_UPTODATE;
LOG_WARN("query only support one co sstable", K(ret), K(need_rescan), K(filtered_inc_major_ddl_sstables.count()),
K(tx_id), K(uncommit_info));
} else if (tx_id != uncommit_info.tx_infos_[0].tx_id_) {
// Get the first ddl dump sstable for next inc major direct load.
break;
......@@ -1418,12 +1354,8 @@ int ObTabletTableStore::inner_calculate_inc_ddl_column_read_memtables(
{
int ret = OB_SUCCESS;
ObSSTable *sstable = nullptr;
ObArray<ObDDLKV *> filtered_inc_major_ddl_mem_sstables;
if (OB_FAIL(get_filtered_inc_major_ddl_mem_sstables(filtered_inc_major_ddl_mem_sstables))) {
LOG_WARN("fail to get filtered inc major ddl mem sstables", KR(ret));
}
while (OB_SUCC(ret) && cur_ddl_kv_idx < filtered_inc_major_ddl_mem_sstables.count()) {
ObDDLKV *ddl_kv = filtered_inc_major_ddl_mem_sstables.at(cur_ddl_kv_idx);
while (OB_SUCC(ret) && cur_ddl_kv_idx < inc_major_ddl_mem_sstables_.count()) {
ObDDLKV *ddl_kv = inc_major_ddl_mem_sstables_[cur_ddl_kv_idx];
if (OB_ISNULL(ddl_kv)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl kv is nullptr", KR(ret));
......
......@@ -519,14 +519,14 @@ private:
ObCOSSTableBaseType &co_base_type) const;
int inner_calculate_inc_ddl_row_read_tables(
const int64_t snapshot_version,
const share::SCN &inc_major_end_scn,
ObTableStoreIterator &iterator) const;
int inner_calculate_inc_ddl_column_read_tables(
const ObTablet &tablet,
const int64_t snapshot_version,
const bool need_rescan,
const share::SCN &inc_major_end_scn,
ObTableStoreIterator &iterator) const;
int inner_calculate_inc_ddl_column_read_sstables(
const bool need_rescan,
int64_t &cur_sstable_idx,
int64_t &tx_id,
ObIArray<ObITable *> &read_tables) const;
......@@ -540,11 +540,6 @@ private:
const ObBatchUpdateTableStoreParam &param,
const ObTabletTableStore &old_store,
const int64_t inc_base_snapshot_version);
int check_scn_range_less_than_inc_major_right_border(const ObScnRange &scn_range, bool &is_less_than_right_border) const;
int get_filtered_inc_major_ddl_sstables(
ObIArray<ObSSTable *> &filtered_inc_major_ddl_sstables) const;
int get_filtered_inc_major_ddl_mem_sstables(
ObIArray<ObDDLKV *> &filtered_inc_major_ddl_mem_sstables) const;
public:
static const int64_t TABLE_STORE_VERSION_V1 = 0x0100;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册