From 43c954b6e7eddf3dc7af3d338f6d4d54316748a9 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 27 Sep 2022 14:27:18 +0800 Subject: [PATCH] chore: data migration support specify max speed --- include/common/tmsg.h | 2 +- source/common/src/tmsg.c | 4 +- source/dnode/mnode/impl/src/mndDb.c | 17 ++++++-- source/dnode/vnode/src/inc/tsdb.h | 6 +-- source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 5 ++- source/dnode/vnode/src/tsdb/tsdbOpen.c | 2 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 42 +++++++++++-------- source/dnode/vnode/src/tsdb/tsdbRetention2.c | 12 ++++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 11 ++--- 11 files changed, 65 insertions(+), 42 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a6de4f8863..30d38d1177 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -875,7 +875,7 @@ int32_t tDeserializeSTrimDbReq(void* buf, int32_t bufLen, STrimDbReq* pReq); typedef struct { int64_t timestamp; // unit: millisecond - int32_t maxSpeed; // 0 no limit, unit: bit/s + int64_t maxSpeed; // 0 no limit, unit: Byte/s } SVTrimDbReq; int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3793b0188d..04aed7ca36 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2709,7 +2709,7 @@ int32_t tSerializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pReq->timestamp) < 0) return -1; - if (tEncodeI32(&encoder, pReq->maxSpeed) < 0) return -1; + if (tEncodeI64(&encoder, pReq->maxSpeed) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2723,7 +2723,7 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pReq->timestamp) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->maxSpeed) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->maxSpeed) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 832a4617b0..cb47e9dac1 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1385,11 +1385,19 @@ _OVER: return code; } -static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { +/** + * @brief trim database + * + * @param pMnode + * @param pDb + * @param maxSpeed MB/s + * @return int32_t + */ +static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, int32_t maxSpeed) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; - SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = 1048576 << 2}; // TODO: use specified maxSpeed + SVTrimDbReq trimReq = {.timestamp = taosGetTimestampMs(), .maxSpeed = maxSpeed << 20}; int32_t reqLen = tSerializeSVTrimDbReq(NULL, 0, &trimReq); int32_t contLen = reqLen + sizeof(SMsgHead); @@ -1413,7 +1421,8 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) { if (code != 0) { mError("vgId:%d, failed to send vnode-trim request to vnode since 0x%x", pVgroup->vgId, code); } else { - mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64, pVgroup->vgId, trimReq.timestamp); + mInfo("vgId:%d, send vnode-trim request to vnode, time:%" PRIi64 ", max speed:%" PRIi64, pVgroup->vgId, + trimReq.timestamp, trimReq.maxSpeed); } sdbRelease(pSdb, pVgroup); } @@ -1443,7 +1452,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) { goto _OVER; } - code = mndTrimDb(pMnode, pDb); + code = mndTrimDb(pMnode, pDb, trimReq.maxSpeed); _OVER: if (code != 0) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f379a7c528..251e364497 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -264,7 +264,7 @@ int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed); +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); @@ -320,8 +320,8 @@ struct STsdbFS { }; struct STsdbTrimHdl { - volatile int8_t state; // 0 idle 1 in use - volatile int8_t limitSpeed; // 0 no limit, 1 with limit + volatile int8_t state; // 0 idle 1 in use + volatile int8_t commitInWait; // 0 not in wait, 1 in wait volatile int32_t maxRetentFid; volatile int32_t minCommitFid; }; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d9acb6da21..8757dc88e6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -145,7 +145,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb); -int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int32_t maxSpeed); +int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now, int64_t maxSpeed); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, @@ -200,7 +200,7 @@ int32_t smaSyncPostCommit(SSma* pSma); int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); -int32_t smaDoRetention(SSma* pSma, int64_t now, int32_t maxSpeed); +int32_t smaDoRetention(SSma* pSma, int64_t now, int64_t maxSpeed); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4f5f741b9e..e149abbace 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -664,7 +664,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { * @param maxSpeed * @return int32_t */ -int32_t smaDoRetention(SSma *pSma, int64_t now, int32_t maxSpeed) { +int32_t smaDoRetention(SSma *pSma, int64_t now, int64_t maxSpeed) { int32_t code = TSDB_CODE_SUCCESS; if (!VND_IS_RSMA(pSma->pVnode)) { return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index bc9c37fc76..7e8aeafe8a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -763,10 +763,11 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { _wait_retention_end: while (atomic_load_32(&pTsdb->trimHdl.maxRetentFid) >= minCommitFid) { - atomic_val_compare_exchange_8(&pTsdb->trimHdl.limitSpeed, 1, 0); + atomic_val_compare_exchange_8(&pTsdb->trimHdl.commitInWait, 0, 1); if (++nLoops > 1000) { nLoops = 0; sched_yield(); + printf("%s:%d wait retention to finish\n", __func__, __LINE__); } } if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { @@ -779,7 +780,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { } else { goto _wait_retention_end; } - atomic_store_8(&pTsdb->trimHdl.limitSpeed, 1); + atomic_store_8(&pTsdb->trimHdl.commitInWait, 0); } code = tsdbFSCopy(pTsdb, &pCommitter->fs); diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 5072d1a15e..8ce61d49b9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -73,7 +73,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee pTsdb->trimHdl.maxRetentFid = INT32_MIN; pTsdb->trimHdl.minCommitFid = INT32_MAX; - pTsdb->trimHdl.limitSpeed = 1; + pTsdb->trimHdl.commitInWait = 0; tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 7020e52cfa..31fe940848 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -610,13 +610,14 @@ _err: /** * @brief send file with limited speed(rough control) * + * @param pTsdb * @param pFileOut * @param pFileIn * @param size * @param speed 0 no limit, unit: B/s * @return int64_t */ -static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int32_t speed) { +static int64_t tsdbFSendFile(STsdb *pTsdb, TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, int64_t speed) { if (speed <= 0) { return taosFSendFile(pOutFD, pInFD, 0, size); } @@ -626,9 +627,15 @@ static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, in int64_t nBytes = 0; int64_t startMs = 0; int64_t cost = 0; - while ((offset + speed) < size) { + + while ((offset + nBytes) < size) { + if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { + tsdbInfo("vgId:%d sendFile without limit since conflicts, fSize:%" PRIi64 ", maxSpeed:%" PRIi64, + TD_VID(pTsdb->pVnode), size, speed); + goto _send_remain; + } startMs = taosGetTimestampMs(); - if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, speed)) < 0) { + if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, speed / 10)) < 0) { return nBytes; } cost = taosGetTimestampMs() - startMs; @@ -642,20 +649,24 @@ static int64_t tsdbFSendFile(TdFilePtr pOutFD, TdFilePtr pInFD, int64_t size, in } if (nSleep > 0) { taosMsleep(nSleep); - tsdbDebug("sendFile from %p to %p, fSize:%" PRIi64 ", maxSpeed:%d, msleep:%" PRIi64, pOutFD, pInFD, size, speed, - nSleep); + tsdbInfo("vgId:%d sendFile and msleep:%" PRIi64 ", fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, + TD_VID(pTsdb->pVnode), nSleep, size, tBytes, speed); } } + +_send_remain: if (offset < size) { if ((nBytes = taosFSendFile(pOutFD, pInFD, &offset, size - offset)) < 0) { return nBytes; } tBytes += nBytes; + tsdbInfo("vgId:%d sendFile remain, fSize:%" PRIi64 ", tBytes:%" PRIi64 " maxSpeed:%" PRIi64, TD_VID(pTsdb->pVnode), + size, tBytes, speed); } return tBytes; } -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int32_t maxSpeed) { +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, int64_t maxSpeed) { int32_t code = 0; int64_t n; int64_t size; @@ -664,13 +675,8 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i int32_t szPage = pTsdb->pVnode->config.szPage; char fNameFrom[TSDB_FILENAME_LEN]; char fNameTo[TSDB_FILENAME_LEN]; - int32_t speed = 0; int64_t fStatSize = 0; - if (atomic_load_8(&pTsdb->trimHdl.limitSpeed)) { - speed = maxSpeed; - } - // head tsdbHeadFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->pHeadF, fNameFrom); tsdbHeadFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->pHeadF, fNameTo); @@ -687,7 +693,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i fStatSize = 0; taosStatFile(fNameFrom, &fStatSize, 0); ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage)); - n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), speed); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pHeadF->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -711,7 +717,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i fStatSize = 0; taosStatFile(fNameFrom, &fStatSize, 0); ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage)); - n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), speed); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pDataF->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -735,7 +741,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i fStatSize = 0; taosStatFile(fNameFrom, &fStatSize, 0); ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage)); - n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), speed); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->pSmaF->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -757,10 +763,10 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo, i code = TAOS_SYSTEM_ERROR(errno); goto _err; } - fStatSize = 0; - taosStatFile(fNameFrom, &fStatSize, 0); - ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage)); - n = tsdbFSendFile(pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), speed); + fStatSize = 0; + taosStatFile(fNameFrom, &fStatSize, 0); + ASSERT(fStatSize == tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage)); + n = tsdbFSendFile(pTsdb, pOutFD, PInFD, tsdbLogicToFileSize(pSetFrom->aSttF[iStt]->size, szPage), maxSpeed); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention2.c b/source/dnode/vnode/src/tsdb/tsdbRetention2.c index 9594334808..acd5a95649 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention2.c @@ -30,7 +30,7 @@ enum { RETENTION_NO = 0, RETENTION_EXPIRED = 1, RETENTION_MIGRATE = 2 }; static bool tsdbShouldDoMigrate(STsdb *pTsdb); static int32_t tsdbShouldDoRetention(STsdb *pTsdb, int64_t now); static int32_t tsdbProcessExpire(STsdb *pTsdb, int64_t now, int32_t retention); -static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention); +static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention); static bool tsdbShouldDoMigrate(STsdb *pTsdb) { if (tfsGetLevel(pTsdb->pVnode->pTfs) < 2) { @@ -109,6 +109,7 @@ _wait_commit_end: if (++nLoops > 1000) { nLoops = 0; sched_yield(); + printf("%s:%d wait commit finished\n", __func__, __LINE__); } } if (atomic_val_compare_exchange_8(&pTsdb->trimHdl.state, 0, 1) == 0) { @@ -163,7 +164,7 @@ _exit: * @param retention * @return int32_t */ -static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int32_t maxSpeed, int32_t retention) { +static int32_t tsdbProcessMigrate(STsdb *pTsdb, int64_t now, int64_t maxSpeed, int32_t retention) { int32_t code = 0; int32_t nBatch = 0; int32_t nLoops = 0; @@ -183,6 +184,11 @@ _migrate_loop: tsdbFSDestroy(&fs); tsdbFSDestroy(&fsLatest); + if (atomic_load_8(&pTsdb->trimHdl.commitInWait) == 1) { + atomic_store_32(&pTsdb->trimHdl.maxRetentFid, INT32_MIN); + taosMsleep(10); + } + code = tsdbFSCopy(pTsdb, &fs); if (code) goto _exit; @@ -315,7 +321,7 @@ _exit: * @param maxSpeed * @return int32_t */ -int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int32_t maxSpeed) { +int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now, int64_t maxSpeed) { int32_t code = 0; int32_t retention = RETENTION_NO; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a80211f402..4139a8ca2b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -455,13 +455,14 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, } if (atomic_val_compare_exchange_8(&pHandle->state, 0, 1) != 0) { - vInfo("vgId:%d, trim vnode request will not be processed since duplicated req, time:%" PRIi64, TD_VID(pVnode), - pVndTrimReq->trimReq.timestamp); + vInfo("vgId:%d, trim vnode request ignored since duplicated req, time:%" PRIi64 ", max speed:%" PRIi64, + TD_VID(pVnode), pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed); taosMemoryFree(pVndTrimReq); goto _exit; } - vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64, TD_VID(pVnode), pVndTrimReq->trimReq.timestamp); + vInfo("vgId:%d, trim vnode request will be processed, time:%" PRIi64 ", max speed:%" PRIi64, TD_VID(pVnode), + pVndTrimReq->trimReq.timestamp, pVndTrimReq->trimReq.maxSpeed); TdThreadAttr thAttr = {0}; taosThreadAttrInit(&thAttr); @@ -474,10 +475,10 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, taosThreadAttrDestroy(&thAttr); int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->state, 1, 0); ASSERT(oldVal == 1); - vError("vgId:%d, failed to create pthread for trim vnode since %s", TD_VID(pVnode), tstrerror(code)); + vError("vgId:%d, failed to create pthread to trim vnode since %s", TD_VID(pVnode), tstrerror(code)); goto _exit; } - vDebug("vgId:%d, success to create pthread for trim vnode", TD_VID(pVnode)); + vDebug("vgId:%d, success to create pthread to trim vnode", TD_VID(pVnode)); taosThreadAttrDestroy(&thAttr); -- GitLab