diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ab940b2107fc1ddd197d4b6ac4d3a40fe217b4eb..180be0ff6e56b392f1fc7531627b0e33c68336dd 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -322,6 +322,7 @@ int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSc // structs ======================= struct STsdbFS { + int64_t version; SDelFile *pDelFile; SArray *aDFileSet; // SArray }; @@ -575,9 +576,6 @@ struct SDFileSet { SSttFile *aSttF[TSDB_MAX_STT_TRIGGER]; }; -#define SET_DFSET_EXPIRED(d) ((d)->diskId.id = -1) -#define IS_DFSET_EXPIRED(d) ((d)->diskId.id == -1) - struct SRowIter { TSDBROW *pRow; STSchema *pTSchema; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 6fa8d503b73933202d2452efe4acde6127ca6ae3..447efcebf9e9e276ce52edb7662d292da6ff438d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -988,21 +988,33 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - bool inTrim = atomic_load_8(&pTsdb->pVnode->trimDbH.state); + STsdbFS fsLatest = {0}; ASSERT(eno == 0); - if (inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock); + // lock + taosThreadRwlockWrlock(&pTsdb->rwLock); + + ASSERT(pCommitter->fs.version <= pTsdb->fs.version); + + if (pCommitter->fs.version < pTsdb->fs.version) { + if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _err; + } + + if ((code = tsdbFSUpdDel(pTsdb, &pCommitter->fs, &fsLatest, pTsdb->trimHdl.minCommitFid - 1))) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _err; + } + } code = tsdbFSCommit1(pTsdb, &pCommitter->fs); if (code) { - if (inTrim) taosThreadRwlockUnlock(&pTsdb->rwLock); + taosThreadRwlockUnlock(&pTsdb->rwLock); goto _err; } - // lock - if (!inTrim) taosThreadRwlockWrlock(&pTsdb->rwLock); - // commit or rollback code = tsdbFSCommit2(pTsdb, &pCommitter->fs); if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index d02cbdc089321f730f6e4c2f9384ba44b9056c2d..ef4f6930beba13375ca1e53787ab215a6094ae78 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -419,6 +419,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb) { int32_t code = 0; // open handle + pTsdb->fs.version = 0; pTsdb->fs.pDelFile = NULL; pTsdb->fs.aDFileSet = taosArrayInit(0, sizeof(SDFileSet)); if (pTsdb->fs.aDFileSet == NULL) { @@ -534,6 +535,7 @@ int32_t tsdbFSClose(STsdb *pTsdb) { int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; + pFS->version = pTsdb->fs.version; pFS->pDelFile = NULL; pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); if (pFS->aDFileSet == NULL) { @@ -746,12 +748,12 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid if (pSetOld && pSetNew) { if (pSetOld->fid == pSetNew->fid) { - if (IS_DFSET_EXPIRED(pSetNew)) goto _remove_old; goto _merge_migrate; - } else if (pSetOld->fid < pSetNew->fid) { - ++iOld; + } else if (pSetOld->fid > pSetNew->fid) { + goto _remove_old; } else { - ++iNew; + ++iOld; + ASSERT(0); } continue; } else { @@ -794,8 +796,8 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid pSetOld->diskId = pSetNew->diskId; } - iOld++; - iNew++; + ++iOld; + ++iNew; continue; _remove_old: @@ -806,7 +808,7 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid } taosMemoryFree(pSetOld->pSmaF); taosArrayRemove(pFS->aDFileSet, iOld); - iNew++; + ++iNew; continue; } @@ -850,6 +852,8 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { int32_t nRef; char fname[TSDB_FILENAME_LEN]; + ++pTsdb->fs.version; + // del if (pFSNew->pDelFile) { SDelFile *pDelFile = pTsdb->fs.pDelFile; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 971b950975e3a5db997eb116d48e7889b7ad696d..fbe16b6a4255a8378f88efe7839d9eedfcf68aa0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -17,7 +17,7 @@ enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; -#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB +#define MIGRATE_MAX_SPEED (1048576 << 4) // 16 MB, vnode level #define MIGRATE_MIN_COST (5) // second static bool tsdbShouldDoMigrate(STsdb *pTsdb); @@ -105,8 +105,8 @@ _retention_loop: code = tsdbFSCopy(pTsdb, &fs); if (code) goto _exit; - int32_t fsSize = taosArrayGetSize(fs.aDFileSet); if (type == RETENTION_MIGRATE) { + int32_t fsSize = taosArrayGetSize(fs.aDFileSet); for (int32_t iSet = 0; iSet < fsSize; ++iSet) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); @@ -119,26 +119,35 @@ _retention_loop: maxFid = pSet->fid; fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size); if (fSize / speed > MIGRATE_MIN_COST) { + tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); break; } for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { fSize += pSet->aSttF[iStt]->size; } if (fSize / speed > MIGRATE_MIN_COST) { - tsdbDebug("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); + tsdbDebug("vgId:%d migrate loop %d with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); break; } } } } else if (type == RETENTION_EXPIRED) { - for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); SDiskID did; if (expLevel < 0) { - SET_DFSET_EXPIRED(pSet); + ASSERT(pSet->fid > maxFid); if (pSet->fid > maxFid) maxFid = pSet->fid; + taosMemoryFree(pSet->pHeadF); + taosMemoryFree(pSet->pDataF); + taosMemoryFree(pSet->pSmaF); + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + taosMemoryFree(pSet->aSttF[iStt]); + } + taosArrayRemove(fs.aDFileSet, iSet); + --iSet; } else { break; } @@ -167,18 +176,25 @@ _commit_conflict_check: // migrate if (type == RETENTION_MIGRATE) { - for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); ++iSet) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); SDiskID did; if (pSet->fid > maxFid) break; - tsdbDebug("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), + tsdbDebug("vgId:%d migrate loop %d with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel); if (expLevel < 0) { - SET_DFSET_EXPIRED(pSet); + taosMemoryFree(pSet->pHeadF); + taosMemoryFree(pSet->pDataF); + taosMemoryFree(pSet->pSmaF); + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + taosMemoryFree(pSet->aSttF[iStt]); + } + taosArrayRemove(fs.aDFileSet, iSet); + --iSet; } else { if (expLevel == pSet->diskId.level) continue;