diff --git a/src/clog/ob_external_fetcher.cpp b/src/clog/ob_external_fetcher.cpp
index d649916b9c020b55b4992eced1a1e5142517a569..e9dd7a34c0b772b251422b0d5c361adbf77ba0ea 100644
--- a/src/clog/ob_external_fetcher.cpp
+++ b/src/clog/ob_external_fetcher.cpp
@@ -291,10 +291,17 @@ int ObExtLogFetcher::handle_log_not_exist(
   return ret;
 }
 
-int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id,
-    const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, ObLogStreamFetchLogResp& resp)
+int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem &stream_item,
+    const uint64_t beyond_upper_log_id,
+    const int64_t beyond_upper_log_ts,
+    const int64_t fetched_log_count,
+    ObLogStreamFetchLogResp &resp,
+    const ObLogCursorExt *cursor_ext,
+    clog::ObReadCost &read_cost)
 {
   int ret = OB_SUCCESS;
+  int64_t upper_log_ts = beyond_upper_log_ts;
+  const ObPartitionKey &pkey = stream_item.pkey_;
   // 1. dealing with heartbeat hollow
   if ((0 == fetched_log_count) && OB_INVALID_ID != beyond_upper_log_id) {
     // log hole problem:
@@ -302,25 +309,84 @@ int ObExtLogFetcher::after_partition_fetch_log(ObStreamItem& stream_item, const
     // adjacent log timestamps is greater than the expected log time interval length of liboblog,
     // It may cause the slowest pkey to be unable to advance since upper_limit_ts limit of the expected log time
     // interval. At this time, the next log timestamp -1 is returned to liboblog to advance the progress of this pkey.
+    if (NULL != cursor_ext) {
+      // update current submit timestamp for aggregate log
+      if (OB_UNLIKELY(!cursor_ext->is_valid())) {
+        LOG_WARN("cursor not valid", K(ret), KPC(cursor_ext));
+      } else if (OB_FAIL(get_aggre_log_min_timestamp(pkey, *cursor_ext, upper_log_ts, read_cost))) {
+        LOG_WARN("resp get_aggre_log_min_timestamp error", K(ret), KPC(cursor_ext), K(upper_log_ts));
+      } else if (upper_log_ts != cursor_ext->get_submit_timestamp()) {
+        LOG_TRACE("next log is aggregate log, use its min sub-log ts as upper_log_ts",
+            K(upper_log_ts), K(cursor_ext->get_submit_timestamp()));
+      }
+    }
     ObLogStreamFetchLogResp::FetchLogHeartbeatItem hbp;
-    hbp.pkey_ = stream_item.pkey_;
+    hbp.pkey_ = pkey;
     hbp.next_log_id_ = beyond_upper_log_id;
-    hbp.heartbeat_ts_ = beyond_upper_log_ts - 1;
+    hbp.heartbeat_ts_ = upper_log_ts - 1;
     if (OB_FAIL(resp.append_hb(hbp))) {
       LOG_WARN("resp append fetch_log heartbeat error", K(ret), K(hbp));
     } else {
       LOG_TRACE("resp append fetch_log heartbeat success", K(hbp));
       // update and log progress
-      stream_item.fetch_progress_ts_ = beyond_upper_log_ts - 1;
+      stream_item.fetch_progress_ts_ = upper_log_ts - 1;
+    }
+  }
+  return ret;
+}
+
+int ObExtLogFetcher::get_aggre_log_min_timestamp(const ObPartitionKey &pkey,
+    const clog::ObLogCursorExt &cursor_ext,
+    int64_t &first_log_ts,
+    ObReadCost &read_cost)
+{
+  int ret = OB_SUCCESS;
+  bool fetch_log_from_hot_cache = true;
+  int64_t log_entry_size = 0;
+  int64_t end_tstamp = INT64_MAX;  // no need for timeout limit
+  ObReadParam param;
+  param.offset_ = cursor_ext.get_offset();
+  param.read_len_ = cursor_ext.get_size();
+  param.file_id_ = cursor_ext.get_file_id();
+  ObReadBufGuard guard(ObModIds::OB_LOG_DECRYPT_ID);
+  ObReadBuf &rbuf = guard.get_read_buf();
+  if (OB_FAIL(fetch_log_entry_(pkey, param, rbuf.buf_, rbuf.buf_len_, end_tstamp,
+          read_cost, fetch_log_from_hot_cache, log_entry_size))) {
+    LOG_WARN("failed to fetch log entry", K(ret), K(param), K(pkey));
+  } else {
+    clog::ObLogEntry log_entry;
+    int64_t log_entry_pos = 0;
+    if (OB_FAIL(log_entry.deserialize(rbuf.buf_, rbuf.buf_len_, log_entry_pos))) {
+      LOG_WARN("failed to deserialize log entry", K(ret), K(rbuf), K(log_entry_pos));
+    } else if (OB_LOG_AGGRE == log_entry.get_header().get_log_type()) {
+      const char *data_buf = log_entry.get_buf();
+      const int64_t data_len = log_entry.get_header().get_data_len();
+      int32_t next_log_offset = 0;
+      int64_t pos = 0;
+      if (OB_FAIL(serialization::decode_i32(data_buf, data_len, pos, &next_log_offset))) {
+        LOG_WARN("serialization::decode_i32 failed", K(ret), K(data_len), K(pos), KP(data_buf));
+      } else if (OB_FAIL(serialization::decode_i64(data_buf, data_len, pos, &first_log_ts))) {
+        // on success this decode updates first_log_ts to the aggregate entry's first sub-log ts
+        LOG_WARN("serialization::decode_i64 failed", K(ret), K(data_len), K(pos), KP(data_buf));
+      } else {
+        LOG_TRACE("get_aggre_log_min_timestamp", K(ret), K(data_len), K(pos), KP(data_buf),
+            K(first_log_ts), K(next_log_offset));
+      }
+    } else {
+      // not aggregate log, no need to update
+    }
   }
   return ret;
 }
 
 // Get single log entry
-int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt& cursor_ext, const ObPartitionKey& pkey,
-    const int64_t end_tstamp, ObReadCost& read_cost, ObLogStreamFetchLogResp& resp, bool& fetch_log_from_hot_cache,
-    int64_t& log_entry_size)
+int ObExtLogFetcher::partition_fetch_log_entry_(const ObLogCursorExt &cursor_ext,
+    const ObPartitionKey &pkey,
+    const int64_t end_tstamp,
+    ObReadCost &read_cost,
+    ObLogStreamFetchLogResp &resp,
+    bool &fetch_log_from_hot_cache,
+    int64_t &log_entry_size)
 {
   int ret = OB_SUCCESS;
   int64_t remain_size = 0;
@@ -474,6 +540,7 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
   const ObPartitionKey& pkey = stream_item.pkey_;
   uint64_t beyond_upper_log_id = OB_INVALID_ID;
   int64_t beyond_upper_log_ts = OB_INVALID_TIMESTAMP;
+  const ObLogCursorExt *next_cursor = NULL;
 
   // Note: After the optimization of step by step, the count of logs fetched in each round of
   // each partition will be "suddenly reduced". It is possible to get only a few logs per round,
@@ -481,8 +548,6 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
   // In order to optimize performance, during the log fetching process, the current timestamp is
   // no longer used to evaluate the log fetching time, and the log reading time monitoring is temporarily removed.
   while (OB_SUCCESS == ret && !part_fetch_stopped && !frt.is_stopped()) {
-    const ObLogCursorExt* next_cursor = NULL;
-
     if (is_time_up_(log_count, end_tstamp)) {  // time up, stop fetching logs globally
       frt.stop("TimeUP");
       part_stop_reason = "TimeUP";
@@ -562,7 +627,8 @@ int ObExtLogFetcher::partition_fetch_log(ObStreamItem& stream_item, FetchRunTime
   }
 
   if (OB_SUCCESS == ret) {
-    if (OB_FAIL(after_partition_fetch_log(stream_item, beyond_upper_log_id, beyond_upper_log_ts, log_count, resp))) {
+    if (OB_FAIL(after_partition_fetch_log(stream_item, beyond_upper_log_id, beyond_upper_log_ts,
+            log_count, resp, next_cursor, frt.read_cost_))) {
       LOG_WARN("after partition fetch log error",
           K(ret),
           K(stream_item),
diff --git a/src/clog/ob_external_fetcher.h b/src/clog/ob_external_fetcher.h
index 8ffdb41d31afe6784ea206202f0b86df8b71ccbb..b55b5c0c03bde8f3453a95595f87dd56551484f9 100644
--- a/src/clog/ob_external_fetcher.h
+++ b/src/clog/ob_external_fetcher.h
@@ -164,7 +164,10 @@ private:
   int handle_log_not_exist(
       const common::ObPartitionKey& pkey, const uint64_t next_log_id, obrpc::ObLogStreamFetchLogResp& resp);
   int after_partition_fetch_log(ObStreamItem& stream_item, const uint64_t beyond_upper_log_id,
-      const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, obrpc::ObLogStreamFetchLogResp& resp);
+      const int64_t beyond_upper_log_ts, const int64_t fetched_log_count, obrpc::ObLogStreamFetchLogResp& resp,
+      const clog::ObLogCursorExt *cursor_ext, clog::ObReadCost &read_cost);
+  int get_aggre_log_min_timestamp(const common::ObPartitionKey &pkey, const clog::ObLogCursorExt &cursor_ext,
+      int64_t &first_log_ts, clog::ObReadCost &read_cost);
   int prefill_resp_with_clog_entry(const clog::ObLogCursorExt& cursor_ext, const common::ObPartitionKey& pkey,
       const int64_t end_tstamp, clog::ObReadCost& read_cost, obrpc::ObLogStreamFetchLogResp& resp,
       bool& fetch_log_from_hot_cache, int64_t& log_entry_size);
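
A note on the decode path in `get_aggre_log_min_timestamp`: it reads only the head of an `OB_LOG_AGGRE` payload, an `int32` next_log_offset followed by the `int64` submit timestamp of the first sub-log, which is presumably the earliest timestamp in the entry given the function's name. The standalone sketch below illustrates that layout. The fixed-width little-endian helpers and the absolute-offset interpretation of `next_log_offset` are assumptions made for illustration; the real code goes through `serialization::decode_i32`/`decode_i64` and OceanBase's own wire format.

```cpp
// Hypothetical sketch of an OB_LOG_AGGRE payload: a sequence of sub-logs,
// each prefixed by [int32 next_log_offset][int64 submit_timestamp].
// Fixed-width little-endian encoding is assumed purely for illustration.
#include <cassert>
#include <cstdint>
#include <vector>

static void put_i32(std::vector<uint8_t> &buf, int32_t v) {
  for (int i = 0; i < 4; ++i) buf.push_back(static_cast<uint8_t>((static_cast<uint32_t>(v) >> (8 * i)) & 0xff));
}
static void put_i64(std::vector<uint8_t> &buf, int64_t v) {
  for (int i = 0; i < 8; ++i) buf.push_back(static_cast<uint8_t>((static_cast<uint64_t>(v) >> (8 * i)) & 0xff));
}
static int32_t get_i32(const uint8_t *buf, int64_t &pos) {
  uint32_t v = 0;
  for (int i = 0; i < 4; ++i) v |= static_cast<uint32_t>(buf[pos + i]) << (8 * i);
  pos += 4;
  return static_cast<int32_t>(v);
}
static int64_t get_i64(const uint8_t *buf, int64_t &pos) {
  uint64_t v = 0;
  for (int i = 0; i < 8; ++i) v |= static_cast<uint64_t>(buf[pos + i]) << (8 * i);
  pos += 8;
  return static_cast<int64_t>(v);
}

// Mirrors the patch's decode path: read only the FIRST sub-log's header and
// return its submit timestamp, i.e. the minimum timestamp in the entry.
static int64_t aggre_min_timestamp(const uint8_t *payload) {
  int64_t pos = 0;
  (void)get_i32(payload, pos);   // next_log_offset: decoded but unused, as in the patch
  return get_i64(payload, pos);  // first sub-log's submit timestamp
}

int main() {
  std::vector<uint8_t> payload;
  // Sub-log #1: submitted at ts=1000; next offset points past its 4+8+8 bytes.
  put_i32(payload, 20);
  put_i64(payload, 1000);
  put_i64(payload, 0xAAAA);  // fake body
  // Sub-log #2: submitted later, at ts=2000.
  put_i32(payload, 0);       // no further sub-log
  put_i64(payload, 2000);

  // The heartbeat must use the earliest sub-log ts, not the entry-level ts.
  assert(aggre_min_timestamp(payload.data()) == 1000);
  return 0;
}
```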
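
Why the heartbeat must carry the minimum timestamp: when a round fetches zero logs but knows the next log's id and timestamp, `after_partition_fetch_log` responds with `heartbeat_ts = upper_log_ts - 1`, letting liboblog advance the partition's progress without receiving a log. That is safe only if every undelivered log has a timestamp strictly greater than `heartbeat_ts`. If the next entry is an aggregate whose entry-level submit timestamp is later than its first sub-log's, a heartbeat derived from the entry-level value would move progress past a sub-log that has not yet been delivered, which is the situation this patch guards against. A minimal sketch of the invariant, with illustrative names rather than OceanBase APIs:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Illustrative progress watermark for one partition: the consumer may treat
// every timestamp <= progress_ts as fully delivered.
struct PartitionProgress {
  int64_t progress_ts = 0;
  void on_heartbeat(int64_t heartbeat_ts) { progress_ts = std::max(progress_ts, heartbeat_ts); }
};

int main() {
  // Next undelivered entry is an aggregate: entry-level ts 2000,
  // but its first (earliest) sub-log was submitted at ts 1000.
  const int64_t entry_level_ts = 2000;
  const int64_t first_sub_log_ts = 1000;

  // Heartbeat from the entry-level ts: the watermark jumps past the
  // ts=1000 sub-log before it is delivered.
  PartitionProgress overshoot;
  overshoot.on_heartbeat(entry_level_ts - 1);        // 1999
  assert(overshoot.progress_ts >= first_sub_log_ts);  // claims too much progress

  // Heartbeat from the aggregate's minimum sub-log ts, as in the patch:
  // the watermark stays strictly below every undelivered log.
  PartitionProgress safe;
  safe.on_heartbeat(first_sub_log_ts - 1);           // 999
  assert(safe.progress_ts < first_sub_log_ts);
  return 0;
}
```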