diff --git a/src/query/inc/tdigest.h b/src/query/inc/tdigest.h index 758eba486496930bb6d6a967186a45e638900fea..b513317169163f5ce79b03add60395834b33ea8c 100644 --- a/src/query/inc/tdigest.h +++ b/src/query/inc/tdigest.h @@ -64,6 +64,6 @@ void tdigestMerge(TDigest *t1, TDigest *t2); double tdigestQuantile(TDigest *t, double q); void tdigestCompress(TDigest *t); void tdigestFreeFrom(TDigest *t); -void tdigestCopy(TDigest* dst, TDigest* src); +void tdigestAutoFill(TDigest* t, int32_t compression); #endif /* TDIGEST_H */ diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index a015399fd5faaa475a07601cba590cc20a154b66..68a19937db8720f3bdf805589229d22afefac851 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -2510,7 +2510,8 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) { SAPercentileInfo *pOutput = getAPerctInfo(pCtx); TDigest* pTDigest = pOutput->pTDigest; if(pTDigest->num_centroids == 0) { - tdigestCopy(pTDigest, pInput->pTDigest); + memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); + tdigestAutoFill(pTDigest, COMPRESSION); } else { tdigestMerge(pOutput->pTDigest, pInput->pTDigest); } diff --git a/src/query/src/tdigest.c b/src/query/src/tdigest.c index eb78e8aceebb13b83dd7fe60c9a704f418860797..5479b9aeb9d8e6c526cc043d17a85074894ce2f8 100644 --- a/src/query/src/tdigest.c +++ b/src/query/src/tdigest.c @@ -29,6 +29,7 @@ #include "tdigest.h" #define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0))) +//#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI) #define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2)) #define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON) @@ -42,20 +43,16 @@ typedef struct SMergeArgs { double max; }SMergeArgs; -void tdigestCopy(TDigest* dst, TDigest* src) { - memcpy(dst, src, (size_t)TDIGEST_SIZE(COMPRESSION)); - - dst->centroids = (SCentroid*)malloc((int32_t)GET_CENTROID(COMPRESSION) * sizeof(SCentroid)); - memcpy(dst->centroids, src->centroids, (int32_t)GET_CENTROID(COMPRESSION) * sizeof(SCentroid)); - dst->buffered_pts = (SPt*) ((char*)dst + sizeof(TDigest)); +void tdigestAutoFill(TDigest* t, int32_t compression) { + t->centroids = (SCentroid*)((char*)t + sizeof(TDigest)); + t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression)); } TDigest *tdigestNewFrom(void* pBuf, int32_t compression) { - memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression)); + memset(pBuf, 0, TDIGEST_SIZE(compression)); TDigest* t = (TDigest*)pBuf; - - t->centroids = (SCentroid*)calloc((int32_t)GET_CENTROID(compression), sizeof(SCentroid)); - t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest)); + tdigestAutoFill(t, compression); + t->compression = compression; t->size = (int64_t)GET_CENTROID(compression); t->threshold = (int32_t)GET_THRESHOLD(compression); @@ -73,6 +70,7 @@ static int32_t cmpCentroid(const void *a, const void *b) { return 0; } + static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { double k2; SCentroid *c = &args->centroids[args->idx]; @@ -104,18 +102,30 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { } void tdigestCompress(TDigest *t) { - SCentroid *unmerged_centroids = (SCentroid *)t->buffered_pts; + SCentroid *unmerged_centroids; + int64_t unmerged_weight = 0; int32_t num_unmerged = t->num_buffered_pts; int32_t i, j; - SMergeArgs args = {0}; + SMergeArgs args; if (t->num_buffered_pts <= 0) return; + unmerged_centroids = (SCentroid*)malloc(sizeof(SCentroid) * t->num_buffered_pts); + for (i = 0; i < num_unmerged; i++) { + SPt *p = t->buffered_pts + i; + SCentroid *c = &unmerged_centroids[i]; + c->mean = p->value; + c->weight = p->weight; + unmerged_weight += c->weight; + } t->num_buffered_pts = 0; + t->total_weight += unmerged_weight; qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid); - args.centroids = (SCentroid*)calloc(sizeof(SCentroid), (size_t)t->size); + memset(&args, 0, sizeof(SMergeArgs)); + args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size)); + memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size)); args.t = t; args.min = INFINITY; @@ -128,23 +138,24 @@ void tdigestCompress(TDigest *t) { if (a->mean <= b->mean) { mergeCentroid(&args, a); - assert(args.idx < (t->size)); + assert(args.idx < t->size); i++; } else { mergeCentroid(&args, b); - assert(args.idx < (t->size)); + assert(args.idx < t->size); j++; } } while (i < num_unmerged) { mergeCentroid(&args, &unmerged_centroids[i++]); - assert(args.idx < (t->size)); + assert(args.idx < t->size); } + free((void*)unmerged_centroids); while (j < t->num_centroids) { mergeCentroid(&args, &t->centroids[j++]); - assert(args.idx < (t->size)); + assert(args.idx < t->size); } if (t->total_weight > 0) { @@ -156,8 +167,8 @@ void tdigestCompress(TDigest *t) { t->max = MAX(t->max, args.max); } - tfree(t->centroids); - t->centroids = args.centroids; + memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids); + free((void*)args.centroids); } void tdigestAdd(TDigest* t, double x, int64_t w) { @@ -173,10 +184,70 @@ void tdigestAdd(TDigest* t, double x, int64_t w) { t->num_buffered_pts++; } + if (t->num_buffered_pts >= t->threshold) tdigestCompress(t); } +double tdigestCDF(TDigest *t, double x) { + if (t == NULL) + return 0; + + int32_t i; + double left, right; + int64_t weight_so_far; + SCentroid *a, *b, tmp; + + tdigestCompress(t); + if (t->num_centroids == 0) + return NAN; + if (x < t->min) + return 0; + if (x > t->max) + return 1; + if (t->num_centroids == 1) { + if (FLOAT_EQ(t->max, t->min)) + return 0.5; + + return INTERPOLATE(x, t->min, t->max); + } + + weight_so_far = 0; + a = b = &tmp; + b->mean = t->min; + b->weight = 0; + right = 0; + + for (i = 0; i < t->num_centroids; i++) { + SCentroid *c = &t->centroids[i]; + + left = b->mean - (a->mean + right); + a = b; + b = c; + right = (b->mean - a->mean) * a->weight / (a->weight + b->weight); + + if (x < a->mean + right) { + double cdf = (weight_so_far + + a->weight + * INTERPOLATE(x, a->mean - left, a->mean + right)) + / t->total_weight; + return MAX(cdf, 0.0); + } + + weight_so_far += a->weight; + } + + left = b->mean - (a->mean + right); + a = b; + right = t->max - a->mean; + + if (x < a->mean + right) { + return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right)) + / t->total_weight; + } + + return 1; +} double tdigestQuantile(TDigest *t, double q) { if (t == NULL)