提交 b8bc052a 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 5efd5a3b
...@@ -129,6 +129,8 @@ typedef struct SReaderStatus { ...@@ -129,6 +129,8 @@ typedef struct SReaderStatus {
SHashObj* pTableMap; // SHash<STableBlockScanInfo> SHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
SFileBlockDumpInfo fBlockDumpInfo; SFileBlockDumpInfo fBlockDumpInfo;
SDFileSet* pCurrentFileSet; // current opened file set
SBlockData fileBlockData; SBlockData fileBlockData;
SFileSetIter fileIter; SFileSetIter fileIter;
SDataBlockIter blockIter; SDataBlockIter blockIter;
...@@ -352,6 +354,8 @@ static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState ...@@ -352,6 +354,8 @@ static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState
static void resetDataBlockIterator(SDataBlockIter* pIter) { static void resetDataBlockIterator(SDataBlockIter* pIter) {
pIter->numOfBlocks = -1; pIter->numOfBlocks = -1;
pIter->index = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} }
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
...@@ -363,13 +367,14 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* ...@@ -363,13 +367,14 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
// check file the time range of coverage // check file the time range of coverage
STimeWindow win = {0}; STimeWindow win = {0};
SDFileSet *pDFile = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index); pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pDFile); int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileSet);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
// todo file range check
// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files // current file are not overlapped with query time window, ignore remain files
...@@ -380,6 +385,8 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* ...@@ -380,6 +385,8 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
return false; return false;
} }
return true;
_err: _err:
return false; return false;
} }
...@@ -884,10 +891,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, ...@@ -884,10 +891,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock)); pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
} }
pScanInfo->blockIdx = blockIndex;
taosArrayPush(pIndexList, &blockIndex); taosArrayPush(pIndexList, &blockIndex);
} }
tMapDataClear(&blockIdxMap); // tMapDataClear(&blockIdxMap);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
...@@ -895,7 +903,7 @@ _err: ...@@ -895,7 +903,7 @@ _err:
return code; return code;
} }
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables) { static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables, int32_t* numOfBlocks) {
size_t numOfTables = taosArrayGetSize(pIndexList); size_t numOfTables = taosArrayGetSize(pIndexList);
*numOfValidTables = 0; *numOfValidTables = 0;
...@@ -917,6 +925,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ ...@@ -917,6 +925,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
return code; return code;
} }
// 1. time range check
if ((ASCENDING_TRAVERSE(pReader->order) && if ((ASCENDING_TRAVERSE(pReader->order) &&
(block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey)) || (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey)) ||
(!ASCENDING_TRAVERSE(pReader->order) && (!ASCENDING_TRAVERSE(pReader->order) &&
...@@ -924,10 +933,17 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ ...@@ -924,10 +933,17 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
continue; continue;
} }
// 2. version range check
// if (block.minVersion > pReader->startVersion || block.maxVersion < pReader->endVersion) {
// continue;
// }
void* p = taosArrayPush(pScanInfo->pBlockList, &block); void* p = taosArrayPush(pScanInfo->pBlockList, &block);
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
(*numOfBlocks) += 1;
} }
if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) { if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
...@@ -940,7 +956,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ ...@@ -940,7 +956,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t numOfCols = taosArrayGetSize(pReader->pResBlock->pDataBlock);
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
...@@ -1983,7 +1998,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -1983,7 +1998,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
int32_t cnt = 0; int32_t cnt = 0;
void* ptr = NULL; void* ptr = NULL;
while(1) { while(1) {
ptr = taosHashIterate(pReader->status.pTableMap, &ptr); ptr = taosHashIterate(pReader->status.pTableMap, ptr);
if (ptr == NULL) { if (ptr == NULL) {
break; break;
} }
...@@ -2025,6 +2040,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -2025,6 +2040,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
} }
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s", tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s",
pReader, cnt, pReader->idStr); pReader, cnt, pReader->idStr);
pBlockIter->index = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2428,18 +2445,29 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* ...@@ -2428,18 +2445,29 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = TSDB_CODE_SUCCESS;
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion}; TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion};
STbData* d = NULL; STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) { if (pReader->pTsdb->mem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d); tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} }
STbData* di = NULL; STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) { if (pReader->pTsdb->imem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di); tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} }
pBlockScanInfo->iterInit = true; pBlockScanInfo->iterInit = true;
...@@ -2494,7 +2522,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { ...@@ -2494,7 +2522,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
if (taosArrayGetSize(pIndexList) > 0) { if (taosArrayGetSize(pIndexList) > 0) {
uint32_t numOfValidTable = 0; uint32_t numOfValidTable = 0;
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable); code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2539,6 +2567,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { ...@@ -2539,6 +2567,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
pInfo->window.skey = pBlock->minKey.ts; pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts; pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false); setComposedBlockFlag(pReader, false);
*exists = true;
} }
} else { } else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
...@@ -3167,7 +3196,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3167,7 +3196,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
bool exists = true; bool exists = true;
int32_t code = loadDataInFiles(pReader, &exists); int32_t code = loadDataInFiles(pReader, &exists);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
if (exists) {
return true;
} else { // let's try the in-memory buffer
}
} else { // no data in files, let's try the buffer } else { // no data in files, let's try the buffer
while(1) { while(1) {
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
...@@ -3323,10 +3360,12 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { ...@@ -3323,10 +3360,12 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlockData data = {0}; int32_t code = tBlockDataInit(&pReader->status.fileBlockData);
doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &data); doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &pReader->status.fileBlockData);
// TSDBROW row = tsdbRowFromBlockData(&pReader->status.fileBlockData, 0);
// doAppendOneRow(pReader->pResBlock, pReader, row.);
// todo convert blockData to ssdatablock return pReader->pResBlock->pDataBlock;
} }
// /** // /**
// * In the following two cases, the data has been loaded to SColumnInfoData. // * In the following two cases, the data has been loaded to SColumnInfoData.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册