提交 aa15788e 编写于 作者: Y yy0 提交者: wangzelin.wzl

fix logic of Maximum continuous replayed log_id

上级 047132a3
...@@ -467,7 +467,6 @@ public: ...@@ -467,7 +467,6 @@ public:
virtual int retire_warmup_store(const bool is_disk_full) = 0; virtual int retire_warmup_store(const bool is_disk_full) = 0;
virtual int enable_write_log(const bool is_replay_old) = 0; virtual int enable_write_log(const bool is_replay_old) = 0;
virtual uint64_t get_min_replayed_log_id() = 0; // Get the minimum log id that has been replayed continuously. virtual uint64_t get_min_replayed_log_id() = 0; // Get the minimum log id that has been replayed continuously.
virtual void get_min_replayed_log(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) = 0;
virtual int get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) = 0; virtual int get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) = 0;
virtual int create_partition_group(const ObCreatePGParam& param) = 0; virtual int create_partition_group(const ObCreatePGParam& param) = 0;
virtual int create_pg_partition(const common::ObPartitionKey& pkey, const int64_t multi_version_start, virtual int create_pg_partition(const common::ObPartitionKey& pkey, const int64_t multi_version_start,
......
...@@ -5364,35 +5364,12 @@ uint64_t ObPartitionGroup::get_min_replayed_log_id() ...@@ -5364,35 +5364,12 @@ uint64_t ObPartitionGroup::get_min_replayed_log_id()
uint64_t min_replay_log_id = UINT64_MAX; uint64_t min_replay_log_id = UINT64_MAX;
int64_t unused = 0; int64_t unused = 0;
get_min_replayed_log(min_replay_log_id, unused); get_min_replayed_log_with_keepalive(min_replay_log_id, unused);
return min_replay_log_id; return min_replay_log_id;
} }
void ObPartitionGroup::get_min_replayed_log(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) int ObPartitionGroup::get_min_replayed_log_with_keepalive(uint64_t &min_replay_log_id, int64_t &min_replay_log_ts)
{
uint64_t unreplay_log_id = UINT64_MAX;
int64_t unreplay_log_ts = 0;
uint64_t last_replay_log_id = UINT64_MAX;
int64_t last_replay_log_ts = 0;
// 1. The left boundary of sliding window.
pls_->get_last_replay_log(last_replay_log_id, last_replay_log_ts);
// 2. The minimum continuously replayed log of replay engine.
replay_status_->get_min_unreplay_log(unreplay_log_id, unreplay_log_ts);
if (unreplay_log_id <= last_replay_log_id) {
min_replay_log_id = unreplay_log_id - 1;
min_replay_log_ts = unreplay_log_ts - 1;
} else {
min_replay_log_id = last_replay_log_id;
min_replay_log_ts = last_replay_log_ts;
}
STORAGE_LOG(INFO, "min replayed log", K(pkey_), K(min_replay_log_ts), K(unreplay_log_ts), K(last_replay_log_ts));
}
int ObPartitionGroup::get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
uint64_t unreplay_log_id = UINT64_MAX; uint64_t unreplay_log_id = UINT64_MAX;
...@@ -5406,12 +5383,13 @@ int ObPartitionGroup::get_min_replayed_log_with_keepalive(uint64_t& min_replay_l ...@@ -5406,12 +5383,13 @@ int ObPartitionGroup::get_min_replayed_log_with_keepalive(uint64_t& min_replay_l
} else { } else {
// 2. The minimum continuously replayed log of replay engine. // 2. The minimum continuously replayed log of replay engine.
replay_status_->get_min_unreplay_log(unreplay_log_id, unreplay_log_ts); replay_status_->get_min_unreplay_log(unreplay_log_id, unreplay_log_ts);
if (unreplay_log_id <= next_replay_log_id - 1) { if (unreplay_log_id == next_replay_log_id) {
min_replay_log_id = unreplay_log_id - 1; // cold partition, return next_replay_log_ts instead of unreplay_log_ts, unreplay_log_ts may be too small.
min_replay_log_ts = unreplay_log_ts - 1;
} else {
min_replay_log_id = next_replay_log_id - 1; min_replay_log_id = next_replay_log_id - 1;
min_replay_log_ts = next_replay_log_ts - 1; min_replay_log_ts = next_replay_log_ts - 1;
} else {
min_replay_log_id = unreplay_log_id - 1;
min_replay_log_ts = unreplay_log_ts - 1;
} }
STORAGE_LOG(INFO, STORAGE_LOG(INFO,
...@@ -5856,7 +5834,7 @@ int ObPartitionGroup::get_merge_log_ts(int64_t& merge_ts) ...@@ -5856,7 +5834,7 @@ int ObPartitionGroup::get_merge_log_ts(int64_t& merge_ts)
ObPartitionGroupLockGuard guard(lock_, PGLOCKTRANS | PGLOCKREPLAY | PGLOCKCLOG, 0); ObPartitionGroupLockGuard guard(lock_, PGLOCKTRANS | PGLOCKREPLAY | PGLOCKCLOG, 0);
uint64_t unused = 0; uint64_t unused = 0;
get_min_replayed_log(unused, merge_ts); get_min_replayed_log_with_keepalive(unused, merge_ts);
if (OB_ISNULL(txs_)) { if (OB_ISNULL(txs_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
......
...@@ -343,7 +343,6 @@ public: ...@@ -343,7 +343,6 @@ public:
int has_active_memtable(bool& found); int has_active_memtable(bool& found);
virtual int enable_write_log(const bool is_replay_old) override; virtual int enable_write_log(const bool is_replay_old) override;
virtual uint64_t get_min_replayed_log_id() override; virtual uint64_t get_min_replayed_log_id() override;
virtual void get_min_replayed_log(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) override;
virtual int get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) override; virtual int get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) override;
virtual int check_dirty_txn( virtual int check_dirty_txn(
const int64_t min_log_ts, const int64_t max_log_ts, int64_t& freeze_ts, bool& is_dirty) override; const int64_t min_log_ts, const int64_t max_log_ts, int64_t& freeze_ts, bool& is_dirty) override;
......
...@@ -137,7 +137,15 @@ int ObPartitionLoopWorker::gen_readable_info_with_memtable_(ObPartitionReadableI ...@@ -137,7 +137,15 @@ int ObPartitionLoopWorker::gen_readable_info_with_memtable_(ObPartitionReadableI
STORAGE_LOG(WARN, "get_next_replay_log_info error", K(ret), K_(pkey)); STORAGE_LOG(WARN, "get_next_replay_log_info error", K(ret), K_(pkey));
} }
} else { } else {
readable_info.min_replay_engine_ts_ = replay_status_->get_min_unreplay_log_timestamp(); uint64_t min_unreplay_log_id = OB_INVALID_ID;
int64_t min_unreplay_log_ts = OB_INVALID_TIMESTAMP;
replay_status_->get_min_unreplay_log(min_unreplay_log_id, min_unreplay_log_ts);
if (min_unreplay_log_id == next_replay_log_id) {
// cold partition, min_unreplay_log_ts returned by replay engine may be too small
readable_info.min_replay_engine_ts_ = readable_info.min_log_service_ts_;
} else {
readable_info.min_replay_engine_ts_ = min_unreplay_log_ts;
}
if (OB_FAIL(txs_->get_min_uncommit_prepare_version(pkey_, readable_info.min_trans_service_ts_))) { if (OB_FAIL(txs_->get_min_uncommit_prepare_version(pkey_, readable_info.min_trans_service_ts_))) {
if (OB_PARTITION_NOT_EXIST == ret) { if (OB_PARTITION_NOT_EXIST == ret) {
if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) {
......
...@@ -526,6 +526,8 @@ uint64_t ObReplayStatus::get_min_unreplay_log_id() ...@@ -526,6 +526,8 @@ uint64_t ObReplayStatus::get_min_unreplay_log_id()
// the invoker needs to lock // the invoker needs to lock
int64_t ObReplayStatus::get_min_unreplay_log_timestamp() int64_t ObReplayStatus::get_min_unreplay_log_timestamp()
{ {
// for cold partition: timestamp returned may be small then expected, need to be double check with
// value returned by log service
uint64_t unused = UINT64_MAX; uint64_t unused = UINT64_MAX;
int64_t timestamp = INT64_MAX; int64_t timestamp = INT64_MAX;
get_min_unreplay_log(unused, timestamp); get_min_unreplay_log(unused, timestamp);
...@@ -534,19 +536,12 @@ int64_t ObReplayStatus::get_min_unreplay_log_timestamp() ...@@ -534,19 +536,12 @@ int64_t ObReplayStatus::get_min_unreplay_log_timestamp()
void ObReplayStatus::get_min_unreplay_log(uint64_t& unreplay_log_id, int64_t& timestamp) void ObReplayStatus::get_min_unreplay_log(uint64_t& unreplay_log_id, int64_t& timestamp)
{ {
unreplay_log_id = UINT64_MAX; // for cold partition: timestamp returned may be small then expected, need to be double check with
timestamp = INT64_MAX; // value returned by log service
{ {
RLockGuard Rlock_guard(get_submit_log_info_rwlock()); RLockGuard Rlock_guard(get_submit_log_info_rwlock());
uint64_t next_submit_log_id = get_next_submit_log_id(); unreplay_log_id = get_next_submit_log_id();
int64_t next_submit_log_ts = get_next_submit_log_ts(); timestamp = get_next_submit_log_ts();
uint64_t last_slide_out_log_id = get_last_slide_out_log_id();
int64_t last_slide_out_log_ts = get_last_slide_out_log_ts();
if (next_submit_log_ts <= last_slide_out_log_ts) {
unreplay_log_id = next_submit_log_id;
timestamp = next_submit_log_ts;
}
} }
for (int64_t i = 0; i < REPLAY_TASK_QUEUE_SIZE; ++i) { for (int64_t i = 0; i < REPLAY_TASK_QUEUE_SIZE; ++i) {
......
...@@ -302,7 +302,6 @@ public: ...@@ -302,7 +302,6 @@ public:
return common::OB_SUCCESS; return common::OB_SUCCESS;
} }
MOCK_METHOD0(get_min_replayed_log_id, uint64_t()); MOCK_METHOD0(get_min_replayed_log_id, uint64_t());
MOCK_METHOD2(get_min_replayed_log, void(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts));
MOCK_METHOD2(get_min_replayed_log_with_keepalive, int(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts)); MOCK_METHOD2(get_min_replayed_log_with_keepalive, int(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts));
MOCK_CONST_METHOD1(get_table_store_cnt, int(int64_t& table_cnt)); MOCK_CONST_METHOD1(get_table_store_cnt, int(int64_t& table_cnt));
MOCK_METHOD4( MOCK_METHOD4(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册