diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 2b929cf66414d6f7f52032a6c145a793d4afa6e2..133d6bf769552ff272fb86af0769af660b6eb298 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -61,16 +61,15 @@ typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; TSKEY lastKeyInStt; // last accessed key in stt - SMapData mapData; // block info (compressed) SArray* pBlockList; // block data index list, SArray - SArray* pDelData; // SArray + SArray* pMemDelData; // SArray + SArray* pfileDelData; // SArray from each file set SIterInfo iter; // mem buffer skip list iterator SIterInfo iiter; // imem buffer skip list iterator SArray* delSkyline; // delete info for this table int32_t fileDelIndex; // file block delete index int32_t lastBlockDelIndex; // delete index for last block bool iterInit; // whether to initialize the in-memory skip list iterator or not - bool skylineBuilt; // load current stt block } STableBlockScanInfo; typedef struct SBlockOrderWrapper { @@ -473,7 +472,7 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; pInfo->iterInit = false; - pInfo->skylineBuilt = false; +// pInfo->skylineBuilt = false; pInfo->iter.hasVal = false; pInfo->iiter.hasVal = false; @@ -494,8 +493,6 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t static void clearBlockScanInfo(STableBlockScanInfo* p) { p->iterInit = false; - p->skylineBuilt = false; - p->iter.hasVal = false; p->iiter.hasVal = false; @@ -509,7 +506,8 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) { p->delSkyline = taosArrayDestroy(p->delSkyline); p->pBlockList = taosArrayDestroy(p->pBlockList); - p->pDelData = taosArrayDestroy(p->pDelData); + p->pMemDelData = taosArrayDestroy(p->pMemDelData); + p->pfileDelData = taosArrayDestroy(p->pfileDelData); } static void destroyAllBlockScanInfo(SSHashObj* pTableMap) { @@ -566,19 +564,6 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pLReader->uid = 0; tMergeTreeClose(&pLReader->mergeTree); - -// if (pLReader->pInfo == NULL) { -// // here we ignore the first column, which is always be the primary timestamp column -// SBlockLoadSuppInfo* pInfo = &pReader->suppInfo; -// // todo dynamic number of stt -// int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger; -// pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt); -// if (pLReader->pInfo == NULL) { -// tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr); -// return terrno; -// } -// } - tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -923,11 +908,6 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead ASSERT(pBrinBlk->minTbid.suid <= pReader->suid && pBrinBlk->maxTbid.suid >= pReader->suid); - // if (pBrinBlk->maxTbid.uid < pList->tableUidList[j]) { - // i += 1; - // continue; - // } - // this block belongs to a table that is not queried. STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pList->tableUidList[j], pReader->idStr); @@ -967,6 +947,8 @@ _end: static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) { // reset the index in last block when handing a new file taosArrayClear(pScanInfo->pBlockList); + taosArrayClear(pScanInfo->delSkyline); // built delete skyline for each fileset + taosArrayClear(pScanInfo->pfileDelData); // del data from each file set } static void cleanupTableScanInfo(SReaderStatus* pStatus) { @@ -1033,7 +1015,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN cleanupTableScanInfo(&pReader->status); // set the flag for the new file - int32_t i = 0, k = 0; + int32_t k = 0; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; @@ -2721,8 +2703,8 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan } static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { - if (pScanInfo->pDelData == NULL) { - pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); + if (pScanInfo->pMemDelData == NULL) { + pScanInfo->pMemDelData = taosArrayInit(4, sizeof(SDelData)); } SDelData* p = NULL; @@ -2730,7 +2712,7 @@ static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbDat p = pMemTbData->pHead; while (p) { if (p->version <= ver) { - taosArrayPush(pScanInfo->pDelData, p); + taosArrayPush(pScanInfo->pMemDelData, p); } p = p->pNext; @@ -2741,7 +2723,7 @@ static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbDat p = piMemTbData->pHead; while (p) { if (p->version <= ver) { - taosArrayPush(pScanInfo->pDelData, p); + taosArrayPush(pScanInfo->pMemDelData, p); } p = p->pNext; } @@ -2811,7 +2793,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer) { STombRecord record = {0}; - for (int32_t j = 0; j < pBlock->suid->size; ++j) { int32_t code = tTombBlockGet(pBlock, j, &record); if (code != TSDB_CODE_SUCCESS) { @@ -2822,13 +2803,13 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t continue; } - if (record.uid != uid) { - continue; + if (record.suid > suid || (record.suid == suid && record.uid > uid)) { + break; } -// if (record.uid > uid) { -// break; -// } + if (record.uid < uid) { + continue; + } if (record.version <= maxVer) { SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; @@ -2839,20 +2820,18 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t return TSDB_CODE_SUCCESS; } -static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo, - uint64_t maxVer) { +static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, + STableBlockScanInfo* pBlockScanInfo, uint64_t maxVer) { int32_t size = taosArrayGetSize(pLDataIterList); if (size <= 0) { return TSDB_CODE_SUCCESS; } uint64_t uid = pBlockScanInfo->uid; - if (pBlockScanInfo->pDelData == NULL) { - pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); + if (pBlockScanInfo->pfileDelData == NULL) { + pBlockScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); } - pBlockScanInfo->skylineBuilt = false; - for(int32_t i = 0; i < size; ++i) { SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i); @@ -2869,7 +2848,7 @@ static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t s for (int32_t k = 0; k < numOfBlocks; ++k) { STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k); - int32_t code = checkTombBlockRecords(pBlockScanInfo->pDelData, pBlock, suid, uid, maxVer); + int32_t code = checkTombBlockRecords(pBlockScanInfo->pfileDelData, pBlock, suid, uid, maxVer); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2880,6 +2859,93 @@ static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t s return TSDB_CODE_SUCCESS; } +static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfTables) { + if (pReader->status.pCurrentFileset == NULL) { + return TSDB_CODE_SUCCESS; + } + + STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3]; + if (pTombFileObj == NULL) { + return TSDB_CODE_SUCCESS; + } + + const TTombBlkArray* pBlkArray = NULL; + + int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // todo find the correct start position. + int32_t i = 0, j = 0; + while (i < pBlkArray->size && j < numOfTables) { + STombBlock block = {0}; + code = tsdbDataFileReadTombBlock(pReader->pFileReader, &pBlkArray->data[i], &block); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + uint64_t uid = pReader->status.uidList.tableUidList[j]; + + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + if (pScanInfo->pfileDelData == NULL) { + pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); + } + + STombRecord record = {0}; + for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) { + code = tTombBlockGet(&block, k, &record); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (record.suid < pReader->suid) { + continue; + } + + if (record.suid > pReader->suid) { + tTombBlockDestroy(&block); + return TSDB_CODE_SUCCESS; + } + + ASSERT(record.suid == pReader->suid); + if (record.uid < uid) { + continue; + } + + bool newTable = false; + while (uid < record.uid && j < (numOfTables - 1)) { + j += 1; + uid = pReader->status.uidList.tableUidList[j]; + newTable = true; + } + + if (uid != record.uid) { + tTombBlockDestroy(&block); + return TSDB_CODE_SUCCESS; + } else { + if (newTable) { + pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + if (pScanInfo->pfileDelData == NULL) { + pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); + } + } + } + + ASSERT(record.uid == uid); + if (record.version <= pReader->verRange.maxVer) { + SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; + taosArrayPush(pScanInfo->pfileDelData, &delData); + } + } + + i += 1; + tTombBlockDestroy(&block); + } + + return TSDB_CODE_SUCCESS; +} + static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. if (pLBlockReader->uid == pScanInfo->uid) { @@ -3014,7 +3080,6 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock int32_t code = TSDB_CODE_SUCCESS; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t nextIndex = -1; -// SBlockIndex nxtBIndex = {0}; *loadNeighbor = false; @@ -3193,23 +3258,25 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) { int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { int32_t code = 0; - if (pBlockScanInfo->skylineBuilt) { + int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pfileDelData); + if (newDelDataInFile == 0) { return code; } - int32_t numOfElems = taosArrayGetSize(pBlockScanInfo->pDelData); - - if (numOfElems > 0) { + int32_t delInFile = taosArrayGetSize(pBlockScanInfo->pfileDelData); + if (delInFile > 0) { if (pBlockScanInfo->delSkyline != NULL) { taosArrayClear(pBlockScanInfo->delSkyline); } else { pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY)); } - - code = tsdbBuildDeleteSkyline(pBlockScanInfo->pDelData, 0, numOfElems - 1, pBlockScanInfo->delSkyline); } - pBlockScanInfo->skylineBuilt = true; + taosArrayAddAll(pBlockScanInfo->pfileDelData, pBlockScanInfo->pMemDelData); + int32_t total = taosArrayGetSize(pBlockScanInfo->pfileDelData); + code = tsdbBuildDeleteSkyline(pBlockScanInfo->pfileDelData, 0, total - 1, pBlockScanInfo->delSkyline); + + taosArrayClear(pBlockScanInfo->pfileDelData); int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order); pBlockScanInfo->iter.index = index; @@ -3222,8 +3289,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { bool asc = ASCENDING_TRAVERSE(pReader->order); - // TSKEY initialVal = asc? TSKEY_MIN:TSKEY_MAX; - TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL}; bool hasKey = false, hasIKey = false; @@ -3305,71 +3370,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } taosArrayDestroy(pIndexList); - - if (pReader->status.pCurrentFileset != NULL) { - STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3]; - - if (pTombFileObj != NULL) { - const TTombBlkArray* pBlkArray = NULL; - - int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - int32_t i = 0, j = 0; - - // todo find the correct start position. - - while (i < pBlkArray->size && j < numOfTables) { - STombBlock block = {0}; - code = tsdbDataFileReadTombBlock(pReader->pFileReader, &pBlkArray->data[i], &block); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - uint64_t uid = pReader->status.uidList.tableUidList[j]; - - STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (pScanInfo->pDelData == NULL) { - pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); - } - - STombRecord record = {0}; - for (int32_t k = 0; k < block.suid->size; ++k) { - code = tTombBlockGet(&block, k, &record); - - { - while (record.uid > uid) { - j += 1; - uid = pReader->status.uidList.tableUidList[j]; - pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (pScanInfo->pDelData == NULL) { - pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); - } - } - - if (record.uid < uid) { - continue; - } - } - - ASSERT(record.suid == pReader->suid); - - if (record.version <= pReader->verRange.maxVer) { - SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - taosArrayPush(pScanInfo->pDelData, &delData); - pScanInfo->skylineBuilt = false; - } - } - - i += 1; - tTombBlockDestroy(&block); - } - } - } - - return TSDB_CODE_SUCCESS; + return loadTombRecordsFromDataFiles(pReader, numOfTables); } static void resetTableListIndex(SReaderStatus* pStatus) { @@ -3605,13 +3606,11 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade } STableBlockScanInfo* pScanInfo = *p; - tMapDataReset(&pScanInfo->mapData); - SDataBlk block = {0}; - for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { - tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); - pReader->rowsNum += block.nRow; - } +// for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { +// tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); +// pReader->rowsNum += block.nRow; +// } } _end: @@ -4977,6 +4976,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { } pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); + pInfo->pfileDelData = taosArrayDestroy(pInfo->pfileDelData); } } else { // resetDataBlockScanInfo excluding lastKey @@ -5009,7 +5009,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { // reset current current table's data block scan info, pBlockScanInfo->iterInit = false; - pBlockScanInfo->skylineBuilt = false; pBlockScanInfo->iter.hasVal = false; pBlockScanInfo->iiter.hasVal = false; @@ -5022,7 +5021,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { } pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList); - tMapDataClear(&pBlockScanInfo->mapData); // TODO: keep skyline for reuse pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline); } @@ -5343,16 +5341,14 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, } int64_t st = taosGetTimestampUs(); - TARRAY2_CLEAR(&pSup->colAggArray, 0); -// if (pFBlock->record.smaSize > 0) { - code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pFBlock->record, &pSup->colAggArray); - if (code != TSDB_CODE_SUCCESS) { - tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code), - pReader->idStr); - return code; - } + code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pFBlock->record, &pSup->colAggArray); + if (code != TSDB_CODE_SUCCESS) { + tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64 ", code:%s, %s", 0, pFBlock->uid, tstrerror(code), + pReader->idStr); + return code; + } if (pSup->colAggArray.size > 0) { *allHave = true; @@ -5394,7 +5390,6 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, int32_t i = 0, j = 0; while (j < numOfCols && i < size) { -// SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); SColumnDataAgg* pAgg = &pSup->colAggArray.data[i]; if (pAgg->colId == pSup->colId[j]) { pResBlock->pBlockAgg[pSup->slotId[j]] = pAgg;