提交 3dfe486d 编写于 作者: H hzcheng

TD-100

上级 b0fe9341
...@@ -673,37 +673,46 @@ _err: ...@@ -673,37 +673,46 @@ _err:
return -1; return -1;
} }
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx >=0 && blkIdx <= pIdx->numOfSuperBlocks);
ASSERT(pCompBlock->numOfSubBlocks == 1);
// Adjust memory if no more room
size_t spaceNeed = sizeof(SCompBlock);
size_t spaceLeft = pHelper->compInfoSize - pIdx->len; size_t spaceLeft = pHelper->compInfoSize - pIdx->len;
ASSERT(spaceLeft >= 0); ASSERT(spaceLeft >= 0);
if (spaceLeft < spaceNeed) { if (spaceLeft < spaceNeeded) {
size_t tsize = pHelper->compInfoSize + sizeof(SCompBlock) * 16; size_t tsize = pHelper->compInfoSize + sizeof(SCompBlock) * 16;
if (pHelper->compInfoSize == 0) tsize += sizeof(SCompInfo); if (pHelper->compInfoSize == 0) tsize += sizeof(SCompInfo);
pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), tsize); pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), tsize);
if (pHelper->pCompInfo == NULL) goto _err; if (pHelper->pCompInfo == NULL) return -1;
pHelper->compInfoSize = tsize;
} }
return 0;
}
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx >=0 && blkIdx <= pIdx->numOfSuperBlocks);
ASSERT(pCompBlock->numOfSubBlocks == 1);
// Adjust memory if no more room
if (tsdbAdjustInfoSizeIfNeeded(pHelper, sizeof(SCompBlock)) < 0) goto _err;
// Insert the block // Insert the block
if (blkIdx < pIdx->numOfSuperBlocks) { if (blkIdx < pIdx->numOfSuperBlocks) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + blkIdx; SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + blkIdx;
memmove((void *)(pTCompBlock + 1), (void *)pTCompBlock, pIdx->len - sizeof(SCompInfo) - sizeof(SCompBlock) *blkIdx); memmove((void *)(pTCompBlock + 1), (void *)pTCompBlock, pIdx->len - sizeof(SCompInfo) - sizeof(SCompBlock) *blkIdx);
pTCompBlock++; pTCompBlock++;
for (int i = 0; i < pIdx->numOfSuperBlocks - blkIdx; i++) { for (int i = 0; i < pIdx->numOfSuperBlocks - blkIdx; i++) {
pTCompBlock->offset++; if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock);
} }
} }
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
pIdx->numOfSuperBlocks++; pIdx->numOfSuperBlocks++;
pIdx->len++; pIdx->len += sizeof(SCompBlock);
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last;
...@@ -713,12 +722,112 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -713,12 +722,112 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
return -1; return -1;
} }
static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock) { static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded) {
// TODO ASSERT(pCompBlock->numOfSubBlocks == 0);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
size_t spaceNeeded = (pSCompBlock->numOfSubBlocks == 1) ? sizeof(SCompBlock) * 2 : sizeof(SCompBlock);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err;
// Add the sub-block
if (pSCompBlock->numOfSubBlocks > 1) {
size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len);
if (tsize > 0) {
memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)),
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);
for (int i = blkIdx; i < pIdx->numOfSuperBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock);
}
}
*(SCompBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock;
pSCompBlock->numOfSubBlocks++;
pSCompBlock->len += sizeof(SCompBlock);
pIdx->len += sizeof(SCompBlock);
} else { // Need to create two sub-blocks
void *ptr = NULL;
for (int i = blkIdx - 1; i >= 0; i--) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) {
ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len);
break;
}
}
if (ptr == NULL)
ptr = (void *)((char *)(pHelper->pCompInfo) + sizeof(SCompInfo) + sizeof(SCompBlock) * pIdx->numOfSuperBlocks);
size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
if (tsize > 0) {
memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize);
for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2);
}
}
((SCompBlock *)ptr)[0] = *pSCompBlock;
((SCompBlock *)ptr)[0].numOfSubBlocks = 0;
((SCompBlock *)ptr)[1] = *pCompBlock;
pSCompBlock->numOfSubBlocks = 2;
pSCompBlock->numOfPoints += rowsAdded;
pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
pSCompBlock->len = sizeof(SCompBlock) * 2;
pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst);
pSCompBlock->keyLast = MAX(((SCompBlock *)ptr)[0].keyLast, ((SCompBlock *)ptr)[1].keyLast);
pIdx->len += (sizeof(SCompBlock) * 2);
}
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last;
return 0; return 0;
_err:
return -1;
} }
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) { static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
// TODO ASSERT(pCompBlock->numOfSubBlocks == 1);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks);
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1);
// Delete the sub blocks it has
if (pSCompBlock->numOfSubBlocks > 1) {
size_t tsize = pIdx->len - (pSCompBlock->offset + pSCompBlock->len);
if (tsize > 0) {
memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset),
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);
}
for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks);
}
pIdx->len -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks);
}
*pSCompBlock = *pCompBlock;
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last;
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册