提交 1f055408 编写于 作者: C chaser-ch 提交者: wangzelin.wzl

fix fuse row cache problem with uncommited trans

上级 f2299662
......@@ -106,12 +106,11 @@ ObFuseRowCacheValue::ObFuseRowCacheValue()
schema_version_(0),
snapshot_version_(0),
partition_id_(0),
sstable_end_log_ts_(0),
fq_ctx_()
sstable_end_log_ts_(0)
{}
int ObFuseRowCacheValue::init(const ObStoreRow& row, const int64_t schema_version, const int64_t snapshot_version,
const int64_t partition_id, const int64_t sstable_end_log_ts, const ObFastQueryContext& fq_ctx)
const int64_t partition_id, const int64_t sstable_end_log_ts)
{
int ret = OB_SUCCESS;
if (schema_version < 0 || snapshot_version <= 0 || partition_id < 0) {
......@@ -125,7 +124,6 @@ int ObFuseRowCacheValue::init(const ObStoreRow& row, const int64_t schema_versio
snapshot_version_ = snapshot_version;
partition_id_ = partition_id;
sstable_end_log_ts_ = sstable_end_log_ts;
fq_ctx_ = fq_ctx;
size_ = sizeof(ObObj) * column_cnt_;
for (int64_t i = 0; OB_SUCC(ret) && i < column_cnt_; ++i) {
const ObObj& obj = obj_array_[i];
......@@ -174,7 +172,6 @@ int ObFuseRowCacheValue::deep_copy(char* buf, const int64_t buf_len, ObIKVCacheV
pfuse_value->flag_ = flag_;
pfuse_value->snapshot_version_ = snapshot_version_;
pfuse_value->partition_id_ = partition_id_;
pfuse_value->fq_ctx_ = fq_ctx_;
pfuse_value->size_ = size_;
pfuse_value->sstable_end_log_ts_ = sstable_end_log_ts_;
pos = sizeof(*this) + sizeof(ObObj) * column_cnt_;
......
......@@ -44,7 +44,7 @@ public:
ObFuseRowCacheValue();
virtual ~ObFuseRowCacheValue() = default;
int init(const storage::ObStoreRow& row, const int64_t schema_version, const int64_t snapshot_version,
const int64_t partition_id, const int64_t sstable_end_log_ts, const storage::ObFastQueryContext& fq_ctx);
const int64_t partition_id, const int64_t sstable_end_log_ts);
virtual int64_t size() const override;
virtual int deep_copy(char* buf, const int64_t buf_len, ObIKVCacheValue*& value) const override;
bool is_valid() const
......@@ -80,12 +80,8 @@ public:
{
return sstable_end_log_ts_;
}
const storage::ObFastQueryContext* get_fq_ctx() const
{
return &fq_ctx_;
}
TO_STRING_KV(KP_(obj_array), K_(size), K_(column_cnt), K_(schema_version), K_(flag), K_(snapshot_version),
K_(partition_id), K_(sstable_end_log_ts), K(fq_ctx_));
K_(partition_id), K_(sstable_end_log_ts));
private:
common::ObObj* obj_array_;
......@@ -96,7 +92,6 @@ private:
int64_t snapshot_version_;
int64_t partition_id_;
int64_t sstable_end_log_ts_;
storage::ObFastQueryContext fq_ctx_;
};
struct ObFuseRowValueHandle {
......
......@@ -897,7 +897,6 @@ int ObMemtable::get(const storage::ObTableIterParam& param, storage::ObTableAcce
ObMemtableKey returned_mtk;
ObMvccValueIterator value_iter;
const common::ObIArray<share::schema::ObColDesc>* out_cols = nullptr;
bool need_query_memtable = false;
ObTransSnapInfo snapshot_info;
const bool skip_compact = false;
if (IS_NOT_INIT) {
......@@ -915,64 +914,14 @@ int ObMemtable::get(const storage::ObTableIterParam& param, storage::ObTableAcce
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
} else if (OB_FAIL(context.store_ctx_->get_snapshot_info(snapshot_info))) {
TRANS_LOG(WARN, "get snapshot info failed", K(ret));
} else {
bool fast_query = false;
const ObFastQueryContext* fq_ctx = nullptr;
if (nullptr != context.fq_ctx_) {
fq_ctx = context.fq_ctx_;
fast_query =
timestamp_ == fq_ctx->get_timestamp() && this == fq_ctx->get_memtable() && nullptr != fq_ctx->get_mvcc_row();
}
if (fast_query && !context.store_ctx_->mem_ctx_->is_can_elr()) {
int64_t trans_version = 0L;
if (OB_FAIL(mvcc_engine_.get_trans_version(*context.store_ctx_->mem_ctx_,
snapshot_info,
context.query_flag_,
&parameter_mtk,
reinterpret_cast<ObMvccRow*>(fq_ctx->get_mvcc_row()),
trans_version))) {
TRANS_LOG(WARN, "fail to do mvcc engine fast get", K(ret));
} else {
if (trans_version == fq_ctx->get_row_version()) {
// just read from row cache
// do not set memtable row
row.flag_ = common::ObActionFlag::OP_ROW_DOES_NOT_EXIST;
row.fq_ctx_.set_timestamp(-1L);
row.fq_ctx_.set_memtable(nullptr);
row.fq_ctx_.set_mvcc_row(nullptr);
row.fq_ctx_.set_row_version(0L);
row.snapshot_version_ = trans_version;
TRANS_LOG(DEBUG, "do fast get successfully", K(rowkey), K(trans_version), K(*fq_ctx));
} else {
if (OB_FAIL(mvcc_engine_.get(*context.store_ctx_->mem_ctx_,
snapshot_info,
context.query_flag_,
skip_compact,
&parameter_mtk,
&returned_mtk,
value_iter))) {
TRANS_LOG(WARN, "fail to do mvcc engine get", K(ret));
} else {
need_query_memtable = true;
}
}
}
} else {
if (OB_FAIL(mvcc_engine_.get(*context.store_ctx_->mem_ctx_,
snapshot_info,
context.query_flag_,
skip_compact,
&parameter_mtk,
&returned_mtk,
value_iter))) {
TRANS_LOG(WARN, "fail to do mvcc engine get", K(ret));
} else {
need_query_memtable = true;
}
}
}
if (OB_FAIL(ret) || !need_query_memtable) {
} else if (OB_FAIL(mvcc_engine_.get(*context.store_ctx_->mem_ctx_,
snapshot_info,
context.query_flag_,
skip_compact,
&parameter_mtk,
&returned_mtk,
value_iter))) {
TRANS_LOG(WARN, "fail to do mvcc engine get", K(ret));
} else {
ColumnMap* local_map = NULL;
const ColumnMap* param_column_map = nullptr;
......@@ -1037,10 +986,6 @@ int ObMemtable::get(const storage::ObTableIterParam& param, storage::ObTableAcce
}
}
}
row.fq_ctx_.set_timestamp(timestamp_);
row.fq_ctx_.set_memtable(this);
row.fq_ctx_.set_mvcc_row(const_cast<ObMvccRow*>(value_iter.get_mvcc_row()));
row.fq_ctx_.set_row_version(row.snapshot_version_);
}
}
}
......
......@@ -101,8 +101,7 @@ int ObFuseRowCacheFetcher::put_fuse_row_cache(
access_param_->iter_param_.schema_version_,
read_snapshot_version,
access_ctx_->pkey_.get_partition_id(),
sstable_end_log_ts,
row.fq_ctx_))) {
sstable_end_log_ts))) {
STORAGE_LOG(WARN, "fail to init row cache value", K(tmp_ret));
} else if (OB_SUCCESS != (tmp_ret = ObStorageCacheSuite::get_instance().get_fuse_row_cache().put_row(
cache_key, row_cache_value))) {
......
......@@ -901,7 +901,6 @@ ObTableAccessContext::ObTableAccessContext()
trans_version_range_(),
row_filter_(NULL),
use_fuse_row_cache_(false),
fq_ctx_(nullptr),
need_scn_(false),
fuse_row_cache_hit_rate_(0),
block_cache_hit_rate_(0),
......@@ -1103,7 +1102,6 @@ void ObTableAccessContext::reset()
trans_version_range_.reset();
row_filter_ = NULL;
use_fuse_row_cache_ = false;
fq_ctx_ = nullptr;
fuse_row_cache_hit_rate_ = 0;
block_cache_hit_rate_ = 0;
is_array_binding_ = false;
......@@ -1142,7 +1140,6 @@ void ObTableAccessContext::reuse()
trans_version_range_.reset();
row_filter_ = NULL;
use_fuse_row_cache_ = false;
fq_ctx_ = nullptr;
fuse_row_cache_hit_rate_ = 0;
block_cache_hit_rate_ = 0;
is_array_binding_ = false;
......
......@@ -658,7 +658,6 @@ public:
uint16_t* column_ids_;
common::ObNewRow row_val_;
int64_t snapshot_version_;
ObFastQueryContext fq_ctx_;
int64_t range_array_idx_;
transaction::ObTransID* trans_id_ptr_;
bool fast_filter_skipped_;
......@@ -1393,7 +1392,6 @@ public:
common::ObVersionRange trans_version_range_;
const ObIStoreRowFilter* row_filter_;
bool use_fuse_row_cache_; // temporary code
const ObFastQueryContext* fq_ctx_;
bool need_scn_;
int16_t fuse_row_cache_hit_rate_;
int16_t block_cache_hit_rate_;
......
......@@ -228,7 +228,6 @@ int ObMultipleGetMerge::construct_iters_without_fuse_row_cache()
{
int ret = OB_SUCCESS;
const ObIArray<ObITable*>& tables = tables_handle_.get_tables();
access_ctx_->fq_ctx_ = nullptr;
if (iters_.count() > 0 && iters_.count() != tables.count()) {
ret = OB_ERR_UNEXPECTED;
......@@ -325,7 +324,6 @@ int ObMultipleGetMerge::try_get_fuse_row_cache(int64_t& end_table_idx)
// do not use fuse row cache
handle.reset();
} else {
access_ctx_->fq_ctx_ = handle.value_->get_fq_ctx();
end_table_idx = i;
rows_[handle_idx].state_ = ObMultiGetRowState::IN_FUSE_ROW_CACHE;
rows_[handle_idx].sstable_end_log_ts_ = sstable_end_log_ts;
......@@ -485,17 +483,13 @@ int ObMultipleGetMerge::get_table_row(const int64_t table_idx, const int64_t row
if (table->is_multi_version_minor_sstable() && row_info.sstable_end_log_ts_ < table->get_end_log_ts()) {
row_info.sstable_end_log_ts_ = table->get_end_log_ts();
}
} else if (!prow->fq_ctx_.is_valid() && table->is_memtable()) {
stop_reading = true;
}
row_info.row_.fq_ctx_ = prow->fq_ctx_.is_valid() ? prow->fq_ctx_ : row_info.row_.fq_ctx_;
STORAGE_LOG(DEBUG,
"process row fuse",
KP(this),
"row",
prow->flag_ == ObActionFlag::OP_ROW_DOES_NOT_EXIST ? "not exist" : to_cstring(*prow),
K(row_info.row_),
K(row_info.row_.fq_ctx_),
K(access_ctx_->store_ctx_->mem_ctx_->get_read_snapshot()),
K(stop_reading));
}
......@@ -552,7 +546,6 @@ int ObMultipleGetMerge::prefetch()
}
} else {
// try get from latest memtable
access_ctx_->fq_ctx_ = nullptr;
for (int64_t i = table_cnt - 1; OB_SUCC(ret) && i >= end_memtable_idx_; --i) {
if (OB_FAIL(get_table_row(i, prefetch_range_idx_, stop_reading))) {
STORAGE_LOG(WARN, "fail to get table row", K(ret));
......
......@@ -78,7 +78,7 @@ int ObSingleMerge::is_range_valid() const
}
int ObSingleMerge::get_table_row(const int64_t table_idx, const ObIArray<ObITable*>& tables, const ObStoreRow*& prow,
ObStoreRow& fuse_row, bool& final_result, int64_t& sstable_end_log_ts, bool& stop_reading)
ObStoreRow& fuse_row, bool& final_result, int64_t& sstable_end_log_ts)
{
int ret = OB_SUCCESS;
ObStoreRowIterator* iter = NULL;
......@@ -124,17 +124,9 @@ int ObSingleMerge::get_table_row(const int64_t table_idx, const ObIArray<ObITabl
if (table->is_minor_sstable() && sstable_end_log_ts < table->get_end_log_ts()) {
sstable_end_log_ts = table->get_end_log_ts();
}
} else if (!prow->fq_ctx_.is_valid() && table->is_memtable()) {
// use fast query but no data is changed
stop_reading = true;
}
fuse_row.fq_ctx_ = prow->fq_ctx_.is_valid() ? prow->fq_ctx_ : fuse_row.fq_ctx_;
STORAGE_LOG(DEBUG,
"process row fuse",
K(*prow),
K(fuse_row),
K(fuse_row.fq_ctx_),
K(access_ctx_->store_ctx_->mem_ctx_->get_read_snapshot()));
STORAGE_LOG(
DEBUG, "process row fuse", K(*prow), K(fuse_row), K(access_ctx_->store_ctx_->mem_ctx_->get_read_snapshot()));
}
}
return ret;
......@@ -159,7 +151,6 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row)
bool final_result = false;
bool is_fuse_row_empty = false;
int64_t sstable_end_log_ts = 0;
bool stop_reading = false;
ObStoreRow& fuse_row = full_row_;
nop_pos_.reset();
fuse_row.row_val_.count_ = 0;
......@@ -174,8 +165,6 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row)
K(access_ctx_->use_fuse_row_cache_),
K(access_param_->iter_param_.enable_fuse_row_cache()));
access_ctx_->fq_ctx_ = nullptr;
// firstly, try get from fuse row cache if memtable row is not final result
if (OB_SUCC(ret) && enable_fuse_row_cache && !has_frozen_memtable) {
if (OB_FAIL(fuse_row_cache_fetcher_.get_fuse_row_cache(rowkey_->get_store_rowkey(), handle_))) {
......@@ -203,7 +192,6 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row)
handle_.reset();
} else {
found_row_cache = true;
access_ctx_->fq_ctx_ = handle_.value_->get_fq_ctx();
end_table_idx = i;
STORAGE_LOG(DEBUG, "fuse row cache info", K(*(handle_.value_)), K(sstable_end_log_ts), K(*table));
}
......@@ -215,8 +203,8 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row)
}
// secondly, try to get from other delta table
for (int64_t i = table_cnt - 1; OB_SUCC(ret) && !stop_reading && !final_result && i >= end_table_idx; --i) {
if (OB_FAIL(get_table_row(i, tables, prow, fuse_row, final_result, sstable_end_log_ts, stop_reading))) {
for (int64_t i = table_cnt - 1; OB_SUCC(ret) && !final_result && i >= end_table_idx; --i) {
if (OB_FAIL(get_table_row(i, tables, prow, fuse_row, final_result, sstable_end_log_ts))) {
STORAGE_LOG(WARN, "fail to get table row", K(ret));
}
}
......
......@@ -38,7 +38,7 @@ protected:
private:
virtual int get_table_row(const int64_t table_idx, const ObIArray<ObITable*>& tables, const ObStoreRow*& prow,
ObStoreRow& fuse_row, bool& final_result, int64_t& sstable_end_log_ts, bool& stop_reading);
ObStoreRow& fuse_row, bool& final_result, int64_t& sstable_end_log_ts);
private:
const common::ObExtStoreRowkey* rowkey_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册