db.c 34.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30
#include "redis.h"
#include "cluster.h"

#include <signal.h>
#include <ctype.h>
#include <limits.h>
35

36 37 38
/* Hooks invoked by the key space functions of this file only when
 * server.cluster_enabled is set: on key creation (dbAdd), on key removal
 * (dbDelete) and when the dataset is flushed (emptyDb, flushdbCommand).
 * They are only declared here; their definitions live elsewhere. */
void slotToKeyAdd(robj *key);
void slotToKeyDel(robj *key);
void slotToKeyFlush(void);
39

40 41 42 43 44 45 46
/*-----------------------------------------------------------------------------
 * C-level DB API
 *----------------------------------------------------------------------------*/

/* Low level lookup of 'key' inside 'db'.
 *
 * Returns the value object stored at the key, or NULL if the key is not
 * present in the main dictionary. No expire check is performed here: the
 * lookupKeyRead() / lookupKeyWrite() wrappers take care of that. */
robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *entry = dictFind(db->dict,key->ptr);
    robj *found;

    if (entry == NULL) return NULL;

    found = dictGetVal(entry);
    /* Refresh the access time used by the ageing algorithm, but only when
     * there is no saving child: writing to the object while a child exists
     * would trigger a copy on write madness. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
        found->lru = server.lruclock;
    return found;
}

/* Lookup a key for read operations.
 *
 * The key is expired first if needed, then looked up. As a side effect the
 * keyspace hits / misses statistics are updated. Returns the value object
 * or NULL if the key does not exist (or just expired). */
robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *found;

    expireIfNeeded(db,key);
    found = lookupKey(db,key);
    if (found != NULL)
        server.stat_keyspace_hits++;
    else
        server.stat_keyspace_misses++;
    return found;
}

/* Lookup a key for write operations.
 *
 * Like lookupKeyRead() the key is expired first if needed, but the keyspace
 * hits / misses statistics are not touched. */
robj *lookupKeyWrite(redisDb *db, robj *key) {
    robj *found;

    expireIfNeeded(db,key);
    found = lookupKey(db,key);
    return found;
}

/* Read lookup in the client's current DB. When the key is missing 'reply'
 * is sent to the client and NULL is returned. */
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
    robj *found = lookupKeyRead(c->db, key);

    if (found == NULL) addReply(c,reply);
    return found;
}

/* Write lookup in the client's current DB. When the key is missing 'reply'
 * is sent to the client and NULL is returned. */
robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
    robj *found = lookupKeyWrite(c->db, key);

    if (found == NULL) addReply(c,reply);
    return found;
}

89
/* Add the key to the DB. The dictionary stores a private copy of the key
 * name, while the value is stored as it is: incrementing the reference
 * counter of the value, if needed, is up to the caller.
 *
 * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
    sds keyname = sdsdup(key->ptr);
    int retval;

    retval = dictAdd(db->dict, keyname, val);
    redisAssertWithInfo(NULL,key,retval == REDIS_OK);
    if (server.cluster_enabled) slotToKeyAdd(key);
}

/* Replace the value of an existing key with 'val'. The caller is in charge
 * of incrementing the reference count of the new value.
 * The expire time of the existing key is left untouched.
 *
 * The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
    struct dictEntry *de;

    /* The lookup is only used to enforce the "key must exist" contract. */
    de = dictFind(db->dict,key->ptr);
    redisAssertWithInfo(NULL,key,de != NULL);
    dictReplace(db->dict, key->ptr, val);
}

113 114
/* High level Set operation. This function can be used in order to set
 * a key, whether it already exists or not, to a new object.
 *
 * 1) The ref count of the value object is incremented.
 * 2) Clients WATCHing the destination key are notified.
 * 3) The expire time of the key is reset (the key is made persistent). */
void setKey(redisDb *db, robj *key, robj *val) {
    int exists = (lookupKeyWrite(db,key) != NULL);

    if (exists)
        dbOverwrite(db,key,val);
    else
        dbAdd(db,key,val);

    incrRefCount(val);
    removeExpire(db,key);
    signalModifiedKey(db,key);
}

/* Return non-zero if 'key' is present in the main dictionary of 'db'.
 * No expire check is performed. */
int dbExists(redisDb *db, robj *key) {
    dictEntry *entry = dictFind(db->dict,key->ptr);

    return entry != NULL;
}

/* Return a random key, in form of a Redis object.
 * If there are no keys, NULL is returned.
 *
 * The function makes sure to return keys not already expired, with one
 * exception: when running as a slave (server.masterhost is set) and every
 * key of the DB has an expire, after a bounded number of tries a key that
 * may be logically expired is returned (see the comment inside the loop).
 *
 * The returned object is created here: the caller has to decrRefCount()
 * it when done, as randomkeyCommand() does. */
