提交 00b7bf04 编写于 作者: O obdev 提交者: wangzelin.wzl

fix adjust_recursive_cte_plan function adjust shared operator bug

上级 4bdf7852
......@@ -3412,6 +3412,11 @@ int ObJoinOrder::add_path(Path* path)
} else {
bool should_add = true;
DominateRelation plan_rel = DominateRelation::OBJ_UNCOMPARABLE;
if (!path->is_cte_path() &&
path->contain_match_all_fake_cte() &&
!path->is_remote()) {
should_add = false;
}
for (int64_t i = interesting_paths_.count() - 1; OB_SUCC(ret) && should_add && i >= 0; --i) {
Path *cur_path = interesting_paths_.at(i);
if (OB_ISNULL(cur_path)) {
......@@ -3720,6 +3725,7 @@ int oceanbase::sql::Path::assign(const Path &other, common::ObIAllocator *alloca
location_type_ = other.location_type_;
contain_fake_cte_ = other.contain_fake_cte_;
contain_pw_merge_op_ = other.contain_pw_merge_op_;
contain_match_all_fake_cte_ = other.contain_match_all_fake_cte_;
contain_das_op_ = other.contain_das_op_;
parallel_ = other.parallel_;
server_cnt_ = other.server_cnt_;
......@@ -4829,6 +4835,8 @@ int JoinPath::compute_join_path_info()
contain_pw_merge_op_ = (left_path_->contain_pw_merge_op_ && !is_left_need_exchange()) ||
(right_path_->contain_pw_merge_op_ && !is_right_need_exchange()) ||
(join_algo_ == JoinAlgo::MERGE_JOIN && is_partition_wise());
contain_match_all_fake_cte_ = left_path_->contain_match_all_fake_cte_ ||
right_path_->contain_match_all_fake_cte_;
contain_das_op_ = left_path_->contain_das_op_ || right_path_->contain_das_op_;
}
return ret;
......@@ -5369,6 +5377,7 @@ void JoinPath::reuse()
location_type_ = ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED;
contain_fake_cte_ = false;
contain_pw_merge_op_ = false;
contain_match_all_fake_cte_ = false;
contain_das_op_ = false;
parallel_ = 1;
server_cnt_ = 1;
......@@ -5642,19 +5651,12 @@ int ObJoinOrder::param_funct_table_expr(ObRawExpr* &function_table_expr,
return ret;
}
int ObJoinOrder::generate_cte_table_paths()
int ObJoinOrder::create_one_cte_table_path(const TableItem* table_item,
ObShardingInfo *sharding)
{
int ret = OB_SUCCESS;
CteTablePath *ap = NULL;
const ObDMLStmt *stmt = NULL;
const TableItem *table_item = NULL;
if (OB_ISNULL(get_plan()) || OB_ISNULL(stmt = get_plan()->get_stmt()) || OB_ISNULL(allocator_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get unexpected null", K(get_plan()), K(stmt), K(allocator_), K(ret));
} else if (OB_ISNULL(table_item = stmt->get_table_item_by_id(table_id_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(table_id_));
} else if (OB_ISNULL(ap = reinterpret_cast<CteTablePath*>(allocator_->alloc(sizeof(CteTablePath))))) {
if (OB_ISNULL(ap = reinterpret_cast<CteTablePath*>(allocator_->alloc(sizeof(CteTablePath))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("failed to allocate an AccessPath", K(ret));
} else {
......@@ -5666,7 +5668,9 @@ int ObJoinOrder::generate_cte_table_paths()
ap->ref_table_id_ = table_item->ref_id_;
ap->parent_ = this;
ap->contain_fake_cte_ = true;
ap->strong_sharding_ = get_plan()->get_optimizer_context().get_match_all_sharding();
ap->strong_sharding_ = sharding;
ap->contain_match_all_fake_cte_ = (table_item->is_recursive_union_fake_table_ &&
sharding->is_match_all());
if (OB_FAIL(append(ap->filter_, get_restrict_infos()))) {
LOG_WARN("failed to push back expr", K(ret));
} else if (OB_FAIL(ap->estimate_cost())) {
......@@ -5675,7 +5679,32 @@ int ObJoinOrder::generate_cte_table_paths()
LOG_WARN("failed to compute pipelined path", K(ret));
} else if (OB_FAIL(add_path(ap))) {
LOG_WARN("failed to add path", K(ret));
} else { /*do nothing*/ }
} else {
/* do nothing */
}
}
return ret;
}
int ObJoinOrder::generate_cte_table_paths()
{
int ret = OB_SUCCESS;
const ObDMLStmt *stmt = NULL;
const TableItem *table_item = NULL;
if (OB_ISNULL(get_plan()) || OB_ISNULL(stmt = get_plan()->get_stmt()) || OB_ISNULL(allocator_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get unexpected null", K(get_plan()), K(stmt), K(allocator_), K(ret));
} else if (OB_ISNULL(table_item = stmt->get_table_item_by_id(table_id_))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(table_id_));
} else if (OB_FAIL(create_one_cte_table_path(table_item,
get_plan()->get_optimizer_context().get_match_all_sharding()))) {
LOG_WARN("failed to create one cte table path", K(ret));
} else if (table_item->is_recursive_union_fake_table_ &&
OB_FAIL(create_one_cte_table_path(table_item,
get_plan()->get_optimizer_context().get_local_sharding()))) {
LOG_WARN("failed to create one cte table path", K(ret));
}
return ret;
}
......@@ -6068,6 +6097,7 @@ int ObJoinOrder::compute_subquery_path_property(const uint64_t table_id,
path->location_type_ = root->get_location_type();
path->contain_fake_cte_ = root->get_contains_fake_cte();
path->contain_pw_merge_op_ = root->get_contains_pw_merge_op();
path->contain_match_all_fake_cte_ = root->get_contains_match_all_fake_cte();
path->contain_das_op_ = root->get_contains_das_op();
path->parallel_ = root->get_parallel();
path->server_cnt_ = root->get_server_cnt();
......
......@@ -311,6 +311,7 @@ struct EstimateCostInfo {
location_type_(ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED),
contain_fake_cte_(false),
contain_pw_merge_op_(false),
contain_match_all_fake_cte_(false),
contain_das_op_(false),
parallel_(1),
server_cnt_(1)
......@@ -336,6 +337,7 @@ struct EstimateCostInfo {
location_type_(ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED),
contain_fake_cte_(false),
contain_pw_merge_op_(false),
contain_match_all_fake_cte_(false),
contain_das_op_(false),
parallel_(1),
server_cnt_(1),
......@@ -410,6 +412,7 @@ struct EstimateCostInfo {
double get_path_output_rows() const;
bool contain_fake_cte() const { return contain_fake_cte_; }
bool contain_pw_merge_op() const { return contain_pw_merge_op_; }
bool contain_match_all_fake_cte() const { return contain_match_all_fake_cte_; }
bool is_pipelined_path() const { return is_pipelined_path_; }
bool is_nl_style_pipelined_path() const { return is_nl_style_pipelined_path_; }
virtual int compute_pipeline_info();
......@@ -481,6 +484,7 @@ struct EstimateCostInfo {
ObPhyPlanType location_type_;
bool contain_fake_cte_;
bool contain_pw_merge_op_;
bool contain_match_all_fake_cte_;
bool contain_das_op_;
// remember the parallel info to get this sharding
int64_t parallel_;
......@@ -1481,7 +1485,9 @@ struct NullAwareAntiJoinInfo {
int estimate_size_for_inner_subquery_path(double root_card,
const ObIArray<ObRawExpr*> &filters,
double &output_card);
int create_one_cte_table_path(const TableItem* table_item,
ObShardingInfo * sharding);
int generate_cte_table_paths();
int generate_function_table_paths();
......
......@@ -9889,6 +9889,20 @@ int ObLogPlan::add_candidate_plan(ObIArray<CandidatePlan> &current_plans,
int ret = OB_SUCCESS;
bool should_add = true;
DominateRelation plan_rel = DominateRelation::OBJ_UNCOMPARABLE;
if (OB_ISNULL(new_plan.plan_tree_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (new_plan.plan_tree_->get_type() == LOG_SET &&
static_cast<ObLogSet*>(new_plan.plan_tree_)->is_recursive_union()) {
ObLogicalOperator* right_child = new_plan.plan_tree_->get_child(ObLogicalOperator::second_child);
if (OB_ISNULL(right_child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (right_child->get_contains_match_all_fake_cte() &&
!new_plan.plan_tree_->is_remote()) {
should_add = false;
}
}
for (int64_t i = current_plans.count() - 1;
OB_SUCC(ret) && should_add && i >= 0; --i) {
if (OB_FAIL(compute_plan_relationship(current_plans.at(i),
......@@ -10848,6 +10862,18 @@ int ObLogPlan::adjust_final_plan_info(ObLogicalOperator *&op)
}
}
if (OB_SUCC(ret) && op->get_type() == LOG_SET &&
static_cast<ObLogSet*>(op)->is_recursive_union()) {
ObLogicalOperator* right_child = NULL;
if (OB_UNLIKELY(2 != op->get_num_of_child()) ||
OB_ISNULL(right_child = op->get_child(ObLogicalOperator::second_child))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(op->get_name()));
} else if (OB_FAIL(allocate_material_for_recursive_cte_plan(right_child->get_child_list()))) {
LOG_WARN("faile to allocate material for recursive cte plan", K(ret));
}
}
if (OB_SUCC(ret)) {
if (op->is_plan_root() && OB_FAIL(op->set_plan_root_output_exprs())) {
LOG_WARN("failed to add plan root exprs", K(ret));
......@@ -12438,3 +12464,35 @@ int ObLogPlan::compute_subplan_filter_repartition_distribution_info(ObLogicalOpe
}
return ret;
}
int ObLogPlan::allocate_material_for_recursive_cte_plan(ObIArray<ObLogicalOperator*> &child_ops)
{
int ret = OB_SUCCESS;
ObLogPlan *log_plan = NULL;
int64_t fake_cte_pos = -1;
for (int64_t i = 0; OB_SUCC(ret) && fake_cte_pos == -1 && i < child_ops.count(); i++) {
if (OB_ISNULL(child_ops.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (child_ops.at(i)->get_contains_fake_cte()) {
fake_cte_pos = i;
} else { /*do nothing*/ }
}
if (OB_SUCC(ret) && fake_cte_pos != -1) {
for (int64_t i = 0; OB_SUCC(ret) && i < child_ops.count(); i++) {
if (OB_ISNULL(child_ops.at(i)) || OB_ISNULL(log_plan = child_ops.at(i)->get_plan())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (i == fake_cte_pos) {
if (OB_FAIL(SMART_CALL(allocate_material_for_recursive_cte_plan(child_ops.at(i)->get_child_list())))) {
LOG_WARN("failed to adjust recursive cte plan", K(ret));
} else { /*do nothing*/ }
} else if (log_op_def::LOG_MATERIAL != child_ops.at(i)->get_type() &&
log_op_def::LOG_TABLE_SCAN != child_ops.at(i)->get_type() &&
OB_FAIL(log_plan->allocate_material_as_top(child_ops.at(i)))) {
LOG_WARN("failed to allocate materialize as top", K(ret));
} else { /*do nothing*/ }
}
}
return ret;
}
\ No newline at end of file
......@@ -1333,6 +1333,8 @@ public:
int candi_allocate_material();
int allocate_material_for_recursive_cte_plan(ObIArray<ObLogicalOperator*> &child_ops);
protected:
ObColumnRefRawExpr *get_column_expr_by_id(uint64_t table_id, uint64_t column_id) const;
const ColumnItem *get_column_item_by_id(uint64_t table_id, uint64_t column_id) const;
......
......@@ -346,6 +346,7 @@ ObLogicalOperator::ObLogicalOperator(ObLogPlan &plan)
contain_fake_cte_(false),
contain_pw_merge_op_(false),
contain_das_op_(false),
contain_match_all_fake_cte_(false),
dup_table_pos_(),
strong_sharding_(NULL),
weak_sharding_(),
......@@ -799,6 +800,23 @@ int ObLogicalOperator::compute_op_other_info()
}
}
}
// compute contains fake cte match all sharding
if (OB_SUCC(ret)) {
if (get_type() == log_op_def::ObLogOpType::LOG_SET &&
static_cast<ObLogSet*>(this)->is_recursive_union()) {
/*do nothing*/
} else {
for (int64_t i = 0; OB_SUCC(ret) && !contain_match_all_fake_cte_ && i < get_num_of_child(); i++) {
if (OB_ISNULL(get_child(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else {
contain_match_all_fake_cte_ |= get_child(i)->get_contains_match_all_fake_cte();
}
}
}
}
// compute contains merge style op
if (OB_SUCC(ret)) {
......@@ -891,6 +909,7 @@ int ObLogicalOperator::compute_property(Path *path)
set_location_type(path->location_type_);
set_contains_fake_cte(path->contain_fake_cte_);
set_contains_pw_merge_op(path->contain_pw_merge_op_);
set_contains_match_all_fake_cte(path->contain_match_all_fake_cte_);
set_contains_das_op(path->contain_das_op_);
is_pipelined_plan_ = path->is_pipelined_path();
is_nl_style_pipelined_plan_ = path->is_nl_style_pipelined_path();
......
......@@ -1502,6 +1502,11 @@ public:
{
contain_pw_merge_op_ = contain_pw_merge_op;
}
inline bool get_contains_match_all_fake_cte() const { return contain_match_all_fake_cte_; }
inline void set_contains_match_all_fake_cte(bool contain_match_all_fake_cte)
{
contain_match_all_fake_cte_ = contain_match_all_fake_cte;
}
inline bool get_contains_das_op() const { return contain_das_op_; }
inline void set_contains_das_op(bool contain_das_op)
{
......@@ -1822,6 +1827,7 @@ protected:
bool contain_fake_cte_;
bool contain_pw_merge_op_;
bool contain_das_op_;
bool contain_match_all_fake_cte_;
common::ObSEArray<int64_t, 4, common::ModulePageAllocator, true> dup_table_pos_; // for duplicated table
ObShardingInfo *strong_sharding_;
common::ObSEArray<ObShardingInfo*, 8, common::ModulePageAllocator, true> weak_sharding_;
......
......@@ -2218,10 +2218,6 @@ int ObSelectLogPlan::create_recursive_union_all_plan(ObLogicalOperator *left_chi
} else if (is_basic) {
if (DistAlgo::DIST_BASIC_METHOD & set_dist_methods) {
dist_set_method = DistAlgo::DIST_BASIC_METHOD;
if (right_child->is_local() &&
OB_FAIL(adjust_recursive_cte_plan(right_child->get_child_list()))) {
LOG_WARN("failed to adjust recursive cte plan", K(ret));
}
}
} else if (DistAlgo::DIST_PULL_TO_LOCAL & set_dist_methods) {
// pull to local
......@@ -2229,9 +2225,6 @@ int ObSelectLogPlan::create_recursive_union_all_plan(ObLogicalOperator *left_chi
if (left_child->is_sharding()) {
left_exch_info.dist_method_ = ObPQDistributeMethod::LOCAL;
}
if (OB_FAIL(adjust_recursive_cte_plan(right_child->get_child_list()))) {
LOG_WARN("failed to adjust recurisve cte plan", K(ret));
} else { /*do nothing*/ }
}
if (OB_SUCC(ret) && DistAlgo::DIST_INVALID_METHOD != dist_set_method) {
if (OB_FAIL(ObOptimizerUtil::check_need_sort(order_items,
......@@ -2257,43 +2250,6 @@ int ObSelectLogPlan::create_recursive_union_all_plan(ObLogicalOperator *left_chi
return ret;
}
int ObSelectLogPlan::adjust_recursive_cte_plan(ObIArray<ObLogicalOperator*> &child_ops)
{
int ret = OB_SUCCESS;
ObLogPlan *log_plan = NULL;
ObExchangeInfo exch_info;
int64_t fake_cte_pos = -1;
for (int64_t i = 0; OB_SUCC(ret) && fake_cte_pos == -1 && i < child_ops.count(); i++) {
if (OB_ISNULL(child_ops.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (child_ops.at(i)->get_contains_fake_cte()) {
fake_cte_pos = i;
} else { /*do nothing*/ }
}
if (OB_SUCC(ret) && fake_cte_pos != -1) {
for (int64_t i = 0; OB_SUCC(ret) && i < child_ops.count(); i++) {
if (OB_ISNULL(child_ops.at(i)) || OB_ISNULL(log_plan = child_ops.at(i)->get_plan())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (i == fake_cte_pos) {
if (OB_FAIL(SMART_CALL(adjust_recursive_cte_plan(child_ops.at(i)->get_child_list())))) {
LOG_WARN("failed to adjust recursive cte plan", K(ret));
} else { /*do nothing*/ }
} else if (child_ops.at(i)->is_sharding() &&
OB_FAIL(log_plan->allocate_exchange_as_top(child_ops.at(i),
exch_info))) {
LOG_WARN("failed to allocate exchange", K(ret));
} else if (log_op_def::LOG_MATERIAL != child_ops.at(i)->get_type() &&
log_op_def::LOG_TABLE_SCAN != child_ops.at(i)->get_type() &&
OB_FAIL(log_plan->allocate_material_as_top(child_ops.at(i)))) {
LOG_WARN("failed to allocate materialize as top", K(ret));
} else { /*do nothing*/ }
}
}
return ret;
}
int ObSelectLogPlan::allocate_recursive_union_all_as_top(ObLogicalOperator *left_child,
ObLogicalOperator *right_child,
DistAlgo dist_set_method,
......
......@@ -245,8 +245,6 @@ private:
const bool ignore_hint,
ObLogicalOperator *&top);
int adjust_recursive_cte_plan(ObIArray<ObLogicalOperator*> &child_ops);
int allocate_recursive_union_all_as_top(ObLogicalOperator *left_child,
ObLogicalOperator *right_child,
DistAlgo dist_set_method,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册