db.c 40.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30
#include "redis.h"
31
#include "cluster.h"
32 33

#include <signal.h>
34
#include <ctype.h>
35

36 37 38
void slotToKeyAdd(robj *key);
void slotToKeyDel(robj *key);
void slotToKeyFlush(void);
39

40 41 42 43 44 45 46
/*-----------------------------------------------------------------------------
 * C-level DB API
 *----------------------------------------------------------------------------*/

robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
47
        robj *val = dictGetVal(de);
48

G
guiquanz 已提交
49
        /* Update the access time for the ageing algorithm.
50 51
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
A
antirez 已提交
52
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
53
            val->lru = LRU_CLOCK();
54 55 56 57 58 59 60
        return val;
    } else {
        return NULL;
    }
}

robj *lookupKeyRead(redisDb *db, robj *key) {
61 62
    robj *val;

63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
    if (expireIfNeeded(db,key) == 1) {
        /* Key expired. If we are in the context of a master, expireIfNeeded()
         * returns 0 only when the key does not exist at all, so it's save
         * to return NULL ASAP. */
        if (server.masterhost == NULL) return NULL;

        /* However if we are in the context of a slave, expireIfNeeded() will
         * not really try to expire the key, it only returns information
         * about the "logical" status of the key: key expiring is up to the
         * master in order to have a consistent view of master's data set.
         *
         * However, if the command caller is not the master, and as additional
         * safety measure, the command invoked is a read-only command, we can
         * safely return NULL here, and provide a more consistent behavior
         * to clients accessign expired values in a read-only fashion, that
         * will say the key as non exisitng.
         *
         * Notably this covers GETs when slaves are used to scale reads. */
        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
            server.current_client->cmd->flags & REDIS_CMD_READONLY)
        {
            return NULL;
        }
    }
89 90 91 92 93 94
    val = lookupKey(db,key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
95 96 97
}

robj *lookupKeyWrite(redisDb *db, robj *key) {
98
    expireIfNeeded(db,key);
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
    return lookupKey(db,key);
}

robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}

robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
    robj *o = lookupKeyWrite(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}

114
/* Add the key to the DB. It's up to the caller to increment the reference
G
guiquanz 已提交
115
 * counter of the value if needed.
116 117 118 119 120 121
 *
 * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val);

122
    redisAssertWithInfo(NULL,key,retval == REDIS_OK);
123
    if (val->type == REDIS_LIST) signalListAsReady(db, key);
124
    if (server.cluster_enabled) slotToKeyAdd(key);
125 126 127 128 129 130 131 132
 }

/* Overwrite an existing key with a new value. Incrementing the reference
 * count of the new value is up to the caller.
 * This function does not modify the expire time of the existing key.
 *
 * The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
A
antirez 已提交
133
    dictEntry *de = dictFind(db->dict,key->ptr);
134

135
    redisAssertWithInfo(NULL,key,de != NULL);
136
    dictReplace(db->dict, key->ptr, val);
137 138
}

139 140
/* High level Set operation. This function can be used in order to set
 * a key, whatever it was existing or not, to a new object.
141
 *
142 143 144 145 146 147
 * 1) The ref count of the value object is incremented.
 * 2) clients WATCHing for the destination key notified.
 * 3) The expire time of the key is reset (the key is made persistent). */
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
148
    } else {
149
        dbOverwrite(db,key,val);
150
    }
151 152
    incrRefCount(val);
    removeExpire(db,key);
153
    signalModifiedKey(db,key);
154 155 156 157 158 159 160 161 162 163 164
}

int dbExists(redisDb *db, robj *key) {
    return dictFind(db->dict,key->ptr) != NULL;
}

/* Return a random key, in form of a Redis object.
 * If there are no keys, NULL is returned.
 *
 * The function makes sure to return keys not already expired. */
robj *dbRandomKey(redisDb *db) {
A
antirez 已提交
165
    dictEntry *de;
166 167 168 169 170 171 172 173

    while(1) {
        sds key;
        robj *keyobj;

        de = dictGetRandomKey(db->dict);
        if (de == NULL) return NULL;

174
        key = dictGetKey(de);
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
        keyobj = createStringObject(key,sdslen(key));
        if (dictFind(db->expires,key)) {
            if (expireIfNeeded(db,keyobj)) {
                decrRefCount(keyobj);
                continue; /* search for another key. This expired. */
            }
        }
        return keyobj;
    }
}

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
191
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
192
        if (server.cluster_enabled) slotToKeyDel(key);
193 194 195 196
        return 1;
    } else {
        return 0;
    }
