提交 bf634a84 编写于 作者: B Benguang Zhao

enh: check lastVer and commit version during syncNodeOpen

上级 cf14200d
......@@ -120,16 +120,15 @@ static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
}
static FORCE_INLINE int32_t syncLogReplMgrUpdateTerm(SSyncLogReplMgr* pMgr, SyncIndex index, SyncTerm term) {
if (index < pMgr->startIndex || index >= pMgr->endIndex) {
return -1;
}
if (pMgr->endIndex == 0) return -1;
ASSERT(pMgr->startIndex <= index && index < pMgr->endIndex);
pMgr->states[(index + pMgr->size) % pMgr->size].term = term;
return 0;
}
SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId,
bool* pBarrier);
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier);
int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogBufferReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
......
......@@ -328,9 +328,16 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SyncTerm commitTerm = snapshot.lastApplyTerm;
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
SyncIndex toIndex = lastVer;
ASSERT(lastVer >= commitIndex);
if (lastVer < commitIndex) {
sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64
", tsdb commit version: %" PRId64 "",
pNode->vgId, lastVer, commitIndex);
// TODO: terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
goto _err;
}
ASSERT(lastVer >= commitIndex);
SyncIndex toIndex = lastVer;
// update match index
pBuf->commitIndex = commitIndex;
pBuf->matchIndex = toIndex;
......@@ -547,7 +554,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERT(prevLogIndex == pMatch->index);
if (pMatch->term != prevLogTerm) {
sError(
sInfo(
"vgId:%d, mismatching raft log entries encountered. "
"{ index:%" PRId64 ", term:%" PRId64
" } "
......
......@@ -131,8 +131,8 @@ static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
}
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SRaftId* pDestId,
bool* pBarrier) {
int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm,
SRaftId* pDestId, bool* pBarrier) {
SSyncRaftEntry* pEntry = NULL;
SyncAppendEntries* pMsgOut = NULL;
bool inBuf = false;
......@@ -140,8 +140,6 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
SyncTerm prevLogTerm = -1;
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sDebug("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr);
pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
if (pEntry == NULL) {
sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index);
......@@ -154,7 +152,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index);
goto _out;
}
(void)syncLogReplMgrUpdateTerm(pMgr, pEntry->index, pEntry->term);
if (pTerm) *pTerm = pEntry->term;
pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm);
if (pMsgOut == NULL) {
......@@ -165,6 +163,9 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
(void)syncNodeSendAppendEntries(pNode, pDestId, pMsgOut);
ret = 0;
sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);
_out:
syncAppendEntriesDestroy(pMsgOut);
pMsgOut = NULL;
......
......@@ -1140,13 +1140,15 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) {
SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
goto _out;
}
ASSERT(barrier == pMgr->states[pos].barrier);
pMgr->states[pos].timeMs = nowMs;
pMgr->states[pos].term = term;
pMgr->states[pos].acked = false;
retried = true;
tsRetryCnt++;
......@@ -1162,6 +1164,7 @@ _out:
int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode,
SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId;
ASSERT(pMgr->restored == false);
......@@ -1170,6 +1173,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
} else {
......@@ -1182,6 +1189,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
if (pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
......@@ -1191,8 +1202,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
// send match index
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
bool barrier = false;
SyncTerm term = -1;
ASSERT(index >= 0);
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &destId, &barrier) < 0) {
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, destId.addr);
return -1;
......@@ -1201,6 +1213,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
int64_t nowMs = taosGetMonoTimestampMs();
pMgr->states[index % pMgr->size].barrier = barrier;
pMgr->states[index % pMgr->size].timeMs = nowMs;
pMgr->states[index % pMgr->size].term = term;
pMgr->states[index % pMgr->size].acked = false;
pMgr->matchIndex = index;
......@@ -1212,7 +1225,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != pMgr->peerStartTime) {
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl mgr in heartbeat. start time:%" PRId64 ", old start time:%" PRId64 "",
pNode->vgId, pMsg->startTime, pMgr->peerStartTime);
syncLogResetLogReplMgr(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
......@@ -1224,6 +1239,9 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
SSyncLogBuffer* pBuf = pNode->pLogBuf;
taosThreadMutexLock(&pBuf->mutex);
if (pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl mgr in append entries reply. start time:%" PRId64 ", old start time:%" PRId64
"",
pNode->vgId, pMsg->startTime, pMgr->peerStartTime);
syncLogResetLogReplMgr(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
......@@ -1252,17 +1270,19 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
SyncIndex index = pNode->pLogBuf->matchIndex;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) {
SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
}
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
sInfo("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
")",
pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
......@@ -1273,6 +1293,7 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size / 20);
int32_t count = 0;
int64_t nowMs = taosGetMonoTimestampMs();
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) {
......@@ -1284,13 +1305,15 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
int64_t pos = index % pMgr->size;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, pDestId, &barrier) < 0) {
SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
}
pMgr->states[pos].barrier = barrier;
pMgr->states[pos].timeMs = taosGetMonoTimestampMs();
pMgr->states[pos].timeMs = nowMs;
pMgr->states[pos].term = term;
pMgr->states[pos].acked = false;
pMgr->endIndex = index + 1;
......@@ -1685,7 +1708,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
// init log buffer
if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
sError("vgId:%d, failed to init raft log buffer since %s", pSyncNode->vgId, terrstr());
ASSERT(false);
goto _error;
}
syncNodeEventLog(pSyncNode, "sync open");
......@@ -3497,6 +3520,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
pMsgReply->srcId = ths->myRaftId;
pMsgReply->term = ths->pRaftStore->currentTerm;
pMsgReply->privateTerm = 8864; // magic number
pMsgReply->startTime = ths->startTime;
SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
......@@ -3618,6 +3642,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
int64_t timeMs = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].timeMs;
ASSERT(timeMs != 0 && "no log entry found");
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
ASSERT(prevIndex == 0 || prevLogTerm != 0);
return prevLogTerm;
}
......@@ -3669,21 +3694,6 @@ void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) {
}
}
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm) {
SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode, pEntry, prevLogTerm);
if (pMsgOut == NULL) {
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, pEntry->index);
goto _err;
}
// replicate pMsgOut
(void)syncLogReplicateAppendEntries(pNode, pMsgOut);
_err:
syncAppendEntriesDestroy(pMsgOut);
return 0;
}
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册