hyperloglog.c 24.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
/* hyperloglog.c - Redis HyperLogLog probabilistic cardinality approximation.
 * This file implements the algorithm and the exported Redis commands.
 *
 * Copyright (c) 2014, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

32 33
#include "redis.h"

A
antirez 已提交
34
#include <stdint.h>
A
antirez 已提交
35
#include <math.h>
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54

/* The Redis HyperLogLog implementation is based on the following ideas:
 *
 * * The use of a 64 bit hash function as proposed in [1], in order not to be
 *   limited to cardinalities up to 10^9, at the cost of just 1 additional
 *   bit per register.
 * * The use of 16384 6-bit registers for a great level of accuracy, using
 *   a total of 12k per key.
 * * The use of the Redis string data type. No new type is introduced.
 * * No attempt is made to compress the data structure as in [1]. Also the
 *   algorithm used is the original HyperLogLog Algorithm as in [2], with
 *   the only difference that a 64 bit hash function is used, so no correction
 *   is performed for values near 2^32 as in [1].
 *
 * [1] Heule, Nunkesser, Hall: HyperLogLog in Practice: Algorithmic
 *     Engineering of a State of The Art Cardinality Estimation Algorithm.
 *
 * [2] P. Flajolet, Éric Fusy, O. Gandouet, and F. Meunier. Hyperloglog: The
 *     analysis of a near-optimal cardinality estimation algorithm.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
 *
 * The representation used by Redis is the following:
 *
 * +--------+--------+--------+------//      //--+---------------------+
 * |11000000|22221111|33333322|55444444 ....     | 64 bit cardinality  |
 * +--------+--------+--------+------//      //--+---------------------+
 *
 * The 6 bits counters are encoded one after the other starting from the
 * LSB to the MSB, and using the next bytes as needed.
 *
 * At the end of the 16k counters, there is an additional 64 bit integer
 * stored in little endian format with the latest cardinality computed that
 * can be reused if the data structure was not modified since the last
 * computation (this is useful because there are high probabilities that
 * HLLADD operations don't modify the actual data structure and hence the
 * approximated cardinality).
 *
 * When the most significant bit in the most significant byte of the cached
 * cardinality is set, it means that the data structure was modified and
 * we can't reuse the cached value that must be recomputed. */
75

76 77
#define REDIS_HLL_P 14 /* The greater is P, the smaller the error. */
#define REDIS_HLL_REGISTERS (1<<REDIS_HLL_P) /* With P=14, 16384 registers. */
#define REDIS_HLL_P_MASK (REDIS_HLL_REGISTERS-1) /* Mask to index register. */
#define REDIS_HLL_BITS 6 /* Enough to count up to 63 leading zeroes. */
#define REDIS_HLL_REGISTER_MAX ((1<<REDIS_HLL_BITS)-1)
/* Note: REDIS_HLL_SIZE define has a final "+8" since we store a 64 bit
 * integer at the end of the HyperLogLog structure to cache the cardinality.
 * The whole expansion is parenthesized so that the macro behaves as a single
 * value inside larger expressions (e.g. REDIS_HLL_SIZE*2). */
#define REDIS_HLL_SIZE (((REDIS_HLL_REGISTERS*REDIS_HLL_BITS+7)/8)+8)
84 85 86 87 88 89 90

/* =========================== Low level bit macros ========================= */

/* We need to get and set 6 bit counters in an array of 8 bit bytes.
 * We use macros to make sure the code is inlined since speed is critical
 * especially in order to compute the approximated cardinality in
 * HLLCOUNT where we need to access all the registers at once.
A
antirez 已提交
91
 * For the same reason we also want to avoid conditionals in this code path.
92 93
 *
 * +--------+--------+--------+------//
94
 * |11000000|22221111|33333322|55444444
95 96
 * +--------+--------+--------+------//
 *
97 98 99
 * Note: in the above representation the most significant bit (MSB)
 * of every byte is on the left. We start using bits from the LSB to MSB,
 * and so forth passing to the next byte.
100
 *
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
 * Example, we want to access to counter at pos = 1 ("111111" in the
 * illustration above).
 *
 * The index of the first byte b0 containing our data is:
 *
 *  b0 = 6 * pos / 8 = 0
 *
 *   +--------+
 *   |11000000|  <- Our byte at b0
 *   +--------+
 *
 * The position of the first bit (counting from the LSB = 0) in the byte
 * is given by:
 *
 *  fb = 6 * pos % 8 -> 6
 *
 * Right shift b0 of 'fb' bits.
118 119
 *
 *   +--------+
120 121
 *   |11000000|  <- Initial value of b0
 *   |00000011|  <- After right shift of 6 pos.
122 123
 *   +--------+
 *
124
 * Left shift b1 of 8-fb bits (2 bits).
125 126
 *
 *   +--------+
127 128
 *   |22221111|  <- Initial value of b1
 *   |22111100|  <- After left shift of 2 bits.
129 130
 *   +--------+
 *
131 132
 * OR the two bits, and finally AND with 111111 (63 in decimal) to
 * clean the higher order bits we are not interested in:
133 134
 *
 *   +--------+
135 136 137 138
 *   |00000011|  <- b0 right shifted
 *   |22111100|  <- b1 left shifted
 *   |22111111|  <- b0 OR b1
 *   |  111111|  <- (b0 OR b1) AND 63, our value.
139 140
 *   +--------+
 *
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
 * We can try with a different example, like pos = 0. In this case
 * the 6-bit counter is actually contained in a single byte.
 *
 *  b0 = 6 * pos / 8 = 0
 *
 *   +--------+
 *   |11000000|  <- Our byte at b0
 *   +--------+
 *
 *  fb = 6 * pos % 8 = 0
 *
 *  So we right shift of 0 bits (no shift in practice) and
 *  left shift the next byte of 8 bits, even if we don't use it,
 *  but this has the effect of clearing the bits so the result
 *  will not be affected after the OR.
156 157 158 159 160 161 162 163 164
 *
 * -------------------------------------------------------------------------
 *
 * Setting the register is a bit more complex, let's assume that 'val'
 * is the value we want to set, already in the right range.
 *
 * We need two steps, in one we need to clear the bits, and in the other
 * we need to bitwise-OR the new bits.
 *
165
 * Let's try with 'pos' = 1, so our first byte at 'b' is 0,
A
antirez 已提交
166
 *
167
 * "fb" is 6 in this case.
168 169
 *
 *   +--------+
170
 *   |11000000|  <- Our byte at b0
171 172
 *   +--------+
 *
A
antirez 已提交
173
 * To create an AND-mask to clear the bits about this position, we just
 * initialize the mask with the value 63, left shift it of "fb" bits,
 * and finally invert the result.
176 177
 *
 *   +--------+
178 179 180
 *   |00111111|  <- "mask" starts at 63
 *   |11000000|  <- "mask" after left shift of "fb" bits.
 *   |00111111|  <- "mask" after invert.
181 182 183
 *   +--------+
 *
 * Now we can bitwise-AND the byte at "b" with the mask, and bitwise-OR
 * it with "val" left-shifted of "fb" bits to set the new bits.
185
 *
186
 * Now let's focus on the next byte b1:
187 188
 *
 *   +--------+
189
 *   |22221111|  <- Initial value of b1
190 191
 *   +--------+
 *
192 193
 * To build the AND mask we start again with the 63 value, right shift
 * it by 8-fb bits, and invert it.
194 195
 *
 *   +--------+
196 197 198
 *   |00111111|  <- "mask" set at 2^6-1
 *   |00001111|  <- "mask" after the right shift by 8-fb = 2 bits
 *   |11110000|  <- "mask" after bitwise not.
199 200 201 202 203 204 205 206 207 208 209 210 211 212
 *   +--------+
 *
 * Now we can mask it with b+1 to clear the old bits, and bitwise-OR
 * with "val" right-shifted by 8-fb bits to set the new value.
 */

/* Note: if we access the last counter, we will also access the b+1 byte
 * that is out of the array, but sds strings always have an implicit null
 * term, so the byte exists, and we can skip the conditional (or the need
 * to allocate 1 byte more explicitly). */

/* Store the value of the register at position 'regnum' into variable 'target'.
 * 'p' is an array of unsigned bytes.
 *
 * 'p' and 'regnum' may be arbitrary expressions: each is parenthesized and
 * evaluated exactly once. All the temporaries use a leading underscore so
 * that they cannot shadow the caller's 'target' variable.
 *
 * The byte following the one holding the first bits of the register is
 * always read, even when the register does not cross a byte boundary, in
 * order to avoid conditionals in this code path. */
