提交 924deae1 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] fix atomicity of htable query_and_mutate

上级 d84589f7
......@@ -132,6 +132,55 @@ int ObTableQueryAndMutateP::get_partition_ids(uint64_t table_id, ObIArray<int64_
return ret;
}
int ObTableQueryAndMutateP::rewrite_htable_query_if_need(const ObTableOperation &mutaion, ObTableQuery &query)
{
int ret = OB_SUCCESS;
if (ObTableEntityType::ET_HKV == arg_.entity_type_) {
const ObHTableFilter &htable_filter = query.get_htable_filter();
if (htable_filter.is_valid()) {
const ObIArray<common::ObNewRange> &key_ranges = query.get_scan_ranges();
if (key_ranges.count() != 1) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("the count of key range of increment query must be 1", K(ret));
} else {
const ObIArray<ObString> &columns = htable_filter.get_columns();
if (columns.count() < 1 && ObTableOperationType::DEL != mutaion.type()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("must specified at least one column qualifier except delete", K(ret));
} else if (columns.count() == 1) { // from tableapi java client's view, all ops are based on cq
const ObObj *start_key_ptr = key_ranges.at(0).start_key_.get_obj_ptr();
int64_t start_key_cnt = key_ranges.at(0).start_key_.length();
const ObObj *end_key_ptr = key_ranges.at(0).end_key_.get_obj_ptr();
int64_t end_key_cnt = key_ranges.at(0).end_key_.length();
if (start_key_cnt < 2 || end_key_cnt < 2) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("the rowkey must be longer than 2", K(ret), K(start_key_cnt), K(end_key_cnt));
} else {
ObObj htable_filter_cq;
htable_filter_cq.set_varbinary(columns.at(0));
ObObj &start_key_cq = const_cast<ObObj &>(start_key_ptr[1]);
ObObj &end_key_cq = const_cast<ObObj &>(end_key_ptr[1]);
if (OB_FAIL(ob_write_obj(allocator_, htable_filter_cq, start_key_cq))) {
LOG_WARN("fail to deep copy obobj", K(ret));
} else if (OB_FAIL(ob_write_obj(allocator_, htable_filter_cq, end_key_cq))) {
LOG_WARN("fail to deep copy obobj", K(ret));
} else {
if (ObTableOperationType::DEL != mutaion.type()) { // checkAnddelete delete all version
query.set_limit(1); // only lock one row
}
}
}
} else {
} // we have to scan additional rows to get result with multi-column
}
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("htable query and mutate must have a valid htable filter", K(ret));
}
}
return ret;
}
int ObTableQueryAndMutateP::try_process()
{
int ret = OB_SUCCESS;
......@@ -151,7 +200,10 @@ int ObTableQueryAndMutateP::try_process()
ObTableQueryResultIterator *result_iterator = nullptr;
int32_t result_count = 0;
int64_t affected_rows = 0;
if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
const ObTableOperation &mutation = mutations.at(0);
if (OB_FAIL(rewrite_htable_query_if_need(mutation, const_cast<ObTableQuery &>(query)))) {
LOG_WARN("fail to rewrite query", K(ret));
} else if (OB_FAIL(get_table_id(arg_.table_name_, arg_.table_id_, table_id))) {
LOG_WARN("failed to get table id", K(ret));
} else if (OB_FAIL(get_partition_ids(table_id, part_ids))) {
LOG_WARN("failed to get part id", K(ret));
......@@ -162,10 +214,10 @@ int ObTableQueryAndMutateP::try_process()
} else if (OB_FAIL(start_trans(is_readonly, sql::stmt::T_UPDATE, table_id, part_ids,
get_timeout_ts()))) {
LOG_WARN("failed to start readonly transaction", K(ret));
} else if (OB_FAIL(table_service_->execute_query(query_ctx_, query,
one_result_, result_iterator))) {
} else if (OB_FAIL(table_service_->execute_query(
query_ctx_, query, one_result_, result_iterator, true /* for update */))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
LOG_WARN("failed to execute query", K(ret), K(table_id));
LOG_WARN("failed to execute query", K(ret), K(table_id), K(arg_.entity_type_));
}
} else {
// one_result references to result_
......@@ -175,8 +227,7 @@ int ObTableQueryAndMutateP::try_process()
if (OB_ITER_END == ret || OB_SUCC(ret)) {
ret = OB_SUCCESS;
one_result = &one_result_; // empty result is OK for APPEND and INCREMENT
const ObTableOperation &mutation = mutations.at(0);
switch(mutation.type()) {
switch (mutation.type()) {
case ObTableOperationType::DEL: // checkAndDelete
stat_event_type_ = ObTableProccessType::TABLE_API_HBASE_CHECK_AND_DELETE;
if (one_result->get_row_count() > 0) { // not empty result means check passed
......@@ -205,11 +256,14 @@ int ObTableQueryAndMutateP::try_process()
table_service_,
part_service_);
ret = put_executor.htable_put(mutations, put_rows);
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("put with empty check result is not supported currently", K(ret));
}
break;
case ObTableOperationType::INCREMENT: // Increment
stat_event_type_ = ObTableProccessType::TABLE_API_HBASE_INCREMENT;
{ // one_result->get_row_count() >= 0
if (one_result->get_row_count() > 0) { // not empty result means check passed
affected_rows = 1;
ObHTableIncrementExecutor inc_executor(ObTableOperationType::INCREMENT,
allocator_,
......@@ -224,13 +278,15 @@ int ObTableQueryAndMutateP::try_process()
if (arg_.query_and_mutate_.return_affected_entity()) {
results = &result_.affected_entity_;
}
ret = inc_executor.htable_increment(*one_result, mutations,
put_cells, results);
ret = inc_executor.htable_increment(*one_result, mutations, put_cells, results);
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("increment with empty check result is not supported currently", K(ret));
}
break;
case ObTableOperationType::APPEND: // Append
stat_event_type_ = ObTableProccessType::TABLE_API_HBASE_APPEND;
{ // one_result->get_row_count() >= 0
if (one_result->get_row_count() > 0) { // not empty result means check passed
affected_rows = 1;
ObHTableIncrementExecutor apd_executor(ObTableOperationType::APPEND,
allocator_,
......@@ -245,8 +301,10 @@ int ObTableQueryAndMutateP::try_process()
if (arg_.query_and_mutate_.return_affected_entity()) {
results = &result_.affected_entity_;
}
ret = apd_executor.htable_increment(*one_result, mutations,
put_cells, results);
ret = apd_executor.htable_increment(*one_result, mutations, put_cells, results);
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("append with empty check result is not supported currently", K(ret));
}
break;
default:
......
......@@ -40,9 +40,9 @@ protected:
private:
int get_partition_ids(uint64_t table_id, common::ObIArray<int64_t> &part_ids);
int check_rowkey_and_generate_mutations(
ObTableQueryResult &one_row,
ObTableBatchOperation *&mutations);
int check_rowkey_and_generate_mutations(ObTableQueryResult &one_row, ObTableBatchOperation *&mutations);
// rewrite htable query to avoid lock too much rows for update
int rewrite_htable_query_if_need(const ObTableOperation &mutaion, ObTableQuery &query);
DISALLOW_COPY_AND_ASSIGN(ObTableQueryAndMutateP);
private:
common::ObArenaAllocator allocator_;
......
......@@ -1863,14 +1863,9 @@ int ObTableService::fill_query_scan_ranges(ObTableServiceCtx &ctx,
return ret;
}
int ObTableService::fill_query_scan_param(ObTableServiceCtx &ctx,
const ObIArray<uint64_t> &output_column_ids,
int64_t schema_version,
ObQueryFlag::ScanOrder scan_order,
uint64_t index_id,
int32_t limit,
int32_t offset,
storage::ObTableScanParam &scan_param)
int ObTableService::fill_query_scan_param(ObTableServiceCtx &ctx, const ObIArray<uint64_t> &output_column_ids,
int64_t schema_version, ObQueryFlag::ScanOrder scan_order, uint64_t index_id, int32_t limit, int32_t offset,
storage::ObTableScanParam &scan_param, bool for_update /* false */)
{
int ret = OB_SUCCESS;
const uint64_t table_id = ctx.param_.table_id_;
......@@ -1889,7 +1884,7 @@ int ObTableService::fill_query_scan_param(ObTableServiceCtx &ctx,
);
scan_param.scan_flag_.flag_ = query_flag.flag_;
scan_param.reserved_cell_count_ = output_column_ids.count() + 10;
scan_param.for_update_ = false;
scan_param.for_update_ = for_update;
scan_param.column_ids_.reset();
scan_param.pkey_ = part_key;
scan_param.schema_version_ = schema_version;
......@@ -2051,8 +2046,7 @@ int ObTableService::check_htable_query_args(const ObTableQuery &query)
}
}
if (OB_SUCC(ret)) {
if (0 != query.get_offset()
|| -1 != query.get_limit()) {
if (0 != query.get_offset()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("htable scan should not set Offset and Limit", K(ret), K(query));
} else if (ObQueryFlag::Forward != query.get_scan_order() && ObQueryFlag::Reverse != query.get_scan_order()) {
......@@ -2064,8 +2058,8 @@ int ObTableService::check_htable_query_args(const ObTableQuery &query)
}
int ObTableService::execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuery &query,
table::ObTableQueryResult &one_result,
table::ObTableQueryResultIterator *&query_result)
table::ObTableQueryResult &one_result, table::ObTableQueryResultIterator *&query_result,
bool for_update /* false */)
{
int ret = OB_SUCCESS;
ObSEArray<uint64_t, COMMON_COLUMN_NUM> output_column_ids;
......@@ -2107,9 +2101,15 @@ int ObTableService::execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuer
(table_id != index_id) ? padding_num : -1,
ctx.scan_param_))) {
LOG_WARN("failed to fill range", K(ret));
} else if (OB_FAIL(fill_query_scan_param(ctx, output_column_ids, schema_version,
query.get_scan_order(), index_id, query.get_limit(),
query.get_offset(), ctx.scan_param_))) {
} else if (OB_FAIL(fill_query_scan_param(ctx,
output_column_ids,
schema_version,
query.get_scan_order(),
index_id,
query.get_limit(),
query.get_offset(),
ctx.scan_param_,
for_update))) {
LOG_WARN("failed to fill param", K(ret));
} else if (OB_FAIL(part_service_->table_scan(ctx.scan_param_, ctx.scan_result_))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret) {
......
......@@ -257,9 +257,9 @@ public:
int multi_replace(ObTableServiceCtx &ctx, const ObTableBatchOperation &batch_operation, ObTableBatchOperationResult &result);
int multi_update(ObTableServiceGetCtx &ctx, const ObTableBatchOperation &batch_operation, ObTableBatchOperationResult &result);
int execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuery &query, table::ObTableQueryResult &one_result,
table::ObTableQueryResultIterator *&query_result, bool for_update = false);
int batch_execute(ObTableServiceGetCtx &ctx, const ObTableBatchOperation &batch_operation, ObTableBatchOperationResult &result);
int execute_query(ObTableServiceQueryCtx &ctx, const ObTableQuery &query,
table::ObTableQueryResult &one_result, table::ObTableQueryResultIterator *&query_result);
int execute_ttl_delete(ObTableServiceTTLCtx &ctx, const ObTableTTLOperation &ttl_operation, ObTableTTLOperationResult &result);
private:
static int cons_rowkey_infos(const share::schema::ObTableSchema &table_schema,
......@@ -357,7 +357,8 @@ private:
uint64_t index_id,
int32_t limit,
int32_t offset,
storage::ObTableScanParam &scan_param);
storage::ObTableScanParam &scan_param,
bool for_update = false);
int check_htable_query_args(const ObTableQuery &query);
private:
int fill_new_entity(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册