提交 a48d359f 编写于 作者: S Shungang Li

fix: ttlMgrDeleteTtl should ignore ttl 0 tables

上级 55ee9d6c
......@@ -38,6 +38,8 @@ typedef struct STtlManger {
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
SHashObj* pDirtyUids; // dirty tuid
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
char* logPrefix;
} STtlManger;
typedef struct {
......@@ -77,9 +79,10 @@ typedef struct {
typedef struct {
tb_uid_t uid;
TXN* pTxn;
int64_t ttlDays;
} STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback, const char* logPrefix);
void ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
......
......@@ -130,7 +130,9 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
}
// open pTtlMgr ("ttlv1.idx")
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0);
char logPrefix[128] = {0};
sprintf(logPrefix, "vgId:%d", TD_VID(pVnode));
ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0, logPrefix);
if (ret < 0) {
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
......
......@@ -974,7 +974,15 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
}
static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays;
} else {
ctx.ttlDays = pME->ntbEntry.ttlDays;
}
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
}
......@@ -1968,7 +1976,6 @@ static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
STtlUpdTtlCtx ctx = {.uid = pME->uid};
if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays;
ctx.changeTimeMs = pME->ctbEntry.btime;
......
......@@ -39,8 +39,8 @@ static int32_t ttlMgrULock(STtlManger *pTtlMgr);
const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx";
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
int ret = TSDB_CODE_SUCCESS;
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback, const char *logPrefix) {
int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs();
*ppTtlMgr = NULL;
......@@ -48,9 +48,17 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
char *logBuffer = (char *)tdbOsCalloc(1, strlen(logPrefix) + 1);
if (logBuffer == NULL) {
tdbOsFree(pTtlMgr);
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(logBuffer, logPrefix);
pTtlMgr->logPrefix = logBuffer;
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) {
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
metaError("%s, failed to open %s since %s", pTtlMgr->logPrefix, ttlV1Tbname, tstrerror(terrno));
tdbOsFree(pTtlMgr);
return ret;
}
......@@ -62,14 +70,14 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("failed to fill hash since %s", tstrerror(terrno));
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
ttlMgrCleanup(pTtlMgr);
return ret;
}
int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs);
metaInfo("%s, ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
*ppTtlMgr = pTtlMgr;
return TSDB_CODE_SUCCESS;
......@@ -91,37 +99,37 @@ int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
metaInfo("ttl mgr start upgrade");
metaInfo("%s, ttl mgr start upgrade", pTtlMgr->logPrefix);
int64_t startNs = taosGetTimestampNs();
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
if (ret < 0) {
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
metaError("%s, failed to open %s index since %s", pTtlMgr->logPrefix, ttlTbname, tstrerror(terrno));
goto _out;
}
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
if (ret < 0) {
metaError("failed to convert ttl index since %s", tstrerror(terrno));
metaError("%s, failed to convert ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
if (ret < 0) {
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
metaError("%s, failed to drop old ttl index since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("failed to fill hash since %s", tstrerror(terrno));
metaError("%s, failed to fill hash since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs);
metaInfo("%s, ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix,
taosHashGetSize(pTtlMgr->pTtlCache), endNs - startNs);
_out:
tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL;
......@@ -130,11 +138,12 @@ _out:
}
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosMemoryFree(pTtlMgr->logPrefix);
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock);
tdbOsFree(pTtlMgr);
taosMemoryFree(pTtlMgr);
}
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
......@@ -250,13 +259,13 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) {
metaError("ttlMgr insert failed to update ttl cache since %s", tstrerror(terrno));
metaError("%s, ttlMgr insert failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr insert failed to update ttl dirty uids since %s", tstrerror(terrno));
metaError("%s, ttlMgr insert failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
......@@ -264,20 +273,21 @@ int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, updCtx->uid,
updCtx->changeTimeMs, updCtx->ttlDays);
metaDebug("%s, ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, pTtlMgr->logPrefix,
updCtx->uid, updCtx->changeTimeMs, updCtx->ttlDays);
return ret;
}
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
if (delCtx->ttlDays == 0) return 0;
ttlMgrWLock(pTtlMgr);
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr del failed to update ttl dirty uids since %s", tstrerror(terrno));
metaError("%s, ttlMgr del failed to update ttl dirty uids since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
......@@ -285,7 +295,7 @@ int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr delete ttl, uid: %" PRId64, delCtx->uid);
metaDebug("%s, ttl mgr delete ttl, uid: %" PRId64, pTtlMgr->logPrefix, delCtx->uid);
return ret;
}
......@@ -293,6 +303,8 @@ _out:
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
ttlMgrWLock(pTtlMgr);
int ret = 0;
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
if (oldData == NULL) {
goto _out;
......@@ -301,17 +313,17 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
int ret =
taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
ret = taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) {
metaError("ttlMgr update ctime failed to update ttl cache since %s", tstrerror(terrno));
metaError("%s, ttlMgr update ctime failed to update ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr update ctime failed to update ttl dirty uids since %s", tstrerror(terrno));
metaError("%s, ttlMgr update ctime failed to update ttl dirty uids since %s", pTtlMgr->logPrefix,
tstrerror(terrno));
goto _out;
}
......@@ -319,7 +331,8 @@ int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtime
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs);
metaDebug("%s, ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pTtlMgr->logPrefix, pUpdCtimeCtx->uid,
pUpdCtimeCtx->changeTimeMs);
return ret;
}
......@@ -366,7 +379,7 @@ _out:
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ttlMgrWLock(pTtlMgr);
metaInfo("ttl mgr flush start.");
metaInfo("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
int ret = -1;
......@@ -377,9 +390,9 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (cacheEntry == NULL) {
metaError("ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", tstrerror(terrno), *pUid,
pEntry->type);
goto _out;
metaError("%s, ttlMgr flush failed to get ttl cache since %s, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix,
tstrerror(terrno), *pUid, pEntry->type);
continue;
}
STtlIdxKeyV1 ttlKey;
......@@ -389,27 +402,29 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
pTxn);
if (ret < 0) {
metaError("ttlMgr flush failed to flush ttl cache upsert since %s", tstrerror(terrno));
metaError("%s, ttlMgr flush failed to flush ttl cache upsert since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
} else if (pEntry->type == ENTRY_TYPE_DEL) {
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
if (ret < 0) {
metaError("ttlMgr flush failed to flush ttl cache del since %s", tstrerror(terrno));
metaError("%s, ttlMgr flush failed to flush ttl cache del since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (ret < 0) {
metaError("ttlMgr flush failed to delete ttl cache since %s", tstrerror(terrno));
metaError("%s, ttlMgr flush failed to delete ttl cache since %s", pTtlMgr->logPrefix, tstrerror(terrno));
goto _out;
}
} else {
metaError("ttlMgr flush failed to flush ttl cache, unknown type: %d", pEntry->type);
metaError("%s, ttlMgr flush failed to flush ttl cache, unknown type: %d", pTtlMgr->logPrefix, pEntry->type);
goto _out;
}
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter);
void *pIterTmp = pIter;
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp);
taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t));
}
taosHashClear(pTtlMgr->pDirtyUids);
......@@ -418,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
_out:
ttlMgrULock(pTtlMgr);
metaInfo("ttl mgr flush end.");
metaInfo("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
return ret;
}
......@@ -426,7 +441,7 @@ _out:
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr rlock %p", &pTtlMgr->lock);
metaTrace("%s, ttlMgr rlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
......@@ -436,7 +451,7 @@ static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr wlock %p", &pTtlMgr->lock);
metaTrace("%s, ttlMgr wlock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
......@@ -446,7 +461,7 @@ static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr ulock %p", &pTtlMgr->lock);
metaTrace("%s, ttlMgr ulock %p", pTtlMgr->logPrefix, &pTtlMgr->lock);
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册