db.c 41.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30
#include "server.h"
31
#include "cluster.h"
32
#include "atomicvar.h"
33 34

#include <signal.h>
35
#include <ctype.h>
36 37 38 39 40 41 42 43

/*-----------------------------------------------------------------------------
 * C-level DB API
 *----------------------------------------------------------------------------*/

robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
44
        robj *val = dictGetVal(de);
45

G
guiquanz 已提交
46
        /* Update the access time for the ageing algorithm.
47 48
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
A
antirez 已提交
49
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
50
            val->lru = LRU_CLOCK();
51 52 53 54 55 56 57
        return val;
    } else {
        return NULL;
    }
}

robj *lookupKeyRead(redisDb *db, robj *key) {
58 59
    robj *val;

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
    if (expireIfNeeded(db,key) == 1) {
        /* Key expired. If we are in the context of a master, expireIfNeeded()
         * returns 0 only when the key does not exist at all, so it's save
         * to return NULL ASAP. */
        if (server.masterhost == NULL) return NULL;

        /* However if we are in the context of a slave, expireIfNeeded() will
         * not really try to expire the key, it only returns information
         * about the "logical" status of the key: key expiring is up to the
         * master in order to have a consistent view of master's data set.
         *
         * However, if the command caller is not the master, and as additional
         * safety measure, the command invoked is a read-only command, we can
         * safely return NULL here, and provide a more consistent behavior
         * to clients accessign expired values in a read-only fashion, that
         * will say the key as non exisitng.
         *
         * Notably this covers GETs when slaves are used to scale reads. */
        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
A
antirez 已提交
81
            server.current_client->cmd->flags & CMD_READONLY)
82 83 84 85
        {
            return NULL;
        }
    }
86 87 88 89 90 91
    val = lookupKey(db,key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
92 93 94
}

robj *lookupKeyWrite(redisDb *db, robj *key) {
95
    expireIfNeeded(db,key);
96 97 98
    return lookupKey(db,key);
}

99
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
100 101 102 103 104
    robj *o = lookupKeyRead(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}

105
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
106 107 108 109 110
    robj *o = lookupKeyWrite(c->db, key);
    if (!o) addReply(c,reply);
    return o;
}

111
/* Add the key to the DB. It's up to the caller to increment the reference
G
guiquanz 已提交
112
 * counter of the value if needed.
113 114 115 116 117 118
 *
 * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    int retval = dictAdd(db->dict, copy, val);

119
    serverAssertWithInfo(NULL,key,retval == C_OK);
120
    if (val->type == OBJ_LIST) signalListAsReady(db, key);
121
    if (server.cluster_enabled) slotToKeyAdd(key);
122 123 124 125 126 127 128 129
 }

/* Overwrite an existing key with a new value. Incrementing the reference
 * count of the new value is up to the caller.
 * This function does not modify the expire time of the existing key.
 *
 * The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
A
antirez 已提交
130
    dictEntry *de = dictFind(db->dict,key->ptr);
131

A
antirez 已提交
132
    serverAssertWithInfo(NULL,key,de != NULL);
133
    dictReplace(db->dict, key->ptr, val);
134 135
}

136 137
/* High level Set operation. This function can be used in order to set
 * a key, whatever it was existing or not, to a new object.
138
 *
139 140 141 142 143 144
 * 1) The ref count of the value object is incremented.
 * 2) clients WATCHing for the destination key notified.
 * 3) The expire time of the key is reset (the key is made persistent). */
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
145
    } else {
146
        dbOverwrite(db,key,val);
147
    }
148 149
    incrRefCount(val);
    removeExpire(db,key);
150
    signalModifiedKey(db,key);
151 152 153 154 155 156 157 158 159 160 161
}

int dbExists(redisDb *db, robj *key) {
    return dictFind(db->dict,key->ptr) != NULL;
}

/* Return a random key, in form of a Redis object.
 * If there are no keys, NULL is returned.
 *
 * The function makes sure to return keys not already expired. */
robj *dbRandomKey(redisDb *db) {
A
antirez 已提交
162
    dictEntry *de;
163 164 165 166 167 168 169 170

    while(1) {
        sds key;
        robj *keyobj;

        de = dictGetRandomKey(db->dict);
        if (de == NULL) return NULL;

171
        key = dictGetKey(de);
172 173 174 175 176 177 178 179 180 181 182 183
        keyobj = createStringObject(key,sdslen(key));
        if (dictFind(db->expires,key)) {
            if (expireIfNeeded(db,keyobj)) {
                decrRefCount(keyobj);
                continue; /* search for another key. This expired. */
            }
        }
        return keyobj;
    }
}

/* Delete a key, value, and associated expiration entry if any, from the DB */
184
int dbSyncDelete(redisDb *db, robj *key) {
185 186 187
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
188
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
189
        if (server.cluster_enabled) slotToKeyDel(key);
190 191 192 193
        return 1;
    } else {
        return 0;
    }
194 195
}

196 197 198 199 200 201 202 203
/* This is a wrapper whose behavior depends on the Redis lazy free
 * configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
    int async = 1; /* TODO: Fixme making this a proper option. */
    if (async) return dbAsyncDelete(db,key);
    else       return dbSyncDelete(db,key);
}

204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
/* Prepare the string object stored at 'key' to be modified destructively
 * to implement commands like SETBIT or APPEND.
 *
 * An object is usually ready to be modified unless one of the two conditions
 * are true:
 *
 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
 *    other users.
 * 2) The object encoding is not "RAW".
 *
 * If the object is found in one of the above conditions (or both) by the
 * function, an unshared / not-encoded copy of the string object is stored
 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
 * returned.
 *
 * USAGE:
 *
 * The object 'o' is what the caller already obtained by looking up 'key'
 * in 'db', the usage pattern looks like this:
 *
 * o = lookupKeyWrite(db,key);
225
 * if (checkType(c,o,OBJ_STRING)) return;
226 227 228 229 230 231
 * o = dbUnshareStringValue(db,key,o);
 *
 * At this point the caller is ready to modify the object, for example
 * using an sdscat() call to append some data, or anything else.
 */
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
A
antirez 已提交
232
    serverAssert(o->type == OBJ_STRING);
233
    if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
234 235 236 237 238 239 240 241
        robj *decoded = getDecodedObject(o);
        o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
        decrRefCount(decoded);
        dbOverwrite(db,key,o);
    }
    return o;
}

242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
/* Remove all keys from all the databases in a Redis server.
 * If callback is given the function is called from time to time to
 * signal that work is in progress.
 *
 * The dbnum can be -1 if all teh DBs should be flushed, or the specified
 * DB number if we want to flush only a single Redis database number.
 *
 * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
 * EMPTYDB_ASYCN if we want the memory to be freed in a different thread
 * and the function to return ASAP.
 *
 * On success the fuction returns the number of keys removed from the
 * database(s). Otherwise -1 is returned in the specific case the
 * DB number is out of range, and errno is set to EINVAL. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
    int j, async = (flags & EMPTYDB_ASYNC);
258 259
    long long removed = 0;

260 261 262 263 264
    if (dbnum < -1 || dbnum >= server.dbnum) {
        errno = EINVAL;
        return -1;
    }

265
    for (j = 0; j < server.dbnum; j++) {
266
        if (dbnum != 1 && dbnum != j) continue;
267
        removed += dictSize(server.db[j].dict);
268 269 270 271 272 273 274 275 276 277 278 279 280
        if (async) {
            emptyDbAsync(&server.db[j]);
        } else {
            dictEmpty(server.db[j].dict,callback);
            dictEmpty(server.db[j].expires,callback);
        }
    }
    if (server.cluster_enabled) {
        if (async) {
            slotToKeyFlushAsync();
        } else {
            slotToKeyFlush();
        }
281 282 283 284
    }
    return removed;
}

285
int selectDb(client *c, int id) {
286
    if (id < 0 || id >= server.dbnum)
287
        return C_ERR;
288
    c->db = &server.db[id];
289
    return C_OK;
290 291
}

292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

void signalFlushedDb(int dbid) {
    touchWatchedKeysOnFlush(dbid);
}

309 310 311 312
/*-----------------------------------------------------------------------------
 * Type agnostic commands operating on the key space
 *----------------------------------------------------------------------------*/

313
void flushdbCommand(client *c) {
314
    server.dirty += dictSize(c->db->dict);
315
    signalFlushedDb(c->db->id);
316 317
    dictEmpty(c->db->dict,NULL);
    dictEmpty(c->db->expires,NULL);
318
    if (server.cluster_enabled) slotToKeyFlush();
319 320 321
    addReply(c,shared.ok);
}

322
void flushallCommand(client *c) {
323
    signalFlushedDb(-1);
324
    server.dirty += emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
325
    addReply(c,shared.ok);
A
antirez 已提交
326
    if (server.rdb_child_pid != -1) {
327
        kill(server.rdb_child_pid,SIGUSR1);
A
antirez 已提交
328
        rdbRemoveTempFile(server.rdb_child_pid);
329
    }
330 331 332 333
    if (server.saveparamslen > 0) {
        /* Normally rdbSave() will reset dirty, but we don't want this here
         * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
        int saved_dirty = server.dirty;
A
antirez 已提交
334
        rdbSave(server.rdb_filename);
335 336
        server.dirty = saved_dirty;
    }
337 338 339
    server.dirty++;
}

340 341 342
/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
    int numdel = 0, j;
343 344

    for (j = 1; j < c->argc; j++) {
345
        expireIfNeeded(c->db,c->argv[j]);
346 347 348
        int deleted  = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
                              dbSyncDelete(c->db,c->argv[j]);
        if (deleted) {
349
            signalModifiedKey(c->db,c->argv[j]);
A
antirez 已提交
350
            notifyKeyspaceEvent(NOTIFY_GENERIC,
351
                "del",c->argv[j],c->db->id);
352
            server.dirty++;
353
            numdel++;
354 355
        }
    }
356 357 358 359 360 361 362 363 364
    addReplyLongLong(c,numdel);
}

void delCommand(client *c) {
    delGenericCommand(c,0);
}

void unlinkCommand(client *c) {
    delGenericCommand(c,1);
365 366
}

A
antirez 已提交
367 368
/* EXISTS key1 key2 ... key_N.
 * Return value is the number of keys existing. */
369
void existsCommand(client *c) {
A
antirez 已提交
370 371 372 373 374 375
    long long count = 0;
    int j;

    for (j = 1; j < c->argc; j++) {
        expireIfNeeded(c->db,c->argv[j]);
        if (dbExists(c->db,c->argv[j])) count++;
376
    }
A
antirez 已提交
377
    addReplyLongLong(c,count);
378 379
}

380
void selectCommand(client *c) {
381 382 383
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
384
        "invalid DB index") != C_OK)
385
        return;
386

387
    if (server.cluster_enabled && id != 0) {
A
antirez 已提交
388 389 390
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }
391
    if (selectDb(c,id) == C_ERR) {
392
        addReplyError(c,"invalid DB index");
393 394 395 396 397
    } else {
        addReply(c,shared.ok);
    }
}

398
void randomkeyCommand(client *c) {
399 400 401 402 403 404 405 406 407 408 409
    robj *key;

    if ((key = dbRandomKey(c->db)) == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    addReplyBulk(c,key);
    decrRefCount(key);
}

410
void keysCommand(client *c) {
411 412 413
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
A
antirez 已提交
414
    int plen = sdslen(pattern), allkeys;
415
    unsigned long numkeys = 0;
416
    void *replylen = addDeferredMultiBulkLength(c);
417

418
    di = dictGetSafeIterator(c->db->dict);
A
antirez 已提交
419
    allkeys = (pattern[0] == '*' && pattern[1] == '\0');
420
    while((de = dictNext(di)) != NULL) {
421
        sds key = dictGetKey(de);
422 423
        robj *keyobj;

A
antirez 已提交
424
        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
425 426 427 428 429 430 431 432 433
            keyobj = createStringObject(key,sdslen(key));
            if (expireIfNeeded(c->db,keyobj) == 0) {
                addReplyBulk(c,keyobj);
                numkeys++;
            }
            decrRefCount(keyobj);
        }
    }
    dictReleaseIterator(di);
434
    setDeferredMultiBulkLength(c,replylen,numkeys);
435 436
}

437 438
/* This callback is used by scanGenericCommand in order to collect elements
 * returned by the dictionary iterator into a list. */
P
Pieter Noordhuis 已提交
439
void scanCallback(void *privdata, const dictEntry *de) {
440 441 442 443 444 445 446 447
    void **pd = (void**) privdata;
    list *keys = pd[0];
    robj *o = pd[1];
    robj *key, *val = NULL;

    if (o == NULL) {
        sds sdskey = dictGetKey(de);
        key = createStringObject(sdskey, sdslen(sdskey));
448
    } else if (o->type == OBJ_SET) {
449 450
        sds keysds = dictGetKey(de);
        key = createStringObject(keysds,sdslen(keysds));
451
    } else if (o->type == OBJ_HASH) {
452 453
        sds sdskey = dictGetKey(de);
        sds sdsval = dictGetVal(de);
454 455
        key = createStringObject(sdskey,sdslen(sdskey));
        val = createStringObject(sdsval,sdslen(sdsval));
456
    } else if (o->type == OBJ_ZSET) {
457 458
        sds sdskey = dictGetKey(de);
        key = createStringObject(sdskey,sdslen(sdskey));
459
        val = createStringObjectFromLongDouble(*(double*)dictGetVal(de),0);
460
    } else {
A
antirez 已提交
461
        serverPanic("Type not handled in SCAN callback.");
462 463 464 465
    }

    listAddNodeTail(keys, key);
    if (val) listAddNodeTail(keys, val);
P
Pieter Noordhuis 已提交
466 467
}

468
/* Try to parse a SCAN cursor stored at object 'o':
469
 * if the cursor is valid, store it as unsigned integer into *cursor and
470
 * returns C_OK. Otherwise return C_ERR and send an error to the
471
 * client. */
472
int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor) {
473 474 475 476 477 478 479 480 481
    char *eptr;

    /* Use strtoul() because we need an *unsigned* long, so
     * getLongLongFromObject() does not cover the whole cursor space. */
    errno = 0;
    *cursor = strtoul(o->ptr, &eptr, 10);
    if (isspace(((char*)o->ptr)[0]) || eptr[0] != '\0' || errno == ERANGE)
    {
        addReplyError(c, "invalid cursor");
482
        return C_ERR;
483
    }
484
    return C_OK;
485 486
}

487
/* This command implements SCAN, HSCAN and SSCAN commands.
488
 * If object 'o' is passed, then it must be a Hash or Set object, otherwise
489 490 491 492 493 494 495
 * if 'o' is NULL the command will operate on the dictionary associated with
 * the current database.
 *
 * When 'o' is not NULL the function assumes that the first argument in
 * the client arguments vector is a key so it skips it before iterating
 * in order to parse options.
 *
496
 * In the case of a Hash object the function returns both the field and value
497
 * of every element on the Hash. */
498
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
P
Pieter Noordhuis 已提交
499 500
    int i, j;
    list *keys = listCreate();
501
    listNode *node, *nextnode;
502
    long count = 10;
C
clark.kang 已提交
503 504
    sds pat = NULL;
    int patlen = 0, use_pattern = 0;
505 506 507 508
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
A
antirez 已提交
509
    serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
510
                o->type == OBJ_ZSET);
511 512 513

    /* Set i to the first option argument. The previous one is the cursor. */
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
P
Pieter Noordhuis 已提交
514

515
    /* Step 1: Parse options. */
P
Pieter Noordhuis 已提交
516 517 518
    while (i < c->argc) {
        j = c->argc - i;
        if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
A
antirez 已提交
519
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
520
                != C_OK)
A
antirez 已提交
521
            {
P
Pieter Noordhuis 已提交
522 523 524 525 526 527 528 529 530
                goto cleanup;
            }

            if (count < 1) {
                addReply(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2;
531
        } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
P
Pieter Noordhuis 已提交
532 533 534
            pat = c->argv[i+1]->ptr;
            patlen = sdslen(pat);

535 536 537
            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            use_pattern = !(pat[0] == '*' && patlen == 1);
P
Pieter Noordhuis 已提交
538 539 540 541 542 543 544 545

            i += 2;
        } else {
            addReply(c,shared.syntaxerr);
            goto cleanup;
        }
    }

546 547 548
    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a ziplist, intset, or any other
549
     * representation that is not a hash table, we are sure that it is also
550 551 552 553
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

554
    /* Handle the case of a hash table. */
555 556 557
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
558
    } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
559
        ht = o->ptr;
560
    } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
561 562
        ht = o->ptr;
        count *= 2; /* We return key / value for this type. */
563
    } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
564 565 566 567 568 569 570
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];
571 572 573 574 575
        /* We set the max number of iterations to ten times the specified
         * COUNT, so if the hash table is in a pathological state (very
         * sparsely populated) we avoid to block too much time at the cost
         * of returning no or very few elements. */
        long maxiterations = count*10;
576 577 578 579 580 581 582 583

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        do {
            cursor = dictScan(ht, cursor, scanCallback, privdata);
584 585 586
        } while (cursor &&
              maxiterations-- &&
              listLength(keys) < (unsigned long)count);
587
    } else if (o->type == OBJ_SET) {
588
        int pos = 0;
589
        int64_t ll;
590 591 592

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
593
        cursor = 0;
594
    } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
595 596 597 598 599 600 601 602 603 604
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            ziplistGet(p,&vstr,&vlen,&vll);
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
A
antirez 已提交
605
            p = ziplistNext(o->ptr,p);
606
        }
607
        cursor = 0;
608
    } else {
A
antirez 已提交
609
        serverPanic("Not handled encoding in SCAN.");
610
    }
P
Pieter Noordhuis 已提交
611

612
    /* Step 3: Filter elements. */
613 614 615 616
    node = listFirst(keys);
    while (node) {
        robj *kobj = listNodeValue(node);
        nextnode = listNextNode(node);
617 618 619
        int filter = 0;

        /* Filter element if it does not match the pattern. */
620
        if (!filter && use_pattern) {
621 622 623 624
            if (sdsEncodedObject(kobj)) {
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            } else {
A
antirez 已提交
625
                char buf[LONG_STR_SIZE];
626 627
                int len;

A
antirez 已提交
628
                serverAssert(kobj->encoding == OBJ_ENCODING_INT);
629 630 631 632
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }
P
Pieter Noordhuis 已提交
633

634 635 636 637 638
        /* Filter element if it is an expired key. */
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

        /* Remove the element and its associted value if needed. */
        if (filter) {
P
Pieter Noordhuis 已提交
639
            decrRefCount(kobj);
640
            listDelNode(keys, node);
A
antirez 已提交
641 642
        }

643
        /* If this is a hash or a sorted set, we have a flat list of
A
antirez 已提交
644 645
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
646
        if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
A
antirez 已提交
647 648 649
            node = nextnode;
            nextnode = listNextNode(node);
            if (filter) {
650 651 652 653
                kobj = listNodeValue(node);
                decrRefCount(kobj);
                listDelNode(keys, node);
            }
P
Pieter Noordhuis 已提交
654
        }
655
        node = nextnode;
P
Pieter Noordhuis 已提交
656 657
    }

658
    /* Step 4: Reply to the client. */
P
Pieter Noordhuis 已提交
659
    addReplyMultiBulkLen(c, 2);
660
    addReplyBulkLongLong(c,cursor);
P
Pieter Noordhuis 已提交
661 662

    addReplyMultiBulkLen(c, listLength(keys));
663 664
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
P
Pieter Noordhuis 已提交
665 666
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
667
        listDelNode(keys, node);
P
Pieter Noordhuis 已提交
668 669 670
    }

cleanup:
671
    listSetFreeMethod(keys,decrRefCountVoid);
P
Pieter Noordhuis 已提交
672 673 674
    listRelease(keys);
}

675
/* The SCAN command completely relies on scanGenericCommand. */
676
void scanCommand(client *c) {
677
    unsigned long cursor;
678
    if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
679
    scanGenericCommand(c,NULL,cursor);
680 681
}

682
void dbsizeCommand(client *c) {
683
    addReplyLongLong(c,dictSize(c->db->dict));
684 685
}

686
void lastsaveCommand(client *c) {
687
    addReplyLongLong(c,server.lastsave);
688 689
}

690
void typeCommand(client *c) {
691 692 693 694 695
    robj *o;
    char *type;

    o = lookupKeyRead(c->db,c->argv[1]);
    if (o == NULL) {
696
        type = "none";
697 698
    } else {
        switch(o->type) {
699 700 701 702 703
        case OBJ_STRING: type = "string"; break;
        case OBJ_LIST: type = "list"; break;
        case OBJ_SET: type = "set"; break;
        case OBJ_ZSET: type = "zset"; break;
        case OBJ_HASH: type = "hash"; break;
704
        default: type = "unknown"; break;
705 706
        }
    }
707
    addReplyStatus(c,type);
708 709
}

710
void shutdownCommand(client *c) {
711 712 713 714 715 716 717
    int flags = 0;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    } else if (c->argc == 2) {
        if (!strcasecmp(c->argv[1]->ptr,"nosave")) {
A
antirez 已提交
718
            flags |= SHUTDOWN_NOSAVE;
719
        } else if (!strcasecmp(c->argv[1]->ptr,"save")) {
A
antirez 已提交
720
            flags |= SHUTDOWN_SAVE;
721 722 723 724 725
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
726 727
    /* When SHUTDOWN is called while the server is loading a dataset in
     * memory we need to make sure no attempt is performed to save
A
antirez 已提交
728
     * the dataset on shutdown (otherwise it could overwrite the current DB
729 730 731 732
     * with half-read data).
     *
     * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
    if (server.loading || server.sentinel_mode)
A
antirez 已提交
733
        flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE;
734
    if (prepareForShutdown(flags) == C_OK) exit(0);
735
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
736 737
}

738
void renameGenericCommand(client *c, int nx) {
739
    robj *o;
740
    long long expire;
741
    int samekey = 0;
742

743 744 745
    /* When source and dest key is the same, no operation is performed,
     * if the key exists, however we still return an error on unexisting key. */
    if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
746 747 748 749

    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
        return;

750 751 752 753 754
    if (samekey) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

755
    incrRefCount(o);
756
    expire = getExpire(c->db,c->argv[1]);
757
    if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
758 759 760 761 762
        if (nx) {
            decrRefCount(o);
            addReply(c,shared.czero);
            return;
        }
A
antirez 已提交
763 764
        /* Overwrite: delete the old key before creating the new one
         * with the same name. */
765
        dbDelete(c->db,c->argv[2]);
766
    }
767 768
    dbAdd(c->db,c->argv[2],o);
    if (expire != -1) setExpire(c->db,c->argv[2],expire);
769
    dbDelete(c->db,c->argv[1]);
770 771
    signalModifiedKey(c->db,c->argv[1]);
    signalModifiedKey(c->db,c->argv[2]);
A
antirez 已提交
772
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
773
        c->argv[1],c->db->id);
A
antirez 已提交
774
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
775
        c->argv[2],c->db->id);
776 777 778 779
    server.dirty++;
    addReply(c,nx ? shared.cone : shared.ok);
}

780
void renameCommand(client *c) {
781 782 783
    renameGenericCommand(c,0);
}

784
void renamenxCommand(client *c) {
785 786 787
    renameGenericCommand(c,1);
}

788
void moveCommand(client *c) {
789 790 791
    robj *o;
    redisDb *src, *dst;
    int srcid;
792
    long long dbid, expire;
793

A
antirez 已提交
794 795 796 797 798
    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

799 800 801
    /* Obtain source and target DB pointers */
    src = c->db;
    srcid = c->db->id;
M
Matt Stancliff 已提交
802

803
    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
M
Matt Stancliff 已提交
804
        dbid < INT_MIN || dbid > INT_MAX ||
805
        selectDb(c,dbid) == C_ERR)
M
Matt Stancliff 已提交
806
    {
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
826
    expire = getExpire(c->db,c->argv[1]);
827

828 829
    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
830 831 832
        addReply(c,shared.czero);
        return;
    }
833
    dbAdd(dst,c->argv[1],o);
A
antirez 已提交
834
    if (expire != -1) setExpire(dst,c->argv[1],expire);
835 836 837 838 839 840 841 842 843 844 845 846 847 848 849
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);
    server.dirty++;
    addReply(c,shared.cone);
}

/*-----------------------------------------------------------------------------
 * Expires API
 *----------------------------------------------------------------------------*/

int removeExpire(redisDb *db, robj *key) {
    /* An expire may only be removed if there is a corresponding entry in the
     * main dict. Otherwise, the key will never be freed. */
A
antirez 已提交
850
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
A
antirez 已提交
851
    return dictDelete(db->expires,key->ptr) == DICT_OK;
852 853
}

854 855
void setExpire(redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de;
856 857

    /* Reuse the sds from the main dict in the expire dict */
858
    kde = dictFind(db->dict,key->ptr);
A
antirez 已提交
859
    serverAssertWithInfo(NULL,key,kde != NULL);
860 861
    de = dictReplaceRaw(db->expires,dictGetKey(kde));
    dictSetSignedIntegerVal(de,when);
862 863 864 865
}

/* Return the expire time of the specified key, or -1 if no expire
 * is associated with this key (i.e. the key is non volatile) */
866
long long getExpire(redisDb *db, robj *key) {
867 868 869 870 871 872 873 874
    dictEntry *de;

    /* No expire? return ASAP */
    if (dictSize(db->expires) == 0 ||
       (de = dictFind(db->expires,key->ptr)) == NULL) return -1;

    /* The entry was found in the expire dict, this means it should also
     * be present in the main dict (safety check). */
A
antirez 已提交
875
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
876
    return dictGetSignedIntegerVal(de);
877 878
}

879 880 881 882 883 884 885 886 887 888 889
/* Propagate expires into slaves and the AOF file.
 * When a key expires in the master, a DEL operation for this key is sent
 * to all the slaves and the AOF file if enabled.
 *
 * This way the key expiry is centralized in one place, and since both
 * AOF and the master->slave link guarantee operation ordering, everything
 * will be consistent even if we allow write operations against expiring
 * keys. */
void propagateExpire(redisDb *db, robj *key) {
    robj *argv[2];

890
    argv[0] = shared.del;
891
    argv[1] = key;
892 893
    incrRefCount(argv[0]);
    incrRefCount(argv[1]);
894

A
antirez 已提交
895
    if (server.aof_state != AOF_OFF)
896
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
897
    replicationFeedSlaves(server.slaves,db->id,argv,2);
898

899 900
    decrRefCount(argv[0]);
    decrRefCount(argv[1]);
901 902
}

903
int expireIfNeeded(redisDb *db, robj *key) {
904 905
    mstime_t when = getExpire(db,key);
    mstime_t now;
906

907 908
    if (when < 0) return 0; /* No expire for this key */

909 910 911
    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;

912 913 914 915 916 917 918
    /* If we are in the context of a Lua script, we claim that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    now = server.lua_caller ? server.lua_time_start : mstime();

919 920 921 922
    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
923
     * Still we try to return the right information to the caller,
924 925
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
926
    if (server.masterhost != NULL) return now > when;
927

928
    /* Return when this key has not expired */
929
    if (now <= when) return 0;
930 931 932

    /* Delete the key */
    server.stat_expiredkeys++;
933
    propagateExpire(db,key);
A
antirez 已提交
934
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
935
        "expired",key,db->id);
936 937 938 939 940 941 942
    return dbDelete(db,key);
}

/*-----------------------------------------------------------------------------
 * Expires Commands
 *----------------------------------------------------------------------------*/

943 944 945 946 947 948
/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the commad second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
G
guiquanz 已提交
949
 * the argv[2] parameter. The basetime is always specified in milliseconds. */
950
void expireGenericCommand(client *c, long long basetime, int unit) {
951
    robj *key = c->argv[1], *param = c->argv[2];
952
    long long when; /* unix time in milliseconds when the key will expire. */
953

954
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
955
        return;
956

957 958
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;
959

960
    /* No key, return zero. */
961
    if (lookupKeyWrite(c->db,key) == NULL) {
962 963 964
        addReply(c,shared.czero);
        return;
    }
965

966 967 968 969 970 971
    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master. */
972
    if (when <= mstime() && !server.loading && !server.masterhost) {
973 974
        robj *aux;

A
antirez 已提交
975
        serverAssertWithInfo(c,key,dbDelete(c->db,key));
976 977 978 979 980 981
        server.dirty++;

        /* Replicate/AOF this as an explicit DEL. */
        aux = createStringObject("DEL",3);
        rewriteClientCommandVector(c,2,aux,key);
        decrRefCount(aux);
982
        signalModifiedKey(c->db,key);
A
antirez 已提交
983
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
984
        addReply(c, shared.cone);
985 986
        return;
    } else {
987
        setExpire(c->db,key,when);
988
        addReply(c,shared.cone);
989
        signalModifiedKey(c->db,key);
A
antirez 已提交
990
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
991
        server.dirty++;
992 993 994 995
        return;
    }
}

996
void expireCommand(client *c) {
997
    expireGenericCommand(c,mstime(),UNIT_SECONDS);
998 999
}

1000
void expireatCommand(client *c) {
1001
    expireGenericCommand(c,0,UNIT_SECONDS);
1002 1003
}

1004
void pexpireCommand(client *c) {
1005
    expireGenericCommand(c,mstime(),UNIT_MILLISECONDS);
1006
}
1007

1008
void pexpireatCommand(client *c) {
1009
    expireGenericCommand(c,0,UNIT_MILLISECONDS);
1010 1011
}

1012
void ttlGenericCommand(client *c, int output_ms) {
1013
    long long expire, ttl = -1;
1014

1015
    /* If the key does not exist at all, return -2 */
A
antirez 已提交
1016
    if (lookupKeyRead(c->db,c->argv[1]) == NULL) {
1017 1018 1019 1020 1021
        addReplyLongLong(c,-2);
        return;
    }
    /* The key exists. Return -1 if it has no expire, or the actual
     * TTL value otherwise. */
A
antirez 已提交
1022
    expire = getExpire(c->db,c->argv[1]);
1023
    if (expire != -1) {
1024
        ttl = expire-mstime();
A
antirez 已提交
1025
        if (ttl < 0) ttl = 0;
1026
    }
1027 1028 1029
    if (ttl == -1) {
        addReplyLongLong(c,-1);
    } else {
1030
        addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000));
1031
    }
1032
}
A
antirez 已提交
1033

1034
void ttlCommand(client *c) {
1035 1036 1037
    ttlGenericCommand(c, 0);
}

1038
void pttlCommand(client *c) {
1039 1040 1041
    ttlGenericCommand(c, 1);
}

1042
void persistCommand(client *c) {
A
antirez 已提交
1043 1044 1045 1046 1047 1048
    dictEntry *de;

    de = dictFind(c->db->dict,c->argv[1]->ptr);
    if (de == NULL) {
        addReply(c,shared.czero);
    } else {
A
antirez 已提交
1049
        if (removeExpire(c->db,c->argv[1])) {
A
antirez 已提交
1050
            addReply(c,shared.cone);
A
antirez 已提交
1051 1052
            server.dirty++;
        } else {
A
antirez 已提交
1053
            addReply(c,shared.czero);
A
antirez 已提交
1054
        }
A
antirez 已提交
1055 1056
    }
}
1057 1058 1059 1060 1061

/* -----------------------------------------------------------------------------
 * API to get key arguments from commands
 * ---------------------------------------------------------------------------*/

1062 1063
/* The base case is to use the keys position as given in the command table
 * (firstkey, lastkey, step). */
1064 1065
int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
    int j, i = 0, last, *keys;
A
antirez 已提交
1066
    UNUSED(argv);
1067 1068 1069 1070 1071 1072 1073 1074 1075

    if (cmd->firstkey == 0) {
        *numkeys = 0;
        return NULL;
    }
    last = cmd->lastkey;
    if (last < 0) last = argc+last;
    keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
    for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
A
antirez 已提交
1076
        serverAssert(j < argc);
1077
        keys[i++] = j;
1078
    }
1079
    *numkeys = i;
1080 1081 1082
    return keys;
}

1083 1084 1085 1086 1087 1088 1089 1090
/* Return all the arguments that are keys in the command passed via argc / argv.
 *
 * The command returns the positions of all the key arguments inside the array,
 * so the actual return value is an heap allocated array of integers. The
 * length of the array is returned by reference into *numkeys.
 *
 * 'cmd' must be point to the corresponding entry into the redisCommand
 * table, according to the command name in argv[0].
1091 1092 1093
 *
 * This function uses the command table if a command-specific helper function
 * is not required, otherwise it calls the command-specific function. */
1094
int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1095
    if (cmd->getkeys_proc) {
1096
        return cmd->getkeys_proc(cmd,argv,argc,numkeys);
1097 1098 1099 1100 1101
    } else {
        return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
    }
}

1102
/* Free the result of getKeysFromCommand. */
1103 1104 1105 1106
void getKeysFreeResult(int *result) {
    zfree(result);
}

1107 1108 1109 1110
/* Helper function to extract keys from following commands:
 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options> */
int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
1111
    int i, num, *keys;
A
antirez 已提交
1112
    UNUSED(cmd);
1113 1114 1115 1116 1117 1118 1119 1120

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error. */
    if (num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }
1121 1122

    /* Keys in z{union,inter}store come from two places:
1123 1124
     * argv[1] = storage key,
     * argv[3...n] = keys to intersect */
1125 1126 1127
    keys = zmalloc(sizeof(int)*(num+1));

    /* Add all key positions for argv[3...n] to keys[] */
1128
    for (i = 0; i < num; i++) keys[i] = 3+i;
1129

1130
    /* Finally add the argv[1] key position (the storage key target). */
1131 1132
    keys[num] = 1;
    *numkeys = num+1;  /* Total keys = {union,inter} keys + storage key */
1133 1134
    return keys;
}
1135

1136 1137 1138 1139 1140
/* Helper function to extract keys from the following commands:
 * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
 * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff] */
int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int i, num, *keys;
A
antirez 已提交
1141
    UNUSED(cmd);
1142 1143 1144 1145 1146 1147 1148 1149 1150 1151

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error. */
    if (num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }

    keys = zmalloc(sizeof(int)*num);
1152
    *numkeys = num;
