提交 c19b2b1d 编写于 作者: O obdev 提交者: ob-robot

Fix sw accum_checksum rollback bug.

上级 6d7dbdc5
......@@ -17,8 +17,7 @@
#include "lib/checksum/ob_parity_check.h" // parity_check
#include "lib/utility/utility.h" // !FALSE_IT
#include "lib/oblog/ob_log_module.h" // LOG*
#include "share/rc/ob_tenant_base.h" // mtl_malloc
#include "share/scn.h" // SCN
#include "share/scn.h" // SCN
#include "log_define.h" // is_valid_log_id...
#include "log_writer_utils.h" // LogWriteBuf
......@@ -118,41 +117,85 @@ int LogGroupEntryHeader::calculate_log_checksum_(const bool is_padding_log,
data_checksum = PADDING_LOG_DATA_CHECKSUM;
PALF_LOG(INFO, "This is a padding log, set log data checksum to 0", K(data_checksum), K(data_len));
} else {
char *tmp_buf = NULL;
bool need_free_mem = false;
const int64_t total_buf_len = data_len + LogGroupEntryHeader::HEADER_SER_SIZE;
// no need memcpy
if (log_write_buf.check_memory_is_continous()) {
int64_t unused_buf_len = 0;
const char *first_buf = NULL;
if (OB_FAIL(log_write_buf.get_write_buf(0, first_buf, unused_buf_len))) {
PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len));
} else {
tmp_buf = const_cast<char*>(first_buf);
assert(total_buf_len == log_write_buf.get_total_size());
}
ob_assert(total_buf_len == log_write_buf.get_total_size());
char *curr_log_buf = NULL;
const char *log_buf = NULL;
int64_t buf_idx = 0, curr_buf_len = 0;
const int64_t buf_cnt = log_write_buf.get_buf_count();
if (OB_FAIL(log_write_buf.get_write_buf(buf_idx, log_buf, curr_buf_len))) {
PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len));
} else {
assert(total_buf_len == log_write_buf.get_total_size());
if (NULL == (tmp_buf = reinterpret_cast<char *>(mtl_malloc(total_buf_len, "LogChecksum")))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(ERROR, "allocate memory failed", K(ret));
} else {
need_free_mem = true;
log_write_buf.memcpy_to_continous_memory(tmp_buf);
}
curr_log_buf = const_cast<char*>(log_buf);
}
int64_t pos = LogGroupEntryHeader::HEADER_SER_SIZE; // skip group entry header
assert(total_buf_len > pos);
LogEntryHeader log_entry_header;
int64_t log_entry_data_checksum = 0;
int64_t tmp_log_checksum = 0;
while (OB_SUCC(ret) && NULL != tmp_buf && pos < total_buf_len) {
if (OB_FAIL(log_entry_header.deserialize(tmp_buf, total_buf_len, pos))) {
PALF_LOG(ERROR, "log_entry_header deserialize failed", K(ret), KP(tmp_buf), K(pos), K(total_buf_len));
int64_t pos = LogGroupEntryHeader::HEADER_SER_SIZE; // skip group entry header
const int64_t log_header_size = LogEntryHeader::HEADER_SER_SIZE;
char tmp_buf[log_header_size];
int64_t tmp_buf_pos = 0;
while (OB_SUCC(ret)) {
bool need_use_tmp_buf = false;
if (curr_buf_len - pos <= 0) {
if ((buf_idx + 1) >= buf_cnt) {
// calculate finished, end loop
break;
} else {
// switch to next log_buf
// update pos to new val at new log_buf
pos = pos - curr_buf_len;
buf_idx++;
if (OB_FAIL(log_write_buf.get_write_buf(buf_idx, log_buf, curr_buf_len))) {
PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len));
} else {
curr_log_buf = const_cast<char*>(log_buf);
}
if (pos == curr_buf_len) {
// Reach end of log_write_buf, end loop.
break;
}
ob_assert(pos < curr_buf_len);
}
} else if (curr_buf_len - pos < log_header_size) {
need_use_tmp_buf = true;
const int64_t curr_copy_size = curr_buf_len - pos;
// copy the first part of log_entry_header
memcpy(tmp_buf, curr_log_buf + pos, curr_copy_size);
// update pos to the log_entry_header's tail pos at next log_buf
pos = log_header_size - curr_copy_size;
// inc buf_idx and get the next log_buf
buf_idx++;
ob_assert(buf_idx < buf_cnt);
if (OB_FAIL(log_write_buf.get_write_buf(buf_idx, log_buf, curr_buf_len))) {
PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len));
} else {
curr_log_buf = const_cast<char*>(log_buf);
ob_assert(log_header_size > curr_copy_size);
// copy the second part of log_entry_header
memcpy(tmp_buf + curr_copy_size, curr_log_buf, log_header_size - curr_copy_size);
// set the pos of tmp_buf to 0
tmp_buf_pos = 0;
}
PALF_LOG(INFO, "[WRAP LOG HEADER]", K(ret), K(log_write_buf), K(data_len),
K(pos), K(log_header_size), K(curr_copy_size));
} else {
// The rest buf contains a valid log_entry_header.
}
if (OB_FAIL(ret)) {
} else if (false == need_use_tmp_buf
&& OB_FAIL(log_entry_header.deserialize(curr_log_buf, curr_buf_len, pos))) {
PALF_LOG(ERROR, "log_entry_header deserialize failed", K(ret), KP(curr_log_buf),
K(curr_buf_len), K(pos), K(total_buf_len), K(log_write_buf), K(buf_idx));
} else if (true == need_use_tmp_buf
&& OB_FAIL(log_entry_header.deserialize(tmp_buf, log_header_size, tmp_buf_pos))) {
PALF_LOG(ERROR, "log_entry_header deserialize failed", K(ret), KP(curr_log_buf), K(curr_buf_len),
K(pos), K(total_buf_len), K(tmp_buf_pos), K(log_write_buf), K(buf_idx));
} else if (false == log_entry_header.check_header_integrity()) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "log_entry_header is invalid", K(ret), KP(tmp_buf), K(pos), K(total_buf_len),
K(log_entry_header));
PALF_LOG(ERROR, "log_entry_header is invalid", K(ret), KP(curr_log_buf), K(curr_buf_len), K(pos), K(total_buf_len),
K(log_entry_header), K(log_write_buf), K(buf_idx));
} else {
log_entry_data_checksum = log_entry_header.get_data_checksum();
tmp_log_checksum = common::ob_crc64(tmp_log_checksum, &log_entry_data_checksum, sizeof(log_entry_data_checksum));
......@@ -163,12 +206,8 @@ int LogGroupEntryHeader::calculate_log_checksum_(const bool is_padding_log,
if (OB_SUCC(ret)) {
data_checksum = tmp_log_checksum;
}
if (NULL != tmp_buf && need_free_mem) {
mtl_free(tmp_buf);
}
}
PALF_LOG(TRACE, "calculate_log_checksum_ finished", K(ret), K(*this), K(data_checksum));
PALF_LOG(TRACE, "calculate_log_checksum_ finished", K(ret), K(log_write_buf), K(*this), K(data_checksum));
return ret;
}
......
......@@ -916,11 +916,9 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated)
K(prev_lsn), K(prev_log_pid), K(last_submit_log_id), K(last_submit_lsn), K(last_submit_log_pid),
K(tmp_log_id), KPC(log_task));
} else if (OB_FAIL(generate_group_entry_header_(tmp_log_id, log_task, group_entry_header,
group_log_data_checksum))) {
group_log_data_checksum, is_accum_checksum_acquired))) {
PALF_LOG(WARN, "generate_group_entry_header_ failed", K_(palf_id), K_(self));
} else {
// set flag for rollback accum_checksum
is_accum_checksum_acquired = true;
log_task->lock();
if (!state_mgr_->is_follower_active()) {
// Updating data_checksum, accum_checksum, committed_end_lsn for log_task.
......@@ -1048,7 +1046,8 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated)
int LogSlidingWindow::generate_group_entry_header_(const int64_t log_id,
LogTask *log_task,
LogGroupEntryHeader &group_header,
int64_t &group_log_data_checksum)
int64_t &group_log_data_checksum,
bool &is_accum_checksum_acquired)
{
int ret = OB_SUCCESS;
if (OB_INVALID_LOG_ID == log_id
......@@ -1089,6 +1088,8 @@ int LogSlidingWindow::generate_group_entry_header_(const int64_t log_id,
} else if (OB_FAIL(checksum_.acquire_accum_checksum(group_log_data_checksum, accum_checksum))) {
PALF_LOG(WARN, "update_accumulated_checksum failed", K(ret), K_(palf_id), K_(self));
} else {
// set flag for rollback accum_checksum
is_accum_checksum_acquired = true;
(void) group_header.update_accumulated_checksum(accum_checksum);
(void) group_header.update_header_checksum();
PALF_LOG(TRACE, "generate_group_entry_header_ success", K(ret), K_(palf_id), K_(self), K(is_padding_log),
......@@ -1214,6 +1215,9 @@ int LogSlidingWindow::period_freeze_last_log()
} else if (OB_FAIL(try_freeze_last_log_task_(last_log_id, last_log_end_lsn, is_need_handle))) {
PALF_LOG(WARN, "try_freeze_last_log_task_ failed", K(ret), K_(palf_id), K_(self), K(last_log_id), K(last_log_end_lsn));
} else {
}
if (get_max_log_id() > get_last_submit_log_id_()) {
// try handle next submit log
bool is_committed_lsn_updated = false;
(void) handle_next_submit_log_(is_committed_lsn_updated);
}
......
......@@ -338,7 +338,8 @@ private:
int generate_group_entry_header_(const int64_t log_id,
LogTask *log_task,
LogGroupEntryHeader &header,
int64_t &group_log_checksum);
int64_t &group_log_checksum,
bool &is_accum_checksum_acquired);
int gen_committed_end_lsn_(LSN &new_committed_end_lsn);
int gen_committed_end_lsn_with_memberlist_(
const ObMemberList &member_list,
......
......@@ -43,7 +43,7 @@ public:
// NB: check_memory_is_continous firstly, and ensure dest_buf can hold enough data.
void memcpy_to_continous_memory(char *dest_buf) const;
TO_STRING_KV("total_size", get_total_size(), "count", get_buf_count());
TO_STRING_KV("total_size", get_total_size(), "count", get_buf_count(), K_(write_buf));
NEED_SERIALIZE_AND_DESERIALIZE;
static constexpr int64_t MAX_COUNT = 2;
......
......@@ -32,6 +32,98 @@ using namespace palf;
namespace unittest
{
TEST(TestLogGroupEntryHeader, test_group_entry_header_wrap_checksum)
{
const int64_t BUFSIZE = 1 << 21;
LogGroupEntryHeader header;
LogEntryHeader log_entry_header;
int64_t group_entry_header_size = header.get_serialize_size();
int64_t log_entry_header_size = log_entry_header.get_serialize_size();
int64_t total_header_size = group_entry_header_size + log_entry_header_size;
char buf[BUFSIZE];
char ptr[BUFSIZE] = "helloworld";
// 数据部分
memcpy(buf + total_header_size, ptr, strlen(ptr));
bool is_padding_log = false;
const char *data = buf + total_header_size;
int64_t data_len = strlen(ptr);
memcpy(buf + total_header_size + data_len + log_entry_header_size, ptr, strlen(ptr));
int64_t min_timestamp = 0;
share::SCN max_scn = share::SCN::min_scn();
int64_t log_id = 1;
LSN committed_lsn;
committed_lsn.val_ = 1;
int64_t proposal_id = 1;
int64_t log_checksum = 0;
// test LogEntry and LogEntryHeader
EXPECT_EQ(OB_SUCCESS, log_entry_header.generate_header(data, data_len, share::SCN::base_scn()));
int64_t tmp_pos = 0, new_pos = 0;
EXPECT_EQ(OB_SUCCESS,
log_entry_header.serialize(buf + group_entry_header_size, BUFSIZE, tmp_pos));
EXPECT_EQ(tmp_pos, log_entry_header_size);
EXPECT_EQ(OB_SUCCESS,
log_entry_header.serialize(buf + total_header_size + data_len, BUFSIZE, new_pos));
EXPECT_EQ(new_pos, log_entry_header_size);
// test LogGroupEntryHeader and LogEntry
LogWriteBuf write_buf;
int64_t group_log_data_len = 0;
int64_t group_log_len = group_entry_header_size + (log_entry_header_size + data_len);
for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) {
write_buf.reset();
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val));
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val));
group_log_data_len = group_log_len - group_entry_header_size;
PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val));
EXPECT_EQ(OB_SUCCESS,
header.generate(false, is_padding_log, write_buf, group_log_data_len,
max_scn, log_id, committed_lsn, proposal_id, log_checksum));
}
is_padding_log = true;
for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) {
write_buf.reset();
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val));
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val));
group_log_data_len = group_log_len - group_entry_header_size;
PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val));
EXPECT_EQ(OB_SUCCESS,
header.generate(false, is_padding_log, write_buf, group_log_data_len,
max_scn, log_id, committed_lsn, proposal_id, log_checksum));
}
group_log_len = group_entry_header_size + 2 * (log_entry_header_size + data_len);
for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) {
write_buf.reset();
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val));
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val));
group_log_data_len = group_log_len - group_entry_header_size;
PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val));
EXPECT_EQ(OB_SUCCESS,
header.generate(false, is_padding_log, write_buf, group_log_data_len,
max_scn, log_id, committed_lsn, proposal_id, log_checksum));
}
is_padding_log = true;
for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) {
write_buf.reset();
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val));
EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val));
group_log_data_len = group_log_len - group_entry_header_size;
PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val));
EXPECT_EQ(OB_SUCCESS,
header.generate(false, is_padding_log, write_buf, group_log_data_len,
max_scn, log_id, committed_lsn, proposal_id, log_checksum));
}
is_padding_log = true;
EXPECT_EQ(OB_SUCCESS,
header.generate(false, is_padding_log, write_buf, group_log_data_len,
max_scn, log_id, committed_lsn, proposal_id, log_checksum));
}
TEST(TestLogGroupEntryHeader, test_log_group_entry_header)
{
const int64_t BUFSIZE = 1 << 21;
......@@ -78,8 +170,6 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header)
max_scn, log_id, committed_lsn, proposal_id, log_checksum));
max_scn.set_base();
int64_t defalut_acc = 10;
header.update_accumulated_checksum(defalut_acc);
header.update_header_checksum();
min_timestamp = 1;
EXPECT_EQ(OB_SUCCESS,
header.generate(false, is_padding_log, write_buf, data_len + log_entry_header_size,
......@@ -92,8 +182,6 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header)
EXPECT_EQ(data_len + log_entry_header_size, header.get_data_len());
EXPECT_EQ(max_scn, header.get_max_scn());
EXPECT_EQ(log_id, header.get_log_id());
EXPECT_EQ(proposal_id, header.get_log_proposal_id());
EXPECT_EQ(committed_lsn, header.get_committed_end_lsn());
int64_t pos = 0;
EXPECT_EQ(OB_SUCCESS, header.serialize(buf, BUFSIZE, pos));
EXPECT_EQ(pos, header.get_serialize_size());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册