197 198
}

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
/* Prepare the string object stored at 'key' to be modified destructively
 * to implement commands like SETBIT or APPEND.
 *
 * An object is usually ready to be modified unless one of the two conditions
 * are true:
 *
 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
 *    other users.
 * 2) The object encoding is not "RAW".
 *
 * If the object is found in one of the above conditions (or both) by the
 * function, an unshared / not-encoded copy of the string object is stored
 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
 * returned.
 *
 * USAGE:
 *
 * The object 'o' is what the caller already obtained by looking up 'key'
 * in 'db', the usage pattern looks like this:
 *
 * o = lookupKeyWrite(db,key);
 * if (checkType(c,o,REDIS_STRING)) return;
 * o = dbUnshareStringValue(db,key,o);
 *
 * At this point the caller is ready to modify the object, for example
 * using an sdscat() call to append some data, or anything else.
 */
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
    redisAssert(o->type == REDIS_STRING);
    if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW) {
        robj *decoded = getDecodedObject(o);
        o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
        decrRefCount(decoded);
        dbOverwrite(db,key,o);
    }
    return o;
}

237
long long emptyDb(void(callback)(void*)) {
238 239 240 241 242
    int j;
    long long removed = 0;

    for (j = 0; j < server.dbnum; j++) {
        removed += dictSize(server.db[j].dict);
243 244
        dictEmpty(server.db[j].dict,callback);
        dictEmpty(server.db[j].expires,callback);
245
    }
246
    if (server.cluster_enabled) slotToKeyFlush();
247 248 249 250 251 252 253 254 255 256
    return removed;
}

int selectDb(redisClient *c, int id) {
    if (id < 0 || id >= server.dbnum)
        return REDIS_ERR;
    c->db = &server.db[id];
    return REDIS_OK;
}

257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

void signalFlushedDb(int dbid) {
    touchWatchedKeysOnFlush(dbid);
}

274 275 276 277 278 279
/*-----------------------------------------------------------------------------
 * Type agnostic commands operating on the key space
 *----------------------------------------------------------------------------*/

void flushdbCommand(redisClient *c) {
    server.dirty += dictSize(c->db->dict);
280
    signalFlushedDb(c->db->id);
281 282
    dictEmpty(c->db->dict,NULL);
    dictEmpty(c->db->expires,NULL);
283
    if (server.cluster_enabled) slotToKeyFlush();
284 285 286 287
    addReply(c,shared.ok);
}

