diff --git a/src/query/inc/tdigest.h b/src/query/inc/tdigest.h index 0b8da2f557294a415df80b0b7cf4ed5141b086b8..81bbdb0226ca437d45a50d38250543d4e7a9d5c4 100644 --- a/src/query/inc/tdigest.h +++ b/src/query/inc/tdigest.h @@ -28,9 +28,9 @@ #define ADDITION_CENTROID_NUM 100 #define COMPRESSION 400 -#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM) // addition 3 centroid +#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM) #define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2)) -#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression)) +#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SPt)*GET_THRESHOLD(compression)) typedef struct SCentroid { double mean; @@ -64,6 +64,6 @@ void tdigestMerge(TDigest *t1, TDigest *t2); double tdigestQuantile(TDigest *t, double q); void tdigestCompress(TDigest *t); void tdigestFreeFrom(TDigest *t); -void tdigestAutoFill(TDigest* t, int32_t compression); +void tdigestCopy(TDigest* dst, TDigest* src); #endif /* TDIGEST_H */ diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 68a19937db8720f3bdf805589229d22afefac851..a015399fd5faaa475a07601cba590cc20a154b66 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -2510,8 +2510,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) { SAPercentileInfo *pOutput = getAPerctInfo(pCtx); TDigest* pTDigest = pOutput->pTDigest; if(pTDigest->num_centroids == 0) { - memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION)); - tdigestAutoFill(pTDigest, COMPRESSION); + tdigestCopy(pTDigest, pInput->pTDigest); } else { tdigestMerge(pOutput->pTDigest, pInput->pTDigest); } diff --git a/src/query/src/tdigest.c b/src/query/src/tdigest.c index 64866610a3c94f1e48c900ea1b2092a8a06a0042..93149bd0d6d4a3d0b417ab8308a64857cfbd02af 100644 --- a/src/query/src/tdigest.c +++ b/src/query/src/tdigest.c @@ -42,16 +42,20 @@ typedef struct SMergeArgs { double max; }SMergeArgs; -void tdigestAutoFill(TDigest* t, int32_t compression) { - t->centroids = (SCentroid*)((char*)t + sizeof(TDigest)); - t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression)); +void tdigestCopy(TDigest* dst, TDigest* src) { + memcpy(dst, src, (size_t)TDIGEST_SIZE(COMPRESSION)); + + dst->centroids = (SCentroid*)malloc((int32_t)GET_CENTROID(COMPRESSION) * sizeof(SCentroid)); + memcpy(dst->centroids, src->centroids, (int32_t)GET_CENTROID(COMPRESSION) * sizeof(SCentroid)); + dst->buffered_pts = (SPt*) ((char*)dst + sizeof(TDigest)); } TDigest *tdigestNewFrom(void* pBuf, int32_t compression) { memset(pBuf, 0, TDIGEST_SIZE(compression)); TDigest* t = (TDigest*)pBuf; - tdigestAutoFill(t, compression); - + + t->centroids = (SCentroid*)calloc((int32_t)GET_CENTROID(compression), sizeof(SCentroid)); + t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest)); t->compression = compression; t->size = (int64_t)GET_CENTROID(compression); t->threshold = (int32_t)GET_THRESHOLD(compression); @@ -98,29 +102,18 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { } void tdigestCompress(TDigest *t) { - SCentroid *unmerged_centroids; - int64_t unmerged_weight = 0; + SCentroid *unmerged_centroids = (SCentroid *)t->buffered_pts; int32_t num_unmerged = t->num_buffered_pts; int32_t i, j; SMergeArgs args = {0}; - if (!t->num_buffered_pts) + if (t->num_buffered_pts <= 0) return; - unmerged_centroids = (SCentroid*)malloc(sizeof(SCentroid) * t->num_buffered_pts); - for (i = 0; i < num_unmerged; i++) { - SPt *p = t->buffered_pts + i; - SCentroid *c = &unmerged_centroids[i]; - c->mean = p->value; - c->weight = p->weight; - unmerged_weight += c->weight; - } t->num_buffered_pts = 0; - t->total_weight += unmerged_weight; qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid); - args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size)); - memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size)); + args.centroids = (SCentroid*)calloc(sizeof(SCentroid), t->size); args.t = t; args.min = INFINITY; @@ -146,7 +139,6 @@ void tdigestCompress(TDigest *t) { mergeCentroid(&args, &unmerged_centroids[i++]); assert(args.idx < (t->size)); } - free((void*)unmerged_centroids); while (j < t->num_centroids) { mergeCentroid(&args, &t->centroids[j++]); @@ -162,14 +154,8 @@ void tdigestCompress(TDigest *t) { t->max = MAX(t->max, args.max); } - static int32_t maxcentroids = t->size - 10; - if (t->num_centroids > maxcentroids) { - maxcentroids = t->num_centroids; - printf("maxcentroids:%d\n", maxcentroids); - } - - memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids); - free((void*)args.centroids); + tfree(t->centroids); + t->centroids = args.centroids; } void tdigestAdd(TDigest* t, double x, int64_t w) { @@ -180,6 +166,7 @@ void tdigestAdd(TDigest* t, double x, int64_t w) { t->buffered_pts[i].value = x; t->buffered_pts[i].weight = w; t->num_buffered_pts++; + t->total_weight += w; if (t->num_buffered_pts >= t->threshold) tdigestCompress(t); diff --git a/src/query/tests/apercentileTest.cpp b/src/query/tests/apercentileTest.cpp index b11102f57a53e4edd6f1edc74a772db8c6ad297b..ab03f2e390315367f97c5f11ae50b6d290bb080c 100644 --- a/src/query/tests/apercentileTest.cpp +++ b/src/query/tests/apercentileTest.cpp @@ -175,13 +175,12 @@ void tdigestTest() { SHistogramInfo *pHisto = NULL; double ratio = 0.5; - int64_t totalNum[] = {100,10000,1000000000}; + int64_t totalNum[] = {100,10000,10000000}; int32_t numTimes = sizeof(totalNum)/sizeof(totalNum[0]); int64_t biggestNum = totalNum[numTimes - 1]; int32_t unitNum[] = {1,10,100,1000,5000,10000,100000}; int32_t unitTimes = sizeof(unitNum)/sizeof(unitNum[0]); -// int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT}; - int32_t dataMode[] = {TEST_DATA_MODE_SEQ}; + int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT}; int32_t modeTimes = sizeof(dataMode)/sizeof(dataMode[0]); int32_t dataTypes[] = {TEST_DATA_TYPE_INT, TEST_DATA_TYPE_BIGINT, TEST_DATA_TYPE_FLOAT, TEST_DATA_TYPE_DOUBLE}; int32_t typeTimes = sizeof(dataTypes)/sizeof(dataTypes[0]); @@ -290,30 +289,48 @@ void tdigestTest() { if (dataMode[i] == TEST_DATA_MODE_RAND_PER) { for (int32_t p = 0; p < randPTimes; ++p) { for (int32_t j = 0; j < typeTimes; ++j) { - printf("Mode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]); + printf("DMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]); for (int32_t m = 0; m < numTimes; ++m) { - printf(" %d:%f", totalNum[m], useTime[i][j][m][p]); + printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]); } printf("\n"); + + printf("HMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]); + for (int32_t m = 0; m < numTimes; ++m) { + printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]); + } + printf("\n"); } } } else if (dataMode[i] == TEST_DATA_MODE_RAND_LIMIT) { for (int32_t p = 0; p < randLTimes; ++p) { for (int32_t j = 0; j < typeTimes; ++j) { - printf("Mode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]); + printf("DMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]); for (int64_t m = 0; m < numTimes; ++m) { - printf(" %d:%f", totalNum[m], useTime[i][j][m][p]); + printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]); } printf("\n"); + + printf("HMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]); + for (int64_t m = 0; m < numTimes; ++m) { + printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]); + } + printf("\n"); } } } else { for (int32_t j = 0; j < typeTimes; ++j) { - printf("Mode:%d,Type:%d -", dataMode[i], dataTypes[j]); + printf("DMode:%d,Type:%d -", dataMode[i], dataTypes[j]); for (int64_t m = 0; m < numTimes; ++m) { - printf(" %d:%f", totalNum[m], useTime[i][j][m][0]); + printf(" %d:%f", totalNum[m], useTime[0][i][j][m][0]); } printf("\n"); + + printf("HMode:%d,Type:%d -", dataMode[i], dataTypes[j]); + for (int64_t m = 0; m < numTimes; ++m) { + printf(" %d:%f", totalNum[m], useTime[1][i][j][m][0]); + } + printf("\n"); } } }