提交 357951e9 编写于 作者: M Minghao Li

refactor(sync): modify some code

上级 28a5a9d7
......@@ -252,6 +252,7 @@ void syncNodeRelease(SSyncNode* pNode);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
......@@ -306,7 +307,6 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode);
bool syncNodeIsMnode(SSyncNode* pSyncNode);
int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm);
// trace log
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
......
......@@ -308,24 +308,31 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
}
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncLogRecvAppendEntries(ths, pMsg, "ignore, maybe replica already dropped");
goto _IGNORE;
}
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
// pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->startTime = ths->startTime;
if (pMsg->term < ths->pRaftStore->currentTerm) {
syncLogRecvAppendEntries(ths, pMsg, "reject, small term");
goto _SEND_RESPONSE;
}
if (pMsg->term > ths->pRaftStore->currentTerm) {
pReply->term = pMsg->term;
goto _SEND_RESPONSE;
}
syncNodeStepDown(ths, pMsg->term);
......@@ -335,6 +342,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
if (pMsg->prevLogIndex > lastIndex) {
syncLogRecvAppendEntries(ths, pMsg, "reject, index not match");
goto _SEND_RESPONSE;
}
......@@ -343,6 +351,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
ASSERT(myPreLogTerm != SYNC_TERM_INVALID);
if (myPreLogTerm != pMsg->prevLogTerm) {
syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match");
goto _SEND_RESPONSE;
}
}
......@@ -357,6 +366,71 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncIndex appendIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pLocalEntry = NULL;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
if (code == 0) {
if (pLocalEntry->term == pAppendEntry->term) {
// do nothing
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "log match, do nothing, index:%ld", appendIndex);
syncNodeEventLog(ths, logBuf);
} else {
// truncate
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, truncate error, append-index:%ld", appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
goto _IGNORE;
}
// append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%ld", appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
goto _IGNORE;
}
}
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
// log not exist
// truncate
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, truncate error, append-index:%ld", appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
goto _IGNORE;
}
// append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%ld", appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
goto _IGNORE;
}
} else {
// error
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, get local entry error, append-index:%ld", appendIndex);
syncLogRecvAppendEntries(ths, pMsg, logBuf);
goto _IGNORE;
}
}
#if 0
if (code != 0 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
ASSERT(code == 0);
......@@ -377,17 +451,26 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
ASSERT(code == 0);
}
}
#endif
// update match index
pReply->matchIndex = pAppendEntry->index;
syncEntryDestory(pLocalEntry);
syncEntryDestory(pAppendEntry);
}
} else {
// no append entries, do nothing
// maybe has extra entries, no harm
// update match index
pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply->matchIndex = pMsg->prevLogIndex;
}
// maybe update commit index, leader notice me
syncNodeFollowerCommit(ths, pMsg->commitIndex);
syncLogRecvAppendEntries(ths, pMsg, "accept");
goto _SEND_RESPONSE;
_IGNORE:
......
......@@ -133,5 +133,6 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
}
}
syncLogRecvAppendEntriesReply(ths, pMsg, "process");
return 0;
}
\ No newline at end of file
......@@ -2154,6 +2154,30 @@ void syncNodeUpdateTermWithoutStepDown(SSyncNode* pSyncNode, SyncTerm term) {
}
}
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
ASSERT(pSyncNode->pRaftStore->currentTerm <= newTerm);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "step down, new-term:%lu, current-term:%lu", newTerm,
pSyncNode->pRaftStore->currentTerm);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
if (pSyncNode->pRaftStore->currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRIu64, newTerm);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode->pRaftStore);
} else {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeBecomeFollower(pSyncNode, "step down");
}
}
}
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
......@@ -2243,6 +2267,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
}
// init peer mgr
syncNodePeerStateInit(pSyncNode);
// update sender private term
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
if (pMySender != NULL) {
......@@ -2316,23 +2343,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
return 0;
}
void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
ASSERT(pSyncNode->pRaftStore->currentTerm <= newTerm);
if (pSyncNode->pRaftStore->currentTerm < newTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, newTerm);
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "step down, update term to %" PRIu64, newTerm);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode->pRaftStore);
} else {
if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeBecomeFollower(pSyncNode, "step down");
}
}
}
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
......@@ -2831,17 +2841,20 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
#if 0
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeBecomeFollower(ths, "become follower by hb");
syncNodeStepDown(ths, pMsg->term);
}
#endif
if (pMsg->term == ths->pRaftStore->currentTerm) {
// sInfo("vgId:%d, heartbeat reset timer", ths->vgId);
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncNodeResetElectTimer(ths);
#if 0
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollowerCommit(ths, pMsg->commitIndex);
}
#endif
}
/*
......
......@@ -263,11 +263,15 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
index, err, err, errStr, sysErr, sysErrStr);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
// syncNodeEventLog(pData->pSyncNode, logBuf);
snprintf(logBuf, sizeof(logBuf),
"wal read not exist, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err, err,
errStr, sysErr, sysErrStr);
syncNodeEventLog(pData->pSyncNode, logBuf);
} else {
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
index, err, err, errStr, sysErr, sysErrStr);
syncNodeErrorLog(pData->pSyncNode, logBuf);
}
} while (0);
......
......@@ -94,8 +94,18 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
ASSERT(pMsg != NULL);
} else {
syncNodeLog3("", pSyncNode);
ASSERT(0);
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "replicate to %s:%d error, next-index:%ld", host, port, nextIndex);
syncNodeErrorLog(pSyncNode, logBuf);
} while (0);
syncAppendEntriesDestroy(pMsg);
return -1;
}
}
......
......@@ -111,14 +111,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) {
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
#if 0
if (logOK) {
syncNodeUpdateTerm(ths, pMsg->term);
} else {
syncNodeUpdateTermWithoutStepDown(ths, pMsg->term);
}
#endif
syncNodeStepDown(ths, pMsg->term);
// syncNodeUpdateTerm(ths, pMsg->term);
}
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
......@@ -129,6 +123,9 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) {
// vote again, no harm
raftStoreVote(ths->pRaftStore, &(pMsg->srcId));
// candidate ?
syncNodeStepDown(ths, ths->pRaftStore->currentTerm);
// forbid elect for this round
syncNodeResetElectTimer(ths);
}
......
......@@ -60,6 +60,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term);
return -1;
}
......
......@@ -408,6 +408,8 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
// calculate <start, end> index
syncNodeEventLog(pSyncNode, "start snapshot ...");
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
if (pSender == NULL) {
// create sender
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册