void flushallCommand(redisClient *c) {
288
    signalFlushedDb(-1);
289
    server.dirty += emptyDb(NULL);
290
    addReply(c,shared.ok);
A
antirez 已提交
291
    if (server.rdb_child_pid != -1) {
292
        kill(server.rdb_child_pid,SIGUSR1);
A
antirez 已提交
293
        rdbRemoveTempFile(server.rdb_child_pid);
294
    }
295 296 297 298
    if (server.saveparamslen > 0) {
        /* Normally rdbSave() will reset dirty, but we don't want this here
         * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
        int saved_dirty = server.dirty;
A
antirez 已提交
299
        rdbSave(server.rdb_filename);
300 301
        server.dirty = saved_dirty;
    }
302 303 304 305 306 307 308
    server.dirty++;
}

void delCommand(redisClient *c) {
    int deleted = 0, j;

    for (j = 1; j < c->argc; j++) {
309
        expireIfNeeded(c->db,c->argv[j]);
310
        if (dbDelete(c->db,c->argv[j])) {
311
            signalModifiedKey(c->db,c->argv[j]);
312 313
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
            server.dirty++;
            deleted++;
        }
    }
    addReplyLongLong(c,deleted);
}

void existsCommand(redisClient *c) {
    expireIfNeeded(c->db,c->argv[1]);
    if (dbExists(c->db,c->argv[1])) {
        addReply(c, shared.cone);
    } else {
        addReply(c, shared.czero);
    }
}

void selectCommand(redisClient *c) {
331 332 333 334 335
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != REDIS_OK)
        return;
336

337
    if (server.cluster_enabled && id != 0) {
A
antirez 已提交
338 339 340
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }
341
    if (selectDb(c,id) == REDIS_ERR) {
342
        addReplyError(c,"invalid DB index");
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
    } else {
        addReply(c,shared.ok);
    }
}

void randomkeyCommand(redisClient *c) {
    robj *key;

    if ((key = dbRandomKey(c->db)) == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    addReplyBulk(c,key);
    decrRefCount(key);
}

void keysCommand(redisClient *c) {
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
A
antirez 已提交
364
    int plen = sdslen(pattern), allkeys;
365
    unsigned long numkeys = 0;
366
    void *replylen = addDeferredMultiBulkLength(c);
367

368
    di = dictGetSafeIterator(c->db->dict);
A
antirez 已提交
369
    allkeys = (pattern[0] == '*' && pattern[1] == '\0');
370
    while((de = dictNext(di)) != NULL) {
371
        sds key = dictGetKey(de);
372 373
        robj *keyobj;

A
antirez 已提交
374
        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
375 376 377 378 379 380 381 382 383
            keyobj = createStringObject(key,sdslen(key));
            if (expireIfNeeded(c->db,keyobj) == 0) {
                addReplyBulk(c,keyobj);
                numkeys++;
            }
            decrRefCount(keyobj);
        }
    }
    dictReleaseIterator(di);
384
    setDeferredMultiBulkLength(c,replylen,numkeys);
385 386
}

387 388
/* This callback is used by scanGenericCommand in order to collect elements
 * returned by the dictionary iterator into a list. */
P
Pieter Noordhuis 已提交
389
void scanCallback(void *privdata, const dictEntry *de) {
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408
    void **pd = (void**) privdata;
    list *keys = pd[0];
    robj *o = pd[1];
    robj *key, *val = NULL;

    if (o == NULL) {
        sds sdskey = dictGetKey(de);
        key = createStringObject(sdskey, sdslen(sdskey));
    } else if (o->type == REDIS_SET) {
        key = dictGetKey(de);
        incrRefCount(key);
    } else if (o->type == REDIS_HASH) {
        key = dictGetKey(de);
        incrRefCount(key);
        val = dictGetVal(de);
        incrRefCount(val);
    } else if (o->type == REDIS_ZSET) {
        key = dictGetKey(de);
        incrRefCount(key);
409
        val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
410 411 412 413 414 415
    } else {
        redisPanic("Type not handled in SCAN callback.");
    }

    listAddNodeTail(keys, key);
    if (val) listAddNodeTail(keys, val);
P
Pieter Noordhuis 已提交
416 417
}

418
/* Try to parse a SCAN cursor stored at object 'o':
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
 * if the cursor is valid, store it as unsigned integer into *cursor and
 * returns REDIS_OK. Otherwise return REDIS_ERR and send an error to the
 * client. */
int parseScanCursorOrReply(redisClient *c, robj *o, unsigned long *cursor) {
    char *eptr;

    /* Use strtoul() because we need an *unsigned* long, so
     * getLongLongFromObject() does not cover the whole cursor space. */
    errno = 0;
    *cursor = strtoul(o->ptr, &eptr, 10);
    if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
    {
        addReplyError(c, "invalid cursor");
        return REDIS_ERR;
    }
    return REDIS_OK;
}

437
/* This command implements SCAN, HSCAN and SSCAN commands.
438
 * If object 'o' is passed, then it must be a Hash or Set object, otherwise
439 440 441 442 443 444 445
 * if 'o' is NULL the command will operate on the dictionary associated with
 * the current database.
 *
 * When 'o' is not NULL the function assumes that the first argument in
 * the client arguments vector is a key so it skips it before iterating
 * in order to parse options.
 *
446
 * In the case of a Hash object the function returns both the field and value
447
 * of every element on the Hash. */
448
void scanGenericCommand(redisClient *c, robj *o, unsigned long cursor) {
P
Pieter Noordhuis 已提交
449 450
    int i, j;
    list *keys = listCreate();
451
    listNode *node, *nextnode;
452
    long count = 10;
C
clark.kang 已提交
453 454
    sds pat = NULL;
    int patlen = 0, use_pattern = 0;
455 456 457 458 459 460 461 462 463
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
    redisAssert(o == NULL || o->type == REDIS_SET || o->type == REDIS_HASH ||
                o->type == REDIS_ZSET);

    /* Set i to the first option argument. The previous one is the cursor. */
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
P
Pieter Noordhuis 已提交
464

465
    /* Step 1: Parse options. */
P
Pieter Noordhuis 已提交
466 467 468
    while (i < c->argc) {
        j = c->argc - i;
        if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
A
antirez 已提交
469 470 471
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
                != REDIS_OK)
            {
P
Pieter Noordhuis 已提交
472 473 474 475 476 477 478 479 480
                goto cleanup;
            }

            if (count < 1) {
                addReply(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2;
481
        } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
P
Pieter Noordhuis 已提交
482 483 484
            pat = c->argv[i+1]->ptr;
            patlen = sdslen(pat);

485 486 487
            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            use_pattern = !(pat[0] == '*' && patlen == 1);
P
Pieter Noordhuis 已提交
488 489 490 491 492 493 494 495

            i += 2;
        } else {
            addReply(c,shared.syntaxerr);
            goto cleanup;
        }
    }

496 497 498
    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a ziplist, intset, or any other
499
     * representation that is not a hash table, we are sure that it is also
500 501 502 503
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

504
    /* Handle the case of a hash table. */
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
    } else if (o->type == REDIS_SET && o->encoding == REDIS_ENCODING_HT) {
        ht = o->ptr;
    } else if (o->type == REDIS_HASH && o->encoding == REDIS_ENCODING_HT) {
        ht = o->ptr;
        count *= 2; /* We return key / value for this type. */
    } else if (o->type == REDIS_ZSET && o->encoding == REDIS_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];
521 522 523 524 525
        /* We set the max number of iterations to ten times the specified
         * COUNT, so if the hash table is in a pathological state (very
         * sparsely populated) we avoid to block too much time at the cost
         * of returning no or very few elements. */
        long maxiterations = count*10;
526 527 528 529 530 531 532 533

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        do {
            cursor = dictScan(ht, cursor, scanCallback, privdata);
534 535 536
        } while (cursor &&
              maxiterations-- &&
              listLength(keys) < (unsigned long)count);
537 538
    } else if (o->type == REDIS_SET) {
        int pos = 0;
539
        int64_t ll;
540 541 542

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
543
        cursor = 0;
544 545 546 547 548 549 550 551 552 553 554
    } else if (o->type == REDIS_HASH || o->type == REDIS_ZSET) {
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            ziplistGet(p,&vstr,&vlen,&vll);
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
A
antirez 已提交
555
            p = ziplistNext(o->ptr,p);
556
        }
