tdigest.c 8.0 KB
Newer Older
G
Ganlin Zhao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

/*
 * src/tdigest.c
 *
 * Implementation of the t-digest data structure used to compute accurate percentiles.
 *
 * It is based on the MergingDigest implementation found at:
 *   https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
 *
 * Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
 */

H
Hongze Cheng 已提交
27
#include "tdigest.h"
G
Ganlin Zhao 已提交
28 29 30 31 32
#include "os.h"
#include "osMath.h"

#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
//#define INTEGRATED_LOCATION(compression, q)   ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI)
H
Hongze Cheng 已提交
33 34
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q)-1) / M_PI + (double)1 / 2))
#define FLOAT_EQ(f1, f2)                    (fabs((f1) - (f2)) <= FLT_EPSILON)
G
Ganlin Zhao 已提交
35 36

typedef struct SMergeArgs {
H
Hongze Cheng 已提交
37 38 39 40 41 42 43 44 45 46 47 48
  TDigest   *t;
  SCentroid *centroids;
  int32_t    idx;
  double     weight_so_far;
  double     k1;
  double     min;
  double     max;
} SMergeArgs;

void tdigestAutoFill(TDigest *t, int32_t compression) {
  t->centroids = (SCentroid *)((char *)t + sizeof(TDigest));
  t->buffered_pts = (SPt *)((char *)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression));
G
Ganlin Zhao 已提交
49 50
}

H
Hongze Cheng 已提交
51 52 53 54
TDigest *tdigestNewFrom(void *pBuf, int32_t compression) {
  memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression));
  TDigest *t = (TDigest *)pBuf;
  tdigestAutoFill(t, compression);
G
Ganlin Zhao 已提交
55

H
Hongze Cheng 已提交
56 57 58 59 60
  t->compression = compression;
  t->size = (int64_t)GET_CENTROID(compression);
  t->threshold = (int32_t)GET_THRESHOLD(compression);
  t->min = DOUBLE_MAX;
  t->max = -DOUBLE_MAX;
G
Ganlin Zhao 已提交
61

H
Hongze Cheng 已提交
62
  return t;
G
Ganlin Zhao 已提交
63 64 65
}

static int32_t cmpCentroid(const void *a, const void *b) {
H
Hongze Cheng 已提交
66 67 68 69 70
  SCentroid *c1 = (SCentroid *)a;
  SCentroid *c2 = (SCentroid *)b;
  if (c1->mean < c2->mean) return -1;
  if (c1->mean > c2->mean) return 1;
  return 0;
G
Ganlin Zhao 已提交
71 72 73
}

static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
H
Hongze Cheng 已提交
74 75 76 77 78 79 80 81 82
  double     k2;
  SCentroid *c = &args->centroids[args->idx];

  args->weight_so_far += merge->weight;
  k2 = INTEGRATED_LOCATION(args->t->size, args->weight_so_far / args->t->total_weight);
  // idx++
  if (k2 - args->k1 > 1 && c->weight > 0) {
    if (args->idx + 1 < args->t->size && merge->mean != args->centroids[args->idx].mean) {
      args->idx++;
G
Ganlin Zhao 已提交
83
    }
H
Hongze Cheng 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96
    args->k1 = k2;
  }

  c = &args->centroids[args->idx];
  if (c->mean == merge->mean) {
    c->weight += merge->weight;
  } else {
    c->weight += merge->weight;
    c->mean += (merge->mean - c->mean) * merge->weight / c->weight;

    if (merge->weight > 0) {
      args->min = TMIN(merge->mean, args->min);
      args->max = TMAX(merge->mean, args->max);
G
Ganlin Zhao 已提交
97
    }
H
Hongze Cheng 已提交
98
  }
G
Ganlin Zhao 已提交
99 100 101
}

