提交 23934c96 编写于 作者: C chaser-ch 提交者: wangzelin.wzl

fix border key buf overflow problem

上级 ce5da320
......@@ -807,26 +807,30 @@ int ObMacroRowIterator::compare(const ObMacroRowIterator& other, int64_t& cmp_re
} else if (cmp_ret < 0) {
cmp_ret = -1;
}
} else {
cmp_ret = compare(left_rowkey, other.curr_range_);
if (0 == cmp_ret) {
cmp_ret = CANNOT_COMPARE_RIGHT_IS_RANGE;
}
} else if (OB_FAIL(compare(left_rowkey, other.curr_range_, cmp_ret))) {
STORAGE_LOG(WARN, "Failed to compare rowkey with range", K(ret), K(left_rowkey), K(other.curr_range_));
} else if (0 == cmp_ret) {
cmp_ret = CANNOT_COMPARE_RIGHT_IS_RANGE;
}
} else {
if (NULL != other.curr_row_) {
// when doing minor merge, the only case we need to use trans version column in comparison
// is when left is a range and right is a row
right_rowkey.assign(other.curr_row_->row_val_.cells_, get_rowkey_column_cnt());
cmp_ret = -compare(right_rowkey, curr_range_);
if (0 == cmp_ret) {
cmp_ret = CANNOT_COMPARE_LEFT_IS_RANGE;
if (OB_FAIL(compare(right_rowkey, curr_range_, cmp_ret))) {
STORAGE_LOG(WARN, "Failed to compare rowkey with range", K(ret), K(right_rowkey), K(curr_range_));
} else {
cmp_ret = -cmp_ret;
if (0 == cmp_ret) {
cmp_ret = CANNOT_COMPARE_LEFT_IS_RANGE;
}
}
} else {
left_rowkey = curr_range_.get_start_key();
right_rowkey = curr_range_.get_end_key();
cmp_ret = compare(left_rowkey, other.curr_range_);
if (cmp_ret > 0) {
if (OB_FAIL(compare(left_rowkey, other.curr_range_, cmp_ret))) {
STORAGE_LOG(WARN, "Failed to compare rowkey with range", K(ret), K(left_rowkey), K(other.curr_range_));
} else if (cmp_ret > 0) {
cmp_ret = 1;
} else if (cmp_ret == 0) {
if (left_rowkey.compare(other.curr_range_.get_end_key()) == 0 &&
......@@ -842,8 +846,9 @@ int ObMacroRowIterator::compare(const ObMacroRowIterator& other, int64_t& cmp_re
cmp_ret = CANNOT_COMPARE_RIGHT_IS_RANGE;
}
} else {
cmp_ret = compare(right_rowkey, other.curr_range_);
if (cmp_ret < 0) {
if (OB_FAIL(compare(right_rowkey, other.curr_range_, cmp_ret))) {
STORAGE_LOG(WARN, "Failed to compare rowkey with range", K(ret), K(right_rowkey), K(other.curr_range_));
} else if (cmp_ret < 0) {
cmp_ret = -1;
} else if (cmp_ret == 0) {
if (right_rowkey.compare(other.curr_range_.get_start_key()) == 0 &&
......@@ -862,10 +867,16 @@ int ObMacroRowIterator::compare(const ObMacroRowIterator& other, int64_t& cmp_re
return ret;
}
void ObMacroRowIterator::get_border_key(const ObStoreRowkey& border_key, const bool is_start_key, ObStoreRowkey& rowkey)
int ObMacroRowIterator::get_border_key(const ObStoreRowkey& border_key, const bool is_start_key, ObStoreRowkey& rowkey)
{
int ret = OB_SUCCESS;
if (NULL == multi_version_row_info_) {
rowkey.assign(const_cast<ObObj*>(border_key.get_obj_ptr()), border_key.get_obj_cnt());
} else if (border_key.is_min() || border_key.is_max()) {
rowkey = border_key;
} else if (OB_UNLIKELY(border_key.get_obj_cnt() < rowkey_column_cnt_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected column cnt of border key", K(ret), K(border_key), K_(rowkey_column_cnt));
} else {
rowkey.assign(cells_, multi_version_row_info_->multi_version_rowkey_column_cnt_);
for (int64_t i = 0; i < rowkey_column_cnt_; ++i) {
......@@ -879,43 +890,49 @@ void ObMacroRowIterator::get_border_key(const ObStoreRowkey& border_key, const b
cells_[multi_version_row_info_->trans_version_index_ + 1].set_max_value();
}
}
return ret;
}
int64_t ObMacroRowIterator::compare(const ObStoreRowkey& rowkey, const ObStoreRange& range)
int ObMacroRowIterator::compare(const ObStoreRowkey& rowkey, const ObStoreRange& range, int64_t& cmp_ret)
{
int64_t cmp_ret = 0;
int ret = OB_SUCCESS;
int32_t left_cmp = 0;
int32_t right_cmp = 0;
cmp_ret = 0;
ObStoreRowkey start_key;
ObStoreRowkey end_key;
get_border_key(range.get_start_key(), true, start_key);
left_cmp = rowkey.compare(start_key);
if (left_cmp < 0) {
cmp_ret = -1;
} else if (left_cmp == 0) {
if (range.get_border_flag().inclusive_start()) {
cmp_ret = 0;
} else {
cmp_ret = -1;
}
if (OB_FAIL(get_border_key(range.get_start_key(), true, start_key))) {
STORAGE_LOG(WARN, "Failed to get start key from border key", K(ret), K(range));
} else {
get_border_key(range.get_end_key(), false, end_key);
right_cmp = rowkey.compare(end_key);
if (right_cmp > 0) {
cmp_ret = 1;
} else if (right_cmp == 0) {
if (range.get_border_flag().inclusive_end()) {
left_cmp = rowkey.compare(start_key);
if (left_cmp < 0) {
cmp_ret = -1;
} else if (left_cmp == 0) {
if (range.get_border_flag().inclusive_start()) {
cmp_ret = 0;
} else {
cmp_ret = 1;
cmp_ret = -1;
}
} else if (OB_FAIL(get_border_key(range.get_end_key(), false, end_key))) {
STORAGE_LOG(WARN, "Failed to get end key from border key", K(ret), K(range));
} else {
cmp_ret = 0;
right_cmp = rowkey.compare(end_key);
if (right_cmp > 0) {
cmp_ret = 1;
} else if (right_cmp == 0) {
if (range.get_border_flag().inclusive_end()) {
cmp_ret = 0;
} else {
cmp_ret = 1;
}
} else {
cmp_ret = 0;
}
}
}
return cmp_ret;
return ret;
}
int ObMacroRowIterator::multi_version_compare(const ObMacroRowIterator& other, int64_t& cmp_ret)
......
......@@ -340,8 +340,8 @@ public:
static const int64_t CANNOT_COMPARE_BOTH_ARE_RANGE = 3;
protected:
int64_t compare(const common::ObStoreRowkey& rowkey, const common::ObStoreRange& range);
void get_border_key(const common::ObStoreRowkey& border_key, const bool is_start_key, common::ObStoreRowkey& rowkey);
int compare(const common::ObStoreRowkey& rowkey, const common::ObStoreRange& range, int64_t& cmp_ret);
int get_border_key(const common::ObStoreRowkey& border_key, const bool is_start_key, common::ObStoreRowkey& rowkey);
protected:
int next_range();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册