提交 e13c90c5 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -44,6 +44,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont;
// delete msg handle
SRpcMsg rpcMsg = {0};
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code;
mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
......
......@@ -71,6 +71,8 @@ int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) {
}
int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "begin election");
int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollower2Candidate(pSyncNode);
......@@ -120,12 +122,15 @@ int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, con
int32_t ret = 0;
do {
char host[128];
char host[64];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-request-vote to %s:%d, {term:%" PRIu64 ", last-index:%" PRId64 ", last-term:%" PRIu64
"}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "", host, port,
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SRpcMsg rpcMsg;
......
......@@ -999,7 +999,18 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
ASSERT(pSyncNode->pLogStore != NULL);
pSyncNode->commitIndex = SYNC_INDEX_INVALID;
SyncIndex commitIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0};
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
ASSERT(code == 0);
if (snapshot.lastApplyIndex > commitIndex) {
commitIndex = snapshot.lastApplyIndex;
syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
}
}
pSyncNode->commitIndex = commitIndex;
// timer ms init
pSyncNode->pingBaseLine = PING_TIMER_MS;
......@@ -2061,21 +2072,21 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode);
syncNodeEventLog(pSyncNode, "follower to candidate");
}
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode, "leader to follower");
syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
syncNodeEventLog(pSyncNode, "leader to follower");
}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
syncNodeEventLog(pSyncNode, "candidate to follower");
}
// raft vote --------------
......
......@@ -45,8 +45,6 @@
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do {
......@@ -55,8 +53,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", maybe replica already dropped",
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf);
} while (0);
......@@ -98,8 +96,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", reply-grant:%d",
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
......@@ -220,8 +218,8 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", maybe replica already dropped",
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf);
} while (0);
......@@ -262,7 +260,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", reply-grant:%d}",
"}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
......
......@@ -40,22 +40,41 @@
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm);
syncRequestVoteReplyLog2(logBuf, pMsg);
// trace log
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncRequestVoteReply, maybe replica already dropped");
return ret;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term,
ths->pRaftStore->currentTerm);
return ret;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
}
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
......@@ -65,12 +84,14 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64,
pMsg->term, ths->pRaftStore->currentTerm);
syncNodePrint2(logBuf, ths);
sError("%s", logBuf);
return ret;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
host, port, pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
......@@ -99,7 +120,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
}
}
return ret;
return 0;
}
#if 0
......@@ -164,22 +185,41 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, term:%" PRIu64, ths->pRaftStore->currentTerm);
syncRequestVoteReplyLog2(logBuf, pMsg);
// trace log
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncRequestVoteReply, maybe replica already dropped");
return ret;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term,
ths->pRaftStore->currentTerm);
return ret;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
}
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
......@@ -189,13 +229,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf),
"recv SyncRequestVoteReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term,
ths->pRaftStore->currentTerm);
syncNodePrint2(logBuf, ths);
sError("%s", logBuf);
return ret;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
host, port, pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
......@@ -224,5 +265,5 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
}
}
return ret;
return 0;
}
\ No newline at end of file
......@@ -573,6 +573,12 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
}
// maybe update term
if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
raftStorePersist(pReceiver->pSyncNode->pRaftStore);
}
// stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&(pReceiver->snapshot));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册