void tdigestCompress(TDigest *t) {
H
Hongze Cheng 已提交
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
  SCentroid *unmerged_centroids;
  int64_t    unmerged_weight = 0;
  int32_t    num_unmerged = t->num_buffered_pts;
  int32_t    i, j;
  SMergeArgs args;

  if (t->num_buffered_pts <= 0) return;

  unmerged_centroids = (SCentroid *)taosMemoryMalloc(sizeof(SCentroid) * t->num_buffered_pts);
  for (i = 0; i < num_unmerged; i++) {
    SPt       *p = t->buffered_pts + i;
    SCentroid *c = &unmerged_centroids[i];
    c->mean = p->value;
    c->weight = p->weight;
    unmerged_weight += c->weight;
  }
  t->num_buffered_pts = 0;
  t->total_weight += unmerged_weight;

  taosSort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
  memset(&args, 0, sizeof(SMergeArgs));
  args.centroids = (SCentroid *)taosMemoryMalloc((size_t)(sizeof(SCentroid) * t->size));
  memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));

  args.t = t;
  args.min = DOUBLE_MAX;
  args.max = -DOUBLE_MAX;

  i = 0;
  j = 0;
  while (i < num_unmerged && j < t->num_centroids) {
    SCentroid *a = &unmerged_centroids[i];
    SCentroid *b = &t->centroids[j];

    if (a->mean <= b->mean) {
      mergeCentroid(&args, a);
      assert(args.idx < t->size);
      i++;
    } else {
      mergeCentroid(&args, b);
      assert(args.idx < t->size);
      j++;
G
Ganlin Zhao 已提交
144
    }
H
Hongze Cheng 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
  }

  while (i < num_unmerged) {
    mergeCentroid(&args, &unmerged_centroids[i++]);
    assert(args.idx < t->size);
  }
  taosMemoryFree((void *)unmerged_centroids);

  while (j < t->num_centroids) {
    mergeCentroid(&args, &t->centroids[j++]);
    assert(args.idx < t->size);
  }

  if (t->total_weight > 0) {
    t->min = TMIN(t->min, args.min);
    if (args.centroids[args.idx].weight <= 0) {
      args.idx--;
G
Ganlin Zhao 已提交
162
    }
H
Hongze Cheng 已提交
163 164 165
    t->num_centroids = args.idx + 1;
    t->max = TMAX(t->max, args.max);
  }
G
Ganlin Zhao 已提交
166

H
Hongze Cheng 已提交
167 168 169
  memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
  taosMemoryFree((void *)args.centroids);
}
G
Ganlin Zhao 已提交
170

H
Hongze Cheng 已提交
171 172
void tdigestAdd(TDigest *t, double x, int64_t w) {
  if (w == 0) return;
G
Ganlin Zhao 已提交
173

H
Hongze Cheng 已提交
174 175 176 177 178 179 180 181
  int32_t i = t->num_buffered_pts;
  if (i > 0 && t->buffered_pts[i - 1].value == x) {
    t->buffered_pts[i].weight = w;
  } else {
    t->buffered_pts[i].value = x;
    t->buffered_pts[i].weight = w;
    t->num_buffered_pts++;
  }
G
Ganlin Zhao 已提交
182

H
Hongze Cheng 已提交
183
  if (t->num_buffered_pts >= t->threshold) tdigestCompress(t);
G
Ganlin Zhao 已提交
184 185
}

