提交 e75c4000 编写于 作者: H Haojun Liao

fix(query): fix bug in query last block.

上级 2cd74016
......@@ -89,7 +89,7 @@ typedef struct SLastBlockReader {
STimeWindow window;
SVersionRange verRange;
uint64_t uid;
int32_t rowIndex;
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
} SLastBlockReader;
typedef struct SFilesetIter {
......@@ -181,7 +181,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* re
int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
......@@ -449,7 +450,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = capacity;
pReader->capacity = 1;
pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
......@@ -624,24 +625,29 @@ _end:
return code;
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
SBlockNumber * pBlockNum, SArray* pQualifiedLastBlock) {
int32_t numOfQTable= 0;
size_t numOfTables = taosArrayGetSize(pIndexList);
int64_t st = taosGetTimestampUs();
size_t size = 0;
static void cleanupTableScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* px = NULL;
while (1) {
px = taosHashIterate(pReader->status.pTableMap, px);
px = taosHashIterate(pTableMap, px);
if (px == NULL) {
break;
}
// reset the index in last block when handing a new file
px->indexInBlockL = -1;
tMapDataClear(&px->mapData);
taosArrayClear(px->pBlockList);
}
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
SBlockNumber * pBlockNum, SArray* pQualifiedLastBlock) {
int32_t numOfQTable = 0;
size_t sizeInDisk = 0;
size_t numOfTables = taosArrayGetSize(pIndexList);
int64_t st = taosGetTimestampUs();
cleanupTableScanInfo(pReader->status.pTableMap);
for (int32_t i = 0; i < numOfTables; ++i) {
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
......@@ -651,7 +657,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
tMapDataReset(&pScanInfo->mapData);
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
size += pScanInfo->mapData.nData;
sizeInDisk += pScanInfo->mapData.nData;
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
SBlock block = {0};
tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);
......@@ -707,7 +713,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, size / 1000.0, el, pReader->idStr);
numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk
/ 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el;
......@@ -1396,7 +1403,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
......@@ -1541,7 +1548,7 @@ static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
......@@ -1747,7 +1754,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin,
SVersionRange* pVerRange, int16_t startPos) {
SVersionRange* pVerRange, int16_t* startPos) {
pLastBlockReader->uid = uid;
pLastBlockReader->window = *pWin;
pLastBlockReader->verRange = *pVerRange;
......@@ -1755,14 +1762,14 @@ static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid
}
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) {
if (*(pLastBlockReader->rowIndex) >= pLastBlockReader->lastBlockData.nRow) {
return false;
}
pLastBlockReader->rowIndex += 1;
*(pLastBlockReader->rowIndex) += 1;
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
for(int32_t i = pLastBlockReader->rowIndex; i < pBlockData->nRow; ++i) {
for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow; ++i) {
if (pBlockData->aUid[i] != pLastBlockReader->uid) {
continue;
}
......@@ -1777,23 +1784,25 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
// no data any more
if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) {
pLastBlockReader->rowIndex = pBlockData->nRow;
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
return false;
}
if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) {
pLastBlockReader->rowIndex = pBlockData->nRow;
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
return false;
}
pLastBlockReader->rowIndex = i;
*(pLastBlockReader->rowIndex) = i;
return true;
}
pLastBlockReader->rowIndex = pBlockData->nRow;
// set all data is consumed in last block
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
return false;
}
#if 0
static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) {
return pLastBlockReader->rowIndex;
}
......@@ -1801,15 +1810,16 @@ static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) {
static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) {
pLastBlockReader->rowIndex = state;
}
#endif
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
return pBlockData->aTSKEY[pLastBlockReader->rowIndex];
return pBlockData->aTSKEY[*pLastBlockReader->rowIndex];
}
// todo handle desc order
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) {
if (*pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) {
return false;
}
......@@ -1847,7 +1857,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, ts, &merge);
......@@ -1898,7 +1908,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge);
......@@ -1925,12 +1935,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pBlockScanInfo = pReader->status.pTableIter;
}
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
int16_t startIndex = pBlockInfo != NULL? pBlockScanInfo->indexInBlockL:-1;
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, startIndex);
bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, &pBlockScanInfo->indexInBlockL);
// bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
......@@ -1961,7 +1968,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
// currently loaded file data block is consumed
if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break;
......@@ -2252,7 +2259,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64
}
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SReaderStatus* pStatus = &pReader->status;
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
while(1) {
......@@ -2261,17 +2268,42 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (pStatus->pTableIter == NULL) {
return TSDB_CODE_SUCCESS;
}
} else { // let's try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return TSDB_CODE_SUCCESS;
}
// load the last data block of current table
// todo opt perf by avoiding load last block repeatly
STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
if (code != TSDB_CODE_SUCCESS) { // todo handle error
return code;
}
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == -1) {
bool hasData = nextRowInLastBlock(pLastBlockReader);
if (!hasData) { // current table does not have rows in last block, try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return TSDB_CODE_SUCCESS;
}
continue;
}
}
// find the last block that contain the specified block uid
return doLoadRelatedLastBlock(pLastBlockReader, pStatus->pTableIter->uid, pReader);
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
//todo check for all empty table
// current table is exhausted, let's try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return TSDB_CODE_SUCCESS;
}
}
}
......@@ -2281,8 +2313,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SBlock* pBlock = NULL;
TSDBKEY key = {0};
SBlock* pBlock = NULL;
STableBlockScanInfo* pScanInfo = NULL;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
......@@ -2292,29 +2324,28 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pBlock = getCurrentBlock(pBlockIter);
key = getCurrentKeyInBuf(pBlockIter, pReader);
// load the last data block of current table
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, pScanInfo->indexInBlockL);
bool hasData = nextRowInLastBlock(pLastBlockReader);
} else {
ASSERT(pBlockIter->numOfBlocks == 0);
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL);
}
if (pBlockInfo == NULL || fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
if (pBlockInfo != NULL) {
tBlockDataReset(&pStatus->fileBlockData);
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
//todo
}
if (pBlockInfo == NULL) { // build data block from last data file
ASSERT(pBlockIter->numOfBlocks == 0);
code = buildComposedDataBlock(pReader);
} else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
tBlockDataReset(&pStatus->fileBlockData);
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
// todo
}
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// build composed data block
......@@ -2425,7 +2456,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
// all data blocks are check in last file, now let's try the next file
// all data blocks are checked in this last block file, now let's try the next file
if (pReader->status.pTableIter == NULL) {
code = initForFirstBlockInFile(pReader, pBlockIter);
......@@ -2434,7 +2465,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
// this file does not have blocks, let's start check the last block file
// this file does not have data files, let's start check the last block file if exists
if (pBlockIter->numOfBlocks == 0) {
goto _begin;
}
......@@ -2855,7 +2886,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, S
while(nextRowInLastBlock(pLastBlockReader)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, pLastBlockReader->rowIndex);
TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex);
tRowMerge(pMerger, &fRow1);
} else {
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册