提交 73686254 编写于 作者: M Minghao Li

fix(sync): restart with config change

上级 b351fd82
......@@ -201,8 +201,8 @@ void syncNodeRelease(SSyncNode* pNode);
// raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
void syncNodeBecomeFollower(SSyncNode* pSyncNode);
void syncNodeBecomeLeader(SSyncNode* pSyncNode);
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
......
......@@ -150,7 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
......@@ -380,9 +380,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths);
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "config change");
}
}
......@@ -469,7 +469,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
sInfo("sync event log truncate, from %ld to %ld", delBegin, delEnd);
sInfo("sync event vgId:%d log truncate, from %ld to %ld", ths->vgId, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code;
......@@ -571,7 +571,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if (condition) {
sTrace("recv SyncAppendEntries, candidate to follower");
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
return ret;
}
......@@ -742,6 +742,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex > ths->commitIndex) {
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, ths->commitIndex,
snapshot.lastApplyIndex, syncUtilState2String(ths->state));
ths->commitIndex = snapshot.lastApplyIndex;
}
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
......
......@@ -184,8 +184,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
char* s = snapshotSender2Str(pSender);
sInfo(
"sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu sender:%s",
host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s);
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"sender:%s",
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, s);
taosMemoryFree(s);
}
......
......@@ -48,10 +48,18 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
// advance commit index to sanpshot first
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
sInfo("sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state));
pSyncNode->commitIndex = snapshot.lastApplyIndex;
}
// update commit index
SyncIndex newCommitIndex = pSyncNode->commitIndex;
for (SyncIndex index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore); index > pSyncNode->commitIndex;
--index) {
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
bool agree = syncAgree(pSyncNode, index);
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld", agree, index,
pSyncNode->commitIndex);
......
......@@ -411,6 +411,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
sInfo("sync event vgId:%d sync open", pSyncInfo->vgId);
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
......@@ -628,7 +630,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if (pSyncNode->replicaNum == 1) {
raftStoreNextTerm(pSyncNode->pRaftStore);
syncNodeBecomeLeader(pSyncNode);
syncNodeBecomeLeader(pSyncNode, "one replica start");
syncNodeLog2("==state change become leader immediately==", pSyncNode);
......@@ -654,7 +656,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
return;
}
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "first start");
// for test
int32_t ret = 0;
......@@ -687,6 +689,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void syncNodeClose(SSyncNode* pSyncNode) {
sInfo("sync event vgId:%d sync close", pSyncNode->vgId);
int32_t ret;
assert(pSyncNode != NULL);
......@@ -1149,13 +1153,13 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid)
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term);
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "update term");
raftStoreClearVote(pSyncNode->pRaftStore);
}
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
sInfo("sync event become follower");
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become follower, %s", pSyncNode->vgId, debugStr);
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......@@ -1188,8 +1192,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
// evoterLog |-> voterLog[i]]}
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
sInfo("sync event become leader");
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
sInfo("sync event vgId:%d become leader, %s", pSyncNode->vgId, debugStr);
// state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
......@@ -1241,7 +1245,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
assert(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader(pSyncNode);
syncNodeBecomeLeader(pSyncNode, "candidate to leader");
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
......@@ -1264,14 +1268,14 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "leader to follower");
syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode);
}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode);
syncNodeBecomeFollower(pSyncNode, "candidate to follower");
syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode);
}
......@@ -1728,17 +1732,19 @@ const char* syncStr(ESyncState state) {
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
ESyncState state = flag;
sInfo("sync event commit from index:%" PRId64 " to index:%" PRId64 ", %s", beginIndex, endIndex,
syncUtilState2String(state));
sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
endIndex, syncUtilState2String(state));
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (ths->pFsm->FpGetSnapshot != NULL) {
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
}
if (beginIndex <= snapshot.lastApplyIndex) {
beginIndex = snapshot.lastApplyIndex + 1;
}
/*
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (ths->pFsm->FpGetSnapshot != NULL) {
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
}
if (beginIndex <= snapshot.lastApplyIndex) {
beginIndex = snapshot.lastApplyIndex + 1;
}
*/
// execute fsm
if (ths->pFsm != NULL) {
......@@ -1795,9 +1801,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// change isStandBy to normal
if (!isDrop) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths);
syncNodeBecomeLeader(ths, "config change");
} else {
syncNodeBecomeFollower(ths);
syncNodeBecomeFollower(ths, "config change");
}
}
......
......@@ -162,8 +162,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true);
sTrace("sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pEntry->index,
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
sTrace("sync event vgId:%d write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId,
pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
return code;
......
......@@ -109,8 +109,10 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
sTrace("sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s", host,
port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, msgStr);
sTrace(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
taosMemoryFree(msgStr);
syncSnapshotSendDestroy(pMsg);
......@@ -230,13 +232,17 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
msgStr);
sTrace(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
} else {
sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
msgStr);
sTrace(
"sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, msgStr);
}
taosMemoryFree(msgStr);
......@@ -264,8 +270,8 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
char host[128];
uint16_t port;
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
sTrace("sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
msgStr);
sTrace("sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", pSender->pSyncNode->vgId, host,
port, pSender->seq, pSender->ack, msgStr);
taosMemoryFree(msgStr);
syncSnapshotSendDestroy(pMsg);
......@@ -476,8 +482,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sTrace("sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
sTrace("sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
......@@ -495,9 +501,11 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sInfo(
"sync event snapshot recv from %s:%d finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s",
host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
pSyncNode->vgId, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
logSimpleStr);
taosMemoryFree(logSimpleStr);
pReceiver->pWriter = NULL;
......@@ -506,8 +514,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
sTrace("sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
......@@ -520,8 +528,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
sTrace(
"sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
......@@ -539,8 +548,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
char host[128];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sTrace("sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
sTrace("sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s",
pSyncNode->vgId, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册