From 6cc2a721f30df01406ecf3edfbfe1aa2078cc015 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 15 Feb 2022 18:38:35 +0800 Subject: [PATCH] modify unique function like top --- src/client/inc/tscUtil.h | 2 +- src/client/src/tscSQLParser.c | 4 +++- src/client/src/tscUtil.c | 2 +- src/query/src/qAggMain.c | 35 +++++++++++++++++++---------------- src/query/src/qUtil.c | 8 ++++---- 5 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3dfaae820e..1f84fa27d7 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -242,7 +242,7 @@ SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, - int16_t size); + int32_t size); size_t tscNumOfExprs(SQueryInfo* pQueryInfo); int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 430c88a1d7..551064a981 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4000,7 +4000,9 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) || (functionId == TSDB_FUNC_SAMPLE) || - (functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_HISTOGRAM)) { + (functionId == TSDB_FUNC_ELAPSED) || + (functionId == TSDB_FUNC_HISTOGRAM) || + (functionId == TSDB_FUNC_UNIQUE)) { if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 68b5a9e5f2..e67dd7e14b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2615,7 +2615,7 @@ SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde } SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, - int16_t type, int16_t size) { + int16_t type, int32_t size) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SExprInfo* pExpr = tscExprGet(pQueryInfo, index); if (pExpr == NULL) { diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index db7a73eec6..4b5007eb2a 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -5193,15 +5193,15 @@ static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes return true; } -static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag){ - UniqueUnit *unique = taosHashGet(pCtx->pUniqueSet, pData, pCtx->inputBytes); +static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag, int32_t bytes){ + UniqueUnit *unique = taosHashGet(pCtx->pUniqueSet, pData, bytes); if (unique == NULL) { - size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen; + size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; char *tmp = pInfo->res + pInfo->num * size; ((UniqueUnit*)tmp)->timestamp = timestamp; char *data = tmp + sizeof(UniqueUnit); - char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes; - memcpy(data, pData, pCtx->inputBytes); + char *tags = tmp + sizeof(UniqueUnit) + bytes; + memcpy(data, pData, bytes); if (pCtx->currentStage == MERGE_STAGE && tag != NULL) { memcpy(tags, tag, (size_t)pCtx->tagInfo.tagsLen); @@ -5209,15 +5209,18 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK int32_t offset = 0; for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { SQLFunctionCtx *tagCtx = pCtx->tagInfo.pTagCtxList[j]; - if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) { - aAggs[TSDB_FUNC_TAG].xFunction(tagCtx); - memcpy(tags + offset, tagCtx->pOutput, tagCtx->outputBytes); - offset += tagCtx->outputBytes; + if (tagCtx->functionId == TSDB_FUNC_TS_DUMMY) { + tagCtx->tag.nType = TSDB_DATA_TYPE_BIGINT; + tagCtx->tag.i64 = timestamp; } + + tVariantDump(&tagCtx->tag, tagCtx->pOutput, tagCtx->tag.nType, true); + memcpy(tags + offset, tagCtx->pOutput, tagCtx->outputBytes); + offset += tagCtx->outputBytes; } } - taosHashPut(pCtx->pUniqueSet, pData, pCtx->inputBytes, &tmp, sizeof(tValuePair*)); + taosHashPut(pCtx->pUniqueSet, pData, bytes, &tmp, sizeof(UniqueUnit*)); pInfo->num++; }else if(unique->timestamp > timestamp){ unique->timestamp = timestamp; @@ -5233,7 +5236,7 @@ static void unique_function(SQLFunctionCtx *pCtx) { if (pCtx->ptsList != NULL) { k = GET_TS_DATA(pCtx, i); } - do_unique_function(pCtx, pInfo, k, pData, NULL); + do_unique_function(pCtx, pInfo, k, pData, NULL, pCtx->inputBytes); if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory @@ -5247,15 +5250,15 @@ static void unique_function(SQLFunctionCtx *pCtx) { static void unique_function_merge(SQLFunctionCtx *pCtx) { SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx); SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx); - size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen; + size_t size = sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen; for (int32_t i = 0; i < pInput->num; ++i) { char *tmp = pInput->res + i* size; TSKEY timestamp = ((UniqueUnit*)tmp)->timestamp; char *data = tmp + sizeof(UniqueUnit); - char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes; - do_unique_function(pCtx, pOutput, timestamp, data, tags); + char *tags = tmp + sizeof(UniqueUnit) + pCtx->outputBytes; + do_unique_function(pCtx, pOutput, timestamp, data, tags, pCtx->outputBytes); - if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ + if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory return; } @@ -5772,7 +5775,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ // 39 "unique", TSDB_FUNC_UNIQUE, - TSDB_FUNC_INVALID_ID, + TSDB_FUNC_UNIQUE, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_SELECTIVITY, unique_function_setup, unique_function, diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 36e97f9fcf..c585f44050 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -33,7 +33,7 @@ typedef struct SCompSupporter { } SCompSupporter; int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) { - if (pQueryAttr && (!stable)) { + if (pQueryAttr && (!stable)) { // if table is stable, no need return more than 1 no in merge stage for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) { if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM || @@ -42,11 +42,11 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64; } } + if (pQueryAttr->uniqueQuery){ + return MAX_UNIQUE_RESULT_ROWS; + } } - if (pQueryAttr->uniqueQuery){ - return MAX_UNIQUE_RESULT_ROWS; - } return 1; } -- GitLab