提交 5ac52778 编写于 作者: R raywill 提交者: LINGuanRen

fix pdml error when target table is partitioned by functions, instead of ref columns

上级 76ae3d87
......@@ -921,10 +921,8 @@ int ObLogInsert::allocate_exchange_post_pdml(AllocExchContext* ctx)
ObSEArray<ObRawExpr*, 8> left_keys;
ObSEArray<ObRawExpr*, 8> right_keys;
ObShardingInfo output_sharding;
ObSEArray<ObShardingInfo*, 2> input_sharding;
EqualSets sharding_input_esets;
ObInsertStmt* stmt = static_cast<ObInsertStmt*>(get_stmt());
ObSEArray<ObShardingInfo *, 2> input_sharding;
ObInsertStmt *stmt = static_cast<ObInsertStmt *>(get_stmt());
if (OB_ISNULL(ctx) || OB_ISNULL(get_plan()) || OB_ISNULL(stmt) || OB_ISNULL(table_columns_) ||
OB_ISNULL(column_convert_exprs_) || OB_ISNULL(all_table_columns_) || OB_ISNULL(child = get_child(first_child))) {
ret = OB_ERR_UNEXPECTED;
......@@ -935,7 +933,7 @@ int ObLogInsert::allocate_exchange_post_pdml(AllocExchContext* ctx)
} else if (FALSE_IT(get_sharding_info().set_location_type(OB_TBL_LOCATION_DISTRIBUTED))) {
} else if (FALSE_IT(calc_phy_location_type())) {
LOG_WARN("fail calc phy location type for insert", K(ret));
} else if (OB_FAIL(sharding_info_.get_all_partition_keys(left_keys))) {
} else if (OB_FAIL(sharding_info_.get_all_partition_ref_columns(left_keys))) {
LOG_WARN("failed to get all partition keys", K(ret));
} else if (OB_FAIL(get_right_key(left_keys,
*column_convert_exprs_,
......@@ -945,12 +943,13 @@ int ObLogInsert::allocate_exchange_post_pdml(AllocExchContext* ctx)
} else {
// allocate pkey below insert operator
ObExchangeInfo exch_info;
ObRawExprFactory& expr_factory = get_plan()->get_optimizer_context().get_expr_factory();
if (!left_keys.empty() &&
OB_FAIL(compute_repartition_func_info(
sharding_input_esets, right_keys, left_keys, sharding_info_, expr_factory, exch_info))) {
ObRawExprFactory &expr_factory = get_plan()->get_optimizer_context().get_expr_factory();
if (!left_keys.empty() && OB_FAIL(compute_repartition_func_info_for_insert(right_keys, // select from columns
left_keys, // insert into columns
sharding_info_,
expr_factory,
exch_info))) {
LOG_WARN("failed to compute repartition func info", K(ret));
} else if (OB_FAIL(set_hash_dist_column_exprs(exch_info, get_index_tid()))) {
LOG_WARN("fail set hash dist column exprs", K(ret));
} else {
......
......@@ -3745,6 +3745,86 @@ int ObLogicalOperator::compute_repartition_func_info(const EqualSets& equal_sets
return ret;
}
int ObLogicalOperator::compute_repartition_func_info_for_insert(const ObIArray<ObRawExpr *> &src_keys,
const ObIArray<ObRawExpr *> &target_keys, const ObShardingInfo &target_sharding, ObRawExprFactory &expr_factory,
ObExchangeInfo &exch_info)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *session_info = NULL;
ObSEArray<ObRawExpr *, 4> repart_exprs;
ObSEArray<ObRawExpr *, 4> repart_sub_exprs;
ObSEArray<ObRawExpr *, 4> repart_func_exprs;
// get repart exprs
bool skip_part = target_sharding.is_partition_single();
bool skip_subpart = target_sharding.is_subpartition_single();
if (!skip_part && OB_SUCC(ret)) {
if (OB_FAIL(ObRawExprUtils::copy_exprs(
expr_factory, target_sharding.get_partition_keys(), repart_exprs, COPY_REF_DEFAULT))) {
LOG_WARN("fail copy expr", K(ret));
}
}
if (!skip_subpart && OB_SUCC(ret)) {
if (OB_FAIL(ObRawExprUtils::copy_exprs(
expr_factory, target_sharding.get_sub_partition_keys(), repart_sub_exprs, COPY_REF_DEFAULT))) {
LOG_WARN("fail copy expr", K(ret));
}
}
if (OB_FAIL(ret)) {
// pass
} else if (OB_ISNULL(get_plan()) ||
OB_ISNULL(session_info = get_plan()->get_optimizer_context().get_session_info())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(get_plan()), K(session_info), K(ret));
} else if (!skip_part && OB_FAIL(ObTransformUtils::replace_equal_expr(target_keys, src_keys, repart_exprs))) {
LOG_WARN("failed to get repartition keys", K(ret), K(target_keys), K(src_keys));
} else if (!skip_subpart && OB_FAIL(ObTransformUtils::replace_equal_expr(target_keys, src_keys, repart_sub_exprs))) {
LOG_WARN("failed to get repartition keys", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < target_sharding.get_partition_func().count(); i++) {
ObRawExpr *repart_func_expr = NULL;
ObRawExpr *target_func_expr = target_sharding.get_partition_func().at(i);
if ((0 == i && skip_part) || (1 == i && skip_subpart)) {
ObConstRawExpr *const_expr = NULL;
ObRawExpr *dummy_expr = NULL;
int64_t const_value = 1;
if (OB_FAIL(ObRawExprUtils::build_const_int_expr(
get_plan()->get_optimizer_context().get_expr_factory(), ObIntType, const_value, const_expr))) {
LOG_WARN("Failed to build const expr", K(ret));
} else if (OB_ISNULL(dummy_expr = const_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(dummy_expr->formalize(session_info))) {
LOG_WARN("Failed to formalize a new expr", K(ret));
} else if (OB_FAIL(repart_func_exprs.push_back(dummy_expr))) {
LOG_WARN("failed to push back expr", K(ret));
}
} else if (OB_ISNULL(target_func_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(
ObRawExprUtils::copy_expr(expr_factory, target_func_expr, repart_func_expr, COPY_REF_DEFAULT))) {
LOG_WARN("failed to deep copy the partition fuc raw expr");
} else if (OB_ISNULL(repart_func_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(ObTransformUtils::replace_equal_expr(target_keys, src_keys, repart_func_expr))) {
LOG_WARN("failed to replace general expr", K(ret));
} else if (OB_FAIL(repart_func_exprs.push_back(repart_func_expr))) {
LOG_WARN("failed to push back expr", K(ret));
} else { /*do nothing*/
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(exch_info.set_repartition_info(repart_exprs, repart_sub_exprs, repart_func_exprs))) {
LOG_WARN("failed to set repartition keys", K(ret));
} else { /*do nothing*/
}
}
}
return ret;
}
int ObLogicalOperator::get_repartition_keys(const EqualSets& equal_sets, const ObIArray<ObRawExpr*>& src_keys,
const ObIArray<ObRawExpr*>& target_keys, const ObIArray<ObRawExpr*>& target_part_keys,
ObIArray<ObRawExpr*>& src_part_keys)
......@@ -3783,7 +3863,7 @@ int ObLogicalOperator::get_repartition_keys(const EqualSets& equal_sets, const O
}
if (OB_SUCC(ret) && !is_find) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("can not find part expr", K(target_part_keys.at(i)), K(src_keys), K(target_keys), K(ret));
LOG_WARN("can not find part expr", K(*target_part_keys.at(i)), K(src_keys), K(target_keys), K(ret));
}
}
}
......
......@@ -1741,6 +1741,10 @@ public:
const common::ObIArray<ObRawExpr*>& target_join_key, const ObShardingInfo& target_sharding,
ObRawExprFactory& expr_factory, ObExchangeInfo& exch_info);
int compute_repartition_func_info_for_insert(const ObIArray<ObRawExpr *> &src_keys,
const ObIArray<ObRawExpr *> &target_keys, const ObShardingInfo &target_sharding, ObRawExprFactory &expr_factory,
ObExchangeInfo &exch_info);
int get_repartition_keys(const EqualSets& equal_sets, const common::ObIArray<ObRawExpr*>& src_keys,
const common::ObIArray<ObRawExpr*>& target_keys, const common::ObIArray<ObRawExpr*>& target_part_keys,
common::ObIArray<ObRawExpr*>& src_part_keys);
......
......@@ -521,7 +521,23 @@ int ObShardingInfo::get_all_partition_keys(
return ret;
}
int ObShardingInfo::get_total_part_cnt(int64_t& total_part_cnt) const
// extract all base column exprs from partition expr
int ObShardingInfo::get_all_partition_ref_columns(
common::ObIArray<ObRawExpr *> &out_part_keys, bool ignore_single_partition /* = false */) const
{
int ret = OB_SUCCESS;
if (!(ignore_single_partition && is_partition_single()) &&
OB_FAIL(ObRawExprUtils::extract_column_exprs(partition_keys_, out_part_keys))) {
LOG_WARN("failed to assign array", K(ret));
} else if (!(ignore_single_partition && is_subpartition_single()) &&
OB_FAIL(ObRawExprUtils::extract_column_exprs(sub_partition_keys_, out_part_keys))) {
LOG_WARN("failed to append array", K(ret));
} else { /*do nothing*/
}
return ret;
}
int ObShardingInfo::get_total_part_cnt(int64_t &total_part_cnt) const
{
int ret = OB_SUCCESS;
total_part_cnt = 1;
......
......@@ -256,6 +256,9 @@ public:
int get_all_partition_keys(common::ObIArray<ObRawExpr*>& out_part_keys, bool ignore_single_partition = false) const;
int get_all_partition_ref_columns(
common::ObIArray<ObRawExpr *> &out_part_keys, bool ignore_single_partition = false) const;
TO_STRING_KV(K(is_sharding()), K(is_local()), K(is_remote_or_distribute()), K(is_match_all()), K_(part_level),
K_(part_func_type), K_(subpart_func_type), K_(part_num), K_(subpart_num), K_(location_type),
K_(can_reselect_replica), K_(phy_table_location_info));
......
......@@ -916,6 +916,18 @@ int ObTransformUtils::replace_equal_expr(const common::ObIArray<ObRawExpr*>& oth
return ret;
}
int ObTransformUtils::replace_equal_expr(const common::ObIArray<ObRawExpr *> &other_exprs,
const common::ObIArray<ObRawExpr *> &current_exprs, common::ObIArray<ObRawExpr *> &exprs)
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < exprs.count(); ++i) {
if (OB_FAIL(replace_equal_expr(other_exprs, current_exprs, exprs.at(i)))) {
LOG_WARN("fail replace equal exprs", K(ret));
}
}
return ret;
}
int ObTransformUtils::replace_expr(ObRawExpr* old_expr, ObRawExpr* new_expr, ObRawExpr*& expr)
{
int ret = OB_SUCCESS;
......
......@@ -168,8 +168,10 @@ public:
static int replace_equal_expr(ObRawExpr* old_expr, ObRawExpr* new_expr, ObRawExpr*& expr);
static int replace_equal_expr(const common::ObIArray<ObRawExpr*>& other_exprs,
const common::ObIArray<ObRawExpr*>& current_exprs, ObRawExpr*& expr);
static int replace_equal_expr(const common::ObIArray<ObRawExpr *> &other_exprs,
const common::ObIArray<ObRawExpr *> &current_exprs, ObRawExpr *&expr);
static int replace_equal_expr(const common::ObIArray<ObRawExpr *> &other_exprs,
const common::ObIArray<ObRawExpr *> &current_exprs, common::ObIArray<ObRawExpr *> &exprs);
static int replace_expr(ObRawExpr* old_expr, ObRawExpr* new_expr, ObRawExpr*& expr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册