diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index 2c261826248963c28882d36367a4f33472576462..9d564a018e3010bf08413ab08526263176a34b66 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -39,6 +39,8 @@ typedef struct STtlManger { SHashObj* pDirtyUids; // hash TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl> + int64_t expireTimeMs; // ttl table expiration time + char* logPrefix; int32_t flushThreshold; // max dirty entry number in memory. if -1, flush will not be triggered by write-ops } STtlManger; @@ -96,7 +98,9 @@ int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); -int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); +int ttlMgrFindExpired(STtlManger* pTtlMgr, SArray* pTbUids); + +int ttlMgrSetExpireTime(STtlManger* pTtlMgr, int64_t timePointMs); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 85ef384ea98a79c11c6e7774f9bb797e5de21e8e..6987c0c64a7c747261fb85e372c1128cdd242719 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -107,6 +107,7 @@ int32_t vnodeCommitInfo(const char* dir); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode); +int32_t vnodeAsyncTtlDropTable(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode); // vnodeSync.c diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f6bbd24c4fc0b5b13df44b773cc01db039ba5a58..42505cd92b9e48a782d065a89f6a85500276e407 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -77,6 +77,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SCommitInfo SCommitInfo; +typedef struct STtlInfo STtlInfo; typedef struct SCompactInfo SCompactInfo; typedef struct SQueryNode SQueryNode; @@ -151,7 +152,8 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int32_t metaTrimTables(SMeta* pMeta); -int metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids); +int32_t metaTtlSetExpireTime(SMeta* pMeta, int64_t timePointMs); +int metaTtlDropTable(SMeta* pMeta, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); @@ -491,6 +493,10 @@ struct SCommitInfo { TXN* txn; }; +struct STtlInfo { + SVnode* pVnode; +}; + struct SCompactInfo { SVnode* pVnode; int32_t flag; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 4731c768c7c0f17886ce8894928a73eb5b5409bc..ae4b351966b1a57311f15be49ed30fce6fc534ab 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -927,7 +927,12 @@ end: return code; } -int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { +int metaTtlSetExpireTime(SMeta *pMeta, int64_t timePointMs) { + ttlMgrSetExpireTime(pMeta->pTtlMgr, timePointMs); + return 0; +} + +int metaTtlDropTable(SMeta *pMeta, SArray *tbUids) { int64_t startNs = taosGetTimestampNs(); int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn); @@ -936,7 +941,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) { return ret; } - ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids); + ret = ttlMgrFindExpired(pMeta->pTtlMgr, tbUids); if (ret != 0) { metaError("ttl failed to find expired table, ret:%d", ret); return ret; diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index a68aa0a73a2e2b835d97757459c77023f8708881..da2538e66e9a49de9d53e11e06072457ccedd7d5 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -59,6 +59,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo strcpy(logBuffer, logPrefix); pTtlMgr->logPrefix = logBuffer; pTtlMgr->flushThreshold = flushThreshold; + pTtlMgr->expireTimeMs = 0; // TODO(LSG) ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { @@ -358,9 +359,14 @@ _out: return ret; } -int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids) { +int ttlMgrFindExpired(STtlManger *pTtlMgr, SArray *pTbUids) { ttlMgrRLock(pTtlMgr); + if (pTtlMgr->expireTimeMs == 0) { + metaError("%s, ttl expireTimeMs uninitialized, skip find expired", pTtlMgr->logPrefix); + return 0; + } + TBC *pCur; int ret = tdbTbcOpen(pTtlMgr->pTtlIdx, &pCur, NULL); if (ret < 0) { @@ -368,7 +374,7 @@ int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids) } STtlIdxKeyV1 ttlKey = {0}; - ttlKey.deleteTimeMs = timePointMs; + ttlKey.deleteTimeMs = pTtlMgr->expireTimeMs; ttlKey.uid = INT64_MAX; int c = 0; tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c); @@ -475,6 +481,20 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { return ret; } +int ttlMgrSetExpireTime(STtlManger *pTtlMgr, int64_t timePointMs) { + ttlMgrWLock(pTtlMgr); + + if (pTtlMgr->expireTimeMs > timePointMs) { + metaWarn("%s, ttl mgr new expireTimeMs:%" PRId64 " is smaller than old:%" PRId64 ". maybe restoring?", + pTtlMgr->logPrefix, timePointMs, pTtlMgr->expireTimeMs); + } else { + pTtlMgr->expireTimeMs = timePointMs; + } + + ttlMgrULock(pTtlMgr); + return 0; +} + static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { int32_t ret = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 33c6f9d533099a57ebc0ea29e744f832c3d24734..fdd7f7f7e6979b9a39c2341b419ca4cc838d5deb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -353,6 +353,59 @@ static void vnodeReturnBufPool(SVnode *pVnode) { taosThreadMutexUnlock(&pVnode->mutex); } + +static int32_t vnodeTtlTask(void *arg) { + int32_t code = 0; + + STtlInfo *pInfo = (STtlInfo *)arg; + SVnode *pVnode = pInfo->pVnode; + SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); + + code = metaTtlDropTable(pVnode->pMeta, tbUids); + if (code) { + vFatal("vgId:%d, meta failed to drop table by ttl since %s", TD_VID(pVnode), terrstr()); + goto _exit; + } + + // if (taosArrayGetSize(tbUids) > 0) { + // tqUpdateTbUidList(pVnode->pTq, tbUids, false); + // } + + // TODO(LSG) + //vnodeAsyncRentention(pVnode, ttlReq.timestampSec); + +_exit: + // end commit + taosMemoryFree(pInfo); + return code; +} + +int vnodeAsyncTtlDropTable(SVnode *pVnode) { + int32_t code = 0; + + STtlInfo *pInfo = (STtlInfo *)taosMemoryCalloc(1, sizeof(*pInfo)); + if (NULL == pInfo) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // schedule the task + code = vnodeScheduleTask(vnodeTtlTask, pInfo); + +_exit: + if (code) { + if (NULL != pInfo) { + taosMemoryFree(pInfo); + } + vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), + pVnode->state.commitID); + } else { + vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode), + pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied); + } + return code; +} + static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f0de0c752966cb3a4db13b6b400f0439c3ea4460..68c78a5701b7cb38a7dfcf304778378e86ca3261 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -710,29 +710,23 @@ _exit: } static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { - SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); - if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; - + int32_t code = 0; SVDropTtlTableReq ttlReq = {0}; + + // decode if (tDeserializeSVDropTtlTableReq(pReq, len, &ttlReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto end; } vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec); - int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids); - if (ret != 0) { - goto end; - } - if (taosArrayGetSize(tbUids) > 0) { - tqUpdateTbUidList(pVnode->pTq, tbUids, false); - } + code = metaTtlSetExpireTime(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000); + if (code) goto end; - vnodeDoRetention(pVnode, ttlReq.timestampSec); + code = vnodeAsyncTtlDropTable(pVnode); end: - taosArrayDestroy(tbUids); - return ret; + return code; } static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {