提交 79020ea4 编写于 作者: S shenglian zhou

fix: stt and data block merge

上级 78bdb043
...@@ -1729,40 +1729,44 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1729,40 +1729,44 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
// row in last file block // row in last file block
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist if (key < tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) { } else if (key > tsLast) {
SRow* pTSRow = NULL; return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); }
if (code != TSDB_CODE_SUCCESS) { } else {
return code; if (key > tsLast) {
} return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key < tsLast) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); }
tsdbRowMergerAdd(pMerger, pRow1, NULL); if (key == tsLast) {
SRow* pTSRow = NULL;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
code = tsdbRowMergerGetRow(pMerger, &pTSRow); TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (code != TSDB_CODE_SUCCESS) { tsdbRowMergerAdd(pMerger, pRow1, NULL);
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
taosMemoryFree(pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
tsdbRowMergerClear(pMerger); if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} else { // key > ts
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
} }
} else { // desc order
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true); code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
return code;
} }
} else { // only last block exists } else { // only last block exists
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
...@@ -2190,7 +2194,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2190,7 +2194,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL; TSDBROW *pRow = NULL, *piRow = NULL;
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] :
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
} }
...@@ -2667,16 +2672,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2667,16 +2672,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else { } else {
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
// only return the rows in last block int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) ||
ASSERT(tsLast >= pBlockInfo->record.lastKey); (!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) {
// whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
} else {
SBlockData* pBData = &pReader->status.fileBlockData; SBlockData* pBData = &pReader->status.fileBlockData;
tBlockDataReset(pBData); tBlockDataReset(pBData);
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr); tsdbDebug("load data in last block firstly %s", pReader->idStr);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -2707,23 +2728,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2707,23 +2728,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr); pResBlock->info.rows, el, pReader->idStr);
} }
} else { // whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlockInfo->record.numRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
} }
} }
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册