From 9768c30be1f946830361873a091af1bf71993ff1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Sep 2020 22:15:12 +0800 Subject: [PATCH] [td-1372] --- src/client/src/tscFunctionImpl.c | 136 +++++++++++++++++++++++++++++-- src/query/inc/qPercentile.h | 4 +- src/query/src/qPercentile.c | 89 +++++++++++++++++--- 3 files changed, 205 insertions(+), 24 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 4b31a8001f..1356b98c67 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -117,6 +117,10 @@ typedef struct SFirstLastInfo { typedef struct SFirstLastInfo SLastrowInfo; typedef struct SPercentileInfo { tMemBucket *pMemBucket; + int32_t stage; + double minval; + double maxval; + int64_t numOfElems; } SPercentileInfo; typedef struct STopBotInfo { @@ -302,7 +306,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI } else if (functionId == TSDB_FUNC_PERCT) { *type = (int16_t)TSDB_DATA_TYPE_DOUBLE; *bytes = (int16_t)sizeof(double); - *interBytes = (int16_t)sizeof(double); + *interBytes = (int16_t)sizeof(SPercentileInfo); } else if (functionId == TSDB_FUNC_LEASTSQR) { *type = TSDB_DATA_TYPE_BINARY; *bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string @@ -2428,12 +2432,14 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) { if (!function_setup(pCtx)) { return false; } - + + // in the first round, get the min-max value of all involved data SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo *pInfo = pResInfo->interResultBuf; + pInfo->minval = DBL_MAX; + pInfo->maxval = -DBL_MAX; + pInfo->numOfElems = 0; - ((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket = - tMemBucketCreate(pCtx->inputBytes, pCtx->inputType); - return true; } @@ -2442,7 +2448,65 @@ static void percentile_function(SQLFunctionCtx *pCtx) { SResultInfo * pResInfo = GET_RES_INFO(pCtx); SPercentileInfo *pInfo = pResInfo->interResultBuf; - + + // the first stage, only acquire the min/max value + if (pInfo->stage == 0) { + if (pCtx->preAggVals.isSet) { + if (pInfo->minval > pCtx->preAggVals.statis.min) { + pInfo->minval = pCtx->preAggVals.statis.min; + } + + if (pInfo->maxval < pCtx->preAggVals.statis.max) { + pInfo->maxval = pCtx->preAggVals.statis.max; + } + + pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull); + } else { + for (int32_t i = 0; i < pCtx->size; ++i) { + char *data = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(data, pCtx->inputType)) { + continue; + } + + // TODO extract functions + double v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = GET_INT8_VAL(data); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = GET_INT16_VAL(data); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (double)(GET_INT64_VAL(data)); + break; + case TSDB_DATA_TYPE_FLOAT: + v = GET_FLOAT_VAL(data); + break; + case TSDB_DATA_TYPE_DOUBLE: + v = GET_DOUBLE_VAL(data); + break; + default: + v = GET_INT32_VAL(data); + break; + } + + if (v < pInfo->minval) { + pInfo->minval = v; + } + + if (v > pInfo->maxval) { + pInfo->maxval = v; + } + + pInfo->numOfElems += 1; + } + } + + return; + } + + // the second stage, calculate the true percentile value for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_CHAR_INDEX(pCtx, i); if (pCtx->hasNull && isNull(data, pCtx->inputType)) { @@ -2462,10 +2526,47 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } - + SResultInfo *pResInfo = GET_RES_INFO(pCtx); - + SPercentileInfo *pInfo = (SPercentileInfo *)pResInfo->interResultBuf; + + if (pInfo->stage == 0) { + // TODO extract functions + double v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (double)(GET_INT64_VAL(pData)); + break; + case TSDB_DATA_TYPE_FLOAT: + v = GET_FLOAT_VAL(pData); + break; + case TSDB_DATA_TYPE_DOUBLE: + v = GET_DOUBLE_VAL(pData); + break; + default: + v = GET_INT32_VAL(pData); + break; + } + + if (v < pInfo->minval) { + pInfo->minval = v; + } + + if (v > pInfo->maxval) { + pInfo->maxval = v; + } + + pInfo->numOfElems += 1; + return; + } + tMemBucketPut(pInfo->pMemBucket, pData, 1); SET_VAL(pCtx, 1, 1); @@ -2488,6 +2589,23 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +static void percentile_next_step(SQLFunctionCtx *pCtx) { + SResultInfo * pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo *pInfo = pResInfo->interResultBuf; + + if (pInfo->stage == 0) { + // all data are null, set it completed + if (pInfo->numOfElems == 0) { + pResInfo->complete = true; + } + + pInfo->stage += 1; + pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); + } else { + pResInfo->complete = true; + } +} + ////////////////////////////////////////////////////////////////////////////////// static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { SResultInfo *pResInfo = GET_RES_INFO(pCtx); @@ -4513,7 +4631,7 @@ SQLAggFuncElem aAggs[] = {{ percentile_function_setup, percentile_function, percentile_function_f, - no_next_step, + percentile_next_step, percentile_finalizer, noop1, noop1, diff --git a/src/query/inc/qPercentile.h b/src/query/inc/qPercentile.h index 0a52d4f205..c34c24c5b2 100644 --- a/src/query/inc/qPercentile.h +++ b/src/query/inc/qPercentile.h @@ -64,11 +64,11 @@ typedef struct tMemBucket { __perc_hash_func_t hashFunc; } tMemBucket; -tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType); +tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval); void tMemBucketDestroy(tMemBucket *pBucket); -void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); +int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); double getPercentile(tMemBucket *pMemBucket, double percent); diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index 1ce5861e52..ccc451108e 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -70,6 +70,33 @@ static void resetBoundingBox(MinMaxEntry* range, int32_t type) { } } +static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, double maxval) { + if (minval > maxval) { + return -1; + } + + switch(type) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + range->iMinVal = minval; + range->iMaxVal = maxval; + break; + + case TSDB_DATA_TYPE_BIGINT: + range->i64MinVal = minval; + range->i64MaxVal = maxval; + break; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + range->dMinVal = minval; + range->dMaxVal = maxval; + break; + } + + return 0; +} + static void resetPosInfo(SSlotInfo* pInfo) { pInfo->size = 0; pInfo->pageId = -1; @@ -135,6 +162,11 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { return index; } else { + // out of range + if (v < pBucket->range.i64MinVal || v > pBucket->range.i64MaxVal) { + return -1; + } + // todo hash for bigint and float and double int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal; if (span < pBucket->numOfSlots) { @@ -179,6 +211,11 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { return index; } else { + // out of range + if (v < pBucket->range.iMinVal || v > pBucket->range.iMaxVal) { + return -1; + } + // divide a range of [iMinVal, iMaxVal] into 1024 buckets int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal; if (span < pBucket->numOfSlots) { @@ -209,6 +246,12 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { double posx = (v + DBL_MAX) / x; return ((int32_t)posx) % pBucket->numOfSlots; } else { + + // out of range + if (v < pBucket->range.dMinVal || v > pBucket->range.dMaxVal) { + return -1; + } + // divide a range of [dMinVal, dMaxVal] into 1024 buckets double span = pBucket->range.dMaxVal - pBucket->range.dMinVal; if (span < pBucket->numOfSlots) { @@ -262,7 +305,7 @@ static void resetSlotInfo(tMemBucket* pBucket) { } } -tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) { +tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval) { tMemBucket *pBucket = (tMemBucket *)calloc(1, sizeof(tMemBucket)); if (pBucket == NULL) { return NULL; @@ -278,9 +321,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) { pBucket->maxCapacity = 200000; + if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) { + uError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); + free(pBucket); + return NULL; + } + pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes; pBucket->comparFn = getKeyComparFunc(pBucket->type); - resetBoundingBox(&pBucket->range, pBucket->type); pBucket->hashFunc = getHashFunc(pBucket->type); if (pBucket->hashFunc == NULL) { @@ -395,23 +443,25 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { /* * in memory bucket, we only accept data array list */ -void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { +int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { assert(pBucket != NULL && data != NULL && size > 0); + pBucket->total += (int32_t)size; int32_t bytes = pBucket->bytes; - for (int32_t i = 0; i < size; ++i) { char *d = (char *) data + i * bytes; - int32_t slotIdx = (pBucket->hashFunc)(pBucket, d); - assert(slotIdx >= 0); + int32_t index = (pBucket->hashFunc)(pBucket, d); + if (index == -1) { // the value is out of range, do not add it into bucket + return -1; + } - tMemBucketSlot *pSlot = &pBucket->pSlots[slotIdx]; + tMemBucketSlot *pSlot = &pBucket->pSlots[index]; tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type); // ensure available memory pages to allocate - int32_t groupId = getGroupId(pBucket->numOfSlots, slotIdx, pBucket->times); + int32_t groupId = getGroupId(pBucket->numOfSlots, index, pBucket->times); int32_t pageId = -1; if (pSlot->info.data == NULL || pSlot->info.data->num >= pBucket->elemPerPage) { @@ -432,10 +482,12 @@ void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data->num += 1; pSlot->info.size += 1; } + + return 0; } //////////////////////////////////////////////////////////////////////////////////////////// -static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minVal) { +static UNUSED_FUNC void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minVal) { *minVal = DBL_MAX; *maxVal = -DBL_MAX; @@ -681,16 +733,27 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { // find the min/max value, no need to scan all data in bucket if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { - double minx = 0, maxx = 0; - findMaxMinValue(pMemBucket, &maxx, &minx); + MinMaxEntry* pRange = &pMemBucket->range; - return fabs(percent - 100) < DBL_EPSILON ? maxx : minx; + switch(pMemBucket->type) { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + return fabs(percent - 100) < DBL_EPSILON? pRange->iMaxVal:pRange->iMinVal; + case TSDB_DATA_TYPE_BIGINT: + return fabs(percent - 100) < DBL_EPSILON? pRange->i64MaxVal:pRange->i64MinVal; + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: + return fabs(percent - 100) < DBL_EPSILON? pRange->dMaxVal:pRange->dMinVal; + default: + return -1; + } } double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0); - int32_t orderIdx = (int32_t)percentVal; // do put data by using buckets + int32_t orderIdx = (int32_t)percentVal; return getPercentileImpl(pMemBucket, orderIdx, percentVal - orderIdx); } -- GitLab