Commit 24cb9387 authored by obdev, committed by wangzelin.wzl

update first log timestamp as heartbeat response for aggregate log

Parent a997b238
......@@ -291,10 +291,17 @@ int ObExtLogFetcher::handle_log_not_exist(
return ret;
}
-int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id,
-const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, ObLogStreamFetchLogResp& resp)
+int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item,
+const uint64_t beyond_upper_log_id,
+const int64_t beyond_upper_log_ts,
+const int64_t fetched_log_count,
+ObLogStreamFetchLogResp &resp,
+const ObLogCursorExt *cursor_ext,
+clog::ObReadCost &read_cost)
{
int ret = OB_SUCCESS;
+int64_t upper_log_ts = beyond_upper_log_ts;
+const ObPartitionKey &pkey = stream_item.pkey_;
// 1. dealing with heartbeat hollow
if ((0 == fetched_log_count) && OB_INVALID_ID != beyond_upper_log_id) {
// log hole problem:
......@@ -302,25 +309,84 @@ int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem& stream_item, const
// adjacent log timestamps is greater than the expected log time interval length of liboblog,
// it may cause the slowest pkey to be unable to advance due to the upper_limit_ts bound of the expected log time
// interval. In that case, the next log timestamp - 1 is returned to liboblog to advance the progress of this pkey.
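+// For an aggregate log, the cursor's submit timestamp belongs to the aggregated
+// entry as a whole, while the first sub-log inside it may carry an earlier
+// timestamp. Read that first timestamp out so the heartbeat below stays below
+// every sub-log timestamp of the aggregated entry.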
+if (NULL != cursor_ext) {
+// update current submit timestamp for aggregate log
+if (OB_UNLIKELY(!cursor_ext->is_valid())) {
+LOG_WARN("cursor not valid", K(ret), KPC(cursor_ext));
+} else if (OB_FAIL(get_aggre_log_min_timestamp(pkey, *cursor_ext, upper_log_ts, read_cost))) {
+LOG_WARN("resp get_aggre_log_min_timestamp error", K(ret), KPC(cursor_ext), K(upper_log_ts));
+} else if (upper_log_ts != cursor_ext->get_submit_timestamp()) {
+LOG_TRACE("next log is aggregate log, update first log timestamp as beyond_upper_log_ts",
+K(upper_log_ts), K(cursor_ext->get_submit_timestamp()));
+}
+}
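+// The heartbeat tells liboblog that this partition currently has nothing to
+// hand out: its progress may safely advance to upper_log_ts - 1, one unit
+// below the timestamp of the next log that will be fetched.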
ObLogStreamFetchLogResp::FetchLogHeartbeatItem hbp;
-hbp.pkey_ = stream_item.pkey_;
+hbp.pkey_ = pkey;
hbp.next_log_id_ = beyond_upper_log_id;
-hbp.heartbeat_ts_ = beyond_upper_log_ts - 1;
+hbp.heartbeat_ts_ = upper_log_ts - 1;
if (OB_FAIL(resp.append_hb(hbp))) {
LOG_WARN("resp append fetch_log heartbeat error", K(ret), K(hbp));
} else {
LOG_TRACE("resp append fetch_log heartbeat success", K(hbp));
// update and log progress
-stream_item.fetch_progress_ts_ = beyond_upper_log_ts - 1;
+stream_item.fetch_progress_ts_ = upper_log_ts - 1;
}
}
return ret;
}
+int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey,
+const clog::ObLogCursorExt &cursor_ext,
+int64_t &first_log_ts,
+ObReadCost &read_cost)
+{
+int ret = OB_SUCCESS;
+bool fetch_log_from_hot_cache = true;
+int64_t log_entry_size = 0;
+int64_t end_tstamp = INT64_MAX; // no need for timeout limit
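+// The cursor pinpoints the physical entry: file id, offset and read length.
+// Fetch it back via fetch_log_entry_ (hot cache first, with any disk reads
+// accounted in read_cost) so the aggregated payload can be inspected.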
+ObReadParam param;
+param.offset_ = cursor_ext.get_offset();
+param.read_len_ = cursor_ext.get_size();
+param.file_id_ = cursor_ext.get_file_id();
+ObReadBufGuard guard(ObModIds::OB_LOG_DECRYPT_ID);
+ObReadBuf &rbuf = guard.get_read_buf();
+if (OB_FAIL(fetch_log_entry_(pkey, param, rbuf.buf_, rbuf.buf_len_, end_tstamp,
+read_cost, fetch_log_from_hot_cache, log_entry_size))) {
+LOG_WARN("failed to fetch log entry", K(ret), K(param), K(pkey));
+} else {
+clog::ObLogEntry log_entry;
+int64_t log_entry_pos = 0;
+if (OB_FAIL(log_entry.deserialize(rbuf.buf_, rbuf.buf_len_, log_entry_pos))) {
+LOG_WARN("failed to deserialize log entry", K(ret), K(rbuf), K(log_entry_pos));
+} else if (OB_LOG_AGGRE == log_entry.get_header().get_log_type()) {
+const char *data_buf = log_entry.get_buf();
+const int64_t data_len = log_entry.get_header().get_data_len();
+int32_t next_log_offset = 0;
+int64_t pos = 0;
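+// Aggregated payload layout, as implied by the decodes below: a leading int32
+// offset of the next sub-log followed by the int64 submit timestamp of the
+// first sub-log, so that timestamp is decoded directly into first_log_ts.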
+if (OB_FAIL(serialization::decode_i32(data_buf, data_len, pos, &next_log_offset))) {
+LOG_WARN("serialization::decode_i32 failed", K(ret), K(data_len), K(pos), KP(data_buf));
+} else if (OB_FAIL(serialization::decode_i64(data_buf, data_len, pos, &first_log_ts))) {
+// update first log ts as aggre log ts
+LOG_WARN("serialization::decode_i64 failed", K(ret), K(data_len), K(pos), KP(data_buf));
+} else {
+LOG_TRACE("get_aggre_log_min_timestamp", K(ret), K(data_len), K(pos), KP(data_buf),
+K(first_log_ts), K(next_log_offset));
+}
+} else {
+// not aggregate log, no need to update
+}
+}
+return ret;
+}
// Get single log entry
-int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt& cursor_ext, const ObPartitionKey& pkey,
-const int64_t end_tstamp, ObReadCost& read_cost, ObLogStreamFetchLogResp& resp, bool& fetch_log_from_hot_cache,
-int64_t& log_entry_size)
+int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt &cursor_ext,
+const ObPartitionKey &pkey,
+const int64_t end_tstamp,
+ObReadCost &read_cost,
+ObLogStreamFetchLogResp &resp,
+bool &fetch_log_from_hot_cache,
+int64_t &log_entry_size)
{
int ret = OB_SUCCESS;
int64_t remain_size = 0;
......@@ -474,6 +540,7 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
const ObPartitionKey& pkey = stream_item.pkey_;
uint64_t beyond_upper_log_id = OB_INVALID_ID;
int64_t beyond_upper_log_ts = OB_INVALID_TIMESTAMP;
+const ObLogCursorExt *next_cursor = NULL;
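+// Hoisted out of the fetch loop below so the last cursor examined remains
+// visible to after_partition_fetch_log once the loop exits.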
// Note: after the step-by-step fetching optimization, the count of logs fetched in each round of
// each partition may be "suddenly reduced". It is possible to get only a few logs per round,
......@@ -481,8 +548,6 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
// In order to optimize performance, during the log fetching process, the current timestamp is
// no longer used to evaluate the log fetching time, and the log reading time monitoring is temporarily removed.
while (OB_SUCCESS == ret && !part_fetch_stopped && !frt.is_stopped()) {
-const ObLogCursorExt* next_cursor = NULL;
if (is_time_up_(log_count, end_tstamp)) { // time up, stop fetching logs globally
frt.stop("TimeUP");
part_stop_reason = "TimeUP";
......@@ -562,7 +627,8 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
}
if (OB_SUCCESS == ret) {
-if (OB_FAIL(after_partition_fetch_log(stream_item, beyond_upper_log_id, beyond_upper_log_ts, log_count, resp))) {
+if (OB_FAIL(after_partition_fetch_log(stream_item, beyond_upper_log_id, beyond_upper_log_ts,
+log_count, resp, next_cursor, frt.read_cost_))) {
LOG_WARN("after partition fetch log error",
K(ret),
K(stream_item),
......
......@@ -164,7 +164,10 @@ private:
int handle_log_not_exist(
const common::ObPartitionKey& pkey, const uint64_t next_log_id, obrpc::ObLogStreamFetchLogResp& resp);
int after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id,
-const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, obrpc::ObLogStreamFetchLogResp& resp);
+const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, obrpc::ObLogStreamFetchLogResp& resp,
+const clog::ObLogCursorExt *cursor_ext, clog::ObReadCost &read_cost);
+int get_aggre_log_min_timestamp(const common::ObPartitionKey &pkey, const clog::ObLogCursorExt &cursor_ext,
+int64_t &first_log_ts, clog::ObReadCost &read_cost);
int prefill_resp_with_clog_entry(const clog::ObLogCursorExt& cursor_ext, const common::ObPartitionKey& pkey,
const int64_t end_tstamp, clog::ObReadCost& read_cost, obrpc::ObLogStreamFetchLogResp& resp,
bool& fetch_log_from_hot_cache, int64_t& log_entry_size);
......