diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 7014f9517efc486fb0260e7c7ef60540ec39d622..2b929cf66414d6f7f52032a6c145a793d4afa6e2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -70,6 +70,7 @@ typedef struct STableBlockScanInfo { int32_t fileDelIndex; // file block delete index int32_t lastBlockDelIndex; // delete index for last block bool iterInit; // whether to initialize the in-memory skip list iterator or not + bool skylineBuilt; // whether delSkyline has been built from the current pDelData } STableBlockScanInfo; typedef struct SBlockOrderWrapper { @@ -253,8 +254,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); -static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, - STbData* piMemTbData); +static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader); static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); @@ -473,6 +473,8 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; pInfo->iterInit = false; + pInfo->skylineBuilt = false; + pInfo->iter.hasVal = false; pInfo->iiter.hasVal = false; @@ -492,6 +494,7 @@ static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t static void clearBlockScanInfo(STableBlockScanInfo* p) { p->iterInit = false; + p->skylineBuilt = false; p->iter.hasVal = false; p->iiter.hasVal = false; @@ -2717,20 +2720,47 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan return code; } +static void 
doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { + if (pScanInfo->pDelData == NULL) { + pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); + } + + SDelData* p = NULL; + if (pMemTbData != NULL) { + p = pMemTbData->pHead; + while (p) { + if (p->version <= ver) { + taosArrayPush(pScanInfo->pDelData, p); + } + + p = p->pNext; + } + } + + if (piMemTbData != NULL) { + p = piMemTbData->pHead; + while (p) { + if (p->version <= ver) { + taosArrayPush(pScanInfo->pDelData, p); + } + p = p->pNext; + } + } +} + static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { if (pBlockScanInfo->iterInit) { return TSDB_CODE_SUCCESS; } - TSDBKEY startKey = {0}; + STbData* d = NULL; + TSDBKEY startKey = {0}; if (ASCENDING_TRAVERSE(pReader->order)) { startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer}; } else { startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer}; } - STbData* d = NULL; - int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, &pBlockScanInfo->iter, "mem"); if (code != TSDB_CODE_SUCCESS) { @@ -2744,9 +2774,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea return code; } - int64_t st = taosGetTimestampUs(); - initDelSkylineIterator(pBlockScanInfo, pReader, d, di); - pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0; + doLoadMemTombData(pBlockScanInfo, d, di, pReader->verRange.maxVer); pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; @@ -2794,14 +2822,13 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t continue; } - // todo use binary search instead here - if (record.uid < uid) { + if (record.uid != uid) { continue; } - if (record.uid > uid) { - break; - } +// if (record.uid > uid) { +// break; +// } if (record.version <= maxVer) { SDelData 
delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; @@ -2812,7 +2839,7 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t return TSDB_CODE_SUCCESS; } -static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo, +static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo, uint64_t maxVer) { int32_t size = taosArrayGetSize(pLDataIterList); if (size <= 0) { @@ -2824,6 +2851,8 @@ static int32_t loadTomRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t su pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); } + pBlockScanInfo->skylineBuilt = false; + for(int32_t i = 0; i < size; ++i) { SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i); @@ -2880,12 +2909,15 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan return false; } - code = loadTomRecordInfoFromSttFiles(pReader->status.pLDataIterArray, pReader->suid, pScanInfo, pReader->verRange.maxVer); + code = loadTombRecordInfoFromSttFiles(pReader->status.pLDataIterArray, pReader->suid, pScanInfo, pReader->verRange.maxVer); if (code != TSDB_CODE_SUCCESS) { return false; } initMemDataIterator(pScanInfo, pReader); + + // todo: del tomb order problem + initDelSkylineIterator(pScanInfo, pReader); return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange); } @@ -3159,47 +3191,25 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) { return ASCENDING_TRAVERSE(order) ? 
0 : taosArrayGetSize(pDelSkyline) - 1; } -int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, - STbData* piMemTbData) { - if (pBlockScanInfo->delSkyline != NULL) { - return TSDB_CODE_SUCCESS; - } - +int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { int32_t code = 0; - - if (pBlockScanInfo->pDelData == NULL) { - pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); + if (pBlockScanInfo->skylineBuilt) { + return code; } - SDelData* p = NULL; - if (pMemTbData != NULL) { - p = pMemTbData->pHead; - while (p) { - if (p->version <= pReader->verRange.maxVer) { - taosArrayPush(pBlockScanInfo->pDelData, p); - } - - p = p->pNext; - } - } + int32_t numOfElems = taosArrayGetSize(pBlockScanInfo->pDelData); - if (piMemTbData != NULL) { - p = piMemTbData->pHead; - while (p) { - if (p->version <= pReader->verRange.maxVer) { - taosArrayPush(pBlockScanInfo->pDelData, p); - } - p = p->pNext; + if (numOfElems > 0) { + if (pBlockScanInfo->delSkyline != NULL) { + taosArrayClear(pBlockScanInfo->delSkyline); + } else { + pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY)); } - } - int32_t numOfElems = taosArrayGetSize(pBlockScanInfo->pDelData); - if (numOfElems > 0) { - pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY)); code = tsdbBuildDeleteSkyline(pBlockScanInfo->pDelData, 0, numOfElems - 1, pBlockScanInfo->delSkyline); } - pBlockScanInfo->pDelData = taosArrayDestroy(pBlockScanInfo->pDelData); + pBlockScanInfo->skylineBuilt = true; int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order); pBlockScanInfo->iter.index = index; @@ -3208,10 +3218,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pBlockScanInfo->lastBlockDelIndex = index; return code; - -// _err: -// taosArrayDestroy(pBlockScanInfo->pDelData); -// return code; } TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* 
pReader) { @@ -3322,7 +3328,8 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr return code; } - uint64_t uid = pReader->status.uidList.tableUidList[j]; + uint64_t uid = pReader->status.uidList.tableUidList[j]; + STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); if (pScanInfo->pDelData == NULL) { pScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData)); @@ -3352,6 +3359,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr if (record.version <= pReader->verRange.maxVer) { SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; taosArrayPush(pScanInfo->pDelData, &delData); + pScanInfo->skylineBuilt = false; } } @@ -3728,6 +3736,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } initMemDataIterator(*pBlockScanInfo, pReader); + initDelSkylineIterator(*pBlockScanInfo, pReader); int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN; int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); @@ -5000,6 +5009,8 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { // reset current current table's data block scan info, pBlockScanInfo->iterInit = false; + pBlockScanInfo->skylineBuilt = false; + pBlockScanInfo->iter.hasVal = false; pBlockScanInfo->iiter.hasVal = false; if (pBlockScanInfo->iter.iter != NULL) {