提交 3fa7f397 编写于 作者: S SanmuWangZJU 提交者: wangzelin.wzl

patch code of observer modified by oblog

上级 4223b124
......@@ -50,7 +50,8 @@ public:
virtual ~ObExtendibleRingBuffer();
public:
int init(const int64_t begin_sn);
int init(const int64_t begin_sn,
const int64_t seg_size = erb::ObExtendibleRingBufferBase<T*, erb::PtrSlot<T> >::DEFAULT_SEG_SIZE);
int destroy();
bool is_inited() const;
......@@ -134,13 +135,13 @@ ObExtendibleRingBuffer<T>::~ObExtendibleRingBuffer()
{}
template <typename T>
int ObExtendibleRingBuffer<T>::init(const int64_t begin_sn)
int ObExtendibleRingBuffer<T>::init(const int64_t begin_sn, const int64_t seg_size)
{
int ret = OB_SUCCESS;
if (begin_sn < 0) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "err begin sn", K(ret), K(begin_sn));
} else if (OB_SUCCESS != (ret = BaseType::init(begin_sn, &alloc_))) {
} else if (OB_SUCCESS != (ret = BaseType::init(begin_sn, &alloc_, seg_size))) {
CLOG_LOG(WARN, "err init", K(ret), K(begin_sn));
}
return ret;
......
......@@ -211,13 +211,13 @@ class ObExtendibleRingBufferBase {
};
// Defines.
typedef ObExtendibleRingBufferBase<ValT, SlotT> MyType;
static const int64_t DEFAULT_SEG_SIZE = (1LL << 9); // 512 bytes
static const int64_t DEFAULT_SEG_CAPACITY =
static_cast<int64_t>((DEFAULT_SEG_SIZE - sizeof(Segment)) / sizeof(SlotT));
static const int64_t MIN_SEG_CNT = 2;
static const int64_t INIT_SEG_CNT = MIN_SEG_CNT;
typedef std::pair<int64_t, int64_t> SlotIdx; // <Segment index, Slot index on Segment>
public:
static const int64_t DEFAULT_SEG_SIZE = (1LL << 9); // 512 bytes
static const int64_t DEFAULT_SEG_CAPACITY =
static_cast<int64_t>((DEFAULT_SEG_SIZE - sizeof(Segment)) / sizeof(SlotT));
// Interface for subclass.
ObExtendibleRingBufferBase()
: inited_(false), seg_size_(0), seg_capacity_(0), begin_sn_(0), end_sn_(0), dir_(0), es_lock_(), allocator_(NULL)
......
......@@ -15,7 +15,6 @@ ob_set_subtarget(ob_clog common
ob_clog_writer.cpp
ob_disk_log_buffer.cpp
ob_external_fetcher.cpp
ob_external_heartbeat_handler.cpp
ob_external_leader_heartbeat_handler.cpp
ob_external_log_service.cpp
ob_external_log_service_monitor.cpp
......
......@@ -189,7 +189,7 @@ const uint64_t DEFAULT_CLOG_APPEND_TIMEOUT_US = 365ull * 24 * 3600 * 1000 * 1000
const uint64_t DEFAULT_WRITER_MAX_BUFFER_ITEM_CNT = 4 * 1024;
// the buffer size of membership log
const int64_t MS_LOG_BUFFER_SIZE = 2048;
} // namespace clog
} // namespace oceanbase
} // namespace clog
} // namespace oceanbase
#endif // OCEANBASE_CLOG_OB_CLOG_CONFIG_H_
......@@ -27,15 +27,15 @@ using namespace oceanbase::common;
namespace oceanbase {
namespace clog {
ObCLogBaseFileWriter::ObCLogBaseFileWriter() :
is_inited_(false),
aligned_data_buf_(nullptr),
buf_write_pos_(0),
file_offset_(0),
buf_padding_size_(0),
align_size_(0),
store_(NULL),
file_id_(0)
ObCLogBaseFileWriter::ObCLogBaseFileWriter()
: is_inited_(false),
aligned_data_buf_(nullptr),
buf_write_pos_(0),
file_offset_(0),
buf_padding_size_(0),
align_size_(0),
store_(NULL),
file_id_(0)
{
log_dir_[0] = '\0';
}
......@@ -45,8 +45,7 @@ ObCLogBaseFileWriter::~ObCLogBaseFileWriter()
destroy();
}
int ObCLogBaseFileWriter::init(const char *log_dir,
const uint32_t align_size, const ObILogFileStore *file_store)
int ObCLogBaseFileWriter::init(const char* log_dir, const uint32_t align_size, const ObILogFileStore* file_store)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
......@@ -55,8 +54,8 @@ int ObCLogBaseFileWriter::init(const char *log_dir,
} else if (OB_ISNULL(log_dir) || OB_ISNULL(file_store)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid param", K(ret), K(log_dir), K(align_size), KP(file_store));
} else if (OB_ISNULL(aligned_data_buf_ = (char*) ob_malloc_align(
align_size, CLOG_MAX_WRITE_BUFFER_SIZE, "CLogFileWriter"))) {
} else if (OB_ISNULL(aligned_data_buf_ =
(char*)ob_malloc_align(align_size, CLOG_MAX_WRITE_BUFFER_SIZE, "CLogFileWriter"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "get log buf failed", K(ret), K(log_dir));
} else {
......@@ -126,14 +125,13 @@ int ObCLogLocalFileWriter::load_file(uint32_t& file_id, uint32_t& offset, bool e
if (need_align()) {
buf_write_pos_ = offset % align_size_;
int64_t read_size = 0;
if (buf_write_pos_ > 0
&& OB_FAIL(store_->read(aligned_data_buf_, align_size_, lower_align(offset, align_size_), read_size))) {
if (buf_write_pos_ > 0 &&
OB_FAIL(store_->read(aligned_data_buf_, align_size_, lower_align(offset, align_size_), read_size))) {
CLOG_LOG(ERROR, "Fail to read data from log file, ", K(ret), K(buf_write_pos_), K(offset));
} else if (read_size != align_size_) {
CLOG_LOG(INFO, "Log file size is not aligned. ", K(read_size), K(align_size_));
} else {
CLOG_LOG(INFO, "Read data from log file to shared memory buffer, ", K(buf_write_pos_),
K(file_id), K(offset));
CLOG_LOG(INFO, "Read data from log file to shared memory buffer, ", K(buf_write_pos_), K(file_id), K(offset));
}
} else {
reset_buf();
......@@ -159,19 +157,22 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset)
// build trailer from last 512 byte offset (4096-512)
int64_t trailer_pos = CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE;
char *buf = aligned_data_buf_ + trailer_pos;
char* buf = aligned_data_buf_ + trailer_pos;
reset_buf();
if (CLOG_TRAILER_OFFSET != file_offset_) { //Defense code
if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset),
LITERAL_K(CLOG_TRAILER_OFFSET));
} else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset,
phy_file_id, pos))) {
CLOG_LOG(WARN, "build_serialized_trailer fail", K(ret), LITERAL_K(CLOG_DIO_ALIGN_SIZE),
K(info_block_offset), K_(file_id), K(phy_file_id));
CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET));
} else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset, phy_file_id, pos))) {
CLOG_LOG(WARN,
"build_serialized_trailer fail",
K(ret),
LITERAL_K(CLOG_DIO_ALIGN_SIZE),
K(info_block_offset),
K_(file_id),
K(phy_file_id));
} else {
buf_write_pos_ += (uint32_t) CLOG_DIO_ALIGN_SIZE;
buf_write_pos_ += (uint32_t)CLOG_DIO_ALIGN_SIZE;
}
return ret;
......@@ -180,7 +181,7 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset)
int ObCLogBaseFileWriter::flush_trailer_entry()
{
int ret = OB_SUCCESS;
if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code
if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "file offset mismatch", K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET));
} else if (CLOG_DIO_ALIGN_SIZE != buf_write_pos_) {
......@@ -188,8 +189,13 @@ int ObCLogBaseFileWriter::flush_trailer_entry()
CLOG_LOG(WARN, "buf write position mismatch", K_(buf_write_pos), LITERAL_K(CLOG_DIO_ALIGN_SIZE));
} else if (OB_FAIL(store_->write(aligned_data_buf_, buf_write_pos_, CLOG_TRAILER_ALIGN_WRITE_OFFSET))) {
// no retry
CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K_(file_offset),
LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET), K(errno));
CLOG_LOG(ERROR,
"write fail",
K(ret),
K(buf_write_pos_),
K_(file_offset),
LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET),
K(errno));
}
return ret;
}
......@@ -198,7 +204,7 @@ int ObCLogBaseFileWriter::append_info_block_entry(ObIInfoBlockHandler* info_gett
{
int ret = OB_SUCCESS;
ObLogBlockMetaV2 meta;
const uint32_t block_meta_len = (uint32_t) meta.get_serialize_size();
const uint32_t block_meta_len = (uint32_t)meta.get_serialize_size();
const int64_t buf_len = CLOG_MAX_WRITE_BUFFER_SIZE - block_meta_len;
int64_t data_len = 0;
int64_t pos = 0;
......@@ -211,7 +217,8 @@ int ObCLogBaseFileWriter::append_info_block_entry(ObIInfoBlockHandler* info_gett
// build_info_block will reset flying info_block for next file
CLOG_LOG(WARN, "read partition meta fail", K(ret), KP(buf), K(buf_len), K_(file_offset), K_(buf_padding_size));
} else if (OB_FAIL(meta.build_serialized_block(aligned_data_buf_, block_meta_len, buf, data_len, OB_INFO_BLOCK, pos))) {
} else if (OB_FAIL(
meta.build_serialized_block(aligned_data_buf_, block_meta_len, buf, data_len, OB_INFO_BLOCK, pos))) {
CLOG_LOG(WARN, "build serialized block fail", K(ret), K_(file_offset), K_(buf_padding_size));
} else {
buf_write_pos_ += (block_meta_len + (uint32_t)data_len);
......@@ -266,8 +273,12 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size)
if (buf_write_pos_ + padding_size > CLOG_MAX_WRITE_BUFFER_SIZE) {
ret = OB_BUF_NOT_ENOUGH;
CLOG_LOG(WARN, "padding entry size over buf length", K(ret), K(padding_size),
K_(buf_write_pos), LITERAL_K(CLOG_MAX_WRITE_BUFFER_SIZE));
CLOG_LOG(WARN,
"padding entry size over buf length",
K(ret),
K(padding_size),
K_(buf_write_pos),
LITERAL_K(CLOG_MAX_WRITE_BUFFER_SIZE));
} else if (OB_FAIL(padding_entry.set_entry_size(padding_size))) {
CLOG_LOG(WARN, "padding entry set size error", K(ret), K(padding_size));
} else if (OB_FAIL(padding_entry.serialize(buf, padding_size, serialize_pos))) {
......@@ -280,7 +291,7 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size)
return ret;
}
int ObCLogBaseFileWriter::cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len)
int ObCLogBaseFileWriter::cache_buf(ObLogCache* log_cache, const char* buf, const uint32_t buf_len)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || 0 == buf_len) {
......@@ -310,9 +321,9 @@ int ObCLogBaseFileWriter::append_log_entry(const char* item_buf, const uint32_t
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "file not start", K_(file_id), K(ret));
} else {
//copy log to memory buffer
// copy log to memory buffer
memcpy(aligned_data_buf_ + buf_write_pos_, item_buf, len);
buf_write_pos_ += (uint32_t) len;
buf_write_pos_ += (uint32_t)len;
if (OB_FAIL(align_buf())) {
CLOG_LOG(ERROR, "fail to add padding, ", K(ret));
......@@ -359,8 +370,7 @@ int ObCLogLocalFileWriter::align_buf()
}
/// ObCLogLocalFileWriter ///
int ObCLogLocalFileWriter::init(const char* log_dir,
const uint32_t align_size, const ObILogFileStore* file_store)
int ObCLogLocalFileWriter::init(const char* log_dir, const uint32_t align_size, const ObILogFileStore* file_store)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
......@@ -554,7 +564,7 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob
// - Flush trailer entry to log file
// - Cache trailer entry to log cache
char *trailer_buf = aligned_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE;
char* trailer_buf = aligned_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE;
if (OB_SUCC(ret)) {
if (OB_FAIL(append_trailer_entry(info_block_offset))) {
CLOG_LOG(WARN, "fail to add trailer", K(ret));
......
......@@ -37,8 +37,7 @@ public:
ObCLogBaseFileWriter();
virtual ~ObCLogBaseFileWriter();
virtual int init(const char *log_dir,
const uint32_t align_size, const common::ObILogFileStore *file_store);
virtual int init(const char* log_dir, const uint32_t align_size, const common::ObILogFileStore* file_store);
virtual void destroy();
// When log engine start, need to flush remaining content in shared memory buffer to log file
......@@ -96,7 +95,7 @@ protected:
int append_trailer_entry(const uint32_t info_block_offset);
int flush_trailer_entry();
// append all data in buffer to log cache
int cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len);
int cache_buf(ObLogCache* log_cache, const char* buf, const uint32_t buf_len);
OB_INLINE bool need_align() const
{
......@@ -109,7 +108,7 @@ protected:
protected:
bool is_inited_;
char *aligned_data_buf_;
char* aligned_data_buf_;
uint32_t buf_write_pos_;
uint32_t file_offset_;
// the last aligned part padding size of the buffer
......@@ -132,8 +131,7 @@ public:
destroy();
}
virtual int init(const char *log_dir,
const uint32_t align_size, const common::ObILogFileStore *file_store) override;
virtual int init(const char* log_dir, const uint32_t align_size, const common::ObILogFileStore* file_store) override;
virtual void destroy();
virtual int load_file(uint32_t& file_id, uint32_t& offset, bool enable_pre_creation = false) override;
......
......@@ -179,7 +179,7 @@ void ObCLogWriter::set_clog_writer_thread_name()
void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64_t item_cnt, int64_t& finish_cnt)
{
int ret = OB_SUCCESS;
ObICLogItem *item = NULL;
ObICLogItem* item = NULL;
int64_t cur_time = 0;
int64_t io_time = 0;
int64_t flush_time = 0;
......@@ -205,7 +205,7 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64
const bool is_idempotent = false;
const uint64_t write_len = block_meta_len + item->get_data_len();
const int64_t warning_value = GCONF.data_storage_warning_tolerance_time;
ObCLogDiskErrorCB *cb = NULL;
ObCLogDiskErrorCB* cb = NULL;
lib::ObMutexGuard guard(file_mutex_);
BG_NEW_CALLBACK(cb, ObCLogDiskErrorCB, this);
......@@ -257,7 +257,7 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64
if (OB_SUCC(ret)) {
io_time = ObTimeUtility::current_time() - cur_time;
//log flush succeed, invoke callback when disk sync
// log flush succeed, invoke callback when disk sync
after_flush(item, block_meta_len, ret, flush_start_offset, finish_cnt);
flush_time = ObTimeUtility::current_time() - cur_time - io_time;
......@@ -378,8 +378,7 @@ ObCLogDiskErrorCB::ObCLogDiskErrorCB(ObCLogWriter* host) : host_(host)
{}
ObCLogDiskErrorCB::~ObCLogDiskErrorCB()
{
}
{}
int ObCLogDiskErrorCB::callback()
{
......@@ -405,11 +404,8 @@ void ObCLogDiskErrorCB::destroy()
}
}
int locate_clog_tail(const int64_t timeout,
ObILogFileStore *file_store,
ObLogDirectReader *reader,
file_id_t &file_id,
offset_t &offset)
int locate_clog_tail(
const int64_t timeout, ObILogFileStore* file_store, ObLogDirectReader* reader, file_id_t& file_id, offset_t& offset)
{
ObLogFileTailLocatorImpl<ObLogEntry, ObIRawLogIterator> impl;
return impl.locate_tail(timeout, file_store, reader, file_id, offset);
......
......@@ -291,17 +291,13 @@ int ObExtLogFetcher::handle_log_not_exist(
return ret;
}
int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item,
const uint64_t beyond_upper_log_id,
const int64_t beyond_upper_log_ts,
const int64_t fetched_log_count,
ObLogStreamFetchLogResp &resp,
const ObLogCursorExt *cursor_ext,
clog::ObReadCost &read_cost)
int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id,
const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, ObLogStreamFetchLogResp& resp,
const ObLogCursorExt* cursor_ext, clog::ObReadCost& read_cost)
{
int ret = OB_SUCCESS;
int64_t upper_log_ts = beyond_upper_log_ts;
const ObPartitionKey &pkey = stream_item.pkey_;
const ObPartitionKey& pkey = stream_item.pkey_;
// 1. dealing with heartbeat hollow
if ((0 == fetched_log_count) && OB_INVALID_ID != beyond_upper_log_id) {
// log hole problem:
......@@ -317,7 +313,8 @@ int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item,
LOG_WARN("resp get_aggre_log_min_timestamp error", K(ret), KPC(cursor_ext), K(upper_log_ts));
} else if (upper_log_ts != cursor_ext->get_submit_timestamp()) {
LOG_TRACE("next log is aggregate log, update first log id as beyond_upper_log_ts",
K(upper_log_ts), K(cursor_ext->get_submit_timestamp()));
K(upper_log_ts),
K(cursor_ext->get_submit_timestamp()));
}
}
ObLogStreamFetchLogResp::FetchLogHeartbeatItem hbp;
......@@ -335,23 +332,21 @@ int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item,
return ret;
}
int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey,
const clog::ObLogCursorExt &cursor_ext,
int64_t &first_log_ts,
ObReadCost &read_cost)
int ObExtLogFetcher::get_aggre_log_min_timestamp(
const ObPartitionKey& pkey, const clog::ObLogCursorExt& cursor_ext, int64_t& first_log_ts, ObReadCost& read_cost)
{
int ret = OB_SUCCESS;
bool fetch_log_from_hot_cache = true;
int64_t log_entry_size = 0;
int64_t end_tstamp = INT64_MAX; // no need for timeout limit
int64_t end_tstamp = INT64_MAX; // no need for timeout limit
ObReadParam param;
param.offset_ = cursor_ext.get_offset();
param.read_len_ = cursor_ext.get_size();
param.file_id_ = cursor_ext.get_file_id();
ObReadBufGuard guard(ObModIds::OB_LOG_DECRYPT_ID);
ObReadBuf &rbuf = guard.get_read_buf();
if (OB_FAIL(fetch_log_entry_(pkey, param, rbuf.buf_, rbuf.buf_len_, end_tstamp,
read_cost, fetch_log_from_hot_cache, log_entry_size))) {
ObReadBuf& rbuf = guard.get_read_buf();
if (OB_FAIL(fetch_log_entry_(
pkey, param, rbuf.buf_, rbuf.buf_len_, end_tstamp, read_cost, fetch_log_from_hot_cache, log_entry_size))) {
LOG_WARN("failed to fetch log entry", K(ret), K(param), K(pkey));
} else {
clog::ObLogEntry log_entry;
......@@ -359,7 +354,7 @@ int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey,
if (OB_FAIL(log_entry.deserialize(rbuf.buf_, rbuf.buf_len_, log_entry_pos))) {
LOG_WARN("failed to deserialize log entry", K(ret), K(rbuf), K(log_entry_pos));
} else if (OB_LOG_AGGRE == log_entry.get_header().get_log_type()) {
const char *data_buf = log_entry.get_buf();
const char* data_buf = log_entry.get_buf();
const int64_t data_len = log_entry.get_header().get_data_len();
int32_t next_log_offset = 0;
int64_t pos = 0;
......@@ -369,8 +364,13 @@ int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey,
// update first log ts as aggre log ts
LOG_WARN("serialization::decode_i64 failed", K(ret), K(data_len), K(pos), KP(data_buf));
} else {
LOG_TRACE("get_aggre_log_min_timestamp", K(ret), K(data_len), K(pos), KP(data_buf),
K(first_log_ts), K(next_log_offset));
LOG_TRACE("get_aggre_log_min_timestamp",
K(ret),
K(data_len),
K(pos),
KP(data_buf),
K(first_log_ts),
K(next_log_offset));
}
} else {
// not aggregate log, no need to update
......@@ -380,13 +380,9 @@ int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey,
}
// Get single log entry
int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt &cursor_ext,
const ObPartitionKey &pkey,
const int64_t end_tstamp,
ObReadCost &read_cost,
ObLogStreamFetchLogResp &resp,
bool &fetch_log_from_hot_cache,
int64_t &log_entry_size)
int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt& cursor_ext, const ObPartitionKey& pkey,
const int64_t end_tstamp, ObReadCost& read_cost, ObLogStreamFetchLogResp& resp, bool& fetch_log_from_hot_cache,
int64_t& log_entry_size)
{
int ret = OB_SUCCESS;
int64_t remain_size = 0;
......@@ -540,7 +536,7 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
const ObPartitionKey& pkey = stream_item.pkey_;
uint64_t beyond_upper_log_id = OB_INVALID_ID;
int64_t beyond_upper_log_ts = OB_INVALID_TIMESTAMP;
const ObLogCursorExt *next_cursor = NULL;
const ObLogCursorExt* next_cursor = NULL;
// Note: After the optimization of step by step, the count of logs fetched in each round of
// each partition will be "suddenly reduced". It is possible to get only a few logs per round,
......@@ -627,8 +623,8 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
}
if (OB_SUCCESS == ret) {
if (OB_FAIL(after_partition_fetch_log(stream_item, beyond_upper_log_id, beyond_upper_log_ts,
log_count, resp, next_cursor, frt.read_cost_))) {
if (OB_FAIL(after_partition_fetch_log(
stream_item, beyond_upper_log_id, beyond_upper_log_ts, log_count, resp, next_cursor, frt.read_cost_))) {
LOG_WARN("after partition fetch log error",
K(ret),
K(stream_item),
......
......@@ -165,9 +165,9 @@ private:
const common::ObPartitionKey& pkey, const uint64_t next_log_id, obrpc::ObLogStreamFetchLogResp& resp);
int after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id,
const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, obrpc::ObLogStreamFetchLogResp& resp,
const clog::ObLogCursorExt *cursor_ext, clog::ObReadCost &read_cost);
int get_aggre_log_min_timestamp(const common::ObPartitionKey &pkey, const clog::ObLogCursorExt &cursor_ext,
int64_t &first_log_ts, clog::ObReadCost &read_cost);
const clog::ObLogCursorExt* cursor_ext, clog::ObReadCost& read_cost);
int get_aggre_log_min_timestamp(const common::ObPartitionKey& pkey, const clog::ObLogCursorExt& cursor_ext,
int64_t& first_log_ts, clog::ObReadCost& read_cost);
int prefill_resp_with_clog_entry(const clog::ObLogCursorExt& cursor_ext, const common::ObPartitionKey& pkey,
const int64_t end_tstamp, clog::ObReadCost& read_cost, obrpc::ObLogStreamFetchLogResp& resp,
bool& fetch_log_from_hot_cache, int64_t& log_entry_size);
......
......@@ -58,8 +58,6 @@ int ObExtLogService::init(ObPartitionService* partition_service, ObILogEngine* l
EXTLOG_LOG(WARN, "fetcher init error", K(ret), KP(log_engine), K(self_addr));
} else if (OB_FAIL(archive_log_fetcher_.init(log_archive_line_cache_, log_engine))) {
EXTLOG_LOG(WARN, "log_archive_fetcher init error", K(ret), KP(log_engine), K(self_addr));
} else if (OB_FAIL(hb_handler_.init(partition_service, log_engine))) {
EXTLOG_LOG(WARN, "hb_handler_ init error", K(ret));
} else if (OB_FAIL(leader_hb_handler_.init(partition_service))) {
EXTLOG_LOG(WARN, "leader_hb_handler_ init error", K(ret));
} else {
......@@ -80,7 +78,6 @@ void ObExtLogService::destroy()
locator_.destroy();
fetcher_.destroy();
archive_log_fetcher_.destroy();
hb_handler_.destroy();
leader_hb_handler_.destroy();
clog_mgr_ = NULL;
line_cache_.destroy();
......@@ -196,24 +193,6 @@ int ObExtLogService::archive_fetch_log(const ObPGKey& pg_key, const ObReadParam&
return ret;
}
int ObExtLogService::req_heartbeat_info(
const ObLogReqHeartbeatInfoRequest& req_msg, ObLogReqHeartbeatInfoResponse& resp)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
EXTLOG_LOG(WARN, "ObExtLogService not init", K(ret));
} else {
const int64_t start_ts = ObTimeUtility::current_time();
ret = hb_handler_.req_heartbeat_info(req_msg, resp);
const int64_t end_ts = ObTimeUtility::current_time();
ObExtLogServiceMonitor::heartbeat_count();
EVENT_INC(CLOG_EXTLOG_HEARTBEAT_RPC_COUNT);
ObExtLogServiceMonitor::heartbeat_time(end_ts - start_ts);
}
return ret;
}
int ObExtLogService::leader_heartbeat(
const obrpc::ObLogLeaderHeartbeatReq& req_msg, obrpc::ObLogLeaderHeartbeatResp& resp)
{
......
......@@ -18,7 +18,6 @@
#include "ob_external_fetcher.h"
#include "ob_archive_log_fetcher.h"
#include "ob_external_start_log_locator.h"
#include "ob_external_heartbeat_handler.h"
#include "ob_external_leader_heartbeat_handler.h"
#include "ob_log_line_cache.h" // ObLogLineCache
......@@ -39,8 +38,6 @@ namespace logservice {
* > ObExtStartLogLocator: Given a timestamp (specified when liboblog restart) to determine
* from which log_id each partition will be pulled.
* > ObExtLogFetcher: streaming pull operator
* > ObExtHeartbeatHandler: The old version of Heartbeat (supports querying the timestamp of older logs)
* is obsolete and is reserved for compatibility consideration.
* > ObExtLeaderHeartbeatHandler: The new version of the heartbeat, called LeaderHeartbeat,
* returns the next log and forecast timestamp.
*/
......@@ -105,7 +102,6 @@ public:
log_archive_line_cache_(),
locator_(),
fetcher_(),
hb_handler_(),
leader_hb_handler_()
{}
~ObExtLogService()
......@@ -126,8 +122,6 @@ public:
// for log archive
int archive_fetch_log(
const common::ObPGKey& pg_key, const clog::ObReadParam& param, clog::ObReadBuf& rbuf, clog::ObReadRes& res);
int req_heartbeat_info(
const obrpc::ObLogReqHeartbeatInfoRequest& req_msg, obrpc::ObLogReqHeartbeatInfoResponse& response);
int leader_heartbeat(const obrpc::ObLogLeaderHeartbeatReq& req_msg, obrpc::ObLogLeaderHeartbeatResp& resp);
int wash_expired_stream();
int report_all_stream();
......@@ -151,7 +145,6 @@ private:
ObExtStartLogLocator locator_;
ObExtLogFetcher fetcher_;
ObArchiveLogFetcher archive_log_fetcher_;
ObExtHeartbeatHandler hb_handler_;
ObExtLeaderHeartbeatHandler leader_hb_handler_;
};
......
......@@ -481,13 +481,9 @@ int ObFileIdList::locate(const ObPartitionKey& pkey, const int64_t target_value,
}
// max_log_id,max_log_timestamp,start_offset may be invalid
int ObFileIdList::append(const ObPartitionKey &pkey,
const file_id_t file_id,
const offset_t start_offset,
const uint64_t min_log_id,
const uint64_t max_log_id,
const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
int ObFileIdList::append(const ObPartitionKey& pkey, const file_id_t file_id, const offset_t start_offset,
const uint64_t min_log_id, const uint64_t max_log_id, const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
......@@ -686,9 +682,7 @@ int ObFileIdList::purge(const common::ObPartitionKey& pkey, ObIFileIdCachePurgeS
}
// The caller guarantees that the function will not be executed concurrently
int ObFileIdList::purge_(const bool is_front_end,
IPurgeChecker &checker,
bool &empty)
int ObFileIdList::purge_(const bool is_front_end, IPurgeChecker& checker, bool& empty)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
......@@ -1076,11 +1070,8 @@ void ObFileIdCache::destroy()
// 4. OB_PARTITION_NOT_EXIST partiiton not exist, prev_item and next_item are both invalid
// 5. OB_NEED_RETRY need retrym prev_item and next_item are both invalid
// 6. Others
int ObFileIdCache::locate(const ObPartitionKey &pkey,
const int64_t target_value,
const bool locate_by_log_id,
Log2File &prev_item,
Log2File &next_item)
int ObFileIdCache::locate(const ObPartitionKey& pkey, const int64_t target_value, const bool locate_by_log_id,
Log2File& prev_item, Log2File& next_item)
{
int ret = OB_SUCCESS;
ObFileIdList* list = NULL;
......@@ -1297,8 +1288,7 @@ int ObFileIdCache::AppendInfoFunctor::init(const file_id_t file_id, ObFileIdCach
}
// For compatibility, allow max_log_id and max_log_timestamp to be invalid
bool ObFileIdCache::AppendInfoFunctor::operator()(const ObPartitionKey &pkey,
const IndexInfoBlockEntry &entry)
bool ObFileIdCache::AppendInfoFunctor::operator()(const ObPartitionKey& pkey, const IndexInfoBlockEntry& entry)
{
if (OB_UNLIKELY(!pkey.is_valid()) || OB_UNLIKELY(!entry.is_valid())) {
err_ = OB_ERR_UNEXPECTED;
......@@ -1466,13 +1456,9 @@ int ObFileIdCache::append_new_list_(const ObPartitionKey& pkey, const file_id_t
}
// For compatibility, allow max_log_id and max_log_timestamp to be invalid
int ObFileIdCache::do_append_(const ObPartitionKey &pkey,
const file_id_t file_id,
const offset_t start_offset,
const uint64_t min_log_id,
const uint64_t max_log_id,
const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
int ObFileIdCache::do_append_(const ObPartitionKey& pkey, const file_id_t file_id, const offset_t start_offset,
const uint64_t min_log_id, const uint64_t max_log_id, const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
{
int ret = OB_SUCCESS;
ObFileIdList* list = NULL;
......
......@@ -52,21 +52,26 @@ class ObIlogAccessor;
// p3 -> [(50,f1), (5000,f3)]
//
// When querying (p2, 170), first find the ordered list corresponding to p2, and then, use binary search.
class Log2File
{
class Log2File {
public:
Log2File() : file_id_(common::OB_INVALID_FILE_ID),
// For version after 2.1(include 2.1), start_offset correspond to the offset
// of the first ilog of this partition in ilog file
// For version before 2.1, the start_offset correspond to the start_offset_index
// of the first ilog of this partition in ilog cache
start_offset_(OB_INVALID_OFFSET),
min_log_id_(common::OB_INVALID_ID),
max_log_id_(common::OB_INVALID_ID),
min_log_timestamp_(common::OB_INVALID_TIMESTAMP),
max_log_timestamp_(common::OB_INVALID_TIMESTAMP) {}
~Log2File() { reset(); }
Log2File()
: file_id_(common::OB_INVALID_FILE_ID),
// For version after 2.1(include 2.1), start_offset correspond to the offset
// of the first ilog of this partition in ilog file
// For version before 2.1, the start_offset correspond to the start_offset_index
// of the first ilog of this partition in ilog cache
start_offset_(OB_INVALID_OFFSET),
min_log_id_(common::OB_INVALID_ID),
max_log_id_(common::OB_INVALID_ID),
min_log_timestamp_(common::OB_INVALID_TIMESTAMP),
max_log_timestamp_(common::OB_INVALID_TIMESTAMP)
{}
~Log2File()
{
reset();
}
public:
file_id_t get_file_id() const
{
......@@ -223,7 +228,7 @@ public:
}
// Determine whether target_item is the next consecutive item
bool is_preceding_to(const Log2File &target_item) const
bool is_preceding_to(const Log2File& target_item) const
{
bool bool_ret = false;
if (common::OB_INVALID_ID != get_max_log_id() && common::OB_INVALID_ID != target_item.min_log_id_) {
......@@ -311,8 +316,7 @@ private:
ObFileIdCache& file_id_cache_;
};
class ObFileIdList
{
class ObFileIdList {
public:
class BackFillFunctor {
public:
......@@ -332,8 +336,7 @@ public:
offset_t start_offset_;
};
class IPurgeChecker
{
class IPurgeChecker {
public:
virtual bool should_purge(const Log2File& log_2_file) const = 0;
virtual bool is_valid() const = 0;
......@@ -341,8 +344,7 @@ public:
};
// purge min
// should_purge return true if min_log_id > top_item.file_id_
class PurgeChecker : public IPurgeChecker
{
class PurgeChecker : public IPurgeChecker {
public:
explicit PurgeChecker(const common::ObPartitionKey& pkey, ObIFileIdCachePurgeStrategy& purge_strategy)
: partition_key_(pkey), purge_strategy_(purge_strategy)
......@@ -362,8 +364,7 @@ public:
// should_purge return true if top_item.file_id_ == broken_file_id_
// Because loading an InfoBlock involves multiple partitions, if only load a part of them,
// then all of this load must be cleaned up
class ClearBrokenFunctor : public IPurgeChecker
{
class ClearBrokenFunctor : public IPurgeChecker {
public:
explicit ClearBrokenFunctor(const file_id_t file_id) : broken_file_id_(file_id)
{}
......@@ -413,11 +414,8 @@ public:
static const int64_t NEED_USE_SEG_ARRAY_THRESHOLD = 50;
private:
int purge_(const bool is_front_end,
IPurgeChecker &checker,
bool &empty);
int purge_preceding_items_(const ObPartitionKey &pkey,
const Log2File &last_item);
int purge_(const bool is_front_end, IPurgeChecker& checker, bool& empty);
int purge_preceding_items_(const ObPartitionKey& pkey, const Log2File& last_item);
// The caller guarantees that the function will not be executed concurrently
int prepare_container_();
int move_item_to_seg_array_(common::ObISegArray<Log2File>* tmp_container_ptr) const;
......@@ -426,7 +424,7 @@ private:
bool is_inited_;
bool use_seg_array_;
uint64_t min_continuous_log_id_;
ObFileIdCache *file_id_cache_;
ObFileIdCache* file_id_cache_;
ObLogBasePos base_pos_;
common::ObISegArray<Log2File>* container_ptr_;
......@@ -448,29 +446,27 @@ public:
int init(const int64_t server_seq, const common::ObAddr& addr, ObIlogAccessor* ilog_accessor);
void destroy();
int locate(const common::ObPartitionKey &pkey,
const int64_t target_value,
const bool locate_by_log_id,
Log2File &prev_item,
Log2File &next_item);
int append(const file_id_t file_id,
IndexInfoBlockMap &index_info_block_map);
int backfill(const common::ObPartitionKey &pkey,
const uint64_t min_log_id,
const file_id_t file_id,
const offset_t start_offset);
int purge(ObIFileIdCachePurgeStrategy &purge_strategy);
int ensure_log_continuous(const common::ObPartitionKey &pkey,
const uint64_t log_id);
int add_partition_needed(const common::ObPartitionKey &pkey,
const uint64_t last_replay_log_id);
file_id_t get_curr_max_file_id() const {return ATOMIC_LOAD(&curr_max_file_id_);}
int64_t get_next_can_purge_log2file_timestamp() const {return ATOMIC_LOAD(&next_can_purge_log2file_timestamp_);}
int get_clog_base_pos(const ObPartitionKey &pkey, file_id_t &file_id,
offset_t &offset) const;
//Attention: this interface doesn't consider the format of version which before 2.1
int get_cursor_from_file(const ObPartitionKey &pkey, const uint64_t log_id,
const Log2File &item, ObLogCursorExt &log_cursor);
int locate(const common::ObPartitionKey& pkey, const int64_t target_value, const bool locate_by_log_id,
Log2File& prev_item, Log2File& next_item);
int append(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map);
int backfill(const common::ObPartitionKey& pkey, const uint64_t min_log_id, const file_id_t file_id,
const offset_t start_offset);
int purge(ObIFileIdCachePurgeStrategy& purge_strategy);
int ensure_log_continuous(const common::ObPartitionKey& pkey, const uint64_t log_id);
int add_partition_needed(const common::ObPartitionKey& pkey, const uint64_t last_replay_log_id);
file_id_t get_curr_max_file_id() const
{
return ATOMIC_LOAD(&curr_max_file_id_);
}
int64_t get_next_can_purge_log2file_timestamp() const
{
return ATOMIC_LOAD(&next_can_purge_log2file_timestamp_);
}
int get_clog_base_pos(const ObPartitionKey& pkey, file_id_t& file_id, offset_t& offset) const;
// Attention: this interface doesn't consider the format of version which before 2.1
int get_cursor_from_file(
const ObPartitionKey& pkey, const uint64_t log_id, const Log2File& item, ObLogCursorExt& log_cursor);
private:
class AppendInfoFunctor {
public:
......@@ -488,8 +484,7 @@ private:
ObFileIdCache* cache_;
};
// Ensure that the loading process is atomic
class ObUndoAppendFunctor
{
class ObUndoAppendFunctor {
public:
explicit ObUndoAppendFunctor(const file_id_t broken_file_id) : broken_file_id_(broken_file_id)
{}
......@@ -504,8 +499,7 @@ private:
file_id_t broken_file_id_;
common::ObPartitionArray dead_pkeys_;
};
class ObPurgeFunctor
{
class ObPurgeFunctor {
public:
explicit ObPurgeFunctor(ObIFileIdCachePurgeStrategy& purge_strategy)
: purge_strategy_(purge_strategy), next_can_purge_log2file_timestamp_(common::OB_INVALID_TIMESTAMP)
......@@ -601,7 +595,7 @@ private:
common::ObSmallAllocator seg_item_allocator_; // allocator for Log2File items(seg)
common::ObSmallAllocator log2file_list_allocator_; // allocator for Log2FileList
common::ObSmallAllocator list_item_allocator_; // allocator for Log2File items(list)
common::ObLinearHashMap<common::ObPartitionKey, ObFileIdList *> map_;
common::ObLinearHashMap<common::ObPartitionKey, ObFileIdList*> map_;
common::ObLinearHashMap<common::ObPartitionKey, uint64_t> filter_map_;
private:
......
......@@ -134,12 +134,10 @@ public:
virtual int notify_follower_log_missing(const common::ObAddr& server, const int64_t cluster_id,
const common::ObPartitionKey& partition_key, const uint64_t start_log_id, const bool is_in_member_list,
const int32_t msg_type) = 0;
virtual int send_restore_check_rqst(const common::ObAddr &server, const int64_t dst_cluster_id,
const common::ObPartitionKey &key, const ObRestoreCheckType restore_type) = 0;
virtual int send_query_restore_end_id_resp(const common::ObAddr &server,
const int64_t cluster_id,
const common::ObPartitionKey &partition_key,
const uint64_t last_restore_log_id) = 0;
virtual int send_restore_check_rqst(const common::ObAddr& server, const int64_t dst_cluster_id,
const common::ObPartitionKey& key, const ObRestoreCheckType restore_type) = 0;
virtual int send_query_restore_end_id_resp(const common::ObAddr& server, const int64_t cluster_id,
const common::ObPartitionKey& partition_key, const uint64_t last_restore_log_id) = 0;
virtual void update_clog_info(const int64_t max_submit_timestamp) = 0;
virtual void update_clog_info(
const common::ObPartitionKey& partition_key, const uint64_t log_id, const int64_t submit_timestamp) = 0;
......
......@@ -171,7 +171,7 @@ int ObIlogMemstore::timer_check_need_freeze(bool& need_freeze) const
return ret;
}
int ObIlogMemstore::get_cursor_size(int64_t &cursor_size) const
int ObIlogMemstore::get_cursor_size(int64_t& cursor_size) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
......@@ -183,7 +183,7 @@ int ObIlogMemstore::get_cursor_size(int64_t &cursor_size) const
return ret;
}
int ObIlogMemstore::get_clog_size(int64_t &clog_size) const
int ObIlogMemstore::get_clog_size(int64_t& clog_size) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
......
......@@ -32,8 +32,7 @@ enum ObIlogFreezeTriggerType {
OB_INVALID_TRIGGER_TYPE = 6,
};
class ObIlogMemstore
{
class ObIlogMemstore {
friend ObIlogFileBuilder;
public:
......@@ -78,26 +77,21 @@ public:
// Return value:
// 1) OB_SUCCESS, query success
// 2) OB_PARTITION_NOT_EXIST, partition not exist
int get_min_log_id_and_ts(const common::ObPartitionKey &partition_key,
uint64_t &ret_min_log_id,
int64_t &ret_min_log_ts) const;
int get_min_log_id_and_ts(
const common::ObPartitionKey& partition_key, uint64_t& ret_min_log_id, int64_t& ret_min_log_ts) const;
// Return value:
// 1) OB_SUCCESS, query success
// 2) OB_PARTITION_NOT_EXIST, partition not exist
int get_log_id_range(const common::ObPartitionKey &partition_key,
uint64_t &ret_min_log_id,
uint64_t &ret_max_log_id) const;
int get_cursor_size(int64_t &cursor_size) const;
int get_clog_size(int64_t &clog_size) const;
int insert_partition_meta_info(const common::ObPartitionKey &pkey,
const IndexInfoBlockEntry &entry);
int insert_partition_memberlist_info(const common::ObPartitionKey &pkey,
const MemberListInfo &member_list);
int insert_partition_log_cursor_ext_info(const ObPartitionLogInfo &log_info,
const ObLogCursorExt &log_cursor);
template<class Function>
int operate_partition_meta_info(Function &fn)
int get_log_id_range(
const common::ObPartitionKey& partition_key, uint64_t& ret_min_log_id, uint64_t& ret_max_log_id) const;
int get_cursor_size(int64_t& cursor_size) const;
int get_clog_size(int64_t& clog_size) const;
int insert_partition_meta_info(const common::ObPartitionKey& pkey, const IndexInfoBlockEntry& entry);
int insert_partition_memberlist_info(const common::ObPartitionKey& pkey, const MemberListInfo& member_list);
int insert_partition_log_cursor_ext_info(const ObPartitionLogInfo& log_info, const ObLogCursorExt& log_cursor);
template <class Function>
int operate_partition_meta_info(Function& fn)
{
int ret = common::OB_SUCCESS;
if (OB_FAIL(partition_meta_info_.for_each(fn))) {
......
......@@ -51,10 +51,8 @@ void ObIlogAccessor::destroy()
inited_ = false;
}
int ObIlogAccessor::init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache)
int ObIlogAccessor::init(
const char* dir_name, const int64_t server_seq, const common::ObAddr& addr, ObLogCache* log_cache)
{
int ret = OB_SUCCESS;
const bool use_log_cache = true;
......@@ -71,9 +69,12 @@ int ObIlogAccessor::init(const char *dir_name,
CSR_LOG(ERROR, "file_store_ init failed", K(ret));
} else if (OB_FAIL(file_id_cache_.init(server_seq, addr, this))) {
CSR_LOG(ERROR, "file_id_cache_ init failed", K(ret));
} else if (OB_FAIL(direct_reader_.init(dir_name, nullptr/*no shared memory*/, use_log_cache,
log_cache, &log_tail_,
ObLogWritePoolType::ILOG_WRITE_POOL))) {
} else if (OB_FAIL(direct_reader_.init(dir_name,
nullptr /*no shared memory*/,
use_log_cache,
log_cache,
&log_tail_,
ObLogWritePoolType::ILOG_WRITE_POOL))) {
CSR_LOG(ERROR, "direct_reader_ init failed", K(ret));
} else if (OB_FAIL(buffer_.init(OB_MAX_LOG_BUFFER_SIZE, CLOG_DIO_ALIGN_SIZE, ObModIds::OB_CLOG_INFO_BLK_HNDLR))) {
CSR_LOG(ERROR, "buffer init failed", K(ret));
......@@ -719,12 +720,8 @@ ObIlogStorage::~ObIlogStorage()
destroy();
}
int ObIlogStorage::init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache,
ObPartitionService *partition_service,
ObCommitLogEnv *commit_log_env)
int ObIlogStorage::init(const char* dir_name, const int64_t server_seq, const common::ObAddr& addr,
ObLogCache* log_cache, ObPartitionService* partition_service, ObCommitLogEnv* commit_log_env)
{
int ret = OB_SUCCESS;
......@@ -738,8 +735,15 @@ int ObIlogStorage::init(const char *dir_name,
} else if (OB_ISNULL(dir_name) || OB_ISNULL(log_cache) || OB_ISNULL(partition_service) || OB_ISNULL(commit_log_env) ||
OB_UNLIKELY(server_seq < 0 || !addr.is_valid())) {
ret = OB_INVALID_ARGUMENT;
CSR_LOG(ERROR, "invalid argument", KR(ret), KP(dir_name), KP(log_cache), KP(partition_service),
KP(commit_log_env), K(server_seq), K(addr));
CSR_LOG(ERROR,
"invalid argument",
KR(ret),
KP(dir_name),
KP(log_cache),
KP(partition_service),
KP(commit_log_env),
K(server_seq),
K(addr));
} else if (OB_FAIL(ObIlogAccessor::init(dir_name, server_seq, addr, log_cache))) {
CSR_LOG(ERROR, "failed to init ObIlogAccessor", K(ret));
} else if (OB_FAIL(init_next_ilog_file_id_(next_ilog_file_id))) {
......
......@@ -40,10 +40,7 @@ public:
virtual void destroy();
public:
int init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache);
int init(const char* dir_name, const int64_t server_seq, const common::ObAddr& addr, ObLogCache* log_cache);
int add_partition_needed_to_file_id_cache(
const common::ObPartitionKey& partition_key, const uint64_t last_replay_log_id);
......@@ -102,12 +99,8 @@ public:
~ObIlogStorage();
public:
int init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache,
storage::ObPartitionService *partition_service,
ObCommitLogEnv *commit_log_env);
int init(const char* dir_name, const int64_t server_seq, const common::ObAddr& addr, ObLogCache* log_cache,
storage::ObPartitionService* partition_service, ObCommitLogEnv* commit_log_env);
void destroy();
int start();
void stop();
......@@ -151,16 +144,12 @@ public:
// Return value
// 1) OB_SUCCESS
// 2) OB_ENTRY_NOT_EXIST
int get_file_id_range(file_id_t &min_file_id, file_id_t &max_file_id) const;
int locate_by_timestamp(const common::ObPartitionKey &partition_key,
const int64_t start_ts,
uint64_t &target_log_id,
int64_t &target_log_timestamp);
int locate_ilog_file_by_log_id(const common::ObPartitionKey &pkey,
const uint64_t start_log_id,
uint64_t &end_log_id,
file_id_t &ilog_id);
int get_file_id_range(file_id_t& min_file_id, file_id_t& max_file_id) const;
int locate_by_timestamp(const common::ObPartitionKey& partition_key, const int64_t start_ts, uint64_t& target_log_id,
int64_t& target_log_timestamp);
int locate_ilog_file_by_log_id(
const common::ObPartitionKey& pkey, const uint64_t start_log_id, uint64_t& end_log_id, file_id_t& ilog_id);
int wash_ilog_cache();
int purge_stale_file();
int purge_stale_ilog_index();
......
......@@ -1192,7 +1192,7 @@ int ObIlogStore::get_merge_range_(int64_t& end_idx, bool& need_switch_file)
end_idx = 0;
// limit each ilog file should less than ObIlogMemstore::CURSOR_SIZE_TRIGGER
while (end_idx < size && false == need_switch_file && OB_SUCC(ret)) {
ObIlogMemstore *memstore = frozen_memstore_array_[end_idx].memstore_;
ObIlogMemstore* memstore = frozen_memstore_array_[end_idx].memstore_;
ObIlogFreezeTriggerType trigger_type = frozen_memstore_array_[end_idx].trigger_type_;
// In normal case, when end_idx is 0, trigger_type mustn't be OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE
if (frozen_memstore_array_[end_idx].trigger_type_ == OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE) {
......@@ -1206,8 +1206,8 @@ int ObIlogStore::get_merge_range_(int64_t& end_idx, bool& need_switch_file)
CLOG_LOG(ERROR, "get_clog_size failed", K(ret));
// Try to ensure the total size of each file does not exceed 32MB,
// because of the ilog memstore may exceed 32MB in concurrent case.
} else if (true == (total_clog_size >= ObIlogMemstore::CLOG_SIZE_TRIGGER
|| total_cursor_size >= ObIlogMemstore::CURSOR_SIZE_TRIGGER)) {
} else if (true == (total_clog_size >= ObIlogMemstore::CLOG_SIZE_TRIGGER ||
total_cursor_size >= ObIlogMemstore::CURSOR_SIZE_TRIGGER)) {
need_switch_file = true;
break;
} else {
......@@ -1253,9 +1253,8 @@ int ObIlogStore::merge_frozen_memstore_(int64_t& end_idx, FrozenMemstore& memsto
}
} while (0);
if (OB_SUCC(ret) && OB_FAIL(do_merge_frozen_memstore_(tmp_frozen_memstore_array,
need_switch_file,
memstore_after_merge))) {
if (OB_SUCC(ret) &&
OB_FAIL(do_merge_frozen_memstore_(tmp_frozen_memstore_array, need_switch_file, memstore_after_merge))) {
if (ret == OB_EAGAIN) {
CLOG_LOG(WARN, "log not continous in do_merge_frozen_memstore_", K(tmp_frozen_memstore_array));
} else {
......@@ -1384,9 +1383,8 @@ bool ObIlogStore::need_merge_frozen_memstore_array_by_trigger_type_(const ObIlog
return trigger_type == OB_TIMER_TRIGGER_TYPE;
}
int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen_memstore_array,
bool need_switch_file,
FrozenMemstore& memstore_after_merge)
int ObIlogStore::do_merge_frozen_memstore_(
const FrozenMemstoreArray& tmp_frozen_memstore_array, bool need_switch_file, FrozenMemstore& memstore_after_merge)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
......@@ -1408,23 +1406,29 @@ int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen
if (false == frozen_memstore.is_valid()) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpect error, invalid frozen_memstore", K(ret), K(frozen_memstore));
} else if (OB_FAIL(merge_container.merge_ilog_memstore_to_container(frozen_memstore.memstore_))
&& OB_EAGAIN != ret) {
} else if (OB_FAIL(merge_container.merge_ilog_memstore_to_container(frozen_memstore.memstore_)) &&
OB_EAGAIN != ret) {
CLOG_LOG(ERROR, "merge ilog memstore failed", K(ret), K(frozen_memstore), K(tmp_frozen_memstore_array));
} else if (OB_EAGAIN == ret) {
if (i == 0) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpected error, there is no possiblity for merging frozen memstore\
failed because ilog not continous when i is 0", K(ret));
CLOG_LOG(ERROR,
"unexpected error, there is no possiblity for merging frozen memstore\
failed because ilog not continous when i is 0",
K(ret));
} else if (ret == OB_EAGAIN) {
WLockGuard guard(lock_);
frozen_memstore_array_[i-1].trigger_type_ = OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE;
CLOG_LOG(WARN, "log not continous in merge_frozen_memstore, need modify its trigger_type", K(ret),
K(frozen_memstore), K(tmp_frozen_memstore_array));
frozen_memstore_array_[i - 1].trigger_type_ = OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE;
CLOG_LOG(WARN,
"log not continous in merge_frozen_memstore, need modify its trigger_type",
K(ret),
K(frozen_memstore),
K(tmp_frozen_memstore_array));
} else {
CLOG_LOG(ERROR, "ilog_memstore_merge failed", K(ret), K(frozen_memstore), K(tmp_frozen_memstore_array));
}
} else {}
} else {
}
}
if (OB_SUCC(ret) && OB_FAIL(merge_container.transfer_to_ilog_memstore(memstore))) {
......@@ -1436,13 +1440,12 @@ int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen
}
if (OB_SUCC(ret)) {
int64_t seq = tmp_frozen_memstore_array[tmp_size-1].seq_;
ObIlogFreezeTriggerType trigger_type = (true == need_switch_file ? OB_MERGE_NEED_SWITCH_FILE_TRIGGER_TYPE :
OB_TIMER_TRIGGER_TYPE);
int64_t seq = tmp_frozen_memstore_array[tmp_size - 1].seq_;
ObIlogFreezeTriggerType trigger_type =
(true == need_switch_file ? OB_MERGE_NEED_SWITCH_FILE_TRIGGER_TYPE : OB_TIMER_TRIGGER_TYPE);
if(OB_FAIL(memstore_after_merge.set_frozen_memstore(trigger_type, memstore, seq))) {
CLOG_LOG(ERROR, "set_frozen_memstore failed", K(memstore_after_merge), K(trigger_type),
K(memstore), K(seq));
if (OB_FAIL(memstore_after_merge.set_frozen_memstore(trigger_type, memstore, seq))) {
CLOG_LOG(ERROR, "set_frozen_memstore failed", K(memstore_after_merge), K(trigger_type), K(memstore), K(seq));
}
}
......
......@@ -175,9 +175,8 @@ private:
// after doing merge
bool need_merge_frozen_memstore_array_by_trigger_type_(const ObIlogFreezeTriggerType& trigger_type) const;
int do_merge_frozen_memstore_(const FrozenMemstoreArray& frozen_memstore_array,
bool need_switch_file,
FrozenMemstore& memstore_after_merge);
int do_merge_frozen_memstore_(
const FrozenMemstoreArray& frozen_memstore_array, bool need_switch_file, FrozenMemstore& memstore_after_merge);
void alloc_memstore_(ObIlogMemstore*& memstore);
......
......@@ -1237,11 +1237,10 @@ int ObIInfoBlockHandler::CheckPartitionNeedFreezeFunctor::do_check_full_partitio
// 2. INVALID, means that archive may be had started or stopped, cann't reclaime the log file, need
// wati next round
// 3. STOPING and STOPED, means that archive has stopped
} else if (ObLogArchiveStatus::STATUS::BEGINNING == info.status_.status_
|| ObLogArchiveStatus::STATUS::DOING == info.status_.status_) {
if (OB_FAIL(pls->get_last_archived_log_id(info.status_.incarnation_,
info.status_.round_,
last_archived_log_id))) {
} else if (ObLogArchiveStatus::STATUS::BEGINNING == info.status_.status_ ||
ObLogArchiveStatus::STATUS::DOING == info.status_.status_) {
if (OB_FAIL(
pls->get_last_archived_log_id(info.status_.incarnation_, info.status_.round_, last_archived_log_id))) {
CLOG_LOG(WARN, "failed to get_log_archive_backup_info", K(partition_key), K(info), KR(ret));
} else if (OB_INVALID_ID == last_archived_log_id || last_archived_log_id < max_log_id) {
can_skip_ = false;
......
......@@ -141,7 +141,7 @@ enum ObReplicaMsgType {
OB_REPLICA_MSG_TYPE_NOT_CHILD = 3, // I'm not your child
OB_REPLICA_MSG_TYPE_NOT_EXIST = 4, // partition not exist
OB_REPLICA_MSG_TYPE_DISABLED_STATE = 5, // server in disabled state
OB_REPLICA_MSG_TYPE_QUICK_REGISTER= 6, // quick register to me
OB_REPLICA_MSG_TYPE_QUICK_REGISTER = 6, // quick register to me
};
enum ObRegRespMsgType {
......@@ -161,16 +161,14 @@ enum ObFetchLogType {
OB_FETCH_LOG_TYPE_MAX,
};
enum ObRestoreCheckType
{
enum ObRestoreCheckType {
OB_CHECK_UNKNOWN = 0,
OB_CHECK_STANDBY_RESTORE = 1,
OB_CHECK_RESTORE_END_ID = 2,
OB_CHECK_MAX,
};
enum ReceiveLogType
{
enum ReceiveLogType {
RL_TYPE_UNKNOWN = 0,
PUSH_LOG = 1,
FETCH_LOG = 2,
......
......@@ -155,7 +155,10 @@ public:
}
return log_ts;
}
void set_submit_timestamp(const int64_t ts) { submit_timestamp_ = ts; }
void set_submit_timestamp(const int64_t ts)
{
submit_timestamp_ = ts;
}
bool is_batch_committed() const
{
bool bool_ret = false;
......@@ -182,7 +185,7 @@ public:
}
// Serialize submit_timestamp at specified offset
int serialize_submit_timestamp(char *buf, const int64_t buf_len, int64_t &pos);
int serialize_submit_timestamp(char* buf, const int64_t buf_len, int64_t& pos);
static bool check_magic_number(const int16_t magic_number)
{
......
......@@ -26,13 +26,12 @@ ObLogEventScheduler::~ObLogEventScheduler()
int ObLogEventScheduler::init()
{
int ret = common::OB_SUCCESS;
const char *CLOG_EVENT_TIME_WHEEL_NAME = "ClogEventTimeWheel";
const char* CLOG_EVENT_TIME_WHEEL_NAME = "ClogEventTimeWheel";
const int64_t thread_num = get_time_wheel_thread_num_();
if (IS_INIT) {
ret = common::OB_INIT_TWICE;
CLOG_LOG(ERROR, "ObLogEventScheduler init twice", K(ret));
} else if (OB_FAIL(time_wheel_.init(CLOG_EVENT_TIME_WHEEL_PRECISION,
thread_num, CLOG_EVENT_TIME_WHEEL_NAME))) {
} else if (OB_FAIL(time_wheel_.init(CLOG_EVENT_TIME_WHEEL_PRECISION, thread_num, CLOG_EVENT_TIME_WHEEL_NAME))) {
CLOG_LOG(ERROR, "ObTimeWheel init fail", K(ret));
} else {
is_inited_ = true;
......@@ -105,7 +104,7 @@ int ObLogEventScheduler::schedule_task_(ObLogStateEventTaskV2* task, const int64
int64_t ObLogEventScheduler::get_time_wheel_thread_num_() const
{
int64_t thread_num = MAX(common::get_cpu_num()/2, 4);
int64_t thread_num = MAX(common::get_cpu_num() / 2, 4);
if (thread_num > common::ObTimeWheel::MAX_THREAD_NUM) {
thread_num = common::ObTimeWheel::MAX_THREAD_NUM;
}
......
......@@ -870,216 +870,6 @@ int ObLogExternalFetchLogProcessor::process()
return ret;
}
// Request heartbeat information of given partitions.
void ObLogReqHeartbeatInfoRequest::Param::reset()
{
pkey_.reset();
log_id_ = OB_INVALID_ID;
}
OB_SERIALIZE_MEMBER(ObLogReqHeartbeatInfoRequest::Param, pkey_, log_id_);
ObLogReqHeartbeatInfoRequest::ObLogReqHeartbeatInfoRequest() : rpc_ver_(CUR_RPC_VER), params_()
{}
ObLogReqHeartbeatInfoRequest::~ObLogReqHeartbeatInfoRequest()
{
reset();
}
void ObLogReqHeartbeatInfoRequest::reset()
{
params_.reset();
}
bool ObLogReqHeartbeatInfoRequest::is_valid() const
{
return (0 < params_.count());
}
int ObLogReqHeartbeatInfoRequest::set_params(const ParamArray& params)
{
int ret = OB_SUCCESS;
if (ITEM_CNT_LMT < params.count()) {
ret = OB_BUF_NOT_ENOUGH;
EXTLOG_LOG(WARN, "err set params, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", params.count());
} else if (OB_SUCCESS != (ret = params_.assign(params))) {
EXTLOG_LOG(ERROR, "err assign params", K(ret));
}
return ret;
}
int ObLogReqHeartbeatInfoRequest::append_param(const Param& param)
{
int ret = OB_SUCCESS;
if (ITEM_CNT_LMT <= params_.count()) {
ret = OB_BUF_NOT_ENOUGH;
EXTLOG_LOG(WARN, "err append param, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", params_.count());
} else if (OB_SUCCESS != (ret = params_.push_back(param))) {
EXTLOG_LOG(ERROR, "err push param", K(ret));
}
return ret;
}
const ObLogReqHeartbeatInfoRequest::ParamArray& ObLogReqHeartbeatInfoRequest::get_params() const
{
return params_;
}
int64_t ObLogReqHeartbeatInfoRequest::rpc_ver() const
{
return rpc_ver_;
}
OB_DEF_SERIALIZE(ObLogReqHeartbeatInfoRequest)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE, rpc_ver_, params_);
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObLogReqHeartbeatInfoRequest)
{
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN, rpc_ver_, params_);
return len;
}
OB_DEF_DESERIALIZE(ObLogReqHeartbeatInfoRequest)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_DECODE, rpc_ver_);
if (CUR_RPC_VER == rpc_ver_) {
LST_DO_CODE(OB_UNIS_DECODE, params_);
} else {
ret = OB_NOT_SUPPORTED;
EXTLOG_LOG(ERROR, "deserialize error, version not match", K(ret), K(rpc_ver_), LITERAL_K(CUR_RPC_VER));
}
return ret;
}
void ObLogReqHeartbeatInfoResponse::Result::reset()
{
err_ = OB_SUCCESS;
tstamp_ = OB_INVALID_TIMESTAMP;
}
OB_SERIALIZE_MEMBER(ObLogReqHeartbeatInfoResponse::Result, err_, tstamp_);
ObLogReqHeartbeatInfoResponse::ObLogReqHeartbeatInfoResponse() : rpc_ver_(CUR_RPC_VER), err_(OB_SUCCESS), res_()
{}
ObLogReqHeartbeatInfoResponse::~ObLogReqHeartbeatInfoResponse()
{
reset();
}
void ObLogReqHeartbeatInfoResponse::reset()
{
err_ = OB_SUCCESS;
res_.reset();
}
void ObLogReqHeartbeatInfoResponse::set_err(const int err)
{
err_ = err;
}
int ObLogReqHeartbeatInfoResponse::set_results(const ResultArray& results)
{
int ret = OB_SUCCESS;
if (ITEM_CNT_LMT < results.count()) {
ret = OB_BUF_NOT_ENOUGH;
EXTLOG_LOG(WARN, "err set results, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", results.count());
} else if (OB_SUCCESS != (ret = res_.assign(results))) {
EXTLOG_LOG(ERROR, "err assign results", K(ret));
}
return ret;
}
int ObLogReqHeartbeatInfoResponse::append_result(const Result& result)
{
int ret = OB_SUCCESS;
if (ITEM_CNT_LMT <= res_.count()) {
ret = OB_BUF_NOT_ENOUGH;
EXTLOG_LOG(WARN, "err append result, buf not enough", K(ret), LITERAL_K(ITEM_CNT_LMT), "count", res_.count());
} else if (OB_SUCCESS != (ret = res_.push_back(result))) {
EXTLOG_LOG(ERROR, "err push back results", K(ret));
}
return ret;
}
int ObLogReqHeartbeatInfoResponse::get_err() const
{
return err_;
}
const ObLogReqHeartbeatInfoResponse::ResultArray& ObLogReqHeartbeatInfoResponse::get_results() const
{
return res_;
}
int64_t ObLogReqHeartbeatInfoResponse::rpc_ver() const
{
return rpc_ver_;
}
void ObLogReqHeartbeatInfoResponse::set_rpc_ver(const int64_t ver)
{
rpc_ver_ = ver;
}
OB_DEF_SERIALIZE(ObLogReqHeartbeatInfoResponse)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_ENCODE, rpc_ver_, err_, res_);
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObLogReqHeartbeatInfoResponse)
{
int64_t len = 0;
LST_DO_CODE(OB_UNIS_ADD_LEN, rpc_ver_, err_, res_);
return len;
}
OB_DEF_DESERIALIZE(ObLogReqHeartbeatInfoResponse)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_DECODE, rpc_ver_);
if (CUR_RPC_VER == rpc_ver_) {
LST_DO_CODE(OB_UNIS_DECODE, err_, res_);
} else {
ret = OB_NOT_SUPPORTED;
EXTLOG_LOG(ERROR, "deserialize error, version not match", K(ret), K(rpc_ver_), LITERAL_K(CUR_RPC_VER));
}
return ret;
}
int ObLogReqHeartbeatInfoProcessor::process()
{
int ret = OB_SUCCESS;
const ObLogReqHeartbeatInfoRequest& req = arg_;
ObLogReqHeartbeatInfoResponse& resp = result_;
clog::ObICLogMgr* clog_mgr = NULL;
logservice::ObExtLogService* els = NULL;
if (OB_ISNULL(partition_service_)) {
ret = OB_ERR_UNEXPECTED;
EXTLOG_LOG(ERROR, "partition_service_ is null", K(ret));
} else if (OB_ISNULL(clog_mgr = partition_service_->get_clog_mgr())) {
ret = OB_ERR_UNEXPECTED;
EXTLOG_LOG(ERROR, "clog_mgr is null", K(ret));
} else if (OB_ISNULL(els = clog_mgr->get_external_log_service())) {
ret = OB_ERR_UNEXPECTED;
EXTLOG_LOG(ERROR, "els is null", K(ret));
} else {
ret = els->req_heartbeat_info(req, resp);
}
// rewrite ret for rpc
ret = OB_SUCCESS;
return ret;
}
int ObExternalProcessorHelper::get_clog_mgr(ObPartitionService* ps, ObICLogMgr*& clog_mgr)
{
int ret = OB_SUCCESS;
......
......@@ -40,8 +40,6 @@ class ObLogReqStartPosByLogIdRequest;
class ObLogReqStartPosByLogIdResponse;
class ObLogExternalFetchLogRequest;
class ObLogExternalFetchLogResponse;
class ObLogReqHeartbeatInfoRequest;
class ObLogReqHeartbeatInfoResponse;
// for request with breakpoint
class ObLogReqStartLogIdByTsRequestWithBreakpoint;
......@@ -73,10 +71,6 @@ public:
// fetch_log()
RPC_S(@PR5 fetch_log, OB_LOG_FETCH_LOG_EXTERNAL, (ObLogExternalFetchLogRequest), ObLogExternalFetchLogResponse);
// req_last_log_serv_info()
RPC_S(@PR5 req_heartbeat_info, OB_LOG_REQUEST_HEARTBEAT_INFO, (ObLogReqHeartbeatInfoRequest),
ObLogReqHeartbeatInfoResponse);
// for request with breakpoint
RPC_S(@PR5 req_start_log_id_by_ts_with_breakpoint, OB_LOG_REQ_START_LOG_ID_BY_TS_WITH_BREAKPOINT,
(ObLogReqStartLogIdByTsRequestWithBreakpoint), ObLogReqStartLogIdByTsResponseWithBreakpoint);
......@@ -441,95 +435,6 @@ private:
storage::ObPartitionService* partition_service_;
};
// Request heartbeat information of given partitions.
// Err:
// - OB_SUCCESS
// - OB_NEED_RETRY: can't generate heartbeat info
// - OB_ERR_SYS: observer internal error
class ObLogReqHeartbeatInfoRequest {
static const int64_t CUR_RPC_VER = 1;
static const int64_t ITEM_CNT_LMT = 10000; // Around 300kb for cur version.
public:
struct Param {
common::ObPartitionKey pkey_;
uint64_t log_id_;
void reset();
TO_STRING_KV(K_(pkey), K_(log_id));
OB_UNIS_VERSION(1);
};
typedef common::ObSEArray<Param, 16> ParamArray;
public:
ObLogReqHeartbeatInfoRequest();
~ObLogReqHeartbeatInfoRequest();
public:
void reset();
bool is_valid() const;
int set_params(const ParamArray& params);
int append_param(const Param& param);
const ParamArray& get_params() const;
int64_t rpc_ver() const;
TO_STRING_KV(K_(rpc_ver), K_(params));
OB_UNIS_VERSION(1);
private:
int64_t rpc_ver_;
ParamArray params_;
};
class ObLogReqHeartbeatInfoResponse {
static const int64_t CUR_RPC_VER = 1;
static const int64_t ITEM_CNT_LMT = 10000; // Around 300kb for cur version.
public:
struct Result {
int err_;
int64_t tstamp_;
void reset();
TO_STRING_KV(K_(err), K_(tstamp));
OB_UNIS_VERSION(1);
};
typedef common::ObSEArray<Result, 16> ResultArray;
public:
ObLogReqHeartbeatInfoResponse();
~ObLogReqHeartbeatInfoResponse();
public:
void reset();
void set_err(const int err);
int set_results(const ResultArray& results);
int append_result(const Result& result);
int get_err() const;
const ResultArray& get_results() const;
int64_t rpc_ver() const;
void set_rpc_ver(const int64_t ver);
TO_STRING_KV(K_(rpc_ver), K_(err), K_(res));
OB_UNIS_VERSION(1);
private:
int64_t rpc_ver_;
int err_;
ResultArray res_;
};
class ObLogReqHeartbeatInfoProcessor
: public ObRpcProcessor<ObLogExternalProxy::ObRpc<OB_LOG_REQUEST_HEARTBEAT_INFO> > {
public:
ObLogReqHeartbeatInfoProcessor(storage::ObPartitionService* partition_service) : partition_service_(partition_service)
{}
~ObLogReqHeartbeatInfoProcessor()
{
partition_service_ = NULL;
}
protected:
int process();
private:
storage::ObPartitionService* partition_service_;
};
// check ps & ps->clog_mgr not null
// to simplify code
class ObExternalProcessorHelper {
......
......@@ -594,9 +594,8 @@ int ObLogMembershipTaskMgr::submit_confirmed_info_(const uint64_t log_id, const
int64_t confirmed_info_epoch_id = confirmed_info.get_epoch_id();
int64_t confirmed_info_submit_timestamp = confirmed_info.get_submit_timestamp();
if (log_id != cur_renew_log_id
|| ms_proposal_id != cur_ms_proposal_id
|| ms_proposal_id != log_task.get_proposal_id()) {
if (log_id != cur_renew_log_id || ms_proposal_id != cur_ms_proposal_id ||
ms_proposal_id != log_task.get_proposal_id()) {
ret = OB_STATE_NOT_MATCH;
CLOG_LOG(WARN,
"log_id or ms_proposal_id not match with cur_renew_ms_task",
......@@ -625,12 +624,16 @@ int ObLogMembershipTaskMgr::submit_confirmed_info_(const uint64_t log_id, const
if (log_task.is_confirmed_info_exist()) {
} else {
if (log_task.is_submit_log_exist()) {
if ((log_task.get_data_checksum() != confirmed_info_data_checksum)
|| (log_task.get_epoch_id() != confirmed_info_epoch_id)
|| (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp
&& log_task.get_submit_timestamp() != confirmed_info_submit_timestamp)) {
CLOG_LOG(INFO, "log_task and confirmed_info not match, reset", K_(partition_key),
K(log_id), K(log_task), K(confirmed_info));
if ((log_task.get_data_checksum() != confirmed_info_data_checksum) ||
(log_task.get_epoch_id() != confirmed_info_epoch_id) ||
(OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp &&
log_task.get_submit_timestamp() != confirmed_info_submit_timestamp)) {
CLOG_LOG(INFO,
"log_task and confirmed_info not match, reset",
K_(partition_key),
K(log_id),
K(log_task),
K(confirmed_info));
log_task.reset_log();
log_task.reset_state(false);
log_task.reset_log_cursor();
......
......@@ -16,14 +16,11 @@
#include "ob_log_entry.h"
#include "ob_log_file_pool.h"
namespace oceanbase
{
namespace clog
{
namespace oceanbase {
namespace clog {
// Used to set the parameters of reading files, the purpose is to add
// parameters int the future without changing the interface
struct ObReadParam
{
struct ObReadParam {
file_id_t file_id_;
offset_t offset_;
common::ObPartitionKey partition_key_;
......
......@@ -798,8 +798,8 @@ int ObLogReconfirm::confirm_log_()
if (OB_FAIL(try_update_nop_or_truncate_timestamp(*header))) {
CLOG_LOG(WARN, "try_update_nop_or_truncate_timestamp fail", K(ret), K_(partition_key));
} else if (OB_FAIL(sw_->submit_log(log_ptr->get_header(), log_ptr->get_buf(), NULL))) {
CLOG_LOG(WARN, "submit log failed", K_(partition_key), K(ret), K_(next_id),
K_(start_id), K_(max_flushed_id));
CLOG_LOG(
WARN, "submit log failed", K_(partition_key), K(ret), K_(next_id), K_(start_id), K_(max_flushed_id));
break;
} else {
CLOG_LOG(TRACE, "submit log success", K_(partition_key), K_(next_id), K_(start_id), K_(max_flushed_id));
......@@ -814,7 +814,7 @@ int ObLogReconfirm::confirm_log_()
next_id_++;
}
}
} // end while
} // end while
// In case of rebuild in leader reconfirm:
// 1. when majority has already recycled specified log, the follower
......@@ -832,15 +832,17 @@ int ObLogReconfirm::confirm_log_()
const uint64_t new_start_id = sw_->get_start_id();
if (new_start_id > next_id_) {
next_id_ = new_start_id;
CLOG_LOG(INFO, "there may execute a rebuild operation in\
leader reconfirm", K(ret), K(new_start_id), K(next_id_));
CLOG_LOG(INFO,
"there may execute a rebuild operation in\
leader reconfirm",
K(ret),
K(new_start_id),
K(next_id_));
}
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)
&& next_id_ <= max_flushed_id_
&& next_id_ >= log_info_array_.get_end_id()) {
if (OB_SUCC(ret) && next_id_ <= max_flushed_id_ && next_id_ >= log_info_array_.get_end_id()) {
// process next log_range
if (OB_EAGAIN == (ret = init_log_info_range_(next_id_))) {
// ret is EAGAIN when some log has slide out, need update next_id_
......
......@@ -849,141 +849,15 @@ int ObLogSlidingWindow::try_update_submit_timestamp(const int64_t base_ts)
return ret;
}
// only called by ObExtLeaderHeartbeatHandler, it will double check leader
int ObLogSlidingWindow::get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
uint64_t last_log_id_dummy = OB_INVALID_ID;
uint64_t next_log_id_dummy = OB_INVALID_ID;
int64_t last_log_ts = OB_INVALID_TIMESTAMP;
int64_t next_log_ts = OB_INVALID_TIMESTAMP;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
res_ts = OB_INVALID_TIMESTAMP;
get_last_replay_log(last_log_id_dummy, last_log_ts);
get_next_replay_log_id_info(next_log_id_dummy, next_log_ts);
if (OB_INVALID_TIMESTAMP == last_log_ts || OB_INVALID_TIMESTAMP == next_log_ts) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(
ERROR, "last_log_ts or next_log_ts is invalid", K(ret), K(partition_key_), K(last_log_ts), K(next_log_ts));
} else {
if (OB_LIKELY(is_empty())) { // check empty first
if (last_log_id == static_cast<uint64_t>(sw_.get_start_id() - 1)) {
int64_t safe_cur_ts = next_log_ts - MAX_TIME_DIFF_BETWEEN_SERVER;
res_ts = safe_cur_ts > last_log_ts ? safe_cur_ts : last_log_ts;
CLOG_LOG(TRACE, "sw get next timestamp", K(partition_key_), K(res_ts), K(safe_cur_ts), K(last_log_ts));
} else {
ret = OB_EAGAIN;
}
} else {
CLOG_LOG(TRACE, "sw not empty, get next log task timestamp", K(last_log_id), K(partition_key_));
// not empty, try to get timestamp of the unconfirmed timestamp
// this function is called under double check leader
int64_t next_ts = OB_INVALID_TIMESTAMP;
ObLogTask* task = NULL;
const int64_t* ref = NULL;
if (OB_FAIL(get_log_task(last_log_id + 1, task, ref))) {
// in time interval [test sw is empty ~ log_task is put into sw], OB_ERR_NULL_VALUE is expected
if (OB_ERR_NULL_VALUE == ret) {
CLOG_LOG(INFO,
"get log task error, log_task not put in sw yet",
K(ret),
K(partition_key_),
K(last_log_id),
"start_id",
sw_.get_start_id(),
"max_log_id",
get_max_log_id());
// heartbeat handler retry
ret = OB_EAGAIN;
} else if (OB_ERROR_OUT_OF_RANGE == ret) {
CLOG_LOG(INFO, "get log task error, log slide out", K(ret), K(partition_key_), K(last_log_id));
// heartbeat handler retry
ret = OB_EAGAIN;
} else {
CLOG_LOG(WARN, "get log task error", K(ret), K(partition_key_), K(last_log_id));
}
} else {
task->lock();
if (task->is_submit_log_exist()) {
next_ts = task->get_submit_timestamp();
if (OB_INVALID_TIMESTAMP == next_ts) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "get invalid next_ts", K(partition_key_), K(last_log_id));
} else {
res_ts = std::min(next_ts - MAX_TIME_DIFF_BETWEEN_SERVER, last_log_ts);
}
} else { // submit_log not exist
// not leader now, maybe log_task is generated by a quicker confirm_info_packet
ret = OB_NEED_RETRY;
CLOG_LOG(INFO, "submit log not exist", K(ret));
}
task->unlock();
}
if (NULL != ref && OB_SUCCESS != (tmp_ret = revert_log_task(ref))) {
CLOG_LOG(ERROR, "revert_log_task failed", K_(partition_key), K(tmp_ret));
} else {
ref = NULL;
}
} // not empty
}
}
return ret;
}
// only called by ObExtLeaderHeartbeatHandler to get next served log_id and ts based on keepalive ts
int ObLogSlidingWindow::get_next_served_log_info_by_next_replay_log_info(
uint64_t& next_served_log_id, int64_t& next_served_log_ts)
{
int ret = OB_SUCCESS;
uint64_t next_replay_log_id = OB_INVALID_ID;
int64_t next_replay_log_ts = OB_INVALID_TIMESTAMP;
int64_t next_log_tstamp = OB_INVALID_TIMESTAMP;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
get_next_replay_log_id_info(next_replay_log_id, next_replay_log_ts);
// when next_log is start log in sw, try to retrieve its log_ts
if (!is_empty() && next_replay_log_id == static_cast<uint64_t>(sw_.get_start_id())) {
int tmp_ret = get_log_submit_tstamp_from_task_(next_replay_log_id, next_log_tstamp);
if (OB_EAGAIN == tmp_ret) {
CLOG_LOG(TRACE,
"[GET_NEXT_SERVED_LOG_INFO] next log is not ready",
K_(partition_key),
K(next_replay_log_id),
"start_id",
sw_.get_start_id(),
"max_log_id",
get_max_log_id());
} else if (OB_ERROR_OUT_OF_RANGE == tmp_ret) {
CLOG_LOG(TRACE,
"[GET_NEXT_SERVED_LOG_INFO] next log just slide out",
K_(partition_key),
K(next_replay_log_id),
"start_id",
sw_.get_start_id(),
"max_log_id",
get_max_log_id());
}
}
if (OB_SUCCESS == ret) {
next_served_log_id = next_replay_log_id;
// If the next log is valid and effective, select the max value between lower bound of the next log and
// follower-read ts
if (OB_INVALID_TIMESTAMP != next_log_tstamp) {
// Here minus the maximum clock offset between servers
int64_t safe_next_log_tstamp = next_log_tstamp - MAX_TIME_DIFF_BETWEEN_SERVER;
next_served_log_ts = std::max(safe_next_log_tstamp, next_replay_log_ts);
} else {
next_served_log_ts = next_replay_log_ts;
}
}
get_next_replay_log_id_info(next_served_log_id, next_served_log_ts);
CLOG_LOG(TRACE,
"[GET_NEXT_SERVED_LOG_INFO]",
......@@ -991,9 +865,6 @@ int ObLogSlidingWindow::get_next_served_log_info_by_next_replay_log_info(
K_(partition_key),
K(next_served_log_id),
K(next_served_log_ts),
K(next_replay_log_id),
K(next_replay_log_ts),
K(next_log_tstamp),
"start_id",
sw_.get_start_id(),
"max_log_id",
......@@ -1002,53 +873,6 @@ int ObLogSlidingWindow::get_next_served_log_info_by_next_replay_log_info(
return ret;
}
// return code:
// OB_EAGIN: log is not ready, need retry
// OB_ERROR_OUT_OF_RANGE: log has been slide out
// other code: failure
int ObLogSlidingWindow::get_log_submit_tstamp_from_task_(const uint64_t log_id, int64_t& log_tstamp)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLogTask* task = NULL;
const int64_t* ref = NULL;
log_tstamp = OB_INVALID_TIMESTAMP;
if (OB_FAIL(get_log_task(log_id, task, ref))) {
if (OB_ERR_NULL_VALUE == ret) {
ret = OB_EAGAIN;
} else if (OB_ERROR_OUT_OF_RANGE == ret) {
} else {
CLOG_LOG(WARN,
"get log task error",
K(ret),
K(partition_key_),
K(log_id),
"start_id",
sw_.get_start_id(),
"max_log_id",
get_max_log_id());
}
} else if (OB_ISNULL(task)) {
CLOG_LOG(WARN, "invalid task after get_log_task", K(task), K(ret), K(log_id));
ret = OB_ERR_UNEXPECTED;
} else {
task->lock();
if (task->is_submit_log_exist()) {
log_tstamp = task->get_submit_timestamp();
} else {
ret = OB_EAGAIN;
}
task->unlock();
}
if (NULL != ref && OB_SUCCESS != (tmp_ret = revert_log_task(ref))) {
CLOG_LOG(ERROR, "revert_log_task failed", K_(partition_key), K(tmp_ret), K(task));
} else {
ref = NULL;
task = NULL;
}
return ret;
}
int ObLogSlidingWindow::submit_aggre_log(ObAggreBuffer* buffer, const int64_t base_timestamp)
{
int ret = OB_SUCCESS;
......@@ -1248,9 +1072,12 @@ int ObLogSlidingWindow::need_update_log_task_(
ret = OB_INVALID_ARGUMENT;
} else if (task.is_log_confirmed()) {
if (is_confirm_match_(log_id,
header.get_data_checksum(), header.get_epoch_id(),
header.get_submit_timestamp(), task.get_data_checksum(),
task.get_epoch_id(), task.get_submit_timestamp())) {
header.get_data_checksum(),
header.get_epoch_id(),
header.get_submit_timestamp(),
task.get_data_checksum(),
task.get_epoch_id(),
task.get_submit_timestamp())) {
CLOG_LOG(DEBUG, "receive submit log after confirm log, match", K(header), K_(partition_key), K(task));
} else {
ret = OB_INVALID_LOG;
......@@ -1806,11 +1633,18 @@ int ObLogSlidingWindow::submit_confirmed_info_(
*/
if (log_task->is_submit_log_exist()) {
if (!is_confirm_match_(log_id,
log_task->get_data_checksum(), log_task->get_epoch_id(),
log_task->get_submit_timestamp(), confirmed_info.get_data_checksum(),
confirmed_info.get_epoch_id(), confirmed_info.get_submit_timestamp())) {
CLOG_LOG(INFO, "log_task and confirmed_info not match, reset", K_(partition_key),
K(log_id), K(*log_task), K(confirmed_info));
log_task->get_data_checksum(),
log_task->get_epoch_id(),
log_task->get_submit_timestamp(),
confirmed_info.get_data_checksum(),
confirmed_info.get_epoch_id(),
confirmed_info.get_submit_timestamp())) {
CLOG_LOG(INFO,
"log_task and confirmed_info not match, reset",
K_(partition_key),
K(log_id),
K(*log_task),
K(confirmed_info));
log_task->reset_log();
log_task->reset_log_cursor();
}
......@@ -2737,22 +2571,24 @@ int ObLogSlidingWindow::get_log(const uint64_t log_id, const uint32_t log_attr,
return ret;
}
bool ObLogSlidingWindow::is_confirm_match_(const uint64_t log_id,
const int64_t log_data_checksum,
const int64_t log_epoch_id,
const int64_t log_submit_timestamp,
const int64_t confirmed_info_data_checksum,
const int64_t confirmed_info_epoch_id,
const int64_t confirmed_info_submit_timestamp)
bool ObLogSlidingWindow::is_confirm_match_(const uint64_t log_id, const int64_t log_data_checksum,
const int64_t log_epoch_id, const int64_t log_submit_timestamp, const int64_t confirmed_info_data_checksum,
const int64_t confirmed_info_epoch_id, const int64_t confirmed_info_submit_timestamp)
{
bool bret = false;
if (log_data_checksum != confirmed_info_data_checksum
|| log_epoch_id != confirmed_info_epoch_id
|| (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp
&& log_submit_timestamp != confirmed_info_submit_timestamp)) {
CLOG_LOG(WARN, "confirm log not match", K_(partition_key), K(log_id), K(log_data_checksum),
K(log_epoch_id), K(log_submit_timestamp), K(confirmed_info_data_checksum),
K(confirmed_info_epoch_id), K(confirmed_info_submit_timestamp));
if (log_data_checksum != confirmed_info_data_checksum || log_epoch_id != confirmed_info_epoch_id ||
(OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp &&
log_submit_timestamp != confirmed_info_submit_timestamp)) {
CLOG_LOG(WARN,
"confirm log not match",
K_(partition_key),
K(log_id),
K(log_data_checksum),
K(log_epoch_id),
K(log_submit_timestamp),
K(confirmed_info_data_checksum),
K(confirmed_info_epoch_id),
K(confirmed_info_submit_timestamp));
} else {
bret = true;
}
......@@ -3806,19 +3642,18 @@ int ObLogSlidingWindow::do_fetch_log(const uint64_t start_id, const uint64_t end
int ret = OB_SUCCESS;
bool need_check_rebuild = false;
is_fetched = false;
// the follow code is used to test case clog/3050_rebuild_when_leader_reconfirm.test
// don't delete it
// user needs add this configuration to share/parameter/ob_parameter_seed.ipp
// DEF_BOOL(_enable_fetch_log, OB_CLUSTER_PARAMETER, "true",
// "enabl fetch log", ObParameterAttr(Section::OBSERVER, Source::DEFAULT
// , EditLevel::DYNAMIC_EFFECTIVE));
// const bool can_fetch_log = GCONF._enable_fetch_log;
// the follow code is used to test case clog/3050_rebuild_when_leader_reconfirm.test
// don't delete it
// user needs add this configuration to share/parameter/ob_parameter_seed.ipp
// DEF_BOOL(_enable_fetch_log, OB_CLUSTER_PARAMETER, "true",
// "enabl fetch log", ObParameterAttr(Section::OBSERVER, Source::DEFAULT
// , EditLevel::DYNAMIC_EFFECTIVE));
// const bool can_fetch_log = GCONF._enable_fetch_log;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
// } else if (!can_fetch_log) {
// CLOG_LOG(INFO, "can't fetch log", K(ret), K(partition_key_), K(start_id), K(end_id));
} else if (start_id <= 0 || end_id <= 0 || start_id >= end_id || OB_ISNULL(state_mgr_)
|| OB_ISNULL(log_engine_)) {
// } else if (!can_fetch_log) {
// CLOG_LOG(INFO, "can't fetch log", K(ret), K(partition_key_), K(start_id), K(end_id));
} else if (start_id <= 0 || end_id <= 0 || start_id >= end_id || OB_ISNULL(state_mgr_) || OB_ISNULL(log_engine_)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid arguments", K(ret), K(partition_key_), K(start_id), K(end_id));
} else if (!check_need_fetch_log_(start_id, need_check_rebuild)) {
......@@ -5192,10 +5027,8 @@ int ObLogSlidingWindow::set_confirmed_info_without_lock_(
{
int ret = OB_SUCCESS;
ObConfirmedInfo confirmed_info;
if (OB_FAIL(confirmed_info.init(header.get_data_checksum(),
header.get_epoch_id(),
accum_checksum,
header.get_submit_timestamp()))) {
if (OB_FAIL(confirmed_info.init(
header.get_data_checksum(), header.get_epoch_id(), accum_checksum, header.get_submit_timestamp()))) {
CLOG_LOG(ERROR, "confirmed_info init failed", K_(partition_key), K(header), KR(ret));
} else {
log_task.set_confirmed_info(confirmed_info);
......
......@@ -427,7 +427,6 @@ public:
int submit_replay_task(const bool need_async, bool& is_replayed, bool& is_replay_failed) override;
void destroy();
int alloc_log_id(const int64_t base_timestamp, uint64_t& log_id, int64_t& submit_timestamp) override;
int get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts);
int get_next_served_log_info_by_next_replay_log_info(uint64_t& next_served_log_id, int64_t& next_served_log_ts);
bool is_inited() const
{
......@@ -543,19 +542,12 @@ private:
const int64_t submit_timestamp, ObISubmitLogCb* cb);
int try_freeze_aggre_buffer_(const uint64_t log_id);
int submit_freeze_aggre_buffer_task_(const uint64_t log_id);
int submit_aggre_log_(ObAggreBuffer *buffer,
const uint64_t log_id,
const int64_t submit_timestamp);
int submit_aggre_log_(ObAggreBuffer* buffer, const uint64_t log_id, const int64_t submit_timestamp);
int try_update_submit_timestamp(const int64_t base_ts);
bool is_confirm_match_(const uint64_t log_id,
const int64_t log_data_checksum,
const int64_t log_epoch_id,
const int64_t log_submit_timestamp,
const int64_t confirmed_info_data_checksum,
const int64_t confirmed_info_epoch_id,
const int64_t confirmed_info_submit_timestamp);
int receive_log_(const ObLogEntry &log_entry, const common::ObAddr &server,
const int64_t cluster_id);
bool is_confirm_match_(const uint64_t log_id, const int64_t log_data_checksum, const int64_t log_epoch_id,
const int64_t log_submit_timestamp, const int64_t confirmed_info_data_checksum,
const int64_t confirmed_info_epoch_id, const int64_t confirmed_info_submit_timestamp);
int receive_log_(const ObLogEntry& log_entry, const common::ObAddr& server, const int64_t cluster_id);
void update_max_log_id_(const uint64_t log_id);
int submit_to_sliding_window_(const ObLogEntryHeader& header, const char* buff, ObISubmitLogCb* cb,
const bool need_replay, const bool send_slave, const common::ObAddr& server, const int64_t cluster_id,
......@@ -624,7 +616,6 @@ private:
int generate_backfill_log_task_(const ObLogEntryHeader& header, const char* buff, const ObLogCursor& log_cursor,
ObISubmitLogCb* submit_cb, const bool need_replay, const bool need_copy, const bool need_pinned,
ObLogTask*& task);
int get_log_submit_tstamp_from_task_(const uint64_t log_id, int64_t& log_tstamp);
int check_pre_barrier_(ObLogType log_type) const;
void* alloc_log_task_buf_();
int need_replay_for_data_or_log_replica_(const bool is_trans_log, bool& need_replay) const;
......
......@@ -660,12 +660,9 @@ void ObLogTask::set_confirmed_info(const ObConfirmedInfo& confirmed_info)
const int64_t arg_submit_timestamp = confirmed_info.get_submit_timestamp();
if (is_submit_log_exist()) {
// check data_checksum_ and epoch_id_ when log exists
if (data_checksum_ != arg_data_checksum
|| epoch_id_ != arg_epoch_id
|| (OB_INVALID_TIMESTAMP != arg_submit_timestamp
&& submit_timestamp_ != arg_submit_timestamp)) {
CLOG_LOG(ERROR, "set_confirmed_info meta info not match", K(data_checksum_),
K(epoch_id_), K(confirmed_info));
if (data_checksum_ != arg_data_checksum || epoch_id_ != arg_epoch_id ||
(OB_INVALID_TIMESTAMP != arg_submit_timestamp && submit_timestamp_ != arg_submit_timestamp)) {
CLOG_LOG(ERROR, "set_confirmed_info meta info not match", K(data_checksum_), K(epoch_id_), K(confirmed_info));
}
}
epoch_id_ = arg_epoch_id;
......
......@@ -16,12 +16,9 @@
namespace oceanbase {
using namespace common;
namespace clog
{
int ObConfirmedInfo::init(const int64_t data_checksum,
const int64_t epoch_id,
const int64_t accum_checksum,
const int64_t submit_timestamp)
namespace clog {
int ObConfirmedInfo::init(
const int64_t data_checksum, const int64_t epoch_id, const int64_t accum_checksum, const int64_t submit_timestamp)
{
int ret = OB_SUCCESS;
data_checksum_ = data_checksum;
......@@ -32,9 +29,7 @@ int ObConfirmedInfo::init(const int64_t data_checksum,
}
// used for RPC
OB_SERIALIZE_MEMBER(ObConfirmedInfo, data_checksum_,
epoch_id_, accum_checksum_,
submit_timestamp_);
OB_SERIALIZE_MEMBER(ObConfirmedInfo, data_checksum_, epoch_id_, accum_checksum_, submit_timestamp_);
ObMembershipLog::ObMembershipLog()
: version_(MS_LOG_VERSION),
......
......@@ -24,18 +24,41 @@ class ObConfirmedInfo {
OB_UNIS_VERSION(1);
public:
ObConfirmedInfo() : data_checksum_(0), epoch_id_(common::OB_INVALID_TIMESTAMP),
accum_checksum_(0), submit_timestamp_(common::OB_INVALID_TIMESTAMP) {}
~ObConfirmedInfo() {}
ObConfirmedInfo()
: data_checksum_(0),
epoch_id_(common::OB_INVALID_TIMESTAMP),
accum_checksum_(0),
submit_timestamp_(common::OB_INVALID_TIMESTAMP)
{}
~ObConfirmedInfo()
{}
public:
int init(const int64_t data_checksum, const int64_t epoch_id,
const int64_t accum_checksum, const int64_t submit_timestamp_);
int64_t get_data_checksum() const { return data_checksum_; }
int64_t get_epoch_id() const { return epoch_id_; }
int64_t get_accum_checksum() const { return accum_checksum_; }
int64_t get_submit_timestamp() const { return submit_timestamp_; }
void reset() { data_checksum_ = 0; epoch_id_ = common::OB_INVALID_TIMESTAMP; accum_checksum_ = 0; }
void deep_copy(const ObConfirmedInfo &confirmed_info)
int init(const int64_t data_checksum, const int64_t epoch_id, const int64_t accum_checksum,
const int64_t submit_timestamp_);
int64_t get_data_checksum() const
{
return data_checksum_;
}
int64_t get_epoch_id() const
{
return epoch_id_;
}
int64_t get_accum_checksum() const
{
return accum_checksum_;
}
int64_t get_submit_timestamp() const
{
return submit_timestamp_;
}
void reset()
{
data_checksum_ = 0;
epoch_id_ = common::OB_INVALID_TIMESTAMP;
accum_checksum_ = 0;
}
void deep_copy(const ObConfirmedInfo& confirmed_info)
{
data_checksum_ = confirmed_info.data_checksum_;
epoch_id_ = confirmed_info.epoch_id_;
......@@ -50,6 +73,7 @@ private:
int64_t epoch_id_;
int64_t accum_checksum_;
int64_t submit_timestamp_;
private:
DISALLOW_COPY_AND_ASSIGN(ObConfirmedInfo);
};
......
......@@ -2922,8 +2922,15 @@ int ObPartitionLogService::fetch_log_from_ilog_storage_(const uint64_t log_id, c
CLOG_LOG(INFO, "this replica need rebuild", K(ret), K(server), K(partition_key_), K(log_id), K(read_from_clog));
uint64_t last_replay_log_id = OB_INVALID_ID;
if (OB_FAIL(get_storage_last_replay_log_id_(last_replay_log_id))) {
CLOG_LOG(WARN, "get_storage_last_replay_log_id_ failed", K(ret), K(partition_key_), K(log_id), K(fetch_type),
K(proposal_id), K(server), K(need_send_confirm_info));
CLOG_LOG(WARN,
"get_storage_last_replay_log_id_ failed",
K(ret),
K(partition_key_),
K(log_id),
K(fetch_type),
K(proposal_id),
K(server),
K(need_send_confirm_info));
} else if (log_id > last_replay_log_id) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR,
......@@ -5891,9 +5898,9 @@ int ObPartitionLogService::send_confirm_info_(const common::ObAddr& server, cons
const uint64_t log_id = log_entry.get_header().get_log_id();
ObConfirmedInfo confirmed_info;
if (OB_SUCCESS != (ret = confirmed_info.init(log_entry.get_header().get_data_checksum(),
log_entry.get_header().get_epoch_id(),
accum_checksum,
log_entry.get_header().get_submit_timestamp()))) {
log_entry.get_header().get_epoch_id(),
accum_checksum,
log_entry.get_header().get_submit_timestamp()))) {
CLOG_LOG(WARN, "confirmed_info init failed", K_(partition_key), K(ret));
} else if (OB_SUCCESS != (ret = log_engine_->submit_confirmed_info(
list, partition_key_, log_id, confirmed_info, batch_committed))) {
......@@ -5910,20 +5917,6 @@ int ObPartitionLogService::send_confirm_info_(const common::ObAddr& server, cons
return ret;
}
int ObPartitionLogService::get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(OB_INVALID_ID == last_log_id)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "get next timestamp error", K(ret), K(last_log_id));
} else {
ret = sw_.get_next_timestamp(last_log_id, res_ts);
}
return ret;
}
int ObPartitionLogService::try_update_next_replay_log_ts_in_restore(const int64_t new_ts)
{
int ret = OB_SUCCESS;
......@@ -8271,8 +8264,7 @@ int ObPartitionLogService::check_and_try_leader_revoke(const ObElection::RevokeT
CLOG_LOG(ERROR, "check_majority_replica_clog_disk_full_ failed", K(ret));
} else {
need_revoke = !majority_is_clog_disk_full;
CLOG_LOG(INFO, "partition may need revoke, ", "need revoke is ", need_revoke,
"and revoke type is ", revoke_type);
CLOG_LOG(INFO, "partition may need revoke, ", "need revoke is ", need_revoke, "and revoke type is ", revoke_type);
}
}
......@@ -8460,5 +8452,5 @@ int ObPartitionLogService::get_role_and_leader_epoch_unlock_(
return ret;
}
} // namespace clog
} // namespace oceanbase
} // namespace clog
} // namespace oceanbase
......@@ -381,7 +381,6 @@ public:
virtual int force_set_parent(const common::ObAddr& new_parent) = 0;
virtual int force_reset_parent() = 0;
virtual int force_set_server_list(const obrpc::ObServerList& server_list, const int64_t replica_num) = 0;
virtual int get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts) = 0;
virtual int get_next_served_log_info_for_leader(uint64_t& next_served_log_id, int64_t& next_served_log_ts) = 0;
virtual uint64_t get_next_index_log_id() const = 0;
virtual int get_pls_epoch(int64_t& pls_epoch) const = 0;
......@@ -663,7 +662,6 @@ public:
virtual int flush_cb(const ObLogFlushCbArg& arg) override;
virtual int on_get_election_priority(election::ObElectionPriority& priority) override;
virtual int on_change_leader_retry(const common::ObAddr& server, ObTsWindows& changing_leader_windows) override;
virtual int get_next_timestamp(const uint64_t last_log_id, int64_t& res_ts) override;
virtual int get_next_served_log_info_for_leader(uint64_t& next_served_log_id, int64_t& next_served_log_ts) override;
virtual uint64_t get_next_index_log_id() const override
{
......
......@@ -102,9 +102,8 @@ inline int parse_log_item_type(const char* buf, const int64_t len, ObCLogItemTyp
// or the magic of this block is a ilog entry or clog entry), we need to read all subsequent contents of this
// file, check whether there is a valid block and the timestamp recorded in the block header is greater than
// or equal to last_block_ts, if not, the end of the file is read.
template<class Type, class Interface>
class ObRawEntryIterator: public Interface
{
template <class Type, class Interface>
class ObRawEntryIterator : public Interface {
public:
ObRawEntryIterator();
virtual ~ObRawEntryIterator();
......@@ -150,7 +149,7 @@ private:
//
// Since our log disk space is large enough, log files will not be reused within two seconds,
// so this constant is safe in the scenario of reusing files.
static const int64_t CHECK_LAST_BLOCK_TS_INTERVAL = 2000 * 1000; // 2s
static const int64_t CHECK_LAST_BLOCK_TS_INTERVAL = 2000 * 1000; // 2s
private:
bool is_inited_;
ObILogDirectReader* reader_;
......@@ -716,10 +715,9 @@ int ObRawEntryIterator<Type, Interface>::next_entry(Type& entry, ObReadParam& pa
// last_block_ts must be vaild, because of this:
// 1. Write file header is atomic, therefore, the last_block_ts is valid
// 2. else, file header is ObNewLogFileBuf
template<class Type, class Interface>
bool ObRawEntryIterator<Type, Interface>::check_last_block_(const file_id_t file_id,
const offset_t start_offset,
const int64_t last_block_ts) const
template <class Type, class Interface>
bool ObRawEntryIterator<Type, Interface>::check_last_block_(
const file_id_t file_id, const offset_t start_offset, const int64_t last_block_ts) const
{
int ret = common::OB_SUCCESS;
bool bool_ret = false;
......
......@@ -116,7 +116,6 @@ void oceanbase::observer::init_srv_xlator_for_clog(ObSrvRpcXlator* xlator)
RPC_PROCESSOR(ObLogReqStartLogIdByTsProcessor, gctx_.par_ser_);
RPC_PROCESSOR(ObLogReqStartPosByLogIdProcessor, gctx_.par_ser_);
RPC_PROCESSOR(ObLogExternalFetchLogProcessor, gctx_.par_ser_);
RPC_PROCESSOR(ObLogReqHeartbeatInfoProcessor, gctx_.par_ser_);
RPC_PROCESSOR(ObLogReqStartLogIdByTsProcessorWithBreakpoint, gctx_.par_ser_);
RPC_PROCESSOR(ObLogReqStartPosByLogIdProcessorWithBreakpoint, gctx_.par_ser_);
RPC_PROCESSOR(ObLogOpenStreamProcessor, gctx_.par_ser_);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册