提交 515e8d36 编写于 作者: H Haojun Liao

fix(query): fix bug in merge buffer data and last block.

上级 5e4b7bf9
......@@ -15,7 +15,10 @@
#include "osDef.h"
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define DEFAULT_ROW_INDEX_VAL (-1)
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
......@@ -220,7 +223,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
}
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = -1};
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
info.lastKey = pTsdbReader->window.skey;
......@@ -699,10 +702,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
}
{
// 1. time range check, todo add later
// if (pLastBlock->.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
// continue;
// }
// 1. time range check
if (pLastBlock->minKey > pReader->window.ekey || pLastBlock->maxKey < pReader->window.skey) {
continue;
}
// 2. version range check
if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) {
......@@ -727,12 +730,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
return TSDB_CODE_SUCCESS;
}
// todo remove pblock parameter
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int32_t order) {
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
pDumpInfo->allDumped = true;
pDumpInfo->lastKey = pBlock->maxKey.ts + step;
pDumpInfo->lastKey = maxKey + step;
}
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
......@@ -832,7 +833,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
pResBlock->info.rows = remain;
pDumpInfo->rowIndex += step * remain;
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.blockLoadTime += elapsedTime;
......@@ -1289,7 +1290,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
// todo here we need to each key in the last files to identify if it is really overlapped with last block
bool overlapWithlastBlock = false;
if (/*hasDataInLastBlock(pLastBlockReader)*/taosArrayGetSize(pLastBlockReader->pBlockL) > 0) {
if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0) {
SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
}
......@@ -1417,7 +1418,8 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
bool init = false;
// file block ---> last block -----> imem -----> mem
// ASC: file block ---> last block -----> imem -----> mem
//DESC: mem -----> imem -----> last block -----> file block
if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true;
......@@ -1549,8 +1551,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS;
}
// todo handle the desc order check
static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
......@@ -1569,56 +1570,114 @@ static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
int64_t minKey = INT64_MAX;
if (minKey > k.ts) {
minKey = k.ts;
}
int64_t minKey = 0;//INT64_MAX;
if (ASCENDING_TRAVERSE(pReader->order)) {
minKey = INT64_MAX; // let's find the minimum
if (minKey > k.ts) {
minKey = k.ts;
}
if (minKey > ik.ts) {
minKey = ik.ts;
}
if (minKey > ik.ts) {
minKey = ik.ts;
}
if (minKey > key) {
minKey = key;
}
if (minKey > key && pBlockData->nRow > 0) {
minKey = key;
}
if (minKey > tsLast) {
minKey = tsLast;
if (minKey > tsLast && pLastBlockData->nRow > 0) {
minKey = tsLast;
}
} else {
minKey = INT64_MIN; // let find the maximum ts value
if (minKey < k.ts) {
minKey = k.ts;
}
if (minKey < ik.ts) {
minKey = ik.ts;
}
if (minKey < key && pBlockData->nRow > 0) {
minKey = key;
}
if (minKey < tsLast && pLastBlockData->nRow > 0) {
minKey = tsLast;
}
}
// file block ---> last block -----> imem -----> mem
bool init = false;
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
if (minKey == tsLast) {
if (!init) {
// ASC: file block -----> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
if (ASCENDING_TRAVERSE(pReader->order)) {
if (minKey == key) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
if (minKey == ik.ts) {
if (!init) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pSchema);
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
if (minKey == ik.ts) {
if (!init) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pSchema);
}
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (minKey == k.ts) {
if (!init) {
if (minKey == k.ts) {
if (!init) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
}
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
} else {
if (minKey == k.ts) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (minKey == ik.ts) {
if (!init) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pSchema);
}
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
if (minKey == key) {
if (!init) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
......@@ -1816,7 +1875,6 @@ static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid
}
}
#define ALL_ROWS_CHECKED_INDEX INT16_MIN
static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) {
*pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX;
}
......@@ -1863,16 +1921,6 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
return false;
}
#if 0
static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) {
return pLastBlockReader->rowIndex;
}
static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) {
pLastBlockReader->rowIndex = state;
}
#endif
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
return pBlockData->aTSKEY[*pLastBlockReader->rowIndex];
......@@ -1894,7 +1942,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
return doMergeThreeLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
return doMergeMultiLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
} else {
// imem + file + last block
if (pBlockScanInfo->iiter.hasVal) {
......@@ -2014,7 +2062,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL);
// initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL);
// bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -2026,19 +2074,27 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
while (1) {
// todo check the validate of row in file block
{
if (pBlockData->nRow > 0 && !isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
bool hasBlockData = false;
while (pBlockData->nRow > 0) {
if (isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
hasBlockData = true;
break;
}
pDumpInfo->rowIndex += step;
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
continue;
}
if (!hasDataInLastBlock(pLastBlockReader)) {
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
// no data in last block and block, no need to proceed.
if ((hasBlockData == false) && (hasBlockLData == false)) {
break;
}
}
......@@ -2048,7 +2104,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
......@@ -2218,11 +2274,11 @@ _err:
return code;
}
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
// SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
// STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
initMemDataIterator(pScanInfo, pReader);
TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
......@@ -2321,16 +2377,17 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64
code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
//todo add log
tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr);
return code;
}
code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
if (code != TSDB_CODE_SUCCESS) {
// tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
// ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s",
// pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlock->nRow,
// pBlock->minVer, pBlock->maxVer, tstrerror(code), pReader->idStr);
tsdbError(
"%p error occurs in loading last block into buffer, last block index:%d, total:%d rows:%d, minVer:%" PRId64
", maxVer:%" PRId64 ", code:%s %s",
pReader, pLastBlockReader->currentBlockIndex, (int32_t)taosArrayGetSize(pBlocks), pBlock->nRow, pBlock->minVer,
pBlock->maxVer, tstrerror(code), pReader->idStr);
}
return TSDB_CODE_SUCCESS;
......@@ -2357,7 +2414,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
}
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == -1 || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader);
if (!hasData) { // current table does not have rows in last block, try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
......@@ -2398,9 +2455,17 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
if (pBlockInfo != NULL) {
pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
} else {
pScanInfo = pReader->status.pTableIter;
}
if (pBlockInfo != NULL) {
pBlock = getCurrentBlock(pBlockIter);
key = getCurrentKeyInBuf(pBlockIter, pReader);
}
{
key = getCurrentKeyInBuf(pScanInfo, pReader);
// load the last data block of current table
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
......@@ -2408,7 +2473,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pScanInfo->indexInBlockL);
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader);
}
}
if (pBlockInfo == NULL) { // build data block from last data file
......@@ -2439,7 +2507,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pInfo->uid = pScanInfo->uid;
pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock, pReader->order);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
}
return code;
......@@ -2663,39 +2731,6 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
// // filter the queried time stamp in the first place
// STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
// // starts from the buffer in case of descending timestamp order check data blocks
// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// int32_t i = 0;
// while (i < numOfTables) {
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// // the first qualified table for interpolation query
// // if ((pTsdbReadHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
// // (pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
// // break;
// // }
// i++;
// }
// // there are no data in all the tables
// if (i == numOfTables) {
// return;
// }
// STableBlockScanInfo info = *(STableBlockScanInfo*)taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// taosArrayClear(pTsdbReadHandle->pTableCheckInfo);
// info.lastKey = pTsdbReadHandle->window.skey;
// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
ASSERT(pKey != NULL);
if (pDelList == NULL) {
......@@ -3227,7 +3262,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
ASSERT(pReader != NULL);
taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
return TDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册