提交 6464a3d2 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 10ab8ca8
...@@ -363,12 +363,11 @@ static void resetDataBlockIterator(SDataBlockIter* pIter) { ...@@ -363,12 +363,11 @@ static void resetDataBlockIterator(SDataBlockIter* pIter) {
} }
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
pIter->index += 1;
if (pIter->index >= pIter->numOfFiles) { if (pIter->index >= pIter->numOfFiles) {
return false; return false;
} }
pIter->index += 1;
// check file the time range of coverage // check file the time range of coverage
STimeWindow win = {0}; STimeWindow win = {0};
pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index); pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
...@@ -920,7 +919,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ ...@@ -920,7 +919,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL); tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
ASSERT(pScanInfo->pBlockList == NULL || taosArrayGetSize(pScanInfo->pBlockList) == 0); taosArrayClear(pScanInfo->pBlockList);
for (int32_t j = 0; j < mapData.nItem; ++j) { for (int32_t j = 0; j < mapData.nItem; ++j) {
SBlock block = {0}; SBlock block = {0};
...@@ -971,18 +971,25 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -971,18 +971,25 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
uint8_t *pb = NULL, *pb1 = NULL; uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, /*pReader->suppInfo.colIdList, numOfCols, */&pb, &pb1); int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
SColVal cv = {0};
for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) {
SColData* pData = (SColData*)taosArrayGet(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i);
for(int32_t j = 0; j < pBlockData->nRow; ++j) { if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
tColDataGetValue(pData, j, &cv); for (int32_t j = 0; j < pBlockData->nRow; ++j) {
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false);
}
} else {
SColVal cv = {0};
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
}
} }
} }
...@@ -2276,7 +2283,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) ...@@ -2276,7 +2283,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
} }
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->minKey.ts || key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/;
} }
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
...@@ -2511,7 +2518,6 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead ...@@ -2511,7 +2518,6 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
initMemIterator(pScanInfo, pReader); initMemIterator(pScanInfo, pReader);
if (pScanInfo->memHasVal) { if (pScanInfo->memHasVal) {
...@@ -2563,7 +2569,82 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { ...@@ -2563,7 +2569,82 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
tBlockDataInit(&data);
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// build composed data block
code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
code = buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
return code;
}
static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
while(1) {
if (pStatus->pTableIter == NULL) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) {
return false;
}
}
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (pReader->pResBlock->info.rows > 0) {
return true;
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return false;
}
}
}
static int32_t loadDataInFiles(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter; SFileSetIter* pFIter = &pStatus->fileIter;
...@@ -2583,36 +2664,8 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { ...@@ -2583,36 +2664,8 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
return code; return code;
} }
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); code = doBuildDataBlock(pReader);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
// build composed data block
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
*exists = true;
} else { } else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
...@@ -2629,46 +2682,22 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { ...@@ -2629,46 +2682,22 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
return code; return code;
} }
// initialize the block iterator for a new fileset // all data files are consumed, try data in buffer
code = initBlockIterator(pReader, pBlockIter, numOfBlocks); if (numOfBlocks == 0) {
if (code != TSDB_CODE_SUCCESS) { pReader->status.loadFromFile = false;
return code; return code;
} else {
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doBuildDataBlock(pReader);
} }
pFBlock = getCurrentBlockInfo(pBlockIter);
pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
// build composed data block
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
*exists = true;
} else { // try next data block in current file } else { // try next data block in current file
// 1. check if ts in buffer is overlap with current file data block
blockIteratorNext(pBlockIter); blockIteratorNext(pBlockIter);
doBuildDataBlock(pReader);
} }
} else { } else {
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
...@@ -3263,13 +3292,13 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3263,13 +3292,13 @@ void tsdbReaderClose(STsdbReader* pReader) {
} }
bool tsdbNextDataBlock(STsdbReader* pReader) { bool tsdbNextDataBlock(STsdbReader* pReader) {
bool ret = false;
if (isEmptyQueryTimeWindow(pReader)) { if (isEmptyQueryTimeWindow(pReader)) {
return false; return false;
} }
// cleanup the data that belongs to the previous data block // cleanup the data that belongs to the previous data block
blockDataCleanup(pReader->pResBlock); SSDataBlock* pBlock = pReader->pResBlock;
blockDataCleanup(pBlock);
int64_t stime = taosGetTimestampUs(); int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime; int64_t elapsedTime = stime;
...@@ -3277,41 +3306,20 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3277,41 +3306,20 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
bool exists = true; int32_t code = loadDataInFiles(pReader);
int32_t code = loadDataInFiles(pReader, &exists);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
if (exists) { if (pBlock->info.rows > 0) {
return true; return true;
} else { // let's try the in-memory buffer } else {
buildInmemBlockSeqentially(pReader);
return pBlock->info.rows > 0;
} }
} else { // no data in files, let's try the buffer } else { // no data in files, let's try the buffer
while(1) { buildInmemBlockSeqentially(pReader);
if (pStatus->pTableIter == NULL) { return pBlock->info.rows > 0;
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) {
return false;
}
}
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (pReader->pResBlock->info.rows > 0) {
return true;
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return false;
}
}
} }
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
...@@ -3351,7 +3359,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3351,7 +3359,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
// pReader->cost.checkForNextTime += elapsedTime; // pReader->cost.checkForNextTime += elapsedTime;
// return ret; // return ret;
// } // }
return ret; return false;
} }
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
...@@ -3450,40 +3458,6 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { ...@@ -3450,40 +3458,6 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
return pReader->pResBlock->pDataBlock; return pReader->pResBlock->pDataBlock;
} }
// /**
// * In the following two cases, the data has been loaded to SColumnInfoData.
// * 1. data is from cache, 2. data block is not completed qualified to query time range
// */
// if (pReader->cur.fid == INT32_MIN) {
// return pReader->pColumns;
// } else {
// SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[pReader->cur.slot];
// STableBlockScanInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
// if (pReader->cur.mixBlock) {
// return pReader->pColumns;
// } else {
// SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
// assert(pReader->realNumOfRows <= binfo.rows);
// // data block has been loaded, todo extract method
// SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo;
// if (pBlockLoadInfo->slot == pReader->cur.slot && pBlockLoadInfo->fileGroup->fid == pReader->cur.fid &&
// pBlockLoadInfo->uid == pCheckInfo->tableId) {
// return pReader->pColumns;
// } else { // only load the file block
// SBlock* pBlock = pBlockInfo->compBlock;
// if (doLoadFileBlockData(pReader, pBlock, pCheckInfo, pReader->cur.slot) != TSDB_CODE_SUCCESS) {
// return NULL;
// }
// int32_t numOfRows = doCopyRowsFromFileBlock(pReader, pReader->outputCapacity, 0, 0, pBlock->numOfRows - 1);
// return pReader->pColumns;
// }
// }
// }
return NULL;
} }
void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) { void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册