t_list.c 33.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30 31 32 33 34 35
#include "redis.h"

/*-----------------------------------------------------------------------------
 * List API
 *----------------------------------------------------------------------------*/

G
guiquanz 已提交
36
/* The function pushes an element to the specified list object 'subject',
37 38
 * at head or tail position as specified by 'where'.
 *
G
guiquanz 已提交
39
 * There is no need for the caller to increment the refcount of 'value' as
40
 * the function takes care of it if needed. */
41
void listTypePush(robj *subject, robj *value, int where) {
M
Matt Stancliff 已提交
42 43
    if (subject->encoding == REDIS_ENCODING_QUICKLIST) {
        int pos = (where == REDIS_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44
        value = getDecodedObject(value);
M
Matt Stancliff 已提交
45 46 47 48 49
        size_t len = sdslen(value->ptr);
        size_t zlen = server.list_max_ziplist_entries;
        /* If this value is greater than our allowed values, create it in
         * an isolated ziplist */
        quicklistPush(subject->ptr, zlen, value->ptr, len, pos);
50 51 52 53 54 55
        decrRefCount(value);
    } else {
        redisPanic("Unknown list encoding");
    }
}

M
Matt Stancliff 已提交
56 57 58 59
/* Saver callback passed to quicklistPopCustom() by listTypePop(): wraps the
 * 'sz' bytes at 'data' into a newly created string object. */
void *listPopSaver(unsigned char *data, unsigned int sz) {
    char *bytes = (char*)data;
    return createStringObject(bytes,sz);
}

60
/* Pop one element from the head or the tail of 'subject', as selected by
 * 'where'. Returns the popped element as a string object, or NULL when
 * quicklistPopCustom() reports that nothing was popped. */
robj *listTypePop(robj *subject, int where) {
    robj *popped = NULL;
    long long num;
    int ql_end = (where == REDIS_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

    if (subject->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
        return popped;
    }

    int got = quicklistPopCustom(subject->ptr, ql_end,
                                 (unsigned char **)&popped, NULL, &num,
                                 listPopSaver);
    /* Nothing was stored through the saver callback: build the object from
     * the integer output instead. */
    if (got && !popped) popped = createStringObjectFromLongLong(num);
    return popped;
}

/* Return the element count of the list object 'subject', as reported by
 * quicklistCount(). Any other encoding is a fatal error. */
unsigned long listTypeLength(robj *subject) {
    if (subject->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
    } else {
        return quicklistCount(subject->ptr);
    }
}

/* Initialize an iterator at the specified index. */
M
Matt Stancliff 已提交
86 87
listTypeIterator *listTypeInitIterator(robj *subject, long index,
                                       unsigned char direction) {
88 89 90 91
    listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
    li->subject = subject;
    li->encoding = subject->encoding;
    li->direction = direction;
M
Matt Stancliff 已提交
92 93 94 95 96 97 98 99
    li->iter = NULL;
    /* REDIS_HEAD means start at TAIL and move *towards* head.
     * REDIS_TAIL means start at HEAD and move *towards tail. */
    int iter_direction =
        direction == REDIS_HEAD ? AL_START_TAIL : AL_START_HEAD;
    if (li->encoding == REDIS_ENCODING_QUICKLIST) {
        li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
                                             iter_direction, index);
100 101 102 103 104 105 106 107
    } else {
        redisPanic("Unknown list encoding");
    }
    return li;
}

/* Release an iterator created by listTypeInitIterator(): first the wrapped
 * iterator, then the listTypeIterator structure itself. */
void listTypeReleaseIterator(listTypeIterator *li) {
    void *inner = li->iter;

    zfree(inner);
    zfree(li);
}

/* Store the current entry into 'entry' and advance the iterator.
 * Returns the result of quicklistNext(): non-zero when 'entry' refers to an
 * actual element, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
    /* The list must not have changed encoding since the iterator was
     * created. */
    redisAssert(li->subject->encoding == li->encoding);

    entry->li = li;
    if (li->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
        return 0;
    }
    return quicklistNext(li->iter, &entry->entry);
}

