From 1e7fe21150b74cdd83f00ab4a2845733e1d27873 Mon Sep 17 00:00:00 2001 From: wpan Date: Thu, 16 Sep 2021 11:20:05 +0800 Subject: [PATCH] fix bug and add ut --- src/query/inc/tdigest.h | 4 +- src/query/src/tdigest.c | 78 ++++---------------- src/query/tests/apercentileTest.cpp | 109 +++++++++++++++++++++++----- 3 files changed, 107 insertions(+), 84 deletions(-) diff --git a/src/query/inc/tdigest.h b/src/query/inc/tdigest.h index df57f983b1..0b8da2f557 100644 --- a/src/query/inc/tdigest.h +++ b/src/query/inc/tdigest.h @@ -26,8 +26,9 @@ #define M_PI 3.14159265358979323846264338327950288 /* pi */ #endif +#define ADDITION_CENTROID_NUM 100 #define COMPRESSION 400 -#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + 3) // addition 3 centroid +#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM) // addition 3 centroid #define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2)) #define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression)) @@ -60,7 +61,6 @@ typedef struct TDigest { TDigest *tdigestNewFrom(void* pBuf, int32_t compression); void tdigestAdd(TDigest *t, double x, int64_t w); void tdigestMerge(TDigest *t1, TDigest *t2); -double tdigestCDF(TDigest *t, double x); double tdigestQuantile(TDigest *t, double q); void tdigestCompress(TDigest *t); void tdigestFreeFrom(TDigest *t); diff --git a/src/query/src/tdigest.c b/src/query/src/tdigest.c index c10ae020e3..64866610a3 100644 --- a/src/query/src/tdigest.c +++ b/src/query/src/tdigest.c @@ -80,6 +80,8 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { if (k2 - args->k1 > 1 && c->weight > 0) { if(args->idx + 1 < args->t->size) { // check avoid overflow args->idx++; + } else { + assert(0); } args->k1 = INTEGRATED_LOCATION(args->t->compression, (args->weight_so_far - merge->weight) / args->t->total_weight); @@ -100,7 +102,7 @@ void tdigestCompress(TDigest *t) { int64_t unmerged_weight = 0; int32_t num_unmerged = t->num_buffered_pts; int32_t i, j; - SMergeArgs args; + SMergeArgs args = {0}; if (!t->num_buffered_pts) return; @@ -117,7 +119,6 @@ void tdigestCompress(TDigest *t) { t->total_weight += unmerged_weight; qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid); - memset(&args, 0, sizeof(SMergeArgs)); args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size)); memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size)); @@ -132,24 +133,24 @@ void tdigestCompress(TDigest *t) { if (a->mean <= b->mean) { mergeCentroid(&args, a); - assert(args.idx < t->size); + assert(args.idx < (t->size)); i++; } else { mergeCentroid(&args, b); - assert(args.idx < t->size); + assert(args.idx < (t->size)); j++; } } while (i < num_unmerged) { mergeCentroid(&args, &unmerged_centroids[i++]); - assert(args.idx < t->size); + assert(args.idx < (t->size)); } free((void*)unmerged_centroids); while (j < t->num_centroids) { mergeCentroid(&args, &t->centroids[j++]); - assert(args.idx < t->size); + assert(args.idx < (t->size)); } if (t->total_weight > 0) { @@ -161,6 +162,12 @@ void tdigestCompress(TDigest *t) { t->max = MAX(t->max, args.max); } + static int32_t maxcentroids = t->size - 10; + if (t->num_centroids > maxcentroids) { + maxcentroids = t->num_centroids; + printf("maxcentroids:%d\n", maxcentroids); + } + memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids); free((void*)args.centroids); } @@ -178,65 +185,6 @@ void tdigestAdd(TDigest* t, double x, int64_t w) { tdigestCompress(t); } -double tdigestCDF(TDigest *t, double x) { - if (t == NULL) - return 0; - - int32_t i; - double left, right; - int64_t weight_so_far; - SCentroid *a, *b, tmp; - - tdigestCompress(t); - if (t->num_centroids == 0) - return NAN; - if (x < t->min) - return 0; - if (x > t->max) - return 1; - if (t->num_centroids == 1) { - if (FLOAT_EQ(t->max, t->min)) - return 0.5; - - return INTERPOLATE(x, t->min, t->max); - } - - weight_so_far = 0; - a = b = &tmp; - b->mean = t->min; - b->weight = 0; - right = 0; - - for (i = 0; i < t->num_centroids; i++) { - SCentroid *c = &t->centroids[i]; - - left = b->mean - (a->mean + right); - a = b; - b = c; - right = (b->mean - a->mean) * a->weight / (a->weight + b->weight); - - if (x < a->mean + right) { - double cdf = (weight_so_far - + a->weight - * INTERPOLATE(x, a->mean - left, a->mean + right)) - / t->total_weight; - return MAX(cdf, 0.0); - } - - weight_so_far += a->weight; - } - - left = b->mean - (a->mean + right); - a = b; - right = t->max - a->mean; - - if (x < a->mean + right) { - return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right)) - / t->total_weight; - } - - return 1; -} double tdigestQuantile(TDigest *t, double q) { if (t == NULL) diff --git a/src/query/tests/apercentileTest.cpp b/src/query/tests/apercentileTest.cpp index 3fc5f00e9b..b11102f57a 100644 --- a/src/query/tests/apercentileTest.cpp +++ b/src/query/tests/apercentileTest.cpp @@ -6,12 +6,15 @@ #include "taosdef.h" #include "assert.h" +#include "qHistogram.h" #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-variable" extern "C" { #include "tdigest.h" +#include "qHistogram.h" + } @@ -37,6 +40,12 @@ void tdigest_init(TDigest **pTDigest) { *pTDigest = tdigestNewFrom(tmp, COMPRESSION); } +void thistogram_init(SHistogramInfo **pHisto) { + void *tmp = calloc(1, (int16_t)(sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo))); + *pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); +} + + static FORCE_INLINE int64_t testGetTimestampUs() { struct timeval systemTime; gettimeofday(&systemTime, NULL); @@ -44,6 +53,15 @@ static FORCE_INLINE int64_t testGetTimestampUs() { } +double * thistogram_end(SHistogramInfo* pHisto, double* ratio, int32_t num){ + assert(pHisto->numOfElems > 0); + + double ratio2 = *ratio * 100; + + return tHistogramUniform(pHisto, &ratio2, 1); +} + + void setTestData(void *data, int64_t idx, int32_t type, int64_t value) { switch (type) { case TEST_DATA_TYPE_INT: @@ -64,7 +82,7 @@ void setTestData(void *data, int64_t idx, int32_t type, int64_t value) { } -void addTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) { +void addDTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) { switch (type) { case TEST_DATA_TYPE_INT: tdigestAdd(pTDigest, (double)*((int32_t*)data + idx), 1); @@ -83,6 +101,26 @@ void addTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) { } } +void addHTestData(void *data, int64_t idx, int32_t type, SHistogramInfo *pHisto) { + switch (type) { + case TEST_DATA_TYPE_INT: + tHistogramAdd(&pHisto, (double)*((int32_t*)data + idx)); + break; + case TEST_DATA_TYPE_BIGINT: + tHistogramAdd(&pHisto, (double)*((int64_t*)data + idx)); + break; + case TEST_DATA_TYPE_FLOAT: + tHistogramAdd(&pHisto, (double)*((float*)data + idx)); + break; + case TEST_DATA_TYPE_DOUBLE: + tHistogramAdd(&pHisto, (double)*((double*)data + idx)); + break; + default: + assert(0); + } +} + + void initTestData(void **data, int32_t type, int64_t num, int32_t mode, int32_t randPar) { @@ -134,13 +172,16 @@ void tdigestTest() { TDigest *pTDigest = NULL; void *data = NULL; - - int64_t totalNum[] = {100,10000,10000000}; + SHistogramInfo *pHisto = NULL; + double ratio = 0.5; + + int64_t totalNum[] = {100,10000,1000000000}; int32_t numTimes = sizeof(totalNum)/sizeof(totalNum[0]); int64_t biggestNum = totalNum[numTimes - 1]; int32_t unitNum[] = {1,10,100,1000,5000,10000,100000}; int32_t unitTimes = sizeof(unitNum)/sizeof(unitNum[0]); - int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT}; +// int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT}; + int32_t dataMode[] = {TEST_DATA_MODE_SEQ}; int32_t modeTimes = sizeof(dataMode)/sizeof(dataMode[0]); int32_t dataTypes[] = {TEST_DATA_TYPE_INT, TEST_DATA_TYPE_BIGINT, TEST_DATA_TYPE_FLOAT, TEST_DATA_TYPE_DOUBLE}; int32_t typeTimes = sizeof(dataTypes)/sizeof(dataTypes[0]); @@ -149,7 +190,7 @@ void tdigestTest() { int32_t randLimits[] = {10, 50, 100, 1000, 10000}; int32_t randLTimes = sizeof(randLimits)/sizeof(randLimits[0]); - double useTime[10][10][10][10] = {0.0}; + double useTime[2][10][10][10][10] = {0.0}; for (int32_t i = 0; i < modeTimes; ++i) { if (dataMode[i] == TEST_DATA_MODE_RAND_PER) { @@ -160,12 +201,23 @@ void tdigestTest() { int64_t startu = testGetTimestampUs(); tdigest_init(&pTDigest); for (int64_t n = 0; n < totalNum[m]; ++n) { - addTestData(data, n, dataTypes[j], pTDigest); + addDTestData(data, n, dataTypes[j], pTDigest); } - double res = tdigestQuantile(pTDigest, 50/100); + double res = tdigestQuantile(pTDigest, ratio); free(pTDigest); - useTime[i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000; - printf("Mode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[i][j][m][p]); + useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000; + printf("DMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[0][i][j][m][p], res); + + startu = testGetTimestampUs(); + thistogram_init(&pHisto); + for (int64_t n = 0; n < totalNum[m]; ++n) { + addHTestData(data, n, dataTypes[j], pHisto); + } + double *res2 = thistogram_end(pHisto, &ratio, 1); + free(pHisto); + useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000; + printf("HMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[1][i][j][m][p], *res2); + } free(data); } @@ -178,12 +230,23 @@ void tdigestTest() { int64_t startu = testGetTimestampUs(); tdigest_init(&pTDigest); for (int64_t n = 0; n < totalNum[m]; ++n) { - addTestData(data, m, dataTypes[j], pTDigest); + addDTestData(data, m, dataTypes[j], pTDigest); } - double res = tdigestQuantile(pTDigest, 50/100); + double res = tdigestQuantile(pTDigest, ratio); free(pTDigest); - useTime[i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000; - printf("Mode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[i][j][m][p]); + useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000; + printf("DMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[0][i][j][m][p], res); + + + startu = testGetTimestampUs(); + thistogram_init(&pHisto); + for (int64_t n = 0; n < totalNum[m]; ++n) { + addHTestData(data, n, dataTypes[j], pHisto); + } + double* res2 = thistogram_end(pHisto, &ratio, 1); + free(pHisto); + useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000; + printf("HMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[1][i][j][m][p], *res2); } free(data); } @@ -195,12 +258,24 @@ void tdigestTest() { int64_t startu = testGetTimestampUs(); tdigest_init(&pTDigest); for (int64_t n = 0; n < totalNum[m]; ++n) { - addTestData(data, m, dataTypes[j], pTDigest); + addDTestData(data, n, dataTypes[j], pTDigest); } - double res = tdigestQuantile(pTDigest, 50/100); + double res = tdigestQuantile(pTDigest, ratio); free(pTDigest); - useTime[i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000; - printf("Mode:%d,Type:%d,Num:%"PRId64",Used:%fms\n", dataMode[i], dataTypes[j], totalNum[m], useTime[i][j][m][0]); + useTime[0][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000; + printf("DMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[0][i][j][m][0], res); + + + startu = testGetTimestampUs(); + thistogram_init(&pHisto); + for (int64_t n = 0; n < totalNum[m]; ++n) { + addHTestData(data, n, dataTypes[j], pHisto); + } + double* res2 = thistogram_end(pHisto, &ratio, 1); + free(pHisto); + useTime[1][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000; + printf("HMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[1][i][j][m][0], *res2); + } free(data); } -- GitLab