提交 9768c30b 编写于 作者: H Haojun Liao

[td-1372]

上级 5b6ca20f
...@@ -117,6 +117,10 @@ typedef struct SFirstLastInfo { ...@@ -117,6 +117,10 @@ typedef struct SFirstLastInfo {
typedef struct SFirstLastInfo SLastrowInfo; typedef struct SFirstLastInfo SLastrowInfo;
typedef struct SPercentileInfo { typedef struct SPercentileInfo {
tMemBucket *pMemBucket; tMemBucket *pMemBucket;
int32_t stage;
double minval;
double maxval;
int64_t numOfElems;
} SPercentileInfo; } SPercentileInfo;
typedef struct STopBotInfo { typedef struct STopBotInfo {
...@@ -302,7 +306,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -302,7 +306,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_PERCT) { } else if (functionId == TSDB_FUNC_PERCT) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE; *type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = (int16_t)sizeof(double); *bytes = (int16_t)sizeof(double);
*interBytes = (int16_t)sizeof(double); *interBytes = (int16_t)sizeof(SPercentileInfo);
} else if (functionId == TSDB_FUNC_LEASTSQR) { } else if (functionId == TSDB_FUNC_LEASTSQR) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string *bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string
...@@ -2428,12 +2432,14 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) { ...@@ -2428,12 +2432,14 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) {
if (!function_setup(pCtx)) { if (!function_setup(pCtx)) {
return false; return false;
} }
// in the first round, get the min-max value of all involved data
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = pResInfo->interResultBuf;
pInfo->minval = DBL_MAX;
pInfo->maxval = -DBL_MAX;
pInfo->numOfElems = 0;
((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket =
tMemBucketCreate(pCtx->inputBytes, pCtx->inputType);
return true; return true;
} }
...@@ -2442,7 +2448,65 @@ static void percentile_function(SQLFunctionCtx *pCtx) { ...@@ -2442,7 +2448,65 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
SResultInfo * pResInfo = GET_RES_INFO(pCtx); SResultInfo * pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = pResInfo->interResultBuf; SPercentileInfo *pInfo = pResInfo->interResultBuf;
// the first stage, only acquire the min/max value
if (pInfo->stage == 0) {
if (pCtx->preAggVals.isSet) {
if (pInfo->minval > pCtx->preAggVals.statis.min) {
pInfo->minval = pCtx->preAggVals.statis.min;
}
if (pInfo->maxval < pCtx->preAggVals.statis.max) {
pInfo->maxval = pCtx->preAggVals.statis.max;
}
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull);
} else {
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
// TODO extract functions
double v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = GET_INT8_VAL(data);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = GET_INT16_VAL(data);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (double)(GET_INT64_VAL(data));
break;
case TSDB_DATA_TYPE_FLOAT:
v = GET_FLOAT_VAL(data);
break;
case TSDB_DATA_TYPE_DOUBLE:
v = GET_DOUBLE_VAL(data);
break;
default:
v = GET_INT32_VAL(data);
break;
}
if (v < pInfo->minval) {
pInfo->minval = v;
}
if (v > pInfo->maxval) {
pInfo->maxval = v;
}
pInfo->numOfElems += 1;
}
}
return;
}
// the second stage, calculate the true percentile value
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_CHAR_INDEX(pCtx, i); char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
...@@ -2462,10 +2526,47 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -2462,10 +2526,47 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) {
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
return; return;
} }
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = (SPercentileInfo *)pResInfo->interResultBuf; SPercentileInfo *pInfo = (SPercentileInfo *)pResInfo->interResultBuf;
if (pInfo->stage == 0) {
// TODO extract functions
double v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (double)(GET_INT64_VAL(pData));
break;
case TSDB_DATA_TYPE_FLOAT:
v = GET_FLOAT_VAL(pData);
break;
case TSDB_DATA_TYPE_DOUBLE:
v = GET_DOUBLE_VAL(pData);
break;
default:
v = GET_INT32_VAL(pData);
break;
}
if (v < pInfo->minval) {
pInfo->minval = v;
}
if (v > pInfo->maxval) {
pInfo->maxval = v;
}
pInfo->numOfElems += 1;
return;
}
tMemBucketPut(pInfo->pMemBucket, pData, 1); tMemBucketPut(pInfo->pMemBucket, pData, 1);
SET_VAL(pCtx, 1, 1); SET_VAL(pCtx, 1, 1);
...@@ -2488,6 +2589,23 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2488,6 +2589,23 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
static void percentile_next_step(SQLFunctionCtx *pCtx) {
SResultInfo * pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = pResInfo->interResultBuf;
if (pInfo->stage == 0) {
// all data are null, set it completed
if (pInfo->numOfElems == 0) {
pResInfo->complete = true;
}
pInfo->stage += 1;
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
} else {
pResInfo->complete = true;
}
}
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx); SResultInfo *pResInfo = GET_RES_INFO(pCtx);
...@@ -4513,7 +4631,7 @@ SQLAggFuncElem aAggs[] = {{ ...@@ -4513,7 +4631,7 @@ SQLAggFuncElem aAggs[] = {{
percentile_function_setup, percentile_function_setup,
percentile_function, percentile_function,
percentile_function_f, percentile_function_f,
no_next_step, percentile_next_step,
percentile_finalizer, percentile_finalizer,
noop1, noop1,
noop1, noop1,
......
...@@ -64,11 +64,11 @@ typedef struct tMemBucket { ...@@ -64,11 +64,11 @@ typedef struct tMemBucket {
__perc_hash_func_t hashFunc; __perc_hash_func_t hashFunc;
} tMemBucket; } tMemBucket;
tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType); tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval);
void tMemBucketDestroy(tMemBucket *pBucket); void tMemBucketDestroy(tMemBucket *pBucket);
void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size);
double getPercentile(tMemBucket *pMemBucket, double percent); double getPercentile(tMemBucket *pMemBucket, double percent);
......
...@@ -70,6 +70,33 @@ static void resetBoundingBox(MinMaxEntry* range, int32_t type) { ...@@ -70,6 +70,33 @@ static void resetBoundingBox(MinMaxEntry* range, int32_t type) {
} }
} }
static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, double maxval) {
if (minval > maxval) {
return -1;
}
switch(type) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
range->iMinVal = minval;
range->iMaxVal = maxval;
break;
case TSDB_DATA_TYPE_BIGINT:
range->i64MinVal = minval;
range->i64MaxVal = maxval;
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
range->dMinVal = minval;
range->dMaxVal = maxval;
break;
}
return 0;
}
static void resetPosInfo(SSlotInfo* pInfo) { static void resetPosInfo(SSlotInfo* pInfo) {
pInfo->size = 0; pInfo->size = 0;
pInfo->pageId = -1; pInfo->pageId = -1;
...@@ -135,6 +162,11 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { ...@@ -135,6 +162,11 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) {
return index; return index;
} else { } else {
// out of range
if (v < pBucket->range.i64MinVal || v > pBucket->range.i64MaxVal) {
return -1;
}
// todo hash for bigint and float and double // todo hash for bigint and float and double
int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal; int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal;
if (span < pBucket->numOfSlots) { if (span < pBucket->numOfSlots) {
...@@ -179,6 +211,11 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { ...@@ -179,6 +211,11 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
return index; return index;
} else { } else {
// out of range
if (v < pBucket->range.iMinVal || v > pBucket->range.iMaxVal) {
return -1;
}
// divide a range of [iMinVal, iMaxVal] into 1024 buckets // divide a range of [iMinVal, iMaxVal] into 1024 buckets
int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal; int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal;
if (span < pBucket->numOfSlots) { if (span < pBucket->numOfSlots) {
...@@ -209,6 +246,12 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { ...@@ -209,6 +246,12 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
double posx = (v + DBL_MAX) / x; double posx = (v + DBL_MAX) / x;
return ((int32_t)posx) % pBucket->numOfSlots; return ((int32_t)posx) % pBucket->numOfSlots;
} else { } else {
// out of range
if (v < pBucket->range.dMinVal || v > pBucket->range.dMaxVal) {
return -1;
}
// divide a range of [dMinVal, dMaxVal] into 1024 buckets // divide a range of [dMinVal, dMaxVal] into 1024 buckets
double span = pBucket->range.dMaxVal - pBucket->range.dMinVal; double span = pBucket->range.dMaxVal - pBucket->range.dMinVal;
if (span < pBucket->numOfSlots) { if (span < pBucket->numOfSlots) {
...@@ -262,7 +305,7 @@ static void resetSlotInfo(tMemBucket* pBucket) { ...@@ -262,7 +305,7 @@ static void resetSlotInfo(tMemBucket* pBucket) {
} }
} }
tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) { tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval) {
tMemBucket *pBucket = (tMemBucket *)calloc(1, sizeof(tMemBucket)); tMemBucket *pBucket = (tMemBucket *)calloc(1, sizeof(tMemBucket));
if (pBucket == NULL) { if (pBucket == NULL) {
return NULL; return NULL;
...@@ -278,9 +321,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) { ...@@ -278,9 +321,14 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) {
pBucket->maxCapacity = 200000; pBucket->maxCapacity = 200000;
if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) {
uError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval);
free(pBucket);
return NULL;
}
pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes; pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
pBucket->comparFn = getKeyComparFunc(pBucket->type); pBucket->comparFn = getKeyComparFunc(pBucket->type);
resetBoundingBox(&pBucket->range, pBucket->type);
pBucket->hashFunc = getHashFunc(pBucket->type); pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) { if (pBucket->hashFunc == NULL) {
...@@ -395,23 +443,25 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { ...@@ -395,23 +443,25 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
/* /*
* in memory bucket, we only accept data array list * in memory bucket, we only accept data array list
*/ */
void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
assert(pBucket != NULL && data != NULL && size > 0); assert(pBucket != NULL && data != NULL && size > 0);
pBucket->total += (int32_t)size; pBucket->total += (int32_t)size;
int32_t bytes = pBucket->bytes; int32_t bytes = pBucket->bytes;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
char *d = (char *) data + i * bytes; char *d = (char *) data + i * bytes;
int32_t slotIdx = (pBucket->hashFunc)(pBucket, d); int32_t index = (pBucket->hashFunc)(pBucket, d);
assert(slotIdx >= 0); if (index == -1) { // the value is out of range, do not add it into bucket
return -1;
}
tMemBucketSlot *pSlot = &pBucket->pSlots[slotIdx]; tMemBucketSlot *pSlot = &pBucket->pSlots[index];
tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type); tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type);
// ensure available memory pages to allocate // ensure available memory pages to allocate
int32_t groupId = getGroupId(pBucket->numOfSlots, slotIdx, pBucket->times); int32_t groupId = getGroupId(pBucket->numOfSlots, index, pBucket->times);
int32_t pageId = -1; int32_t pageId = -1;
if (pSlot->info.data == NULL || pSlot->info.data->num >= pBucket->elemPerPage) { if (pSlot->info.data == NULL || pSlot->info.data->num >= pBucket->elemPerPage) {
...@@ -432,10 +482,12 @@ void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { ...@@ -432,10 +482,12 @@ void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data->num += 1; pSlot->info.data->num += 1;
pSlot->info.size += 1; pSlot->info.size += 1;
} }
return 0;
} }
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minVal) { static UNUSED_FUNC void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minVal) {
*minVal = DBL_MAX; *minVal = DBL_MAX;
*maxVal = -DBL_MAX; *maxVal = -DBL_MAX;
...@@ -681,16 +733,27 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { ...@@ -681,16 +733,27 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
// find the min/max value, no need to scan all data in bucket // find the min/max value, no need to scan all data in bucket
if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) {
double minx = 0, maxx = 0; MinMaxEntry* pRange = &pMemBucket->range;
findMaxMinValue(pMemBucket, &maxx, &minx);
return fabs(percent - 100) < DBL_EPSILON ? maxx : minx; switch(pMemBucket->type) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
return fabs(percent - 100) < DBL_EPSILON? pRange->iMaxVal:pRange->iMinVal;
case TSDB_DATA_TYPE_BIGINT:
return fabs(percent - 100) < DBL_EPSILON? pRange->i64MaxVal:pRange->i64MinVal;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
return fabs(percent - 100) < DBL_EPSILON? pRange->dMaxVal:pRange->dMinVal;
default:
return -1;
}
} }
double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0); double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0);
int32_t orderIdx = (int32_t)percentVal;
// do put data by using buckets // do put data by using buckets
int32_t orderIdx = (int32_t)percentVal;
return getPercentileImpl(pMemBucket, orderIdx, percentVal - orderIdx); return getPercentileImpl(pMemBucket, orderIdx, percentVal - orderIdx);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册