提交 a3011a01 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 72e0f09d
......@@ -34,7 +34,7 @@ typedef struct {
typedef struct {
int32_t numOfBlocks;
int32_t numOfLastBlocks;
int32_t numOfLastFiles;
} SBlockNumber;
typedef struct STableBlockScanInfo {
......@@ -84,7 +84,6 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo;
typedef struct SLastBlockReader {
SArray* pBlockL;
int32_t currentBlockIndex;
SBlockData lastBlockData;
STimeWindow window;
......@@ -343,7 +342,6 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
}
SLastBlockReader* pLReader = pIter->pLastBlockReader;
pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
pLReader->order = pReader->order;
pLReader->window = pReader->window;
pLReader->verRange = pReader->verRange;
......@@ -363,6 +361,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
int32_t step = asc ? 1 : -1;
pIter->index += step;
pIter->pLastBlockReader->uid = 0;
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
return false;
}
......@@ -581,8 +580,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
}
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
SBlockNumber* pBlockNum, SArray* pQualifiedLastBlock) {
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
int32_t numOfQTable = 0;
size_t sizeInDisk = 0;
size_t numOfTables = taosArrayGetSize(pIndexList);
......@@ -627,37 +625,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
}
}
size_t numOfLast = taosArrayGetSize(pLastBlockIndex);
for (int32_t i = 0; i < numOfLast; ++i) {
SBlockL* pLastBlock = taosArrayGet(pLastBlockIndex, i);
if (pLastBlock->suid != pReader->suid) {
continue;
}
{
// 1. time range check
printf("%ld, %ld\n", pLastBlock->minKey, pLastBlock->maxKey);
if (pLastBlock->minKey > pReader->window.ekey || pLastBlock->maxKey < pReader->window.skey) {
continue;
}
// 2. version range check
if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) {
continue;
}
pBlockNum->numOfLastBlocks += 1;
taosArrayPush(pQualifiedLastBlock, pLastBlock);
}
}
int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;
pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nLastF;
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug(
"load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, block-info-size:%.2f Kb, elapsed "
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk / 1000.0, el,
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr);
pReader->cost.numOfBlocks += total;
......@@ -1188,11 +1163,14 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
// todo here we need to each key in the last files to identify if it is really overlapped with last block
// todo
bool overlapWithlastBlock = false;
#if 0
if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
}
#endif
bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
......@@ -2185,12 +2163,10 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* p
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
SReaderStatus* pStatus = &pReader->status;
pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastBlocks = 0;
pBlockNum->numOfLastFiles = 0;
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
taosArrayClear(pLastBlocks);
while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
......@@ -2205,32 +2181,16 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
return code;
}
code = tsdbReadBlockL(pReader->pFileReader, 0, pLastBlocks);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList);
return code;
}
if (taosArrayGetSize(pIndexList) > 0 || taosArrayGetSize(pLastBlocks) > 0) {
SArray* pQLastBlock = taosArrayInit(4, sizeof(SBlockL));
code = doLoadFileBlock(pReader, pIndexList, pLastBlocks, pBlockNum, pQLastBlock);
if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nLastF > 0) {
code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList);
taosArrayDestroy(pQLastBlock);
return code;
}
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0) {
ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks);
taosArrayClear(pLastBlocks);
taosArrayAddAll(pLastBlocks, pQLastBlock);
taosArrayDestroy(pQLastBlock);
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
break;
}
taosArrayDestroy(pQLastBlock);
}
// no blocks in current file, try next files
......@@ -2240,69 +2200,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
return TSDB_CODE_SUCCESS;
}
#if 0
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader) {
SArray* pBlocks = pLastBlockReader->pBlockL;
SBlockL* pBlock = NULL;
uint64_t uid = pBlockScanInfo->uid;
int32_t totalLastBlocks = (int32_t)taosArrayGetSize(pBlocks);
initMemDataIterator(pBlockScanInfo, pReader);
// find the correct SBlockL. todo binary search
int32_t index = -1;
for (int32_t i = 0; i < totalLastBlocks; ++i) {
SBlockL* p = taosArrayGet(pBlocks, i);
if (p->minUid <= uid && p->maxUid >= uid) {
index = i;
pBlock = p;
break;
}
}
if (index == -1) {
pLastBlockReader->currentBlockIndex = index;
tBlockDataReset(&pLastBlockReader->lastBlockData);
return TSDB_CODE_SUCCESS;
}
// the required last datablock has already loaded
if (index == pLastBlockReader->currentBlockIndex) {
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampUs();
int32_t code =
tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr);
return code;
}
code = tsdbReadLastBlock(pReader->pFileReader, 0, pBlock, &pLastBlockReader->lastBlockData);
double el = (taosGetTimestampUs() - st) / 1000.0;
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader,
pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr);
} else {
tsdbDebug("%p load last block completed, uid:%" PRIu64
" last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 "-%" PRId64
" elapsed time:%.2f ms, %s",
pReader, uid, index, totalLastBlocks, pBlock->nRow, pBlock->minVer, pBlock->maxVer, pBlock->minKey,
pBlock->maxKey, el, pReader->idStr);
}
pLastBlockReader->currentBlockIndex = index;
pReader->cost.lastBlockLoad += 1;
pReader->cost.lastBlockLoadTime += el;
return TSDB_CODE_SUCCESS;
}
#endif
static int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2;
......@@ -2395,11 +2292,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) {
// load the last data block of current table
STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
// code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
bool hasVal =
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL, pReader->pFileReader);
if (!hasVal) {
......@@ -2566,7 +2458,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
}
// all data files are consumed, try data in buffer
if (num.numOfBlocks + num.numOfLastBlocks == 0) {
if (num.numOfBlocks + num.numOfLastFiles == 0) {
pReader->status.loadFromFile = false;
return code;
}
......@@ -2579,8 +2471,6 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
resetDataBlockIterator(pBlockIter, pReader->order);
}
SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader;
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter);
return code;
......@@ -2645,7 +2535,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter);
} else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) {
} else if (1/*taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0*/) {
// data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
......@@ -3444,7 +3334,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) {
tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true);
taosArrayDestroy(pFilesetIter->pLastBlockReader->pBlockL);
taosMemoryFree(pFilesetIter->pLastBlockReader);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册