From c19b2b1dcd0a5f58403ec6d2c4ad46d8b47527fb Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 3 Mar 2023 07:46:11 +0000 Subject: [PATCH] Fix sw accum_checksum rollback bug. --- .../palf/log_group_entry_header.cpp | 107 ++++++++++++------ src/logservice/palf/log_sliding_window.cpp | 12 +- src/logservice/palf/log_sliding_window.h | 3 +- src/logservice/palf/log_writer_utils.h | 2 +- .../test_log_entry_and_group_entry.cpp | 96 +++++++++++++++- 5 files changed, 176 insertions(+), 44 deletions(-) diff --git a/src/logservice/palf/log_group_entry_header.cpp b/src/logservice/palf/log_group_entry_header.cpp index 0dde7fdf0..d2e1bc611 100644 --- a/src/logservice/palf/log_group_entry_header.cpp +++ b/src/logservice/palf/log_group_entry_header.cpp @@ -17,8 +17,7 @@ #include "lib/checksum/ob_parity_check.h" // parity_check #include "lib/utility/utility.h" // !FALSE_IT #include "lib/oblog/ob_log_module.h" // LOG* -#include "share/rc/ob_tenant_base.h" // mtl_malloc -#include "share/scn.h" // SCN +#include "share/scn.h" // SCN #include "log_define.h" // is_valid_log_id... #include "log_writer_utils.h" // LogWriteBuf @@ -118,41 +117,85 @@ int LogGroupEntryHeader::calculate_log_checksum_(const bool is_padding_log, data_checksum = PADDING_LOG_DATA_CHECKSUM; PALF_LOG(INFO, "This is a padding log, set log data checksum to 0", K(data_checksum), K(data_len)); } else { - char *tmp_buf = NULL; - bool need_free_mem = false; const int64_t total_buf_len = data_len + LogGroupEntryHeader::HEADER_SER_SIZE; - // no need memcpy - if (log_write_buf.check_memory_is_continous()) { - int64_t unused_buf_len = 0; - const char *first_buf = NULL; - if (OB_FAIL(log_write_buf.get_write_buf(0, first_buf, unused_buf_len))) { - PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len)); - } else { - tmp_buf = const_cast(first_buf); - assert(total_buf_len == log_write_buf.get_total_size()); - } + ob_assert(total_buf_len == log_write_buf.get_total_size()); + char *curr_log_buf = NULL; + const char *log_buf = NULL; + int64_t buf_idx = 0, curr_buf_len = 0; + const int64_t buf_cnt = log_write_buf.get_buf_count(); + if (OB_FAIL(log_write_buf.get_write_buf(buf_idx, log_buf, curr_buf_len))) { + PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len)); } else { - assert(total_buf_len == log_write_buf.get_total_size()); - if (NULL == (tmp_buf = reinterpret_cast(mtl_malloc(total_buf_len, "LogChecksum")))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - PALF_LOG(ERROR, "allocate memory failed", K(ret)); - } else { - need_free_mem = true; - log_write_buf.memcpy_to_continous_memory(tmp_buf); - } + curr_log_buf = const_cast(log_buf); } - int64_t pos = LogGroupEntryHeader::HEADER_SER_SIZE; // skip group entry header - assert(total_buf_len > pos); LogEntryHeader log_entry_header; int64_t log_entry_data_checksum = 0; int64_t tmp_log_checksum = 0; - while (OB_SUCC(ret) && NULL != tmp_buf && pos < total_buf_len) { - if (OB_FAIL(log_entry_header.deserialize(tmp_buf, total_buf_len, pos))) { - PALF_LOG(ERROR, "log_entry_header deserialize failed", K(ret), KP(tmp_buf), K(pos), K(total_buf_len)); + int64_t pos = LogGroupEntryHeader::HEADER_SER_SIZE; // skip group entry header + const int64_t log_header_size = LogEntryHeader::HEADER_SER_SIZE; + char tmp_buf[log_header_size]; + int64_t tmp_buf_pos = 0; + while (OB_SUCC(ret)) { + bool need_use_tmp_buf = false; + if (curr_buf_len - pos <= 0) { + if ((buf_idx + 1) >= buf_cnt) { + // calculate finished, end loop + break; + } else { + // switch to next log_buf + // update pos to new val at new log_buf + pos = pos - curr_buf_len; + buf_idx++; + if (OB_FAIL(log_write_buf.get_write_buf(buf_idx, log_buf, curr_buf_len))) { + PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len)); + } else { + curr_log_buf = const_cast(log_buf); + } + if (pos == curr_buf_len) { + // Reach end of log_write_buf, end loop. + break; + } + ob_assert(pos < curr_buf_len); + } + } else if (curr_buf_len - pos < log_header_size) { + need_use_tmp_buf = true; + const int64_t curr_copy_size = curr_buf_len - pos; + // copy the first part of log_entry_header + memcpy(tmp_buf, curr_log_buf + pos, curr_copy_size); + // update pos to the log_entry_header's tail pos at next log_buf + pos = log_header_size - curr_copy_size; + // inc buf_idx and get the next log_buf + buf_idx++; + ob_assert(buf_idx < buf_cnt); + if (OB_FAIL(log_write_buf.get_write_buf(buf_idx, log_buf, curr_buf_len))) { + PALF_LOG(ERROR, "get_write_buf failed", K(ret), K(log_write_buf), K(data_len)); + } else { + curr_log_buf = const_cast(log_buf); + ob_assert(log_header_size > curr_copy_size); + // copy the second part of log_entry_header + memcpy(tmp_buf + curr_copy_size, curr_log_buf, log_header_size - curr_copy_size); + // set the pos of tmp_buf to 0 + tmp_buf_pos = 0; + } + PALF_LOG(INFO, "[WRAP LOG HEADER]", K(ret), K(log_write_buf), K(data_len), + K(pos), K(log_header_size), K(curr_copy_size)); + } else { + // The rest buf contains a valid log_entry_header. + } + + if (OB_FAIL(ret)) { + } else if (false == need_use_tmp_buf + && OB_FAIL(log_entry_header.deserialize(curr_log_buf, curr_buf_len, pos))) { + PALF_LOG(ERROR, "log_entry_header deserialize failed", K(ret), KP(curr_log_buf), + K(curr_buf_len), K(pos), K(total_buf_len), K(log_write_buf), K(buf_idx)); + } else if (true == need_use_tmp_buf + && OB_FAIL(log_entry_header.deserialize(tmp_buf, log_header_size, tmp_buf_pos))) { + PALF_LOG(ERROR, "log_entry_header deserialize failed", K(ret), KP(curr_log_buf), K(curr_buf_len), + K(pos), K(total_buf_len), K(tmp_buf_pos), K(log_write_buf), K(buf_idx)); } else if (false == log_entry_header.check_header_integrity()) { ret = OB_ERR_UNEXPECTED; - PALF_LOG(ERROR, "log_entry_header is invalid", K(ret), KP(tmp_buf), K(pos), K(total_buf_len), - K(log_entry_header)); + PALF_LOG(ERROR, "log_entry_header is invalid", K(ret), KP(curr_log_buf), K(curr_buf_len), K(pos), K(total_buf_len), + K(log_entry_header), K(log_write_buf), K(buf_idx)); } else { log_entry_data_checksum = log_entry_header.get_data_checksum(); tmp_log_checksum = common::ob_crc64(tmp_log_checksum, &log_entry_data_checksum, sizeof(log_entry_data_checksum)); @@ -163,12 +206,8 @@ int LogGroupEntryHeader::calculate_log_checksum_(const bool is_padding_log, if (OB_SUCC(ret)) { data_checksum = tmp_log_checksum; } - - if (NULL != tmp_buf && need_free_mem) { - mtl_free(tmp_buf); - } } - PALF_LOG(TRACE, "calculate_log_checksum_ finished", K(ret), K(*this), K(data_checksum)); + PALF_LOG(TRACE, "calculate_log_checksum_ finished", K(ret), K(log_write_buf), K(*this), K(data_checksum)); return ret; } diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 4d6586cad..843a809b5 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -916,11 +916,9 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated) K(prev_lsn), K(prev_log_pid), K(last_submit_log_id), K(last_submit_lsn), K(last_submit_log_pid), K(tmp_log_id), KPC(log_task)); } else if (OB_FAIL(generate_group_entry_header_(tmp_log_id, log_task, group_entry_header, - group_log_data_checksum))) { + group_log_data_checksum, is_accum_checksum_acquired))) { PALF_LOG(WARN, "generate_group_entry_header_ failed", K_(palf_id), K_(self)); } else { - // set flag for rollback accum_checksum - is_accum_checksum_acquired = true; log_task->lock(); if (!state_mgr_->is_follower_active()) { // Updating data_checksum, accum_checksum, committed_end_lsn for log_task. @@ -1048,7 +1046,8 @@ int LogSlidingWindow::handle_next_submit_log_(bool &is_committed_lsn_updated) int LogSlidingWindow::generate_group_entry_header_(const int64_t log_id, LogTask *log_task, LogGroupEntryHeader &group_header, - int64_t &group_log_data_checksum) + int64_t &group_log_data_checksum, + bool &is_accum_checksum_acquired) { int ret = OB_SUCCESS; if (OB_INVALID_LOG_ID == log_id @@ -1089,6 +1088,8 @@ int LogSlidingWindow::generate_group_entry_header_(const int64_t log_id, } else if (OB_FAIL(checksum_.acquire_accum_checksum(group_log_data_checksum, accum_checksum))) { PALF_LOG(WARN, "update_accumulated_checksum failed", K(ret), K_(palf_id), K_(self)); } else { + // set flag for rollback accum_checksum + is_accum_checksum_acquired = true; (void) group_header.update_accumulated_checksum(accum_checksum); (void) group_header.update_header_checksum(); PALF_LOG(TRACE, "generate_group_entry_header_ success", K(ret), K_(palf_id), K_(self), K(is_padding_log), @@ -1214,6 +1215,9 @@ int LogSlidingWindow::period_freeze_last_log() } else if (OB_FAIL(try_freeze_last_log_task_(last_log_id, last_log_end_lsn, is_need_handle))) { PALF_LOG(WARN, "try_freeze_last_log_task_ failed", K(ret), K_(palf_id), K_(self), K(last_log_id), K(last_log_end_lsn)); } else { + } + if (get_max_log_id() > get_last_submit_log_id_()) { + // try handle next submit log bool is_committed_lsn_updated = false; (void) handle_next_submit_log_(is_committed_lsn_updated); } diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index 54a7412f6..df754f930 100644 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -338,7 +338,8 @@ private: int generate_group_entry_header_(const int64_t log_id, LogTask *log_task, LogGroupEntryHeader &header, - int64_t &group_log_checksum); + int64_t &group_log_checksum, + bool &is_accum_checksum_acquired); int gen_committed_end_lsn_(LSN &new_committed_end_lsn); int gen_committed_end_lsn_with_memberlist_( const ObMemberList &member_list, diff --git a/src/logservice/palf/log_writer_utils.h b/src/logservice/palf/log_writer_utils.h index 55c332750..bde8da250 100644 --- a/src/logservice/palf/log_writer_utils.h +++ b/src/logservice/palf/log_writer_utils.h @@ -43,7 +43,7 @@ public: // NB: check_memory_is_continous firstly, and ensure dest_buf can hold enough data. void memcpy_to_continous_memory(char *dest_buf) const; - TO_STRING_KV("total_size", get_total_size(), "count", get_buf_count()); + TO_STRING_KV("total_size", get_total_size(), "count", get_buf_count(), K_(write_buf)); NEED_SERIALIZE_AND_DESERIALIZE; static constexpr int64_t MAX_COUNT = 2; diff --git a/unittest/logservice/test_log_entry_and_group_entry.cpp b/unittest/logservice/test_log_entry_and_group_entry.cpp index ad18eb217..940b03563 100644 --- a/unittest/logservice/test_log_entry_and_group_entry.cpp +++ b/unittest/logservice/test_log_entry_and_group_entry.cpp @@ -32,6 +32,98 @@ using namespace palf; namespace unittest { +TEST(TestLogGroupEntryHeader, test_group_entry_header_wrap_checksum) +{ + const int64_t BUFSIZE = 1 << 21; + LogGroupEntryHeader header; + LogEntryHeader log_entry_header; + int64_t group_entry_header_size = header.get_serialize_size(); + int64_t log_entry_header_size = log_entry_header.get_serialize_size(); + int64_t total_header_size = group_entry_header_size + log_entry_header_size; + char buf[BUFSIZE]; + char ptr[BUFSIZE] = "helloworld"; + // 数据部分 + memcpy(buf + total_header_size, ptr, strlen(ptr)); + + bool is_padding_log = false; + const char *data = buf + total_header_size; + int64_t data_len = strlen(ptr); + memcpy(buf + total_header_size + data_len + log_entry_header_size, ptr, strlen(ptr)); + int64_t min_timestamp = 0; + share::SCN max_scn = share::SCN::min_scn(); + int64_t log_id = 1; + LSN committed_lsn; + committed_lsn.val_ = 1; + int64_t proposal_id = 1; + int64_t log_checksum = 0; + + // test LogEntry and LogEntryHeader + EXPECT_EQ(OB_SUCCESS, log_entry_header.generate_header(data, data_len, share::SCN::base_scn())); + int64_t tmp_pos = 0, new_pos = 0; + EXPECT_EQ(OB_SUCCESS, + log_entry_header.serialize(buf + group_entry_header_size, BUFSIZE, tmp_pos)); + EXPECT_EQ(tmp_pos, log_entry_header_size); + EXPECT_EQ(OB_SUCCESS, + log_entry_header.serialize(buf + total_header_size + data_len, BUFSIZE, new_pos)); + EXPECT_EQ(new_pos, log_entry_header_size); + // test LogGroupEntryHeader and LogEntry + LogWriteBuf write_buf; + + int64_t group_log_data_len = 0; + int64_t group_log_len = group_entry_header_size + (log_entry_header_size + data_len); + for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) { + write_buf.reset(); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val)); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val)); + group_log_data_len = group_log_len - group_entry_header_size; + PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val)); + EXPECT_EQ(OB_SUCCESS, + header.generate(false, is_padding_log, write_buf, group_log_data_len, + max_scn, log_id, committed_lsn, proposal_id, log_checksum)); + } + + is_padding_log = true; + for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) { + write_buf.reset(); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val)); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val)); + group_log_data_len = group_log_len - group_entry_header_size; + PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val)); + EXPECT_EQ(OB_SUCCESS, + header.generate(false, is_padding_log, write_buf, group_log_data_len, + max_scn, log_id, committed_lsn, proposal_id, log_checksum)); + } + + group_log_len = group_entry_header_size + 2 * (log_entry_header_size + data_len); + for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) { + write_buf.reset(); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val)); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val)); + group_log_data_len = group_log_len - group_entry_header_size; + PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val)); + EXPECT_EQ(OB_SUCCESS, + header.generate(false, is_padding_log, write_buf, group_log_data_len, + max_scn, log_id, committed_lsn, proposal_id, log_checksum)); + } + + is_padding_log = true; + for (int64_t sub_val = 1; sub_val < group_log_len; ++sub_val) { + write_buf.reset(); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf, sub_val)); + EXPECT_EQ(OB_SUCCESS, write_buf.push_back(buf + sub_val, group_log_len - sub_val)); + group_log_data_len = group_log_len - group_entry_header_size; + PALF_LOG(INFO, "before group_header generate", K(group_log_data_len), K(write_buf), K(sub_val)); + EXPECT_EQ(OB_SUCCESS, + header.generate(false, is_padding_log, write_buf, group_log_data_len, + max_scn, log_id, committed_lsn, proposal_id, log_checksum)); + } + + is_padding_log = true; + EXPECT_EQ(OB_SUCCESS, + header.generate(false, is_padding_log, write_buf, group_log_data_len, + max_scn, log_id, committed_lsn, proposal_id, log_checksum)); +} + TEST(TestLogGroupEntryHeader, test_log_group_entry_header) { const int64_t BUFSIZE = 1 << 21; @@ -78,8 +170,6 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header) max_scn, log_id, committed_lsn, proposal_id, log_checksum)); max_scn.set_base(); int64_t defalut_acc = 10; - header.update_accumulated_checksum(defalut_acc); - header.update_header_checksum(); min_timestamp = 1; EXPECT_EQ(OB_SUCCESS, header.generate(false, is_padding_log, write_buf, data_len + log_entry_header_size, @@ -92,8 +182,6 @@ TEST(TestLogGroupEntryHeader, test_log_group_entry_header) EXPECT_EQ(data_len + log_entry_header_size, header.get_data_len()); EXPECT_EQ(max_scn, header.get_max_scn()); EXPECT_EQ(log_id, header.get_log_id()); - EXPECT_EQ(proposal_id, header.get_log_proposal_id()); - EXPECT_EQ(committed_lsn, header.get_committed_end_lsn()); int64_t pos = 0; EXPECT_EQ(OB_SUCCESS, header.serialize(buf, BUFSIZE, pos)); EXPECT_EQ(pos, header.get_serialize_size()); -- GitLab