diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ba382bf0807101eebbe87049962d487b1bab2136..f9ac48de6eff6692e7b88d504885d11468c07834 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -200,6 +200,7 @@ static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); +static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t groupIndex); @@ -1251,8 +1252,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn STableQueryInfo* item = pRuntimeEnv->current; SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; + + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + int16_t bytes = pColInfoData->info.bytes; + int16_t type = pColInfoData->info.type; if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { qError("QInfo:%p group by not supported on double/float columns, abort", pRuntimeEnv->qinfo); @@ -1273,6 +1276,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn memcpy(pInfo->prevData, val, bytes); + if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { + setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes); + } + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -3319,7 +3326,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx int32_t numOfExprs = pQueryAttr->numOfOutput; for(int32_t i = 0; i < numOfExprs; ++i) { SExprInfo* pExprInfo1 = &(pExprInfo[i]); - if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) { + if (pExprInfo1->base.functionId != TSDB_FUNC_STDDEV_DST) { continue; } @@ -3347,6 +3354,36 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } +void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) { + SQueryAttr* pQuery = pRuntimeEnv->pQueryAttr; + + int32_t numOfExprs = pQuery->numOfOutput; + for(int32_t i = 0; i < numOfExprs; ++i) { + SSqlExpr* pExpr1 = &pExpr[i].base; + if (pExpr1->functionId != TSDB_FUNC_STDDEV_DST) { + continue; + } + + pCtx[i].param[0].arr = NULL; + pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int + + // TODO use hash to speedup this loop + int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); + for (int32_t j = 0; j < numOfGroup; ++j) { + SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j); + if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) { + int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult); + for (int32_t k = 0; k < numOfCols; ++k) { + SStddevInterResult* pres = taosArrayGet(p->pResult, k); + if (pres->colId == pExpr1->colInfo.colId) { + pCtx[i].param[0].arr = pres->pResult; + break; + } + } + } + } + } +} /* * There are two cases to handle: * @@ -3476,9 +3513,9 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti } } -static void updateNumOfRowsInResultRows(SQueryRuntimeEnv *pRuntimeEnv, - SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, + SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // update the number of result for each, only update the number of rows for the corresponding window result. if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {