提交 1817340c 编写于 作者: cslinbo's avatar cslinbo 提交者: ob-robot

bugfix: fix two phase fts index merge

上级 32fb3b34
......@@ -24,7 +24,7 @@ namespace sql
int ObDASIndexMergeAndIter::inner_init(ObDASIterParam &param)
{
int ret = OB_SUCCESS;
if (ObDASIterType::DAS_ITER_INDEX_MERGE != param.type_) {
if (ObDASIterType::DAS_ITER_INDEX_MERGE != param.type_ && ObDASIterType::DAS_ITER_TWO_PHASE_INDEX_MERGE != param.type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("inner init das iter with bad param type", K(param));
} else {
......
......@@ -42,7 +42,7 @@ int ObDASIndexMergeFTSAndIter::inner_init(ObDASIterParam &param)
offset_ = fts_and_iter_param.offset_;
pushdown_topk_iter_ = fts_and_iter_param.pushdown_topk_iter_;
pushdown_topk_iter_tree_ = fts_and_iter_param.pushdown_topk_iter_tree_;
first_fts_idx_ = fts_and_iter_param.first_fts_idx_;
pushdown_topk_iter_idx_ = fts_and_iter_param.pushdown_topk_iter_idx_;
for (int64_t i = 0; OB_SUCC(ret) && i < relevance_exprs_.count(); i++) {
ObDASIter *child = child_iters_.at(i);
......@@ -264,7 +264,7 @@ int ObDASIndexMergeFTSAndIter::inner_release()
if (OB_FAIL(ObDASIndexMergeAndIter::inner_release())) {
LOG_WARN("failed to release index merge and iter", K(ret));
} else if (OB_FAIL(pushdown_topk_iter_tree_->release())) {
} else if (OB_NOT_NULL(pushdown_topk_iter_tree_) && OB_FAIL(pushdown_topk_iter_tree_->release())) {
LOG_WARN("failed to release pushdown topk iter tree", K(ret));
} else {
result_items_ = nullptr;
......@@ -332,7 +332,7 @@ int ObDASIndexMergeFTSAndIter::fill_other_child_stores(int64_t capacity)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < fts_index_idxs_.count(); i++) {
if (fts_index_idxs_.at(i) == first_fts_idx_) {
if (fts_index_idxs_.at(i) == pushdown_topk_iter_idx_) {
// skip
} else if (OB_FAIL(fill_one_child_stores(capacity, fts_index_idxs_.at(i), child_iters_.at(fts_index_idxs_.at(i))))) {
LOG_WARN("failed to fill one child stores", K(ret));
......@@ -474,7 +474,7 @@ int ObDASIndexMergeFTSAndIter::filter_fts_result_by_other_index_vectorized(
for (int64_t i = 0; OB_SUCC(ret) && (cmp_ret == 0) &&
child_empty_count_ <= 0 &&
i < child_cnt && fts_row_idx < actual_top_n; i++) {
if (i == first_fts_idx_) {
if (i == pushdown_topk_iter_idx_) {
// skip
} else {
IndexMergeRowStore &child_store = child_stores_.at(i);
......@@ -518,7 +518,7 @@ int ObDASIndexMergeFTSAndIter::filter_fts_result_by_other_index_vectorized(
if (OB_SUCC(ret) && cmp_ret == 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < child_cnt; i++) {
if (i == first_fts_idx_) {
if (i == pushdown_topk_iter_idx_) {
if (OB_FAIL(item.row_buffer_->to_expr(1))) {
LOG_WARN("failed to to expr", K(ret));
}
......@@ -573,14 +573,14 @@ int ObDASIndexMergeFTSAndIter::get_topn_fts_result_vectorized(
ObMinRelevanceHeap &heap)
{
int ret = OB_SUCCESS;
IndexMergeRowStore &child_store = child_stores_.at(first_fts_idx_);
IndexMergeRowStore &child_store = child_stores_.at(pushdown_topk_iter_idx_);
while (OB_SUCC(ret) && child_empty_count_ <= 0) {
clear_evaluated_flag();
pushdown_topk_iter_tree_->clear_evaluated_flag();
child_store.reuse();
if (OB_FAIL(fill_one_child_stores(capacity, first_fts_idx_, pushdown_topk_iter_tree_))) {
if (OB_FAIL(fill_one_child_stores(capacity, pushdown_topk_iter_idx_, pushdown_topk_iter_tree_))) {
LOG_WARN("failed to fill child stores", K(ret));
} else if (child_empty_count_ > 0) {
ret = OB_ITER_END;
......
......@@ -124,7 +124,7 @@ struct ObDASIndexMergeFTSAndIterParam : public ObDASIndexMergeIterParam
relevance_exprs_(),
pushdown_topk_iter_(nullptr),
pushdown_topk_iter_tree_(nullptr),
first_fts_idx_(OB_INVALID_INDEX)
pushdown_topk_iter_idx_(OB_INVALID_INDEX)
{}
int64_t limit_;
......@@ -132,7 +132,7 @@ struct ObDASIndexMergeFTSAndIterParam : public ObDASIndexMergeIterParam
common::ObSEArray<ObExpr*, 16> relevance_exprs_;
ObDASTRMergeIter *pushdown_topk_iter_;
ObDASIter *pushdown_topk_iter_tree_;
int64_t first_fts_idx_;
int64_t pushdown_topk_iter_idx_;
};
class ObDASIndexMergeFTSAndIter : public ObDASIndexMergeAndIter
......@@ -150,7 +150,7 @@ public:
pushdown_topk_iter_(nullptr),
pushdown_topk_iter_tree_(nullptr),
pushdown_topk_(0),
first_fts_idx_(OB_INVALID_INDEX),
pushdown_topk_iter_idx_(OB_INVALID_INDEX),
pushdown_topk_iter_first_scan_(true),
fts_index_idxs_(),
normal_index_idxs_(),
......@@ -163,7 +163,7 @@ public:
virtual ~ObDASIndexMergeFTSAndIter() {}
int64_t get_first_fts_idx() const { return first_fts_idx_; }
int64_t get_pushdown_topk_iter_idx() const { return pushdown_topk_iter_idx_; }
ObDASIter *get_pushdown_topk_iter_tree() const { return pushdown_topk_iter_tree_; }
protected:
......@@ -210,7 +210,7 @@ private:
ObDASTRMergeIter *pushdown_topk_iter_;
ObDASIter *pushdown_topk_iter_tree_;
int64_t pushdown_topk_;
int64_t first_fts_idx_;
int64_t pushdown_topk_iter_idx_;
bool pushdown_topk_iter_first_scan_;
common::ObSEArray<uint64_t, 16> fts_index_idxs_;
......
......@@ -299,7 +299,7 @@ void ObDASIndexMergeIter::MergeResultBuffer::reset()
int ObDASIndexMergeIter::inner_init(ObDASIterParam &param)
{
int ret = OB_SUCCESS;
if (ObDASIterType::DAS_ITER_INDEX_MERGE != param.type_) {
if (ObDASIterType::DAS_ITER_INDEX_MERGE != param.type_ && ObDASIterType::DAS_ITER_TWO_PHASE_INDEX_MERGE != param.type_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("inner init das iter with bad param type", K(param));
} else {
......
......@@ -43,6 +43,7 @@ enum ObDASIterType : uint32_t
DAS_ITER_SPIV_MERGE,
DAS_ITER_SPIV_SCAN,
DAS_ITER_ES_MATCH,
DAS_ITER_TWO_PHASE_INDEX_MERGE,
// append DASIterType before me
DAS_ITER_MAX
};
......
......@@ -475,7 +475,7 @@ int ObDASIterUtils::set_index_merge_related_ids(const ObDASBaseCtDef *attach_ctd
break;
}
case ObDASOpType::DAS_OP_INDEX_MERGE: {
if (OB_UNLIKELY(iter_type != ObDASIterType::DAS_ITER_INDEX_MERGE)) {
if (OB_UNLIKELY(iter_type != ObDASIterType::DAS_ITER_INDEX_MERGE && iter_type != ObDASIterType::DAS_ITER_TWO_PHASE_INDEX_MERGE)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("iter type not match with ctdef", K(ret), K(attach_ctdef->op_type_), K(iter_type));
} else {
......@@ -668,7 +668,7 @@ int ObDASIterUtils::set_func_lookup_iter_related_ids(const ObDASBaseCtDef *attac
break;
}
case ObDASOpType::DAS_OP_INDEX_MERGE: {
if (OB_UNLIKELY(iter_type != ObDASIterType::DAS_ITER_INDEX_MERGE)) {
if (OB_UNLIKELY(iter_type != ObDASIterType::DAS_ITER_INDEX_MERGE && iter_type != ObDASIterType::DAS_ITER_TWO_PHASE_INDEX_MERGE)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("iter type not match with ctdef", K(ret), K(attach_ctdef->op_type_), K(iter_type));
} else {
......@@ -3105,7 +3105,7 @@ int ObDASIterUtils::create_index_merge_tree_common(ObTableScanParam &scan_param,
ObArray<ObDASScanIter*> child_scan_iters;
ObArray<ObDASScanRtDef*> child_scan_rtdefs;
int64_t children_cnt = ctdef->children_cnt_;
int64_t first_fts_idx = OB_INVALID_INDEX;
int64_t pushdown_topk_iter_idx = OB_INVALID_INDEX;
bool lookup_non_ror_opt = merge_ctdef->main_scan_ctdef_ != nullptr;
bool forbidden_pushdown_topk = (limit != -1);
......@@ -3135,8 +3135,8 @@ int ObDASIterUtils::create_index_merge_tree_common(ObTableScanParam &scan_param,
// skip non-ror scan iter (except the first one)
continue;
} else if (limit != -1 && merge_ctdef->merge_node_types_.at(i) == INDEX_MERGE_FTS_INDEX) {
if (OB_INVALID_INDEX == first_fts_idx) {
first_fts_idx = i;
if (OB_INVALID_INDEX == pushdown_topk_iter_idx) {
pushdown_topk_iter_idx = i;
}
}
......@@ -3183,9 +3183,9 @@ int ObDASIterUtils::create_index_merge_tree_common(ObTableScanParam &scan_param,
ObDASScanIter *child_scan_iter = nullptr;
ObDASScanRtDef *child_scan_rtdef = nullptr;
ObDASIter *pushdown_topk_iter = nullptr;
if (OB_INVALID_INDEX == first_fts_idx) {
if (OB_INVALID_INDEX == pushdown_topk_iter_idx) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected first_fts_idx", K(ret));
LOG_WARN("unexpected pushdown topk iter idx", K(ret));
} else if (OB_FAIL(create_index_merge_child_tree(scan_param,
alloc,
merge_ctdef,
......@@ -3193,7 +3193,7 @@ int ObDASIterUtils::create_index_merge_tree_common(ObTableScanParam &scan_param,
related_tablet_ids,
tx_desc,
snapshot,
first_fts_idx,
pushdown_topk_iter_idx,
lookup_non_ror_opt,
child_iter,
child_scan_iter,
......@@ -3212,6 +3212,7 @@ int ObDASIterUtils::create_index_merge_tree_common(ObTableScanParam &scan_param,
pushdown_topk_mode = true;
ObDASIndexMergeFTSAndIterParam merge_param;
merge_param.type_ = DAS_ITER_TWO_PHASE_INDEX_MERGE;
merge_param.max_size_ = merge_rtdef->eval_ctx_->is_vectorized() ? merge_rtdef->eval_ctx_->max_batch_size_ : 1;
merge_param.eval_ctx_ = merge_rtdef->eval_ctx_;
merge_param.exec_ctx_ = &merge_rtdef->eval_ctx_->exec_ctx_;
......@@ -3230,7 +3231,7 @@ int ObDASIterUtils::create_index_merge_tree_common(ObTableScanParam &scan_param,
merge_param.offset_ = offset;
merge_param.pushdown_topk_iter_ = static_cast<ObDASTRMergeIter*>(pushdown_topk_iter);
merge_param.pushdown_topk_iter_tree_ = child_iter;
merge_param.first_fts_idx_ = first_fts_idx;
merge_param.pushdown_topk_iter_idx_ = pushdown_topk_iter_idx;
for (int64_t i = 0; OB_SUCC(ret) && i < children_cnt; ++i) {
if (merge_ctdef->merge_node_types_.at(i) == INDEX_MERGE_SCAN) {
......
......@@ -989,48 +989,8 @@ int ObDASScanOp::reuse_iter()
if (OB_FAIL(ObDASIterUtils::set_index_merge_related_ids(
attach_ctdef_, attach_rtdef_, tablet_ids_, ls_id_, result_iter))) {
LOG_WARN("failed to set index merge related ids", K(ret));
} else {
const bool need_lookup = (attach_ctdef_->op_type_ == ObDASOpType::DAS_OP_TABLE_LOOKUP) ||
(attach_ctdef_->op_type_ == ObDASOpType::DAS_OP_INDEX_PROJ_LOOKUP);
const ObDASBaseCtDef *index_merge_ctdef = need_lookup ? attach_ctdef_->children_[0] : attach_ctdef_;
ObDASBaseRtDef *index_merge_rtdef = need_lookup ? attach_rtdef_->children_[0] : attach_rtdef_;
bool fts_index_merge_and_opt = false;
if (OB_FAIL(ObDASIterUtils::check_fts_index_merge_and_opt(scan_ctdef_, scan_rtdef_, index_merge_ctdef, index_merge_rtdef, fts_index_merge_and_opt))) {
LOG_WARN("failed to check fts index merge and opt", K(ret));
} else if (fts_index_merge_and_opt) {
ObDASIndexMergeFTSAndIter *fts_and_iter = nullptr;
if (need_lookup) {
fts_and_iter = static_cast<ObDASIndexMergeFTSAndIter *>(result_iter->get_children()[0]);
} else {
fts_and_iter = static_cast<ObDASIndexMergeFTSAndIter *>(result_iter);
}
if (OB_ISNULL(fts_and_iter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret));
} else {
int64_t first_fts_idx = fts_and_iter->get_first_fts_idx();
const ObDASBaseCtDef *child_ctdef = nullptr;
ObDASBaseRtDef *child_rtdef = nullptr;
if (need_lookup) {
child_ctdef = attach_ctdef_->children_[0]->children_[first_fts_idx];
child_rtdef = attach_rtdef_->children_[0]->children_[first_fts_idx];
} else {
child_ctdef = attach_ctdef_->children_[first_fts_idx];
child_rtdef = attach_rtdef_->children_[first_fts_idx];
}
if (OB_ISNULL(child_ctdef) || OB_ISNULL(child_rtdef)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret));
} else if (OB_FAIL(ObDASIterUtils::set_index_merge_related_ids(child_ctdef,
child_rtdef,
tablet_ids_,
ls_id_,
fts_and_iter->get_pushdown_topk_iter_tree()))) {
LOG_WARN("failed to set index merge related ids", K(ret));
}
}
}
} else if (OB_FAIL(try_set_pushdown_topk_related_ids(result_iter))) {
LOG_WARN("failed to set pushdown topk related ids", K(ret));
}
break;
}
......@@ -1085,6 +1045,67 @@ int ObDASScanOp::reuse_iter()
return ret;
}
int ObDASScanOp::try_set_pushdown_topk_related_ids(ObDASIter *result_iter)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(result_iter)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr result_iter", K(ret));
} else if (OB_ISNULL(attach_ctdef_) || OB_ISNULL(attach_rtdef_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr attach_ctdef or attach_rtdef", K(ret));
} else {
const bool need_lookup = (attach_ctdef_->op_type_ == ObDASOpType::DAS_OP_TABLE_LOOKUP) ||
(attach_ctdef_->op_type_ == ObDASOpType::DAS_OP_INDEX_PROJ_LOOKUP);
const ObDASBaseCtDef *index_merge_ctdef = need_lookup ? attach_ctdef_->children_[0] : attach_ctdef_;
ObDASBaseRtDef *index_merge_rtdef = need_lookup ? attach_rtdef_->children_[0] : attach_rtdef_;
ObDASIndexMergeFTSAndIter *two_phase_index_merge_iter = nullptr;
if (need_lookup) {
if (ObDASIterType::DAS_ITER_TWO_PHASE_INDEX_MERGE == result_iter->get_children()[0]->get_type()) {
two_phase_index_merge_iter = static_cast<ObDASIndexMergeFTSAndIter *>(result_iter->get_children()[0]);
} else if (ObDASIterType::DAS_ITER_INDEX_MERGE == result_iter->get_type()) {
two_phase_index_merge_iter = static_cast<ObDASIndexMergeFTSAndIter *>(result_iter);
}
}
if (OB_ISNULL(two_phase_index_merge_iter)) {
// do nothing
} else {
int64_t pushdown_topk_iter_idx = two_phase_index_merge_iter->get_pushdown_topk_iter_idx();
ObDASIter *pushdown_topk_iter_tree = two_phase_index_merge_iter->get_pushdown_topk_iter_tree();
const ObDASBaseCtDef *child_ctdef = nullptr;
ObDASBaseRtDef *child_rtdef = nullptr;
if (need_lookup) {
child_ctdef = attach_ctdef_->children_[0]->children_[pushdown_topk_iter_idx];
child_rtdef = attach_rtdef_->children_[0]->children_[pushdown_topk_iter_idx];
} else {
child_ctdef = attach_ctdef_->children_[pushdown_topk_iter_idx];
child_rtdef = attach_rtdef_->children_[pushdown_topk_iter_idx];
}
if (OB_ISNULL(child_ctdef) || OB_ISNULL(child_rtdef) || OB_ISNULL(pushdown_topk_iter_tree)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected nullptr", K(ret));
} else if (ObDASOpType::DAS_OP_SORT == child_ctdef->op_type_ && ObDASIterType::DAS_ITER_SORT != pushdown_topk_iter_tree->get_type()) {
child_ctdef = child_ctdef->children_[0];
child_rtdef = child_rtdef->children_[0];
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObDASIterUtils::set_index_merge_related_ids(child_ctdef,
child_rtdef,
tablet_ids_,
ls_id_,
pushdown_topk_iter_tree))) {
LOG_WARN("failed to set index merge related ids", K(ret));
}
}
}
return ret;
}
const ExprFixedArray &ObDASScanOp::get_result_outputs() const
{
const ExprFixedArray *result_output = nullptr;
......
......@@ -467,6 +467,7 @@ protected:
common::ObITabletScan &get_tsc_service();
common::ObNewRowIterator *get_output_result_iter() { return result_; }
ObDASIterTreeType get_iter_tree_type() const;
int try_set_pushdown_topk_related_ids(ObDASIter *result_iter);
public:
ObSEArray<ObDatum *, 4> trans_info_array_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册