提交 442f8978 编写于 作者: 5 54liuyao

fix(stream): read preversion data for delete

上级 8fb78fd7
......@@ -169,14 +169,14 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger);
SRowMerger* pMerger, SVersionRange* pVerRange);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
......@@ -1052,7 +1052,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo == NULL) {
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, %s", pBlockInfo->uid, idStr);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, %s", pBlockInfo->uid, idStr);
return TSDB_CODE_INVALID_PARA;
}
......@@ -1466,7 +1466,8 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return false;
}
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
SVersionRange* pVerRange) {
while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
if (!hasVal) {
......@@ -1475,7 +1476,8 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY k = TSDBROW_KEY(&row);
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
if (!hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order,
pVerRange)) {
return true;
}
}
......@@ -1483,7 +1485,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo);
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 != ts) {
......@@ -1602,7 +1604,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
}
if (minKey == k.ts) {
......@@ -1647,7 +1649,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
}
if (minKey == key) {
......@@ -1699,7 +1701,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1717,7 +1719,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
ASSERT(mergeBlockData);
// merge with block data if ts == key
......@@ -1771,7 +1773,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);
code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1882,7 +1884,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
}
if (minKey == ik.ts) {
......@@ -1901,8 +1903,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline,
&merge, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1973,7 +1975,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
}
if (minKey == key) {
......@@ -2095,7 +2097,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
}
TSDBKEY k = {.ts = ts, .version = ver};
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order)) {
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order,
&pReader->verRange)) {
return false;
}
......@@ -2130,7 +2133,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
return false;
}
return nextRowFromLastBlocks(pLBlockReader, pScanInfo);
return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
......@@ -2225,8 +2228,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, total tables:%d, %s",
pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _end;
}
......@@ -2290,7 +2293,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
_end:
pResBlock->info.uid = (pBlockScanInfo != NULL)? pBlockScanInfo->uid:0;
pResBlock->info.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
blockDataUpdateTsWindow(pResBlock, 0);
setComposedBlockFlag(pReader, true);
......@@ -2859,7 +2862,7 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
ASSERT(pKey != NULL);
if (pDelList == NULL) {
return false;
......@@ -2887,7 +2890,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
return false;
}
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
pVerRange->maxVer >= pCurrent->version) {
return true;
}
......@@ -2973,7 +2977,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
// it is a valid data version
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
return pRow;
}
......@@ -2992,7 +2996,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
}
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
return pRow;
}
}
......@@ -3130,9 +3134,9 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
}
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger) {
SRowMerger* pMerger, SVersionRange* pVerRange) {
pScanInfo->lastKey = ts;
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) {
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册