From 31c777158a7d57ea39b6fd18a29186ee30370454 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 21 Feb 2022 16:16:44 +0800 Subject: [PATCH] add mode function code --- src/client/src/tscGlobalmerge.c | 8 +- src/client/src/tscSQLParser.c | 9 +- src/inc/taoserror.h | 2 +- src/query/inc/qAggMain.h | 6 +- src/query/inc/qExecutor.h | 1 + src/query/inc/qResultbuf.h | 2 + src/query/src/qAggMain.c | 279 +++++++++++++++++++++----------- src/query/src/qExecutor.c | 22 +-- src/util/src/terror.c | 2 +- 9 files changed, 212 insertions(+), 119 deletions(-) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index d01e1fcae3..f6a9b8e257 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -615,11 +615,9 @@ static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_ aAggs[functionId].mergeFunc(&pCtx[j]); } - if (functionId == TSDB_FUNC_UNIQUE && - (GET_RES_INFO(&(pCtx[j]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[j]))->numOfRes == -1)){ - tscError("Unique result num is too large. num: %d, limit: %d", - GET_RES_INFO(&(pCtx[j]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); - longjmp(pInfo->pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); + if (GET_RES_INFO(&(pCtx[j]))->numOfRes == -1){ + tscError("result num is too large."); + longjmp(pInfo->pRuntimeEnv->env, TSDB_CODE_QRY_RESULT_TOO_LARGE); } } } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 5e61d8e0c1..7f0a8021aa 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2693,7 +2693,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col const char* msg26 = "start param cannot be 0 with 'log_bin'"; const char* msg27 = "factor param cannot be negative or equal to 0/1"; const char* msg28 = "the second paramter of diff should be 0 or 1"; - const char* msg29 = "key timestamp column cannot be used to unique function"; + const char* msg29 = "key timestamp column cannot be used to unique/mode function"; switch (functionId) { case TSDB_FUNC_COUNT: { @@ -2791,7 +2791,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_CSUM: case TSDB_FUNC_STDDEV: case TSDB_FUNC_LEASTSQR: - case TSDB_FUNC_ELAPSED: { + case TSDB_FUNC_ELAPSED: + case TSDB_FUNC_MODE: { // 1. valid the number of parameters int32_t numOfParams = (pItem->pNode->Expr.paramList == NULL) ? 0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList); @@ -2852,7 +2853,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // 2. check if sql function can be applied on this column data type SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); - if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) { + if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX ){ + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29); + } else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 64065d0b46..54c34f5d8a 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -293,7 +293,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition") #define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version") -#define TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"unique result num is too large") +#define TSDB_CODE_QRY_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"result num is too large") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired" diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index aa5e2abd80..ab506b7061 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -79,8 +79,9 @@ extern "C" { #define TSDB_FUNC_ELAPSED 37 #define TSDB_FUNC_HISTOGRAM 38 #define TSDB_FUNC_UNIQUE 39 +#define TSDB_FUNC_MODE 40 -#define TSDB_FUNC_MAX_NUM 40 +#define TSDB_FUNC_MAX_NUM 41 #define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM @@ -148,7 +149,7 @@ typedef struct SResultRowCellInfo { int8_t hasResult; // result generated, not NULL value bool initialized; // output buffer has been initialized bool complete; // query has completed - uint32_t numOfRes; // num of output result in current buffer + int32_t numOfRes; // num of output result in current buffer } SResultRowCellInfo; typedef struct SPoint1 { @@ -203,6 +204,7 @@ typedef struct SQLFunctionCtx { SPoint1 end; SHashObj **pUniqueSet; // for unique function + SHashObj **pModeSet; // for mode function } SQLFunctionCtx; typedef struct SAggFunctionInfo { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index c4aebc07b1..23c67793fe 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -91,6 +91,7 @@ typedef struct SResultRow { STimeWindow win; char *key; // start key of current result row SHashObj *uniqueHash; // for unique function + SHashObj *modeHash; // for unique function } SResultRow; typedef struct SResultRowCell { diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index d4194168e5..c8779f8130 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -80,6 +80,8 @@ typedef struct SDiskbasedResultBuf { #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} #define MAX_UNIQUE_RESULT_ROWS (1000) #define MAX_UNIQUE_RESULT_SIZE (1024*1024*1) +#define MAX_MODE_INNER_RESULT_ROWS (1000000) +#define MAX_MODE_INNER_RESULT_SIZE (1024*1024*10) /** * create disk-based result buffer * @param pResultBuf diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index b294c0482f..b853ec6db1 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -233,6 +233,16 @@ typedef struct { char res[]; } SUniqueFuncInfo; +typedef struct { + int64_t count; + char data[]; +} ModeUnit; + +typedef struct { + int32_t num; + char res[]; +} SModeFuncInfo; + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { if (!isValidDataType(dataType)) { @@ -369,13 +379,25 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI int64_t size = sizeof(UniqueUnit) + dataBytes + extLength; size *= param; size += sizeof(SUniqueFuncInfo); - if (size > MAX_UNIQUE_RESULT_SIZE){ + if (size > MAX_UNIQUE_RESULT_SIZE) { size = MAX_UNIQUE_RESULT_SIZE; } *bytes = size; *interBytes = *bytes; return TSDB_CODE_SUCCESS; + } else if (functionId == TSDB_FUNC_MODE) { + *type = TSDB_DATA_TYPE_BINARY; + int64_t size = sizeof(ModeUnit) + dataBytes; + size *= MAX_MODE_INNER_RESULT_ROWS; + size += sizeof(SModeFuncInfo); + if (size > MAX_MODE_INNER_RESULT_SIZE){ + size = MAX_MODE_INNER_RESULT_SIZE; + } + *bytes = size; + *interBytes = *bytes; + + return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_SAMPLE) { *type = TSDB_DATA_TYPE_BINARY; *bytes = (sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param); @@ -513,7 +535,18 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI size = MAX_UNIQUE_RESULT_SIZE; } *interBytes = (int32_t)size; - } else if (functionId == TSDB_FUNC_SAMPLE) { + } else if(functionId == TSDB_FUNC_MODE) { + *type = (int16_t)dataType; + *bytes = dataBytes; + int64_t size = sizeof(ModeUnit) + dataBytes; + size *= MAX_MODE_INNER_RESULT_ROWS; + size += sizeof(SModeFuncInfo); + if (size > MAX_MODE_INNER_RESULT_SIZE){ + size = MAX_MODE_INNER_RESULT_SIZE; + } + *interBytes = size; + return TSDB_CODE_SUCCESS; + }else if (functionId == TSDB_FUNC_SAMPLE) { *type = (int16_t)dataType; *bytes = dataBytes; size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param; @@ -2245,20 +2278,12 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { tfree(pData); } -/* - * Parameters values: - * 1. param[0]: maximum allowable results - * 2. param[1]: order by type (time or value) - * 3. param[2]: asc/desc order - * - * top/bottom use the intermediate result buffer to keep the intermediate result - */ -static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { +static void *getOutputInfo(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); // only the first_stage_merge is directly written data into final output buffer if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { - return (STopBotInfo*) pCtx->pOutput; + return pCtx->pOutput; } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer return GET_ROWCELL_INTERBUF(pResInfo); } @@ -2291,7 +2316,7 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, const char *minval, const cha return true; } - STopBotInfo *pTopBotInfo = getTopBotOutputInfo(pCtx); + STopBotInfo *pTopBotInfo = getOutputInfo(pCtx); // required number of results are not reached, continue load data block if (pTopBotInfo->num < pCtx->param[0].i64) { @@ -2346,7 +2371,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* return false; } - STopBotInfo *pInfo = getTopBotOutputInfo(pCtx); + STopBotInfo *pInfo = getOutputInfo(pCtx); buildTopBotStruct(pInfo, pCtx); return true; } @@ -2354,7 +2379,7 @@ static bool top_bottom_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* static void top_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - STopBotInfo *pRes = getTopBotOutputInfo(pCtx); + STopBotInfo *pRes = getOutputInfo(pCtx); assert(pRes->num >= 0); if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) { @@ -2393,7 +2418,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { // construct the input data struct from binary data buildTopBotStruct(pInput, pCtx); - STopBotInfo *pOutput = getTopBotOutputInfo(pCtx); + STopBotInfo *pOutput = getOutputInfo(pCtx); // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { @@ -2413,7 +2438,7 @@ static void top_func_merge(SQLFunctionCtx *pCtx) { static void bottom_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - STopBotInfo *pRes = getTopBotOutputInfo(pCtx); + STopBotInfo *pRes = getOutputInfo(pCtx); if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotInfo) + POINTER_BYTES * pCtx->param[0].i64)) { buildTopBotStruct(pRes, pCtx); @@ -2450,7 +2475,7 @@ static void bottom_func_merge(SQLFunctionCtx *pCtx) { // construct the input data struct from binary data buildTopBotStruct(pInput, pCtx); - STopBotInfo *pOutput = getTopBotOutputInfo(pCtx); + STopBotInfo *pOutput = getOutputInfo(pCtx); // the intermediate result is binary, we only use the output data type for (int32_t i = 0; i < pInput->num; ++i) { @@ -2619,18 +2644,6 @@ static void buildHistogramInfo(SAPercentileInfo* pInfo) { pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo)); } -static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SAPercentileInfo* pInfo = NULL; - - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { - pInfo = (SAPercentileInfo*) pCtx->pOutput; - } else { - pInfo = GET_ROWCELL_INTERBUF(pResInfo); - } - return pInfo; -} - // // ----------------- tdigest ------------------- // @@ -2642,7 +2655,7 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) } // new TDigest - SAPercentileInfo *pInfo = getAPerctInfo(pCtx); + SAPercentileInfo *pInfo = getOutputInfo(pCtx); char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION); return true; @@ -2652,7 +2665,7 @@ static void tdigest_do(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SAPercentileInfo * pAPerc = getAPerctInfo(pCtx); + SAPercentileInfo * pAPerc = getOutputInfo(pCtx); assert(pAPerc->pTDigest != NULL); if(pAPerc->pTDigest == NULL) { @@ -2694,7 +2707,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) { return ; } - SAPercentileInfo *pOutput = getAPerctInfo(pCtx); + SAPercentileInfo *pOutput = getOutputInfo(pCtx); if(pOutput->pTDigest->num_centroids == 0) { memcpy(pOutput->pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); tdigestAutoFill(pOutput->pTDigest, COMPRESSION); @@ -2711,7 +2724,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) { double q = (pCtx->param[0].nType == TSDB_DATA_TYPE_INT) ? pCtx->param[0].i64 : pCtx->param[0].dKey; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SAPercentileInfo * pAPerc = getAPerctInfo(pCtx); + SAPercentileInfo * pAPerc = getOutputInfo(pCtx); if (pCtx->currentStage == MERGE_STAGE) { if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null @@ -2755,7 +2768,7 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* return false; } - SAPercentileInfo *pInfo = getAPerctInfo(pCtx); + SAPercentileInfo *pInfo = getOutputInfo(pCtx); buildHistogramInfo(pInfo); char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); @@ -2772,7 +2785,7 @@ static void apercentile_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); - SAPercentileInfo *pInfo = getAPerctInfo(pCtx); + SAPercentileInfo *pInfo = getOutputInfo(pCtx); buildHistogramInfo(pInfo); assert(pInfo->pHisto->elems != NULL); @@ -2816,7 +2829,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { return; } - SAPercentileInfo *pOutput = getAPerctInfo(pCtx); + SAPercentileInfo *pOutput = getOutputInfo(pCtx); buildHistogramInfo(pOutput); SHistogramInfo *pHisto = pOutput->pHisto; @@ -4710,17 +4723,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) { ////////////////////////////////////////////////////////////////////////////////// // Sample function with reservoir sampling algorithm -static SSampleFuncInfo* getSampleFuncOutputInfo(SQLFunctionCtx *pCtx) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - - // only the first_stage stable is directly written data into final output buffer - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { - return (SSampleFuncInfo *) pCtx->pOutput; - } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer - return GET_ROWCELL_INTERBUF(pResInfo); - } -} - static void assignResultSample(SQLFunctionCtx *pCtx, SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes, char *inputTags) { assignVal(pInfo->values + index*bytes, pData, bytes, type); *(pInfo->timeStamps + index) = ts; @@ -4800,7 +4802,7 @@ static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes srand(taosSafeRand()); - SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx); + SSampleFuncInfo *pRes = getOutputInfo(pCtx); pRes->totalPoints = 0; pRes->numSampled = 0; pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo)); @@ -4814,7 +4816,7 @@ static void sample_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx); + SSampleFuncInfo *pRes = getOutputInfo(pCtx); if (pRes->values != ((char*)pRes + sizeof(SSampleFuncInfo))) { pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo)); @@ -4852,7 +4854,7 @@ static void sample_func_merge(SQLFunctionCtx *pCtx) { pInput->timeStamps = (int64_t*)((char*)pInput->values + pInput->colBytes * pCtx->param[0].i64); pInput->taglists = (char*)pInput->timeStamps + sizeof(int64_t)*pCtx->param[0].i64; - SSampleFuncInfo *pOutput = getSampleFuncOutputInfo(pCtx); + SSampleFuncInfo *pOutput = getOutputInfo(pCtx); pOutput->totalPoints = pInput->totalPoints; pOutput->numSampled = pInput->numSampled; for (int32_t i = 0; i < pInput->numSampled; ++i) { @@ -4886,20 +4888,12 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) { ////////////////////////////////////////////////////////////////////////////////// // elapsed function -static SElapsedInfo * getSElapsedInfo(SQLFunctionCtx *pCtx) { - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { - return (SElapsedInfo *)pCtx->pOutput; - } else { - return GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - } -} - static bool elapsedSetup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { if (!function_setup(pCtx, pResInfo)) { return false; } - SElapsedInfo *pInfo = getSElapsedInfo(pCtx); + SElapsedInfo *pInfo = getOutputInfo(pCtx); pInfo->min = MAX_TS_KEY; pInfo->max = 0; pInfo->hasResult = 0; @@ -4912,7 +4906,7 @@ static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t col } static void elapsedFunction(SQLFunctionCtx *pCtx) { - SElapsedInfo *pInfo = getSElapsedInfo(pCtx); + SElapsedInfo *pInfo = getOutputInfo(pCtx); if (pCtx->preAggVals.isSet) { if (pInfo->min == MAX_TS_KEY) { pInfo->min = pCtx->preAggVals.statis.min; @@ -4979,7 +4973,7 @@ elapsedOver: } static void elapsedMerge(SQLFunctionCtx *pCtx) { - SElapsedInfo *pInfo = getSElapsedInfo(pCtx); + SElapsedInfo *pInfo = getOutputInfo(pCtx); memcpy(pInfo, pCtx->pInput, (size_t)pCtx->inputBytes); GET_RES_INFO(pCtx)->hasResult = pInfo->hasResult; } @@ -5002,25 +4996,12 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) { } ////////////////////////////////////////////////////////////////////////////////// -// histogram function -static SHistogramFuncInfo* getHistogramFuncOutputInfo(SQLFunctionCtx *pCtx) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - - // only the first_stage stable is directly written data into final output buffer - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { - return (SHistogramFuncInfo *) pCtx->pOutput; - } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer - return GET_ROWCELL_INTERBUF(pResInfo); - } -} - - static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { if (!function_setup(pCtx, pResInfo)) { return false; } - SHistogramFuncInfo *pRes = getHistogramFuncOutputInfo(pCtx); + SHistogramFuncInfo *pRes = getOutputInfo(pCtx); if (!pRes) { return false; } @@ -5044,7 +5025,7 @@ static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* p static void histogram_function(SQLFunctionCtx *pCtx) { SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); - SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx); + SHistogramFuncInfo* pRes = getOutputInfo(pCtx); if (pRes->orderedBins != (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo))) { pRes->orderedBins = (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo)); @@ -5092,7 +5073,7 @@ static void histogram_func_merge(SQLFunctionCtx *pCtx) { SHistogramFuncInfo* pInput = (SHistogramFuncInfo*) GET_INPUT_DATA_LIST(pCtx); pInput->orderedBins = (SHistogramFuncBin*)((char*)pInput + sizeof(SHistogramFuncInfo)); - SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx); + SHistogramFuncInfo* pRes = getOutputInfo(pCtx); for (int32_t i = 0; i < pInput->numOfBins; ++i) { pRes->orderedBins[i].count += pInput->orderedBins[i].count; } @@ -5129,18 +5110,6 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } -// unique use the intermediate result buffer to keep the intermediate result -static SUniqueFuncInfo *getUniqueOutputInfo(SQLFunctionCtx *pCtx) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - - // only the first_stage_merge is directly written data into final output buffer - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { - return (SUniqueFuncInfo*) pCtx->pOutput; - } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer - return GET_ROWCELL_INTERBUF(pResInfo); - } -} - // unique static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); @@ -5238,7 +5207,7 @@ static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSK } static void unique_function(SQLFunctionCtx *pCtx) { - SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); + SUniqueFuncInfo *pInfo = getOutputInfo(pCtx); for (int32_t i = 0; i < pCtx->size; i++) { char *pData = GET_INPUT_DATA(pCtx, i); @@ -5248,18 +5217,19 @@ static void unique_function(SQLFunctionCtx *pCtx) { } do_unique_function(pCtx, pInfo, k, pData, NULL, pCtx->inputBytes, pCtx->inputType); - if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ + if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE + || (pInfo->num > MAX_UNIQUE_RESULT_ROWS)){ GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory return; } } - GET_RES_INFO(pCtx)->numOfRes = 1; +// GET_RES_INFO(pCtx)->numOfRes = pInfo->num; } static void unique_function_merge(SQLFunctionCtx *pCtx) { SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx); - SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx); + SUniqueFuncInfo *pOutput = getOutputInfo(pCtx); size_t size = sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen; for (int32_t i = 0; i < pInput->num; ++i) { char *tmp = pInput->res + i* size; @@ -5268,13 +5238,14 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) { char *tags = tmp + sizeof(UniqueUnit) + pCtx->outputBytes; do_unique_function(pCtx, pOutput, timestamp, data, tags, pCtx->outputBytes, pCtx->outputType); - if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ + if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE + || (pOutput->num > MAX_UNIQUE_RESULT_ROWS)){ GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory return; } } - GET_RES_INFO(pCtx)->numOfRes = pOutput->num; +// GET_RES_INFO(pCtx)->numOfRes = pOutput->num; } typedef struct{ @@ -5288,7 +5259,7 @@ static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param } static void unique_func_finalizer(SQLFunctionCtx *pCtx) { - SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); + SUniqueFuncInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); GET_RES_INFO(pCtx)->numOfRes = pInfo->num; int32_t bytes = 0; @@ -5317,6 +5288,108 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +static bool mode_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { + return false; + } + if(*pCtx->pModeSet != NULL){ + taosHashClear(*pCtx->pModeSet); + }else{ + *pCtx->pModeSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + + return true; +} + +static void do_mode_function(SQLFunctionCtx *pCtx, SModeFuncInfo *pInfo, char *pData, int32_t bytes, int16_t type){ + int32_t hashKeyBytes = bytes; + if(IS_VAR_DATA_TYPE(type)){ // for var data, we can not use bytes, because there are dirty data in the back of var data + hashKeyBytes = varDataTLen(pData); + } + ModeUnit **mode = taosHashGet(*pCtx->pModeSet, pData, hashKeyBytes); + if (mode == NULL) { + size_t size = sizeof(ModeUnit) + bytes; + char *tmp = pInfo->res + pInfo->num * size; + ((ModeUnit*)tmp)->count = 1; + char *data = tmp + sizeof(ModeUnit); + memcpy(data, pData, bytes); + + taosHashPut(*pCtx->pModeSet, pData, hashKeyBytes, &tmp, sizeof(ModeUnit*)); + pInfo->num++; + }else{ + (*mode)->count++; + } +} + +static void mode_function(SQLFunctionCtx *pCtx) { + SModeFuncInfo *pInfo = getOutputInfo(pCtx); + + for (int32_t i = 0; i < pCtx->size; i++) { + char *pData = GET_INPUT_DATA(pCtx, i); + + do_mode_function(pCtx, pInfo, pData, pCtx->inputBytes, pCtx->inputType); + + if (sizeof(SModeFuncInfo) + pInfo->num * (sizeof(ModeUnit) + pCtx->inputBytes) >= MAX_MODE_INNER_RESULT_SIZE){ + GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory + return; + } + } +} + +static void mode_function_merge(SQLFunctionCtx *pCtx) { + SModeFuncInfo *pInput = (SModeFuncInfo *)GET_INPUT_DATA_LIST(pCtx); + SModeFuncInfo *pOutput = getOutputInfo(pCtx); + size_t size = sizeof(ModeUnit) + pCtx->outputBytes; + for (int32_t i = 0; i < pInput->num; ++i) { + char *tmp = pInput->res + i* size; + char *data = tmp + sizeof(ModeUnit); + do_mode_function(pCtx, pOutput, data, pCtx->outputBytes, pCtx->outputType); + + if (sizeof(SModeFuncInfo) + pOutput->num * (sizeof(ModeUnit) + pCtx->outputBytes) >= MAX_MODE_INNER_RESULT_SIZE){ + GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory + return; + } + } +} + +static void mode_func_finalizer(SQLFunctionCtx *pCtx) { + int32_t bytes = 0; + if (pCtx->currentStage == MERGE_STAGE) { + bytes = pCtx->outputBytes; + assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); + } else { + bytes = pCtx->inputBytes; + } + + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SModeFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + + size_t size = sizeof(ModeUnit) + bytes; + + char *tvp = pRes->res; + char *result = NULL; + int64_t maxCount = 0; + for (int32_t i = 0; i < pRes->num; ++i) { + int64_t count = ((ModeUnit*)tvp)->count; + if (count > maxCount){ + maxCount = count; + result = tvp; + }else if(count == maxCount){ + result = NULL; + } + tvp += size; + } + + if (result){ + memcpy(pCtx->pOutput, result + sizeof(ModeUnit), bytes); + pResInfo->numOfRes = 1; + }else{ + pResInfo->numOfRes = 0; + } + + doFinalizer(pCtx); +} + ///////////////////////////////////////////////////////////////////////////////////////////// /* * function compatible list. @@ -5823,5 +5896,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ unique_func_finalizer, unique_function_merge, dataBlockRequired, + }, + { + // 40 + "mode", + TSDB_FUNC_MODE, + TSDB_FUNC_MODE, + TSDB_FUNCSTATE_SO, + mode_function_setup, + mode_function, + mode_func_finalizer, + mode_function_merge, + dataBlockRequired, } }; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0e1cf3a883..ffee9d6b00 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1009,11 +1009,9 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } } - if (functionId == TSDB_FUNC_UNIQUE && - (GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ - qError("Unique result num is too large. num: %d, limit: %d", - GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); + if (GET_RES_INFO(&(pCtx[k]))->numOfRes == -1){ + qError("result num is too large."); + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_RESULT_TOO_LARGE); } // restore it @@ -1276,11 +1274,9 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction assert(0); } - if (functionId == TSDB_FUNC_UNIQUE && - (GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ - qError("Unique result num is too large. num: %d, limit: %d", - GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); + if (GET_RES_INFO(&(pCtx[k]))->numOfRes == -1){ + qError("Mode inner result num is too large"); + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_RESULT_TOO_LARGE); } } } @@ -3690,6 +3686,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i pCtx[i].resultInfo = pCellInfo; if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) { pCtx[i].pUniqueSet = &pRow->uniqueHash; + }else if (pCtx[i].functionId == TSDB_FUNC_MODE) { + pCtx[i].pUniqueSet = &pRow->modeHash; } pCtx[i].pOutput = pData->pData; pCtx[i].currentStage = stage; @@ -4027,6 +4025,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){ pCtx[i].pUniqueSet = &pResult->uniqueHash; + }else if (pCtx[i].functionId == TSDB_FUNC_MODE){ + pCtx[i].pUniqueSet = &pResult->modeHash; } SResultRowCellInfo* pResInfo = pCtx[i].resultInfo; @@ -4123,6 +4123,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) { pCtx[i].pUniqueSet = &pResult->uniqueHash; + }else if (pCtx[i].functionId == TSDB_FUNC_MODE) { + pCtx[i].pUniqueSet = &pResult->modeHash; } } } diff --git a/src/util/src/terror.c b/src/util/src/terror.c index e78d1d37ee..159ae7cd1d 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -299,7 +299,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") -TAOS_DEFINE_ERROR(TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE, "Unique result num is too large") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RESULT_TOO_LARGE, "result num is too large") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") -- GitLab