diff --git a/include/common/tglobal.h b/include/common/tglobal.h index cd9667843b833746a20a33b066a72b7364ad93e4..2331f0b23c52d6b826796549a1964d4eb970a9fd 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -69,6 +69,9 @@ extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; +// vnode +extern int64_t tsVndCommitMaxIntervalMs; + // monitor extern bool tsEnableMonitor; extern int32_t tsMonitorInterval; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f93bf9a326008b25dfbc05737d213f3a7cc3cbeb..4c23c1f5575434cc971a49a042df4ce1720b73ef 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -49,10 +49,12 @@ extern "C" { #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_SNAP_RESEND_MS 1000 * 60 +#define SYNC_VND_COMMIT_MIN_MS 1000 + #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 -#define SYNC_TERM_INVALID -1 // 0xFFFFFFFFFFFFFFFF +#define SYNC_TERM_INVALID -1 typedef enum { SYNC_STRATEGY_NO_SNAPSHOT = 0, diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f670ce5aaf52a37bc43bebc4349ca2e0d6b6c6c1..037c8a45419353864c4e6b962beb641daf3b35c1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,6 +60,9 @@ int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; +// vnode +int64_t tsVndCommitMaxIntervalMs = 60 * 1000; + // monitor bool tsEnableMonitor = true; int32_t tsMonitorInterval = 30; @@ -435,6 +438,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, 0) != 0) return -1; + if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; @@ -754,6 +759,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32; tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32; + tsVndCommitMaxIntervalMs = cfgGetItem(pCfg, "vndCommitMaxInterval")->i64; + tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 6b274261e5856511e8f3eea9635aecf2c492aeec..693fe97daaaa238f917132bc3c3282abba3c4dda 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -79,6 +79,8 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { char path[TSDB_FILENAME_LEN] = {0}; + vnodeProposeCommitOnNeed(pVnode->pImpl); + taosThreadRwlockWrlock(&pMgmt->lock); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosThreadRwlockUnlock(&pMgmt->lock); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index c8c8e06c5ed5a3b041161cf7b3d573053f1a93ec..9b3934c40cd0756fb11de91e311efa985b3801ed 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -785,9 +785,9 @@ static void mndReloadSyncConfig(SMnode *pMnode) { int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg); if (code != 0) { - mError("vgId:1, failed to reconfig mnode sync since %s", terrstr()); + mError("vgId:1, mnode sync reconfig failed since %s", terrstr()); } else { - mInfo("vgId:1, reconfig mnode sync success"); + mInfo("vgId:1, mnode sync reconfig success"); } } } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 2a63e3faf3a902e001b7511e8b340908a8c81e7d..9e830b83e68f1b4c32a040abc3b72b1c6981c225 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -243,7 +243,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { if (pFile == NULL) { taosMemoryFree(pRaw); terrno = TAOS_SYSTEM_ERROR(errno); - mDebug("failed to read sdb file:%s since %s", file, terrstr()); + mInfo("read sdb file:%s finished since %s", file, terrstr()); return 0; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b133226ed39bb5c20ed96b56d95881d009e1e872..860db20fa87e225f43e33e7b25f728a13f65846a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -124,6 +124,7 @@ FAIL: } void sndClose(SSnode *pSnode) { + streamMetaCommit(pSnode->pMeta); streamMetaClose(pSnode->pMeta); taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e92daaabe888c62b3a4b59f75e24ce37bee4df0d..e5e7fea1cfdb12a46fc3126ce8fe5df85212acea 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -89,6 +89,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); +void vnodeProposeCommitOnNeed(SVnode *pVnode); // meta typedef struct SMeta SMeta; // todo: remove diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 93750ed58506ba237edd8158b08d8a779a9b6691..e075d8a6ca0af9da2a0ca7f12eeb834599fce3e5 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -86,6 +86,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode); +void vnodeUpdCommitSched(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); @@ -103,6 +104,7 @@ void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode); +int vnodeShouldCommit(SVnode* pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 23c8e7e037ed0bd54bab4cbb3699aff3ac193510..75367883f1d82edf349bbccbee5552dcf299a003 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -332,6 +332,11 @@ struct STsdbKeepCfg { int32_t keep2; }; +typedef struct SVCommitSched { + int64_t commitMs; + int64_t maxWaitMs; +} SVCommitSched; + struct SVnode { char* path; SVnodeCfg config; @@ -350,6 +355,7 @@ struct SVnode { STQ* pTq; SSink* pSink; tsem_t canCommit; + SVCommitSched commitSched; int64_t sync; TdThreadMutex lock; bool blocked; diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 1b5f74255948da71938eb274fe3cfd05700d8aa9..867b481bcc104561703105c4aed5bd9c75fd8455 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -203,6 +203,7 @@ _err: int metaClose(SMeta *pMeta) { if (pMeta) { + if (pMeta->pEnv) tdbAbort(pMeta->pEnv, pMeta->txn); if (pMeta->pCache) metaCacheClose(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); diff --git a/source/dnode/vnode/src/tq/tqCommit.c b/source/dnode/vnode/src/tq/tqCommit.c index dabd97a345f375c6774d37e4f4a408bd0bd44940..7fc66c49192eb011ce674bc7a2d396825ba1b435 100644 --- a/source/dnode/vnode/src/tq/tqCommit.c +++ b/source/dnode/vnode/src/tq/tqCommit.c @@ -15,4 +15,11 @@ #include "tq.h" -int tqCommit(STQ* pTq) { return tqOffsetCommitFile(pTq->pOffsetStore); } +int tqCommit(STQ* pTq) { + if (streamMetaCommit(pTq->pStreamMeta) < 0) { + tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr()); + return -1; + } + + return tqOffsetCommitFile(pTq->pOffsetStore); +} diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 6a8251706721cc4ac5dd178d258728c4d9f0b2d2..0fc5b617bbb83e514ff8e00295d68295f70d1bb1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1080,6 +1080,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { iMax[nMax] = i; max[nMax++] = pIter->input[i].pRow; + } else { + pIter->input[i].next = false; } } } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 6c54c3cb5c67b404c069e952c021597bdd0122f5..2b76e9f9b5877465698ce2d7c364357ed1b06bdc 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -58,7 +58,25 @@ int vnodeBegin(SVnode *pVnode) { return 0; } +void vnodeUpdCommitSched(SVnode *pVnode) { + int64_t randNum = taosRand(); + pVnode->commitSched.commitMs = taosGetMonoTimestampMs(); + pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs); +} + int vnodeShouldCommit(SVnode *pVnode) { + if (!pVnode->inUse || !osDataSpaceAvailable()) { + return false; + } + + SVCommitSched *pSched = &pVnode->commitSched; + int64_t nowMs = taosGetMonoTimestampMs(); + + return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || + (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); +} + +int vnodeShouldCommitOld(SVnode *pVnode) { if (pVnode->inUse) { return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size); } @@ -247,6 +265,7 @@ _exit: taosMemoryFree(pInfo); return code; } + int vnodeAsyncCommit(SVnode *pVnode) { int32_t code = 0; @@ -294,7 +313,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { SVnode *pVnode = pInfo->pVnode; vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), - pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm); + pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm); + + vnodeUpdCommitSched(pVnode); // persist wal before starting if (walPersist(pVnode->pWal) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index cea85a0c1134dbd78f9163824c1bc3f5be7887d6..8c07d0cec7db4dd2a6e398259a842e8659d6e10a 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -160,6 +160,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); + vnodeUpdCommitSched(pVnode); + int8_t rollback = vnodeShouldRollback(pVnode); // open buffer pool @@ -254,7 +256,7 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodeClose(SVnode *pVnode) { if (pVnode) { - vnodeSyncCommit(pVnode); + tsem_wait(&pVnode->canCommit); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); walClose(pVnode->pWal); @@ -263,6 +265,8 @@ void vnodeClose(SVnode *pVnode) { smaClose(pVnode->pSma); metaClose(pVnode->pMeta); vnodeCloseBufPool(pVnode); + tsem_post(&pVnode->canCommit); + // destroy handle tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&pVnode->syncSem); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ac6406fc2352fb5d9c4559d2c186811742e6dc62..4cf6c4e55c7b8126cd4b999af58289625a9cad9b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -203,6 +203,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // skip header pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); + bool needCommit = false; switch (pMsg->msgType) { /* META */ @@ -299,9 +300,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp); break; case TDMT_VND_COMMIT: - vnodeSyncCommit(pVnode); - vnodeBegin(pVnode); - goto _exit; + needCommit = true; + break; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return -1; @@ -318,7 +318,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } // commit if need - if (vnodeShouldCommit(pVnode)) { + if (needCommit) { vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vnodeAsyncCommit(pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 65e17cfaad36cc03c8ee178223153e6ea62919a9..6f3788616a5f45e440d07f23b89e3e493d07fa4d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -101,6 +101,64 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) } } +static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) { + int64_t seq = 0; + + taosThreadMutexLock(&pVnode->lock); + int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq); + bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); + if (wait) { + ASSERT(!pVnode->blocked); + pVnode->blocked = true; + pVnode->blockSec = taosGetTimestampSec(); + pVnode->blockSeq = seq; +#if 0 + pVnode->blockInfo = pMsg->info; +#endif + } + taosThreadMutexUnlock(&pVnode->lock); + + if (code > 0) { + vnodeHandleWriteMsg(pVnode, pMsg); + } else if (code < 0) { + if (terrno != 0) code = terrno; + vnodeHandleProposeError(pVnode, pMsg, code); + } + + if (wait) vnodeWaitBlockMsg(pVnode, pMsg); + return code; +} + +void vnodeProposeCommitOnNeed(SVnode *pVnode) { + if (!vnodeShouldCommit(pVnode)) { + return; + } + + int32_t contLen = sizeof(SMsgHead); + SMsgHead *pHead = rpcMallocCont(contLen); + pHead->contLen = contLen; + pHead->vgId = pVnode->config.vgId; + + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_VND_COMMIT; + rpcMsg.contLen = contLen; + rpcMsg.pCont = pHead; + rpcMsg.info.noResp = 1; + + bool isWeak = false; + if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) { + vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr()); + goto _out; + } + + vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId); + +_out: + vnodeUpdCommitSched(pVnode); + rpcFreeCont(rpcMsg.pCont); + rpcMsg.pCont = NULL; +} + #if BATCH_ENABLE static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { @@ -178,6 +236,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) continue; } + vnodeProposeCommitOnNeed(pVnode); + code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); @@ -205,34 +265,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) #else -static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) { - int64_t seq = 0; - - taosThreadMutexLock(&pVnode->lock); - int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq); - bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); - if (wait) { - ASSERT(!pVnode->blocked); - pVnode->blocked = true; - pVnode->blockSec = taosGetTimestampSec(); - pVnode->blockSeq = seq; -#if 0 - pVnode->blockInfo = pMsg->info; -#endif - } - taosThreadMutexUnlock(&pVnode->lock); - - if (code > 0) { - vnodeHandleWriteMsg(pVnode, pMsg); - } else if (code < 0) { - if (terrno != 0) code = terrno; - vnodeHandleProposeError(pVnode, pMsg, code); - } - - if (wait) vnodeWaitBlockMsg(pVnode, pMsg); - return code; -} - void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; @@ -256,6 +288,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) continue; } + vnodeProposeCommitOnNeed(pVnode); + code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2b22a4ed2937b52c591f41661952cd01a667ae79..6f77769decdf2649b66e5cac0c8bbf823cce85e1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -69,8 +69,7 @@ _err: } void streamMetaClose(SStreamMeta* pMeta) { - tdbCommit(pMeta->db, pMeta->txn); - tdbPostCommit(pMeta->db, pMeta->txn); + tdbAbort(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); @@ -88,6 +87,7 @@ void streamMetaClose(SStreamMeta* pMeta) { /*streamMetaReleaseTask(pMeta, pTask);*/ } taosHashCleanup(pMeta->pTasks); + taosHashCleanup(pMeta->pRecoverStatus); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3bd94dbab5811fc071d7444ab864d3087f785d53..49486bc12dbc8cea7318def97a12cc160d6ed42a 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -247,8 +247,8 @@ typedef struct SyncLocalCmd { SRaftId destId; int32_t cmd; - SyncTerm sdNewTerm; // step down new term - SyncIndex fcIndex; // follower commit index + SyncTerm currentTerm; // step down new term + SyncIndex commitIndex; // follower commit index } SyncLocalCmd; int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index a0a0691694776f2830d4f80ec2e96ec8bf782921..55cb0d7db685304053fdafff092a8206a12a7d73 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -98,6 +98,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); // access int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); +SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1dc6905b88cbf93dc31a4b9dc73f27567868876b..026ebdb37ce0354cd5a13eea60e9bac906b2d296 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -90,6 +90,7 @@ // int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { + ASSERT(false && "deprecated"); if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { sNTrace(ths, "can not do follower commit"); return -1; @@ -206,12 +207,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { accepted = true; _SEND_RESPONSE: + pEntry = NULL; pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm); bool matched = (pReply->matchIndex >= pReply->lastSendIndex); if (accepted && matched) { pReply->success = true; // update commit index only after matching - (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); + (void)syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex)); } // ack, i.e. send response diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 5fdcbeb91c119bfebb21805b12fc1faa15eb2cab..152fddb7e68127bac57947faf04a7ac8b907ff20 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -44,6 +44,7 @@ // /\ UNCHANGED <> // void syncOneReplicaAdvance(SSyncNode* pSyncNode) { + ASSERT(false && "deprecated"); if (pSyncNode == NULL) { sError("pSyncNode is NULL"); return; diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 0d6d0f93f102800b1e9d13e0aa65061983db7332..1fa67cfa4d7a9b5831219cae699f474403ff243a 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -114,7 +114,7 @@ void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); } SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) { SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid); - if (pData == NULL) { + if (pData == NULL && rid > 0) { sInfo("failed to acquire hb-timer-data from refId:%" PRId64, rid); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 91a05e78b0ac626343eb42fb9b20e3fde9737c57..c2bf6dc837ed732a0d8e0993319634c5bf2a5aa4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1030,6 +1030,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { } } pSyncNode->commitIndex = commitIndex; + sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) { goto _error; @@ -1170,9 +1171,10 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { } ASSERT(endIndex == lastVer + 1); - commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); + pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); + sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); - if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) { + if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) { return -1; } @@ -1480,16 +1482,21 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg } } + int32_t code = -1; if (pNode->syncSendMSg != NULL && epSet != NULL) { syncUtilMsgHtoN(pMsg->pCont); pMsg->info.noResp = 1; - return pNode->syncSendMSg(epSet, pMsg); - } else { - sError("vgId:%d, sync send msg by id error, fp:%p epset:%p", pNode->vgId, pNode->syncSendMSg, epSet); + code = pNode->syncSendMSg(epSet, pMsg); + } + + if (code < 0) { + sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet, + DID(destRaftId), destRaftId->addr, terrno); rpcFreeCont(pMsg->pCont); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; } + + return code; } inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) { @@ -2537,8 +2544,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; - pSyncMsg->fcIndex = pMsg->commitIndex; - SyncIndex fcIndex = pSyncMsg->fcIndex; + pSyncMsg->commitIndex = pMsg->commitIndex; + pSyncMsg->currentTerm = pMsg->term; + SyncIndex fcIndex = pSyncMsg->commitIndex; if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); @@ -2559,7 +2567,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; - pSyncMsg->sdNewTerm = pMsg->term; + pSyncMsg->currentTerm = pMsg->term; + pSyncMsg->commitIndex = pMsg->commitIndex; if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); @@ -2567,7 +2576,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); rpcFreeCont(rpcMsgLocalCmd.pCont); } else { - sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm); + sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->currentTerm); } } } @@ -2625,10 +2634,13 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogRecvLocalCmd(ths, pMsg, ""); if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { - syncNodeStepDown(ths, pMsg->sdNewTerm); + syncNodeStepDown(ths, pMsg->currentTerm); } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { - (void)syncNodeUpdateCommitIndex(ths, pMsg->fcIndex); + SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf); + if (pMsg->currentTerm == matchTerm) { + (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); + } if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(), ths->commitIndex); @@ -2641,14 +2653,15 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + ASSERT(false && "deprecated"); SyncLocalCmd* pMsg = pRpcMsg->pCont; syncLogRecvLocalCmd(ths, pMsg, ""); if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { - syncNodeStepDown(ths, pMsg->sdNewTerm); + syncNodeStepDown(ths, pMsg->currentTerm); } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { - syncNodeFollowerCommit(ths, pMsg->fcIndex); + syncNodeFollowerCommit(ths, pMsg->commitIndex); } else { sError("error local cmd"); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 410986b87a4031d19e8b4e7d96da65ceac7a4839..f878044bca35a3a22997d32a3247d150523dfb88 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -31,6 +31,10 @@ static bool syncIsMsgBlock(tmsg_t type) { (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); } +FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() { + return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF); +} + int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { taosThreadMutexLock(&pBuf->mutex); int64_t index = pBuf->endIndex; @@ -264,20 +268,27 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { return ret; } -FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { +FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) { SyncIndex index = pBuf->matchIndex; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; ASSERT(pEntry != NULL); return pEntry->term; } +SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { + taosThreadMutexLock(&pBuf->mutex); + SyncTerm term = syncLogBufferGetLastMatchTermWithoutLock(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return term; +} + int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); int32_t ret = -1; SyncIndex index = pEntry->index; SyncIndex prevIndex = pEntry->index - 1; - SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); + SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf); SSyncRaftEntry* pExist = NULL; bool inBuf = true; @@ -329,6 +340,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } // update + ASSERT(pBuf->startIndex < index); + ASSERT(index - pBuf->startIndex < pBuf->size); ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL); SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; pEntry = NULL; @@ -456,6 +469,11 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode); } + if (pEntry->originalRpcType == TDMT_VND_COMMIT) { + sInfo("vgId:%d, fsm execute vnode commit. index: %" PRId64 ", term: %" PRId64 "", pNode->vgId, pEntry->index, + pEntry->term); + } + SRpcMsg rpcMsg = {.code = applyCode}; syncEntry2OriginalRpc(pEntry, &rpcMsg); @@ -552,7 +570,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ret = 0; _out: // mark as restored if needed - if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { + if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && + pNode->pRaftStore->currentTerm <= pEntry->term) { pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->restoreFinish = true; sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, @@ -613,6 +632,12 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { } if (pMgr->states[pos].acked) { + if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) { + syncLogReplMgrReset(pMgr); + sWarn("vgId:%d, reset sync log repl mgr since stagnation. index: %" PRId64 ", peer: %" PRIx64, pNode->vgId, + index, pDestId->addr); + goto _out; + } continue; } @@ -639,10 +664,12 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { _out: if (retried) { pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); - sInfo("vgId:%d, resent %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., terms: ... %" PRId64 - ", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64 ")", + SSyncLogBuffer* pBuf = pNode->pLogBuf; + sInfo("vgId:%d, resend %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., terms: ... %" PRId64 + ", retryWaitMs: %" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 + " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, - pMgr->endIndex); + pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } return ret; } @@ -771,7 +798,7 @@ int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { ASSERT(!pMgr->restored); ASSERT(pMgr->startIndex >= 0); - int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF); + int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs(); int64_t nowMs = taosGetMonoTimestampMs(); if (pMgr->endIndex > pMgr->startIndex && @@ -799,11 +826,10 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode pMgr->endIndex = index + 1; SSyncLogBuffer* pBuf = pNode->pLogBuf; - sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 - ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 - ")", - pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sInfo("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term: %" PRId64 ". mgr (rs:%d): [%" PRId64 + " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -815,9 +841,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p int32_t count = 0; int64_t nowMs = taosGetMonoTimestampMs(); int64_t limit = pMgr->size >> 1; + SyncTerm term = -1; + SyncIndex firstIndex = -1; for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { - if (batchSize < count++ || limit <= index - pMgr->startIndex) { + if (batchSize < count || limit <= index - pMgr->startIndex) { break; } if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) { @@ -826,7 +854,6 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p int64_t pos = index % pMgr->size; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; - SyncTerm term = -1; if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); @@ -837,6 +864,9 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p pMgr->states[pos].term = term; pMgr->states[pos].acked = false; + if (firstIndex == -1) firstIndex = index; + count++; + pMgr->endIndex = index + 1; if (barrier) { sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64 @@ -850,10 +880,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p syncLogReplMgrRetryOnNeed(pMgr, pNode); SSyncLogBuffer* pBuf = pNode->pLogBuf; - sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 - "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sTrace("vgId:%d, replicated %d msgs to peer: %" PRIx64 ". indexes: %" PRId64 "..., terms: ...%" PRId64 + ", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + ")", + pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, + pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } @@ -985,6 +1016,10 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); + if (toIndex == pBuf->endIndex) { + return 0; + } + sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 3f9f397ef5726b2d29449080529dff6a4d9c2ddf..03c3fe154d74422104c055f3cf83b6e05fc20770 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -219,6 +219,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ASSERT(pEntry->index == index); + if (pEntry->originalRpcType == TDMT_VND_COMMIT) { + walFsync(pWal, true); + } + sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); return 0; @@ -312,29 +316,6 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - // need not truncate - SyncIndex wallastVer = walGetLastVer(pWal); - if (fromIndex > wallastVer) { - return 0; - } - - // need not truncate - SyncIndex walCommitVer = walGetCommittedVer(pWal); - if (fromIndex <= walCommitVer) { - return 0; - } - - // delete from cache - for (SyncIndex index = fromIndex; index <= wallastVer; ++index) { - SLRUCache* pCache = pData->pSyncNode->pLogStore->pCache; - LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index)); - if (h) { - sNTrace(pData->pSyncNode, "cache delete index:%" PRId64, index); - - taosLRUCacheRelease(pData->pSyncNode->pLogStore->pCache, h, true); - } - } - int32_t code = walRollback(pWal, fromIndex); if (code != 0) { int32_t err = terrno; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 0ec24d5326c83fdf70111cc6ab16ee7a49533cdf..e4a65837f73dd1c22230e21025cf36775bd20fec 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -17,20 +17,20 @@ #include "syncUtil.h" #include "syncIndexMgr.h" #include "syncMessage.h" +#include "syncPipeline.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" #include "syncSnapshot.h" void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { - int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex); - + int32_t len = snprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex); for (int32_t i = 0; i < pCfg->replicaNum; ++i) { + len += snprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); if (i < pCfg->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%s:%d, ", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); - } else { - len += snprintf(buf + len, bufLen - len, "%s:%d}", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); + len += snprintf(buf + len, bufLen - len, "%s", ", "); } } + len += snprintf(buf + len, bufLen - len, "%s", "]}"); } void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) { @@ -89,32 +89,55 @@ bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && // for leader static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { - int32_t len = 5; - + int32_t len = 0; + len += snprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i])); - + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs); if (i < pSyncNode->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); - } else { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs); + len += snprintf(buf + len, bufLen - len, "%s", ","); } } + len += snprintf(buf + len, bufLen - len, "%s", "}"); } // for follower static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { - int32_t len = 4; - + int32_t len = 0; + len += snprintf(buf + len, bufLen - len, "%s", "{"); for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i])); - + len += snprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs); if (i < pSyncNode->replicaNum - 1) { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); - } else { - len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs); + len += snprintf(buf + len, bufLen - len, "%s", ","); } } + len += snprintf(buf + len, bufLen - len, "%s", "}"); +} + +static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { + SSyncLogBuffer* pBuf = pSyncNode->pLogBuf; + if (pBuf == NULL) { + return; + } + int len = 0; + len += snprintf(buf + len, bufLen - len, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pBuf->startIndex, + pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); +} + +static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { + int len = 0; + len += snprintf(buf + len, bufLen - len, "%s", "{"); + for (int32_t i = 0; i < pSyncNode->replicaNum; i++) { + SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i]; + if (pMgr == NULL) break; + len += snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 ")", i, pMgr->restored, + pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); + if (i + 1 < pSyncNode->replicaNum) { + len += snprintf(buf + len, bufLen - len, "%s", ", "); + } + } + len += snprintf(buf + len, bufLen - len, "%s", "}"); } static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { @@ -156,16 +179,19 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo int32_t cacheHit = pNode->pLogStore->cacheHit; int32_t cacheMiss = pNode->pLogStore->cacheMiss; - char cfgStr[1024]; + char cfgStr[1024] = ""; syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); - char peerStr[1024] = "{"; - syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); + char replMgrStatesStr[1024] = ""; + syncLogReplMgrStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr)); - char hbrTimeStr[256] = "hbr:{"; + char bufferStatesStr[256] = ""; + syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr)); + + char hbrTimeStr[256] = ""; syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr)); - char hbTimeStr[256] = "hb:{"; + char hbTimeStr[256] = ""; syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr)); char eventLog[512]; // {0}; @@ -181,21 +207,21 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo // restore error code terrno = errCode; - + if (pNode != NULL) { taosPrintLog(flags, level, dflag, "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 - ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, " + ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, " "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 - ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", + ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 + ", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s", pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, - pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, - pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, - pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), - pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, - hbrTimeStr); + pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, + pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, + pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, + bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr); } } @@ -216,7 +242,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); } - char cfgStr[1024]; + char cfgStr[1024] = ""; syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); char peerStr[1024] = "{"; @@ -262,7 +288,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); } - char cfgStr[1024]; + char cfgStr[1024] = ""; syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); char peerStr[1024] = "{"; @@ -304,7 +330,7 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, - syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); + syncLocalCmdGetStr(pMsg->cmd), pMsg->currentTerm, pMsg->commitIndex, s); } void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 10a99bb1fab525a9ff4569d55c3d688bb099ee46..0e20941b3ae472986d5116616eda05af93a83dc1 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -74,7 +74,12 @@ int32_t tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, i int32_t tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg, int flags); -int32_t tdbTxnClose(TXN *pTxn); +int32_t tdbTxnCloseImpl(TXN *pTxn); +#define tdbTxnClose(pTxn) \ + do { \ + tdbTxnCloseImpl(pTxn); \ + (pTxn) = NULL; \ + } while (0) // other void tdbFree(void *); diff --git a/source/libs/tdb/src/db/tdbTxn.c b/source/libs/tdb/src/db/tdbTxn.c index 055d9c7f984b05003a7a69d996e1eb54a51bbe8c..24f955fe2fde29c089b4768baae54eaea892d09e 100644 --- a/source/libs/tdb/src/db/tdbTxn.c +++ b/source/libs/tdb/src/db/tdbTxn.c @@ -28,13 +28,18 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void return 0; } -int tdbTxnClose(TXN *pTxn) { +int tdbTxnCloseImpl(TXN *pTxn) { if (pTxn) { if (pTxn->jPageSet) { hashset_destroy(pTxn->jPageSet); pTxn->jPageSet = NULL; } + if (pTxn->jfd) { + tdbOsClose(pTxn->jfd); + ASSERT(pTxn->jfd == NULL); + } + tdbOsFree(pTxn); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2632d290e956c536bdcfb3ac70d20aa3607e7382..1a99db5f992d51f5e01ba5120e2099f1d53a307f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1126,7 +1126,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - tGTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, + tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); uv_timer_stop(conn->timer); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 8e6628bb21eabc25e2e66000e58b796854c5ac34..44e88a4dcc425a728789a5e95b9fbf19bcaae415 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -292,19 +292,10 @@ void walAlignVersions(SWal* pWal) { } pWal->vers.lastVer = pWal->vers.snapshotVer; } - if (pWal->vers.commitVer < pWal->vers.snapshotVer) { - wWarn("vgId:%d, commitVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId, - pWal->vers.commitVer, pWal->vers.snapshotVer); - pWal->vers.commitVer = pWal->vers.snapshotVer; - } - if (pWal->vers.appliedVer < pWal->vers.snapshotVer) { - wWarn("vgId:%d, appliedVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId, - pWal->vers.appliedVer, pWal->vers.snapshotVer); - pWal->vers.appliedVer = pWal->vers.snapshotVer; - } - - pWal->vers.commitVer = TMIN(pWal->vers.lastVer, pWal->vers.commitVer); - pWal->vers.appliedVer = TMIN(pWal->vers.commitVer, pWal->vers.appliedVer); + // reset commitVer and appliedVer + pWal->vers.commitVer = pWal->vers.snapshotVer; + pWal->vers.appliedVer = pWal->vers.snapshotVer; + wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer); } bool walLogEntriesComplete(const SWal* pWal) { diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index a5c7bf1abdb8fd4ee71beb7fb3186a21cda03c32..51307dc17d10821d126e030e22270206af650ba4 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -105,7 +105,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); int64_t code; char fnameStr[WAL_FILE_LEN]; - if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { + if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { terrno = TSDB_CODE_WAL_INVALID_VER; taosThreadMutexUnlock(&pWal->mutex); return -1; diff --git a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim index 04cf09715c7def3d73800e9a5cbc6f2e0fc78ddd..508e6f88c14f09d4e55c8bd3d40ea78426e5fa43 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim @@ -82,8 +82,8 @@ endi #=================================================================== - #==================== reboot to trigger commit data to file +sql flush database d0; system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s start diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index faff48b61c1216e744d349a301f39fabd6dc578a..4117a2403dcc7156a8b9ea3adb840fe25b26b376 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -85,6 +85,7 @@ endi #==================== reboot to trigger commit data to file +sql flush database d0; system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s start