From 5ac527787632a394fd06b1a4ed911930e67d71de Mon Sep 17 00:00:00 2001 From: raywill Date: Fri, 8 Apr 2022 14:01:42 +0800 Subject: [PATCH] fix pdml error when target table is partitioned by functions, instead of ref columns --- src/sql/optimizer/ob_log_insert.cpp | 19 +++--- src/sql/optimizer/ob_logical_operator.cpp | 82 ++++++++++++++++++++++- src/sql/optimizer/ob_logical_operator.h | 4 ++ src/sql/optimizer/ob_sharding_info.cpp | 18 ++++- src/sql/optimizer/ob_sharding_info.h | 3 + src/sql/rewrite/ob_transform_utils.cpp | 12 ++++ src/sql/rewrite/ob_transform_utils.h | 6 +- 7 files changed, 130 insertions(+), 14 deletions(-) diff --git a/src/sql/optimizer/ob_log_insert.cpp b/src/sql/optimizer/ob_log_insert.cpp index 8c3edbe2ad..202f1f3d69 100644 --- a/src/sql/optimizer/ob_log_insert.cpp +++ b/src/sql/optimizer/ob_log_insert.cpp @@ -921,10 +921,8 @@ int ObLogInsert::allocate_exchange_post_pdml(AllocExchContext* ctx) ObSEArray left_keys; ObSEArray right_keys; ObShardingInfo output_sharding; - ObSEArray input_sharding; - EqualSets sharding_input_esets; - ObInsertStmt* stmt = static_cast(get_stmt()); - + ObSEArray input_sharding; + ObInsertStmt *stmt = static_cast(get_stmt()); if (OB_ISNULL(ctx) || OB_ISNULL(get_plan()) || OB_ISNULL(stmt) || OB_ISNULL(table_columns_) || OB_ISNULL(column_convert_exprs_) || OB_ISNULL(all_table_columns_) || OB_ISNULL(child = get_child(first_child))) { ret = OB_ERR_UNEXPECTED; @@ -935,7 +933,7 @@ int ObLogInsert::allocate_exchange_post_pdml(AllocExchContext* ctx) } else if (FALSE_IT(get_sharding_info().set_location_type(OB_TBL_LOCATION_DISTRIBUTED))) { } else if (FALSE_IT(calc_phy_location_type())) { LOG_WARN("fail calc phy location type for insert", K(ret)); - } else if (OB_FAIL(sharding_info_.get_all_partition_keys(left_keys))) { + } else if (OB_FAIL(sharding_info_.get_all_partition_ref_columns(left_keys))) { LOG_WARN("failed to get all partition keys", K(ret)); } else if (OB_FAIL(get_right_key(left_keys, *column_convert_exprs_, @@ -945,12 +943,13 @@ int ObLogInsert::allocate_exchange_post_pdml(AllocExchContext* ctx) } else { // allocate pkey below insert operator ObExchangeInfo exch_info; - ObRawExprFactory& expr_factory = get_plan()->get_optimizer_context().get_expr_factory(); - if (!left_keys.empty() && - OB_FAIL(compute_repartition_func_info( - sharding_input_esets, right_keys, left_keys, sharding_info_, expr_factory, exch_info))) { + ObRawExprFactory &expr_factory = get_plan()->get_optimizer_context().get_expr_factory(); + if (!left_keys.empty() && OB_FAIL(compute_repartition_func_info_for_insert(right_keys, // select from columns + left_keys, // insert into columns + sharding_info_, + expr_factory, + exch_info))) { LOG_WARN("failed to compute repartition func info", K(ret)); - } else if (OB_FAIL(set_hash_dist_column_exprs(exch_info, get_index_tid()))) { LOG_WARN("fail set hash dist column exprs", K(ret)); } else { diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index 573515e673..f0f1d56d45 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -3745,6 +3745,86 @@ int ObLogicalOperator::compute_repartition_func_info(const EqualSets& equal_sets return ret; } +int ObLogicalOperator::compute_repartition_func_info_for_insert(const ObIArray &src_keys, + const ObIArray &target_keys, const ObShardingInfo &target_sharding, ObRawExprFactory &expr_factory, + ObExchangeInfo &exch_info) +{ + int ret = OB_SUCCESS; + ObSQLSessionInfo *session_info = NULL; + ObSEArray repart_exprs; + ObSEArray repart_sub_exprs; + ObSEArray repart_func_exprs; + // get repart exprs + bool skip_part = target_sharding.is_partition_single(); + bool skip_subpart = target_sharding.is_subpartition_single(); + if (!skip_part && OB_SUCC(ret)) { + if (OB_FAIL(ObRawExprUtils::copy_exprs( + expr_factory, target_sharding.get_partition_keys(), repart_exprs, COPY_REF_DEFAULT))) { + LOG_WARN("fail copy expr", K(ret)); + } + } + if (!skip_subpart && OB_SUCC(ret)) { + if (OB_FAIL(ObRawExprUtils::copy_exprs( + expr_factory, target_sharding.get_sub_partition_keys(), repart_sub_exprs, COPY_REF_DEFAULT))) { + LOG_WARN("fail copy expr", K(ret)); + } + } + + if (OB_FAIL(ret)) { + // pass + } else if (OB_ISNULL(get_plan()) || + OB_ISNULL(session_info = get_plan()->get_optimizer_context().get_session_info())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(get_plan()), K(session_info), K(ret)); + } else if (!skip_part && OB_FAIL(ObTransformUtils::replace_equal_expr(target_keys, src_keys, repart_exprs))) { + LOG_WARN("failed to get repartition keys", K(ret), K(target_keys), K(src_keys)); + } else if (!skip_subpart && OB_FAIL(ObTransformUtils::replace_equal_expr(target_keys, src_keys, repart_sub_exprs))) { + LOG_WARN("failed to get repartition keys", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < target_sharding.get_partition_func().count(); i++) { + ObRawExpr *repart_func_expr = NULL; + ObRawExpr *target_func_expr = target_sharding.get_partition_func().at(i); + if ((0 == i && skip_part) || (1 == i && skip_subpart)) { + ObConstRawExpr *const_expr = NULL; + ObRawExpr *dummy_expr = NULL; + int64_t const_value = 1; + if (OB_FAIL(ObRawExprUtils::build_const_int_expr( + get_plan()->get_optimizer_context().get_expr_factory(), ObIntType, const_value, const_expr))) { + LOG_WARN("Failed to build const expr", K(ret)); + } else if (OB_ISNULL(dummy_expr = const_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(dummy_expr->formalize(session_info))) { + LOG_WARN("Failed to formalize a new expr", K(ret)); + } else if (OB_FAIL(repart_func_exprs.push_back(dummy_expr))) { + LOG_WARN("failed to push back expr", K(ret)); + } + } else if (OB_ISNULL(target_func_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL( + ObRawExprUtils::copy_expr(expr_factory, target_func_expr, repart_func_expr, COPY_REF_DEFAULT))) { + LOG_WARN("failed to deep copy the partition fuc raw expr"); + } else if (OB_ISNULL(repart_func_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null", K(ret)); + } else if (OB_FAIL(ObTransformUtils::replace_equal_expr(target_keys, src_keys, repart_func_expr))) { + LOG_WARN("failed to replace general expr", K(ret)); + } else if (OB_FAIL(repart_func_exprs.push_back(repart_func_expr))) { + LOG_WARN("failed to push back expr", K(ret)); + } else { /*do nothing*/ + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(exch_info.set_repartition_info(repart_exprs, repart_sub_exprs, repart_func_exprs))) { + LOG_WARN("failed to set repartition keys", K(ret)); + } else { /*do nothing*/ + } + } + } + return ret; +} + int ObLogicalOperator::get_repartition_keys(const EqualSets& equal_sets, const ObIArray& src_keys, const ObIArray& target_keys, const ObIArray& target_part_keys, ObIArray& src_part_keys) @@ -3783,7 +3863,7 @@ int ObLogicalOperator::get_repartition_keys(const EqualSets& equal_sets, const O } if (OB_SUCC(ret) && !is_find) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("can not find part expr", K(target_part_keys.at(i)), K(src_keys), K(target_keys), K(ret)); + LOG_WARN("can not find part expr", K(*target_part_keys.at(i)), K(src_keys), K(target_keys), K(ret)); } } } diff --git a/src/sql/optimizer/ob_logical_operator.h b/src/sql/optimizer/ob_logical_operator.h index 378a9b6313..09b10150c5 100644 --- a/src/sql/optimizer/ob_logical_operator.h +++ b/src/sql/optimizer/ob_logical_operator.h @@ -1741,6 +1741,10 @@ public: const common::ObIArray& target_join_key, const ObShardingInfo& target_sharding, ObRawExprFactory& expr_factory, ObExchangeInfo& exch_info); + int compute_repartition_func_info_for_insert(const ObIArray &src_keys, + const ObIArray &target_keys, const ObShardingInfo &target_sharding, ObRawExprFactory &expr_factory, + ObExchangeInfo &exch_info); + int get_repartition_keys(const EqualSets& equal_sets, const common::ObIArray& src_keys, const common::ObIArray& target_keys, const common::ObIArray& target_part_keys, common::ObIArray& src_part_keys); diff --git a/src/sql/optimizer/ob_sharding_info.cpp b/src/sql/optimizer/ob_sharding_info.cpp index 1af96221c3..46837b5d7c 100644 --- a/src/sql/optimizer/ob_sharding_info.cpp +++ b/src/sql/optimizer/ob_sharding_info.cpp @@ -521,7 +521,23 @@ int ObShardingInfo::get_all_partition_keys( return ret; } -int ObShardingInfo::get_total_part_cnt(int64_t& total_part_cnt) const +// extract all base column exprs from partition expr +int ObShardingInfo::get_all_partition_ref_columns( + common::ObIArray &out_part_keys, bool ignore_single_partition /* = false */) const +{ + int ret = OB_SUCCESS; + if (!(ignore_single_partition && is_partition_single()) && + OB_FAIL(ObRawExprUtils::extract_column_exprs(partition_keys_, out_part_keys))) { + LOG_WARN("failed to assign array", K(ret)); + } else if (!(ignore_single_partition && is_subpartition_single()) && + OB_FAIL(ObRawExprUtils::extract_column_exprs(sub_partition_keys_, out_part_keys))) { + LOG_WARN("failed to append array", K(ret)); + } else { /*do nothing*/ + } + return ret; +} + +int ObShardingInfo::get_total_part_cnt(int64_t &total_part_cnt) const { int ret = OB_SUCCESS; total_part_cnt = 1; diff --git a/src/sql/optimizer/ob_sharding_info.h b/src/sql/optimizer/ob_sharding_info.h index 9d2b115823..8c8f694b2a 100644 --- a/src/sql/optimizer/ob_sharding_info.h +++ b/src/sql/optimizer/ob_sharding_info.h @@ -256,6 +256,9 @@ public: int get_all_partition_keys(common::ObIArray& out_part_keys, bool ignore_single_partition = false) const; + int get_all_partition_ref_columns( + common::ObIArray &out_part_keys, bool ignore_single_partition = false) const; + TO_STRING_KV(K(is_sharding()), K(is_local()), K(is_remote_or_distribute()), K(is_match_all()), K_(part_level), K_(part_func_type), K_(subpart_func_type), K_(part_num), K_(subpart_num), K_(location_type), K_(can_reselect_replica), K_(phy_table_location_info)); diff --git a/src/sql/rewrite/ob_transform_utils.cpp b/src/sql/rewrite/ob_transform_utils.cpp index 96d63a146d..12ef04f70f 100644 --- a/src/sql/rewrite/ob_transform_utils.cpp +++ b/src/sql/rewrite/ob_transform_utils.cpp @@ -916,6 +916,18 @@ int ObTransformUtils::replace_equal_expr(const common::ObIArray& oth return ret; } +int ObTransformUtils::replace_equal_expr(const common::ObIArray &other_exprs, + const common::ObIArray ¤t_exprs, common::ObIArray &exprs) +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < exprs.count(); ++i) { + if (OB_FAIL(replace_equal_expr(other_exprs, current_exprs, exprs.at(i)))) { + LOG_WARN("fail replace equal exprs", K(ret)); + } + } + return ret; +} + int ObTransformUtils::replace_expr(ObRawExpr* old_expr, ObRawExpr* new_expr, ObRawExpr*& expr) { int ret = OB_SUCCESS; diff --git a/src/sql/rewrite/ob_transform_utils.h b/src/sql/rewrite/ob_transform_utils.h index d7da7d3369..3dd375dc4f 100644 --- a/src/sql/rewrite/ob_transform_utils.h +++ b/src/sql/rewrite/ob_transform_utils.h @@ -168,8 +168,10 @@ public: static int replace_equal_expr(ObRawExpr* old_expr, ObRawExpr* new_expr, ObRawExpr*& expr); - static int replace_equal_expr(const common::ObIArray& other_exprs, - const common::ObIArray& current_exprs, ObRawExpr*& expr); + static int replace_equal_expr(const common::ObIArray &other_exprs, + const common::ObIArray ¤t_exprs, ObRawExpr *&expr); + static int replace_equal_expr(const common::ObIArray &other_exprs, + const common::ObIArray ¤t_exprs, common::ObIArray &exprs); static int replace_expr(ObRawExpr* old_expr, ObRawExpr* new_expr, ObRawExpr*& expr); -- GitLab