robj *dbRandomKey(redisDb *db) {
    struct dictEntry *de;
    int maxtries = 100;
    int allvolatile = dictSize(db->dict) == dictSize(db->expires);

    while(1) {
        sds key;
        robj *keyobj;

        de = dictGetRandomKey(db->dict);
        if (de == NULL) return NULL;

        key = dictGetKey(de);
        keyobj = createStringObject(key,sdslen(key));
        if (dictFind(db->expires,key)) {
            /* In a slave expireIfNeeded() reports an expired key without
             * deleting it. If the DB is composed only of keys with an
             * expire set, and all of them are logically expired, the loop
             * could never stop: expireIfNeeded() keeps returning 1 and
             * dictGetRandomKey() keeps returning entries. To prevent the
             * infinite loop we give up after some tries and return a key
             * name that may be already expired. */
            if (allvolatile && server.masterhost && --maxtries == 0) {
                return keyobj;
            }
            if (expireIfNeeded(db,keyobj)) {
                decrRefCount(keyobj);
                continue; /* search for another key. This expired. */
            }
        }
        return keyobj;
    }
}

/* Delete a key, value, and associated expiration entry if any, from the DB.
 * Returns 1 if the key was found and removed, 0 if it was not there. */
int dbDelete(redisDb *db, robj *key) {
    /* The expire entry goes first: deleting it does not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    if (dictDelete(db->dict,key->ptr) != DICT_OK) return 0;
    if (server.cluster_enabled) slotToKeyDel(key);
    return 1;
}

173
long long emptyDb(void(callback)(void*)) {
174 175 176 177 178
    int j;
    long long removed = 0;

    for (j = 0; j < server.dbnum; j++) {
        removed += dictSize(server.db[j].dict);
179 180
        dictEmpty(server.db[j].dict,callback);
        dictEmpty(server.db[j].expires,callback);
181
    }
182
    if (server.cluster_enabled) slotToKeyFlush();
183 184 185 186 187 188 189 190 191 192
    return removed;
}

/* Make the DB with index 'id' the current DB of client 'c'.
 * Returns REDIS_OK on success, or REDIS_ERR (leaving the client untouched)
 * if 'id' is not in the range [0, server.dbnum). */
int selectDb(redisClient *c, int id) {
    if (id >= 0 && id < server.dbnum) {
        c->db = &server.db[id];
        return REDIS_OK;
    }
    return REDIS_ERR;
}

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/

/* Hook to call every time 'key' is modified in 'db'. Currently it only
 * forwards the event to touchWatchedKey(). */
void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

/* Hook to call every time a DB is flushed. Currently it only forwards
 * 'dbid' to touchWatchedKeysOnFlush(). flushallCommand() passes -1 as
 * 'dbid' when all the DBs are flushed at once. */
void signalFlushedDb(int dbid) {
    touchWatchedKeysOnFlush(dbid);
}

210 211 212 213 214 215
/*-----------------------------------------------------------------------------
 * Type agnostic commands operating on the key space
 *----------------------------------------------------------------------------*/

/* FLUSHDB: remove every key of the client's current DB.
 * The dirty counter is incremented by the number of keys removed. */
void flushdbCommand(redisClient *c) {
    redisDb *db = c->db;

    server.dirty += dictSize(db->dict);
    signalFlushedDb(db->id);
    dictEmpty(db->dict,NULL);
    dictEmpty(db->expires,NULL);
    if (server.cluster_enabled) slotToKeyFlush();
    addReply(c,shared.ok);
}

/* FLUSHALL: remove every key from every DB.
 *
 * After the flush, if a background RDB child exists it is signaled with
 * SIGUSR1 and its temp file is removed. When save points are configured
 * (server.saveparamslen > 0) a new RDB file is written synchronously. */
void flushallCommand(redisClient *c) {
    signalFlushedDb(-1);
    server.dirty += emptyDb(NULL);
    addReply(c,shared.ok);
    if (server.rdb_child_pid != -1) {
        kill(server.rdb_child_pid,SIGUSR1);
        rdbRemoveTempFile(server.rdb_child_pid);
    }
    if (server.saveparamslen > 0) {
        /* Normally rdbSave() will reset dirty, but we don't want this here
         * as otherwise FLUSHALL will not be replicated nor put into the AOF.
         *
         * The saved value uses a 64 bit type: the counter accumulates the
         * long long returned by emptyDb(), so storing it into an int could
         * truncate it. */
        long long saved_dirty = server.dirty;

        rdbSave(server.rdb_filename);
        server.dirty = saved_dirty;
    }
    server.dirty++;
}

/* DEL key [key ...]: delete every specified key and reply with the number
 * of keys actually removed. For each removed key WATCHing clients are
 * signaled, a "del" keyspace event is fired and the dirty counter is
 * incremented. */
void delCommand(redisClient *c) {
    int j, deleted = 0;

    for (j = 1; j < c->argc; j++) {
        robj *key = c->argv[j];

        if (!dbDelete(c->db,key)) continue; /* Key not found. */
        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
        server.dirty++;
        deleted++;
    }
    addReplyLongLong(c,deleted);
}

/* EXISTS key: reply 1 if the key exists, 0 otherwise. The key is expired
 * first if needed, so an already expired key is reported as missing. */
void existsCommand(redisClient *c) {
    robj *key = c->argv[1];

    expireIfNeeded(c->db,key);
    addReply(c, dbExists(c->db,key) ? shared.cone : shared.czero);
}

/* SELECT index: change the current DB of the client.
 *
 * An error is returned if the index is not a valid integer, if it is out
 * of range, or if it is not zero while running in cluster mode. */
void selectCommand(redisClient *c) {
    long id;

    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != REDIS_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }
    /* selectDb() takes an int: validate the range while 'id' is still a
     * long, otherwise a huge index could be truncated into a valid one
     * (for instance 4294967296 becoming 0 when long is 64 bit wide). */
    if (id < 0 || id >= server.dbnum || selectDb(c,(int)id) == REDIS_ERR) {
        addReplyError(c,"invalid DB index");
    } else {
        addReply(c,shared.ok);
    }
}

/* RANDOMKEY: reply with a random key name of the current DB, or with a
 * null bulk reply if the DB is empty. */
void randomkeyCommand(redisClient *c) {
    robj *key = dbRandomKey(c->db);

    if (key != NULL) {
        addReplyBulk(c,key);
        decrRefCount(key); /* dbRandomKey() created the object for us. */
    } else {
        addReply(c,shared.nullbulk);
    }
}

/* KEYS pattern: reply with all the key names of the current DB matching
 * the glob-style pattern. Keys found to be expired while iterating are
 * expired on the fly and not returned.
 *
 * The number of elements is not known in advance, so the multi bulk length
 * is deferred and set once the iteration is over. */
void keysCommand(redisClient *c) {
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern);
    /* A pattern starting with '*' followed by a nul byte matches everything:
     * skip the pattern matching step at all in this case. */
    int allkeys = (pattern[0] == '*' && pattern[1] == '\0');
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);
    dictIterator *di = dictGetSafeIterator(c->db->dict);
    dictEntry *de;

    for (de = dictNext(di); de != NULL; de = dictNext(di)) {
        sds key = dictGetKey(de);
        robj *keyobj;

        if (!allkeys && !stringmatchlen(pattern,plen,key,sdslen(key),0))
            continue;

        keyobj = createStringObject(key,sdslen(key));
        if (expireIfNeeded(c->db,keyobj) == 0) {
            addReplyBulk(c,keyobj);
            numkeys++;
        }
        decrRefCount(keyobj);
    }
    dictReleaseIterator(di);
    setDeferredMultiBulkLength(c,replylen,numkeys);
}

322 323
/* This callback is used by scanGenericCommand in order to collect elements
 * returned by the dictionary iterator into a list. */
P
Pieter Noordhuis 已提交
324
void scanCallback(void *privdata, const dictEntry *de) {
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350
    void **pd = (void**) privdata;
    list *keys = pd[0];
    robj *o = pd[1];
    robj *key, *val = NULL;

    if (o == NULL) {
        sds sdskey = dictGetKey(de);
        key = createStringObject(sdskey, sdslen(sdskey));
    } else if (o->type == REDIS_SET) {
        key = dictGetKey(de);
        incrRefCount(key);
    } else if (o->type == REDIS_HASH) {
        key = dictGetKey(de);
        incrRefCount(key);
        val = dictGetVal(de);
        incrRefCount(val);
    } else if (o->type == REDIS_ZSET) {
        key = dictGetKey(de);
        incrRefCount(key);
        val = createStringObjectFromLongDouble(*(double*)dictGetVal(de));
    } else {
        redisPanic("Type not handled in SCAN callback.");
    }

    listAddNodeTail(keys, key);
    if (val) listAddNodeTail(keys, val);
P
Pieter Noordhuis 已提交
351 352
}

353
/* Try to parse a SCAN cursor stored at object 'o':
 * if the cursor is valid, store it as unsigned integer into *cursor and
 * return REDIS_OK. Otherwise return REDIS_ERR and send an error to the
 * client.
 *
 * Note that *cursor is written even when REDIS_ERR is returned: callers
 * must not use it in that case. */
int parseScanCursorOrReply(redisClient *c, robj *o, unsigned long *cursor) {
    char *eptr;

    /* Use strtoul() because we need an *unsigned* long, so
     * getLongLongFromObject() does not cover the whole cursor space. */
    errno = 0;
    *cursor = strtoul(o->ptr, &eptr, 10);
    /* strtoul() silently skips leading spaces, so they are rejected
     * explicitly. The cast to unsigned char is needed because passing a
     * negative char value to isspace() is undefined behavior. */
    if (isspace((unsigned char)((char*)o->ptr)[0]) || eptr[0] != '\0' ||
        errno == ERANGE)
    {
        addReplyError(c, "invalid cursor");
        return REDIS_ERR;
    }
    return REDIS_OK;
}

372
/* This command implements SCAN, HSCAN and SSCAN commands.
373
 * If object 'o' is passed, then it must be a Hash or Set object, otherwise
374 375 376 377 378 379 380
 * if 'o' is NULL the command will operate on the dictionary associated with
 * the current database.
 *
 * When 'o' is not NULL the function assumes that the first argument in
 * the client arguments vector is a key so it skips it before iterating
 * in order to parse options.
 *
381
 * In the case of a Hash object the function returns both the field and value
382
 * of every element on the Hash. */
