From 182936c4f123a443e2876d0f10682c0d07b478e9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 21 May 2021 05:40:40 +0800 Subject: [PATCH] [TD-2570] support state window --- src/client/inc/tsclient.h | 2 +- src/client/src/tscSQLParser.c | 56 +++++++++++++++++------- src/client/src/tscServer.c | 1 + src/client/src/tscUtil.c | 7 +-- src/inc/taosmsg.h | 1 + src/query/inc/qExecutor.h | 2 +- src/query/src/qAggMain.c | 6 ++- src/query/src/qExecutor.c | 81 +++++++++++++++++++++++------------ src/query/src/qPlan.c | 12 ++++-- 9 files changed, 114 insertions(+), 54 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b2fdafb7f3..554a062d96 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -226,7 +226,7 @@ typedef struct SQueryInfo { int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int16_t resColumnId; // result column id bool distinctTag; // distinct tag or not - bool windowState; // window state or not + bool stateWindow; // window state or not int32_t round; // 0/1/.... int32_t bufLen; char* buf; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 9b60cca0ed..777ae88860 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -89,7 +89,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken); static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding); -static int32_t validateWindowStateNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable); +static int32_t validateStateWindowNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem); @@ -829,34 +829,58 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS // The following part is used to check for the invalid query expression. return checkInvalidExprForTimeWindow(pCmd, pQueryInfo); } -static int32_t validateWindowStateNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) { +static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) { const char* msg1 = "invalid column name"; - const char* msg2 = "invalid window state"; - const char* msg3 = "not support window_state on super table"; + const char* msg3 = "not support state window on super table/tag column"; + const char* msg4 = "not support state_window with group by "; SStrToken *col = &(pSqlNode->windowstateVal.col) ; if (col->z == NULL || col->n <= 0) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + return TSDB_CODE_SUCCESS; + } + + if (pQueryInfo->colList == NULL) { + pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); + } + if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4); } + pQueryInfo->groupbyExpr.numOfGroupCols = 1; + //TODO(dengyihao): check tag column if (isStable) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); } SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - if (tscSqlExprNumOfExprs(pQueryInfo) == 1) { - SSqlExpr* pExpr = &(tscSqlExprGet(pQueryInfo, 0)->base); - if (index.columnIndex == pExpr->colInfo.colIndex && pExpr->colType == TSDB_DATA_TYPE_INT) { - pQueryInfo->windowState = true; - return TSDB_CODE_SUCCESS; - } - } - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + int32_t numOfCols = tscGetNumOfColumns(pTableMeta); + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + + SSqlGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr; + if (pGroupExpr->columnInfo == NULL) { + pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex)); + } + + SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex); + if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + + tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema); + SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId }; + taosArrayPush(pGroupExpr->columnInfo, &colIndex); + pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; + pQueryInfo->stateWindow = true; + return TSDB_CODE_SUCCESS; } int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) { @@ -7319,7 +7343,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - if (validateWindowStateNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) { + // parse the window_state + if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } // set order by info @@ -7344,7 +7369,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { return TSDB_CODE_TSC_INVALID_SQL; } - // parse the window_state /* * transfer sql functions that need secondary merge into another format * in dealing with super table queries such as: count/first/last diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d2bf458e58..7b2e83a1ec 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -875,6 +875,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->simpleAgg = query.simpleAgg; pQueryMsg->pointInterpQuery = query.pointInterpQuery; pQueryMsg->needReverseScan = query.needReverseScan; + pQueryMsg->stateWindow = query.stateWindow; pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->sqlstrLen = htonl(sqlLen); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a40501d1cf..d64a3de930 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3545,12 +3545,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo); pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo); pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); - pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); + pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo); pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); pQueryAttr->distinctTag = pQueryInfo->distinctTag; - pQueryAttr->windowState = pQueryInfo->windowState; + pQueryAttr->sw = pQueryInfo->sessionWindow; + pQueryAttr->stateWindow = pQueryInfo->stateWindow; pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfOutput = numOfOutput; @@ -3558,8 +3559,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->slimit = pQueryInfo->slimit; pQueryAttr->order = pQueryInfo->order; pQueryAttr->fillType = pQueryInfo->fillType; - pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); pQueryAttr->havingNum = pQueryInfo->havingFieldNum; + if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor pQueryAttr->window = pQueryInfo->window; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 3b7022fb88..d98fc9a6e2 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -471,6 +471,7 @@ typedef struct { bool simpleAgg; bool pointInterpQuery; // point interpolation query bool needReverseScan; // need reverse scan + bool stateWindow; // state window flag STimeWindow window; int32_t numOfTables; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 6640061c63..6ad23e219b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -196,7 +196,7 @@ typedef struct SQueryAttr { bool pointInterpQuery; // point interpolation query bool needReverseScan; // need reverse scan bool distinctTag; // distinct tag query - bool windowState; // window State on sub/normal table + bool stateWindow; // window State on sub/normal table int32_t interBufSize; // intermediate buffer sizse int32_t havingNum; // having expr number diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 3b1ffa46d9..e99f135904 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3298,8 +3298,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) { if (pCtx->numOfParams == 2) { return; } + if (pCtx->param[0].i64 == 1) { + SET_VAL(pCtx, pCtx->size, 1); + } else { + INC_INIT_VAL(pCtx, pCtx->size); + } - INC_INIT_VAL(pCtx, pCtx->size); char *pData = GET_INPUT_DATA_LIST(pCtx); if (pCtx->order == TSDB_ORDER_ASC) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9bec84e28e..6c87b848a4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -190,7 +190,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *bInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); @@ -727,7 +727,6 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) { pCtx[k].preAggVals.isSet = false; } - if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); } @@ -1295,7 +1294,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn } int32_t ret = - setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex); + setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } @@ -1336,7 +1335,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf pInfo->curWindow.ekey = tsList[j]; pInfo->prevTs = tsList[j]; pInfo->numOfRows += 1; - pInfo->start = j; + //pInfo->start = j; } else { // start a new session window SResultRow* pResult = NULL; @@ -1387,12 +1386,12 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { } } -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - int32_t *rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; - SResultRowInfo *pResultRowInfo = &pInfo->binfo.resultRowInfo; - SQLFunctionCtx *pCtx = pInfo->binfo.pCtx; + int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset; + SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo; + SQLFunctionCtx *pCtx = binfo->pCtx; // not assign result buffer yet, add new result buffer, TODO remove it char* d = pData; @@ -3143,7 +3142,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfOutput = pOperator->numOfOutput; - if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0) { + if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { // for each group result, call the finalize function for each column if (pQueryAttr->groupbyColumn) { closeAllResultRows(pResultRowInfo); @@ -5063,37 +5062,37 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STableQueryInfo* item = pRuntimeEnv->current; SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); + SOptrBasicInfo* pBInfo = &pInfo->binfo; + + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); int16_t bytes = pColInfoData->info.bytes; int16_t type = pColInfoData->info.type; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - STableQueryInfo* item = pRuntimeEnv->current; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; - - if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_TIMESTAMP) { - qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv)); - return; - } + + pInfo->numOfRows = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { char* val = ((char*)pColInfoData->pData) + bytes * j; if (isNull(val, type)) { continue; } if (pInfo->prevData == NULL) { - pInfo->prevData = malloc(bytes); + pInfo->prevData = malloc(bytes); memcpy(pInfo->prevData, val, bytes); + pInfo->numOfRows = 1; pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; - pInfo->numOfRows = 1; - pInfo->start = j; - } else if (0 == memcmp(pInfo->prevData, val, bytes)) { + pInfo->start = j; + + } else if (memcmp(pInfo->prevData, val, bytes) == 0) { pInfo->curWindow.ekey = tsList[j]; pInfo->numOfRows += 1; - pInfo->start = j; + pInfo->start = j; } else { SResultRow* pResult = NULL; int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, @@ -5102,7 +5101,6 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); @@ -5111,7 +5109,30 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI memcpy(pInfo->prevData, val, bytes); pInfo->numOfRows = 1; pInfo->start = j; - } + + } + // Compare with the previous row of this column, and do not set the output buffer again if they are identical. + //if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) { + // if (pInfo->prevData == NULL) { + // pInfo->prevData = malloc(bytes); + // } + // pInfo->curWindow.skey = tsList[j]; + // pInfo->curWindow.ekey = tsList[j]; + + // memcpy(pInfo->prevData, val, bytes); + // int32_t ret = + // setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, (char *)&v, type, bytes, item->groupIndex); + // if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + // longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + // } + //} + //for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + // pInfo->binfo.pCtx[k].size = 1; + // int32_t functionId = pInfo->binfo.pCtx[k].functionId; + // if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) { + // aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j); + // } + //} } SResultRow* pResult = NULL; @@ -5124,25 +5145,28 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); - } static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } + SStateWindowOperatorInfo* pWindowInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; + + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } + return pBInfo->pRes; } + + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; SOperatorInfo* upstream = pOperator->upstream; @@ -5153,7 +5177,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { } setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order); if (pWindowInfo->colIndex == -1) { - pWindowInfo->colIndex = pOperator->pExpr->base.colInfo.colIndex; + pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock); } doStateWindowAggImpl(pOperator, pWindowInfo, pBlock); } @@ -6965,6 +6989,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; + pQueryAttr->stateWindow = pQueryMsg->stateWindow; pQueryAttr->vgId = vgId; pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 52b0329293..7ed0e10327 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -113,6 +113,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { op = OP_SessionWindow; taosArrayPush(plan, &op); + if (pQueryAttr->pExpr2 != NULL) { + op = OP_Arithmetic; + taosArrayPush(plan, &op); + } + } else if (pQueryAttr->stateWindow) { + op = OP_StateWindow; + taosArrayPush(plan, &op); + if (pQueryAttr->pExpr2 != NULL) { op = OP_Arithmetic; taosArrayPush(plan, &op); @@ -121,10 +129,6 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) { op = OP_MultiTableAggregate; } else { - if (pQueryAttr->windowState) { - op = OP_StateWindow; - taosArrayPush(plan, &op); - } op = OP_Aggregate; } -- GitLab