From 0b92dee8b1b392625fa618298f49add787db7a20 Mon Sep 17 00:00:00 2001 From: AlexDuan <417921451@qq.com> Date: Thu, 9 Sep 2021 20:08:13 +0800 Subject: [PATCH] tdigest merge is finished --- src/query/src/qAggMain.c | 44 +++++++---- src/query/src/tdigest.c | 158 ++++++++++++++++++++------------------- 2 files changed, 108 insertions(+), 94 deletions(-) diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 8d0ad37472..09a3b85adc 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -30,6 +30,7 @@ #include "qUdf.h" #include "queryLog.h" + #define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput)) #define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes) @@ -290,7 +291,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_APERCT) { *type = TSDB_DATA_TYPE_BINARY; - *bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo); + int16_t bytesHist = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo); + int16_t bytesDigest = sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION); + *bytes = MAX(bytesHist, bytesDigest); *interBytes = *bytes; return TSDB_CODE_SUCCESS; @@ -2312,10 +2315,10 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo * SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX); SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX); pInfo->numOfElems = 0; - return true; } + static void percentile_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; @@ -2360,7 +2363,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) { SET_DOUBLE_VAL(&pInfo->maxval, tmax); } - pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull); + pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull); } else { for (int32_t i = 0; i < pCtx->size; ++i) { char *data = GET_INPUT_DATA(pCtx, i); @@ -2382,7 +2385,6 @@ static 
void percentile_function(SQLFunctionCtx *pCtx) { pInfo->numOfElems += 1; } } - return; } @@ -2403,6 +2405,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) { static void percentile_finalizer(SQLFunctionCtx *pCtx) { double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; + double result = 0; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo * ppInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo); @@ -2412,7 +2415,8 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { assert(ppInfo->numOfElems == 0); setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); } else { - SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v)); + result = getPercentile(pMemBucket, v); + SET_DOUBLE_VAL((double *)pCtx->pOutput, result); } tMemBucketDestroy(pMemBucket); @@ -2450,13 +2454,9 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo) } // new TDigest - SAPercentileInfo *pAPerc = getAPerctInfo(pCtx); - int compression = 500; - if(pAPerc) { - if(pAPerc->pTDigest == NULL) { - pAPerc->pTDigest = tdigestNew(compression); - } - } + SAPercentileInfo *pInfo = getAPerctInfo(pCtx); + char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); + pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION); return true; } @@ -2502,10 +2502,23 @@ static void tdigest_do(SQLFunctionCtx *pCtx) { static void tdigest_merge(SQLFunctionCtx *pCtx) { SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx); assert(pInput->pTDigest); + pInput->pTDigest = (TDigest*)((char*)pInput + sizeof(SAPercentileInfo)); + pInput->pTDigest->centroids = (Centroid*)((char*)pInput + sizeof(SAPercentileInfo) + sizeof(TDigest)); - SAPercentileInfo *pOutput = getAPerctInfo(pCtx); - tdigestMerge(pOutput->pTDigest, pInput->pTDigest); + // input merge no elements , no need merge + if(pInput->pTDigest->num_centroids == 0) { + return ; + } + SAPercentileInfo *pOutput = getAPerctInfo(pCtx); + TDigest* 
pTDigest = pOutput->pTDigest; + if(pTDigest->num_centroids == 0) { + memcpy(pTDigest, pInput->pTDigest, TDIGEST_SIZE(COMPRESSION)); + tdigestAutoFill(pTDigest, COMPRESSION); + } else { + tdigestMerge(pOutput->pTDigest, pInput->pTDigest); + } + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; SET_VAL(pCtx, 1, 1); @@ -2535,7 +2548,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) { } } - tdigestFree(pAPerc->pTDigest); + tdigestFreeFrom(pAPerc->pTDigest); pAPerc->pTDigest = NULL; doFinalizer(pCtx); @@ -2658,7 +2671,6 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { double ratio[] = {v}; double *res = tHistogramUniform(pOutput->pHisto, ratio, 1); - memcpy(pCtx->pOutput, res, sizeof(double)); free(res); } else { diff --git a/src/query/src/tdigest.c b/src/query/src/tdigest.c index 17eb577601..02f3f09394 100644 --- a/src/query/src/tdigest.c +++ b/src/query/src/tdigest.c @@ -1,3 +1,18 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + /* * src/tdigest.c * @@ -33,22 +48,38 @@ __typeof__ (b) _b = (b); \ _a < _b ? 
_a : _b; }) -struct TDigest *tdigestNew(int compression) { - struct TDigest *t = malloc(sizeof(struct TDigest)); +void tdigestAutoFill(TDigest* t, int compression) { + t->centroids = (Centroid*)((char*)t + sizeof(TDigest)); + t->buffered_pts = (Point*) ((char*)t + sizeof(TDigest) + sizeof(Centroid) * (int)GET_CENTROID(compression)); +} + +TDigest *tdigestNew(int compression) { + TDigest *t = malloc(sizeof(TDigest)); - memset(t, 0, sizeof(struct TDigest)); + memset(t, 0, sizeof(TDigest)); t->compression = compression; - t->size = ceil(compression * M_PI / 2) + 1; - t->threshold = 7.5 + 0.37 * compression - 2e-4 * pow(compression, 2); + t->size = GET_CENTROID(compression); + t->threshold = GET_THRESHOLD(compression); t->min = INFINITY; return t; } +TDigest *tdigestNewFrom(void* pBuf, int compression) { + memset(pBuf, 0, sizeof(TDigest) + sizeof(Centroid)*(compression + 1)); + TDigest* t = (TDigest*)pBuf; + tdigestAutoFill(t, compression); + + t->compression = compression; + t->size = GET_CENTROID(compression); + t->threshold = GET_THRESHOLD(compression); + t->min = INFINITY; + return t; +} static int centroid_cmp(const void *a, const void *b) { - struct Centroid *c1 = (struct Centroid *) a; - struct Centroid *c2 = (struct Centroid *) b; + Centroid *c1 = (Centroid *) a; + Centroid *c2 = (Centroid *) b; if (c1->mean < c2->mean) return -1; if (c1->mean > c2->mean) @@ -56,19 +87,19 @@ static int centroid_cmp(const void *a, const void *b) { return 0; } -struct MergeArgs { - struct TDigest *t; - struct Centroid *centroids; +typedef struct MergeArgs { + TDigest *t; + Centroid *centroids; int idx; double weight_so_far; double k1; double min; double max; -}; +}MergeArgs; -static void merge_centroid(struct MergeArgs *args, struct Centroid *merge) { +static void merge_centroid(MergeArgs *args, Centroid *merge) { double k2; - struct Centroid *c = &args->centroids[args->idx]; + Centroid *c = &args->centroids[args->idx]; args->weight_so_far += merge->weight; k2 = 
INTEGRATED_LOCATION(args->t->compression, @@ -90,43 +121,32 @@ static void merge_centroid(struct MergeArgs *args, struct Centroid *merge) { } } -void tdigestCompress(struct TDigest *t) { - struct Centroid *unmerged_centroids; +void tdigestCompress(TDigest *t) { + Centroid *unmerged_centroids; long long unmerged_weight = 0; int num_unmerged = t->num_buffered_pts; - int old_num_centroids = t->num_centroids; int i, j; - struct MergeArgs args; + MergeArgs args; if (!t->num_buffered_pts) return; - unmerged_centroids = malloc(sizeof(struct Centroid) * t->num_buffered_pts); - - i = 0; + unmerged_centroids = malloc(sizeof(Centroid) * t->num_buffered_pts); for (i = 0; i < num_unmerged; i++) { - struct Point *p = t->buffered_pts; - struct Centroid *c = &unmerged_centroids[i]; - + Point *p = t->buffered_pts + i; + Centroid *c = &unmerged_centroids[i]; c->mean = p->value; c->weight = p->weight; - unmerged_weight += c->weight; - - t->buffered_pts = p->next; - free(p); } t->num_buffered_pts = 0; t->total_weight += unmerged_weight; - qsort(unmerged_centroids, num_unmerged, sizeof(struct Centroid), - centroid_cmp); - - memset(&args, 0, sizeof(struct MergeArgs)); - - args.centroids = malloc(sizeof(struct Centroid) * t->size); - memset(args.centroids, 0, sizeof(struct Centroid) * t->size); + qsort(unmerged_centroids, num_unmerged, sizeof(Centroid), centroid_cmp); + memset(&args, 0, sizeof(MergeArgs)); + args.centroids = malloc(sizeof(Centroid) * t->size); + memset(args.centroids, 0, sizeof(Centroid) * t->size); args.t = t; args.min = INFINITY; @@ -134,8 +154,8 @@ void tdigestCompress(struct TDigest *t) { i = 0; j = 0; while (i < num_unmerged && j < t->num_centroids) { - struct Centroid *a = &unmerged_centroids[i]; - struct Centroid *b = &t->centroids[j]; + Centroid *a = &unmerged_centroids[i]; + Centroid *b = &t->centroids[j]; if (a->mean <= b->mean) { merge_centroid(&args, a); @@ -164,42 +184,31 @@ void tdigestCompress(struct TDigest *t) { t->max = MAX(t->max, args.max); } - if 
(t->num_centroids > old_num_centroids) { - t->centroids = realloc(t->centroids, - sizeof(struct Centroid) * t->num_centroids); - } - - memcpy(t->centroids, args.centroids, - sizeof(struct Centroid) * t->num_centroids); - + memcpy(t->centroids, args.centroids, sizeof(Centroid) * t->num_centroids); free(args.centroids); } -void tdigestAdd(struct TDigest *t, double x, long long w) { +void tdigestAdd(TDigest* t, double x, long long w) { if (w == 0) return; - struct Point *p = malloc(sizeof(struct Point)); - - p->value = x; - p->weight = w; - p->next = t->buffered_pts; - - t->buffered_pts = p; + int i = t->num_buffered_pts; + t->buffered_pts[i].value = x; + t->buffered_pts[i].weight = w; t->num_buffered_pts++; if (t->num_buffered_pts > t->threshold) tdigestCompress(t); } -double tdigestCDF(struct TDigest *t, double x) { +double tdigestCDF(TDigest *t, double x) { if (t == NULL) return 0; int i; double left, right; long long weight_so_far; - struct Centroid *a, *b, tmp; + Centroid *a, *b, tmp; tdigestCompress(t); @@ -225,7 +234,7 @@ double tdigestCDF(struct TDigest *t, double x) { right = 0; for (i = 0; i < t->num_centroids; i++) { - struct Centroid *c = &t->centroids[i]; + Centroid *c = &t->centroids[i]; left = b->mean - (a->mean + right); a = b; @@ -255,14 +264,14 @@ double tdigestCDF(struct TDigest *t, double x) { return 1; } -double tdigestQuantile(struct TDigest *t, double q) { +double tdigestQuantile(TDigest *t, double q) { if (t == NULL) return 0; int i; double left, right, idx; long long weight_so_far; - struct Centroid *a, *b, tmp; + Centroid *a, *b, tmp; tdigestCompress(t); @@ -287,7 +296,7 @@ double tdigestQuantile(struct TDigest *t, double q) { right = t->min; for (i = 0; i < t->num_centroids; i++) { - struct Centroid *c = &t->centroids[i]; + Centroid *c = &t->centroids[i]; a = b; left = right; @@ -315,31 +324,24 @@ double tdigestQuantile(struct TDigest *t, double q) { return t->max; } -void tdigestMerge(struct TDigest *t1, struct TDigest *t2) { - int i = 
t2->num_buffered_pts; - struct Point *p = t2->buffered_pts; - - while (i) { - tdigestAdd(t1, p->value, p->weight); - p = p->next; - i--; +void tdigestMerge(TDigest *t1, TDigest *t2) { + // points + int num_points = t2->num_buffered_pts; + for(int i = num_points - 1; i >= 0; i--) { + Point* p = t2->buffered_pts + i; + tdigestAdd(t1, p->value, p->weight); + t2->num_buffered_pts --; } - - for (i = 0; i < t2->num_centroids; i++) { + // centroids + for (int i = 0; i < t2->num_centroids; i++) { tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight); } } -void tdigestFree(struct TDigest *t) { - while (t->buffered_pts) { - struct Point *p = t->buffered_pts; - t->buffered_pts = t->buffered_pts->next; - free(p); - } - - if (t->centroids) - free(t->centroids); - +void tdigestFree(TDigest *t) { free(t); } +void tdigestFreeFrom(TDigest *t) { + // nothing to do +} \ No newline at end of file -- GitLab