diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c09d5fffba9de0d9e37bded5734ce8da3bd96442..e6d7b2e875021f9494c6e9fd8a29f280f2bddf30 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -557,6 +557,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { pRepo->state = TSDB_STATE_OK; pRepo->code = TSDB_CODE_SUCCESS; pRepo->compactState = 0; + pRepo->truncateState = 0; pRepo->config = *pCfg; if (pAppH) { pRepo->appH = *pAppH; diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c index 4976e8b8fb8213b6a9cdedbf380d812e117f1fc8..98fac3df7717157676b5c9ebd309c9413bfc8914 100644 --- a/src/tsdb/src/tsdbReadImpl.c +++ b/src/tsdb/src/tsdbReadImpl.c @@ -682,7 +682,6 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat uint32_t toffset = TSDB_KEY_COL_OFFSET; int32_t tlen = pBlock->keyLen; - if (dcol != 0) { tsdbGetSBlockCol(pBlock, &pBlockCol, pBlockData->cols, ccol); tcolId = pBlockCol->colId; diff --git a/src/tsdb/src/tsdbTruncate.c b/src/tsdb/src/tsdbTruncate.c index da0e8fb6026312fcf8cb0e952f42505d56e5c464..63402f162c066aa122f3bc3b9811e2f6281ca522 100644 --- a/src/tsdb/src/tsdbTruncate.c +++ b/src/tsdb/src/tsdbTruncate.c @@ -24,7 +24,7 @@ typedef struct { typedef struct { SRtn rtn; SFSIter fsIter; - SArray * tbArray; // STableTruncateH, table array to cache table obj and block indexes + SArray * tblArray; // STableTruncateH, table array to cache table obj and block indexes SReadH readh; SDFileSet wSet; SArray * aBlkIdx; @@ -51,8 +51,8 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, void *param); static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet); static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo); static void tsdbDestroyTruncateH(STruncateH *pTruncateH); -static int tsdbInitCompTbArray(STruncateH *pTruncateH); -static void tsdbDestroyCompTbArray(STruncateH *pTruncateH); +static int tsdbInitTruncateTblArray(STruncateH *pTruncateH); +static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH); static int tsdbCacheFSetIndex(STruncateH *pTruncateH); static int tsdbTruncateCache(STsdbRepo *pRepo, void *param); static int tsdbTruncateFSetInit(STruncateH *pTruncateH, SDFileSet *pSet); @@ -250,6 +250,7 @@ static int tsdbTruncateFSet(STruncateH *pTruncateH, SDFileSet *pSet) { tsdbInitDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER); + if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(pTruncateH), true) < 0) { tsdbError("vgId:%d failed to truncate FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbTruncateFSetEnd(pTruncateH); @@ -285,7 +286,7 @@ static int tsdbInitTruncateH(STruncateH *pTruncateH, STsdbRepo *pRepo) { return -1; } - if (tsdbInitCompTbArray(pTruncateH) < 0) { + if (tsdbInitTruncateTblArray(pTruncateH) < 0) { tsdbDestroyTruncateH(pTruncateH); return -1; } @@ -318,33 +319,33 @@ static void tsdbDestroyTruncateH(STruncateH *pTruncateH) { pTruncateH->pDataCols = tdFreeDataCols(pTruncateH->pDataCols); pTruncateH->aSupBlk = taosArrayDestroy(pTruncateH->aSupBlk); pTruncateH->aBlkIdx = taosArrayDestroy(pTruncateH->aBlkIdx); - tsdbDestroyCompTbArray(pTruncateH); + tsdbDestroyTruncateTblArray(pTruncateH); tsdbDestroyReadH(&(pTruncateH->readh)); tsdbCloseDFileSet(TSDB_TRUNCATE_WSET(pTruncateH)); } -static int tsdbInitCompTbArray(STruncateH *pTruncateH) { // Init pComp->tbArray +static int tsdbInitTruncateTblArray(STruncateH *pTruncateH) { STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); STsdbMeta *pMeta = pRepo->tsdbMeta; if (tsdbRLockRepoMeta(pRepo) < 0) return -1; - pTruncateH->tbArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH)); - if (pTruncateH->tbArray == NULL) { + pTruncateH->tblArray = taosArrayInit(pMeta->maxTables, sizeof(STableTruncateH)); + if (pTruncateH->tblArray == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbUnlockRepoMeta(pRepo); return -1; } // Note here must start from 0 - for (int i = 0; i < pMeta->maxTables; i++) { + for (int i = 0; i < pMeta->maxTables; ++i) { STableTruncateH ch = {0}; if (pMeta->tables[i] != NULL) { tsdbRefTable(pMeta->tables[i]); ch.pTable = pMeta->tables[i]; } - if (taosArrayPush(pTruncateH->tbArray, &ch) == NULL) { + if (taosArrayPush(pTruncateH->tblArray, &ch) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbUnlockRepoMeta(pRepo); return -1; @@ -355,22 +356,21 @@ static int tsdbInitCompTbArray(STruncateH *pTruncateH) { // Init pComp->tbArray return 0; } -static void tsdbDestroyCompTbArray(STruncateH *pTruncateH) { - STableTruncateH *pTh; +static void tsdbDestroyTruncateTblArray(STruncateH *pTruncateH) { + STableTruncateH *pTblHandle = NULL; - if (pTruncateH->tbArray == NULL) return; + if (pTruncateH->tblArray == NULL) return; - for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tbArray); i++) { - pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, i); - if (pTh->pTable) { - tsdbUnRefTable(pTh->pTable); + for (size_t i = 0; i < taosArrayGetSize(pTruncateH->tblArray); ++i) { + pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, i); + if (pTblHandle->pTable) { + tsdbUnRefTable(pTblHandle->pTable); } - // pTh->pInfo = taosTZfree(pTh->pInfo); - tfree(pTh->pInfo); + tfree(pTblHandle->pInfo); } - pTruncateH->tbArray = taosArrayDestroy(pTruncateH->tbArray); + pTruncateH->tblArray = taosArrayDestroy(pTruncateH->tblArray); } static int tsdbCacheFSetIndex(STruncateH *pTruncateH) { @@ -380,21 +380,22 @@ static int tsdbCacheFSetIndex(STruncateH *pTruncateH) { return -1; } - for (int tid = 1; tid < taosArrayGetSize(pTruncateH->tbArray); tid++) { - STableTruncateH *pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, tid); - pTh->pBlkIdx = NULL; + size_t tblArraySize = taosArrayGetSize(pTruncateH->tblArray); + for (size_t tid = 1; tid < tblArraySize; ++tid) { + STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid); + pTblHandle->pBlkIdx = NULL; - if (pTh->pTable == NULL) continue; - if (tsdbSetReadTable(pReadH, pTh->pTable) < 0) { + if (pTblHandle->pTable == NULL) continue; + if (tsdbSetReadTable(pReadH, pTblHandle->pTable) < 0) { return -1; } if (pReadH->pBlkIdx == NULL) continue; - pTh->bIndex = *(pReadH->pBlkIdx); - pTh->pBlkIdx = &(pTh->bIndex); + pTblHandle->bIndex = *(pReadH->pBlkIdx); + pTblHandle->pBlkIdx = &(pTblHandle->bIndex); uint32_t originLen = 0; - if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTh->pInfo)), &originLen) < 0) { + if (tsdbLoadBlockInfo(pReadH, (void **)(&(pTblHandle->pInfo)), &originLen) < 0) { return -1; } } @@ -424,7 +425,7 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); STsdbCfg * pCfg = REPO_CFG(pRepo); SReadH * pReadh = &(pTruncateH->readh); - SBlockIdx blkIdx; + SBlockIdx blkIdx = {0}; void ** ppBuf = &(TSDB_TRUNCATE_BUF(pTruncateH)); void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(pTruncateH)); void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(pTruncateH)); @@ -432,51 +433,60 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { taosArrayClear(pTruncateH->aBlkIdx); - for (int tid = 1; tid < taosArrayGetSize(pTruncateH->tbArray); tid++) { - STableTruncateH *pTh = (STableTruncateH *)taosArrayGet(pTruncateH->tbArray, tid); - STSchema * pSchema; + for (size_t tid = 1; tid < taosArrayGetSize(pTruncateH->tblArray); ++tid) { + STableTruncateH *pTblHandle = (STableTruncateH *)taosArrayGet(pTruncateH->tblArray, tid); + STSchema * pSchema = NULL; + + if (pTblHandle->pTable == NULL || pTblHandle->pBlkIdx == NULL) continue; - if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue; + if ((pSchema = tsdbGetTableSchemaImpl(pTblHandle->pTable, true, true, -1, -1)) == NULL) { + return -1; + } - pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1, -1); taosArrayClear(pTruncateH->aSupBlk); + if ((tdInitDataCols(pTruncateH->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tdFreeSchema(pSchema); return -1; } + tdFreeSchema(pSchema); // Loop to truncate each block data - for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) { - SBlock *pBlock = pTh->pInfo->blocks + i; + for (int i = 0; i < pTblHandle->pBlkIdx->numOfBlocks; ++i) { + SBlock *pBlock = pTblHandle->pInfo->blocks + i; // Load the block data - if (tsdbLoadBlockData(pReadh, pBlock, pTh->pInfo) < 0) { + if (tsdbLoadBlockData(pReadh, pBlock, pTblHandle->pInfo) < 0) { return -1; } // Merge pTruncateH->pDataCols and pReadh->pDCols[0] and write data to file - if (pTruncateH->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) { - if (tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { + if ((pTruncateH->pDataCols->numOfRows == 0) && (pBlock->numOfRows >= defaultRows)) { + // if pTruncateH->pDataCols has no data and pBlock->numOfRows >= defaultRows, write data directly + if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pReadh->pDCols[0], ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } } else { int ridx = 0; - while (true) { - if (pReadh->pDCols[0]->numOfRows - ridx == 0) break; + // no data left to merge anymore + if ((pReadh->pDCols[0]->numOfRows - ridx) == 0) break; + + // min(data left, target space left) int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pTruncateH->pDataCols->numOfRows); tdMergeDataCols(pTruncateH->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx, pCfg->update != TD_ROW_PARTIAL_UPDATE); if (pTruncateH->pDataCols->numOfRows < defaultRows) { + // continue to read more blocks break; } - if (tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { + if (tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } tdResetDataCols(pTruncateH->pDataCols); @@ -485,16 +495,16 @@ static int tsdbTruncateFSetImpl(STruncateH *pTruncateH) { } if (pTruncateH->pDataCols->numOfRows > 0 && - tsdbWriteBlockToRightFile(pTruncateH, pTh->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { + tsdbWriteBlockToRightFile(pTruncateH, pTblHandle->pTable, pTruncateH->pDataCols, ppBuf, ppCBuf, ppExBuf) < 0) { return -1; } - if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTh->pTable, pTruncateH->aSupBlk, NULL, ppBuf, + if (tsdbWriteBlockInfoImpl(TSDB_TRUNCATE_HEAD_FILE(pTruncateH), pTblHandle->pTable, pTruncateH->aSupBlk, NULL, ppBuf, &blkIdx) < 0) { return -1; } - if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (void *)(&blkIdx)) == NULL)) { + if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pTruncateH->aBlkIdx, (const void *)(&blkIdx)) == NULL)) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -511,9 +521,9 @@ static int tsdbWriteBlockToRightFile(STruncateH *pTruncateH, STable *pTable, SDa void **ppCBuf, void **ppExBuf) { STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(pTruncateH); STsdbCfg * pCfg = REPO_CFG(pRepo); - SDFile * pDFile; - bool isLast; - SBlock block; + SDFile * pDFile = NULL; + bool isLast = false; + SBlock block = {0}; ASSERT(pDataCols->numOfRows > 0);