提交 0b92dee8 编写于 作者: A AlexDuan

tdigest merge is finished

上级 78327d55
......@@ -30,6 +30,7 @@
#include "qUdf.h"
#include "queryLog.h"
#define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
......@@ -290,7 +291,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
int16_t bytesHist = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
int16_t bytesDigest = sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION);
*bytes = MAX(bytesHist, bytesDigest);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -2312,10 +2315,10 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *
SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX);
SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX);
pInfo->numOfElems = 0;
return true;
}
static void percentile_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
......@@ -2360,7 +2363,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
SET_DOUBLE_VAL(&pInfo->maxval, tmax);
}
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull);
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull);
} else {
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
......@@ -2382,7 +2385,6 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
pInfo->numOfElems += 1;
}
}
return;
}
......@@ -2403,6 +2405,7 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
static void percentile_finalizer(SQLFunctionCtx *pCtx) {
double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey;
double result = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo * ppInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo);
......@@ -2412,7 +2415,8 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
assert(ppInfo->numOfElems == 0);
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
} else {
SET_DOUBLE_VAL((double *)pCtx->pOutput, getPercentile(pMemBucket, v));
result = getPercentile(pMemBucket, v);
SET_DOUBLE_VAL((double *)pCtx->pOutput, result);
}
tMemBucketDestroy(pMemBucket);
......@@ -2450,13 +2454,9 @@ static bool tdigest_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo *pResultInfo)
}
// new TDigest
SAPercentileInfo *pAPerc = getAPerctInfo(pCtx);
int compression = 500;
if(pAPerc) {
if(pAPerc->pTDigest == NULL) {
pAPerc->pTDigest = tdigestNew(compression);
}
}
SAPercentileInfo *pInfo = getAPerctInfo(pCtx);
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION);
return true;
}
......@@ -2502,10 +2502,23 @@ static void tdigest_do(SQLFunctionCtx *pCtx) {
static void tdigest_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo *pInput = (SAPercentileInfo *)GET_INPUT_DATA_LIST(pCtx);
assert(pInput->pTDigest);
pInput->pTDigest = (TDigest*)((char*)pInput + sizeof(SAPercentileInfo));
pInput->pTDigest->centroids = (Centroid*)((char*)pInput + sizeof(SAPercentileInfo) + sizeof(TDigest));
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
tdigestMerge(pOutput->pTDigest, pInput->pTDigest);
// input merge no elements , no need merge
if(pInput->pTDigest->num_centroids == 0) {
return ;
}
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
TDigest* pTDigest = pOutput->pTDigest;
if(pTDigest->num_centroids == 0) {
memcpy(pTDigest, pInput->pTDigest, TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pTDigest, COMPRESSION);
} else {
tdigestMerge(pOutput->pTDigest, pInput->pTDigest);
}
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
SET_VAL(pCtx, 1, 1);
......@@ -2535,7 +2548,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
}
}
tdigestFree(pAPerc->pTDigest);
tdigestFreeFrom(pAPerc->pTDigest);
pAPerc->pTDigest = NULL;
doFinalizer(pCtx);
......@@ -2658,7 +2671,6 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
double ratio[] = {v};
double *res = tHistogramUniform(pOutput->pHisto, ratio, 1);
memcpy(pCtx->pOutput, res, sizeof(double));
free(res);
} else {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* src/tdigest.c
*
......@@ -33,22 +48,38 @@
__typeof__ (b) _b = (b); \
_a < _b ? _a : _b; })
struct TDigest *tdigestNew(int compression) {
struct TDigest *t = malloc(sizeof(struct TDigest));
void tdigestAutoFill(TDigest* t, int compression) {
t->centroids = (Centroid*)((char*)t + sizeof(TDigest));
t->buffered_pts = (Point*) ((char*)t + sizeof(TDigest) + sizeof(Centroid) * (int)GET_CENTROID(compression));
}
TDigest *tdigestNew(int compression) {
TDigest *t = malloc(sizeof(TDigest));
memset(t, 0, sizeof(struct TDigest));
memset(t, 0, sizeof(TDigest));
t->compression = compression;
t->size = ceil(compression * M_PI / 2) + 1;
t->threshold = 7.5 + 0.37 * compression - 2e-4 * pow(compression, 2);
t->size = GET_CENTROID(compression);
t->threshold = GET_THRESHOLD(compression);
t->min = INFINITY;
return t;
}
TDigest *tdigestNewFrom(void* pBuf, int compression) {
memset(pBuf, 0, sizeof(TDigest) + sizeof(Centroid)*(compression + 1));
TDigest* t = (TDigest*)pBuf;
tdigestAutoFill(t, compression);
t->compression = compression;
t->size = GET_CENTROID(compression);
t->threshold = GET_THRESHOLD(compression);
t->min = INFINITY;
return t;
}
static int centroid_cmp(const void *a, const void *b) {
struct Centroid *c1 = (struct Centroid *) a;
struct Centroid *c2 = (struct Centroid *) b;
Centroid *c1 = (Centroid *) a;
Centroid *c2 = (Centroid *) b;
if (c1->mean < c2->mean)
return -1;
if (c1->mean > c2->mean)
......@@ -56,19 +87,19 @@ static int centroid_cmp(const void *a, const void *b) {
return 0;
}
struct MergeArgs {
struct TDigest *t;
struct Centroid *centroids;
typedef struct MergeArgs {
TDigest *t;
Centroid *centroids;
int idx;
double weight_so_far;
double k1;
double min;
double max;
};
}MergeArgs;
static void merge_centroid(struct MergeArgs *args, struct Centroid *merge) {
static void merge_centroid(MergeArgs *args, Centroid *merge) {
double k2;
struct Centroid *c = &args->centroids[args->idx];
Centroid *c = &args->centroids[args->idx];
args->weight_so_far += merge->weight;
k2 = INTEGRATED_LOCATION(args->t->compression,
......@@ -90,43 +121,32 @@ static void merge_centroid(struct MergeArgs *args, struct Centroid *merge) {
}
}
void tdigestCompress(struct TDigest *t) {
struct Centroid *unmerged_centroids;
void tdigestCompress(TDigest *t) {
Centroid *unmerged_centroids;
long long unmerged_weight = 0;
int num_unmerged = t->num_buffered_pts;
int old_num_centroids = t->num_centroids;
int i, j;
struct MergeArgs args;
MergeArgs args;
if (!t->num_buffered_pts)
return;
unmerged_centroids = malloc(sizeof(struct Centroid) * t->num_buffered_pts);
i = 0;
unmerged_centroids = malloc(sizeof(Centroid) * t->num_buffered_pts);
for (i = 0; i < num_unmerged; i++) {
struct Point *p = t->buffered_pts;
struct Centroid *c = &unmerged_centroids[i];
Point *p = t->buffered_pts + i;
Centroid *c = &unmerged_centroids[i];
c->mean = p->value;
c->weight = p->weight;
unmerged_weight += c->weight;
t->buffered_pts = p->next;
free(p);
}
t->num_buffered_pts = 0;
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(struct Centroid),
centroid_cmp);
memset(&args, 0, sizeof(struct MergeArgs));
args.centroids = malloc(sizeof(struct Centroid) * t->size);
memset(args.centroids, 0, sizeof(struct Centroid) * t->size);
qsort(unmerged_centroids, num_unmerged, sizeof(Centroid), centroid_cmp);
memset(&args, 0, sizeof(MergeArgs));
args.centroids = malloc(sizeof(Centroid) * t->size);
memset(args.centroids, 0, sizeof(Centroid) * t->size);
args.t = t;
args.min = INFINITY;
......@@ -134,8 +154,8 @@ void tdigestCompress(struct TDigest *t) {
i = 0;
j = 0;
while (i < num_unmerged && j < t->num_centroids) {
struct Centroid *a = &unmerged_centroids[i];
struct Centroid *b = &t->centroids[j];
Centroid *a = &unmerged_centroids[i];
Centroid *b = &t->centroids[j];
if (a->mean <= b->mean) {
merge_centroid(&args, a);
......@@ -164,42 +184,31 @@ void tdigestCompress(struct TDigest *t) {
t->max = MAX(t->max, args.max);
}
if (t->num_centroids > old_num_centroids) {
t->centroids = realloc(t->centroids,
sizeof(struct Centroid) * t->num_centroids);
}
memcpy(t->centroids, args.centroids,
sizeof(struct Centroid) * t->num_centroids);
memcpy(t->centroids, args.centroids, sizeof(Centroid) * t->num_centroids);
free(args.centroids);
}
void tdigestAdd(struct TDigest *t, double x, long long w) {
void tdigestAdd(TDigest* t, double x, long long w) {
if (w == 0)
return;
struct Point *p = malloc(sizeof(struct Point));
p->value = x;
p->weight = w;
p->next = t->buffered_pts;
t->buffered_pts = p;
int i = t->num_buffered_pts;
t->buffered_pts[i].value = x;
t->buffered_pts[i].weight = w;
t->num_buffered_pts++;
if (t->num_buffered_pts > t->threshold)
tdigestCompress(t);
}
double tdigestCDF(struct TDigest *t, double x) {
double tdigestCDF(TDigest *t, double x) {
if (t == NULL)
return 0;
int i;
double left, right;
long long weight_so_far;
struct Centroid *a, *b, tmp;
Centroid *a, *b, tmp;
tdigestCompress(t);
......@@ -225,7 +234,7 @@ double tdigestCDF(struct TDigest *t, double x) {
right = 0;
for (i = 0; i < t->num_centroids; i++) {
struct Centroid *c = &t->centroids[i];
Centroid *c = &t->centroids[i];
left = b->mean - (a->mean + right);
a = b;
......@@ -255,14 +264,14 @@ double tdigestCDF(struct TDigest *t, double x) {
return 1;
}
double tdigestQuantile(struct TDigest *t, double q) {
double tdigestQuantile(TDigest *t, double q) {
if (t == NULL)
return 0;
int i;
double left, right, idx;
long long weight_so_far;
struct Centroid *a, *b, tmp;
Centroid *a, *b, tmp;
tdigestCompress(t);
......@@ -287,7 +296,7 @@ double tdigestQuantile(struct TDigest *t, double q) {
right = t->min;
for (i = 0; i < t->num_centroids; i++) {
struct Centroid *c = &t->centroids[i];
Centroid *c = &t->centroids[i];
a = b;
left = right;
......@@ -315,31 +324,24 @@ double tdigestQuantile(struct TDigest *t, double q) {
return t->max;
}
void tdigestMerge(struct TDigest *t1, struct TDigest *t2) {
int i = t2->num_buffered_pts;
struct Point *p = t2->buffered_pts;
while (i) {
tdigestAdd(t1, p->value, p->weight);
p = p->next;
i--;
void tdigestMerge(TDigest *t1, TDigest *t2) {
// points
int num_points = t2->num_buffered_pts;
for(int i = num_points - 1; i >= 0; i--) {
Point* p = t2->buffered_pts + i;
tdigestAdd(t1, p->value, p->value);
t2->num_buffered_pts --;
}
for (i = 0; i < t2->num_centroids; i++) {
// centroids
for (int i = 0; i < t2->num_centroids; i++) {
tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
}
}
void tdigestFree(struct TDigest *t) {
while (t->buffered_pts) {
struct Point *p = t->buffered_pts;
t->buffered_pts = t->buffered_pts->next;
free(p);
}
if (t->centroids)
free(t->centroids);
void tdigestFree(TDigest *t) {
free(t);
}
void tdigestFreeFrom(TDigest *t) {
// nothing to do
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册