diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c6ff9138372c9ca8f004f1568f6fdd51ffe80d59..94a4294d4a2bce0e9989e97ec417f7afe957c478 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -807,7 +807,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, const char* idStr); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 012d25a0c957a30059d10d7299cf3f0ed183054f..eef292f3f20d46c9b580d77e8c6168544aac51ab 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1333,7 +1333,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) { if (pFilterNode == NULL || pBlock->info.rows == 0) { return; } @@ -1354,6 +1354,20 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { filterFreeInfo(filter); extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); + + if (pColMatchInfo != NULL) { + for(int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { + SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i); + if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId); + if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + blockDataUpdateTsWindow(pBlock, pInfo->targetSlotId); + break; + } + } + } + } + taosMemoryFree(rowRes); } @@ -3040,7 +3054,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pAggInfo->pCondition, pInfo->pRes); + doFilter(pAggInfo->pCondition, pInfo->pRes, NULL); if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -3401,7 +3415,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // do apply filter SSDataBlock* p = pProjectInfo->mergeDataBlocks ? pFinalRes : pRes; - doFilter(pProjectInfo->pFilterNode, p); + doFilter(pProjectInfo->pFilterNode, p, NULL); if (p->info.rows > 0) { break; } @@ -3543,7 +3557,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(pInfo->pCondition, fillResult); + doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo); if (fillResult->info.rows > 0) { break; } @@ -4005,7 +4019,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { } } - doFilter(pIndefInfo->pCondition, pInfo->pRes); + doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL); size_t rows = pInfo->pRes->info.rows; if (rows > 0 || pOperator->status == OP_EXEC_DONE) { break; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 20630fd6ff2f246bc72ec82f6b659134fe394863..c83206b7301a54985615d83132cc9a635eb1a3c3 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -299,7 +299,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; while(1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pRes); + doFilter(pInfo->pCondition, pRes, NULL); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 497de8347cb21de975a3d767e4b42cf236795761..2e6c9bd351b5c5b9bdff21ca6d44f4d6520e0eab 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -211,7 +211,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { break; } if (pJoinInfo->pCondAfterMerge != NULL) { - doFilter(pJoinInfo->pCondAfterMerge, pRes); + doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL); } if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9ecf6b96d77a12646dc22eba2f515456836fbc7c..31c20ca47b26bd6febfac4a9fcbd475c0c6bbe3a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -265,7 +265,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); int64_t et = taosGetTimestampMs(); pTableScanInfo->readRecorder.filterTime += (et - st); @@ -274,6 +274,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca pCost->filterOutBlocks += 1; qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + } else { + qDebug("%s data block filter out, elapsed time:%"PRId64, GET_TASKID(pTaskInfo), (et - st)); } return TSDB_CODE_SUCCESS; @@ -1225,7 +1227,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } } - doFilter(pInfo->pCondition, pInfo->pRes); + doFilter(pInfo->pCondition, pInfo->pRes, NULL); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); return 0; @@ -1761,55 +1763,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } - doFilter(pInfo->pCondition, pInfo->pRes); -#if 0 - SFilterInfo* filter = NULL; - - int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0); - - SFilterColumnParam param1 = {.numOfCols = pInfo->pRes->info.numOfCols, .pDataBlock = pInfo->pRes->pDataBlock}; - code = filterSetDataFromSlotId(filter, ¶m1); - - int8_t* rowRes = NULL; - bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); - filterFreeInfo(filter); - - SSDataBlock* px = createOneDataBlock(pInfo->pRes, false); - blockDataEnsureCapacity(px, pInfo->pRes->info.rows); - - // TODO refactor - int32_t numOfRow = 0; - for (int32_t i = 0; i < pInfo->pRes->info.numOfCols; ++i) { - SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); - SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); - - if (keep) { - colDataAssign(pDest, pSrc, pInfo->pRes->info.rows, &px->info); - numOfRow = pInfo->pRes->info.rows; - } else if (NULL != rowRes) { - numOfRow = 0; - for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { - if (rowRes[j] == 0) { - continue; - } - - if (colDataIsNull_s(pSrc, j)) { - colDataAppendNULL(pDest, numOfRow); - } else { - colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); - } - - numOfRow += 1; - } - } else { - numOfRow = 0; - } - } - - px->info.rows = numOfRow; - pInfo->pRes = px; -#endif - + doFilter(pInfo->pCondition, pInfo->pRes, NULL); return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } @@ -2777,7 +2731,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); int64_t et = taosGetTimestampMs(); pTableScanInfo->readRecorder.filterTime += (et - st); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index f019ee94b854acce88fa824e28118dcd73dc3399..26dd772292fa4992eaced5fe103e9e7f7ef18c56 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return NULL; } - doFilter(pInfo->pCondition, pBlock); + doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo); if (blockDataGetNumOfRows(pBlock) == 0) { continue; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index b5966fc4633753f2f2c4df1dc20a622217c189cb..7c8b0ef6498a2218ba48ff3222be26ed62787304 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1178,7 +1178,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { @@ -1219,7 +1219,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { @@ -1256,7 +1256,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBlock); + doFilter(pInfo->pCondition, pBlock, NULL); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { @@ -1969,7 +1969,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { @@ -2013,7 +2013,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL); bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { @@ -4626,7 +4626,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); - doFilter(miaInfo->pCondition, pRes); + doFilter(miaInfo->pCondition, pRes, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; }