提交 ffc9558e 编写于 作者: A AlexDuan

put code in order

上级 0b92dee8
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* /*
* include/tdigest.c * include/tdigest.c
* *
...@@ -7,17 +22,19 @@ ...@@ -7,17 +22,19 @@
#ifndef TDIGEST_H #ifndef TDIGEST_H
#define TDIGEST_H #define TDIGEST_H
#define DEFAULT_COMPRESSION 400 #define COMPRESSION 400
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1)
#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2))
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(Centroid)*GET_CENTROID(compression) + sizeof(Point)*GET_THRESHOLD(compression))
typedef struct Centroid { typedef struct Centroid {
long long weight;
double mean; double mean;
long long weight;
}Centroid; }Centroid;
typedef struct Point { typedef struct Point {
double value; double value;
long long weight; long long weight;
struct Point *next;
}Point; }Point;
typedef struct TDigest { typedef struct TDigest {
...@@ -36,12 +53,13 @@ typedef struct TDigest { ...@@ -36,12 +53,13 @@ typedef struct TDigest {
Centroid *centroids; Centroid *centroids;
}TDigest; }TDigest;
extern struct TDigest *tdigestNew(int compression); TDigest *tdigestNewFrom(void* pBuf, int compression);
extern void tdigestAdd(struct TDigest *t, double x, long long w); void tdigestAdd(TDigest *t, double x, long long w);
extern void tdigestMerge(struct TDigest *t1, struct TDigest *t2); void tdigestMerge(TDigest *t1, TDigest *t2);
extern double tdigestCDF(struct TDigest *t, double x); double tdigestCDF(TDigest *t, double x);
extern double tdigestQuantile(struct TDigest *t, double q); double tdigestQuantile(TDigest *t, double q);
extern void tdigestCompress(struct TDigest *t); void tdigestCompress(TDigest *t);
extern void tdigestFree(struct TDigest *t); void tdigestFreeFrom(TDigest *t);
void tdigestAutoFill(TDigest* t, int compression);
#endif /* TDIGEST_H */ #endif /* TDIGEST_H */
...@@ -2442,7 +2442,6 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) { ...@@ -2442,7 +2442,6 @@ static SAPercentileInfo *getAPerctInfo(SQLFunctionCtx *pCtx) {
return pInfo; return pInfo;
} }
// //
// ----------------- tdigest ------------------- // ----------------- tdigest -------------------
// //
...@@ -2477,7 +2476,6 @@ static void tdigest_do(SQLFunctionCtx *pCtx) { ...@@ -2477,7 +2476,6 @@ static void tdigest_do(SQLFunctionCtx *pCtx) {
if (pCtx->hasNull && isNull(data, pCtx->inputType)) { if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue; continue;
} }
notNullElems += 1; notNullElems += 1;
double v = 0; // value double v = 0; // value
...@@ -2486,14 +2484,11 @@ static void tdigest_do(SQLFunctionCtx *pCtx) { ...@@ -2486,14 +2484,11 @@ static void tdigest_do(SQLFunctionCtx *pCtx) {
tdigestAdd(pAPerc->pTDigest, v, w); tdigestAdd(pAPerc->pTDigest, v, w);
} }
//tdigestCompress(pAPerc->pTDigest);
if (!pCtx->hasNull) { if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems); assert(pCtx->size == notNullElems);
} }
SET_VAL(pCtx, notNullElems, 1); SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) { if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG; pResInfo->hasResult = DATA_SET_FLAG;
} }
...@@ -2548,9 +2543,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2548,9 +2543,7 @@ static void tdigest_finalizer(SQLFunctionCtx *pCtx) {
} }
} }
tdigestFreeFrom(pAPerc->pTDigest);
pAPerc->pTDigest = NULL; pAPerc->pTDigest = NULL;
doFinalizer(pCtx); doFinalizer(pCtx);
} }
...@@ -2653,7 +2646,6 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { ...@@ -2653,7 +2646,6 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1); SET_VAL(pCtx, 1, 1);
} }
static void apercentile_finalizer(SQLFunctionCtx *pCtx) { static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
if (getAlgo(pCtx) == ALGO_TDIGEST) { if (getAlgo(pCtx) == ALGO_TDIGEST) {
tdigest_finalizer(pCtx); tdigest_finalizer(pCtx);
......
...@@ -31,7 +31,6 @@ ...@@ -31,7 +31,6 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "tdigest.h" #include "tdigest.h"
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0))) #define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
...@@ -47,24 +46,21 @@ ...@@ -47,24 +46,21 @@
({ __typeof__ (a) _a = (a); \ ({ __typeof__ (a) _a = (a); \
__typeof__ (b) _b = (b); \ __typeof__ (b) _b = (b); \
_a < _b ? _a : _b; }) _a < _b ? _a : _b; })
typedef struct MergeArgs {
TDigest *t;
Centroid *centroids;
int idx;
double weight_so_far;
double k1;
double min;
double max;
}MergeArgs;
void tdigestAutoFill(TDigest* t, int compression) { void tdigestAutoFill(TDigest* t, int compression) {
t->centroids = (Centroid*)((char*)t + sizeof(TDigest)); t->centroids = (Centroid*)((char*)t + sizeof(TDigest));
t->buffered_pts = (Point*) ((char*)t + sizeof(TDigest) + sizeof(Centroid) * (int)GET_CENTROID(compression)); t->buffered_pts = (Point*) ((char*)t + sizeof(TDigest) + sizeof(Centroid) * (int)GET_CENTROID(compression));
} }
TDigest *tdigestNew(int compression) {
TDigest *t = malloc(sizeof(TDigest));
memset(t, 0, sizeof(TDigest));
t->compression = compression;
t->size = GET_CENTROID(compression);
t->threshold = GET_THRESHOLD(compression);
t->min = INFINITY;
return t;
}
TDigest *tdigestNewFrom(void* pBuf, int compression) { TDigest *tdigestNewFrom(void* pBuf, int compression) {
memset(pBuf, 0, sizeof(TDigest) + sizeof(Centroid)*(compression + 1)); memset(pBuf, 0, sizeof(TDigest) + sizeof(Centroid)*(compression + 1));
TDigest* t = (TDigest*)pBuf; TDigest* t = (TDigest*)pBuf;
...@@ -87,16 +83,6 @@ static int centroid_cmp(const void *a, const void *b) { ...@@ -87,16 +83,6 @@ static int centroid_cmp(const void *a, const void *b) {
return 0; return 0;
} }
typedef struct MergeArgs {
TDigest *t;
Centroid *centroids;
int idx;
double weight_so_far;
double k1;
double min;
double max;
}MergeArgs;
static void merge_centroid(MergeArgs *args, Centroid *merge) { static void merge_centroid(MergeArgs *args, Centroid *merge) {
double k2; double k2;
Centroid *c = &args->centroids[args->idx]; Centroid *c = &args->centroids[args->idx];
...@@ -168,7 +154,6 @@ void tdigestCompress(TDigest *t) { ...@@ -168,7 +154,6 @@ void tdigestCompress(TDigest *t) {
while (i < num_unmerged) while (i < num_unmerged)
merge_centroid(&args, &unmerged_centroids[i++]); merge_centroid(&args, &unmerged_centroids[i++]);
free(unmerged_centroids); free(unmerged_centroids);
while (j < t->num_centroids) while (j < t->num_centroids)
...@@ -256,10 +241,10 @@ double tdigestCDF(TDigest *t, double x) { ...@@ -256,10 +241,10 @@ double tdigestCDF(TDigest *t, double x) {
a = b; a = b;
right = t->max - a->mean; right = t->max - a->mean;
if (x < a->mean + right) if (x < a->mean + right) {
return (weight_so_far return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
+ a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight; / t->total_weight;
}
return 1; return 1;
} }
...@@ -301,14 +286,11 @@ double tdigestQuantile(TDigest *t, double q) { ...@@ -301,14 +286,11 @@ double tdigestQuantile(TDigest *t, double q) {
left = right; left = right;
b = c; b = c;
right = (b->weight * a->mean + a->weight * b->mean) right = (b->weight * a->mean + a->weight * b->mean)/ (a->weight + b->weight);
/ (a->weight + b->weight);
if (idx < weight_so_far + a->weight) { if (idx < weight_so_far + a->weight) {
double p = (idx - weight_so_far) / a->weight; double p = (idx - weight_so_far) / a->weight;
return left * (1 - p) + right * p; return left * (1 - p) + right * p;
} }
weight_so_far += a->weight; weight_so_far += a->weight;
} }
...@@ -337,11 +319,3 @@ void tdigestMerge(TDigest *t1, TDigest *t2) { ...@@ -337,11 +319,3 @@ void tdigestMerge(TDigest *t1, TDigest *t2) {
tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight); tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
} }
} }
\ No newline at end of file
void tdigestFree(TDigest *t) {
free(t);
}
void tdigestFreeFrom(TDigest *t) {
// nothing to do
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册