#define HLL_GET_REGISTER(target,p,regnum) do { \
    uint8_t *_p = (uint8_t*) (p); \
    unsigned long _rn = (regnum); \
    unsigned long _byte = _rn*REDIS_HLL_BITS/8; \
    unsigned long _fb = _rn*REDIS_HLL_BITS&7; \
    unsigned long _fb8 = 8 - _fb; \
    unsigned long _b0 = _p[_byte]; \
    unsigned long _b1 = _p[_byte+1]; \
    target = ((_b0 >> _fb) | (_b1 << _fb8)) & REDIS_HLL_REGISTER_MAX; \
} while(0)

/* Set the value of the register at position 'regnum' to 'val'.
 * 'p' is an array of unsigned bytes.
 *
 * 'val' is expected to be already in the 0..REDIS_HLL_REGISTER_MAX range.
 * 'p', 'regnum' and 'val' may be arbitrary expressions: each one is
 * parenthesized and evaluated exactly once.
 *
 * Both the byte holding the first bits of the register and the following
 * one are always rewritten (the second one with its old content when the
 * register does not cross a byte boundary) to avoid conditionals. */
#define HLL_SET_REGISTER(p,regnum,val) do { \
    uint8_t *_p = (uint8_t*) (p); \
    unsigned long _rn = (regnum); \
    unsigned long _byte = _rn*REDIS_HLL_BITS/8; \
    unsigned long _fb = _rn*REDIS_HLL_BITS&7; \
    unsigned long _fb8 = 8 - _fb; \
    unsigned long _v = (val); \
    _p[_byte] &= ~(REDIS_HLL_REGISTER_MAX << _fb); \
    _p[_byte] |= _v << _fb; \
    _p[_byte+1] &= ~(REDIS_HLL_REGISTER_MAX >> _fb8); \
    _p[_byte+1] |= _v >> _fb8; \
} while(0)

/* ========================= HyperLogLog algorithm  ========================= */

238 239 240
/* Our hash function is MurmurHash2, 64 bit version.
 * It was modified for Redis in order to provide the same result in
 * big and little endian archs (endian neutral).
 *
 * 'key' points to 'len' bytes of input, 'seed' perturbs the initial state.
 * The return value is the 64 bit hash of the input.
 *
 * Every 8 bytes block is assembled byte by byte in little endian order:
 * this produces the same value on every architecture and, unlike a load
 * through a casted uint64_t pointer, is well defined even when 'key' is
 * not 8-byte aligned. */
uint64_t MurmurHash64A (const void * key, int len, unsigned int seed) {
    const uint64_t m = 0xc6a4a7935bd1e995ULL;
    const int r = 47;
    uint64_t h = seed ^ (len * m);
    const uint8_t *data = (const uint8_t *)key;
    const uint8_t *end = data + (len-(len&7));

    /* Mix all the complete 8 bytes blocks. */
    while(data != end) {
        uint64_t k;

        k = (uint64_t) data[0];
        k |= (uint64_t) data[1] << 8;
        k |= (uint64_t) data[2] << 16;
        k |= (uint64_t) data[3] << 24;
        k |= (uint64_t) data[4] << 32;
        k |= (uint64_t) data[5] << 40;
        k |= (uint64_t) data[6] << 48;
        k |= (uint64_t) data[7] << 56;

        k *= m;
        k ^= k >> r;
        k *= m;
        h ^= k;
        h *= m;
        data += 8;
    }

    /* Mix the 0..7 trailing bytes. Every case deliberately falls through
     * to the next one so that all the remaining bytes are consumed. */
    switch(len & 7) {
    case 7: h ^= (uint64_t)data[6] << 48; /* fallthrough */
    case 6: h ^= (uint64_t)data[5] << 40; /* fallthrough */
    case 5: h ^= (uint64_t)data[4] << 32; /* fallthrough */
    case 4: h ^= (uint64_t)data[3] << 24; /* fallthrough */
    case 3: h ^= (uint64_t)data[2] << 16; /* fallthrough */
    case 2: h ^= (uint64_t)data[1] << 8;  /* fallthrough */
    case 1: h ^= (uint64_t)data[0];
            h *= m;
            break;
    default:
            break;
    }

    /* Final avalanche. */
    h ^= h >> r;
    h *= m;
    h ^= h >> r;
    return h;
}

289 290 291 292 293 294 295 296 297 298 299
/* "Add" the element in the hyperloglog data structure.
 * Actually nothing is added, but the max 0 pattern counter of the subset
 * the element belongs to is incremented if needed.
 *
 * 'registers' is expected to have room for REDIS_HLL_REGISTERS plus an
 * additional byte on the right. This requirement is met by sds strings
 * automatically since they are implicitly null terminated.
 *
 * The function always succeed, however if as a result of the operation
 * the approximated cardinality changed, 1 is returned. Otherwise 0
 * is returned. */
A
antirez 已提交
300
int hllAdd(uint8_t *registers, unsigned char *ele, size_t elesize) {
301 302 303 304 305
    uint64_t hash, bit, index;
    uint8_t oldcount, count;

    /* Count the number of zeroes starting from bit REDIS_HLL_REGISTERS
     * (that is a power of two corresponding to the first bit we don't use
306 307 308 309 310 311
     * as index). The max run can be 64-P+1 bits.
     *
     * Note that the final "1" ending the sequence of zeroes must be
     * included in the count, so if we find "001" the count is 3, and
     * the smallest count possible is no zeroes at all, just a 1 bit
     * at the first position, that is a count of 1.
312 313 314
     *
     * This may sound like inefficient, but actually in the average case
     * there are high probabilities to find a 1 after a few iterations. */
A
antirez 已提交
315
    hash = MurmurHash64A(ele,elesize,0);
316
    bit = REDIS_HLL_REGISTERS;
317
    count = 1;
318 319 320 321 322 323 324 325 326
    while((hash & bit) == 0) {
        count++;
        /* Test the next bit. Note that if we run out of bits in the 64
         * bit integer, bit will be set to 0, and the while test will fail,
         * so we can save the explicit check and yet the algorithm will
         * terminate. */
        bit <<= 1;
    }

A
antirez 已提交
327
    /* Update the register if this element produced a longer run of zeroes. */
328 329 330 331 332 333 334 335 336 337
    index = hash & REDIS_HLL_P_MASK; /* Index a register inside registers. */
    HLL_GET_REGISTER(oldcount,registers,index);
    if (count > oldcount) {
        HLL_SET_REGISTER(registers,index,count);
        return 1;
    } else {
        return 0;
    }
}

A
antirez 已提交
338 339 340 341 342
/* Return the approximated cardinality of the set based on the armonic
 * mean of the registers values. */
