diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index d08345a02f3f59d9626ee447d7905719a80b15d0..0ab172f03a3f0cfd07eedcd37e5336e43701057d 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -13,7 +13,6 @@
* along with this program. If not, see .
*/
-#include
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@@ -1601,9 +1600,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
}
}
-static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId);
-static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes);
-
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
uint32_t status = BLK_DATA_NOT_LOAD;
@@ -1771,100 +1767,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
return TSDB_CODE_SUCCESS;
}
-/*
- * set tag value in SqlFunctionCtx
- * e.g.,tag information into input buffer
- */
-static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes) {
- taosVariantDestroy(tag);
-
- char* val = NULL;
- // if (tagColId == TSDB_TBNAME_COLUMN_INDEX) {
- // val = tsdbGetTableName(pTable);
- // assert(val != NULL);
- // } else {
- // val = tsdbGetTableTagVal(pTable, tagColId, type, bytes);
- // }
-
- if (val == NULL || isNull(val, type)) {
- tag->nType = TSDB_DATA_TYPE_NULL;
- return;
- }
-
- if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
- int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
- int32_t len = (varDataLen(val) > maxLen) ? maxLen : varDataLen(val);
- taosVariantCreateFromBinary(tag, varDataVal(val), len, type);
- // taosVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type);
- } else {
- taosVariantCreateFromBinary(tag, val, bytes, type);
- }
-}
-
-static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId) {
- assert(pTagColList != NULL && numOfTags > 0);
-
- for (int32_t i = 0; i < numOfTags; ++i) {
- if (pTagColList[i].colId == colId) {
- return &pTagColList[i];
- }
- }
-
- return NULL;
-}
-
-void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
- SExprInfo* pExpr = pOperatorInfo->pExpr;
- SExprInfo* pExprInfo = &pExpr[0];
- int32_t functionId = getExprFunctionId(pExprInfo);
-#if 0
- if (pQueryAttr->numOfOutput == 1 && functionId == FUNCTION_TS_COMP && pQueryAttr->stableQuery) {
- assert(pExprInfo->base.numOfParams == 1);
-
- // int16_t tagColId = (int16_t)pExprInfo->base.param[0].i;
- int16_t tagColId = -1;
- SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId);
-
- doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes);
-
- } else {
- // set tag value, by which the results are aggregated.
- int32_t offset = 0;
- memset(pRuntimeEnv->tagVal, 0, pQueryAttr->tagLen);
-
- for (int32_t idx = 0; idx < numOfOutput; ++idx) {
- SExprInfo* pLocalExprInfo = &pExpr[idx];
-
- // ts_comp column required the tag value for join filter
- if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.pParam[0].pCol->flag)) {
- continue;
- }
-
- // todo use tag column index to optimize performance
- doSetTagValueInParam(pTable, pLocalExprInfo->base.pParam[0].pCol->colId, &pCtx[idx].tag,
- pLocalExprInfo->base.resSchema.type, pLocalExprInfo->base.resSchema.bytes);
-
- if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resSchema.type) ||
- pLocalExprInfo->base.resSchema.type == TSDB_DATA_TYPE_BOOL ||
- pLocalExprInfo->base.resSchema.type == TSDB_DATA_TYPE_TIMESTAMP) {
- memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i, pLocalExprInfo->base.resSchema.bytes);
- } else {
- if (pCtx[idx].tag.pz != NULL) {
- memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen);
- }
- }
-
- offset += pLocalExprInfo->base.resSchema.bytes;
- }
- }
-
- // set the tsBuf start position before check each data block
- if (pRuntimeEnv->pTsBuf != NULL) {
- setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable);
- }
-#endif
-}
-
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
pBlock->info.rows = 0;
@@ -4038,12 +3940,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if (pProjectInfo->existDataBlock) { // TODO refactor
SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pProjectInfo->existDataBlock = NULL;
- *newgroup = true;
-
- // todo dynamic set tags
- // if (pTableQueryInfo != NULL) {
- // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
- // }
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
@@ -4084,13 +3980,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
}
- // todo set tags
-
- // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
- // if (pTableQueryInfo != NULL) {
- // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs);
- // }
-
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
@@ -4430,10 +4319,6 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
doDestroyBasicInfo(pInfo, numOfOutput);
}
-void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
- SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
-}
-
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
@@ -4778,7 +4663,6 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
-static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
@@ -5447,150 +5331,3 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return TSDB_CODE_SUCCESS;
}
-
-static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
- SJoinOperatorInfo* pJoinInfo = pOperator->info;
-
- SSDataBlock* pRes = pJoinInfo->pRes;
- blockDataCleanup(pRes);
- blockDataEnsureCapacity(pRes, 4096);
-
- int32_t nrows = 0;
-
- while (1) {
- if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
- SOperatorInfo* ds1 = pOperator->pDownstream[0];
- publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
- pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
- publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
-
- pJoinInfo->leftPos = 0;
- if (pJoinInfo->pLeft == NULL) {
- setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
- break;
- }
- }
-
- if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
- SOperatorInfo* ds2 = pOperator->pDownstream[1];
- publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
- pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
- publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
-
- pJoinInfo->rightPos = 0;
- if (pJoinInfo->pRight == NULL) {
- setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
- break;
- }
- }
-
- SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
- char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
-
- SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
- char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
-
- // only the timestamp match support for ordinary table
- ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
- if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
- for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
- SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
-
- SExprInfo* pExprInfo = &pOperator->pExpr[i];
-
- int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
- int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
- int32_t rowIndex = -1;
-
- SColumnInfoData* pSrc = NULL;
- if (pJoinInfo->pLeft->info.blockId == blockId) {
- pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
- rowIndex = pJoinInfo->leftPos;
- } else {
- pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
- rowIndex = pJoinInfo->rightPos;
- }
-
- if (colDataIsNull_s(pSrc, rowIndex)) {
- colDataAppendNULL(pDst, nrows);
- } else {
- char* p = colDataGetData(pSrc, rowIndex);
- colDataAppend(pDst, nrows, p, false);
- }
- }
-
- pJoinInfo->leftPos += 1;
- pJoinInfo->rightPos += 1;
-
- nrows += 1;
- } else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
- pJoinInfo->leftPos += 1;
-
- if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
- continue;
- }
- } else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
- pJoinInfo->rightPos += 1;
- if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
- continue;
- }
- }
-
- // the pDataBlock are always the same one, no need to call this again
- pRes->info.rows = nrows;
- if (pRes->info.rows >= pOperator->resultInfo.threshold) {
- break;
- }
- }
-
- return (pRes->info.rows > 0) ? pRes : NULL;
-}
-
-SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
- int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
- SExecTaskInfo* pTaskInfo) {
- SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
- SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
- if (pOperator == NULL || pInfo == NULL) {
- goto _error;
- }
-
- initResultSizeInfo(pOperator, 4096);
-
- pInfo->pRes = pResBlock;
- pOperator->name = "MergeJoinOperator";
- pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
- pOperator->pExpr = pExprInfo;
- pOperator->numOfExprs = numOfCols;
- pOperator->info = pInfo;
- pOperator->pTaskInfo = pTaskInfo;
-
- SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
- setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
- setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
-
- pOperator->fpSet =
- createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
- int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
- if (code != TSDB_CODE_SUCCESS) {
- goto _error;
- }
-
- return pOperator;
-
-_error:
- taosMemoryFree(pInfo);
- taosMemoryFree(pOperator);
- pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
- return NULL;
-}
-
-void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
- pColumn->slotId = pColumnNode->slotId;
- pColumn->type = pColumnNode->node.resType.type;
- pColumn->bytes = pColumnNode->node.resType.bytes;
- pColumn->precision = pColumnNode->node.resType.precision;
- pColumn->scale = pColumnNode->node.resType.scale;
-}
diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c
new file mode 100644
index 0000000000000000000000000000000000000000..82bb435a2cc82abcfd1805c947231e2958f895e8
--- /dev/null
+++ b/source/libs/executor/src/joinoperator.c
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "function.h"
+#include "os.h"
+#include "querynodes.h"
+#include "tdatablock.h"
+#include "tmsg.h"
+#include "executorimpl.h"
+#include "tcompare.h"
+#include "thash.h"
+#include "ttypes.h"
+
+static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
+static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
+static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
+
+SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
+ int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
+ SExecTaskInfo* pTaskInfo) {
+ SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo));
+ SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
+ if (pOperator == NULL || pInfo == NULL) {
+ goto _error;
+ }
+
+ initResultSizeInfo(pOperator, 4096);
+
+ pInfo->pRes = pResBlock;
+ pOperator->name = "MergeJoinOperator";
+ pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
+ pOperator->pExpr = pExprInfo;
+ pOperator->numOfExprs = numOfCols;
+ pOperator->info = pInfo;
+ pOperator->pTaskInfo = pTaskInfo;
+
+ if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) {
+ SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
+ setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
+ setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
+ } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) {
+ ASSERT(0);
+ }
+
+ pOperator->fpSet =
+ createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
+ int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
+ if (code != TSDB_CODE_SUCCESS) {
+ goto _error;
+ }
+
+ return pOperator;
+
+ _error:
+ taosMemoryFree(pInfo);
+ taosMemoryFree(pOperator);
+ pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+}
+
+void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
+ pColumn->slotId = pColumnNode->slotId;
+ pColumn->type = pColumnNode->node.resType.type;
+ pColumn->bytes = pColumnNode->node.resType.bytes;
+ pColumn->precision = pColumnNode->node.resType.precision;
+ pColumn->scale = pColumnNode->node.resType.scale;
+}
+
+void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
+ SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
+}
+
+SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
+ SJoinOperatorInfo* pJoinInfo = pOperator->info;
+
+ SSDataBlock* pRes = pJoinInfo->pRes;
+ blockDataCleanup(pRes);
+ blockDataEnsureCapacity(pRes, 4096);
+
+ int32_t nrows = 0;
+
+ while (1) {
+ // todo extract method
+ if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
+ SOperatorInfo* ds1 = pOperator->pDownstream[0];
+ publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC);
+ pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
+ publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC);
+
+ pJoinInfo->leftPos = 0;
+ if (pJoinInfo->pLeft == NULL) {
+ setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
+ break;
+ }
+ }
+
+ if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
+ SOperatorInfo* ds2 = pOperator->pDownstream[1];
+ publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC);
+ pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2);
+ publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC);
+
+ pJoinInfo->rightPos = 0;
+ if (pJoinInfo->pRight == NULL) {
+ setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
+ break;
+ }
+ }
+
+ SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
+ char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
+
+ SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
+ char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
+
+ // only the timestamp match support for ordinary table
+ ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
+ if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
+ for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
+ SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
+
+ SExprInfo* pExprInfo = &pOperator->pExpr[i];
+
+ int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
+ int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
+ int32_t rowIndex = -1;
+
+ SColumnInfoData* pSrc = NULL;
+ if (pJoinInfo->pLeft->info.blockId == blockId) {
+ pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
+ rowIndex = pJoinInfo->leftPos;
+ } else {
+ pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
+ rowIndex = pJoinInfo->rightPos;
+ }
+
+ if (colDataIsNull_s(pSrc, rowIndex)) {
+ colDataAppendNULL(pDst, nrows);
+ } else {
+ char* p = colDataGetData(pSrc, rowIndex);
+ colDataAppend(pDst, nrows, p, false);
+ }
+ }
+
+ pJoinInfo->leftPos += 1;
+ pJoinInfo->rightPos += 1;
+
+ nrows += 1;
+ } else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
+ pJoinInfo->leftPos += 1;
+
+ if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
+ continue;
+ }
+ } else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
+ pJoinInfo->rightPos += 1;
+ if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
+ continue;
+ }
+ }
+
+ // the pDataBlock are always the same one, no need to call this again
+ pRes->info.rows = nrows;
+ if (pRes->info.rows >= pOperator->resultInfo.threshold) {
+ break;
+ }
+ }
+
+ return (pRes->info.rows > 0) ? pRes : NULL;
+}
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 10dc482462f84a4dcdd7c53cb4bd164baa74f6af..79e675e2df33047dbcec027f97ad930a3a7b4ad2 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -773,7 +773,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break;
}
- // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
@@ -1062,8 +1061,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
-
- // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
if (pInfo->invertible) {
@@ -1377,7 +1374,6 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
break;
}
- // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);