From c5de68964b9e6f408b91902ec48a4fd19f2cc262 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 11 Mar 2022 12:44:25 +0800 Subject: [PATCH] [TD-11216]: Time window related keywords add _qstart/_qstop/_qduration --- src/query/inc/qAggMain.h | 1 + src/query/src/qAggMain.c | 18 ++++++++++++------ src/query/src/qExecutor.c | 27 ++++++++++++++++----------- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 323dff5aeb..063358e7ea 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -215,6 +215,7 @@ typedef struct SQLFunctionCtx { SHashObj **pUniqueSet; // for unique function SHashObj **pModeSet; // for mode function STimeWindow qWindow; // for _qstart/_qstop/_qduration column + int32_t allocRows; // rows allocated for output buffer } SQLFunctionCtx; typedef struct SAggFunctionInfo { diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 574ea333b9..404b045315 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -5811,9 +5811,11 @@ static void window_start_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pCtx->size, 1); *(int64_t *)(pCtx->pOutput) = pCtx->startTs; } else { //TSDB_FUNC_QSTART - INC_INIT_VAL(pCtx, pCtx->size); + int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows + SET_VAL(pCtx, pCtx->size, size); + //INC_INIT_VAL(pCtx, size); char *output = pCtx->pOutput; - for (int32_t i = 0; i < pCtx->size; ++i) { + for (int32_t i = 0; i < size; ++i) { if (pCtx->qWindow.skey == INT64_MIN) { *(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL; } else { @@ -5829,9 +5831,11 @@ static void window_stop_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pCtx->size, 1); *(int64_t *)(pCtx->pOutput) = pCtx->endTs; } else { //TSDB_FUNC_QSTOP - INC_INIT_VAL(pCtx, pCtx->size); + int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows + SET_VAL(pCtx, pCtx->size, size); + //INC_INIT_VAL(pCtx, size); char *output = pCtx->pOutput; - for (int32_t i = 0; i < pCtx->size; ++i) { + for (int32_t i = 0; i < size; ++i) { if (pCtx->qWindow.ekey == INT64_MAX) { *(TKEY *)output = TSDB_DATA_TIMESTAMP_NULL; } else { @@ -5852,13 +5856,15 @@ static void window_duration_function(SQLFunctionCtx *pCtx) { } *(int64_t *)(pCtx->pOutput) = duration; } else { //TSDB_FUNC_QDURATION - INC_INIT_VAL(pCtx, pCtx->size); + int32_t size = MIN(pCtx->size, pCtx->allocRows); //size cannot exceeds allocated rows + SET_VAL(pCtx, pCtx->size, size); + //INC_INIT_VAL(pCtx, size); duration = pCtx->qWindow.ekey - pCtx->qWindow.skey; if (duration < 0) { duration = -duration; } char *output = pCtx->pOutput; - for (int32_t i = 0; i < pCtx->size; ++i) { + for (int32_t i = 0; i < size; ++i) { if (pCtx->qWindow.skey == INT64_MIN || pCtx->qWindow.ekey == INT64_MAX) { *(int64_t *)output = TSDB_DATA_BIGINT_NULL; } else { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 46d5d32221..3efea1240c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1908,7 +1908,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) { } static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, - int32_t** rowCellInfoOffset) { + int32_t** rowCellInfoOffset, int32_t numOfRows) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx)); @@ -1959,6 +1959,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->startTs = INT64_MIN; pCtx->qWindow = pQueryAttr->window; + pCtx->allocRows = numOfRows; pCtx->numOfParams = pSqlExpr->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { @@ -5643,7 +5644,8 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pInfo->bufCapacity = 200; // TD-10899 pInfo->udfInfo = pUdfInfo; pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, + pInfo->bufCapacity * pInfo->resultRowFactor); pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); @@ -7284,7 +7286,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, numOfRows); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -7456,7 +7458,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, (int32_t) tableGroup); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { @@ -7501,7 +7503,7 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato SOptrBasicInfo* pBInfo = &pInfo->binfo; pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); - pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); + pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -7636,7 +7638,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp return NULL; } - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -7686,7 +7688,7 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera SOptrBasicInfo* pBInfo = &pInfo->binfo; pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); - pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); + pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset, pInfo->bufCapacity); if (pQueryAttr->needReverseScan) { pInfo->rangeStart = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), false, false); @@ -7738,7 +7740,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe pInfo->colIndex = -1; pInfo->reptScan = false; - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, + pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -7778,7 +7781,8 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato return NULL; } - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, + pRuntimeEnv->resultInfo.capacity); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -7820,7 +7824,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti return NULL; } - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); @@ -7863,7 +7867,8 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato } pInfo->colIndex = -1; // group by column index - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset, + pRuntimeEnv->resultInfo.capacity); SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -- GitLab