From ef2b6227955c0345bc564a19d9d1fc3bff4cb2b0 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 15 Jul 2021 10:51:14 +0800 Subject: [PATCH] Decouple clog writer and shared memory file --- src/clog/ob_clog_file_writer.cpp | 283 +++++-------------- src/clog/ob_clog_file_writer.h | 14 +- src/clog/ob_clog_writer.cpp | 21 +- src/clog/ob_ilog_storage.cpp | 33 ++- src/clog/ob_ilog_storage.h | 14 +- src/clog/ob_log_define.h | 1 + src/clog/ob_log_engine.cpp | 28 +- src/clog/ob_log_engine.h | 2 +- src/clog/ob_log_entry.cpp | 2 +- unittest/clog/test_clog_writer.cpp | 138 ++++----- unittest/clog/test_ob_raw_entry_iterator.cpp | 4 +- 11 files changed, 185 insertions(+), 355 deletions(-) diff --git a/src/clog/ob_clog_file_writer.cpp b/src/clog/ob_clog_file_writer.cpp index d525599744..4a9f941548 100644 --- a/src/clog/ob_clog_file_writer.cpp +++ b/src/clog/ob_clog_file_writer.cpp @@ -27,17 +27,15 @@ using namespace oceanbase::common; namespace oceanbase { namespace clog { -ObCLogBaseFileWriter::ObCLogBaseFileWriter() - : is_inited_(false), - log_ctrl_(NULL), - shm_buf_(NULL), - shm_data_buf_(NULL), - buf_write_pos_(0), - file_offset_(0), - buf_padding_size_(0), - align_size_(0), - store_(NULL), - file_id_(0) +ObCLogBaseFileWriter::ObCLogBaseFileWriter() : + is_inited_(false), + aligned_data_buf_(nullptr), + buf_write_pos_(0), + file_offset_(0), + buf_padding_size_(0), + align_size_(0), + store_(NULL), + file_id_(0) { log_dir_[0] = '\0'; } @@ -47,35 +45,37 @@ ObCLogBaseFileWriter::~ObCLogBaseFileWriter() destroy(); } -int ObCLogBaseFileWriter::init( - const char* log_dir, const char* shm_path, const uint32_t align_size, const ObILogFileStore* file_store) +int ObCLogBaseFileWriter::init(const char *log_dir, + const uint32_t align_size, const ObILogFileStore *file_store) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; CLOG_LOG(WARN, "already inited", K(ret)); - } else if (OB_ISNULL(log_dir) || OB_ISNULL(shm_path) || OB_ISNULL(file_store)) { + } else if (OB_ISNULL(log_dir) || OB_ISNULL(file_store)) { ret = OB_INVALID_ARGUMENT; - CLOG_LOG(WARN, "invalid argument", K(ret), K(log_dir), K(align_size), KP(file_store)); - } else if (OB_FAIL(ObBaseLogBufferMgr::get_instance().get_buffer(shm_path, log_ctrl_))) { + CLOG_LOG(WARN, "invalid param", K(ret), K(log_dir), K(align_size), KP(file_store)); + } else if (OB_ISNULL(aligned_data_buf_ = (char*) ob_malloc_align( + align_size, CLOG_MAX_WRITE_BUFFER_SIZE, "CLogFileWriter"))) { + ret = OB_ALLOCATE_MEMORY_FAILED; CLOG_LOG(WARN, "get log buf failed", K(ret), K(log_dir)); } else { - shm_buf_ = log_ctrl_->base_buf_; - shm_data_buf_ = log_ctrl_->data_buf_; log_dir_[sizeof(log_dir_) - 1] = '\0'; (void)snprintf(log_dir_, sizeof(log_dir_) - 1, log_dir); align_size_ = align_size; store_ = const_cast(file_store); } + return OB_SUCCESS; } void ObCLogBaseFileWriter::destroy() { is_inited_ = false; - log_ctrl_ = NULL; - shm_buf_ = NULL; - shm_data_buf_ = NULL; + if (nullptr != aligned_data_buf_) { + ob_free_align(aligned_data_buf_); + aligned_data_buf_ = nullptr; + } buf_write_pos_ = 0; file_offset_ = 0; buf_padding_size_ = 0; @@ -104,7 +104,6 @@ int ObCLogLocalFileWriter::load_file(uint32_t& file_id, uint32_t& offset, bool e { UNUSED(enable_pre_creation); int ret = OB_SUCCESS; - ObAtomicFilePos file_pos; if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -118,122 +117,23 @@ int ObCLogLocalFileWriter::load_file(uint32_t& file_id, uint32_t& offset, bool e } else if (OB_FAIL(store_->open(file_id))) { CLOG_LOG(WARN, "open file failed", K(file_id), K(ret)); } else { - file_id_ = file_id; - file_pos.file_id_ = file_id; - file_pos.file_offset_ = offset; - CLOG_LOG(INFO, "load start, ", K(file_pos), K(shm_buf_->file_write_pos_), K(shm_buf_->file_flush_pos_)); - - if (0 == shm_buf_->file_flush_pos_.atomic_ || 0 == shm_buf_->file_write_pos_.atomic_) { - // first start or start after server restart - ATOMIC_STORE(&shm_buf_->file_flush_pos_.atomic_, file_pos.atomic_); - ATOMIC_STORE(&shm_buf_->file_write_pos_.atomic_, file_pos.atomic_); - } else if (shm_buf_->file_write_pos_.file_id_ + 1 == file_pos.file_id_ || - shm_buf_->file_flush_pos_.file_id_ + 1 == file_pos.file_id_) { - // observer restart just after creating new_file() - if (0 != file_pos.file_offset_ || shm_buf_->file_write_pos_ < shm_buf_->file_flush_pos_) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, - "The clog new file start pos is unexpected, ", - K(ret), - K(file_pos), - K(shm_buf_->file_flush_pos_), - K(shm_buf_->file_write_pos_), - K(shm_buf_->log_dir_)); - } else { - ATOMIC_STORE(&shm_buf_->file_write_pos_.atomic_, file_pos.atomic_); - ATOMIC_STORE(&shm_buf_->file_flush_pos_.atomic_, file_pos.atomic_); - CLOG_LOG(INFO, "Success to sync new file pos", K(file_pos)); - } - } else { - // start after observer process restart and there is data in share memory - if (shm_buf_->file_flush_pos_.file_id_ != file_pos.file_id_ || - shm_buf_->file_write_pos_.file_id_ != file_pos.file_id_ || shm_buf_->file_flush_pos_ > file_pos || - shm_buf_->file_write_pos_ < shm_buf_->file_flush_pos_) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, - "The clog start pos is unexpected, ", - K(ret), - K(file_id), - K(offset), - K(shm_buf_->file_flush_pos_), - K(shm_buf_->file_write_pos_), - K(shm_buf_->log_dir_)); - } else if (shm_buf_->file_write_pos_ > shm_buf_->file_flush_pos_) { - // the write buffer is not flushed, need flush - // if need alignment, also include previous unaligned part + padding part - buf_write_pos_ = shm_buf_->file_write_pos_.file_offset_ - shm_buf_->file_flush_pos_.file_offset_; - if (need_align()) { - buf_write_pos_ += (shm_buf_->file_flush_pos_.file_offset_ % align_size_); - buf_write_pos_ += ObPaddingEntry::get_padding_size(buf_write_pos_); - } - file_offset_ = shm_buf_->file_flush_pos_.file_offset_; - - CLOG_LOG(INFO, - "Flush remaining buf, ", - K(ret), - K(file_id), - K(offset), - K(shm_buf_->file_flush_pos_), - K(shm_buf_->file_write_pos_), - K(shm_buf_->log_dir_), - K_(file_offset), - K_(buf_write_pos)); - - if (buf_write_pos_ > shm_buf_->buf_len_) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, - "The buf pos is unexpected, ", - K(ret), - K_(buf_write_pos), - K(shm_buf_->buf_len_), - K(shm_buf_->file_flush_pos_), - K(shm_buf_->file_write_pos_), - K(shm_buf_->log_dir_)); - } else if (OB_FAIL(flush_buf())) { - CLOG_LOG(ERROR, - "Fail to flush share memory buffer to log file, ", - K(ret), - K(errno), - K_(buf_write_pos), - K(shm_buf_->log_dir_)); - } else { - ATOMIC_STORE(&shm_buf_->file_flush_pos_.atomic_, shm_buf_->file_write_pos_.atomic_); - CLOG_LOG(INFO, "Success to flush log buf to file!"); - } - } else { - // equal write and flush pos - } - } - } - - if (OB_SUCC(ret)) { - if (shm_buf_->file_flush_pos_ < file_pos) { - ATOMIC_STORE(&shm_buf_->file_flush_pos_.atomic_, file_pos.atomic_); - ATOMIC_STORE(&shm_buf_->file_write_pos_.atomic_, file_pos.atomic_); - } + CLOG_LOG(INFO, "load start", K(file_id), K(offset)); } // Load last time unaligned part if it is aligned system // Append only system is not needed, so just reset buf empty if (OB_SUCC(ret)) { if (need_align()) { - buf_write_pos_ = shm_buf_->file_flush_pos_.file_offset_ % align_size_; + buf_write_pos_ = offset % align_size_; int64_t read_size = 0; - if (buf_write_pos_ > 0 && OB_FAIL(store_->read(shm_data_buf_, - align_size_, - lower_align(shm_buf_->file_flush_pos_.file_offset_, align_size_), - read_size))) { - CLOG_LOG(ERROR, - "Fail to read data from log file, ", - K(ret), - K_(buf_write_pos), - K(shm_buf_->file_flush_pos_.file_offset_), - K(shm_buf_->log_dir_)); + if (buf_write_pos_ > 0 + && OB_FAIL(store_->read(aligned_data_buf_, align_size_, lower_align(offset, align_size_), read_size))) { + CLOG_LOG(ERROR, "Fail to read data from log file, ", K(ret), K(buf_write_pos_), K(offset)); } else if (read_size != align_size_) { - CLOG_LOG(INFO, "Log file size is not aligned. ", K(read_size), K_(align_size), K(shm_buf_->file_flush_pos_)); + CLOG_LOG(INFO, "Log file size is not aligned. ", K(read_size), K(align_size_)); } else { - CLOG_LOG( - INFO, "Read data from log file to shared memory buffer, ", K(buf_write_pos_), K(shm_buf_->file_flush_pos_)); + CLOG_LOG(INFO, "Read data from log file to shared memory buffer, ", K(buf_write_pos_), + K(file_id), K(offset)); } } else { reset_buf(); @@ -243,7 +143,8 @@ int ObCLogLocalFileWriter::load_file(uint32_t& file_id, uint32_t& offset, bool e if (OB_FAIL(ret)) { CLOG_LOG(WARN, "log writer start failed", K(ret), K(file_id), K(offset)); } else { - file_offset_ = shm_buf_->file_flush_pos_.file_offset_; + file_id_ = file_id; + file_offset_ = offset; CLOG_LOG(INFO, "load success", K(file_id), K(offset)); } return ret; @@ -255,24 +156,22 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset) ObLogFileTrailer trailer; int64_t pos = 0; const file_id_t phy_file_id = file_id_ + 1; + // build trailer from last 512 byte offset (4096-512) int64_t trailer_pos = CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; - char* buf = shm_data_buf_ + trailer_pos; + char *buf = aligned_data_buf_ + trailer_pos; reset_buf(); - if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code + if (CLOG_TRAILER_OFFSET != file_offset_) { //Defense code ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET)); - } else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset, phy_file_id, pos))) { - CLOG_LOG(WARN, - "build_serialized_trailer fail", - K(ret), - LITERAL_K(CLOG_DIO_ALIGN_SIZE), - K(info_block_offset), - K_(file_id), - K(phy_file_id)); + CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset), + LITERAL_K(CLOG_TRAILER_OFFSET)); + } else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset, + phy_file_id, pos))) { + CLOG_LOG(WARN, "build_serialized_trailer fail", K(ret), LITERAL_K(CLOG_DIO_ALIGN_SIZE), + K(info_block_offset), K_(file_id), K(phy_file_id)); } else { - buf_write_pos_ += (uint32_t)CLOG_DIO_ALIGN_SIZE; + buf_write_pos_ += (uint32_t) CLOG_DIO_ALIGN_SIZE; } return ret; @@ -281,20 +180,16 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset) int ObCLogBaseFileWriter::flush_trailer_entry() { int ret = OB_SUCCESS; - if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code + if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "file offset mismatch", K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET)); } else if (CLOG_DIO_ALIGN_SIZE != buf_write_pos_) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "buf write position mismatch", K_(buf_write_pos), LITERAL_K(CLOG_DIO_ALIGN_SIZE)); - } else if (OB_FAIL(store_->write(shm_data_buf_, buf_write_pos_, CLOG_TRAILER_ALIGN_WRITE_OFFSET))) { - CLOG_LOG(ERROR, - "write fail", - K(ret), - K(buf_write_pos_), - K_(file_offset), - LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET), - K(errno)); + } else if (OB_FAIL(store_->write(aligned_data_buf_, buf_write_pos_, CLOG_TRAILER_ALIGN_WRITE_OFFSET))) { + // no retry + CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K_(file_offset), + LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET), K(errno)); } return ret; } @@ -303,11 +198,11 @@ int ObCLogBaseFileWriter::append_info_block_entry(ObIInfoBlockHandler* info_gett { int ret = OB_SUCCESS; ObLogBlockMetaV2 meta; - uint32_t block_meta_len = (uint32_t)meta.get_serialize_size(); - int64_t buf_len = shm_buf_->buf_len_ - block_meta_len; + const uint32_t block_meta_len = (uint32_t) meta.get_serialize_size(); + const int64_t buf_len = CLOG_MAX_WRITE_BUFFER_SIZE - block_meta_len; int64_t data_len = 0; int64_t pos = 0; - char* buf = shm_data_buf_ + block_meta_len; + char* buf = aligned_data_buf_ + block_meta_len; reset_buf(); @@ -316,7 +211,7 @@ int ObCLogBaseFileWriter::append_info_block_entry(ObIInfoBlockHandler* info_gett // build_info_block will reset flying info_block for next file CLOG_LOG(WARN, "read partition meta fail", K(ret), KP(buf), K(buf_len), K_(file_offset), K_(buf_padding_size)); - } else if (OB_FAIL(meta.build_serialized_block(shm_data_buf_, block_meta_len, buf, data_len, OB_INFO_BLOCK, pos))) { + } else if (OB_FAIL(meta.build_serialized_block(aligned_data_buf_, block_meta_len, buf, data_len, OB_INFO_BLOCK, pos))) { CLOG_LOG(WARN, "build serialized block fail", K(ret), K_(file_offset), K_(buf_padding_size)); } else { buf_write_pos_ += (block_meta_len + (uint32_t)data_len); @@ -346,12 +241,6 @@ int ObCLogLocalFileWriter::create_next_file() } else { ++file_id_; file_offset_ = 0; - ObAtomicFilePos file_pos; - file_pos.file_id_ = file_id_; - file_pos.file_offset_ = file_offset_; - - ATOMIC_STORE(&shm_buf_->file_write_pos_.atomic_, file_pos.atomic_); - ATOMIC_STORE(&shm_buf_->file_flush_pos_.atomic_, file_pos.atomic_); // fild_id is 32 bits. The first clog file_id is 1 and each clog file size is 64MB. // Suppose the maximum log write throughput is 1GB/s. @@ -373,16 +262,12 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size) if (padding_size > 0) { int64_t serialize_pos = 0; ObPaddingEntry padding_entry; - char* buf = shm_data_buf_ + buf_write_pos_; + char* buf = aligned_data_buf_ + buf_write_pos_; - if (buf_write_pos_ + padding_size > shm_buf_->buf_len_) { + if (buf_write_pos_ + padding_size > CLOG_MAX_WRITE_BUFFER_SIZE) { ret = OB_BUF_NOT_ENOUGH; - CLOG_LOG(WARN, - "padding entry size over buf length", - K(ret), - K(padding_size), - K_(buf_write_pos), - K(shm_buf_->buf_len_)); + CLOG_LOG(WARN, "padding entry size over buf length", K(ret), K(padding_size), + K_(buf_write_pos), LITERAL_K(CLOG_MAX_WRITE_BUFFER_SIZE)); } else if (OB_FAIL(padding_entry.set_entry_size(padding_size))) { CLOG_LOG(WARN, "padding entry set size error", K(ret), K(padding_size)); } else if (OB_FAIL(padding_entry.serialize(buf, padding_size, serialize_pos))) { @@ -395,7 +280,7 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size) return ret; } -int ObCLogBaseFileWriter::cache_buf(ObLogCache* log_cache, const char* buf, const uint32_t buf_len) +int ObCLogBaseFileWriter::cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len) { int ret = OB_SUCCESS; if (OB_ISNULL(buf) || 0 == buf_len) { @@ -425,18 +310,12 @@ int ObCLogBaseFileWriter::append_log_entry(const char* item_buf, const uint32_t ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "file not start", K_(file_id), K(ret)); } else { - // copy log to share memory buffer - lib::ObMutexGuard buf_guard(log_ctrl_->buf_mutex_); - memcpy(shm_data_buf_ + buf_write_pos_, item_buf, len); - buf_write_pos_ += (uint32_t)len; + //copy log to memory buffer + memcpy(aligned_data_buf_ + buf_write_pos_, item_buf, len); + buf_write_pos_ += (uint32_t) len; if (OB_FAIL(align_buf())) { CLOG_LOG(ERROR, "fail to add padding, ", K(ret)); - } else { - ObAtomicFilePos file_pos; - file_pos.file_id_ = file_id_; - file_pos.file_offset_ = file_offset_ + len; - ATOMIC_STORE(&shm_buf_->file_write_pos_.atomic_, file_pos.atomic_); } } @@ -451,26 +330,12 @@ int ObCLogLocalFileWriter::flush( if (IS_NOT_INIT) { ret = OB_NOT_INIT; CLOG_LOG(WARN, "not inited", K(ret)); - } else if (file_id_ != shm_buf_->file_flush_pos_.file_id_ || - file_offset_ != shm_buf_->file_flush_pos_.file_offset_) { // Defense code - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, - "file position not match", - K(ret), - K_(file_id), - K_(file_offset), - K(shm_buf_->file_flush_pos_), - K(shm_buf_->file_write_pos_)); - } - - if (OB_SUCC(ret)) { + } else { flush_start_offset = file_offset_; if (OB_FAIL(flush_buf())) { CLOG_LOG(WARN, "Fail to flush clog to disk, ", K(ret)); } else { - lib::ObMutexGuard buf_guard(log_ctrl_->buf_mutex_); - ATOMIC_STORE(&shm_buf_->file_flush_pos_.atomic_, shm_buf_->file_write_pos_.atomic_); - file_offset_ = shm_buf_->file_write_pos_.file_offset_; + file_offset_ = lower_align(file_offset_, align_size_) + buf_write_pos_ - buf_padding_size_; truncate_buf(); } } @@ -494,14 +359,14 @@ int ObCLogLocalFileWriter::align_buf() } /// ObCLogLocalFileWriter /// -int ObCLogLocalFileWriter::init( - const char* log_dir, const char* shm_path, const uint32_t align_size, const ObILogFileStore* file_store) +int ObCLogLocalFileWriter::init(const char* log_dir, + const uint32_t align_size, const ObILogFileStore* file_store) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; CLOG_LOG(WARN, "already inited", K(ret)); - } else if (OB_FAIL(ObCLogBaseFileWriter::init(log_dir, shm_path, align_size, file_store))) { + } else if (OB_FAIL(ObCLogBaseFileWriter::init(log_dir, align_size, file_store))) { CLOG_LOG(WARN, "ObCLogBaseFileWriter init fail", K(ret)); } else if (NULL == (blank_buf_ = (char*)ob_malloc(OB_MAX_LOG_BUFFER_SIZE, ObModIds::OB_LOG_WRITER))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -675,7 +540,7 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob CLOG_LOG(WARN, "fail to add info block", K(ret), K(info_getter)); } else if (OB_FAIL(flush_buf())) { CLOG_LOG(WARN, "fail to flush info block", K(ret)); - } else if (OB_FAIL(cache_buf(log_cache, shm_data_buf_, buf_write_pos_))) { + } else if (OB_FAIL(cache_buf(log_cache, aligned_data_buf_, buf_write_pos_))) { CLOG_LOG(WARN, "fail to cache info block", K(ret)); } } @@ -689,14 +554,14 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob // - Flush trailer entry to log file // - Cache trailer entry to log cache - char* trailer_buf = shm_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; + char *trailer_buf = aligned_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; if (OB_SUCC(ret)) { if (OB_FAIL(append_trailer_entry(info_block_offset))) { CLOG_LOG(WARN, "fail to add trailer", K(ret)); } else if (OB_FAIL(flush_trailer_entry())) { CLOG_LOG(WARN, "fail to flush trailer", K(ret)); } else if (OB_FAIL(cache_buf(log_cache, trailer_buf, CLOG_TRAILER_SIZE))) { - CLOG_LOG(WARN, "fail to cache trailer", K(ret), KP(trailer_buf), LITERAL_K(CLOG_TRAILER_SIZE)); + CLOG_LOG(WARN, "fail to cache trailer", K(ret)); } else if (CLOG_FILE_SIZE != file_offset_) { // Defense code ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "file_offset_ mismatch file size", K(ret), K_(file_offset)); @@ -730,7 +595,7 @@ int ObCLogLocalFileWriter::cache_last_padding_entry(ObLogCache* log_cache) padding_size = ObPaddingEntry::get_padding_size(file_offset_, align_size_); if (OB_FAIL(append_padding_entry(padding_size))) { CLOG_LOG(WARN, "inner add padding entry error", K(ret), K(padding_size)); - } else if (OB_FAIL(cache_buf(log_cache, shm_data_buf_, buf_write_pos_))) { + } else if (OB_FAIL(cache_buf(log_cache, aligned_data_buf_, buf_write_pos_))) { CLOG_LOG(WARN, "fail to cache last padding", K(ret)); } } @@ -740,16 +605,6 @@ int ObCLogLocalFileWriter::cache_last_padding_entry(ObLogCache* log_cache) if (need_align() && 0 != (file_offset_ % align_size_)) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "file offset not align", K(ret), K_(file_offset), K_(align_size)); - } else if (file_offset_ < shm_buf_->file_flush_pos_.file_offset_ || - file_id_ != shm_buf_->file_flush_pos_.file_id_ || - shm_buf_->file_flush_pos_ != shm_buf_->file_write_pos_) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, - "file position mismatch", - K(ret), - K_(file_offset), - K(shm_buf_->file_flush_pos_), - K(shm_buf_->file_write_pos_)); } } @@ -797,7 +652,7 @@ void ObCLogLocalFileWriter::truncate_buf() tail_part_start = (uint32_t)lower_align(buf_write_pos_ - buf_padding_size_, align_size_); buf_write_pos_ = (buf_write_pos_ - buf_padding_size_) % align_size_; if (buf_write_pos_ > 0) { - memmove(shm_data_buf_, shm_data_buf_ + tail_part_start, buf_write_pos_); + memmove(aligned_data_buf_, aligned_data_buf_ + tail_part_start, buf_write_pos_); } } } @@ -817,7 +672,7 @@ int ObCLogLocalFileWriter::flush_buf() // buf has been pad and include last flush time remaining unaligned part file_write_pos = (uint32_t)lower_align(file_offset_, align_size_); } - if (OB_FAIL(store_->write(shm_data_buf_, buf_write_pos_, file_write_pos))) { + if (OB_FAIL(store_->write(aligned_data_buf_, buf_write_pos_, file_write_pos))) { CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K(file_write_pos), K(errno)); } timer.finish_timer(__FILE__, __LINE__, CLOG_PERF_WARN_THRESHOLD); diff --git a/src/clog/ob_clog_file_writer.h b/src/clog/ob_clog_file_writer.h index b72432b439..d3f2a1d667 100644 --- a/src/clog/ob_clog_file_writer.h +++ b/src/clog/ob_clog_file_writer.h @@ -37,8 +37,8 @@ public: ObCLogBaseFileWriter(); virtual ~ObCLogBaseFileWriter(); - virtual int init( - const char* log_dir, const char* shm_path, const uint32_t align_size, const common::ObILogFileStore* file_store); + virtual int init(const char *log_dir, + const uint32_t align_size, const common::ObILogFileStore *file_store); virtual void destroy(); // When log engine start, need to flush remaining content in shared memory buffer to log file @@ -96,7 +96,7 @@ protected: int append_trailer_entry(const uint32_t info_block_offset); int flush_trailer_entry(); // append all data in buffer to log cache - int cache_buf(ObLogCache* log_cache, const char* buf, const uint32_t buf_len); + int cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len); OB_INLINE bool need_align() const { @@ -109,9 +109,7 @@ protected: protected: bool is_inited_; - common::ObBaseLogBufferCtrl* log_ctrl_; - common::ObBaseLogBuffer* shm_buf_; - char* shm_data_buf_; + char *aligned_data_buf_; uint32_t buf_write_pos_; uint32_t file_offset_; // the last aligned part padding size of the buffer @@ -134,8 +132,8 @@ public: destroy(); } - virtual int init(const char* log_dir, const char* shm_path, const uint32_t align_size, - const common::ObILogFileStore* file_store) override; + virtual int init(const char *log_dir, + const uint32_t align_size, const common::ObILogFileStore *file_store) override; virtual void destroy(); virtual int load_file(uint32_t& file_id, uint32_t& offset, bool enable_pre_creation = false) override; diff --git a/src/clog/ob_clog_writer.cpp b/src/clog/ob_clog_writer.cpp index b93d1502a0..ad0043674e 100644 --- a/src/clog/ob_clog_writer.cpp +++ b/src/clog/ob_clog_writer.cpp @@ -179,8 +179,7 @@ void ObCLogWriter::set_clog_writer_thread_name() void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64_t item_cnt, int64_t& finish_cnt) { int ret = OB_SUCCESS; - ObICLogItem* item = NULL; - int64_t sync_mode = ObServerConfig::get_instance().flush_log_at_trx_commit; + ObICLogItem *item = NULL; int64_t cur_time = 0; int64_t io_time = 0; int64_t flush_time = 0; @@ -205,13 +204,7 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64 } else { const bool is_idempotent = false; const uint64_t write_len = block_meta_len + item->get_data_len(); - ObCLogDiskErrorCB* cb = NULL; - if (CLOG_DISK_SYNC != sync_mode && CLOG_MEM_SYNC != sync_mode) { - if (REACH_TIME_INTERVAL(1000 * 1000)) { - CLOG_LOG(WARN, "Not supported sync mode, ", K(sync_mode)); - } - sync_mode = CLOG_DISK_SYNC; - } + ObCLogDiskErrorCB *cb = NULL; lib::ObMutexGuard guard(file_mutex_); @@ -241,10 +234,6 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64 CLOG_LOG(ERROR, "fail to add log item to buf, ", K(ret)); } } - // invoke callback when memory sync - if (OB_SUCC(ret) && CLOG_MEM_SYNC == sync_mode) { - after_flush(item, block_meta_len, ret, file_writer_->get_cur_file_len(), finish_cnt); - } int64_t flush_start_offset = -1; if (OB_SUCC(ret)) { @@ -268,10 +257,8 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64 if (OB_SUCC(ret)) { io_time = ObTimeUtility::current_time() - cur_time; - // log flush succeed, invoke callback when disk sync - if (CLOG_DISK_SYNC == sync_mode) { - after_flush(item, block_meta_len, ret, flush_start_offset, finish_cnt); - } + //log flush succeed, invoke callback when disk sync + after_flush(item, block_meta_len, ret, flush_start_offset, finish_cnt); flush_time = ObTimeUtility::current_time() - cur_time - io_time; if (flush_time + io_time > 100 * 1000) { diff --git a/src/clog/ob_ilog_storage.cpp b/src/clog/ob_ilog_storage.cpp index 3a378990d0..a1ed9d17c4 100644 --- a/src/clog/ob_ilog_storage.cpp +++ b/src/clog/ob_ilog_storage.cpp @@ -51,8 +51,10 @@ void ObIlogAccessor::destroy() inited_ = false; } -int ObIlogAccessor::init(const char* dir_name, const char* shm_path, const int64_t server_seq, - const common::ObAddr& addr, ObLogCache* log_cache) +int ObIlogAccessor::init(const char *dir_name, + const int64_t server_seq, + const common::ObAddr &addr, + ObLogCache *log_cache) { int ret = OB_SUCCESS; const bool use_log_cache = true; @@ -69,8 +71,9 @@ int ObIlogAccessor::init(const char* dir_name, const char* shm_path, const int64 CSR_LOG(ERROR, "file_store_ init failed", K(ret)); } else if (OB_FAIL(file_id_cache_.init(server_seq, addr, this))) { CSR_LOG(ERROR, "file_id_cache_ init failed", K(ret)); - } else if (OB_FAIL(direct_reader_.init( - dir_name, shm_path, use_log_cache, log_cache, &log_tail_, ObLogWritePoolType::ILOG_WRITE_POOL))) { + } else if (OB_FAIL(direct_reader_.init(dir_name, nullptr/*no shared memory*/, use_log_cache, + log_cache, &log_tail_, + ObLogWritePoolType::ILOG_WRITE_POOL))) { CSR_LOG(ERROR, "direct_reader_ init failed", K(ret)); } else if (OB_FAIL(buffer_.init(OB_MAX_LOG_BUFFER_SIZE, CLOG_DIO_ALIGN_SIZE, ObModIds::OB_CLOG_INFO_BLK_HNDLR))) { CSR_LOG(ERROR, "buffer init failed", K(ret)); @@ -717,9 +720,12 @@ ObIlogStorage::~ObIlogStorage() destroy(); } -int ObIlogStorage::init(const char* dir_name, const char* shm_path, const int64_t server_seq, - const common::ObAddr& addr, ObLogCache* log_cache, ObPartitionService* partition_service, - ObCommitLogEnv* commit_log_env) +int ObIlogStorage::init(const char *dir_name, + const int64_t server_seq, + const common::ObAddr &addr, + ObLogCache *log_cache, + ObPartitionService *partition_service, + ObCommitLogEnv *commit_log_env) { int ret = OB_SUCCESS; @@ -733,16 +739,9 @@ int ObIlogStorage::init(const char* dir_name, const char* shm_path, const int64_ } else if (OB_ISNULL(dir_name) || OB_ISNULL(log_cache) || OB_ISNULL(partition_service) || OB_ISNULL(commit_log_env) || OB_UNLIKELY(server_seq < 0 || !addr.is_valid())) { ret = OB_INVALID_ARGUMENT; - CSR_LOG(ERROR, - "invalid argument", - KR(ret), - KP(dir_name), - KP(log_cache), - KP(partition_service), - KP(commit_log_env), - K(server_seq), - K(addr)); - } else if (OB_FAIL(ObIlogAccessor::init(dir_name, shm_path, server_seq, addr, log_cache))) { + CSR_LOG(ERROR, "invalid argument", KR(ret), KP(dir_name), KP(log_cache), KP(partition_service), + KP(commit_log_env), K(server_seq), K(addr)); + } else if (OB_FAIL(ObIlogAccessor::init(dir_name, server_seq, addr, log_cache))) { CSR_LOG(ERROR, "failed to init ObIlogAccessor", K(ret)); } else if (OB_FAIL(init_next_ilog_file_id_(next_ilog_file_id))) { CSR_LOG(ERROR, "get_next_ilog_file_id failed", K(ret)); diff --git a/src/clog/ob_ilog_storage.h b/src/clog/ob_ilog_storage.h index 8aa0066e32..23cce8d174 100644 --- a/src/clog/ob_ilog_storage.h +++ b/src/clog/ob_ilog_storage.h @@ -40,8 +40,10 @@ public: virtual void destroy(); public: - int init(const char* dir_name, const char* shm_path, const int64_t server_seq, const common::ObAddr& addr, - ObLogCache* log_cache); + int init(const char *dir_name, + const int64_t server_seq, + const common::ObAddr &addr, + ObLogCache *log_cache); int add_partition_needed_to_file_id_cache( const common::ObPartitionKey& partition_key, const uint64_t last_replay_log_id); @@ -100,8 +102,12 @@ public: ~ObIlogStorage(); public: - int init(const char* dir_name, const char* shm_path, const int64_t server_seq, const common::ObAddr& addr, - ObLogCache* log_cache, storage::ObPartitionService* partition_service, ObCommitLogEnv* commit_log_env); + int init(const char *dir_name, + const int64_t server_seq, + const common::ObAddr &addr, + ObLogCache *log_cache, + storage::ObPartitionService *partition_service, + ObCommitLogEnv *commit_log_env); void destroy(); int start(); void stop(); diff --git a/src/clog/ob_log_define.h b/src/clog/ob_log_define.h index f6cfd39cd8..d2422bc88f 100644 --- a/src/clog/ob_log_define.h +++ b/src/clog/ob_log_define.h @@ -44,6 +44,7 @@ const int64_t CLOG_CACHE_SIZE = 64 * 1024; const int64_t CLOG_REPLAY_CHECKSUM_WINDOW_SIZE = 1 << 9; const int64_t CLOG_INFO_BLOCK_SIZE_LIMIT = 1 << 22; const offset_t OB_INVALID_OFFSET = -1; +const int64_t CLOG_MAX_WRITE_BUFFER_SIZE = 2 << 20; inline bool is_valid_log_id(const uint64_t log_id) { diff --git a/src/clog/ob_log_engine.cpp b/src/clog/ob_log_engine.cpp index 74c74d7cf5..99397d7abd 100644 --- a/src/clog/ob_log_engine.cpp +++ b/src/clog/ob_log_engine.cpp @@ -94,10 +94,14 @@ int ObLogEnv::init(const Config& cfg, const ObAddr& self_addr, ObIInfoBlockHandl } else if (NULL == (file_store_ = ObLogStoreFactory::create(cfg.log_dir_, cfg.file_size_, write_pool_type))) { ret = OB_INIT_FAIL; CLOG_LOG(WARN, "create file store failed.", K(ret)); - } else if (OB_FAIL(direct_reader_.init( - cfg.log_dir_, cfg.log_shm_path_, use_log_cache, &log_cache_, &log_tail_, write_pool_type))) { + } else if (OB_FAIL(direct_reader_.init(cfg.log_dir_, + nullptr/*no shared memory*/, + use_log_cache, + &log_cache_, + &log_tail_, + write_pool_type))) { CLOG_LOG(WARN, "direct reader init error", K(ret), K(enable_log_cache), K(write_pool_type)); - } else if (OB_FAIL(init_log_file_writer(cfg.log_dir_, cfg.log_shm_path_, file_store_))) { + } else if (OB_FAIL(init_log_file_writer(cfg.log_dir_, file_store_))) { CLOG_LOG(WARN, "Fail to init log file writer ", K(ret)); } else { // do nothing @@ -334,14 +338,14 @@ bool ObLogEnv::cluster_version_before_2000_() const return GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_2000; } -int ObLogEnv::init_log_file_writer(const char* log_dir, const char* shm_path, const ObILogFileStore* file_store) +int ObLogEnv::init_log_file_writer(const char *log_dir, const ObILogFileStore *file_store) { int ret = OB_SUCCESS; if (nullptr == (log_file_writer_ = static_cast(OB_NEW(ObCLogLocalFileWriter, ObModIds::OB_LOG_WRITER)))) { ret = OB_ALLOCATE_MEMORY_FAILED; CLOG_LOG(WARN, "alloc file writer failed, ", K(ret)); - } else if (OB_FAIL(log_file_writer_->init(log_dir, shm_path, CLOG_DIO_ALIGN_SIZE, file_store))) { + } else if (OB_FAIL(log_file_writer_->init(log_dir, CLOG_DIO_ALIGN_SIZE, file_store))) { CLOG_LOG(WARN, "Fail to init file writer, ", K(ret)); } @@ -758,13 +762,13 @@ int ObLogEngine::init(const ObLogEnv::Config& cfg, const ObAddr& self_addr, obrp } else if (OB_FAIL(ilog_log_cache_.init( self_addr, cfg.index_cache_name_, cfg.index_cache_priority_, ilog_hot_cache_size))) { CLOG_LOG(WARN, "failed to init ilog_log_cache", K(ret)); - } else if (OB_FAIL(ilog_storage_.init(cfg.index_log_dir_, - cfg.index_log_shm_path_, - server_seq, - self_addr, - &ilog_log_cache_, - partition_service, - &clog_env_))) { + } else if (OB_FAIL(ilog_storage_.init( + cfg.index_log_dir_, + server_seq, + self_addr, + &ilog_log_cache_, + partition_service, + &clog_env_))) { CLOG_LOG(WARN, "ilog_storage_ init failed", K(ret)); } else { batch_rpc_ = batch_rpc; diff --git a/src/clog/ob_log_engine.h b/src/clog/ob_log_engine.h index 586d48049a..2f5261270c 100644 --- a/src/clog/ob_log_engine.h +++ b/src/clog/ob_log_engine.h @@ -143,7 +143,7 @@ protected: // the clog_writer is returned to be busy static const int64_t BUFFER_ITEM_CONGESTED_PERCENTAGE = 50; bool cluster_version_before_2000_() const; - int init_log_file_writer(const char* log_dir, const char* shm_path, const ObILogFileStore* file_store); + int init_log_file_writer(const char *log_dir, const ObILogFileStore *file_store); bool is_inited_; Config config_; diff --git a/src/clog/ob_log_entry.cpp b/src/clog/ob_log_entry.cpp index 11af4dad0a..ad7e081701 100644 --- a/src/clog/ob_log_entry.cpp +++ b/src/clog/ob_log_entry.cpp @@ -167,7 +167,7 @@ DEFINE_DESERIALIZE(ObLogEntry) CLOG_LOG(TRACE, "header deserialize error", K(ret), K(data_len), K(new_pos)); } else if (data_len - new_pos < header_.get_data_len()) { ret = OB_DESERIALIZE_ERROR; - CLOG_LOG(TRACE, "buf is not enough to deserialize clog entry buf", K(ret)); + CLOG_LOG(TRACE, "buf is not enough to deserialize clog entry buf", K(ret), K(header_), K(data_len), K(new_pos)); } else if (header_.get_data_len() < 0) { ret = OB_INVALID_DATA; CLOG_LOG(WARN, "get invalid data len", K(ret), "data_len", header_.get_data_len()); diff --git a/unittest/clog/test_clog_writer.cpp b/unittest/clog/test_clog_writer.cpp index e560457ba8..d9d3cb5697 100644 --- a/unittest/clog/test_clog_writer.cpp +++ b/unittest/clog/test_clog_writer.cpp @@ -26,6 +26,7 @@ #include "share/cache/ob_kv_storecache.h" #include "share/ob_tenant_mgr.h" #include "observer/ob_server_struct.h" +#include "share/redolog/ob_log_file_reader.h" #include #include @@ -146,7 +147,6 @@ public: protected: char log_path_[1024]; - char shm_path_[1024]; char* log_buf_; ObLogDir log_dir_; ObLogWriteFilePool write_pool_; @@ -163,8 +163,6 @@ TestCLogWriter::TestCLogWriter() { getcwd(log_path_, 1024); strcat(log_path_, "/test_clog"); - getcwd(shm_path_, 1024); - strcat(shm_path_, "/test_clog/shm_buf"); clog_cfg_.log_file_writer_ = &log_file_writer_; clog_cfg_.log_cache_ = &log_cache_; clog_cfg_.tail_ptr_ = &tail_cursor_; @@ -189,6 +187,7 @@ void TestCLogWriter::SetUp() common::ObAddr addr(ObAddr::VER::IPV4, "100.81.152.48", 2828); GCTX.self_addr_ = addr; + system("rm -rf ./test_clog"); system("mkdir test_clog"); const int64_t hot_cache_size = 1L << 27; ret = log_cache_.init(addr, "clog_cache", 1, hot_cache_size); @@ -200,10 +199,11 @@ void TestCLogWriter::SetUp() ASSERT_EQ(OB_SUCCESS, ret); ret = file_store_.init(log_path_, CLOG_FILE_SIZE, clog_cfg_.type_); ASSERT_EQ(OB_SUCCESS, ret); - ret = log_file_writer_.init(log_path_, shm_path_, CLOG_DIO_ALIGN_SIZE, &file_store_); + ret = log_file_writer_.init(log_path_, CLOG_DIO_ALIGN_SIZE, &file_store_); ASSERT_EQ(OB_SUCCESS, ret); ret = clog_writer_.init(clog_cfg_); ASSERT_EQ(OB_SUCCESS, ret); + ret = OB_LOG_FILE_READER.init(); } void TestCLogWriter::TearDown() @@ -290,16 +290,38 @@ TEST_F(TestCLogWriter, border) ret = clog_writer_.start(file_id, offset); ASSERT_EQ(OB_SUCCESS, ret); + const ObPartitionKey partition_key(1, 3001, 1); + // normal write while (true) { if (offset > CLOG_MAX_DATA_OFFSET - 2 * 1024 * 1024) { log_item.data_len_ = 1024 * 1024 + 512 * 1024; } else { - log_item.data_len_ = ObRandom::rand(1, 1024 * 1024); + log_item.data_len_ = ObRandom::rand(1024, 1024 * 1024); } log_item.is_flushed_ = false; memset(log_item.buf_, (uint8_t)ObRandom::rand(100, 132), log_item.data_len_); + + ObLogEntryHeader header; + common::ObProposalID rts; + rts.ts_ = ObTimeUtility::fast_current_time(); + int64_t pos = 0; + const int64_t header_size = header.get_serialize_size(); + ret = header.generate_header(OB_LOG_SUBMIT, partition_key, + 1, log_item.buf_ + header_size, log_item.data_len_ - header_size, + ObTimeUtility::fast_current_time(), + ObTimeUtility::fast_current_time(), + rts, + ObTimeUtility::fast_current_time(), + ObVersion(0), true); + ASSERT_EQ(common::OB_SUCCESS, ret); + ret = header.serialize(log_item.buf_, log_item.data_len_, pos); + ASSERT_EQ(common::OB_SUCCESS, ret); + ASSERT_EQ(header_size, pos); + + CLOG_LOG(INFO, "flush one item start", K(file_id), K(offset), K(log_item.data_len_)); + ret = clog_writer_.append_log(log_item); ASSERT_EQ(OB_SUCCESS, ret); log_item.wait(); @@ -316,25 +338,43 @@ TEST_F(TestCLogWriter, border) offset = (uint32_t)log_item.data_len_ + static_cast(block_meta_size); } - CLOG_LOG(INFO, "flush one item, ", K(offset), K(log_item.offset_), K(log_item.data_len_), K(file_id)); + CLOG_LOG(INFO, "flush one item end", K(offset), K(log_item.offset_), K(log_item.data_len_), K(file_id)); if (file_id > 3) { break; } } - // start with earlier pos - ObBaseLogBufferCtrl* log_ctrl = NULL; - ret = ObBaseLogBufferMgr::get_instance().get_buffer(shm_path_, log_ctrl); - ASSERT_EQ(OB_SUCCESS, ret); clog_writer_.destroy(); log_file_writer_.reset(); - log_ctrl->base_buf_->file_flush_pos_.file_offset_ = 0; + int64_t test_lower = lower_align(4096, 4096); + int64_t test_upper = upper_align(4096, 4096); + CLOG_LOG(INFO, "cooper", K(test_lower), K(test_upper)); + ASSERT_EQ(4096, test_lower); + ASSERT_EQ(4096, test_upper); + + //try locate last time write position + ObLogDirectReader log_reader; + ObLogCache log_cache; + ObTailCursor tail_cursor; + file_id_t restart_file_id = 0; + offset_t restart_file_id_offset = 0; + + ret = log_reader.init(log_path_, nullptr, true, &log_cache_, &tail_cursor, ObLogWritePoolType::CLOG_WRITE_POOL); + ASSERT_EQ(OB_SUCCESS, ret); + + ret = locate_clog_tail(1000000000, &file_store_, &log_reader, restart_file_id, restart_file_id_offset); + ASSERT_EQ(OB_SUCCESS, ret); + + offset = upper_align(offset, CLOG_DIO_ALIGN_SIZE); + ASSERT_EQ(file_id, restart_file_id); + ASSERT_EQ(offset, restart_file_id_offset); + + // continue write clog ret = clog_writer_.init(clog_cfg_); ASSERT_EQ(OB_SUCCESS, ret); - offset_t new_offset = 0; - ret = clog_writer_.start(file_id, new_offset); + ret = clog_writer_.start(restart_file_id, restart_file_id_offset); ASSERT_EQ(OB_SUCCESS, ret); log_item.data_len_ = ObRandom::rand(1, 1024 * 1024); log_item.is_flushed_ = false; @@ -344,6 +384,9 @@ TEST_F(TestCLogWriter, border) log_item.wait(); ASSERT_EQ(OB_SUCCESS, log_item.err_code_); ASSERT_EQ(offset + static_cast(block_meta_size), log_item.offset_); + + log_reader.destroy(); + log_cache.destroy(); } TEST_F(TestCLogWriter, errsim_aio_timeout) @@ -393,75 +436,12 @@ TEST_F(TestCLogWriter, errsim_aio_timeout) usleep(100 * 1000); #endif } - -TEST_F(TestCLogWriter, crash) -{ - int ret = OB_SUCCESS; - int fd = 0; - ObBaseLogBufferCtrl* log_ctrl = NULL; - ObBaseLogBuffer* shm_buf = NULL; - file_id_t file_id = 1; - offset_t offset = 0; - ObAtomicFilePos file_pos; - int64_t write_len = 4096 * 8; - - ret = clog_writer_.start(file_id, offset); - ASSERT_EQ(OB_SUCCESS, ret); - - ret = ObBaseLogBufferMgr::get_instance().get_buffer(shm_path_, log_ctrl); - ASSERT_EQ(OB_SUCCESS, ret); - shm_buf = log_ctrl->base_buf_; - - // set first 32K data - memset(log_ctrl->data_buf_, (uint8_t)ObRandom::rand(100, 132), write_len); - file_pos.file_id_ = file_id; - file_pos.file_offset_ = (uint32_t)write_len; - ATOMIC_STORE(&shm_buf->file_write_pos_.atomic_, file_pos.atomic_); - - clog_writer_.destroy(); - log_file_writer_.reset(); - ret = clog_writer_.init(clog_cfg_); - ASSERT_EQ(OB_SUCCESS, ret); - file_id = file_pos.file_id_; - offset = file_pos.file_offset_; - ret = clog_writer_.start(file_id, offset); - ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(shm_buf->file_flush_pos_.file_id_, file_id); - ASSERT_EQ(shm_buf->file_flush_pos_.file_offset_, write_len); - ret = write_pool_.get_fd(file_pos.file_id_, fd); - ASSERT_EQ(OB_SUCCESS, ret); - - char* read_buf = (char*)ob_malloc_align(4096, write_len); - ob_pread(fd, read_buf, write_len, 0); - ASSERT_TRUE(0 == memcmp(log_ctrl->data_buf_, read_buf, write_len)); - - // continue next 32K data and let writer catch up - memset(log_ctrl->data_buf_, (uint8_t)ObRandom::rand(132, 164), write_len); - file_pos.file_id_ = file_id; - file_pos.file_offset_ += (uint32_t)write_len; - ATOMIC_STORE(&shm_buf->file_write_pos_.atomic_, file_pos.atomic_); - - clog_writer_.destroy(); - log_file_writer_.reset(); - ret = clog_writer_.init(clog_cfg_); - ASSERT_EQ(OB_SUCCESS, ret); - file_id = file_pos.file_id_; - offset = file_pos.file_offset_; - ret = clog_writer_.start(file_id, offset); - ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(shm_buf->file_flush_pos_.file_id_, file_id); - ASSERT_EQ(shm_buf->file_flush_pos_.file_offset_, write_len + write_len); - - ob_pread(fd, read_buf, write_len, write_len); - ASSERT_TRUE(0 == memcmp(log_ctrl->data_buf_, read_buf, write_len)); - - ob_free_align(read_buf); -} -} // end namespace unittest -} // end namespace oceanbase +} // end namespace unittest +} // end namespace oceanbase int main(int argc, char** argv) { + system("rm -f test_clog_writer.log*"); OB_LOGGER.set_file_name("test_clog_writer.log", true); OB_LOGGER.set_log_level("DEBUG"); diff --git a/unittest/clog/test_ob_raw_entry_iterator.cpp b/unittest/clog/test_ob_raw_entry_iterator.cpp index 0282d6c9ab..3b3223c6b1 100644 --- a/unittest/clog/test_ob_raw_entry_iterator.cpp +++ b/unittest/clog/test_ob_raw_entry_iterator.cpp @@ -401,9 +401,9 @@ TEST_F(TestObRawLogIterator, test_read_data) int64_t header_len = meta.get_serialize_size() + header_offset; ObLogDir log_dir; - ObTailCursor* tail = new ObTailCursor(); + ObTailCursor tail; EXPECT_EQ(OB_SUCCESS, log_dir.init(path)); - EXPECT_EQ(OB_SUCCESS, reader.init(path, shm_path, true, &cache, tail, ObLogWritePoolType::CLOG_WRITE_POOL)); + EXPECT_EQ(OB_SUCCESS, reader.init(path, shm_path, true, &cache, &tail, ObLogWritePoolType::CLOG_WRITE_POOL)); EXPECT_EQ(OB_SUCCESS, write_file()); // Test init failed -- GitLab