diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index a18075572814b0ef2f780f981371265505e92640..745c63d5b3bf2fe6f9dabe60ae12bc863c581ba1 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -362,6 +362,11 @@ typedef struct SOffsetAndContLen { } SOffsetAndContLen; */ +// block1: SOffsetAndContLen +// block2: SOffsetAndContLen Array +// block3: SRpcMsg Array +// block4: SRpcMsg pCont Array + typedef struct SyncAppendEntriesBatch { uint32_t bytes; int32_t vgId; diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index e0ff7a70d7bbd8d120982f1696689533c92f5ab9..6622b89b1d142bb3caebadc96f8e56055322aa83 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -47,6 +47,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, S void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby); void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); +bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index abd1492d5e1bbb7abdf8ffe711e01f137b2292fd..dabb9387055c2ca79d389ba3a4119b5914574f02 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -645,7 +645,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid == pNew->uid) { + if (mndVgroupInDb(pVgroup, pNew->uid)) { if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup, pArray) != 0) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -1006,7 +1006,7 @@ static int32_t mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid == pDb->uid) { + if (mndVgroupInDb(pVgroup, pDb->uid)) { numOfTables += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT; vindex++; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index eb0ccbb5f77f51420b37c00d9280b9bcd206e190..aa6e7192fcf923ac88c6f2f84c281bbe8e0ca038 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -225,10 +225,11 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; - if (strcmp(pVgroup->dbName, pStream->targetDb) != 0) { + if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) { sdbRelease(pSdb, pVgroup); continue; } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); if (pTask == NULL) { sdbRelease(pSdb, pVgroup); @@ -420,10 +421,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pStream->sourceDbUid) { + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { sdbRelease(pSdb, pVgroup); continue; } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); if (pInnerTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -483,10 +485,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pStream->sourceDbUid) { + if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { sdbRelease(pSdb, pVgroup); continue; } + SStreamTask* pTask = tNewSStreamTask(pStream->uid); if (pTask == NULL) { sdbRelease(pSdb, pVgroup); @@ -559,7 +562,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pTopic->dbUid) { + if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { sdbRelease(pSdb, pVgroup); continue; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 8a0098bc7f341b54fe3fd89fd3c3c805f055e57d..530cc57390cab7b6ba5ab4e18bd55f01db682809 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -822,6 +822,17 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p if (pSma->stbUid == pStb->uid) { pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVgroup == NULL) goto _OVER; + + SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name); + if (pStream != NULL && pStream->smaId == pSma->uid) { + if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); + goto _OVER; + } + if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + goto _OVER; + } + } if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a4b0453932a39f104c901715560be2b09cc8020c..c7a89945f801dcf6dbcae687819ec8c86ba6713a 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -621,12 +621,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) { - sdbRelease(pSdb, pVgroup); - continue; - } - - if (pVgroup->isTsma) { + if (!mndVgroupInDb(pVgroup, pDb->uid)) { sdbRelease(pSdb, pVgroup); continue; } @@ -664,12 +659,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) { - sdbRelease(pSdb, pVgroup); - continue; - } - - if (pVgroup->isTsma) { + if (!mndVgroupInDb(pVgroup, pDb->uid)) { sdbRelease(pSdb, pVgroup); continue; } @@ -1297,12 +1287,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) { - sdbRelease(pSdb, pVgroup); - continue; - } - - if (pVgroup->isTsma) { + if (!mndVgroupInDb(pVgroup, pDb->uid)) { sdbRelease(pSdb, pVgroup); continue; } @@ -1688,12 +1673,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid != pDb->uid) { - sdbRelease(pSdb, pVgroup); - continue; - } - - if (pVgroup->isTsma) { + if (!mndVgroupInDb(pVgroup, pDb->uid)) { sdbRelease(pSdb, pVgroup); continue; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index b09ee5f6088d69c08bb38f627e188ff24529ed96..534cba73c79ff01732f98e942970155082004284 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1146,13 +1146,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } else { code = TSDB_CODE_ACTION_IN_PROGRESS; } - } - if (pAction->rawWritten) { + } else if (pAction->rawWritten) { if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { code = pAction->errCode; } else { mDebug("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action); } + } else { } } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index b66e1e0545b9fcf05949ba2248ff2696190af84f..5ca945624dd643d5dfec45d7f2afa3de3698082a 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1759,4 +1759,6 @@ _OVER: taosArrayDestroy(pArray); return code; -} \ No newline at end of file +} + +bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } \ No newline at end of file diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index a9b4470b423e4aedcb8755df9f23397ba7f01fe7..6801e7212ac24b74df280f6f2bcf2f2f59140946 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void * pReader; - void * pCurrentBlock; + void *pReader; + void *pCurrentBlock; int32_t blockLen; SSnapshot snapshot; SSyncCfg lastConfig; @@ -56,20 +56,20 @@ typedef struct SSyncSnapshotSender { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotSenderDestroy(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); -void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader); -void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); +int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader); +int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); -char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); +char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void * pWriter; + void *pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshot snapshot; @@ -80,13 +80,13 @@ typedef struct SSyncSnapshotReceiver { SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); -bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); +int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); +int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); +bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 029374368f97a9e45d58353532820db069a33e7a..be48227d404f5b9ad82735ac521d0e2125b62c2f 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -628,6 +628,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { #endif +static int32_t syncNodeMakeLogSame2(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; } + static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t code; @@ -719,7 +721,282 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries return false; } -int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; } +int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { + int32_t ret = 0; + int32_t code = 0; + + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { + syncNodeEventLog(ths, "recv sync-append-entries-batch, maybe replica already dropped"); + return ret; + } + + // maybe update term + if (pMsg->term > ths->pRaftStore->currentTerm) { + syncNodeUpdateTerm(ths, pMsg->term); + } + ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); + + // reset elect timer + if (pMsg->term == ths->pRaftStore->currentTerm) { + ths->leaderCache = pMsg->srcId; + syncNodeResetElectTimer(ths); + } + ASSERT(pMsg->dataLen >= 0); + + // candidate to follower + // + // operation: + // to follower + do { + bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE; + if (condition) { + syncNodeEventLog(ths, "recv sync-append-entries-batch, candidate to follower"); + + syncNodeBecomeFollower(ths, "from candidate by append entries"); + // do not reply? + return ret; + } + } while (0); + + // fake match2 + // + // condition1: + // preIndex <= my commit index + // + // operation: + // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry + // match my-commit-index or my-commit-index + 1 + // no operation on log + do { + bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && + (pMsg->prevLogIndex <= ths->commitIndex); + if (condition) { + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-batch, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d", + pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + } while (0); + + SyncIndex matchIndex = ths->commitIndex; + bool hasAppendEntries = pMsg->dataLen > 0; + if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { + SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE]; + memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + int32_t retArrSize = 0; + syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize); + + // make log same + do { + SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); + bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; + + if (hasExtraEntries) { + // make log same, rollback deleted entries + code = syncNodeMakeLogSame2(ths, pMsg); + ASSERT(code == 0); + } + + } while (0); + + // append entry batch + for (int32_t i = 0; i < retArrSize; ++i) { + SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + return -1; + } + + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); + + syncEntryDestory(pAppendEntry); + } + + // fsync once + SSyncLogStoreData* pData = ths->pLogStore->data; + SWal* pWal = pData->pWal; + walFsync(pWal, true); + + // update match index + matchIndex = pMsg->prevLogIndex + retArrSize; + } + + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = true; + pReply->matchIndex = matchIndex; + + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + return ret; + } + } while (0); + + // calculate logOK here, before will coredump, due to fake match + // bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); + bool logOK = true; + + // not match + // + // condition1: + // term < myTerm + // + // condition2: + // !logOK + // + // operation: + // not match + // no operation on log + do { + bool condition1 = pMsg->term < ths->pRaftStore->currentTerm; + bool condition2 = + (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK; + bool condition = condition1 || condition2; + + if (condition) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d", + pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = false; + pReply->matchIndex = SYNC_INDEX_INVALID; + + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + return ret; + } + } while (0); + + // really match + // + // condition: + // logOK + // + // operation: + // match + // make log same + do { + bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && logOK; + if (condition) { + // has extra entries (> preIndex) in local log + SyncIndex myLastIndex = syncNodeGetLastIndex(ths); + bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; + + // has entries in SyncAppendEntries msg + bool hasAppendEntries = pMsg->dataLen > 0; + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d", + pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + + if (hasExtraEntries) { + // make log same, rollback deleted entries + // code = syncNodeMakeLogSame(ths, pMsg); + ASSERT(code == 0); + } + + int32_t retArrSize = 0; + if (hasAppendEntries) { + SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE]; + memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize); + + // append entry batch + for (int32_t i = 0; i < retArrSize; ++i) { + SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + if (code != 0) { + return -1; + } + + code = syncNodePreCommit(ths, pAppendEntry); + ASSERT(code == 0); + + syncEntryDestory(pAppendEntry); + } + + // fsync once + SSyncLogStoreData* pData = ths->pLogStore->data; + SWal* pWal = pData->pWal; + walFsync(pWal, true); + } + + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->success = true; + pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + retArrSize : pMsg->prevLogIndex; + + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + // maybe update commit index, leader notice me + if (pMsg->commitIndex > ths->commitIndex) { + // has commit entry in local + if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { + // advance commit index to sanpshot first + SSnapshot snapshot; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { + SyncIndex commitBegin = ths->commitIndex; + SyncIndex commitEnd = snapshot.lastApplyIndex; + ths->commitIndex = snapshot.lastApplyIndex; + + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", commitBegin, + commitEnd); + syncNodeEventLog(ths, eventLog); + } + + SyncIndex beginIndex = ths->commitIndex + 1; + SyncIndex endIndex = pMsg->commitIndex; + + // update commit index + ths->commitIndex = pMsg->commitIndex; + + // call back Wal + code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); + ASSERT(code == 0); + + code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); + ASSERT(code == 0); + } + } + return ret; + } + } while (0); + + return ret; +} int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 56f5aaa239bdc9e4b542fa78eb358e5b92fda2aa..4466cccbbcbc35401ca20fc916856a20d8abb582 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -80,7 +80,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } // begin send snapshot by snapshot, pReader -void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) { +int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) { ASSERT(!snapshotSenderIsStart(pSender)); // init snapshot and reader @@ -181,9 +181,11 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void syncNodeEventLog(pSender->pSyncNode, eventLog); taosMemoryFree(eventLog); } while (0); + + return 0; } -void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { +int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // close reader if (pSender->pReader != NULL) { int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); @@ -208,6 +210,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { syncNodeEventLog(pSender->pSyncNode, eventLog); taosMemoryFree(eventLog); } while (0); + + return 0; } // when sender receive ack, call this function to send msg from seq @@ -349,14 +353,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -471,7 +475,7 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if already start, force close, start again -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { +int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { if (!snapshotReceiverIsStart(pReceiver)) { // first start snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); @@ -486,9 +490,11 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer // start again snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); } + + return 0; } -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { +int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { if (pReceiver->pWriter != NULL) { int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); @@ -506,6 +512,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { syncNodeEventLog(pReceiver->pSyncNode, eventLog); taosMemoryFree(eventLog); } while (0); + + return 0; } static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { @@ -604,7 +612,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON * pTmp = pFromId; + cJSON *pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -637,14 +645,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/tools/taos-tools b/tools/taos-tools index 7105027650b51e701cfa1dac11b8fb42d447dd01..5fdd694621fbb7bd2d6102ff4feaec92a7001038 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 7105027650b51e701cfa1dac11b8fb42d447dd01 +Subproject commit 5fdd694621fbb7bd2d6102ff4feaec92a7001038