uint64_t hllCount(uint8_t *registers) {
    double m = REDIS_HLL_REGISTERS;
    double alpha = 0.7213/(1+1.079/m);
343
    double E = 0;
A
antirez 已提交
344 345 346
    int ez = 0; /* Number of registers equal to 0. */
    int j;

347 348 349 350 351 352 353 354 355 356 357 358 359
    /* We precompute 2^(-reg[j]) in a small table in order to
     * speedup the computation of SUM(2^-register[0..i]). */
    static int initialized = 0;
    static double PE[64];
    if (!initialized) {
        PE[0] = 1; /* 2^(-reg[j]) is 1 when m is 0. */
        for (j = 1; j < 64; j++) {
            /* 2^(-reg[j]) is the same as 1/2^reg[j]. */
            PE[j] = 1.0/(1ULL << j);
        }
        initialized = 1;
    }

360 361 362 363
    /* Compute SUM(2^-register[0..i]).
     * Redis default is to use 16384 registers 6 bits each. The code works
     * with other values by modifying the defines, but for our target value
     * we take a faster path with unrolled loops. */
364
    if (REDIS_HLL_REGISTERS == 16384 && REDIS_HLL_BITS == 6) {
365 366 367 368 369 370
        uint8_t *r = registers;
        unsigned long r0, r1, r2, r3, r4, r5, r6, r7, r8, r9,
                      r10, r11, r12, r13, r14, r15;
        for (j = 0; j < 1024; j++) {
            /* Handle 16 registers per iteration. */
            r0 = r[0] & 63; if (r0 == 0) ez++;
371 372 373
            r1 = (r[0] >> 6 | r[1] << 2) & 63; if (r1 == 0) ez++;
            r2 = (r[1] >> 4 | r[2] << 4) & 63; if (r2 == 0) ez++;
            r3 = (r[2] >> 2) & 63; if (r3 == 0) ez++;
374
            r4 = r[3] & 63; if (r4 == 0) ez++;
375 376 377
            r5 = (r[3] >> 6 | r[4] << 2) & 63; if (r5 == 0) ez++;
            r6 = (r[4] >> 4 | r[5] << 4) & 63; if (r6 == 0) ez++;
            r7 = (r[5] >> 2) & 63; if (r7 == 0) ez++;
378
            r8 = r[6] & 63; if (r8 == 0) ez++;
379 380 381
            r9 = (r[6] >> 6 | r[7] << 2) & 63; if (r9 == 0) ez++;
            r10 = (r[7] >> 4 | r[8] << 4) & 63; if (r10 == 0) ez++;
            r11 = (r[8] >> 2) & 63; if (r11 == 0) ez++;
382
            r12 = r[9] & 63; if (r12 == 0) ez++;
383 384 385
            r13 = (r[9] >> 6 | r[10] << 2) & 63; if (r13 == 0) ez++;
            r14 = (r[10] >> 4 | r[11] << 4) & 63; if (r14 == 0) ez++;
            r15 = (r[11] >> 2) & 63; if (r15 == 0) ez++;
A
antirez 已提交
386

387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
            /* Additional parens will allow the compiler to optimize the
             * code more with a loss of precision that is not very relevant
             * here (floating point math is not commutative!). */
            E += (PE[r0] + PE[r1]) + (PE[r2] + PE[r3]) + (PE[r4] + PE[r5]) +
                 (PE[r6] + PE[r7]) + (PE[r8] + PE[r9]) + (PE[r10] + PE[r11]) +
                 (PE[r12] + PE[r13]) + (PE[r14] + PE[r15]);
            r += 12;
        }
    } else {
        for (j = 0; j < REDIS_HLL_REGISTERS; j++) {
            unsigned long reg;

            HLL_GET_REGISTER(reg,registers,j);
            if (reg == 0) {
                ez++;
                E += 1; /* 2^(-reg[j]) is 1 when m is 0. */
            } else {
                E += PE[reg]; /* Precomputed 2^(-reg[j]). */
            }
A
antirez 已提交
406 407 408 409 410
        }
    }
    /* Muliply the inverse of E for alpha_m * m^2 to have the raw estimate. */
    E = (1/E)*alpha*m*m;

411 412 413 414 415 416
    /* Use the LINEARCOUNTING algorithm for small cardinalities.
     * For larger values but up to 72000 HyperLogLog raw approximation is
     * used since linear counting error starts to increase. However HyperLogLog
     * shows a strong bias in the range 2.5*16384 - 72000, so we try to
     * compensate for it. */
    if (E < m*2.5 && ez != 0) {
A
antirez 已提交
417
        E = m*log(m/ez); /* LINEARCOUNTING() */
418 419 420 421 422 423 424 425 426 427 428
    } else if (m == 16384 && E < 72000) {
        /* We did polynomial regression of the bias for this range, this
         * way we can compute the bias for a given cardinality and correct
         * according to it. Only apply the correction for P=14 that's what
         * we use and the value the correction was verified with. */
        double bias = 5.9119*1.0e-18*(E*E*E*E)
                      -1.4253*1.0e-12*(E*E*E)+
                      1.2940*1.0e-7*(E*E)
                      -5.2921*1.0e-3*E+
                      83.3216;
        E -= E*(bias/100);
A
antirez 已提交
429 430 431 432 433 434 435 436
    }
    /* We don't apply the correction for E > 1/30 of 2^32 since we use
     * a 64 bit function and 6 bit counters. To apply the correction for
     * 1/30 of 2^64 is not needed since it would require a huge set
     * to approach such a value. */
    return (uint64_t) E;
}

437 438
/* ========================== HyperLogLog commands ========================== */

439 440
/* PFADD var ele ele ele ... ele => :0 or :1
 *
 * Adds every element to the HyperLogLog stored at 'var', creating the key
 * if it does not exist. The reply is 1 if the key was created or at least
 * one register was updated, otherwise 0. */
