提交 faa9c649 编写于 作者: A AlexDuan

rest part code

上级 58500a3c
...@@ -64,6 +64,6 @@ void tdigestMerge(TDigest *t1, TDigest *t2); ...@@ -64,6 +64,6 @@ void tdigestMerge(TDigest *t1, TDigest *t2);
double tdigestQuantile(TDigest *t, double q); double tdigestQuantile(TDigest *t, double q);
void tdigestCompress(TDigest *t); void tdigestCompress(TDigest *t);
void tdigestFreeFrom(TDigest *t); void tdigestFreeFrom(TDigest *t);
void tdigestCopy(TDigest* dst, TDigest* src); void tdigestAutoFill(TDigest* t, int32_t compression);
#endif /* TDIGEST_H */ #endif /* TDIGEST_H */
...@@ -2510,7 +2510,8 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) { ...@@ -2510,7 +2510,8 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo *pOutput = getAPerctInfo(pCtx); SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
TDigest* pTDigest = pOutput->pTDigest; TDigest* pTDigest = pOutput->pTDigest;
if(pTDigest->num_centroids == 0) { if(pTDigest->num_centroids == 0) {
tdigestCopy(pTDigest, pInput->pTDigest); memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pTDigest, COMPRESSION);
} else { } else {
tdigestMerge(pOutput->pTDigest, pInput->pTDigest); tdigestMerge(pOutput->pTDigest, pInput->pTDigest);
} }
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "tdigest.h" #include "tdigest.h"
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0))) #define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
//#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI)
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2)) #define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2))
#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON) #define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON)
...@@ -42,20 +43,16 @@ typedef struct SMergeArgs { ...@@ -42,20 +43,16 @@ typedef struct SMergeArgs {
double max; double max;
}SMergeArgs; }SMergeArgs;
void tdigestCopy(TDigest* dst, TDigest* src) { void tdigestAutoFill(TDigest* t, int32_t compression) {
memcpy(dst, src, (size_t)TDIGEST_SIZE(COMPRESSION)); t->centroids = (SCentroid*)((char*)t + sizeof(TDigest));
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression));
dst->centroids = (SCentroid*)malloc((int32_t)GET_CENTROID(COMPRESSION) * sizeof(SCentroid));
memcpy(dst->centroids, src->centroids, (int32_t)GET_CENTROID(COMPRESSION) * sizeof(SCentroid));
dst->buffered_pts = (SPt*) ((char*)dst + sizeof(TDigest));
} }
TDigest *tdigestNewFrom(void* pBuf, int32_t compression) { TDigest *tdigestNewFrom(void* pBuf, int32_t compression) {
memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression)); memset(pBuf, 0, TDIGEST_SIZE(compression));
TDigest* t = (TDigest*)pBuf; TDigest* t = (TDigest*)pBuf;
tdigestAutoFill(t, compression);
t->centroids = (SCentroid*)calloc((int32_t)GET_CENTROID(compression), sizeof(SCentroid));
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest));
t->compression = compression; t->compression = compression;
t->size = (int64_t)GET_CENTROID(compression); t->size = (int64_t)GET_CENTROID(compression);
t->threshold = (int32_t)GET_THRESHOLD(compression); t->threshold = (int32_t)GET_THRESHOLD(compression);
...@@ -73,6 +70,7 @@ static int32_t cmpCentroid(const void *a, const void *b) { ...@@ -73,6 +70,7 @@ static int32_t cmpCentroid(const void *a, const void *b) {
return 0; return 0;
} }
static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
double k2; double k2;
SCentroid *c = &args->centroids[args->idx]; SCentroid *c = &args->centroids[args->idx];
...@@ -104,18 +102,30 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { ...@@ -104,18 +102,30 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
} }
void tdigestCompress(TDigest *t) { void tdigestCompress(TDigest *t) {
SCentroid *unmerged_centroids = (SCentroid *)t->buffered_pts; SCentroid *unmerged_centroids;
int64_t unmerged_weight = 0;
int32_t num_unmerged = t->num_buffered_pts; int32_t num_unmerged = t->num_buffered_pts;
int32_t i, j; int32_t i, j;
SMergeArgs args = {0}; SMergeArgs args;
if (t->num_buffered_pts <= 0) if (t->num_buffered_pts <= 0)
return; return;
unmerged_centroids = (SCentroid*)malloc(sizeof(SCentroid) * t->num_buffered_pts);
for (i = 0; i < num_unmerged; i++) {
SPt *p = t->buffered_pts + i;
SCentroid *c = &unmerged_centroids[i];
c->mean = p->value;
c->weight = p->weight;
unmerged_weight += c->weight;
}
t->num_buffered_pts = 0; t->num_buffered_pts = 0;
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid); qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
args.centroids = (SCentroid*)calloc(sizeof(SCentroid), (size_t)t->size); memset(&args, 0, sizeof(SMergeArgs));
args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size));
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));
args.t = t; args.t = t;
args.min = INFINITY; args.min = INFINITY;
...@@ -128,23 +138,24 @@ void tdigestCompress(TDigest *t) { ...@@ -128,23 +138,24 @@ void tdigestCompress(TDigest *t) {
if (a->mean <= b->mean) { if (a->mean <= b->mean) {
mergeCentroid(&args, a); mergeCentroid(&args, a);
assert(args.idx < (t->size)); assert(args.idx < t->size);
i++; i++;
} else { } else {
mergeCentroid(&args, b); mergeCentroid(&args, b);
assert(args.idx < (t->size)); assert(args.idx < t->size);
j++; j++;
} }
} }
while (i < num_unmerged) { while (i < num_unmerged) {
mergeCentroid(&args, &unmerged_centroids[i++]); mergeCentroid(&args, &unmerged_centroids[i++]);
assert(args.idx < (t->size)); assert(args.idx < t->size);
} }
free((void*)unmerged_centroids);
while (j < t->num_centroids) { while (j < t->num_centroids) {
mergeCentroid(&args, &t->centroids[j++]); mergeCentroid(&args, &t->centroids[j++]);
assert(args.idx < (t->size)); assert(args.idx < t->size);
} }
if (t->total_weight > 0) { if (t->total_weight > 0) {
...@@ -156,8 +167,8 @@ void tdigestCompress(TDigest *t) { ...@@ -156,8 +167,8 @@ void tdigestCompress(TDigest *t) {
t->max = MAX(t->max, args.max); t->max = MAX(t->max, args.max);
} }
tfree(t->centroids); memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
t->centroids = args.centroids; free((void*)args.centroids);
} }
void tdigestAdd(TDigest* t, double x, int64_t w) { void tdigestAdd(TDigest* t, double x, int64_t w) {
...@@ -173,10 +184,70 @@ void tdigestAdd(TDigest* t, double x, int64_t w) { ...@@ -173,10 +184,70 @@ void tdigestAdd(TDigest* t, double x, int64_t w) {
t->num_buffered_pts++; t->num_buffered_pts++;
} }
if (t->num_buffered_pts >= t->threshold) if (t->num_buffered_pts >= t->threshold)
tdigestCompress(t); tdigestCompress(t);
} }
/*
 * Estimate the cumulative distribution function of the digest at x.
 *
 * Buffered points are first merged into the centroid list via
 * tdigestCompress(), so this call may modify *t.
 *
 * Returns:
 *   0    when t is NULL, or when x lies below t->min;
 *   NAN  when the digest holds no centroids after compression;
 *   1    when x lies above t->max, or is not below the last boundary;
 *   0.5  when there is a single centroid and min/max are equal within
 *        FLT_EPSILON;
 *   otherwise a value interpolated from the weights of the centroids
 *   surrounding x, divided by t->total_weight.
 */