383
void scanGenericCommand(redisClient *c, robj *o, unsigned long cursor) {
P
Pieter Noordhuis 已提交
384 385
    int rv;
    int i, j;
386
    char buf[REDIS_LONGSTR_SIZE];
P
Pieter Noordhuis 已提交
387
    list *keys = listCreate();
388
    listNode *node, *nextnode;
389
    long count = 10;
P
Pieter Noordhuis 已提交
390
    sds pat;
391
    int patlen, use_pattern = 0;
392 393 394 395 396 397 398 399 400
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
    redisAssert(o == NULL || o->type == REDIS_SET || o->type == REDIS_HASH ||
                o->type == REDIS_ZSET);

    /* Set i to the first option argument. The previous one is the cursor. */
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
P
Pieter Noordhuis 已提交
401

402
    /* Step 1: Parse options. */
P
Pieter Noordhuis 已提交
403 404 405
    while (i < c->argc) {
        j = c->argc - i;
        if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
A
antirez 已提交
406 407 408
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
                != REDIS_OK)
            {
P
Pieter Noordhuis 已提交
409 410 411 412 413 414 415 416 417
                goto cleanup;
            }

            if (count < 1) {
                addReply(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2;
418
        } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
P
Pieter Noordhuis 已提交
419 420 421
            pat = c->argv[i+1]->ptr;
            patlen = sdslen(pat);

422 423 424
            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            use_pattern = !(pat[0] == '*' && patlen == 1);
P
Pieter Noordhuis 已提交
425 426 427 428 429 430 431 432

            i += 2;
        } else {
            addReply(c,shared.syntaxerr);
            goto cleanup;
        }
    }

433 434 435
    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a ziplist, intset, or any other
436
     * representation that is not a hash table, we are sure that it is also
437 438 439 440
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

441
    /* Handle the case of a hash table. */
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468
    ht = NULL;
    if (o == NULL) {
        ht = c->db->dict;
    } else if (o->type == REDIS_SET && o->encoding == REDIS_ENCODING_HT) {
        ht = o->ptr;
    } else if (o->type == REDIS_HASH && o->encoding == REDIS_ENCODING_HT) {
        ht = o->ptr;
        count *= 2; /* We return key / value for this type. */
    } else if (o->type == REDIS_ZSET && o->encoding == REDIS_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        privdata[0] = keys;
        privdata[1] = o;
        do {
            cursor = dictScan(ht, cursor, scanCallback, privdata);
        } while (cursor && listLength(keys) < count);
    } else if (o->type == REDIS_SET) {
        int pos = 0;
469
        int64_t ll;
470 471 472

        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
473
        cursor = 0;
474 475 476 477 478 479 480 481 482 483 484
    } else if (o->type == REDIS_HASH || o->type == REDIS_ZSET) {
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            ziplistGet(p,&vstr,&vlen,&vll);
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
A
antirez 已提交
485
            p = ziplistNext(o->ptr,p);
486
        }
487
        cursor = 0;
488 489 490
    } else {
        redisPanic("Not handled encoding in SCAN.");
    }
P
Pieter Noordhuis 已提交
491

492
    /* Step 3: Filter elements. */
493 494 495 496
    node = listFirst(keys);
    while (node) {
        robj *kobj = listNodeValue(node);
        nextnode = listNextNode(node);
497 498 499
        int filter = 0;

        /* Filter element if it does not match the pattern. */
500
        if (!filter && use_pattern) {
501 502 503 504 505 506 507 508 509 510 511 512
            if (sdsEncodedObject(kobj)) {
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            } else {
                char buf[REDIS_LONGSTR_SIZE];
                int len;

                redisAssert(kobj->encoding == REDIS_ENCODING_INT);
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }
P
Pieter Noordhuis 已提交
513

514 515 516 517 518
        /* Filter element if it is an expired key. */
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

        /* Remove the element and its associted value if needed. */
        if (filter) {
P
Pieter Noordhuis 已提交
519
            decrRefCount(kobj);
520
            listDelNode(keys, node);
A
antirez 已提交
521 522
        }

523
        /* If this is a hash or a sorted set, we have a flat list of
A
antirez 已提交
524 525 526 527 528 529
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
        if (o && (o->type == REDIS_ZSET || o->type == REDIS_HASH)) {
            node = nextnode;
            nextnode = listNextNode(node);
            if (filter) {
530 531 532 533
                kobj = listNodeValue(node);
                decrRefCount(kobj);
                listDelNode(keys, node);
            }
P
Pieter Noordhuis 已提交
534
        }
535
        node = nextnode;
P
Pieter Noordhuis 已提交
536 537
    }

538
    /* Step 4: Reply to the client. */
P
Pieter Noordhuis 已提交
539 540 541 542 543 544
    addReplyMultiBulkLen(c, 2);
    rv = snprintf(buf, sizeof(buf), "%lu", cursor);
    redisAssert(rv < sizeof(buf));
    addReplyBulkCBuffer(c, buf, rv);

    addReplyMultiBulkLen(c, listLength(keys));
545 546
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
P
Pieter Noordhuis 已提交
547 548
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
549
        listDelNode(keys, node);
P
Pieter Noordhuis 已提交
550 551 552
    }

cleanup:
553
    listSetFreeMethod(keys,decrRefCountVoid);
P
Pieter Noordhuis 已提交
554 555 556
    listRelease(keys);
}

557 558
/* SCAN cursor [options]: iterate the key space of the current DB.
 * The cursor is parsed here (an error is sent to the client if it is not
 * valid), everything else is up to scanGenericCommand(). */
void scanCommand(redisClient *c) {
    unsigned long cursor;

    if (parseScanCursorOrReply(c,c->argv[1],&cursor) == REDIS_OK)
        scanGenericCommand(c,NULL,cursor);
}

