From 655020509ba3559ca1280f8c1d22c0ecd9e1f88c Mon Sep 17 00:00:00 2001 From: JiahuaChen Date: Thu, 17 Mar 2022 15:11:18 +0800 Subject: [PATCH] Fix slog reader doesnt switch file when theres no switch log at EOF --- .../slog/ob_storage_log_reader.cpp | 8 +- .../slog/test_storage_log_reader_writer.cpp | 95 ++++++++++++++++++- 2 files changed, 98 insertions(+), 5 deletions(-) diff --git a/src/storage/blocksstable/slog/ob_storage_log_reader.cpp b/src/storage/blocksstable/slog/ob_storage_log_reader.cpp index 5603d8dc16..5d5c4ed0d5 100644 --- a/src/storage/blocksstable/slog/ob_storage_log_reader.cpp +++ b/src/storage/blocksstable/slog/ob_storage_log_reader.cpp @@ -437,10 +437,12 @@ int ObStorageLogReader::load_buf() int ObStorageLogReader::check_switch_file(const int get_ret, const LogCommand cmd) { int ret = get_ret; - // Check switch file when meet below situation: - // 1) Meet switch log command + const bool fetch_log_again = (OB_READ_NOTHING == get_ret); + // Check switch file when meet below 2 situation: + // 1) Meet switch log command, or + // 2) read the last log (which may be incomplete and ignored) // Otherwise directly ret - if (common::OB_SUCCESS == get_ret && OB_LOG_SWITCH_LOG == cmd) { + if ((common::OB_SUCCESS == get_ret && OB_LOG_SWITCH_LOG == cmd) || fetch_log_again) { STORAGE_REDO_LOG(INFO, "reach the end of log", K_(file_id)); if (OB_FAIL(close())) { STORAGE_REDO_LOG(ERROR, "close error", K(ret)); diff --git a/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp b/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp index 059e0f3b59..a07597c6d5 100644 --- a/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp +++ b/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp @@ -863,9 +863,100 @@ TEST_F(TestStorageLogReaderWriter, seek_to_end) ASSERT_EQ(1, cnt); ret = reader.get_next_cursor(read_cursor); ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(LOG_FILE_ID, read_cursor.file_id_); + ASSERT_EQ(LOG_FILE_ID + 1, read_cursor.file_id_); ASSERT_EQ(LOG_START_SEQ + 2, read_cursor.log_id_); - ASSERT_EQ(4096 * 2, read_cursor.offset_); + ASSERT_EQ(0, read_cursor.offset_); +} + +TEST_F(TestStorageLogReaderWriter, read_multiple_files) +{ + int ret = OB_SUCCESS; + const char LOG_DIR[512] = "./test_storage_log_rw"; + const int64_t LOG_FILE_SIZE = 64 << 20; // 64MB + const int64_t CONCURRENT_TRANS_CNT = 128; + const int64_t LOG_BUFFER_SIZE = 1966080L; // 1.875MB + const int64_t log_cnt_per_file = 10; + + // write part + ObLogCursor start_cursor; + start_cursor.file_id_ = 1; + start_cursor.log_id_ = 1; + start_cursor.offset_ = 0; + + char write_data[128] = "this is read multiple files test."; + ObBaseStorageLogBuffer log_buf; + ret = log_buf.assign(write_data, 1024); + ASSERT_EQ(OB_SUCCESS, ret); + ret = log_buf.set_pos(strlen(write_data)); + ASSERT_EQ(OB_SUCCESS, ret); + + ObStorageLogWriter writer; + ret = writer.init(LOG_DIR, LOG_FILE_SIZE, LOG_BUFFER_SIZE, CONCURRENT_TRANS_CNT); + ASSERT_EQ(OB_SUCCESS, ret); + ret = writer.start_log(start_cursor); + ASSERT_EQ(OB_SUCCESS, ret); + for (int64_t i = 0; i < log_cnt_per_file; ++i) { + ret = writer.flush_log(LogCommand::OB_LOG_DUMMY_LOG, log_buf, start_cursor); + ASSERT_EQ(OB_SUCCESS, ret); + } + start_cursor.file_id_ = 2; + start_cursor.offset_ = 0; + start_cursor.log_id_ = writer.get_cur_cursor().log_id_; + writer.destroy(); + + // manually switch file + ret = writer.init(LOG_DIR, LOG_FILE_SIZE, LOG_BUFFER_SIZE, CONCURRENT_TRANS_CNT); + ASSERT_EQ(OB_SUCCESS, ret); + ret = writer.start_log(start_cursor); + ASSERT_EQ(OB_SUCCESS, ret); + for (int64_t i = 0; i < log_cnt_per_file; ++i) { + ret = writer.flush_log(LogCommand::OB_LOG_DUMMY_LOG, log_buf, start_cursor); + ASSERT_EQ(OB_SUCCESS, ret); + } + writer.destroy(); + + // read part + LogCommand cmd = LogCommand::OB_LOG_UNKNOWN; + uint64_t seq = 0; + int64_t read_len = 0; + char *read_data = NULL; + ObLogCursor read_cursor; + + ObStorageLogReader reader; + ret = reader.init(LOG_DIR, 1, 0); + ASSERT_EQ(OB_SUCCESS, ret); + + while (OB_SUCCESS == ret) { + // read dummy log + ret = reader.read_log(cmd, seq, read_data, read_len); + } + ASSERT_EQ(OB_READ_NOTHING, ret); + + ret = reader.get_cursor(read_cursor); + ASSERT_EQ(OB_SUCCESS, ret); + + ASSERT_EQ(start_cursor.file_id_ + 1, read_cursor.file_id_); + ASSERT_EQ(0, read_cursor.offset_); + ASSERT_EQ(start_cursor.log_id_ + 1, read_cursor.log_id_); + reader.reset(); + + // read start from the end of file 1 + ret = reader.init(LOG_DIR, 1, 21); + ASSERT_EQ(OB_SUCCESS, ret); + + while (OB_SUCCESS == ret) { + // read dummy log + ret = reader.read_log(cmd, seq, read_data, read_len); + } + ASSERT_EQ(OB_READ_NOTHING, ret); + + ret = reader.get_cursor(read_cursor); + ASSERT_EQ(OB_SUCCESS, ret); + + ASSERT_EQ(start_cursor.file_id_ + 1, read_cursor.file_id_); + ASSERT_EQ(0, read_cursor.offset_); + ASSERT_EQ(start_cursor.log_id_ + 1, read_cursor.log_id_); + reader.reset(); } } // namespace blocksstable } // namespace oceanbase -- GitLab