提交 11d1a829 编写于 作者: H hzcheng

refactor code

上级 0c23737d
...@@ -245,7 +245,7 @@ typedef struct { ...@@ -245,7 +245,7 @@ typedef struct {
int32_t len; int32_t len;
int32_t offset; int32_t offset;
int32_t hasLast : 1; int32_t hasLast : 1;
int32_t numOfSuperBlocks : 31; int32_t numOfBlocks : 31;
int32_t checksum; int32_t checksum;
TSKEY maxKey; TSKEY maxKey;
} SCompIdx; /* sizeof(SCompIdx) = 24 */ } SCompIdx; /* sizeof(SCompIdx) = 24 */
......
...@@ -322,12 +322,12 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { ...@@ -322,12 +322,12 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err; if (tsdbWriteBlockToFile(pHelper, pWFile, pDataCols, rowsToWrite, &compBlock, isLast, true) < 0) goto _err;
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfSuperBlocks) < 0) goto _err; if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) goto _err;
} else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block } else { // (Has old data) AND ((has last block) OR (key overlap)), need to merge the block
SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks), SCompBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)(pHelper->pCompInfo->blocks),
pIdx->numOfSuperBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE); pIdx->numOfBlocks, sizeof(SCompBlock), compareKeyBlock, TD_GE);
int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfSuperBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks); int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last); ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last);
...@@ -362,18 +362,18 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -362,18 +362,18 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) { if ((pHelper->files.nLastF.fd > 0) && (pHelper->hasOldLastBlock)) {
if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1; if (tsdbLoadCompInfo(pHelper, NULL) < 0) return -1;
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfSuperBlocks - 1; SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + pIdx->numOfBlocks - 1;
ASSERT(pCompBlock->last); ASSERT(pCompBlock->last);
if (pCompBlock->numOfSubBlocks > 1) { if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfSuperBlocks - 1), NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 && ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 &&
pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock); pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0) pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0)
return -1; return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfSuperBlocks - 1) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
} else { } else {
if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1; if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
...@@ -827,7 +827,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -827,7 +827,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast); ASSERT(keyFirst <= blockAtIdx(pHelper, blkIdx)->keyLast);
TSKEY keyLimit = TSKEY keyLimit =
(blkIdx == pIdx->numOfSuperBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1; (blkIdx == pIdx->numOfBlocks - 1) ? INT64_MAX : pHelper->pCompInfo->blocks[blkIdx + 1].keyFirst - 1;
// rows1: number of rows must merge in this block // rows1: number of rows must merge in this block
int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
...@@ -969,7 +969,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -969,7 +969,7 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err; if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SCompInfo)) < 0) goto _err;
// Change the offset // Change the offset
for (int i = 0; i < pIdx->numOfSuperBlocks; i++) { for (int i = 0; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock);
} }
...@@ -984,13 +984,13 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -984,13 +984,13 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
} }
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
pIdx->numOfSuperBlocks++; pIdx->numOfBlocks++;
pIdx->len += sizeof(SCompBlock); pIdx->len += sizeof(SCompBlock);
ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo)); ASSERT(pIdx->len <= tsizeof(pHelper->pCompInfo));
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
if (pIdx->numOfSuperBlocks > 1) { if (pIdx->numOfBlocks > 1) {
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
} }
...@@ -1022,7 +1022,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -1022,7 +1022,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)), memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SCompBlock)),
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);
for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SCompBlock);
} }
...@@ -1040,7 +1040,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -1040,7 +1040,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
pIdx->len += sizeof(SCompBlock); pIdx->len += sizeof(SCompBlock);
} else { // Need to create two sub-blocks } else { // Need to create two sub-blocks
void *ptr = NULL; void *ptr = NULL;
for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) { if (pTCompBlock->numOfSubBlocks > 1) {
ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len); ptr = (void *)((char *)(pHelper->pCompInfo) + pTCompBlock->offset + pTCompBlock->len);
...@@ -1053,7 +1053,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -1053,7 +1053,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
if (tsize > 0) { if (tsize > 0) {
memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize); memmove((void *)((char *)ptr + sizeof(SCompBlock) * 2), ptr, tsize);
for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; SCompBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SCompBlock) * 2);
} }
...@@ -1074,8 +1074,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -1074,8 +1074,8 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
pIdx->len += (sizeof(SCompBlock) * 2); pIdx->len += (sizeof(SCompBlock) * 2);
} }
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
return 0; return 0;
...@@ -1102,7 +1102,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -1102,7 +1102,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);
} }
for (int i = blkIdx + 1; i < pIdx->numOfSuperBlocks; i++) { for (int i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; SCompBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks); if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SCompBlock) * pSCompBlock->numOfSubBlocks);
} }
...@@ -1112,8 +1112,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -1112,8 +1112,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
*pSCompBlock = *pCompBlock; *pSCompBlock = *pCompBlock;
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].keyLast; pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last; pIdx->hasLast = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
return 0; return 0;
} }
......
...@@ -301,7 +301,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -301,7 +301,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid]; SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file if (compIndex->len == 0 || compIndex->numOfBlocks == 0) { // no data block in this file, try next file
continue;//no data blocks in the file belongs to pCheckInfo->pTable continue;//no data blocks in the file belongs to pCheckInfo->pTable
} else { } else {
if (pCheckInfo->compSize < compIndex->len) { if (pCheckInfo->compSize < compIndex->len) {
...@@ -327,7 +327,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -327,7 +327,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey); TSKEY e = MAX(pCheckInfo->lastKey, pQueryHandle->window.ekey);
// discard the unqualified data block based on the query time window // discard the unqualified data block based on the query time window
int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfSuperBlocks, s, TSDB_ORDER_ASC); int32_t start = binarySearchForBlockImpl(pCompInfo->blocks, compIndex->numOfBlocks, s, TSDB_ORDER_ASC);
int32_t end = start; int32_t end = start;
if (s > pCompInfo->blocks[start].keyLast) { if (s > pCompInfo->blocks[start].keyLast) {
...@@ -335,7 +335,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo ...@@ -335,7 +335,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
} }
// todo speedup the procedure of located end block // todo speedup the procedure of located end block
while (end < compIndex->numOfSuperBlocks && (pCompInfo->blocks[end].keyFirst <= e)) { while (end < compIndex->numOfBlocks && (pCompInfo->blocks[end].keyFirst <= e)) {
end += 1; end += 1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册