diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index 2e01d9efeccf5d3affd344685a70c4c6b03436fd..d543b1283149a77a516179e4ec0254d1a0999d9c 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -188,18 +188,17 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { TSKEY keyFirst = dataColsKeyFirst(pDataCols); ASSERT(helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)); - SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage - SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose + // SCompIdx curIdx = pHelper->compIdx; // old table SCompIdx for sendfile usage + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; // for change purpose // Load the SCompInfo part if neccessary ASSERT(helperHasState(pHelper, TSDB_HELPER_TABLE_SET)); if ((!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) && - ((pIdx->offset > 0) && (pIdx->hasLast || dataColsKeyFirst(pDataCols) <= pIdx->maxKey)) && - (tsdbLoadCompInfo(pHelper, NULL) < 0)) - goto _err; + ((pIdx->offset > 0) && (pIdx->hasLast || keyFirst <= pIdx->maxKey))) { + if (tsdbLoadCompInfo(pHelper, NULL) < 0) goto _err; + } - if (!pIdx->hasLast && keyFirst > pIdx->maxKey) { - // Just need to append as a super block + if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block rowsToWrite = pDataCols->numOfPoints; SFile *pWFile = NULL; bool isLast = false; @@ -218,39 +217,48 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { pIdx->hasLast = compBlock.last; pIdx->numOfSuperBlocks++; - pIdx->maxKey = dataColsKeyLast(pDataCols); - // pIdx->len = ?????? - } else { // (pIdx->hasLast) OR (keyFirst <= pIdx->maxKey) - if (keyFirst > pIdx->maxKey) { - int blkIdx = pIdx->numOfSuperBlocks - 1; - ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[blkIdx].last); - - // Need to merge with the last block - if (tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols) < 0) goto _err; - } else { - // Find the first block greater or equal to the block - SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), - pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE); - if (pCompBlock == NULL) { - if (tsdbMergeDataWithBlock(pHelper, pIdx->numOfSuperBlocks-1, pDataCols) < 0) goto _err; - } else { - if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { - SCompBlock *pNextBlock = NULL; - TSKEY keyLimit = (pNextBlock == NULL) ? INT_MAX : (pNextBlock->keyFirst - 1); - rowsToWrite = - MIN(nRowsLEThan(pDataCols, keyLimit), pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints); - - if (tsdbMergeDataWithBlock(pHelper, pCompBlock-pHelper->pCompInfo->blocks, pDataCols) < 0) goto _err; + pIdx->maxKey = compBlock.keyLast; + ASSERT(compBlock.keyLast == dataColsKeyLast(pDataCols)); + pIdx->len += sizeof(SCompBlock); + } else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block + SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), + pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE); + + int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfSuperBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks); + + if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block + ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last); + rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); + if (rowsToWrite < 0) goto _err; + } else { // Has key overlap + + if (compareKeyBlock((void *)(&keyFirst), (void *)pCompBlock) == 0) { // Key overlap with the block + // TSKEY keyLimit = + // (blkIdx == pIdx->numOfSuperBlocks - 1) ? INT_MAX : (pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1); + + rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols); + if (rowsToWrite < 0) goto _err; + + ASSERT(rowsToWrite == MIN(rows1, rows2)); + } else { // Either merge with the previous block or save as a super block in the middle + SCompBlock *prevBlock = (blkIdx == 0) ? NULL : (pCompBlock - 1); + + int rows1 = nRowsLEThan(pDataCols, pCompBlock->keyFirst); // rows write as a super block in the middle + int rows2 = (prevBlock) ? (pHelper->config.maxRowsPerFileBlock - prevBlock->numOfPoints) + : rows1; // rows can merge with the previous block + if (rows1 >= rows2) { + rowsToWrite = tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, rows1, &compBlock, false, true); + if (rowsToWrite < 0) goto _err; + + ASSERT(rowsToWrite == rows1); + + // Add the super block to it + pIdx->len += sizeof(SCompBlock); + pIdx->numOfSuperBlocks++; } else { - // There options: 1. merge with previous block - // 2. commit as one block - // 3. merge with current block - int nRows1 = INT_MAX; - int nRows2 = nRowsLEThan(pDataCols, pCompBlock->keyFirst); - int nRows3 = MIN(nRowsLEThan(pDataCols, (pCompBlock + 1)->keyFirst), (pHelper->config.maxRowsPerFileBlock - pCompBlock->numOfPoints)); - - // TODO: find the block with max rows can merge - if (tsdbMergeDataWithBlock(pHelper, pCompBlock, pDataCols) < 0) goto _err; + rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx-1, pDataCols); + if (rowsToWrite < 0) goto _err; + ASSERT(rowsToWrite == rows2); } } } @@ -258,7 +266,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { return rowsToWrite; - _err: +_err: return -1; } @@ -309,7 +317,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { if (lseek(fd, curCompIdx.offset, SEEK_SET) < 0) return -1; adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, curCompIdx.len); - if (tread(fd, (void *)(pHelper->pCompInfo), pHelper) < 0) return -1; + if (tread(fd, (void *)(pHelper->pCompInfo), pHelper->compIdx.len) < pHelper->compIdx.len) return -1; // TODO: check the checksum helperSetState(pHelper, TSDB_HELPER_INFO_LOAD); @@ -521,8 +529,14 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { return 0; } +static FORCE_INLINE int compKeyFunc(const void *arg1, const void *arg2) { + return ((*(TSKEY *)arg1) - (*(TSKEY *)arg2)); +} + static int nRowsLEThan(SDataCols *pDataCols, int maxKey) { - return 0; + void *ptr = taosbsearch((void *)&maxKey, pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), compKeyFunc, TD_LE); + if (ptr == NULL) return 0; + return ((TSKEY *)ptr - (TSKEY *)(pDataCols->cols[0].pData)) + 1; } static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {