diff --git a/source/dnode/vnode/src/inc/metaTtl.h b/source/dnode/vnode/src/inc/metaTtl.h index a3d3ceab24dce0d9c4e5606d719d4a7c834053a8..45faceb1ea2bfa7468e486ab547cc3cd83521ba7 100644 --- a/source/dnode/vnode/src/inc/metaTtl.h +++ b/source/dnode/vnode/src/inc/metaTtl.h @@ -38,6 +38,8 @@ typedef struct STtlManger { SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime} SHashObj* pDirtyUids; // dirty tuid TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl> + + char* logPrefix; } STtlManger; typedef struct { @@ -77,9 +79,10 @@ typedef struct { typedef struct { tb_uid_t uid; TXN* pTxn; + int64_t ttlDays; } STtlDelTtlCtx; -int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); +int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix); void ttlMgrClose(STtlManger* pTtlMgr); int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index e2253e26db311588a39fcb585dfe0514b622cb50..9d4fdf8b11b23dac044b7fea77083c3d52b34060 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -128,7 +128,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { } // open pTtlMgr ("ttlv1.idx") - ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0); + char logPrefix[128] = {0}; + sprintf(logPrefix, "vgId:%d", TD_VID(pVnode)); + ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix); if (ret < 0) { metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 670ab2643bc55f2effefdc56816d024e35856ddb..ad96004098af64da70b13061d44d6467bb5ffecd 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) { } static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) { + if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; + STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn}; + if (pME->type == TSDB_CHILD_TABLE) { + ctx.ttlDays = pME->ctbEntry.ttlDays; + } else { + ctx.ttlDays = pME->ntbEntry.ttlDays; + } + return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx); } @@ -1968,7 +1976,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) { if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0; STtlUpdTtlCtx ctx = {.uid = pME->uid}; - if (pME->type == TSDB_CHILD_TABLE) { ctx.ttlDays = pME->ctbEntry.ttlDays; ctx.changeTimeMs = pME->ctbEntry.btime; diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index c6cb82614969db6dc2b14404fd83d5753e848f34..045a759fad23c1c8cb1986cafb0f8a1b6062de32 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -39,8 +39,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr); const char *ttlTbname = "ttl.idx"; const char *ttlV1Tbname = "ttlv1.idx"; -int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { - int ret = TSDB_CODE_SUCCESS; +int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) { + int ret = TSDB_CODE_SUCCESS; int64_t startNs = taosGetTimestampNs(); *ppTtlMgr = NULL; @@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr)); if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY; + char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1); + if (logBuffer == NULL) { + tdbOsFree(pTtlMgr); + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(logBuffer, logPrefix); + pTtlMgr->logPrefix = logBuffer; + ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); if (ret < 0) { - metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno)); + metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(terrno)); tdbOsFree(pTtlMgr); return ret; } @@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { ret = ttlMgrFillCache(pTtlMgr); if (ret < 0) { - metaError("failed to fill hash since %s", tstrerror(terrno)); + metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno)); ttlMgrCleanup(pTtlMgr); return ret; } int64_t endNs = taosGetTimestampNs(); - metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), - endNs - startNs); + metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, + taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); *ppTtlMgr = pTtlMgr; return TSDB_CODE_SUCCESS; @@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) { if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS; - metaInfo("ttl mgr start upgrade"); + metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix); int64_t startNs = taosGetTimestampNs(); ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0); if (ret < 0) { - metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno)); + metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(terrno)); goto _out; } ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); if (ret < 0) { - metaError("failed to convert ttl index since %s", tstrerror(terrno)); + metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); if (ret < 0) { - metaError("failed to drop old ttl index since %s", tstrerror(terrno)); + metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = ttlMgrFillCache(pTtlMgr); if (ret < 0) { - metaError("failed to fill hash since %s", tstrerror(terrno)); + metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } int64_t endNs = taosGetTimestampNs(); - metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache), - endNs - startNs); + metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, + taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs); _out: tdbTbClose(pTtlMgr->pOldTtlIdx); pTtlMgr->pOldTtlIdx = NULL; @@ -130,11 +138,12 @@ _out: } static void ttlMgrCleanup(STtlManger *pTtlMgr) { + taosMemoryFree(pTtlMgr->logPrefix); taosHashCleanup(pTtlMgr->pTtlCache); taosHashCleanup(pTtlMgr->pDirtyUids); tdbTbClose(pTtlMgr->pTtlIdx); taosThreadRwlockDestroy(&pTtlMgr->lock); - tdbOsFree(pTtlMgr); + taosMemoryFree(pTtlMgr); } static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) { @@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) { int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry)); if (ret < 0) { - metaError("ttlMgr insert failed to update ttl cache since %s", tstrerror(terrno)); + metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry)); if (ret < 0) { - metaError("ttlMgr insert failed to update ttl dirty uids since %s", tstrerror(terrno)); + metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } @@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) { _out: ttlMgrULock(pTtlMgr); - metaDebug("ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, updCtx->uid, - updCtx->changeTimeMs, updCtx->ttlDays); + metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix, + updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays); return ret; } int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { + if (delCtx->ttlDays == 0) return 0; ttlMgrWLock(pTtlMgr); STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL}; int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry)); if (ret < 0) { - metaError("ttlMgr del failed to update ttl dirty uids since %s", tstrerror(terrno)); + metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } @@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) { _out: ttlMgrULock(pTtlMgr); - metaDebug("ttl mgr delete ttl, uid: %" PRId64, delCtx->uid); + metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid); return ret; } @@ -293,6 +303,8 @@ _out: int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) { ttlMgrWLock(pTtlMgr); + int ret = 0; + STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid)); if (oldData == NULL) { goto _out; @@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs}; STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT}; - int ret = - taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry)); + ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry)); if (ret < 0) { - metaError("ttlMgr update ctime failed to update ttl cache since %s", tstrerror(terrno)); + metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry, sizeof(dirtryEntry)); if (ret < 0) { - metaError("ttlMgr update ctime failed to update ttl dirty uids since %s", tstrerror(terrno)); + metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, + tstrerror(terrno)); goto _out; } @@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime _out: ttlMgrULock(pTtlMgr); - metaDebug("ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs); + metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid, + pUpdCtimeCtx->changeTimeMs); return ret; } @@ -366,7 +379,7 @@ _out: int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { ttlMgrWLock(pTtlMgr); - metaInfo("ttl mgr flush start."); + metaInfo("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids)); int ret = -1; @@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid)); if (cacheEntry == NULL) { - metaError("ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", tstrerror(terrno), *pUid, - pEntry->type); - goto _out; + metaError("%s, ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, + tstrerror(terrno), *pUid, pEntry->type); + continue; } STtlIdxKeyV1 ttlKey; @@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays), pTxn); if (ret < 0) { - metaError("ttlMgr flush failed to flush ttl cache upsert since %s", tstrerror(terrno)); + metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } } else if (pEntry->type == ENTRY_TYPE_DEL) { ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); if (ret < 0) { - metaError("ttlMgr flush failed to flush ttl cache del since %s", tstrerror(terrno)); + metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid)); if (ret < 0) { - metaError("ttlMgr flush failed to delete ttl cache since %s", tstrerror(terrno)); + metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno)); goto _out; } } else { - metaError("ttlMgr flush failed to flush ttl cache, unknown type: %d", pEntry->type); + metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type); goto _out; } - pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter); + void *pIterTmp = pIter; + pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp); + taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t)); } taosHashClear(pTtlMgr->pDirtyUids); @@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { _out: ttlMgrULock(pTtlMgr); - metaInfo("ttl mgr flush end."); + metaInfo("%s, ttl mgr flush end.", pTtlMgr->logPrefix); return ret; } @@ -426,7 +441,7 @@ _out: static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { int32_t ret = 0; - metaTrace("ttlMgr rlock %p", &pTtlMgr->lock); + metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); ret = taosThreadRwlockRdlock(&pTtlMgr->lock); @@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) { static int32_t ttlMgrWLock(STtlManger *pTtlMgr) { int32_t ret = 0; - metaTrace("ttlMgr wlock %p", &pTtlMgr->lock); + metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); ret = taosThreadRwlockWrlock(&pTtlMgr->lock); @@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) { static int32_t ttlMgrULock(STtlManger *pTtlMgr) { int32_t ret = 0; - metaTrace("ttlMgr ulock %p", &pTtlMgr->lock); + metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock); ret = taosThreadRwlockUnlock(&pTtlMgr->lock);