diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4804984397bc67df0133fd514ae562d426707082..72344a46f70e62045f39f657392ed54c2ca9e550 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -353,6 +353,7 @@ typedef struct STagScanInfo { int32_t curPos; SReadHandle readHandle; STableListInfo *pTableList; + SNode* pFilterNode; // filter info, } STagScanInfo; typedef enum EStreamScanMode { @@ -775,7 +776,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SNode* pConditions, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9d7f939bb015d6b074286b0b9a681027419e3f3e..32fdc19b7c557c22751c4ad22e94e5b2584726cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1883,8 +1883,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR ASSERT(pBlock->info.rows == numOfRows); } + SColumnInfoData tmp = *pSrc; *pSrc = *pDst; + *pDst = tmp; } + blockDataDestroy(px); // fix memory leak } else { // do nothing pBlock->info.rows = 0; @@ -4554,7 +4557,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + return createTagScanOperatorInfo(pHandle, pScanPhyNode->node.pConditions, pScanPhyNode, pTableListInfo, pTaskInfo); } else { ASSERT(0); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ba0397fd3439e252c3de115459205dd534c7f78a..aed86c9d4466c0264149d021247301398c5b050b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1716,7 +1716,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } pRes->info.rows = count; - pOperator->resultInfo.totalRows += count; + doFilter(pInfo->pFilterNode, pRes); + + pOperator->resultInfo.totalRows += pRes->info.rows; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -1726,7 +1728,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SNode* pConditions, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1746,6 +1748,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->pRes = createResDataBlock(pDescNode);; pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; + pInfo->pFilterNode = pConditions; pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->blocking = false;