提交 ab066ef7 编写于 作者: H HaHaJeff 提交者: LINGuanRen

add an interface to get the minest server log ts

上级 de9b8333
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <stdint.h> #include <stdint.h>
#include <pthread.h> #include <pthread.h>
#include "lib/ob_define.h"
#include "lib/oblog/ob_log.h" #include "lib/oblog/ob_log.h"
#include "lib/atomic/ob_atomic.h" #include "lib/atomic/ob_atomic.h"
#include "lib/lock/Monitor.h" #include "lib/lock/Monitor.h"
...@@ -42,6 +43,7 @@ public: ...@@ -42,6 +43,7 @@ public:
static int64_t getCurrentTime(); static int64_t getCurrentTime();
static void msleep(const int64_t ms); static void msleep(const int64_t ms);
static void usleep(const int64_t us); static void usleep(const int64_t us);
static void try_advance_cur_ts(const int64_t cur_ts);
private: private:
int64_t get_us(); int64_t get_us();
...@@ -109,7 +111,20 @@ inline void ObClockGenerator::usleep(const int64_t us) ...@@ -109,7 +111,20 @@ inline void ObClockGenerator::usleep(const int64_t us)
} }
} }
inline int64_t ObClockGenerator::get_us() inline void ObClockGenerator::try_advance_cur_ts(const int64_t cur_ts)
{
int64_t origin_cur_ts = OB_INVALID_TIMESTAMP;
do {
origin_cur_ts = ATOMIC_LOAD(&clock_generator_.cur_ts_);
if (origin_cur_ts < cur_ts) {
break;
} else {
TRANS_LOG(WARN, "timestamp rollback, need advance cur ts", K(origin_cur_ts), K(cur_ts));
}
} while (false == ATOMIC_BCAS(&clock_generator_.cur_ts_, origin_cur_ts, cur_ts));
}
OB_INLINE int64_t ObClockGenerator::get_us()
{ {
return common::ObTimeUtility::current_time(); return common::ObTimeUtility::current_time();
} }
......
...@@ -3415,5 +3415,18 @@ bool ObCLogMgr::is_server_archive_stop(const int64_t incarnation, const int64_t ...@@ -3415,5 +3415,18 @@ bool ObCLogMgr::is_server_archive_stop(const int64_t incarnation, const int64_t
return archive_mgr_.is_server_archive_stop(incarnation, archive_round); return archive_mgr_.is_server_archive_stop(incarnation, archive_round);
} }
int ObCLogMgr::get_server_min_log_ts(int64_t &server_min_log_ts)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "clog_mgr is not inited", KR(ret));
} else if (OB_FAIL(log_engine_.get_server_min_log_ts(server_min_log_ts))) {
CLOG_LOG(WARN, "failed to get_server_min_log_ts", KR(ret));
} else {
CLOG_LOG(INFO, "get_server_min_log_ts success", K(server_min_log_ts));
}
return ret;
}
} // namespace clog } // namespace clog
} // end namespace oceanbase } // end namespace oceanbase
...@@ -300,6 +300,7 @@ public: ...@@ -300,6 +300,7 @@ public:
const common::ObPartitionKey& pkey, const int64_t incarnation, const int64_t archive_round) = 0; const common::ObPartitionKey& pkey, const int64_t incarnation, const int64_t archive_round) = 0;
virtual int get_archive_pg_map(archive::PGArchiveMap*& map) = 0; virtual int get_archive_pg_map(archive::PGArchiveMap*& map) = 0;
virtual bool is_server_archive_stop(const int64_t incarnation, const int64_t archive_round) = 0; virtual bool is_server_archive_stop(const int64_t incarnation, const int64_t archive_round) = 0;
virtual int get_server_min_log_ts(int64_t &server_min_log_ts) = 0;
}; };
class ObCLogMgr : public ObICLogMgr { class ObCLogMgr : public ObICLogMgr {
...@@ -429,6 +430,8 @@ public: ...@@ -429,6 +430,8 @@ public:
{ {
return cb_engine_; return cb_engine_;
} }
virtual int get_server_min_log_ts(int64_t &server_min_log_ts);
// ==================== physical flashback ===================== // ==================== physical flashback =====================
int delete_all_log_files() override; int delete_all_log_files() override;
// ==================== log archive ===================== // ==================== log archive =====================
......
...@@ -141,6 +141,7 @@ public: ...@@ -141,6 +141,7 @@ public:
int ret = common::OB_SUCCESS; int ret = common::OB_SUCCESS;
file_id_t range_min_file_id = common::OB_INVALID_FILE_ID; file_id_t range_min_file_id = common::OB_INVALID_FILE_ID;
file_id_t range_max_file_id = common::OB_INVALID_FILE_ID; file_id_t range_max_file_id = common::OB_INVALID_FILE_ID;
int64_t last_entry_ts = common::OB_INVALID_TIMESTAMP;
// When empty folder, because the file header block occupies DIO_ALIGN_SIZE, // When empty folder, because the file header block occupies DIO_ALIGN_SIZE,
// so the file start offset begins from CLOG_DIO_ALIGN_SIZE // so the file start offset begins from CLOG_DIO_ALIGN_SIZE
ObRawEntryIterator<Type, Interface> iter; ObRawEntryIterator<Type, Interface> iter;
...@@ -165,7 +166,11 @@ public: ...@@ -165,7 +166,11 @@ public:
if (common::OB_ITER_END == ret) { if (common::OB_ITER_END == ret) {
file_id = param.file_id_; file_id = param.file_id_;
offset = param.offset_; offset = param.offset_;
CLOG_LOG(INFO, "iter next_entry finish", K(ret), K(param), K(entry)); // FIXME last_entry_ts = entry.get_submit_timestamp();
if (OB_INVALID_TIMESTAMP != last_entry_ts) {
ObClockGenerator::try_advance_cur_ts(last_entry_ts);
}
CLOG_LOG(INFO, "iter next_entry finish", K(ret), K(param), K(entry)); // FIXME
ret = common::OB_SUCCESS; ret = common::OB_SUCCESS;
} else { } else {
CLOG_LOG(ERROR, "get cursor fail", K(ret), K(param), K(entry), K(range_max_file_id)); CLOG_LOG(ERROR, "get cursor fail", K(ret), K(param), K(entry), K(range_max_file_id));
......
...@@ -10,11 +10,13 @@ ...@@ -10,11 +10,13 @@
* See the Mulan PubL v2 for more details. * See the Mulan PubL v2 for more details.
*/ */
#include <sys/vfs.h>
#include "ob_log_engine.h" #include "ob_log_engine.h"
#include <sys/vfs.h>
#include "common/ob_member_list.h" #include "common/ob_member_list.h"
#include "lib/file/file_directory_utils.h" #include "lib/file/file_directory_utils.h"
#include "lib/ob_define.h"
#include "lib/thread_local/thread_buffer.h" #include "lib/thread_local/thread_buffer.h"
#include "lib/time/ob_time_utility.h"
#include "rpc/obrpc/ob_rpc_net_handler.h" #include "rpc/obrpc/ob_rpc_net_handler.h"
#include "share/ob_cluster_version.h" #include "share/ob_cluster_version.h"
#include "share/ob_server_blacklist.h" #include "share/ob_server_blacklist.h"
...@@ -2595,6 +2597,63 @@ bool ObLogEngine::is_clog_disk_hang() const ...@@ -2595,6 +2597,63 @@ bool ObLogEngine::is_clog_disk_hang() const
return is_disk_hang; return is_disk_hang;
} }
int ObLogEngine::get_server_min_log_ts(int64_t &server_min_log_ts)
{
int ret = OB_SUCCESS;
ObLogBlockMetaV2 log_block;
ObReadBuf read_buf;
ObReadRes res;
ObReadCost cost;
ObReadParam read_param;
const int64_t in_read_size = 4 * 1024;
read_buf.buf_len_ = in_read_size + CLOG_DIO_ALIGN_SIZE;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (NULL == (read_buf.buf_
= static_cast<char*>(ob_malloc_align(CLOG_DIO_ALIGN_SIZE, read_buf.buf_len_, "LogEngine")))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
const int64_t TIMEOUT_TS = 1 * 1000 * 1000;
int64_t start_ts = ObTimeUtility::current_time();
do {
file_id_t min_file_id = get_clog_min_file_id();
read_param.file_id_ = min_file_id;
read_param.offset_ = 0;
read_param.read_len_ = in_read_size;
if (OB_INVALID_FILE_ID == min_file_id) {
ret = OB_ENTRY_NOT_EXIST;
} else if (OB_FAIL(read_data_direct(read_param, read_buf, res, cost))) {
CLOG_LOG(WARN, "read_log_by_location failed", K(ret), K(read_param));
} else {
}
int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
if (OB_NO_SUCH_FILE_OR_DIRECTORY == ret) {
if (cost_ts >= TIMEOUT_TS) {
ret = OB_TIMEOUT;
CLOG_LOG(WARN, "get_server_min_log_ts timeout", K(ret));
} else {
usleep(100 * 1000);
}
}
} while (OB_NO_SUCH_FILE_OR_DIRECTORY == ret);
}
if (OB_SUCC(ret)) {
int64_t pos = 0;
if (OB_FAIL(log_block.deserialize(res.buf_, res.data_len_, pos))) {
} else if (false == log_block.check_meta_checksum()) {
ret = OB_INVALID_DATA;
CLOG_LOG(WARN, "LogBlock has been corrupt", K(ret), K(log_block));
} else {
server_min_log_ts = log_block.get_timestamp();
CLOG_LOG(INFO, "ObLogEngine get_server_min_log_ts success", K(server_min_log_ts), K(read_param));
}
}
if (true == read_buf.is_valid()) {
ob_free_align(read_buf.buf_);
}
return ret;
}
NetworkLimitManager::NetworkLimitManager() : is_inited_(false), addr_array_(), ethernet_speed_(0), hash_map_() NetworkLimitManager::NetworkLimitManager() : is_inited_(false), addr_array_(), ethernet_speed_(0), hash_map_()
{} {}
......
...@@ -537,6 +537,7 @@ public: ...@@ -537,6 +537,7 @@ public:
int get_clog_using_disk_space(int64_t &space) const; int get_clog_using_disk_space(int64_t &space) const;
int get_ilog_using_disk_space(int64_t &space) const; int get_ilog_using_disk_space(int64_t &space) const;
bool is_clog_disk_hang() const; bool is_clog_disk_hang() const;
int get_server_min_log_ts(int64_t &server_min_log_ts);
private: private:
int fetch_log_from_server( int fetch_log_from_server(
......
...@@ -60,6 +60,7 @@ public: ...@@ -60,6 +60,7 @@ public:
return header_.update_proposal_id(new_proposal_id); return header_.update_proposal_id(new_proposal_id);
} }
int get_next_replay_ts_for_rg(int64_t& next_replay_ts) const; int get_next_replay_ts_for_rg(int64_t& next_replay_ts) const;
int64_t get_submit_timestamp() const { return header_.get_submit_timestamp(); }
TO_STRING_KV(N_HEADER, header_); TO_STRING_KV(N_HEADER, header_);
NEED_SERIALIZE_AND_DESERIALIZE; NEED_SERIALIZE_AND_DESERIALIZE;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册