diff --git a/src/storage/ob_i_partition_group.h b/src/storage/ob_i_partition_group.h index 7fda455d0dc49d258b28b320a6028a58e8723b64..05a44a2cad0bf4e319b5be49e592172b8778d993 100644 --- a/src/storage/ob_i_partition_group.h +++ b/src/storage/ob_i_partition_group.h @@ -467,7 +467,6 @@ public: virtual int retire_warmup_store(const bool is_disk_full) = 0; virtual int enable_write_log(const bool is_replay_old) = 0; virtual uint64_t get_min_replayed_log_id() = 0; // Get the minimum log id that has been replayed continuously. - virtual void get_min_replayed_log(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) = 0; virtual int get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) = 0; virtual int create_partition_group(const ObCreatePGParam& param) = 0; virtual int create_pg_partition(const common::ObPartitionKey& pkey, const int64_t multi_version_start, diff --git a/src/storage/ob_partition_group.cpp b/src/storage/ob_partition_group.cpp index 34241cc9980c57ab9f968c79be821d425a359489..ea62b4cc434d508d7189f1adbe9bcf646d1ab93c 100644 --- a/src/storage/ob_partition_group.cpp +++ b/src/storage/ob_partition_group.cpp @@ -5364,35 +5364,12 @@ uint64_t ObPartitionGroup::get_min_replayed_log_id() uint64_t min_replay_log_id = UINT64_MAX; int64_t unused = 0; - get_min_replayed_log(min_replay_log_id, unused); + get_min_replayed_log_with_keepalive(min_replay_log_id, unused); return min_replay_log_id; } -void ObPartitionGroup::get_min_replayed_log(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) -{ - uint64_t unreplay_log_id = UINT64_MAX; - int64_t unreplay_log_ts = 0; - uint64_t last_replay_log_id = UINT64_MAX; - int64_t last_replay_log_ts = 0; - - // 1. The left boundary of sliding window. - pls_->get_last_replay_log(last_replay_log_id, last_replay_log_ts); - - // 2. The minimum continuously replayed log of replay engine. - replay_status_->get_min_unreplay_log(unreplay_log_id, unreplay_log_ts); - if (unreplay_log_id <= last_replay_log_id) { - min_replay_log_id = unreplay_log_id - 1; - min_replay_log_ts = unreplay_log_ts - 1; - } else { - min_replay_log_id = last_replay_log_id; - min_replay_log_ts = last_replay_log_ts; - } - - STORAGE_LOG(INFO, "min replayed log", K(pkey_), K(min_replay_log_ts), K(unreplay_log_ts), K(last_replay_log_ts)); -} - -int ObPartitionGroup::get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) +int ObPartitionGroup::get_min_replayed_log_with_keepalive(uint64_t &min_replay_log_id, int64_t &min_replay_log_ts) { int ret = OB_SUCCESS; uint64_t unreplay_log_id = UINT64_MAX; @@ -5406,12 +5383,13 @@ int ObPartitionGroup::get_min_replayed_log_with_keepalive(uint64_t& min_replay_l } else { // 2. The minimum continuously replayed log of replay engine. replay_status_->get_min_unreplay_log(unreplay_log_id, unreplay_log_ts); - if (unreplay_log_id <= next_replay_log_id - 1) { - min_replay_log_id = unreplay_log_id - 1; - min_replay_log_ts = unreplay_log_ts - 1; - } else { + if (unreplay_log_id == next_replay_log_id) { + // cold partition, return next_replay_log_ts instead of unreplay_log_ts, unreplay_log_ts may be too small. min_replay_log_id = next_replay_log_id - 1; min_replay_log_ts = next_replay_log_ts - 1; + } else { + min_replay_log_id = unreplay_log_id - 1; + min_replay_log_ts = unreplay_log_ts - 1; } STORAGE_LOG(INFO, @@ -5856,7 +5834,7 @@ int ObPartitionGroup::get_merge_log_ts(int64_t& merge_ts) ObPartitionGroupLockGuard guard(lock_, PGLOCKTRANS | PGLOCKREPLAY | PGLOCKCLOG, 0); uint64_t unused = 0; - get_min_replayed_log(unused, merge_ts); + get_min_replayed_log_with_keepalive(unused, merge_ts); if (OB_ISNULL(txs_)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/ob_partition_group.h b/src/storage/ob_partition_group.h index f6ad0762a7c8b3c5bdf5ec96542a37d395d5e655..ee00cac71899a64b566a1be7cda8426d8d9feb68 100644 --- a/src/storage/ob_partition_group.h +++ b/src/storage/ob_partition_group.h @@ -343,7 +343,6 @@ public: int has_active_memtable(bool& found); virtual int enable_write_log(const bool is_replay_old) override; virtual uint64_t get_min_replayed_log_id() override; - virtual void get_min_replayed_log(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) override; virtual int get_min_replayed_log_with_keepalive(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts) override; virtual int check_dirty_txn( const int64_t min_log_ts, const int64_t max_log_ts, int64_t& freeze_ts, bool& is_dirty) override; diff --git a/src/storage/ob_partition_loop_worker.cpp b/src/storage/ob_partition_loop_worker.cpp index 084ede5f9850531faf7f035653ec14847f3197eb..0a4aa76914b5f557e1e581a0b93fc98120f393a6 100644 --- a/src/storage/ob_partition_loop_worker.cpp +++ b/src/storage/ob_partition_loop_worker.cpp @@ -137,7 +137,15 @@ int ObPartitionLoopWorker::gen_readable_info_with_memtable_(ObPartitionReadableI STORAGE_LOG(WARN, "get_next_replay_log_info error", K(ret), K_(pkey)); } } else { - readable_info.min_replay_engine_ts_ = replay_status_->get_min_unreplay_log_timestamp(); + uint64_t min_unreplay_log_id = OB_INVALID_ID; + int64_t min_unreplay_log_ts = OB_INVALID_TIMESTAMP; + replay_status_->get_min_unreplay_log(min_unreplay_log_id, min_unreplay_log_ts); + if (min_unreplay_log_id == next_replay_log_id) { + // cold partition, min_unreplay_log_ts returned by replay engine may be too small + readable_info.min_replay_engine_ts_ = readable_info.min_log_service_ts_; + } else { + readable_info.min_replay_engine_ts_ = min_unreplay_log_ts; + } if (OB_FAIL(txs_->get_min_uncommit_prepare_version(pkey_, readable_info.min_trans_service_ts_))) { if (OB_PARTITION_NOT_EXIST == ret) { if (REACH_TIME_INTERVAL(60 * 1000 * 1000)) { diff --git a/src/storage/ob_replay_status.cpp b/src/storage/ob_replay_status.cpp index 1a05143a2593a60d66e8a573349bd18af0da0271..9f4c61b9d0fe852cea3a6de1e0a977cec9074935 100644 --- a/src/storage/ob_replay_status.cpp +++ b/src/storage/ob_replay_status.cpp @@ -526,6 +526,8 @@ uint64_t ObReplayStatus::get_min_unreplay_log_id() // the invoker needs to lock int64_t ObReplayStatus::get_min_unreplay_log_timestamp() { + // for cold partition: timestamp returned may be small then expected, need to be double check with + // value returned by log service uint64_t unused = UINT64_MAX; int64_t timestamp = INT64_MAX; get_min_unreplay_log(unused, timestamp); @@ -534,19 +536,12 @@ int64_t ObReplayStatus::get_min_unreplay_log_timestamp() void ObReplayStatus::get_min_unreplay_log(uint64_t& unreplay_log_id, int64_t& timestamp) { - unreplay_log_id = UINT64_MAX; - timestamp = INT64_MAX; + // for cold partition: timestamp returned may be small then expected, need to be double check with + // value returned by log service { RLockGuard Rlock_guard(get_submit_log_info_rwlock()); - uint64_t next_submit_log_id = get_next_submit_log_id(); - int64_t next_submit_log_ts = get_next_submit_log_ts(); - uint64_t last_slide_out_log_id = get_last_slide_out_log_id(); - int64_t last_slide_out_log_ts = get_last_slide_out_log_ts(); - - if (next_submit_log_ts <= last_slide_out_log_ts) { - unreplay_log_id = next_submit_log_id; - timestamp = next_submit_log_ts; - } + unreplay_log_id = get_next_submit_log_id(); + timestamp = get_next_submit_log_ts(); } for (int64_t i = 0; i < REPLAY_TASK_QUEUE_SIZE; ++i) { diff --git a/unittest/storage/mockcontainer/mock_ob_partition.h b/unittest/storage/mockcontainer/mock_ob_partition.h index 6d0db12daa80028906c515cf243efd6962649acc..825d653ad1ad5c10f15864f14adb7a90946811db 100644 --- a/unittest/storage/mockcontainer/mock_ob_partition.h +++ b/unittest/storage/mockcontainer/mock_ob_partition.h @@ -302,7 +302,6 @@ public: return common::OB_SUCCESS; } MOCK_METHOD0(get_min_replayed_log_id, uint64_t()); - MOCK_METHOD2(get_min_replayed_log, void(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts)); MOCK_METHOD2(get_min_replayed_log_with_keepalive, int(uint64_t& min_replay_log_id, int64_t& min_replay_log_ts)); MOCK_CONST_METHOD1(get_table_store_cnt, int(int64_t& table_cnt)); MOCK_METHOD4(