提交 94816acd 编写于 作者: O obdev 提交者: wangzelin.wzl

patch code from 3_1_x_release

上级 ef2b6227
......@@ -806,6 +806,7 @@ int ObMacroBlockWriter::check_order(const ObStoreRow& row)
}
} else if (!row.row_type_flag_.is_uncommitted_row()) { // update max commit version
micro_writer_->update_max_merged_trans_version(-cur_row_version);
row.row_val_.cells_[sql_sequence_col_idx].set_int(0); // make sql sequence = 0
} else { // not committed
micro_writer_->set_contain_uncommitted_row();
LOG_TRACE("meet uncommited trans row", K(row));
......
......@@ -70,6 +70,7 @@ public:
{
return task_index_writer_->get_macro_block_write_ctx();
}
int dump_micro_block_writer_buffer();
TO_STRING_KV(K_(block_write_ctx));
struct IndexMicroBlockBuilder {
......@@ -159,7 +160,6 @@ private:
int prepare_micro_block_reader(const char* buf, const int64_t size, ObIMicroBlockReader*& micro_reader);
int print_micro_block_row(ObIMicroBlockReader* micro_reader);
int dump_micro_block_writer_buffer();
private:
static const int64_t DEFAULT_MACRO_BLOCK_COUNT = 128;
......
......@@ -1363,7 +1363,7 @@ int ObMinorPartitionMergeFuser::set_multi_version_row_flag(
const MERGE_ITER_ARRAY& macro_row_iters, ObStoreRow& store_row)
{
int ret = OB_SUCCESS;
store_row.row_type_flag_.set_compacted_multi_version_row(false);
store_row.row_type_flag_.set_compacted_multi_version_row(true);
store_row.row_type_flag_.set_first_multi_version_row(true);
if (need_check_curr_row_last_) {
store_row.row_type_flag_.set_last_multi_version_row(true);
......@@ -1372,9 +1372,8 @@ int ObMinorPartitionMergeFuser::set_multi_version_row_flag(
}
for (int64_t i = 0; i < macro_row_iters.count(); ++i) {
if (macro_row_iters.at(i)->get_curr_row()->row_type_flag_.is_compacted_multi_version_row() ||
!macro_row_iters.at(i)->get_table()->is_multi_version_table()) {
store_row.row_type_flag_.set_compacted_multi_version_row(true);
if (!macro_row_iters.at(i)->get_curr_row()->row_type_flag_.is_compacted_multi_version_row()) {
store_row.row_type_flag_.set_compacted_multi_version_row(false);
break;
}
}
......
......@@ -45,7 +45,9 @@ ObMacroBlockBuilder::ObMacroBlockBuilder()
need_build_bloom_filter_(false),
bf_macro_writer_(),
cols_id_map_(nullptr),
is_opened_(false)
is_opened_(false),
check_row_flag_status_(CHECK_FIRST_ROW),
last_compact_row_nop_cnt_(-1)
{}
ObMacroBlockBuilder::~ObMacroBlockBuilder()
......@@ -352,6 +354,8 @@ int ObMacroBlockBuilder::process(const blocksstable::ObMacroBlockCtx& macro_bloc
} else if (OB_FAIL(writer_->append_macro_block(macro_block_ctx))) {
STORAGE_LOG(WARN, "macro block writer fail to close.", K(ret));
} else {
check_row_flag_status_ = CHECK_FIRST_ROW;
last_compact_row_nop_cnt_ = -1;
STORAGE_LOG(DEBUG, "Success to append macro block, ", K(macro_block_ctx));
}
return ret;
......@@ -423,12 +427,18 @@ int ObMacroBlockBuilder::check_flat_row_columns(const ObStoreRow& row)
{
int ret = OB_SUCCESS;
if (ObActionFlag::OP_ROW_EXIST != row.flag_) {
if (row.row_type_flag_.is_last_multi_version_row()) { // meet last row
check_row_flag_status_ = CHECK_FIRST_ROW;
last_compact_row_nop_cnt_ = -1;
}
} else if (row.row_val_.count_ != desc_.row_column_count_) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("Unexpected column count of store row", K(row), K_(desc), K(ret));
} else {
const int64_t interval = 4;
int64_t i = 0;
int64_t nop_pos_cnt = 0;
bool check_nop_pos_flag = desc_.is_multi_version_minor_sstable();
for (i = 0; i + interval < row.row_val_.count_; i += interval) {
const int tmp0 = check_row_column(row, i + 0);
const int tmp1 = check_row_column(row, i + 1);
......@@ -438,14 +448,43 @@ int ObMacroBlockBuilder::check_flat_row_columns(const ObStoreRow& row)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to check row column", K(ret), K(i), K(interval), K(row));
break;
} else if (check_nop_pos_flag) {
nop_pos_cnt += (row.row_val_.cells_[i].is_nop_value()
+ row.row_val_.cells_[i + 1].is_nop_value()
+ row.row_val_.cells_[i + 2].is_nop_value()
+ row.row_val_.cells_[i + 3].is_nop_value());
}
}
for (; OB_SUCC(ret) && i < row.row_val_.count_; ++i) {
if (OB_FAIL(check_row_column(row, i))) {
LOG_WARN("failed to check row column", K(ret), K(i));
} else if (check_nop_pos_flag) {
nop_pos_cnt += row.row_val_.cells_[i].is_nop_value();
}
}
if (OB_SUCC(ret) && check_nop_pos_flag) {
if (row.row_type_flag_.is_uncommitted_row()) {
// do nothing
} else if (CHECK_FIRST_ROW == check_row_flag_status_) { // meet first committed row
check_row_flag_status_ = CHECK_LAST_ROW;
} else if (CHECK_LAST_ROW == check_row_flag_status_) {
if (nop_pos_cnt < last_compact_row_nop_cnt_) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("nop_cnt of current row is less than last compact row", K(ret), K(nop_pos_cnt),
K(row), K(last_compact_row_nop_cnt_));
}
}
if (row.row_type_flag_.is_last_multi_version_row()) { // meet last row
check_row_flag_status_ = CHECK_FIRST_ROW;
last_compact_row_nop_cnt_ = -1;
} else if (row.row_type_flag_.is_compacted_multi_version_row()
&& ObActionFlag::OP_ROW_DOES_NOT_EXIST != row.flag_
&& ObActionFlag::OP_DEL_ROW != row.flag_) {
last_compact_row_nop_cnt_ = nop_pos_cnt;
}
}
}
return ret;
......@@ -550,6 +589,7 @@ int ObMacroBlockBuilder::process(const ObStoreRow& row, const ObCompactRowType::
STORAGE_LOG(WARN, "The row is invalid, ", K(row), K(ret));
} else if (OB_FAIL(check_row_columns(row))) {
STORAGE_LOG(WARN, "The row is invalid, ", K(row), K_(desc), K(ret));
writer_->dump_micro_block_writer_buffer();
} else if (!is_multi_version_minor_merge(merge_type_)) {
if ((ObActionFlag::OP_ROW_EXIST == row.flag_ || row.row_type_flag_.is_uncommitted_row()) &&
OB_FAIL(writer_->append_row(row))) {
......@@ -690,6 +730,8 @@ void ObMacroBlockBuilder::reset()
need_build_bloom_filter_ = false;
bf_macro_writer_.reset();
is_opened_ = false;
check_row_flag_status_ = CHECK_FIRST_ROW;
last_compact_row_nop_cnt_ = -1;
}
void ObMacroBlockBuilder::set_purged_count(const int64_t count)
......
......@@ -96,6 +96,11 @@ protected:
OB_INLINE int check_sparse_row_column(const common::ObObj& obj, const int64_t idx);
int append_bloom_filter(const storage::ObStoreRow& row);
enum CheckRowFlagStatus
{
CHECK_FIRST_ROW = 0,
CHECK_LAST_ROW = 1,
};
private:
storage::ObMergeType merge_type_;
blocksstable::ObMacroBlockWriter* writer_;
......@@ -109,6 +114,8 @@ private:
blocksstable::ObBloomFilterDataWriter bf_macro_writer_;
share::schema::ColumnMap* cols_id_map_;
bool is_opened_;
CheckRowFlagStatus check_row_flag_status_;
int64_t last_compact_row_nop_cnt_;
};
class ObMacroBlockEstimator : public ObIStoreRowProcessor {
......
......@@ -1213,8 +1213,9 @@ int ObMinorMergeMacroRowIterator::next()
// the first output row of each rowkey must be compact row
// skip the uncommited row and magic row(last row)
check_first_row_compacted_ = false;
if (curr_row_->row_type_flag_.is_compacted_multi_version_row()) { // do nothing for compacted row
// do nothing
if (curr_row_->row_type_flag_.is_compacted_multi_version_row()
&& 0 == curr_row_->row_val_.cells_[multi_version_row_info_->trans_version_index_ + 1].get_int()) {
// curr row is compact row && is not a uncommitted->committed row
} else if (OB_FAIL(make_first_row_compacted())) {
LOG_WARN("Fail to compact first row, ", K(ret));
} else if (OB_FAIL(row_queue_.get_next_row(curr_row_))) { // return first row in row_queue
......
......@@ -927,13 +927,15 @@ int ObMemtableMultiVersionScanIterator::init(const storage::ObTableIterParam& pa
int ObMemtableMultiVersionScanIterator::init_row_cells(ObIAllocator* allocator)
{
int ret = OB_SUCCESS;
void *buf = nullptr;
if (OB_ISNULL(allocator)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "allocator is null", K(ret), K(allocator));
} else if (NULL == (row_.row_val_.cells_ = (ObObj*)allocator->alloc(sizeof(ObObj) * columns_.count()))) {
} else if (NULL == (buf = (ObObj*)allocator->alloc(sizeof(ObObj) * columns_.count()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "alloc cells fail", K(ret), "size", sizeof(ObObj) * columns_.count());
} else {
row_.row_val_.cells_ = new (buf) ObObj[columns_.count()];
row_.row_val_.count_ = columns_.count();
}
return ret;
......
......@@ -216,6 +216,7 @@ OB_INLINE static int simple_fuse_row(
if (common::ObActionFlag::OP_DEL_ROW == former.flag_) {
final_result = true;
if (first_val) { // copy rowkey
result.row_val_.count_ = former.row_val_.count_;
for (int i = 0; OB_SUCC(ret) && i < former.row_val_.count_; ++i) {
if (OB_FAIL(obj_copy(former.row_val_.cells_[i], result.row_val_.cells_[i]))) {
STORAGE_LOG(WARN, "failed to copy obj", K(ret), K(i), K(former.row_val_.cells_[i]));
......
......@@ -30,24 +30,26 @@ int ObSSTableDumpErrorInfo::get_sstable_scan_iter(
range.get_range().set_whole_range();
if (OB_FAIL(range.to_collation_free_range_on_demand_and_cutoff_range(allocator_))) {
STORAGE_LOG(WARN, "failed to transform range to collation free and range cutoff", K(range), K(ret));
} else if (OB_FAIL(prepare_sstable_query_param(sstable, schema))) {
} else if (OB_FAIL(prepare_sstable_query_param(sstable, schema, query_param_for_table1_))) {
STORAGE_LOG(WARN, "Fail to preapare scan param", K(ret));
} else if (OB_FAIL(sstable.scan(param_, context_, range, scanner))) {
} else if (OB_FAIL(sstable.scan(query_param_for_table1_.param_, query_param_for_table1_.context_, range, scanner))) {
STORAGE_LOG(WARN, "Fail to scan param", K(ret));
}
return ret;
}
int ObSSTableDumpErrorInfo::prepare_sstable_query_param(ObSSTable& sstable, const ObTableSchema& schema)
int ObSSTableDumpErrorInfo::prepare_sstable_query_param(
ObSSTable& sstable, const ObTableSchema& schema, QueryParam& query_param)
{
int ret = OB_SUCCESS;
reset();
query_param.reset();
const uint64_t tenant_id = extract_tenant_id(schema.get_table_id());
if (OB_FAIL(schema.get_column_ids(column_ids_, true))) {
if (OB_FAIL(schema.get_column_ids(query_param.column_ids_, true))) {
STORAGE_LOG(WARN, "Fail to get column ids. ", K(ret));
} else if (FALSE_IT(param_.out_cols_ = &column_ids_)) {
STORAGE_LOG(ERROR, "row getter", K(ret), K(column_ids_));
} else if (FALSE_IT(query_param.param_.out_cols_ = &query_param.column_ids_)) {
STORAGE_LOG(ERROR, "row getter", K(ret), K(query_param.column_ids_));
} else {
STORAGE_LOG(INFO, "success to get column ids. ", K(ret), K(query_param.column_ids_));
ObQueryFlag query_flag(ObQueryFlag::Forward,
true, /*is daily merge scan*/
true, /*is read multiple macro block*/
......@@ -55,22 +57,22 @@ int ObSSTableDumpErrorInfo::prepare_sstable_query_param(ObSSTable& sstable, cons
false /*is full row scan?*/,
false,
false);
store_ctx_.cur_pkey_ = sstable.get_partition_key();
param_.table_id_ = schema.get_table_id();
param_.rowkey_cnt_ = schema.get_rowkey_column_num();
param_.schema_version_ = schema.get_schema_version();
context_.query_flag_ = query_flag;
context_.store_ctx_ = &store_ctx_;
context_.allocator_ = &allocator_;
context_.stmt_allocator_ = &allocator_;
context_.trans_version_range_ = sstable.get_key().trans_version_range_;
context_.is_inited_ = true; // just used for dump
query_param.store_ctx_.cur_pkey_ = sstable.get_partition_key();
query_param.param_.table_id_ = schema.get_table_id();
query_param.param_.rowkey_cnt_ = schema.get_rowkey_column_num();
query_param.param_.schema_version_ = schema.get_schema_version();
query_param.context_.query_flag_ = query_flag;
query_param.context_.store_ctx_ = &query_param.store_ctx_;
query_param.context_.allocator_ = &allocator_;
query_param.context_.stmt_allocator_ = &allocator_;
query_param.context_.trans_version_range_ = sstable.get_key().trans_version_range_;
query_param.context_.is_inited_ = true; // just used for dump
}
return ret;
}
int ObSSTableDumpErrorInfo::simple_get_sstable_rowkey_get_iter(
ObSSTable& sstable, const common::ObStoreRowkey& rowkey, ObStoreRowIterator*& getter)
ObSSTable& sstable, const common::ObStoreRowkey& rowkey, QueryParam& query_param, ObStoreRowIterator*& getter)
{
int ret = OB_SUCCESS;
getter = NULL;
......@@ -78,8 +80,8 @@ int ObSSTableDumpErrorInfo::simple_get_sstable_rowkey_get_iter(
ext_rowkey_.get_store_rowkey() = rowkey;
if (OB_FAIL(ext_rowkey_.to_collation_free_on_demand_and_cutoff_range(allocator_))) {
STORAGE_LOG(WARN, "Fail to transfer rowkey", K(ret), K(ext_rowkey_));
} else if (OB_FAIL(sstable.get(param_, context_, ext_rowkey_, getter))) {
STORAGE_LOG(WARN, "Fail to get param", K(ret), K(column_ids_));
} else if (OB_FAIL(sstable.get(query_param.param_, query_param.context_, ext_rowkey_, getter))) {
STORAGE_LOG(WARN, "Fail to get param", K(ret), K(query_param.column_ids_));
}
return ret;
}
......@@ -101,6 +103,7 @@ int ObSSTableDumpErrorInfo::generate_projecter(
STORAGE_LOG(WARN, "failed to add into map", K(ret), K(i), K(column_ids_1.at(i)));
}
}
STORAGE_LOG(INFO, "generate_projecter", K(ret), K(column_ids_1), K(column_ids_2));
if (OB_SUCC(ret)) {
if (OB_FAIL(schema2.get_column_ids(column_ids_2))) {
STORAGE_LOG(WARN, "failed to get column ids", K(ret), K(schema1));
......@@ -116,7 +119,9 @@ int ObSSTableDumpErrorInfo::generate_projecter(
}
}
if (OB_SUCC(ret) && OB_FAIL(projector.push_back(dest_pos))) {
STORAGE_LOG(WARN, "success to get from hash map", K(ret), K(i), K(column_ids_2.at(i)), K(dest_pos));
STORAGE_LOG(WARN, "failed to push into projector", K(ret), K(i), K(column_ids_2.at(i)), K(dest_pos));
} else {
STORAGE_LOG(INFO, "success to push into projector", K(ret), K(i), K(column_ids_2.at(i)), K(dest_pos));
}
} // end for
}
......@@ -136,7 +141,12 @@ int ObSSTableDumpErrorInfo::transform_rowkey(
{
int ret = OB_SUCCESS;
for (int i = 0; i < rowkey_cnt; ++i) {
rowkey_obj_[i] = row.row_val_.cells_[projector.at(i)];
if (projector.at(i) < 0) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "projector is invalid", K(ret), K(i), K(rowkey_cnt), K(projector));
} else {
rowkey_obj_[i] = row.row_val_.cells_[projector.at(i)];
}
}
rowkey.assign(rowkey_obj_, rowkey_cnt);
return ret;
......@@ -151,20 +161,20 @@ int ObSSTableDumpErrorInfo::get_row_with_rowkey_and_check(const ObStoreRow* inpu
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "getter is NULL", K(ret), KPC(getter));
} else if (OB_FAIL(getter->get_next_row(ret_row))) { // get row from sstable1
STORAGE_LOG(WARN, "failed to get row", K(ret), KPC(input_row));
STORAGE_LOG(WARN, "failed to get row", K(ret), KPC(input_row), KPC(ret_row));
} else if (OB_ISNULL(ret_row) || !ret_row->is_valid()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "row is invalid", K(ret), KPC(ret_row));
} else if (ObActionFlag::OP_ROW_EXIST != ret_row->flag_) {
++found_row_cnt;
STORAGE_LOG(ERROR, "found, row is not exist", KPC(input_row));
STORAGE_LOG(ERROR, "found, row is not exist", KPC(input_row), KPC(ret_row));
} else { // check row cells
for (int i = 0; i < projector.count(); ++i) {
if (projector.at(i) < 0 || projector.at(i) > input_row->row_val_.count_) {
// not exist
} else if (ret_row->row_val_.cells_[i] != input_row->row_val_.cells_[projector.at(i)]) {
++found_row_cnt;
STORAGE_LOG(ERROR, "found, column is diffenrent", K(i), KPC(ret_row), KPC(input_row));
STORAGE_LOG(ERROR, "found, column is diffenrent", K(i), KPC(ret_row), K(projector.at(i)), KPC(input_row));
break;
}
} // end of for
......@@ -178,62 +188,62 @@ int ObSSTableDumpErrorInfo::find_extra_row(
ObSSTable& sstable1, const ObTableSchema& schema1, ObSSTable& sstable2, const ObTableSchema& schema2)
{
int ret = OB_SUCCESS;
ObStoreRowIterator* scanner = NULL;
ObSEArray<int64_t, COL_ARRAY_LEN> projector;
common::ObSEArray<share::schema::ObColDesc, COL_ARRAY_LEN> col_descs;
int64_t found_row_cnt = 0;
int64_t iter_row_cnt = 0;
if (OB_FAIL(get_sstable_scan_iter(sstable1, schema1, scanner)) || OB_ISNULL(scanner)) {
STORAGE_LOG(WARN, "failed to get sstable scan iter", K(ret), K(sstable1), K(schema1));
} else if (OB_FAIL(generate_projecter(schema1, schema2, projector))) {
STORAGE_LOG(WARN, "failed to generate rowkey projector", K(ret));
} else if (OB_FAIL(schema2.get_column_ids(col_descs))) {
STORAGE_LOG(WARN, "failed to get column id array", K(ret), K(schema2));
} else if (OB_FAIL(prepare_sstable_query_param(sstable2, schema2))) {
STORAGE_LOG(WARN, "Fail to preapare scan param", K(ret));
if (schema1.get_virtual_column_cnt() > 0 || schema2.get_virtual_column_cnt() > 0) {
STORAGE_LOG(INFO, "schema have virtual column", K(ret));
} else {
const ObStoreRow* row_in_table1 = NULL;
ObStoreRowIterator* getter = NULL;
common::ObStoreRowkey rowkey;
const int64_t rowkey_cnt = schema2.get_rowkey_column_num();
param_.table_id_ = schema2.get_table_id();
param_.rowkey_cnt_ = rowkey_cnt;
while (OB_SUCC(ret)) {
if (OB_FAIL(scanner->get_next_row(row_in_table1))) { // get row from sstable1
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "failed to get next row", K(ret));
} else {
STORAGE_LOG(WARN, "meet iter end", K(ret), KPC(scanner));
ret = OB_SUCCESS;
break;
ObStoreRowIterator* scanner = NULL;
ObSEArray<int64_t, COL_ARRAY_LEN> projector;
int64_t found_row_cnt = 0;
int64_t iter_row_cnt = 0;
if (OB_FAIL(get_sstable_scan_iter(sstable1, schema1, scanner)) || OB_ISNULL(scanner)) {
STORAGE_LOG(WARN, "failed to get sstable scan iter", K(ret), K(sstable1), K(schema1));
} else if (OB_FAIL(generate_projecter(schema1, schema2, projector))) {
STORAGE_LOG(WARN, "failed to generate rowkey projector", K(ret));
} else if (OB_FAIL(prepare_sstable_query_param(sstable2, schema2, query_param_for_table2_))) {
STORAGE_LOG(WARN, "Fail to preapare scan param", K(ret));
} else {
const ObStoreRow* row_in_table1 = NULL;
ObStoreRowIterator* getter = NULL;
common::ObStoreRowkey rowkey;
const int64_t rowkey_cnt = schema2.get_rowkey_column_num();
while (OB_SUCC(ret)) {
if (OB_FAIL(scanner->get_next_row(row_in_table1))) { // get row from sstable1
if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "failed to get next row", K(ret));
} else {
STORAGE_LOG(WARN, "meet iter end", K(ret), KPC(scanner));
ret = OB_SUCCESS;
break;
}
} else if (OB_ISNULL(row_in_table1) || !row_in_table1->is_valid()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "row is invalid", K(ret), KPC(row_in_table1));
} else if (OB_FAIL(transform_rowkey(
*row_in_table1, rowkey_cnt, projector, rowkey))) { // project into rowkey of sstable2
STORAGE_LOG(WARN, "failed to transfor rowkey", K(ret));
} else if (OB_FAIL(simple_get_sstable_rowkey_get_iter(
sstable2, rowkey, query_param_for_table2_, getter))) { // get row in sstable2
STORAGE_LOG(WARN, "failed to get table getter", K(ret), KPC(getter));
} else if (OB_FAIL(get_row_with_rowkey_and_check(row_in_table1, getter, projector, found_row_cnt))) {
STORAGE_LOG(WARN, "failed to check row", K(ret));
}
} else if (OB_ISNULL(row_in_table1) || !row_in_table1->is_valid()) {
++iter_row_cnt;
} // end of while
}
if (OB_SUCC(ret)) {
if (found_row_cnt + sstable2.get_meta().row_count_ != sstable1.get_meta().row_count_) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "row is invalid", K(ret), KPC(row_in_table1));
} else if (OB_FAIL(transform_rowkey(
*row_in_table1, rowkey_cnt, projector, rowkey))) { // project into rowkey of sstable2
STORAGE_LOG(WARN, "failed to transfor rowkey", K(ret));
} else if (OB_FAIL(simple_get_sstable_rowkey_get_iter(sstable2, rowkey, getter))) { // get row in sstable2
STORAGE_LOG(WARN, "failed to get table getter", K(ret), KPC(getter));
} else if (OB_FAIL(get_row_with_rowkey_and_check(row_in_table1, getter, projector, found_row_cnt))) {
STORAGE_LOG(WARN, "failed to check row", K(ret));
STORAGE_LOG(WARN, "have not found all extra rows", K(ret), K(found_row_cnt), K(iter_row_cnt));
} else {
STORAGE_LOG(ERROR,
"success to get all extra rows",
K(ret),
K(sstable1),
K(schema1),
K(sstable2),
K(schema2),
K(found_row_cnt));
}
++iter_row_cnt;
} // end of while
}
if (OB_SUCC(ret)) {
if (found_row_cnt + sstable2.get_meta().row_count_ != sstable1.get_meta().row_count_) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "have not found all extra rows", K(ret), K(found_row_cnt), K(iter_row_cnt));
} else {
STORAGE_LOG(ERROR,
"success to get all extra rows",
K(ret),
K(sstable1),
K(schema1),
K(sstable2),
K(schema2),
K(found_row_cnt));
}
}
return ret;
......@@ -257,9 +267,8 @@ int ObSSTableDumpErrorInfo::main_and_index_row_count_error(ObSSTable& main_table
void ObSSTableDumpErrorInfo::reset()
{
param_.reset();
context_.reset();
store_ctx_.reset();
query_param_for_table1_.reset();
query_param_for_table2_.reset();
ext_rowkey_.reset();
}
......
......@@ -23,12 +23,10 @@ namespace storage {
class ObSSTableDumpErrorInfo {
public:
ObSSTableDumpErrorInfo()
: param_(),
context_(),
store_ctx_(),
allocator_("dump_error_info"),
column_ids_(COL_ARRAY_LEN, allocator_),
ext_rowkey_()
: allocator_("dump_error_info"),
ext_rowkey_(),
query_param_for_table1_(allocator_),
query_param_for_table2_(allocator_)
{}
~ObSSTableDumpErrorInfo()
{
......@@ -44,6 +42,22 @@ public:
private:
static const int64_t COL_ARRAY_LEN = 128;
struct QueryParam {
QueryParam(ObIAllocator& allocator) : param_(), context_(), store_ctx_(), column_ids_(COL_ARRAY_LEN, allocator)
{}
void reset()
{
param_.reset();
context_.reset();
store_ctx_.reset();
column_ids_.reuse();
}
ObTableIterParam param_;
ObTableAccessContext context_;
ObStoreCtx store_ctx_;
ObArray<ObColDesc, ObIAllocator&> column_ids_;
};
private:
int find_extra_row(
ObSSTable& sstable1, const ObTableSchema& schema1, ObSSTable& sstable2, const ObTableSchema& schema2);
......@@ -51,20 +65,18 @@ private:
const ObStoreRow& row, const int64_t rowkey_cnt, ObIArray<int64_t>& projector, ObStoreRowkey& rowkey);
int get_sstable_scan_iter(ObSSTable& sstable, const ObTableSchema& schema, ObStoreRowIterator*& scanner);
int simple_get_sstable_rowkey_get_iter(
ObSSTable& sstable, const common::ObStoreRowkey& rowkey, ObStoreRowIterator*& getter);
ObSSTable& sstable, const common::ObStoreRowkey& rowkey, QueryParam& query_param, ObStoreRowIterator*& getter);
int generate_projecter(const ObTableSchema& schema1, const ObTableSchema& schema2, ObIArray<int64_t>& projector);
int get_row_with_rowkey_and_check(const ObStoreRow* input_row, ObStoreRowIterator* getter,
common::ObSEArray<int64_t, COL_ARRAY_LEN>& projector, int64_t& found_row_cnt);
int prepare_sstable_query_param(ObSSTable& sstable, const ObTableSchema& schema);
int prepare_sstable_query_param(ObSSTable& sstable, const ObTableSchema& schema, QueryParam& query_param);
private:
ObObj rowkey_obj_[OB_MAX_ROWKEY_COLUMN_NUMBER];
ObTableIterParam param_;
ObTableAccessContext context_;
ObStoreCtx store_ctx_;
ObArenaAllocator allocator_;
ObArray<ObColDesc, ObIAllocator&> column_ids_;
ObExtStoreRowkey ext_rowkey_;
QueryParam query_param_for_table1_;
QueryParam query_param_for_table2_;
};
} // namespace storage
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册