diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index eca57f4a001d0df2a59872580de60022a6669919..9d82245c2199b5fa0b62d709a08633e5a976b007 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -418,7 +418,8 @@ int tsdbCompact(STsdbRepo *pRepo); // no problem return true bool tsdbNoProblem(STsdbRepo* pRepo); -void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize); +// unit of walSize: MB +int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); #ifdef __cplusplus } diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 7c3fedd2ab72eb422b86dd207422a99bfa5d163a..80e92975799f47d68ff72ef80a52efb6fe901b5e 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -94,8 +94,6 @@ struct STsdbRepo { pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code - int64_t commitWalSize; // last commit wal size. MB - int64_t walSize; // current wal size. MB SMergeBuf mergeBuf; //used when update=2 int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? @@ -111,7 +109,7 @@ struct STsdbRepo { int tsdbLockRepo(STsdbRepo* pRepo); int tsdbUnlockRepo(STsdbRepo* pRepo); STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); -int tsdbCheckCommit(STsdbRepo* pRepo, bool checkWal); +int tsdbCheckCommit(STsdbRepo* pRepo); int tsdbRestoreInfo(STsdbRepo* pRepo); int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); void tsdbGetRootDir(int repoid, char dirName[]); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index eb09432620fbdcb4bb3c26d364717fb172f1e772..17aecd3678b20e68d90748f1f31901a197316bf1 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -185,14 +185,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { return 0; } -void tsdbSetWalSize(STsdbRepo *pRepo, int64_t walSize) { // MB - atomic_store_64(&pRepo->walSize,walSize); - if(walSize <= atomic_load_64(&pRepo->commitWalSize)) { - atomic_store_64(&pRepo->commitWalSize, 0); // restart - } -} - -int tsdbCheckCommit(STsdbRepo *pRepo, bool checkWal) { +int tsdbCheckCommit(STsdbRepo *pRepo) { ASSERT(pRepo->mem != NULL); STsdbCfg *pCfg = &(pRepo->config); @@ -202,14 +195,15 @@ int tsdbCheckCommit(STsdbRepo *pRepo, bool checkWal) { ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { // trigger commit if (tsdbAsyncCommit(pRepo) < 0) return -1; - } else if (checkWal) { - int64_t walSize = atomic_load_64(&pRepo->walSize); - int64_t commitWalSize = atomic_load_64(&pRepo->commitWalSize); - int64_t delta = walSize - commitWalSize; - if ((delta > tsdbWalFlushSize) && (delta > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { - atomic_store_64(&pRepo->commitWalSize, walSize); - if (tsdbAsyncCommit(pRepo) < 0) return -1; - } + } + return 0; +} + +int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB + STsdbCfg *pCfg = &(pRepo->config); + + if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { + if (tsdbAsyncCommit(pRepo) < 0) return -1; } return 0; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ea97f5a41c21f246aa51845bdc64c3ca9d49dd84..3890dca5b96c26009dcf3ca72205ca4b1725aa29 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -77,7 +77,7 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); - if (tsdbCheckCommit(pRepo, false) < 0) return -1; + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index b22e48c0ee8702ae692b544481152748597d86a5..72c86018e83399b7368130dc5b2e5af386caa041 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -140,7 +140,7 @@ int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg) { goto _err; } - if (tsdbCheckCommit(pRepo, true) < 0) return -1; + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; @@ -191,7 +191,7 @@ int tsdbDropTable(STsdbRepo *repo, STableId tableId) { tsdbDebug("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, tbname, tid, uid); free(tbname); - if (tsdbCheckCommit(pRepo, false) < 0) goto _err; + if (tsdbCheckCommit(pRepo) < 0) goto _err; return 0; @@ -416,7 +416,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) { } tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); - if (tsdbCheckCommit(pRepo, false) < 0) return -1; + if (tsdbCheckCommit(pRepo) < 0) return -1; return 0; } diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 4864b79dc4ee7c718ad7c023277793f21e74446d..1deceebb0ad3bd146a3cd81fab4cabb2d290b037 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -56,6 +56,7 @@ typedef struct { uint64_t version; // current version uint64_t cversion; // version while commit start uint64_t fversion; // version on saved data file + uint32_t tblMsgVer; // create table msg version void * wqueue; // write queue void * qqueue; // read query queue void * fqueue; // read fetch/cancel queue diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index d3aa927f81428a194aa5e06ad321b1914ec6d2a1..92ad4c0f9b12ab424faccf8bc6aaebe01be0a0b0 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -556,7 +556,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { if (!vnodeInInitStatus(pVnode)) { walRemoveOneOldFile(pVnode->wal); } - // vnodeGetVersion() and calling tsdbSetWalSize() would reply on the vnode isCommiting state + // vnodeGetVersion() and calling tsdbCheckWal() would reply on the vnode isCommiting state pVnode->isCommiting = 0; return vnodeSaveVersion(pVnode); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 87ef9cf5722a0cb2ab60ca2b0100464447050243..f944a39baade584af1ce06b050f8a34e487a65a0 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -176,17 +176,16 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe return terrno; } - // no need to set wal size to trigger commit if vnode in committing state - // TODO: retrieve pVnode->isCommiting need atomic operation? - if (((pVnode->version & 8191) == 0) && (pVnode->isCommiting == 0)) { - tsdbSetWalSize(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20); - } - if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) { code = terrno; ASSERT(code != 0); } + // no need to set wal size to trigger commit if vnode in committing state + if ((((++pVnode->tblMsgVer) & 32767) == 0) && (pVnode->isCommiting == 0)) { // lazy check + tsdbCheckWal(pVnode->tsdb, walGetFSize(pVnode->wal) >> 20); + } + tsdbClearTableCfg(pCfg); return code; }