提交 9169c90f 编写于 作者: C Cary Xu

fix: migrate logic for multi-tier storage

上级 f8702c03
......@@ -1389,7 +1389,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = 1048576 << 5}; // TODO: use specified maxSpeed
SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = 1048576 << 2}; // TODO: use specified maxSpeed
int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq);
int32_t contLen = reqLen + sizeof(SMsgHead);
......@@ -1413,7 +1413,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
if (code != 0) {
mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code);
} else {
mInfo("vgId:%d, send vnode-trim request to vnode, time:%d", pVgroup->vgId, trimReq.timestamp);
mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64, pVgroup->vgId, trimReq.timestamp);
}
sdbRelease(pSdb, pVgroup);
}
......
......@@ -1101,34 +1101,35 @@ int32_t tsdbFSUpdDel(STsdb *pTsdb, STsdbFS *pFS, STsdbFS *pFSNew, int32_t maxFid
ASSERT(pSetOld->pHeadF->commitID == pSetNew->pHeadF->commitID);
ASSERT(pSetOld->pHeadF->size == pSetNew->pHeadF->size);
ASSERT(pSetOld->pHeadF->offset == pSetNew->pHeadF->offset);
ASSERT(!sameDisk);
// head
*pSetOld->pHeadF = *pSetNew->pHeadF;
pSetOld->pHeadF->nRef = 1;
if (!sameDisk) {
// head
*pSetOld->pHeadF = *pSetNew->pHeadF;
pSetOld->pHeadF->nRef = 1;
// data
ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size);
*pSetOld->pDataF = *pSetNew->pDataF;
pSetOld->pDataF->nRef = 1;
// data
ASSERT(pSetOld->pDataF->size == pSetNew->pDataF->size);
*pSetOld->pDataF = *pSetNew->pDataF;
pSetOld->pDataF->nRef = 1;
// sma
ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size);
*pSetOld->pSmaF = *pSetNew->pSmaF;
pSetOld->pSmaF->nRef = 1;
// sma
ASSERT(pSetOld->pSmaF->size == pSetNew->pSmaF->size);
*pSetOld->pSmaF = *pSetNew->pSmaF;
pSetOld->pSmaF->nRef = 1;
// stt
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) {
ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size);
ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset);
// stt
ASSERT(pSetOld->nSttF == pSetNew->nSttF);
for (int32_t iStt = 0; iStt < pSetOld->nSttF; ++iStt) {
ASSERT(pSetOld->aSttF[iStt]->size == pSetNew->aSttF[iStt]->size);
ASSERT(pSetOld->aSttF[iStt]->offset == pSetNew->aSttF[iStt]->offset);
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
pSetOld->aSttF[iStt]->nRef = 1;
}
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
pSetOld->aSttF[iStt]->nRef = 1;
}
// set diskId
pSetOld->diskId = pSetNew->diskId;
// set diskId
pSetOld->diskId = pSetNew->diskId;
}
iOld++;
iNew++;
......
......@@ -73,6 +73,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
pTsdb->trimHdl.maxRetentFid = INT32_MIN;
pTsdb->trimHdl.minCommitFid = INT32_MAX;
pTsdb->trimHdl.limitSpeed = 1;
tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
......
......@@ -634,10 +634,16 @@ static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, in
cost = taosGetTimestampMs() - startMs;
tBytes += nBytes;
int64_t nSleep = 0;
if (cost < 0) {
taosMsleep(1000);
nSleep = 1000;
} else if (cost < 1000) {
taosMsleep(1000 - cost);
nSleep = 1000 - cost;
}
if (nSleep > 0) {
taosMsleep(nSleep);
tsdbDebug("sendFile from %p to %p, fSize:%" PRIi64 ", maxSpeed:%d, msleep:%" PRIi64, pOutFD, pInFD, size, speed,
nSleep);
}
}
if (offset < size) {
......@@ -659,9 +665,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
char fNameFrom[TSDB_FILENAME_LEN];
char fNameTo[TSDB_FILENAME_LEN];
int32_t speed = 0;
int64_t fStatSize = 0;
if (atomic_load_8(&pTsdb->trimHdl.limitSpeed)) {
ASSERT(maxSpeed > 0);
speed = maxSpeed;
}
......@@ -678,6 +684,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
fStatSize = 0;
taosStatFile(fNameFrom, &fStatSize, 0);
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
......@@ -699,7 +708,10 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = tsdbFSendFile(pOutFD, PInFD, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage), speed);
fStatSize = 0;
taosStatFile(fNameFrom, &fStatSize, 0);
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -720,6 +732,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
fStatSize = 0;
taosStatFile(fNameFrom, &fStatSize, 0);
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
......@@ -742,6 +757,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
fStatSize = 0;
taosStatFile(fNameFrom, &fStatSize, 0);
ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
......
......@@ -17,9 +17,15 @@
enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 };
#if 1
#define MIGRATE_MIN_FSIZE (1048576 << 9) // 512 MB
#define MIGRATE_MAX_SPEED (1048576 << 5) // 32 MB
#define MIGRATE_MIN_COST (5) // second
#else
#define MIGRATE_MIN_FSIZE (1048576 << 5) // 32 MB
#define MIGRATE_MAX_SPEED (1048576 << 2) // 4 MB
#define MIGRATE_MIN_COST (5) // second
#endif
static bool tsdbShouldDoMigrate(STsdb *pTsdb);
static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
......@@ -103,7 +109,6 @@ _wait_commit_end:
if (++nLoops > 1000) {
nLoops = 0;
sched_yield();
printf("%s:%d sche_yield() minCommitFid:%d maxFid:%d\n", __func__, __LINE__, pTsdb->trimHdl.minCommitFid, maxFid);
}
}
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
......@@ -160,6 +165,7 @@ _exit:
*/
static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention) {
int32_t code = 0;
int32_t nBatch = 0;
int32_t nLoops = 0;
int32_t maxFid = INT32_MIN;
int64_t fSize = 0;
......@@ -186,6 +192,8 @@ _migrate_loop:
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
SDiskID did;
if (pSet->diskId.level == expLevel) continue;
if (expLevel > 0) {
ASSERT(pSet->fid > maxFid);
maxFid = pSet->fid;
......@@ -197,6 +205,7 @@ _migrate_loop:
fSize += pSet->aSttF[iStt]->size;
}
if (fSize / MIGRATE_MAX_SPEED > MIGRATE_MIN_COST) {
tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d", TD_VID(pTsdb->pVnode), nBatch, maxFid);
break;
}
}
......@@ -209,7 +218,6 @@ _wait_commit_end:
if (++nLoops > 1000) {
nLoops = 0;
sched_yield();
printf("%s:%d sche_yield()\n", __func__, __LINE__);
}
}
if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) {
......@@ -231,9 +239,14 @@ _wait_commit_end:
if (pSet->fid > maxFid) break;
tsdbInfo("vgId:%d migrate loop[%d] with maxFid:%d, fid:%d, did:%d, level:%d, expLevel:%d", TD_VID(pTsdb->pVnode),
nBatch, maxFid, pSet->fid, pSet->diskId.id, pSet->diskId.level, expLevel);
if (expLevel < 0) {
SET_DFSET_EXPIRED(pSet);
} else if (expLevel > 0) {
} else {
if (expLevel == pSet->diskId.level) continue;
if (tfsAllocDisk(pTsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
goto _exit;
......@@ -277,6 +290,9 @@ _merge_fs:
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
++nBatch;
goto _migrate_loop;
_exit:
tsdbFSDestroy(&fs);
tsdbFSDestroy(&fsLatest);
......
......@@ -389,7 +389,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
goto _exit;
}
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, pVnode->config.vgId, trimReq.timestamp);
// process
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
......@@ -461,7 +461,7 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
goto _exit;
}
vInfo("vgId:%d, trim vnode request will be processed, time:%d", TD_VID(pVnode), pVndTrimReq->trimReq.timestamp);
vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, TD_VID(pVnode), pVndTrimReq->trimReq.timestamp);
TdThreadAttr thAttr = {0};
taosThreadAttrInit(&thAttr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册