提交 52e5b99c 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] fix: add defensive code for table api global index

上级 3d8ce9cf
......@@ -150,9 +150,16 @@ int ObTableBatchExecuteP::try_process()
{
int ret = OB_SUCCESS;
const ObTableBatchOperation &batch_operation = arg_.batch_operation_;
uint64_t table_id = arg_.table_id_;
bool is_index_supported = true;
if (batch_operation.count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("no operation in the batch", K(ret));
} else if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) {
LOG_WARN("fail to check index supported", K(ret), K(table_id));
} else if (OB_UNLIKELY(!is_index_supported)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("index type is not supported by table api", K(ret));
} else {
if (batch_operation.is_readonly()) {
if (batch_operation.is_same_properties_names()) {
......
......@@ -112,47 +112,57 @@ int ObTableApiExecuteP::process()
int ObTableApiExecuteP::try_process()
{
int ret = OB_SUCCESS;
uint64_t table_id = arg_.table_id_;
bool is_index_supported = true;
const ObTableOperation &table_operation = arg_.table_operation_;
switch (table_operation.type()) {
case ObTableOperationType::INSERT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT;
ret = process_insert();
break;
case ObTableOperationType::GET:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_GET;
ret = process_get();
break;
case ObTableOperationType::DEL:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_DELETE;
ret = process_del();
break;
case ObTableOperationType::UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_UPDATE;
ret = process_update();
break;
case ObTableOperationType::INSERT_OR_UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT_OR_UPDATE;
ret = process_insert_or_update();
break;
case ObTableOperationType::REPLACE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_REPLACE;
ret = process_replace();
break;
case ObTableOperationType::INCREMENT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INCREMENT;
ret = process_increment();
break;
case ObTableOperationType::APPEND:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_APPEND;
// for both increment and append
ret = process_increment();
break;
default:
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table operation type", K(ret), K(table_operation));
break;
if (ObTableOperationType::GET != table_operation.type() &&
OB_FAIL(check_table_index_supported(table_id, is_index_supported))) {
LOG_WARN("fail to check index supported", K(ret), K(table_id));
} else if (OB_UNLIKELY(!is_index_supported)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("index type is not supported by table api", K(ret));
} else {
switch (table_operation.type()) {
case ObTableOperationType::INSERT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT;
ret = process_insert();
break;
case ObTableOperationType::GET:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_GET;
ret = process_get();
break;
case ObTableOperationType::DEL:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_DELETE;
ret = process_del();
break;
case ObTableOperationType::UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_UPDATE;
ret = process_update();
break;
case ObTableOperationType::INSERT_OR_UPDATE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INSERT_OR_UPDATE;
ret = process_insert_or_update();
break;
case ObTableOperationType::REPLACE:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_REPLACE;
ret = process_replace();
break;
case ObTableOperationType::INCREMENT:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_INCREMENT;
ret = process_increment();
break;
case ObTableOperationType::APPEND:
stat_event_type_ = ObTableProccessType::TABLE_API_SINGLE_APPEND;
// for both increment and append
ret = process_increment();
break;
default:
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table operation type", K(ret), K(table_operation));
break;
}
audit_row_count_ = 1;
}
audit_row_count_ = 1;
#ifndef NDEBUG
// debug mode
......
......@@ -201,10 +201,16 @@ int ObTableQueryAndMutateP::try_process()
int32_t result_count = 0;
int64_t affected_rows = 0;
const ObTableOperation &mutation = mutations.at(0);
if (OB_FAIL(rewrite_htable_query_if_need(mutation, const_cast<ObTableQuery &>(query)))) {
LOG_WARN("fail to rewrite query", K(ret));
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
bool is_index_supported = true;
if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(check_table_index_supported(table_id, is_index_supported))) {
LOG_WARN("fail to check index supported", K(ret), K(table_id));
} else if (OB_UNLIKELY(!is_index_supported)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("index type is not supported by table api", K(ret));
} else if (OB_FAIL(rewrite_htable_query_if_need(mutation, const_cast<ObTableQuery &>(query)))) {
LOG_WARN("fail to rewrite query", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
} else if (1 != part_ids.count()) {
......
......@@ -909,6 +909,39 @@ int ObTableApiProcessorBase::process_with_retry(const ObString &credential, cons
return ret;
}
// check whether the index type of given table is supported by table api or not.
// global index is not supported by table api. specially, global index in non-partitioned
// table was optimized to local index, which we can support.
int ObTableApiProcessorBase::check_table_index_supported(uint64_t table_id, bool &is_supported)
{
int ret = OB_SUCCESS;
bool exists = false;
is_supported = true;
schema::ObSchemaGetterGuard schema_guard;
const schema::ObSimpleTableSchemaV2 *table_schema = NULL;
if (OB_INVALID_ID == table_id) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid table id", K(ret));
} else if (OB_ISNULL(gctx_.schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid schema service", K(ret));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(credential_.tenant_id_, schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret), K(credential_.tenant_id_));
} else if (OB_FAIL(schema_guard.get_table_schema(table_id, table_schema))) {
LOG_WARN("fail to get table schema", K(ret), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("get null table schema", K(ret), K(table_id));
} else if (table_schema->is_partitioned_table()) {
if (OB_FAIL(schema_guard.check_global_index_exist(credential_.tenant_id_, table_id, exists))) {
LOG_WARN("fail to check global index", K(ret), K(table_id));
} else {
is_supported = !exists;
}
}
return ret;
}
////////////////////////////////////////////////////////////////
template class oceanbase::observer::ObTableRpcProcessor<ObTableRpcProxy::ObRpc<OB_TABLE_API_EXECUTE> >;
template class oceanbase::observer::ObTableRpcProcessor<ObTableRpcProxy::ObRpc<OB_TABLE_API_BATCH_EXECUTE> >;
......
......@@ -156,6 +156,7 @@ protected:
virtual void audit_on_finish() {}
virtual void save_request_string() = 0;
virtual void generate_sql_id() = 0;
virtual int check_table_index_supported(uint64_t table_id, bool &is_supported);
// set trans consistency level
void set_consistency_level(const ObTableConsistencyLevel consistency_level) { consistency_level_ = consistency_level; }
......
......@@ -1749,7 +1749,13 @@ int ObTableService::fill_query_table_param(uint64_t table_id,
LOG_DEBUG("[xilin debug]padding", K(padding_num), K(key_column_cnt), K(index_name));
const bool index_back = (index_id != table_id);
if (OB_FAIL(cons_rowkey_infos(*table_schema, NULL, index_back ? NULL : &rowkey_columns_type))) {
bool is_index_supported = true;
if (index_back && OB_FAIL(check_index_supported(schema_guard, table_schema, index_id, is_index_supported))) {
LOG_WARN("fail to check index supported", K(ret), K(index_id));
} else if (OB_UNLIKELY(!is_index_supported)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("index type is not supported by table api", K(ret), K(table_id), K(index_id));
} else if (OB_FAIL(cons_rowkey_infos(*table_schema, NULL, index_back ? NULL : &rowkey_columns_type))) {
} else if (OB_FAIL(cons_properties_infos(*table_schema, properties, output_column_ids, NULL))) {
} else if (OB_FAIL(table_param.convert(*table_schema, ((NULL == index_schema) ? *table_schema: *index_schema),
output_column_ids, index_back))) {
......@@ -2350,4 +2356,30 @@ int ObTableTTLDeleteRowIterator::init(const ObTableTTLOperation &ttl_operation)
is_inited_ = true;
}
return ret;
}
// check whether index is supported in given table schema by table api
// global index is not supported by table api. specially, global index in non-partitioned
// table was optimized to local index, which we can support.
int ObTableService::check_index_supported(schema::ObSchemaGetterGuard &schema_guard,
const schema::ObSimpleTableSchemaV2 *table_schema, uint64_t index_id, bool &is_supported)
{
int ret = OB_SUCCESS;
is_supported = true;
const schema::ObSimpleTableSchemaV2 *index_schema = NULL;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_NULL_VALUE;
LOG_WARN("null table schema", K(ret));
} else if (table_schema->is_partitioned_table()) {
if (OB_FAIL(schema_guard.get_table_schema(index_id, index_schema))) {
LOG_WARN("fail to get table schmea", K(ret), K(index_id));
} else if (OB_ISNULL(index_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("get null index schema", K(ret), K(index_id));
} else if (index_schema->is_global_index_table()) {
is_supported = false;
}
}
return ret;
}
\ No newline at end of file
......@@ -360,6 +360,11 @@ private:
storage::ObTableScanParam &scan_param,
bool for_update = false);
int check_htable_query_args(const ObTableQuery &query);
int check_index_supported(share::schema::ObSchemaGetterGuard &schema_guard,
const share::schema::ObSimpleTableSchemaV2 *table_schema,
uint64_t index_id,
bool &is_supported);
private:
int fill_new_entity(
bool returning_rowkey,
......
......@@ -7218,6 +7218,41 @@ int ObSchemaGetterGuard::get_timestamp_service_type(const uint64_t tenant_id, in
return ret;
}
// check whether the given table has global index or not
int ObSchemaGetterGuard::check_global_index_exist(const uint64_t tenant_id, const uint64_t table_id, bool &exist)
{
int ret = OB_SUCCESS;
exist = false;
const ObTableSchema *table_schema = NULL;
if (OB_FAIL(check_tenant_schema_guard(tenant_id))) {
LOG_WARN("fail to check tenant schema guard", K(ret), K(tenant_id), K_(tenant_id));
} else if (OB_FAIL(get_table_schema(table_id, table_schema))) {
LOG_WARN("fail to get table schema", K(ret), K(table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("get null table schema", K(ret), K(table_id));
} else {
ObSEArray<ObAuxTableMetaInfo, 16> simple_index_infos;
const ObTableSchema *index_schema = NULL;
if (OB_FAIL(table_schema->get_simple_index_infos_without_delay_deleted_tid(simple_index_infos))) {
LOG_WARN("get simple_index_infos without delay_deleted_tid failed", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && !exist && i < simple_index_infos.count(); ++i) {
if (OB_FAIL(get_table_schema(simple_index_infos.at(i).table_id_, index_schema))) {
LOG_WARN("fail to get index schema", K(ret), K(simple_index_infos.at(i)));
} else if (OB_ISNULL(index_schema)) {
ret = OB_SCHEMA_ERROR;
LOG_WARN("index schema should not be null", K(ret));
} else if (index_schema->can_read_index() && index_schema->is_index_visible() &&
index_schema->is_global_index_table()) {
exist = true;
} else { /* do nothing */ }
}
}
return ret;
}
} // end of namespace schema
} // end of namespace share
} // end of namespace oceanbase
......@@ -542,6 +542,7 @@ public:
int get_sys_priv_with_tenant_id(const uint64_t tenant_id, common::ObIArray<const ObSysPriv*>& sys_privs);
int get_sys_priv_with_grantee_id(const uint64_t tenant_id, const uint64_t grantee_id, ObSysPriv*& sys_priv);
int check_global_index_exist(const uint64_t tenant_id, const uint64_t table_id, bool &exist);
public:
// for optimize
......
......@@ -350,7 +350,8 @@ int ObCodeGeneratorImpl::set_other_properties(const ObLogPlan& log_plan, ObPhysi
bool has_dep_table = false;
for (int64_t i = 0; OB_SUCC(ret) && !exist && i < dependency_table->count(); i++) {
if (DEPENDENCY_TABLE == dependency_table->at(i).object_type_) {
if (OB_FAIL(ObSQLUtils::has_global_index(schema_guard, dependency_table->at(i).object_id_, exist))) {
uint64_t tenant_id = extract_tenant_id(dependency_table->at(i).object_id_);
if (OB_FAIL(schema_guard->check_global_index_exist(tenant_id, dependency_table->at(i).object_id_, exist))) {
LOG_WARN("fail to judge global index", K(ret));
}
has_dep_table = true;
......
......@@ -2594,41 +2594,7 @@ int ObSQLUtils::choose_best_partition_replica_addr(const ObAddr& local_addr,
return ret;
}
int ObSQLUtils::has_global_index(
share::schema::ObSchemaGetterGuard* schema_guard, const uint64_t table_id, bool& exists)
{
int ret = OB_SUCCESS;
const ObTableSchema* index_schema = NULL;
uint64_t index_ids[OB_MAX_INDEX_PER_TABLE + 1];
int64_t index_count = OB_MAX_INDEX_PER_TABLE + 1;
exists = false;
if (OB_ISNULL(schema_guard)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("null schema guard", K(ret));
} else if (OB_FAIL(schema_guard->get_can_read_index_array(table_id, index_ids, index_count, false))) {
LOG_WARN("failed to get can read index", K(ret));
} else if (index_count > OB_MAX_INDEX_PER_TABLE + 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table index count is bigger than OB_MAX_INDEX_PER_TABLE", K(ret), K(index_count));
} else {
for (int64_t i = 0; OB_SUCC(ret) && !exists && i < index_count; i++) {
if (OB_FAIL(schema_guard->get_table_schema(index_ids[i], index_schema))) {
LOG_WARN("failed to get index schema", K(ret));
} else if (OB_ISNULL(index_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null index schema", K(ret));
} else if (index_schema->is_global_index_table()) {
exists = true;
} else { /*do nothing*/
}
}
}
LOG_TRACE("finish to check whether table has global index", K(exists), K(table_id));
return ret;
}
int ObSQLUtils::wrap_column_convert_ctx(const ObExprCtx& expr_ctx, ObCastCtx& column_conv_ctx)
int ObSQLUtils::wrap_column_convert_ctx(const ObExprCtx &expr_ctx, ObCastCtx &column_conv_ctx)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(expr_ctx.my_session_)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册