diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d70b61bb4aed7859a45aa06dd07935bf331e2b1b..0999c44fab3f85529c678bf1fc46f405de7a4e41 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -289,7 +289,7 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeE return; } - int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId; + int32_t orderId = pRuntimeEnv->pQueryAttr->order.col.info.colId; if (orderId <= 0) { return; } @@ -1914,7 +1914,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->param[3].i = functionId; pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[1].i = pQueryAttr->order.orderColId; + pCtx->param[1].i = pQueryAttr->order.col.info.colId; } else if (functionId == FUNCTION_INTERP) { pCtx->param[2].i = (int8_t)pQueryAttr->fillType; if (pQueryAttr->fillVal != NULL) { @@ -2013,162 +2013,162 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf int32_t* op = taosArrayGet(pOperator, i); switch (*op) { - case OP_TagScan: { - pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - break; - } - case OP_MultiTableTimeInterval: { - pRuntimeEnv->proot = - createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - break; - } - case OP_AllMultiTableTimeInterval: { - pRuntimeEnv->proot = - createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - break; - } - case OP_TimeWindow: { - pRuntimeEnv->proot = - createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; - if (opType != OP_DummyInput && opType != OP_Join) { - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - } - break; - } - case OP_AllTimeWindow: { - pRuntimeEnv->proot = - createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; - if (opType != OP_DummyInput && opType != OP_Join) { - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - } - break; - } - case OP_Groupby: { - pRuntimeEnv->proot = - createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - - int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; - if (opType != OP_DummyInput) { - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - } - break; - } - case OP_SessionWindow: { - pRuntimeEnv->proot = - createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; - if (opType != OP_DummyInput) { - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - } - break; - } - case OP_MultiTableAggregate: { - pRuntimeEnv->proot = - createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - break; - } - case OP_Aggregate: { - pRuntimeEnv->proot = - createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - - int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; - if (opType != OP_DummyInput && opType != OP_Join) { - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - } - break; - } - - case OP_Project: { // TODO refactor to remove arith operator. - SOperatorInfo* prev = pRuntimeEnv->proot; - if (i == 0) { - pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor - setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot); - } - } else { - prev = pRuntimeEnv->proot; - assert(pQueryAttr->pExpr2 != NULL); - pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); - } - break; - } - - case OP_StateWindow: { - pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; - if (opType != OP_DummyInput) { - setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); - } - break; - } - - case OP_Limit: { - pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); - break; - } - - case OP_Filter: { // todo refactor - int32_t numOfFilterCols = 0; - if (pQueryAttr->stableQuery) { - SColumnInfo* pColInfo = - extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); - freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); - } else { - SColumnInfo* pColInfo = - extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, - pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); - freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); - } - - break; - } - - case OP_Fill: { - SOperatorInfo* pInfo = pRuntimeEnv->proot; - pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); - break; - } - - case OP_MultiwayMergeSort: { - pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); - break; - } - - case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. - bool multigroupResult = pQueryAttr->multigroupResult; - if (pQueryAttr->multigroupResult) { - multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE); - } - - pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); - break; - } - - case OP_SLimit: { - int32_t num = pRuntimeEnv->proot->numOfOutput; - SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; - pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); - break; - } - - case OP_Distinct: { - pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - break; - } - - case OP_Order: { - pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); - break; - } +// case OP_TagScan: { +// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// break; +// } +// case OP_MultiTableTimeInterval: { +// pRuntimeEnv->proot = +// createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// break; +// } +// case OP_AllMultiTableTimeInterval: { +// pRuntimeEnv->proot = +// createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// break; +// } +// case OP_TimeWindow: { +// pRuntimeEnv->proot = +// createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// if (opType != OP_DummyInput && opType != OP_Join) { +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// } +// break; +// } +// case OP_AllTimeWindow: { +// pRuntimeEnv->proot = +// createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// if (opType != OP_DummyInput && opType != OP_Join) { +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// } +// break; +// } +// case OP_Groupby: { +// pRuntimeEnv->proot = +// createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// +// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// if (opType != OP_DummyInput) { +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// } +// break; +// } +// case OP_SessionWindow: { +// pRuntimeEnv->proot = +// createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// if (opType != OP_DummyInput) { +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// } +// break; +// } +// case OP_MultiTableAggregate: { +// pRuntimeEnv->proot = +// createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// break; +// } +// case OP_Aggregate: { +// pRuntimeEnv->proot = +// createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// +// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// if (opType != OP_DummyInput && opType != OP_Join) { +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// } +// break; +// } +// +// case OP_Project: { // TODO refactor to remove arith operator. +// SOperatorInfo* prev = pRuntimeEnv->proot; +// if (i == 0) { +// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor +// setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot); +// } +// } else { +// prev = pRuntimeEnv->proot; +// assert(pQueryAttr->pExpr2 != NULL); +// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); +// } +// break; +// } +// +// case OP_StateWindow: { +// pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; +// if (opType != OP_DummyInput) { +// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); +// } +// break; +// } +// +// case OP_Limit: { +// pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); +// break; +// } +// +// case OP_Filter: { // todo refactor +// int32_t numOfFilterCols = 0; +// if (pQueryAttr->stableQuery) { +// SColumnInfo* pColInfo = +// extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); +// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, +// pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); +// freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); +// } else { +// SColumnInfo* pColInfo = +// extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); +// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, +// pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); +// freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); +// } +// +// break; +// } +// +// case OP_Fill: { +// SOperatorInfo* pInfo = pRuntimeEnv->proot; +// pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); +// break; +// } +// +// case OP_MultiwayMergeSort: { +// pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); +// break; +// } +// +// case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. +// bool multigroupResult = pQueryAttr->multigroupResult; +// if (pQueryAttr->multigroupResult) { +// multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE); +// } +// +// pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, +// pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); +// break; +// } +// +// case OP_SLimit: { +// int32_t num = pRuntimeEnv->proot->numOfOutput; +// SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; +// pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); +// break; +// } +// +// case OP_Distinct: { +// pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); +// break; +// } +// +// case OP_Order: { +// pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); +// break; +// } default: { assert(0); @@ -4557,22 +4557,22 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo); switch(tbScanner) { - case OP_TableBlockInfoScan: { - pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); - break; - } - case OP_TableSeqScan: { - pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); - break; - } - case OP_DataBlocksOptScan: { - pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); - break; - } - case OP_TableScan: { - pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); - break; - } +// case OP_TableBlockInfoScan: { +// pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); +// break; +// } +// case OP_TableSeqScan: { +// pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); +// break; +// } +// case OP_DataBlocksOptScan: { +// pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); +// break; +// } +// case OP_TableScan: { +// pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); +// break; +// } default: { // do nothing break; } @@ -4881,7 +4881,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableScanOperator"; - pOperator->operatorType = OP_TableScan; +// pOperator->operatorType = OP_TableScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -4905,7 +4905,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableSeqScanOperator"; - pOperator->operatorType = OP_TableSeqScan; +// pOperator->operatorType = OP_TableSeqScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -4930,7 +4930,7 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableBlockInfoScanOperator"; - pOperator->operatorType = OP_TableBlockInfoScan; +// pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -4946,7 +4946,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr pTableScanInfo->numOfOutput = pDownstream->numOfOutput; - +#if 0 if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) { SAggOperatorInfo* pAggInfo = pDownstream->info; @@ -4995,6 +4995,8 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } else { assert(0); } +#endif + } SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { @@ -5009,7 +5011,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); pOptr->name = "DataBlocksOptimizedScanOperator"; - pOptr->operatorType = OP_DataBlocksOptScan; +// pOptr->operatorType = OP_DataBlocksOptScan; pOptr->pRuntimeEnv = pRuntimeEnv; pOptr->blockingOptr = false; pOptr->info = pInfo; @@ -5161,7 +5163,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GlobalAggregate"; - pOperator->operatorType = OP_GlobalAggregate; +// pOperator->operatorType = OP_GlobalAggregate; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -5205,7 +5207,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiwaySortOperator"; - pOperator->operatorType = OP_MultiwayMergeSort; +// pOperator->operatorType = OP_MultiwayMergeSort; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -5312,7 +5314,7 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "InMemoryOrder"; - pOperator->operatorType = OP_Order; +// pOperator->operatorType = OP_Order; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -5358,10 +5360,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); } - if (upstream->operatorType == OP_DataBlocksOptScan) { - STableScanInfo* pScanInfo = upstream->info; - order = getTableScanOrder(pScanInfo); - } +// if (upstream->operatorType == OP_DataBlocksOptScan) { +// STableScanInfo* pScanInfo = upstream->info; +// order = getTableScanOrder(pScanInfo); +// } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); @@ -5413,10 +5415,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - if (upstream->operatorType == OP_DataBlocksOptScan) { - STableScanInfo* pScanInfo = upstream->info; - order = getTableScanOrder(pScanInfo); - } +// if (upstream->operatorType == OP_DataBlocksOptScan) { +// STableScanInfo* pScanInfo = upstream->info; +// order = getTableScanOrder(pScanInfo); +// } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); @@ -6268,7 +6270,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; - pOperator->operatorType = OP_Aggregate; +// pOperator->operatorType = OP_Aggregate; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -6363,7 +6365,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiTableAggregate"; - pOperator->operatorType = OP_MultiTableAggregate; +// pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -6393,7 +6395,7 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; - pOperator->operatorType = OP_Project; +// pOperator->operatorType = OP_Project; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -6452,7 +6454,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "FilterOperator"; - pOperator->operatorType = OP_Filter; +// pOperator->operatorType = OP_Filter; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->numOfOutput = numOfOutput; @@ -6473,7 +6475,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "LimitOperator"; - pOperator->operatorType = OP_Limit; +// pOperator->operatorType = OP_Limit; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->exec = doLimit; @@ -6494,7 +6496,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TimeIntervalAggOperator"; - pOperator->operatorType = OP_TimeWindow; +// pOperator->operatorType = OP_TimeWindow; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; @@ -6519,7 +6521,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "AllTimeIntervalAggOperator"; - pOperator->operatorType = OP_AllTimeWindow; +// pOperator->operatorType = OP_AllTimeWindow; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; @@ -6543,7 +6545,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "StateWindowOperator"; - pOperator->operatorType = OP_StateWindow; +// pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; @@ -6568,7 +6570,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SessionWindowAggOperator"; - pOperator->operatorType = OP_SessionWindow; +// pOperator->operatorType = OP_SessionWindow; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; @@ -6591,7 +6593,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiTableTimeIntervalOperator"; - pOperator->operatorType = OP_MultiTableTimeInterval; +// pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; @@ -6615,7 +6617,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "AllMultiTableTimeIntervalOperator"; - pOperator->operatorType = OP_AllMultiTableTimeInterval; +// pOperator->operatorType = OP_AllMultiTableTimeInterval; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->pExpr = pExpr; @@ -6651,7 +6653,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; - pOperator->operatorType = OP_Groupby; +// pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -6690,7 +6692,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->name = "FillOperator"; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; - pOperator->operatorType = OP_Fill; +// pOperator->operatorType = OP_Fill; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -6738,7 +6740,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SLimitOperator"; - pOperator->operatorType = OP_SLimit; +// pOperator->operatorType = OP_SLimit; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->exec = doSLimit; @@ -6894,7 +6896,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqTableTagScan"; - pOperator->operatorType = OP_TagScan; +// pOperator->operatorType = OP_TagScan; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -7035,7 +7037,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat pOperator->name = "DistinctOperator"; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; - pOperator->operatorType = OP_Distinct; +// pOperator->operatorType = OP_Distinct; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; @@ -8034,7 +8036,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pQueryAttr->limit.limit = pQueryMsg->limit; pQueryAttr->limit.offset = pQueryMsg->offset; pQueryAttr->order.order = pQueryMsg->order; - pQueryAttr->order.orderColId = pQueryMsg->orderColId; + pQueryAttr->order.col.info.colId = pQueryMsg->orderColId; pQueryAttr->pExpr1 = pExprs; pQueryAttr->pExpr2 = pSecExprs; pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput;