提交 ae0ace66 编写于 作者: O obdev 提交者: wangzelin.wzl

cherry-pick omitted fixes to open-source branch

上级 c949c5e0
......@@ -404,9 +404,12 @@ void ObExtStoreRange::set_range_array_idx(const int64_t range_array_idx)
}
// for multi version get, the rowkey would be converted into a range (with trans version),
// e.g. rowkey1 -> [(rowkey1, -read_snapshot), (rowkey1, MIN_VERSION))
int ObVersionStoreRangeConversionHelper::store_rowkey_to_multi_version_range(const ObExtStoreRowkey& src_rowkey,
const ObVersionRange& version_range, ObIAllocator& allocator, ObExtStoreRange& multi_version_range)
// e.g. rowkey1 -> [(rowkey1, -read_snapshot), (rowkey1, MAX_VERSION)]
int ObVersionStoreRangeConversionHelper::store_rowkey_to_multi_version_range(
const ObExtStoreRowkey &src_rowkey,
const ObVersionRange &version_range,
ObIAllocator &allocator,
ObExtStoreRange &multi_version_range)
{
int ret = OB_SUCCESS;
// FIXME: hard coding
......@@ -420,24 +423,28 @@ int ObVersionStoreRangeConversionHelper::store_rowkey_to_multi_version_range(con
multi_version_range.get_range().get_start_key()))) {
COMMON_LOG(WARN, "build multi version store rowkey failed", K(ret), K(src_rowkey), K(version_range));
} else if (OB_FAIL(build_multi_version_store_rowkey(src_rowkey.get_store_rowkey(),
ObVersionRange::MIN_VERSION,
allocator,
multi_version_range.get_range().get_end_key()))) {
COMMON_LOG(WARN, "build multi version store rowkey failed", K(ret), K(src_rowkey), K(version_range));
ObVersionRange::MAX_VERSION,
allocator,
multi_version_range.get_range().get_end_key()))) {
COMMON_LOG(WARN, "build multi version store rowkey failed",
K(ret), K(src_rowkey), K(version_range));
} else if (OB_FAIL(multi_version_range.to_collation_free_range_on_demand_and_cutoff_range(allocator))) {
COMMON_LOG(WARN, "fail to get colllation free rowkey and range cutoff", K(ret));
} else {
multi_version_range.get_range().set_left_closed();
multi_version_range.get_range().set_right_open();
multi_version_range.get_range().set_right_closed();
}
return ret;
}
// for multi version scan, the range would be converted into a range (with trans version),
// e.g. case 1 : (rowkey1, rowkey2) -> ((rowkey1, MIN_VERSION), (rowkey2, -MAX_VERSION))
// case 2 : [rowkey1, rowkey2] -> [(rowkey1, -read_snapshot), (rowkey2, MIN_VERSION))
int ObVersionStoreRangeConversionHelper::range_to_multi_version_range(const ObExtStoreRange& src_range,
const ObVersionRange& version_range, ObIAllocator& allocator, ObExtStoreRange& multi_version_range)
// e.g. case 1 : (rowkey1, rowkey2) -> ((rowkey1, MAX_VERSION), (rowkey2, -MAX_VERSION))
// case 2 : [rowkey1, rowkey2] -> [(rowkey1, -MAX_VERSION), (rowkey2, MAX_VERSION)]
int ObVersionStoreRangeConversionHelper::range_to_multi_version_range(
const ObExtStoreRange &src_range,
const ObVersionRange &version_range,
ObIAllocator &allocator,
ObExtStoreRange &multi_version_range)
{
int ret = OB_SUCCESS;
const bool include_start = src_range.get_range().get_border_flag().inclusive_start();
......@@ -448,15 +455,15 @@ int ObVersionStoreRangeConversionHelper::range_to_multi_version_range(const ObEx
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "version_range is not valid", K(ret), K(version_range));
} else if (OB_FAIL(build_multi_version_store_rowkey(src_range.get_range().get_start_key(),
include_start ? -INT64_MAX : ObVersionRange::MIN_VERSION,
allocator,
multi_version_range.get_range().get_start_key()))) {
COMMON_LOG(WARN, "build multi version store rowkey failed", K(ret), K(src_range), K(version_range));
include_start ? -INT64_MAX : ObVersionRange::MAX_VERSION,
allocator, multi_version_range.get_range().get_start_key()))) {
COMMON_LOG(WARN, "build multi version store rowkey failed",
K(ret), K(src_range), K(version_range));
} else if (OB_FAIL(build_multi_version_store_rowkey(src_range.get_range().get_end_key(),
include_end ? ObVersionRange::MIN_VERSION : -ObVersionRange::MAX_VERSION,
allocator,
multi_version_range.get_range().get_end_key()))) {
COMMON_LOG(WARN, "build multi version store rowkey failed", K(ret), K(src_range), K(version_range));
include_end ? ObVersionRange::MAX_VERSION : - ObVersionRange::MAX_VERSION,
allocator, multi_version_range.get_range().get_end_key()))) {
COMMON_LOG(WARN, "build multi version store rowkey failed",
K(ret), K(src_range), K(version_range));
} else if (OB_FAIL(multi_version_range.to_collation_free_range_on_demand_and_cutoff_range(allocator))) {
COMMON_LOG(WARN, "fail to get collation free rowkey", K(ret));
} else {
......@@ -466,7 +473,11 @@ int ObVersionStoreRangeConversionHelper::range_to_multi_version_range(const ObEx
} else {
multi_version_range.get_range().set_left_open();
}
multi_version_range.get_range().set_right_open();
if (include_end) {
multi_version_range.get_range().set_right_closed();
} else {
multi_version_range.get_range().set_right_open();
}
}
return ret;
}
......
......@@ -298,6 +298,10 @@ int ObGVPartitionInfo::inner_get_next_row(ObNewRow*& row)
// schema_version
cur_row_.cells_[i].set_int(info.get_data_info().get_schema_version());
break;
case OB_APP_MIN_COLUMN_ID + 33:
// last_replay_log_ts
cur_row_.cells_[i].set_int(info.get_data_info().get_last_replay_log_ts());
break;
default:
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "invalid column id", K(ret), K(col_id));
......
......@@ -5644,6 +5644,7 @@ def_table_schema(
('replica_type', 'int', 'false', '0'),
('last_replay_log_id', 'int', 'false', '0'),
('schema_version', 'int', 'false', '0'),
('last_replay_log_ts', 'int', 'false', '0'),
],
partition_columns = ['svr_ip', 'svr_port'],
)
......
......@@ -66,6 +66,8 @@ private:
private:
void reset();
void reuse();
int init_node_vector();
int block_index_to_node_vector();
int node_vector_to_node_array();
int add_node_to_vector(MediumNode& cur_node, int index);
......@@ -112,14 +114,13 @@ private:
int64_t block_count_; // count of micro blocks
int64_t rowkey_column_count_;
int64_t data_offset_;
common::ObArenaAllocator allocator_;
common::ObArenaAllocator vec_allocator_;
common::ObVector<MediumNode> node_vector_[common::OB_MAX_ROWKEY_COLUMN_NUMBER];
common::ObVector<MediumNode, common::ObArenaAllocator> *node_vector_[common::OB_MAX_ROWKEY_COLUMN_NUMBER];
int64_t node_vector_count_;
NodeArray node_array_;
ObMicroBlockIndexMgr* micro_index_mgr_;
common::ObArenaAllocator allocator_;
bool vec_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(ObMicroBlockIndexTransformer);
};
......
......@@ -1662,15 +1662,16 @@ int ObPartitionMergeUtil::merge_partition(memtable::ObIMemtableCtxFactory* memct
if (0 == minimum_iters.count()) {
ret = OB_ITER_END;
} else if (1 == minimum_iters.count() && NULL == minimum_iters.at(0)->get_curr_row()) {
ObMacroRowIterator* iter = minimum_iters.at(0);
const storage::ObMacroBlockDesc& block_desc = iter->get_curr_macro_block();
if (!iter->macro_block_opened() &&
((rewrite_block_cnt < need_rewrite_block_cnt && ctx.need_rewrite_macro_block(block_desc)) ||
(iter->need_rewrite_current_macro_block()))) {
ObMacroRowIterator *iter = minimum_iters.at(0);
const storage::ObMacroBlockDesc &block_desc = iter->get_curr_macro_block();
if (!iter->macro_block_opened()
&& ((rewrite_block_cnt < need_rewrite_block_cnt
&& ctx.need_rewrite_macro_block(block_desc))
|| (iter->need_rewrite_current_macro_block()))) {
if (!ctx.param_.is_major_merge()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("only major merge can call rewrite_macro_block", K(ret), K(ctx), KPC(iter));
} else if (OB_FAIL(rewrite_macro_block(minimum_iters, ctx.merge_level_, partition_fuser, processor))) {
} else if (OB_FAIL(rewrite_macro_block(minimum_iters, partition_fuser, processor))) {
LOG_WARN("rewrite_macro_block failed", K(ret), K(ctx));
} else {
++rewrite_block_cnt;
......@@ -2237,8 +2238,10 @@ int ObPartitionMergeUtil::get_macro_block_count_to_rewrite(const storage::ObSSTa
return ret;
}
int ObPartitionMergeUtil::rewrite_macro_block(ObIPartitionMergeFuser::MERGE_ITER_ARRAY& minimum_iters,
const storage::ObMergeLevel& merge_level, ObIPartitionMergeFuser* partition_fuser, ObIStoreRowProcessor& processor)
int ObPartitionMergeUtil::rewrite_macro_block(
ObIPartitionMergeFuser::MERGE_ITER_ARRAY &minimum_iters,
ObIPartitionMergeFuser *partition_fuser,
ObIStoreRowProcessor &processor)
{
int ret = OB_SUCCESS;
ObMacroRowIterator* iter = static_cast<ObMacroRowIterator*>(minimum_iters.at(0));
......@@ -2253,7 +2256,8 @@ int ObPartitionMergeUtil::rewrite_macro_block(ObIPartitionMergeFuser::MERGE_ITER
} else {
while (OB_SUCC(ret) && iter->macro_block_opened()) {
// open the micro block if needed
if (MICRO_BLOCK_MERGE_LEVEL == merge_level && !iter->micro_block_opened()) {
if (MICRO_BLOCK_MERGE_LEVEL == iter->get_merge_level()
&& !iter->micro_block_opened()) {
if (OB_FAIL(iter->open_curr_micro_block())) {
LOG_WARN("open_curr_micro_block failed", K(ret));
}
......
......@@ -504,9 +504,10 @@ private:
static bool need_open_right(int64_t cmp_ret);
static int purge_minimum_iters(common::ObIArray<ObMacroRowIterator*>& minimum_iters, ObMacroRowIterator* base_iter);
static int rewrite_macro_block(ObIPartitionMergeFuser::MERGE_ITER_ARRAY& minimum_iters,
const storage::ObMergeLevel& merge_level, ObIPartitionMergeFuser* partition_fuser,
ObIStoreRowProcessor& processor);
static int rewrite_macro_block(
ObIPartitionMergeFuser::MERGE_ITER_ARRAY &minimum_iters,
ObIPartitionMergeFuser *partition_fuser,
ObIStoreRowProcessor &processor);
static int fuse_row(const storage::ObSSTableMergeCtx& ctx,
const common::ObIArray<ObMacroRowIterator*>& macro_row_iters, ObRowFuseInfo& row_fuse_info,
......
......@@ -594,6 +594,7 @@ ObSSTableMergeCtx::ObSSTableMergeCtx()
merge_log_ts_(INT_MAX),
trans_table_end_log_ts_(0),
trans_table_timestamp_(0),
pg_last_replay_log_ts_(0),
read_base_version_(0)
{}
......@@ -1037,8 +1038,11 @@ int ObSSTableMergePrepareTask::process()
storage = static_cast<ObPartitionStorage*>(ctx->partition_guard_.get_pg_partition()->get_storage()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("The partition storage must not NULL", K(ret), K(ctx));
} else if (ctx->param_.is_multi_version_minor_merge() &&
OB_FAIL(pg->get_pg_storage().get_trans_table_end_log_ts_and_timestamp(
} else if (ctx->param_.is_mini_merge()
&& OB_FAIL(pg->get_pg_storage().get_last_replay_log_ts(ctx->pg_last_replay_log_ts_))) {
LOG_WARN("failed to get pg last replay log ts", K(ret), K(ctx->param_));
} else if (ctx->param_.is_mini_merge()
&& OB_FAIL(pg->get_pg_storage().get_trans_table_end_log_ts_and_timestamp(
ctx->trans_table_end_log_ts_, ctx->trans_table_timestamp_))) {
LOG_WARN("failed to get trans_table end_log_ts and timestamp", K(ret), K(ctx->param_));
} else if (OB_FAIL(storage->build_merge_ctx(*ctx))) {
......
......@@ -155,6 +155,7 @@ struct ObSSTableScheduleMergeParam {
};
class ObMergeParameter;
//TODO such a massive context!
struct ObSSTableMergeCtx {
ObSSTableMergeCtx();
virtual ~ObSSTableMergeCtx();
......@@ -250,18 +251,28 @@ struct ObSSTableMergeCtx {
int64_t merge_log_ts_;
int64_t trans_table_end_log_ts_;
int64_t trans_table_timestamp_;
int64_t read_base_version_; // use for major merge
TO_STRING_KV(K_(param), K_(sstable_version_range), K_(create_snapshot_version), K_(base_schema_version),
K_(schema_version), K_(dump_memtable_timestamp), KP_(table_schema), K_(is_full_merge), K_(stat_sampling_ratio),
K_(merge_level), K_(progressive_merge_num), K_(progressive_merge_start_version), K_(parallel_merge_ctx),
K_(checksum_method), K_(result_code), KP_(data_table_schema), KP_(mv_dep_table_schema), K_(index_stats),
"tables_handle count", tables_handle_.get_count(), K_(index_stats), K_(is_in_progressive_new_checksum),
K_(store_column_checksum_in_micro), K_(progressive_merge_round), K_(progressive_merge_step),
K_(use_new_progressive), K_(tables_handle), K_(base_table_handle), K_(create_sstable_for_large_snapshot),
K_(logical_data_version), K_(log_ts_range), K_(merge_log_ts), K_(trans_table_end_log_ts),
K_(trans_table_timestamp), K_(read_base_version));
// we would push up last_replay_log_ts if the corresponding memtable has been merged,
// but this memtable may not be released due to the warming-up table_store
// if a new index is created, the schedule will also trigger a mini merge for it with the old frozen memtable
// now we get a table store with old end_log_ts within the pg which has a larger last_replay_log_ts
// so we need to use last_replay_log_ts to prevent such a useless mini merge from happening
int64_t pg_last_replay_log_ts_;
int64_t read_base_version_; // use for major merge
TO_STRING_KV(K_(param), K_(sstable_version_range), K_(create_snapshot_version),
K_(base_schema_version), K_(schema_version), K_(dump_memtable_timestamp),
KP_(table_schema), K_(is_full_merge), K_(stat_sampling_ratio), K_(merge_level),
K_(progressive_merge_num), K_(progressive_merge_start_version),
K_(parallel_merge_ctx), K_(checksum_method), K_(result_code),
KP_(data_table_schema), KP_(mv_dep_table_schema),
K_(index_stats),
"tables_handle count", tables_handle_.get_count(), K_(index_stats),
K_(is_in_progressive_new_checksum), K_(store_column_checksum_in_micro),
K_(progressive_merge_round),
K_(progressive_merge_step), K_(use_new_progressive),
K_(tables_handle), K_(base_table_handle), K_(create_sstable_for_large_snapshot),
K_(logical_data_version), K_(log_ts_range), K_(merge_log_ts), K_(trans_table_end_log_ts),
K_(trans_table_timestamp), K_(pg_last_replay_log_ts), K_(read_base_version));
private:
DISALLOW_COPY_AND_ASSIGN(ObSSTableMergeCtx);
};
......
......@@ -12503,7 +12503,7 @@ int ObMigrateUtil::wait_trans_table_merge_finish(ObMigrateCtx& ctx)
do {
if (OB_FAIL(ObDagScheduler::get_instance().check_dag_exist(&fake_dag, exist))) {
LOG_WARN("failed to check dag exist", K(ret), K(fake_dag));
} else if (!exist) {
} else if (exist) {
if (ObTimeUtility::current_time() - start_ts > WAIT_TIMEOUT) {
ret = OB_WAIT_TRANS_TABLE_MERGE_TIMEOUT;
LOG_WARN("wait trans table merge finish timeout", K(ret), K(fake_dag));
......
......@@ -690,6 +690,7 @@ private:
const common::ObNewRow& row, ObLockFlag lock_flag, RowReshape*& row_reshape);
int lock_rows_(
const ObStoreCtx& ctx, const ObTableScanParam& scan_param, const common::ObNewRow& row, RowReshape*& row_reshape);
int check_useless_index_mini_merge(const storage::ObSSTableMergeCtx &ctx);
int dump_error_info(ObSSTable& main_sstable, ObSSTable& index_sstable);
// disallow copy;
DISALLOW_COPY_AND_ASSIGN(ObPartitionStorage);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册