未验证 提交 e16a2a50 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16278 from taosdata/feature/3_liaohj

fix(query): add one more condition check when merge file block and last block.
...@@ -69,8 +69,10 @@ typedef struct SIOCostSummary { ...@@ -69,8 +69,10 @@ typedef struct SIOCostSummary {
double buildmemBlock; double buildmemBlock;
int64_t headFileLoad; int64_t headFileLoad;
double headFileLoadTime; double headFileLoadTime;
int64_t smaData; int64_t smaDataLoad;
double smaLoadTime; double smaLoadTime;
int64_t lastBlockLoad;
double lastBlockLoadTime;
} SIOCostSummary; } SIOCostSummary;
typedef struct SBlockLoadSuppInfo { typedef struct SBlockLoadSuppInfo {
...@@ -98,10 +100,10 @@ typedef struct SLastBlockReader { ...@@ -98,10 +100,10 @@ typedef struct SLastBlockReader {
} SLastBlockReader; } SLastBlockReader;
typedef struct SFilesetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
SArray* pFileList; // data file list SArray* pFileList; // data file list
int32_t order; int32_t order;
SLastBlockReader* pLastBlockReader; // last file block reader SLastBlockReader* pLastBlockReader; // last file block reader
} SFilesetIter; } SFilesetIter;
...@@ -857,7 +859,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -857,7 +859,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) { static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
double elapsedTime = 0; double elapsedTime = 0;
int32_t code = 0; int32_t code = 0;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
...@@ -1303,9 +1305,23 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc ...@@ -1303,9 +1305,23 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
} }
return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) || bool moreThanOutputCapacity = pBlock->nRow > pReader->capacity;
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || bool partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
overlapWithDel || overlapWithlastBlock); bool overlapWithKey = keyOverlapFileBlock(key, pBlock, &pReader->verRange);
bool loadDataBlock = (overlapWithNeighbor || hasDup || partiallyRequired || overlapWithKey ||
moreThanOutputCapacity || overlapWithDel || overlapWithlastBlock);
// log the reason why load the datablock for profile
if (loadDataBlock) {
tsdbDebug("%p uid:%" PRIu64
" need to load the datablock, reason overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
}
return loadDataBlock;
} }
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
...@@ -1992,7 +2008,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1992,7 +2008,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// no last block // no last block
if (pLastBlockReader->lastBlockData.nRow == 0) { if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
...@@ -2383,7 +2399,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { ...@@ -2383,7 +2399,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo add elapsed time results
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) { static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) {
SArray* pBlocks = pLastBlockReader->pBlockL; SArray* pBlocks = pLastBlockReader->pBlockL;
SBlockL* pBlock = NULL; SBlockL* pBlock = NULL;
...@@ -2415,6 +2430,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable ...@@ -2415,6 +2430,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t st = taosGetTimestampUs();
int32_t code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema); int32_t code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr); tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr);
...@@ -2422,17 +2438,23 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable ...@@ -2422,17 +2438,23 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
} }
code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData); code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
double el = (taosGetTimestampUs() - st) / 1000.0;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader, tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader,
pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr); pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr);
} else { } else {
tsdbDebug("%p load last block completed, uid:%" PRIu64 tsdbDebug("%p load last block completed, uid:%" PRIu64
" last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 " - %" PRId64 " %s", " last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 " - %" PRId64
" elapsed time:%.2f ms, %s",
pReader, uid, pLastBlockReader->currentBlockIndex, totalLastBlocks, pBlock->nRow, pBlock->minVer, pReader, uid, pLastBlockReader->currentBlockIndex, totalLastBlocks, pBlock->nRow, pBlock->minVer,
pBlock->maxVer, pBlock->minKey, pBlock->maxKey, pReader->idStr); pBlock->maxVer, pBlock->minKey, pBlock->maxKey, el, pReader->idStr);
} }
pLastBlockReader->currentBlockIndex = index; pLastBlockReader->currentBlockIndex = index;
pReader->cost.lastBlockLoad += 1;
pReader->cost.lastBlockLoadTime += el;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2495,13 +2517,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2495,13 +2517,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
} }
static int32_t doBuildDataBlock(STsdbReader* pReader) { static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
TSDBKEY key = {0}; TSDBKEY key = {0};
int32_t code = TSDB_CODE_SUCCESS;
SBlock* pBlock = NULL; SBlock* pBlock = NULL;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
STableBlockScanInfo* pScanInfo = NULL; STableBlockScanInfo* pScanInfo = NULL;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
...@@ -2628,7 +2649,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl ...@@ -2628,7 +2649,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
// initialize the block iterator for a new fileset // initialize the block iterator for a new fileset
if (num.numOfBlocks > 0) { if (num.numOfBlocks > 0) {
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks); code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
} else { } else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
} }
...@@ -2701,7 +2722,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2701,7 +2722,6 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (hasNext) { // check for the next block in the block accessed order list if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now
// todo dump all data in last block if exists.
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
goto _begin; goto _begin;
...@@ -3498,10 +3518,11 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3498,10 +3518,11 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64 " SMA-time:%.2f ms, fileBlocks:%" PRId64
", fileBlocks-time:%.2f ms, " ", fileBlocks-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s", "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime, ", lastBlocks-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s",
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr); pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
pCost->lastBlockLoadTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);
taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema); taosMemoryFree(pReader->pSchema);
...@@ -3663,7 +3684,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS ...@@ -3663,7 +3684,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
double elapsed = (taosGetTimestampUs() - stime) / 1000.0; double elapsed = (taosGetTimestampUs() - stime) / 1000.0;
pReader->cost.smaLoadTime += elapsed; pReader->cost.smaLoadTime += elapsed;
pReader->cost.smaData += 1; pReader->cost.smaDataLoad += 1;
*pBlockStatis = pSup->plist; *pBlockStatis = pSup->plist;
......
...@@ -860,8 +860,8 @@ int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDa ...@@ -860,8 +860,8 @@ int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDa
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
......
...@@ -378,15 +378,30 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow ...@@ -378,15 +378,30 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); } void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); }
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, typedef struct {
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, bool hasAgg;
int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t numOfRows;
int32_t startOffset;
} SFunctionCtxStatus;
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pStatus->hasAgg = pCtx->input.colDataAggIsSet;
pStatus->numOfRows = pCtx->input.numOfRows;
pStatus->startOffset = pCtx->input.startRowIndex;
}
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataAggIsSet = pStatus->hasAgg;
pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset;
}
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
// keep it temporarily // keep it temporarily
// todo no need this?? SFunctionCtxStatus status = {0};
bool hasAgg = pCtx[k].input.colDataAggIsSet; functionCtxSave(&pCtx[k], &status);
int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex;
pCtx[k].input.startRowIndex = offset; pCtx[k].input.startRowIndex = offset;
pCtx[k].input.numOfRows = forwardStep; pCtx[k].input.numOfRows = forwardStep;
...@@ -424,9 +439,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -424,9 +439,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
} }
// restore it // restore it
pCtx[k].input.colDataAggIsSet = hasAgg; functionCtxRestore(&pCtx[k], &status);
pCtx[k].input.startRowIndex = startOffset;
pCtx[k].input.numOfRows = numOfRows;
} }
} }
} }
......
...@@ -277,7 +277,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -277,7 +277,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
int32_t rowIndex = j - num; int32_t rowIndex = j - num;
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC); doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
// assign the group keys or user input constant values if required // assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
...@@ -295,7 +295,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -295,7 +295,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
int32_t rowIndex = pBlock->info.rows - num; int32_t rowIndex = pBlock->info.rows - num;
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC); doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
} }
} }
......
...@@ -641,8 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -641,8 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs);
numOfExprs, pInfo->inputOrder);
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr); closeResultRow(pr);
...@@ -986,8 +985,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -986,8 +985,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput, pInfo->inputOrder); pBlock->info.rows, numOfOutput);
} }
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
...@@ -1026,8 +1025,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1026,8 +1025,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput, pInfo->inputOrder); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
} }
...@@ -1190,8 +1189,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1190,8 +1189,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
...@@ -1215,8 +1214,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1215,8 +1214,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
...@@ -1934,8 +1933,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1934,8 +1933,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
// pInfo->numOfRows data belong to the current session window // pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
// here we start a new session window // here we start a new session window
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
...@@ -1952,8 +1951,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1952,8 +1951,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
...@@ -2952,8 +2951,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2952,8 +2951,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pSDataBlock->info.rows, numOfOutput);
int32_t prevEndPos = (forwardRows - 1) * step + startPos; int32_t prevEndPos = (forwardRows - 1) * step + startPos;
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0); ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
...@@ -3776,8 +3775,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre ...@@ -3776,8 +3775,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false); updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, pSDataBlock->info.rows, numOutput);
pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC);
SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pCurWin->pos.pageId); SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pCurWin->pos.pageId);
setBufPageDirty(bufPage, true); setBufPageDirty(bufPage, true);
releaseBufPage(pAggSup->pResultBuf, bufPage); releaseBufPage(pAggSup->pResultBuf, bufPage);
...@@ -4938,8 +4936,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4938,8 +4936,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); pBlock->info.rows, numOfOutput);
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
miaInfo->curTs = tsCols[currPos]; miaInfo->curTs = tsCols[currPos];
...@@ -4960,8 +4958,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4960,8 +4958,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); pBlock->info.rows, numOfOutput);
} }
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
...@@ -5253,8 +5251,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -5253,8 +5251,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput, iaInfo->inputOrder); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&win) is closed // output previous interval results after this interval (&win) is closed
...@@ -5285,8 +5283,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -5285,8 +5283,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
// output previous interval results after this interval (&nextWin) is closed // output previous interval results after this interval (&nextWin) is closed
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册