提交 324977a9 编写于 作者: wmmhello's avatar wmmhello

feat:add filter for tag scan

上级 70e27717
...@@ -353,6 +353,7 @@ typedef struct STagScanInfo { ...@@ -353,6 +353,7 @@ typedef struct STagScanInfo {
int32_t curPos; int32_t curPos;
SReadHandle readHandle; SReadHandle readHandle;
STableListInfo *pTableList; STableListInfo *pTableList;
SNode* pFilterNode; // filter info,
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
...@@ -775,7 +776,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -775,7 +776,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SNode* pConditions, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
......
...@@ -1883,8 +1883,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR ...@@ -1883,8 +1883,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
ASSERT(pBlock->info.rows == numOfRows); ASSERT(pBlock->info.rows == numOfRows);
} }
SColumnInfoData tmp = *pSrc;
*pSrc = *pDst; *pSrc = *pDst;
*pDst = tmp;
} }
blockDataDestroy(px); // fix memory leak
} else { } else {
// do nothing // do nothing
pBlock->info.rows = 0; pBlock->info.rows = 0;
...@@ -4554,7 +4557,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4554,7 +4557,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); return createTagScanOperatorInfo(pHandle, pScanPhyNode->node.pConditions, pScanPhyNode, pTableListInfo, pTaskInfo);
} else { } else {
ASSERT(0); ASSERT(0);
} }
......
...@@ -1716,7 +1716,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1716,7 +1716,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} }
pRes->info.rows = count; pRes->info.rows = count;
pOperator->resultInfo.totalRows += count; doFilter(pInfo->pFilterNode, pRes);
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
...@@ -1726,7 +1728,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -1726,7 +1728,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pRes = blockDataDestroy(pInfo->pRes);
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SNode* pConditions, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -1746,6 +1748,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -1746,6 +1748,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->pRes = createResDataBlock(pDescNode);; pInfo->pRes = createResDataBlock(pDescNode);;
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
pInfo->pFilterNode = pConditions;
pOperator->name = "TagScanOperator"; pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册