diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 3de357d4240294359cc23d3cc9344a341e0ad1e3..c1ebef5d53d510fff623f22a376fd4d89142baa3 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2970,7 +2970,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd) STableMeta* pTableMeta = NULL; SSchema* pSchema = NULL; -// SSchema s = tGetTbnameColumnSchema(); int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 96aae423d5e394a4fcb6a728fa84b94a039d77af..d9cfd7d8b39ba28fdf0d512559832ba3b8a579d9 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1878,14 +1878,31 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S } } +static void destroySup(SFirstRoundQuerySup* pSup) { + taosArrayDestroyEx(pSup->pResult, freeInterResult); + taosArrayDestroy(pSup->pColsInfo); + tfree(pSup); +} + void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*)tres; SSqlRes* pRes = &pSql->res; SFirstRoundQuerySup* pSup = param; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if (numOfRows > 0) { + SSqlObj* pParent = pSup->pParent; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + int32_t code = taos_errno(pSql); + if (code != TSDB_CODE_SUCCESS) { + destroySup(pSup); + taos_free_result(pSql); + pParent->res.code = code; + tscAsyncResultOnError(pParent); + return; + } + + if (numOfRows > 0) { // the number is not correct for group by column in super table query TAOS_ROW row = NULL; int32_t numOfCols = taos_field_count(tres); @@ -1895,6 +1912,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { while ((row = taos_fetch_row(tres)) != NULL) { doAppendData(&interResult, row, numOfCols, pQueryInfo); + pSup->numOfRows += 1; } } else { // tagLen > 0 char* p = calloc(1, pSup->tagLen); @@ -1906,7 +1924,9 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { int32_t offset = 0; for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { + + // tag or group by column + if (TSDB_COL_IS_TAG(pExpr->colInfo.flag) || pExpr->functionId == TSDB_FUNC_PRJ) { memcpy(p + offset, row[i], length[i]); offset += pExpr->resBytes; } @@ -1935,20 +1955,20 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { taosArrayPush(pSup->pResult, &interResult); doAppendData(&interResult, row, numOfCols, pQueryInfo); } + + pSup->numOfRows += 1; } tfree(p); } } - pSup->numOfRows += numOfRows; if (!pRes->completed) { taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param); return; } // set the parameters for the second round query process - SSqlObj *pParent = pSup->pParent; SSqlCmd *pPCmd = &pParent->cmd; SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0); @@ -1974,9 +1994,19 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { } void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { - int32_t c = taos_errno(tres); + SFirstRoundQuerySup* pSup = (SFirstRoundQuerySup*) param; + + SSqlObj* pSql = (SSqlObj*) tres; + int32_t c = taos_errno(pSql); + if (c != TSDB_CODE_SUCCESS) { - // TODO HANDLE ERROR + SSqlObj* parent = pSup->pParent; + + destroySup(pSup); + taos_free_result(pSql); + parent->res.code = code; + tscAsyncResultOnError(parent); + return; } taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param); @@ -2010,13 +2040,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo); if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; -// goto _error; + goto _error; } } if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; -// goto _error; + goto _error; } pNewQueryInfo->interval = pQueryInfo->interval; @@ -2027,7 +2057,6 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); int32_t index = 0; - int32_t numOfTags = 0; for(int32_t i = 0; i < numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) { @@ -2060,7 +2089,25 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG); p->resColId = pExpr->resColId; - numOfTags += 1; + } else if (pExpr->functionId == TSDB_FUNC_PRJ) { + int32_t num = taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo); + for(int32_t k = 0; k < num; ++k) { + SColIndex* pIndex = taosArrayGet(pNewQueryInfo->groupbyExpr.columnInfo, k); + if (pExpr->colInfo.colId == pIndex->colId) { + pSup->tagLen += pExpr->resBytes; + taosArrayPush(pSup->pColsInfo, &pExpr->resColId); + + SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pIndex->colIndex}; + SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId); + + //doLimitOutputNormalColOfGroupby + SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL); + p->numOfParams = 1; + p->param[0].i64 = 1; + p->param[0].nType = TSDB_DATA_TYPE_INT; + p->resColId = pExpr->resColId; // update the result column id + } + } } } @@ -2077,6 +2124,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { tscHandleMasterSTableQuery(pNew); return TSDB_CODE_SUCCESS; + + _error: + destroySup(pSup); + taos_free_result(pNew); + pSql->res.code = terrno; + tscAsyncResultOnError(pSql); + return terrno; } int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { @@ -2118,7 +2172,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { tfree(pMemoryBuf); return ret; } - + tscDebug("%p retrieved query data from %d vnode(s)", pSql, pState->numOfSub); pSql->pSubs = calloc(pState->numOfSub, POINTER_BYTES); if (pSql->pSubs == NULL) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index d540400961b00bad5bef80464c00a726fead16f3..ec1261da0a45ad24985ebf51b9b16c2acfad7709 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -235,7 +235,8 @@ typedef struct SQueryRuntimeEnv { bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation bool queryWindowIdentical; // all query time windows are identical for all tables in one group - bool queryBlockDist; // if query data block distribution + bool queryBlockDist; // if query data block distribution + bool stabledev; // super table stddev query int32_t interBufSize; // intermediate buffer sizse int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 314e8823d3e961f45548ab169ee8713f4fe74461..6fbc8ed3c03807fe117794c186e67b1db2889f9a 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -1630,6 +1630,97 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) { memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo)); } +static void stddev_dst_function_f(SQLFunctionCtx *pCtx, int32_t index) { + void *pData = GET_INPUT_DATA(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + return; + } + + // the second stage to calculate standard deviation + SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + double *retVal = &pStd->res; + + // all data are null, no need to proceed + SArray* resList = (SArray*) pCtx->param[0].pz; + if (resList == NULL) { + return; + } + + // find the correct group average results according to the tag value + int32_t len = (int32_t) taosArrayGetSize(resList); + assert(len > 0); + + double avg = 0; + if (len == 1) { + SResPair* p = taosArrayGet(resList, 0); + avg = p->avg; + } else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result + SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare); + assert(p != NULL); + + avg = p->avg; + } + + int32_t num = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_INT: { + for (int32_t i = 0; i < pCtx->size; ++i) { + if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) { + continue; + } + num += 1; + *retVal += POW2(((int32_t *)pData)[i] - avg); + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + LOOP_STDDEV_IMPL(float, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + LOOP_STDDEV_IMPL(double, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_TINYINT: { + LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + LOOP_STDDEV_IMPL(int16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + LOOP_STDDEV_IMPL(uint16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_UINT: { + LOOP_STDDEV_IMPL(uint32_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_BIGINT: { + LOOP_STDDEV_IMPL(int64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + LOOP_STDDEV_IMPL(uint64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num); + break; + } + default: + qError("stddev function not support data type:%d", pCtx->inputType); + } + + pStd->num += num; + SET_VAL(pCtx, num, 1); + + // copy to the final output buffer for super table + memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo)); +} + + static void stddev_dst_merge(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo); @@ -4835,7 +4926,7 @@ SAggFunctionInfo aAggs[] = {{ TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE, function_setup, stddev_dst_function, - noop2, + stddev_dst_function_f, no_next_step, stddev_dst_finalizer, stddev_dst_merge, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 1f80e543b76811ee3019dfa7ca0994d57b43a542..e1f690407bf7fc7892ef45439cf928b1be1b04f2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -280,6 +280,17 @@ bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) { return false; } +bool isStabledev(SQuery* pQuery) { + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t functId = pQuery->pExpr1[i].base.functionId; + if (functId == TSDB_FUNC_STDDEV_DST) { + return true; + } + } + + return false; +} + int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) { assert(pGroupbyExpr != NULL); @@ -1637,8 +1648,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS pWindowResInfo->curIndex = index; } else { // other queries // decide which group this rows belongs to according to current state value + char* val = NULL; if (groupbyColumnValue) { - char *val = groupbyColumnData + bytes * offset; + val = groupbyColumnData + bytes * offset; if (isNull(val, type)) { // ignore the null value continue; } @@ -1649,6 +1661,34 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } + if (pRuntimeEnv->stabledev) { + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { + int32_t functionId = pQuery->pExpr1[k].base.functionId; + if (functionId != TSDB_FUNC_STDDEV_DST) { + continue; + } + + pRuntimeEnv->pCtx[k].param[0].arr = NULL; + pRuntimeEnv->pCtx[k].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int + + // todo opt perf + int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); + for (int32_t i = 0; i < numOfGroup; ++i) { + SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, i); + if (memcmp(p->tags, val, bytes) == 0) { + int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult); + for (int32_t f = 0; f < numOfCols; ++f) { + SStddevInterResult *pres = taosArrayGet(p->pResult, f); + if (pres->colId == pQuery->pExpr1[k].base.colInfo.colId) { + pRuntimeEnv->pCtx[k].param[0].arr = pres->pResult; + break; + } + } + } + } + } + } + for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pExpr1[k].base.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { @@ -3799,7 +3839,7 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) { SQuery* pQuery = pRuntimeEnv->pQuery; - if (pRuntimeEnv->prevResult == NULL) { + if (pRuntimeEnv->prevResult == NULL || pRuntimeEnv->groupbyColumn) { return TSDB_CODE_SUCCESS; } @@ -4602,6 +4642,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pRuntimeEnv->stableQuery = isSTableQuery; pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); + pRuntimeEnv->stabledev = isStabledev(pQuery); if (pTsBuf != NULL) { int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; @@ -4701,13 +4742,6 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa setTimestampListJoinInfo(pQInfo, pTableQueryInfo); } - for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) { - setParamValue(pRuntimeEnv); - break; - } - } - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { setIntervalQueryRange(pQInfo, pBlockInfo->window.skey); } else { // non-interval query @@ -4761,6 +4795,15 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo); } + if (pRuntimeEnv->stabledev) { + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) { + setParamValue(pRuntimeEnv); + break; + } + } + } + uint32_t status = 0; SDataStatis *pStatis = NULL; SArray *pDataBlock = NULL;