提交 2547e8d0 编写于 作者: M Minghao Li

refactor: sync index

上级 ee757b00
......@@ -225,7 +225,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
......
......@@ -549,6 +549,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg);
// if I am standby, be added into a raft group, I should process SyncAppendEntries msg
/*
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
sInfo("recv SyncAppendEntries maybe replica already dropped");
return ret;
}
*/
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
......@@ -598,7 +607,36 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret;
}
// case 3, accept request
// case 3, index in my snapshot
if (pMsg->term == ths->pRaftStore->currentTerm && syncNodeHasSnapshot(ths)) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (pMsg->prevLogIndex < snapshot.lastApplyIndex) {
sTrace(
"recv SyncAppendEntries, accept, in snapshot, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, "
"snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->matchIndex = snapshot.lastApplyIndex - 1;
// send response
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
}
// case 4, accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
// has extra entries (> preIndex) in local log
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
......
......@@ -106,7 +106,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
sInfo("maybe already dropped");
sInfo("recv SyncAppendEntriesReply, maybe replica already dropped");
return ret;
}
......@@ -195,35 +195,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
sentryIndex = pSender->snapshot.lastApplyIndex;
}
}
#if 0
SyncIndex sentryIndex;
if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
// already start
sentryIndex = pSender->snapshot.lastApplyIndex;
sTrace("sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term,
ths->pRaftStore->currentTerm);
} else {
if (pMsg->privateTerm == pSender->privateTerm) {
sTrace("same privateTerm, pMsg->privateTerm:%lu, pSender->privateTerm:%lu, do not start snapshot again",
pMsg->privateTerm, pSender->privateTerm);
} else {
// start send snapshot, first time
sTrace(
"sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"pMsg->privateTerm:%lu, pSender->privateTerm:%lu",
pSender->term, ths->pRaftStore->currentTerm, pMsg->privateTerm, pSender->privateTerm);
snapshotSenderDoStart(pSender);
pSender->start = true;
// update snapshot private term
syncIndexMgrSetTerm(ths->pNextIndex, &(pMsg->srcId), pSender->privateTerm);
}
sentryIndex = pSender->snapshot.lastApplyIndex;
}
#endif
// update nextIndex to sentryIndex + 1
if (nextIndex <= sentryIndex) {
......
......@@ -96,116 +96,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (pSyncNode->pFsm != NULL) {
int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
ASSERT(code == 0);
#if 0
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
if (i != SYNC_INDEX_INVALID) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.flag = 0x1;
SSnapshot snapshot;
ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL);
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
bool needExecute = true;
if (cbMeta.index <= snapshot.lastApplyIndex) {
needExecute = false;
}
if (needExecute) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
}
}
// config change
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex
bool hit = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
hit = true;
break;
}
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
ASSERT(hit == true);
}
bool isDrop;
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, &isDrop);
// change isStandBy to normal
if (!isDrop) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode);
} else {
syncNodeBecomeFollower(pSyncNode);
}
}
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x1 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
if (pSyncNode->pFsm->FpReConfigCb != NULL) {
SReConfigCbMeta cbMeta = {0};
cbMeta.code = 0;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.flag = 0x1;
cbMeta.isDrop = isDrop;
pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta);
}
}
// restore finish
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
if (pSyncNode->restoreFinish == false) {
if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) {
pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm);
}
pSyncNode->restoreFinish = true;
sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
/*
tsem_post(&pSyncNode->restoreSem);
sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode);
*/
}
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
#endif
}
}
}
......
......@@ -835,6 +835,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeSendMsgById== msgType:%d", pMsg->msgType);
syncRpcMsgLog2(logBuf, pMsg);
// htonl
syncUtilMsgHtoN(pMsg->pCont);
......@@ -1817,4 +1821,14 @@ bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) {
}
}
return false;
}
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) {
SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < ths->replicaNum; ++i) {
if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) {
pSender = (ths->senders)[i];
}
}
return pSender;
}
\ No newline at end of file
......@@ -44,6 +44,8 @@ static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
// refactor, log[0 .. n] ==> log[m .. n]
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
sTrace("raftLogSetBeginIndex beginIndex:%ld", beginIndex);
// if beginIndex == 0, donot need call this funciton
ASSERT(beginIndex > 0);
......
......@@ -103,6 +103,12 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVote, currentTerm:%lu", ths->pRaftStore->currentTerm);
syncRequestVoteLog2(logBuf, pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
sInfo("recv SyncRequestVote maybe replica already dropped");
return ret;
}
// maybe update term
if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term);
......
......@@ -101,6 +101,12 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, term:%lu", ths->pRaftStore->currentTerm);
syncRequestVoteReplyLog2(logBuf, pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
sInfo("recv SyncRequestVoteReply, maybe replica already dropped");
return ret;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term,
......
......@@ -438,7 +438,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("snapshot recv begin ack:%d recv msg:%s", pReceiver->ack, msgStr);
sTrace("snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
......@@ -448,7 +448,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sInfo("snapshot receive finish, update log begin index:%ld, raft log:%s", pMsg->lastIndex + 1, logSimpleStr);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
sInfo("snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu, raft log:%s", pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
taosMemoryFree(logSimpleStr);
// walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
......@@ -460,7 +462,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr);
sTrace("snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
......@@ -469,7 +471,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
needRsp = false;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("snapshot recv force close ack:%d recv msg:%s", pReceiver->ack, msgStr);
sTrace("snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
......@@ -481,7 +485,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("snapshot recv receiving ack:%d recv msg:%s", pReceiver->ack, msgStr);
sTrace("snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
taosMemoryFree(msgStr);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册