double tdigestCDF(TDigest *t, double x) {
    SCentroid  sentinel;
    SCentroid *prev;
    SCentroid *cur;
    int64_t    acc_weight;
    double     lo_span;
    double     hi_span;
    int32_t    k;

    if (t == NULL) {
        return 0;
    }

    tdigestCompress(t);

    if (t->num_centroids == 0) {
        return NAN;
    }
    if (x < t->min) {
        return 0;
    }
    if (x > t->max) {
        return 1;
    }
    if (t->num_centroids == 1) {
        /* Single centroid: interpolate linearly across [min, max]. */
        return FLOAT_EQ(t->max, t->min) ? 0.5 : INTERPOLATE(x, t->min, t->max);
    }

    /* Start from a zero-weight sentinel centroid located at the minimum. */
    sentinel.mean = t->min;
    sentinel.weight = 0;
    prev = &sentinel;
    cur = &sentinel;
    acc_weight = 0;
    hi_span = 0;

    for (k = 0; k < t->num_centroids; k++) {
        /* Distance from the previous boundary up to cur's mean. */
        lo_span = cur->mean - (prev->mean + hi_span);
        prev = cur;
        cur = &t->centroids[k];
        /*
         * Distance from prev's mean to the boundary shared with cur; the
         * gap between the two means is split in proportion to prev's share
         * of their combined weight.
         */
        hi_span = (cur->mean - prev->mean) * prev->weight / (prev->weight + cur->weight);
        if (x < prev->mean + hi_span) {
            double cdf = (acc_weight
                          + prev->weight
                          * INTERPOLATE(x, prev->mean - lo_span, prev->mean + hi_span))
                         / t->total_weight;
            /* Clamp so the estimate is never negative. */
            return MAX(cdf, 0.0);
        }
        acc_weight += prev->weight;
    }

    /* Last centroid: its upper boundary is the digest maximum. */
    lo_span = cur->mean - (prev->mean + hi_span);
    prev = cur;
    hi_span = t->max - prev->mean;
    if (x < prev->mean + hi_span) {
        return (acc_weight + prev->weight * INTERPOLATE(x, prev->mean - lo_span, prev->mean + hi_span))
               / t->total_weight;
    }
    return 1;
}
double tdigestQuantile(TDigest *t, double q) { double tdigestQuantile(TDigest *t, double q) {
if (t == NULL) if (t == NULL)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册