diff --git a/src/vnode/tsdb/src/tsdbRWHelper.c b/src/vnode/tsdb/src/tsdbRWHelper.c index c5bd90a6eb7521aad4902bb1db022105f2355be0..7da3327343f2fd239e1758136cc9a5a72d194d3e 100644 --- a/src/vnode/tsdb/src/tsdbRWHelper.c +++ b/src/vnode/tsdb/src/tsdbRWHelper.c @@ -673,37 +673,46 @@ _err: return -1; } -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { +static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) { SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; - ASSERT(blkIdx >=0 && blkIdx <= pIdx->numOfSuperBlocks); - ASSERT(pCompBlock->numOfSubBlocks == 1); - - // Adjust memory if no more room - size_t spaceNeed = sizeof(SCompBlock); size_t spaceLeft = pHelper->compInfoSize - pIdx->len; ASSERT(spaceLeft >= 0); - if (spaceLeft < spaceNeed) { + if (spaceLeft < spaceNeeded) { size_t tsize = pHelper->compInfoSize + sizeof(SCompBlock) * 16; if (pHelper->compInfoSize == 0) tsize += sizeof(SCompInfo); pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), tsize); - if (pHelper->pCompInfo == NULL) goto _err; + if (pHelper->pCompInfo == NULL) return -1; + + pHelper->compInfoSize = tsize; } + return 0; +} + +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + + ASSERT(blkIdx >=0 && blkIdx <= pIdx->numOfSuperBlocks); + ASSERT(pCompBlock->numOfSubBlocks == 1); + + // Adjust memory if no more room + if (tsdbAdjustInfoSizeIfNeeded(pHelper, sizeof(SCompBlock)) < 0) goto _err; + // Insert the block if (blkIdx < pIdx->numOfSuperBlocks) { SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + blkIdx; memmove((void *)(pTCompBlock + 1), (void *)pTCompBlock, pIdx->len - sizeof(SCompInfo) - sizeof(SCompBlock) *blkIdx); pTCompBlock++; for (int i = 0; i < pIdx->numOfSuperBlocks - blkIdx; i++) { - pTCompBlock->offset++; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); } } pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; pIdx->numOfSuperBlocks++; - pIdx->len++; + pIdx->len += sizeof(SCompBlock); pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; @@ -713,12 +722,112 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int return -1; } -static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock) { - // TODO +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) { + ASSERT(pCompBlock->numOfSubBlocks == 0); + + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); + + size_t spaceNeeded = (pSCompBlock->numOfSubBlocks == 1) ? sizeof(SCompBlock) * 2 : sizeof(SCompBlock); + if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; + + // Add the sub-block + if (pSCompBlock->numOfSubBlocks > 1) { + size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len); + if (tsize > 0) { + memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)), + (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); + + for (int i = blkIdx; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); + } + } + + + *(SCompBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock; + + pSCompBlock->numOfSubBlocks++; + pSCompBlock->len += sizeof(SCompBlock); + pIdx->len += sizeof(SCompBlock); + } else { // Need to create two sub-blocks + void *ptr = NULL; + for (int i = blkIdx - 1; i >= 0; i--) { + SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + if (pTCompBlock->numOfSubBlocks > 1) { + ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len); + break; + } + } + + if (ptr == NULL) + ptr = (void *)((char *)(pHelper->pCompInfo) + sizeof(SCompInfo) + sizeof(SCompBlock) * pIdx->numOfSuperBlocks); + + size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); + if (tsize > 0) { + memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize); + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2); + } + } + + ((SCompBlock *)ptr)[0] = *pSCompBlock; + ((SCompBlock *)ptr)[0].numOfSubBlocks = 0; + + ((SCompBlock *)ptr)[1] = *pCompBlock; + + pSCompBlock->numOfSubBlocks = 2; + pSCompBlock->numOfPoints += rowsAdded; + pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); + pSCompBlock->len = sizeof(SCompBlock) * 2; + pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst); + pSCompBlock->keyLast = MAX(((SCompBlock *)ptr)[0].keyLast, ((SCompBlock *)ptr)[1].keyLast); + + pIdx->len += (sizeof(SCompBlock) * 2); + } + + pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; + pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; + return 0; + +_err: + return -1; } static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { - // TODO + ASSERT(pCompBlock->numOfSubBlocks == 1); + + SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; + + ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks); + + SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; + + ASSERT(pSCompBlock->numOfSubBlocks >= 1); + + // Delete the sub blocks it has + if (pSCompBlock->numOfSubBlocks > 1) { + size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len); + if (tsize > 0) { + memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset), + (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); + } + + for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { + SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; + if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); + } + + pIdx->len -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); + } + + *pSCompBlock = *pCompBlock; + + pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; + pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; + return 0; } \ No newline at end of file