/* Return a new string object holding the element 'entry' refers to. The
 * object is built from the entry's byte buffer when one is set, otherwise
 * from its integer value. */
robj *listTypeGet(listTypeEntry *entry) {
    if (entry->li->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
        return NULL;
    }

    quicklistEntry *qe = &entry->entry;
    if (qe->value) {
        return createStringObject((char *)qe->value, qe->sz);
    }
    return createStringObjectFromLongLong(qe->longval);
}

/* Insert 'value' next to the element 'entry' refers to: after it when
 * 'where' is REDIS_TAIL, before it when 'where' is REDIS_HEAD. Any other
 * 'where' inserts nothing. The refcount of 'value' is left unchanged. */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    if (entry->li->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
        return;
    }

    robj *decoded = getDecodedObject(value);
    sds payload = decoded->ptr;
    size_t payload_len = sdslen(payload);
    size_t fill = server.list_max_ziplist_entries;
    quicklist *ql = (quicklist *)entry->entry.quicklist;

    if (where == REDIS_TAIL) {
        quicklistInsertAfter(ql, fill, &entry->entry, payload, payload_len);
    } else if (where == REDIS_HEAD) {
        quicklistInsertBefore(ql, fill, &entry->entry, payload, payload_len);
    }
    decrRefCount(decoded);
}

/* Compare the string object 'o' with the element 'entry' refers to and
 * return the result of quicklistCompare(). 'o' must be sds encoded. */
int listTypeEqual(listTypeEntry *entry, robj *o) {
    if (entry->li->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
    } else {
        redisAssertWithInfo(NULL,o,sdsEncodedObject(o));
        return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
    }
}

/* Delete the element pointed to. */
M
Matt Stancliff 已提交
174 175 176
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
    if (entry->li->encoding == REDIS_ENCODING_QUICKLIST) {
        quicklistDelEntry(iter->iter, &entry->entry);
177 178 179 180 181
    } else {
        redisPanic("Unknown list encoding");
    }
}

M
Matt Stancliff 已提交
182
/* Create a quicklist from a single ziplist */
183
void listTypeConvert(robj *subject, int enc) {
M
Matt Stancliff 已提交
184 185
    redisAssertWithInfo(NULL,subject,subject->type==REDIS_LIST);
    redisAssertWithInfo(NULL,subject,subject->encoding==REDIS_ENCODING_ZIPLIST);
186

M
Matt Stancliff 已提交
187 188
    if (enc == REDIS_ENCODING_QUICKLIST) {
        size_t zlen = server.list_max_ziplist_entries;
189

M
Matt Stancliff 已提交
190 191
        subject->encoding = REDIS_ENCODING_QUICKLIST;
        subject->ptr = quicklistCreateFromZiplist(zlen, subject->ptr);
192 193 194 195 196 197 198 199 200 201
    } else {
        redisPanic("Unsupported list conversion");
    }
}

/*-----------------------------------------------------------------------------
 * List Commands
 *----------------------------------------------------------------------------*/

/* Shared implementation of LPUSH and RPUSH.
 *
 * Pushes every argument from argv[2] onwards onto the list stored at
 * argv[1], at the end selected by 'where' (REDIS_HEAD or REDIS_TAIL). The
 * list is created on the first push when the key does not exist. Replies
 * with the resulting list length, or with a wrong type error when the key
 * holds a non-list value. */
