diff --git a/src/tsdb/src/tsdbTruncate.c b/src/tsdb/src/tsdbTruncate.c index a7cbcd89a20622908a00f5c245e792ec109d6272..82ab2a20c0282450a5166027a646687d8de1f88c 100644 --- a/src/tsdb/src/tsdbTruncate.c +++ b/src/tsdb/src/tsdbTruncate.c @@ -29,7 +29,7 @@ typedef struct { SDFileSet wSet; SArray * aBlkIdx; SArray * aSupBlk; - SDataCols *pDataCols; + SDataCols *pDCols; void * param; // STruncateTblMsg *pMsg } STruncateH; @@ -59,7 +59,8 @@ static int tsdbTruncateCache(STsdbRepo *pRepo, void *param); static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet); static void tsdbTruncateFSetEnd(STruncateH *pTruncateH); static int tsdbTruncateFSetImpl(STruncateH *pTruncateH); -static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDataCols, void **ppBuf, +static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock); +static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, void **ppCBuf, void **ppExBuf); enum { @@ -71,6 +72,7 @@ enum { int tsdbTruncate(STsdbRepo *pRepo, void *param) { return tsdbAsyncTruncate(pRepo, param); } void *tsdbTruncateImpl(STsdbRepo *pRepo, void *param) { + ASSERT(param != NULL); int32_t code = 0; // Step 1: check and clear cache if ((code = tsdbTruncateCache(pRepo, param)) != 0) { @@ -180,15 +182,13 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { STruncateH truncateH = {0}; SDFileSet * pSet = NULL; STruncateTblMsg *pMsg = (STruncateTblMsg *)param; - ASSERT(pMsg != NULL); - - truncateH.param = pMsg; tsdbDebug("vgId:%d start to truncate TS data for %" PRIu64, REPO_ID(pRepo), pMsg->uid); if (tsdbInitTruncateH(&truncateH, pRepo) < 0) { return -1; } + truncateH.param = pMsg; int sFid = TSDB_KEY_FID(pMsg->span[0].skey, pCfg->daysPerFile, pCfg->precision); int eFid = TSDB_KEY_FID(pMsg->span[0].ekey, pCfg->daysPerFile, pCfg->precision); @@ -217,7 +217,6 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param) { continue; } #endif - if (tsdbTruncateFSet(&truncateH, pSet) < 0) { tsdbDestroyTruncateH(&truncateH); tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); @@ -307,8 +306,8 @@ static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) { return -1; } - pTruncateH->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); - if (pTruncateH->pDataCols == NULL) { + pTruncateH->pDCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); + if (pTruncateH->pDCols == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyTruncateH(pTruncateH); return -1; @@ -318,7 +317,7 @@ static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) { } static void tsdbDestroyTruncateH(STruncateH *pTruncateH) { - pTruncateH->pDataCols = tdFreeDataCols(pTruncateH->pDataCols); + pTruncateH->pDCols = tdFreeDataCols(pTruncateH->pDCols); pTruncateH->aSupBlk = taosArrayDestroy(pTruncateH->aSupBlk); pTruncateH->aBlkIdx = taosArrayDestroy(pTruncateH->aBlkIdx); tsdbDestroyTruncateTblArray(pTruncateH); @@ -423,16 +422,56 @@ static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet) { static void tsdbTruncateFSetEnd(STruncateH *pTruncateH) { tsdbCloseAndUnsetFSet(&(pTruncateH->readh)); } +static bool tsdbBlockInterleaved(STruncateH *pTruncateH, SBlock *pBlock) { + // STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param; + // for (uint16_t i = 0; i < pMsg->nSpan; ++i) { + // STimeWindow tw = pMsg->span[i]; + // if (!(pBlock->keyFirst > tw.ekey || pBlock->keyLast < tw.skey)) { + // return true; + // } + // } + // return false; + return true; +} + +static int32_t tsdbFilterDataCols(STruncateH *pTruncateH, SDataCols *pSrcDCols) { + STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param; + SDataCols * pDstDCols = pTruncateH->pDCols; + + tdResetDataCols(pDstDCols); + pDstDCols->maxCols = pSrcDCols->maxCols; + pDstDCols->maxPoints = pSrcDCols->maxPoints; + pDstDCols->numOfCols = pSrcDCols->numOfCols; + pDstDCols->sversion = pSrcDCols->sversion; + + for (int i = 0; i < pSrcDCols->numOfRows; ++i) { + int64_t tsKey = *(int64_t *)tdGetColDataOfRow(pSrcDCols->cols, i); + if ((tsKey >= pMsg->span[0].skey) && (tsKey <= pMsg->span[0].ekey)) { + printf("tsKey %" PRId64 " is filtered\n", tsKey); + continue; + } + for (int j = 0; j < pSrcDCols->numOfCols; ++j) { + if (pSrcDCols->cols[j].len > 0 || pDstDCols->cols[j].len > 0) { + dataColAppendVal(pDstDCols->cols + j, tdGetColDataOfRow(pSrcDCols->cols + j, i), pDstDCols->numOfRows, + pDstDCols->maxPoints, 0); + } + } + ++pDstDCols->numOfRows; + } + + return 0; +} + static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { STsdbRepo * pRepo = TSDB_TRUNCATE_REPO(pTruncateH); STruncateTblMsg *pMsg = (STruncateTblMsg *)pTruncateH->param; - STsdbCfg * pCfg = REPO_CFG(pRepo); - SReadH * pReadh = &(pTruncateH->readh); - SBlockIdx blkIdx = {0}; - void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH)); - void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH)); - void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH)); - int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); + // STsdbCfg * pCfg = REPO_CFG(pRepo); + SReadH * pReadh = &(pTruncateH->readh); + SBlockIdx blkIdx = {0}; + void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH)); + void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH)); + void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH)); + // int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); taosArrayClear(pTruncateH->aBlkIdx); @@ -448,7 +487,12 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { taosArrayClear(pTruncateH->aSupBlk); - if ((tdInitDataCols(pTruncateH->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) || + uint64_t uid = pTblHandle->pTable->tableId.uid; + // if(uid != pMsg->uid) { + // TODO: copy the block data directly + // } + + if ((tdInitDataCols(pTruncateH->pDCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tdFreeSchema(pSchema); @@ -461,49 +505,35 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) { SBlock *pBlock = pTblHandle->pInfo->blocks + i; - // Load the block data + // Copy the Blocks directly if TS is not interleaved. + if (!tsdbBlockInterleaved(pTruncateH, pBlock)) { + // tsdbWriteBlockAndDataToFile(); + continue; + } + + // Otherwise load the block data and copy the specific rows. if (tsdbLoadBlockData(pReadh, pBlock, pTblHandle->pInfo) < 0) { return -1; } + if (uid == pMsg->uid) { + tsdbFilterDataCols(pTruncateH, pReadh->pDCols[0]); + tsdbDebug("vgId:%d uid %" PRIu64 " matched, filter block data from rows %d to %d rows", REPO_ID(pRepo), uid, + pReadh->pDCols[0]->numOfRows, pTruncateH->pDCols->numOfRows); + if (pTruncateH->pDCols->numOfRows <= 0) continue; - // Merge pTruncateH->pDataCols and pReadh->pDCols[0] and write data to file - if ((pTruncateH->pDataCols->numOfRows == 0) && (pBlock->numOfRows >= defaultRows)) { - // if pTruncateH->pDataCols has no data and pBlock->numOfRows >= defaultRows, write data directly - if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { + if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDCols, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } } else { - int ridx = 0; - while (true) { - // no data left to merge anymore - if ((pReadh->pDCols[0]->numOfRows - ridx) == 0) break; - - // min(data left, target space left) - int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pTruncateH->pDataCols->numOfRows); - - tdMergeDataCols(pTruncateH->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx, - pCfg->update != TD_ROW_PARTIAL_UPDATE); - - if (pTruncateH->pDataCols->numOfRows < defaultRows) { - // continue to read more blocks - break; - } - - if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { - return -1; - } - tdResetDataCols(pTruncateH->pDataCols); + tsdbDebug("vgId:%d uid %" PRIu64 " not matched, copy block data directly\n", REPO_ID(pRepo), uid); + if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { + return -1; } } } - if (pTruncateH->pDataCols->numOfRows > 0 && - tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { - return -1; - } - - if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL, ppBuf, - &blkIdx) < 0) { + if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL, + ppBuf, &blkIdx) < 0) { return -1; } @@ -520,7 +550,7 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { return 0; } -static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDataCols, void **ppBuf, +static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDataCols *pDCols, void **ppBuf, void **ppCBuf, void **ppExBuf) { STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); STsdbCfg * pCfg = REPO_CFG(pRepo); @@ -528,9 +558,9 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa bool isLast = false; SBlock block = {0}; - ASSERT(pDataCols->numOfRows > 0); + ASSERT(pDCols->numOfRows > 0); - if (pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { + if (pDCols->numOfRows < pCfg->minRowsPerFileBlock) { pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH); isLast = true; } else { @@ -539,7 +569,7 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa } if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, - isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), pDataCols, + isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), pDCols, &block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } @@ -550,4 +580,34 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa } return 0; -} \ No newline at end of file +} + +// static int tsdbWriteBlockAndDataToFile(STruncateH *pTruncateH, STable *pTable, SBlock *pSupBlock, void **ppBuf, +// void **ppCBuf, void **ppExBuf) { +// STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); +// SDFile * pDFile = NULL; +// bool isLast = false; + +// ASSERT(pSupBlock->numOfRows > 0); + +// if (pSupBlock->last) { +// pDFile = TSDB_TRUNCATE_LAST_FILE(pTruncateH); +// isLast = true; +// } else { +// pDFile = TSDB_TRUNCATE_DATA_FILE(pTruncateH); +// isLast = false; +// } + +// if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, +// isLast ? TSDB_TRUNCATE_SMAL_FILE(pTruncateH) : TSDB_TRUNCATE_SMAD_FILE(pTruncateH), +// pDCols, &block, isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) { +// return -1; +// } + +// if (taosArrayPush(pTruncateH->aSupBlk, (void *)(&block)) == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// return -1; +// } + +// return 0; +// } \ No newline at end of file diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index e7697a7e0fbf37e2348c60bc651de8071820edaa..d5d4cc4c2e5a0e9d9a6019c4e08565217a2b7394 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -143,10 +143,10 @@ int32_t vnodeTruncate(STruncateTblMsg *pMsg) { // not care success or not STruncateTblMsg *param = (STruncateTblMsg *)calloc(1, sizeof(STruncateTblMsg) + pMsg->nSpan * sizeof(STimeWindow)); param->vgId = 2; - param->uid = 562949986978794; + param->uid = 562949986979009; param->nSpan = 1; - param->span[0].skey = 1637417678000; - param->span[0].ekey = 1637417679000; + param->span[0].skey = 1634701320001; + param->span[0].ekey = 1634701320001; if (tsdbTruncate(((SVnodeObj *)pVnode)->tsdb, param) < 0) { tfree(param); }