提交 ed2b3678 编写于 作者: C Cary Xu

feat(tsdb): migrate support speed limit

上级 797b9376
......@@ -265,7 +265,7 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo,
int8_t cmprAlg, int8_t toLast);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed);
// SDataFReader
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
......
......@@ -763,6 +763,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
_wait_retention_end:
while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) {
atomic_val_compare_exchange_8(&pTsdb->trimHdl.limitSpeed, 1, 0);
if (++nLoops > 1000) {
nLoops = 0;
sched_yield();
......@@ -778,6 +779,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
} else {
goto _wait_retention_end;
}
atomic_store_8(&pTsdb->trimHdl.limitSpeed, 1);
}
code = tsdbFSCopy(pTsdb, &pCommitter->fs);
......
......@@ -607,7 +607,43 @@ _err:
return code;
}
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
/**
* @brief send file with limited speed(rough control)
*
* @param pFileOut
* @param pFileIn
* @param size
* @param speed 0 no limit, unit: B/s
* @return int64_t
*/
static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int32_t speed) {
if (speed <= 0) {
return taosFSendFile(pOutFD, pInFD, 0, size);
}
int64_t offset = 0;
int64_t nBytes = 0;
int64_t startMs = 0;
int64_t endMs = 0;
int64_t cost = 0;
while ((offset + speed) < size) {
startMs = taosGetTimestampMs();
nBytes += taosFSendFile(pOutFD, pInFD, &offset, speed);
cost = taosGetTimestampMs() - startMs;
if (cost < 0) {
taosMsleep(1000);
} else if (cost < 1000) {
taosMsleep(1000 - cost);
}
}
if (offset < size) {
nBytes += taosFSendFile(pOutFD, pInFD, &offset, size - offset);
}
return nBytes;
}
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed) {
int32_t code = 0;
int64_t n;
int64_t size;
......@@ -616,6 +652,12 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
int32_t szPage = pTsdb->pVnode->config.szPage;
char fNameFrom[TSDB_FILENAME_LEN];
char fNameTo[TSDB_FILENAME_LEN];
int32_t speed = 0;
if (atomic_load_8(&pTsdb->trimHdl.limitSpeed)) {
ASSERT(maxSpeed > 0);
speed = maxSpeed;
}
// head
tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom);
......@@ -630,7 +672,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -651,7 +693,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, LOGIC_TO_FILE_OFFSET(pSetFrom->pDataF->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -672,7 +714,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......@@ -694,7 +736,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
n = taosFSendFile(pOutFD, PInFD, 0, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage));
n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), speed);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
......
......@@ -245,7 +245,7 @@ _wait_commit_end:
SDFileSet fSet = *pSet;
fSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet, maxSpeed);
if (code) goto _exit;
code = tsdbFSUpsertFSet(&fs, &fSet);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册