From d4a509e51f9d0feafc5637eaf5525855d74815cd Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 17 Jan 2022 14:45:26 +0800 Subject: [PATCH] (query):code prototyping, histogram function --- src/query/inc/qAggMain.h | 1 + src/query/src/qAggMain.c | 115 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index b1b82ae762..78f0e78521 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -77,6 +77,7 @@ extern "C" { #define TSDB_FUNC_BLKINFO 36 #define TSDB_FUNC_ELAPSED 37 +#define TSDB_FUNC_HISTOGRAM 38 /////////////////////////////////////////// // the following functions is not implemented. diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 48e52e078a..9a67e7060e 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -210,6 +210,17 @@ typedef struct { }; } SDiffFuncInfo; +typedef struct { + double lower; // >lower + double upper; // <=upper + int64_t count; +} SHistogramFuncBin; + +typedef struct{ + int32_t numOfBins; + SHistogramFuncBin* orderedBins; +} SHistogramFuncInfo; + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { if (!isValidDataType(dataType)) { @@ -4802,6 +4813,9 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +////////////////////////////////////////////////////////////////////////////////// +// elapsed function + static SElapsedInfo * getSElapsedInfo(SQLFunctionCtx *pCtx) { if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { return (SElapsedInfo *)pCtx->pOutput; @@ -4917,6 +4931,96 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +////////////////////////////////////////////////////////////////////////////////// +// histogram function +static SHistogramFuncInfo* getHistogramFuncOutputInfo(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + + // only the first_stage stable is directly written data into final output buffer + if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + return (SHistogramFuncInfo *) pCtx->pOutput; + } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer + return GET_ROWCELL_INTERBUF(pResInfo); + } +} + + +static bool histogram_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { + return false; + } + + SHistogramFuncInfo *pRes = getHistogramFuncOutputInfo(pCtx); + if (!pRes) { + return false; + } + + double* listBin = (double*) pCtx->param[0].pz; + int32_t numOfBins = pCtx->param[0].nLen / sizeof(double) - 1; + pRes->orderedBins = (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo)); + for (int32_t i = 0; i < numOfBins; ++i) { + pRes->orderedBins[i].lower = listBin[i]; + pRes->orderedBins[i].upper = listBin[i+1]; + pRes->orderedBins[i].count = 0; + } + return true; +} + +static void histogram_function(SQLFunctionCtx *pCtx) { + SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); + + SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx); + + if (pRes->orderedBins != (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo))) { + pRes->orderedBins = (SHistogramFuncBin*)((char*)pRes + sizeof(SHistogramFuncInfo)); + } + + int32_t notNullElems = 0; + for (int32_t i = 0; i < pCtx->size; ++i) { + char *data = GET_INPUT_DATA(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType)) { + continue; + } + + notNullElems++; + double v; + GET_TYPED_DATA(v, double, pCtx->inputType, data); + + for (int32_t b = 0; b < pRes->numOfBins; ++b) { + if (v > pRes->orderedBins[b].lower && v <= pRes->orderedBins[b].upper) { + pRes->orderedBins[b].count++; + } + } + } + + // treat the result as only one result + SET_VAL(pCtx, notNullElems, 1); + if (notNullElems > 0) { + pResInfo->hasResult = DATA_SET_FLAG; + } +} + +static void histogram_func_merge(SQLFunctionCtx *pCtx) { + SHistogramFuncInfo* pInput = (SHistogramFuncInfo*) GET_INPUT_DATA_LIST(pCtx); + SHistogramFuncInfo* pRes = getHistogramFuncOutputInfo(pCtx); + for (int32_t i = 0; i < pInput->numOfBins; ++i) { + pRes->orderedBins[i].count += pInput->orderedBins[i].count; + } + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + pResInfo->hasResult = DATA_SET_FLAG; +} + +static void histogram_func_finalizer(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SHistogramFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + + if (!pRes) { + return; + } + + doFinalizer(pCtx); +} + ///////////////////////////////////////////////////////////////////////////////////////////// /* * function compatible list. @@ -5399,5 +5503,16 @@ SAggFunctionInfo aAggs[40] = {{ elapsedFinalizer, elapsedMerge, elapsedRequired, + }, + { + "histogram", + TSDB_FUNC_HISTOGRAM, + TSDB_FUNC_HISTOGRAM, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE, + histogram_function_setup, + histogram_function, + histogram_func_finalizer, + histogram_func_merge, + dataBlockRequired, } }; -- GitLab