564
/* DBSIZE: reply with the number of entries in the main dictionary of the
 * current DB. */
void dbsizeCommand(redisClient *c) {
    long long numkeys = dictSize(c->db->dict);

    addReplyLongLong(c,numkeys);
}

/* LASTSAVE: reply with the value of server.lastsave as an integer. */
void lastsaveCommand(redisClient *c) {
    long long lastsave = server.lastsave;

    addReplyLongLong(c,lastsave);
}

/* TYPE key: reply with a status string naming the type of the value stored
 * at key, "none" if the key does not exist, or "unknown" for a type that
 * is not one of the five handled here. */
void typeCommand(redisClient *c) {
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    char *type = "none";

    if (o != NULL) {
        if (o->type == REDIS_STRING) type = "string";
        else if (o->type == REDIS_LIST) type = "list";
        else if (o->type == REDIS_SET) type = "set";
        else if (o->type == REDIS_ZSET) type = "zset";
        else if (o->type == REDIS_HASH) type = "hash";
        else type = "unknown";
    }
    addReplyStatus(c,type);
}

/* SHUTDOWN [NOSAVE|SAVE]: try to shut down the server.
 *
 * If prepareForShutdown() succeeds the process exits and no reply is sent,
 * otherwise an error is returned to the client. */
void shutdownCommand(redisClient *c) {
    int flags = 0;

    if (c->argc > 2) {
        addReply(c,shared.syntaxerr);
        return;
    }
    if (c->argc == 2) {
        char *opt = c->argv[1]->ptr;

        if (strcasecmp(opt,"nosave") == 0) {
            flags |= REDIS_SHUTDOWN_NOSAVE;
        } else if (strcasecmp(opt,"save") == 0) {
            flags |= REDIS_SHUTDOWN_SAVE;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* When SHUTDOWN is called while the server is loading a dataset in
     * memory we need to make sure no attempt is performed to save
     * the dataset on shutdown (otherwise it could overwrite the current DB
     * with half-read data).
     *
     * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */
    if (server.loading || server.sentinel_mode)
        flags = (flags & ~REDIS_SHUTDOWN_SAVE) | REDIS_SHUTDOWN_NOSAVE;

    if (prepareForShutdown(flags) == REDIS_OK) exit(0);
    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}

/* Generic implementation of RENAME (nx = 0) and RENAMENX (nx = 1).
 *
 * The value stored at argv[1] is moved to argv[2] together with its expire,
 * if any. When the destination already exists it is overwritten if 'nx' is
 * zero, otherwise nothing is done and zero is returned to the client. */
void renameGenericCommand(redisClient *c, int nx) {
    robj *src = c->argv[1], *dst = c->argv[2];
    robj *o;
    long long expire;

    /* To use the same key as src and dst is probably an error */
    if (sdscmp(src->ptr,dst->ptr) == 0) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    o = lookupKeyWriteOrReply(c,src,shared.nokeyerr);
    if (o == NULL) return;

    /* Take a reference: the value must survive the deletion of the source
     * key performed below. */
    incrRefCount(o);
    expire = getExpire(c->db,src);
    if (lookupKeyWrite(c->db,dst) != NULL) {
        if (nx) {
            decrRefCount(o);
            addReply(c,shared.czero);
            return;
        }
        /* Overwrite: delete the old key before creating the new one
         * with the same name. */
        dbDelete(c->db,dst);
    }
    dbAdd(c->db,dst,o);
    if (expire != -1) setExpire(c->db,dst,expire);
    dbDelete(c->db,src);

    signalModifiedKey(c->db,src);
    signalModifiedKey(c->db,dst);
    notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"rename_from",src,c->db->id);
    notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"rename_to",dst,c->db->id);
    server.dirty++;
    addReply(c,nx ? shared.cone : shared.ok);
}

/* RENAME: rename with nx = 0, so an existing destination key is
 * overwritten. */
void renameCommand(redisClient *c) {
    renameGenericCommand(c,0);
}

/* RENAMENX: rename with nx = 1, so nothing is done (and zero is returned
 * to the client) if the destination key already exists. */
void renamenxCommand(redisClient *c) {
    renameGenericCommand(c,1);
}

/* MOVE key db: move a key from the current DB to the DB with index 'db'.
 *
 * Replies with 1 if the key was moved, with 0 if the key does not exist in
 * the source DB or already exists in the target DB. An error is returned in
 * cluster mode, if the DB index is not a valid in-range integer, or if the
 * target DB is the same as the source DB.
 *
 * The expire of the key, if any, is moved together with the key. */
void moveCommand(redisClient *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;

    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Obtain source and target DB pointers. The index is parsed in a strict
     * way and range checked while still 64 bit wide: atoi() would turn a
     * non numerical argument into DB 0 and can't detect overflows. */
    src = c->db;
    srcid = c->db->id;
    if (getLongLongFromObject(c->argv[2],&dbid) != REDIS_OK ||
        dbid < 0 || dbid >= server.dbnum ||
        selectDb(c,(int)dbid) == REDIS_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    dst = c->db;
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);
        return;
    }
    /* Fetch the expire before the key is removed from the source DB. */
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    dbAdd(dst,c->argv[1],o);
    if (expire != -1) setExpire(dst,c->argv[1],expire);
    incrRefCount(o);

    /* OK! key moved, free the entry in the source DB */
    dbDelete(src,c->argv[1]);
    server.dirty++;
    addReply(c,shared.cone);
}

