diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 96567ad009c433191e672fb2f64bf278455bf05a..048c64032ca6ca2c018339ab2c1284f7a48524ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -37,13 +37,14 @@ typedef struct { typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; - SMapData mapData; // block info (compressed) - SArray* pBlockList; // block data index list - SIterInfo iter; // mem buffer skip list iterator - SIterInfo iiter; // imem buffer skip list iterator - SArray* delSkyline; // delete info for this table + SMapData mapData; // block info (compressed) + SArray* pBlockList; // block data index list + SIterInfo iter; // mem buffer skip list iterator + SIterInfo iiter; // imem buffer skip list iterator + SArray* delSkyline; // delete info for this table int32_t fileDelIndex; - bool iterInit; // whether to initialize the in-memory skip list iterator or not + bool iterInit; // whether to initialize the in-memory skip list iterator or not + int16_t indexInBlockL;// row position in last block } STableBlockScanInfo; typedef struct SBlockOrderWrapper { @@ -76,12 +77,27 @@ typedef struct SBlockLoadSuppInfo { char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. } SBlockLoadSuppInfo; +typedef struct SVersionRange { + uint64_t minVer; + uint64_t maxVer; +} SVersionRange; + +typedef struct SLastBlockReader { + SArray* pBlockL; + int32_t currentBlockIndex; + SBlockData lastBlockData; + STimeWindow window; + SVersionRange verRange; + uint64_t uid; + int32_t rowIndex; +} SLastBlockReader; + typedef struct SFilesetIter { int32_t numOfFiles; // number of total files int32_t index; // current accessed index in the list SArray* pFileList; // data file list int32_t order; - SArray* pLastBlockList;// last block array list + SLastBlockReader* pLastBlockReader; // last file block reader } SFilesetIter; typedef struct SFileDataBlockInfo { @@ -91,14 +107,12 @@ typedef struct SFileDataBlockInfo { } SFileDataBlockInfo; typedef struct SDataBlockIter { - SBlockNumber numOfBlocks; + int32_t numOfBlocks; int32_t index; SArray* blockList; // SArray - SArray* pLastBlockList; // last block list int32_t order; SBlock block; // current SBlock data SHashObj* pTableMap; - SArray* pBlockL; // ptr to SBlockL in fileIterator } SDataBlockIter; typedef struct SFileBlockDumpInfo { @@ -108,11 +122,6 @@ typedef struct SFileBlockDumpInfo { bool allDumped; } SFileBlockDumpInfo; -typedef struct SVersionRange { - uint64_t minVer; - uint64_t maxVer; -} SVersionRange; - typedef struct SReaderStatus { bool loadFromFile; // check file stage SHashObj* pTableMap; // SHash @@ -142,7 +151,6 @@ struct STsdbReader { STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times SDataFReader* pFileReader; SVersionRange verRange; - SArray* pLastBlock; // last block info int32_t step; STsdbReader* innerReader[2]; @@ -154,10 +162,11 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); +static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid); -static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, +static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); @@ -171,6 +180,8 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); +static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader); +static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -191,7 +202,6 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { if (IS_VAR_DATA_TYPE(pCol->info.type)) { pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes); - // tsdbInfo("-------------------%d\n", pCol->info.bytes); } } @@ -208,7 +218,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; + STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = -1}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { info.lastKey = pTsdbReader->window.skey; @@ -310,11 +320,15 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32 pIter->pFileList = aDFileSet; pIter->numOfFiles = numOfFileset; - pIter->pLastBlockList = taosArrayInit(4, sizeof(SBlockL)); - if (pIter->pLastBlockList == NULL) { - int32_t code = TSDB_CODE_OUT_OF_MEMORY; - tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr); - return code; + if (pIter->pLastBlockReader == NULL) { + pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader)); + if (pIter->pLastBlockReader == NULL) { + int32_t code = TSDB_CODE_OUT_OF_MEMORY; + tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr); + return code; + } + + pIter->pLastBlockReader->pBlockL = taosArrayInit(4, sizeof(SBlockL)); } tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr); @@ -377,7 +391,7 @@ _err: static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) { pIter->order = order; pIter->index = -1; - pIter->numOfBlocks = (SBlockNumber){-1, -1}; + pIter->numOfBlocks = -1; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); } else { @@ -685,7 +699,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* } pBlockNum->numOfLastBlocks += 1; - taosArrayPush(pQualifiedLastBlock, &i); + taosArrayPush(pQualifiedLastBlock, pLastBlock); } } @@ -726,12 +740,13 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_ } static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { - if (pBlockIter->index >= pBlockIter->numOfBlocks.numOfBlocks) { + if (taosArrayGetSize(pBlockIter->blockList) == 0) { + ASSERT(pBlockIter->numOfBlocks == taosArrayGetSize(pBlockIter->blockList)); return NULL; - } else { - SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); - return pFBlockInfo; } + + SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); + return pBlockInfo; } static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } @@ -824,13 +839,17 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI double elapsedTime = 0; int32_t code = 0; - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - if (pFBlock != NULL) { + if (pBlockInfo != NULL) { SBlock* pBlock = getCurrentBlock(pBlockIter); code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 + ", rows:%d, code:%s %s", + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, + tstrerror(code), pReader->idStr); goto _error; } @@ -838,14 +857,41 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", - pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); } else { - int32_t pos = pBlockIter->index - pBlockIter->numOfBlocks.numOfBlocks; - int32_t* index = taosArrayGet(pBlockIter->pLastBlockList, pos); +#if 0 + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + + uint64_t uid = pBlockInfo->uid; + SArray* pBlocks = pLastBlockReader->pBlockL; - SBlockL* pBlockL = taosArrayGet(pReader->status.fileIter.pLastBlockList, *index); + pLastBlockReader->currentBlockIndex = -1; + + // find the correct SBlockL + for(int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) { + SBlockL* pBlock = taosArrayGet(pBlocks, i); + if (pBlock->minUid >= uid && pBlock->maxUid <= uid) { + pLastBlockReader->currentBlockIndex = i; + break; + } + } + +// SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, *index); code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData); + if (code != TSDB_CODE_SUCCESS) { + tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64 + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s", + pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow, + pBlockL->minVer, pBlockL->maxVer, tstrerror(code), pReader->idStr); + goto _error; + } + + tsdbDebug("%p load last file block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64 + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow, + pBlockL->minVer, pBlockL->maxVer, elapsedTime, pReader->idStr); +#endif } pReader->cost.blockLoadTime += elapsedTime; @@ -854,10 +900,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI return TSDB_CODE_SUCCESS; _error: -// tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 -// ", rows:%d, %s", -// pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, -// pReader->idStr); return code; } @@ -926,17 +968,12 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { return TSDB_CODE_SUCCESS; } -static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockNumber* pBlockNum, SArray* pQLastBlock) { +static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) { bool asc = ASCENDING_TRAVERSE(pReader->order); - pBlockIter->numOfBlocks = *pBlockNum; + pBlockIter->numOfBlocks = numOfBlocks; taosArrayClear(pBlockIter->blockList); - if (pBlockNum->numOfLastBlocks > 0) { - taosArrayDestroy(pBlockIter->pLastBlockList); - pBlockIter->pLastBlockList = pQLastBlock; - } - // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); @@ -988,21 +1025,20 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte sup.numOfTables += 1; } - ASSERT(pBlockNum->numOfBlocks == cnt); - int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks; + ASSERT(numOfBlocks == cnt); // since there is only one table qualified, blocks are not sorted - if (sup.numOfTables == 1 || pBlockNum->numOfBlocks == 0) { - for (int32_t i = 0; i < pBlockNum->numOfBlocks; ++i) { + if (sup.numOfTables == 1) { + for (int32_t i = 0; i < numOfBlocks; ++i) { SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i}; taosArrayPush(pBlockIter->blockList, &blockInfo); } int64_t et = taosGetTimestampUs(); tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", - pReader, total, (et - st) / 1000.0, pReader->idStr); + pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr); - pBlockIter->index = asc ? 0 : (total - 1); + pBlockIter->index = asc ? 0 : (numOfBlocks - 1); cleanupBlockOrderSupporter(&sup); doSetCurrentBlock(pBlockIter); return TSDB_CODE_SUCCESS; @@ -1011,7 +1047,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, pReader->idStr); - ASSERT(cnt <= pBlockNum->numOfBlocks && sup.numOfTables <= numOfTables); + ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables); SMultiwayMergeTreeInfo* pTree = NULL; uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar); @@ -1038,12 +1074,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } int64_t et = taosGetTimestampUs(); - tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, total, (et - st) / 1000.0, + tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr); cleanupBlockOrderSupporter(&sup); taosMemoryFree(pTree); - pBlockIter->index = asc ? 0 : (total - 1); + pBlockIter->index = asc ? 0 : (numOfBlocks - 1); doSetCurrentBlock(pBlockIter); return TSDB_CODE_SUCCESS; @@ -1053,7 +1089,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { bool asc = ASCENDING_TRAVERSE(pBlockIter->order); int32_t step = asc ? 1 : -1; - if ((pBlockIter->index >= pBlockIter->numOfBlocks.numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) { + if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) { return false; } @@ -1100,7 +1136,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; int32_t index = pBlockIter->index; - while (index < pBlockIter->numOfBlocks.numOfBlocks && index >= 0) { + while (index < pBlockIter->numOfBlocks && index >= 0) { SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { return index; @@ -1114,7 +1150,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock } static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) { - if (index < 0 || index >= pBlockIter->numOfBlocks.numOfBlocks) { + if (index < 0 || index >= pBlockIter->numOfBlocks) { return -1; } @@ -1224,7 +1260,7 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl // 4. output buffer should be large enough to hold all rows in current block // 5. delete info should not overlap with current block data static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, - STableBlockScanInfo* pScanInfo, TSDBKEY key) { + STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) { int32_t neighborIndex = 0; SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); @@ -1239,8 +1275,17 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true; bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order); + // todo here we need to each key in the last files to identify if it is really overlapped with last block + bool overlapWithlastBlock = false; + if (hasDataInLastBlock(pLastBlockReader)) { + SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); +// int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); + overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); + } + return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) || - keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel); + keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || + overlapWithDel || overlapWithlastBlock); } static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { @@ -1279,7 +1324,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; if (nextKey != key) { // merge is not needed - doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); pDumpInfo->rowIndex += step; return true; } @@ -1386,12 +1431,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } -static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { +static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - SBlockData* pBlockData = &pReader->status.fileBlockData; SArray* pDelList = pBlockScanInfo->delSkyline; TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader); @@ -1532,6 +1576,14 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + // it is an multi-table data block + if (pBlockData->aUid != NULL) { + uint64_t uid = pBlockData->aUid[pDumpInfo->rowIndex]; + if (uid != pBlockScanInfo->uid) { // move to next row + return false; + } + } + // check for version and time range int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex]; if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) { @@ -1553,16 +1605,87 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { +static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin, SVersionRange* pVerRange, + int16_t startPos) { + pLastBlockReader->uid = uid; + pLastBlockReader->window = *pWin; + pLastBlockReader->verRange = *pVerRange; + pLastBlockReader->rowIndex = startPos; +} + +static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { + if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) { + return false; + } + + pLastBlockReader->rowIndex += 1; + + SBlockData* pBlockData = &pLastBlockReader->lastBlockData; + for(int32_t i = pLastBlockReader->rowIndex; i < pBlockData->nRow; ++i) { + if (pBlockData->aUid[i] != pLastBlockReader->uid) { + continue; + } + + if (pBlockData->aTSKEY[i] < pLastBlockReader->window.skey) { + continue; + } + + if (pBlockData->aVersion[i] < pLastBlockReader->verRange.minVer) { + continue; + } + + // no data any more + if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) { + return false; + } + + if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) { + return false; + } + + pLastBlockReader->rowIndex = i; + return true; + } + + pLastBlockReader->rowIndex = pBlockData->nRow; + return false; +} + +static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) { + return pLastBlockReader->rowIndex; +} + +static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) { + pLastBlockReader->rowIndex = state; +} + +static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { + SBlockData* pBlockData = &pLastBlockReader->lastBlockData; + return pBlockData->aTSKEY[pLastBlockReader->rowIndex]; +} + +// todo handle desc order +static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { + if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) { + return false; + } + + return true; +} + +static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - SBlockData* pBlockData = &pReader->status.fileBlockData; - int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + int64_t key = INT64_MIN; + if (pBlockData->nRow > 0) { + key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + } + TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { - return doMergeThreeLevelRows(pReader, pBlockScanInfo); + return doMergeThreeLevelRows(pReader, pBlockScanInfo, pBlockData); } else { // imem + file if (pBlockScanInfo->iiter.hasVal) { @@ -1574,18 +1697,74 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key); } - // imem & mem are all empty, only file exist - if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { - return TSDB_CODE_SUCCESS; - } else { + if (pBlockData->nRow > 0) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + // row in last file block + int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); + if (ts < key) { // save rows in last block + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + + tRowMergerInit(&merge, &fRow1, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge); + tRowMergerGetRow(&merge, &pTSRow); + + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; + } else if (ts == key) { + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInLastBlock(pLastBlockReader, ts, &merge); + + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; + } else { // ts > key, asc; todo handle desc + // imem & mem are all empty, only file exist + if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { + return TSDB_CODE_SUCCESS; + } else { + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; + } + } + } else { // only last block exists + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + STSRow* pTSRow = NULL; SRowMerger merge = {0}; + TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); + tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge); tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); @@ -1599,25 +1778,34 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + + STableBlockScanInfo* pBlockScanInfo = NULL; + if (pBlockInfo != NULL) { + pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + } else { + pBlockScanInfo = pReader->status.pTableIter; + } + + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + + int16_t startIndex = pBlockInfo != NULL? pBlockScanInfo->indexInBlockL:-1; + initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, startIndex); + + bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pReader->status.fileBlockData; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - int32_t numOfSub = 1; - int64_t st = taosGetTimestampUs(); while (1) { // todo check the validate of row in file block { - if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { + if (pBlockData->nRow > 0 && !isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { pDumpInfo->rowIndex += step; SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); - numOfSub = pBlock->nSubBlock; - if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { setBlockAllDumped(pDumpInfo, pBlock, pReader->order); break; @@ -1625,13 +1813,17 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { continue; } + + if (!hasDataInLastBlock(pLastBlockReader)) { + break; + } } - buildComposedDataBlockImpl(pReader, pBlockScanInfo); + buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); // currently loaded file data block is consumed - if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { + if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0)) { setBlockAllDumped(pDumpInfo, pBlock, pReader->order); break; } @@ -1647,9 +1839,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { setComposedBlockFlag(pReader, true); int64_t et = taosGetTimestampUs(); - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, subBlock:%d, brange:%" PRIu64 "-%" PRIu64 + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", - pReader, pBlockScanInfo->uid, numOfSub, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -1825,7 +2017,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead return key; } -static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pQLastBlock) { +static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { SReaderStatus* pStatus = &pReader->status; size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); @@ -1844,21 +2036,29 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr return code; } - code = tsdbReadBlockL(pReader->pFileReader, pStatus->fileIter.pLastBlockList); + SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL; + code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); return code; } - if (taosArrayGetSize(pIndexList) > 0 || taosArrayGetSize(pStatus->fileIter.pLastBlockList) > 0) { - code = doLoadFileBlock(pReader, pIndexList, pStatus->fileIter.pLastBlockList, pBlockNum, pQLastBlock); + if (taosArrayGetSize(pIndexList) > 0 || taosArrayGetSize(pLastBlocks) > 0) { + SArray* pQLastBlock = taosArrayInit(4, sizeof(SBlockL)); + + code = doLoadFileBlock(pReader, pIndexList, pLastBlocks, pBlockNum, pQLastBlock); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); + taosArrayDestroy(pQLastBlock); return code; } if (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0) { ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks); + taosArrayClear(pLastBlocks); + taosArrayAddAll(pLastBlocks, pQLastBlock); + + taosArrayDestroy(pQLastBlock); break; } } @@ -1870,30 +2070,112 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr return TSDB_CODE_SUCCESS; } +static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64_t uid, STsdbReader* pReader) { + SArray* pBlocks = pLastBlockReader->pBlockL; + SBlockL* pBlock = NULL; + + pLastBlockReader->currentBlockIndex = -1; + + // find the correct SBlockL + for (int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) { + SBlockL* p = taosArrayGet(pBlocks, i); + if (p->minUid <= uid && p->maxUid >= uid) { + pLastBlockReader->currentBlockIndex = i; + pBlock = p; + break; + } + } + + if (pLastBlockReader->currentBlockIndex == -1) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = tBlockDataCreate(&pLastBlockReader->lastBlockData); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + //todo add log + return code; + } + + code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData); + if (code != TSDB_CODE_SUCCESS) { + // tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64 + // ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s", + // pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlock->nRow, + // pBlock->minVer, pBlock->maxVer, tstrerror(code), pReader->idStr); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { + SReaderStatus* pStatus = &pReader->status; + SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; + + while(1) { + if (pStatus->pTableIter == NULL) { + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); + if (pStatus->pTableIter == NULL) { + return TSDB_CODE_SUCCESS; + } + } else { // let's try next table + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); + if (pStatus->pTableIter == NULL) { + return TSDB_CODE_SUCCESS; + } + } + + // find the last block that contain the specified block uid + return doLoadRelatedLastBlock(pLastBlockReader, pStatus->pTableIter->uid, pReader); + + //todo check for all empty table + } +} + static int32_t doBuildDataBlock(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - STableBlockScanInfo* pScanInfo = NULL; SBlock* pBlock = NULL; TSDBKEY key = {0}; + STableBlockScanInfo* pScanInfo = NULL; + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - if (pFBlock != NULL) { - pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + if (pBlockInfo != NULL) { + pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); pBlock = getCurrentBlock(pBlockIter); key = getCurrentKeyInBuf(pBlockIter, pReader); + + code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, pScanInfo->indexInBlockL); + bool hasData = nextRowInLastBlock(pLastBlockReader); + } else { + ASSERT(pBlockIter->numOfBlocks == 0); } - if (pFBlock == NULL || fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { - tBlockDataReset(&pStatus->fileBlockData); - tBlockDataClear(&pStatus->fileBlockData); + if (pBlockInfo == NULL || fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) { + if (pBlockInfo != NULL) { + tBlockDataReset(&pStatus->fileBlockData); + code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + //todo + } - code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); - if (code != TSDB_CODE_SUCCESS) { - return code; + code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } // build composed data block @@ -1962,22 +2244,24 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { SBlockNumber num = {0}; - SArray* pQLastBlock = taosArrayInit(4, sizeof(int32_t)); - int32_t code = moveToNextFile(pReader, &num, pQLastBlock); + int32_t code = moveToNextFile(pReader, &num); if (code != TSDB_CODE_SUCCESS) { return code; } // all data files are consumed, try data in buffer if (num.numOfBlocks + num.numOfLastBlocks == 0) { - taosArrayDestroy(pQLastBlock); pReader->status.loadFromFile = false; return code; } // initialize the block iterator for a new fileset - code = initBlockIterator(pReader, pBlockIter, &num, pQLastBlock); + if (num.numOfBlocks > 0) { + code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks); + } else { + pBlockIter->numOfBlocks = 0; + } // set the correct start position according to the query time window initBlockDumpInfo(pReader, pBlockIter); @@ -1995,6 +2279,38 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { SDataBlockIter* pBlockIter = &pReader->status.blockIter; + if (pBlockIter->numOfBlocks == 0) { + _begin: + code = doLoadLastBlockSequentially(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // all data blocks are check in last file, now let's try the next file + if (pReader->status.pTableIter == NULL) { + code = initForFirstBlockInFile(pReader, pBlockIter); + + // error happens or all the data files are completely checked + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + return code; + } + + // this file does not have blocks, let's start check the last block file + if (pBlockIter->numOfBlocks == 0) { + goto _begin; + } + } + + code = doBuildDataBlock(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (pReader->pResBlock->info.rows > 0) { + return TSDB_CODE_SUCCESS; + } + } + while (1) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2007,17 +2323,27 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { bool hasNext = blockIteratorNext(&pReader->status.blockIter); if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); - } else { // data blocks in current file are exhausted, let's try the next file now + } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now + // todo dump all data in last block if exists. + pBlockIter->numOfBlocks = 0; + taosArrayClear(pBlockIter->blockList); + tBlockDataReset(&pReader->status.fileBlockData); + goto _begin; + } else { code = initForFirstBlockInFile(pReader, pBlockIter); // error happens or all the data files are completely checked if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { return code; } + + // this file does not have blocks, let's start check the last block file + if (pBlockIter->numOfBlocks == 0) { + goto _begin; + } } } - // current block is not loaded yet, or data in buffer may overlap with the file block. code = doBuildDataBlock(pReader); } @@ -2336,7 +2662,6 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn // 3. load the neighbor block, and set it to be the currently accessed file data block tBlockDataReset(&pStatus->fileBlockData); - tBlockDataClear(&pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2386,6 +2711,21 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } +// todo support desc order +int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) { + while(nextRowInLastBlock(pLastBlockReader)) { + int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); + if (next1 == ts) { + TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, pLastBlockReader->rowIndex); + tRowMerge(pMerger, &fRow1); + } else { + break; + } + } + + return TSDB_CODE_SUCCESS; +} + void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; @@ -2558,7 +2898,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* return TSDB_CODE_SUCCESS; } -int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) { +int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) { int32_t i = 0, j = 0; int32_t outputRowIndex = pResBlock->info.rows; @@ -3021,8 +3361,12 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); tBlockDataReset(&pStatus->fileBlockData); - tBlockDataClear(&pStatus->fileBlockData); - int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData); + int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema); + if (code != TSDB_CODE_SUCCESS) { + //todo + } + + code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { tBlockDataDestroy(&pStatus->fileBlockData, 1); @@ -3118,12 +3462,12 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa SDataBlockIter* pBlockIter = &pStatus->blockIter; pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles; - if (pBlockIter->numOfBlocks.numOfBlocks > 0) { - pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks.numOfBlocks; + if (pBlockIter->numOfBlocks > 0) { + pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks; } pTableBlockInfo->numOfTables = numOfTables; - bool hasNext = (pBlockIter->numOfBlocks.numOfBlocks > 0); + bool hasNext = (pBlockIter->numOfBlocks > 0); while (true) { if (hasNext) { @@ -3154,8 +3498,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa break; } - pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks.numOfBlocks; - hasNext = (pBlockIter->numOfBlocks.numOfBlocks > 0); + pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks; + hasNext = (pBlockIter->numOfBlocks > 0); } // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,