From fea498845aea1ab37b7a8383af64535de7c590b8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 18 Oct 2020 18:19:43 +0800 Subject: [PATCH] refactor more code --- src/tsdb/src/tsdbCommit.c | 183 ++++---------------------------------- 1 file changed, 16 insertions(+), 167 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index b70666396e..ff918e396b 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -680,17 +680,15 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock) { STsdbCfg *pCfg = &(pTSCh->pReadH->pRepo->config); - SFile * pFile = NULL; - bool islast = false; + int ftype = 0; if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA); + ftype = TSDB_FILE_TYPE_DATA; } else { - islast = true; - pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); + ftype = TSDB_FILE_TYPE_LAST; } - if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, pBlock, islast, true) < 0) return -1; + if (tsdbWriteBlockToFile(pTSCh, ftype, pDataCols, pBlock, true) < 0) return -1; return 0; } @@ -964,8 +962,8 @@ static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { return 0; } -static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, - bool isSuperBlock) { +static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pDataCols, SBlock *pBlock, bool isSuperBlock) { + SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, ftype); SReadHandle *pReadH = pTSCh->pReadH; STsdbRepo * pRepo = pReadH->pRepo; STsdbCfg * pCfg = &(pRepo->config); @@ -973,6 +971,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols int nColsNotAllNull = 0; int csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); // column size int32_t keyLen = 0; + bool isLast = (ftype == TSDB_FILE_TYPE_LAST); ASSERT(offset == lseek(pFile->fd, 0, SEEK_END)); ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); @@ -1101,151 +1100,6 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols return 0; } -// static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { -// SDataCols * pDataCols = pTSCh->pDataCols; -// SReadHandle *pReadH = pTSCh->pReadH; -// STable * pTable = pTSCh->pReadH->pTable; -// SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); -// STsdbRepo * pRepo = pReadH->pRepo; -// STsdbCfg * pCfg = &(pRepo->config); -// int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); -// SBlock nblock = {0}; -// SBlock * pNBlock = &nblock; - -// TSKEY nextKey = tsdbNextIterKey(pIter->pIter); -// int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock); - -// if (nextKey > pBlock->keyLast) { // just merge and append -// tdResetDataCols(pDataCols); - -// int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows, -// pDataCols, NULL, 0); -// ASSERT(rowsToRead > 0); - -// if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS && -// /* No new last file is opened*/) { -// if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true, -// false) < 0) { -// return -1; -// }; -// if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1; -// } else { -// if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; - -// if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; - -// if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1; -// if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1; -// } -// } else { -// if (/* append the old last file */) { -// SSkipListIterator titer = *(pIter->pIter); -// int16_t colId = 0; -// if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1; - -// int rowsToRead = tsdbLoadDataFromCache(); -// if (rowsToRead == 0) { -// *(pIter->pIter) = titer; -// tsdbCopyBlocks(); -// } else { -// if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) { -// tsdbLoadDataFromCache(); -// if (tsdbWriteBlockToFile() < 0) return -1; -// if (tsdbaddsubblock() < 0) return -1; -// } else { -// if (tasdbloadblockdata() < 0) return -1; - -// while (true) -// { -// tsdbLoadAndMergeFromCache(); -// } -// } -// } -// } else { -// if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; - -// while (true) { -// tsdbLoadAndMergeFromCache(); -// } -// } -// } -// return 0; -// } - -// static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { -// TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); -// TSKEY blkKeyFirst = pCompBlock->keyFirst; -// TSKEY blkKeyLast = pCompBlock->keyLast; - -// if (keyFirst < blkKeyFirst) { -// while (true) { -// tdResetDataCols(pDataCols); -// int rowsRead = -// tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); -// if (rowsRead == 0) break; - -// ASSERT(rowsRead == pDataCols->numOfRows); -// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; -// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// tblkIdx++; -// } -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } else { -// ASSERT(keyFirst <= blkKeyLast); -// int16_t colId = 0; -// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - -// slIter = *(pCommitIter->pIter); -// int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); -// int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, -// pDataCols0->numOfRows); - -// if (rows2 == 0) { // all filtered out -// *(pCommitIter->pIter) = slIter; -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } else { -// int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; - -// if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { -// int rows = (rows1 >= rows3) ? rows3 : rows2; -// tdResetDataCols(pDataCols); -// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, -// pDataCols0->cols[0].pData, pDataCols0->numOfRows); -// ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); -// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; -// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; -// tblkIdx++; -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } else { -// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; -// int round = 0; -// int dIter = 0; -// while (true) { -// int rowsRead = -// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); -// if (rowsRead == 0) break; - -// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; -// if (round == 0) { -// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// } else { -// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// } - -// round++; -// tblkIdx++; -// } -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } -// } -// } -// return 0; -// } - static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) { SReadHandle *pReadH = pTSCh->pReadH; int len = 0; @@ -1293,13 +1147,16 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { SFile * pWFile = NULL; SFile * pRFile = NULL; SBlock newBlock = {0}; + int ftype = 0; if (pBlock->last) { pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); + ftype = TSDB_FILE_TYPE_LAST; } else { pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA); pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA); + ftype = TSDB_FILE_TYPE_DATA; } // TODO: use flag to omit this string compare. this may cause a lot of time @@ -1316,7 +1173,7 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { } } else { if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; - if (tsdbWriteBlockToFile(pTSCh, pWFile, pReadH->pDataCols[0], &newBlock, pBlock->last, true) < 0) return -1; + if (tsdbWriteBlockToFile(pTSCh, ftype, pReadH->pDataCols[0], &newBlock, true) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; } @@ -1385,8 +1242,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) { - pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); - if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, &newBlock, true, false) < 0) return -1; + if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1; // TODO: refactor code here if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1; } else { @@ -1412,9 +1268,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, INT32_MAX, pDataCols, pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows); ASSERT(pDataCols->numOfRows == rows); - if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, &newBlock, - true, false) < 0) - return -1; + if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; if (tsdbInsertSubBlock() < 0) return -1; // TODO } else { @@ -1457,8 +1311,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { tdResetDataCols(pDataCols); rows = tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, pBlock->keyFirst - 1, dbrows, pDataCols, NULL, 0); ASSERT(rows > 0); - if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock, - false, true) < 0) + if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, true) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; @@ -1487,9 +1340,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, rows, pDataCols, pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows); ASSERT(pDataCols->numOfRows == rows); - if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock, - false, false) < 0) - return -1; + if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, false) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; if (tsdbInsertSubBlock() < 0) return -1; // TODO } else { @@ -1498,9 +1349,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { tdResetDataCols(pDataCols); rows = tsdbLoadMergeFromCache(pTSCh, keyLimit); if (rows == 0) break; - if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock, - false, true) < 0) - return -1; + if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, true) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; } } -- GitLab