557
        cursor = 0;
558 559 560
    } else {
        redisPanic("Not handled encoding in SCAN.");
    }
P
Pieter Noordhuis 已提交
561

562
    /* Step 3: Filter elements. */
563 564 565 566
    node = listFirst(keys);
    while (node) {
        robj *kobj = listNodeValue(node);
        nextnode = listNextNode(node);
567 568 569
        int filter = 0;

        /* Filter element if it does not match the pattern. */
570
        if (!filter && use_pattern) {
571 572 573 574 575 576 577 578 579 580 581 582
            if (sdsEncodedObject(kobj)) {
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            } else {
                char buf[REDIS_LONGSTR_SIZE];
                int len;

                redisAssert(kobj->encoding == REDIS_ENCODING_INT);
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }
P
Pieter Noordhuis 已提交
583

584 585 586 587 588
        /* Filter element if it is an expired key. */
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

        /* Remove the element and its associted value if needed. */
        if (filter) {
P
Pieter Noordhuis 已提交
589
            decrRefCount(kobj);
590
            listDelNode(keys, node);
A
antirez 已提交
591 592
        }

593
        /* If this is a hash or a sorted set, we have a flat list of
A
antirez 已提交
594 595 596 597 598 599
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
        if (o && (o->type == REDIS_ZSET || o->type == REDIS_HASH)) {
            node = nextnode;
            nextnode = listNextNode(node);
            if (filter) {
600 601 602 603
                kobj = listNodeValue(node);
                decrRefCount(kobj);
                listDelNode(keys, node);
            }
P
Pieter Noordhuis 已提交
604
        }
605
        node = nextnode;
P
Pieter Noordhuis 已提交
606 607
    }

608
    /* Step 4: Reply to the client. */
P
Pieter Noordhuis 已提交
609
    addReplyMultiBulkLen(c, 2);
610
    addReplyBulkLongLong(c,cursor);
P
Pieter Noordhuis 已提交
611 612

    addReplyMultiBulkLen(c, listLength(keys));
613 614
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
P
Pieter Noordhuis 已提交
615 616
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
617
        listDelNode(keys, node);
P
Pieter Noordhuis 已提交
618 619 620
    }

cleanup:
621
    listSetFreeMethod(keys,decrRefCountVoid);
P
Pieter Noordhuis 已提交
622 623 624
    listRelease(keys);
}

625 626
/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(redisClient *c) {
627 628 629
    unsigned long cursor;
    if (parseScanCursorOrReply(c,c->argv[1],&cursor) == REDIS_ERR) return;
    scanGenericCommand(c,NULL,cursor);
630 631
}

632
void dbsizeCommand(redisClient *c) {
633
    addReplyLongLong(c,dictSize(c->db->dict));
634 635 636
}

