diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2e692f76acd40d1c778d3ab3be485b642712db45..2bbd16fbd02648d136164eb090088355c2210395 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -573,6 +573,7 @@ typedef struct SGroupbyOperatorInfo { typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; + SGroupResInfo groupResInfo; STimeWindow curWindow; // current time window TSKEY prevTs; // previous timestamp int32_t numOfRows; // number of rows diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3282724ddd512a2d608561e00ef9c4f1fbbfcb51..6c70a582051e699d4b84749a6b4ff0afe6a235c5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1798,14 +1798,17 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { } } -static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { +// todo handle multiple tables cases. +static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // primary timestamp column - SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); -// bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - SOptrBasicInfo* pBInfo = &pInfo->binfo; + bool masterScan = true; + STimeWindow window = {0}; + int32_t numOfOutput = pOperator->numOfOutput; + int64_t gid = pBlock->info.groupId; int64_t gap = pInfo->gap; pInfo->numOfRows = 0; @@ -1815,7 +1818,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } TSKEY* tsList = (TSKEY*)pColInfoData->pData; - for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { + for (int32_t j = 0; j < pBlock->info.rows; ++j) { if (pInfo->prevTs == INT64_MIN) { pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1832,16 +1835,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } } else { // start a new session window SResultRow* pResult = NULL; - - int64_t gid = pSDataBlock->info.groupId; pInfo->curWindow.ekey = pInfo->curWindow.skey; -// int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, gid, pInfo->binfo.pCtx, -// numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); -// if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code -// longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); -// } + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + } -// doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + // pInfo->numOfRows data belong to the current session window + doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1854,14 +1856,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator SResultRow* pResult = NULL; pInfo->curWindow.ekey = pInfo->curWindow.skey; -// int32_t ret = setResultOutputBufByKey_rv(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, -// &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, -// pBInfo->rowCellInfoOffset); -// if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code -// longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); -// } + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, + numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + } -// doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pInfo->binfo.pCtx, &window, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -6703,7 +6704,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { STableIntervalOperatorInfo* pInfo = pOperator->info; - // int32_t order = pQueryAttr->order.order; + int32_t order = TSDB_ORDER_ASC; // STimeWindow win = pQueryAttr->window; bool newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -6720,7 +6721,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } @@ -6732,7 +6733,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { +static SSDataBlock* doBuildIntervalResult(SOperatorInfo *pOperator, bool* newgroup) { STableIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -7064,13 +7065,14 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) return NULL; } - SSessionAggOperatorInfo* pWindowInfo = pOperator->info; - SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; + SSessionAggOperatorInfo* pInfo = pOperator->info; + SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { - pOperator->status = OP_EXEC_DONE; + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); + return NULL; } return pBInfo->pRes; @@ -7089,19 +7091,20 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); - doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock); + doSessionWindowAggImpl(pOperator, pInfo, pBlock); } // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput); + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); -// initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo); -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { - pOperator->status = OP_EXEC_DONE; + initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); + + blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + doSetOperatorCompleted(pOperator); } return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; @@ -7678,7 +7681,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->_openFn = doOpenIntervalAgg; - pOperator->getNextFn = doIntervalAgg; + pOperator->getNextFn = doBuildIntervalResult; pOperator->closeFn = destroyIntervalOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -8775,11 +8778,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod for(int32_t i = 0; i < num; ++i) { SSlotDescNode* pNode = (SSlotDescNode*) nodesListGetNode(pOutputNodeList->pSlots, i); SColMatchInfo* info = taosArrayGet(pList, pNode->slotId); - if (pNode->output) { +// if (pNode->output) { (*numOfOutputCols) += 1; - } else { - info->output = false; - } +// } else { +// info->output = false; +// } } return pList;