/*-----------------------------------------------------------------------------
 * Expires API
 *----------------------------------------------------------------------------*/

/* Remove the expire associated with 'key', if any.
 * Returns 1 if an expire was removed, 0 if the key had no expire.
 *
 * The program is aborted if the key is not in the main dictionary. */
int removeExpire(redisDb *db, robj *key) {
    int removed;

    /* An expire may only be removed if there is a corresponding entry in the
     * main dict. Otherwise, the key will never be freed. */
    redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
    removed = (dictDelete(db->expires,key->ptr) == DICT_OK);
    return removed;
}

725 726
/* Set (or replace) the expire of 'key' to 'when'. The callers in this file
 * pass an absolute unix time in milliseconds.
 *
 * The program is aborted if the key is not in the main dictionary. */
void setExpire(redisDb *db, robj *key, long long when) {
    dictEntry *kde = dictFind(db->dict,key->ptr);
    dictEntry *de;

    redisAssertWithInfo(NULL,key,kde != NULL);
    /* Reuse the sds from the main dict in the expire dict */
    de = dictReplaceRaw(db->expires,dictGetKey(kde));
    dictSetSignedIntegerVal(de,when);
}

/* Return the expire time of the specified key, or -1 if no expire
 * is associated with this key (i.e. the key is non volatile) */
737
long long getExpire(redisDb *db, robj *key) {
738 739 740 741 742 743 744 745
    dictEntry *de;

    /* No expire? return ASAP */
    if (dictSize(db->expires) == 0 ||
       (de = dictFind(db->expires,key->ptr)) == NULL) return -1;

    /* The entry was found in the expire dict, this means it should also
     * be present in the main dict (safety check). */
746
    redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
747
    return dictGetSignedIntegerVal(de);
748 749
}

750 751 752 753 754 755 756 757 758 759 760
/* Propagate expires into slaves and the AOF file.
 * When a key expires in the master, a DEL operation for this key is sent
 * to all the slaves and the AOF file if enabled.
 *
 * This way the key expiry is centralized in one place, and since both
 * AOF and the master->slave link guarantee operation ordering, everything
 * will be consistent even if we allow write operations against expiring
 * keys. */
void propagateExpire(redisDb *db, robj *key) {
    robj *argv[2] = { shared.del, key };
    int j;

    /* Hold a reference to both arguments while they are being propagated. */
    for (j = 0; j < 2; j++) incrRefCount(argv[j]);

    if (server.aof_state != REDIS_AOF_OFF)
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
    replicationFeedSlaves(server.slaves,db->id,argv,2);

    for (j = 0; j < 2; j++) decrRefCount(argv[j]);
}

774
/* Check if 'key' is expired and, when running as a master, delete it.
 *
 * Returns 0 if the key has no expire, is still valid, or the server is
 * loading a dataset. On a master, when the key is expired the deletion is
 * propagated, an "expired" keyspace event is fired, and the return value of
 * dbDelete() is returned. On a slave nothing is deleted: the function just
 * returns 1 if the key is logically expired, 0 otherwise. */
int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    mstime_t now;
    int expired;

    if (when < 0) return 0; /* No expire for this key */

    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;

    /* If we are in the context of a Lua script, we claim that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    if (server.lua_caller)
        now = server.lua_time_start;
    else
        now = mstime();
    expired = now > when;

    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller,
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    if (server.masterhost != NULL) return expired;

    /* Return when this key has not expired */
    if (!expired) return 0;

    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key);
    notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,"expired",key,db->id);
    return dbDelete(db,key);
}

/*-----------------------------------------------------------------------------
 * Expires Commands
 *----------------------------------------------------------------------------*/

814 815 816 817 818 819
/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
 * and PEXPIREAT. Because the commad second argument may be relative or absolute
 * the "basetime" argument is used to signal what the base time is (either 0
 * for *AT variants of the command, or the current time for relative expires).
 *
 * unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
G
guiquanz 已提交
820
 * the argv[2] parameter. The basetime is always specified in milliseconds. */
821
void expireGenericCommand(redisClient *c, long long basetime, int unit) {
822
    robj *key = c->argv[1], *param = c->argv[2];
823
    long long when; /* unix time in milliseconds when the key will expire. */
824

825
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != REDIS_OK)
826
        return;
827

828 829
    if (unit == UNIT_SECONDS) when *= 1000;
    when += basetime;
830

831 832
    /* No key, return zero. */
    if (lookupKeyRead(c->db,key) == NULL) {
833 834 835
        addReply(c,shared.czero);
        return;
    }
836

837 838 839 840 841 842
    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master. */
843
    if (when <= mstime() && !server.loading && !server.masterhost) {
844 845
        robj *aux;

846
        redisAssertWithInfo(c,key,dbDelete(c->db,key));
847 848 849 850 851 852
        server.dirty++;

        /* Replicate/AOF this as an explicit DEL. */
        aux = createStringObject("DEL",3);
        rewriteClientCommandVector(c,2,aux,key);
        decrRefCount(aux);
853
        signalModifiedKey(c->db,key);
854
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
855
        addReply(c, shared.cone);
856 857
        return;
    } else {
858
        setExpire(c->db,key,when);
859
        addReply(c,shared.cone);
860
        signalModifiedKey(c->db,key);
861
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"expire",key,c->db->id);
862
        server.dirty++;
