diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index cafa232462e3e48316a0ec96f6e361244fe07960..f90969bcaa18b32b0cfb73f6d4ab1c1e58a8e41c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -967,14 +967,53 @@ static void cleanupTableScanInfo(SReaderStatus* pStatus) { doCleanupTableScanInfo(*px); } +} + +typedef struct SBrinRecordIter { + SArray* pBrinBlockList; + SBrinBlk* pCurrentBlk; + int32_t blockIndex; + int32_t recordIndex; + SDataFileReader* pReader; + SBrinBlock block; + SBrinRecord record; +} SBrinRecordIter; + +void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) { + memset(&pIter->block, 0, sizeof(SBrinBlock)); + memset(&pIter->record, 0, sizeof(SBrinRecord)); + pIter->blockIndex = -1; + pIter->recordIndex = -1; + + pIter->pReader = pReader; + pIter->pBrinBlockList = pList; +} + +SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) { + if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= TARRAY2_SIZE(pIter->block.numRow)) { + pIter->blockIndex += 1; + if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) { + return NULL; + } + + pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex); + + tBrinBlockClear(&pIter->block); + tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); + pIter->recordIndex = -1; + } -// pStatus->mapDataCleaned = true; + pIter->recordIndex += 1; + tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record); + return &pIter->record; +} + +void clearBrinBlockIter(SBrinRecordIter* pIter) { + tBrinBlockDestroy(&pIter->block); } static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) { size_t sizeInDisk = 0; - size_t numOfBlocks = taosArrayGetSize(pIndexList); - int64_t st = taosGetTimestampUs(); cleanupTableScanInfo(&pReader->status); @@ -982,26 +1021,40 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN int32_t i = 0, k = 0; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - while(i < numOfBlocks && k < numOfTables) { - SBrinBlk* pBlk = taosArrayGet(pIndexList, i); - uint64_t uid = pReader->status.uidList.tableUidList[k]; + int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; + STimeWindow w = pReader->window; + + SBrinRecordIter iter = {0}; + initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); + SBrinRecord* pRecord = NULL; - SBrinBlock block = {0}; - tsdbDataFileReadBrinBlock(pReader->pFileReader, pBlk, &block); + while (k < numOfTables) { + pRecord = getNextBrinRecord(&iter); -// tMapDataReset(&pScanInfo->mapData); -// tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); -// taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem); + uint64_t uid = pReader->status.uidList.tableUidList[k]; + if (pRecord == NULL || pRecord->suid > pReader->suid) { + break; + } -// todo set the correct size - sizeInDisk += 0;//pScanInfo->mapData.nData; + if (pRecord->suid < pReader->suid) { + continue; + } - int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - STimeWindow w = pReader->window; - if (isEmptyQueryTimeWindow(&w)) { + ASSERT(pRecord->suid == pReader->suid); + if (pRecord->uid < uid) { continue; } + while (pRecord->uid > uid && k < numOfTables) { + k += 1; + } + + if (k >= numOfTables) { + break; + } + + ASSERT(pRecord->suid == pReader->suid && uid == pRecord->uid); + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); if (ASCENDING_TRAVERSE(pReader->order)) { w.skey = pScanInfo->lastKey + step; @@ -1009,69 +1062,39 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN w.ekey = pScanInfo->lastKey + step; } - SBrinRecord record = {0}; - for (int32_t j = 0; j < TARRAY2_SIZE(block.numRow); ++j) { - tBrinBlockGet(&block, j, &record); - if (record.suid < pReader->suid) { - continue; - } - - if (record.suid > pReader->suid) { - break; - } - - { - while (record.uid > uid && (k + 1) < numOfTables) { - k += 1; - uid = pReader->status.uidList.tableUidList[k]; - pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - - if (ASCENDING_TRAVERSE(pReader->order)) { - w.skey = pScanInfo->lastKey + step; - } else { - w.ekey = pScanInfo->lastKey + step; - } - } - - if (k >= numOfTables) { - break; - } - - if (record.uid < uid) { - continue; - } - } - - ASSERT(record.suid == pReader->suid); - - // 1. time range check - if (record.firstKey > w.ekey || record.lastKey < w.skey) { - continue; - } - - // 2. version range check - if (record.firstKeyVer > pReader->verRange.maxVer || record.lastKeyVer < pReader->verRange.minVer) { - continue; - } + if (isEmptyQueryTimeWindow(&w)) { + k += 1; + continue; + } -// SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = record.blockOffset}; -// bIndex.window = (STimeWindow){.skey = record.firstKey, .ekey = record.lastKey}; - void* p1 = taosArrayPush(pScanInfo->pBlockList, &record); - if (p1 == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + // 1. time range check + if (pRecord->firstKey > w.ekey || pRecord->lastKey < w.skey) { + continue; + } - pBlockNum->numOfBlocks += 1; + // 2. version range check + if (pRecord->firstKeyVer > pReader->verRange.maxVer || pRecord->lastKeyVer < pReader->verRange.minVer) { + continue; } - i += 1; + void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord); + if (p1 == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList); - if ((p == NULL || (*p)->uid != uid) && taosArrayGetSize(pScanInfo->pBlockList) > 0) { + pBlockNum->numOfBlocks += 1; + if (taosArrayGetSize(pTableScanInfoList) == 0) { taosArrayPush(pTableScanInfoList, &pScanInfo); + } else { + STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList); + if ((*p)->uid != uid) { + taosArrayPush(pTableScanInfoList, &pScanInfo); + } } } + clearBrinBlockIter(&iter); + pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size; int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; @@ -1336,7 +1359,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { SBlockData* pBlockData = &pStatus->fileBlockData; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); -// SDataBlk* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; int32_t numOfOutputCols = pSupInfo->numOfCols; int32_t code = TSDB_CODE_SUCCESS;