diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index b66ab5c44680c77f3be86ea6dcbf21d6bfc7d39e..000302af57c73d8071734478d23e5cda6f845d7f 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -1356,6 +1356,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), pIdx->numOfBlocks - *blkIdx, sizeof(SCompBlock), compareKeyBlock, TD_GE); ASSERT(pCompBlock != NULL); + int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock); if (pCompBlock->last) { ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); @@ -1365,17 +1366,20 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; - int rows2 = tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); - if (rows2 == 0) { // all data filtered out + int rows2 = + tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); + if (rows2 == 0) { // all data filtered out *(pCommitIter->pIter) = slIter; } else { - if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { + if (rows1 + rows2 < pCfg->minRowsPerFileBlock && pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && + !TSDB_NLAST_FILE_OPENED(pHelper)) { tdResetDataCols(pDataCols); int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1; if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; + tblkIdx++; } else { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; int round = 0; @@ -1392,18 +1396,18 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; } + tblkIdx++; round++; } } } - *blkIdx = pIdx->numOfBlocks; } else { - int tblkIdx = TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock); TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); if (keyFirst < pCompBlock->keyFirst) { while (true) { tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0); + int rowsRead = + tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, defaultRowsInBlock, pDataCols, NULL, 0); if (rowsRead == 0) break; ASSERT(rowsRead == pDataCols->numOfRows); @@ -1411,18 +1415,18 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; tblkIdx++; } - *blkIdx = tblkIdx; } else { ASSERT(keyFirst <= pCompBlock->keyLast); - int16_t colId =0; + int16_t colId = 0; if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); slIter = *(pCommitIter->pIter); int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); - int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); + int rows2 = tsdbLoadDataFromCache(pTable, &slIter, pCompBlock->keyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, + pDataCols0->numOfRows); - if (rows2 == 0) { // all filtered out + if (rows2 == 0) { // all filtered out *(pCommitIter->pIter) = slIter; } else { int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; @@ -1434,33 +1438,36 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, pDataCols0->cols[0].pData, pDataCols0->numOfRows); ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) return -1; + if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0) + return -1; if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; - *blkIdx = tblkIdx + 1; + tblkIdx++; } else { if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; int round = 0; int dIter = 0; while (true) { - int rowsRead = tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); + int rowsRead = + tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); if (rowsRead == 0) break; if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, 0 /*TODO*/) < 0) return -1; + if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, 0 /*TODO */) < 0) return -1; + if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; } round++; + tblkIdx++; } } - } } } + *blkIdx = tblkIdx; return 0; }