diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a13d203889fbd36c383f622d6e0d6d29d91d39a6..27aadee96a0ec20bd8ca2e9bbcf30e646ee5a8f3 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -49,10 +49,13 @@ extern "C" { #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_SNAP_RESEND_MS 1000 * 60 +#define SYNC_VND_COMMIT_MIN_MS 200 +#define SYNC_VND_COMMIT_MAX_MS 60000 + #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 -#define SYNC_TERM_INVALID -1 // 0xFFFFFFFFFFFFFFFF +#define SYNC_TERM_INVALID -1 typedef enum { SYNC_STRATEGY_NO_SNAPSHOT = 0, diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 313a88fc5c60f3ac2ae84b1ca6cf811033f5779c..4469e0afe68d68db76033f65dc82f5c6165a6343 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -79,6 +79,8 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { char path[TSDB_FILENAME_LEN] = {0}; + vnodeProposeCommitOnNeed(pVnode->pImpl); + taosThreadRwlockWrlock(&pMgmt->lock); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosThreadRwlockUnlock(&pMgmt->lock); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 540f0c3127bd26d12064bfca9eb0b135dae8622f..7a3e3cb4a52c5fc7c5eb965a09389eccc2a5b1fa 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -88,6 +88,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); +void vnodeProposeCommitOnNeed(SVnode *pVnode); // meta typedef struct SMeta SMeta; // todo: remove diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 28797c536166d554aaa99f1d4866e76e85564971..d8c4b001b137a676228b89319dba939dd7a206d6 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -102,6 +102,7 @@ void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode); +int vnodeShouldCommit(SVnode* pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e56f130c2c4af7a70957e29c34962f0b06331c9e..b9080fd6c652bf9c9b3e67fb99f5ab5cb93f89b5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -348,6 +348,7 @@ struct SVnode { STQ* pTq; SSink* pSink; tsem_t canCommit; + int64_t commitMs; int64_t sync; TdThreadMutex lock; bool blocked; diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 1b5f74255948da71938eb274fe3cfd05700d8aa9..8974d93678133e5d0a14f849dd5c58f787f78f80 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -203,6 +203,7 @@ _err: int metaClose(SMeta *pMeta) { if (pMeta) { + if (pMeta->txn) tdbTxnClose(pMeta->txn); if (pMeta->pCache) metaCacheClose(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4daab074b570a4acffcfaf976b2e7358f697eccd..f7ec18e50e33e6d095781ebfcbaef40785a367c9 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -59,6 +59,17 @@ int vnodeBegin(SVnode *pVnode) { } int vnodeShouldCommit(SVnode *pVnode) { + if (!pVnode->inUse || !osDataSpaceAvailable()) { + return false; + } + + int64_t nowMs = taosGetMonoTimestampMs(); + + return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pVnode->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || + (pVnode->inUse->size > 0 && pVnode->commitMs + SYNC_VND_COMMIT_MAX_MS < nowMs)); +} + +int vnodeShouldCommitOld(SVnode *pVnode) { if (pVnode->inUse) { return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size); } @@ -194,6 +205,7 @@ static void vnodePrepareCommit(SVnode *pVnode) { vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; } + static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; @@ -210,6 +222,7 @@ _exit: taosMemoryFree(pInfo); return code; } + int vnodeAsyncCommit(SVnode *pVnode) { int32_t code = 0; @@ -257,7 +270,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { SVnode *pVnode = pInfo->pVnode; vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), - pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm); + pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm); + + pVnode->commitMs = taosGetMonoTimestampMs(); // persist wal before starting if (walPersist(pVnode->pWal) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index e09fafb75690f13dd160763126d139dd21ae8c1b..58b73d806f4d984906c96643caf7cb2b701d2e25 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -249,15 +249,18 @@ void vnodePreClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) { if (pVnode) { - vnodeSyncCommit(pVnode); vnodeSyncClose(pVnode); vnodeQueryClose(pVnode); + + tsem_wait(&pVnode->canCommit); walClose(pVnode->pWal); tqClose(pVnode->pTq); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); smaClose(pVnode->pSma); metaClose(pVnode->pMeta); vnodeCloseBufPool(pVnode); + tsem_post(&pVnode->canCommit); + // destroy handle tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&pVnode->syncSem); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 60928881362c563bc6f4755c871624e75e336d5a..49bdfec269fd70caf884e2e2dcfdcd5a95fe151b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -200,6 +200,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // skip header pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); len = pMsg->contLen - sizeof(SMsgHead); + bool needCommit = false; switch (pMsg->msgType) { /* META */ @@ -296,9 +297,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp); break; case TDMT_VND_COMMIT: - vnodeSyncCommit(pVnode); - vnodeBegin(pVnode); - goto _exit; + needCommit = true; + break; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return -1; @@ -315,7 +315,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } // commit if need - if (vnodeShouldCommit(pVnode)) { + if (needCommit) { vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vnodeAsyncCommit(pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2c23646db1c874a0cbb671784fbcdcec0e5918ec..e000b26c6befe4833ebb49b39684515e910f1ec5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -101,6 +101,64 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) } } +static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) { + int64_t seq = 0; + + taosThreadMutexLock(&pVnode->lock); + int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq); + bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); + if (wait) { + ASSERT(!pVnode->blocked); + pVnode->blocked = true; + pVnode->blockSec = taosGetTimestampSec(); + pVnode->blockSeq = seq; +#if 0 + pVnode->blockInfo = pMsg->info; +#endif + } + taosThreadMutexUnlock(&pVnode->lock); + + if (code > 0) { + vnodeHandleWriteMsg(pVnode, pMsg); + } else if (code < 0) { + if (terrno != 0) code = terrno; + vnodeHandleProposeError(pVnode, pMsg, code); + } + + if (wait) vnodeWaitBlockMsg(pVnode, pMsg); + return code; +} + +void vnodeProposeCommitOnNeed(SVnode *pVnode) { + if (!vnodeShouldCommit(pVnode)) { + return; + } + + int32_t contLen = sizeof(SMsgHead); + SMsgHead *pHead = rpcMallocCont(contLen); + pHead->contLen = contLen; + pHead->vgId = pVnode->config.vgId; + + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_VND_COMMIT; + rpcMsg.contLen = contLen; + rpcMsg.pCont = pHead; + rpcMsg.info.noResp = 1; + + bool isWeak = false; + if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) { + vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr()); + goto _out; + } + + vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId); + +_out: + pVnode->commitMs = taosGetMonoTimestampMs(); + rpcFreeCont(rpcMsg.pCont); + rpcMsg.pCont = NULL; +} + #if BATCH_ENABLE static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { @@ -178,6 +236,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) continue; } + vnodeProposeCommitOnNeed(pVnode); + code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); @@ -205,34 +265,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) #else -static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) { - int64_t seq = 0; - - taosThreadMutexLock(&pVnode->lock); - int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq); - bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); - if (wait) { - ASSERT(!pVnode->blocked); - pVnode->blocked = true; - pVnode->blockSec = taosGetTimestampSec(); - pVnode->blockSeq = seq; -#if 0 - pVnode->blockInfo = pMsg->info; -#endif - } - taosThreadMutexUnlock(&pVnode->lock); - - if (code > 0) { - vnodeHandleWriteMsg(pVnode, pMsg); - } else if (code < 0) { - if (terrno != 0) code = terrno; - vnodeHandleProposeError(pVnode, pMsg, code); - } - - if (wait) vnodeWaitBlockMsg(pVnode, pMsg); - return code; -} - void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; @@ -256,6 +288,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) continue; } + vnodeProposeCommitOnNeed(pVnode); + code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d875d3ca09967671f13a9261edfbeb61e9b0ab34..34fbebdc39a3655eedc32d6ba75d7fc0776d3522 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -326,6 +326,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } // update + ASSERT(pBuf->startIndex < index); + ASSERT(index - pBuf->startIndex < pBuf->size); ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL); SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; pEntry = NULL; @@ -454,6 +456,11 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } + if (pEntry->originalRpcType == TDMT_VND_COMMIT) { + sInfo("vgId:%d, fsm execute vnode commit. index: %" PRId64 ", term: %" PRId64 "", pNode->vgId, pEntry->index, + pEntry->term); + } + SRpcMsg rpcMsg = {0}; syncEntry2OriginalRpc(pEntry, &rpcMsg); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 018ac5bb7da3764d2b54b6dfc7995ffe9d9a47d8..d86ff847f901803e314c440034fc88eef65c51bc 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -219,6 +219,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ASSERT(pEntry->index == index); + if (pEntry->originalRpcType == TDMT_VND_COMMIT) { + walFsync(pWal, true); + } + sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); return 0; diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 79a38cad7a55fdcc0a587c48299d155d3f396931..6e945b591e135e0cad21d7552b8fa473a5483f81 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -107,7 +107,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p taosThreadMutexUnlock(&pObj->mutex); return 1; // get one object } else { - sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq); + sNTrace(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq); } taosThreadMutexUnlock(&pObj->mutex); diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 10a99bb1fab525a9ff4569d55c3d688bb099ee46..0e20941b3ae472986d5116616eda05af93a83dc1 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -74,7 +74,12 @@ int32_t tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, i int32_t tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg, int flags); -int32_t tdbTxnClose(TXN *pTxn); +int32_t tdbTxnCloseImpl(TXN *pTxn); +#define tdbTxnClose(pTxn) \ + do { \ + tdbTxnCloseImpl(pTxn); \ + (pTxn) = NULL; \ + } while (0) // other void tdbFree(void *); diff --git a/source/libs/tdb/src/db/tdbPage.c b/source/libs/tdb/src/db/tdbPage.c index 50dc8e0a6506abf40cd85b33acdd0e50f737168b..d35f05461d423540f2b58842f2270ab02bab0089 100644 --- a/source/libs/tdb/src/db/tdbPage.c +++ b/source/libs/tdb/src/db/tdbPage.c @@ -77,7 +77,7 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) u8 *ptr; tdbTrace("page/destroy: %p/%d %p", pPage, pPage->id, xFree); - ASSERT(!pPage->isDirty); + // ASSERT(!pPage->isDirty); ASSERT(xFree); for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) { diff --git a/source/libs/tdb/src/db/tdbTxn.c b/source/libs/tdb/src/db/tdbTxn.c index 055d9c7f984b05003a7a69d996e1eb54a51bbe8c..24f955fe2fde29c089b4768baae54eaea892d09e 100644 --- a/source/libs/tdb/src/db/tdbTxn.c +++ b/source/libs/tdb/src/db/tdbTxn.c @@ -28,13 +28,18 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void return 0; } -int tdbTxnClose(TXN *pTxn) { +int tdbTxnCloseImpl(TXN *pTxn) { if (pTxn) { if (pTxn->jPageSet) { hashset_destroy(pTxn->jPageSet); pTxn->jPageSet = NULL; } + if (pTxn->jfd) { + tdbOsClose(pTxn->jfd); + ASSERT(pTxn->jfd == NULL); + } + tdbOsFree(pTxn); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d144a76eb035522272bb13b8db630e0042485519..d37dff4d016c16804cc6999751dc439b2d0e4159 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1124,7 +1124,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - tGTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, + tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); uv_timer_stop(conn->timer);