提交 c04da26c 编写于 作者: H Haojun Liao

[TD-2634]

上级 84b867e2
...@@ -16,6 +16,10 @@ ...@@ -16,6 +16,10 @@
#ifndef TDENGINE_QPERCENTILE_H #ifndef TDENGINE_QPERCENTILE_H
#define TDENGINE_QPERCENTILE_H #define TDENGINE_QPERCENTILE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "qExtbuffer.h" #include "qExtbuffer.h"
#include "qResultbuf.h" #include "qResultbuf.h"
#include "qTsbuf.h" #include "qTsbuf.h"
...@@ -23,13 +27,13 @@ ...@@ -23,13 +27,13 @@
typedef struct MinMaxEntry { typedef struct MinMaxEntry {
union { union {
double dMinVal; double dMinVal;
int32_t iMinVal;
int64_t i64MinVal; int64_t i64MinVal;
uint64_t u64MinVal;
}; };
union { union {
double dMaxVal; double dMaxVal;
int32_t iMaxVal;
int64_t i64MaxVal; int64_t i64MaxVal;
int64_t u64MaxVal;
}; };
} MinMaxEntry; } MinMaxEntry;
...@@ -59,7 +63,7 @@ typedef struct tMemBucket { ...@@ -59,7 +63,7 @@ typedef struct tMemBucket {
int32_t times; // count that has been checked for deciding the correct data value buckets. int32_t times; // count that has been checked for deciding the correct data value buckets.
__compar_fn_t comparFn; __compar_fn_t comparFn;
tMemBucketSlot *pSlots; tMemBucketSlot * pSlots;
SDiskbasedResultBuf *pBuffer; SDiskbasedResultBuf *pBuffer;
__perc_hash_func_t hashFunc; __perc_hash_func_t hashFunc;
} tMemBucket; } tMemBucket;
...@@ -73,3 +77,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); ...@@ -73,3 +77,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size);
double getPercentile(tMemBucket *pMemBucket, double percent); double getPercentile(tMemBucket *pMemBucket, double percent);
#endif // TDENGINE_QPERCENTILE_H #endif // TDENGINE_QPERCENTILE_H
#ifdef __cplusplus
}
#endif
\ No newline at end of file
...@@ -2545,7 +2545,7 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) { ...@@ -2545,7 +2545,7 @@ static void percentile_next_step(SQLFunctionCtx *pCtx) {
if (pInfo->numOfElems == 0) { if (pInfo->numOfElems == 0) {
pResInfo->complete = true; pResInfo->complete = true;
} else { } else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, GET_DOUBLE_VAL(&pInfo->minval), GET_DOUBLE_VAL(&pInfo->maxval)); pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
} }
pInfo->stage += 1; pInfo->stage += 1;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "taosdef.h" #include "taosdef.h"
#include "tulog.h" #include "tulog.h"
#include "tcompare.h" #include "tcompare.h"
#include "ttype.h"
#define DEFAULT_NUM_OF_SLOT 1024 #define DEFAULT_NUM_OF_SLOT 1024
...@@ -48,25 +49,15 @@ static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) ...@@ -48,25 +49,15 @@ static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
} }
static void resetBoundingBox(MinMaxEntry* range, int32_t type) { static void resetBoundingBox(MinMaxEntry* range, int32_t type) {
switch (type) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
case TSDB_DATA_TYPE_BIGINT: {
range->i64MaxVal = INT64_MIN; range->i64MaxVal = INT64_MIN;
range->i64MinVal = INT64_MAX; range->i64MinVal = INT64_MAX;
break; } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
}; range->u64MaxVal = 0;
case TSDB_DATA_TYPE_INT: range->u64MinVal = UINT64_MAX;
case TSDB_DATA_TYPE_SMALLINT: } else {
case TSDB_DATA_TYPE_TINYINT: {
range->iMaxVal = INT32_MIN;
range->iMinVal = INT32_MAX;
break;
};
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: {
range->dMaxVal = -DBL_MAX; range->dMaxVal = -DBL_MAX;
range->dMinVal = DBL_MAX; range->dMinVal = DBL_MAX;
break;
}
} }
} }
...@@ -75,23 +66,15 @@ static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, d ...@@ -75,23 +66,15 @@ static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, d
return -1; return -1;
} }
switch(type) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
range->iMinVal = (int32_t) minval;
range->iMaxVal = (int32_t) maxval;
break;
case TSDB_DATA_TYPE_BIGINT:
range->i64MinVal = (int64_t) minval; range->i64MinVal = (int64_t) minval;
range->i64MaxVal = (int64_t) maxval; range->i64MaxVal = (int64_t) maxval;
break; } else if (IS_UNSIGNED_NUMERIC_TYPE(type)){
case TSDB_DATA_TYPE_FLOAT: range->u64MinVal = (uint64_t) minval;
case TSDB_DATA_TYPE_DOUBLE: range->u64MaxVal = (uint64_t) maxval;
} else {
range->dMinVal = minval; range->dMinVal = minval;
range->dMaxVal = maxval; range->dMaxVal = maxval;
break;
} }
return 0; return 0;
...@@ -120,58 +103,24 @@ double findOnlyResult(tMemBucket *pMemBucket) { ...@@ -120,58 +103,24 @@ double findOnlyResult(tMemBucket *pMemBucket) {
tFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); tFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId);
assert(pPage->num == 1); assert(pPage->num == 1);
switch (pMemBucket->type) { double v = 0;
case TSDB_DATA_TYPE_INT: GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data);
return *(int32_t *)pPage->data; return v;
case TSDB_DATA_TYPE_SMALLINT:
return *(int16_t *)pPage->data;
case TSDB_DATA_TYPE_TINYINT:
return *(int8_t *)pPage->data;
case TSDB_DATA_TYPE_BIGINT:
return (double)(*(int64_t *)pPage->data);
case TSDB_DATA_TYPE_DOUBLE: {
double dv = GET_DOUBLE_VAL(pPage->data);
return dv;
}
case TSDB_DATA_TYPE_FLOAT: {
float fv = GET_FLOAT_VAL(pPage->data);
return fv;
}
default:
return 0;
}
} }
return 0; return 0;
} }
int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
int64_t v = *(int64_t *)value; int64_t v = 0;
int32_t index = -1; GET_TYPED_DATA(v, int64_t, pBucket->type, value);
int32_t halfSlot = pBucket->numOfSlots >> 1;
// int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1;
if (pBucket->range.i64MaxVal == INT64_MIN) {
if (v >= 0) {
index = (v >> (64 - 9)) + halfSlot;
} else { // v<0
index = ((-v) >> (64 - 9));
index = -index + (halfSlot - 1);
}
return index;
} else {
// out of range
if (v < pBucket->range.i64MinVal || v > pBucket->range.i64MaxVal) {
return -1;
}
// todo hash for bigint and float and double int32_t index = -1;
int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal; // divide the value range into 1024 buckets
uint64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal;
if (span < pBucket->numOfSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = (int32_t)(v - pBucket->range.i64MinVal); int32_t delta = v - pBucket->range.i64MinVal;
index = delta % pBucket->numOfSlots; index = (delta % pBucket->numOfSlots);
} else { } else {
double slotSpan = (double)span / pBucket->numOfSlots; double slotSpan = (double)span / pBucket->numOfSlots;
index = (int32_t)((v - pBucket->range.i64MinVal) / slotSpan); index = (int32_t)((v - pBucket->range.i64MinVal) / slotSpan);
...@@ -180,57 +129,30 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { ...@@ -180,57 +129,30 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) {
} }
} }
assert(v >= pBucket->range.i64MinVal && v <= pBucket->range.i64MaxVal && index >= 0 && index < pBucket->numOfSlots);
return index; return index;
}
} }
// todo refactor to more generic int32_t tBucketUintHash(tMemBucket *pBucket, const void *value) {
int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { int64_t v = 0;
int32_t v = 0; GET_TYPED_DATA(v, uint64_t, pBucket->type, value);
switch(pBucket->type) {
case TSDB_DATA_TYPE_SMALLINT: v = *(int16_t*) value; break;
case TSDB_DATA_TYPE_TINYINT: v = *(int8_t*) value; break;
default: v = *(int32_t*) value;break;
}
int32_t index = -1; int32_t index = -1;
if (pBucket->range.iMaxVal == INT32_MIN) { // divide the value range into 1024 buckets
/* uint64_t span = pBucket->range.u64MaxVal - pBucket->range.u64MinVal;
* taking negative integer into consideration,
* there is only half of pBucket->segs available for non-negative integer
*/
int32_t halfSlot = pBucket->numOfSlots >> 1;
int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1;
if (v >= 0) {
index = (v >> (bits - 9)) + halfSlot;
} else { // v < 0
index = ((-v) >> (32 - 9));
index = -index + (halfSlot - 1);
}
return index;
} else {
// out of range
if (v < pBucket->range.iMinVal || v > pBucket->range.iMaxVal) {
return -1;
}
// divide a range of [iMinVal, iMaxVal] into 1024 buckets
int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal;
if (span < pBucket->numOfSlots) { if (span < pBucket->numOfSlots) {
int32_t delta = v - pBucket->range.iMinVal; int32_t delta = v - pBucket->range.u64MinVal;
index = (delta % pBucket->numOfSlots); index = (delta % pBucket->numOfSlots);
} else { } else {
double slotSpan = (double)span / pBucket->numOfSlots; double slotSpan = (double)span / pBucket->numOfSlots;
index = (int32_t)((v - pBucket->range.iMinVal) / slotSpan); index = (int32_t)((v - pBucket->range.u64MinVal) / slotSpan);
if (v == pBucket->range.iMaxVal) { if (v == pBucket->range.u64MaxVal) {
index -= 1; index -= 1;
} }
} }
assert(v >= pBucket->range.u64MinVal && v <= pBucket->range.i64MaxVal && index >= 0 && index < pBucket->numOfSlots);
return index; return index;
}
} }
int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
...@@ -243,21 +165,6 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { ...@@ -243,21 +165,6 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
int32_t index = -1; int32_t index = -1;
if (pBucket->range.dMinVal == DBL_MAX) {
/*
* taking negative integer into consideration,
* there is only half of pBucket->segs available for non-negative integer
*/
double x = DBL_MAX / (pBucket->numOfSlots >> 1);
double posx = (v + DBL_MAX) / x;
return ((int32_t)posx) % pBucket->numOfSlots;
} else {
// out of range
if (v < pBucket->range.dMinVal || v > pBucket->range.dMaxVal) {
return -1;
}
// divide a range of [dMinVal, dMaxVal] into 1024 buckets // divide a range of [dMinVal, dMaxVal] into 1024 buckets
double span = pBucket->range.dMaxVal - pBucket->range.dMinVal; double span = pBucket->range.dMaxVal - pBucket->range.dMinVal;
if (span < pBucket->numOfSlots) { if (span < pBucket->numOfSlots) {
...@@ -271,34 +178,17 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { ...@@ -271,34 +178,17 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
} }
} }
if (index < 0 || index > pBucket->numOfSlots) { assert(v >= pBucket->range.dMinVal && v <= pBucket->range.dMaxVal && index >= 0 && index < pBucket->numOfSlots);
uError("error in hash process. slot id: %d", index);
}
return index; return index;
}
} }
static __perc_hash_func_t getHashFunc(int32_t type) { static __perc_hash_func_t getHashFunc(int32_t type) {
switch (type) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: {
return tBucketIntHash; return tBucketIntHash;
}; } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
return tBucketUintHash;
case TSDB_DATA_TYPE_DOUBLE: } else {
case TSDB_DATA_TYPE_FLOAT: {
return tBucketDoubleHash; return tBucketDoubleHash;
};
case TSDB_DATA_TYPE_BIGINT: {
return tBucketBigIntHash;
};
default: {
return NULL;
}
} }
} }
...@@ -372,77 +262,41 @@ void tMemBucketDestroy(tMemBucket *pBucket) { ...@@ -372,77 +262,41 @@ void tMemBucketDestroy(tMemBucket *pBucket) {
} }
void tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataType) { void tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataType) {
switch (dataType) { if (IS_SIGNED_NUMERIC_TYPE(dataType)) {
case TSDB_DATA_TYPE_INT: { int64_t v = 0;
int32_t val = *(int32_t *)data; GET_TYPED_DATA(v, int64_t, dataType, data);
if (r->iMinVal > val) {
r->iMinVal = val;
}
if (r->iMaxVal < val) {
r->iMaxVal = val;
}
break;
};
case TSDB_DATA_TYPE_BIGINT: {
int64_t val = *(int64_t *)data;
if (r->i64MinVal > val) {
r->i64MinVal = val;
}
if (r->i64MaxVal < val) { if (r->i64MinVal > v) {
r->i64MaxVal = val; r->i64MinVal = v;
}
break;
};
case TSDB_DATA_TYPE_SMALLINT: {
int32_t val = *(int16_t *)data;
if (r->iMinVal > val) {
r->iMinVal = val;
}
if (r->iMaxVal < val) {
r->iMaxVal = val;
}
break;
};
case TSDB_DATA_TYPE_TINYINT: {
int32_t val = *(int8_t *)data;
if (r->iMinVal > val) {
r->iMinVal = val;
} }
if (r->iMaxVal < val) { if (r->i64MaxVal < v) {
r->iMaxVal = val; r->i64MaxVal = v;
} }
} else if (IS_UNSIGNED_NUMERIC_TYPE(dataType)) {
uint64_t v = 0;
GET_TYPED_DATA(v, uint64_t, dataType, data);
break; if (r->i64MinVal > v) {
}; r->i64MinVal = v;
case TSDB_DATA_TYPE_DOUBLE: {
// double val = *(double *)data;
double val = GET_DOUBLE_VAL(data);
if (r->dMinVal > val) {
r->dMinVal = val;
} }
if (r->dMaxVal < val) { if (r->i64MaxVal < v) {
r->dMaxVal = val; r->i64MaxVal = v;
} }
break; } else if (IS_FLOAT_TYPE(dataType)) {
}; double v = 0;
case TSDB_DATA_TYPE_FLOAT: { GET_TYPED_DATA(v, double, dataType, data);
double val = GET_FLOAT_VAL(data);
if (r->dMinVal > val) { if (r->dMinVal > v) {
r->dMinVal = val; r->dMinVal = v;
} }
if (r->dMaxVal < val) { if (r->dMaxVal < v) {
r->dMaxVal = val; r->dMaxVal = v;
} }
break; } else {
}; assert(0);
default: { assert(false); }
} }
} }
...@@ -452,16 +306,13 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataT ...@@ -452,16 +306,13 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, const char *data, int32_t dataT
int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
assert(pBucket != NULL && data != NULL && size > 0); assert(pBucket != NULL && data != NULL && size > 0);
pBucket->total += (int32_t)size; int32_t count = 0;
int32_t bytes = pBucket->bytes; int32_t bytes = pBucket->bytes;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
char *d = (char *) data + i * bytes; char *d = (char *) data + i * bytes;
count += 1;
int32_t index = (pBucket->hashFunc)(pBucket, d); int32_t index = (pBucket->hashFunc)(pBucket, d);
if (index == -1) { // the value is out of range, do not add it into bucket
return -1;
}
tMemBucketSlot *pSlot = &pBucket->pSlots[index]; tMemBucketSlot *pSlot = &pBucket->pSlots[index];
tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type); tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type);
...@@ -489,64 +340,11 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { ...@@ -489,64 +340,11 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.size += 1; pSlot->info.size += 1;
} }
pBucket->total += count;
return 0; return 0;
} }
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
static UNUSED_FUNC void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minVal) {
*minVal = DBL_MAX;
*maxVal = -DBL_MAX;
for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) {
tMemBucketSlot *pSlot = &pMemBucket->pSlots[i];
if (pSlot->info.size == 0) {
continue;
}
switch (pMemBucket->type) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TINYINT: {
double minv = pSlot->range.iMinVal;
double maxv = pSlot->range.iMaxVal;
if (*minVal > minv) {
*minVal = minv;
}
if (*maxVal < maxv) {
*maxVal = maxv;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT: {
double minv = pSlot->range.dMinVal;
double maxv = pSlot->range.dMaxVal;
if (*minVal > minv) {
*minVal = minv;
}
if (*maxVal < maxv) {
*maxVal = maxv;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
double minv = (double)pSlot->range.i64MinVal;
double maxv = (double)pSlot->range.i64MaxVal;
if (*minVal > minv) {
*minVal = minv;
}
if (*maxVal < maxv) {
*maxVal = maxv;
}
break;
}
}
}
}
/* /*
* *
* now, we need to find the minimum value of the next slot for * now, we need to find the minimum value of the next slot for
...@@ -565,7 +363,6 @@ static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int3 ...@@ -565,7 +363,6 @@ static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int3
} }
static bool isIdenticalData(tMemBucket *pMemBucket, int32_t index); static bool isIdenticalData(tMemBucket *pMemBucket, int32_t index);
char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage);
static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) { static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) {
assert(isIdenticalData(pMemBucket, slotIndex)); assert(isIdenticalData(pMemBucket, slotIndex));
...@@ -573,24 +370,12 @@ static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) { ...@@ -573,24 +370,12 @@ static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) {
tMemBucketSlot *pSlot = &pMemBucket->pSlots[slotIndex]; tMemBucketSlot *pSlot = &pMemBucket->pSlots[slotIndex];
double finalResult = 0.0; double finalResult = 0.0;
switch (pMemBucket->type) { if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) {
case TSDB_DATA_TYPE_SMALLINT: finalResult = pSlot->range.i64MinVal;
case TSDB_DATA_TYPE_TINYINT: } else if (IS_UNSIGNED_NUMERIC_TYPE(pMemBucket->type)) {
case TSDB_DATA_TYPE_INT: { finalResult = pSlot->range.u64MinVal;
finalResult = pSlot->range.iMinVal; } else {
break;
}
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
finalResult = pSlot->range.dMinVal; finalResult = pSlot->range.dMinVal;
break;
};
case TSDB_DATA_TYPE_BIGINT: {
finalResult = (double)pSlot->range.i64MinVal;
break;
}
} }
return finalResult; return finalResult;
...@@ -616,26 +401,16 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) ...@@ -616,26 +401,16 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
double maxOfThisSlot = 0; double maxOfThisSlot = 0;
double minOfNextSlot = 0; double minOfNextSlot = 0;
switch (pMemBucket->type) { if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) {
case TSDB_DATA_TYPE_INT: maxOfThisSlot = pSlot->range.i64MaxVal;
case TSDB_DATA_TYPE_SMALLINT: minOfNextSlot = next.i64MinVal;
case TSDB_DATA_TYPE_TINYINT: { } else if (IS_UNSIGNED_NUMERIC_TYPE(pMemBucket->type)) {
maxOfThisSlot = pSlot->range.iMaxVal; maxOfThisSlot = pSlot->range.u64MaxVal;
minOfNextSlot = next.iMinVal; minOfNextSlot = next.u64MinVal;
break; } else {
};
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE: {
maxOfThisSlot = pSlot->range.dMaxVal; maxOfThisSlot = pSlot->range.dMaxVal;
minOfNextSlot = next.dMinVal; minOfNextSlot = next.dMinVal;
break;
};
case TSDB_DATA_TYPE_BIGINT: {
maxOfThisSlot = (double)pSlot->range.i64MaxVal;
minOfNextSlot = (double)next.i64MinVal;
break;
} }
};
assert(minOfNextSlot > maxOfThisSlot); assert(minOfNextSlot > maxOfThisSlot);
...@@ -652,38 +427,8 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) ...@@ -652,38 +427,8 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
char *nextVal = thisVal + pMemBucket->bytes; char *nextVal = thisVal + pMemBucket->bytes;
double td = 1.0, nd = 1.0; double td = 1.0, nd = 1.0;
switch (pMemBucket->type) { GET_TYPED_DATA(td, double, pMemBucket->type, thisVal);
case TSDB_DATA_TYPE_SMALLINT: { GET_TYPED_DATA(nd, double, pMemBucket->type, nextVal);
td = *(int16_t *)thisVal;
nd = *(int16_t *)nextVal;
break;
}
case TSDB_DATA_TYPE_TINYINT: {
td = *(int8_t *)thisVal;
nd = *(int8_t *)nextVal;
break;
}
case TSDB_DATA_TYPE_INT: {
td = *(int32_t *)thisVal;
nd = *(int32_t *)nextVal;
break;
};
case TSDB_DATA_TYPE_FLOAT: {
td = GET_FLOAT_VAL(thisVal);
nd = GET_FLOAT_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
td = GET_DOUBLE_VAL(thisVal);
nd = GET_DOUBLE_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
td = (double)*(int64_t *)thisVal;
nd = (double)*(int64_t *)nextVal;
break;
}
}
double val = (1 - fraction) * td + fraction * nd; double val = (1 - fraction) * td + fraction * nd;
tfree(buffer); tfree(buffer);
...@@ -741,20 +486,14 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { ...@@ -741,20 +486,14 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) {
MinMaxEntry* pRange = &pMemBucket->range; MinMaxEntry* pRange = &pMemBucket->range;
switch(pMemBucket->type) { if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
return fabs(percent - 100) < DBL_EPSILON? pRange->iMaxVal:pRange->iMinVal;
case TSDB_DATA_TYPE_BIGINT: {
double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->i64MaxVal : pRange->i64MinVal); double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->i64MaxVal : pRange->i64MinVal);
return v; return v;
} } else if (IS_UNSIGNED_NUMERIC_TYPE(pMemBucket->type)) {
case TSDB_DATA_TYPE_FLOAT: double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->u64MaxVal : pRange->u64MinVal);
case TSDB_DATA_TYPE_DOUBLE: return v;
} else {
return fabs(percent - 100) < DBL_EPSILON? pRange->dMaxVal:pRange->dMinVal; return fabs(percent - 100) < DBL_EPSILON? pRange->dMaxVal:pRange->dMinVal;
default:
return -1;
} }
} }
...@@ -771,40 +510,9 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { ...@@ -771,40 +510,9 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
bool isIdenticalData(tMemBucket *pMemBucket, int32_t index) { bool isIdenticalData(tMemBucket *pMemBucket, int32_t index) {
tMemBucketSlot *pSeg = &pMemBucket->pSlots[index]; tMemBucketSlot *pSeg = &pMemBucket->pSlots[index];
if (pMemBucket->type == TSDB_DATA_TYPE_INT || pMemBucket->type == TSDB_DATA_TYPE_BIGINT || if (IS_FLOAT_TYPE(pMemBucket->type)) {
pMemBucket->type == TSDB_DATA_TYPE_SMALLINT || pMemBucket->type == TSDB_DATA_TYPE_TINYINT) {
return pSeg->range.i64MinVal == pSeg->range.i64MaxVal;
}
if (pMemBucket->type == TSDB_DATA_TYPE_FLOAT || pMemBucket->type == TSDB_DATA_TYPE_DOUBLE) {
return fabs(pSeg->range.dMaxVal - pSeg->range.dMinVal) < DBL_EPSILON; return fabs(pSeg->range.dMaxVal - pSeg->range.dMinVal) < DBL_EPSILON;
} else {
return pSeg->range.i64MinVal == pSeg->range.i64MaxVal;
} }
return false;
}
/*
* get the first element of one slot into memory.
* if no data of current slot in memory, load it from disk
*/
char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage) {
// STSBuf *pMemBuffer = pSeg->pBuffer[slotIdx];
char *thisVal = NULL;
// if (pSeg->pBuffer[slotIdx]->numOfTotal != 0) {
//// thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data;
// } else {
// /*
// * no data in memory, load one page into memory
// */
// tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0];
// assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize);
// int32_t ret;
// ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET);
// UNUSED(ret);
// size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
// UNUSED(sz);
// thisVal = pPage->data;
// }
return thisVal;
} }
\ No newline at end of file
#include <gtest/gtest.h>
#include <iostream>
#include "qResultbuf.h"
#include "taos.h"
#include "taosdef.h"
#include "qPercentile.h"
namespace {
// Creates a BIGINT bucket, passing start/end as the min/max arguments of
// tMemBucketCreate, and inserts every integer in [start, end] one at a time.
// The returned bucket must be released by the caller with tMemBucketDestroy.
tMemBucket *createBigIntDataBucket(int32_t start, int32_t end) {
  tMemBucket *bucket = tMemBucketCreate(sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, start, end);

  int32_t cur = start;
  while (cur <= end) {
    // widen to the 8-byte element size the bucket was created with
    int64_t widened = cur;
    tMemBucketPut(bucket, &widened, 1);
    ++cur;
  }

  return bucket;
}
// Creates an INT bucket, passing start/end as the min/max arguments of
// tMemBucketCreate, and inserts every integer in [start, end] one at a time.
// The returned bucket must be released by the caller with tMemBucketDestroy.
tMemBucket *createIntDataBucket(int32_t start, int32_t end) {
  tMemBucket *bucket = tMemBucketCreate(sizeof(int32_t), TSDB_DATA_TYPE_INT, start, end);

  int32_t cur = start;
  while (cur <= end) {
    // hand the bucket a copy so the loop counter itself is never aliased
    int32_t sample = cur;
    tMemBucketPut(bucket, &sample, 1);
    ++cur;
  }

  return bucket;
}
// Creates a DOUBLE bucket, passing start/end as the min/max arguments of
// tMemBucketCreate, and inserts every integer in [start, end] converted to
// double. A non-zero return from tMemBucketPut is reported on stdout and the
// loop continues. The caller releases the bucket with tMemBucketDestroy.
tMemBucket *createDoubleDataBucket(int32_t start, int32_t end) {
  tMemBucket *bucket = tMemBucketCreate(sizeof(double), TSDB_DATA_TYPE_DOUBLE, start, end);

  int32_t cur = start;
  while (cur <= end) {
    double sample = cur;
    if (tMemBucketPut(bucket, &sample, 1) != 0) {
      printf("value out of range:%f", sample);
    }
    ++cur;
  }

  return bucket;
}
// Creates a bucket of the given unsigned `type`, using tDataTypeDesc[type].nSize
// as the element width and start/end as the min/max arguments of
// tMemBucketCreate, then inserts every integer in [start, end].
// A non-zero return from tMemBucketPut is reported on stdout and the loop
// continues. The caller releases the bucket with tMemBucketDestroy.
//
// NOTE(review): each value is staged in a uint64_t and its address is handed to
// tMemBucketPut even when the element width is smaller than 8 bytes; this
// presumably relies on a little-endian host - confirm before running elsewhere.
tMemBucket *createUnsignedDataBucket(int32_t start, int32_t end, int32_t type) {
  tMemBucket *pBucket = tMemBucketCreate(tDataTypeDesc[type].nSize, type, start, end);
  for (int32_t i = start; i <= end; ++i) {
    uint64_t k = i;

    int32_t ret = tMemBucketPut(pBucket, &k, 1);
    if (ret != 0) {
      // k is uint64_t: printing it with %f is a format/argument mismatch (undefined behaviour)
      printf("value out of range:%" PRIu64, k);
    }
  }

  return pBucket;
}
void intDataTest() {
printf("running %s\n", __FUNCTION__);
tMemBucket *pBucket = NULL;
double result = 0.;
pBucket = createIntDataBucket(0, 0);
result = getPercentile(pBucket, 0);
ASSERT_DOUBLE_EQ(result, 0);
tMemBucketDestroy(pBucket);
pBucket = createIntDataBucket(0, 1);
result = getPercentile(pBucket, 100);
ASSERT_DOUBLE_EQ(result, 1);
result = getPercentile(pBucket, 0);
ASSERT_DOUBLE_EQ(result, 0);
tMemBucketDestroy(pBucket);
pBucket = createIntDataBucket(-1, 1);
result = getPercentile(pBucket, 50);
ASSERT_DOUBLE_EQ(result, 0);
result = getPercentile(pBucket, 0);
ASSERT_DOUBLE_EQ(result, -1);
result = getPercentile(pBucket, 75);
ASSERT_DOUBLE_EQ(result, 0.5);
result = getPercentile(pBucket, 100);
ASSERT_DOUBLE_EQ(result, 1);
tMemBucketDestroy(pBucket);
pBucket = createIntDataBucket(0, 99999);
result = getPercentile(pBucket, 50);
ASSERT_DOUBLE_EQ(result, 49999.5);
tMemBucketDestroy(pBucket);
}
void bigintDataTest() {
printf("running %s\n", __FUNCTION__);
tMemBucket *pBucket = NULL;
double result = 0.0;
pBucket = createBigIntDataBucket(-1000, 1000);
result = getPercentile(pBucket, 50);
ASSERT_DOUBLE_EQ(result, 0.);
tMemBucketDestroy(pBucket);
pBucket = createBigIntDataBucket(-10000, 10000);
result = getPercentile(pBucket, 100);
ASSERT_DOUBLE_EQ(result, 10000.0);
tMemBucketDestroy(pBucket);
pBucket = createBigIntDataBucket(-10000, 10000);
result = getPercentile(pBucket, 75);
ASSERT_DOUBLE_EQ(result, 5000.0);
tMemBucketDestroy(pBucket);
}
void doubleDataTest() {
printf("running %s\n", __FUNCTION__);
tMemBucket *pBucket = NULL;
double result = 0;
pBucket = createDoubleDataBucket(-10, 10);
result = getPercentile(pBucket, 0);
ASSERT_DOUBLE_EQ(result, -10.0);
printf("result is: %lf\n", result);
tMemBucketDestroy(pBucket);
pBucket = createDoubleDataBucket(-100000, 100000);
result = getPercentile(pBucket, 25);
ASSERT_DOUBLE_EQ(result, -50000);
printf("result is: %lf\n", result);
tMemBucketDestroy(pBucket);
pBucket = createDoubleDataBucket(-100000, 100000);
result = getPercentile(pBucket, 50);
ASSERT_DOUBLE_EQ(result, 0);
tMemBucketDestroy(pBucket);
pBucket = createDoubleDataBucket(-100000, 100000);
result = getPercentile(pBucket, 75);
ASSERT_DOUBLE_EQ(result, 50000);
tMemBucketDestroy(pBucket);
pBucket = createDoubleDataBucket(-100000, 100000);
result = getPercentile(pBucket, 100);
ASSERT_DOUBLE_EQ(result, 100000.0);
printf("result is: %lf\n", result);
tMemBucketDestroy(pBucket);
}
/*
 * Large data test: the double values 0 .. 100000000 (about 0.1 billion
 * elements, roughly 800MB of raw data) are inserted into one bucket and the
 * median is checked. Wall-clock time in whole seconds is printed for reference.
 */
void largeDataTest() {
  printf("running : %s\n", __FUNCTION__);

  struct timeval tv;
  gettimeofday(&tv, NULL);

  // tv_sec is a time_t, which is not guaranteed to be int64_t; convert it
  // explicitly so the PRId64 conversions below always match their argument.
  const int64_t start = (int64_t)tv.tv_sec;
  printf("start time: %" PRId64 "\n", start);

  tMemBucket *pBucket = createDoubleDataBucket(0, 100000000);
  double result = getPercentile(pBucket, 50);
  ASSERT_DOUBLE_EQ(result, 50000000);

  gettimeofday(&tv, NULL);
  const int64_t elapsed = (int64_t)tv.tv_sec - start;

  // terminate the line so the result message below starts on its own line
  printf("total elapsed time: %" PRId64 " sec.\n", elapsed);
  printf("the result of %d is: %lf\n", 50, result);

  tMemBucketDestroy(pBucket);
}
void qsortTest() {
printf("running : %s\n", __FUNCTION__);
SSchema field[1] = {
{TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
};
const int32_t num = 2000;
int32_t *d = (int32_t *)malloc(sizeof(int32_t) * num);
for (int32_t i = 0; i < num; ++i) {
d[i] = i % 4;
}
const int32_t numOfOrderCols = 1;
int32_t orderColIdx = 0;
SColumnModel * pModel = createColumnModel(field, 1, 1000);
tOrderDescriptor *pDesc = tOrderDesCreate(&orderColIdx, numOfOrderCols, pModel, 1);
tColDataQSort(pDesc, num, 0, num - 1, (char *)d, 1);
for (int32_t i = 0; i < num; ++i) {
printf("%d\t", d[i]);
}
printf("\n");
destroyColumnModel(pModel);
}
void unsignedDataTest() {
printf("running %s\n", __FUNCTION__);
tMemBucket *pBucket = NULL;
double result = 0.0;
pBucket = createUnsignedDataBucket(0, 1000, TSDB_DATA_TYPE_UINT);
result = getPercentile(pBucket, 50);
ASSERT_DOUBLE_EQ(result, 500.0);
tMemBucketDestroy(pBucket);
pBucket = createUnsignedDataBucket(0, 10000, TSDB_DATA_TYPE_UBIGINT);
result = getPercentile(pBucket, 100);
ASSERT_DOUBLE_EQ(result, 10000.0);
result = getPercentile(pBucket, 0);
ASSERT_DOUBLE_EQ(result, 0.0);
result = getPercentile(pBucket, 50);
ASSERT_DOUBLE_EQ(result, 5000);
result = getPercentile(pBucket, 75);
ASSERT_DOUBLE_EQ(result, 7500);
tMemBucketDestroy(pBucket);
}
} // namespace
// Runs the percentile sub-tests in a fixed order. qsortTest is intentionally
// left out of the table, and largeDataTest runs last.
TEST(testCase, percentileTest) {
  typedef void (*SubTest)();

  static const SubTest kSubTests[] = {
      intDataTest,
      bigintDataTest,
      doubleDataTest,
      unsignedDataTest,
      largeDataTest,
  };

  const size_t numOfSubTests = sizeof(kSubTests) / sizeof(kSubTests[0]);
  for (size_t idx = 0; idx < numOfSubTests; ++idx) {
    kSubTests[idx]();
  }
}
...@@ -30,24 +30,32 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) { ...@@ -30,24 +30,32 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) {
return 0; return 0;
} }
int32_t compareIntDoubleVal(const void *pLeft, const void *pRight) { int32_t compareUint32Val(const void *pLeft, const void *pRight) {
int64_t lhs = GET_INT64_VAL(pLeft); int32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
double rhs = GET_DOUBLE_VAL(pRight); if (left > right) return 1;
if (fabs(lhs - rhs) < FLT_EPSILON) { if (left < right) return -1;
return 0; return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
} }
int32_t compareDoubleIntVal(const void *pLeft, const void *pRight) { int32_t compareUint64Val(const void *pLeft, const void *pRight) {
double lhs = GET_DOUBLE_VAL(pLeft); int64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
int64_t rhs = GET_INT64_VAL(pRight); if (left > right) return 1;
if (fabs(lhs - rhs) < FLT_EPSILON) { if (left < right) return -1;
return 0;
}
int32_t compareUint16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareUint8Val(const void* pLeft, const void* pRight) {
uint8_t left = GET_UINT8_VAL(pLeft), right = GET_UINT8_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0; return 0;
} else {
return (lhs > rhs) ? 1 : -1;
}
} }
int32_t compareFloatVal(const void *pLeft, const void *pRight) { int32_t compareFloatVal(const void *pLeft, const void *pRight) {
...@@ -369,15 +377,24 @@ __compar_fn_t getKeyComparFunc(int32_t keyType) { ...@@ -369,15 +377,24 @@ __compar_fn_t getKeyComparFunc(int32_t keyType) {
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
comparFn = compareDoubleVal; comparFn = compareDoubleVal;
break; break;
case TSDB_DATA_TYPE_UTINYINT:
comparFn = compareUint8Val;
break;
case TSDB_DATA_TYPE_USMALLINT:
comparFn = compareUint16Val;
break;
case TSDB_DATA_TYPE_UINT:
comparFn = compareUint32Val;
break;
case TSDB_DATA_TYPE_UBIGINT:
comparFn = compareUint64Val;
break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
comparFn = compareLenPrefixedStr; comparFn = compareLenPrefixedStr;
break; break;
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
comparFn = compareLenPrefixedWStr; comparFn = compareLenPrefixedWStr;
break; break;
default: default:
comparFn = compareInt32Val; comparFn = compareInt32Val;
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册