diff --git a/deps/oblib/src/common/ob_clock_generator.h b/deps/oblib/src/common/ob_clock_generator.h index 97a720e2f9dd2cf940efe26929d9c4f9c8999425..e4fb6e9ba14ade64e775c7656246922e636ce2a2 100644 --- a/deps/oblib/src/common/ob_clock_generator.h +++ b/deps/oblib/src/common/ob_clock_generator.h @@ -15,6 +15,7 @@ #include #include +#include "lib/ob_define.h" #include "lib/oblog/ob_log.h" #include "lib/atomic/ob_atomic.h" #include "lib/lock/Monitor.h" @@ -42,6 +43,7 @@ public: static int64_t getCurrentTime(); static void msleep(const int64_t ms); static void usleep(const int64_t us); + static void try_advance_cur_ts(const int64_t cur_ts); private: int64_t get_us(); @@ -109,7 +111,20 @@ inline void ObClockGenerator::usleep(const int64_t us) } } -inline int64_t ObClockGenerator::get_us() +inline void ObClockGenerator::try_advance_cur_ts(const int64_t cur_ts) +{ + int64_t origin_cur_ts = OB_INVALID_TIMESTAMP; + do { + origin_cur_ts = ATOMIC_LOAD(&clock_generator_.cur_ts_); + if (origin_cur_ts < cur_ts) { + break; + } else { + TRANS_LOG(WARN, "timestamp rollback, need advance cur ts", K(origin_cur_ts), K(cur_ts)); + } + } while (false == ATOMIC_BCAS(&clock_generator_.cur_ts_, origin_cur_ts, cur_ts)); +} + +OB_INLINE int64_t ObClockGenerator::get_us() { return common::ObTimeUtility::current_time(); } diff --git a/src/clog/ob_clog_mgr.cpp b/src/clog/ob_clog_mgr.cpp index eb806cc16700f866eb5ec766b4b32176d41965b7..f09fbfa72612c2bd34e4d140be0fec7dc77cea3c 100644 --- a/src/clog/ob_clog_mgr.cpp +++ b/src/clog/ob_clog_mgr.cpp @@ -3415,5 +3415,18 @@ bool ObCLogMgr::is_server_archive_stop(const int64_t incarnation, const int64_t return archive_mgr_.is_server_archive_stop(incarnation, archive_round); } +int ObCLogMgr::get_server_min_log_ts(int64_t &server_min_log_ts) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + CLOG_LOG(WARN, "clog_mgr is not inited", KR(ret)); + } else if (OB_FAIL(log_engine_.get_server_min_log_ts(server_min_log_ts))) { + CLOG_LOG(WARN, "failed to get_server_min_log_ts", KR(ret)); + } else { + CLOG_LOG(INFO, "get_server_min_log_ts success", K(server_min_log_ts)); + } + return ret; +} } // namespace clog } // end namespace oceanbase diff --git a/src/clog/ob_clog_mgr.h b/src/clog/ob_clog_mgr.h index 06df1c93f010bdc09d39aabfc8c6992fc064bcf2..712d34124d1222456fb3d089182764d2831f2613 100644 --- a/src/clog/ob_clog_mgr.h +++ b/src/clog/ob_clog_mgr.h @@ -300,6 +300,7 @@ public: const common::ObPartitionKey& pkey, const int64_t incarnation, const int64_t archive_round) = 0; virtual int get_archive_pg_map(archive::PGArchiveMap*& map) = 0; virtual bool is_server_archive_stop(const int64_t incarnation, const int64_t archive_round) = 0; + virtual int get_server_min_log_ts(int64_t &server_min_log_ts) = 0; }; class ObCLogMgr : public ObICLogMgr { @@ -429,6 +430,8 @@ public: { return cb_engine_; } + + virtual int get_server_min_log_ts(int64_t &server_min_log_ts); // ==================== physical flashback ===================== int delete_all_log_files() override; // ==================== log archive ===================== diff --git a/src/clog/ob_clog_writer.h b/src/clog/ob_clog_writer.h index 3fada851176e3dcbefc7c64991c02636bebf87fa..03b59d6d6fd904d2ff3678c664e711da0673db76 100644 --- a/src/clog/ob_clog_writer.h +++ b/src/clog/ob_clog_writer.h @@ -141,6 +141,7 @@ public: int ret = common::OB_SUCCESS; file_id_t range_min_file_id = common::OB_INVALID_FILE_ID; file_id_t range_max_file_id = common::OB_INVALID_FILE_ID; + int64_t last_entry_ts = common::OB_INVALID_TIMESTAMP; // When empty folder, because the file header block occupies DIO_ALIGN_SIZE, // so the file start offset begins from CLOG_DIO_ALIGN_SIZE ObRawEntryIterator iter; @@ -165,7 +166,11 @@ public: if (common::OB_ITER_END == ret) { file_id = param.file_id_; offset = param.offset_; - CLOG_LOG(INFO, "iter next_entry finish", K(ret), K(param), K(entry)); // FIXME + last_entry_ts = entry.get_submit_timestamp(); + if (OB_INVALID_TIMESTAMP != last_entry_ts) { + ObClockGenerator::try_advance_cur_ts(last_entry_ts); + } + CLOG_LOG(INFO, "iter next_entry finish", K(ret), K(param), K(entry)); // FIXME ret = common::OB_SUCCESS; } else { CLOG_LOG(ERROR, "get cursor fail", K(ret), K(param), K(entry), K(range_max_file_id)); diff --git a/src/clog/ob_log_engine.cpp b/src/clog/ob_log_engine.cpp index 2d9f94d156abbf76072ab1feb87e22aec8dffe1c..6d7189e12e335c9f0d46b09e2c2c9a6f35488505 100644 --- a/src/clog/ob_log_engine.cpp +++ b/src/clog/ob_log_engine.cpp @@ -10,11 +10,13 @@ * See the Mulan PubL v2 for more details. */ -#include #include "ob_log_engine.h" +#include #include "common/ob_member_list.h" #include "lib/file/file_directory_utils.h" +#include "lib/ob_define.h" #include "lib/thread_local/thread_buffer.h" +#include "lib/time/ob_time_utility.h" #include "rpc/obrpc/ob_rpc_net_handler.h" #include "share/ob_cluster_version.h" #include "share/ob_server_blacklist.h" @@ -2595,6 +2597,63 @@ bool ObLogEngine::is_clog_disk_hang() const return is_disk_hang; } +int ObLogEngine::get_server_min_log_ts(int64_t &server_min_log_ts) +{ + int ret = OB_SUCCESS; + ObLogBlockMetaV2 log_block; + ObReadBuf read_buf; + ObReadRes res; + ObReadCost cost; + ObReadParam read_param; + const int64_t in_read_size = 4 * 1024; + read_buf.buf_len_ = in_read_size + CLOG_DIO_ALIGN_SIZE; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (NULL == (read_buf.buf_ + = static_cast(ob_malloc_align(CLOG_DIO_ALIGN_SIZE, read_buf.buf_len_, "LogEngine")))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + } else { + const int64_t TIMEOUT_TS = 1 * 1000 * 1000; + int64_t start_ts = ObTimeUtility::current_time(); + do { + file_id_t min_file_id = get_clog_min_file_id(); + read_param.file_id_ = min_file_id; + read_param.offset_ = 0; + read_param.read_len_ = in_read_size; + if (OB_INVALID_FILE_ID == min_file_id) { + ret = OB_ENTRY_NOT_EXIST; + } else if (OB_FAIL(read_data_direct(read_param, read_buf, res, cost))) { + CLOG_LOG(WARN, "read_log_by_location failed", K(ret), K(read_param)); + } else { + } + int64_t cost_ts = ObTimeUtility::current_time() - start_ts; + if (OB_NO_SUCH_FILE_OR_DIRECTORY == ret) { + if (cost_ts >= TIMEOUT_TS) { + ret = OB_TIMEOUT; + CLOG_LOG(WARN, "get_server_min_log_ts timeout", K(ret)); + } else { + usleep(100 * 1000); + } + } + } while (OB_NO_SUCH_FILE_OR_DIRECTORY == ret); + } + if (OB_SUCC(ret)) { + int64_t pos = 0; + if (OB_FAIL(log_block.deserialize(res.buf_, res.data_len_, pos))) { + } else if (false == log_block.check_meta_checksum()) { + ret = OB_INVALID_DATA; + CLOG_LOG(WARN, "LogBlock has been corrupt", K(ret), K(log_block)); + } else { + server_min_log_ts = log_block.get_timestamp(); + CLOG_LOG(INFO, "ObLogEngine get_server_min_log_ts success", K(server_min_log_ts), K(read_param)); + } + } + if (true == read_buf.is_valid()) { + ob_free_align(read_buf.buf_); + } + return ret; +} + NetworkLimitManager::NetworkLimitManager() : is_inited_(false), addr_array_(), ethernet_speed_(0), hash_map_() {} diff --git a/src/clog/ob_log_engine.h b/src/clog/ob_log_engine.h index 8c078f4702fc8da2fa9e80eeef527cd42b2de27a..57fa052d3e717a8fbbd2e196c85031bc3d374f5f 100644 --- a/src/clog/ob_log_engine.h +++ b/src/clog/ob_log_engine.h @@ -537,6 +537,7 @@ public: int get_clog_using_disk_space(int64_t &space) const; int get_ilog_using_disk_space(int64_t &space) const; bool is_clog_disk_hang() const; + int get_server_min_log_ts(int64_t &server_min_log_ts); private: int fetch_log_from_server( diff --git a/src/clog/ob_log_entry.h b/src/clog/ob_log_entry.h index 40180328c2251533c1d81149bde4705b16226417..76bfccd44576e220b0261eb965ca2e5efeb3e143 100644 --- a/src/clog/ob_log_entry.h +++ b/src/clog/ob_log_entry.h @@ -60,6 +60,7 @@ public: return header_.update_proposal_id(new_proposal_id); } int get_next_replay_ts_for_rg(int64_t& next_replay_ts) const; + int64_t get_submit_timestamp() const { return header_.get_submit_timestamp(); } TO_STRING_KV(N_HEADER, header_); NEED_SERIALIZE_AND_DESERIALIZE;