提交 795c8346 编写于 作者: Y yy 提交者: MizuhaHimuraki

[replayengine] fix rollback of min_unreplay_log_ts

上级 ede12134
...@@ -113,6 +113,28 @@ int ObLogEntry::deep_copy_to(ObLogEntry& entry) const ...@@ -113,6 +113,28 @@ int ObLogEntry::deep_copy_to(ObLogEntry& entry) const
return deep_copy_to_(entry); return deep_copy_to_(entry);
} }
// Computes the "next replay timestamp" of this log entry.
//
// - When the header's log type is not OB_LOG_AGGRE, the result is simply the
//   header's submit timestamp.
// - When the log type is OB_LOG_AGGRE, the result is decoded from the start of
//   the entry buffer: an int32 (next_log_offset) is read first and then the
//   int64 that follows it is stored into next_replay_ts.
//   NOTE(review): presumably that int64 is the submit timestamp of the first
//   aggregated sub-log -- confirm against the aggregate log layout.
//
// @param next_replay_ts [out] receives the computed timestamp on success.
// @return OB_SUCCESS, or the error code returned by the failing decode call.
int ObLogEntry::get_next_replay_ts_for_rg(int64_t &next_replay_ts) const
{
  int ret = OB_SUCCESS;
  if (OB_LOG_AGGRE != header_.get_log_type()) {
    // non-aggregate log: next_replay_ts is just the submit timestamp
    next_replay_ts = header_.get_submit_timestamp();
  } else {
    const char *log_buf = get_buf();
    const int64_t log_buf_len = header_.get_data_len();
    int64_t pos = 0;
    int32_t next_log_offset = 0;
    // The leading int32 must be consumed so that pos points at the int64.
    if (OB_FAIL(serialization::decode_i32(log_buf, log_buf_len, pos, &next_log_offset))) {
      REPLAY_LOG(ERROR, "serialization decode_i32 failed", KR(ret), K(log_buf_len), K(pos),
                 K(header_), K(next_log_offset));
    } else {
      if (OB_FAIL(serialization::decode_i64(log_buf, log_buf_len, pos, &next_replay_ts))) {
        REPLAY_LOG(ERROR, "serialization decode_i64 failed", KR(ret), K(log_buf_len), K(pos),
                   K(header_), K(next_log_offset));
      }
    }
  }
  return ret;
}
DEFINE_SERIALIZE(ObLogEntry) DEFINE_SERIALIZE(ObLogEntry)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
......
...@@ -59,6 +59,7 @@ class ObLogEntry { ...@@ -59,6 +59,7 @@ class ObLogEntry {
{ {
return header_.update_proposal_id(new_proposal_id); return header_.update_proposal_id(new_proposal_id);
} }
int get_next_replay_ts_for_rg(int64_t &next_replay_ts) const;
TO_STRING_KV(N_HEADER, header_); TO_STRING_KV(N_HEADER, header_);
NEED_SERIALIZE_AND_DESERIALIZE; NEED_SERIALIZE_AND_DESERIALIZE;
......
...@@ -3357,26 +3357,27 @@ int ObLogSlidingWindow::need_replay_for_data_or_log_replica_(const bool is_trans ...@@ -3357,26 +3357,27 @@ int ObLogSlidingWindow::need_replay_for_data_or_log_replica_(const bool is_trans
return ret; return ret;
} }
int ObLogSlidingWindow::check_is_meta_log(const ObPartitionKey& pkey, uint64_t log_id, bool& is_meta_log, int ObLogSlidingWindow::get_log_meta_info(uint64_t log_id, bool &is_meta_log, int64_t &log_ts,
int64_t& log_ts, int64_t& accum_checksum, ObLogType& log_type) const int64_t &next_replay_log_ts_for_rg, int64_t &accum_checksum, ObLogType &log_type) const
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
is_meta_log = false; is_meta_log = false;
ObICLogMgr* clog_mgr = NULL; ObICLogMgr* clog_mgr = NULL;
if (OB_ISNULL(partition_service_) || OB_ISNULL(clog_mgr = partition_service_->get_clog_mgr())) { if (OB_ISNULL(partition_service_) || OB_ISNULL(clog_mgr = partition_service_->get_clog_mgr())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "invalid argument", K(pkey), K(log_id), KP(partition_service_), KP(clog_mgr), KR(ret)); CLOG_LOG(WARN, "invalid argument", K(partition_key_), K(log_id), KP(partition_service_), KP(clog_mgr), KR(ret));
} else { } else {
clog::ObLogEntry log_entry; clog::ObLogEntry log_entry;
bool is_batch_committed = false; bool is_batch_committed = false;
if (OB_FAIL(clog_mgr->query_log_info_with_log_id(pkey, log_id, log_entry, accum_checksum, is_batch_committed))) { if (OB_FAIL(clog_mgr->query_log_info_with_log_id(partition_key_, log_id, log_entry, accum_checksum, is_batch_committed))) {
if (OB_EAGAIN == ret) { if (OB_EAGAIN == ret) {
if (REACH_TIME_INTERVAL(100 * 1000)) { if (REACH_TIME_INTERVAL(100 * 1000)) {
CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(pkey), K(log_id), K(ret)); CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(partition_key_), K(log_id), K(ret));
} }
} else { } else {
CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(pkey), K(log_id), K(ret)); CLOG_LOG(WARN, "failed to query_log_info_with_log_id ", K(partition_key_), K(log_id), K(ret));
} }
} else if (OB_FAIL(log_entry.get_next_replay_ts_for_rg(next_replay_log_ts_for_rg))) {
} else { } else {
log_type = log_entry.get_header().get_log_type(); log_type = log_entry.get_header().get_log_type();
log_ts = log_entry.get_header().get_submit_timestamp(); log_ts = log_entry.get_header().get_submit_timestamp();
...@@ -3425,6 +3426,7 @@ int ObLogSlidingWindow::try_submit_replay_task_(const uint64_t log_id, const ObL ...@@ -3425,6 +3426,7 @@ int ObLogSlidingWindow::try_submit_replay_task_(const uint64_t log_id, const ObL
} else { } else {
const ObLogType header_log_type = log_task.get_log_type(); const ObLogType header_log_type = log_task.get_log_type();
const int64_t log_submit_timestamp = log_task.get_submit_timestamp(); const int64_t log_submit_timestamp = log_task.get_submit_timestamp();
const int64_t next_replay_log_ts = log_task.get_next_replay_log_ts();
bool need_replay = log_task.need_replay(); bool need_replay = log_task.need_replay();
const bool is_trans_log = log_task.is_trans_log(); const bool is_trans_log = log_task.is_trans_log();
uint64_t last_replay_log_id = OB_INVALID_ID; uint64_t last_replay_log_id = OB_INVALID_ID;
...@@ -3510,18 +3512,10 @@ int ObLogSlidingWindow::try_submit_replay_task_(const uint64_t log_id, const ObL ...@@ -3510,18 +3512,10 @@ int ObLogSlidingWindow::try_submit_replay_task_(const uint64_t log_id, const ObL
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
int64_t next_replay_log_timestamp = INT64_MAX;
if (state_mgr_->is_offline()) { if (state_mgr_->is_offline()) {
CLOG_LOG( CLOG_LOG(WARN, "no need to submit log to replay when partition is offline", KR(ret), K(partition_key_), K(log_id));
WARN, "no need to submit log to replay when partition is offline", KR(ret), K(partition_key_), K(log_id)); } else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_, log_id, log_submit_timestamp,
} else if (OB_FAIL(get_next_replay_log_timestamp(next_replay_log_timestamp))) { need_replay, header_log_type, next_replay_log_ts))) {
CLOG_LOG(WARN, "failed to get_next_replay_log_timestamp", KR(ret), K(partition_key_), K(log_id));
} else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_,
log_id,
log_submit_timestamp,
need_replay,
header_log_type,
next_replay_log_timestamp))) {
if (OB_EAGAIN != ret) { if (OB_EAGAIN != ret) {
CLOG_LOG(WARN, CLOG_LOG(WARN,
"failed to submit replay task", "failed to submit replay task",
......
...@@ -498,10 +498,14 @@ class ObLogSlidingWindow : public ObILogSWForCasMgr, ...@@ -498,10 +498,14 @@ class ObLogSlidingWindow : public ObILogSWForCasMgr,
int leader_active(); int leader_active();
int leader_takeover(); int leader_takeover();
int leader_revoke(); int leader_revoke();
int get_replica_replay_type(ObReplicaReplayType& replay_type) const; int get_replica_replay_type(ObReplicaReplayType &replay_type) const;
// is_meta_log: log type that need been replayed by D replica and log replica //is_meta_log: log type that need been replayed by D replica and log replica
int check_is_meta_log(const common::ObPartitionKey& pkey, uint64_t log_id, bool& is_meta_log, int64_t& log_ts, int get_log_meta_info(uint64_t log_id,
int64_t& accum_checksum, ObLogType& log_type) const; bool &is_meta_log,
int64_t &log_ts,
int64_t &next_replay_log_ts_for_rg,
int64_t &accum_checksum,
ObLogType &log_type) const;
void destroy_aggre_buffer(); void destroy_aggre_buffer();
uint64_t get_leader_max_unconfirmed_log_count(); uint64_t get_leader_max_unconfirmed_log_count();
uint64_t get_follower_max_unconfirmed_log_count(); uint64_t get_follower_max_unconfirmed_log_count();
......
...@@ -101,6 +101,7 @@ ObLogTask::ObLogTask() ...@@ -101,6 +101,7 @@ ObLogTask::ObLogTask()
log_buf_(NULL), log_buf_(NULL),
generation_timestamp_(OB_INVALID_TIMESTAMP), generation_timestamp_(OB_INVALID_TIMESTAMP),
submit_timestamp_(OB_INVALID_TIMESTAMP), submit_timestamp_(OB_INVALID_TIMESTAMP),
next_replay_log_ts_(OB_INVALID_TIMESTAMP),
epoch_id_(OB_INVALID_TIMESTAMP), epoch_id_(OB_INVALID_TIMESTAMP),
data_checksum_(0), data_checksum_(0),
accum_checksum_(0), accum_checksum_(0),
...@@ -221,6 +222,7 @@ int ObLogTask::reset_log() ...@@ -221,6 +222,7 @@ int ObLogTask::reset_log()
log_buf_len_ = 0; log_buf_len_ = 0;
generation_timestamp_ = OB_INVALID_TIMESTAMP; generation_timestamp_ = OB_INVALID_TIMESTAMP;
submit_timestamp_ = OB_INVALID_TIMESTAMP; submit_timestamp_ = OB_INVALID_TIMESTAMP;
next_replay_log_ts_ = OB_INVALID_TIMESTAMP;
state_map_.reset_map(LOCAL_FLUSHED); state_map_.reset_map(LOCAL_FLUSHED);
state_map_.reset_map(ALREADY_SEND_TO_STANDBY); state_map_.reset_map(ALREADY_SEND_TO_STANDBY);
state_map_.reset_map(SUBMIT_LOG_EXIST); state_map_.reset_map(SUBMIT_LOG_EXIST);
...@@ -629,6 +631,11 @@ int64_t ObLogTask::get_submit_timestamp() const ...@@ -629,6 +631,11 @@ int64_t ObLogTask::get_submit_timestamp() const
return submit_timestamp_; return submit_timestamp_;
} }
int64_t ObLogTask::get_next_replay_log_ts() const
{
return next_replay_log_ts_;
}
int64_t ObLogTask::get_data_checksum() const int64_t ObLogTask::get_data_checksum() const
{ {
return data_checksum_; return data_checksum_;
...@@ -731,7 +738,7 @@ int ObLogTask::log_deep_copy_to_(const ObLogEntry& log_entry, const bool need_co ...@@ -731,7 +738,7 @@ int ObLogTask::log_deep_copy_to_(const ObLogEntry& log_entry, const bool need_co
if (NULL == if (NULL ==
(buf = static_cast<char*>(TMA_MGR_INSTANCE.alloc_log_entry_buf(log_entry.get_header().get_data_len())))) { (buf = static_cast<char*>(TMA_MGR_INSTANCE.alloc_log_entry_buf(log_entry.get_header().get_data_len())))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(ERROR, "allocate memory fail", K(ret), "header", log_entry.get_header()); CLOG_LOG(WARN, "allocate memory fail", K(ret), "header", log_entry.get_header());
} else { } else {
MEMCPY(buf, log_entry.get_buf(), log_entry.get_header().get_data_len()); MEMCPY(buf, log_entry.get_buf(), log_entry.get_header().get_data_len());
log_buf_ = buf; log_buf_ = buf;
...@@ -742,15 +749,20 @@ int ObLogTask::log_deep_copy_to_(const ObLogEntry& log_entry, const bool need_co ...@@ -742,15 +749,20 @@ int ObLogTask::log_deep_copy_to_(const ObLogEntry& log_entry, const bool need_co
log_buf_len_ = 0; log_buf_len_ = 0;
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
log_type_ = static_cast<uint8_t>(log_entry.get_header().get_log_type()); if (OB_FAIL(log_entry.get_next_replay_ts_for_rg(next_replay_log_ts_))) {
if (log_entry.get_header().is_trans_log()) { CLOG_LOG(WARN, "failed to get_next_replay_ts_for_rg", K(ret), "header", log_entry.get_header());
state_map_.set_map(IS_TRANS_LOG); } else {
log_type_ = static_cast<uint8_t>(log_entry.get_header().get_log_type());
const ObLogEntryHeader &log_header = log_entry.get_header();
if (log_header.is_trans_log()) {
state_map_.set_map(IS_TRANS_LOG);
}
proposal_id_ = log_header.get_proposal_id();
generation_timestamp_ = log_header.get_generation_timestamp();
submit_timestamp_ = log_header.get_submit_timestamp();
data_checksum_ = log_header.get_data_checksum();
epoch_id_ = log_header.get_epoch_id();
} }
proposal_id_ = log_entry.get_header().get_proposal_id();
generation_timestamp_ = log_entry.get_header().get_generation_timestamp();
submit_timestamp_ = log_entry.get_header().get_submit_timestamp();
data_checksum_ = log_entry.get_header().get_data_checksum();
epoch_id_ = log_entry.get_header().get_epoch_id();
} }
return ret; return ret;
} }
......
...@@ -142,6 +142,7 @@ class ObLogTask : public ObILogExtRingBufferData { ...@@ -142,6 +142,7 @@ class ObLogTask : public ObILogExtRingBufferData {
char* get_log_buf() const; char* get_log_buf() const;
int32_t get_log_buf_len() const; int32_t get_log_buf_len() const;
int64_t get_generation_timestamp() const; int64_t get_generation_timestamp() const;
int64_t get_next_replay_log_ts() const;
int64_t get_submit_timestamp() const; int64_t get_submit_timestamp() const;
int64_t get_data_checksum() const; int64_t get_data_checksum() const;
int64_t get_epoch_id() const; int64_t get_epoch_id() const;
...@@ -152,11 +153,10 @@ class ObLogTask : public ObILogExtRingBufferData { ...@@ -152,11 +153,10 @@ class ObLogTask : public ObILogExtRingBufferData {
// common::ObTraceProfile *get_trace_profile() {return trace_profile_;} // common::ObTraceProfile *get_trace_profile() {return trace_profile_;}
// int report_trace(); // int report_trace();
TO_STRING_KV(K(log_type_), K(proposal_id_), K(log_buf_len_), K_(generation_timestamp), K(submit_timestamp_), TO_STRING_KV(K(log_type_), K(proposal_id_), K(log_buf_len_), K_(generation_timestamp),
K_(data_checksum), K(epoch_id_), K(accum_checksum_), K_(state_map), K_(ack_list), KP_(submit_cb), K(submit_timestamp_), K(next_replay_log_ts_), K_(data_checksum), K(epoch_id_), K(accum_checksum_),
K_(majority_cnt), K_(log_cursor)); K_(state_map), K_(ack_list), KP_(submit_cb), K_(majority_cnt), K_(log_cursor));
public:
public:
virtual void destroy(); virtual void destroy();
virtual bool can_be_removed(); virtual bool can_be_removed();
virtual bool can_overwrite(const ObILogExtRingBufferData* log_task); virtual bool can_overwrite(const ObILogExtRingBufferData* log_task);
...@@ -185,6 +185,10 @@ class ObLogTask : public ObILogExtRingBufferData { ...@@ -185,6 +185,10 @@ class ObLogTask : public ObILogExtRingBufferData {
int64_t generation_timestamp_; int64_t generation_timestamp_;
// 8 bytes // 8 bytes
int64_t submit_timestamp_; int64_t submit_timestamp_;
// 8 bytes, records the next_replay_log_ts of the whole log_entry:
// for unencrypted OB_LOG_AGGRE log: next_replay_log_ts is the min_log_submit_ts of all aggregated logs
// for other logs: equal to submit_timestamp_
int64_t next_replay_log_ts_;
// ObConfirmedInfo, 16 bytes // ObConfirmedInfo, 16 bytes
int64_t epoch_id_; int64_t epoch_id_;
int64_t data_checksum_; int64_t data_checksum_;
......
...@@ -5024,6 +5024,7 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo& ...@@ -5024,6 +5024,7 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo&
uint64_t cur_log_id = start_id; uint64_t cur_log_id = start_id;
bool need_replay = false; bool need_replay = false;
int64_t log_submit_timestamp = OB_INVALID_TIMESTAMP; int64_t log_submit_timestamp = OB_INVALID_TIMESTAMP;
int64_t next_replay_log_ts_for_rg = OB_INVALID_TIMESTAMP;
int64_t accum_checksum = 0; int64_t accum_checksum = 0;
do { do {
if (OB_EAGAIN == ret) { if (OB_EAGAIN == ret) {
...@@ -5031,8 +5032,8 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo& ...@@ -5031,8 +5032,8 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo&
} }
bool is_meta_log = false; bool is_meta_log = false;
ObLogType log_type = OB_LOG_UNKNOWN; ObLogType log_type = OB_LOG_UNKNOWN;
if (OB_FAIL(sw_.check_is_meta_log( if (OB_FAIL(sw_.get_log_meta_info(cur_log_id, is_meta_log, log_submit_timestamp,
partition_key_, cur_log_id, is_meta_log, log_submit_timestamp, accum_checksum, log_type))) { next_replay_log_ts_for_rg, accum_checksum, log_type))) {
if (OB_EAGAIN == ret) { if (OB_EAGAIN == ret) {
if (REACH_TIME_INTERVAL(100 * 1000)) { if (REACH_TIME_INTERVAL(100 * 1000)) {
CLOG_LOG(WARN, "failed to check is meta log", K(partition_key_), K(cur_log_id), K(ret)); CLOG_LOG(WARN, "failed to check is meta log", K(partition_key_), K(cur_log_id), K(ret));
...@@ -5045,15 +5046,10 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo& ...@@ -5045,15 +5046,10 @@ int ObPartitionLogService::restore_replayed_log(const common::ObBaseStorageInfo&
} }
uint64_t last_replay_log_id = OB_INVALID_ID; uint64_t last_replay_log_id = OB_INVALID_ID;
int64_t last_replay_log_ts = OB_INVALID_TIMESTAMP;
(void)sw_.get_last_replay_log(last_replay_log_id, last_replay_log_ts);
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_, } else if (OB_FAIL(replay_engine_->submit_replay_log_task_sequentially(partition_key_, cur_log_id,
cur_log_id, log_submit_timestamp, need_replay,
log_submit_timestamp, log_type, next_replay_log_ts_for_rg))) {
need_replay,
log_type,
last_replay_log_ts + 1))) {
if (OB_EAGAIN == ret) { if (OB_EAGAIN == ret) {
if (REACH_TIME_INTERVAL(100 * 1000)) { if (REACH_TIME_INTERVAL(100 * 1000)) {
CLOG_LOG(WARN, "failed to submit replay task step by step", K(ret), K_(partition_key), K(cur_log_id)); CLOG_LOG(WARN, "failed to submit replay task step by step", K(ret), K_(partition_key), K(cur_log_id));
......
...@@ -193,18 +193,21 @@ int ObPartitionLoopWorker::generate_weak_read_timestamp(const int64_t max_stale_ ...@@ -193,18 +193,21 @@ int ObPartitionLoopWorker::generate_weak_read_timestamp(const int64_t max_stale_
DEBUG_SYNC(BLOCK_WEAK_READ_TIMESTAMP); DEBUG_SYNC(BLOCK_WEAK_READ_TIMESTAMP);
DEBUG_SYNC(SYNC_PG_AND_REPLAY_ENGINE_DEADLOCK); DEBUG_SYNC(SYNC_PG_AND_REPLAY_ENGINE_DEADLOCK);
bool is_restore = false;
if (OB_UNLIKELY(!is_inited_)) { if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "partition is not initialized", K(ret), K(pkey_)); STORAGE_LOG(WARN, "partition is not initialized", K(ret), K(pkey_));
} else if (FALSE_IT(is_restore = (partition_->get_pg_storage().is_restore()))) {
} else if (OB_FAIL(gen_readable_info_(readable_info))) { } else if (OB_FAIL(gen_readable_info_(readable_info))) {
// no need to caculate timestamp when partition is rebuilding // no need to caculate timestamp when partition is rebuilding
if (OB_STATE_NOT_MATCH != ret && OB_PARTITION_NOT_EXIST != ret) { if (OB_STATE_NOT_MATCH != ret && OB_PARTITION_NOT_EXIST != ret) {
STORAGE_LOG(WARN, "fail to gen readble info", K(ret), K(pkey_)); STORAGE_LOG(WARN, "fail to gen readble info", K(ret), K(pkey_));
} }
} else if (!readable_info.is_valid()) { } else if (!readable_info.is_valid()) {
if (partition_->get_pg_storage().is_restore()) { if (is_restore) {
// ignore pg in restore //ignore pg in restore
if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) { ret = OB_STATE_NOT_MATCH;
if (REACH_TIME_INTERVAL(2 *1000 * 1000L)) {
STORAGE_LOG(WARN, "partition is in restore, just ignore", K(ret), K_(pkey), K(readable_info)); STORAGE_LOG(WARN, "partition is in restore, just ignore", K(ret), K_(pkey), K(readable_info));
} }
} else { } else {
......
...@@ -877,41 +877,28 @@ int ObReplayStatus::set_need_filter_trans_log(const ObPartitionKey& pkey, const ...@@ -877,41 +877,28 @@ int ObReplayStatus::set_need_filter_trans_log(const ObPartitionKey& pkey, const
return ret; return ret;
} }
int ObReplayStatus::check_and_submit_task(const ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts, int ObReplayStatus::check_and_submit_task(const ObPartitionKey &pkey,
const bool need_replay, const clog::ObLogType log_type, const int64_t sw_next_replay_log_ts) const uint64_t log_id,
const int64_t log_ts,
const bool need_replay,
const clog::ObLogType log_type,
const int64_t next_replay_log_ts)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const bool is_aggre_log = (clog::OB_LOG_AGGRE == log_type); //check when log slide out
const bool is_nop_log = (clog::OB_LOG_NOP == log_type);
// check when log slide out
const int64_t last_slide_out_log_id = get_last_slide_out_log_id(); const int64_t last_slide_out_log_id = get_last_slide_out_log_id();
if (OB_UNLIKELY(!is_enabled())) { if (OB_UNLIKELY(!is_enabled())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
REPLAY_LOG( REPLAY_LOG(ERROR, "replay status is not enabled", K(need_replay), K(pkey), K(log_id), K(log_type),
ERROR, "replay status is not enabled", K(need_replay), K(pkey), K(log_id), K(log_type), K(log_ts), K(ret)); K(log_ts), K(ret));
} else if (OB_UNLIKELY(!pkey.is_valid() || OB_INVALID_TIMESTAMP == log_ts || OB_INVALID_ID == log_id || } else if (OB_UNLIKELY(!pkey.is_valid()
OB_INVALID_TIMESTAMP == sw_next_replay_log_ts)) { || OB_INVALID_TIMESTAMP == log_ts
|| OB_INVALID_ID == log_id
|| OB_INVALID_TIMESTAMP == next_replay_log_ts
|| next_replay_log_ts > log_ts)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
REPLAY_LOG(ERROR, REPLAY_LOG(ERROR, "invalid arguments", K(need_replay), K(pkey), K(log_id), K(log_ts),
"invalid arguments", K(log_type), K(next_replay_log_ts), K(ret));
K(need_replay),
K(pkey),
K(log_id),
K(log_ts),
K(log_type),
K(sw_next_replay_log_ts),
K(ret));
} else if (OB_UNLIKELY((!is_nop_log) && (sw_next_replay_log_ts > log_ts))) {
ret = OB_ERR_UNEXPECTED;
REPLAY_LOG(ERROR,
"invalid arguments",
K(need_replay),
K(pkey),
K(log_id),
K(log_ts),
K(log_type),
K(sw_next_replay_log_ts),
K(ret));
} else if (log_id != (last_slide_out_log_id + 1)) { } else if (log_id != (last_slide_out_log_id + 1)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
REPLAY_LOG( REPLAY_LOG(
...@@ -947,14 +934,11 @@ int ObReplayStatus::check_and_submit_task(const ObPartitionKey& pkey, const uint ...@@ -947,14 +934,11 @@ int ObReplayStatus::check_and_submit_task(const ObPartitionKey& pkey, const uint
} else { } else {
{ {
if (!submit_log_task_.need_submit_log()) { if (!submit_log_task_.need_submit_log()) {
// do not inc next_submit_log_ts with aggre log, it's the biggest log_submit_timestamp among its //here must modify log_ts first, or may lead to the rollback of min_unreplay_log_timestamp
// logs
const int64_t next_submit_log_ts = is_aggre_log ? sw_next_replay_log_ts : log_ts;
// here must modify log_ts first, or may lead to the rollback of min_unreplay_log_timestamp
WLockGuard wlock_guard(get_submit_log_info_rwlock()); WLockGuard wlock_guard(get_submit_log_info_rwlock());
const uint64_t old_next_submit_log_id = get_next_submit_log_id(); const uint64_t old_next_submit_log_id = get_next_submit_log_id();
const int64_t old_next_submit_log_ts = get_next_submit_log_ts(); const int64_t old_next_submit_log_ts = get_next_submit_log_ts();
set_next_submit_log_info(log_id, next_submit_log_ts); set_next_submit_log_info(log_id, next_replay_log_ts);
if (OB_FAIL(update_last_slide_out_log_info(log_id, log_ts))) { if (OB_FAIL(update_last_slide_out_log_info(log_id, log_ts))) {
REPLAY_LOG( REPLAY_LOG(
ERROR, "failed to update_last_slide_out_log_info", KR(ret), K(pkey), K(log_id), K(log_ts), K(log_type)); ERROR, "failed to update_last_slide_out_log_info", KR(ret), K(pkey), K(log_id), K(log_ts), K(log_type));
......
...@@ -532,11 +532,15 @@ class ObReplayStatus { ...@@ -532,11 +532,15 @@ class ObReplayStatus {
{ {
return submit_log_task_.get_pending_submit_task_count(); return submit_log_task_.get_pending_submit_task_count();
} }
int check_and_submit_task(const common::ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts, int check_and_submit_task(const common::ObPartitionKey &pkey,
const bool need_replay, const clog::ObLogType log_type, const int64_t sw_next_replay_log_ts); const uint64_t log_id,
int submit_restore_task(const common::ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts); const int64_t log_ts,
const bool need_replay,
int push_task(ObReplayLogTask& task, uint64_t task_sign); const clog::ObLogType log_type,
const int64_t next_replay_log_ts);
int submit_restore_task(const common::ObPartitionKey &pkey, const uint64_t log_id,
const int64_t log_ts);
int push_task(ObReplayLogTask &task, uint64_t task_sign);
void add_task(ObReplayLogTask& task); void add_task(ObReplayLogTask& task);
void remove_task(ObReplayLogTask& task); void remove_task(ObReplayLogTask& task);
void dec_task_count(const common::ObPartitionKey& pkey); void dec_task_count(const common::ObPartitionKey& pkey);
......
...@@ -507,8 +507,12 @@ bool ObLogReplayEngine::is_valid_param( ...@@ -507,8 +507,12 @@ bool ObLogReplayEngine::is_valid_param(
} }
/* -----------------submit log task related begin------ */ /* -----------------submit log task related begin------ */
int ObLogReplayEngine::submit_replay_log_task_sequentially(const common::ObPartitionKey& pkey, const uint64_t log_id, int ObLogReplayEngine::submit_replay_log_task_sequentially(const common::ObPartitionKey &pkey,
const int64_t log_ts, const bool need_replay, const clog::ObLogType log_type, const int64_t sw_next_replay_log_ts) const uint64_t log_id,
const int64_t log_ts,
const bool need_replay,
const ObLogType log_type,
const int64_t next_replay_log_ts)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
...@@ -549,12 +553,13 @@ int ObLogReplayEngine::submit_replay_log_task_sequentially(const common::ObParti ...@@ -549,12 +553,13 @@ int ObLogReplayEngine::submit_replay_log_task_sequentially(const common::ObParti
} else { } else {
ObReplayStatus::RLockGuard rlock_guard(replay_status->get_rwlock()); ObReplayStatus::RLockGuard rlock_guard(replay_status->get_rwlock());
bool need_submit = true; bool need_submit = true;
if (need_replay && if (need_replay && OB_FAIL(check_need_submit_current_log_(pkey, *partition, log_id, log_ts,
OB_FAIL(check_need_submit_current_log_(pkey, *partition, log_id, log_ts, *replay_status, need_submit))) { *replay_status, need_submit))) {
REPLAY_LOG( REPLAY_LOG(ERROR, "failed to check_need_submit_current_log_",
ERROR, "failed to check_need_submit_current_log_", K(pkey), K(need_replay), K(log_id), K(log_ts), K(ret)); K(pkey), K(need_replay), K(log_id), K(log_ts), K(ret));
} else if (OB_FAIL(replay_status->check_and_submit_task( } else if (OB_FAIL(replay_status->check_and_submit_task(pkey, log_id, log_ts,
pkey, log_id, log_ts, (need_replay && need_submit), log_type, sw_next_replay_log_ts))) { (need_replay && need_submit),
log_type, next_replay_log_ts))) {
if (OB_EAGAIN == ret) { if (OB_EAGAIN == ret) {
if (REACH_TIME_INTERVAL(1000 * 1000)) { if (REACH_TIME_INTERVAL(1000 * 1000)) {
REPLAY_LOG(WARN, REPLAY_LOG(WARN,
......
...@@ -48,29 +48,34 @@ class ObLogReplayEngine : public ObILogReplayEngine, public lib::TGTaskHandler { ...@@ -48,29 +48,34 @@ class ObLogReplayEngine : public ObILogReplayEngine, public lib::TGTaskHandler {
typedef storage::ObReplayLogTask ObReplayLogTask; typedef storage::ObReplayLogTask ObReplayLogTask;
ObLogReplayEngine(); ObLogReplayEngine();
virtual ~ObLogReplayEngine(); virtual ~ObLogReplayEngine();
public: public:
virtual int init(transaction::ObTransService* trans_replay_service, storage::ObPartitionService* partition_service, virtual int init(transaction::ObTransService *trans_replay_service,
const ObLogReplayEngineConfig& config); storage::ObPartitionService *partition_service,
virtual int submit_replay_log_task_sequentially(const common::ObPartitionKey& pkey, const uint64_t log_id, const ObLogReplayEngineConfig &config);
const int64_t log_ts, const bool need_replay, const clog::ObLogType log_type, virtual int submit_replay_log_task_sequentially(const common::ObPartitionKey &pkey,
const int64_t sw_next_replay_log_ts); const uint64_t log_id,
virtual int submit_replay_log_task_by_restore( const int64_t log_submit_ts,
const common::ObPartitionKey& pkey, const uint64_t log_id, const int64_t log_ts); const bool need_replay,
const clog::ObLogType log_type,
virtual int add_partition(const common::ObPartitionKey& partition_key); const int64_t next_replay_log_ts);
virtual int remove_partition(const common::ObPartitionKey& pkey, storage::ObIPartitionGroup* partition); virtual int submit_replay_log_task_by_restore(const common::ObPartitionKey &pkey,
virtual int remove_partition(const common::ObPartitionKey& partition_key); const uint64_t log_id,
virtual int reset_partition(const common::ObPartitionKey& partition_key); const int64_t log_ts);
virtual int set_need_filter_trans_log(const common::ObPartitionKey& partition_key, const bool need_filter);
virtual int add_partition(const common::ObPartitionKey &partition_key);
virtual int is_replay_finished(const common::ObPartitionKey& partition_key, bool& is_finished) const; virtual int remove_partition(const common::ObPartitionKey &pkey,
virtual int is_submit_finished(const common::ObPartitionKey& partition_key, bool& is_finished) const; storage::ObIPartitionGroup *partition);
virtual int check_can_receive_log(const common::ObPartitionKey& pkey, bool& can_receive_log) const; virtual int remove_partition(const common::ObPartitionKey &partition_key);
virtual int get_pending_submit_task_count(const common::ObPartitionKey& partition_key, int64_t& pending_count) const; virtual int reset_partition(const common::ObPartitionKey &partition_key);
virtual void handle(void* task); virtual int set_need_filter_trans_log(const common::ObPartitionKey &partition_key,
virtual int submit_task_into_queue(storage::ObReplayTask* task); const bool need_filter);
virtual int is_replay_finished(const common::ObPartitionKey &partition_key, bool &is_finished) const;
virtual int is_submit_finished(const common::ObPartitionKey &partition_key, bool &is_finished) const;
virtual int check_can_receive_log(const common::ObPartitionKey &pkey, bool &can_receive_log) const;
virtual int get_pending_submit_task_count(const common::ObPartitionKey &partition_key, int64_t &pending_count) const;
virtual void handle(void *task);
virtual int submit_task_into_queue(storage::ObReplayTask *task);
virtual int is_tenant_out_of_memory(const common::ObPartitionKey& partition_key, bool& is_out_of_mem); virtual int is_tenant_out_of_memory(const common::ObPartitionKey& partition_key, bool& is_out_of_mem);
virtual void stop(); virtual void stop();
virtual void wait(); virtual void wait();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册