diff --git a/src/query/inc/tdigest.h b/src/query/inc/tdigest.h index 085e4ac55e0b63c8fd8ddfe8af96d213df7e9481..582ad338ac4dffcaac883669c6f4f72a0fb64a9b 100644 --- a/src/query/inc/tdigest.h +++ b/src/query/inc/tdigest.h @@ -1,3 +1,18 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + /* * include/tdigest.c * @@ -7,17 +22,19 @@ #ifndef TDIGEST_H #define TDIGEST_H -#define DEFAULT_COMPRESSION 400 +#define COMPRESSION 400 +#define GET_CENTROID(compression) (ceil((compression) * M_PI / 2) + 1) /* number of Centroid slots reserved by TDIGEST_SIZE; double-valued because ceil() returns double; argument parenthesized so expressions such as a + b expand correctly */ +#define GET_THRESHOLD(compression) (7.5 + 0.37 * (compression) - 2e-4 * pow((compression), 2)) /* number of Point slots reserved by TDIGEST_SIZE; double-valued and may be fractional */ +#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(Centroid)*GET_CENTROID(compression) + sizeof(Point)*GET_THRESHOLD(compression)) /* bytes for one TDigest header plus its Centroid and Point storage; the expression has type double, so convert explicitly when a size_t is needed */ typedef struct Centroid { - long long weight; double mean; + long long weight; }Centroid; typedef struct Point { double value; long long weight; - struct Point *next; }Point; typedef struct TDigest { @@ -36,12 +53,13 @@ typedef struct TDigest { Centroid *centroids; }TDigest; -extern struct TDigest *tdigestNew(int compression); -extern void tdigestAdd(struct TDigest *t, double x, long long w); -extern void tdigestMerge(struct TDigest *t1, struct TDigest *t2); -extern double tdigestCDF(struct TDigest *t, double x); -extern double tdigestQuantile(struct TDigest *t, double q); -extern void tdigestCompress(struct TDigest *t); -extern void tdigestFree(struct TDigest *t); +TDigest *tdigestNewFrom(void* pBuf, int compression); /* builds the digest inside caller-supplied pBuf; NOTE(review): the definition in tdigest.c zeroes only sizeof(TDigest) + sizeof(Centroid)*(compression + 1) bytes, fewer than TDIGEST_SIZE(compression) - confirm the remainder does not need clearing */ 
+void tdigestAdd(TDigest *t, double x, long long w); /* call sites visible in this patch pass a value as x and a weight as w */ +void tdigestMerge(TDigest *t1, TDigest *t2); /* feeds every centroid of t2 (mean, weight) into t1 through tdigestAdd */ +double tdigestCDF(TDigest *t, double x); +double tdigestQuantile(TDigest *t, double q); +void tdigestCompress(TDigest *t); +void tdigestFreeFrom(TDigest *t); /* NOTE(review): this same patch deletes the tdigestFreeFrom definition from tdigest.c and its call in tdigest_finalizer; confirm a definition still exists somewhere or drop this prototype */ +void tdigestAutoFill(TDigest* t, int compression); /* sets t->centroids just past the TDigest header and t->buffered_pts past (int)GET_CENTROID(compression) centroids in the same buffer */ #endif /* TDIGEST_H */ diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 09a3b85adc2d451deefafd86ef0795b2a4529a37..f6f41c2d0bd4b2227f725e4fd3b2ff8f0caf0f30 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -2442,7 +2442,6 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { return pInfo; } - // // ----------------- tdigest ------------------- // @@ -2477,7 +2476,6 @@ static void tdigest_do(SQLFunctionCtx *pCtx) { if (pCtx->hasNull && isNull(data, pCtx->inputType)) { continue; } - notNullElems += 1; double v = 0; // value @@ -2486,14 +2484,11 @@ static void tdigest_do(SQLFunctionCtx *pCtx) { tdigestAdd(pAPerc->pTDigest, v, w); } - //tdigestCompress(pAPerc->pTDigest); - if (!pCtx->hasNull) { assert(pCtx->size == notNullElems); } SET_VAL(pCtx, notNullElems, 1); - if (notNullElems > 0) { pResInfo->hasResult = DATA_SET_FLAG; } @@ -2548,9 +2543,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) { } } - tdigestFreeFrom(pAPerc->pTDigest); pAPerc->pTDigest = NULL; - doFinalizer(pCtx); } @@ -2653,7 +2646,6 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, 1, 1); } - static void apercentile_finalizer(SQLFunctionCtx *pCtx) { if (getAlgo(pCtx) == ALGO_TDIGEST) { tdigest_finalizer(pCtx); diff --git a/src/query/src/tdigest.c b/src/query/src/tdigest.c index 02f3f09394d8949eedb17129caf0a6037b0aa8fd..c673c4d48c3a3d543325fd24ed20812de95b5422 100644 --- a/src/query/src/tdigest.c +++ b/src/query/src/tdigest.c @@ -31,7 +31,6 @@ #include #include #include - #include "tdigest.h" #define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0))) @@ -47,24 +46,21 @@ ({ __typeof__ (a) _a = (a); \ 
__typeof__ (b) _b = (b); \ _a < _b ? _a : _b; }) +typedef struct MergeArgs { /* argument bundle handed to merge_centroid */ + TDigest *t; + Centroid *centroids; /* array that merge_centroid indexes with idx */ + int idx; /* merge_centroid works on centroids[idx] */ + double weight_so_far; + double k1; + double min; + double max; +}MergeArgs; void tdigestAutoFill(TDigest* t, int compression) { t->centroids = (Centroid*)((char*)t + sizeof(TDigest)); t->buffered_pts = (Point*) ((char*)t + sizeof(TDigest) + sizeof(Centroid) * (int)GET_CENTROID(compression)); } -TDigest *tdigestNew(int compression) { - TDigest *t = malloc(sizeof(TDigest)); - - memset(t, 0, sizeof(TDigest)); - - t->compression = compression; - t->size = GET_CENTROID(compression); - t->threshold = GET_THRESHOLD(compression); - t->min = INFINITY; - - return t; -} TDigest *tdigestNewFrom(void* pBuf, int compression) { memset(pBuf, 0, sizeof(TDigest) + sizeof(Centroid)*(compression + 1)); TDigest* t = (TDigest*)pBuf; @@ -87,16 +83,6 @@ static int centroid_cmp(const void *a, const void *b) { return 0; } -typedef struct MergeArgs { - TDigest *t; - Centroid *centroids; - int idx; - double weight_so_far; - double k1; - double min; - double max; -}MergeArgs; - static void merge_centroid(MergeArgs *args, Centroid *merge) { double k2; Centroid *c = &args->centroids[args->idx]; @@ -168,7 +154,6 @@ void tdigestCompress(TDigest *t) { while (i < num_unmerged) merge_centroid(&args, &unmerged_centroids[i++]); - free(unmerged_centroids); while (j < t->num_centroids) @@ -244,7 +229,7 @@ double tdigestCDF(TDigest *t, double x) { if (x < a->mean + right) { double cdf = (weight_so_far + a->weight - * INTERPOLATE(x, a->mean - left, a->mean + right)) + * INTERPOLATE(x, a->mean - left, a->mean + right)) / t->total_weight; return MAX(cdf, 0.0); } @@ -256,10 +241,10 @@ double tdigestCDF(TDigest *t, double x) { a = b; right = t->max - a->mean; - if (x < a->mean + right) - return (weight_so_far - + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right)) + if (x < a->mean + right) { + return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + 
right)) / t->total_weight; + } return 1; } @@ -301,14 +286,11 @@ double tdigestQuantile(TDigest *t, double q) { left = right; b = c; - right = (b->weight * a->mean + a->weight * b->mean) - / (a->weight + b->weight); - + right = (b->weight * a->mean + a->weight * b->mean)/ (a->weight + b->weight); if (idx < weight_so_far + a->weight) { double p = (idx - weight_so_far) / a->weight; return left * (1 - p) + right * p; } - weight_so_far += a->weight; } @@ -336,12 +318,4 @@ void tdigestMerge(TDigest *t1, TDigest *t2) { for (int i = 0; i < t2->num_centroids; i++) { tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight); } -} - -void tdigestFree(TDigest *t) { - free(t); -} - -void tdigestFreeFrom(TDigest *t) { - // nothing to do } \ No newline at end of file