From 2a4e788d8e56b7dabcfe0db7bb4244836fd8d9ff Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Oct 2020 03:23:33 +0000 Subject: [PATCH] refactor more code --- src/tsdb/src/tsdbCommit.c | 174 +++++++++++++++++++++++++++++++++++--- 1 file changed, 161 insertions(+), 13 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index aea38372b2..f3345b5384 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -23,6 +23,8 @@ #define TSDB_DATA_FILE_CHANGE 0 #define TSDB_META_FILE_CHANGE 1 +#define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5) + typedef struct { int maxIters; SCommitIter *pIters; @@ -998,12 +1000,18 @@ static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) { } static int tsdbAppendCommit(STSCommitHandle *pTSCh) { - SDataCols *pDataCols = pTSCh->pDataCols; - SBlock block = {0}; - SBlock * pBlock = █ + SDataCols * pDataCols = pTSCh->pDataCols; + SReadHandle *pReadH = pTSCh->pReadH; + STsdbRepo * pRepo = pReadH->pRepo; + STable * pTable = pReadH->pTable; + SBlock block = {0}; + SBlock * pBlock = █ + STsdbCfg * pCfg = &(pRepo->config); + SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); + int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); tdResetDataCols(pDataCols); - int rowsToRead = tsdbLoadDataFromCache(); + int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows, pDataCols, NULL, 0); ASSERT(rowsToRead > 0); if (tsdbWriteBlockToProperFile(pTSCh, pDataCols, pBlock) < 0) return -1; @@ -1013,15 +1021,10 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) { } static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { - STable * pTable = pTSCh->pReadH->pTable; - SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); - - TSKEY nextKey = tsdbNextIterKey(pIter->pIter); - - if (pBlock->last) { - - } else { - + if (pBlock->last) { // merge with the last block + if (tsdbCommitMergeLastBlock(pTSCh, pBlock) < 0) return -1; + } else { // merge with a data block + if (tsdbCommitMergeDataBlock(pTSCh, pBlock) < 0) return -1; } return 0; @@ -1163,5 +1166,150 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols pFile->info.size += pBlock->len; + return 0; +} + +static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { + SDataCols * pDataCols = pTSCh->pDataCols; + SReadHandle *pReadH = pTSCh->pReadH; + STable * pTable = pTSCh->pReadH->pTable; + SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); + STsdbRepo * pRepo = pReadH->pRepo; + STsdbCfg * pCfg = &(pRepo->config); + int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); + SBlock nblock = {0}; + SBlock * pNBlock = &nblock; + + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); + int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock); + + if (nextKey > pBlock->keyLast) { // just merge and append + tdResetDataCols(pDataCols); + + int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows, + pDataCols, NULL, 0); + ASSERT(rowsToRead > 0); + + if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS && + /* No new last file is opened*/) { + if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true, + false) < 0) { + return -1; + }; + if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1; + } else { + if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; + + if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; + + if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1; + if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1; + } + } else { + if (/* append the old last file */) { + SSkipListIterator titer = *(pIter->pIter); + int16_t colId = 0; + if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1; + + int rowsToRead = tsdbLoadDataFromCache(); + if (rowsToRead == 0) { + *(pIter->pIter) = titer; + tsdbCopyBlocks(); + } else { + if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) { + tsdbLoadDataFromCache(); + if (tsdbWriteBlockToFile() < 0) return -1; + if (tsdbaddsubblock() < 0) return -1; + } else { + if (tasdbloadblockdata() < 0) return -1; + + while (true) + { + tsdbLoadAndMergeFromCache(); + } + } + } + } else { + if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; + + while (true) { + tsdbLoadAndMergeFromCache(); + } + } + } + return 0; +} + +static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { + TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); + TSKEY blkKeyFirst = pCompBlock->keyFirst; + TSKEY blkKeyLast = pCompBlock->keyLast; + + if (keyFirst < blkKeyFirst) { + while (true) { + tdResetDataCols(pDataCols); + int rowsRead = + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); + if (rowsRead == 0) break; + + ASSERT(rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + tblkIdx++; + } + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } else { + ASSERT(keyFirst <= blkKeyLast); + int16_t colId = 0; + if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + + slIter = *(pCommitIter->pIter); + int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, + pDataCols0->numOfRows); + + if (rows2 == 0) { // all filtered out + *(pCommitIter->pIter) = slIter; + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } else { + int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; + + if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { + int rows = (rows1 >= rows3) ? rows3 : rows2; + tdResetDataCols(pDataCols); + int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, + pDataCols0->cols[0].pData, pDataCols0->numOfRows); + ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; + if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; + tblkIdx++; + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } else { + if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; + int round = 0; + int dIter = 0; + while (true) { + int rowsRead = + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); + if (rowsRead == 0) break; + + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; + if (round == 0) { + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } else { + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; + } + + round++; + tblkIdx++; + } + ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || + tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); + } + } + } return 0; } \ No newline at end of file -- GitLab