未验证 提交 94cc3af0 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14686 from taosdata/feature/3.0_mhli

refactor(sync): add fake syncRestoreFromSnapshot
......@@ -384,6 +384,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0;
......
......@@ -338,12 +338,12 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
......@@ -355,6 +355,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
ASSERT(pSyncMsg != NULL);
code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
code = vnodeSetStandBy(pVnode);
if (code != 0 && terrno != 0) code = terrno;
......
......@@ -193,9 +193,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
// start snapshot <match+1, old snapshot.end>
SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
ASSERT(oldSnapshot.lastApplyIndex >= newMatchIndex + 1);
syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm,
pMsg); // term maybe not ok?
if (oldSnapshot.lastApplyIndex > newMatchIndex) {
syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm,
pMsg); // term maybe not ok?
}
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1);
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
......
......@@ -57,8 +57,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
pSyncNode->commitIndex = snapshot.lastApplyIndex;
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", pSyncNode->commitIndex,
snapshot.lastApplyIndex);
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", commitBegin, commitEnd);
syncNodeEventLog(pSyncNode, eventLog);
}
......
......@@ -336,7 +336,7 @@ int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaft
sDebug(
"vgId:%d, send sync-append-entries-batch to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, "
"commit:%ld, "
"datalen:%d, dataCount:%d}",
"datalen:%d, datacount:%d}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount);
} while (0);
......
system sh/stop_dnodes.sh
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
......@@ -140,7 +140,6 @@ endi
sql create table ct1 using stb tags(1000)
system sh/exec.sh -n dnode4 -s stop -x SIGINT
$N = 100
$count = 0
......@@ -154,12 +153,8 @@ endw
#sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
#sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
#sql flush database db;
#system sh/exec.sh -n dnode4 -s start
#sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3)
......@@ -169,7 +164,7 @@ sleep 5000
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
sleep 5000
sql use db
$N = 100
$count = 0
while $count < $N
$ms = 1591201000000 + $count
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
$count = $count + 1
endw
sleep 5000
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册