void pushGenericCommand(redisClient *c, int where) {
    int j, pushed = 0;
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

    if (lobj && lobj->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        if (!lobj) {
            /* Create the key lazily, only when there is a value to push. */
            lobj = createQuicklistObject();
            dbAdd(c->db,c->argv[1],lobj);
        }
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    addReplyLongLong(c, lobj ? listTypeLength(lobj) : 0);
    if (pushed) {
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

/* LPUSH: push the given values at the head of the list. */
void lpushCommand(redisClient *c) {
    pushGenericCommand(c, REDIS_HEAD);
}

/* RPUSH: push the given values at the tail of the list. */
void rpushCommand(redisClient *c) {
    pushGenericCommand(c, REDIS_TAIL);
}

/* Shared implementation of LPUSHX, RPUSHX and LINSERT. The key at argv[1]
 * must already exist and hold a list, otherwise the client gets a zero (or
 * a type error) reply and nothing is changed.
 *
 * When 'refval' is NULL, 'val' is pushed at the end selected by 'where'.
 * When 'refval' is not NULL the list is scanned from head to tail and 'val'
 * is inserted next to the first element equal to 'refval' (after it for
 * REDIS_TAIL, before it for REDIS_HEAD); if no element matches the reply is
 * -1. On success the reply is the new list length. */
void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);

    if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;

    if (refval == NULL) {
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";

        listTypePush(subject,val,where);
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
        server.dirty++;
    } else {
        listTypeEntry entry;
        int inserted = 0;

        /* Seek refval from head to tail, stopping at the first match. */
        listTypeIterator *iter = listTypeInitIterator(subject,0,REDIS_TAIL);
        while (!inserted && listTypeNext(iter,&entry)) {
            if (listTypeEqual(&entry,refval)) {
                listTypeInsert(&entry,val,where);
                inserted = 1;
            }
        }
        listTypeReleaseIterator(iter);

        if (!inserted) {
            /* Notify client of a failed insert */
            addReply(c,shared.cnegone);
            return;
        }
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"linsert",
                            c->argv[1],c->db->id);
        server.dirty++;
    }

    addReplyLongLong(c,listTypeLength(subject));
}

/* LPUSHX: push argv[2] at the head of the list, only if the key exists. */
void lpushxCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    pushxGenericCommand(c,NULL,val,REDIS_HEAD);
}

/* RPUSHX: push argv[2] at the tail of the list, only if the key exists. */
void rpushxCommand(redisClient *c) {
    robj *val = tryObjectEncoding(c->argv[2]);

    c->argv[2] = val;
    pushxGenericCommand(c,NULL,val,REDIS_TAIL);
}

/* LINSERT: argv[2] is the case insensitive keyword "after" or "before",
 * argv[3] the reference element and argv[4] the value to insert. Any other
 * keyword yields a syntax error reply. */
void linsertCommand(redisClient *c) {
    int where;

    c->argv[4] = tryObjectEncoding(c->argv[4]);
    if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
        where = REDIS_TAIL;
    } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
        where = REDIS_HEAD;
    } else {
        addReply(c,shared.syntaxerr);
        return;
    }
    pushxGenericCommand(c,c->argv[3],c->argv[4],where);
}

/* LLEN: reply with the length of the list at argv[1]. A missing key gets a
 * zero reply from the lookup itself. */
void llenCommand(redisClient *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (o != NULL && !checkType(c,o,REDIS_LIST)) {
        addReplyLongLong(c,listTypeLength(o));
    }
}

/* LINDEX: reply with the element at index argv[2] of the list at argv[1],
 * or with a null bulk when the key is missing or quicklistIndex() finds no
 * element at that index. */
void lindexCommand(redisClient *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;

    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK)
        return;

    if (o->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
        return;
    }

    quicklistEntry entry;
    if (!quicklistIndex(o->ptr, index, &entry)) {
        addReply(c,shared.nullbulk);
        return;
    }

    /* Build a temporary object just for the reply. */
    robj *value = entry.value ?
        createStringObject((char*)entry.value,entry.sz) :
        createStringObjectFromLongLong(entry.longval);
    addReplyBulk(c,value);
    decrRefCount(value);
}

/* LSET: replace the element at index argv[2] of the list at argv[1] with
 * argv[3]. Replies with an out of range error when
 * quicklistReplaceAtIndex() reports that nothing was replaced. */
void lsetCommand(redisClient *c) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;

    long index;
    if (getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK)
        return;

    if (o->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
        return;
    }

    robj *value = c->argv[3];
    quicklist *ql = o->ptr;
    if (!quicklistReplaceAtIndex(ql, index, value->ptr, sdslen(value->ptr))) {
        addReply(c,shared.outofrangeerr);
        return;
    }

    addReply(c,shared.ok);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"lset",c->argv[1],c->db->id);
    server.dirty++;
}

/* Shared implementation of LPOP and RPOP: pop one element from the end
 * selected by 'where' and send it to the client. The key is deleted when
 * the list becomes empty. */
void popGenericCommand(redisClient *c, int where) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;

    robj *value = listTypePop(o,where);
    if (value == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";

    addReplyBulk(c,value);
    decrRefCount(value);
    notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
    if (listTypeLength(o) == 0) {
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                            c->argv[1],c->db->id);
        dbDelete(c->db,c->argv[1]);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}

/* LPOP: pop one element from the head of the list. */
void lpopCommand(redisClient *c) {
    popGenericCommand(c, REDIS_HEAD);
}

/* RPOP: pop one element from the tail of the list. */
void rpopCommand(redisClient *c) {
    popGenericCommand(c, REDIS_TAIL);
}

/* LRANGE: reply with the elements of the list at argv[1] between the
 * indexes argv[2] and argv[3], both inclusive. Negative indexes are taken
 * relative to the list length. An empty range or a missing key yields an
 * empty multi bulk reply. */
void lrangeCommand(redisClient *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK)
        return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)
        return;

    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
    long llen = listTypeLength(o);

    /* Turn negative indexes into offsets from the head; a start that is
     * still negative is clamped to the first element. */
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* Since start >= 0 here, this also catches end < 0. The range is empty
     * when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    long rangelen = (end-start)+1;

    /* The multi bulk header goes out first, then one bulk per element. */
    addReplyMultiBulkLen(c,rangelen);
    if (o->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("List encoding is not QUICKLIST!");
        return;
    }

    listTypeIterator *iter = listTypeInitIterator(o, start, REDIS_TAIL);
    for (; rangelen > 0; rangelen--) {
        listTypeEntry entry;
        listTypeNext(iter, &entry);
        quicklistEntry *qe = &entry.entry;
        if (qe->value) {
            addReplyBulkCBuffer(c,qe->value,qe->sz);
        } else {
            addReplyBulkLongLong(c,qe->longval);
        }
    }
    listTypeReleaseIterator(iter);
}

/* LTRIM: keep only the elements of the list at argv[1] between the indexes
 * argv[2] and argv[3], both inclusive. Negative indexes are taken relative
 * to the list length. The key is deleted when the list ends up empty. */
void ltrimCommand(redisClient *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK)
        return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)
        return;

    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
    long llen = listTypeLength(o);

    /* Turn negative indexes into offsets from the head; a start that is
     * still negative is clamped to the first element. */
    if (start < 0) start += llen;
    if (end < 0) end += llen;
    if (start < 0) start = 0;

    /* By default the whole list is trimmed from the left: this is the
     * result for an out of range start or for start > end (which, since
     * start >= 0, includes end < 0). */
    long ltrim = llen;
    long rtrim = 0;
    if (start <= end && start < llen) {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }

    /* Remove list elements to perform the trim */
    if (o->encoding != REDIS_ENCODING_QUICKLIST) {
        redisPanic("Unknown list encoding");
        return;
    }
    quicklistDelRange(o->ptr,0,ltrim);
    quicklistDelRange(o->ptr,-rtrim,rtrim);

    notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
    if (listTypeLength(o) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
    addReply(c,shared.ok);
}

/* LREM: remove elements equal to argv[3] from the list at argv[1].
 *
 * The count argv[2] selects how many and from where:
 *   count > 0: scan from head to tail, remove up to 'count' matches.
 *   count < 0: scan from tail to head, remove up to '-count' matches.
 *   count = 0: remove every match.
 *
 * Replies with the number of removed elements. An "lrem" keyspace event is
 * emitted when at least one element was removed, and the key is deleted
 * (with a "del" event, like the other list commands in this file do) when
 * the list ends up empty. */
