diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index ed9d63f1ed0071a0ac75064f9e0f1a3d4a67d379..aeb867b3b630f02e3cde0424ab75dbc7b9975eba 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -836,9 +836,9 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v *(int8_t *)pWrite = pVgroup->compact; cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pVgroup->truncate; - cols++; + // pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + // *(int8_t *)pWrite = pVgroup->truncate; + // cols++; mnodeDecVgroupRef(pVgroup); numOfRows++; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 52e22dcce726c4e834ed88792053b839bf21ac0a..e55c880cadced0ec6ecaf77295efd2a774fe97c4 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -17,14 +17,13 @@ extern int32_t tsTsdbMetaCompactRatio; #define TSDB_MAX_SUBBLOCKS 8 -static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { +FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { if (key < 0) { return (int)((key + 1) / tsTickPerDay[precision] / days - 1); } else { return (int)((key / tsTickPerDay[precision] / days)); } } - typedef struct { SRtn rtn; // retention snapshot SFSIter fsIter; // tsdb file iterator diff --git a/src/tsdb/src/tsdbTruncate.c b/src/tsdb/src/tsdbTruncate.c index bcb6307b2a22ad2768d42a0006fbe78ca4dfddfc..f400e9dccb6494f57c60ce86614b955f9755dd39 100644 --- a/src/tsdb/src/tsdbTruncate.c +++ b/src/tsdb/src/tsdbTruncate.c @@ -49,7 +49,7 @@ static void tsdbEndTruncate(STsdbRepo *pRepo, int eno); static int tsdbTruncateMeta(STsdbRepo *pRepo); static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param); static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet); -static bool tsdbShouldTruncate(STruncateH *pTruncateH); +static bool tsdbShouldTruncate(STsdbRepo *pRepo, STruncateH *pTruncateH); static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo); static void tsdbDestroyTruncateH(STruncateH *pTruncateH); static int tsdbInitCompTbArray(STruncateH *pTruncateH); @@ -133,12 +133,14 @@ static int tsdbAsyncTruncate(STsdbRepo *pRepo, void *param) { // avoid repeated input of commands by end users in a short period of time if (pRepo->truncateState != TSDB_NO_TRUNCATE) { tsdbInfo("vgId:%d retry later as tsdb in truncating state", REPO_ID(pRepo)); - return 0; + return -1; } pRepo->truncateState = TSDB_WAITING_TRUNCATE; // flush the mem data to disk synchronously(have impact on the compression rate) - tsdbSyncCommit(pRepo); + if (tsdbSyncCommit(pRepo) < 0) { + return -1; + } // truncate tsem_wait(&(pRepo->readyToCommit)); @@ -175,6 +177,7 @@ static int tsdbTruncateMeta(STsdbRepo *pRepo) { } static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { + STsdbCfg * pCfg = REPO_CFG(pRepo); STruncateH compactH; SDFileSet *pSet = NULL; STruncateTblMsg *pMsg = (STruncateTblMsg *)param; @@ -186,6 +189,9 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { return -1; } + int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision); + int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision); + while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) { // remove expired files if (pSet->fid < compactH.rtn.minFid) { @@ -194,6 +200,11 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { continue; } + if ((pSet->fid != sFid) && (pSet->fid != eFid)) { + + continue; + } + #if 0 if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) { tsdbDebug("vgId:%d FSET %d on level %d, should not truncate", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL); @@ -225,7 +236,7 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) { return -1; } - if (!tsdbShouldTruncate(pTruncateH)) { + if (!tsdbShouldTruncate(pRepo, pTruncateH)) { tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid); if (tsdbApplyRtnOnFSet(TSDB_TRUNCATE_REPO(pTruncateH), pSet, &(pTruncateH->rtn)) < 0) { tsdbTruncateFSetEnd(pTruncateH); @@ -234,12 +245,14 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) { } else { // Create new fset as compacted fset tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pTruncateH->rtn)), &(did.level), &(did.id)); +#if 1 // how to make the decision? if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbTruncateFSetEnd(pTruncateH); return -1; } +#endif tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER); @@ -265,61 +278,27 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) { return 0; } -static bool tsdbShouldTruncate(STruncateH *pTruncateH) { - if (tsdbForceTruncateFile) { - return true; - } - STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH); - STsdbCfg * pCfg = REPO_CFG(pRepo); - SReadH * pReadh = &(pTruncateH->readh); - STableTruncateH *pTh; - SBlock * pBlock; - int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); - SDFile * pDataF = TSDB_READ_DATA_FILE(pReadh); - SDFile * pLastF = TSDB_READ_LAST_FILE(pReadh); - - int tblocks = 0; // total blocks - int nSubBlocks = 0; // # of blocks with sub-blocks - int nSmallBlocks = 0; // # of blocks with rows < defaultRows - int64_t tsize = 0; - - for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tbArray); i++) { - pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, i); - - if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue; - - for (size_t bidx = 0; bidx < pTh->pBlkIdx->numOfBlocks; bidx++) { - tblocks++; - pBlock = pTh->pInfo->blocks + bidx; - - if (pBlock->numOfRows < defaultRows) { - nSmallBlocks++; - } +static bool tsdbShouldTruncate(STsdbRepo *pRepo, STruncateH *pTruncateH) { + STsdbCfg *pCfg = REPO_CFG(pRepo); + TSKEY minKey, midKey, maxKey, now; - if (pBlock->numOfSubBlocks > 1) { - nSubBlocks++; - for (int k = 0; k < pBlock->numOfSubBlocks; k++) { - SBlock *iBlock = ((SBlock *)POINTER_SHIFT(pTh->pInfo, pBlock->offset)) + k; - tsize = tsize + iBlock->len; - } - } else if (pBlock->numOfSubBlocks == 1) { - tsize += pBlock->len; - } else { - ASSERT(0); - } - } - } + now = taosGetTimestamp(pCfg->precision); + minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision]; + midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; + maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; - return (((nSubBlocks * 1.0 / tblocks) > 0.33) || ((nSmallBlocks * 1.0 / tblocks) > 0.33) || - (tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85)); + pRtn->minKey = minKey; + pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); } -static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) { +static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo, ) { STsdbCfg *pCfg = REPO_CFG(pRepo); memset(pTruncateH, 0, sizeof(*pTruncateH)); - TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH)); + pTruncateH-> + + TSDB_FSET_SET_CLOSED(TSDB_TRUNCATE_WSET(pTruncateH)); tsdbGetRtnSnap(pRepo, &(pTruncateH->rtn)); tsdbFSIterInit(&(pTruncateH->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index a344930d8a703e812e9ca6fa199dc443012bc3c3..73f2ae18d9a7e17a39280b074bdd6ff1e56b606a 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -117,7 +117,7 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeCompact(int32_t vgId) { void *pVnode = vnodeAcquire(vgId); if (pVnode != NULL) { - vDebug("vgId:%d, compact vnode msg is received", vgId); + printf("vgId:%d, compact vnode msg is received\n", vgId); //not care success or not tsdbCompact(((SVnodeObj*)pVnode)->tsdb); vnodeRelease(pVnode); @@ -125,17 +125,31 @@ int32_t vnodeCompact(int32_t vgId) { vInfo("vgId:%d, vnode not exist, can't compact it", vgId); return TSDB_CODE_VND_INVALID_VGROUP_ID; } + printf("vgId:%d, compact vnode msg is finished\n", vgId); return TSDB_CODE_SUCCESS; } int32_t vnodeTruncate(STruncateTblMsg *pMsg) { - int32_t vgId = pMsg->vgId; + // build test data + // pMsg->vgId = 2; + // pMsg->uid = 562949986978701; + // pMsg->nSpan = 1; + // pMsg->span = malloc(pMsg->nSpan * sizeof(STimeWindow)); + + int32_t vgId = 2; void * pVnode = vnodeAcquire(vgId); if (pVnode != NULL) { vDebug("vgId:%d, truncate table %s msg is received", vgId, pMsg->tableFname); // not care success or not - void *param = NULL; - tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param); + STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow)); + param->vgId = 2; + param->uid = 562949986978794; + param->nSpan = 1; + param->span[0].skey = 0; + param->span[0].ekey = 1; + if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) { + tfree(param); + } vnodeRelease(pVnode); } else { vInfo("vgId:%d, vnode not exist, can't truncate table %s in it", vgId, pMsg->tableFname);