diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index c314087179763f9b745c2fa4a4c318c061c72102..a78bb7ec51de2886d5396a9a18192947221bce88 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -177,7 +177,7 @@ typedef struct SQLFunctionCtx { int16_t outputType; int16_t outputBytes; // size of results, determined by function and input column data type bool hasNull; // null value exist in current block - bool requireNull; // require null in some function + bool requireNull; // require null in some function int16_t functionId; // function id void * aInputElemBuf; char * aOutputBuf; // final result output buffer, point to sdata->data diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 75614578feb81f4a218d6f9a850d5f414c773290..511d6c36efdfb0a74c0265ebaba533d854a41a23 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -743,9 +743,9 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { assert(startPos >= 0 && startPos < pDataBlockInfo->rows); - int32_t num = -1; + int32_t num = -1; int32_t order = pQuery->order.order; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); STableQueryInfo* item = pQuery->current; @@ -779,27 +779,24 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo return num; } -static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, - int32_t offset, int32_t forwardStep, TSKEY *tsBuf, int32_t numOfTotal) { +static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, bool closed, STimeWindow *pWin, int32_t offset, + int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) { SQuery * pQuery = pRuntimeEnv->pQuery; SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; if (IS_MASTER_SCAN(pRuntimeEnv) || closed) { for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { - int32_t functionId = pQuery->pSelectExpr[k].base.functionId; - pCtx[k].nStartQueryTimestamp = pWin->skey; pCtx[k].size = forwardStep; pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1); + int32_t functionId = pQuery->pSelectExpr[k].base.functionId; if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) { - pCtx[k].ptsList = &tsBuf[pCtx[k].startOffset]; + pCtx[k].ptsList = &tsCol[pCtx[k].startOffset]; } // not a whole block involved in query processing, statistics data can not be used - if (forwardStep != numOfTotal) { - pCtx[k].preAggVals.isSet = false; - } + pCtx[k].preAggVals.isSet = (forwardStep == numOfTotal); if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { aAggs[functionId].xFunction(&pCtx[k]); @@ -924,19 +921,11 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas char *dataBlock = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; - SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; int32_t functionId = pQuery->pSelectExpr[col].base.functionId; if (functionId == TSDB_FUNC_ARITHM) { sas->pArithExpr = &pQuery->pSelectExpr[col]; - // set the start offset to be the lowest start position, no matter asc/desc query order - if (QUERY_IS_ASC_QUERY(pQuery)) { - pCtx->startOffset = pQuery->pos; - } else { - pCtx->startOffset = pQuery->pos - (size - 1); - } - sas->offset = 0; sas->colList = pQuery->colList; sas->numOfCols = pQuery->numOfCols; @@ -1478,7 +1467,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1; // minimum value no matter ascending/descending order query - pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size - 1); + pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos: (pQuery->pos - pCtx->size + 1); + assert(pCtx->startOffset >= 0); uint32_t status = aAggs[functionId].nStatus; if (((status & (TSDB_FUNCSTATE_SELECTIVITY | TSDB_FUNCSTATE_NEED_TS)) != 0) && (tsCol != NULL)) {