void lremCommand(redisClient *c) {
    robj *subject, *obj;
    obj = c->argv[3];
    long toremove;
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;

    listTypeIterator *li;
    if (toremove < 0) {
        /* Negative count: start from the last element, move towards head. */
        toremove = -toremove;
        li = listTypeInitIterator(subject,-1,REDIS_HEAD);
    } else {
        li = listTypeInitIterator(subject,0,REDIS_TAIL);
    }

    listTypeEntry entry;
    while (listTypeNext(li,&entry)) {
        if (listTypeEqual(&entry,obj)) {
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            /* A zero count means no limit. */
            if (toremove && removed == toremove) break;
        }
    }
    listTypeReleaseIterator(li);

    if (removed) {
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
    }
    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }

    addReplyLongLong(c,removed);
    if (removed) signalModifiedKey(c->db,c->argv[1]);
}

/* These are the semantics of this command:
 *  RPOPLPUSH srclist dstlist:
 *    IF LLEN(srclist) > 0
 *      element = RPOP srclist
 *      LPUSH dstlist element
 *      RETURN element
 *    ELSE
 *      RETURN nil
 *    END
 *  END
 *
 * The idea is to be able to get an element from a list in a reliable way
 * since the element is not just returned but pushed against another list
 * as well. This command was originally proposed by Ezra Zygmuntowicz.
 */
537

538 539 540
/* Push side of RPOPLPUSH: push 'value' at the head of the list stored at
 * 'dstkey' and send 'value' to the client. 'dstobj' is the object already
 * looked up for 'dstkey', or NULL when the key does not exist, in which
 * case a new list is created and added to the database. */
void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
    robj *target = dstobj;

    if (target == NULL) {
        target = createQuicklistObject();
        dbAdd(c->db,dstkey,target);
    }
    signalModifiedKey(c->db,dstkey);
    listTypePush(target,value,REDIS_HEAD);
    notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"lpush",dstkey,c->db->id);
    /* The pushed value is also the reply to the client. */
    addReplyBulk(c,value);
}

551
/* RPOPLPUSH: pop an element from the tail of the list at argv[1], push it
 * at the head of the list at argv[2] and reply with it. The source key is
 * deleted when it becomes empty. */
void rpoplpushCommand(redisClient *c) {
    robj *sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);

    if (sobj == NULL || checkType(c,sobj,REDIS_LIST)) return;

    if (listTypeLength(sobj) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c,shared.nullbulk);
        return;
    }

    robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
    robj *touchedkey = c->argv[1];

    /* Check the destination type before touching the source list. */
    if (dobj && checkType(c,dobj,REDIS_LIST)) return;
    robj *value = listTypePop(sobj,REDIS_TAIL);

    /* Keep our own reference to the source key, since
     * rpoplpushHandlePush may change the client command argument vector
     * (it does not currently). */
    incrRefCount(touchedkey);
    rpoplpushHandlePush(c,c->argv[2],dobj,value);

    /* Drop the reference to the object obtained from listTypePop(). */
    decrRefCount(value);

    /* Delete the source list when it is empty */
    notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"rpop",touchedkey,c->db->id);
    if (listTypeLength(sobj) == 0) {
        dbDelete(c->db,touchedkey);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                            touchedkey,c->db->id);
    }
    signalModifiedKey(c->db,touchedkey);
    decrRefCount(touchedkey);
    server.dirty++;
}

/*-----------------------------------------------------------------------------
 * Blocking POP operations
 *----------------------------------------------------------------------------*/

592
/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for this key.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, according
 *   to the number of elements we have in the ready list.
 */

/* Set a client in blocking mode for the specified key, with the specified
 * timeout */
611
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
612 613
    dictEntry *de;
    list *l;
614
    int j;
615

616 617
    c->bpop.timeout = timeout;
    c->bpop.target = target;
618

619 620
    if (target != NULL) incrRefCount(target);

621
    for (j = 0; j < numkeys; j++) {
622 623
        /* If the key already exists in the dict ignore it. */
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
624 625 626 627 628 629 630 631 632 633 634
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
635
            redisAssertWithInfo(c,keys[j],retval == DICT_OK);
636
        } else {
637
            l = dictGetVal(de);
638 639 640
        }
        listAddNodeTail(l,c);
    }
641
    blockClient(c,REDIS_BLOCKED_LIST);
642 643
}

