提交 7ffa8665 编写于 作者: O obdev 提交者: wangzelin.wzl

fix partial partition wise merge join bug

上级 2e11976f
......@@ -3719,7 +3719,7 @@ int oceanbase::sql::Path::assign(const Path &other, common::ObIAllocator *alloca
phy_plan_type_ = other.phy_plan_type_;
location_type_ = other.location_type_;
contain_fake_cte_ = other.contain_fake_cte_;
contain_merge_op_ = other.contain_merge_op_;
contain_pw_merge_op_ = other.contain_pw_merge_op_;
contain_das_op_ = other.contain_das_op_;
parallel_ = other.parallel_;
server_cnt_ = other.server_cnt_;
......@@ -4826,8 +4826,9 @@ int JoinPath::compute_join_path_info()
LOG_WARN("get unexpected null", K(left_path_), K(right_path_), K(ret));
} else {
contain_fake_cte_ = left_path_->contain_fake_cte_ || right_path_->contain_fake_cte_;
contain_merge_op_ = left_path_->contain_merge_op_ || right_path_->contain_merge_op_ ||
(join_algo_ == JoinAlgo::MERGE_JOIN);
contain_pw_merge_op_ = (left_path_->contain_pw_merge_op_ && !is_left_need_exchange()) ||
(right_path_->contain_pw_merge_op_ && !is_right_need_exchange()) ||
(join_algo_ == JoinAlgo::MERGE_JOIN && is_partition_wise());
contain_das_op_ = left_path_->contain_das_op_ || right_path_->contain_das_op_;
}
return ret;
......@@ -5367,7 +5368,7 @@ void JoinPath::reuse()
phy_plan_type_ = ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED;
location_type_ = ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED;
contain_fake_cte_ = false;
contain_merge_op_ = false;
contain_pw_merge_op_ = false;
contain_das_op_ = false;
parallel_ = 1;
server_cnt_ = 1;
......@@ -6075,7 +6076,7 @@ int ObJoinOrder::compute_subquery_path_property(const uint64_t table_id,
path->phy_plan_type_ = root->get_phy_plan_type();
path->location_type_ = root->get_location_type();
path->contain_fake_cte_ = root->get_contains_fake_cte();
path->contain_merge_op_ = root->get_contains_merge_op();
path->contain_pw_merge_op_ = root->get_contains_pw_merge_op();
path->contain_das_op_ = root->get_contains_das_op();
path->parallel_ = root->get_parallel();
path->server_cnt_ = root->get_server_cnt();
......@@ -7309,11 +7310,8 @@ bool ObJoinOrder::is_partition_wise_valid(const Path &left_path,
const Path &right_path)
{
bool is_valid = true;
if (left_path.exchange_allocated_ && !right_path.exchange_allocated_ &&
right_path.contain_merge_op()) {
is_valid = false;
} else if (!left_path.exchange_allocated_ && right_path.exchange_allocated_ &&
left_path.contain_merge_op()) {
if ((left_path.exchange_allocated_ || right_path.exchange_allocated_) &&
(left_path.contain_pw_merge_op() || right_path.contain_pw_merge_op())) {
is_valid = false;
} else {
is_valid = true;
......
......@@ -310,7 +310,7 @@ struct EstimateCostInfo {
phy_plan_type_(ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED),
location_type_(ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED),
contain_fake_cte_(false),
contain_merge_op_(false),
contain_pw_merge_op_(false),
contain_das_op_(false),
parallel_(1),
server_cnt_(1)
......@@ -335,7 +335,7 @@ struct EstimateCostInfo {
phy_plan_type_(ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED),
location_type_(ObPhyPlanType::OB_PHY_PLAN_UNINITIALIZED),
contain_fake_cte_(false),
contain_merge_op_(false),
contain_pw_merge_op_(false),
contain_das_op_(false),
parallel_(1),
server_cnt_(1),
......@@ -409,7 +409,7 @@ struct EstimateCostInfo {
virtual int re_estimate_cost(EstimateCostInfo &info, double &card, double &cost);
double get_path_output_rows() const;
bool contain_fake_cte() const { return contain_fake_cte_; }
bool contain_merge_op() const { return contain_merge_op_; }
bool contain_pw_merge_op() const { return contain_pw_merge_op_; }
bool is_pipelined_path() const { return is_pipelined_path_; }
bool is_nl_style_pipelined_path() const { return is_nl_style_pipelined_path_; }
virtual int compute_pipeline_info();
......@@ -480,7 +480,7 @@ struct EstimateCostInfo {
ObPhyPlanType phy_plan_type_;
ObPhyPlanType location_type_;
bool contain_fake_cte_;
bool contain_merge_op_;
bool contain_pw_merge_op_;
bool contain_das_op_;
// remember the parallel info to get this sharding
int64_t parallel_;
......
......@@ -344,7 +344,7 @@ ObLogicalOperator::ObLogicalOperator(ObLogPlan &plan)
dblink_id_(0), // 0 represent local cluster.
plan_depth_(0),
contain_fake_cte_(false),
contain_merge_op_(false),
contain_pw_merge_op_(false),
contain_das_op_(false),
dup_table_pos_(),
strong_sharding_(NULL),
......@@ -802,27 +802,32 @@ int ObLogicalOperator::compute_op_other_info()
// compute contains merge style op
if (OB_SUCC(ret)) {
for (int64_t i = 0; OB_SUCC(ret) && !contain_merge_op_ && i < get_num_of_child(); i++) {
for (int64_t i = 0; OB_SUCC(ret) && !contain_pw_merge_op_ && i < get_num_of_child(); i++) {
if (OB_ISNULL(get_child(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else {
contain_merge_op_ |= get_child(i)->get_contains_merge_op();
contain_pw_merge_op_ |= get_child(i)->get_contains_pw_merge_op() &&
log_op_def::LOG_EXCHANGE != get_child(i)->get_type();
}
}
if (OB_SUCC(ret) && !contain_merge_op_) {
if (OB_SUCC(ret) && !contain_pw_merge_op_) {
if (log_op_def::LOG_GROUP_BY == get_type()) {
ObLogGroupBy *group_by = static_cast<ObLogGroupBy*>(this);
contain_merge_op_ = !group_by->get_group_by_exprs().empty() &&
(AggregateAlgo::MERGE_AGGREGATE == group_by->get_algo());
contain_pw_merge_op_ = !group_by->get_group_by_exprs().empty() &&
(AggregateAlgo::MERGE_AGGREGATE == group_by->get_algo()) &&
is_partition_wise();
} else if (log_op_def::LOG_DISTINCT == get_type()) {
ObLogDistinct *distinct = static_cast<ObLogDistinct*>(this);
contain_merge_op_ = AggregateAlgo::MERGE_AGGREGATE == distinct->get_algo();
contain_pw_merge_op_ = AggregateAlgo::MERGE_AGGREGATE == distinct->get_algo() &&
is_partition_wise();
} else if (log_op_def::LOG_SET == get_type()) {
ObLogSet *set = static_cast<ObLogSet*>(this);
contain_merge_op_ = set->is_set_distinct() && SetAlgo::MERGE_SET == set->get_algo();
contain_pw_merge_op_ = set->is_set_distinct() && SetAlgo::MERGE_SET == set->get_algo() &&
is_partition_wise();
} else if (log_op_def::LOG_WINDOW_FUNCTION == get_type()) {
contain_merge_op_ = is_block_op();
contain_pw_merge_op_ = is_block_op() &&
is_partition_wise();
} else { /*do nothing*/ }
}
}
......@@ -885,7 +890,7 @@ int ObLogicalOperator::compute_property(Path *path)
set_phy_plan_type(path->phy_plan_type_);
set_location_type(path->location_type_);
set_contains_fake_cte(path->contain_fake_cte_);
set_contains_merge_op(path->contain_merge_op_);
set_contains_pw_merge_op(path->contain_pw_merge_op_);
set_contains_das_op(path->contain_das_op_);
is_pipelined_plan_ = path->is_pipelined_path();
is_nl_style_pipelined_plan_ = path->is_nl_style_pipelined_path();
......@@ -1013,7 +1018,7 @@ int ObLogicalOperator::compute_property()
K(phy_plan_type_),
K(location_type_),
K(contain_fake_cte_),
K(contain_merge_op_),
K(contain_pw_merge_op_),
K(contain_das_op_),
K(width_));
}
......
......@@ -1497,10 +1497,10 @@ public:
{
contain_fake_cte_ = contain_fake_cte;
}
inline bool get_contains_merge_op() const { return contain_merge_op_; }
inline void set_contains_merge_op(bool contain_merge_op)
inline bool get_contains_pw_merge_op() const { return contain_pw_merge_op_; }
inline void set_contains_pw_merge_op(bool contain_pw_merge_op)
{
contain_merge_op_ = contain_merge_op;
contain_pw_merge_op_ = contain_pw_merge_op;
}
inline bool get_contains_das_op() const { return contain_das_op_; }
inline void set_contains_das_op(bool contain_das_op)
......@@ -1820,7 +1820,7 @@ protected:
uint64_t dblink_id_;
int64_t plan_depth_;
bool contain_fake_cte_;
bool contain_merge_op_;
bool contain_pw_merge_op_;
bool contain_das_op_;
common::ObSEArray<int64_t, 4, common::ModulePageAllocator, true> dup_table_pos_; // for duplicated table
ObShardingInfo *strong_sharding_;
......
......@@ -2622,11 +2622,8 @@ bool ObSelectLogPlan::is_set_partition_wise_valid(const ObLogicalOperator &left_
const ObLogicalOperator &right_plan)
{
bool is_valid = true;
if (left_plan.is_exchange_allocated() && !right_plan.is_exchange_allocated() &&
right_plan.get_contains_merge_op()) {
is_valid = false;
} else if (!left_plan.is_exchange_allocated() &&
right_plan.is_exchange_allocated() && left_plan.get_contains_merge_op()) {
if ((left_plan.is_exchange_allocated() || right_plan.is_exchange_allocated()) &&
(left_plan.get_contains_pw_merge_op() || right_plan.get_contains_pw_merge_op())) {
is_valid = false;
} else { /*do nothing*/ }
return is_valid;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册