From 9694933ec82bd481950c1ca20b3ec00ce83caeb7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 10 Feb 2023 16:24:29 +0800 Subject: [PATCH] fix: compact issue --- source/dnode/vnode/src/inc/vnd.h | 2 +- source/dnode/vnode/src/inc/vnodeInt.h | 10 ++- source/dnode/vnode/src/tsdb/tsdbCompact.c | 13 ++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 15 ++--- source/dnode/vnode/src/vnd/vnodeCompact.c | 78 ++++++++++++++-------- source/dnode/vnode/src/vnd/vnodeOpen.c | 8 +-- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 2 +- 7 files changed, 78 insertions(+), 50 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index f06548bbdd..88cd1d99e1 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -100,7 +100,7 @@ int32_t vnodeShouldCommit(SVnode* pVnode); void vnodeUpdCommitSched(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); -int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); +int32_t vnodeCommitInfo(const char* dir); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f420f2ff8d..4c44d799b1 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SCommitInfo SCommitInfo; +typedef struct SCompactInfo SCompactInfo; typedef struct SQueryNode SQueryNode; #define VNODE_META_DIR "meta" @@ -173,6 +174,7 @@ int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); +int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now); @@ -450,10 +452,14 @@ struct SCommitInfo { SVnodeInfo info; SVnode* pVnode; TXN* txn; +}; - // APIs - int32_t (*commitFn)(STsdb* pTsdb, SCommitInfo* pInfo); +struct SCompactInfo { + SVnode* pVnode; + int32_t flag; + int64_t commitID; }; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 7258f2973a..fc7df98217 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -88,9 +88,8 @@ static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) { int32_t lino = 0; STsdb *pTsdb = pCompactor->pTsdb; - - // TODO - ASSERT(0); + code = tsdbFSRollback(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -571,12 +570,12 @@ static void tsdbEndCompact(STsdbCompactor *pCompactor) { tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID); } -static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { +static int32_t tsdbBeginCompact(STsdb *pTsdb, SCompactInfo *pInfo, STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; pCompactor->pTsdb = pTsdb; - pCompactor->commitID = 0; // TODO + pCompactor->commitID = pInfo->commitID; pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows; pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows; @@ -637,12 +636,12 @@ _exit: return code; } -int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { +int32_t tsdbCompact(STsdb *pTsdb, SCompactInfo *pInfo) { int32_t code = 0; STsdbCompactor *pCompactor = &(STsdbCompactor){0}; - if ((code = tsdbBeginCompact(pTsdb, pCompactor))) return code; + if ((code = tsdbBeginCompact(pTsdb, pInfo, pCompactor))) return code; for (;;) { SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid}, diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index f0cd08f8fa..86dc0a152e 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -113,8 +113,6 @@ int vnodeBegin(SVnode *pVnode) { int32_t code = 0; int32_t lino = 0; - pVnode->state.commitID++; - // alloc buffer pool code = vnodeGetBufPoolToUse(pVnode); TSDB_CHECK_CODE(code, lino, _exit); @@ -221,7 +219,7 @@ _err: return -1; } -int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) { +int vnodeCommitInfo(const char *dir) { char fname[TSDB_FILENAME_LEN]; char tfname[TSDB_FILENAME_LEN]; @@ -233,8 +231,7 @@ int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) { return -1; } - vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId); - + vInfo("vnode info is committed, dir:%s", dir); return 0; } @@ -289,7 +286,7 @@ _err: return -1; } -int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { +static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { int32_t code = 0; int32_t lino = 0; char dir[TSDB_FILENAME_LEN] = {0}; @@ -301,7 +298,7 @@ int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { pInfo->info.config = pVnode->config; pInfo->info.state.committed = pVnode->state.applied; pInfo->info.state.commitTerm = pVnode->state.applyTerm; - pInfo->info.state.commitID = pVnode->state.commitID; + pInfo->info.state.commitID = ++pVnode->state.commitID; pInfo->pVnode = pVnode; pInfo->txn = metaGetTxn(pVnode->pMeta); @@ -336,7 +333,7 @@ _exit: vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code), pVnode->state.commitID); } else { - vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__); + vDebug("vgId:%d, %s done, commit id:%d", TD_VID(pVnode), __func__, pInfo->info.state.commitID); } return code; @@ -468,7 +465,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { } // commit info - if (vnodeCommitInfo(dir, &pInfo->info) < 0) { + if (vnodeCommitInfo(dir) < 0) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/vnd/vnodeCompact.c b/source/dnode/vnode/src/vnd/vnodeCompact.c index d74993e80a..e31cfc866c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCompact.c +++ b/source/dnode/vnode/src/vnd/vnodeCompact.c @@ -15,60 +15,86 @@ #include "vnd.h" -extern int32_t tsdbCompact(STsdb *pTsdb, int32_t flag); - -int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo); - -static int32_t vnodeCompactImpl(SCommitInfo *pInfo) { +static int32_t vnodeCompactTask(void *param) { int32_t code = 0; int32_t lino = 0; - // TODO - SVnode *pVnode = pInfo->pVnode; + SCompactInfo *pInfo = (SCompactInfo *)param; + SVnode *pVnode = pInfo->pVnode; - code = tsdbCompact(pVnode->pTsdb, 0); + // do compact + code = tsdbCompact(pInfo->pVnode->pTsdb, pInfo); TSDB_CHECK_CODE(code, lino, _exit); -_exit: - if (code) { - vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code)); + // end compact + char dir[TSDB_FILENAME_LEN] = {0}; + if (pVnode->pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); } else { - vDebug("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__); + snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); } + vnodeCommitInfo(dir); + +_exit: + taosMemoryFree(pInfo); + tsem_post(&pInfo->pVnode->canCommit); return code; } - -static int32_t vnodeCompactTask(void *param) { +static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) { int32_t code = 0; + int32_t lino = 0; - SCommitInfo *pInfo = (SCommitInfo *)param; + tsem_wait(&pVnode->canCommit); - // compact - vnodeCompactImpl(pInfo); + pInfo->pVnode = pVnode; + pInfo->flag = 0; + pInfo->commitID = ++pVnode->state.commitID; - // end compact - tsem_post(&pInfo->pVnode->canCommit); + char dir[TSDB_FILENAME_LEN] = {0}; + SVnodeInfo info = {0}; + + if (pVnode->pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path); + } + + vnodeLoadInfo(dir, &info); + info.state.commitID = pInfo->commitID; + vnodeSaveInfo(dir, &info); _exit: - taosMemoryFree(pInfo); + if (code) { + vError("vgId:%d %s failed at line %d since %s, commit ID:%d", TD_VID(pVnode), __func__, lino, tstrerror(code), + pVnode->state.commitID); + } else { + vDebug("vgId:%d %s done, commit ID:%d", TD_VID(pVnode), __func__, pVnode->state.commitID); + } return code; } int32_t vnodeAsyncCompact(SVnode *pVnode) { int32_t code = 0; + int32_t lino = 0; - // schedule compact task - SCommitInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo)); - if (NULL == pInfo) { + SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo)); + if (pInfo == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } - vnodePrepareCommit(pVnode, pInfo); + vnodeAsyncCommit(pVnode); + + code = vnodePrepareCompact(pVnode, pInfo); + TSDB_CHECK_CODE(code, lino, _exit); + vnodeScheduleTask(vnodeCompactTask, pInfo); _exit: if (code) { - vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code)); + vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + if (pInfo) taosMemoryFree(pInfo); + } else { + vInfo("vgId:%d %s done", TD_VID(pVnode), __func__); } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 942cd7e4c4..11d8583ff8 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -49,7 +49,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { info.state.commitID = 0; vInfo("vgId:%d, save config while create", pCfg->vgId); - if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) { + if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) { vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno)); return -1; } @@ -97,7 +97,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p return -1; } - ret = vnodeCommitInfo(dir, &info); + ret = vnodeCommitInfo(dir); if (ret < 0) { vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno)); return -1; @@ -198,7 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod return -1; } - ret = vnodeCommitInfo(dir, &info); + ret = vnodeCommitInfo(dir); if (ret < 0) { vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno)); return -1; @@ -257,7 +257,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { if (updated) { vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId); (void)vnodeSaveInfo(dir, &info); - (void)vnodeCommitInfo(dir, &info); + (void)vnodeCommitInfo(dir); } // create handle diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 43f903dc48..cc88f77232 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -349,7 +349,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path); } - vnodeCommitInfo(dir, &pWriter->info); + vnodeCommitInfo(dir); } else { vnodeRollback(pWriter->pVnode); } -- GitLab