H
Hongze Cheng 已提交
186 187
double tdigestCDF(TDigest *t, double x) {
  if (t == NULL) return 0;
G
Ganlin Zhao 已提交
188

H
Hongze Cheng 已提交
189 190 191 192
  int32_t    i;
  double     left, right;
  int64_t    weight_so_far;
  SCentroid *a, *b, tmp;
G
Ganlin Zhao 已提交
193

H
Hongze Cheng 已提交
194 195 196 197 198 199
  tdigestCompress(t);
  if (t->num_centroids == 0) return NAN;
  if (x < t->min) return 0;
  if (x > t->max) return 1;
  if (t->num_centroids == 1) {
    if (FLOAT_EQ(t->max, t->min)) return 0.5;
G
Ganlin Zhao 已提交
200

H
Hongze Cheng 已提交
201 202
    return INTERPOLATE(x, t->min, t->max);
  }
G
Ganlin Zhao 已提交
203

H
Hongze Cheng 已提交
204 205 206 207 208
  weight_so_far = 0;
  a = b = &tmp;
  b->mean = t->min;
  b->weight = 0;
  right = 0;
G
Ganlin Zhao 已提交
209

H
Hongze Cheng 已提交
210 211
  for (i = 0; i < t->num_centroids; i++) {
    SCentroid *c = &t->centroids[i];
G
Ganlin Zhao 已提交
212 213 214

    left = b->mean - (a->mean + right);
    a = b;
H
Hongze Cheng 已提交
215 216
    b = c;
    right = (b->mean - a->mean) * a->weight / (a->weight + b->weight);
G
Ganlin Zhao 已提交
217 218

    if (x < a->mean + right) {
H
Hongze Cheng 已提交
219 220
      double cdf = (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right)) / t->total_weight;
      return TMAX(cdf, 0.0);
G
Ganlin Zhao 已提交
221 222
    }

H
Hongze Cheng 已提交
223 224 225 226 227 228 229 230 231 232 233 234
    weight_so_far += a->weight;
  }

  left = b->mean - (a->mean + right);
  a = b;
  right = t->max - a->mean;

  if (x < a->mean + right) {
    return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right)) / t->total_weight;
  }

  return 1;
G
Ganlin Zhao 已提交
235 236 237
}

double tdigestQuantile(TDigest *t, double q) {
H
Hongze Cheng 已提交
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
  if (t == NULL) return 0;

  int32_t    i;
  double     left, right, idx;
  int64_t    weight_so_far;
  SCentroid *a, *b, tmp;

  tdigestCompress(t);
  if (t->num_centroids == 0) return NAN;
  if (t->num_centroids == 1) return t->centroids[0].mean;
  if (FLOAT_EQ(q, 0.0)) return t->min;
  if (FLOAT_EQ(q, 1.0)) return t->max;

  idx = q * t->total_weight;
  weight_so_far = 0;
  b = &tmp;
  b->mean = t->min;
  b->weight = 0;
  right = t->min;

  for (i = 0; i < t->num_centroids; i++) {
    SCentroid *c = &t->centroids[i];
G
Ganlin Zhao 已提交
260
    a = b;
H
Hongze Cheng 已提交
261
    left = right;
G
Ganlin Zhao 已提交
262

H
Hongze Cheng 已提交
263 264 265 266 267
    b = c;
    right = (b->weight * a->mean + a->weight * b->mean) / (a->weight + b->weight);
    if (idx < weight_so_far + a->weight) {
      double p = (idx - weight_so_far) / a->weight;
      return left * (1 - p) + right * p;
G
Ganlin Zhao 已提交
268
    }
H
Hongze Cheng 已提交
269 270 271 272 273 274
    weight_so_far += a->weight;
  }

  left = right;
  a = b;
  right = t->max;
G
Ganlin Zhao 已提交
275

H
Hongze Cheng 已提交
276 277 278 279 280 281
  if (idx < weight_so_far + a->weight && a->weight != 0) {
    double p = (idx - weight_so_far) / a->weight;
    return left * (1 - p) + right * p;
  }

  return t->max;
G
Ganlin Zhao 已提交
282 283 284
}

void tdigestMerge(TDigest *t1, TDigest *t2) {
H
Hongze Cheng 已提交
285 286 287 288 289 290 291 292 293 294 295
  // SPoints
  int32_t num_pts = t2->num_buffered_pts;
  for (int32_t i = num_pts - 1; i >= 0; i--) {
    SPt *p = t2->buffered_pts + i;
    tdigestAdd(t1, p->value, p->weight);
    t2->num_buffered_pts--;
  }
  // centroids
  for (int32_t i = 0; i < t2->num_centroids; i++) {
    tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
  }
G
Ganlin Zhao 已提交
296
}