diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 324e56a4b4ce3c822e498df8123331df48e1cc5f..37e0175f996eb268459ac344a068f271c3f9eb45 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -5807,38 +5807,56 @@ int16_t getTimeWindowFunctionID(int16_t colIndex) { } static void window_start_function(SQLFunctionCtx *pCtx) { - SET_VAL(pCtx, pCtx->size, 1); if (pCtx->functionId == TSDB_FUNC_WSTART) { + SET_VAL(pCtx, pCtx->size, 1); *(int64_t *)(pCtx->pOutput) = pCtx->startTs; } else { //TSDB_FUNC_QSTART - *(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.skey; + INC_INIT_VAL(pCtx, pCtx->size); + char *output = pCtx->pOutput; + for (int32_t i = 0; i < pCtx->size; ++i) { + memcpy(output, &pCtx->qWindow.skey, pCtx->outputBytes); + output += pCtx->outputBytes; + } } } static void window_stop_function(SQLFunctionCtx *pCtx) { - SET_VAL(pCtx, pCtx->size, 1); if (pCtx->functionId == TSDB_FUNC_WSTOP) { + SET_VAL(pCtx, pCtx->size, 1); *(int64_t *)(pCtx->pOutput) = pCtx->endTs; } else { //TSDB_FUNC_QSTOP - *(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.ekey; + INC_INIT_VAL(pCtx, pCtx->size); + char *output = pCtx->pOutput; + for (int32_t i = 0; i < pCtx->size; ++i) { + memcpy(output, &pCtx->qWindow.ekey, pCtx->outputBytes); + output += pCtx->outputBytes; + } } } static void window_duration_function(SQLFunctionCtx *pCtx) { - SET_VAL(pCtx, pCtx->size, 1); int64_t duration; if (pCtx->functionId == TSDB_FUNC_WDURATION) { + SET_VAL(pCtx, pCtx->size, 1); duration = pCtx->endTs - pCtx->startTs; + if (duration < 0) { + duration = -duration; + } + *(int64_t *)(pCtx->pOutput) = duration; } else { //TSDB_FUNC_QDURATION + INC_INIT_VAL(pCtx, pCtx->size); duration = pCtx->qWindow.ekey - pCtx->qWindow.skey; + if (duration < 0) { + duration = -duration; + } + char *output = pCtx->pOutput; + for (int32_t i = 0; i < pCtx->size; ++i) { + memcpy(output, &duration, pCtx->outputBytes); + output += pCtx->outputBytes; + } } - - if (duration < 0) { - duration = -duration; - } - - *(int64_t *)(pCtx->pOutput) = duration; } + ///////////////////////////////////////////////////////////////////////////////////////////// /* * function compatible list. diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d2ccb64a37500e734b3ac1b7c8e4d2cf177795f8..46d5d32221014ecf5f253bb018b31f70e15dd75c 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -385,7 +385,7 @@ int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int3 * the number of output result is decided by main output */ if (hasMainFunction && (id == TSDB_FUNC_TS || id == TSDB_FUNC_TAG || id == TSDB_FUNC_TAGPRJ || - id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY)) { + id == TSDB_FUNC_TS_DUMMY || id == TSDB_FUNC_TAG_DUMMY || isTimeWindowFunction(id))) { continue; } @@ -1958,6 +1958,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->end.key = INT64_MIN; pCtx->startTs = INT64_MIN; + pCtx->qWindow = pQueryAttr->window; + pCtx->numOfParams = pSqlExpr->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { int16_t type = pSqlExpr->param[j].nType; @@ -3925,7 +3927,8 @@ static bool hasMainOutput(SQueryAttr *pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = pQueryAttr->pExpr1[i].base.functionId; - if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAGPRJ) { + if (functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TAG && + functionId != TSDB_FUNC_TAGPRJ && !isTimeWindowFunction(functionId)) { return true; } } @@ -5959,7 +5962,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); // if all pCtx is completed, then query should be over if(allCtxCompleted(pOperator, pInfo->pCtx)) - break; + break; } doSetOperatorCompleted(pOperator);