From c426fa8bb18683ffaf9e660615cc6029ca38afcf Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 27 Sep 2022 17:43:33 +0800 Subject: [PATCH] feat: code optimization for data migrate --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 3 +- source/dnode/vnode/src/tsdb/tsdbRetention2.c | 228 +++++++------------ 2 files changed, 81 insertions(+), 150 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 02284fb6d9..866a9b3aaa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -767,7 +767,6 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { if (++nLoops > 1000) { nLoops = 0; sched_yield(); - // printf("%s:%d wait retention to finish\n", __func__, __LINE__); } } if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { @@ -780,7 +779,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { } else { goto _wait_retention_end; } - atomic_store_8(&pTsdb->trimHdl.commitInWait, 0); + atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 1, 0); } code = tsdbFSCopy(pTsdb, &pCommitter->fs); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention2.c b/source/dnode/vnode/src/tsdb/tsdbRetention2.c index ed45c6bc2a..d72211047f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention2.c @@ -17,20 +17,12 @@ enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; -#if 1 -#define MIGRATE_MIN_FSIZE (1048576 << 9) // 512 MB #define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB #define MIGRATE_MIN_COST (5) // second -#else -#define MIGRATE_MIN_FSIZE (1048576 << 5) // 32 MB -#define MIGRATE_MAX_SPEED (1048576 << 2) // 4 MB -#define MIGRATE_MIN_COST (5) // second -#endif static bool tsdbShouldDoMigrate(STsdb *pTsdb); static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); -static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention); -static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention); +static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type); static bool tsdbShouldDoMigrate(STsdb *pTsdb) { if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) { @@ -74,110 +66,31 @@ static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) { return retention; } -static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention) { - int32_t code = 0; - int32_t nLoops = 0; - int32_t maxFid = INT32_MIN; - STsdbFS fs = {0}; - STsdbFS fsLatest = {0}; - - if (!(retention & RETENTION_EXPIRED)) { - goto _exit; - } - - code = tsdbFSCopy(pTsdb, &fs); - if (code) goto _exit; - - int32_t fsSize = taosArrayGetSize(fs.aDFileSet); - for (int32_t iSet = 0; iSet < fsSize; iSet++) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; - - if (expLevel < 0) { - SET_DFSET_EXPIRED(pSet); - if (pSet->fid > maxFid) maxFid = pSet->fid; - } else { - break; - } - } - - if (maxFid == INT32_MIN) goto _exit; - -_wait_commit_end: - while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { - if (++nLoops > 1000) { - nLoops = 0; - sched_yield(); - // printf("%s:%d wait commit finished\n", __func__, __LINE__); - } - } - if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { - if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { - atomic_store_8(&pTsdb->trimHdl.state, 0); - goto _wait_commit_end; - } - atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid); - atomic_store_8(&pTsdb->trimHdl.state, 0); - } else { - goto _wait_commit_end; - } - -_merge_fs: - taosThreadRwlockWrlock(&pTsdb->rwLock); - if ((code = tsdbFSCopy(pTsdb, &fsLatest))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - // 1) merge tsdbFSNew and pTsdb->fs - if ((code = tsdbFSUpdDel(pTsdb, &fsLatest, &fs, maxFid))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - // 2) save CURRENT - if ((code = tsdbFSCommit1(pTsdb, &fsLatest))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - // 3) apply the tsdbFS to pTsdb->fs - if ((code = tsdbFSCommit2(pTsdb, &fsLatest))) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - goto _exit; - } - taosThreadRwlockUnlock(&pTsdb->rwLock); - -_exit: - tsdbFSDestroy(&fs); - tsdbFSDestroy(&fsLatest); - if (code != 0) { - tsdbError("vgId:%d, tsdb do retention(expire) failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - ASSERT(0); - } - return code; -} - /** - * @brief + * @brief process retention * * @param pTsdb * @param now + * @param maxSpeed * @param retention + * @param type 0 RETENTION_EXPIRED, 1 RETENTION_MIGRATE * @return int32_t */ -static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention) { +static int32_t tsdbProcessRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention, int8_t type) { int32_t code = 0; int32_t nBatch = 0; int32_t nLoops = 0; - int32_t maxFid = INT32_MIN; + int32_t maxFid = 0; int64_t fSize = 0; + int64_t speed = maxSpeed > 0 ? maxSpeed : MIGRATE_MAX_SPEED; STsdbFS fs = {0}; STsdbFS fsLatest = {0}; - if (!(retention & RETENTION_MIGRATE)) { + if (!(retention & type)) { goto _exit; } -_migrate_loop: +_retention_loop: // reset maxFid = INT32_MIN; fSize = 0; @@ -186,32 +99,47 @@ _migrate_loop: if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN); - taosMsleep(10); + taosMsleep(50); } code = tsdbFSCopy(pTsdb, &fs); if (code) goto _exit; int32_t fsSize = taosArrayGetSize(fs.aDFileSet); - for (int32_t iSet = 0; iSet < fsSize; ++iSet) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; - - if (pSet->diskId.level == expLevel) continue; - - if (expLevel > 0) { - ASSERT(pSet->fid > maxFid); - maxFid = pSet->fid; - fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size); - if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) { - break; - } - for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { - fSize += pSet->aSttF[iStt]->size; + if (type == RETENTION_MIGRATE) { + for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; + + if (pSet->diskId.level == expLevel) continue; + + if (expLevel > 0) { + ASSERT(pSet->fid > maxFid); + maxFid = pSet->fid; + fSize += (pSet->pDataF->size + pSet->pHeadF->size + pSet->pSmaF->size); + if (fSize / speed > MIGRATE_MIN_COST) { + break; + } + for (int32_t iStt = 0; iStt < pSet->nSttF; ++iStt) { + fSize += pSet->aSttF[iStt]->size; + } + if (fSize / speed > MIGRATE_MIN_COST) { + tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); + break; + } } - if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) { - tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); + } + } else if (type == RETENTION_EXPIRED) { + for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; + + if (expLevel < 0) { + SET_DFSET_EXPIRED(pSet); + if (pSet->fid > maxFid) maxFid = pSet->fid; + } else { break; } } @@ -219,7 +147,7 @@ _migrate_loop: if (maxFid == INT32_MIN) goto _exit; -_wait_commit_end: +_commit_conflict_check: while (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { if (++nLoops > 1000) { nLoops = 0; @@ -229,46 +157,48 @@ _wait_commit_end: if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { if (atomic_load_32(&pTsdb->trimHdl.minCommitFid) <= maxFid) { atomic_store_8(&pTsdb->trimHdl.state, 0); - goto _wait_commit_end; + goto _commit_conflict_check; } atomic_store_32(&pTsdb->trimHdl.maxRetentFid, maxFid); atomic_store_8(&pTsdb->trimHdl.state, 0); } else { - goto _wait_commit_end; + goto _commit_conflict_check; } // migrate - for (int32_t iSet = 0; iSet < fsSize; ++iSet) { - SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); - int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); - SDiskID did; + if (type == RETENTION_MIGRATE) { + for (int32_t iSet = 0; iSet < fsSize; ++iSet) { + SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet); + int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); + SDiskID did; - if (pSet->fid > maxFid) break; + if (pSet->fid > maxFid) break; - tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), - nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel); + tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), + nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel); - if (expLevel < 0) { - SET_DFSET_EXPIRED(pSet); - } else { - if (expLevel == pSet->diskId.level) continue; + if (expLevel < 0) { + SET_DFSET_EXPIRED(pSet); + } else { + if (expLevel == pSet->diskId.level) continue; - if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { - code = terrno; - goto _exit; - } + if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { + code = terrno; + goto _exit; + } - if (did.level == pSet->diskId.level) continue; + if (did.level == pSet->diskId.level) continue; - // copy file to new disk - SDFileSet fSet = *pSet; - fSet.diskId = did; + // copy file to new disk + SDFileSet fSet = *pSet; + fSet.diskId = did; - code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed); - if (code) goto _exit; + code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed); + if (code) goto _exit; - code = tsdbFSUpsertFSet(&fs, &fSet); - if (code) goto _exit; + code = tsdbFSUpsertFSet(&fs, &fSet); + if (code) goto _exit; + } } } @@ -296,14 +226,16 @@ _merge_fs: } taosThreadRwlockUnlock(&pTsdb->rwLock); - ++nBatch; - goto _migrate_loop; + if (type == RETENTION_MIGRATE) { + ++nBatch; + goto _retention_loop; + } _exit: tsdbFSDestroy(&fs); tsdbFSDestroy(&fsLatest); if (code != 0) { - tsdbError("vgId:%d, tsdb do retention(migrate) failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d, tsdb do retention %" PRIi8 " failed since %s", TD_VID(pTsdb->pVnode), type, tstrerror(code)); ASSERT(0); } return code; @@ -331,21 +263,21 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed) { } // step 1: process expire - code = tsdbProcessExpire(pTsdb, now, retention); + code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_EXPIRED); if (code < 0) goto _exit; // step 2: process multi-tier migration - code = tsdbProcessMigrate(pTsdb, now, maxSpeed, retention); + code = tsdbProcessRetention(pTsdb, now, maxSpeed, retention, RETENTION_MIGRATE); if (code < 0) goto _exit; _exit: pTsdb->trimHdl.maxRetentFid = INT32_MIN; if (code != 0) { - tsdbError("vgId:%d, tsdb do retention:%d failed since %s", TD_VID(pTsdb->pVnode), retention, tstrerror(code)); + tsdbError("vgId:%d, tsdb do retention %d failed since %s", TD_VID(pTsdb->pVnode), retention, tstrerror(code)); ASSERT(0); // tsdbFSRollback(pTsdb->pFS); } else { - tsdbInfo("vgId:%d, tsdb do retention:%d succeed", TD_VID(pTsdb->pVnode), retention); + tsdbInfo("vgId:%d, tsdb do retention %d succeed", TD_VID(pTsdb->pVnode), retention); } return code; } \ No newline at end of file -- GitLab