From 924deae1ac707fffb2ee51bba5de30f44d860cfa Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 6 Jul 2022 17:33:36 +0800 Subject: [PATCH] [CP] fix atomicity of htable query_and_mutate --- .../ob_table_query_and_mutate_processor.cpp | 82 ++++++++++++++++--- .../ob_table_query_and_mutate_processor.h | 6 +- src/observer/table/ob_table_service.cpp | 32 ++++---- src/observer/table/ob_table_service.h | 7 +- 4 files changed, 93 insertions(+), 34 deletions(-) diff --git a/src/observer/table/ob_table_query_and_mutate_processor.cpp b/src/observer/table/ob_table_query_and_mutate_processor.cpp index 0e8ccc7372..19d436b74a 100644 --- a/src/observer/table/ob_table_query_and_mutate_processor.cpp +++ b/src/observer/table/ob_table_query_and_mutate_processor.cpp @@ -132,6 +132,55 @@ int ObTableQueryAndMutateP::get_partition_ids(uint64_t table_id, ObIArray &key_ranges = query.get_scan_ranges(); + if (key_ranges.count() != 1) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("the count of key range of increment query must be 1", K(ret)); + } else { + const ObIArray &columns = htable_filter.get_columns(); + if (columns.count() < 1 && ObTableOperationType::DEL != mutaion.type()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("must specified at least one column qualifier except delete", K(ret)); + } else if (columns.count() == 1) { // from tableapi java client's view, all ops are based on cq + const ObObj *start_key_ptr = key_ranges.at(0).start_key_.get_obj_ptr(); + int64_t start_key_cnt = key_ranges.at(0).start_key_.length(); + const ObObj *end_key_ptr = key_ranges.at(0).end_key_.get_obj_ptr(); + int64_t end_key_cnt = key_ranges.at(0).end_key_.length(); + if (start_key_cnt < 2 || end_key_cnt < 2) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("the rowkey must be longer than 2", K(ret), K(start_key_cnt), K(end_key_cnt)); + } else { + ObObj htable_filter_cq; + htable_filter_cq.set_varbinary(columns.at(0)); + ObObj &start_key_cq = const_cast(start_key_ptr[1]); + ObObj &end_key_cq = const_cast(end_key_ptr[1]); + if (OB_FAIL(ob_write_obj(allocator_, htable_filter_cq, start_key_cq))) { + LOG_WARN("fail to deep copy obobj", K(ret)); + } else if (OB_FAIL(ob_write_obj(allocator_, htable_filter_cq, end_key_cq))) { + LOG_WARN("fail to deep copy obobj", K(ret)); + } else { + if (ObTableOperationType::DEL != mutaion.type()) { // checkAnddelete delete all version + query.set_limit(1); // only lock one row + } + } + } + } else { + } // we have to scan additional rows to get result with multi-column + } + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("htable query and mutate must have a valid htable filter", K(ret)); + } + } + return ret; +} + int ObTableQueryAndMutateP::try_process() { int ret = OB_SUCCESS; @@ -151,7 +200,10 @@ int ObTableQueryAndMutateP::try_process() ObTableQueryResultIterator *result_iterator = nullptr; int32_t result_count = 0; int64_t affected_rows = 0; - if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) { + const ObTableOperation &mutation = mutations.at(0); + if (OB_FAIL(rewrite_htable_query_if_need(mutation, const_cast(query)))) { + LOG_WARN("fail to rewrite query", K(ret)); + } else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) { LOG_WARN("failed to get table id", K(ret)); } else if (OB_FAIL(get_partition_ids(table_id, part_ids))) { LOG_WARN("failed to get part id", K(ret)); @@ -162,10 +214,10 @@ int ObTableQueryAndMutateP::try_process() } else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_UPDATE, table_id, part_ids, get_timeout_ts()))) { LOG_WARN("failed to start readonly transaction", K(ret)); - } else if (OB_FAIL(table_service_->execute_query(query_ctx_, query, - one_result_, result_iterator))) { + } else if (OB_FAIL(table_service_->execute_query( + query_ctx_, query, one_result_, result_iterator, true /* for update */))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { - LOG_WARN("failed to execute query", K(ret), K(table_id)); + LOG_WARN("failed to execute query", K(ret), K(table_id), K(arg_.entity_type_)); } } else { // one_result references to result_ @@ -175,8 +227,7 @@ int ObTableQueryAndMutateP::try_process() if (OB_ITER_END == ret || OB_SUCC(ret)) { ret = OB_SUCCESS; one_result = &one_result_; // empty result is OK for APPEND and INCREMENT - const ObTableOperation &mutation = mutations.at(0); - switch(mutation.type()) { + switch (mutation.type()) { case ObTableOperationType::DEL: // checkAndDelete stat_event_type_ = ObTableProccessType::TABLE_API_HBASE_CHECK_AND_DELETE; if (one_result->get_row_count() > 0) { // not empty result means check passed @@ -205,11 +256,14 @@ int ObTableQueryAndMutateP::try_process() table_service_, part_service_); ret = put_executor.htable_put(mutations, put_rows); + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("put with empty check result is not supported currently", K(ret)); } break; case ObTableOperationType::INCREMENT: // Increment stat_event_type_ = ObTableProccessType::TABLE_API_HBASE_INCREMENT; - { // one_result->get_row_count() >= 0 + if (one_result->get_row_count() > 0) { // not empty result means check passed affected_rows = 1; ObHTableIncrementExecutor inc_executor(ObTableOperationType::INCREMENT, allocator_, @@ -224,13 +278,15 @@ int ObTableQueryAndMutateP::try_process() if (arg_.query_and_mutate_.return_affected_entity()) { results = &result_.affected_entity_; } - ret = inc_executor.htable_increment(*one_result, mutations, - put_cells, results); + ret = inc_executor.htable_increment(*one_result, mutations, put_cells, results); + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("increment with empty check result is not supported currently", K(ret)); } break; case ObTableOperationType::APPEND: // Append stat_event_type_ = ObTableProccessType::TABLE_API_HBASE_APPEND; - { // one_result->get_row_count() >= 0 + if (one_result->get_row_count() > 0) { // not empty result means check passed affected_rows = 1; ObHTableIncrementExecutor apd_executor(ObTableOperationType::APPEND, allocator_, @@ -245,8 +301,10 @@ int ObTableQueryAndMutateP::try_process() if (arg_.query_and_mutate_.return_affected_entity()) { results = &result_.affected_entity_; } - ret = apd_executor.htable_increment(*one_result, mutations, - put_cells, results); + ret = apd_executor.htable_increment(*one_result, mutations, put_cells, results); + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("append with empty check result is not supported currently", K(ret)); } break; default: diff --git a/src/observer/table/ob_table_query_and_mutate_processor.h b/src/observer/table/ob_table_query_and_mutate_processor.h index a0aeb745f4..67a6a19379 100644 --- a/src/observer/table/ob_table_query_and_mutate_processor.h +++ b/src/observer/table/ob_table_query_and_mutate_processor.h @@ -40,9 +40,9 @@ protected: private: int get_partition_ids(uint64_t table_id, common::ObIArray &part_ids); - int check_rowkey_and_generate_mutations( - ObTableQueryResult &one_row, - ObTableBatchOperation *&mutations); + int check_rowkey_and_generate_mutations(ObTableQueryResult &one_row, ObTableBatchOperation *&mutations); + // rewrite htable query to avoid lock too much rows for update + int rewrite_htable_query_if_need(const ObTableOperation &mutaion, ObTableQuery &query); DISALLOW_COPY_AND_ASSIGN(ObTableQueryAndMutateP); private: common::ObArenaAllocator allocator_; diff --git a/src/observer/table/ob_table_service.cpp b/src/observer/table/ob_table_service.cpp index 40dbd9f167..9b86ec90eb 100644 --- a/src/observer/table/ob_table_service.cpp +++ b/src/observer/table/ob_table_service.cpp @@ -1863,14 +1863,9 @@ int ObTableService::fill_query_scan_ranges(ObTableServiceCtx &ctx, return ret; } -int ObTableService::fill_query_scan_param(ObTableServiceCtx &ctx, - const ObIArray &output_column_ids, - int64_t schema_version, - ObQueryFlag::ScanOrder scan_order, - uint64_t index_id, - int32_t limit, - int32_t offset, - storage::ObTableScanParam &scan_param) +int ObTableService::fill_query_scan_param(ObTableServiceCtx &ctx, const ObIArray &output_column_ids, + int64_t schema_version, ObQueryFlag::ScanOrder scan_order, uint64_t index_id, int32_t limit, int32_t offset, + storage::ObTableScanParam &scan_param, bool for_update /* false */) { int ret = OB_SUCCESS; const uint64_t table_id = ctx.param_.table_id_; @@ -1889,7 +1884,7 @@ int ObTableService::fill_query_scan_param(ObTableServiceCtx &ctx, ); scan_param.scan_flag_.flag_ = query_flag.flag_; scan_param.reserved_cell_count_ = output_column_ids.count() + 10; - scan_param.for_update_ = false; + scan_param.for_update_ = for_update; scan_param.column_ids_.reset(); scan_param.pkey_ = part_key; scan_param.schema_version_ = schema_version; @@ -2051,8 +2046,7 @@ int ObTableService::check_htable_query_args(const ObTableQuery &query) } } if (OB_SUCC(ret)) { - if (0 != query.get_offset() - || -1 != query.get_limit()) { + if (0 != query.get_offset()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("htable scan should not set Offset and Limit", K(ret), K(query)); } else if (ObQueryFlag::Forward != query.get_scan_order() && ObQueryFlag::Reverse != query.get_scan_order()) { @@ -2064,8 +2058,8 @@ int ObTableService::check_htable_query_args(const ObTableQuery &query) } int ObTableService::execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuery &query, - table::ObTableQueryResult &one_result, - table::ObTableQueryResultIterator *&query_result) + table::ObTableQueryResult &one_result, table::ObTableQueryResultIterator *&query_result, + bool for_update /* false */) { int ret = OB_SUCCESS; ObSEArray output_column_ids; @@ -2107,9 +2101,15 @@ int ObTableService::execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuer (table_id != index_id) ? padding_num : -1, ctx.scan_param_))) { LOG_WARN("failed to fill range", K(ret)); - } else if (OB_FAIL(fill_query_scan_param(ctx, output_column_ids, schema_version, - query.get_scan_order(), index_id, query.get_limit(), - query.get_offset(), ctx.scan_param_))) { + } else if (OB_FAIL(fill_query_scan_param(ctx, + output_column_ids, + schema_version, + query.get_scan_order(), + index_id, + query.get_limit(), + query.get_offset(), + ctx.scan_param_, + for_update))) { LOG_WARN("failed to fill param", K(ret)); } else if (OB_FAIL(part_service_->table_scan(ctx.scan_param_, ctx.scan_result_))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { diff --git a/src/observer/table/ob_table_service.h b/src/observer/table/ob_table_service.h index 9fba18007c..a2a1832f26 100644 --- a/src/observer/table/ob_table_service.h +++ b/src/observer/table/ob_table_service.h @@ -257,9 +257,9 @@ public: int multi_replace(ObTableServiceCtx &ctx, const ObTableBatchOperation &batch_operation, ObTableBatchOperationResult &result); int multi_update(ObTableServiceGetCtx &ctx, const ObTableBatchOperation &batch_operation, ObTableBatchOperationResult &result); + int execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuery &query, table::ObTableQueryResult &one_result, + table::ObTableQueryResultIterator *&query_result, bool for_update = false); int batch_execute(ObTableServiceGetCtx &ctx, const ObTableBatchOperation &batch_operation, ObTableBatchOperationResult &result); - int execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuery &query, - table::ObTableQueryResult &one_result, table::ObTableQueryResultIterator *&query_result); int execute_ttl_delete(ObTableServiceTTLCtx &ctx, const ObTableTTLOperation &ttl_operation, ObTableTTLOperationResult &result); private: static int cons_rowkey_infos(const share::schema::ObTableSchema &table_schema, @@ -357,7 +357,8 @@ private: uint64_t index_id, int32_t limit, int32_t offset, - storage::ObTableScanParam &scan_param); + storage::ObTableScanParam &scan_param, + bool for_update = false); int check_htable_query_args(const ObTableQuery &query); private: int fill_new_entity( -- GitLab