提交 0b7cf7d3 编写于 作者: O obdev 提交者: ob-robot

[BUGFIX] fix repeat update lob in one stmt wrong

上级 0a84adb3
......@@ -299,6 +299,13 @@ DEF_TO_STRING(ObLobLocatorV2)
J_KV(K(*location_info));
J_COMMA();
}
if (buf_len > pos && extern_header->flags_.has_retry_info_
&& size_ >= offset + MEM_LOB_EXTERN_RETRYINFO_LEN) {
ObMemLobRetryInfo *retry_info = reinterpret_cast<ObMemLobRetryInfo *>(ptr_ + offset);
offset += MEM_LOB_EXTERN_RETRYINFO_LEN;
J_KV(K(*retry_info));
J_COMMA();
}
if (buf_len > pos) {
ObString rowkey_str(MIN(extern_header->rowkey_size_, buf_len - pos), ptr_ + offset);
offset += extern_header->rowkey_size_;
......@@ -346,6 +353,9 @@ uint32_t ObLobLocatorV2::calc_locator_full_len(const ObMemLobExternFlags &flags,
if (flags.has_location_info_) {
loc_len += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
}
if (flags.has_retry_info_) {
loc_len += MEM_LOB_EXTERN_RETRYINFO_LEN;
}
loc_len += MEM_LOB_ADDR_LEN; //ToDo:@gehao server address.
loc_len += rowkey_size;
}
......@@ -426,6 +436,10 @@ int ObLobLocatorV2::fill(ObMemLobType type,
offset += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
*extern_len += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
}
if (flags.has_retry_info_) {
offset += MEM_LOB_EXTERN_RETRYINFO_LEN;
*extern_len += MEM_LOB_EXTERN_RETRYINFO_LEN;
}
if ((offset + rowkey_str.length()) && OB_UNLIKELY(offset > size_)) {
ret = OB_BUF_NOT_ENOUGH;
......@@ -749,6 +763,28 @@ int ObLobLocatorV2::get_location_info(ObMemLobLocationInfo *&location_info) cons
return ret;
}
int ObLobLocatorV2::get_retry_info(ObMemLobRetryInfo *&retry_info) const
{
int ret = OB_SUCCESS;
ObMemLobExternHeader *extern_header = NULL;
if (OB_SUCC(get_extern_header(extern_header))) {
char *cur_pos = extern_header->data_ + MEM_LOB_EXTERN_SIZE_LEN;
if (extern_header->flags_.has_tx_info_) {
cur_pos += MEM_LOB_EXTERN_TXINFO_LEN;
}
if (extern_header->flags_.has_location_info_) {
cur_pos += MEM_LOB_EXTERN_LOCATIONINFO_LEN;
}
if (extern_header->flags_.has_retry_info_) {
retry_info = reinterpret_cast<ObMemLobRetryInfo *>(cur_pos);
} else {
ret = OB_ERR_NULL_VALUE;
COMMON_LOG(WARN, "Lob: does not have retry info", K(this), K(ret));
}
}
return ret;
}
int ObLobLocatorV2::get_real_locator_len(int64_t &real_len) const
{
int ret = OB_SUCCESS;
......@@ -883,6 +919,17 @@ int ObLobLocatorV2::set_location_info(const ObMemLobLocationInfo &location_info)
return ret;
}
int ObLobLocatorV2::set_retry_info(const ObMemLobRetryInfo &retry_info)
{
validate_has_lob_header(has_lob_header_);
int ret = OB_SUCCESS;
ObMemLobRetryInfo *retry_info_ptr = NULL;
if (OB_SUCC(get_retry_info(retry_info_ptr))) {
*retry_info_ptr = retry_info;
}
return ret;
}
OB_DEF_SERIALIZE(ObLobLocatorV2)
{
int ret = OB_SUCCESS;
......
......@@ -781,11 +781,11 @@ struct ObMemLobCommon
struct ObMemLobExternFlags
{
ObMemLobExternFlags() :
has_tx_info_(1), has_location_info_(1), reserved_(0)
has_tx_info_(1), has_location_info_(1), has_retry_info_(1), reserved_(0)
{}
ObMemLobExternFlags(bool enable) :
has_tx_info_(enable), has_location_info_(enable), reserved_(0)
has_tx_info_(enable), has_location_info_(enable), has_retry_info_(enable), reserved_(0)
{}
ObMemLobExternFlags(const ObMemLobExternFlags &flags) { *this = flags; }
......@@ -800,11 +800,12 @@ struct ObMemLobExternFlags
return (*(reinterpret_cast<uint16_t *>(this)) = 0);
}
TO_STRING_KV(K_(has_tx_info), K_(has_location_info), K_(reserved));
TO_STRING_KV(K_(has_tx_info), K_(has_location_info), K_(has_retry_info), K_(reserved));
uint16_t has_tx_info_ : 1; // Indicate whether tx info exists
uint16_t has_location_info_ : 1; // Indicate whether has cid exists (reserved)
uint16_t reserved_ : 14;
uint16_t has_retry_info_ : 1; // Indicate whether has retry info exists
uint16_t reserved_ : 13;
};
// Memory Locator V2, Extern Header:
......@@ -873,6 +874,16 @@ struct ObMemLobLocationInfo
char data_[0];
};
struct ObMemLobRetryInfo
{
ObMemLobRetryInfo() : is_select_leader_(true), read_latest_(false), addr_(), timeout_(0) {}
TO_STRING_KV(K_(is_select_leader), K_(read_latest), K_(addr), K_(timeout));
bool is_select_leader_;
bool read_latest_;
ObAddr addr_;
uint64_t timeout_;
};
OB_INLINE void validate_has_lob_header(const bool &has_header)
{
#ifdef VALIDATE_LOB_HEADER
......@@ -897,6 +908,7 @@ public:
static const uint32_t MEM_LOB_EXTERN_HEADER_LEN = sizeof(ObMemLobExternHeader);
static const uint32_t MEM_LOB_EXTERN_TXINFO_LEN = sizeof(ObMemLobTxInfo);
static const uint32_t MEM_LOB_EXTERN_LOCATIONINFO_LEN = sizeof(ObMemLobLocationInfo);
static const uint32_t MEM_LOB_EXTERN_RETRYINFO_LEN = sizeof(ObMemLobRetryInfo);
static const uint16_t MEM_LOB_EXTERN_SIZE_LEN = sizeof(uint16_t);
static const uint32_t MEM_LOB_ADDR_LEN = 0; // reserved for temp lob address
......@@ -1008,6 +1020,7 @@ public:
int set_payload_data(const ObLobCommon *lob_comm, const ObString& payload);
int set_tx_info(const ObMemLobTxInfo &tx_info);
int set_location_info(const ObMemLobLocationInfo &location_info);
int set_retry_info(const ObMemLobRetryInfo &retry_info);
// interfaces for read
// Notice: all the following functions should be called after is_valid() or fill()
......@@ -1022,6 +1035,7 @@ public:
int get_table_info(uint64_t &table_id, uint32_t &column_idex);
int get_tx_info(ObMemLobTxInfo *&tx_info) const;
int get_location_info(ObMemLobLocationInfo *&location_info) const;
int get_retry_info(ObMemLobRetryInfo *&retry_info) const;
int get_real_locator_len(int64_t &real_len) const;
bool is_empty_lob() const;
......
......@@ -19,6 +19,7 @@
#include "storage/tx/ob_trans_define_v4.h"
#include "storage/tx/ob_trans_service.h"
#include "share/ob_lob_access_utils.h"
#include "observer/ob_server.h"
namespace oceanbase
{
......@@ -38,7 +39,8 @@ ObLobLocatorHelper::ObLobLocatorHelper()
locator_allocator_(ObModIds::OB_LOB_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
rowkey_str_(),
enable_locator_v2_(),
is_inited_(false)
is_inited_(false),
scan_flag_()
{
}
......@@ -96,6 +98,7 @@ int ObLobLocatorHelper::init(const ObTableScanParam &scan_param,
ls_id_ = ls_id.id();
read_snapshot_ = ctx.mvcc_acc_ctx_.snapshot_;
enable_locator_v2_ = table_param.enable_lob_locator_v2();
scan_flag_ = scan_param.scan_flag_;
if (snapshot_version != read_snapshot_.version_.get_val_for_tx()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "snapshot version mismatch",
......@@ -310,6 +313,7 @@ int ObLobLocatorHelper::fuse_mem_lob_header(ObObj &def_obj, uint64_t col_id, boo
// mysql inrow lobs & systable lobs do not have extern fields
bool has_extern = (lib::is_oracle_mode() && !is_systable);
ObMemLobExternFlags extern_flags(has_extern);
extern_flags.has_retry_info_ = 0; // default obj should only be inrow, no need retry info
ObLobCommon lob_common;
int64_t full_loc_size = ObLobLocatorV2::calc_locator_full_len(extern_flags,
rowkey_str_.length(),
......@@ -544,6 +548,11 @@ int ObLobLocatorHelper::build_lob_locatorv2(ObLobLocatorV2 &locator,
ObMemLobTxInfo tx_info(read_snapshot_.version_.get_val_for_tx(),
read_snapshot_.tx_id_.get_id(),
read_snapshot_.scn_.cast_to_int());
ObMemLobRetryInfo retry_info;
retry_info.addr_ = MYADDR;
retry_info.is_select_leader_ = true;
retry_info.read_latest_ = scan_flag_.read_latest_;
retry_info.timeout_ = access_ctx.timeout_;
ObMemLobLocationInfo location_info(tablet_id_, ls_id_, cs_type);
if (has_extern && OB_FAIL(locator.set_table_info(table_id_, column_id))) { // should be column idx
STORAGE_LOG(WARN, "Lob: set table info failed", K(ret), K(table_id_), K(column_id));
......@@ -551,6 +560,8 @@ int ObLobLocatorHelper::build_lob_locatorv2(ObLobLocatorV2 &locator,
STORAGE_LOG(WARN, "Lob: set transaction info failed", K(ret), K(tx_info));
} else if (extern_flags.has_location_info_ && OB_FAIL(locator.set_location_info(location_info))) {
STORAGE_LOG(WARN, "Lob: set location info failed", K(ret), K(location_info));
} else if (extern_flags.has_retry_info_ && OB_FAIL(locator.set_retry_info(retry_info))) {
STORAGE_LOG(WARN, "Lob: set location info failed", K(ret), K(retry_info));
}
}
......
......@@ -93,6 +93,7 @@ private:
ObString rowkey_str_; // for default values
bool enable_locator_v2_;
bool is_inited_;
ObQueryFlag scan_flag_;
};
} // namespace storage
......
......@@ -3554,10 +3554,16 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param,
if (OB_SUCC(ret) && lob.is_persist_lob() && !lob.has_inrow_data()) {
ObMemLobTxInfo *tx_info = nullptr;
ObMemLobLocationInfo *location_info = nullptr;
ObMemLobRetryInfo *retry_info = nullptr;
ObMemLobExternHeader *extern_header = NULL;
if (OB_FAIL(lob.get_tx_info(tx_info))) {
LOG_WARN("failed to get tx info", K(ret), K(lob));
} else if (OB_FAIL(lob.get_location_info(location_info))) {
LOG_WARN("failed to get location info", K(ret), K(lob));
} else if (OB_FAIL(lob.get_extern_header(extern_header))) {
LOG_WARN("failed to get extern header", K(ret), K(lob));
} else if (extern_header->flags_.has_retry_info_ && OB_FAIL(lob.get_retry_info(retry_info))) {
LOG_WARN("failed to get retry info", K(ret), K(lob));
} else {
auto snapshot_tx_seq = transaction::ObTxSEQ::cast_from_int(tx_info->snapshot_seq_);
if (OB_ISNULL(param.tx_desc_) ||
......@@ -3569,6 +3575,7 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param,
param.snapshot_.valid_ = true;
param.snapshot_.source_ = transaction::ObTxReadSnapshot::SRC::LS;
param.snapshot_.snapshot_lsid_ = share::ObLSID(location_info->ls_id_);
param.read_latest_ = retry_info->read_latest_;
} else {
// When param for write, param.tx_desc_ should not be null
// If tx indfo from lob locator is old, produce new read snapshot directly
......
......@@ -936,7 +936,7 @@ int ObPersistentLobApator::build_common_scan_param(
false, // index_back
false, // query_stat
ObQueryFlag::MysqlMode, // sql_mode
false // read_latest
param.read_latest_ // read_latest
);
query_flag.disable_cache();
query_flag.scan_order_ = param.scan_backward_ ? ObQueryFlag::Reverse : ObQueryFlag::Forward;
......@@ -961,6 +961,9 @@ int ObPersistentLobApator::build_common_scan_param(
scan_param.limit_param_.offset_ = 0;
// sessions
scan_param.snapshot_ = param.snapshot_;
if(param.read_latest_) {
scan_param.tx_id_ = param.snapshot_.core_.tx_id_;
}
scan_param.sql_mode_ = param.sql_mode_;
// common set
scan_param.allocator_ = param.allocator_;
......
......@@ -44,7 +44,8 @@ struct ObLobStorageParam
struct ObLobAccessParam {
ObLobAccessParam()
: tx_desc_(nullptr), snapshot_(), tx_id_(), sql_mode_(SMO_DEFAULT), allocator_(nullptr),
: tx_desc_(nullptr), snapshot_(), tx_id_(), read_latest_(0),
sql_mode_(SMO_DEFAULT), allocator_(nullptr),
dml_base_param_(nullptr), column_ids_(),
meta_table_schema_(nullptr), piece_table_schema_(nullptr),
main_tablet_param_(nullptr), meta_tablet_param_(nullptr), piece_tablet_param_(nullptr),
......@@ -68,12 +69,13 @@ public:
TO_STRING_KV(K_(tenant_id), K_(src_tenant_id), K_(ls_id), K_(tablet_id), KPC_(lob_locator), KPC_(lob_common),
KPC_(lob_data), K_(byte_size), K_(handle_size), K_(coll_type), K_(scan_backward), K_(offset), K_(len),
K_(parent_seq_no), K_(seq_no_st), K_(used_seq_cnt), K_(total_seq_cnt), K_(checksum),
K_(update_len), K_(op_type), K_(is_fill_zero), K_(from_rpc), K_(snapshot), K_(tx_id), K_(inrow_read_nocopy),
K_(inrow_threshold), K_(spec_lob_id));
K_(update_len), K_(op_type), K_(is_fill_zero), K_(from_rpc), K_(snapshot), K_(tx_id), K_(read_latest),
K_(inrow_read_nocopy), K_(inrow_threshold), K_(spec_lob_id));
public:
transaction::ObTxDesc *tx_desc_; // for write/update/delete
transaction::ObTxReadSnapshot snapshot_; // for read
transaction::ObTransID tx_id_; // used when read-latest
transaction::ObTransID tx_id_; // used when read-latest
bool read_latest_;
ObSQLMode sql_mode_;
bool is_total_quantity_log_;
ObIAllocator *allocator_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册