提交 42d0af5c 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 99fb3c67
......@@ -147,8 +147,7 @@ struct STsdbReader {
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader);
static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader);
static int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doLoadRowsOfIdenticalTsInBuf(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
......@@ -1069,64 +1068,6 @@ _error:
// return code;
// }
// static int32_t loadFileDataBlock(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableBlockScanInfo* pCheckInfo,
// bool* exists) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// int32_t code = TSDB_CODE_SUCCESS;
// bool asc = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// if (asc) {
// // query ended in/started from current block
// if (pTsdbReadHandle->window.ekey < pBlock->maxKey.ts || pCheckInfo->lastKey > pBlock->minKey.ts) {
// if ((code = doLoadFileBlockData(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
// SDataCols* pTSCol = pTsdbReadHandle->rhelper.pDCols[0];
// assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
// if (pCheckInfo->lastKey > pBlock->minKey.ts) {
// cur->pos =
// binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey,
// pTsdbReadHandle->order);
// } else {
// cur->pos = 0;
// }
// assert(pCheckInfo->lastKey <= pBlock->maxKey.ts);
// doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
// } else { // the whole block is loaded in to buffer
// cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
// code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
// }
// } else { // desc order, query ended in current block
// if (pTsdbReadHandle->window.ekey > pBlock->minKey.ts || pCheckInfo->lastKey < pBlock->maxKey.ts) {
// if ((code = doLoadFileBlockData(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
// *exists = false;
// return code;
// }
// SDataCols* pTsCol = pTsdbReadHandle->rhelper.pDCols[0];
// if (pCheckInfo->lastKey < pBlock->maxKey.ts) {
// cur->pos =
// binarySearchForKey(pTsCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey,
// pTsdbReadHandle->order);
// } else {
// cur->pos = pBlock->numOfRows - 1;
// }
// assert(pCheckInfo->lastKey >= pBlock->minKey.ts);
// doMergeTwoLevelData(pTsdbReadHandle, pCheckInfo, pBlock);
// } else {
// cur->pos = asc ? 0 : (pBlock->numOfRows - 1);
// code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlock, pCheckInfo);
// }
// }
// *exists = pTsdbReadHandle->realNumOfRows > 0;
// return code;
// }
// static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
// int firstPos, lastPos, midPos = -1;
......@@ -1189,94 +1130,6 @@ _error:
// return midPos;
// }
// static int32_t doCopyRowsFromFileBlock(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t
// start,
// int32_t end) {
// SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
// TSKEY* tsArray = pCols->cols[0].pData;
// int32_t num = end - start + 1;
// assert(num >= 0);
// if (num == 0) {
// return numOfRows;
// }
// bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
// int32_t trueStart = ascScan ? start : end;
// int32_t trueEnd = ascScan ? end : start;
// int32_t step = ascScan ? 1 : -1;
// int32_t requiredNumOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
// // data in buffer has greater timestamp, copy data in file block
// int32_t i = 0, j = 0;
// while (i < requiredNumOfCols && j < pCols->numOfCols) {
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
// SDataCol* src = &pCols->cols[j];
// if (src->colId < pColInfo->info.colId) {
// j++;
// continue;
// }
// if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) {
// if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance
// // memmove(pData, (char*)src->pData + bytes * start, bytes * num);
// int32_t rowIndex = numOfRows;
// for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex)
// {
// SCellVal sVal = {0};
// if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
// TASSERT(0);
// }
// if (sVal.valType == TD_VTYPE_NORM) {
// colDataAppend(pColInfo, rowIndex, sVal.val, false);
// } else {
// colDataAppendNULL(pColInfo, rowIndex);
// }
// }
// } else { // handle the var-string
// int32_t rowIndex = numOfRows;
// // todo refactor, only copy one-by-one
// for (int32_t k = trueStart; ((ascScan && k <= trueEnd) || (!ascScan && k >= trueEnd)); k += step, ++rowIndex)
// {
// SCellVal sVal = {0};
// if (tdGetColDataOfRow(&sVal, src, k, pCols->bitmapMode) < 0) {
// TASSERT(0);
// }
// if (sVal.valType == TD_VTYPE_NORM) {
// colDataAppend(pColInfo, rowIndex, sVal.val, false);
// } else {
// colDataAppendNULL(pColInfo, rowIndex);
// }
// }
// }
// j++;
// i++;
// } else { // pColInfo->info.colId < src->colId, it is a NULL data
// colDataAppendNNULL(pColInfo, numOfRows, num);
// i++;
// }
// }
// while (i < requiredNumOfCols) { // the remain columns are all null data
// SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
// colDataAppendNNULL(pColInfo, numOfRows, num);
// i++;
// }
// pTsdbReadHandle->cur.win.ekey = tsArray[trueEnd];
// pTsdbReadHandle->cur.lastKey = tsArray[trueEnd] + step;
// return numOfRows + num;
// }
// static int32_t mergeTwoRowFromMem(STsdbReader* pTsdbReadHandle, int32_t capacity, int32_t* curRow, STSRow* row1,
// STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema*
// pSchema2, bool update, TSKEY* lastRowKey) {
......@@ -1472,42 +1325,6 @@ _error:
// #endif
// }
// static void getQualifiedRowsPos(STsdbReader* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted,
// int32_t* start, int32_t* end) {
// *start = -1;
// if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
// int32_t remain = endPos - startPos + 1;
// if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
// *end = (pTsdbReadHandle->outputCapacity - numOfExisted) + startPos - 1;
// } else {
// *end = endPos;
// }
// *start = startPos;
// } else {
// int32_t remain = (startPos - endPos) + 1;
// if (remain + numOfExisted > pTsdbReadHandle->outputCapacity) {
// *end = startPos + 1 - (pTsdbReadHandle->outputCapacity - numOfExisted);
// } else {
// *end = endPos;
// }
// *start = *end;
// *end = startPos;
// }
// }
// static void updateInfoAfterMerge(STsdbReader* pTsdbReadHandle, STableBlockScanInfo* pCheckInfo, int32_t numOfRows,
// int32_t endPos) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// pCheckInfo->lastKey = cur->lastKey;
// pTsdbReadHandle->realNumOfRows = numOfRows;
// cur->rows = numOfRows;
// cur->pos = endPos;
// }
// static void doCheckGeneratedBlockRange(STsdbReader* pTsdbReadHandle) {
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
......@@ -2198,18 +2015,22 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
return -1;
}
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index) {
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks) {
return -1;
}
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
pBlockIter->index += step;
if (index != pBlockIter->index) {
taosArrayRemove(pBlockIter->blockList, index);
taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);
taosArrayRemove(pBlockIter->blockList, index);
taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
}
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
return TSDB_CODE_SUCCESS;
}
......@@ -2280,7 +2101,7 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge);
if (k.ts == key) {
tRowMerge(&merge, pRow);
......@@ -2296,9 +2117,12 @@ static int32_t doMergeBufFileRows(STsdbReader* pReader, STableBlockScanInfo* pBl
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) {
SFileBlockDumpInfo *pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
......@@ -2316,7 +2140,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == key) {
tRowMerge(&merge, piRow);
......@@ -2370,7 +2194,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
// imem & mem are all empty
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
doLoadRowsOfIdenticalTsInFileBlock(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
}
......@@ -2378,34 +2202,35 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockIn
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
STableBlockScanInfo* pBlockScanInfo) {
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->pResBlock;
while(1) {
buildComposedDataBlockImpl(pReader, pFBlock, pBlock, pBlockScanInfo);
buildComposedDataBlockImpl(pReader, pBlockScanInfo);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo->tbBlockIdx == pFBlock->tbBlockIdx) { // still in the same file block now
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break;
}
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
if (pResBlock->info.rows >= pReader->capacity) {
break;
}
} else { // todo traverse to next file due to time window overlap
if (pResBlock->info.rows >= pReader->capacity) {
ASSERT(0);
return TSDB_CODE_SUCCESS;
}
// currently loaded file data block is consumed
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break;
}
if (pResBlock->info.rows >= pReader->capacity) {
break;
}
}
pResBlock->info.uid = pBlockScanInfo->uid;
blockDataUpdateTsWindow(pResBlock, 0);
setComposedBlockFlag(pReader, true);
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, %s",
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -2549,7 +2374,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
// build composed data block
code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
code = buildComposedDataBlock(pReader, pScanInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2673,7 +2498,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
} else {
code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
code = buildComposedDataBlock(pReader, pScanInfo);
return code;
}
}
......@@ -2815,10 +2640,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
//1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pFBlock->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(&pStatus->blockIter, &fb);
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
//2. remove it from the scan block list
setFileBlockActiveInBlockIter(&pStatus->blockIter, neighborIndex);
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
//3. load the neighbor block, and set it to be the currently accessed file data block
int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
......@@ -2840,13 +2665,12 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return TSDB_CODE_SUCCESS;
}
int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
if (asc) {
pDumpInfo->rowIndex += step;
......@@ -2858,7 +2682,10 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
if (pDumpInfo->rowIndex >= pBlockData->nRow && asc) {
while (1) {
CHECK_FILEBLOCK_STATE st;
checkForNeighborFileBlock(pReader, pScanInfo, pBlock, pFBlock, pMerger, key, &st);
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pCurrentBlock = taosArrayGet(pScanInfo->pBlockList, pFileBlockInfo->tbBlockIdx);
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) {
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册