void lastsaveCommand(redisClient *c) {
637
    addReplyLongLong(c,server.lastsave);
638 639 640 641 642 643 644 645
}

void typeCommand(redisClient *c) {
    robj *o;
    char *type;

    o = lookupKeyRead(c->db,c->argv[1]);
    if (o == NULL) {
646
        type = "none";
647 648
    } else {
        switch(o->type) {
649 650 651 652 653 654
        case REDIS_STRING: type = "string"; break;
        case REDIS_LIST: type = "list"; break;
        case REDIS_SET: type = "set"; break;
        case REDIS_ZSET: type = "zset"; break;
        case REDIS_HASH: type = "hash"; break;
        default: type = "unknown"; break;
655 656
        }
    }
657
    addReplyStatus(c,type);
658 659 660
}

void shutdownCommand(redisClient *c) {
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675
    int flags = 0;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    } else if (c->argc == 2) {
        if (!strcasecmp(c->argv[1]->ptr,"nosave")) {
            flags |= REDIS_SHUTDOWN_NOSAVE;
        } else if (!strcasecmp(c->argv[1]->ptr,"save")) {
            flags |= REDIS_SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
676 677
    /* When SHUTDOWN is called while the server is loading a dataset in
     * memory we need to make sure no attempt is performed to save
A
antirez 已提交
678
     * the dataset on shutdown (otherwise it could overwrite the current DB
679 680 681 682
     * with half-read data).
     *
     * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
    if (server.loading || server.sentinel_mode)
A
antirez 已提交
683
        flags = (flags & ~REDIS_SHUTDOWN_SAVE) | REDIS_SHUTDOWN_NOSAVE;
684
    if (prepareForShutdown(flags) == REDIS_OK) exit(0);
685
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
686 687 688 689
}

void renameGenericCommand(redisClient *c, int nx) {
    robj *o;
690
    long long expire;
691
    int samekey = 0;
692

693 694 695
    /* When source and dest key is the same, no operation is performed,
     * if the key exists, however we still return an error on unexisting key. */
    if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
696 697 698 699

    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
        return;

700 701 702 703 704
    if (samekey) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

705
    incrRefCount(o);
706
    expire = getExpire(c->db,c->argv[1]);
707
    if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
708 709 710 711 712
        if (nx) {
            decrRefCount(o);
            addReply(c,shared.czero);
            return;
        }
A
antirez 已提交
713 714
        /* Overwrite: delete the old key before creating the new one
         * with the same name. */
715
        dbDelete(c->db,c->argv[2]);
716
    }
717 718
    dbAdd(c->db,c->argv[2],o);
    if (expire != -1) setExpire(c->db,c->argv[2],expire);
719
    dbDelete(c->db,c->argv[1]);
720 721
    signalModifiedKey(c->db,c->argv[1]);
    signalModifiedKey(c->db,c->argv[2]);
722 723 724 725
    notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"rename_from",
        c->argv[1],c->db->id);
    notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"rename_to",
        c->argv[2],c->db->id);
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
    server.dirty++;
    addReply(c,nx ? shared.cone : shared.ok);
}

void renameCommand(redisClient *c) {
    renameGenericCommand(c,0);
}

void renamenxCommand(redisClient *c) {
    renameGenericCommand(c,1);
}

void moveCommand(redisClient *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
M
Matt Stancliff 已提交
742
    long long dbid;
743

A
antirez 已提交
744 745 746 747 748
    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

749 750 751
    /* Obtain source and target DB pointers */
    src = c->db;
    srcid = c->db->id;
M
Matt Stancliff 已提交
752 753 754 755 756

    if (getLongLongFromObject(c->argv[2],&dbid) == REDIS_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == REDIS_ERR)
    {
757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }

777 778
    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
779 780 781
        addReply(c,shared.czero);
        return;
    }
782
    dbAdd(dst,c->argv[1],o);
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);
    server.dirty++;
    addReply(c,shared.cone);
}

/*-----------------------------------------------------------------------------
 * Expires API
 *----------------------------------------------------------------------------*/

int removeExpire(redisDb *db, robj *key) {
    /* An expire may only be removed if there is a corresponding entry in the
     * main dict. Otherwise, the key will never be freed. */
798
    redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
A
antirez 已提交
799
    return dictDelete(db->expires,key->ptr) == DICT_OK;
800 801
}

802 803
void setExpire(redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de;
804 805

    /* Reuse the sds from the main dict in the expire dict */
806 807 808 809
    kde = dictFind(db->dict,key->ptr);
    redisAssertWithInfo(NULL,key,kde != NULL);
    de = dictReplaceRaw(db->expires,dictGetKey(kde));
    dictSetSignedIntegerVal(de,when);
810 811 812 813
}

