提交 79046869 编写于 作者: H Haojun Liao

fix(query): support delete in last

上级 20873759
...@@ -166,7 +166,7 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i ...@@ -166,7 +166,7 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger); SRowMerger* pMerger);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger); static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
...@@ -1441,7 +1441,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf ...@@ -1441,7 +1441,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
init = true; init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema); tRowMergerInit(&merge, &fRow1, pReader->pSchema);
} }
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
} }
if (minKey == k.ts) { if (minKey == k.ts) {
...@@ -1470,7 +1470,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf ...@@ -1470,7 +1470,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
init = true; init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema); tRowMergerInit(&merge, &fRow1, pReader->pSchema);
} }
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
} }
if (minKey == key) { if (minKey == key) {
...@@ -1641,7 +1641,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo ...@@ -1641,7 +1641,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo
init = true; init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema); tRowMergerInit(&merge, &fRow1, pReader->pSchema);
} }
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
} }
if (minKey == ik.ts) { if (minKey == ik.ts) {
...@@ -1691,7 +1691,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo ...@@ -1691,7 +1691,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo
init = true; init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema); tRowMergerInit(&merge, &fRow1, pReader->pSchema);
} }
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
} }
if (minKey == key) { if (minKey == key) {
...@@ -1904,7 +1904,7 @@ static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) { ...@@ -1904,7 +1904,7 @@ static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) {
*pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX;
} }
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) {
int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1; int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1;
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
return false; return false;
...@@ -1918,21 +1918,28 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { ...@@ -1918,21 +1918,28 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
continue; continue;
} }
if (pBlockData->aTSKEY[i] < pLastBlockReader->window.skey) { int64_t ts = pBlockData->aTSKEY[i];
if (ts < pLastBlockReader->window.skey) {
continue; continue;
} }
if (pBlockData->aVersion[i] < pLastBlockReader->verRange.minVer) { int64_t ver = pBlockData->aVersion[i];
if (ver < pLastBlockReader->verRange.minVer) {
continue; continue;
} }
// no data any more, todo opt handle desc case // no data any more, todo opt handle desc case
if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) { if (ts > pLastBlockReader->window.ekey) {
continue; continue;
} }
// todo opt handle desc case // todo opt handle desc case
if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) { if (ver > pLastBlockReader->verRange.maxVer) {
continue;
}
TSDBKEY k = {.ts = ts, .version = ver};
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pLastBlockReader->order)) {
continue; continue;
} }
...@@ -1958,7 +1965,8 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { ...@@ -1958,7 +1965,8 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
} }
// todo refactor // todo refactor
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) { static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
...@@ -2011,7 +2019,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2011,7 +2019,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema); tRowMergerInit(&merge, &fRow1, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, ts, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
...@@ -2025,7 +2033,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2025,7 +2033,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInLastBlock(pLastBlockReader, ts, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
...@@ -2061,7 +2069,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2061,7 +2069,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
...@@ -2085,10 +2093,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2085,10 +2093,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pBlockScanInfo = pReader->status.pTableIter; pBlockScanInfo = pReader->status.pTableIter;
} }
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
// initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL);
// bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
...@@ -2439,7 +2444,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2439,7 +2444,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
int32_t index = pScanInfo->indexInBlockL; int32_t index = pScanInfo->indexInBlockL;
if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader); bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
if (!hasData) { // current table does not have rows in last block, try next table if (!hasData) { // current table does not have rows in last block, try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
...@@ -2504,13 +2509,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2504,13 +2509,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code; return code;
} }
if (pLastBlockReader->currentBlockIndex == -1) { // note: the lastblock may be null here
// ASSERT(0);
}
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader); bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
} }
} }
...@@ -3034,8 +3036,8 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc ...@@ -3034,8 +3036,8 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
} }
// todo check if the rows are dropped or not // todo check if the rows are dropped or not
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) { int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) {
while(nextRowInLastBlock(pLastBlockReader)) { while(nextRowInLastBlock(pLastBlockReader, pScanInfo)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex); TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册