diff --git a/src/clog/ob_log_entry.cpp b/src/clog/ob_log_entry.cpp index 491d77319dacbc6943fd51a95d7b58ba89eb3043..8a0bf05e92dba7e4cbffdb06c0c0528a38d29883 100644 --- a/src/clog/ob_log_entry.cpp +++ b/src/clog/ob_log_entry.cpp @@ -113,6 +113,28 @@ int ObLogEntry::deep_copy_to(ObLogEntry& entry) const return deep_copy_to_(entry); } +int ObLogEntry::get_next_replay_ts_for_rg(int64_t &next_replay_ts) const +{ + int ret = OB_SUCCESS; + if (OB_LOG_AGGRE == header_.get_log_type()) { + int64_t pos = 0; + int32_t next_log_offset = 0; + const char *log_buf = get_buf(); + const int64_t log_buf_len = header_.get_data_len(); + if (OB_FAIL(serialization::decode_i32(log_buf, log_buf_len, pos, &next_log_offset))) { + REPLAY_LOG(ERROR, "serialization decode_i32 failed", KR(ret), K(log_buf_len), K(pos), + K(header_), K(next_log_offset)); + } else if (OB_FAIL(serialization::decode_i64(log_buf, log_buf_len, pos, &next_replay_ts))) { + REPLAY_LOG(ERROR, "serialization decode_i64 failed", KR(ret), K(log_buf_len), K(pos), + K(header_), K(next_log_offset)); + } else {/*do nothing*/} + } else { + //for non_aggregate_log, just assign next_replay_ts with submit_timestamp_ + next_replay_ts = header_.get_submit_timestamp(); + } + return ret; +} + DEFINE_SERIALIZE(ObLogEntry) { int ret = OB_SUCCESS; diff --git a/src/clog/ob_log_entry.h b/src/clog/ob_log_entry.h index 5612fd76417932f2b44389762d9a7448f1688b15..e9caf6f577ae6a745c3f7604abb9fae1f09bd5f7 100644 --- a/src/clog/ob_log_entry.h +++ b/src/clog/ob_log_entry.h @@ -59,6 +59,7 @@ class ObLogEntry { { return header_.update_proposal_id(new_proposal_id); } + int get_next_replay_ts_for_rg(int64_t &next_replay_ts) const; TO_STRING_KV(N_HEADER, header_); NEED_SERIALIZE_AND_DESERIALIZE; diff --git a/src/clog/ob_log_sliding_window.cpp b/src/clog/ob_log_sliding_window.cpp index a2a5105dc8a79a3b7765f9bf6eda1ae7bb1cd0cd..ada52445921b5ba4f3df733a0318e2b6b8001c32 100644 --- a/src/clog/ob_log_sliding_window.cpp +++ b/src/clog/ob_log_sliding_window.cpp @@ -3357,26 +3357,27 @@ int ObLogSlidingWindow::need_replay_for_data_or_log_replica_(const bool is_trans return ret; } -int ObLogSlidingWindow::check_is_meta_log(const ObPartitionKey& pkey, uint64_t log_id, bool& is_meta_log, - int64_t& log_ts, int64_t& accum_checksum, ObLogType& log_type) const +int ObLogSlidingWindow::get_log_meta_info(uint64_t log_id, bool &is_meta_log, int64_t &log_ts, + int64_t &next_replay_log_ts_for_rg, int64_t &accum_checksum, ObLogType &log_type) const { int ret = OB_SUCCESS; is_meta_log = false; ObICLogMgr* clog_mgr = NULL; if (OB_ISNULL(partition_service_) || OB_ISNULL(clog_mgr = partition_service_->get_clog_mgr())) { ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "invalid argument", K(pkey), K(log_id), KP(partition_service_), KP(clog_mgr), KR(ret)); + CLOG_LOG(WARN, "invalid argument", K(partition_key_), K(log_id), KP(partition_service_), KP(clog_mgr), KR(ret)); } else { clog::ObLogEntry log_entry; bool is_batch_committed = false; - if (OB_FAIL(clog_mgr->query_log_info_with_log_id(pkey, log_id, log_entry, accum_checksum, is_batch_committed))) { + if (OB_FAIL(clog_mgr->query_log_info_with_log_id(partition_key_, log_id, log_entry, accum_checksum, is_batch_committed))) { if (OB_EAGAIN == ret) { if (REACH_TIME_INTERVAL(100 * 1000)) { - CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(pkey), K(log_id), K(ret)); + CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(partition_key_), K(log_id), K(ret)); } } else { - CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(pkey), K(log_id), K(ret)); + CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(partition_key_), K(log_id), K(ret)); } + } else if (OB_FAIL(log_entry.get_next_replay_ts_for_rg(next_replay_log_ts_for_rg))) { } else { log_type = log_entry.get_header().get_log_type(); log_ts = log_entry.get_header().get_submit_timestamp(); @@ -3425,6 +3426,7 @@ int ObLogSlidingWindow::try_submit_replay_task_(const uint64_t log_id, const ObL } else { const ObLogType header_log_type = log_task.get_log_type(); const int64_t log_submit_timestamp = log_task.get_submit_timestamp(); + const int64_t next_replay_log_ts = log_task.get_next_replay_log_ts(); bool need_replay = log_task.need_replay(); const bool is_trans_log = log_task.is_trans_log(); uint64_t last_replay_log_id = OB_INVALID_ID; @@ -3510,18 +3512,10 @@ int ObLogSlidingWindow::try_submit_replay_task_(const uint64_t log_id, const ObL } if (OB_SUCC(ret)) { - int64_t next_replay_log_timestamp = INT64_MAX; if (state_mgr_->is_offline()) { - CLOG_LOG( - WARN, "no need to submit log to replay when partition is offline", KR(ret), K(partition_key_), K(log_id)); - } else if (OB_FAIL(get_next_replay_log_timestamp(next_replay_log_timestamp))) { - CLOG_LOG(WARN, "failed to get_next_replay_log_timestamp", KR(ret), K(partition_key_), K(log_id)); - } else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_, - log_id, - log_submit_timestamp, - need_replay, - header_log_type, - next_replay_log_timestamp))) { + CLOG_LOG(WARN, "no need to submit log to replay when partition is offline", KR(ret), K(partition_key_), K(log_id)); + } else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_, log_id, log_submit_timestamp, + need_replay, header_log_type, next_replay_log_ts))) { if (OB_EAGAIN != ret) { CLOG_LOG(WARN, "failed to submit replay task", diff --git a/src/clog/ob_log_sliding_window.h b/src/clog/ob_log_sliding_window.h index d2d648be7242bf51ef2d39c07e0d23258089130c..8638982288f2499b220deba0a4e7fbf270cbd9ac 100644 --- a/src/clog/ob_log_sliding_window.h +++ b/src/clog/ob_log_sliding_window.h @@ -498,10 +498,14 @@ class ObLogSlidingWindow : public ObILogSWForCasMgr, int leader_active(); int leader_takeover(); int leader_revoke(); - int get_replica_replay_type(ObReplicaReplayType& replay_type) const; - // is_meta_log: log type that need been replayed by D replica and log replica - int check_is_meta_log(const common::ObPartitionKey& pkey, uint64_t log_id, bool& is_meta_log, int64_t& log_ts, - int64_t& accum_checksum, ObLogType& log_type) const; + int get_replica_replay_type(ObReplicaReplayType &replay_type) const; + //is_meta_log: log type that need been replayed by D replica and log replica + int get_log_meta_info(uint64_t log_id, + bool &is_meta_log, + int64_t &log_ts, + int64_t &next_replay_log_ts_for_rg, + int64_t &accum_checksum, + ObLogType &log_type) const; void destroy_aggre_buffer(); uint64_t get_leader_max_unconfirmed_log_count(); uint64_t get_follower_max_unconfirmed_log_count(); diff --git a/src/clog/ob_log_task.cpp b/src/clog/ob_log_task.cpp index f7d8fe75cbc1ab45cc9cea09a78b9d7e09002524..bbf892bcb9a518c7b1f0521323b8c8252240ffdc 100644 --- a/src/clog/ob_log_task.cpp +++ b/src/clog/ob_log_task.cpp @@ -101,6 +101,7 @@ ObLogTask::ObLogTask() log_buf_(NULL), generation_timestamp_(OB_INVALID_TIMESTAMP), submit_timestamp_(OB_INVALID_TIMESTAMP), + next_replay_log_ts_(OB_INVALID_TIMESTAMP), epoch_id_(OB_INVALID_TIMESTAMP), data_checksum_(0), accum_checksum_(0), @@ -221,6 +222,7 @@ int ObLogTask::reset_log() log_buf_len_ = 0; generation_timestamp_ = OB_INVALID_TIMESTAMP; submit_timestamp_ = OB_INVALID_TIMESTAMP; + next_replay_log_ts_ = OB_INVALID_TIMESTAMP; state_map_.reset_map(LOCAL_FLUSHED); state_map_.reset_map(ALREADY_SEND_TO_STANDBY); state_map_.reset_map(SUBMIT_LOG_EXIST); @@ -629,6 +631,11 @@ int64_t ObLogTask::get_submit_timestamp() const return submit_timestamp_; } +int64_t ObLogTask::get_next_replay_log_ts() const +{ + return next_replay_log_ts_; +} + int64_t ObLogTask::get_data_checksum() const { return data_checksum_; @@ -731,7 +738,7 @@ int ObLogTask::log_deep_copy_to_(const ObLogEntry& log_entry, const bool need_co if (NULL == (buf = static_cast(TMA_MGR_INSTANCE.alloc_log_entry_buf(log_entry.get_header().get_data_len())))) { ret = OB_ALLOCATE_MEMORY_FAILED; - CLOG_LOG(ERROR, "allocate memory fail", K(ret), "header", log_entry.get_header()); + CLOG_LOG(WARN, "allocate memory fail", K(ret), "header", log_entry.get_header()); } else { MEMCPY(buf, log_entry.get_buf(), log_entry.get_header().get_data_len()); log_buf_ = buf; @@ -742,15 +749,20 @@ int ObLogTask::log_deep_copy_to_(const ObLogEntry& log_entry, const bool need_co log_buf_len_ = 0; } if (OB_SUCC(ret)) { - log_type_ = static_cast(log_entry.get_header().get_log_type()); - if (log_entry.get_header().is_trans_log()) { - state_map_.set_map(IS_TRANS_LOG); + if (OB_FAIL(log_entry.get_next_replay_ts_for_rg(next_replay_log_ts_))) { + CLOG_LOG(WARN, "failed to get_next_replay_ts_for_rg", K(ret), "header", log_entry.get_header()); + } else { + log_type_ = static_cast(log_entry.get_header().get_log_type()); + const ObLogEntryHeader &log_header = log_entry.get_header(); + if (log_header.is_trans_log()) { + state_map_.set_map(IS_TRANS_LOG); + } + proposal_id_ = log_header.get_proposal_id(); + generation_timestamp_ = log_header.get_generation_timestamp(); + submit_timestamp_ = log_header.get_submit_timestamp(); + data_checksum_ = log_header.get_data_checksum(); + epoch_id_ = log_header.get_epoch_id(); } - proposal_id_ = log_entry.get_header().get_proposal_id(); - generation_timestamp_ = log_entry.get_header().get_generation_timestamp(); - submit_timestamp_ = log_entry.get_header().get_submit_timestamp(); - data_checksum_ = log_entry.get_header().get_data_checksum(); - epoch_id_ = log_entry.get_header().get_epoch_id(); } return ret; } diff --git a/src/clog/ob_log_task.h b/src/clog/ob_log_task.h index c0cfc03e1c7da4635c45e1f86464afcc474b75fb..27d994300cc2486bbb07f3e4b7299303a69d5dc6 100644 --- a/src/clog/ob_log_task.h +++ b/src/clog/ob_log_task.h @@ -142,6 +142,7 @@ class ObLogTask : public ObILogExtRingBufferData { char* get_log_buf() const; int32_t get_log_buf_len() const; int64_t get_generation_timestamp() const; + int64_t get_next_replay_log_ts() const; int64_t get_submit_timestamp() const; int64_t get_data_checksum() const; int64_t get_epoch_id() const; @@ -152,11 +153,10 @@ class ObLogTask : public ObILogExtRingBufferData { // common::ObTraceProfile *get_trace_profile() {return trace_profile_;} // int report_trace(); - TO_STRING_KV(K(log_type_), K(proposal_id_), K(log_buf_len_), K_(generation_timestamp), K(submit_timestamp_), - K_(data_checksum), K(epoch_id_), K(accum_checksum_), K_(state_map), K_(ack_list), KP_(submit_cb), - K_(majority_cnt), K_(log_cursor)); - - public: + TO_STRING_KV(K(log_type_), K(proposal_id_), K(log_buf_len_), K_(generation_timestamp), + K(submit_timestamp_), K(next_replay_log_ts_), K_(data_checksum), K(epoch_id_), K(accum_checksum_), + K_(state_map), K_(ack_list), KP_(submit_cb), K_(majority_cnt), K_(log_cursor)); +public: virtual void destroy(); virtual bool can_be_removed(); virtual bool can_overwrite(const ObILogExtRingBufferData* log_task); @@ -185,6 +185,10 @@ class ObLogTask : public ObILogExtRingBufferData { int64_t generation_timestamp_; // 8 bytes int64_t submit_timestamp_; + // 8 bytes, used for record next_replay_log_ts of total log_entry: + // for unencrypted OB_LOG_AGGR log: next_replay_log_ts is min_log_submit_ts of all aggregated logs + // for other logs: equal with submit_timestamp_ + int64_t next_replay_log_ts_; // ObConfirmedInfo, 16 bytes int64_t epoch_id_; int64_t data_checksum_; diff --git a/src/clog/ob_partition_log_service.cpp b/src/clog/ob_partition_log_service.cpp index c03c54ac266853940940ecdf4e90bbbe78c3e261..ecd93c8492818f5a786337a34e3976a7e41ae46b 100644 --- a/src/clog/ob_partition_log_service.cpp +++ b/src/clog/ob_partition_log_service.cpp @@ -5024,6 +5024,7 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo& uint64_t cur_log_id = start_id; bool need_replay = false; int64_t log_submit_timestamp = OB_INVALID_TIMESTAMP; + int64_t next_replay_log_ts_for_rg = OB_INVALID_TIMESTAMP; int64_t accum_checksum = 0; do { if (OB_EAGAIN == ret) { @@ -5031,8 +5032,8 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo& } bool is_meta_log = false; ObLogType log_type = OB_LOG_UNKNOWN; - if (OB_FAIL(sw_.check_is_meta_log( - partition_key_, cur_log_id, is_meta_log, log_submit_timestamp, accum_checksum, log_type))) { + if (OB_FAIL(sw_.get_log_meta_info(cur_log_id, is_meta_log, log_submit_timestamp, + next_replay_log_ts_for_rg, accum_checksum, log_type))) { if (OB_EAGAIN == ret) { if (REACH_TIME_INTERVAL(100 * 1000)) { CLOG_LOG(WARN, "failed to check is meta log", K(partition_key_), K(cur_log_id), K(ret)); @@ -5045,15 +5046,10 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo& } uint64_t last_replay_log_id = OB_INVALID_ID; - int64_t last_replay_log_ts = OB_INVALID_TIMESTAMP; - (void)sw_.get_last_replay_log(last_replay_log_id, last_replay_log_ts); if (OB_FAIL(ret)) { - } else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_, - cur_log_id, - log_submit_timestamp, - need_replay, - log_type, - last_replay_log_ts + 1))) { + } else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_, cur_log_id, + log_submit_timestamp, need_replay, + log_type, next_replay_log_ts_for_rg))) { if (OB_EAGAIN == ret) { if (REACH_TIME_INTERVAL(100 * 1000)) { CLOG_LOG(WARN, "failed to submit replay task step by step", K(ret), K_(partition_key), K(cur_log_id)); diff --git a/src/storage/ob_partition_loop_worker.cpp b/src/storage/ob_partition_loop_worker.cpp index 95cacfb5c00dd7e4be968382a928b98ff2eb8cd0..778c03269e0119018af16d72f85c9b72cfb055cb 100644 --- a/src/storage/ob_partition_loop_worker.cpp +++ b/src/storage/ob_partition_loop_worker.cpp @@ -193,18 +193,21 @@ int ObPartitionLoopWorker::generate_weak_read_timestamp(const int64_t max_stale_ DEBUG_SYNC(BLOCK_WEAK_READ_TIMESTAMP); DEBUG_SYNC(SYNC_PG_AND_REPLAY_ENGINE_DEADLOCK); + bool is_restore = false; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "partition is not initialized", K(ret), K(pkey_)); + } else if (FALSE_IT(is_restore = (partition_->get_pg_storage().is_restore()))) { } else if (OB_FAIL(gen_readable_info_(readable_info))) { // no need to caculate timestamp when partition is rebuilding if (OB_STATE_NOT_MATCH != ret && OB_PARTITION_NOT_EXIST != ret) { STORAGE_LOG(WARN, "fail to gen readble info", K(ret), K(pkey_)); } } else if (!readable_info.is_valid()) { - if (partition_->get_pg_storage().is_restore()) { - // ignore pg in restore - if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) { + if (is_restore) { + //ignore pg in restore + ret = OB_STATE_NOT_MATCH; + if (REACH_TIME_INTERVAL(2 *1000 * 1000L)) { STORAGE_LOG(WARN, "partition is in restore, just ignore", K(ret), K_(pkey), K(readable_info)); } } else { diff --git a/src/storage/ob_replay_status.cpp b/src/storage/ob_replay_status.cpp index aad05eab5ffa3b09823da8e582b88fb387c86721..91dba2a820eebba77840ee51d1e6da4900e4be9a 100644 --- a/src/storage/ob_replay_status.cpp +++ b/src/storage/ob_replay_status.cpp @@ -877,41 +877,28 @@ int ObReplayStatus::set_need_filter_trans_log(const ObPartitionKey& pkey, const return ret; } -int ObReplayStatus::check_and_submit_task(const ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts, - const bool need_replay, const clog::ObLogType log_type, const int64_t sw_next_replay_log_ts) +int ObReplayStatus::check_and_submit_task(const ObPartitionKey &pkey, + const uint64_t log_id, + const int64_t log_ts, + const bool need_replay, + const clog::ObLogType log_type, + const int64_t next_replay_log_ts) { int ret = OB_SUCCESS; - const bool is_aggre_log = (clog::OB_LOG_AGGRE == log_type); - const bool is_nop_log = (clog::OB_LOG_NOP == log_type); - // check when log slide out + //check when log slide out const int64_t last_slide_out_log_id = get_last_slide_out_log_id(); if (OB_UNLIKELY(!is_enabled())) { ret = OB_ERR_UNEXPECTED; - REPLAY_LOG( - ERROR, "replay status is not enabled", K(need_replay), K(pkey), K(log_id), K(log_type), K(log_ts), K(ret)); - } else if (OB_UNLIKELY(!pkey.is_valid() || OB_INVALID_TIMESTAMP == log_ts || OB_INVALID_ID == log_id || - OB_INVALID_TIMESTAMP == sw_next_replay_log_ts)) { + REPLAY_LOG(ERROR, "replay status is not enabled", K(need_replay), K(pkey), K(log_id), K(log_type), + K(log_ts), K(ret)); + } else if (OB_UNLIKELY(!pkey.is_valid() + || OB_INVALID_TIMESTAMP == log_ts + || OB_INVALID_ID == log_id + || OB_INVALID_TIMESTAMP == next_replay_log_ts + || next_replay_log_ts > log_ts)) { ret = OB_INVALID_ARGUMENT; - REPLAY_LOG(ERROR, - "invalid arguments", - K(need_replay), - K(pkey), - K(log_id), - K(log_ts), - K(log_type), - K(sw_next_replay_log_ts), - K(ret)); - } else if (OB_UNLIKELY((!is_nop_log) && (sw_next_replay_log_ts > log_ts))) { - ret = OB_ERR_UNEXPECTED; - REPLAY_LOG(ERROR, - "invalid arguments", - K(need_replay), - K(pkey), - K(log_id), - K(log_ts), - K(log_type), - K(sw_next_replay_log_ts), - K(ret)); + REPLAY_LOG(ERROR, "invalid arguments", K(need_replay), K(pkey), K(log_id), K(log_ts), + K(log_type), K(next_replay_log_ts), K(ret)); } else if (log_id != (last_slide_out_log_id + 1)) { ret = OB_ERR_UNEXPECTED; REPLAY_LOG( @@ -947,14 +934,11 @@ int ObReplayStatus::check_and_submit_task(const ObPartitionKey& pkey, const uint } else { { if (!submit_log_task_.need_submit_log()) { - // do not inc next_submit_log_ts with aggre log, it's the biggest log_submit_timestamp among its - // logs - const int64_t next_submit_log_ts = is_aggre_log ? sw_next_replay_log_ts : log_ts; - // here must modify log_ts first, or may lead to the rollback of min_unreplay_log_timestamp + //here must modify log_ts first, or may lead to the rollback of min_unreplay_log_timestamp WLockGuard wlock_guard(get_submit_log_info_rwlock()); const uint64_t old_next_submit_log_id = get_next_submit_log_id(); const int64_t old_next_submit_log_ts = get_next_submit_log_ts(); - set_next_submit_log_info(log_id, next_submit_log_ts); + set_next_submit_log_info(log_id, next_replay_log_ts); if (OB_FAIL(update_last_slide_out_log_info(log_id, log_ts))) { REPLAY_LOG( ERROR, "failed to update_last_slide_out_log_info", KR(ret), K(pkey), K(log_id), K(log_ts), K(log_type)); diff --git a/src/storage/ob_replay_status.h b/src/storage/ob_replay_status.h index 0da8d0f4c81d6acad8645c4c63c2ad0f4d114df3..cf92c17ceb32ac530a6f226997db1d9296d90549 100644 --- a/src/storage/ob_replay_status.h +++ b/src/storage/ob_replay_status.h @@ -532,11 +532,15 @@ class ObReplayStatus { { return submit_log_task_.get_pending_submit_task_count(); } - int check_and_submit_task(const common::ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts, - const bool need_replay, const clog::ObLogType log_type, const int64_t sw_next_replay_log_ts); - int submit_restore_task(const common::ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts); - - int push_task(ObReplayLogTask& task, uint64_t task_sign); + int check_and_submit_task(const common::ObPartitionKey &pkey, + const uint64_t log_id, + const int64_t log_ts, + const bool need_replay, + const clog::ObLogType log_type, + const int64_t next_replay_log_ts); + int submit_restore_task(const common::ObPartitionKey &pkey, const uint64_t log_id, + const int64_t log_ts); + int push_task(ObReplayLogTask &task, uint64_t task_sign); void add_task(ObReplayLogTask& task); void remove_task(ObReplayLogTask& task); void dec_task_count(const common::ObPartitionKey& pkey); diff --git a/src/storage/replayengine/ob_log_replay_engine.cpp b/src/storage/replayengine/ob_log_replay_engine.cpp index bd1390920ce7c9da4109eafcb8e726e0edd13c80..083a11cc96e870dfee3b0c85493cc2304de14f45 100644 --- a/src/storage/replayengine/ob_log_replay_engine.cpp +++ b/src/storage/replayengine/ob_log_replay_engine.cpp @@ -507,8 +507,12 @@ bool ObLogReplayEngine::is_valid_param( } /* -----------------submit log task related begin------ */ -int ObLogReplayEngine::submit_replay_log_task_sequentially(const common::ObPartitionKey& pkey, const uint64_t log_id, - const int64_t log_ts, const bool need_replay, const clog::ObLogType log_type, const int64_t sw_next_replay_log_ts) +int ObLogReplayEngine::submit_replay_log_task_sequentially(const common::ObPartitionKey &pkey, + const uint64_t log_id, + const int64_t log_ts, + const bool need_replay, + const ObLogType log_type, + const int64_t next_replay_log_ts) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -549,12 +553,13 @@ int ObLogReplayEngine::submit_replay_log_task_sequentially(const common::ObParti } else { ObReplayStatus::RLockGuard rlock_guard(replay_status->get_rwlock()); bool need_submit = true; - if (need_replay && - OB_FAIL(check_need_submit_current_log_(pkey, *partition, log_id, log_ts, *replay_status, need_submit))) { - REPLAY_LOG( - ERROR, "failed to check_need_submit_current_log_", K(pkey), K(need_replay), K(log_id), K(log_ts), K(ret)); - } else if (OB_FAIL(replay_status->check_and_submit_task( - pkey, log_id, log_ts, (need_replay && need_submit), log_type, sw_next_replay_log_ts))) { + if (need_replay && OB_FAIL(check_need_submit_current_log_(pkey, *partition, log_id, log_ts, + *replay_status, need_submit))) { + REPLAY_LOG(ERROR, "failed to check_need_submit_current_log_", + K(pkey), K(need_replay), K(log_id), K(log_ts), K(ret)); + } else if (OB_FAIL(replay_status->check_and_submit_task(pkey, log_id, log_ts, + (need_replay && need_submit), + log_type, next_replay_log_ts))) { if (OB_EAGAIN == ret) { if (REACH_TIME_INTERVAL(1000 * 1000)) { REPLAY_LOG(WARN, diff --git a/src/storage/replayengine/ob_log_replay_engine.h b/src/storage/replayengine/ob_log_replay_engine.h index ab02d0592d5e95d3e3ef7dc93d1e4deb777d520a..a4ebd3e4229ad84ecaac97a715913399125e6545 100644 --- a/src/storage/replayengine/ob_log_replay_engine.h +++ b/src/storage/replayengine/ob_log_replay_engine.h @@ -48,29 +48,34 @@ class ObLogReplayEngine : public ObILogReplayEngine, public lib::TGTaskHandler { typedef storage::ObReplayLogTask ObReplayLogTask; ObLogReplayEngine(); virtual ~ObLogReplayEngine(); - public: - virtual int init(transaction::ObTransService* trans_replay_service, storage::ObPartitionService* partition_service, - const ObLogReplayEngineConfig& config); - virtual int submit_replay_log_task_sequentially(const common::ObPartitionKey& pkey, const uint64_t log_id, - const int64_t log_ts, const bool need_replay, const clog::ObLogType log_type, - const int64_t sw_next_replay_log_ts); - virtual int submit_replay_log_task_by_restore( - const common::ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts); - - virtual int add_partition(const common::ObPartitionKey& partition_key); - virtual int remove_partition(const common::ObPartitionKey& pkey, storage::ObIPartitionGroup* partition); - virtual int remove_partition(const common::ObPartitionKey& partition_key); - virtual int reset_partition(const common::ObPartitionKey& partition_key); - virtual int set_need_filter_trans_log(const common::ObPartitionKey& partition_key, const bool need_filter); - - virtual int is_replay_finished(const common::ObPartitionKey& partition_key, bool& is_finished) const; - virtual int is_submit_finished(const common::ObPartitionKey& partition_key, bool& is_finished) const; - virtual int check_can_receive_log(const common::ObPartitionKey& pkey, bool& can_receive_log) const; - virtual int get_pending_submit_task_count(const common::ObPartitionKey& partition_key, int64_t& pending_count) const; - virtual void handle(void* task); - virtual int submit_task_into_queue(storage::ObReplayTask* task); - + virtual int init(transaction::ObTransService *trans_replay_service, + storage::ObPartitionService *partition_service, + const ObLogReplayEngineConfig &config); + virtual int submit_replay_log_task_sequentially(const common::ObPartitionKey &pkey, + const uint64_t log_id, + const int64_t log_submit_ts, + const bool need_replay, + const clog::ObLogType log_type, + const int64_t next_replay_log_ts); + virtual int submit_replay_log_task_by_restore(const common::ObPartitionKey &pkey, + const uint64_t log_id, + const int64_t log_ts); + + virtual int add_partition(const common::ObPartitionKey &partition_key); + virtual int remove_partition(const common::ObPartitionKey &pkey, + storage::ObIPartitionGroup *partition); + virtual int remove_partition(const common::ObPartitionKey &partition_key); + virtual int reset_partition(const common::ObPartitionKey &partition_key); + virtual int set_need_filter_trans_log(const common::ObPartitionKey &partition_key, + const bool need_filter); + + virtual int is_replay_finished(const common::ObPartitionKey &partition_key, bool &is_finished) const; + virtual int is_submit_finished(const common::ObPartitionKey &partition_key, bool &is_finished) const; + virtual int check_can_receive_log(const common::ObPartitionKey &pkey, bool &can_receive_log) const; + virtual int get_pending_submit_task_count(const common::ObPartitionKey &partition_key, int64_t &pending_count) const; + virtual void handle(void *task); + virtual int submit_task_into_queue(storage::ObReplayTask *task); virtual int is_tenant_out_of_memory(const common::ObPartitionKey& partition_key, bool& is_out_of_mem); virtual void stop(); virtual void wait();