/* Return the expire time of the specified key, or -1 if no expire
 * is associated with this key (i.e. the key is non volatile) */
814
long long getExpire(redisDb *db, robj *key) {
815 816 817 818 819 820 821 822
    dictEntry *de;

    /* No expire? return ASAP */
    if (dictSize(db->expires) == 0 ||
       (de = dictFind(db->expires,key->ptr)) == NULL) return -1;

    /* The entry was found in the expire dict, this means it should also
     * be present in the main dict (safety check). */
823
    redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
824
    return dictGetSignedIntegerVal(de);
825 826
}

827 828 829 830 831 832 833 834 835 836 837
/* Propagate expires into slaves and the AOF file.
 * When a key expires in the master, a DEL operation for this key is sent
 * to all the slaves and the AOF file if enabled.
 *
 * This way the key expiry is centralized in one place, and since both
 * AOF and the master->slave link guarantee operation ordering, everything
 * will be consistent even if we allow write operations against expiring
 * keys. */
void propagateExpire(redisDb *db, robj *key) {
    robj *argv[2];

838
    argv[0] = shared.del;
839
    argv[1] = key;
840 841
    incrRefCount(argv[0]);
    incrRefCount(argv[1]);
842

843
    if (server.aof_state != REDIS_AOF_OFF)
844
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
845
    replicationFeedSlaves(server.slaves,db->id,argv,2);
846

847 848
    decrRefCount(argv[0]);
    decrRefCount(argv[1]);
849 850
}

851
int expireIfNeeded(redisDb *db, robj *key) {
852 853
    mstime_t when = getExpire(db,key);
    mstime_t now;
854

855 856
    if (when < 0) return 0; /* No expire for this key */

857 858 859
    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;

860 861 862 863 864 865 866
    /* If we are in the context of a Lua script, we claim that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    now = server.lua_caller ? server.lua_time_start : mstime();

867 868 869 870
    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
871
     * Still we try to return the right information to the caller,
872 873
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
874
    if (server.masterhost != NULL) return now > when;
875

876
    /* Return when this key has not expired */
877
    if (now <= when) return 0;
878 879 880

    /* Delete the key */
    server.stat_expiredkeys++;
881
    propagateExpire(db,key);
882 883
    notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
        "expired",key,db->id);
884 885 886 887 888 889 890
    return dbDelete(db,key);
}

/*-----------------------------------------------------------------------------
 * Expires Commands
 *----------------------------------------------------------------------------*/

891 892 893 894 895 896
/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the commad second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
G
guiquanz 已提交
897
 * the argv[2] parameter. The basetime is always specified in milliseconds. */
898
void expireGenericCommand(redisClient *c, long long basetime, int unit) {
899
    robj *key = c->argv[1], *param = c->argv[2];
900
    long long when; /* unix time in milliseconds when the key will expire. */
901

902
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
903
        return;
904

905 906
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;
907

908
    /* No key, return zero. */
909
    if (lookupKeyWrite(c->db,key) == NULL) {
910 911 912
        addReply(c,shared.czero);
        return;
    }
913

914 915 916 917 918 919
    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master. */
920
    if (when <= mstime() && !server.loading && !server.masterhost) {
921 922
        robj *aux;

923
        redisAssertWithInfo(c,key,dbDelete(c->db,key));
924 925 926 927 928 929
        server.dirty++;

        /* Replicate/AOF this as an explicit DEL. */
        aux = createStringObject("DEL",3);
        rewriteClientCommandVector(c,2,aux,key);
        decrRefCount(aux);
930
        signalModifiedKey(c->db,key);
931
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
932
        addReply(c, shared.cone);
933 934
        return;
    } else {
935
        setExpire(c->db,key,when);
936
        addReply(c,shared.cone);
937
        signalModifiedKey(c->db,key);
938
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"expire",key,c->db->id);
939
        server.dirty++;
940 941 942 943 944
        return;
    }
}

void expireCommand(redisClient *c) {
945
    expireGenericCommand(c,mstime(),UNIT_SECONDS);
946 947 948
}

void expireatCommand(redisClient *c) {
949
    expireGenericCommand(c,0,UNIT_SECONDS);
950 951
}

952
void pexpireCommand(redisClient *c) {
953
    expireGenericCommand(c,mstime(),UNIT_MILLISECONDS);
954
}
955

956
void pexpireatCommand(redisClient *c) {
957
    expireGenericCommand(c,0,UNIT_MILLISECONDS);
958 959 960 961
}

