diff --git a/deps/oblib/src/lib/container/ob_ext_ring_buffer.h b/deps/oblib/src/lib/container/ob_ext_ring_buffer.h index 89a39211f0d87a8939ae2e7e4b1842e0d7f87d51..6d3f9715289d1810581e951b002cefc16d2ec20e 100644 --- a/deps/oblib/src/lib/container/ob_ext_ring_buffer.h +++ b/deps/oblib/src/lib/container/ob_ext_ring_buffer.h @@ -50,7 +50,8 @@ public: virtual ~ObExtendibleRingBuffer(); public: - int init(const int64_t begin_sn); + int init(const int64_t begin_sn, + const int64_t seg_size = erb::ObExtendibleRingBufferBase >::DEFAULT_SEG_SIZE); int destroy(); bool is_inited() const; @@ -134,13 +135,13 @@ ObExtendibleRingBuffer::~ObExtendibleRingBuffer() {} template -int ObExtendibleRingBuffer::init(const int64_t begin_sn) +int ObExtendibleRingBuffer::init(const int64_t begin_sn, const int64_t seg_size) { int ret = OB_SUCCESS; if (begin_sn < 0) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "err begin sn", K(ret), K(begin_sn)); - } else if (OB_SUCCESS != (ret = BaseType::init(begin_sn, &alloc_))) { + } else if (OB_SUCCESS != (ret = BaseType::init(begin_sn, &alloc_, seg_size))) { CLOG_LOG(WARN, "err init", K(ret), K(begin_sn)); } return ret; diff --git a/deps/oblib/src/lib/container/ob_ext_ring_buffer_impl.h b/deps/oblib/src/lib/container/ob_ext_ring_buffer_impl.h index 4c6b636592ae2341dc5afdef051a6ed85231627a..fa6089e51f6eb700d74ea2005e4ef4bd145bcb64 100644 --- a/deps/oblib/src/lib/container/ob_ext_ring_buffer_impl.h +++ b/deps/oblib/src/lib/container/ob_ext_ring_buffer_impl.h @@ -211,13 +211,13 @@ class ObExtendibleRingBufferBase { }; // Defines. typedef ObExtendibleRingBufferBase MyType; - static const int64_t DEFAULT_SEG_SIZE = (1LL << 9); // 512 bytes - static const int64_t DEFAULT_SEG_CAPACITY = - static_cast((DEFAULT_SEG_SIZE - sizeof(Segment)) / sizeof(SlotT)); static const int64_t MIN_SEG_CNT = 2; static const int64_t INIT_SEG_CNT = MIN_SEG_CNT; typedef std::pair SlotIdx; // public: + static const int64_t DEFAULT_SEG_SIZE = (1LL << 9); // 512 bytes + static const int64_t DEFAULT_SEG_CAPACITY = + static_cast((DEFAULT_SEG_SIZE - sizeof(Segment)) / sizeof(SlotT)); // Interface for subclass. ObExtendibleRingBufferBase() : inited_(false), seg_size_(0), seg_capacity_(0), begin_sn_(0), end_sn_(0), dir_(0), es_lock_(), allocator_(NULL) diff --git a/src/clog/CMakeLists.txt b/src/clog/CMakeLists.txt index ec1a350895cf2471ecf5ea990bcc3063a9ae2188..a6cc5a172105bdeef512a3c0f459f3be9c0abf96 100644 --- a/src/clog/CMakeLists.txt +++ b/src/clog/CMakeLists.txt @@ -15,7 +15,6 @@ ob_set_subtarget(ob_clog common ob_clog_writer.cpp ob_disk_log_buffer.cpp ob_external_fetcher.cpp - ob_external_heartbeat_handler.cpp ob_external_leader_heartbeat_handler.cpp ob_external_log_service.cpp ob_external_log_service_monitor.cpp diff --git a/src/clog/ob_clog_config.h b/src/clog/ob_clog_config.h index 80d29ad137671317587a075499d24d5fd424a50b..71a12f0698bfb96a49952b1eb6a9a69cdd20901e 100644 --- a/src/clog/ob_clog_config.h +++ b/src/clog/ob_clog_config.h @@ -189,7 +189,7 @@ const uint64_t DEFAULT_CLOG_APPEND_TIMEOUT_US = 365ull * 24 * 3600 * 1000 * 1000 const uint64_t DEFAULT_WRITER_MAX_BUFFER_ITEM_CNT = 4 * 1024; // the buffer size of membership log const int64_t MS_LOG_BUFFER_SIZE = 2048; -} // namespace clog -} // namespace oceanbase +} // namespace clog +} // namespace oceanbase #endif // OCEANBASE_CLOG_OB_CLOG_CONFIG_H_ diff --git a/src/clog/ob_clog_file_writer.cpp b/src/clog/ob_clog_file_writer.cpp index 4a9f941548f6ce3726e5c86b473d412e2ae2b127..5125c08f9f948d2d34ecaf9af2aebe7a42aa1b28 100644 --- a/src/clog/ob_clog_file_writer.cpp +++ b/src/clog/ob_clog_file_writer.cpp @@ -27,15 +27,15 @@ using namespace oceanbase::common; namespace oceanbase { namespace clog { -ObCLogBaseFileWriter::ObCLogBaseFileWriter() : - is_inited_(false), - aligned_data_buf_(nullptr), - buf_write_pos_(0), - file_offset_(0), - buf_padding_size_(0), - align_size_(0), - store_(NULL), - file_id_(0) +ObCLogBaseFileWriter::ObCLogBaseFileWriter() + : is_inited_(false), + aligned_data_buf_(nullptr), + buf_write_pos_(0), + file_offset_(0), + buf_padding_size_(0), + align_size_(0), + store_(NULL), + file_id_(0) { log_dir_[0] = '\0'; } @@ -45,8 +45,7 @@ ObCLogBaseFileWriter::~ObCLogBaseFileWriter() destroy(); } -int ObCLogBaseFileWriter::init(const char *log_dir, - const uint32_t align_size, const ObILogFileStore *file_store) +int ObCLogBaseFileWriter::init(const char* log_dir, const uint32_t align_size, const ObILogFileStore* file_store) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -55,8 +54,8 @@ int ObCLogBaseFileWriter::init(const char *log_dir, } else if (OB_ISNULL(log_dir) || OB_ISNULL(file_store)) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "invalid param", K(ret), K(log_dir), K(align_size), KP(file_store)); - } else if (OB_ISNULL(aligned_data_buf_ = (char*) ob_malloc_align( - align_size, CLOG_MAX_WRITE_BUFFER_SIZE, "CLogFileWriter"))) { + } else if (OB_ISNULL(aligned_data_buf_ = + (char*)ob_malloc_align(align_size, CLOG_MAX_WRITE_BUFFER_SIZE, "CLogFileWriter"))) { ret = OB_ALLOCATE_MEMORY_FAILED; CLOG_LOG(WARN, "get log buf failed", K(ret), K(log_dir)); } else { @@ -126,14 +125,13 @@ int ObCLogLocalFileWriter::load_file(uint32_t& file_id, uint32_t& offset, bool e if (need_align()) { buf_write_pos_ = offset % align_size_; int64_t read_size = 0; - if (buf_write_pos_ > 0 - && OB_FAIL(store_->read(aligned_data_buf_, align_size_, lower_align(offset, align_size_), read_size))) { + if (buf_write_pos_ > 0 && + OB_FAIL(store_->read(aligned_data_buf_, align_size_, lower_align(offset, align_size_), read_size))) { CLOG_LOG(ERROR, "Fail to read data from log file, ", K(ret), K(buf_write_pos_), K(offset)); } else if (read_size != align_size_) { CLOG_LOG(INFO, "Log file size is not aligned. ", K(read_size), K(align_size_)); } else { - CLOG_LOG(INFO, "Read data from log file to shared memory buffer, ", K(buf_write_pos_), - K(file_id), K(offset)); + CLOG_LOG(INFO, "Read data from log file to shared memory buffer, ", K(buf_write_pos_), K(file_id), K(offset)); } } else { reset_buf(); @@ -159,19 +157,22 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset) // build trailer from last 512 byte offset (4096-512) int64_t trailer_pos = CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; - char *buf = aligned_data_buf_ + trailer_pos; + char* buf = aligned_data_buf_ + trailer_pos; reset_buf(); - if (CLOG_TRAILER_OFFSET != file_offset_) { //Defense code + if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset), - LITERAL_K(CLOG_TRAILER_OFFSET)); - } else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset, - phy_file_id, pos))) { - CLOG_LOG(WARN, "build_serialized_trailer fail", K(ret), LITERAL_K(CLOG_DIO_ALIGN_SIZE), - K(info_block_offset), K_(file_id), K(phy_file_id)); + CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET)); + } else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset, phy_file_id, pos))) { + CLOG_LOG(WARN, + "build_serialized_trailer fail", + K(ret), + LITERAL_K(CLOG_DIO_ALIGN_SIZE), + K(info_block_offset), + K_(file_id), + K(phy_file_id)); } else { - buf_write_pos_ += (uint32_t) CLOG_DIO_ALIGN_SIZE; + buf_write_pos_ += (uint32_t)CLOG_DIO_ALIGN_SIZE; } return ret; @@ -180,7 +181,7 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset) int ObCLogBaseFileWriter::flush_trailer_entry() { int ret = OB_SUCCESS; - if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code + if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "file offset mismatch", K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET)); } else if (CLOG_DIO_ALIGN_SIZE != buf_write_pos_) { @@ -188,8 +189,13 @@ int ObCLogBaseFileWriter::flush_trailer_entry() CLOG_LOG(WARN, "buf write position mismatch", K_(buf_write_pos), LITERAL_K(CLOG_DIO_ALIGN_SIZE)); } else if (OB_FAIL(store_->write(aligned_data_buf_, buf_write_pos_, CLOG_TRAILER_ALIGN_WRITE_OFFSET))) { // no retry - CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K_(file_offset), - LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET), K(errno)); + CLOG_LOG(ERROR, + "write fail", + K(ret), + K(buf_write_pos_), + K_(file_offset), + LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET), + K(errno)); } return ret; } @@ -198,7 +204,7 @@ int ObCLogBaseFileWriter::append_info_block_entry(ObIInfoBlockHandler* info_gett { int ret = OB_SUCCESS; ObLogBlockMetaV2 meta; - const uint32_t block_meta_len = (uint32_t) meta.get_serialize_size(); + const uint32_t block_meta_len = (uint32_t)meta.get_serialize_size(); const int64_t buf_len = CLOG_MAX_WRITE_BUFFER_SIZE - block_meta_len; int64_t data_len = 0; int64_t pos = 0; @@ -211,7 +217,8 @@ int ObCLogBaseFileWriter::append_info_block_entry(ObIInfoBlockHandler* info_gett // build_info_block will reset flying info_block for next file CLOG_LOG(WARN, "read partition meta fail", K(ret), KP(buf), K(buf_len), K_(file_offset), K_(buf_padding_size)); - } else if (OB_FAIL(meta.build_serialized_block(aligned_data_buf_, block_meta_len, buf, data_len, OB_INFO_BLOCK, pos))) { + } else if (OB_FAIL( + meta.build_serialized_block(aligned_data_buf_, block_meta_len, buf, data_len, OB_INFO_BLOCK, pos))) { CLOG_LOG(WARN, "build serialized block fail", K(ret), K_(file_offset), K_(buf_padding_size)); } else { buf_write_pos_ += (block_meta_len + (uint32_t)data_len); @@ -266,8 +273,12 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size) if (buf_write_pos_ + padding_size > CLOG_MAX_WRITE_BUFFER_SIZE) { ret = OB_BUF_NOT_ENOUGH; - CLOG_LOG(WARN, "padding entry size over buf length", K(ret), K(padding_size), - K_(buf_write_pos), LITERAL_K(CLOG_MAX_WRITE_BUFFER_SIZE)); + CLOG_LOG(WARN, + "padding entry size over buf length", + K(ret), + K(padding_size), + K_(buf_write_pos), + LITERAL_K(CLOG_MAX_WRITE_BUFFER_SIZE)); } else if (OB_FAIL(padding_entry.set_entry_size(padding_size))) { CLOG_LOG(WARN, "padding entry set size error", K(ret), K(padding_size)); } else if (OB_FAIL(padding_entry.serialize(buf, padding_size, serialize_pos))) { @@ -280,7 +291,7 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size) return ret; } -int ObCLogBaseFileWriter::cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len) +int ObCLogBaseFileWriter::cache_buf(ObLogCache* log_cache, const char* buf, const uint32_t buf_len) { int ret = OB_SUCCESS; if (OB_ISNULL(buf) || 0 == buf_len) { @@ -310,9 +321,9 @@ int ObCLogBaseFileWriter::append_log_entry(const char* item_buf, const uint32_t ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "file not start", K_(file_id), K(ret)); } else { - //copy log to memory buffer + // copy log to memory buffer memcpy(aligned_data_buf_ + buf_write_pos_, item_buf, len); - buf_write_pos_ += (uint32_t) len; + buf_write_pos_ += (uint32_t)len; if (OB_FAIL(align_buf())) { CLOG_LOG(ERROR, "fail to add padding, ", K(ret)); @@ -359,8 +370,7 @@ int ObCLogLocalFileWriter::align_buf() } /// ObCLogLocalFileWriter /// -int ObCLogLocalFileWriter::init(const char* log_dir, - const uint32_t align_size, const ObILogFileStore* file_store) +int ObCLogLocalFileWriter::init(const char* log_dir, const uint32_t align_size, const ObILogFileStore* file_store) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -554,7 +564,7 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob // - Flush trailer entry to log file // - Cache trailer entry to log cache - char *trailer_buf = aligned_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; + char* trailer_buf = aligned_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; if (OB_SUCC(ret)) { if (OB_FAIL(append_trailer_entry(info_block_offset))) { CLOG_LOG(WARN, "fail to add trailer", K(ret)); diff --git a/src/clog/ob_clog_file_writer.h b/src/clog/ob_clog_file_writer.h index d3f2a1d667b5076275bb89e0551399b13a5a97fe..a51fd22b6c79eb0692c1af7dc03f911963646ef0 100644 --- a/src/clog/ob_clog_file_writer.h +++ b/src/clog/ob_clog_file_writer.h @@ -37,8 +37,7 @@ public: ObCLogBaseFileWriter(); virtual ~ObCLogBaseFileWriter(); - virtual int init(const char *log_dir, - const uint32_t align_size, const common::ObILogFileStore *file_store); + virtual int init(const char* log_dir, const uint32_t align_size, const common::ObILogFileStore* file_store); virtual void destroy(); // When log engine start, need to flush remaining content in shared memory buffer to log file @@ -96,7 +95,7 @@ protected: int append_trailer_entry(const uint32_t info_block_offset); int flush_trailer_entry(); // append all data in buffer to log cache - int cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len); + int cache_buf(ObLogCache* log_cache, const char* buf, const uint32_t buf_len); OB_INLINE bool need_align() const { @@ -109,7 +108,7 @@ protected: protected: bool is_inited_; - char *aligned_data_buf_; + char* aligned_data_buf_; uint32_t buf_write_pos_; uint32_t file_offset_; // the last aligned part padding size of the buffer @@ -132,8 +131,7 @@ public: destroy(); } - virtual int init(const char *log_dir, - const uint32_t align_size, const common::ObILogFileStore *file_store) override; + virtual int init(const char* log_dir, const uint32_t align_size, const common::ObILogFileStore* file_store) override; virtual void destroy(); virtual int load_file(uint32_t& file_id, uint32_t& offset, bool enable_pre_creation = false) override; diff --git a/src/clog/ob_clog_writer.cpp b/src/clog/ob_clog_writer.cpp index aa020a8553f107f22126ccf64184b82fddad58d3..314014205b346e1306264b3cfc9b9e7a79f34eb5 100644 --- a/src/clog/ob_clog_writer.cpp +++ b/src/clog/ob_clog_writer.cpp @@ -179,7 +179,7 @@ void ObCLogWriter::set_clog_writer_thread_name() void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64_t item_cnt, int64_t& finish_cnt) { int ret = OB_SUCCESS; - ObICLogItem *item = NULL; + ObICLogItem* item = NULL; int64_t cur_time = 0; int64_t io_time = 0; int64_t flush_time = 0; @@ -205,7 +205,7 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64 const bool is_idempotent = false; const uint64_t write_len = block_meta_len + item->get_data_len(); const int64_t warning_value = GCONF.data_storage_warning_tolerance_time; - ObCLogDiskErrorCB *cb = NULL; + ObCLogDiskErrorCB* cb = NULL; lib::ObMutexGuard guard(file_mutex_); BG_NEW_CALLBACK(cb, ObCLogDiskErrorCB, this); @@ -257,7 +257,7 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64 if (OB_SUCC(ret)) { io_time = ObTimeUtility::current_time() - cur_time; - //log flush succeed, invoke callback when disk sync + // log flush succeed, invoke callback when disk sync after_flush(item, block_meta_len, ret, flush_start_offset, finish_cnt); flush_time = ObTimeUtility::current_time() - cur_time - io_time; @@ -378,8 +378,7 @@ ObCLogDiskErrorCB::ObCLogDiskErrorCB(ObCLogWriter* host) : host_(host) {} ObCLogDiskErrorCB::~ObCLogDiskErrorCB() -{ -} +{} int ObCLogDiskErrorCB::callback() { @@ -405,11 +404,8 @@ void ObCLogDiskErrorCB::destroy() } } -int locate_clog_tail(const int64_t timeout, - ObILogFileStore *file_store, - ObLogDirectReader *reader, - file_id_t &file_id, - offset_t &offset) +int locate_clog_tail( + const int64_t timeout, ObILogFileStore* file_store, ObLogDirectReader* reader, file_id_t& file_id, offset_t& offset) { ObLogFileTailLocatorImpl impl; return impl.locate_tail(timeout, file_store, reader, file_id, offset); diff --git a/src/clog/ob_external_fetcher.cpp b/src/clog/ob_external_fetcher.cpp index e9dd7a34c0b772b251422b0d5c361adbf77ba0ea..c8fb519a630ac2033181962f899cc9417436db54 100644 --- a/src/clog/ob_external_fetcher.cpp +++ b/src/clog/ob_external_fetcher.cpp @@ -291,17 +291,13 @@ int ObExtLogFetcher::handle_log_not_exist( return ret; } -int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item, - const uint64_t beyond_upper_log_id, - const int64_t beyond_upper_log_ts, - const int64_t fetched_log_count, - ObLogStreamFetchLogResp &resp, - const ObLogCursorExt *cursor_ext, - clog::ObReadCost &read_cost) +int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id, + const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, ObLogStreamFetchLogResp& resp, + const ObLogCursorExt* cursor_ext, clog::ObReadCost& read_cost) { int ret = OB_SUCCESS; int64_t upper_log_ts = beyond_upper_log_ts; - const ObPartitionKey &pkey = stream_item.pkey_; + const ObPartitionKey& pkey = stream_item.pkey_; // 1. dealing with heartbeat hollow if ((0 == fetched_log_count) && OB_INVALID_ID != beyond_upper_log_id) { // log hole problem: @@ -317,7 +313,8 @@ int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item, LOG_WARN("resp get_aggre_log_min_timestamp error", K(ret), KPC(cursor_ext), K(upper_log_ts)); } else if (upper_log_ts != cursor_ext->get_submit_timestamp()) { LOG_TRACE("next log is aggregate log, update first log id as beyond_upper_log_ts", - K(upper_log_ts), K(cursor_ext->get_submit_timestamp())); + K(upper_log_ts), + K(cursor_ext->get_submit_timestamp())); } } ObLogStreamFetchLogResp::FetchLogHeartbeatItem hbp; @@ -335,23 +332,21 @@ int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item, return ret; } -int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey, - const clog::ObLogCursorExt &cursor_ext, - int64_t &first_log_ts, - ObReadCost &read_cost) +int ObExtLogFetcher::get_aggre_log_min_timestamp( + const ObPartitionKey& pkey, const clog::ObLogCursorExt& cursor_ext, int64_t& first_log_ts, ObReadCost& read_cost) { int ret = OB_SUCCESS; bool fetch_log_from_hot_cache = true; int64_t log_entry_size = 0; - int64_t end_tstamp = INT64_MAX; // no need for timeout limit + int64_t end_tstamp = INT64_MAX; // no need for timeout limit ObReadParam param; param.offset_ = cursor_ext.get_offset(); param.read_len_ = cursor_ext.get_size(); param.file_id_ = cursor_ext.get_file_id(); ObReadBufGuard guard(ObModIds::OB_LOG_DECRYPT_ID); - ObReadBuf &rbuf = guard.get_read_buf(); - if (OB_FAIL(fetch_log_entry_(pkey, param, rbuf.buf_, rbuf.buf_len_, end_tstamp, - read_cost, fetch_log_from_hot_cache, log_entry_size))) { + ObReadBuf& rbuf = guard.get_read_buf(); + if (OB_FAIL(fetch_log_entry_( + pkey, param, rbuf.buf_, rbuf.buf_len_, end_tstamp, read_cost, fetch_log_from_hot_cache, log_entry_size))) { LOG_WARN("failed to fetch log entry", K(ret), K(param), K(pkey)); } else { clog::ObLogEntry log_entry; @@ -359,7 +354,7 @@ int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey, if (OB_FAIL(log_entry.deserialize(rbuf.buf_, rbuf.buf_len_, log_entry_pos))) { LOG_WARN("failed to deserialize log entry", K(ret), K(rbuf), K(log_entry_pos)); } else if (OB_LOG_AGGRE == log_entry.get_header().get_log_type()) { - const char *data_buf = log_entry.get_buf(); + const char* data_buf = log_entry.get_buf(); const int64_t data_len = log_entry.get_header().get_data_len(); int32_t next_log_offset = 0; int64_t pos = 0; @@ -369,8 +364,13 @@ int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey, // update first log ts as aggre log ts LOG_WARN("serialization::decode_i64 failed", K(ret), K(data_len), K(pos), KP(data_buf)); } else { - LOG_TRACE("get_aggre_log_min_timestamp", K(ret), K(data_len), K(pos), KP(data_buf), - K(first_log_ts), K(next_log_offset)); + LOG_TRACE("get_aggre_log_min_timestamp", + K(ret), + K(data_len), + K(pos), + KP(data_buf), + K(first_log_ts), + K(next_log_offset)); } } else { // not aggregate log, no need to update @@ -380,13 +380,9 @@ int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey, } // Get single log entry -int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt &cursor_ext, - const ObPartitionKey &pkey, - const int64_t end_tstamp, - ObReadCost &read_cost, - ObLogStreamFetchLogResp &resp, - bool &fetch_log_from_hot_cache, - int64_t &log_entry_size) +int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt& cursor_ext, const ObPartitionKey& pkey, + const int64_t end_tstamp, ObReadCost& read_cost, ObLogStreamFetchLogResp& resp, bool& fetch_log_from_hot_cache, + int64_t& log_entry_size) { int ret = OB_SUCCESS; int64_t remain_size = 0; @@ -540,7 +536,7 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime const ObPartitionKey& pkey = stream_item.pkey_; uint64_t beyond_upper_log_id = OB_INVALID_ID; int64_t beyond_upper_log_ts = OB_INVALID_TIMESTAMP; - const ObLogCursorExt *next_cursor = NULL; + const ObLogCursorExt* next_cursor = NULL; // Note: After the optimization of step by step, the count of logs fetched in each round of // each partition will be "suddenly reduced". It is possible to get only a few logs per round, @@ -627,8 +623,8 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime } if (OB_SUCCESS == ret) { - if (OB_FAIL(after_partition_fetch_log(stream_item, beyond_upper_log_id, beyond_upper_log_ts, - log_count, resp, next_cursor, frt.read_cost_))) { + if (OB_FAIL(after_partition_fetch_log( + stream_item, beyond_upper_log_id, beyond_upper_log_ts, log_count, resp, next_cursor, frt.read_cost_))) { LOG_WARN("after partition fetch log error", K(ret), K(stream_item), diff --git a/src/clog/ob_external_fetcher.h b/src/clog/ob_external_fetcher.h index b55b5c0c03bde8f3453a95595f87dd56551484f9..2e90fc60ea8ec82dedf9923973a966ca3662f321 100644 --- a/src/clog/ob_external_fetcher.h +++ b/src/clog/ob_external_fetcher.h @@ -165,9 +165,9 @@ private: const common::ObPartitionKey& pkey, const uint64_t next_log_id, obrpc::ObLogStreamFetchLogResp& resp); int after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id, const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, obrpc::ObLogStreamFetchLogResp& resp, - const clog::ObLogCursorExt *cursor_ext, clog::ObReadCost &read_cost); - int get_aggre_log_min_timestamp(const common::ObPartitionKey &pkey, const clog::ObLogCursorExt &cursor_ext, - int64_t &first_log_ts, clog::ObReadCost &read_cost); + const clog::ObLogCursorExt* cursor_ext, clog::ObReadCost& read_cost); + int get_aggre_log_min_timestamp(const common::ObPartitionKey& pkey, const clog::ObLogCursorExt& cursor_ext, + int64_t& first_log_ts, clog::ObReadCost& read_cost); int prefill_resp_with_clog_entry(const clog::ObLogCursorExt& cursor_ext, const common::ObPartitionKey& pkey, const int64_t end_tstamp, clog::ObReadCost& read_cost, obrpc::ObLogStreamFetchLogResp& resp, bool& fetch_log_from_hot_cache, int64_t& log_entry_size); diff --git a/src/clog/ob_external_log_service.cpp b/src/clog/ob_external_log_service.cpp index 40f7febce958b6ed9381992388a4748dbb5aa3ca..41dc2fff63092401f61faeb48c43304b90f4ea34 100644 --- a/src/clog/ob_external_log_service.cpp +++ b/src/clog/ob_external_log_service.cpp @@ -58,8 +58,6 @@ int ObExtLogService::init(ObPartitionService* partition_service, ObILogEngine* l EXTLOG_LOG(WARN, "fetcher init error", K(ret), KP(log_engine), K(self_addr)); } else if (OB_FAIL(archive_log_fetcher_.init(log_archive_line_cache_, log_engine))) { EXTLOG_LOG(WARN, "log_archive_fetcher init error", K(ret), KP(log_engine), K(self_addr)); - } else if (OB_FAIL(hb_handler_.init(partition_service, log_engine))) { - EXTLOG_LOG(WARN, "hb_handler_ init error", K(ret)); } else if (OB_FAIL(leader_hb_handler_.init(partition_service))) { EXTLOG_LOG(WARN, "leader_hb_handler_ init error", K(ret)); } else { @@ -80,7 +78,6 @@ void ObExtLogService::destroy() locator_.destroy(); fetcher_.destroy(); archive_log_fetcher_.destroy(); - hb_handler_.destroy(); leader_hb_handler_.destroy(); clog_mgr_ = NULL; line_cache_.destroy(); @@ -196,24 +193,6 @@ int ObExtLogService::archive_fetch_log(const ObPGKey& pg_key, const ObReadParam& return ret; } -int ObExtLogService::req_heartbeat_info( - const ObLogReqHeartbeatInfoRequest& req_msg, ObLogReqHeartbeatInfoResponse& resp) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - EXTLOG_LOG(WARN, "ObExtLogService not init", K(ret)); - } else { - const int64_t start_ts = ObTimeUtility::current_time(); - ret = hb_handler_.req_heartbeat_info(req_msg, resp); - const int64_t end_ts = ObTimeUtility::current_time(); - ObExtLogServiceMonitor::heartbeat_count(); - EVENT_INC(CLOG_EXTLOG_HEARTBEAT_RPC_COUNT); - ObExtLogServiceMonitor::heartbeat_time(end_ts - start_ts); - } - return ret; -} - int ObExtLogService::leader_heartbeat( const obrpc::ObLogLeaderHeartbeatReq& req_msg, obrpc::ObLogLeaderHeartbeatResp& resp) { diff --git a/src/clog/ob_external_log_service.h b/src/clog/ob_external_log_service.h index a41f2f14afa38391a4f527fb13ede46c1cdcd91d..460f1ce6e01990ef49640ef1aeae494127a7ec03 100644 --- a/src/clog/ob_external_log_service.h +++ b/src/clog/ob_external_log_service.h @@ -18,7 +18,6 @@ #include "ob_external_fetcher.h" #include "ob_archive_log_fetcher.h" #include "ob_external_start_log_locator.h" -#include "ob_external_heartbeat_handler.h" #include "ob_external_leader_heartbeat_handler.h" #include "ob_log_line_cache.h" // ObLogLineCache @@ -39,8 +38,6 @@ namespace logservice { * > ObExtStartLogLocator: Given a timestamp (specified when liboblog restart) to determine * from which log_id each partition will be pulled. * > ObExtLogFetcher: streaming pull operator - * > ObExtHeartbeatHandler: The old version of Heartbeat (supports querying the timestamp of older logs) - * is obsolete and is reserved for compatibility consideration. * > ObExtLeaderHeartbeatHandler: The new version of the heartbeat, called LeaderHeartbeat, * returns the next log and forecast timestamp. */ @@ -105,7 +102,6 @@ public: log_archive_line_cache_(), locator_(), fetcher_(), - hb_handler_(), leader_hb_handler_() {} ~ObExtLogService() @@ -126,8 +122,6 @@ public: // for log archive int archive_fetch_log( const common::ObPGKey& pg_key, const clog::ObReadParam& param, clog::ObReadBuf& rbuf, clog::ObReadRes& res); - int req_heartbeat_info( - const obrpc::ObLogReqHeartbeatInfoRequest& req_msg, obrpc::ObLogReqHeartbeatInfoResponse& response); int leader_heartbeat(const obrpc::ObLogLeaderHeartbeatReq& req_msg, obrpc::ObLogLeaderHeartbeatResp& resp); int wash_expired_stream(); int report_all_stream(); @@ -151,7 +145,6 @@ private: ObExtStartLogLocator locator_; ObExtLogFetcher fetcher_; ObArchiveLogFetcher archive_log_fetcher_; - ObExtHeartbeatHandler hb_handler_; ObExtLeaderHeartbeatHandler leader_hb_handler_; }; diff --git a/src/clog/ob_file_id_cache.cpp b/src/clog/ob_file_id_cache.cpp index 32824ce606b5f941c016202660270e5e7d92f8b5..eed05ca700f7cb5ce38f4c72840de7216096b975 100644 --- a/src/clog/ob_file_id_cache.cpp +++ b/src/clog/ob_file_id_cache.cpp @@ -481,13 +481,9 @@ int ObFileIdList::locate(const ObPartitionKey& pkey, const int64_t target_value, } // max_log_id,max_log_timestamp,start_offset may be invalid -int ObFileIdList::append(const ObPartitionKey &pkey, - const file_id_t file_id, - const offset_t start_offset, - const uint64_t min_log_id, - const uint64_t max_log_id, - const int64_t min_log_timestamp, - const int64_t max_log_timestamp) +int ObFileIdList::append(const ObPartitionKey& pkey, const file_id_t file_id, const offset_t start_offset, + const uint64_t min_log_id, const uint64_t max_log_id, const int64_t min_log_timestamp, + const int64_t max_log_timestamp) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -686,9 +682,7 @@ int ObFileIdList::purge(const common::ObPartitionKey& pkey, ObIFileIdCachePurgeS } // The caller guarantees that the function will not be executed concurrently -int ObFileIdList::purge_(const bool is_front_end, - IPurgeChecker &checker, - bool &empty) +int ObFileIdList::purge_(const bool is_front_end, IPurgeChecker& checker, bool& empty) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -1076,11 +1070,8 @@ void ObFileIdCache::destroy() // 4. OB_PARTITION_NOT_EXIST partiiton not exist, prev_item and next_item are both invalid // 5. OB_NEED_RETRY need retrym prev_item and next_item are both invalid // 6. Others -int ObFileIdCache::locate(const ObPartitionKey &pkey, - const int64_t target_value, - const bool locate_by_log_id, - Log2File &prev_item, - Log2File &next_item) +int ObFileIdCache::locate(const ObPartitionKey& pkey, const int64_t target_value, const bool locate_by_log_id, + Log2File& prev_item, Log2File& next_item) { int ret = OB_SUCCESS; ObFileIdList* list = NULL; @@ -1297,8 +1288,7 @@ int ObFileIdCache::AppendInfoFunctor::init(const file_id_t file_id, ObFileIdCach } // For compatibility, allow max_log_id and max_log_timestamp to be invalid -bool ObFileIdCache::AppendInfoFunctor::operator()(const ObPartitionKey &pkey, - const IndexInfoBlockEntry &entry) +bool ObFileIdCache::AppendInfoFunctor::operator()(const ObPartitionKey& pkey, const IndexInfoBlockEntry& entry) { if (OB_UNLIKELY(!pkey.is_valid()) || OB_UNLIKELY(!entry.is_valid())) { err_ = OB_ERR_UNEXPECTED; @@ -1466,13 +1456,9 @@ int ObFileIdCache::append_new_list_(const ObPartitionKey& pkey, const file_id_t } // For compatibility, allow max_log_id and max_log_timestamp to be invalid -int ObFileIdCache::do_append_(const ObPartitionKey &pkey, - const file_id_t file_id, - const offset_t start_offset, - const uint64_t min_log_id, - const uint64_t max_log_id, - const int64_t min_log_timestamp, - const int64_t max_log_timestamp) +int ObFileIdCache::do_append_(const ObPartitionKey& pkey, const file_id_t file_id, const offset_t start_offset, + const uint64_t min_log_id, const uint64_t max_log_id, const int64_t min_log_timestamp, + const int64_t max_log_timestamp) { int ret = OB_SUCCESS; ObFileIdList* list = NULL; diff --git a/src/clog/ob_file_id_cache.h b/src/clog/ob_file_id_cache.h index ad531591440016d0675965141cc6da2c68c2caf1..9a52cb73ee456c4e625be5bede713fb2918b56ba 100644 --- a/src/clog/ob_file_id_cache.h +++ b/src/clog/ob_file_id_cache.h @@ -52,21 +52,26 @@ class ObIlogAccessor; // p3 -> [(50,f1), (5000,f3)] // // When querying (p2, 170), first find the ordered list corresponding to p2, and then, use binary search. -class Log2File -{ +class Log2File { public: - Log2File() : file_id_(common::OB_INVALID_FILE_ID), - // For version after 2.1(include 2.1), start_offset correspond to the offset - // of the first ilog of this partition in ilog file - // For version before 2.1, the start_offset correspond to the start_offset_index - // of the first ilog of this partition in ilog cache - start_offset_(OB_INVALID_OFFSET), - min_log_id_(common::OB_INVALID_ID), - max_log_id_(common::OB_INVALID_ID), - min_log_timestamp_(common::OB_INVALID_TIMESTAMP), - max_log_timestamp_(common::OB_INVALID_TIMESTAMP) {} - - ~Log2File() { reset(); } + Log2File() + : file_id_(common::OB_INVALID_FILE_ID), + // For version after 2.1(include 2.1), start_offset correspond to the offset + // of the first ilog of this partition in ilog file + // For version before 2.1, the start_offset correspond to the start_offset_index + // of the first ilog of this partition in ilog cache + start_offset_(OB_INVALID_OFFSET), + min_log_id_(common::OB_INVALID_ID), + max_log_id_(common::OB_INVALID_ID), + min_log_timestamp_(common::OB_INVALID_TIMESTAMP), + max_log_timestamp_(common::OB_INVALID_TIMESTAMP) + {} + + ~Log2File() + { + reset(); + } + public: file_id_t get_file_id() const { @@ -223,7 +228,7 @@ public: } // Determine whether target_item is the next consecutive item - bool is_preceding_to(const Log2File &target_item) const + bool is_preceding_to(const Log2File& target_item) const { bool bool_ret = false; if (common::OB_INVALID_ID != get_max_log_id() && common::OB_INVALID_ID != target_item.min_log_id_) { @@ -311,8 +316,7 @@ private: ObFileIdCache& file_id_cache_; }; -class ObFileIdList -{ +class ObFileIdList { public: class BackFillFunctor { public: @@ -332,8 +336,7 @@ public: offset_t start_offset_; }; - class IPurgeChecker - { + class IPurgeChecker { public: virtual bool should_purge(const Log2File& log_2_file) const = 0; virtual bool is_valid() const = 0; @@ -341,8 +344,7 @@ public: }; // purge min // should_purge return true if min_log_id > top_item.file_id_ - class PurgeChecker : public IPurgeChecker - { + class PurgeChecker : public IPurgeChecker { public: explicit PurgeChecker(const common::ObPartitionKey& pkey, ObIFileIdCachePurgeStrategy& purge_strategy) : partition_key_(pkey), purge_strategy_(purge_strategy) @@ -362,8 +364,7 @@ public: // should_purge return true if top_item.file_id_ == broken_file_id_ // Because loading an InfoBlock involves multiple partitions, if only load a part of them, // then all of this load must be cleaned up - class ClearBrokenFunctor : public IPurgeChecker - { + class ClearBrokenFunctor : public IPurgeChecker { public: explicit ClearBrokenFunctor(const file_id_t file_id) : broken_file_id_(file_id) {} @@ -413,11 +414,8 @@ public: static const int64_t NEED_USE_SEG_ARRAY_THRESHOLD = 50; private: - int purge_(const bool is_front_end, - IPurgeChecker &checker, - bool &empty); - int purge_preceding_items_(const ObPartitionKey &pkey, - const Log2File &last_item); + int purge_(const bool is_front_end, IPurgeChecker& checker, bool& empty); + int purge_preceding_items_(const ObPartitionKey& pkey, const Log2File& last_item); // The caller guarantees that the function will not be executed concurrently int prepare_container_(); int move_item_to_seg_array_(common::ObISegArray* tmp_container_ptr) const; @@ -426,7 +424,7 @@ private: bool is_inited_; bool use_seg_array_; uint64_t min_continuous_log_id_; - ObFileIdCache *file_id_cache_; + ObFileIdCache* file_id_cache_; ObLogBasePos base_pos_; common::ObISegArray* container_ptr_; @@ -448,29 +446,27 @@ public: int init(const int64_t server_seq, const common::ObAddr& addr, ObIlogAccessor* ilog_accessor); void destroy(); - int locate(const common::ObPartitionKey &pkey, - const int64_t target_value, - const bool locate_by_log_id, - Log2File &prev_item, - Log2File &next_item); - int append(const file_id_t file_id, - IndexInfoBlockMap &index_info_block_map); - int backfill(const common::ObPartitionKey &pkey, - const uint64_t min_log_id, - const file_id_t file_id, - const offset_t start_offset); - int purge(ObIFileIdCachePurgeStrategy &purge_strategy); - int ensure_log_continuous(const common::ObPartitionKey &pkey, - const uint64_t log_id); - int add_partition_needed(const common::ObPartitionKey &pkey, - const uint64_t last_replay_log_id); - file_id_t get_curr_max_file_id() const {return ATOMIC_LOAD(&curr_max_file_id_);} - int64_t get_next_can_purge_log2file_timestamp() const {return ATOMIC_LOAD(&next_can_purge_log2file_timestamp_);} - int get_clog_base_pos(const ObPartitionKey &pkey, file_id_t &file_id, - offset_t &offset) const; - //Attention: this interface doesn't consider the format of version which before 2.1 - int get_cursor_from_file(const ObPartitionKey &pkey, const uint64_t log_id, - const Log2File &item, ObLogCursorExt &log_cursor); + int locate(const common::ObPartitionKey& pkey, const int64_t target_value, const bool locate_by_log_id, + Log2File& prev_item, Log2File& next_item); + int append(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map); + int backfill(const common::ObPartitionKey& pkey, const uint64_t min_log_id, const file_id_t file_id, + const offset_t start_offset); + int purge(ObIFileIdCachePurgeStrategy& purge_strategy); + int ensure_log_continuous(const common::ObPartitionKey& pkey, const uint64_t log_id); + int add_partition_needed(const common::ObPartitionKey& pkey, const uint64_t last_replay_log_id); + file_id_t get_curr_max_file_id() const + { + return ATOMIC_LOAD(&curr_max_file_id_); + } + int64_t get_next_can_purge_log2file_timestamp() const + { + return ATOMIC_LOAD(&next_can_purge_log2file_timestamp_); + } + int get_clog_base_pos(const ObPartitionKey& pkey, file_id_t& file_id, offset_t& offset) const; + // Attention: this interface doesn't consider the format of version which before 2.1 + int get_cursor_from_file( + const ObPartitionKey& pkey, const uint64_t log_id, const Log2File& item, ObLogCursorExt& log_cursor); + private: class AppendInfoFunctor { public: @@ -488,8 +484,7 @@ private: ObFileIdCache* cache_; }; // Ensure that the loading process is atomic - class ObUndoAppendFunctor - { + class ObUndoAppendFunctor { public: explicit ObUndoAppendFunctor(const file_id_t broken_file_id) : broken_file_id_(broken_file_id) {} @@ -504,8 +499,7 @@ private: file_id_t broken_file_id_; common::ObPartitionArray dead_pkeys_; }; - class ObPurgeFunctor - { + class ObPurgeFunctor { public: explicit ObPurgeFunctor(ObIFileIdCachePurgeStrategy& purge_strategy) : purge_strategy_(purge_strategy), next_can_purge_log2file_timestamp_(common::OB_INVALID_TIMESTAMP) @@ -601,7 +595,7 @@ private: common::ObSmallAllocator seg_item_allocator_; // allocator for Log2File items(seg) common::ObSmallAllocator log2file_list_allocator_; // allocator for Log2FileList common::ObSmallAllocator list_item_allocator_; // allocator for Log2File items(list) - common::ObLinearHashMap map_; + common::ObLinearHashMap map_; common::ObLinearHashMap filter_map_; private: diff --git a/src/clog/ob_i_log_engine.h b/src/clog/ob_i_log_engine.h index f1d217ef0aa5b4631c132af910a56ff57363a060..4938a85a4c55e50a82e8def86cc53943de24b01a 100644 --- a/src/clog/ob_i_log_engine.h +++ b/src/clog/ob_i_log_engine.h @@ -134,12 +134,10 @@ public: virtual int notify_follower_log_missing(const common::ObAddr& server, const int64_t cluster_id, const common::ObPartitionKey& partition_key, const uint64_t start_log_id, const bool is_in_member_list, const int32_t msg_type) = 0; - virtual int send_restore_check_rqst(const common::ObAddr &server, const int64_t dst_cluster_id, - const common::ObPartitionKey &key, const ObRestoreCheckType restore_type) = 0; - virtual int send_query_restore_end_id_resp(const common::ObAddr &server, - const int64_t cluster_id, - const common::ObPartitionKey &partition_key, - const uint64_t last_restore_log_id) = 0; + virtual int send_restore_check_rqst(const common::ObAddr& server, const int64_t dst_cluster_id, + const common::ObPartitionKey& key, const ObRestoreCheckType restore_type) = 0; + virtual int send_query_restore_end_id_resp(const common::ObAddr& server, const int64_t cluster_id, + const common::ObPartitionKey& partition_key, const uint64_t last_restore_log_id) = 0; virtual void update_clog_info(const int64_t max_submit_timestamp) = 0; virtual void update_clog_info( const common::ObPartitionKey& partition_key, const uint64_t log_id, const int64_t submit_timestamp) = 0; diff --git a/src/clog/ob_ilog_memstore.cpp b/src/clog/ob_ilog_memstore.cpp index c70df27c45237c308cbdce34f8d02b7e04092683..1a4925ea2662b0dc43c9a0a1e36bbc4413f942a8 100644 --- a/src/clog/ob_ilog_memstore.cpp +++ b/src/clog/ob_ilog_memstore.cpp @@ -171,7 +171,7 @@ int ObIlogMemstore::timer_check_need_freeze(bool& need_freeze) const return ret; } -int ObIlogMemstore::get_cursor_size(int64_t &cursor_size) const +int ObIlogMemstore::get_cursor_size(int64_t& cursor_size) const { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -183,7 +183,7 @@ int ObIlogMemstore::get_cursor_size(int64_t &cursor_size) const return ret; } -int ObIlogMemstore::get_clog_size(int64_t &clog_size) const +int ObIlogMemstore::get_clog_size(int64_t& clog_size) const { int ret = OB_SUCCESS; if (IS_NOT_INIT) { diff --git a/src/clog/ob_ilog_memstore.h b/src/clog/ob_ilog_memstore.h index bed010235f1d55463603981c6068220f64488bef..11799eb6f01e69e58b324bdafe2f6ae09428f0e8 100644 --- a/src/clog/ob_ilog_memstore.h +++ b/src/clog/ob_ilog_memstore.h @@ -32,8 +32,7 @@ enum ObIlogFreezeTriggerType { OB_INVALID_TRIGGER_TYPE = 6, }; -class ObIlogMemstore -{ +class ObIlogMemstore { friend ObIlogFileBuilder; public: @@ -78,26 +77,21 @@ public: // Return value: // 1) OB_SUCCESS, query success // 2) OB_PARTITION_NOT_EXIST, partition not exist - int get_min_log_id_and_ts(const common::ObPartitionKey &partition_key, - uint64_t &ret_min_log_id, - int64_t &ret_min_log_ts) const; + int get_min_log_id_and_ts( + const common::ObPartitionKey& partition_key, uint64_t& ret_min_log_id, int64_t& ret_min_log_ts) const; // Return value: // 1) OB_SUCCESS, query success // 2) OB_PARTITION_NOT_EXIST, partition not exist - int get_log_id_range(const common::ObPartitionKey &partition_key, - uint64_t &ret_min_log_id, - uint64_t &ret_max_log_id) const; - - int get_cursor_size(int64_t &cursor_size) const; - int get_clog_size(int64_t &clog_size) const; - int insert_partition_meta_info(const common::ObPartitionKey &pkey, - const IndexInfoBlockEntry &entry); - int insert_partition_memberlist_info(const common::ObPartitionKey &pkey, - const MemberListInfo &member_list); - int insert_partition_log_cursor_ext_info(const ObPartitionLogInfo &log_info, - const ObLogCursorExt &log_cursor); - template - int operate_partition_meta_info(Function &fn) + int get_log_id_range( + const common::ObPartitionKey& partition_key, uint64_t& ret_min_log_id, uint64_t& ret_max_log_id) const; + + int get_cursor_size(int64_t& cursor_size) const; + int get_clog_size(int64_t& clog_size) const; + int insert_partition_meta_info(const common::ObPartitionKey& pkey, const IndexInfoBlockEntry& entry); + int insert_partition_memberlist_info(const common::ObPartitionKey& pkey, const MemberListInfo& member_list); + int insert_partition_log_cursor_ext_info(const ObPartitionLogInfo& log_info, const ObLogCursorExt& log_cursor); + template + int operate_partition_meta_info(Function& fn) { int ret = common::OB_SUCCESS; if (OB_FAIL(partition_meta_info_.for_each(fn))) { diff --git a/src/clog/ob_ilog_storage.cpp b/src/clog/ob_ilog_storage.cpp index 89b3496ae72a920174eab8a69d6e585c5ecbde12..da06b8bbb60d0d1caa6b90bd457e69d0800a0d2a 100644 --- a/src/clog/ob_ilog_storage.cpp +++ b/src/clog/ob_ilog_storage.cpp @@ -51,10 +51,8 @@ void ObIlogAccessor::destroy() inited_ = false; } -int ObIlogAccessor::init(const char *dir_name, - const int64_t server_seq, - const common::ObAddr &addr, - ObLogCache *log_cache) +int ObIlogAccessor::init( + const char* dir_name, const int64_t server_seq, const common::ObAddr& addr, ObLogCache* log_cache) { int ret = OB_SUCCESS; const bool use_log_cache = true; @@ -71,9 +69,12 @@ int ObIlogAccessor::init(const char *dir_name, CSR_LOG(ERROR, "file_store_ init failed", K(ret)); } else if (OB_FAIL(file_id_cache_.init(server_seq, addr, this))) { CSR_LOG(ERROR, "file_id_cache_ init failed", K(ret)); - } else if (OB_FAIL(direct_reader_.init(dir_name, nullptr/*no shared memory*/, use_log_cache, - log_cache, &log_tail_, - ObLogWritePoolType::ILOG_WRITE_POOL))) { + } else if (OB_FAIL(direct_reader_.init(dir_name, + nullptr /*no shared memory*/, + use_log_cache, + log_cache, + &log_tail_, + ObLogWritePoolType::ILOG_WRITE_POOL))) { CSR_LOG(ERROR, "direct_reader_ init failed", K(ret)); } else if (OB_FAIL(buffer_.init(OB_MAX_LOG_BUFFER_SIZE, CLOG_DIO_ALIGN_SIZE, ObModIds::OB_CLOG_INFO_BLK_HNDLR))) { CSR_LOG(ERROR, "buffer init failed", K(ret)); @@ -719,12 +720,8 @@ ObIlogStorage::~ObIlogStorage() destroy(); } -int ObIlogStorage::init(const char *dir_name, - const int64_t server_seq, - const common::ObAddr &addr, - ObLogCache *log_cache, - ObPartitionService *partition_service, - ObCommitLogEnv *commit_log_env) +int ObIlogStorage::init(const char* dir_name, const int64_t server_seq, const common::ObAddr& addr, + ObLogCache* log_cache, ObPartitionService* partition_service, ObCommitLogEnv* commit_log_env) { int ret = OB_SUCCESS; @@ -738,8 +735,15 @@ int ObIlogStorage::init(const char *dir_name, } else if (OB_ISNULL(dir_name) || OB_ISNULL(log_cache) || OB_ISNULL(partition_service) || OB_ISNULL(commit_log_env) || OB_UNLIKELY(server_seq < 0 || !addr.is_valid())) { ret = OB_INVALID_ARGUMENT; - CSR_LOG(ERROR, "invalid argument", KR(ret), KP(dir_name), KP(log_cache), KP(partition_service), - KP(commit_log_env), K(server_seq), K(addr)); + CSR_LOG(ERROR, + "invalid argument", + KR(ret), + KP(dir_name), + KP(log_cache), + KP(partition_service), + KP(commit_log_env), + K(server_seq), + K(addr)); } else if (OB_FAIL(ObIlogAccessor::init(dir_name, server_seq, addr, log_cache))) { CSR_LOG(ERROR, "failed to init ObIlogAccessor", K(ret)); } else if (OB_FAIL(init_next_ilog_file_id_(next_ilog_file_id))) { diff --git a/src/clog/ob_ilog_storage.h b/src/clog/ob_ilog_storage.h index 481619d0dc14107da63220b26739159ddd0294ed..c1761c19264acd3737c06a2a547db3695cf3fab9 100644 --- a/src/clog/ob_ilog_storage.h +++ b/src/clog/ob_ilog_storage.h @@ -40,10 +40,7 @@ public: virtual void destroy(); public: - int init(const char *dir_name, - const int64_t server_seq, - const common::ObAddr &addr, - ObLogCache *log_cache); + int init(const char* dir_name, const int64_t server_seq, const common::ObAddr& addr, ObLogCache* log_cache); int add_partition_needed_to_file_id_cache( const common::ObPartitionKey& partition_key, const uint64_t last_replay_log_id); @@ -102,12 +99,8 @@ public: ~ObIlogStorage(); public: - int init(const char *dir_name, - const int64_t server_seq, - const common::ObAddr &addr, - ObLogCache *log_cache, - storage::ObPartitionService *partition_service, - ObCommitLogEnv *commit_log_env); + int init(const char* dir_name, const int64_t server_seq, const common::ObAddr& addr, ObLogCache* log_cache, + storage::ObPartitionService* partition_service, ObCommitLogEnv* commit_log_env); void destroy(); int start(); void stop(); @@ -151,16 +144,12 @@ public: // Return value // 1) OB_SUCCESS // 2) OB_ENTRY_NOT_EXIST - int get_file_id_range(file_id_t &min_file_id, file_id_t &max_file_id) const; - - int locate_by_timestamp(const common::ObPartitionKey &partition_key, - const int64_t start_ts, - uint64_t &target_log_id, - int64_t &target_log_timestamp); - int locate_ilog_file_by_log_id(const common::ObPartitionKey &pkey, - const uint64_t start_log_id, - uint64_t &end_log_id, - file_id_t &ilog_id); + int get_file_id_range(file_id_t& min_file_id, file_id_t& max_file_id) const; + + int locate_by_timestamp(const common::ObPartitionKey& partition_key, const int64_t start_ts, uint64_t& target_log_id, + int64_t& target_log_timestamp); + int locate_ilog_file_by_log_id( + const common::ObPartitionKey& pkey, const uint64_t start_log_id, uint64_t& end_log_id, file_id_t& ilog_id); int wash_ilog_cache(); int purge_stale_file(); int purge_stale_ilog_index(); diff --git a/src/clog/ob_ilog_store.cpp b/src/clog/ob_ilog_store.cpp index 6e3f79e564126f43c93600cbc606bcf1f31962d0..4e000b162855c998e86a3caa90741c0a13546cfd 100644 --- a/src/clog/ob_ilog_store.cpp +++ b/src/clog/ob_ilog_store.cpp @@ -1192,7 +1192,7 @@ int ObIlogStore::get_merge_range_(int64_t& end_idx, bool& need_switch_file) end_idx = 0; // limit each ilog file should less than ObIlogMemstore::CURSOR_SIZE_TRIGGER while (end_idx < size && false == need_switch_file && OB_SUCC(ret)) { - ObIlogMemstore *memstore = frozen_memstore_array_[end_idx].memstore_; + ObIlogMemstore* memstore = frozen_memstore_array_[end_idx].memstore_; ObIlogFreezeTriggerType trigger_type = frozen_memstore_array_[end_idx].trigger_type_; // In normal case, when end_idx is 0, trigger_type mustn't be OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE if (frozen_memstore_array_[end_idx].trigger_type_ == OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE) { @@ -1206,8 +1206,8 @@ int ObIlogStore::get_merge_range_(int64_t& end_idx, bool& need_switch_file) CLOG_LOG(ERROR, "get_clog_size failed", K(ret)); // Try to ensure the total size of each file does not exceed 32MB, // because of the ilog memstore may exceed 32MB in concurrent case. - } else if (true == (total_clog_size >= ObIlogMemstore::CLOG_SIZE_TRIGGER - || total_cursor_size >= ObIlogMemstore::CURSOR_SIZE_TRIGGER)) { + } else if (true == (total_clog_size >= ObIlogMemstore::CLOG_SIZE_TRIGGER || + total_cursor_size >= ObIlogMemstore::CURSOR_SIZE_TRIGGER)) { need_switch_file = true; break; } else { @@ -1253,9 +1253,8 @@ int ObIlogStore::merge_frozen_memstore_(int64_t& end_idx, FrozenMemstore& memsto } } while (0); - if (OB_SUCC(ret) && OB_FAIL(do_merge_frozen_memstore_(tmp_frozen_memstore_array, - need_switch_file, - memstore_after_merge))) { + if (OB_SUCC(ret) && + OB_FAIL(do_merge_frozen_memstore_(tmp_frozen_memstore_array, need_switch_file, memstore_after_merge))) { if (ret == OB_EAGAIN) { CLOG_LOG(WARN, "log not continous in do_merge_frozen_memstore_", K(tmp_frozen_memstore_array)); } else { @@ -1384,9 +1383,8 @@ bool ObIlogStore::need_merge_frozen_memstore_array_by_trigger_type_(const ObIlog return trigger_type == OB_TIMER_TRIGGER_TYPE; } -int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen_memstore_array, - bool need_switch_file, - FrozenMemstore& memstore_after_merge) +int ObIlogStore::do_merge_frozen_memstore_( + const FrozenMemstoreArray& tmp_frozen_memstore_array, bool need_switch_file, FrozenMemstore& memstore_after_merge) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -1408,23 +1406,29 @@ int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen if (false == frozen_memstore.is_valid()) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(ERROR, "unexpect error, invalid frozen_memstore", K(ret), K(frozen_memstore)); - } else if (OB_FAIL(merge_container.merge_ilog_memstore_to_container(frozen_memstore.memstore_)) - && OB_EAGAIN != ret) { + } else if (OB_FAIL(merge_container.merge_ilog_memstore_to_container(frozen_memstore.memstore_)) && + OB_EAGAIN != ret) { CLOG_LOG(ERROR, "merge ilog memstore failed", K(ret), K(frozen_memstore), K(tmp_frozen_memstore_array)); } else if (OB_EAGAIN == ret) { if (i == 0) { ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, "unexpected error, there is no possiblity for merging frozen memstore\ - failed because ilog not continous when i is 0", K(ret)); + CLOG_LOG(ERROR, + "unexpected error, there is no possiblity for merging frozen memstore\ + failed because ilog not continous when i is 0", + K(ret)); } else if (ret == OB_EAGAIN) { WLockGuard guard(lock_); - frozen_memstore_array_[i-1].trigger_type_ = OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE; - CLOG_LOG(WARN, "log not continous in merge_frozen_memstore, need modify its trigger_type", K(ret), - K(frozen_memstore), K(tmp_frozen_memstore_array)); + frozen_memstore_array_[i - 1].trigger_type_ = OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE; + CLOG_LOG(WARN, + "log not continous in merge_frozen_memstore, need modify its trigger_type", + K(ret), + K(frozen_memstore), + K(tmp_frozen_memstore_array)); } else { CLOG_LOG(ERROR, "ilog_memstore_merge failed", K(ret), K(frozen_memstore), K(tmp_frozen_memstore_array)); } - } else {} + } else { + } } if (OB_SUCC(ret) && OB_FAIL(merge_container.transfer_to_ilog_memstore(memstore))) { @@ -1436,13 +1440,12 @@ int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen } if (OB_SUCC(ret)) { - int64_t seq = tmp_frozen_memstore_array[tmp_size-1].seq_; - ObIlogFreezeTriggerType trigger_type = (true == need_switch_file ? OB_MERGE_NEED_SWITCH_FILE_TRIGGER_TYPE : - OB_TIMER_TRIGGER_TYPE); + int64_t seq = tmp_frozen_memstore_array[tmp_size - 1].seq_; + ObIlogFreezeTriggerType trigger_type = + (true == need_switch_file ? OB_MERGE_NEED_SWITCH_FILE_TRIGGER_TYPE : OB_TIMER_TRIGGER_TYPE); - if(OB_FAIL(memstore_after_merge.set_frozen_memstore(trigger_type, memstore, seq))) { - CLOG_LOG(ERROR, "set_frozen_memstore failed", K(memstore_after_merge), K(trigger_type), - K(memstore), K(seq)); + if (OB_FAIL(memstore_after_merge.set_frozen_memstore(trigger_type, memstore, seq))) { + CLOG_LOG(ERROR, "set_frozen_memstore failed", K(memstore_after_merge), K(trigger_type), K(memstore), K(seq)); } } diff --git a/src/clog/ob_ilog_store.h b/src/clog/ob_ilog_store.h index 8baac08f4bd3517777cfb75e9f6089b797c84202..c97fc299ca24c1dd74f4f980023f482ff5acb580 100644 --- a/src/clog/ob_ilog_store.h +++ b/src/clog/ob_ilog_store.h @@ -175,9 +175,8 @@ private: // after doing merge bool need_merge_frozen_memstore_array_by_trigger_type_(const ObIlogFreezeTriggerType& trigger_type) const; - int do_merge_frozen_memstore_(const FrozenMemstoreArray& frozen_memstore_array, - bool need_switch_file, - FrozenMemstore& memstore_after_merge); + int do_merge_frozen_memstore_( + const FrozenMemstoreArray& frozen_memstore_array, bool need_switch_file, FrozenMemstore& memstore_after_merge); void alloc_memstore_(ObIlogMemstore*& memstore); diff --git a/src/clog/ob_info_block_handler.cpp b/src/clog/ob_info_block_handler.cpp index 120651c18baebeda5e2de38b43b8625c311999ae..e41decc041e4d8af8e642363881c4c7213dc3719 100644 --- a/src/clog/ob_info_block_handler.cpp +++ b/src/clog/ob_info_block_handler.cpp @@ -1237,11 +1237,10 @@ int ObIInfoBlockHandler::CheckPartitionNeedFreezeFunctor::do_check_full_partitio // 2. INVALID, means that archive may be had started or stopped, cann't reclaime the log file, need // wati next round // 3. STOPING and STOPED, means that archive has stopped - } else if (ObLogArchiveStatus::STATUS::BEGINNING == info.status_.status_ - || ObLogArchiveStatus::STATUS::DOING == info.status_.status_) { - if (OB_FAIL(pls->get_last_archived_log_id(info.status_.incarnation_, - info.status_.round_, - last_archived_log_id))) { + } else if (ObLogArchiveStatus::STATUS::BEGINNING == info.status_.status_ || + ObLogArchiveStatus::STATUS::DOING == info.status_.status_) { + if (OB_FAIL( + pls->get_last_archived_log_id(info.status_.incarnation_, info.status_.round_, last_archived_log_id))) { CLOG_LOG(WARN, "failed to get_log_archive_backup_info", K(partition_key), K(info), KR(ret)); } else if (OB_INVALID_ID == last_archived_log_id || last_archived_log_id < max_log_id) { can_skip_ = false; diff --git a/src/clog/ob_log_define.h b/src/clog/ob_log_define.h index b456f77d0fb214f6a69a681cf7bbb1cdb0e12b6b..5cbe06949d99913cc94f1ead5470b322ff2b56d1 100644 --- a/src/clog/ob_log_define.h +++ b/src/clog/ob_log_define.h @@ -141,7 +141,7 @@ enum ObReplicaMsgType { OB_REPLICA_MSG_TYPE_NOT_CHILD = 3, // I'm not your child OB_REPLICA_MSG_TYPE_NOT_EXIST = 4, // partition not exist OB_REPLICA_MSG_TYPE_DISABLED_STATE = 5, // server in disabled state - OB_REPLICA_MSG_TYPE_QUICK_REGISTER= 6, // quick register to me + OB_REPLICA_MSG_TYPE_QUICK_REGISTER = 6, // quick register to me }; enum ObRegRespMsgType { @@ -161,16 +161,14 @@ enum ObFetchLogType { OB_FETCH_LOG_TYPE_MAX, }; -enum ObRestoreCheckType -{ +enum ObRestoreCheckType { OB_CHECK_UNKNOWN = 0, OB_CHECK_STANDBY_RESTORE = 1, OB_CHECK_RESTORE_END_ID = 2, OB_CHECK_MAX, }; -enum ReceiveLogType -{ +enum ReceiveLogType { RL_TYPE_UNKNOWN = 0, PUSH_LOG = 1, FETCH_LOG = 2, diff --git a/src/clog/ob_log_entry_header.h b/src/clog/ob_log_entry_header.h index a649ae844cb641f06cf5e39ed79694d5874ed930..9514443a5aa81c8e519fd24f6b013cc8b3a0618d 100644 --- a/src/clog/ob_log_entry_header.h +++ b/src/clog/ob_log_entry_header.h @@ -155,7 +155,10 @@ public: } return log_ts; } - void set_submit_timestamp(const int64_t ts) { submit_timestamp_ = ts; } + void set_submit_timestamp(const int64_t ts) + { + submit_timestamp_ = ts; + } bool is_batch_committed() const { bool bool_ret = false; @@ -182,7 +185,7 @@ public: } // Serialize submit_timestamp at specified offset - int serialize_submit_timestamp(char *buf, const int64_t buf_len, int64_t &pos); + int serialize_submit_timestamp(char* buf, const int64_t buf_len, int64_t& pos); static bool check_magic_number(const int16_t magic_number) { diff --git a/src/clog/ob_log_event_scheduler.cpp b/src/clog/ob_log_event_scheduler.cpp index c0471616d2195d2f518c6e53e3890c648ec63636..dc2bbe534a14a4cb7938140a38bb9972bfbefacf 100644 --- a/src/clog/ob_log_event_scheduler.cpp +++ b/src/clog/ob_log_event_scheduler.cpp @@ -26,13 +26,12 @@ ObLogEventScheduler::~ObLogEventScheduler() int ObLogEventScheduler::init() { int ret = common::OB_SUCCESS; - const char *CLOG_EVENT_TIME_WHEEL_NAME = "ClogEventTimeWheel"; + const char* CLOG_EVENT_TIME_WHEEL_NAME = "ClogEventTimeWheel"; const int64_t thread_num = get_time_wheel_thread_num_(); if (IS_INIT) { ret = common::OB_INIT_TWICE; CLOG_LOG(ERROR, "ObLogEventScheduler init twice", K(ret)); - } else if (OB_FAIL(time_wheel_.init(CLOG_EVENT_TIME_WHEEL_PRECISION, - thread_num, CLOG_EVENT_TIME_WHEEL_NAME))) { + } else if (OB_FAIL(time_wheel_.init(CLOG_EVENT_TIME_WHEEL_PRECISION, thread_num, CLOG_EVENT_TIME_WHEEL_NAME))) { CLOG_LOG(ERROR, "ObTimeWheel init fail", K(ret)); } else { is_inited_ = true; @@ -105,7 +104,7 @@ int ObLogEventScheduler::schedule_task_(ObLogStateEventTaskV2* task, const int64 int64_t ObLogEventScheduler::get_time_wheel_thread_num_() const { - int64_t thread_num = MAX(common::get_cpu_num()/2, 4); + int64_t thread_num = MAX(common::get_cpu_num() / 2, 4); if (thread_num > common::ObTimeWheel::MAX_THREAD_NUM) { thread_num = common::ObTimeWheel::MAX_THREAD_NUM; } diff --git a/src/clog/ob_log_external_rpc.cpp b/src/clog/ob_log_external_rpc.cpp index 7189873279bbcf7fff369b421bc01e4296b9d0ee..96db38b948fd690adfca9fb1a54e9941c664540a 100644 --- a/src/clog/ob_log_external_rpc.cpp +++ b/src/clog/ob_log_external_rpc.cpp @@ -870,216 +870,6 @@ int ObLogExternalFetchLogProcessor::process() return ret; } -// Request heartbeat information of given partitions. -void ObLogReqHeartbeatInfoRequest::Param::reset() -{ - pkey_.reset(); - log_id_ = OB_INVALID_ID; -} - -OB_SERIALIZE_MEMBER(ObLogReqHeartbeatInfoRequest::Param, pkey_, log_id_); - -ObLogReqHeartbeatInfoRequest::ObLogReqHeartbeatInfoRequest() : rpc_ver_(CUR_RPC_VER), params_() -{} - -ObLogReqHeartbeatInfoRequest::~ObLogReqHeartbeatInfoRequest() -{ - reset(); -} - -void ObLogReqHeartbeatInfoRequest::reset() -{ - params_.reset(); -} - -bool ObLogReqHeartbeatInfoRequest::is_valid() const -{ - return (0 < params_.count()); -} - -int ObLogReqHeartbeatInfoRequest::set_params(const ParamArray& params) -{ - int ret = OB_SUCCESS; - if (ITEM_CNT_LMT < params.count()) { - ret = OB_BUF_NOT_ENOUGH; - EXTLOG_LOG(WARN, "err set params, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", params.count()); - } else if (OB_SUCCESS != (ret = params_.assign(params))) { - EXTLOG_LOG(ERROR, "err assign params", K(ret)); - } - return ret; -} - -int ObLogReqHeartbeatInfoRequest::append_param(const Param& param) -{ - int ret = OB_SUCCESS; - if (ITEM_CNT_LMT <= params_.count()) { - ret = OB_BUF_NOT_ENOUGH; - EXTLOG_LOG(WARN, "err append param, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", params_.count()); - } else if (OB_SUCCESS != (ret = params_.push_back(param))) { - EXTLOG_LOG(ERROR, "err push param", K(ret)); - } - return ret; -} - -const ObLogReqHeartbeatInfoRequest::ParamArray& ObLogReqHeartbeatInfoRequest::get_params() const -{ - return params_; -} - -int64_t ObLogReqHeartbeatInfoRequest::rpc_ver() const -{ - return rpc_ver_; -} - -OB_DEF_SERIALIZE(ObLogReqHeartbeatInfoRequest) -{ - int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_ENCODE, rpc_ver_, params_); - return ret; -} - -OB_DEF_SERIALIZE_SIZE(ObLogReqHeartbeatInfoRequest) -{ - int64_t len = 0; - LST_DO_CODE(OB_UNIS_ADD_LEN, rpc_ver_, params_); - return len; -} - -OB_DEF_DESERIALIZE(ObLogReqHeartbeatInfoRequest) -{ - int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_DECODE, rpc_ver_); - if (CUR_RPC_VER == rpc_ver_) { - LST_DO_CODE(OB_UNIS_DECODE, params_); - } else { - ret = OB_NOT_SUPPORTED; - EXTLOG_LOG(ERROR, "deserialize error, version not match", K(ret), K(rpc_ver_), LITERAL_K(CUR_RPC_VER)); - } - return ret; -} - -void ObLogReqHeartbeatInfoResponse::Result::reset() -{ - err_ = OB_SUCCESS; - tstamp_ = OB_INVALID_TIMESTAMP; -} - -OB_SERIALIZE_MEMBER(ObLogReqHeartbeatInfoResponse::Result, err_, tstamp_); - -ObLogReqHeartbeatInfoResponse::ObLogReqHeartbeatInfoResponse() : rpc_ver_(CUR_RPC_VER), err_(OB_SUCCESS), res_() -{} - -ObLogReqHeartbeatInfoResponse::~ObLogReqHeartbeatInfoResponse() -{ - reset(); -} - -void ObLogReqHeartbeatInfoResponse::reset() -{ - err_ = OB_SUCCESS; - res_.reset(); -} - -void ObLogReqHeartbeatInfoResponse::set_err(const int err) -{ - err_ = err; -} - -int ObLogReqHeartbeatInfoResponse::set_results(const ResultArray& results) -{ - int ret = OB_SUCCESS; - if (ITEM_CNT_LMT < results.count()) { - ret = OB_BUF_NOT_ENOUGH; - EXTLOG_LOG(WARN, "err set results, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", results.count()); - } else if (OB_SUCCESS != (ret = res_.assign(results))) { - EXTLOG_LOG(ERROR, "err assign results", K(ret)); - } - return ret; -} - -int ObLogReqHeartbeatInfoResponse::append_result(const Result& result) -{ - int ret = OB_SUCCESS; - if (ITEM_CNT_LMT <= res_.count()) { - ret = OB_BUF_NOT_ENOUGH; - EXTLOG_LOG(WARN, "err append result, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", res_.count()); - } else if (OB_SUCCESS != (ret = res_.push_back(result))) { - EXTLOG_LOG(ERROR, "err push back results", K(ret)); - } - return ret; -} - -int ObLogReqHeartbeatInfoResponse::get_err() const -{ - return err_; -} - -const ObLogReqHeartbeatInfoResponse::ResultArray& ObLogReqHeartbeatInfoResponse::get_results() const -{ - return res_; -} - -int64_t ObLogReqHeartbeatInfoResponse::rpc_ver() const -{ - return rpc_ver_; -} - -void ObLogReqHeartbeatInfoResponse::set_rpc_ver(const int64_t ver) -{ - rpc_ver_ = ver; -} - -OB_DEF_SERIALIZE(ObLogReqHeartbeatInfoResponse) -{ - int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_ENCODE, rpc_ver_, err_, res_); - return ret; -} - -OB_DEF_SERIALIZE_SIZE(ObLogReqHeartbeatInfoResponse) -{ - int64_t len = 0; - LST_DO_CODE(OB_UNIS_ADD_LEN, rpc_ver_, err_, res_); - return len; -} - -OB_DEF_DESERIALIZE(ObLogReqHeartbeatInfoResponse) -{ - int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_DECODE, rpc_ver_); - if (CUR_RPC_VER == rpc_ver_) { - LST_DO_CODE(OB_UNIS_DECODE, err_, res_); - } else { - ret = OB_NOT_SUPPORTED; - EXTLOG_LOG(ERROR, "deserialize error, version not match", K(ret), K(rpc_ver_), LITERAL_K(CUR_RPC_VER)); - } - return ret; -} - -int ObLogReqHeartbeatInfoProcessor::process() -{ - int ret = OB_SUCCESS; - const ObLogReqHeartbeatInfoRequest& req = arg_; - ObLogReqHeartbeatInfoResponse& resp = result_; - clog::ObICLogMgr* clog_mgr = NULL; - logservice::ObExtLogService* els = NULL; - if (OB_ISNULL(partition_service_)) { - ret = OB_ERR_UNEXPECTED; - EXTLOG_LOG(ERROR, "partition_service_ is null", K(ret)); - } else if (OB_ISNULL(clog_mgr = partition_service_->get_clog_mgr())) { - ret = OB_ERR_UNEXPECTED; - EXTLOG_LOG(ERROR, "clog_mgr is null", K(ret)); - } else if (OB_ISNULL(els = clog_mgr->get_external_log_service())) { - ret = OB_ERR_UNEXPECTED; - EXTLOG_LOG(ERROR, "els is null", K(ret)); - } else { - ret = els->req_heartbeat_info(req, resp); - } - // rewrite ret for rpc - ret = OB_SUCCESS; - return ret; -} - int ObExternalProcessorHelper::get_clog_mgr(ObPartitionService* ps, ObICLogMgr*& clog_mgr) { int ret = OB_SUCCESS; diff --git a/src/clog/ob_log_external_rpc.h b/src/clog/ob_log_external_rpc.h index 2e3799641f96933281fa616b2a03969a2a0b6b35..eb015cf41c698d6b0c015b30f02aa5a71edcbd40 100644 --- a/src/clog/ob_log_external_rpc.h +++ b/src/clog/ob_log_external_rpc.h @@ -40,8 +40,6 @@ class ObLogReqStartPosByLogIdRequest; class ObLogReqStartPosByLogIdResponse; class ObLogExternalFetchLogRequest; class ObLogExternalFetchLogResponse; -class ObLogReqHeartbeatInfoRequest; -class ObLogReqHeartbeatInfoResponse; // for request with breakpoint class ObLogReqStartLogIdByTsRequestWithBreakpoint; @@ -73,10 +71,6 @@ public: // fetch_log() RPC_S(@PR5 fetch_log, OB_LOG_FETCH_LOG_EXTERNAL, (ObLogExternalFetchLogRequest), ObLogExternalFetchLogResponse); - // req_last_log_serv_info() - RPC_S(@PR5 req_heartbeat_info, OB_LOG_REQUEST_HEARTBEAT_INFO, (ObLogReqHeartbeatInfoRequest), - ObLogReqHeartbeatInfoResponse); - // for request with breakpoint RPC_S(@PR5 req_start_log_id_by_ts_with_breakpoint, OB_LOG_REQ_START_LOG_ID_BY_TS_WITH_BREAKPOINT, (ObLogReqStartLogIdByTsRequestWithBreakpoint), ObLogReqStartLogIdByTsResponseWithBreakpoint); @@ -441,95 +435,6 @@ private: storage::ObPartitionService* partition_service_; }; -// Request heartbeat information of given partitions. -// Err: -// - OB_SUCCESS -// - OB_NEED_RETRY: can't generate heartbeat info -// - OB_ERR_SYS: observer internal error -class ObLogReqHeartbeatInfoRequest { - static const int64_t CUR_RPC_VER = 1; - static const int64_t ITEM_CNT_LMT = 10000; // Around 300kb for cur version. -public: - struct Param { - common::ObPartitionKey pkey_; - uint64_t log_id_; - void reset(); - TO_STRING_KV(K_(pkey), K_(log_id)); - OB_UNIS_VERSION(1); - }; - typedef common::ObSEArray ParamArray; - -public: - ObLogReqHeartbeatInfoRequest(); - ~ObLogReqHeartbeatInfoRequest(); - -public: - void reset(); - bool is_valid() const; - int set_params(const ParamArray& params); - int append_param(const Param& param); - const ParamArray& get_params() const; - int64_t rpc_ver() const; - TO_STRING_KV(K_(rpc_ver), K_(params)); - OB_UNIS_VERSION(1); - -private: - int64_t rpc_ver_; - ParamArray params_; -}; - -class ObLogReqHeartbeatInfoResponse { - static const int64_t CUR_RPC_VER = 1; - static const int64_t ITEM_CNT_LMT = 10000; // Around 300kb for cur version. -public: - struct Result { - int err_; - int64_t tstamp_; - void reset(); - TO_STRING_KV(K_(err), K_(tstamp)); - OB_UNIS_VERSION(1); - }; - typedef common::ObSEArray ResultArray; - -public: - ObLogReqHeartbeatInfoResponse(); - ~ObLogReqHeartbeatInfoResponse(); - -public: - void reset(); - void set_err(const int err); - int set_results(const ResultArray& results); - int append_result(const Result& result); - int get_err() const; - const ResultArray& get_results() const; - int64_t rpc_ver() const; - void set_rpc_ver(const int64_t ver); - TO_STRING_KV(K_(rpc_ver), K_(err), K_(res)); - OB_UNIS_VERSION(1); - -private: - int64_t rpc_ver_; - int err_; - ResultArray res_; -}; - -class ObLogReqHeartbeatInfoProcessor - : public ObRpcProcessor > { -public: - ObLogReqHeartbeatInfoProcessor(storage::ObPartitionService* partition_service) : partition_service_(partition_service) - {} - ~ObLogReqHeartbeatInfoProcessor() - { - partition_service_ = NULL; - } - -protected: - int process(); - -private: - storage::ObPartitionService* partition_service_; -}; - // check ps & ps->clog_mgr not null // to simplify code class ObExternalProcessorHelper { diff --git a/src/clog/ob_log_membership_task_mgr.cpp b/src/clog/ob_log_membership_task_mgr.cpp index bc562635e7ffb34ec0f346493c3ac75249559c97..93b23c0586335182d873c82c74a4dd03aa3ff3b5 100644 --- a/src/clog/ob_log_membership_task_mgr.cpp +++ b/src/clog/ob_log_membership_task_mgr.cpp @@ -594,9 +594,8 @@ int ObLogMembershipTaskMgr::submit_confirmed_info_(const uint64_t log_id, const int64_t confirmed_info_epoch_id = confirmed_info.get_epoch_id(); int64_t confirmed_info_submit_timestamp = confirmed_info.get_submit_timestamp(); - if (log_id != cur_renew_log_id - || ms_proposal_id != cur_ms_proposal_id - || ms_proposal_id != log_task.get_proposal_id()) { + if (log_id != cur_renew_log_id || ms_proposal_id != cur_ms_proposal_id || + ms_proposal_id != log_task.get_proposal_id()) { ret = OB_STATE_NOT_MATCH; CLOG_LOG(WARN, "log_id or ms_proposal_id not match with cur_renew_ms_task", @@ -625,12 +624,16 @@ int ObLogMembershipTaskMgr::submit_confirmed_info_(const uint64_t log_id, const if (log_task.is_confirmed_info_exist()) { } else { if (log_task.is_submit_log_exist()) { - if ((log_task.get_data_checksum() != confirmed_info_data_checksum) - || (log_task.get_epoch_id() != confirmed_info_epoch_id) - || (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp - && log_task.get_submit_timestamp() != confirmed_info_submit_timestamp)) { - CLOG_LOG(INFO, "log_task and confirmed_info not match, reset", K_(partition_key), - K(log_id), K(log_task), K(confirmed_info)); + if ((log_task.get_data_checksum() != confirmed_info_data_checksum) || + (log_task.get_epoch_id() != confirmed_info_epoch_id) || + (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp && + log_task.get_submit_timestamp() != confirmed_info_submit_timestamp)) { + CLOG_LOG(INFO, + "log_task and confirmed_info not match, reset", + K_(partition_key), + K(log_id), + K(log_task), + K(confirmed_info)); log_task.reset_log(); log_task.reset_state(false); log_task.reset_log_cursor(); diff --git a/src/clog/ob_log_reader_interface.h b/src/clog/ob_log_reader_interface.h index f8c48b99d67e5b11746929b5a6e2a14eb290d555..5808bba5858404191d44f7e9a98bebfff199376d 100644 --- a/src/clog/ob_log_reader_interface.h +++ b/src/clog/ob_log_reader_interface.h @@ -16,14 +16,11 @@ #include "ob_log_entry.h" #include "ob_log_file_pool.h" -namespace oceanbase -{ -namespace clog -{ +namespace oceanbase { +namespace clog { // Used to set the parameters of reading files, the purpose is to add // parameters int the future without changing the interface -struct ObReadParam -{ +struct ObReadParam { file_id_t file_id_; offset_t offset_; common::ObPartitionKey partition_key_; diff --git a/src/clog/ob_log_reconfirm.cpp b/src/clog/ob_log_reconfirm.cpp index 9e5c70d78162b6c9aee01993d1d58fa5558df17b..1b66db28b8bffd9c92b349c5e982c5977004d1bd 100644 --- a/src/clog/ob_log_reconfirm.cpp +++ b/src/clog/ob_log_reconfirm.cpp @@ -798,8 +798,8 @@ int ObLogReconfirm::confirm_log_() if (OB_FAIL(try_update_nop_or_truncate_timestamp(*header))) { CLOG_LOG(WARN, "try_update_nop_or_truncate_timestamp fail", K(ret), K_(partition_key)); } else if (OB_FAIL(sw_->submit_log(log_ptr->get_header(), log_ptr->get_buf(), NULL))) { - CLOG_LOG(WARN, "submit log failed", K_(partition_key), K(ret), K_(next_id), - K_(start_id), K_(max_flushed_id)); + CLOG_LOG( + WARN, "submit log failed", K_(partition_key), K(ret), K_(next_id), K_(start_id), K_(max_flushed_id)); break; } else { CLOG_LOG(TRACE, "submit log success", K_(partition_key), K_(next_id), K_(start_id), K_(max_flushed_id)); @@ -814,7 +814,7 @@ int ObLogReconfirm::confirm_log_() next_id_++; } } - } // end while + } // end while // In case of rebuild in leader reconfirm: // 1. when majority has already recycled specified log, the follower @@ -832,15 +832,17 @@ int ObLogReconfirm::confirm_log_() const uint64_t new_start_id = sw_->get_start_id(); if (new_start_id > next_id_) { next_id_ = new_start_id; - CLOG_LOG(INFO, "there may execute a rebuild operation in\ - leader reconfirm", K(ret), K(new_start_id), K(next_id_)); + CLOG_LOG(INFO, + "there may execute a rebuild operation in\ + leader reconfirm", + K(ret), + K(new_start_id), + K(next_id_)); } ret = OB_SUCCESS; } - if (OB_SUCC(ret) - && next_id_ <= max_flushed_id_ - && next_id_ >= log_info_array_.get_end_id()) { + if (OB_SUCC(ret) && next_id_ <= max_flushed_id_ && next_id_ >= log_info_array_.get_end_id()) { // process next log_range if (OB_EAGAIN == (ret = init_log_info_range_(next_id_))) { // ret is EAGAIN when some log has slide out, need update next_id_ diff --git a/src/clog/ob_log_sliding_window.cpp b/src/clog/ob_log_sliding_window.cpp index c5e96e27d5947c861e72147749e5128ad419ecd0..b5366495ab0cb97eb0a9972f6f177781f31f3e00 100644 --- a/src/clog/ob_log_sliding_window.cpp +++ b/src/clog/ob_log_sliding_window.cpp @@ -849,141 +849,15 @@ int ObLogSlidingWindow::try_update_submit_timestamp(const int64_t base_ts) return ret; } -// only called by ObExtLeaderHeartbeatHandler, it will double check leader -int ObLogSlidingWindow::get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts) -{ - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - uint64_t last_log_id_dummy = OB_INVALID_ID; - uint64_t next_log_id_dummy = OB_INVALID_ID; - int64_t last_log_ts = OB_INVALID_TIMESTAMP; - int64_t next_log_ts = OB_INVALID_TIMESTAMP; - - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else { - res_ts = OB_INVALID_TIMESTAMP; - get_last_replay_log(last_log_id_dummy, last_log_ts); - get_next_replay_log_id_info(next_log_id_dummy, next_log_ts); - if (OB_INVALID_TIMESTAMP == last_log_ts || OB_INVALID_TIMESTAMP == next_log_ts) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG( - ERROR, "last_log_ts or next_log_ts is invalid", K(ret), K(partition_key_), K(last_log_ts), K(next_log_ts)); - } else { - if (OB_LIKELY(is_empty())) { // check empty first - if (last_log_id == static_cast(sw_.get_start_id() - 1)) { - int64_t safe_cur_ts = next_log_ts - MAX_TIME_DIFF_BETWEEN_SERVER; - res_ts = safe_cur_ts > last_log_ts ? safe_cur_ts : last_log_ts; - CLOG_LOG(TRACE, "sw get next timestamp", K(partition_key_), K(res_ts), K(safe_cur_ts), K(last_log_ts)); - } else { - ret = OB_EAGAIN; - } - } else { - CLOG_LOG(TRACE, "sw not empty, get next log task timestamp", K(last_log_id), K(partition_key_)); - // not empty, try to get timestamp of the unconfirmed timestamp - // this function is called under double check leader - int64_t next_ts = OB_INVALID_TIMESTAMP; - ObLogTask* task = NULL; - const int64_t* ref = NULL; - if (OB_FAIL(get_log_task(last_log_id + 1, task, ref))) { - // in time interval [test sw is empty ~ log_task is put into sw], OB_ERR_NULL_VALUE is expected - if (OB_ERR_NULL_VALUE == ret) { - CLOG_LOG(INFO, - "get log task error, log_task not put in sw yet", - K(ret), - K(partition_key_), - K(last_log_id), - "start_id", - sw_.get_start_id(), - "max_log_id", - get_max_log_id()); - // heartbeat handler retry - ret = OB_EAGAIN; - } else if (OB_ERROR_OUT_OF_RANGE == ret) { - CLOG_LOG(INFO, "get log task error, log slide out", K(ret), K(partition_key_), K(last_log_id)); - // heartbeat handler retry - ret = OB_EAGAIN; - } else { - CLOG_LOG(WARN, "get log task error", K(ret), K(partition_key_), K(last_log_id)); - } - } else { - task->lock(); - if (task->is_submit_log_exist()) { - next_ts = task->get_submit_timestamp(); - if (OB_INVALID_TIMESTAMP == next_ts) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "get invalid next_ts", K(partition_key_), K(last_log_id)); - } else { - res_ts = std::min(next_ts - MAX_TIME_DIFF_BETWEEN_SERVER, last_log_ts); - } - } else { // submit_log not exist - // not leader now, maybe log_task is generated by a quicker confirm_info_packet - ret = OB_NEED_RETRY; - CLOG_LOG(INFO, "submit log not exist", K(ret)); - } - task->unlock(); - } - if (NULL != ref && OB_SUCCESS != (tmp_ret = revert_log_task(ref))) { - CLOG_LOG(ERROR, "revert_log_task failed", K_(partition_key), K(tmp_ret)); - } else { - ref = NULL; - } - } // not empty - } - } - return ret; -} - // only called by ObExtLeaderHeartbeatHandler to get next served log_id and ts based on keepalive ts int ObLogSlidingWindow::get_next_served_log_info_by_next_replay_log_info( uint64_t& next_served_log_id, int64_t& next_served_log_ts) { int ret = OB_SUCCESS; - uint64_t next_replay_log_id = OB_INVALID_ID; - int64_t next_replay_log_ts = OB_INVALID_TIMESTAMP; - int64_t next_log_tstamp = OB_INVALID_TIMESTAMP; if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else { - get_next_replay_log_id_info(next_replay_log_id, next_replay_log_ts); - - // when next_log is start log in sw, try to retrieve its log_ts - if (!is_empty() && next_replay_log_id == static_cast(sw_.get_start_id())) { - int tmp_ret = get_log_submit_tstamp_from_task_(next_replay_log_id, next_log_tstamp); - if (OB_EAGAIN == tmp_ret) { - CLOG_LOG(TRACE, - "[GET_NEXT_SERVED_LOG_INFO] next log is not ready", - K_(partition_key), - K(next_replay_log_id), - "start_id", - sw_.get_start_id(), - "max_log_id", - get_max_log_id()); - } else if (OB_ERROR_OUT_OF_RANGE == tmp_ret) { - CLOG_LOG(TRACE, - "[GET_NEXT_SERVED_LOG_INFO] next log just slide out", - K_(partition_key), - K(next_replay_log_id), - "start_id", - sw_.get_start_id(), - "max_log_id", - get_max_log_id()); - } - } - - if (OB_SUCCESS == ret) { - next_served_log_id = next_replay_log_id; - - // If the next log is valid and effective, select the max value between lower bound of the next log and - // follower-read ts - if (OB_INVALID_TIMESTAMP != next_log_tstamp) { - // Here minus the maximum clock offset between servers - int64_t safe_next_log_tstamp = next_log_tstamp - MAX_TIME_DIFF_BETWEEN_SERVER; - next_served_log_ts = std::max(safe_next_log_tstamp, next_replay_log_ts); - } else { - next_served_log_ts = next_replay_log_ts; - } - } + get_next_replay_log_id_info(next_served_log_id, next_served_log_ts); CLOG_LOG(TRACE, "[GET_NEXT_SERVED_LOG_INFO]", @@ -991,9 +865,6 @@ int ObLogSlidingWindow::get_next_served_log_info_by_next_replay_log_info( K_(partition_key), K(next_served_log_id), K(next_served_log_ts), - K(next_replay_log_id), - K(next_replay_log_ts), - K(next_log_tstamp), "start_id", sw_.get_start_id(), "max_log_id", @@ -1002,53 +873,6 @@ int ObLogSlidingWindow::get_next_served_log_info_by_next_replay_log_info( return ret; } -// return code: -// OB_EAGIN: log is not ready, need retry -// OB_ERROR_OUT_OF_RANGE: log has been slide out -// other code: failure -int ObLogSlidingWindow::get_log_submit_tstamp_from_task_(const uint64_t log_id, int64_t& log_tstamp) -{ - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - ObLogTask* task = NULL; - const int64_t* ref = NULL; - log_tstamp = OB_INVALID_TIMESTAMP; - if (OB_FAIL(get_log_task(log_id, task, ref))) { - if (OB_ERR_NULL_VALUE == ret) { - ret = OB_EAGAIN; - } else if (OB_ERROR_OUT_OF_RANGE == ret) { - } else { - CLOG_LOG(WARN, - "get log task error", - K(ret), - K(partition_key_), - K(log_id), - "start_id", - sw_.get_start_id(), - "max_log_id", - get_max_log_id()); - } - } else if (OB_ISNULL(task)) { - CLOG_LOG(WARN, "invalid task after get_log_task", K(task), K(ret), K(log_id)); - ret = OB_ERR_UNEXPECTED; - } else { - task->lock(); - if (task->is_submit_log_exist()) { - log_tstamp = task->get_submit_timestamp(); - } else { - ret = OB_EAGAIN; - } - task->unlock(); - } - if (NULL != ref && OB_SUCCESS != (tmp_ret = revert_log_task(ref))) { - CLOG_LOG(ERROR, "revert_log_task failed", K_(partition_key), K(tmp_ret), K(task)); - } else { - ref = NULL; - task = NULL; - } - return ret; -} - int ObLogSlidingWindow::submit_aggre_log(ObAggreBuffer* buffer, const int64_t base_timestamp) { int ret = OB_SUCCESS; @@ -1248,9 +1072,12 @@ int ObLogSlidingWindow::need_update_log_task_( ret = OB_INVALID_ARGUMENT; } else if (task.is_log_confirmed()) { if (is_confirm_match_(log_id, - header.get_data_checksum(), header.get_epoch_id(), - header.get_submit_timestamp(), task.get_data_checksum(), - task.get_epoch_id(), task.get_submit_timestamp())) { + header.get_data_checksum(), + header.get_epoch_id(), + header.get_submit_timestamp(), + task.get_data_checksum(), + task.get_epoch_id(), + task.get_submit_timestamp())) { CLOG_LOG(DEBUG, "receive submit log after confirm log, match", K(header), K_(partition_key), K(task)); } else { ret = OB_INVALID_LOG; @@ -1806,11 +1633,18 @@ int ObLogSlidingWindow::submit_confirmed_info_( */ if (log_task->is_submit_log_exist()) { if (!is_confirm_match_(log_id, - log_task->get_data_checksum(), log_task->get_epoch_id(), - log_task->get_submit_timestamp(), confirmed_info.get_data_checksum(), - confirmed_info.get_epoch_id(), confirmed_info.get_submit_timestamp())) { - CLOG_LOG(INFO, "log_task and confirmed_info not match, reset", K_(partition_key), - K(log_id), K(*log_task), K(confirmed_info)); + log_task->get_data_checksum(), + log_task->get_epoch_id(), + log_task->get_submit_timestamp(), + confirmed_info.get_data_checksum(), + confirmed_info.get_epoch_id(), + confirmed_info.get_submit_timestamp())) { + CLOG_LOG(INFO, + "log_task and confirmed_info not match, reset", + K_(partition_key), + K(log_id), + K(*log_task), + K(confirmed_info)); log_task->reset_log(); log_task->reset_log_cursor(); } @@ -2737,22 +2571,24 @@ int ObLogSlidingWindow::get_log(const uint64_t log_id, const uint32_t log_attr, return ret; } -bool ObLogSlidingWindow::is_confirm_match_(const uint64_t log_id, - const int64_t log_data_checksum, - const int64_t log_epoch_id, - const int64_t log_submit_timestamp, - const int64_t confirmed_info_data_checksum, - const int64_t confirmed_info_epoch_id, - const int64_t confirmed_info_submit_timestamp) +bool ObLogSlidingWindow::is_confirm_match_(const uint64_t log_id, const int64_t log_data_checksum, + const int64_t log_epoch_id, const int64_t log_submit_timestamp, const int64_t confirmed_info_data_checksum, + const int64_t confirmed_info_epoch_id, const int64_t confirmed_info_submit_timestamp) { bool bret = false; - if (log_data_checksum != confirmed_info_data_checksum - || log_epoch_id != confirmed_info_epoch_id - || (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp - && log_submit_timestamp != confirmed_info_submit_timestamp)) { - CLOG_LOG(WARN, "confirm log not match", K_(partition_key), K(log_id), K(log_data_checksum), - K(log_epoch_id), K(log_submit_timestamp), K(confirmed_info_data_checksum), - K(confirmed_info_epoch_id), K(confirmed_info_submit_timestamp)); + if (log_data_checksum != confirmed_info_data_checksum || log_epoch_id != confirmed_info_epoch_id || + (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp && + log_submit_timestamp != confirmed_info_submit_timestamp)) { + CLOG_LOG(WARN, + "confirm log not match", + K_(partition_key), + K(log_id), + K(log_data_checksum), + K(log_epoch_id), + K(log_submit_timestamp), + K(confirmed_info_data_checksum), + K(confirmed_info_epoch_id), + K(confirmed_info_submit_timestamp)); } else { bret = true; } @@ -3806,19 +3642,18 @@ int ObLogSlidingWindow::do_fetch_log(const uint64_t start_id, const uint64_t end int ret = OB_SUCCESS; bool need_check_rebuild = false; is_fetched = false; -// the follow code is used to test case clog/3050_rebuild_when_leader_reconfirm.test -// don't delete it -// user needs add this configuration to share/parameter/ob_parameter_seed.ipp -// DEF_BOOL(_enable_fetch_log, OB_CLUSTER_PARAMETER, "true", -// "enabl fetch log", ObParameterAttr(Section::OBSERVER, Source::DEFAULT -// , EditLevel::DYNAMIC_EFFECTIVE)); -// const bool can_fetch_log = GCONF._enable_fetch_log; + // the follow code is used to test case clog/3050_rebuild_when_leader_reconfirm.test + // don't delete it + // user needs add this configuration to share/parameter/ob_parameter_seed.ipp + // DEF_BOOL(_enable_fetch_log, OB_CLUSTER_PARAMETER, "true", + // "enabl fetch log", ObParameterAttr(Section::OBSERVER, Source::DEFAULT + // , EditLevel::DYNAMIC_EFFECTIVE)); + // const bool can_fetch_log = GCONF._enable_fetch_log; if (IS_NOT_INIT) { ret = OB_NOT_INIT; -// } else if (!can_fetch_log) { -// CLOG_LOG(INFO, "can't fetch log", K(ret), K(partition_key_), K(start_id), K(end_id)); - } else if (start_id <= 0 || end_id <= 0 || start_id >= end_id || OB_ISNULL(state_mgr_) - || OB_ISNULL(log_engine_)) { + // } else if (!can_fetch_log) { + // CLOG_LOG(INFO, "can't fetch log", K(ret), K(partition_key_), K(start_id), K(end_id)); + } else if (start_id <= 0 || end_id <= 0 || start_id >= end_id || OB_ISNULL(state_mgr_) || OB_ISNULL(log_engine_)) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "invalid arguments", K(ret), K(partition_key_), K(start_id), K(end_id)); } else if (!check_need_fetch_log_(start_id, need_check_rebuild)) { @@ -5192,10 +5027,8 @@ int ObLogSlidingWindow::set_confirmed_info_without_lock_( { int ret = OB_SUCCESS; ObConfirmedInfo confirmed_info; - if (OB_FAIL(confirmed_info.init(header.get_data_checksum(), - header.get_epoch_id(), - accum_checksum, - header.get_submit_timestamp()))) { + if (OB_FAIL(confirmed_info.init( + header.get_data_checksum(), header.get_epoch_id(), accum_checksum, header.get_submit_timestamp()))) { CLOG_LOG(ERROR, "confirmed_info init failed", K_(partition_key), K(header), KR(ret)); } else { log_task.set_confirmed_info(confirmed_info); diff --git a/src/clog/ob_log_sliding_window.h b/src/clog/ob_log_sliding_window.h index b8d49a5b3f33b8f5d42de0e6e1aa32f7957c208a..04faeadf1013ea0b88779f7063bb5b463678e013 100644 --- a/src/clog/ob_log_sliding_window.h +++ b/src/clog/ob_log_sliding_window.h @@ -427,7 +427,6 @@ public: int submit_replay_task(const bool need_async, bool& is_replayed, bool& is_replay_failed) override; void destroy(); int alloc_log_id(const int64_t base_timestamp, uint64_t& log_id, int64_t& submit_timestamp) override; - int get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts); int get_next_served_log_info_by_next_replay_log_info(uint64_t& next_served_log_id, int64_t& next_served_log_ts); bool is_inited() const { @@ -543,19 +542,12 @@ private: const int64_t submit_timestamp, ObISubmitLogCb* cb); int try_freeze_aggre_buffer_(const uint64_t log_id); int submit_freeze_aggre_buffer_task_(const uint64_t log_id); - int submit_aggre_log_(ObAggreBuffer *buffer, - const uint64_t log_id, - const int64_t submit_timestamp); + int submit_aggre_log_(ObAggreBuffer* buffer, const uint64_t log_id, const int64_t submit_timestamp); int try_update_submit_timestamp(const int64_t base_ts); - bool is_confirm_match_(const uint64_t log_id, - const int64_t log_data_checksum, - const int64_t log_epoch_id, - const int64_t log_submit_timestamp, - const int64_t confirmed_info_data_checksum, - const int64_t confirmed_info_epoch_id, - const int64_t confirmed_info_submit_timestamp); - int receive_log_(const ObLogEntry &log_entry, const common::ObAddr &server, - const int64_t cluster_id); + bool is_confirm_match_(const uint64_t log_id, const int64_t log_data_checksum, const int64_t log_epoch_id, + const int64_t log_submit_timestamp, const int64_t confirmed_info_data_checksum, + const int64_t confirmed_info_epoch_id, const int64_t confirmed_info_submit_timestamp); + int receive_log_(const ObLogEntry& log_entry, const common::ObAddr& server, const int64_t cluster_id); void update_max_log_id_(const uint64_t log_id); int submit_to_sliding_window_(const ObLogEntryHeader& header, const char* buff, ObISubmitLogCb* cb, const bool need_replay, const bool send_slave, const common::ObAddr& server, const int64_t cluster_id, @@ -624,7 +616,6 @@ private: int generate_backfill_log_task_(const ObLogEntryHeader& header, const char* buff, const ObLogCursor& log_cursor, ObISubmitLogCb* submit_cb, const bool need_replay, const bool need_copy, const bool need_pinned, ObLogTask*& task); - int get_log_submit_tstamp_from_task_(const uint64_t log_id, int64_t& log_tstamp); int check_pre_barrier_(ObLogType log_type) const; void* alloc_log_task_buf_(); int need_replay_for_data_or_log_replica_(const bool is_trans_log, bool& need_replay) const; diff --git a/src/clog/ob_log_task.cpp b/src/clog/ob_log_task.cpp index e2fd299df61f659853b221dbae06962cbae34398..00cffc132bf9503d911c6088354723cabbe3fa90 100644 --- a/src/clog/ob_log_task.cpp +++ b/src/clog/ob_log_task.cpp @@ -660,12 +660,9 @@ void ObLogTask::set_confirmed_info(const ObConfirmedInfo& confirmed_info) const int64_t arg_submit_timestamp = confirmed_info.get_submit_timestamp(); if (is_submit_log_exist()) { // check data_checksum_ and epoch_id_ when log exists - if (data_checksum_ != arg_data_checksum - || epoch_id_ != arg_epoch_id - || (OB_INVALID_TIMESTAMP != arg_submit_timestamp - && submit_timestamp_ != arg_submit_timestamp)) { - CLOG_LOG(ERROR, "set_confirmed_info meta info not match", K(data_checksum_), - K(epoch_id_), K(confirmed_info)); + if (data_checksum_ != arg_data_checksum || epoch_id_ != arg_epoch_id || + (OB_INVALID_TIMESTAMP != arg_submit_timestamp && submit_timestamp_ != arg_submit_timestamp)) { + CLOG_LOG(ERROR, "set_confirmed_info meta info not match", K(data_checksum_), K(epoch_id_), K(confirmed_info)); } } epoch_id_ = arg_epoch_id; diff --git a/src/clog/ob_log_type.cpp b/src/clog/ob_log_type.cpp index 82a42d1786b7d346819cce73503f4456b3ec7c07..424b9c70aab78c929f8ef8ca50ee9841362ef23b 100644 --- a/src/clog/ob_log_type.cpp +++ b/src/clog/ob_log_type.cpp @@ -16,12 +16,9 @@ namespace oceanbase { using namespace common; -namespace clog -{ -int ObConfirmedInfo::init(const int64_t data_checksum, - const int64_t epoch_id, - const int64_t accum_checksum, - const int64_t submit_timestamp) +namespace clog { +int ObConfirmedInfo::init( + const int64_t data_checksum, const int64_t epoch_id, const int64_t accum_checksum, const int64_t submit_timestamp) { int ret = OB_SUCCESS; data_checksum_ = data_checksum; @@ -32,9 +29,7 @@ int ObConfirmedInfo::init(const int64_t data_checksum, } // used for RPC -OB_SERIALIZE_MEMBER(ObConfirmedInfo, data_checksum_, - epoch_id_, accum_checksum_, - submit_timestamp_); +OB_SERIALIZE_MEMBER(ObConfirmedInfo, data_checksum_, epoch_id_, accum_checksum_, submit_timestamp_); ObMembershipLog::ObMembershipLog() : version_(MS_LOG_VERSION), diff --git a/src/clog/ob_log_type.h b/src/clog/ob_log_type.h index b01e577a6441204ab8d2d12cb26ad2f40a4c5c64..59028d1d01e8532a942cd813dc8648a522007c17 100644 --- a/src/clog/ob_log_type.h +++ b/src/clog/ob_log_type.h @@ -24,18 +24,41 @@ class ObConfirmedInfo { OB_UNIS_VERSION(1); public: - ObConfirmedInfo() : data_checksum_(0), epoch_id_(common::OB_INVALID_TIMESTAMP), - accum_checksum_(0), submit_timestamp_(common::OB_INVALID_TIMESTAMP) {} - ~ObConfirmedInfo() {} + ObConfirmedInfo() + : data_checksum_(0), + epoch_id_(common::OB_INVALID_TIMESTAMP), + accum_checksum_(0), + submit_timestamp_(common::OB_INVALID_TIMESTAMP) + {} + ~ObConfirmedInfo() + {} + public: - int init(const int64_t data_checksum, const int64_t epoch_id, - const int64_t accum_checksum, const int64_t submit_timestamp_); - int64_t get_data_checksum() const { return data_checksum_; } - int64_t get_epoch_id() const { return epoch_id_; } - int64_t get_accum_checksum() const { return accum_checksum_; } - int64_t get_submit_timestamp() const { return submit_timestamp_; } - void reset() { data_checksum_ = 0; epoch_id_ = common::OB_INVALID_TIMESTAMP; accum_checksum_ = 0; } - void deep_copy(const ObConfirmedInfo &confirmed_info) + int init(const int64_t data_checksum, const int64_t epoch_id, const int64_t accum_checksum, + const int64_t submit_timestamp_); + int64_t get_data_checksum() const + { + return data_checksum_; + } + int64_t get_epoch_id() const + { + return epoch_id_; + } + int64_t get_accum_checksum() const + { + return accum_checksum_; + } + int64_t get_submit_timestamp() const + { + return submit_timestamp_; + } + void reset() + { + data_checksum_ = 0; + epoch_id_ = common::OB_INVALID_TIMESTAMP; + accum_checksum_ = 0; + } + void deep_copy(const ObConfirmedInfo& confirmed_info) { data_checksum_ = confirmed_info.data_checksum_; epoch_id_ = confirmed_info.epoch_id_; @@ -50,6 +73,7 @@ private: int64_t epoch_id_; int64_t accum_checksum_; int64_t submit_timestamp_; + private: DISALLOW_COPY_AND_ASSIGN(ObConfirmedInfo); }; diff --git a/src/clog/ob_partition_log_service.cpp b/src/clog/ob_partition_log_service.cpp index bcc570cab54d029472e576341e8fbbe5473c8d7b..156592b27323ee1c6dc17e5bf92b213c82929553 100644 --- a/src/clog/ob_partition_log_service.cpp +++ b/src/clog/ob_partition_log_service.cpp @@ -2922,8 +2922,15 @@ int ObPartitionLogService::fetch_log_from_ilog_storage_(const uint64_t log_id, c CLOG_LOG(INFO, "this replica need rebuild", K(ret), K(server), K(partition_key_), K(log_id), K(read_from_clog)); uint64_t last_replay_log_id = OB_INVALID_ID; if (OB_FAIL(get_storage_last_replay_log_id_(last_replay_log_id))) { - CLOG_LOG(WARN, "get_storage_last_replay_log_id_ failed", K(ret), K(partition_key_), K(log_id), K(fetch_type), - K(proposal_id), K(server), K(need_send_confirm_info)); + CLOG_LOG(WARN, + "get_storage_last_replay_log_id_ failed", + K(ret), + K(partition_key_), + K(log_id), + K(fetch_type), + K(proposal_id), + K(server), + K(need_send_confirm_info)); } else if (log_id > last_replay_log_id) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(ERROR, @@ -5891,9 +5898,9 @@ int ObPartitionLogService::send_confirm_info_(const common::ObAddr& server, cons const uint64_t log_id = log_entry.get_header().get_log_id(); ObConfirmedInfo confirmed_info; if (OB_SUCCESS != (ret = confirmed_info.init(log_entry.get_header().get_data_checksum(), - log_entry.get_header().get_epoch_id(), - accum_checksum, - log_entry.get_header().get_submit_timestamp()))) { + log_entry.get_header().get_epoch_id(), + accum_checksum, + log_entry.get_header().get_submit_timestamp()))) { CLOG_LOG(WARN, "confirmed_info init failed", K_(partition_key), K(ret)); } else if (OB_SUCCESS != (ret = log_engine_->submit_confirmed_info( list, partition_key_, log_id, confirmed_info, batch_committed))) { @@ -5910,20 +5917,6 @@ int ObPartitionLogService::send_confirm_info_(const common::ObAddr& server, cons return ret; } -int ObPartitionLogService::get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else if (OB_UNLIKELY(OB_INVALID_ID == last_log_id)) { - ret = OB_INVALID_ARGUMENT; - CLOG_LOG(WARN, "get next timestamp error", K(ret), K(last_log_id)); - } else { - ret = sw_.get_next_timestamp(last_log_id, res_ts); - } - return ret; -} - int ObPartitionLogService::try_update_next_replay_log_ts_in_restore(const int64_t new_ts) { int ret = OB_SUCCESS; @@ -8271,8 +8264,7 @@ int ObPartitionLogService::check_and_try_leader_revoke(const ObElection::RevokeT CLOG_LOG(ERROR, "check_majority_replica_clog_disk_full_ failed", K(ret)); } else { need_revoke = !majority_is_clog_disk_full; - CLOG_LOG(INFO, "partition may need revoke, ", "need revoke is ", need_revoke, - "and revoke type is ", revoke_type); + CLOG_LOG(INFO, "partition may need revoke, ", "need revoke is ", need_revoke, "and revoke type is ", revoke_type); } } @@ -8460,5 +8452,5 @@ int ObPartitionLogService::get_role_and_leader_epoch_unlock_( return ret; } -} // namespace clog -} // namespace oceanbase +} // namespace clog +} // namespace oceanbase diff --git a/src/clog/ob_partition_log_service.h b/src/clog/ob_partition_log_service.h index e5f588940688d5ac58bb2bc7a1bdec2d44acc664..84bc3b5ced3c149db35b391aa91ce07ba00b7a27 100644 --- a/src/clog/ob_partition_log_service.h +++ b/src/clog/ob_partition_log_service.h @@ -381,7 +381,6 @@ public: virtual int force_set_parent(const common::ObAddr& new_parent) = 0; virtual int force_reset_parent() = 0; virtual int force_set_server_list(const obrpc::ObServerList& server_list, const int64_t replica_num) = 0; - virtual int get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts) = 0; virtual int get_next_served_log_info_for_leader(uint64_t& next_served_log_id, int64_t& next_served_log_ts) = 0; virtual uint64_t get_next_index_log_id() const = 0; virtual int get_pls_epoch(int64_t& pls_epoch) const = 0; @@ -663,7 +662,6 @@ public: virtual int flush_cb(const ObLogFlushCbArg& arg) override; virtual int on_get_election_priority(election::ObElectionPriority& priority) override; virtual int on_change_leader_retry(const common::ObAddr& server, ObTsWindows& changing_leader_windows) override; - virtual int get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts) override; virtual int get_next_served_log_info_for_leader(uint64_t& next_served_log_id, int64_t& next_served_log_ts) override; virtual uint64_t get_next_index_log_id() const override { diff --git a/src/clog/ob_raw_entry_iterator.h b/src/clog/ob_raw_entry_iterator.h index ae0660e72d80a7b2667af3bf13a6504e44aeb41d..e9e05738aa427e06686060d00a8bba7fc81b8dc9 100644 --- a/src/clog/ob_raw_entry_iterator.h +++ b/src/clog/ob_raw_entry_iterator.h @@ -102,9 +102,8 @@ inline int parse_log_item_type(const char* buf, const int64_t len, ObCLogItemTyp // or the magic of this block is a ilog entry or clog entry), we need to read all subsequent contents of this // file, check whether there is a valid block and the timestamp recorded in the block header is greater than // or equal to last_block_ts, if not, the end of the file is read. -template -class ObRawEntryIterator: public Interface -{ +template +class ObRawEntryIterator : public Interface { public: ObRawEntryIterator(); virtual ~ObRawEntryIterator(); @@ -150,7 +149,7 @@ private: // // Since our log disk space is large enough, log files will not be reused within two seconds, // so this constant is safe in the scenario of reusing files. - static const int64_t CHECK_LAST_BLOCK_TS_INTERVAL = 2000 * 1000; // 2s + static const int64_t CHECK_LAST_BLOCK_TS_INTERVAL = 2000 * 1000; // 2s private: bool is_inited_; ObILogDirectReader* reader_; @@ -716,10 +715,9 @@ int ObRawEntryIterator::next_entry(Type& entry, ObReadParam& pa // last_block_ts must be vaild, because of this: // 1. Write file header is atomic, therefore, the last_block_ts is valid // 2. else, file header is ObNewLogFileBuf -template -bool ObRawEntryIterator::check_last_block_(const file_id_t file_id, - const offset_t start_offset, - const int64_t last_block_ts) const +template +bool ObRawEntryIterator::check_last_block_( + const file_id_t file_id, const offset_t start_offset, const int64_t last_block_ts) const { int ret = common::OB_SUCCESS; bool bool_ret = false; diff --git a/src/observer/ob_srv_xlator_primary.cpp b/src/observer/ob_srv_xlator_primary.cpp index 4602f5f792723ffdad45391c211996bc5d484f38..6629cc2658321d88ec7b412c282dae2c1c0f90ee 100644 --- a/src/observer/ob_srv_xlator_primary.cpp +++ b/src/observer/ob_srv_xlator_primary.cpp @@ -116,7 +116,6 @@ void oceanbase::observer::init_srv_xlator_for_clog(ObSrvRpcXlator* xlator) RPC_PROCESSOR(ObLogReqStartLogIdByTsProcessor, gctx_.par_ser_); RPC_PROCESSOR(ObLogReqStartPosByLogIdProcessor, gctx_.par_ser_); RPC_PROCESSOR(ObLogExternalFetchLogProcessor, gctx_.par_ser_); - RPC_PROCESSOR(ObLogReqHeartbeatInfoProcessor, gctx_.par_ser_); RPC_PROCESSOR(ObLogReqStartLogIdByTsProcessorWithBreakpoint, gctx_.par_ser_); RPC_PROCESSOR(ObLogReqStartPosByLogIdProcessorWithBreakpoint, gctx_.par_ser_); RPC_PROCESSOR(ObLogOpenStreamProcessor, gctx_.par_ser_);