diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 42505cd92b9e48a782d065a89f6a85500276e407..a25f30a322e228a2695648cfed9853b627fe85af 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -347,6 +347,7 @@ struct SVState { int64_t applyTerm; int64_t commitID; int64_t commitTerm; + int64_t ttlExpireTime; }; struct SVStatis { diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index fdd62802ee17e21579ae074328ef7e159aeb5ef8..fd6d3b2229973f4cd907f0c2f183875e5dd40045 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -42,6 +42,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr); const char *ttlTbname = "ttl.idx"; const char *ttlV1Tbname = "ttlv1.idx"; +#define TTL_EXPIRE_TIME_UNINIT -1 + int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix, int32_t flushThreshold) { int ret = TSDB_CODE_SUCCESS; int64_t startNs = taosGetTimestampNs(); @@ -59,7 +61,7 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *lo strcpy(logBuffer, logPrefix); pTtlMgr->logPrefix = logBuffer; pTtlMgr->flushThreshold = flushThreshold; - pTtlMgr->expireTimeMs = 0; // TODO(LSG) + pTtlMgr->expireTimeMs = TTL_EXPIRE_TIME_UNINIT; ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { @@ -370,9 +372,9 @@ _out: int ttlMgrFindExpired(STtlManger *pTtlMgr, SArray *pTbUids) { ttlMgrRLock(pTtlMgr); - if (pTtlMgr->expireTimeMs == 0) { + if (pTtlMgr->expireTimeMs == TTL_EXPIRE_TIME_UNINIT) { metaError("%s, ttl mgr expireTimeMs uninitialized, skip find expired", pTtlMgr->logPrefix); - return 0; + goto _out; } TBC *pCur; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index a382e4ed8bd70af7e4853929b6dd5cbf1e4f6da7..7edad08d1b55fd331d0763d5f9937a6f20682027 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -291,6 +291,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { pInfo->info.state.committed = pVnode->state.applied; pInfo->info.state.commitTerm = pVnode->state.applyTerm; pInfo->info.state.commitID = ++pVnode->state.commitID; + pInfo->info.state.ttlExpireTime = pVnode->state.ttlExpireTime; pInfo->pVnode = pVnode; pInfo->txn = metaGetTxn(pVnode->pMeta); @@ -572,6 +573,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit term", pState->commitTerm) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "ttl expire time", pState->ttlExpireTime) < 0) return -1; return 0; } @@ -586,6 +588,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { if (code < 0) return -1; tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code); if (code < 0) return -1; + tjsonGetNumberValue(pJson, "ttl expire time", pState->ttlExpireTime, code); + if (code < 0) return -1; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 65fc552365ff1b9de1d866c098997ed7917dae9a..8796f8714827611c85f2c3b3f91d41e8d3fb4a69 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -60,6 +60,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs info.state.committed = -1; info.state.applied = -1; info.state.commitID = 0; + info.state.ttlExpireTime = 0; vInfo("vgId:%d, save config while create", info.config.vgId); if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) { @@ -364,6 +365,7 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC pVnode->state.commitID = info.state.commitID; pVnode->state.applied = info.state.committed; pVnode->state.applyTerm = info.state.commitTerm; + pVnode->state.ttlExpireTime = info.state.ttlExpireTime; pVnode->pTfs = pTfs; pVnode->diskPrimary = diskPrimary; pVnode->msgCb = msgCb; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1cb460657affdc57fdecffada0cd957f487135f4..7d3adeac42c3622035534a603289acf7ac6b3075 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -720,7 +720,9 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, } vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec); - code = metaTtlSetExpireTime(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000); + int64_t ttlExpireTimeMs = (int64_t)ttlReq.timestampSec * 1000; + atomic_store_64(&pVnode->state.ttlExpireTime, ttlExpireTimeMs); + code = metaTtlSetExpireTime(pVnode->pMeta, ttlExpireTimeMs); if (code) goto end; code = vnodeAsyncTtlDropTable(pVnode);