void ttlGenericCommand(redisClient *c, int output_ms) {
    long long expire, ttl = -1;
962

963
    /* If the key does not exist at all, return -2 */
A
antirez 已提交
964
    if (lookupKeyRead(c->db,c->argv[1]) == NULL) {
965 966 967 968 969
        addReplyLongLong(c,-2);
        return;
    }
    /* The key exists. Return -1 if it has no expire, or the actual
     * TTL value otherwise. */
A
antirez 已提交
970
    expire = getExpire(c->db,c->argv[1]);
971
    if (expire != -1) {
972
        ttl = expire-mstime();
A
antirez 已提交
973
        if (ttl < 0) ttl = 0;
974
    }
975 976 977
    if (ttl == -1) {
        addReplyLongLong(c,-1);
    } else {
978
        addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000));
979
    }
980
}
A
antirez 已提交
981

982 983 984 985 986 987 988 989
void ttlCommand(redisClient *c) {
    ttlGenericCommand(c, 0);
}

void pttlCommand(redisClient *c) {
    ttlGenericCommand(c, 1);
}

A
antirez 已提交
990 991 992 993 994 995 996
void persistCommand(redisClient *c) {
    dictEntry *de;

    de = dictFind(c->db->dict,c->argv[1]->ptr);
    if (de == NULL) {
        addReply(c,shared.czero);
    } else {
A
antirez 已提交
997
        if (removeExpire(c->db,c->argv[1])) {
A
antirez 已提交
998
            addReply(c,shared.cone);
A
antirez 已提交
999 1000
            server.dirty++;
        } else {
A
antirez 已提交
1001
            addReply(c,shared.czero);
A
antirez 已提交
1002
        }
A
antirez 已提交
1003 1004
    }
}
1005 1006 1007 1008 1009

/* -----------------------------------------------------------------------------
 * API to get key arguments from commands
 * ---------------------------------------------------------------------------*/

1010 1011
/* The base case is to use the keys position as given in the command table
 * (firstkey, lastkey, step). */
1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
    int j, i = 0, last, *keys;
    REDIS_NOTUSED(argv);

    if (cmd->firstkey == 0) {
        *numkeys = 0;
        return NULL;
    }
    last = cmd->lastkey;
    if (last < 0) last = argc+last;
    keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
    for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
        redisAssert(j < argc);
1025
        keys[i++] = j;
1026
    }
1027
    *numkeys = i;
1028 1029 1030
    return keys;
}

1031 1032 1033 1034 1035 1036 1037 1038
/* Return all the arguments that are keys in the command passed via argc / argv.
 *
 * The command returns the positions of all the key arguments inside the array,
 * so the actual return value is an heap allocated array of integers. The
 * length of the array is returned by reference into *numkeys.
 *
 * 'cmd' must be point to the corresponding entry into the redisCommand
 * table, according to the command name in argv[0].
1039 1040 1041
 *
 * This function uses the command table if a command-specific helper function
 * is not required, otherwise it calls the command-specific function. */
1042
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1043
    if (cmd->getkeys_proc) {
1044
        return cmd->getkeys_proc(cmd,argv,argc,numkeys);
1045 1046 1047 1048 1049
    } else {
        return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
    }
}

1050
/* Free the result of getKeysFromCommand. */
1051 1052 1053 1054
void getKeysFreeResult(int *result) {
    zfree(result);
}

1055 1056 1057 1058
/* Helper function to extract keys from following commands:
 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1059 1060 1061 1062 1063 1064 1065 1066 1067 1068
    int i, num, *keys;
    REDIS_NOTUSED(cmd);

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error. */
    if (num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }
1069 1070

    /* Keys in z{union,inter}store come from two places:
1071 1072
     * argv[1] = storage key,
     * argv[3...n] = keys to intersect */
1073 1074 1075
    keys = zmalloc(sizeof(int)*(num+1));

    /* Add all key positions for argv[3...n] to keys[] */
1076
    for (i = 0; i < num; i++) keys[i] = 3+i;
1077

1078
    /* Finally add the argv[1] key position (the storage key target). */
1079 1080
    keys[num] = 1;
    *numkeys = num+1;  /* Total keys = {union,inter} keys + storage key */
1081 1082
    return keys;
}
1083

1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
/* Helper function to extract keys from the following commands:
 * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
 * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int i, num, *keys;
    REDIS_NOTUSED(cmd);

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error. */
    if (num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }

    keys = zmalloc(sizeof(int)*num);
1100
    *numkeys = num;