644 645
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
 * You should never call this function directly, but unblockClient() instead. */
646 647
void unblockClientWaitingData(redisClient *c) {
    dictEntry *de;
648
    dictIterator *di;
649 650
    list *l;

651 652
    redisAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
    di = dictGetIterator(c->bpop.keys);
653
    /* The client may wait for multiple keys, so unblock it for every key. */
654 655 656
    while((de = dictNext(di)) != NULL) {
        robj *key = dictGetKey(de);

657
        /* Remove this client from the list of clients waiting for this key. */
658 659
        l = dictFetchValue(c->db->blocking_keys,key);
        redisAssertWithInfo(c,key,l != NULL);
660 661 662
        listDelNode(l,listSearchKey(l,c));
        /* If the list is empty we need to remove it to avoid wasting memory */
        if (listLength(l) == 0)
663
            dictDelete(c->db->blocking_keys,key);
664
    }
665
    dictReleaseIterator(di);
666

667
    /* Cleanup the client structure */
668
    dictEmpty(c->bpop.keys,NULL);
669 670 671 672
    if (c->bpop.target) {
        decrRefCount(c->bpop.target);
        c->bpop.target = NULL;
    }
673 674
}

675 676
/* If the specified key has clients blocked waiting for list pushes, this
 * function will put the key reference into the server.ready_keys list.
677
 * Note that db->ready_keys is a hash table that allows us to avoid putting
G
guiquanz 已提交
678
 * the same key again and again in the list in case of multiple pushes
679
 * made by a script or in the context of MULTI/EXEC.
680
 *
681
 * The list will be finally processed by handleClientsBlockedOnLists() */
682
void signalListAsReady(redisDb *db, robj *key) {
683 684 685
    readyList *rl;

    /* No clients blocking for this key? No need to queue it. */
686
    if (dictFind(db->blocking_keys,key) == NULL) return;
687 688

    /* Key was already signaled? No need to queue it again. */
689
    if (dictFind(db->ready_keys,key) != NULL) return;
690 691 692 693

    /* Ok, we need to queue this key into server.ready_keys. */
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
694
    rl->db = db;
695 696 697 698 699 700 701
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    /* We also add the key in the db->ready_keys dictionary in order
     * to avoid adding it multiple times into a list with a simple O(1)
     * check. */
    incrRefCount(key);
702
    redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
703 704
}

705
/* This is a helper function for handleClientsBlockedOnLists(). Its job
 * is to serve a specific client (receiver) that is blocked on 'key'
 * in the context of the specified 'db', doing the following:
 *
 * 1) Provide the client with the 'value' element.
 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
 *    'value' element on the destination list (the LPUSH side of the command).
 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
 *    the AOF and replication channel.
 *
 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
 * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
 * we can propagate the command properly.
 *
 * The function returns REDIS_OK if we are able to serve the client, otherwise
 * REDIS_ERR is returned to signal the caller that the list POP operation
 * should be undone as the client was not served: This only happens for
 * BRPOPLPUSH that fails to push the value to the destination key as it is
 * of the wrong type. */
int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
    robj *argv[3];
    int target = REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL;

    if (dstkey == NULL) {
        int from_head = (where == REDIS_HEAD);

        /* Propagate the [LR]POP operation. */
        argv[0] = from_head ? shared.lpop : shared.rpop;
        argv[1] = key;
        propagate(from_head ? server.lpopCommand : server.rpopCommand,
                  db->id,argv,2,target);

        /* BRPOP/BLPOP: reply with the key name followed by the value. */
        addReplyMultiBulkLen(receiver,2);
        addReplyBulk(receiver,key);
        addReplyBulk(receiver,value);
        return REDIS_OK;
    }

    /* BRPOPLPUSH */
    robj *dstobj = lookupKeyWrite(receiver->db,dstkey);
    if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
        /* BRPOPLPUSH failed because of wrong destination type. */
        return REDIS_ERR;
    }

    /* Propagate the RPOP operation. */
    argv[0] = shared.rpop;
    argv[1] = key;
    propagate(server.rpopCommand,db->id,argv,2,target);

    rpoplpushHandlePush(receiver,dstkey,dstobj,value);

    /* Propagate the LPUSH operation. */
    argv[0] = shared.lpush;
    argv[1] = dstkey;
    argv[2] = value;
    propagate(server.lpushCommand,db->id,argv,3,target);

    return REDIS_OK;
}
773

