From 442f89783f16ce34510ef47ff39bf5db22692cb7 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 27 Oct 2022 10:55:32 +0800 Subject: [PATCH] fix(stream): read preversion data for delete --- source/dnode/vnode/src/tsdb/tsdbRead.c | 62 ++++++++++++++------------ 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f83755fc4f..e05448b444 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -111,7 +111,7 @@ typedef struct SDataBlockIter { int32_t index; SArray* blockList; // SArray int32_t order; - SDataBlk block; // current SDataBlk data + SDataBlk block; // current SDataBlk data SHashObj* pTableMap; } SDataBlockIter; @@ -169,14 +169,14 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, - SRowMerger* pMerger); + SRowMerger* pMerger, SVersionRange* pVerRange); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid); static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); -static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); +static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange); static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); @@ -1052,7 +1052,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) if (pBlockInfo != NULL) { STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pScanInfo == NULL) { - tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, %s", pBlockInfo->uid, idStr); + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr); return TSDB_CODE_INVALID_PARA; } @@ -1466,7 +1466,8 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return false; } -static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { +static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, + SVersionRange* pVerRange) { while (1) { bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); if (!hasVal) { @@ -1475,7 +1476,8 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBKEY k = TSDBROW_KEY(&row); - if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { + if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, + pVerRange)) { return true; } } @@ -1483,7 +1485,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) { - bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo); + bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange); if (hasVal) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 != ts) { @@ -1602,7 +1604,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); } if (minKey == k.ts) { @@ -1647,7 +1649,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); } if (minKey == key) { @@ -1699,7 +1701,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tRowMerge(&merge, &fRow1); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -1717,7 +1719,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); ASSERT(mergeBlockData); // merge with block data if ts == key @@ -1771,7 +1773,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tRowMerge(&merge, &fRow1); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange); code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -1882,7 +1884,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); } if (minKey == ik.ts) { @@ -1901,8 +1903,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, - &merge, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, + pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1973,7 +1975,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange); } if (minKey == key) { @@ -1993,7 +1995,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - if (merge.pTSchema == NULL) { + if (merge.pTSchema == NULL) { return code; } @@ -2095,7 +2097,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum } TSDBKEY k = {.ts = ts, .version = ver}; - if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order)) { + if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order, + &pReader->verRange)) { return false; } @@ -2130,7 +2133,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan return false; } - return nextRowFromLastBlocks(pLBlockReader, pScanInfo); + return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange); } static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { @@ -2225,8 +2228,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); if (pBlockScanInfo == NULL) { code = TSDB_CODE_INVALID_PARA; - tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, total tables:%d, %s", - pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, + taosHashGetSize(pReader->status.pTableMap), pReader->idStr); goto _end; } @@ -2290,7 +2293,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } _end: - pResBlock->info.uid = (pBlockScanInfo != NULL)? pBlockScanInfo->uid:0; + pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); @@ -2859,7 +2862,7 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ return (SVersionRange){.minVer = startVer, .maxVer = endVer}; } -bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) { +bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) { ASSERT(pKey != NULL); if (pDelList == NULL) { return false; @@ -2887,7 +2890,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32 return false; } - if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) { + if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version && + pVerRange->maxVer >= pCurrent->version) { return true; } @@ -2973,7 +2977,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p // it is a valid data version if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) && - (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) { + (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) { return pRow; } @@ -2992,7 +2996,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p } if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer && - (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) { + (!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) { return pRow; } } @@ -3130,9 +3134,9 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc } int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, - SRowMerger* pMerger) { + SRowMerger* pMerger, SVersionRange* pVerRange) { pScanInfo->lastKey = ts; - while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) { + while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); @@ -3772,7 +3776,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); - if (pBlockScanInfo == NULL) { // no data block for the table of given uid + if (pBlockScanInfo == NULL) { // no data block for the table of given uid return false; } -- GitLab