提交 1e7fe211 编写于 作者: W wpan

fix bug and add ut

上级 281746c1
......@@ -26,8 +26,9 @@
#define M_PI 3.14159265358979323846264338327950288 /* pi */
#endif
#define ADDITION_CENTROID_NUM 100
#define COMPRESSION 400
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + 3) // addition 3 centroid
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM) // addition 3 centroid
#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2))
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression))
......@@ -60,7 +61,6 @@ typedef struct TDigest {
TDigest *tdigestNewFrom(void* pBuf, int32_t compression);
void tdigestAdd(TDigest *t, double x, int64_t w);
void tdigestMerge(TDigest *t1, TDigest *t2);
double tdigestCDF(TDigest *t, double x);
double tdigestQuantile(TDigest *t, double q);
void tdigestCompress(TDigest *t);
void tdigestFreeFrom(TDigest *t);
......
......@@ -80,6 +80,8 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
if (k2 - args->k1 > 1 && c->weight > 0) {
if(args->idx + 1 < args->t->size) { // check avoid overflow
args->idx++;
} else {
assert(0);
}
args->k1 = INTEGRATED_LOCATION(args->t->compression,
(args->weight_so_far - merge->weight) / args->t->total_weight);
......@@ -100,7 +102,7 @@ void tdigestCompress(TDigest *t) {
int64_t unmerged_weight = 0;
int32_t num_unmerged = t->num_buffered_pts;
int32_t i, j;
SMergeArgs args;
SMergeArgs args = {0};
if (!t->num_buffered_pts)
return;
......@@ -117,7 +119,6 @@ void tdigestCompress(TDigest *t) {
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
memset(&args, 0, sizeof(SMergeArgs));
args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size));
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));
......@@ -132,24 +133,24 @@ void tdigestCompress(TDigest *t) {
if (a->mean <= b->mean) {
mergeCentroid(&args, a);
assert(args.idx < t->size);
assert(args.idx < (t->size));
i++;
} else {
mergeCentroid(&args, b);
assert(args.idx < t->size);
assert(args.idx < (t->size));
j++;
}
}
while (i < num_unmerged) {
mergeCentroid(&args, &unmerged_centroids[i++]);
assert(args.idx < t->size);
assert(args.idx < (t->size));
}
free((void*)unmerged_centroids);
while (j < t->num_centroids) {
mergeCentroid(&args, &t->centroids[j++]);
assert(args.idx < t->size);
assert(args.idx < (t->size));
}
if (t->total_weight > 0) {
......@@ -161,6 +162,12 @@ void tdigestCompress(TDigest *t) {
t->max = MAX(t->max, args.max);
}
static int32_t maxcentroids = t->size - 10;
if (t->num_centroids > maxcentroids) {
maxcentroids = t->num_centroids;
printf("maxcentroids:%d\n", maxcentroids);
}
memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
free((void*)args.centroids);
}
......@@ -178,65 +185,6 @@ void tdigestAdd(TDigest* t, double x, int64_t w) {
tdigestCompress(t);
}
double tdigestCDF(TDigest *t, double x) {
if (t == NULL)
return 0;
int32_t i;
double left, right;
int64_t weight_so_far;
SCentroid *a, *b, tmp;
tdigestCompress(t);
if (t->num_centroids == 0)
return NAN;
if (x < t->min)
return 0;
if (x > t->max)
return 1;
if (t->num_centroids == 1) {
if (FLOAT_EQ(t->max, t->min))
return 0.5;
return INTERPOLATE(x, t->min, t->max);
}
weight_so_far = 0;
a = b = &tmp;
b->mean = t->min;
b->weight = 0;
right = 0;
for (i = 0; i < t->num_centroids; i++) {
SCentroid *c = &t->centroids[i];
left = b->mean - (a->mean + right);
a = b;
b = c;
right = (b->mean - a->mean) * a->weight / (a->weight + b->weight);
if (x < a->mean + right) {
double cdf = (weight_so_far
+ a->weight
* INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
return MAX(cdf, 0.0);
}
weight_so_far += a->weight;
}
left = b->mean - (a->mean + right);
a = b;
right = t->max - a->mean;
if (x < a->mean + right) {
return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
/ t->total_weight;
}
return 1;
}
double tdigestQuantile(TDigest *t, double q) {
if (t == NULL)
......
......@@ -6,12 +6,15 @@
#include "taosdef.h"
#include "assert.h"
#include "qHistogram.h"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
extern "C" {
#include "tdigest.h"
#include "qHistogram.h"
}
......@@ -37,6 +40,12 @@ void tdigest_init(TDigest **pTDigest) {
*pTDigest = tdigestNewFrom(tmp, COMPRESSION);
}
void thistogram_init(SHistogramInfo **pHisto) {
void *tmp = calloc(1, (int16_t)(sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo)));
*pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
}
static FORCE_INLINE int64_t testGetTimestampUs() {
struct timeval systemTime;
gettimeofday(&systemTime, NULL);
......@@ -44,6 +53,15 @@ static FORCE_INLINE int64_t testGetTimestampUs() {
}
double * thistogram_end(SHistogramInfo* pHisto, double* ratio, int32_t num){
assert(pHisto->numOfElems > 0);
double ratio2 = *ratio * 100;
return tHistogramUniform(pHisto, &ratio2, 1);
}
void setTestData(void *data, int64_t idx, int32_t type, int64_t value) {
switch (type) {
case TEST_DATA_TYPE_INT:
......@@ -64,7 +82,7 @@ void setTestData(void *data, int64_t idx, int32_t type, int64_t value) {
}
void addTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) {
void addDTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) {
switch (type) {
case TEST_DATA_TYPE_INT:
tdigestAdd(pTDigest, (double)*((int32_t*)data + idx), 1);
......@@ -83,6 +101,26 @@ void addTestData(void *data, int64_t idx, int32_t type, TDigest* pTDigest) {
}
}
void addHTestData(void *data, int64_t idx, int32_t type, SHistogramInfo *pHisto) {
switch (type) {
case TEST_DATA_TYPE_INT:
tHistogramAdd(&pHisto, (double)*((int32_t*)data + idx));
break;
case TEST_DATA_TYPE_BIGINT:
tHistogramAdd(&pHisto, (double)*((int64_t*)data + idx));
break;
case TEST_DATA_TYPE_FLOAT:
tHistogramAdd(&pHisto, (double)*((float*)data + idx));
break;
case TEST_DATA_TYPE_DOUBLE:
tHistogramAdd(&pHisto, (double)*((double*)data + idx));
break;
default:
assert(0);
}
}
void initTestData(void **data, int32_t type, int64_t num, int32_t mode, int32_t randPar) {
......@@ -134,13 +172,16 @@ void tdigestTest() {
TDigest *pTDigest = NULL;
void *data = NULL;
SHistogramInfo *pHisto = NULL;
double ratio = 0.5;
int64_t totalNum[] = {100,10000,10000000};
int64_t totalNum[] = {100,10000,1000000000};
int32_t numTimes = sizeof(totalNum)/sizeof(totalNum[0]);
int64_t biggestNum = totalNum[numTimes - 1];
int32_t unitNum[] = {1,10,100,1000,5000,10000,100000};
int32_t unitTimes = sizeof(unitNum)/sizeof(unitNum[0]);
int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT};
// int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT};
int32_t dataMode[] = {TEST_DATA_MODE_SEQ};
int32_t modeTimes = sizeof(dataMode)/sizeof(dataMode[0]);
int32_t dataTypes[] = {TEST_DATA_TYPE_INT, TEST_DATA_TYPE_BIGINT, TEST_DATA_TYPE_FLOAT, TEST_DATA_TYPE_DOUBLE};
int32_t typeTimes = sizeof(dataTypes)/sizeof(dataTypes[0]);
......@@ -149,7 +190,7 @@ void tdigestTest() {
int32_t randLimits[] = {10, 50, 100, 1000, 10000};
int32_t randLTimes = sizeof(randLimits)/sizeof(randLimits[0]);
double useTime[10][10][10][10] = {0.0};
double useTime[2][10][10][10][10] = {0.0};
for (int32_t i = 0; i < modeTimes; ++i) {
if (dataMode[i] == TEST_DATA_MODE_RAND_PER) {
......@@ -160,12 +201,23 @@ void tdigestTest() {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addTestData(data, n, dataTypes[j], pTDigest);
addDTestData(data, n, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, 50/100);
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("Mode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[i][j][m][p]);
useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[0][i][j][m][p], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double *res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",randP:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randPers[p], useTime[1][i][j][m][p], *res2);
}
free(data);
}
......@@ -178,12 +230,23 @@ void tdigestTest() {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addTestData(data, m, dataTypes[j], pTDigest);
addDTestData(data, m, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, 50/100);
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("Mode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[i][j][m][p]);
useTime[0][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[0][i][j][m][p], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double* res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][p] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",randL:%d,Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], randLimits[p], useTime[1][i][j][m][p], *res2);
}
free(data);
}
......@@ -195,12 +258,24 @@ void tdigestTest() {
int64_t startu = testGetTimestampUs();
tdigest_init(&pTDigest);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addTestData(data, m, dataTypes[j], pTDigest);
addDTestData(data, n, dataTypes[j], pTDigest);
}
double res = tdigestQuantile(pTDigest, 50/100);
double res = tdigestQuantile(pTDigest, ratio);
free(pTDigest);
useTime[i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000;
printf("Mode:%d,Type:%d,Num:%"PRId64",Used:%fms\n", dataMode[i], dataTypes[j], totalNum[m], useTime[i][j][m][0]);
useTime[0][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000;
printf("DMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[0][i][j][m][0], res);
startu = testGetTimestampUs();
thistogram_init(&pHisto);
for (int64_t n = 0; n < totalNum[m]; ++n) {
addHTestData(data, n, dataTypes[j], pHisto);
}
double* res2 = thistogram_end(pHisto, &ratio, 1);
free(pHisto);
useTime[1][i][j][m][0] = ((double)(testGetTimestampUs() - startu))/1000;
printf("HMode:%d,Type:%d,Num:%"PRId64",Used:%fms\tRES:%f\n", dataMode[i], dataTypes[j], totalNum[m], useTime[1][i][j][m][0], *res2);
}
free(data);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册