774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
/* Serve the clients blocked on the key described by 'rl' using elements
 * popped from the list object 'o'.
 *
 * Clients are always taken from the head of the per-key list of blocked
 * clients, so they are served in the same order they blocked. At most as
 * many clients as were queued when the loop started are handled, and the
 * loop stops early as soon as the list has no more elements to pop. */
static void serveClientsBlockedOnReadyKey(readyList *rl, robj *o) {
    dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
    if (de == NULL) return;

    list *clients = dictGetVal(de);
    int remaining = listLength(clients);

    while (remaining--) {
        listNode *head = listFirst(clients);
        redisClient *receiver = head->value;
        robj *dstkey = receiver->bpop.target;
        int where = REDIS_TAIL;

        /* Only a client blocked by BLPOP pops from the head; any other
         * blocking command pops from the tail. */
        if (receiver->lastcmd && receiver->lastcmd->proc == blpopCommand)
            where = REDIS_HEAD;

        robj *value = listTypePop(o,where);
        if (value == NULL) break; /* Nothing left to hand out. */

        /* receiver->bpop.target is going to be freed by the
         * unblockClient() call below: hold a reference to it so that it
         * is still valid while we serve the client. */
        if (dstkey) incrRefCount(dstkey);
        unblockClient(receiver);

        if (serveClientBlockedOnList(receiver,rl->key,dstkey,rl->db,
                                     value,where) == REDIS_ERR)
        {
            /* The client could not be served: put the element back
             * where it was popped from, undoing the POP. */
            listTypePush(o,value,where);
        }

        if (dstkey) decrRefCount(dstkey);
        decrRefCount(value);
    }
}

/* This function should be called by Redis every time a single command,
 * a MULTI/EXEC block, or a Lua script, terminates its execution after
 * being called by a client.
 *
 * Every key that has at least one blocked client and that received at
 * least one new element via some PUSH operation is accumulated into the
 * server.ready_keys list. This function walks that list and serves the
 * blocked clients accordingly.
 *
 * The outer loop runs until server.ready_keys is empty: serving a
 * BRPOPLPUSH performs a PUSH as well, which may in turn make other keys
 * ready and so produce new blocked clients to serve. */