863 864 865 866 867
        return;
    }
}

/* EXPIRE: argv[2] is a number of seconds relative to the current time. */
void expireCommand(redisClient *c) {
    long long now = mstime();

    expireGenericCommand(c,now,UNIT_SECONDS);
}

/* EXPIREAT: argv[2] is already an absolute time in seconds, so the base
 * time added to it is zero. */
void expireatCommand(redisClient *c) {
    const long long basetime = 0;

    expireGenericCommand(c,basetime,UNIT_SECONDS);
}

875
/* PEXPIRE: argv[2] is a number of milliseconds relative to the current
 * time. */
void pexpireCommand(redisClient *c) {
    long long now = mstime();

    expireGenericCommand(c,now,UNIT_MILLISECONDS);
}
878

879
/* PEXPIREAT: argv[2] is already an absolute time in milliseconds, so the
 * base time added to it is zero. */
void pexpireatCommand(redisClient *c) {
    const long long basetime = 0;

    expireGenericCommand(c,basetime,UNIT_MILLISECONDS);
}

/* Reply with the time to live of the key at argv[1]:
 *
 *  -2 if the key does not exist.
 *  -1 if the key exists but has no expire set.
 *  Otherwise the remaining time (never negative), in milliseconds if
 *  output_ms is non-zero, or in seconds rounded to the nearest second. */
void ttlGenericCommand(redisClient *c, int output_ms) {
    robj *key = c->argv[1];
    long long when, remaining;

    /* Missing key. */
    if (lookupKeyRead(c->db,key) == NULL) {
        addReplyLongLong(c,-2);
        return;
    }

    /* Existing key without an associated expire. */
    when = getExpire(c->db,key);
    if (when == -1) {
        addReplyLongLong(c,-1);
        return;
    }

    /* Existing key with an expire: clamp to zero if already elapsed. */
    remaining = when-mstime();
    if (remaining < 0) remaining = 0;
    if (!output_ms) remaining = (remaining+500)/1000;
    addReplyLongLong(c,remaining);
}
A
antirez 已提交
904

905 906 907 908 909 910 911 912
/* TTL: report the remaining time to live in seconds. */
void ttlCommand(redisClient *c) {
    const int output_ms = 0;

    ttlGenericCommand(c, output_ms);
}

/* PTTL: report the remaining time to live in milliseconds. */
void pttlCommand(redisClient *c) {
    const int output_ms = 1;

    ttlGenericCommand(c, output_ms);
}

A
antirez 已提交
913 914 915 916 917 918 919
/* PERSIST: remove the expire from the key at argv[1].
 *
 * Replies with one if an expire was removed, and with zero if the key is
 * not in the database or removeExpire() reports nothing was removed. */
void persistCommand(redisClient *c) {
    robj *key = c->argv[1];

    /* Key not found in the main dictionary. */
    if (dictFind(c->db->dict,key->ptr) == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* Key found, but no expire was removed. */
    if (!removeExpire(c->db,key)) {
        addReply(c,shared.czero);
        return;
    }

    addReply(c,shared.cone);
    server.dirty++;
}
928 929 930 931 932

/* -----------------------------------------------------------------------------
 * API to get key arguments from commands
 * ---------------------------------------------------------------------------*/

933 934
/* The base case is to use the keys position as given in the command table
 * (firstkey, lastkey, step). */
935 936 937 938 939 940 941 942 943 944 945 946 947
int *getKeysUsingCommandTable(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
    int j, i = 0, last, *keys;
    REDIS_NOTUSED(argv);

    if (cmd->firstkey == 0) {
        *numkeys = 0;
        return NULL;
    }
    last = cmd->lastkey;
    if (last < 0) last = argc+last;
    keys = zmalloc(sizeof(int)*((last - cmd->firstkey)+1));
    for (j = cmd->firstkey; j <= last; j += cmd->keystep) {
        redisAssert(j < argc);
948
        keys[i++] = j;
949
    }
950
    *numkeys = i;
951 952 953
    return keys;
}

954 955 956 957 958
/* Return keys as an heap allocated array of integers. The length of the array
 * is returned by reference into *numkeys.
 *
 * This function uses the command table if a command-specific helper function
 * is not required, otherwise it calls the command-specific function. */
959
int *getKeysFromCommand(struct redisCommand *cmd,robj **argv, int argc, int *numkeys) {
960
    if (cmd->getkeys_proc) {
961
        return cmd->getkeys_proc(cmd,argv,argc,numkeys);
962 963 964 965 966
    } else {
        return getKeysUsingCommandTable(cmd,argv,argc,numkeys);
    }
}

967
/* Free the result of getKeysFromCommand. */
968 969 970 971
/* Release the array of key positions by handing it to zfree().
 *
 * NOTE(review): the key extraction helpers in this file return NULL when
 * there are no keys, so this presumably relies on zfree() accepting a NULL
 * pointer -- confirm against the zmalloc implementation. */
void getKeysFreeResult(int *result) {
    zfree(result);
}

972 973 974 975
/* Helper function to extract keys from following commands:
 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
 * ZINTERSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
 *
 * Returns a zmalloc()ed array holding the argv indexes of the input keys
 * followed by the index of the destination key, and stores its length
 * into *numkeys. If <num-keys> is negative or larger than the number of
 * arguments that follow it, NULL is returned and *numkeys is set to
 * zero. */
int *zunionInterGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int i, num, *keys;
    REDIS_NOTUSED(cmd);

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error.
     *
     * A negative count must be rejected here as well: it would pass the
     * upper bound test, make the allocation size below wrap around (or be
     * too small), and turn keys[num] into a write before the buffer. */
    if (num < 0 || num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }

    /* Keys in z{union,inter}store come from two places:
     * argv[1] = storage key,
     * argv[3...n] = keys to intersect */
    keys = zmalloc(sizeof(int)*(num+1));

    /* Add all key positions for argv[3...n] to keys[] */
    for (i = 0; i < num; i++) keys[i] = 3+i;

    /* Finally add the argv[1] key position (the storage key target). */
    keys[num] = 1;
    *numkeys = num+1;  /* Total keys = {union,inter} keys + storage key */
    return keys;
}
1000

1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
/* Helper function to extract keys from the following commands:
 * EVAL <script> <num-keys> <key> <key> ... <key> [more stuff]
 * EVALSHA <script> <num-keys> <key> <key> ... <key> [more stuff]
 *
 * Returns a zmalloc()ed array holding the argv indexes of the keys and
 * stores its length into *numkeys. If <num-keys> is zero, negative, or
 * larger than the number of arguments that follow it, there are no keys
 * to report: NULL is returned and *numkeys is set to zero. */
int *evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkeys) {
    int i, num, *keys;
    REDIS_NOTUSED(cmd);

    num = atoi(argv[2]->ptr);
    /* Sanity check. Don't return any key if the command is going to
     * reply with syntax error. Non positive counts are handled here too,
     * so that a negative value never reaches the allocation below. */
    if (num <= 0 || num > (argc-3)) {
        *numkeys = 0;
        return NULL;
    }

    keys = zmalloc(sizeof(int)*num);
    /* The caller reads the array length from *numkeys, so it must be set
     * on the success path as well. */
    *numkeys = num;

    /* Add all key positions for argv[3...n] to keys[] */
    for (i = 0; i < num; i++) keys[i] = 3+i;

    return keys;
}

1024 1025 1026
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
 * a fast way a key that belongs to a specified hash slot. This is useful
 * while rehashing the cluster. */
1027
/* Register the key in the slot -> key index: the key is inserted into the
 * cluster skiplist using its hash slot as score, and its reference count
 * is incremented afterwards. */
void slotToKeyAdd(robj *key) {
    zskiplist *index = server.cluster->slots_to_keys;
    unsigned int hashslot;

    hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
    zslInsert(index,hashslot,key);
    incrRefCount(key);
}

1034
/* Remove the key from the slot -> key index, looking it up in the cluster
 * skiplist by its hash slot (used as score) and the key object. */
void slotToKeyDel(robj *key) {
    zskiplist *index = server.cluster->slots_to_keys;
    unsigned int hashslot;

    hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
    zslDelete(index,hashslot,key);
}

1040
void slotToKeyFlush(void) {
1041 1042 1043 1044
    zslFree(server.cluster->slots_to_keys);
    server.cluster->slots_to_keys = zslCreate();
}

1045
/* Store into 'keys' up to 'count' key objects whose score in the slot ->
 * key skiplist equals 'hashslot', and return how many were stored.
 * The objects are stored as found in the skiplist nodes: this function
 * does not touch their reference count. */
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
    zskiplistNode *node;
    zrangespec slot;
    int stored = 0;

    /* Inclusive range matching only the requested slot. */
    slot.min = hashslot;
    slot.max = hashslot;
    slot.minex = 0;
    slot.maxex = 0;

    for (node = zslFirstInRange(server.cluster->slots_to_keys, slot);
         node != NULL && node->score == hashslot && count != 0;
         node = node->level[0].forward)
    {
        keys[stored] = node->obj;
        stored++;
        count--;
    }
    return stored;
}
1060

1061
unsigned int countKeysInSlot(unsigned int hashslot) {
1062 1063
    zskiplist *zsl = server.cluster->slots_to_keys;
    zskiplistNode *zn;
1064
    zrangespec range;
1065
    int rank, count = 0;
1066 1067 1068

    range.min = range.max = hashslot;
    range.minex = range.maxex = 0;
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085

    /* Find first element in range */
    zn = zslFirstInRange(zsl, range);

    /* Use rank of first element, if any, to determine preliminary count */
    if (zn != NULL) {
        rank = zslGetRank(zsl, zn->score, zn->obj);
        count = (zsl->length - (rank - 1));

        /* Find last element in range */
        zn = zslLastInRange(zsl, range);

        /* Use rank of last element, if any, to determine the actual count */
        if (zn != NULL) {
            rank = zslGetRank(zsl, zn->score, zn->obj);
            count -= (zsl->length - rank);
        }
1086
    }
1087
    return count;
1088
}