diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 278d9d8b7c503889f147984bc151e0e80a02e259..9a518589b0dc597f566906200ced19c2d1ed7884 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -183,11 +183,11 @@ typedef struct SqlFunctionCtx { int32_t columnIndex; // TODO remove it uint8_t currentStage; // record current running step, default: 0 bool isAggSet; + int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it ///////////////////////////////////////////////////////////////// bool stableQuery; int16_t functionId; // function id char * pOutput; // final result output buffer, point to sdata->data - int64_t startTs; // timestamp range of current query when function is executed on a specific data block int32_t numOfParams; SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param int64_t *ptsList; // corresponding timestamp array list diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 64e2b2c1d37bc4c78a556e708f31c84f166f7efa..485ae0a9c0f3584f240f7e247005a79650c70cf0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -550,15 +550,16 @@ typedef struct SGroupbyOperatorInfo { } SGroupbyOperatorInfo; typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - STimeWindow curWindow; // current time window - TSKEY prevTs; // previous timestamp - int32_t numOfRows; // number of rows - int32_t start; // start row index - bool reptScan; // next round scan - int64_t gap; // session window gap + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + STimeWindow curWindow; // current time window + TSKEY prevTs; // previous timestamp + int32_t numOfRows; // number of rows + int32_t start; // start row index + bool 
reptScan; // next round scan + int64_t gap; // session window gap + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } SSessionAggOperatorInfo; typedef struct SStateWindowOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b238af06a3bc07a50483f9f4abb3d30f4f1a7cb3..3f895b3f1a99e4e93bc168dfc5d61592101d7fac 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1024,29 +1024,25 @@ static void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQuer colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); } -static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin) { +static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) { int64_t* ts = (int64_t*)pColData->pData; + int32_t delta = includeEndpoint? 1:0; - int64_t duration = pWin->ekey - pWin->skey + 1; + int64_t duration = pWin->ekey - pWin->skey + delta; ts[2] = duration; // set the duration ts[3] = pWin->skey; // window start key - ts[4] = pWin->ekey + 1; // window end key + ts[4] = pWin->ekey + delta; // window end key } static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { - SScalarParam intervalParam = {.numOfRows = 5, .columnData = pTimeWindowData}; //TODO move out of this function - if (pTimeWindowData != NULL) { - updateTimeWindowInfo(pTimeWindowData, pWin); - } - for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pWin->skey; // keep it temporarialy - bool hasAgg = pCtx[k].input.colDataAggIsSet; + bool hasAgg = pCtx[k].input.colDataAggIsSet; + int32_t numOfRows = pCtx[k].input.numOfRows; int32_t startOffset = pCtx[k].input.startRowIndex; - int32_t numOfRows = pCtx[k].input.numOfRows; int32_t pos = (order == 
TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); pCtx[k].input.startRowIndex = pos; @@ -1066,12 +1062,14 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInf SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); char* p = GET_ROWCELL_INTERBUF(pEntryInfo); - SScalarParam out = {.columnData = NULL}; - out.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData)); - out.columnData->info.type = TSDB_DATA_TYPE_BIGINT; - out.columnData->info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - out.columnData->pData = p; - pCtx[k].sfp.process(&intervalParam, 1, &out); + SColumnInfoData idata = {0}; + idata.info.type = TSDB_DATA_TYPE_BIGINT; + idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + idata.pData = p; + + SScalarParam out = {.columnData = &idata}; + SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; + pCtx[k].sfp.process(&tw, 1, &out); pEntryInfo->numOfRes = 1; pEntryInfo->hasResult = ','; continue; @@ -1617,8 +1615,9 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe #endif // window start key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, - pInfo->order, false); + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, pInfo->order, false); + + updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; @@ -1647,8 +1646,9 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, 
forwardStep, - pInfo->order, false); + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); + // the time window info handed to doApplyFunctions must describe nextWin, the window aggregated below + updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -1915,73 +1915,80 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } } +static void doKeepTuple(SSessionAggOperatorInfo* pInfo, int64_t ts) { + pInfo->curWindow.ekey = ts; + pInfo->prevTs = ts; + pInfo->numOfRows += 1; +} + +static void doKeepSessionStartInfo(SSessionAggOperatorInfo* pInfo, const int64_t* tsList, int32_t rowIndex) { + pInfo->start = rowIndex; + pInfo->numOfRows = 0; + pInfo->curWindow.skey = tsList[rowIndex]; +} + // todo handle multiple tables cases. static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - // primary timestamp column SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); - bool masterScan = true; - STimeWindow window = {0}; - int32_t numOfOutput = pOperator->numOfOutput; - int64_t gid = pBlock->info.groupId; + bool masterScan = true; + int32_t numOfOutput = pOperator->numOfOutput; + int64_t gid = pBlock->info.groupId; int64_t gap = pInfo->gap; pInfo->numOfRows = 0; - if (/*IS_REPEAT_SCAN(pRuntimeEnv) && */ !pInfo->reptScan) { + if (!pInfo->reptScan) { pInfo->reptScan = true; - pInfo->prevTs = INT64_MIN; + pInfo->prevTs = INT64_MIN; } + // In case of ascending or descending order scan data, only one time window needs to be kept for each table. 
TSKEY* tsList = (TSKEY*)pColInfoData->pData; for (int32_t j = 0; j < pBlock->info.rows; ++j) { if (pInfo->prevTs == INT64_MIN) { - pInfo->curWindow.skey = tsList[j]; - pInfo->curWindow.ekey = tsList[j]; - pInfo->prevTs = tsList[j]; - pInfo->numOfRows = 1; - pInfo->start = j; + doKeepSessionStartInfo(pInfo, tsList, j); + doKeepTuple(pInfo, tsList[j]); } else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) { - pInfo->curWindow.ekey = tsList[j]; - pInfo->prevTs = tsList[j]; - pInfo->numOfRows += 1; + // The gap is less than the threshold, so it belongs to current session window that has been opened already. + doKeepTuple(pInfo, tsList[j]); if (j == 0 && pInfo->start != 0) { - pInfo->numOfRows = 1; pInfo->start = 0; } } else { // start a new session window SResultRow* pResult = NULL; + + // keep the time window for the closed time window. + STimeWindow window = pInfo->curWindow; + pInfo->curWindow.ekey = pInfo->curWindow.skey; int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, - &pResult, gid, pInfo->binfo.pCtx, numOfOutput, - pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + &pResult, gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } // pInfo->numOfRows data belong to the current session window - doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->timeWindowData, &window, false); + doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); - pInfo->curWindow.skey = tsList[j]; - pInfo->curWindow.ekey = tsList[j]; - pInfo->prevTs = tsList[j]; - pInfo->numOfRows = 1; - pInfo->start = j; + // here we start 
a new session window + doKeepSessionStartInfo(pInfo, tsList, j); + doKeepTuple(pInfo, tsList[j]); } } SResultRow* pResult = NULL; - - pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &window, masterScan, &pResult, - gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, - &pInfo->aggSup, pTaskInfo); + pInfo->curWindow.ekey = tsList[pBlock->info.rows - 1]; + int32_t ret = setResultOutputBufByKey_rv(&pInfo->binfo.resultRowInfo, pBlock->info.uid, &pInfo->curWindow, masterScan, &pResult, + gid, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pInfo->binfo.pCtx, &window, NULL, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); + updateTimeWindowInfo(&pInfo->timeWindowData, &pInfo->curWindow, false); + doApplyFunctions(pInfo->binfo.pCtx, &pInfo->curWindow, &pInfo->timeWindowData, pInfo->start, pInfo->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -7219,8 +7226,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, - pBInfo->rowCellInfoOffset); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -7248,14 +7254,11 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) // restore the value pOperator->status = 
OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, - pBInfo->rowCellInfoOffset); + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); - blockDataEnsureCapacity(pBInfo->pRes, pBInfo->capacity); - toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, - pBInfo->rowCellInfoOffset); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pBInfo->pRes, pBInfo->capacity, pBInfo->rowCellInfoOffset); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -7863,28 +7866,28 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo } int32_t numOfRows = 4096; - int32_t code = - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + initExecTimeWindowInfo(&pInfo->timeWindowData, &pTaskInfo->window); - pInfo->gap = gap; - pInfo->binfo.pRes = pResBlock; - pInfo->prevTs = INT64_MIN; - pInfo->reptScan = false; - pOperator->name = "SessionWindowAggOperator"; + pInfo->gap = gap; + pInfo->binfo.pRes = pResBlock; + pInfo->prevTs = INT64_MIN; + pInfo->reptScan = false; + pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; - pOperator->info = pInfo; - pOperator->getNextFn = 
doSessionWindowAgg; - pOperator->closeFn = destroySWindowOperatorInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->getNextFn = doSessionWindowAgg; + pOperator->closeFn = destroySWindowOperatorInfo; + pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator;