From 6238622393495fa6c21ce3d3d90dafafdd6b4653 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 15 Oct 2020 15:31:26 +0800 Subject: [PATCH] refactor more code --- src/tsdb/src/tsdbCommit.c | 714 +++++++++++++++++++++++++------------- 1 file changed, 472 insertions(+), 242 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 3b63de3616..0bb548744a 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -29,7 +29,10 @@ typedef struct { SReadHandle *pReadH; SFileGroup * pFGroup; SBlockIdx * pBlockIdx; + int nBlockIdx; SBlockInfo * pBlockInfo; + SBlock * pSubBlock; + int nSubBlock; SDataCols * pDataCols; } STSCommitHandle; @@ -60,17 +63,18 @@ int tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pCommitH) < 0) return -1; - if (tsdbCommitTimeSeriesData(pCommitH) < 0) goto _err; + if (tsdbCommitTimeSeriesData(pCommitH) < 0) { + tsdbEndCommit(pCommitH, true); + return -1; + } - if (tsdbCommitMetaData(pCommitH) < 0) goto _err; + if (tsdbCommitMetaData(pCommitH) < 0) { + tsdbEndCommit(pCommitH, true); + return -1; + } tsdbEndCommit(pCommitH, false); - return 0; - -_err: - tsdbEndCommit(pCommitH, true); - return -1; } static int tsdbStartCommit(SCommitHandle *pCommitH) { @@ -302,28 +306,21 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG } for (int tid = 1; tid < pTSCh->maxIters; tid++) { - if (tsdbCommitTableData(pTSCh, tid) < 0) { - tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); - return -1; - } - - if (tsdbTryMoveLastBlock(pTSCh) < 0) { - tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); - return -1; - } + SCommitIter *pIter = iters + tid; + if (pIter->pTable == NULL) continue; - if (tsdbWriteBlockInfo(pWHelper) < 0) { + if (tsdbCommitTableData(pTSCh, tid) < 0) { tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); return -1; } } - if (tsdbWriteBlockIdx(pWHelper) < 0) { + if (tsdbWriteBlockIdx(pTSCh) < 0) { tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); return -1; } - tsdbCloseAndUnsetCommitFGroup(pTSCh, false /* hasError = true */); + tsdbCloseAndUnsetCommitFGroup(pTSCh, false /* hasError = false */); return 0; } @@ -610,236 +607,228 @@ static void tsdbGetNextCommitFileGroup(SFileGroup *pOldGroup, SFileGroup *pNewGr static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { SCommitIter *pIter = pTSCh->pIters + tid; - if (pIter->pTable == NULL) return 0; + SReadHandle *pReadH = pTSCh->pReadH; + SDataCols * pDataCols = pTSCh->pDataCols; taosRLockLatch(&(pIter->pTable->latch)); - if (pIter->pIter == NULL) { - // TODO + if (tsdbSetCommitTable(pTSCh, pIter->pTable) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + return -1; } - if (tdInitDataCols(pTSCh->pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; + if (tsdbLoadBlockInfo(pReadH) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + return -1; } - if (tsdbReadBlockInfo() < 0) { - goto _err; + if (tsdbCommitTableDataImpl(pTSCh) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + return -1; } - while (true) { - TSKEY keyNext = tsdbNextIterKey(pIter->pIter); - if (keyNext < 0 || keyNext > maxKey) break; - - if (/* no block info exists*/ || keyNext > pIdx->maxKey) { - if (tsdbProcessAppendCommit() < 0) goto _err; - } else { - if (tsdbProcessMergeCommit() < 0) goto _err; - } - + if (tsdbWriteBlockInfo(pTSCh) < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); + return -1; } taosRUnLockLatch(&(pIter->pTable->latch)); return 0; - -_err: - taosRUnLockLatch(&(pIter->pTable->latch)); - return -1; } -static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - STable * pTable = pCommitIter->pTable; - SBlockIdx * pIdx = &(pHelper->curCompIdx); - TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SBlock compBlock = {0}; - - ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); - if (pIdx->hasLast) { // append to with last block - ASSERT(pIdx->len > 0); - SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); - ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, - pDataCols, NULL, 0); - ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); - if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && - pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); - - if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; - ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); - - if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; - if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; - } - - if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; - } else { - ASSERT(!pHelper->hasOldLastBlock); - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); - ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); - - if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; - } - -#ifndef NDEBUG - TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter); - ASSERT(keyNext < 0 || keyNext > pIdx->maxKey); -#endif - - return 0; -} - -static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, - int *blkIdx) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - STable * pTable = pCommitIter->pTable; - SBlockIdx * pIdx = &(pHelper->curCompIdx); - SBlock compBlock = {0}; - TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); - int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; - SDataCols *pDataCols0 = pHelper->pDataCols[0]; - - SSkipListIterator slIter = {0}; - - ASSERT(keyFirst <= pIdx->maxKey); - - SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), - pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE); - ASSERT(pCompBlock != NULL); - int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); - - if (pCompBlock->last) { - ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); - int16_t colId = 0; - slIter = *(pCommitIter->pIter); - if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); - - int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; - int rows2 = - tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); - if (rows2 == 0) { // all data filtered out - *(pCommitIter->pIter) = slIter; - } else { - if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && - pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, - pDataCols0->cols[0].pData, pDataCols0->numOfRows); - ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; - tblkIdx++; - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - int round = 0; - int dIter = 0; - while (true) { - tdResetDataCols(pDataCols); - int rowsRead = - tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); - if (rowsRead == 0) break; - - if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } - - tblkIdx++; - round++; - } - } - if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; - } - } else { - TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); - TSKEY blkKeyFirst = pCompBlock->keyFirst; - TSKEY blkKeyLast = pCompBlock->keyLast; - - if (keyFirst < blkKeyFirst) { - while (true) { - tdResetDataCols(pDataCols); - int rowsRead = - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); - if (rowsRead == 0) break; - - ASSERT(rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - tblkIdx++; - } - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - ASSERT(keyFirst <= blkKeyLast); - int16_t colId = 0; - if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - - slIter = *(pCommitIter->pIter); - int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); - int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, - pDataCols0->numOfRows); - - if (rows2 == 0) { // all filtered out - *(pCommitIter->pIter) = slIter; - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; - - if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { - int rows = (rows1 >= rows3) ? rows3 : rows2; - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, - pDataCols0->cols[0].pData, pDataCols0->numOfRows); - ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) - return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; - tblkIdx++; - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - int round = 0; - int dIter = 0; - while (true) { - int rowsRead = - tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); - if (rowsRead == 0) break; - - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) - return -1; - if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } - - round++; - tblkIdx++; - } - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } - } - } - } - - *blkIdx = tblkIdx; - return 0; -} +// static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { +// STsdbCfg * pCfg = &(pHelper->pRepo->config); +// STable * pTable = pCommitIter->pTable; +// SBlockIdx * pIdx = &(pHelper->curCompIdx); +// TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); +// int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; +// SBlock compBlock = {0}; + +// ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); +// if (pIdx->hasLast) { // append to with last block +// ASSERT(pIdx->len > 0); +// SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); +// ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); +// tdResetDataCols(pDataCols); +// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, +// pDataCols, NULL, 0); +// ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); +// if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && +// pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { +// if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; +// if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; +// } else { +// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; +// ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); + +// if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; +// ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); + +// if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; +// if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; +// } + +// if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; +// } else { +// ASSERT(!pHelper->hasOldLastBlock); +// tdResetDataCols(pDataCols); +// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); +// ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); + +// if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; +// if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; +// } + +// #ifndef NDEBUG +// TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter); +// ASSERT(keyNext < 0 || keyNext > pIdx->maxKey); +// #endif + +// return 0; +// } + +// static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, +// int *blkIdx) { +// STsdbCfg * pCfg = &(pHelper->pRepo->config); +// STable * pTable = pCommitIter->pTable; +// SBlockIdx * pIdx = &(pHelper->curCompIdx); +// SBlock compBlock = {0}; +// TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); +// int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; +// SDataCols *pDataCols0 = pHelper->pDataCols[0]; + +// SSkipListIterator slIter = {0}; + +// ASSERT(keyFirst <= pIdx->maxKey); + +// SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), +// pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE); +// ASSERT(pCompBlock != NULL); +// int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); + +// if (pCompBlock->last) { +// ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); +// int16_t colId = 0; +// slIter = *(pCommitIter->pIter); +// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; +// ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); + +// int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; +// int rows2 = +// tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); +// if (rows2 == 0) { // all data filtered out +// *(pCommitIter->pIter) = slIter; +// } else { +// if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && +// pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { +// tdResetDataCols(pDataCols); +// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, +// pDataCols0->cols[0].pData, pDataCols0->numOfRows); +// ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); +// if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; +// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; +// tblkIdx++; +// } else { +// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; +// int round = 0; +// int dIter = 0; +// while (true) { +// tdResetDataCols(pDataCols); +// int rowsRead = +// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); +// if (rowsRead == 0) break; + +// if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; +// if (round == 0) { +// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// } else { +// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// } + +// tblkIdx++; +// round++; +// } +// } +// if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; +// } +// } else { +// TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); +// TSKEY blkKeyFirst = pCompBlock->keyFirst; +// TSKEY blkKeyLast = pCompBlock->keyLast; + +// if (keyFirst < blkKeyFirst) { +// while (true) { +// tdResetDataCols(pDataCols); +// int rowsRead = +// tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); +// if (rowsRead == 0) break; + +// ASSERT(rowsRead == pDataCols->numOfRows); +// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; +// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// tblkIdx++; +// } +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } else { +// ASSERT(keyFirst <= blkKeyLast); +// int16_t colId = 0; +// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + +// slIter = *(pCommitIter->pIter); +// int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); +// int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, +// pDataCols0->numOfRows); + +// if (rows2 == 0) { // all filtered out +// *(pCommitIter->pIter) = slIter; +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } else { +// int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; + +// if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { +// int rows = (rows1 >= rows3) ? rows3 : rows2; +// tdResetDataCols(pDataCols); +// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, +// pDataCols0->cols[0].pData, pDataCols0->numOfRows); +// ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); +// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) +// return -1; +// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; +// tblkIdx++; +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } else { +// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; +// int round = 0; +// int dIter = 0; +// while (true) { +// int rowsRead = +// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); +// if (rowsRead == 0) break; + +// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) +// return -1; +// if (round == 0) { +// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// } else { +// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// } + +// round++; +// tblkIdx++; +// } +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } +// } +// } +// } + +// *blkIdx = tblkIdx; +// return 0; +// } static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows) { @@ -1097,19 +1086,18 @@ static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGro if (pOldFile->fname[0] == '\0' || strncmp(pOldFile->fname, pNewFile->fname, TSDB_FILENAME_LEN) != 0) { // new file is created - if (tsdbUpdateFileHeader(pNewFile) < 0) { - tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), pNewFile->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/); - return -1; - } + if (tsdbUpdateFileHeader(pNewFile) < 0) { + tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), pNewFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/); + return -1; + } } } pTSCh->pFGroup = pNewGroup; + pTSCh->nBlockIdx = 0; return 0; - -_err: } static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) { @@ -1128,3 +1116,245 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) } } } + +static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) { + // TODO + return 0; +} + +static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) { + // TODO + return 0; +} + +static int tsdbSetCommitTable(STSCommitHandle *pTSCh, STable *pTable) { + if (tsdbSetReadTable(pTSCh->pReadH, pTable) < 0) return -1; + if (tdInitDataCols(pTSCh->pDataCols, tsdbGetTableSchemaImpl(pTable, false, false, -1)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) { + SCommitIter *pIter = pTSCh->pIters + tid; + SReadHandle *pReadH = pTSCh->pReadH; + SDataCols * pDataCols = pTSCh->pDataCols; + SBlockIdx * pOldIdx = pReadH->pCurBlockIdx; + + ASSERT(pOldIdx == NULL || pOldIdx->numOfBlocks > 0); + + int sidx = 0; + int eidx = (pOldIdx == NULL) ? 0 : pOldIdx->numOfBlocks; + + while (true) { + TSKEY keyNext = tsdbNextIterKey(pIter->pIter); + if (keyNext > maxKey) break; + + void *ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock), + NULL, TD_GE); + if (ptr == NULL) { + if (sidx < eidx && pOldIdx->hasLast) { + ptr = pReadH->pBlockInfo->blocks + eidx - 1; + } + } + + int bidx = 0; + if (ptr == NULL) { + bidx = eidx; + } else { + bidx = POINTER_DISTANCE(ptr, (void *)pReadH->pBlockInfo->blocks) / sizeof(SBlock); + } + + if (tsdbCopyBlocks(pTSCh, sidx, bidx) < 0) return -1; + sidx = bidx; + + if (ptr == NULL) { + if (tsdbAppendCommit(pTSCh) < 0) return -1; + } else { + if (tsdbMergeCommit(pTSCh, (SBlock *)ptr) < 0) return -1; + sidx++; + } + } + + if (tsdbCopyBlocks(pTSCh, sidx, eidx) < 0) return -1; + + return 0; +} + +static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) { + ASSERT(sidx <= eidx); + for (int idx = sidx; idx < eidx; idx++) { + // TODO + } + return 0; +} + +static int tsdbAppendCommit(STSCommitHandle *pTSCh) { + SDataCols *pDataCols = pTSCh->pDataCols; + SBlock block = {0}; + SBlock * pBlock = █ + + tdResetDataCols(pDataCols); + int rowsToRead = tsdbLoadDataFromCache(); + ASSERT(rowsToRead > 0); + + if (tsdbWriteBlockToProperFile(pTSCh, pDataCols, pBlock) < 0) return -1; + if (tsdbAddSuperBlock(pTSCh, pBlock) < 0) return -1; + + return -1; +} + +static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { + STable * pTable = pTSCh->pReadH->pTable; + SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); + + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); + + if (pBlock->last) { + + } else { + + } + + return 0; +} + + +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock, + bool isLast, bool isSuperBlock) { + STsdbCfg * pCfg = &(pHelper->pRepo->config); + SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); + int64_t offset = 0; + int rowsToWrite = pDataCols->numOfRows; + + ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); + ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); + + offset = lseek(pFile->fd, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + int nColsNotAllNull = 0; + for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column + SDataCol *pDataCol = pDataCols->cols + ncol; + SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull; + + if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it + continue; + } + + memset(pCompCol, 0, sizeof(*pCompCol)); + + pCompCol->colId = pDataCol->colId; + pCompCol->type = pDataCol->type; + if (tDataTypeDesc[pDataCol->type].getStatisFunc) { + (*tDataTypeDesc[pDataCol->type].getStatisFunc)( + (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max), + &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull)); + } + nColsNotAllNull++; + } + + ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); + + // Compress the data if neccessary + int tcol = 0; + int32_t toffset = 0; + int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); + int32_t lsize = tsize; + int32_t keyLen = 0; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + if (ncol != 0 && tcol >= nColsNotAllNull) break; + + SDataCol *pDataCol = pDataCols->cols + ncol; + SBlockCol *pCompCol = pCompData->cols + tcol; + + if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; + void *tptr = POINTER_SHIFT(pCompData, lsize); + + int32_t flen = 0; // final length + int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); + + if (pCfg->compression) { + if (pCfg->compression == TWO_STAGE_COMP) { + pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES); + if (pHelper->compBuffer == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + } + + flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, + (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, pCfg->compression, + pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); + } else { + flen = tlen; + memcpy(tptr, pDataCol->pData, flen); + } + + // Add checksum + ASSERT(flen > 0); + flen += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + pFile->info.magic = + taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); + + if (ncol != 0) { + pCompCol->offset = toffset; + pCompCol->len = flen; + tcol++; + } else { + keyLen = flen; + } + + toffset += flen; + lsize += flen; + } + + pCompData->delimiter = TSDB_FILE_DELIMITER; + pCompData->uid = pHelper->tableInfo.uid; + pCompData->numOfCols = nColsNotAllNull; + + taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); + pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), + sizeof(TSCKSUM)); + + // Write the whole block to file + if (taosTWrite(pFile->fd, (void *)pCompData, lsize) < lsize) { + tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // Update pCompBlock membership vairables + pCompBlock->last = isLast; + pCompBlock->offset = offset; + pCompBlock->algorithm = pCfg->compression; + pCompBlock->numOfRows = rowsToWrite; + pCompBlock->len = lsize; + pCompBlock->keyLen = keyLen; + pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; + pCompBlock->numOfCols = nColsNotAllNull; + pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); + pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); + + tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 + " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, + REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pCompBlock->offset), + (int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, + pCompBlock->keyLast); + + pFile->info.size += pCompBlock->len; + + return 0; + +_err: + return -1; +} \ No newline at end of file -- GitLab