diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index c38a41983a8a02accb2af74c7b3b9220ac24d3ee..73abe3f4693e8dd19bb596180f505e512356aaf3 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -244,6 +244,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg int32_t syncLeaderTransfer(int64_t rid); int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); +int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); +int32_t syncEndSnapshot(int64_t rid); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index bd49384af0be6e37bfb65b25d5183fcc8d3b3650..7aafaf028ed79eeb0799b4a6bc3b842057f6e7e9 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -696,6 +696,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); +int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex); + int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 07d9b9626137bf7a80f049367fa9f694cbf127fd..fde5722797f21b202cc6b73e668f6e02d13603c6 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -15,7 +15,7 @@ #include "vnd.h" -#define VND_INFO_FNAME "vnode.json" +#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); @@ -236,7 +236,9 @@ int vnodeCommit(SVnode *pVnode) { ASSERT(0); return -1; } - walBeginSnapshot(pVnode->pWal, pVnode->state.applied); + + // walBeginSnapshot(pVnode->pWal, pVnode->state.applied); + syncBeginSnapshot(pVnode->sync, pVnode->state.applied); // preCommit // smaSyncPreCommit(pVnode->pSma); @@ -301,7 +303,8 @@ int vnodeCommit(SVnode *pVnode) { } // apply the commit (TODO) - walEndSnapshot(pVnode->pWal); + // walEndSnapshot(pVnode->pWal); + syncEndSnapshot(pVnode->sync); vInfo("vgId:%d, commit end", TD_VID(pVnode)); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index c670185533211846a07306bd45971a268df44036..a231a8be5866d483988b233cf87ebfa85e1a845c 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -196,6 +196,8 @@ typedef struct SSyncNode { // is config changing bool changing; + int64_t snapshottingIndex; + int64_t startTime; int64_t leaderTime; int64_t lastReplicateTime; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index ed5fc553d6c8609ca1df9b2cbc4e554928438def..8d21d5fabdfdffb32bf1c021c0fd4e965b76f262 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -51,6 +51,7 @@ typedef struct SSyncSnapshotSender { int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; + int64_t startTime; bool finish; } SSyncSnapshotSender; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index da59ab9dd9db9b80ad0a388f22756acc935c153d..75a9b5a7197367c8c41a1ad91e042455cf62e67c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -1042,6 +1042,43 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } +int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { + // maybe update commit index, leader notice me + if (newCommitIndex > ths->commitIndex) { + // has commit entry in local + if (newCommitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { + // advance commit index to sanpshot first + SSnapshot snapshot; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { + SyncIndex commitBegin = ths->commitIndex; + SyncIndex commitEnd = snapshot.lastApplyIndex; + ths->commitIndex = snapshot.lastApplyIndex; + + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, + commitEnd); + syncNodeEventLog(ths, eventLog); + } + + SyncIndex beginIndex = ths->commitIndex + 1; + SyncIndex endIndex = newCommitIndex; + + // update commit index + ths->commitIndex = newCommitIndex; + + // call back Wal + int32_t code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + ASSERT(code == 0); + + code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + ASSERT(code == 0); + } + } + + return 0; +} + int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); @@ -1092,16 +1129,25 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncIndex appendIndex = pMsg->prevLogIndex + 1; SSyncRaftEntry* pLocalEntry = NULL; int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry); - ASSERT(code == 0); - - if (pLocalEntry->term == pAppendEntry->term) { - // do nothing - } else { + if (code != 0 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex); ASSERT(code == 0); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); ASSERT(code == 0); + + } else { + ASSERT(code == 0); + + if (pLocalEntry->term == pAppendEntry->term) { + // do nothing + } else { + code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex); + ASSERT(code == 0); + + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + ASSERT(code == 0); + } } syncEntryDestory(pLocalEntry); @@ -1112,37 +1158,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); // maybe update commit index, leader notice me - if (pMsg->commitIndex > ths->commitIndex) { - // has commit entry in local - if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { - // advance commit index to sanpshot first - SSnapshot snapshot; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { - SyncIndex commitBegin = ths->commitIndex; - SyncIndex commitEnd = snapshot.lastApplyIndex; - ths->commitIndex = snapshot.lastApplyIndex; - - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, - commitEnd); - syncNodeEventLog(ths, eventLog); - } - - SyncIndex beginIndex = ths->commitIndex + 1; - SyncIndex endIndex = pMsg->commitIndex; - - // update commit index - ths->commitIndex = pMsg->commitIndex; - - // call back Wal - int32_t code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); - ASSERT(code == 0); - - code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); - ASSERT(code == 0); - } - } + syncNodeFollowerCommit(ths, pMsg->commitIndex); goto _SEND_RESPONSE; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bc791af0bb6a4bfd8879e3e2923d517ae6247ed1..759b56c92a143e017e63764e970573288f76a955 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -288,6 +288,51 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { return ret; } +int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + ASSERT(rid == pSyncNode->rid); + int32_t code = 0; + + if (pSyncNode->replicaNum == 1) { + SSyncLogStoreData* pData = pSyncNode->pLogStore->data; + code = walBeginSnapshot(pData->pWal, lastApplyIndex); + } else { + SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex); + + if (snapshottingIndex == SYNC_INDEX_INVALID) { + atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex); + + SSyncLogStoreData* pData = pSyncNode->pLogStore->data; + code = walBeginSnapshot(pData->pWal, lastApplyIndex); + + } else { + sError("vgId:%d snapshotting index:%ld, lastApplyIndex:%ld", snapshottingIndex, lastApplyIndex); + } + } + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return code; +} + +int32_t syncEndSnapshot(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } + ASSERT(rid == pSyncNode->rid); + + SSyncLogStoreData* pData = pSyncNode->pLogStore->data; + int32_t code = walEndSnapshot(pData->pWal); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return code; +} + int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { if (pSyncNode->peersNum == 0) { sDebug("only one replica, cannot leader transfer"); @@ -1231,6 +1276,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { pSyncNode->leaderTime = timeNow; pSyncNode->lastReplicateTime = timeNow; + // snapshotting + atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); + syncNodeEventLog(pSyncNode, "sync open"); return pSyncNode; @@ -1423,11 +1471,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { &pSyncNode->pElectTimer); atomic_store_64(&pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser); - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); + /* + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "elect timer reset, ms:%d", ms); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + */ } else { sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); @@ -1441,7 +1491,7 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { taosTmrStop(pSyncNode->pElectTimer); pSyncNode->pElectTimer = NULL; - sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state)); + // sTrace("vgId:%d, sync %s stop elect timer", pSyncNode->vgId, syncUtilState2String(pSyncNode->state)); return ret; } @@ -2334,6 +2384,10 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); + + if (pSyncNode->replicaNum > 1) { + syncNodeDoReplicate(pSyncNode); + } } bool syncNodeIsMnode(SSyncNode* pSyncNode) { return (pSyncNode->vgId == 1); } @@ -2868,8 +2922,12 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { } if (pMsg->term == ths->pRaftStore->currentTerm) { - sInfo("vgId:%d, heartbeat reset timer", 1); + // sInfo("vgId:%d, heartbeat reset timer", ths->vgId); syncNodeResetElectTimer(ths); + + if (ths->state == TAOS_SYNC_STATE_FOLLOWER) { + syncNodeFollowerCommit(ths, pMsg->commitIndex); + } } /* diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 4d756fb382ed214653b1215d8047408d81d11226..c461f4cff1ca24c57eb56eeb91c42271405a4fd3 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -41,6 +41,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI } memset(pSender, 0, sizeof(*pSender)); + int64_t timeNow = taosGetTimestampMs(); + pSender->start = false; pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; @@ -51,7 +53,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; - pSender->privateTerm = taosGetTimestampMs() + 100; + pSender->privateTerm = timeNow + 100; + pSender->startTime = timeNow; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { @@ -402,7 +405,21 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { return s; } -int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } +int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { + // calculate index + + SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); + if (pSender == NULL) { + // create sender + } else { + // if is same + // return 0; + } + + // send begin msg + + return 0; +} // ------------------------------------- SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { diff --git a/tests/script/tsim/sync/sync2-test.sim b/tests/script/tsim/sync/sync2-test.sim index 14e1954714211953b0297acf63635a610303cacf..4f6bf3f9f1d7e4ff0f2daf73292245e5d2603bb4 100644 --- a/tests/script/tsim/sync/sync2-test.sim +++ b/tests/script/tsim/sync/sync2-test.sim @@ -113,6 +113,13 @@ endi + +#return 0 + + + + + vg_ready: print ====> create stable/child table sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) @@ -120,7 +127,7 @@ sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int) -return 0 +#return 0