提交 f10f55af 编写于 作者: X xy0 提交者: LINGuanRen

core in ObMultipleMultiScanMerge::construct_iters

上级 fb7b94bc
...@@ -22,13 +22,13 @@ namespace storage { ...@@ -22,13 +22,13 @@ namespace storage {
ObMultipleMultiScanMerge::ObMultipleMultiScanMerge() ObMultipleMultiScanMerge::ObMultipleMultiScanMerge()
: ObMultipleScanMergeImpl(), get_num_(0), ranges_(NULL), cow_ranges_() : ObMultipleScanMergeImpl(), get_num_(0), ranges_(NULL), cow_ranges_()
{ {
memset(get_items_, 0, sizeof(ObStoreRow*) * MAX_TABLE_CNT_IN_STORAGE); memset(get_items_, 0, sizeof(ObStoreRow *) * MAX_TABLE_CNT_IN_STORAGE);
} }
ObMultipleMultiScanMerge::~ObMultipleMultiScanMerge() ObMultipleMultiScanMerge::~ObMultipleMultiScanMerge()
{} {}
int ObMultipleMultiScanMerge::is_get_data_ready(bool& is_ready) int ObMultipleMultiScanMerge::is_get_data_ready(bool &is_ready)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t cur_get_index_ = -1; int64_t cur_get_index_ = -1;
...@@ -53,7 +53,7 @@ int ObMultipleMultiScanMerge::is_get_data_ready(bool& is_ready) ...@@ -53,7 +53,7 @@ int ObMultipleMultiScanMerge::is_get_data_ready(bool& is_ready)
return ret; return ret;
} }
int ObMultipleMultiScanMerge::is_scan_data_ready(bool& is_ready) int ObMultipleMultiScanMerge::is_scan_data_ready(bool &is_ready)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
is_ready = !is_scan_end(); is_ready = !is_scan_end();
...@@ -66,10 +66,10 @@ void ObMultipleMultiScanMerge::reset() ...@@ -66,10 +66,10 @@ void ObMultipleMultiScanMerge::reset()
get_num_ = 0; get_num_ = 0;
ranges_ = NULL; ranges_ = NULL;
cow_ranges_.reset(); cow_ranges_.reset();
memset(get_items_, 0, sizeof(ObStoreRow*) * MAX_TABLE_CNT_IN_STORAGE); memset(get_items_, 0, sizeof(ObStoreRow *) * MAX_TABLE_CNT_IN_STORAGE);
} }
int ObMultipleMultiScanMerge::open(const ObIArray<ObExtStoreRange>& ranges) int ObMultipleMultiScanMerge::open(const ObIArray<ObExtStoreRange> &ranges)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -138,12 +138,12 @@ int ObMultipleMultiScanMerge::calc_scan_range() ...@@ -138,12 +138,12 @@ int ObMultipleMultiScanMerge::calc_scan_range()
cow_ranges_.reuse(); cow_ranges_.reuse();
for (int64_t i = l; i < r && OB_SUCC(ret); ++i) { for (int64_t i = l; i < r && OB_SUCC(ret); ++i) {
ObExtStoreRange& range = tmp_ranges.at(i); ObExtStoreRange &range = tmp_ranges.at(i);
if (curr_scan_index_ == i && range.get_range().include(rowkey_range)) { if (curr_scan_index_ == i && range.get_range().include(rowkey_range)) {
range.change_boundary(curr_rowkey_, is_reverse_scan, true); range.change_boundary(curr_rowkey_, is_reverse_scan, true);
if (range.get_range().is_valid()) { if (range.get_range().is_valid()) {
if (OB_FAIL(const_cast<ObExtStoreRange&>(range).to_collation_free_range_on_demand_and_cutoff_range( if (OB_FAIL(const_cast<ObExtStoreRange &>(range).to_collation_free_range_on_demand_and_cutoff_range(
*access_ctx_->allocator_))) { *access_ctx_->allocator_))) {
STORAGE_LOG(WARN, "fail to get collation free rowkey", K(ret)); STORAGE_LOG(WARN, "fail to get collation free rowkey", K(ret));
} else if (OB_FAIL(cow_ranges_.push_back(range))) { } else if (OB_FAIL(cow_ranges_.push_back(range))) {
...@@ -182,7 +182,7 @@ OB_INLINE int ObMultipleMultiScanMerge::prepare() ...@@ -182,7 +182,7 @@ OB_INLINE int ObMultipleMultiScanMerge::prepare()
return ObMultipleScanMergeImpl::prepare_loser_tree(); return ObMultipleScanMergeImpl::prepare_loser_tree();
} }
void ObMultipleMultiScanMerge::collect_merge_stat(ObTableStoreStat& stat) const void ObMultipleMultiScanMerge::collect_merge_stat(ObTableStoreStat &stat) const
{ {
stat.multi_scan_stat_.call_cnt_++; stat.multi_scan_stat_.call_cnt_++;
stat.multi_scan_stat_.output_row_cnt_ += table_stat_.output_row_cnt_; stat.multi_scan_stat_.output_row_cnt_ += table_stat_.output_row_cnt_;
...@@ -191,7 +191,7 @@ void ObMultipleMultiScanMerge::collect_merge_stat(ObTableStoreStat& stat) const ...@@ -191,7 +191,7 @@ void ObMultipleMultiScanMerge::collect_merge_stat(ObTableStoreStat& stat) const
int ObMultipleMultiScanMerge::construct_iters() int ObMultipleMultiScanMerge::construct_iters()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ObIArray<ObITable*>& tables = tables_handle_.get_tables(); const ObIArray<ObITable *> &tables = tables_handle_.get_tables();
consumer_.reset(); consumer_.reset();
...@@ -206,9 +206,9 @@ int ObMultipleMultiScanMerge::construct_iters() ...@@ -206,9 +206,9 @@ int ObMultipleMultiScanMerge::construct_iters()
tables.count(), tables.count(),
KP(this)); KP(this));
} else if (tables.count() > 0) { } else if (tables.count() > 0) {
ObITable* table = NULL; ObITable *table = NULL;
ObStoreRowIterator* iter = NULL; ObStoreRowIterator *iter = NULL;
const ObTableIterParam* iter_param = NULL; const ObTableIterParam *iter_param = NULL;
bool use_cache_iter = iters_.count() > 0; bool use_cache_iter = iters_.count() > 0;
if (OB_FAIL(loser_tree_.init(tables.count(), *access_ctx_->stmt_allocator_))) { if (OB_FAIL(loser_tree_.init(tables.count(), *access_ctx_->stmt_allocator_))) {
...@@ -228,7 +228,7 @@ int ObMultipleMultiScanMerge::construct_iters() ...@@ -228,7 +228,7 @@ int ObMultipleMultiScanMerge::construct_iters()
iter->~ObStoreRowIterator(); iter->~ObStoreRowIterator();
STORAGE_LOG(WARN, "Fail to push iter to iterator array, ", K(ret), K(i)); STORAGE_LOG(WARN, "Fail to push iter to iterator array, ", K(ret), K(i));
} }
} else if (OB_ISNULL(iters_.at(tables.count() - 1 - i))) { } else if (OB_ISNULL(iter = iters_.at(tables.count() - 1 - i))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected null iter", K(ret), "idx", tables.count() - 1 - i, K_(iters)); STORAGE_LOG(WARN, "Unexpected null iter", K(ret), "idx", tables.count() - 1 - i, K_(iters));
} else if (OB_FAIL(iter->init(*iter_param, *access_ctx_, table, ranges_))) { } else if (OB_FAIL(iter->init(*iter_param, *access_ctx_, table, ranges_))) {
...@@ -263,7 +263,7 @@ int ObMultipleMultiScanMerge::supply_consume() ...@@ -263,7 +263,7 @@ int ObMultipleMultiScanMerge::supply_consume()
const int64_t consumer_cnt = consumer_.get_consumer_num(); const int64_t consumer_cnt = consumer_.get_consumer_num();
for (int64_t i = 0; OB_SUCC(ret) && i < consumer_cnt; ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < consumer_cnt; ++i) {
const int64_t iter_idx = consumer_.get_consumer_iters()[i]; const int64_t iter_idx = consumer_.get_consumer_iters()[i];
ObStoreRowIterator* iter = iters_.at(iter_idx); ObStoreRowIterator *iter = iters_.at(iter_idx);
if (NULL == iter) { if (NULL == iter) {
ret = common::OB_ERR_UNEXPECTED; ret = common::OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected error", K(ret), K(iter)); STORAGE_LOG(WARN, "Unexpected error", K(ret), K(iter));
...@@ -322,10 +322,10 @@ int ObMultipleMultiScanMerge::supply_consume() ...@@ -322,10 +322,10 @@ int ObMultipleMultiScanMerge::supply_consume()
return ret; return ret;
} }
int ObMultipleMultiScanMerge::inner_get_next_row_for_get(ObStoreRow& row, bool& need_retry) int ObMultipleMultiScanMerge::inner_get_next_row_for_get(ObStoreRow &row, bool &need_retry)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ObStoreRow* tmp_row = NULL; const ObStoreRow *tmp_row = NULL;
bool final_result = false; bool final_result = false;
nop_pos_.reset(); nop_pos_.reset();
row.row_val_.count_ = 0; row.row_val_.count_ = 0;
...@@ -367,7 +367,7 @@ int ObMultipleMultiScanMerge::inner_get_next_row_for_get(ObStoreRow& row, bool& ...@@ -367,7 +367,7 @@ int ObMultipleMultiScanMerge::inner_get_next_row_for_get(ObStoreRow& row, bool&
return ret; return ret;
} }
int ObMultipleMultiScanMerge::inner_get_next_row_for_scan(ObStoreRow& row, bool& need_retry) int ObMultipleMultiScanMerge::inner_get_next_row_for_scan(ObStoreRow &row, bool &need_retry)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_FAIL(ObMultipleScanMergeImpl::inner_get_next_row(row, need_retry))) { if (OB_FAIL(ObMultipleScanMergeImpl::inner_get_next_row(row, need_retry))) {
...@@ -377,7 +377,7 @@ int ObMultipleMultiScanMerge::inner_get_next_row_for_scan(ObStoreRow& row, bool& ...@@ -377,7 +377,7 @@ int ObMultipleMultiScanMerge::inner_get_next_row_for_scan(ObStoreRow& row, bool&
return ret; return ret;
} }
int ObMultipleMultiScanMerge::inner_get_next_row(ObStoreRow& row) int ObMultipleMultiScanMerge::inner_get_next_row(ObStoreRow &row)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool need_retry = true; bool need_retry = true;
...@@ -424,7 +424,7 @@ int ObMultipleMultiScanMerge::inner_get_next_row(ObStoreRow& row) ...@@ -424,7 +424,7 @@ int ObMultipleMultiScanMerge::inner_get_next_row(ObStoreRow& row)
} }
int ObMultipleMultiScanMerge::to_collation_free_range_on_demand( int ObMultipleMultiScanMerge::to_collation_free_range_on_demand(
const ObIArray<ObExtStoreRange>& ranges, common::ObIAllocator& allocator) const ObIArray<ObExtStoreRange> &ranges, common::ObIAllocator &allocator)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (0 == ranges.count()) { if (0 == ranges.count()) {
...@@ -433,8 +433,8 @@ int ObMultipleMultiScanMerge::to_collation_free_range_on_demand( ...@@ -433,8 +433,8 @@ int ObMultipleMultiScanMerge::to_collation_free_range_on_demand(
} }
for (int64_t i = 0; OB_SUCC(ret) && i < ranges.count(); ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < ranges.count(); ++i) {
if (OB_FAIL( if (OB_FAIL(const_cast<ObExtStoreRange &>(ranges.at(i))
const_cast<ObExtStoreRange&>(ranges.at(i)).to_collation_free_range_on_demand_and_cutoff_range(allocator))) { .to_collation_free_range_on_demand_and_cutoff_range(allocator))) {
STORAGE_LOG(WARN, "fail to get collation free rowkey", K(ret), K(ranges.at(i).get_range())); STORAGE_LOG(WARN, "fail to get collation free rowkey", K(ret), K(ranges.at(i).get_range()));
} }
} }
...@@ -442,8 +442,8 @@ int ObMultipleMultiScanMerge::to_collation_free_range_on_demand( ...@@ -442,8 +442,8 @@ int ObMultipleMultiScanMerge::to_collation_free_range_on_demand(
} }
int ObMultipleMultiScanMerge::estimate_row_count(const ObQueryFlag query_flag, const uint64_t table_id, int ObMultipleMultiScanMerge::estimate_row_count(const ObQueryFlag query_flag, const uint64_t table_id,
const common::ObIArray<common::ObExtStoreRange>& ranges, const common::ObIArray<ObITable*>& tables, const common::ObIArray<common::ObExtStoreRange> &ranges, const common::ObIArray<ObITable *> &tables,
ObPartitionEst& part_estimate, common::ObIArray<common::ObEstRowCountRecord>& est_records) ObPartitionEst &part_estimate, common::ObIArray<common::ObEstRowCountRecord> &est_records)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -460,7 +460,7 @@ int ObMultipleMultiScanMerge::estimate_row_count(const ObQueryFlag query_flag, c ...@@ -460,7 +460,7 @@ int ObMultipleMultiScanMerge::estimate_row_count(const ObQueryFlag query_flag, c
for (int64_t i = 0; OB_SUCC(ret) && i < tables.count(); ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < tables.count(); ++i) {
int64_t start_time = common::ObTimeUtility::current_time(); int64_t start_time = common::ObTimeUtility::current_time();
table_est.reset(); table_est.reset();
ObITable* table = tables.at(i); ObITable *table = tables.at(i);
if (NULL == table) { if (NULL == table) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected error, store shouldn't be null", K(ret), KP(table)); STORAGE_LOG(WARN, "Unexpected error, store shouldn't be null", K(ret), KP(table));
...@@ -516,7 +516,7 @@ int ObMultipleMultiScanMerge::skip_to_range(const int64_t range_idx) ...@@ -516,7 +516,7 @@ int ObMultipleMultiScanMerge::skip_to_range(const int64_t range_idx)
} }
// skip data in heap and iterators // skip data in heap and iterators
const bool include_gap_key = ranges_->at(range_idx).get_range().get_border_flag().inclusive_start(); const bool include_gap_key = ranges_->at(range_idx).get_range().get_border_flag().inclusive_start();
const ObStoreRowkey& rowkey = ranges_->at(range_idx).get_range().get_start_key(); const ObStoreRowkey &rowkey = ranges_->at(range_idx).get_range().get_start_key();
STORAGE_LOG(DEBUG, "skip to range", K(include_gap_key), K(rowkey), K(range_idx)); STORAGE_LOG(DEBUG, "skip to range", K(include_gap_key), K(rowkey), K(range_idx));
if (OB_FAIL(reset_range(0L /*skip all tables*/, range_idx, &rowkey, include_gap_key))) { if (OB_FAIL(reset_range(0L /*skip all tables*/, range_idx, &rowkey, include_gap_key))) {
STORAGE_LOG(WARN, "fail to reset range", K(ret)); STORAGE_LOG(WARN, "fail to reset range", K(ret));
......
...@@ -30,7 +30,7 @@ ObSSTableMultiVersionRowIterator::ObSSTableMultiVersionRowIterator() ...@@ -30,7 +30,7 @@ ObSSTableMultiVersionRowIterator::ObSSTableMultiVersionRowIterator()
read_newest_(false) read_newest_(false)
{ {
not_exist_row_.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST; not_exist_row_.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST;
not_exist_row_.row_val_.cells_ = reinterpret_cast<ObObj*>(obj_buf_); not_exist_row_.row_val_.cells_ = reinterpret_cast<ObObj *>(obj_buf_);
not_exist_row_.row_val_.count_ = 0; not_exist_row_.row_val_.count_ = 0;
} }
...@@ -72,7 +72,7 @@ int ObSSTableMultiVersionRowIterator::new_iterator(ObIAllocator &allocator) ...@@ -72,7 +72,7 @@ int ObSSTableMultiVersionRowIterator::new_iterator(ObIAllocator &allocator)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (NULL == iter_) { if (NULL == iter_) {
void* buf = NULL; void *buf = NULL;
if (OB_ISNULL(buf = allocator.alloc(sizeof(T)))) { if (OB_ISNULL(buf = allocator.alloc(sizeof(T)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate iterator", K(ret)); LOG_WARN("failed to allocate iterator", K(ret));
...@@ -83,7 +83,7 @@ int ObSSTableMultiVersionRowIterator::new_iterator(ObIAllocator &allocator) ...@@ -83,7 +83,7 @@ int ObSSTableMultiVersionRowIterator::new_iterator(ObIAllocator &allocator)
return ret; return ret;
} }
int ObSSTableMultiVersionRowIterator::get_not_exist_row(const common::ObStoreRowkey& rowkey, const ObStoreRow*& row) int ObSSTableMultiVersionRowIterator::get_not_exist_row(const common::ObStoreRowkey &rowkey, const ObStoreRow *&row)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (out_cols_cnt_ <= 0) { if (out_cols_cnt_ <= 0) {
...@@ -91,7 +91,7 @@ int ObSSTableMultiVersionRowIterator::get_not_exist_row(const common::ObStoreRow ...@@ -91,7 +91,7 @@ int ObSSTableMultiVersionRowIterator::get_not_exist_row(const common::ObStoreRow
LOG_WARN("The multi version row iterator has not been inited, ", K(ret), K(out_cols_cnt_)); LOG_WARN("The multi version row iterator has not been inited, ", K(ret), K(out_cols_cnt_));
} else { } else {
const int64_t rowkey_cnt = rowkey.get_obj_cnt(); const int64_t rowkey_cnt = rowkey.get_obj_cnt();
not_exist_row_.row_val_.cells_ = reinterpret_cast<ObObj*>(obj_buf_); not_exist_row_.row_val_.cells_ = reinterpret_cast<ObObj *>(obj_buf_);
not_exist_row_.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST; not_exist_row_.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST;
not_exist_row_.row_val_.count_ = out_cols_cnt_; not_exist_row_.row_val_.count_ = out_cols_cnt_;
...@@ -108,10 +108,10 @@ int ObSSTableMultiVersionRowIterator::get_not_exist_row(const common::ObStoreRow ...@@ -108,10 +108,10 @@ int ObSSTableMultiVersionRowIterator::get_not_exist_row(const common::ObStoreRow
} }
int ObSSTableMultiVersionRowGetter::inner_open( int ObSSTableMultiVersionRowGetter::inner_open(
const ObTableIterParam& iter_param, ObTableAccessContext& access_ctx, ObITable* table, const void* query_range) const ObTableIterParam &iter_param, ObTableAccessContext &access_ctx, ObITable *table, const void *query_range)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ObColDescIArray* out_cols = nullptr; const ObColDescIArray *out_cols = nullptr;
if (OB_ISNULL(query_range) || OB_ISNULL(table)) { if (OB_ISNULL(query_range) || OB_ISNULL(table)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(table), KP(query_range)); LOG_WARN("invalid argument", K(ret), KP(table), KP(query_range));
...@@ -123,7 +123,7 @@ int ObSSTableMultiVersionRowGetter::inner_open( ...@@ -123,7 +123,7 @@ int ObSSTableMultiVersionRowGetter::inner_open(
if (OB_FAIL(ObVersionStoreRangeConversionHelper::store_rowkey_to_multi_version_range( if (OB_FAIL(ObVersionStoreRangeConversionHelper::store_rowkey_to_multi_version_range(
*rowkey_, access_ctx.trans_version_range_, *access_ctx.allocator_, multi_version_range_))) { *rowkey_, access_ctx.trans_version_range_, *access_ctx.allocator_, multi_version_range_))) {
LOG_WARN("convert to multi version range failed", K(ret), K(*rowkey_)); LOG_WARN("convert to multi version range failed", K(ret), K(*rowkey_));
} else if (OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.allocator_))) { } else if (OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.stmt_allocator_))) {
LOG_WARN("failed to new iterator", K(ret)); LOG_WARN("failed to new iterator", K(ret));
} else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_range_))) { } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_range_))) {
LOG_WARN("failed to open scanner", K(ret)); LOG_WARN("failed to open scanner", K(ret));
...@@ -132,7 +132,7 @@ int ObSSTableMultiVersionRowGetter::inner_open( ...@@ -132,7 +132,7 @@ int ObSSTableMultiVersionRowGetter::inner_open(
return ret; return ret;
} }
int ObSSTableMultiVersionRowGetter::inner_get_next_row(const ObStoreRow*& row) int ObSSTableMultiVersionRowGetter::inner_get_next_row(const ObStoreRow *&row)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(iter_) || OB_ISNULL(query_range_)) { if (OB_ISNULL(iter_) || OB_ISNULL(query_range_)) {
...@@ -173,7 +173,7 @@ uint8_t ObSSTableMultiVersionRowScannerBase::get_iter_flag() ...@@ -173,7 +173,7 @@ uint8_t ObSSTableMultiVersionRowScannerBase::get_iter_flag()
uint8_t flag = 0; uint8_t flag = 0;
if (!read_newest_) { if (!read_newest_) {
// do nothing // do nothing
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->get_row_iter_flag_impl(flag))) { } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->get_row_iter_flag_impl(flag))) {
ret = OB_SUCCESS; ret = OB_SUCCESS;
} }
return flag; return flag;
...@@ -196,7 +196,7 @@ void ObSSTableMultiVersionRowScannerBase::reuse() ...@@ -196,7 +196,7 @@ void ObSSTableMultiVersionRowScannerBase::reuse()
} }
int ObSSTableMultiVersionRowScanner::inner_open( int ObSSTableMultiVersionRowScanner::inner_open(
const ObTableIterParam& iter_param, ObTableAccessContext& access_ctx, ObITable* table, const void* query_range) const ObTableIterParam &iter_param, ObTableAccessContext &access_ctx, ObITable *table, const void *query_range)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(query_range) || OB_ISNULL(table)) { if (OB_ISNULL(query_range) || OB_ISNULL(table)) {
...@@ -205,7 +205,7 @@ int ObSSTableMultiVersionRowScanner::inner_open( ...@@ -205,7 +205,7 @@ int ObSSTableMultiVersionRowScanner::inner_open(
} else { } else {
iter_param_ = &iter_param; iter_param_ = &iter_param;
access_ctx_ = &access_ctx; access_ctx_ = &access_ctx;
sstable_ = static_cast<ObSSTable*>(table); sstable_ = static_cast<ObSSTable *>(table);
query_range_ = query_range; query_range_ = query_range;
read_newest_ = access_ctx.trans_version_range_.snapshot_version_ >= sstable_->get_upper_trans_version() && read_newest_ = access_ctx.trans_version_range_.snapshot_version_ >= sstable_->get_upper_trans_version() &&
sstable_->has_compact_row(); sstable_->has_compact_row();
...@@ -213,7 +213,7 @@ int ObSSTableMultiVersionRowScanner::inner_open( ...@@ -213,7 +213,7 @@ int ObSSTableMultiVersionRowScanner::inner_open(
if (OB_FAIL(ObVersionStoreRangeConversionHelper::range_to_multi_version_range( if (OB_FAIL(ObVersionStoreRangeConversionHelper::range_to_multi_version_range(
*range_, access_ctx.trans_version_range_, *access_ctx.allocator_, multi_version_range_))) { *range_, access_ctx.trans_version_range_, *access_ctx.allocator_, multi_version_range_))) {
LOG_WARN("convert to multi version range failed", K(ret), K(*range_)); LOG_WARN("convert to multi version range failed", K(ret), K(*range_));
} else if (OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.allocator_))) { } else if (OB_FAIL(new_iterator<ObSSTableRowScanner>(*access_ctx.stmt_allocator_))) {
LOG_WARN("failed to new iterator", K(ret)); LOG_WARN("failed to new iterator", K(ret));
} else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_range_))) { } else if (OB_FAIL(iter_->init(iter_param, access_ctx, table, &multi_version_range_))) {
LOG_WARN("failed to open scanner", K(ret)); LOG_WARN("failed to open scanner", K(ret));
...@@ -222,7 +222,7 @@ int ObSSTableMultiVersionRowScanner::inner_open( ...@@ -222,7 +222,7 @@ int ObSSTableMultiVersionRowScanner::inner_open(
return ret; return ret;
} }
int ObSSTableMultiVersionRowScanner::inner_get_next_row(const ObStoreRow*& row) int ObSSTableMultiVersionRowScanner::inner_get_next_row(const ObStoreRow *&row)
{ {
return iter_->get_next_row(row); return iter_->get_next_row(row);
} }
...@@ -240,21 +240,21 @@ void ObSSTableMultiVersionRowScanner::reuse() ...@@ -240,21 +240,21 @@ void ObSSTableMultiVersionRowScanner::reuse()
} }
int ObSSTableMultiVersionRowScanner::skip_range( int ObSSTableMultiVersionRowScanner::skip_range(
int64_t range_idx, const ObStoreRowkey* gap_rowkey, const bool include_gap_key) int64_t range_idx, const ObStoreRowkey *gap_rowkey, const bool include_gap_key)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObExtStoreRange* new_range = NULL; ObExtStoreRange *new_range = NULL;
if (OB_UNLIKELY(range_idx < 0 || NULL == gap_rowkey)) { if (OB_UNLIKELY(range_idx < 0 || NULL == gap_rowkey)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(range_idx), KP(gap_rowkey)); LOG_WARN("invalid arguments", K(ret), K(range_idx), KP(gap_rowkey));
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->generate_new_range( } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->generate_new_range(
range_idx, *gap_rowkey, include_gap_key, *range_, new_range))) { range_idx, *gap_rowkey, include_gap_key, *range_, new_range))) {
LOG_WARN("fail to generate new range", K(ret)); LOG_WARN("fail to generate new range", K(ret));
} else if (NULL != new_range) { } else if (NULL != new_range) {
if (OB_FAIL(ObVersionStoreRangeConversionHelper::range_to_multi_version_range( if (OB_FAIL(ObVersionStoreRangeConversionHelper::range_to_multi_version_range(
*new_range, trans_version_range_, *access_ctx_->allocator_, skip_range_))) { *new_range, trans_version_range_, *access_ctx_->allocator_, skip_range_))) {
LOG_WARN("fail to do range to multi version range", K(ret)); LOG_WARN("fail to do range to multi version range", K(ret));
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->skip_range_impl( } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->skip_range_impl(
range_idx, multi_version_range_, skip_range_))) { range_idx, multi_version_range_, skip_range_))) {
LOG_WARN("fail to skip range impl", K(ret), K(range_idx)); LOG_WARN("fail to skip range impl", K(ret), K(range_idx));
} }
...@@ -263,12 +263,12 @@ int ObSSTableMultiVersionRowScanner::skip_range( ...@@ -263,12 +263,12 @@ int ObSSTableMultiVersionRowScanner::skip_range(
} }
int ObSSTableMultiVersionRowScanner::get_gap_end( int ObSSTableMultiVersionRowScanner::get_gap_end(
int64_t& range_idx, const common::ObStoreRowkey*& gap_key, int64_t& gap_size) int64_t &range_idx, const common::ObStoreRowkey *&gap_key, int64_t &gap_size)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!read_newest_) { if (!read_newest_) {
// do nothing // do nothing
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->get_gap_end_impl( } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->get_gap_end_impl(
multi_version_range_, gap_rowkey_, gap_size))) { multi_version_range_, gap_rowkey_, gap_size))) {
STORAGE_LOG(WARN, "fail to get gap end impl", K(ret)); STORAGE_LOG(WARN, "fail to get gap end impl", K(ret));
} else { } else {
...@@ -279,10 +279,10 @@ int ObSSTableMultiVersionRowScanner::get_gap_end( ...@@ -279,10 +279,10 @@ int ObSSTableMultiVersionRowScanner::get_gap_end(
} }
int ObSSTableMultiVersionRowMultiGetter::inner_open( int ObSSTableMultiVersionRowMultiGetter::inner_open(
const ObTableIterParam& iter_param, ObTableAccessContext& access_ctx, ObITable* table, const void* query_range) const ObTableIterParam &iter_param, ObTableAccessContext &access_ctx, ObITable *table, const void *query_range)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ObColDescIArray* out_cols = nullptr; const ObColDescIArray *out_cols = nullptr;
if (OB_ISNULL(query_range) || OB_ISNULL(table)) { if (OB_ISNULL(query_range) || OB_ISNULL(table)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(table), KP(query_range)); LOG_WARN("invalid argument", K(ret), KP(table), KP(query_range));
...@@ -316,7 +316,7 @@ int ObSSTableMultiVersionRowMultiGetter::inner_open( ...@@ -316,7 +316,7 @@ int ObSSTableMultiVersionRowMultiGetter::inner_open(
return ret; return ret;
} }
int ObSSTableMultiVersionRowMultiGetter::inner_get_next_row(const ObStoreRow*& row) int ObSSTableMultiVersionRowMultiGetter::inner_get_next_row(const ObStoreRow *&row)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
row = NULL; row = NULL;
...@@ -360,8 +360,8 @@ int ObSSTableMultiVersionRowMultiGetter::inner_get_next_row(const ObStoreRow*& r ...@@ -360,8 +360,8 @@ int ObSSTableMultiVersionRowMultiGetter::inner_get_next_row(const ObStoreRow*& r
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("row is empty", K(ret), K(range_idx_), K(rowkeys_->at(range_idx_).get_store_rowkey())); LOG_WARN("row is empty", K(ret), K(range_idx_), K(rowkeys_->at(range_idx_).get_store_rowkey()));
} else { } else {
(const_cast<ObStoreRow*>(row))->scan_index_ = range_idx_; (const_cast<ObStoreRow *>(row))->scan_index_ = range_idx_;
(const_cast<ObStoreRow*>(row))->is_get_ = true; (const_cast<ObStoreRow *>(row))->is_get_ = true;
++range_idx_; ++range_idx_;
} }
} }
...@@ -384,7 +384,7 @@ void ObSSTableMultiVersionRowMultiGetter::reuse() ...@@ -384,7 +384,7 @@ void ObSSTableMultiVersionRowMultiGetter::reuse()
} }
int ObSSTableMultiVersionRowMultiScanner::inner_open( int ObSSTableMultiVersionRowMultiScanner::inner_open(
const ObTableIterParam& iter_param, ObTableAccessContext& access_ctx, ObITable* table, const void* query_range) const ObTableIterParam &iter_param, ObTableAccessContext &access_ctx, ObITable *table, const void *query_range)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(query_range) || OB_ISNULL(table)) { if (OB_ISNULL(query_range) || OB_ISNULL(table)) {
...@@ -393,7 +393,7 @@ int ObSSTableMultiVersionRowMultiScanner::inner_open( ...@@ -393,7 +393,7 @@ int ObSSTableMultiVersionRowMultiScanner::inner_open(
} else { } else {
iter_param_ = &iter_param; iter_param_ = &iter_param;
access_ctx_ = &access_ctx; access_ctx_ = &access_ctx;
sstable_ = static_cast<ObSSTable*>(table); sstable_ = static_cast<ObSSTable *>(table);
read_newest_ = access_ctx.trans_version_range_.snapshot_version_ >= sstable_->get_upper_trans_version() && read_newest_ = access_ctx.trans_version_range_.snapshot_version_ >= sstable_->get_upper_trans_version() &&
sstable_->has_compact_row(); sstable_->has_compact_row();
query_range_ = query_range; query_range_ = query_range;
...@@ -442,21 +442,21 @@ int ObSSTableMultiVersionRowMultiScanner::inner_open( ...@@ -442,21 +442,21 @@ int ObSSTableMultiVersionRowMultiScanner::inner_open(
} }
int ObSSTableMultiVersionRowMultiScanner::skip_range( int ObSSTableMultiVersionRowMultiScanner::skip_range(
int64_t range_idx, const ObStoreRowkey* gap_rowkey, const bool include_gap_key) int64_t range_idx, const ObStoreRowkey *gap_rowkey, const bool include_gap_key)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObExtStoreRange* new_range = NULL; ObExtStoreRange *new_range = NULL;
if (OB_UNLIKELY(range_idx < 0 || NULL == gap_rowkey)) { if (OB_UNLIKELY(range_idx < 0 || NULL == gap_rowkey)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(range_idx), KP(gap_rowkey)); LOG_WARN("invalid arguments", K(ret), K(range_idx), KP(gap_rowkey));
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->generate_new_range( } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->generate_new_range(
range_idx, *gap_rowkey, include_gap_key, ranges_->at(range_idx), new_range))) { range_idx, *gap_rowkey, include_gap_key, ranges_->at(range_idx), new_range))) {
LOG_WARN("fail to generate new range", K(ret)); LOG_WARN("fail to generate new range", K(ret));
} else if (NULL != new_range) { } else if (NULL != new_range) {
if (OB_FAIL(ObVersionStoreRangeConversionHelper::range_to_multi_version_range( if (OB_FAIL(ObVersionStoreRangeConversionHelper::range_to_multi_version_range(
*new_range, trans_version_range_, *access_ctx_->allocator_, skip_range_))) { *new_range, trans_version_range_, *access_ctx_->allocator_, skip_range_))) {
LOG_WARN("fail to do range to multi version range", K(ret)); LOG_WARN("fail to do range to multi version range", K(ret));
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->skip_range_impl( } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->skip_range_impl(
range_idx, multi_version_ranges_.at(range_idx), skip_range_))) { range_idx, multi_version_ranges_.at(range_idx), skip_range_))) {
LOG_WARN("fail to skip range impl", K(ret), K(range_idx)); LOG_WARN("fail to skip range impl", K(ret), K(range_idx));
} }
...@@ -465,14 +465,14 @@ int ObSSTableMultiVersionRowMultiScanner::skip_range( ...@@ -465,14 +465,14 @@ int ObSSTableMultiVersionRowMultiScanner::skip_range(
} }
int ObSSTableMultiVersionRowMultiScanner::get_gap_end( int ObSSTableMultiVersionRowMultiScanner::get_gap_end(
int64_t& range_idx, const common::ObStoreRowkey*& gap_key, int64_t& gap_size) int64_t &range_idx, const common::ObStoreRowkey *&gap_key, int64_t &gap_size)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!read_newest_) { if (!read_newest_) {
// do nothing // do nothing
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->get_gap_range_idx(range_idx))) { } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->get_gap_range_idx(range_idx))) {
STORAGE_LOG(WARN, "fail to get gap range idx", K(ret)); STORAGE_LOG(WARN, "fail to get gap range idx", K(ret));
} else if (OB_FAIL(static_cast<ObSSTableRowScanner*>(iter_)->get_gap_end_impl( } else if (OB_FAIL(static_cast<ObSSTableRowScanner *>(iter_)->get_gap_end_impl(
multi_version_ranges_.at(range_idx), gap_rowkey_, gap_size))) { multi_version_ranges_.at(range_idx), gap_rowkey_, gap_size))) {
STORAGE_LOG(WARN, "fail to get gap end impl", K(ret)); STORAGE_LOG(WARN, "fail to get gap end impl", K(ret));
} else { } else {
...@@ -481,7 +481,7 @@ int ObSSTableMultiVersionRowMultiScanner::get_gap_end( ...@@ -481,7 +481,7 @@ int ObSSTableMultiVersionRowMultiScanner::get_gap_end(
return ret; return ret;
} }
int ObSSTableMultiVersionRowMultiScanner::inner_get_next_row(const ObStoreRow*& row) int ObSSTableMultiVersionRowMultiScanner::inner_get_next_row(const ObStoreRow *&row)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
row = NULL; row = NULL;
...@@ -539,8 +539,8 @@ int ObSSTableMultiVersionRowMultiScanner::inner_get_next_row(const ObStoreRow*& ...@@ -539,8 +539,8 @@ int ObSSTableMultiVersionRowMultiScanner::inner_get_next_row(const ObStoreRow*&
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("row is NULL", K(ret), K(range_idx_), K(ranges_->count())); LOG_WARN("row is NULL", K(ret), K(range_idx_), K(ranges_->count()));
} else { } else {
(const_cast<ObStoreRow*>(row))->scan_index_ = range_idx_; (const_cast<ObStoreRow *>(row))->scan_index_ = range_idx_;
(const_cast<ObStoreRow*>(row))->is_get_ = ranges_->at(range_idx_).is_single_rowkey(); (const_cast<ObStoreRow *>(row))->is_get_ = ranges_->at(range_idx_).is_single_rowkey();
range_idx_ = has_empty_range ? range_idx_ + 1 : range_idx_; range_idx_ = has_empty_range ? range_idx_ + 1 : range_idx_;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册