提交 076c5ac7 编写于 作者: W wpan

fix bug

上级 1e7fe211
......@@ -28,9 +28,9 @@
/* Extra centroid slots added on top of the computed bound in GET_CENTROID. */
#define ADDITION_CENTROID_NUM 100
/* Fixed compression value used by tdigestCopy when sizing what it copies. */
#define COMPRESSION 400
/*
 * Number of centroid slots for a given compression:
 * ceil(compression * pi / 2) + 1 + ADDITION_CENTROID_NUM.
 * The result is a double (ceil returns double); the call sites cast it to an
 * integer type before using it as a count.
 * NOTE(review): the macro parameter is not parenthesized in the expansion, so
 * pass only a simple expression (a constant or a variable).
 */
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM) // addition 3 centroid
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM)
/*
 * Buffered-point threshold for a given compression:
 * 7.5 + 0.37 * compression - 2e-4 * compression^2 (a double).
 * tdigestNewFrom stores it truncated to int32_t in t->threshold, and
 * tdigestAdd calls tdigestCompress once num_buffered_pts reaches that value.
 */
#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2))
/*
 * Byte size of a digest buffer for a given compression. Two definitions are
 * visible here: the first counts the TDigest header, the centroid array and
 * the SPt buffer; the second counts only the TDigest header and the SPt
 * buffer. The expression is floating-point because GET_THRESHOLD is, so
 * callers that need a size_t cast the result.
 * NOTE(review): the two definitions have different replacement lists;
 * presumably only one of them is meant to be active - confirm.
 */
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression))
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SPt)*GET_THRESHOLD(compression))
typedef struct SCentroid {
double mean;
......@@ -64,6 +64,6 @@ void tdigestMerge(TDigest *t1, TDigest *t2);
double tdigestQuantile(TDigest *t, double q);
void tdigestCompress(TDigest *t);
void tdigestFreeFrom(TDigest *t);
void tdigestAutoFill(TDigest* t, int32_t compression);
void tdigestCopy(TDigest* dst, TDigest* src);
#endif /* TDIGEST_H */
......@@ -2510,8 +2510,7 @@ static void tdigest_merge(SQLFunctionCtx *pCtx) {
SAPercentileInfo *pOutput = getAPerctInfo(pCtx);
TDigest* pTDigest = pOutput->pTDigest;
if(pTDigest->num_centroids == 0) {
memcpy(pTDigest, pInput->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
tdigestAutoFill(pTDigest, COMPRESSION);
tdigestCopy(pTDigest, pInput->pTDigest);
} else {
tdigestMerge(pOutput->pTDigest, pInput->pTDigest);
}
......
......@@ -42,16 +42,20 @@ typedef struct SMergeArgs {
double max;
}SMergeArgs;
void tdigestAutoFill(TDigest* t, int32_t compression) {
t->centroids = (SCentroid*)((char*)t + sizeof(TDigest));
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression));
/*
 * Copy the digest `src` into the caller-provided buffer `dst`.
 *
 * The first TDIGEST_SIZE(COMPRESSION) bytes of `src` (the TDigest header plus
 * the inline buffered-point area) are copied verbatim. The centroid array is
 * duplicated into a freshly malloc'ed block so that `dst` never shares
 * `src`'s centroid storage, and `dst->buffered_pts` is re-pointed at the
 * inline area that follows `dst`'s own header.
 *
 * @param dst  destination buffer, at least TDIGEST_SIZE(COMPRESSION) bytes.
 * @param src  source digest; its centroid array must hold at least
 *             GET_CENTROID(COMPRESSION) entries.
 *
 * If the centroid allocation fails, `dst` is left completely unmodified
 * instead of copying into a NULL pointer; the function returns void, so the
 * failure cannot be reported to the caller.
 *
 * NOTE(review): whatever `dst->centroids` pointed to before the call is
 * overwritten without being freed. If callers pass a digest initialised by
 * tdigestNewFrom (which allocates a centroid array), that array is leaked -
 * confirm ownership at the call sites before adding a free here.
 * NOTE(review): sizes are derived from the COMPRESSION constant, not from
 * src->compression / src->size - verify that every digest passed in was
 * created with COMPRESSION.
 */
void tdigestCopy(TDigest* dst, TDigest* src) {
  /* Byte size of the centroid array; computed once and reused below. */
  size_t centroidBytes = (size_t)(int32_t)GET_CENTROID(COMPRESSION) * sizeof(SCentroid);

  /* Allocate before touching dst so a failure leaves dst in its prior state. */
  SCentroid *centroids = (SCentroid*)malloc(centroidBytes);
  if (centroids == NULL) {
    return;
  }
  memcpy(centroids, src->centroids, centroidBytes);

  /* Header + inline buffered points; this also copies src's pointer fields, fixed up below. */
  memcpy(dst, src, (size_t)TDIGEST_SIZE(COMPRESSION));
  dst->centroids = centroids;
  dst->buffered_pts = (SPt*) ((char*)dst + sizeof(TDigest));
}
TDigest *tdigestNewFrom(void* pBuf, int32_t compression) {
memset(pBuf, 0, TDIGEST_SIZE(compression));
TDigest* t = (TDigest*)pBuf;
tdigestAutoFill(t, compression);
t->centroids = (SCentroid*)calloc((int32_t)GET_CENTROID(compression), sizeof(SCentroid));
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest));
t->compression = compression;
t->size = (int64_t)GET_CENTROID(compression);
t->threshold = (int32_t)GET_THRESHOLD(compression);
......@@ -98,29 +102,18 @@ static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
}
void tdigestCompress(TDigest *t) {
SCentroid *unmerged_centroids;
int64_t unmerged_weight = 0;
SCentroid *unmerged_centroids = (SCentroid *)t->buffered_pts;
int32_t num_unmerged = t->num_buffered_pts;
int32_t i, j;
SMergeArgs args = {0};
if (!t->num_buffered_pts)
if (t->num_buffered_pts <= 0)
return;
unmerged_centroids = (SCentroid*)malloc(sizeof(SCentroid) * t->num_buffered_pts);
for (i = 0; i < num_unmerged; i++) {
SPt *p = t->buffered_pts + i;
SCentroid *c = &unmerged_centroids[i];
c->mean = p->value;
c->weight = p->weight;
unmerged_weight += c->weight;
}
t->num_buffered_pts = 0;
t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
args.centroids = (SCentroid*)malloc((size_t)(sizeof(SCentroid) * t->size));
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));
args.centroids = (SCentroid*)calloc(sizeof(SCentroid), t->size);
args.t = t;
args.min = INFINITY;
......@@ -146,7 +139,6 @@ void tdigestCompress(TDigest *t) {
mergeCentroid(&args, &unmerged_centroids[i++]);
assert(args.idx < (t->size));
}
free((void*)unmerged_centroids);
while (j < t->num_centroids) {
mergeCentroid(&args, &t->centroids[j++]);
......@@ -162,14 +154,8 @@ void tdigestCompress(TDigest *t) {
t->max = MAX(t->max, args.max);
}
static int32_t maxcentroids = t->size - 10;
if (t->num_centroids > maxcentroids) {
maxcentroids = t->num_centroids;
printf("maxcentroids:%d\n", maxcentroids);
}
memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
free((void*)args.centroids);
tfree(t->centroids);
t->centroids = args.centroids;
}
void tdigestAdd(TDigest* t, double x, int64_t w) {
......@@ -180,6 +166,7 @@ void tdigestAdd(TDigest* t, double x, int64_t w) {
t->buffered_pts[i].value = x;
t->buffered_pts[i].weight = w;
t->num_buffered_pts++;
t->total_weight += w;
if (t->num_buffered_pts >= t->threshold)
tdigestCompress(t);
......
......@@ -175,13 +175,12 @@ void tdigestTest() {
SHistogramInfo *pHisto = NULL;
double ratio = 0.5;
int64_t totalNum[] = {100,10000,1000000000};
int64_t totalNum[] = {100,10000,10000000};
int32_t numTimes = sizeof(totalNum)/sizeof(totalNum[0]);
int64_t biggestNum = totalNum[numTimes - 1];
int32_t unitNum[] = {1,10,100,1000,5000,10000,100000};
int32_t unitTimes = sizeof(unitNum)/sizeof(unitNum[0]);
// int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT};
int32_t dataMode[] = {TEST_DATA_MODE_SEQ};
int32_t dataMode[] = {TEST_DATA_MODE_SEQ, TEST_DATA_MODE_DSEQ, TEST_DATA_MODE_RAND_PER, TEST_DATA_MODE_RAND_LIMIT};
int32_t modeTimes = sizeof(dataMode)/sizeof(dataMode[0]);
int32_t dataTypes[] = {TEST_DATA_TYPE_INT, TEST_DATA_TYPE_BIGINT, TEST_DATA_TYPE_FLOAT, TEST_DATA_TYPE_DOUBLE};
int32_t typeTimes = sizeof(dataTypes)/sizeof(dataTypes[0]);
......@@ -290,30 +289,48 @@ void tdigestTest() {
if (dataMode[i] == TEST_DATA_MODE_RAND_PER) {
for (int32_t p = 0; p < randPTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("Mode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]);
printf("DMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[i][j][m][p]);
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]);
}
printf("\n");
printf("HMode:%d,Type:%d,randP:%d -", dataMode[i], dataTypes[j], randPers[p]);
for (int32_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]);
}
printf("\n");
}
}
} else if (dataMode[i] == TEST_DATA_MODE_RAND_LIMIT) {
for (int32_t p = 0; p < randLTimes; ++p) {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("Mode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]);
printf("DMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[i][j][m][p]);
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][p]);
}
printf("\n");
printf("HMode:%d,Type:%d,randL:%d -", dataMode[i], dataTypes[j], randLimits[p]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][p]);
}
printf("\n");
}
}
} else {
for (int32_t j = 0; j < typeTimes; ++j) {
printf("Mode:%d,Type:%d -", dataMode[i], dataTypes[j]);
printf("DMode:%d,Type:%d -", dataMode[i], dataTypes[j]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[i][j][m][0]);
printf(" %d:%f", totalNum[m], useTime[0][i][j][m][0]);
}
printf("\n");
printf("HMode:%d,Type:%d -", dataMode[i], dataTypes[j]);
for (int64_t m = 0; m < numTimes; ++m) {
printf(" %d:%f", totalNum[m], useTime[1][i][j][m][0]);
}
printf("\n");
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册