diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 6b4b188c5ede2fc10baa894a55ad5d2225283d0c..895b414a56287bb6e35bcc0c50adcc338edbee1b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -171,11 +171,10 @@ typedef struct SQuery { typedef struct SQueryRuntimeEnv { jmp_buf env; - SResultRow* pResultRow; // todo refactor to merge with SWindowResInfo SQuery* pQuery; SQLFunctionCtx* pCtx; int32_t numOfRowsPerPage; - uint16_t offset[TSDB_MAX_COLUMNS]; + uint16_t* offset; uint16_t scanFlag; // denotes reversed scan of data or not SFillInfo* pFillInfo; SWindowResInfo windowResInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e9e2327ce5a722a4937e3f1a4279a60c5da7b450..101fb7e82050ccecde631fd932d85351f7ff6c44 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1627,10 +1627,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order SQuery *pQuery = pRuntimeEnv->pQuery; pRuntimeEnv->pCtx = (SQLFunctionCtx *)calloc(pQuery->numOfOutput, sizeof(SQLFunctionCtx)); + pRuntimeEnv->offset = calloc(pQuery->numOfOutput, sizeof(int16_t)); pRuntimeEnv->rowCellInfoOffset = calloc(pQuery->numOfOutput, sizeof(int32_t)); - pRuntimeEnv->pResultRow = getNewResultRow(pRuntimeEnv->pool); - if (pRuntimeEnv->pResultRow == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { + if (pRuntimeEnv->offset == NULL || pRuntimeEnv->pCtx == NULL || pRuntimeEnv->rowCellInfoOffset == NULL) { goto _clean; } @@ -1670,15 +1670,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order assert(isValidDataType(pCtx->inputType)); pCtx->ptsOutputBuf = NULL; - pCtx->outputBytes = pQuery->pExpr1[i].bytes; - pCtx->outputType = pQuery->pExpr1[i].type; + pCtx->outputBytes = pQuery->pExpr1[i].bytes; + pCtx->outputType = pQuery->pExpr1[i].type; - pCtx->order = pQuery->order.order; - pCtx->functionId = pSqlFuncMsg->functionId; - pCtx->stableQuery = pRuntimeEnv->stableQuery; + pCtx->order = pQuery->order.order; + pCtx->functionId = pSqlFuncMsg->functionId; + pCtx->stableQuery = pRuntimeEnv->stableQuery; pCtx->interBufBytes = pQuery->pExpr1[i].interBytes; - pCtx->numOfParams = pSqlFuncMsg->numOfParams; + pCtx->numOfParams = pSqlFuncMsg->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { int16_t type = pSqlFuncMsg->arg[j].argType; int16_t bytes = pSqlFuncMsg->arg[j].argBytes; @@ -1726,6 +1726,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order _clean: tfree(pRuntimeEnv->pCtx); + tfree(pRuntimeEnv->offset); + tfree(pRuntimeEnv->rowCellInfoOffset); return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -1775,6 +1777,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { doFreeQueryHandle(pQInfo); pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf); + + tfree(pRuntimeEnv->offset); tfree(pRuntimeEnv->keyBuf); tfree(pRuntimeEnv->rowCellInfoOffset); @@ -3337,7 +3341,13 @@ int32_t initResultRow(SResultRow *pResultRow) { void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - SResultRow* pRow = pRuntimeEnv->pResultRow; + + SResultRow* pRow = NULL; + if (pRuntimeEnv->windowResInfo.size == 0) { + int32_t groupIndex = 0; + int32_t uid = 0; + pRow = doPrepareResultRowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&groupIndex, sizeof(groupIndex), true, uid); + } for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i]; @@ -4619,12 +4629,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); } - // create runtime environment - code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - int32_t ps = DEFAULT_PAGE_SIZE; int32_t rowsize = 0; getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); @@ -4656,7 +4660,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo return code; } } - } else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) { + } else if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery) || (!isSTableQuery)) { int32_t numOfResultRows = getInitialPageNum(pQInfo); getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize); code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rowsize, ps, TWOMB, pQInfo); @@ -4677,6 +4681,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo } } + // create runtime environment + code = setupQueryRuntimeEnv(pRuntimeEnv, pQuery->order.order); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { SFillColInfo* pColInfo = createFillColInfo(pQuery); STimeWindow w = TSWINDOW_INITIALIZER;