1153 1154 1155 1156 1157 1158 1159

    /* Add all key positions for argv[3...n] to keys[] */
    for (i = 0; i < num; i++) keys[i] = 3+i;

    return keys;
}

1160 1161 1162 1163 1164 1165 1166 1167
/* Helper function to extract keys from the SORT command.
 *
 * SORT <sort-key> ... STORE <store-key> ...
 *
 * The first argument of SORT is always a key, however a list of options
 * follow in SQL-alike style. Here we parse just the minimum in order to
 * correctly identify keys in the "STORE" option. */
int *sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
M
Matt Stancliff 已提交
1168
    int i, j, num, *keys, found_store = 0;
A
antirez 已提交
1169
    UNUSED(cmd);
1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195

    num = 0;
    keys = zmalloc(sizeof(int)*2); /* Alloc 2 places for the worst case. */

    keys[num++] = 1; /* <sort-key> is always present. */

    /* Search for STORE option. By default we consider options to don't
     * have arguments, so if we find an unknown option name we scan the
     * next. However there are options with 1 or 2 arguments, so we
     * provide a list here in order to skip the right number of args. */
    struct {
        char *name;
        int skip;
    } skiplist[] = {
        {"limit", 2},
        {"get", 1},
        {"by", 1},
        {NULL, 0} /* End of elements. */
    };

    for (i = 2; i < argc; i++) {
        for (j = 0; skiplist[j].name != NULL; j++) {
            if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
                i += skiplist[j].skip;
                break;
            } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
1196 1197 1198
                /* Note: we don't increment "num" here and continue the loop
                 * to be sure to process the *last* "STORE" option if multiple
                 * ones are provided. This is same behavior as SORT. */
M
Matt Stancliff 已提交
1199
                found_store = 1;
1200
                keys[num] = i+1; /* <store-key> */
1201 1202 1203 1204
                break;
            }
        }
    }
M
Matt Stancliff 已提交
1205
    *numkeys = num + found_store;
1206 1207 1208
    return keys;
}

1209 1210 1211
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
 * a fast way a key that belongs to a specified hash slot. This is useful
 * while rehashing the cluster. */
1212
void slotToKeyAdd(robj *key) {
1213 1214
    unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));

1215 1216
    sds sdskey = sdsdup(key->ptr);
    zslInsert(server.cluster->slots_to_keys,hashslot,sdskey);
1217 1218
}

1219
void slotToKeyDel(robj *key) {
1220
    unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
1221
    zslDelete(server.cluster->slots_to_keys,hashslot,key->ptr,NULL);
1222 1223
}

1224
void slotToKeyFlush(void) {
1225 1226 1227 1228
    zslFree(server.cluster->slots_to_keys);
    server.cluster->slots_to_keys = zslCreate();
}

1229 1230 1231
/* Pupulate the specified array of objects with keys in the specified slot.
 * New objects are returned to represent keys, it's up to the caller to
 * decrement the reference count to release the keys names. */
1232
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
1233 1234
    zskiplistNode *n;
    zrangespec range;
A
antirez 已提交
1235
    int j = 0;
1236 1237 1238

    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;
1239

1240
    n = zslFirstInRange(server.cluster->slots_to_keys, &range);
A
antirez 已提交
1241
    while(n && n->score == hashslot && count--) {
1242
        keys[j++] = createStringObject(n->ele,sdslen(n->ele));
A
antirez 已提交
1243 1244 1245
        n = n->level[0].forward;
    }
    return j;
1246
}
1247

1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259
/* Remove all the keys in the specified hash slot.
 * The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
    zskiplistNode *n;
    zrangespec range;
    int j = 0;

    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;

    n = zslFirstInRange(server.cluster->slots_to_keys, &range);
    while(n && n->score == hashslot) {
1260 1261
        sds sdskey = n->ele;
        robj *key = createStringObject(sdskey,sdslen(sdskey));
1262 1263 1264 1265 1266 1267 1268 1269
        n = n->level[0].forward; /* Go to the next item before freeing it. */
        dbDelete(&server.db[0],key);
        decrRefCount(key);
        j++;
    }
    return j;
}

1270
unsigned int countKeysInSlot(unsigned int hashslot) {
1271 1272
    zskiplist *zsl = server.cluster->slots_to_keys;
    zskiplistNode *zn;
1273
    zrangespec range;
1274
    int rank, count = 0;
1275 1276 1277

    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;
1278 1279

    /* Find first element in range */
1280
    zn = zslFirstInRange(zsl, &range);
1281 1282 1283

    /* Use rank of first element, if any, to determine preliminary count */
    if (zn != NULL) {
1284
        rank = zslGetRank(zsl, zn->score, zn->ele);
1285 1286 1287
        count = (zsl->length - (rank - 1));

        /* Find last element in range */
1288
        zn = zslLastInRange(zsl, &range);
1289 1290 1291

        /* Use rank of last element, if any, to determine the actual count */
        if (zn != NULL) {
1292
            rank = zslGetRank(zsl, zn->score, zn->ele);
1293 1294
            count -= (zsl->length - rank);
        }
1295
    }
1296
    return count;
1297
}