提交 a7cbb93a 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 ba8ab926
...@@ -148,7 +148,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); ...@@ -148,7 +148,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader);
static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader); static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doMergeRowsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
...@@ -422,7 +422,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -422,7 +422,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
#if 1 #if 1
if (pReader->window.skey > pReader->window.ekey) { if (pReader->window.skey > pReader->window.ekey) {
TSWAP(pReader->window.skey, pReader->window.ekey); // TSWAP(pReader->window.skey, pReader->window.ekey);
} }
#endif #endif
...@@ -449,15 +449,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -449,15 +449,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
setColumnIdSlotList(pReader, pReader->pResBlock); setColumnIdSlotList(pReader, pReader->pResBlock);
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
pReader->status.loadFromFile = false;
}
*ppReader = pReader; *ppReader = pReader;
return code; return code;
...@@ -722,14 +713,11 @@ _end: ...@@ -722,14 +713,11 @@ _end:
// } // }
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
int32_t code = 0;
bool asc = ASCENDING_TRAVERSE(pReader->order);
SArray *aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); SArray *aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL); int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _end;
} }
if (taosArrayGetSize(aBlockIdx) == 0) { if (taosArrayGetSize(aBlockIdx) == 0) {
...@@ -741,6 +729,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, ...@@ -741,6 +729,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) {
pBlockIdx = (SBlockIdx *)taosArrayGet(aBlockIdx, i); pBlockIdx = (SBlockIdx *)taosArrayGet(aBlockIdx, i);
// uid check
if (pBlockIdx->suid != pReader->suid) { if (pBlockIdx->suid != pReader->suid) {
continue; continue;
} }
...@@ -751,10 +740,16 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, ...@@ -751,10 +740,16 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
continue; continue;
} }
if ((asc && (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey)) /*|| // todo: not valid info in bockIndex
(!asc && (pBlockIdx->minKey > pReader->window.skey || pBlockIdx->maxKey < pReader->window.ekey))*/) { // time range check
continue; // if (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey) {
} // continue;
// }
// version check
// if (pBlockIdx->minVersion > pReader->verRange.maxVer || pBlockIdx->maxVersion < pReader->verRange.minVer) {
// continue;
// }
STableBlockScanInfo* pScanInfo = p; STableBlockScanInfo* pScanInfo = p;
if (pScanInfo->pBlockList == NULL) { if (pScanInfo->pBlockList == NULL) {
...@@ -765,10 +760,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, ...@@ -765,10 +760,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
taosArrayPush(pIndexList, pBlockIdx); taosArrayPush(pIndexList, pBlockIdx);
} }
taosArrayDestroy(aBlockIdx); _end:
return TSDB_CODE_SUCCESS;
_err:
taosArrayDestroy(aBlockIdx); taosArrayDestroy(aBlockIdx);
return code; return code;
} }
...@@ -1764,51 +1756,52 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -1764,51 +1756,52 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
pBlockIter->numOfBlocks = numOfBlocks; pBlockIter->numOfBlocks = numOfBlocks;
// access data blocks according to the offset of each block in asc/desc order. // access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
SBlockOrderSupporter sup = {0}; SBlockOrderSupporter sup = {0};
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t cnt = 0; int32_t code = initBlockOrderSupporter(&sup, numOfTables);
void* ptr = NULL; if (code != TSDB_CODE_SUCCESS) {
while(1) { return code;
ptr = taosHashIterate(pReader->status.pTableMap, ptr); }
if (ptr == NULL) {
break;
}
STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr; int32_t cnt = 0;
if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) { void* ptr = NULL;
continue; while (1) {
} ptr = taosHashIterate(pReader->status.pTableMap, ptr);
if (ptr == NULL) {
break;
}
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList); STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
sup.numOfBlocksPerTable[sup.numOfTables] = num; if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
continue;
}
char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num); size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
if (buf == NULL) { sup.numOfBlocksPerTable[sup.numOfTables] = num;
cleanupBlockOrderSupporter(&sup);
return TSDB_CODE_TDB_OUT_OF_MEMORY;
}
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
for (int32_t k = 0; k < num; ++k) { if (buf == NULL) {
SBlockOrderWrapper wrapper = {0}; cleanupBlockOrderSupporter(&sup);
wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k); return TSDB_CODE_TDB_OUT_OF_MEMORY;
wrapper.uid = pTableScanInfo->uid; }
sup.pDataBlockInfo[sup.numOfTables][k] = wrapper; sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
cnt++; for (int32_t k = 0; k < num; ++k) {
} SBlockOrderWrapper wrapper = {0};
wrapper.pBlock = (SBlock*)taosArrayGet(pTableScanInfo->pBlockList, k);
wrapper.uid = pTableScanInfo->uid;
sup.numOfTables += 1; sup.pDataBlockInfo[sup.numOfTables][k] = wrapper;
} cnt++;
}
ASSERT(numOfBlocks == cnt); sup.numOfTables += 1;
}
ASSERT(numOfBlocks == cnt);
// since there is only one table qualified, blocks are not sorted // since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1) { if (sup.numOfTables == 1) {
...@@ -2123,7 +2116,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2123,7 +2116,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMerge(&merge, pRow); tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader); doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
} }
...@@ -2139,7 +2132,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2139,7 +2132,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
updateSchema(pRow, pBlockScanInfo->uid, pReader); updateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema); tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pIter, hasVal, k.ts, &merge, pReader); doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader);
tRowMerge(&merge, &fRow); tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
...@@ -2162,6 +2155,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2162,6 +2155,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader); TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader); TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
ASSERT(pRow != NULL && piRow != NULL);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
...@@ -2180,12 +2174,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2180,12 +2174,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (ik.ts == key) { if (ik.ts == key) {
tRowMerge(&merge, piRow); tRowMerge(&merge, piRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader);
} }
if (k.ts == key) { if (k.ts == key) {
tRowMerge(&merge, pRow); tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader);
} }
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
...@@ -2225,11 +2219,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2225,11 +2219,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
updateSchema(pRow, uid, pReader); updateSchema(pRow, uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema); tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader);
if (ik.ts == k.ts) { if (ik.ts == k.ts) {
tRowMerge(&merge, piRow); tRowMerge(&merge, piRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader);
} }
if (k.ts == key) { if (k.ts == key) {
...@@ -2281,6 +2275,38 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2281,6 +2275,38 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
ASSERT(0); ASSERT(0);
} }
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STsdbReader* pReader) {
// check for version and time range
int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) {
return false;
}
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (ts > pReader->window.ekey || ts < pReader->window.skey) {
return false;
}
return true;
}
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) {
return (ts > pWindow->ekey) || (ts < pWindow->skey);
}
static bool isValidTSDBRow(TSDBROW* pRow, STimeWindow* pWindow, SVersionRange* pVerRange) {
TSDBKEY key = TSDBROW_KEY(pRow);
if (outOfTimeWindow(key.ts, pWindow)) {
return false;
}
if (key.version > pVerRange->maxVer || key.version < pVerRange->minVer) {
return false;
}
return true;
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
...@@ -2309,10 +2335,16 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2309,10 +2335,16 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
// imem & mem are all empty, only file exist // imem & mem are all empty, only file exist
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema); // if (!isValidFileBlockRow(pBlockData, pDumpInfo, pReader)) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); // int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
tRowMergerGetRow(&merge, &pTSRow); // pDumpInfo->rowIndex += step;
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); // } else {
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
// }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -2320,9 +2352,29 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -2320,9 +2352,29 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->pResBlock; SSDataBlock* pResBlock = pReader->pResBlock;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
int32_t step = ASCENDING_TRAVERSE(pReader->order)? 1:-1;
while(1) { while(1) {
// todo check the validate of row in file block
{
if (!isValidFileBlockRow(pBlockData, pDumpInfo, pReader)) {
pDumpInfo->rowIndex += step;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break;
}
continue;
}
}
buildComposedDataBlockImpl(pReader, pBlockScanInfo); buildComposedDataBlockImpl(pReader, pBlockScanInfo);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
...@@ -2346,6 +2398,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2346,6 +2398,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s", tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
pReader->idStr); pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2420,13 +2473,13 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead ...@@ -2420,13 +2473,13 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
initMemIterator(pScanInfo, pReader); initMemIterator(pScanInfo, pReader);
if (pScanInfo->memHasVal) { TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader);
TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader); if (pRow != NULL) {
key = TSDBROW_KEY(pRow); key = TSDBROW_KEY(pRow);
} }
if (pScanInfo->imemHasVal) { pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader);
TSDBROW* pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader); if (pRow != NULL) {
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
if (key.ts > k.ts) { if (key.ts > k.ts) {
key = k; key = k;
...@@ -2490,9 +2543,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2490,9 +2543,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// build composed data block // build composed data block
code = buildComposedDataBlock(pReader, pScanInfo); code = buildComposedDataBlock(pReader, pScanInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block // data in memory that are earlier than current file block
// todo rows in buffer should be less than the file block in asc, greater than file block in desc // todo rows in buffer should be less than the file block in asc, greater than file block in desc
...@@ -2542,12 +2592,15 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { ...@@ -2542,12 +2592,15 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
} }
} }
// set the correct start position in case of the first/last file block, according to the query time window
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SReaderStatus* pStatus = &pReader->status;
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->totalRows = pBlock->nRow;
pDumpInfo->allDumped = false; pDumpInfo->allDumped = false;
...@@ -2569,56 +2622,59 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl ...@@ -2569,56 +2622,59 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
// initialize the block iterator for a new fileset // initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks); code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
return code; return code;
} }
static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) {
return (!pDumpInfo->allDumped) && ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc)));
}
static int32_t buildBlockFromFiles(STsdbReader* pReader) { static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->order);
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
if (pReader->status.blockIter.index == -1) { while (1) {
code = initForFirstBlockInFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
bool asc = ASCENDING_TRAVERSE(pReader->order);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
// current block are exhausted, try the next file block if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded
if (pDumpInfo->allDumped) { code = buildComposedDataBlock(pReader, pScanInfo);
// try next data block in current file } else {
bool hasNext = blockIteratorNext(&pReader->status.blockIter); // current block are exhausted, try the next file block
if (hasNext) { // current file is exhausted, let's try the next file if (pDumpInfo->allDumped) {
initBlockDumpInfo(pReader, pBlockIter); // try next data block in current file
} else { bool hasNext = blockIteratorNext(&pReader->status.blockIter);
code = initForFirstBlockInFile(pReader, pBlockIter); if (hasNext) { // check for the next block in the block accessed order list
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { initBlockDumpInfo(pReader, pBlockIter);
return code; } else { // data blocks in current file are exhausted, let's try the next file now
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
} }
} }
return doBuildDataBlock(pReader); // current block is not loaded yet, or data in buffer may overlap with the file block.
} else if ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && !asc)) { code = doBuildDataBlock(pReader);
// file data block is partially loaded
// todo refactor: extract method
return buildComposedDataBlock(pReader, pScanInfo);
} else { // current block is not loaded yet
return doBuildDataBlock(pReader);
} }
}
return code; if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
}
} }
// // todo not unref yet, since it is not support multi-group interpolation query // // todo not unref yet, since it is not support multi-group interpolation query
...@@ -2654,24 +2710,19 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2654,24 +2710,19 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info); // taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// } // }
static bool outofTimeWindow(int64_t ts, STimeWindow* pWindow, int32_t order) {
return (((ts > pWindow->ekey) && ASCENDING_TRAVERSE(order)) || ((ts < pWindow->skey) && ASCENDING_TRAVERSE(order)));
}
TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
if (!(*hasVal)) { if (!(*hasVal)) {
return NULL; return NULL;
} }
TSDBROW* pRow = tsdbTbDataIterGet(pIter); TSDBROW* pRow = tsdbTbDataIterGet(pIter);
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
if (outofTimeWindow(key.ts, &pReader->window, pReader->order)) { if (outOfTimeWindow(key.ts, &pReader->window)) {
*hasVal = false; *hasVal = false;
return NULL; return NULL;
} }
if (key.version <= pReader->verRange.maxVer) { if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) {
return pRow; return pRow;
} }
...@@ -2684,25 +2735,31 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) { ...@@ -2684,25 +2735,31 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
pRow = tsdbTbDataIterGet(pIter); pRow = tsdbTbDataIterGet(pIter);
key = TSDBROW_KEY(pRow); key = TSDBROW_KEY(pRow);
if (outofTimeWindow(key.ts, &pReader->window, pReader->order)) { if (outOfTimeWindow(key.ts, &pReader->window)) {
*hasVal = false; *hasVal = false;
return NULL; return NULL;
} }
if (key.version <= pReader->verRange.maxVer) { if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) {
return pRow; return pRow;
} }
} }
} }
int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) { int32_t doMergeRowsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) {
while (1) { while (1) {
*hasVal = tsdbTbDataIterNext(pIter); *hasVal = tsdbTbDataIterNext(pIter);
if (!(*hasVal)) { if (!(*hasVal)) {
break; break;
} }
// data exists but not valid
TSDBROW* pRow = getValidRow(pIter, hasVal, pReader); TSDBROW* pRow = getValidRow(pIter, hasVal, pReader);
if (pRow == NULL) {
break;
}
// ts is not identical, quit
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
if (k.ts != ts) { if (k.ts != ts) {
break; break;
...@@ -2828,7 +2885,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* has ...@@ -2828,7 +2885,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* has
updateSchema(pRow, uid, pReader); updateSchema(pRow, uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema); tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(dIter, hasVal, k.ts, &merge, pReader); doMergeRowsInBuf(dIter, hasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow); tRowMergerGetRow(&merge, pTSRow);
} }
...@@ -2842,18 +2899,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo ...@@ -2842,18 +2899,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo *pBlo
updateSchema(piRow, pBlockScanInfo->uid, pReader); updateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema); tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMerge(&merge, pRow); tRowMerge(&merge, pRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
} else { } else {
updateSchema(pRow, pBlockScanInfo->uid, pReader); updateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema); tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader);
tRowMerge(&merge, piRow); tRowMerge(&merge, piRow);
doLoadRowsOfIdenticalTsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader); doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader);
} }
tRowMergerGetRow(&merge, pTSRow); tRowMergerGetRow(&merge, pTSRow);
...@@ -3191,11 +3248,32 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3191,11 +3248,32 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto _err; goto _err;
} }
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFileIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
pReader->status.loadFromFile = false;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS)/* || (pReader->status.loadFromFile == false)*/) {
return code;
}
// code = doBuildDataBlock(pReader);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
}
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code; return code;
_err: _err:
tsdbError("failed to create tsdb reader, code: %s %s", tstrerror(code), pReader->idStr); tsdbError("failed to create data reader, code: %s %s", tstrerror(code), pReader->idStr);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册