diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b81fdafef07d176df0fb9e73e6d66f574e065077..32a5140da2526c3bcd266aea4ed73ba5b689f0c4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -550,21 +550,21 @@ typedef struct SFillOperatorInfo { int32_t capacity; } SFillOperatorInfo; -typedef struct SGroupKeys { +typedef struct { char *pData; bool isNull; int16_t type; int32_t bytes; -}SGroupKeys; +} SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; SArray* pGroupCols; SArray* pGroupColVals; // current group column values, SArray SNode* pCondition; - bool isInit; // denote if current val is initialized or not - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width + bool isInit; // denote if current val is initialized or not + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width SGroupResInfo groupResInfo; SAggSupporter aggSup; } SGroupbyOperatorInfo; @@ -592,27 +592,33 @@ typedef struct SPartitionOperatorInfo { int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; +typedef struct SWindowRowsSup { + STimeWindow win; + TSKEY prevTs; + int32_t startRowIndex; + int32_t numOfRows; +} SWindowRowsSup; + typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; SGroupResInfo groupResInfo; - STimeWindow curWindow; // current time window - TSKEY prevTs; // previous timestamp - int32_t numOfRows; // number of rows - int32_t start; // start row index + SWindowRowsSup winSup; bool reptScan; // next round scan int64_t gap; // session window gap SColumnInfoData timeWindowData; // query time window info for scalar function execution. } SSessionAggOperatorInfo; typedef struct SStateWindowOperatorInfo { - SOptrBasicInfo binfo; - STimeWindow curWindow; // current time window - int32_t numOfRows; // number of rows - int32_t colIndex; // start row index - int32_t start; - char* prevData; // previous data - bool reptScan; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + int32_t colIndex; // start row index + bool hasKey; + SStateKeys stateKey; + SColumnInfoData timeWindowData; // query time window info for scalar function execution. +// bool reptScan; } SStateWindowOperatorInfo; typedef struct SSortedMergeOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 28a8c7bce8ab1c7ece1ac22e336609aa75665735..fb427025eacbf4dad144db73f971533a9cb6e68b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1637,22 +1637,23 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); } -static void doKeepTuple(SSessionAggOperatorInfo* pInfo, int64_t ts) { - pInfo->curWindow.ekey = ts; - pInfo->prevTs = ts; - pInfo->numOfRows += 1; +static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) { + pRowSup->win.ekey = ts; + pRowSup->prevTs = ts; + pRowSup->numOfRows += 1; } -static void doKeepSessionStartInfo(SSessionAggOperatorInfo* pInfo, const int64_t* tsList, int32_t rowIndex) { - pInfo->start = rowIndex; - pInfo->numOfRows = 0; - pInfo->curWindow.skey = tsList[rowIndex]; +static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex) { + pRowSup->startRowIndex = rowIndex; + pRowSup->numOfRows = 0; + pRowSup->win.skey = tsList[rowIndex]; } // todo handle multiple tables cases. static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + // todo find the correct time stamp column slot SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); bool masterScan = true; @@ -1660,31 +1661,34 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator int64_t gid = pBlock->info.groupId; int64_t gap = pInfo->gap; - pInfo->numOfRows = 0; + if (!pInfo->reptScan) { pInfo->reptScan = true; - pInfo->prevTs = INT64_MIN; + pInfo->winSup.prevTs = INT64_MIN; } + SWindowRowsSup* pRowSup = &pInfo->winSup; + pRowSup->numOfRows = 0; + // In case of ascending or descending order scan data, only one time window needs to be kepted for each table. TSKEY* tsList = (TSKEY*)pColInfoData->pData; for (int32_t j = 0; j < pBlock->info.rows; ++j) { - if (pInfo->prevTs == INT64_MIN) { - doKeepSessionStartInfo(pInfo, tsList, j); - doKeepTuple(pInfo, tsList[j]); - } else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) { + if (pInfo->winSup.prevTs == INT64_MIN) { + doKeepNewWindowStartInfo(pRowSup, tsList, j); + doKeepTuple(pRowSup, tsList[j]); + } else if (tsList[j] - pRowSup->prevTs <= gap && (tsList[j] - pRowSup->prevTs) >= 0) { // The gap is less than the threshold, so it belongs to current session window that has been opened already. - doKeepTuple(pInfo, tsList[j]); - if (j == 0 && pInfo->start != 0) { - pInfo->start = 0; + doKeepTuple(pRowSup, tsList[j]); + if (j == 0 && pRowSup->startRowIndex != 0) { + pRowSup->startRowIndex = 0; } } else { // start a new session window SResultRow* pResult = NULL; // keep the time window for the closed time window. - STimeWindow window = pInfo->curWindow; + STimeWindow window = pRowSup->win; - pInfo->curWindow.ekey = pInfo->curWindow.skey; + pRowSup->win.ekey = pRowSup->win.skey; int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -1693,24 +1697,24 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window - doKeepSessionStartInfo(pInfo, tsList, j); - doKeepTuple(pInfo, tsList[j]); + doKeepNewWindowStartInfo(pRowSup, tsList, j); + doKeepTuple(pRowSup, tsList[j]); } } SResultRow* pResult = NULL; - pInfo->curWindow.ekey = tsList[pBlock->info.rows - 1]; - int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pInfo->curWindow, masterScan, &pResult, + pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pRowSup->win, masterScan, &pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - updateTimeWindowInfo(&pInfo->timeWindowData, &pInfo->curWindow, false); - doApplyFunctions(pInfo->binfo.pCtx, &pInfo->curWindow, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -5641,84 +5645,78 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr pOperator->status = OP_EXEC_DONE; } - // SQInfo* pQInfo = pRuntimeEnv->qinfo; - // pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); - return pIntervalInfo->binfo.pRes; } -static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pSDataBlock) { - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - STableQueryInfo* item = pRuntimeEnv->current; - SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); - +static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SOptrBasicInfo* pBInfo = &pInfo->binfo; - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; + SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); + int64_t gid = pBlock->info.groupId; - SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); - TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; - if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { - pInfo->reptScan = true; - taosMemoryFreeClear(pInfo->prevData); - } + bool masterScan = true; + int32_t numOfOutput = pOperator->numOfOutput; + + int16_t bytes = pStateColInfoData->info.bytes; + int16_t type = pStateColInfoData->info.type; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); + TSKEY* tsList = (TSKEY*)pColInfoData->pData; - pInfo->numOfRows = 0; - for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { - char* val = ((char*)pColInfoData->pData) + bytes * j; - if (isNull(val, type)) { + SWindowRowsSup* pRowSup = &pInfo->winSup; + pRowSup->numOfRows = 0; + + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pBlock->pBlockAgg)) { continue; } - if (pInfo->prevData == NULL) { - pInfo->prevData = taosMemoryMalloc(bytes); - memcpy(pInfo->prevData, val, bytes); - pInfo->numOfRows = 1; - pInfo->curWindow.skey = tsList[j]; - pInfo->curWindow.ekey = tsList[j]; - pInfo->start = j; - - } else if (memcmp(pInfo->prevData, val, bytes) == 0) { - pInfo->curWindow.ekey = tsList[j]; - pInfo->numOfRows += 1; - // pInfo->start = j; - if (j == 0 && pInfo->start != 0) { - pInfo->numOfRows = 1; - pInfo->start = 0; + + char* val = colDataGetData(pStateColInfoData, j); + + if (!pInfo->hasKey) { + memcpy(pInfo->stateKey.pData, val, bytes); + pInfo->hasKey = true; + + doKeepNewWindowStartInfo(pRowSup, tsList, j); + doKeepTuple(pRowSup, tsList[j]); + } else if (memcmp(pInfo->stateKey.pData, val, bytes) == 0) { + doKeepTuple(pRowSup, tsList[j]); + if (j == 0 && pRowSup->startRowIndex != 0) { + pRowSup->startRowIndex = 0; } - } else { + } else { // a new state window started SResultRow* pResult = NULL; - pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, - &pInfo->curWindow, masterScan, &pResult, item->groupIndex, pBInfo->pCtx, - pOperator->numOfOutput, pBInfo->rowCellInfoOffset); + + // keep the time window for the closed time window. + STimeWindow window = pRowSup->win; + + pRowSup->win.ekey = pRowSup->win.skey; + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, + &pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - // doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, - // pSDataBlock->info.rows, pOperator->numOfOutput); - pInfo->curWindow.skey = tsList[j]; - pInfo->curWindow.ekey = tsList[j]; - memcpy(pInfo->prevData, val, bytes); - pInfo->numOfRows = 1; - pInfo->start = j; + updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + + // here we start a new session window + doKeepNewWindowStartInfo(pRowSup, tsList, j); + doKeepTuple(pRowSup, tsList[j]); } } SResultRow* pResult = NULL; - - pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, - masterScan, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, - pBInfo->rowCellInfoOffset); + pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pRowSup->win, masterScan, &pResult, + gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - // doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, - // pSDataBlock->info.rows, pOperator->numOfOutput); + updateTimeWindowInfo(&pInfo->timeWindowData, &pRowSup->win, false); + doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { @@ -5726,16 +5724,16 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { return NULL; } - SStateWindowOperatorInfo* pWindowInfo = pOperator->info; + SStateWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - -// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { -// pOperator->status = OP_EXEC_DONE; -// } + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + return NULL; + } return pBInfo->pRes; } @@ -5753,28 +5751,20 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { break; } -// setInputDataBlock(pOperator, pBInfo->pCtx, pDataBlock, TSDB_ORDER_ASC); -// if (pWindowInfo->colIndex == -1) { -// pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock); -// } - doStateWindowAggImpl(pOperator, pWindowInfo, pBlock); + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); + doStateWindowAggImpl(pOperator, pInfo, pBlock); } - // restore the value -// pQueryAttr->order.order = order; -// pQueryAttr->window = win; - pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput); - -// initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); - // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); -// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { -// pOperator->status = OP_EXEC_DONE; -// } + initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); + blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + } return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -6098,7 +6088,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - taosMemoryFreeClear(pInfo->prevData); + taosMemoryFreeClear(pInfo->stateKey.pData); } void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { @@ -6332,16 +6322,18 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } pInfo->colIndex = -1; - pInfo->reptScan = false; - // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); + + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExpr, numOfCols, 4096, pResBlock, pTaskInfo->id.str); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - pOperator->name = "StateWindowOperator"; - // pOperator->operatorType = OP_StateWindow; + pOperator->name = "StateWindowOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; @@ -6354,6 +6346,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_SUCCESS; + return NULL; } SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, @@ -6375,7 +6371,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pInfo->gap = gap; pInfo->binfo.pRes = pResBlock; - pInfo->prevTs = INT64_MIN; + pInfo->winSup.prevTs = INT64_MIN; pInfo->reptScan = false; pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; @@ -6946,9 +6942,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + int32_t num = 0; if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { - int32_t num = 0; SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num); @@ -6957,8 +6953,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; return createProjectOperatorInfo(op, pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { - int32_t num = 0; - SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); @@ -6972,7 +6966,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; - int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); @@ -6997,7 +6990,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; - int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); @@ -7006,10 +6998,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); - return createPartitionOperatorInfo(op, pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL); + } else if (QUERY_NODE_STATE_WINDOW == type) { + SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode; + + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createStatewindowOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo); } else { ASSERT(0); } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ba1f2dbd76369bf9558acf574b8f70f478450cf3..3eb8ff1b7242f666a43af4a89225e33034a91aeb 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -48,7 +48,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** SColumn* pCol = taosArrayGet(pGroupColList, i); (*keyLen) += pCol->bytes; - struct SGroupKeys key = {0}; + SGroupKeys key = {0}; key.bytes = pCol->bytes; key.type = pCol->type; key.isNull = false;