void handleClientsBlockedOnLists(void) {
    while (listLength(server.ready_keys) != 0) {
        /* Detach the current list of ready keys and install a fresh one
         * in server.ready_keys. While we process the detached list it is
         * then safe for signalListAsReady() to append new elements to
         * server.ready_keys when clients blocked in BRPOPLPUSH are
         * handled. */
        list *pending = server.ready_keys;
        server.ready_keys = listCreate();

        while (listLength(pending) != 0) {
            listNode *ln = listFirst(pending);
            readyList *rl = ln->value;

            /* Remove the key from db->ready_keys before anything else, so
             * that signalListAsReady() can be called safely against this
             * same key. */
            dictDelete(rl->db->ready_keys,rl->key);

            /* Blocked clients are served only if the key still exists
             * and holds a list. */
            robj *o = lookupKeyWrite(rl->db,rl->key);
            if (o != NULL && o->type == REDIS_LIST) {
                serveClientsBlockedOnReadyKey(rl,o);

                /* Remove the key if the list is now empty. */
                if (listTypeLength(o) == 0) {
                    dbDelete(rl->db,rl->key);
                }
                /* signalModifiedKey() is not called here: it was already
                 * called when the element was pushed on the list. */
            }

            /* Release this ready-list entry. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(pending,ln);
        }
        /* The replacement list is already in place in server.ready_keys,
         * so the detached one can go. */
        listRelease(pending);
    }
}

/* Generic implementation of the blocking pop commands (BLPOP / BRPOP).
 *
 * 'where' is REDIS_HEAD or REDIS_TAIL and selects the end of the list the
 * element is popped from. The last command argument is the timeout; all
 * the arguments between the command name and the timeout are keys.
 *
 * The keys are scanned in order. The first key holding a non empty list is
 * popped and the client gets a two-element reply (key, value). A key
 * holding a non-list value produces a wrong type error. When no key can be
 * served the client is blocked on all the keys, unless it is inside a
 * MULTI/EXEC, in which case a null multi bulk reply is sent instead. */
void blockingPopGenericCommand(redisClient *c, int where) {
    mstime_t timeout;
    int lastarg = c->argc-1;
    int j;

    if (getTimeoutFromObjectOrReply(c,c->argv[lastarg],&timeout,UNIT_SECONDS)
        != REDIS_OK)
    {
        return;
    }

    for (j = 1; j < lastarg; j++) {
        robj *keyname = c->argv[j];
        robj *o = lookupKeyWrite(c->db,keyname);

        if (o == NULL) continue; /* Missing key: try the next one. */

        if (o->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
            return;
        }

        if (listTypeLength(o) == 0) continue; /* Empty list: next key. */

        /* The list has elements: behave like a plain, non blocking
         * [LR]POP against this key. */
        char *event;
        if (where == REDIS_HEAD)
            event = "lpop";
        else
            event = "rpop";

        robj *value = listTypePop(o,where);
        redisAssert(value != NULL);

        addReplyMultiBulkLen(c,2);
        addReplyBulk(c,keyname);
        addReplyBulk(c,value);
        decrRefCount(value);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,keyname,c->db->id);

        /* Popping the last element removes the key as well. */
        if (listTypeLength(o) == 0) {
            dbDelete(c->db,keyname);
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                                keyname,c->db->id);
        }
        signalModifiedKey(c->db,keyname);
        server.dirty++;

        /* Rewrite the command vector so that what gets replicated is an
         * [LR]POP instead of the blocking variant. */
        robj *popcmd = (where == REDIS_HEAD) ? shared.lpop : shared.rpop;
        rewriteClientCommandVector(c,2,popcmd,keyname);
        return;
    }

    /* Nothing to pop. Inside a MULTI/EXEC we cannot block, so the only
     * option is to reply as if the timeout expired (even when the
     * timeout is 0). */
    if (c->flags & REDIS_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }

    /* Every key is either missing or an empty list: block the client. */
    blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

/* BLPOP command entry point: runs the generic blocking pop against the
 * head (REDIS_HEAD) side of the list. */
void blpopCommand(redisClient *c) {
    blockingPopGenericCommand(c,REDIS_HEAD);
}

/* BRPOP command entry point: runs the generic blocking pop against the
 * tail (REDIS_TAIL) side of the list. */
void brpopCommand(redisClient *c) {
    blockingPopGenericCommand(c,REDIS_TAIL);
}
D
Damian Janowski &amp; Michel Martens 已提交
929 930

/* BRPOPLPUSH command entry point.
 *
 * argv[1] is the source key, argv[2] the destination key and argv[3] the
 * timeout. When the source key exists and holds a list, the command is
 * executed as a regular RPOPLPUSH. When the source key is missing the
 * client is blocked on it with argv[2] as target, unless the client is
 * inside a MULTI/EXEC, in which case a null bulk reply is sent right
 * away. A source key holding a non-list value yields a wrong type
 * error. */
void brpoplpushCommand(redisClient *c) {
    mstime_t timeout;

    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
        != REDIS_OK)
    {
        return;
    }

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key != NULL) {
        if (key->type == REDIS_LIST) {
            /* The source list exists and has elements, so the regular
             * rpoplpushCommand can do the job. */
            redisAssertWithInfo(c,key,listTypeLength(key) > 0);
            rpoplpushCommand(c);
        } else {
            addReply(c, shared.wrongtypeerr);
        }
        return;
    }

    if (c->flags & REDIS_MULTI) {
        /* Blocking against an empty list while in a MULTI state
         * returns immediately. */
        addReply(c, shared.nullbulk);
        return;
    }

    /* The source list is empty: block the client on it. */
    blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
}