diff --git a/src/query/inc/tdigest.h b/src/query/inc/tdigest.h index 81bbdb0226ca437d45a50d38250543d4e7a9d5c4..758eba486496930bb6d6a967186a45e638900fea 100644 --- a/src/query/inc/tdigest.h +++ b/src/query/inc/tdigest.h @@ -26,11 +26,11 @@ #define M_PI 3.14159265358979323846264338327950288 /* pi */ #endif -#define ADDITION_CENTROID_NUM 100 +#define ADDITION_CENTROID_NUM 2 #define COMPRESSION 400 #define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM) #define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2)) -#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SPt)*GET_THRESHOLD(compression)) +#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression)) typedef struct SCentroid { double mean; diff --git a/src/query/src/tdigest.c b/src/query/src/tdigest.c index 3028b8daa9a48046dd2abd7704aa87c592ac1241..eb78e8aceebb13b83dd7fe60c9a704f418860797 100644 --- a/src/query/src/tdigest.c +++ b/src/query/src/tdigest.c @@ -29,7 +29,7 @@ #include "tdigest.h" #define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0))) -#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI) +#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2)) #define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON) typedef struct SMergeArgs { @@ -78,26 +78,28 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { SCentroid *c = &args->centroids[args->idx]; args->weight_so_far += merge->weight; - k2 = INTEGRATED_LOCATION(args->t->compression, - args->weight_so_far / args->t->total_weight); - - if (k2 - args->k1 > 1 && c->weight > 0) { - if(args->idx + 1 < args->t->size) { // check avoid overflow - args->idx++; - } else { - assert(0); + k2 = INTEGRATED_LOCATION(args->t->size, + args->weight_so_far / (args->t->total_weight + merge->weight)); + //idx++ + if(k2 - args->k1 > 1 && c->weight > 0) { + if(args->idx + 1 < args->t->size + && merge->mean != args->centroids[args->idx].mean) { + args->idx++; } - args->k1 = INTEGRATED_LOCATION(args->t->compression, - (args->weight_so_far - merge->weight) / args->t->total_weight); + args->k1 = k2; } c = &args->centroids[args->idx]; - c->weight += merge->weight; - c->mean += (merge->mean - c->mean) * merge->weight / c->weight; - - if (merge->weight > 0) { - args->min = MIN(merge->mean, args->min); - args->max = MAX(merge->mean, args->max); + if(c->mean == merge->mean) { + c->weight += merge->weight; + } else { + c->weight += merge->weight; + c->mean += (merge->mean - c->mean) * merge->weight / c->weight; + + if (merge->weight > 0) { + args->min = MIN(merge->mean, args->min); + args->max = MAX(merge->mean, args->max); + } } } @@ -163,10 +165,13 @@ void tdigestAdd(TDigest* t, double x, int64_t w) { return; int32_t i = t->num_buffered_pts; - t->buffered_pts[i].value = x; - t->buffered_pts[i].weight = w; - t->num_buffered_pts++; - t->total_weight += w; + if(i > 0 && t->buffered_pts[i-1].value == x ) { + t->buffered_pts[i].weight = w; + } else { + t->buffered_pts[i].value = x; + t->buffered_pts[i].weight = w; + t->num_buffered_pts++; + } if (t->num_buffered_pts >= t->threshold) tdigestCompress(t);