void pfaddCommand(redisClient *c) {
    robj *o = lookupKeyWrite(c->db,c->argv[1]);
    uint8_t *registers;
    int updated = 0, j;

    if (o == NULL) {
        /* Create the key with a string value of the exact length to
         * hold our HLL data structure. sdsnewlen() when NULL is passed
         * is guaranteed to return bytes initialized to zero. */
        o = createObject(REDIS_STRING,sdsnewlen(NULL,REDIS_HLL_SIZE));
        dbAdd(c->db,c->argv[1],o);
        updated++;
    } else {
        /* Key exists, check type */
        if (checkType(c,o,REDIS_STRING))
            return;

        /* If this is a string representing an HLL, the size should match
         * exactly. */
        if (stringObjectLen(o) != REDIS_HLL_SIZE) {
            addReplyErrorFormat(c,
                "PFADD target key must contain a %d bytes string.",
                REDIS_HLL_SIZE);
            return;
        }
        /* The registers are modified in place below. */
        o = dbUnshareStringValue(c->db,c->argv[1],o);
    }
    /* Perform the low level ADD operation for every element. */
    registers = o->ptr;
    for (j = 2; j < c->argc; j++) {
        if (hllAdd(registers, (unsigned char*)c->argv[j]->ptr,
                sdslen(c->argv[j]->ptr)))
        {
            updated++;
        }
    }
    if (updated) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
        server.dirty++;
        /* Invalidate the cached cardinality: set the most significant bit
         * of the last byte of the trailing 64 bit integer. */
        registers[REDIS_HLL_SIZE-1] |= (1<<7);
    }
    addReply(c, updated ? shared.cone : shared.czero);
}

486 487
/* PFCOUNT var -> approximated cardinality of set.
 *
 * Replies with the cardinality cached in the last 8 bytes of the value when
 * it is still valid, otherwise recomputes it, stores it back in the value
 * and replies with the fresh estimate. A missing key counts as 0. */
void pfcountCommand(redisClient *c) {
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    uint8_t *registers;
    uint64_t card;

    if (o == NULL) {
        /* No key? Cardinality is zero since no element was added, otherwise
         * we would have a key as PFADD creates it as a side effect. */
        addReply(c,shared.czero);
    } else {
        /* Key exists, check type */
        if (checkType(c,o,REDIS_STRING))
            return;

        /* If this is a string representing an HLL, the size should match
         * exactly. */
        if (stringObjectLen(o) != REDIS_HLL_SIZE) {
            addReplyErrorFormat(c,
                "PFCOUNT target key must contain a %d bytes string.",
                REDIS_HLL_SIZE);
            return;
        }

        /* Check if the cached cardinality is valid: the most significant
         * bit of the last byte is set when the cache is stale. */
        registers = o->ptr;
        if ((registers[REDIS_HLL_SIZE-1] & (1<<7)) == 0) {
            /* Just return the cached value (stored in little endian). */
            card = (uint64_t)registers[REDIS_HLL_SIZE-8];
            card |= (uint64_t)registers[REDIS_HLL_SIZE-7] << 8;
            card |= (uint64_t)registers[REDIS_HLL_SIZE-6] << 16;
            card |= (uint64_t)registers[REDIS_HLL_SIZE-5] << 24;
            card |= (uint64_t)registers[REDIS_HLL_SIZE-4] << 32;
            card |= (uint64_t)registers[REDIS_HLL_SIZE-3] << 40;
            card |= (uint64_t)registers[REDIS_HLL_SIZE-2] << 48;
            card |= (uint64_t)registers[REDIS_HLL_SIZE-1] << 56;
        } else {
            /* Recompute it and update the cached value. The value is
             * written in place, so unshare it first exactly like the
             * other commands in this file do before writing. */
            o = dbUnshareStringValue(c->db,c->argv[1],o);
            registers = o->ptr;
            card = hllCount(registers);
            registers[REDIS_HLL_SIZE-8] = card & 0xff;
            registers[REDIS_HLL_SIZE-7] = (card >> 8) & 0xff;
            registers[REDIS_HLL_SIZE-6] = (card >> 16) & 0xff;
            registers[REDIS_HLL_SIZE-5] = (card >> 24) & 0xff;
            registers[REDIS_HLL_SIZE-4] = (card >> 32) & 0xff;
            registers[REDIS_HLL_SIZE-3] = (card >> 40) & 0xff;
            registers[REDIS_HLL_SIZE-2] = (card >> 48) & 0xff;
            registers[REDIS_HLL_SIZE-1] = (card >> 56) & 0xff;
            /* This is not considered a read-only command even if the
             * data structure is not modified, since the cached value
             * may be modified and given that the HLL is a Redis string
             * we need to propagate the change. */
            signalModifiedKey(c->db,c->argv[1]);
            server.dirty++;
        }
        addReplyLongLong(c,card);
    }
}

544 545
/* PFMERGE dest src1 src2 src3 ... srcN => OK
 *
 * Sets every register of 'dest' to the maximum of the corresponding
 * registers of 'dest' itself and of all the source keys. Missing keys are
 * handled as empty HLLs; 'dest' is created if it does not exist. */
