From 8e3084ecc0adcc87fa95dd9a87a90842ee619df0 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 27 Feb 2023 18:09:51 +0800 Subject: [PATCH] feat: auto retention --- source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/sma/smaRollup.c | 2 + source/dnode/vnode/src/tsdb/tsdbRetention.c | 56 +++++---- source/dnode/vnode/src/tsdb/tsdbWrite.c | 4 +- source/dnode/vnode/src/vnd/vnodeRetention.c | 130 ++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 9 +- 7 files changed, 171 insertions(+), 32 deletions(-) create mode 100644 source/dnode/vnode/src/vnd/vnodeRetention.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index ea7046886e..8b13d8f02b 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -15,6 +15,7 @@ target_sources( "src/vnd/vnodeSync.c" "src/vnd/vnodeSnapshot.c" "src/vnd/vnodeCompact.c" + "src/vnd/vnodeRetention.c" # meta "src/meta/metaOpen.c" diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 93e611e870..c0d017e350 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -180,7 +180,6 @@ int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); -int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 37ae7d895e..99e171dde1 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -595,6 +595,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) { return 0; } +#if 0 /** * @brief retention of rsma1/rsma2 * @@ -618,6 +619,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) { _end: return code; } +#endif static void tdBlockDataDestroy(SArray *pBlockArr) { for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index c6e1ed99f1..7c7e1bd0f7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -15,7 +15,7 @@ #include "tsdb.h" -static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { +static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) { for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); @@ -38,19 +38,21 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { return false; } +bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { + bool should; + taosThreadRwlockRdlock(&pTsdb->rwLock); + should = tsdbShouldDoRetentionImpl(pTsdb, now); + taosThreadRwlockUnlock(&pTsdb->rwLock); + return should; +} int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { int32_t code = 0; - - if (!tsdbShouldDoRetention(pTsdb, now)) { - return code; - } - - // do retention + int32_t lino = 0; STsdbFS fs = {0}; code = tsdbFSCopy(pTsdb, &fs); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); @@ -60,8 +62,10 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (expLevel < 0) { taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->aSttF[0]); taosMemoryFree(pSet->pSmaF); + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + taosMemoryFree(pSet->aSttF[iStt]); + } taosArrayRemove(fs.aDFileSet, iSet); iSet--; } else { @@ -78,35 +82,33 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { fSet.diskId = did; code = tsdbDFileSetCopy(pTsdb, pSet, &fSet); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFSUpsertFSet(&fs, &fSet); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } // do change fs code = tsdbFSPrepareCommit(pTsdb, &fs); - if (code) goto _err; - - taosThreadRwlockWrlock(&pTsdb->rwLock); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFSCommit(pTsdb); +_exit: if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _err; + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } - - taosThreadRwlockUnlock(&pTsdb->rwLock); - tsdbFSDestroy(&fs); - -_exit: return code; +} -_err: - tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - ASSERT(0); - // tsdbFSRollback(pTsdb->pFS); - return code; +static int32_t tsdbCommitRetentionImpl(STsdb *pTsdb) { return tsdbFSCommit(pTsdb); } + +int32_t tsdbCommitRetention(STsdb *pTsdb) { + taosThreadRwlockWrlock(&pTsdb->rwLock); + tsdbCommitRetentionImpl(pTsdb); + taosThreadRwlockUnlock(&pTsdb->rwLock); + tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index bd2d263804..2dbac956ed 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -35,9 +35,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2 return -1; } - if (pMsg) { - arrSize = taosArrayGetSize(pMsg->aSubmitTbData); - } + arrSize = taosArrayGetSize(pMsg->aSubmitTbData); // scan and convert if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeRetention.c b/source/dnode/vnode/src/vnd/vnodeRetention.c new file mode 100644 index 0000000000..170deb4286 --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeRetention.c @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnd.h" + +typedef struct { + SVnode *pVnode; + int64_t now; + int64_t commitID; + SVnodeInfo info; +} SRetentionInfo; + +extern bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); +extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now); +extern int32_t tsdbCommitRetention(STsdb *pTsdb); + +static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) { + int32_t code = 0; + int32_t lino = 0; + + tsem_wait(&pVnode->canCommit); + + pInfo->commitID = ++pVnode->state.commitID; + + char dir[TSDB_FILENAME_LEN] = {0}; + if (pVnode->pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); + } + + if (vnodeLoadInfo(dir, &pInfo->info) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + tsem_post(&pVnode->canCommit); + } else { + vInfo("vgId:%d %s done", TD_VID(pVnode), __func__); + } + return code; +} + +static int32_t vnodeRetentionTask(void *param) { + int32_t code = 0; + int32_t lino = 0; + + SRetentionInfo *pInfo = (SRetentionInfo *)param; + SVnode *pVnode = pInfo->pVnode; + char dir[TSDB_FILENAME_LEN] = {0}; + + if (pVnode->pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); + } + + // save info + pInfo->info.state.commitID = pInfo->commitID; + + if (vnodeSaveInfo(dir, &pInfo->info) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // do job + code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now); + TSDB_CHECK_CODE(code, lino, _exit); + + // commit info + vnodeCommitInfo(dir); + + // commit sub-job + tsdbCommitRetention(pVnode->pTsdb); + +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code)); + } else { + vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); + } + tsem_post(&pInfo->pVnode->canCommit); + taosMemoryFree(pInfo); + return code; +} + +int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) { + int32_t code = 0; + int32_t lino = 0; + + if (!tsdbShouldDoRetention(pVnode->pTsdb, now)) return code; + + SRetentionInfo *pInfo = (SRetentionInfo *)taosMemoryCalloc(1, sizeof(*pInfo)); + if (pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pInfo->pVnode = pVnode; + pInfo->now = now; + + code = vnodePrepareRentention(pVnode, pInfo); + TSDB_CHECK_CODE(code, lino, _exit); + + vnodeScheduleTask(vnodeRetentionTask, pInfo); + +_exit: + if (code) { + vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code)); + if (pInfo) taosMemoryFree(pInfo); + } else { + vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); + } + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8651478afa..59e830ea4b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -586,6 +586,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) { pMetaRsp->precision = pVnode->config.tsdbCfg.precision; } +extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { int32_t code = 0; SVTrimDbReq trimReq = {0}; @@ -598,12 +599,16 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); - // process +// process +#if 0 code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp); if (code) goto _exit; code = smaDoRetention(pVnode->pSma, trimReq.timestamp); if (code) goto _exit; +#else + vnodeAsyncRentention(pVnode, trimReq.timestamp); +#endif _exit: return code; @@ -635,6 +640,8 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p ret = smaDoRetention(pVnode->pSma, ttlReq.timestamp); if (ret) goto end; +#else + vnodeAsyncRentention(pVnode, ttlReq.timestamp); #endif end: -- GitLab