diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c5f6d179d8c2f3d983944303de385139dc00f333..832a4617b056b4c8e8ed0a4699176189b50298de 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1389,7 +1389,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; - SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = 1048576 << 5}; // TODO: use specified maxSpeed + SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = 1048576 << 2}; // TODO: use specified maxSpeed int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq); int32_t contLen = reqLen + sizeof(SMsgHead); @@ -1413,7 +1413,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { if (code != 0) { mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code); } else { - mInfo("vgId:%d, send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp); + mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64, pVgroup->vgId, trimReq.timestamp); } sdbRelease(pSdb, pVgroup); } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 53d04cc9c16c6cd132f2be551942d42e61ad3b71..81ad1d0e7f2641434433e3930e6a95899aea15df 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -1101,34 +1101,35 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid ASSERT(pSetOld->pHeadF->commitID == pSetNew->pHeadF->commitID); ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size); ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset); - ASSERT(!sameDisk); - // head - *pSetOld->pHeadF = *pSetNew->pHeadF; - pSetOld->pHeadF->nRef = 1; + if (!sameDisk) { + // head + *pSetOld->pHeadF = *pSetNew->pHeadF; + pSetOld->pHeadF->nRef = 1; - // data - ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size); - *pSetOld->pDataF = *pSetNew->pDataF; - pSetOld->pDataF->nRef = 1; + // data + ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size); + *pSetOld->pDataF = *pSetNew->pDataF; + pSetOld->pDataF->nRef = 1; - // sma - ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size); - *pSetOld->pSmaF = *pSetNew->pSmaF; - pSetOld->pSmaF->nRef = 1; + // sma + ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size); + *pSetOld->pSmaF = *pSetNew->pSmaF; + pSetOld->pSmaF->nRef = 1; - // stt - ASSERT(pSetOld->nSttF == pSetNew->nSttF); - for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { - ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); - ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); + // stt + ASSERT(pSetOld->nSttF == pSetNew->nSttF); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) { + ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset); - *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; - pSetOld->aSttF[iStt]->nRef = 1; - } + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } - // set diskId - pSetOld->diskId = pSetNew->diskId; + // set diskId + pSetOld->diskId = pSetNew->diskId; + } iOld++; iNew++; diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 71d745972e58d9414b83c4f0074fe39c4f1b0419..5072d1a15efc4f3fef09087d75d3e2d6bc3dcabd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -73,6 +73,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee pTsdb->trimHdl.maxRetentFid = INT32_MIN; pTsdb->trimHdl.minCommitFid = INT32_MAX; + pTsdb->trimHdl.limitSpeed = 1; tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index db39d98b9e994c29363bc8baabd771666eacd8c2..7020e52cfa4cfd1464fba03b3c2a30d4ac8d248e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -634,10 +634,16 @@ static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, in cost = taosGetTimestampMs() - startMs; tBytes += nBytes; + int64_t nSleep = 0; if (cost < 0) { - taosMsleep(1000); + nSleep = 1000; } else if (cost < 1000) { - taosMsleep(1000 - cost); + nSleep = 1000 - cost; + } + if (nSleep > 0) { + taosMsleep(nSleep); + tsdbDebug("sendFile from %p to %p, fSize:%" PRIi64 ", maxSpeed:%d, msleep:%" PRIi64, pOutFD, pInFD, size, speed, + nSleep); } } if (offset < size) { @@ -659,9 +665,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i char fNameFrom[TSDB_FILENAME_LEN]; char fNameTo[TSDB_FILENAME_LEN]; int32_t speed = 0; + int64_t fStatSize = 0; if (atomic_load_8(&pTsdb->trimHdl.limitSpeed)) { - ASSERT(maxSpeed > 0); speed = maxSpeed; } @@ -678,6 +684,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } + fStatSize = 0; + taosStatFile(fNameFrom, &fStatSize, 0); + ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage)); n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -699,7 +708,10 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } - n = tsdbFSendFile(pOutFD, PInFD, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage), speed); + fStatSize = 0; + taosStatFile(fNameFrom, &fStatSize, 0); + ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage)); + n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -720,6 +732,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } + fStatSize = 0; + taosStatFile(fNameFrom, &fStatSize, 0); + ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage)); n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); @@ -742,6 +757,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } + fStatSize = 0; + taosStatFile(fNameFrom, &fStatSize, 0); + ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage)); n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), speed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention2.c b/source/dnode/vnode/src/tsdb/tsdbRetention2.c index 47fe3c538445325e9d5d59aeaaa08bcec536ae7d..9594334808c865c3c087cdf080d8d8f431ac89f0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention2.c @@ -17,9 +17,15 @@ enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; +#if 1 #define MIGRATE_MIN_FSIZE (1048576 << 9) // 512 MB #define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB #define MIGRATE_MIN_COST (5) // second +#else +#define MIGRATE_MIN_FSIZE (1048576 << 5) // 32 MB +#define MIGRATE_MAX_SPEED (1048576 << 2) // 4 MB +#define MIGRATE_MIN_COST (5) // second +#endif static bool tsdbShouldDoMigrate(STsdb *pTsdb); static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); @@ -103,7 +109,6 @@ _wait_commit_end: if (++nLoops > 1000) { nLoops = 0; sched_yield(); - printf("%s:%d sche_yield() minCommitFid:%d maxFid:%d\n", __func__, __LINE__, pTsdb->trimHdl.minCommitFid, maxFid); } } if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { @@ -160,6 +165,7 @@ _exit: */ static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention) { int32_t code = 0; + int32_t nBatch = 0; int32_t nLoops = 0; int32_t maxFid = INT32_MIN; int64_t fSize = 0; @@ -186,6 +192,8 @@ _migrate_loop: int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now); SDiskID did; + if (pSet->diskId.level == expLevel) continue; + if (expLevel > 0) { ASSERT(pSet->fid > maxFid); maxFid = pSet->fid; @@ -197,6 +205,7 @@ _migrate_loop: fSize += pSet->aSttF[iStt]->size; } if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) { + tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid); break; } } @@ -209,7 +218,6 @@ _wait_commit_end: if (++nLoops > 1000) { nLoops = 0; sched_yield(); - printf("%s:%d sche_yield()\n", __func__, __LINE__); } } if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { @@ -231,9 +239,14 @@ _wait_commit_end: if (pSet->fid > maxFid) break; + tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode), + nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel); + if (expLevel < 0) { SET_DFSET_EXPIRED(pSet); - } else if (expLevel > 0) { + } else { + if (expLevel == pSet->diskId.level) continue; + if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) { code = terrno; goto _exit; @@ -277,6 +290,9 @@ _merge_fs: } taosThreadRwlockUnlock(&pTsdb->rwLock); + ++nBatch; + goto _migrate_loop; + _exit: tsdbFSDestroy(&fs); tsdbFSDestroy(&fsLatest); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0263740e4a581b2d1d4c2bbefd8c1d601be51483..a80211f402dc3f93805cecb7b182cc5caf1b377c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -389,7 +389,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, goto _exit; } - vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp); + vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, pVnode->config.vgId, trimReq.timestamp); // process code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp); @@ -461,7 +461,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, goto _exit; } - vInfo("vgId:%d, trim vnode request will be processed, time:%d", TD_VID(pVnode), pVndTrimReq->trimReq.timestamp); + vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, TD_VID(pVnode), pVndTrimReq->trimReq.timestamp); TdThreadAttr thAttr = {0}; taosThreadAttrInit(&thAttr);