void pfmergeCommand(redisClient *c) {
    uint8_t max[REDIS_HLL_REGISTERS];
    uint8_t *registers;
    int j, i;

    /* Compute an HLL with M[i] = MAX(M[i]_j).
     * We store the maximum into the max array of registers. We'll write
     * it to the target variable later. The loop starts from argv[1], so
     * the current content of the destination takes part in the merge. */
    memset(max,0,sizeof(max));
    for (j = 1; j < c->argc; j++) {
        uint8_t val;

        /* Check type and size. */
        robj *o = lookupKeyRead(c->db,c->argv[j]);
        if (o == NULL) continue; /* Assume empty HLL for non existing var. */
        if (checkType(c,o,REDIS_STRING))
            return;

        if (stringObjectLen(o) != REDIS_HLL_SIZE) {
            addReplyErrorFormat(c,
                "PFMERGE target key must contain a %d bytes string.",
                REDIS_HLL_SIZE);
            return;
        }

        /* Merge this HLL with our 'max' HLL by setting max[i]
         * to MAX(max[i],hll[i]). */
        registers = o->ptr;
        for (i = 0; i < REDIS_HLL_REGISTERS; i++) {
            HLL_GET_REGISTER(val,registers,i);
            if (val > max[i]) max[i] = val;
        }
    }

    /* Create / unshare the destination key's value if needed. The key is
     * going to be modified, so it is looked up for writing. */
    robj *o = lookupKeyWrite(c->db,c->argv[1]);
    if (o == NULL) {
        /* Create the key with a string value of the exact length to
         * hold our HLL data structure. sdsnewlen() when NULL is passed
         * is guaranteed to return bytes initialized to zero. */
        o = createObject(REDIS_STRING,sdsnewlen(NULL,REDIS_HLL_SIZE));
        dbAdd(c->db,c->argv[1],o);
    } else {
        /* If key exists we are sure it's of the right type/size
         * since we checked when merging the different HLLs, so we
         * don't check again. */
        o = dbUnshareStringValue(c->db,c->argv[1],o);
    }

    /* Write the resulting HLL to the destination HLL registers and
     * invalidate the cached value. */
    registers = o->ptr;
    for (j = 0; j < REDIS_HLL_REGISTERS; j++) {
        HLL_SET_REGISTER(registers,j,max[j]);
    }
    registers[REDIS_HLL_SIZE-1] |= (1<<7);

    signalModifiedKey(c->db,c->argv[1]);
    /* We generate a PFADD event for PFMERGE for semantical simplicity
     * since in theory this is a mass-add of elements. */
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"pfadd",c->argv[1],c->db->id);
    server.dirty++;
    addReply(c,shared.ok);
}

A
antirez 已提交
610 611 612 613 614 615 616 617
/* This command performs a self-test of the HLL registers implementation.
 * Something that is not easy to test from within the outside.
 *
 * The test is conceived to test that the different counters of our data
 * structure are accessible and that setting their values both result in
 * the correct value to be retained and not affect adjacent values. */

#define REDIS_HLL_TEST_CYCLES 1000
618
void pfselftestCommand(redisClient *c) {
A
antirez 已提交
619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651
    int j, i;
    sds bitcounters = sdsnewlen(NULL,REDIS_HLL_SIZE);
    uint8_t bytecounters[REDIS_HLL_REGISTERS];

    for (j = 0; j < REDIS_HLL_TEST_CYCLES; j++) {
        /* Set the HLL counters and an array of unsigned byes of the
         * same size to the same set of random values. */
        for (i = 0; i < REDIS_HLL_REGISTERS; i++) {
            unsigned int r = rand() & REDIS_HLL_REGISTER_MAX;

            bytecounters[i] = r;
            HLL_SET_REGISTER(bitcounters,i,r);
        }
        /* Check that we are able to retrieve the same values. */
        for (i = 0; i < REDIS_HLL_REGISTERS; i++) {
            unsigned int val;

            HLL_GET_REGISTER(val,bitcounters,i);
            if (val != bytecounters[i]) {
                addReplyErrorFormat(c,
                    "TESTFAILED Register %d should be %d but is %d",
                    i, (int) bytecounters[i], (int) val);
                goto cleanup;
            }
        }
    }

    /* Success! */
    addReply(c,shared.ok);

cleanup:
    sdsfree(bitcounters);
}