提交 8eb86676 编写于 作者: H Haojun Liao

fix(query): support query last file.

上级 082d2093
...@@ -1279,7 +1279,6 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc ...@@ -1279,7 +1279,6 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
bool overlapWithlastBlock = false; bool overlapWithlastBlock = false;
if (hasDataInLastBlock(pLastBlockReader)) { if (hasDataInLastBlock(pLastBlockReader)) {
SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
// int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
} }
...@@ -1358,8 +1357,71 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* ...@@ -1358,8 +1357,71 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return pReader->pMemSchema; return pReader->pMemSchema;
} }
// todo handle desc
static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t minKey = INT64_MAX;
if (minKey > tsLast) {
minKey = tsLast;
}
if (minKey > k.ts) {
minKey = k.ts;
}
if (minKey > key && pBlockData->nRow > 0) {
minKey = key;
}
// file block ---> last block -----> imem -----> mem
bool init = false;
if (minKey == key) {
init = true;
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
if (minKey == k.ts) {
if (!init) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
}
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key) { SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0}; SRowMerger merge = {0};
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
...@@ -1431,6 +1493,85 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1431,6 +1493,85 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SArray* pDelList = pBlockScanInfo->delSkyline;
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
ASSERT(pRow != NULL && piRow != NULL);
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
int64_t minKey = INT64_MAX;
if (minKey > k.ts) {
minKey = k.ts;
}
if (minKey > ik.ts) {
minKey = ik.ts;
}
if (minKey > key) {
minKey = key;
}
if (minKey > tsLast) {
minKey = tsLast;
}
// file block ---> last block -----> imem -----> mem
bool init = false;
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
if (minKey == ik.ts) {
if (!init) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pSchema);
}
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (minKey == k.ts) {
if (!init) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
}
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SRowMerger merge = {0}; SRowMerger merge = {0};
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
...@@ -1605,8 +1746,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum ...@@ -1605,8 +1746,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin, SVersionRange* pVerRange, static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin,
int16_t startPos) { SVersionRange* pVerRange, int16_t startPos) {
pLastBlockReader->uid = uid; pLastBlockReader->uid = uid;
pLastBlockReader->window = *pWin; pLastBlockReader->window = *pWin;
pLastBlockReader->verRange = *pVerRange; pLastBlockReader->verRange = *pVerRange;
...@@ -1636,10 +1777,12 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) { ...@@ -1636,10 +1777,12 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
// no data any more // no data any more
if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) { if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) {
pLastBlockReader->rowIndex = pBlockData->nRow;
return false; return false;
} }
if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) { if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) {
pLastBlockReader->rowIndex = pBlockData->nRow;
return false; return false;
} }
...@@ -1676,25 +1819,21 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { ...@@ -1676,25 +1819,21 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) { static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t key = INT64_MIN; int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
if (pBlockData->nRow > 0) {
key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
}
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
return doMergeThreeLevelRows(pReader, pBlockScanInfo, pBlockData); return doMergeThreeLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
} else { } else {
// imem + file // imem + file + last block
if (pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key); return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
} }
// mem + file // mem + file
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key); return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
} }
if (pBlockData->nRow > 0) { if (pBlockData->nRow > 0) {
...@@ -1704,7 +1843,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1704,7 +1843,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
if (ts < key) { // save rows in last block if (ts < key) { // save rows in last block
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
...@@ -1712,7 +1850,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1712,7 +1850,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex); TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema); tRowMergerInit(&merge, &fRow1, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
...@@ -1753,6 +1891,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1753,6 +1891,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
} }
} }
} else { // only last block exists } else { // only last block exists
// only last block exits
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
...@@ -1820,10 +1959,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -1820,10 +1959,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
// currently loaded file data block is consumed // currently loaded file data block is consumed
if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0)) { if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pBlock, pReader->order); setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break; break;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册