1101 1102 1103 1104 1105 1106 1107

    /* Add all key positions for argv[3...n] to keys[] */
    for (i = 0; i < num; i++) keys[i] = 3+i;

    return keys;
}

1108 1109 1110 1111 1112 1113 1114 1115
/* Helper function to extract keys from the SORT command.
 *
 * SORT <sort-key> ... STORE <store-key> ...
 *
 * The first argument of SORT is always a key, however a list of options
 * follow in SQL-alike style. Here we parse just the minimum in order to
 * correctly identify keys in the "STORE" option. */
int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
M
Matt Stancliff 已提交
1116
    int i, j, num, *keys, found_store = 0;
1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143
    REDIS_NOTUSED(cmd);

    num = 0;
    keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */

    keys[num++] = 1; /* <sort-key> is always present. */

    /* Search for STORE option. By default we consider options to don't
     * have arguments, so if we find an unknown option name we scan the
     * next. However there are options with 1 or 2 arguments, so we
     * provide a list here in order to skip the right number of args. */
    struct {
        char *name;
        int skip;
    } skiplist[] = {
        {"limit", 2},
        {"get", 1},
        {"by", 1},
        {NULL, 0} /* End of elements. */
    };

    for (i = 2; i < argc; i++) {
        for (j = 0; skiplist[j].name != NULL; j++) {
            if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
                i += skiplist[j].skip;
                break;
            } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
1144 1145 1146
                /* Note: we don't increment "num" here and continue the loop
                 * to be sure to process the *last* "STORE" option if multiple
                 * ones are provided. This is same behavior as SORT. */
M
Matt Stancliff 已提交
1147
                found_store = 1;
1148
                keys[num] = i+1; /* <store-key> */
1149 1150 1151 1152
                break;
            }
        }
    }
M
Matt Stancliff 已提交
1153
    *numkeys = num + found_store;
1154 1155 1156
    return keys;
}

1157 1158 1159
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
 * a fast way a key that belongs to a specified hash slot. This is useful
 * while rehashing the cluster. */
1160
void slotToKeyAdd(robj *key) {
1161 1162
    unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));

1163
    zslInsert(server.cluster->slots_to_keys,hashslot,key);
1164 1165 1166
    incrRefCount(key);
}

1167
void slotToKeyDel(robj *key) {
1168 1169
    unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));

1170
    zslDelete(server.cluster->slots_to_keys,hashslot,key);
1171 1172
}

1173
void slotToKeyFlush(void) {
1174 1175 1176 1177
    zslFree(server.cluster->slots_to_keys);
    server.cluster->slots_to_keys = zslCreate();
}

1178
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
1179 1180
    zskiplistNode *n;
    zrangespec range;
A
antirez 已提交
1181
    int j = 0;
1182 1183 1184

    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;
1185

1186
    n = zslFirstInRange(server.cluster->slots_to_keys, &range);
A
antirez 已提交
1187 1188 1189 1190 1191
    while(n && n->score == hashslot && count--) {
        keys[j++] = n->obj;
        n = n->level[0].forward;
    }
    return j;
1192
}
1193

1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215
/* Remove all the keys in the specified hash slot.
 * The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
    zskiplistNode *n;
    zrangespec range;
    int j = 0;

    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;

    n = zslFirstInRange(server.cluster->slots_to_keys, &range);
    while(n && n->score == hashslot) {
        robj *key = n->obj;
        n = n->level[0].forward; /* Go to the next item before freeing it. */
        incrRefCount(key); /* Protect the object while freeing it. */
        dbDelete(&server.db[0],key);
        decrRefCount(key);
        j++;
    }
    return j;
}

1216
unsigned int countKeysInSlot(unsigned int hashslot) {
1217 1218
    zskiplist *zsl = server.cluster->slots_to_keys;
    zskiplistNode *zn;
1219
    zrangespec range;
1220
    int rank, count = 0;
1221 1222 1223

    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;
1224 1225

    /* Find first element in range */
1226
    zn = zslFirstInRange(zsl, &range);
1227 1228 1229 1230 1231 1232 1233

    /* Use rank of first element, if any, to determine preliminary count */
    if (zn != NULL) {
        rank = zslGetRank(zsl, zn->score, zn->obj);
        count = (zsl->length - (rank - 1));

        /* Find last element in range */
1234
        zn = zslLastInRange(zsl, &range);
1235 1236 1237 1238 1239 1240

        /* Use rank of last element, if any, to determine the actual count */
        if (zn != NULL) {
            rank = zslGetRank(zsl, zn->score, zn->obj);
            count -= (zsl->length - rank);
        }
1241
    }
1242
    return count;
1243
}