提交 b75f4694 编写于 作者: R raywill 提交者: wangzelin.wzl

[Shein-Feature] support auto_increment attribute for part column

上级 6ee95017
......@@ -18,7 +18,7 @@ namespace share {
OB_SERIALIZE_MEMBER(AutoincParam, tenant_id_, autoinc_table_id_, autoinc_table_part_num_, autoinc_col_id_,
autoinc_col_index_, autoinc_update_col_index_, autoinc_col_type_, total_value_count_, autoinc_desired_count_,
autoinc_old_value_index_, autoinc_increment_, autoinc_offset_, autoinc_first_part_num_, part_level_, pkey_,
auto_increment_cache_size_);
auto_increment_cache_size_, part_value_no_order_);
} // end namespace share
} // end namespace oceanbase
......@@ -88,6 +88,7 @@ struct AutoincParam {
value_to_sync_(0),
sync_flag_(false),
is_ignore_(false),
part_value_no_order_(false),
autoinc_intervals_count_(0),
part_level_(schema::PARTITION_LEVEL_ZERO),
pkey_(),
......@@ -102,8 +103,13 @@ struct AutoincParam {
autoinc_increment_, "autoinc_offset", autoinc_offset_, "curr_value_count", curr_value_count_,
"global_value_to_sync", global_value_to_sync_, "value_to_sync", value_to_sync_, "sync_flag", sync_flag_,
"is_ignore", is_ignore_, "autoinc_intervals_count", autoinc_intervals_count_, "part_level", part_level_,
"paritition key", pkey_, "auto_increment_cache_size", auto_increment_cache_size_);
"paritition key", pkey_, "auto_increment_cache_size", auto_increment_cache_size_, "part_value_no_order",
part_value_no_order_);
inline bool with_order() const
{
return !part_value_no_order_;
}
// pay attention to schema changes
uint64_t tenant_id_;
uint64_t autoinc_table_id_;
......@@ -127,6 +133,13 @@ struct AutoincParam {
uint64_t value_to_sync_;
bool sync_flag_;
bool is_ignore_;
// in order to support partitioning with auto increment pkey,
// we have to **loose the restriction** that generated number must be
// in intra-partition ascending order.
// https://aone.alibaba-inc.com/req/34489012
// If part_value_no_order_ flag = true, we can break this ordering restriction.
// for compatibility consideration, part_value_no_order_ defaults to false
bool part_value_no_order_;
// count for cache handle allocated already
uint64_t autoinc_intervals_count_;
......
......@@ -623,7 +623,7 @@ int ObAutoincrementService::get_table_node(const AutoincParam& param, TableNode*
const int64_t partition_id = param.pkey_.is_valid() ? param.pkey_.get_partition_id() : -1;
int64_t leader_epoch = 0;
if (OB_FAIL(get_leader_epoch_id(param.pkey_, leader_epoch))) {
if (param.with_order() && OB_FAIL(get_leader_epoch_id(param.pkey_, leader_epoch))) {
LOG_WARN("get leader epoch id failed", K(ret), K(param));
} else if (OB_FAIL(node_map_.get(key, table_node))) {
if (ret != OB_ENTRY_NOT_EXIST) {
......@@ -683,7 +683,7 @@ int ObAutoincrementService::get_table_node(const AutoincParam& param, TableNode*
}
}
if (OB_SUCC(ret)) {
LOG_DEBUG("succ to get table node", K(param), K(*table_node), K(ret));
LOG_DEBUG("succ to get table node", K(param), KPC(table_node), K(ret));
} else {
LOG_WARN("failed to get table node", K(param), K(ret));
}
......
......@@ -686,6 +686,7 @@ int ObLogInsert::need_multi_table_dml(AllocExchContext& ctx, ObShardingInfo& sha
ObSEArray<ObShardingInfo*, 2> input_sharding;
ObShardingInfo output_sharding;
ObLogicalOperator* child = NULL;
bool has_auto_inc_part_key = false;
bool has_rand_part_key = false;
bool has_subquery_part_key = false;
bool trigger_exist = false;
......@@ -701,7 +702,12 @@ int ObLogInsert::need_multi_table_dml(AllocExchContext& ctx, ObShardingInfo& sha
} else if (OB_ISNULL(tbl_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table schema is null", K(ret), K(insert_stmt));
} else if (modify_multi_tables()) {
is_needed = true;
} else if (is_table_update_part_key() && tbl_schema->get_all_part_num() > 1) {
is_needed = true;
}
if (OB_SUCC(ret) && OB_FAIL(generate_sharding_info(target_sharding_info))) {
if (ret == OB_NO_PARTITION_FOR_GIVEN_VALUE && trigger_exist) {
ret = OB_SUCCESS;
......@@ -709,18 +715,18 @@ int ObLogInsert::need_multi_table_dml(AllocExchContext& ctx, ObShardingInfo& sha
LOG_WARN("failed to generate sharding info", K(ret));
}
}
if (OB_FAIL(ret) || is_needed) {
} else if (modify_multi_tables()) {
is_needed = true;
} else if (is_table_update_part_key() && tbl_schema->get_all_part_num() > 1) {
is_needed = true;
// nop
} else if (is_table_insert_sequence_part_key()) {
is_needed = true;
} else if (OB_FAIL(insert_stmt->part_key_has_auto_inc(has_auto_inc_part_key))) {
LOG_WARN("check to check whether part key containts auto inc column", K(ret));
} else if (OB_FAIL(insert_stmt->part_key_has_rand_value(has_rand_part_key))) {
LOG_WARN("failed to check whether part key containts random value", K(ret));
} else if (OB_FAIL(insert_stmt->part_key_has_subquery(has_subquery_part_key))) {
LOG_WARN("check to check whether part key containts subquery", K(ret));
} else if (has_rand_part_key || has_subquery_part_key) {
} else if (has_auto_inc_part_key || has_rand_part_key || has_subquery_part_key) {
is_needed = true;
} else if (get_part_hint() != NULL && insert_stmt->value_from_select()) {
is_needed = true;
......@@ -735,11 +741,6 @@ int ObLogInsert::need_multi_table_dml(AllocExchContext& ctx, ObShardingInfo& sha
} else if (is_basic) {
is_needed = false;
sharding_info.copy_with_part_keys(target_sharding_info);
} else if (target_sharding_info.is_distributed() && child->get_sharding_info().is_match_all() &&
insert_stmt->with_explicit_autoinc_column() && !is_insert_select()) {
is_needed = false;
ctx.plan_type_ = AllocExchContext::DistrStat::DISTRIBUTED;
sharding_info.copy_with_part_keys(target_sharding_info);
} else if (OB_FAIL(check_if_match_partition_wise_insert(
ctx, target_sharding_info, child->get_sharding_info(), is_match))) {
LOG_WARN("failed to check partition wise", K(ret));
......
......@@ -2991,6 +2991,7 @@ int ObTableLocation::record_insert_partition_info(
ObDMLStmt& stmt, const share::schema::ObTableSchema* table_schema, ObSQLSessionInfo* session_info)
{
int ret = OB_SUCCESS;
bool has_auto_inc_part_key = false;
bool partkey_has_subquery = false;
ObInsertStmt& insert_stmt = static_cast<ObInsertStmt&>(stmt);
ObSEArray<ColumnItem, 4> partition_columns;
......@@ -2998,7 +2999,9 @@ int ObTableLocation::record_insert_partition_info(
const ObRawExpr* partition_raw_expr = NULL;
if (OB_FAIL(insert_stmt.part_key_has_subquery(partkey_has_subquery))) {
LOG_WARN("failed to check whether insert stmt has part key", K(ret));
} else if ((insert_stmt.has_part_key_sequence() || partkey_has_subquery) &&
} else if (OB_FAIL(insert_stmt.part_key_has_auto_inc(has_auto_inc_part_key))) {
LOG_WARN("check to check whether part key containts auto inc column", K(ret));
} else if ((insert_stmt.has_part_key_sequence() || partkey_has_subquery || has_auto_inc_part_key) &&
PARTITION_LEVEL_ZERO != table_schema->get_part_level()) {
part_get_all_ = true;
} else if (OB_FAIL(get_partition_column_info(stmt,
......@@ -4621,7 +4624,7 @@ int ObTableLocation::record_insert_part_info(ObInsertStmt& insert_stmt, ObSQLSes
if (OB_ISNULL(value_desc->at(value_idx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("value desc expr is null");
} else if (!value_desc->at(value_idx)->is_auto_increment()) {
} else {
if (OB_FAIL(value_need_idx.push_back(value_idx))) {
LOG_WARN("Failed to add value idx", K(ret));
}
......
......@@ -1875,6 +1875,10 @@ int ObInsertResolver::save_autoinc_params(uint64_t table_offset /*default 0*/)
if (OB_HIDDEN_PK_INCREMENT_COLUMN_ID == column_id) {
param.autoinc_increment_ = 1;
param.autoinc_offset_ = 1;
param.part_value_no_order_ = true;
} else if (column_schema->is_tbl_part_key_column()) {
// don't keep intra-partition value asc order when partkey column is auto inc
param.part_value_no_order_ = true;
}
uint64_t tid = insert_stmt->get_insert_table_id(table_offset);
const ObIArray<ObColumnRefRawExpr*>* table_columns = NULL;
......
......@@ -619,6 +619,32 @@ int ObInsertStmt::part_key_has_rand_value(bool& has)
return ret;
}
int ObInsertStmt::part_key_has_auto_inc(bool& has)
{
int ret = OB_SUCCESS;
const ObIArray<ObColumnRefRawExpr*>* table_columns = get_table_columns();
if (OB_ISNULL(table_columns)) {
ret = OB_ERR_UNEXPECTED;
} else {
has = false;
for (int64_t i = 0; OB_SUCCESS == ret && i < table_columns->count(); ++i) {
ObColumnRefRawExpr* col_expr = table_columns->at(i);
if (IS_SHADOW_COLUMN(col_expr->get_column_id())) {
// do nothing
} else if (OB_FAIL(ObTransformUtils::get_base_column(this, col_expr))) {
LOG_WARN("failed to get base column", K(ret));
} else if (OB_ISNULL(col_expr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (col_expr->is_table_part_key_column() && col_expr->is_auto_increment()) {
has = true;
break;
}
}
}
return ret;
}
int ObInsertStmt::part_key_has_subquery(bool& has)
{
int ret = OB_SUCCESS;
......
......@@ -209,6 +209,7 @@ public:
const common::ObIArray<ObRawExpr*>& other_exprs, const common::ObIArray<ObRawExpr*>& new_exprs) override;
int part_key_has_rand_value(bool& has);
int part_key_has_subquery(bool& has);
int part_key_has_auto_inc(bool& has);
int get_value_exprs(ObIArray<ObRawExpr*>& value_exprs);
// if generated col is partition key in heap table, we need to store all dep cols,
......
......@@ -2964,11 +2964,6 @@ int ObResolverUtils::resolve_columns_for_partition_expr(ObRawExpr*& expr, ObIArr
q_name.col_name_.ptr(),
scope_name.length(),
scope_name.ptr());
} else if (col_schema->is_autoincrement()) {
ret = OB_ERR_AUTO_PARTITION_KEY;
LOG_USER_ERROR(OB_ERR_AUTO_PARTITION_KEY,
col_schema->get_column_name_str().length(),
col_schema->get_column_name_str().ptr());
} else if (OB_FAIL(partition_keys.push_back(q_name.col_name_))) {
LOG_WARN("add column name failed", K(ret), K_(q_name.col_name));
} else if (OB_FAIL(ObRawExprUtils::init_column_expr(*col_schema, *col_expr))) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册