提交 a501ceac 编写于 作者: O obdev 提交者: ob-robot

[CP] Fix startup filter post bug(3.1_opensource < 3_2_x)

上级 a6956003
......@@ -2095,15 +2095,60 @@ int ObLogJoin::allocate_startup_expr_post()
}
} else if (LEFT_OUTER_JOIN == join_type_ || CONNECT_BY_JOIN == join_type_ || LEFT_SEMI_JOIN == join_type_ ||
LEFT_ANTI_JOIN == join_type_) {
if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(first_child))) {
if (OB_FAIL(allocate_startup_expr_post(first_child))) {
LOG_WARN("failed to allocate startup expr post", K(ret));
}
} else if (RIGHT_OUTER_JOIN == join_type_ || RIGHT_SEMI_JOIN == join_type_ || RIGHT_ANTI_JOIN == join_type_) {
if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(second_child))) {
if (OB_FAIL(allocate_startup_expr_post(second_child))) {
LOG_WARN("failed to allocate startup expr post", K(ret));
}
} else if (FULL_OUTER_JOIN == join_type_) {
// do nothing
}
return ret;
}
int ObLogJoin::allocate_startup_expr_post(int64_t child_idx)
{
int ret = OB_SUCCESS;
ObLogicalOperator *child = get_child(child_idx);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child", K(ret));
} else if (child->get_startup_exprs().empty()) {
// do nothing
} else {
ObSEArray<ObRawExpr *, 4> non_startup_exprs, new_startup_exprs;
ObIArray<ObRawExpr *> &startup_exprs = child->get_startup_exprs();
for (int64_t i = 0; OB_SUCC(ret) && i < startup_exprs.count(); ++i) {
if (OB_ISNULL(startup_exprs.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (log_op_def::LOG_COUNT == child->get_type() && startup_exprs.at(i)->has_flag(CNT_ROWNUM)) {
if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr", K(ret));
}
} else if (startup_exprs.at(i)->has_flag(CNT_EXEC_PARAM)) {
bool found = false;
if (is_nlj_with_param_down() &&
OB_FAIL(ObOptimizerUtil::check_contain_my_exec_param(startup_exprs.at(i), get_nl_params(), found))) {
LOG_WARN("fail to check if contain my exec param");
} else if (found && OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr", K(ret));
} else if (!found && OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr", K(ret));
}
} else if (OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) {
LOG_WARN("failed to add startup exprs", K(ret));
} else if (OB_FAIL(child->get_startup_exprs().assign(non_startup_exprs))) {
LOG_WARN("failed to assign exprs", K(ret));
}
}
}
return ret;
}
\ No newline at end of file
......@@ -336,6 +336,7 @@ private:
return SM_NONE != slave_mapping_type_;
}
int allocate_startup_expr_post() override;
int allocate_startup_expr_post(int64_t child_idx) override;
private:
// all join predicates
common::ObSEArray<ObRawExpr*, 8, common::ModulePageAllocator, true> join_conditions_; // equal join condition, for
......
......@@ -710,8 +710,59 @@ int ObLogSubPlanFilter::compute_one_row_info()
int ObLogSubPlanFilter::allocate_startup_expr_post()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObLogicalOperator::allocate_startup_expr_post(first_child))) {
if (OB_FAIL(allocate_startup_expr_post(first_child))) {
LOG_WARN("failed to allocate startup expr post", K(ret));
}
return ret;
}
\ No newline at end of file
}
int ObLogSubPlanFilter::allocate_startup_expr_post(int64_t child_idx)
{
int ret = OB_SUCCESS;
ObLogicalOperator *child = get_child(child_idx);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child", K(ret));
} else if (child->get_startup_exprs().empty()) {
// do nothing
} else {
ObSEArray<ObRawExpr *, 4> non_startup_exprs, new_startup_exprs;
ObIArray<ObRawExpr *> &startup_exprs = child->get_startup_exprs();
ObSEArray<std::pair<int64_t, ObRawExpr *>, 8> my_exec_params;
if (OB_FAIL(my_exec_params.assign(onetime_exprs_))) {
LOG_WARN("fail to push back onetime exprs", K(ret));
} else if (OB_FAIL(append(my_exec_params, exec_params_))) {
LOG_WARN("fail to push back exec param exprs", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < startup_exprs.count(); ++i) {
if (OB_ISNULL(startup_exprs.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (log_op_def::LOG_COUNT == child->get_type() && startup_exprs.at(i)->has_flag(CNT_ROWNUM)) {
if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr", K(ret));
}
} else if (startup_exprs.at(i)->has_flag(CNT_EXEC_PARAM)) {
bool found = false;
if (!my_exec_params.empty() &&
OB_FAIL(ObOptimizerUtil::check_contain_my_exec_param(startup_exprs.at(i), my_exec_params, found))) {
LOG_WARN("fail to check if contain onetime exec param", K(ret));
} else if (found && OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr", K(ret));
} else if (!found && OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("fail to push back non startup expr", K(ret));
}
} else if (OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) {
LOG_WARN("failed to add startup exprs", K(ret));
} else if (OB_FAIL(child->get_startup_exprs().assign(non_startup_exprs))) {
LOG_WARN("failed to assign exprs", K(ret));
}
}
}
return ret;
}
......@@ -127,6 +127,7 @@ public:
int allocate_granule_post(AllocGIContext& ctx) override;
virtual int compute_one_row_info() override;
int allocate_startup_expr_post() override;
int allocate_startup_expr_post(int64_t child_idx) override;
protected:
common::ObSEArray<std::pair<int64_t, ObRawExpr*>, 8, common::ModulePageAllocator, true> exec_params_;
common::ObSEArray<std::pair<int64_t, ObRawExpr*>, 8, common::ModulePageAllocator, true> onetime_exprs_;
......
......@@ -6824,8 +6824,7 @@ int ObLogicalOperator::allocate_startup_expr_post(int64_t child_idx)
if (OB_ISNULL(startup_exprs.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM) ||
startup_exprs.at(i)->has_flag(CNT_EXEC_PARAM)) {
} else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM)) {
if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
......@@ -6834,8 +6833,7 @@ int ObLogicalOperator::allocate_startup_expr_post(int64_t child_idx)
}
}
if (OB_SUCC(ret)) {
if (get_startup_exprs().empty() &&
OB_FAIL(add_startup_exprs(new_startup_exprs))) {
if (OB_FAIL(append_array_no_dup(get_startup_exprs(), new_startup_exprs))) {
LOG_WARN("failed to add startup exprs", K(ret));
} else {
bool mark_exchange_out = false;
......
......@@ -1799,7 +1799,7 @@ public:
virtual int set_exchange_cnt_post(AdjustSortContext* ctx);
virtual int allocate_link_post();
virtual int allocate_startup_expr_post();
int allocate_startup_expr_post(int64_t child_idx);
virtual int allocate_startup_expr_post(int64_t child_idx);
int allocate_link_node_above(int64_t child_idx);
virtual int generate_link_sql_pre(GenLinkStmtContext& link_ctx);
......
......@@ -6115,3 +6115,35 @@ int ObOptimizerUtil::check_pushdown_filter_to_base_table(ObLogPlan& plan, const
LOG_TRACE("check pushdown filter to tables", K(table_id), K(can_pushdown));
return ret;
}
int ObOptimizerUtil::check_contain_my_exec_param(
ObRawExpr *expr, const common::ObIArray<std::pair<int64_t, ObRawExpr *>> &my_exec_params, bool &contain)
{
int ret = OB_SUCCESS;
bool is_stack_overflow = false;
contain = false;
if (OB_ISNULL(expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (OB_FAIL(check_stack_overflow(is_stack_overflow))) {
LOG_WARN("check stack overflow failed", K(ret));
} else if (is_stack_overflow) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("too deep recursive", K(ret));
} else if (!expr->has_flag(CNT_EXEC_PARAM)) {
// do nothing
} else if (expr->has_flag(IS_EXEC_PARAM)) {
const ObConstRawExpr *const_expr = static_cast<const ObConstRawExpr *>(expr);
int64_t param = const_expr->get_value().get_unknown();
contain = NULL != find_exec_param(my_exec_params, param);
} else if (expr->is_set_op_expr() || expr->is_query_ref_expr() || expr->is_column_ref_expr()) {
// do nothing
} else {
for (int64_t i = 0; !contain && OB_SUCC(ret) && i < expr->get_param_count(); ++i) {
if (OB_FAIL(SMART_CALL(check_contain_my_exec_param(expr->get_param_expr(i), my_exec_params, contain)))) {
LOG_WARN("failed to check contain batch stmt parameter", K(ret));
}
}
}
return ret;
}
......@@ -676,6 +676,9 @@ public:
static int check_pushdown_filter_to_base_table(ObLogPlan& plan, const uint64_t table_id,
const ObIArray<ObRawExpr*>& pushdown_filters, const ObIArray<ObRawExpr*>& restrict_infos, bool& can_pushdown);
static int check_contain_my_exec_param(
ObRawExpr *expr, const common::ObIArray<std::pair<int64_t, ObRawExpr *>> &my_exec_params, bool &contain);
private:
// disallow construct
ObOptimizerUtil();
......
......@@ -2514,9 +2514,9 @@ Outputs & filters:
access([t1.c1]), partitions(p0)
2 - output([T_FUN_MAX(VIEW1.c1)]), filter(nil),
group(nil), agg_func([T_FUN_MAX(VIEW1.c1)])
3 - output([VIEW1.c1]), filter(nil),
3 - output([VIEW1.c1]), filter(nil), startup_filter([?]),
access([VIEW1.c1])
4 - output([t1.c1]), filter(nil), startup_filter([?]),
4 - output([t1.c1]), filter(nil),
access([t1.c1]), partitions(p0),
limit(1), offset(nil)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册