From f227df663780f0dd5583e2f139d6319e29468472 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 5 Oct 2021 22:47:45 +0800 Subject: [PATCH] [TS-385]: support flush wal by walFlushSize --- src/inc/taosdef.h | 6 +++--- src/inc/tsdb.h | 2 ++ src/inc/twal.h | 1 + src/tsdb/inc/tsdbint.h | 4 +++- src/tsdb/src/tsdbMain.c | 17 ++++++++++++++++- src/tsdb/src/tsdbMemTable.c | 2 +- src/tsdb/src/tsdbMeta.c | 6 +++--- src/util/inc/tfile.h | 1 + src/util/src/tfile.c | 11 +++++++++++ src/vnode/src/vnodeWrite.c | 4 ++++ src/wal/src/walWrite.c | 10 ++++++++++ 11 files changed, 55 insertions(+), 9 deletions(-) diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 6e9782fbd1..4385d51ed6 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -278,9 +278,9 @@ do { \ #define TSDB_MAX_TOTAL_BLOCKS 10000 #define TSDB_DEFAULT_TOTAL_BLOCKS 6 -#define TSDB_MIN_WAL_FLUSH_SIZE 0 -#define TSDB_MAX_WAL_FLUSH_SIZE 10000000 -#define TSDB_DEFAULT_WAL_FLUSH_SIZE 1024 // 1024MB +#define TSDB_MIN_WAL_FLUSH_SIZE 128 // MB +#define TSDB_MAX_WAL_FLUSH_SIZE 10000000 // MB +#define TSDB_DEFAULT_WAL_FLUSH_SIZE 1024 // MB #define TSDB_MIN_TABLES 4 #define TSDB_MAX_TABLES 10000000 diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 4e11e4f247..eca57f4a00 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -418,6 +418,8 @@ int tsdbCompact(STsdbRepo *pRepo); // no problem return true bool tsdbNoProblem(STsdbRepo* pRepo); +void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize); + #ifdef __cplusplus } #endif diff --git a/src/inc/twal.h b/src/inc/twal.h index 868a1fbd78..daea34daed 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -66,6 +66,7 @@ int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); uint64_t walGetVersion(twalh); void walResetVersion(twalh, uint64_t newVer); +int64_t walGetFSize(twalh); #ifdef __cplusplus } diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 80e9297579..7c3fedd2ab 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -94,6 +94,8 @@ struct STsdbRepo { pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code + int64_t commitWalSize; // last commit wal size. MB + int64_t walSize; // current wal size. MB SMergeBuf mergeBuf; //used when update=2 int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? @@ -109,7 +111,7 @@ struct STsdbRepo { int tsdbLockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); -int tsdbCheckCommit(STsdbRepo* pRepo); +int tsdbCheckCommit(STsdbRepo* pRepo, bool checkWal); int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); void tsdbGetRootDir(int repoid, char dirName[]); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c2021963e0..eb09432620 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -185,7 +185,14 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { return 0; } -int tsdbCheckCommit(STsdbRepo *pRepo) { +void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize) { // MB + atomic_store_64(&pRepo->walSize,walSize); + if(walSize <= atomic_load_64(&pRepo->commitWalSize)) { + atomic_store_64(&pRepo->commitWalSize, 0); // restart + } +} + +int tsdbCheckCommit(STsdbRepo *pRepo, bool checkWal) { ASSERT(pRepo->mem != NULL); STsdbCfg *pCfg = &(pRepo->config); @@ -195,6 +202,14 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { // trigger commit if (tsdbAsyncCommit(pRepo) < 0) return -1; + } else if (checkWal) { + int64_t walSize = atomic_load_64(&pRepo->walSize); + int64_t commitWalSize = atomic_load_64(&pRepo->commitWalSize); + int64_t delta = walSize - commitWalSize; + if ((delta > tsdbWalFlushSize) && (delta > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { + atomic_store_64(&pRepo->commitWalSize, walSize); + if (tsdbAsyncCommit(pRepo) < 0) return -1; + } } return 0; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 3890dca5b9..ea97f5a41c 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -77,7 +77,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); - if (tsdbCheckCommit(pRepo) < 0) return -1; + if (tsdbCheckCommit(pRepo, false) < 0) return -1; return 0; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index a311868de6..f4d99b2871 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -140,7 +140,7 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { goto _err; } - if (tsdbCheckCommit(pRepo) < 0) return -1; + if (tsdbCheckCommit(pRepo, true) < 0) return -1; return 0; @@ -191,7 +191,7 @@ int tsdbDropTable(STsdbRepo *repo, STableId tableId) { tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); free(tbname); - if (tsdbCheckCommit(pRepo) < 0) goto _err; + if (tsdbCheckCommit(pRepo, false) < 0) goto _err; return 0; @@ -416,7 +416,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) { } tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); - if (tsdbCheckCommit(pRepo) < 0) return -1; + if (tsdbCheckCommit(pRepo, false) < 0) return -1; return 0; } diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h index 066040170e..11a04cdf94 100644 --- a/src/util/inc/tfile.h +++ b/src/util/inc/tfile.h @@ -37,6 +37,7 @@ int32_t tfFsync(int64_t tfd); bool tfValid(int64_t tfd); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); int32_t tfFtruncate(int64_t tfd, int64_t length); +int32_t tfStat(int64_t tfd, struct stat *pFstat); #ifdef __cplusplus } diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index 455c885e75..d975995b21 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -133,3 +133,14 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) { taosReleaseRef(tsFileRsetId, tfd); return code; } + +int32_t tfStat(int64_t tfd, struct stat *pFstat) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + int32_t code = fstat(fd, pFstat); + + taosReleaseRef(tsFileRsetId, tfd); + return code; +} diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 743398d834..8bf8e264b0 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -176,6 +176,10 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe return terrno; } + if ((pVnode->version & 8191) == 0) { + tsdbSetWalSize(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20); + } + if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) { code = terrno; ASSERT(code != 0); diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index e991bf02aa..3f2df3f624 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -576,4 +576,14 @@ void walResetVersion(twalh param, uint64_t newVer) { wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer); pWal->version = newVer; +} + +int64_t walGetFSize(twalh handle) { + SWal *pWal = handle; + if (pWal == NULL) return 0; + struct stat _fstat; + if (tfStat(pWal->tfd, &_fstat) == 0) { + return _fstat.st_size; + }; + return 0; } \ No newline at end of file -- GitLab