t_list.c 25.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30
#include "server.h"
31 32 33 34 35

/*-----------------------------------------------------------------------------
 * List API
 *----------------------------------------------------------------------------*/

G
guiquanz 已提交
36
/* The function pushes an element to the specified list object 'subject',
37 38
 * at head or tail position as specified by 'where'.
 *
G
guiquanz 已提交
39
 * There is no need for the caller to increment the refcount of 'value' as
40
 * the function takes care of it if needed. */
41
void listTypePush(robj *subject, robj *value, int where) {
42
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
A
antirez 已提交
43
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44
        value = getDecodedObject(value);
M
Matt Stancliff 已提交
45
        size_t len = sdslen(value->ptr);
46
        quicklistPush(subject->ptr, value->ptr, len, pos);
47 48
        decrRefCount(value);
    } else {
A
antirez 已提交
49
        serverPanic("Unknown list encoding");
50 51 52
    }
}

M
Matt Stancliff 已提交
53 54 55 56
void *listPopSaver(unsigned char *data, unsigned int sz) {
    return createStringObject((char*)data,sz);
}

57
robj *listTypePop(robj *subject, int where) {
M
Matt Stancliff 已提交
58
    long long vlong;
59
    robj *value = NULL;
M
Matt Stancliff 已提交
60

A
antirez 已提交
61
    int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
62
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
63 64 65
        if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
                               NULL, &vlong, listPopSaver)) {
            if (!value)
66 67 68
                value = createStringObjectFromLongLong(vlong);
        }
    } else {
A
antirez 已提交
69
        serverPanic("Unknown list encoding");
70 71 72 73
    }
    return value;
}

74
unsigned long listTypeLength(const robj *subject) {
75
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
76
        return quicklistCount(subject->ptr);
77
    } else {
A
antirez 已提交
78
        serverPanic("Unknown list encoding");
79 80 81 82
    }
}

/* Initialize an iterator at the specified index. */
M
Matt Stancliff 已提交
83 84
listTypeIterator *listTypeInitIterator(robj *subject, long index,
                                       unsigned char direction) {
85 86 87 88
    listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
    li->subject = subject;
    li->encoding = subject->encoding;
    li->direction = direction;
M
Matt Stancliff 已提交
89
    li->iter = NULL;
A
antirez 已提交
90 91
    /* LIST_HEAD means start at TAIL and move *towards* head.
     * LIST_TAIL means start at HEAD and move *towards tail. */
M
Matt Stancliff 已提交
92
    int iter_direction =
A
antirez 已提交
93
        direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
94
    if (li->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
95 96
        li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
                                             iter_direction, index);
97
    } else {
A
antirez 已提交
98
        serverPanic("Unknown list encoding");
99 100 101 102 103 104
    }
    return li;
}

/* Clean up the iterator. */
void listTypeReleaseIterator(listTypeIterator *li) {
M
Matt Stancliff 已提交
105
    zfree(li->iter);
106 107 108 109 110 111 112 113
    zfree(li);
}

/* Stores pointer to current the entry in the provided entry structure
 * and advances the position of the iterator. Returns 1 when the current
 * entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
    /* Protect from converting when iterating */
A
antirez 已提交
114
    serverAssert(li->subject->encoding == li->encoding);
115 116

    entry->li = li;
117
    if (li->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
118
        return quicklistNext(li->iter, &entry->entry);
119
    } else {
A
antirez 已提交
120
        serverPanic("Unknown list encoding");
121 122 123 124 125 126 127
    }
    return 0;
}

/* Return entry or NULL at the current position of the iterator. */
robj *listTypeGet(listTypeEntry *entry) {
    robj *value = NULL;
128
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
129 130 131 132 133
        if (entry->entry.value) {
            value = createStringObject((char *)entry->entry.value,
                                       entry->entry.sz);
        } else {
            value = createStringObjectFromLongLong(entry->entry.longval);
134 135
        }
    } else {
A
antirez 已提交
136
        serverPanic("Unknown list encoding");
137 138 139 140 141
    }
    return value;
}

void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
142
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
143
        value = getDecodedObject(value);
M
Matt Stancliff 已提交
144 145
        sds str = value->ptr;
        size_t len = sdslen(str);
A
antirez 已提交
146
        if (where == LIST_TAIL) {
147
            quicklistInsertAfter((quicklist *)entry->entry.quicklist,
M
Matt Stancliff 已提交
148
                                 &entry->entry, str, len);
A
antirez 已提交
149
        } else if (where == LIST_HEAD) {
150
            quicklistInsertBefore((quicklist *)entry->entry.quicklist,
M
Matt Stancliff 已提交
151
                                  &entry->entry, str, len);
152 153 154
        }
        decrRefCount(value);
    } else {
A
antirez 已提交
155
        serverPanic("Unknown list encoding");
156 157 158 159 160
    }
}

/* Compare the given object with the entry at the current position. */
int listTypeEqual(listTypeEntry *entry, robj *o) {
161
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
A
antirez 已提交
162
        serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
M
Matt Stancliff 已提交
163
        return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
164
    } else {
A
antirez 已提交
165
        serverPanic("Unknown list encoding");
166 167 168 169
    }
}

/* Delete the element pointed to. */
M
Matt Stancliff 已提交
170
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
171
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
172
        quicklistDelEntry(iter->iter, &entry->entry);
173
    } else {
A
antirez 已提交
174
        serverPanic("Unknown list encoding");
175 176 177
    }
}

M
Matt Stancliff 已提交
178
/* Create a quicklist from a single ziplist */
179
void listTypeConvert(robj *subject, int enc) {
A
antirez 已提交
180 181
    serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
    serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
182

183
    if (enc == OBJ_ENCODING_QUICKLIST) {
184 185 186
        size_t zlen = server.list_max_ziplist_size;
        int depth = server.list_compress_depth;
        subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
187
        subject->encoding = OBJ_ENCODING_QUICKLIST;
188
    } else {
A
antirez 已提交
189
        serverPanic("Unsupported list conversion");
190 191 192 193 194 195 196
    }
}

/*-----------------------------------------------------------------------------
 * List Commands
 *----------------------------------------------------------------------------*/

197
void pushGenericCommand(client *c, int where) {
P
Pierre Chapuis 已提交
198
    int j, pushed = 0;
199
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
A
antirez 已提交
200

201
    if (lobj && lobj->type != OBJ_LIST) {
A
antirez 已提交
202 203 204 205 206 207
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        if (!lobj) {
M
Matt Stancliff 已提交
208
            lobj = createQuicklistObject();
209 210
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
A
antirez 已提交
211
            dbAdd(c->db,c->argv[1],lobj);
212
        }
A
antirez 已提交
213 214
        listTypePush(lobj,c->argv[j],where);
        pushed++;
215
    }
P
Pierre Chapuis 已提交
216
    addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
217
    if (pushed) {
A
antirez 已提交
218
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
219 220

        signalModifiedKey(c->db,c->argv[1]);
A
antirez 已提交
221
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
222
    }
A
antirez 已提交
223
    server.dirty += pushed;
224 225
}

226
void lpushCommand(client *c) {
A
antirez 已提交
227
    pushGenericCommand(c,LIST_HEAD);
228 229
}

230
void rpushCommand(client *c) {
A
antirez 已提交
231
    pushGenericCommand(c,LIST_TAIL);
232 233
}

234
void pushxGenericCommand(client *c, int where) {
P
Pierre Chapuis 已提交
235
    int j, pushed = 0;
236 237
    robj *subject;

238
    if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
239
        checkType(c,subject,OBJ_LIST)) return;
240

P
Pierre Chapuis 已提交
241 242 243 244
    for (j = 2; j < c->argc; j++) {
        listTypePush(subject,c->argv[j],where);
        pushed++;
    }
245

246
    addReplyLongLong(c,listTypeLength(subject));
P
Pierre Chapuis 已提交
247 248 249 250 251 252 253

    if (pushed) {
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
254 255
}

256
void lpushxCommand(client *c) {
257
    pushxGenericCommand(c,LIST_HEAD);
258 259
}

260
void rpushxCommand(client *c) {
261
    pushxGenericCommand(c,LIST_TAIL);
262 263
}

264
void linsertCommand(client *c) {
265 266 267 268 269 270
    int where;
    robj *subject;
    listTypeIterator *iter;
    listTypeEntry entry;
    int inserted = 0;

271
    if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
272
        where = LIST_TAIL;
273
    } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
274
        where = LIST_HEAD;
275 276
    } else {
        addReply(c,shared.syntaxerr);
277 278 279 280 281 282 283 284 285 286 287 288 289 290
        return;
    }

    if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
        checkType(c,subject,OBJ_LIST)) return;

    /* Seek pivot from head to tail */
    iter = listTypeInitIterator(subject,0,LIST_TAIL);
    while (listTypeNext(iter,&entry)) {
        if (listTypeEqual(&entry,c->argv[3])) {
            listTypeInsert(&entry,c->argv[4],where);
            inserted = 1;
            break;
        }
291
    }
292 293 294 295 296 297 298 299 300
    listTypeReleaseIterator(iter);

    if (inserted) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
                            c->argv[1],c->db->id);
        server.dirty++;
    } else {
        /* Notify client of a failed insert */
301
        addReplyLongLong(c,-1);
302 303 304 305
        return;
    }

    addReplyLongLong(c,listTypeLength(subject));
306 307
}

308
void llenCommand(client *c) {
309
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
310
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
311
    addReplyLongLong(c,listTypeLength(o));
312 313
}

314
void lindexCommand(client *c) {
A
antirez 已提交
315
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]);
316
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
317
    long index;
318 319
    robj *value = NULL;

320
    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
321 322
        return;

323
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
324 325 326 327
        quicklistEntry entry;
        if (quicklistIndex(o->ptr, index, &entry)) {
            if (entry.value) {
                value = createStringObject((char*)entry.value,entry.sz);
328
            } else {
M
Matt Stancliff 已提交
329
                value = createStringObjectFromLongLong(entry.longval);
330 331 332 333
            }
            addReplyBulk(c,value);
            decrRefCount(value);
        } else {
A
antirez 已提交
334
            addReplyNull(c);
335 336
        }
    } else {
A
antirez 已提交
337
        serverPanic("Unknown list encoding");
338 339 340
    }
}

341
void lsetCommand(client *c) {
342
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
343
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
344
    long index;
M
Matt Stancliff 已提交
345
    robj *value = c->argv[3];
346

347
    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
348 349
        return;

350
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
351 352 353 354
        quicklist *ql = o->ptr;
        int replaced = quicklistReplaceAtIndex(ql, index,
                                               value->ptr, sdslen(value->ptr));
        if (!replaced) {
355 356 357
            addReply(c,shared.outofrangeerr);
        } else {
            addReply(c,shared.ok);
358
            signalModifiedKey(c->db,c->argv[1]);
A
antirez 已提交
359
            notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
360 361 362
            server.dirty++;
        }
    } else {
A
antirez 已提交
363
        serverPanic("Unknown list encoding");
364 365 366
    }
}

367
void popGenericCommand(client *c, int where) {
A
antirez 已提交
368
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]);
369
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;
370 371 372

    robj *value = listTypePop(o,where);
    if (value == NULL) {
A
antirez 已提交
373
        addReplyNull(c);
374
    } else {
A
antirez 已提交
375
        char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
376

377 378
        addReplyBulk(c,value);
        decrRefCount(value);
A
antirez 已提交
379
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
380
        if (listTypeLength(o) == 0) {
A
antirez 已提交
381
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
382
                                c->argv[1],c->db->id);
383 384
            dbDelete(c->db,c->argv[1]);
        }
385
        signalModifiedKey(c->db,c->argv[1]);
386 387 388 389
        server.dirty++;
    }
}

390
void lpopCommand(client *c) {
A
antirez 已提交
391
    popGenericCommand(c,LIST_HEAD);
392 393
}

394
void rpopCommand(client *c) {
A
antirez 已提交
395
    popGenericCommand(c,LIST_TAIL);
396 397
}

398
void lrangeCommand(client *c) {
399
    robj *o;
400
    long start, end, llen, rangelen;
401

402 403
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
404

405
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyarray)) == NULL
406
         || checkType(c,o,OBJ_LIST)) return;
407 408 409 410 411 412 413
    llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

414 415
    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
416
    if (start > end || start >= llen) {
417
        addReply(c,shared.emptyarray);
418 419 420 421 422 423
        return;
    }
    if (end >= llen) end = llen-1;
    rangelen = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
424
    addReplyArrayLen(c,rangelen);
425
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
A
antirez 已提交
426
        listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);
427 428

        while(rangelen--) {
M
Matt Stancliff 已提交
429 430 431 432 433
            listTypeEntry entry;
            listTypeNext(iter, &entry);
            quicklistEntry *qe = &entry.entry;
            if (qe->value) {
                addReplyBulkCBuffer(c,qe->value,qe->sz);
434
            } else {
M
Matt Stancliff 已提交
435
                addReplyBulkLongLong(c,qe->longval);
436 437
            }
        }
M
Matt Stancliff 已提交
438
        listTypeReleaseIterator(iter);
439
    } else {
A
antirez 已提交
440
        serverPanic("List encoding is not QUICKLIST!");
441 442 443
    }
}

444
void ltrimCommand(client *c) {
445
    robj *o;
M
Matt Stancliff 已提交
446
    long start, end, llen, ltrim, rtrim;
447

448 449
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;
450

451
    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
452
        checkType(c,o,OBJ_LIST)) return;
453 454 455 456 457 458 459
    llen = listTypeLength(o);

    /* convert negative indexes */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

460 461
    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
462 463 464 465 466 467 468 469 470 471 472
    if (start > end || start >= llen) {
        /* Out of range start or start > end result in empty list */
        ltrim = llen;
        rtrim = 0;
    } else {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }

    /* Remove list elements to perform the trim */
473
    if (o->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
474 475
        quicklistDelRange(o->ptr,0,ltrim);
        quicklistDelRange(o->ptr,-rtrim,rtrim);
476
    } else {
A
antirez 已提交
477
        serverPanic("Unknown list encoding");
478
    }
479

A
antirez 已提交
480
    notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
481 482
    if (listTypeLength(o) == 0) {
        dbDelete(c->db,c->argv[1]);
A
antirez 已提交
483
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
484
    }
485
    signalModifiedKey(c->db,c->argv[1]);
486 487 488 489
    server.dirty++;
    addReply(c,shared.ok);
}

490
void lremCommand(client *c) {
491
    robj *subject, *obj;
M
Matt Stancliff 已提交
492
    obj = c->argv[3];
493
    long toremove;
494
    long removed = 0;
495

496
    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
497 498
        return;

499
    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
500
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;
501 502 503 504

    listTypeIterator *li;
    if (toremove < 0) {
        toremove = -toremove;
A
antirez 已提交
505
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
506
    } else {
A
antirez 已提交
507
        li = listTypeInitIterator(subject,0,LIST_TAIL);
508 509
    }

M
Matt Stancliff 已提交
510
    listTypeEntry entry;
511 512
    while (listTypeNext(li,&entry)) {
        if (listTypeEqual(&entry,obj)) {
M
Matt Stancliff 已提交
513
            listTypeDelete(li, &entry);
514 515 516 517 518 519 520
            server.dirty++;
            removed++;
            if (toremove && removed == toremove) break;
        }
    }
    listTypeReleaseIterator(li);

521 522
    if (removed) {
        signalModifiedKey(c->db,c->argv[1]);
523
        notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
524 525
    }

M
Matt Stancliff 已提交
526 527
    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
528
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
M
Matt Stancliff 已提交
529
    }
530

531
    addReplyLongLong(c,removed);
532 533 534 535
}

/* This is the semantic of this command:
 *  RPOPLPUSH srclist dstlist:
536 537 538 539 540 541 542
 *    IF LLEN(srclist) > 0
 *      element = RPOP srclist
 *      LPUSH dstlist element
 *      RETURN element
 *    ELSE
 *      RETURN nil
 *    END
543 544 545 546 547 548
 *  END
 *
 * The idea is to be able to get an element from a list in a reliable way
 * since the element is not just returned but pushed against another list
 * as well. This command was originally proposed by Ezra Zygmuntowicz.
 */
549

550
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
551 552
    /* Create the list if the key does not exist */
    if (!dstobj) {
M
Matt Stancliff 已提交
553
        dstobj = createQuicklistObject();
554 555
        quicklistSetOptions(dstobj->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
556
        dbAdd(c->db,dstkey,dstobj);
557
    }
558
    signalModifiedKey(c->db,dstkey);
A
antirez 已提交
559 560
    listTypePush(dstobj,value,LIST_HEAD);
    notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);
561 562 563 564
    /* Always send the pushed value to the client. */
    addReplyBulk(c,value);
}

565
void rpoplpushCommand(client *c) {
566
    robj *sobj, *value;
A
antirez 已提交
567 568
    if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.null[c->resp]))
        == NULL || checkType(c,sobj,OBJ_LIST)) return;
569 570

    if (listTypeLength(sobj) == 0) {
571 572
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
A
antirez 已提交
573
        addReplyNull(c);
574 575
    } else {
        robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
576 577
        robj *touchedkey = c->argv[1];

578
        if (dobj && checkType(c,dobj,OBJ_LIST)) return;
A
antirez 已提交
579
        value = listTypePop(sobj,LIST_TAIL);
580
        /* We saved touched key, and protect it, since rpoplpushHandlePush
581 582
         * may change the client command argument vector (it does not
         * currently). */
583
        incrRefCount(touchedkey);
584
        rpoplpushHandlePush(c,c->argv[2],dobj,value);
585 586 587 588 589

        /* listTypePop returns an object with its refcount incremented */
        decrRefCount(value);

        /* Delete the source list when it is empty */
A
antirez 已提交
590
        notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
591 592
        if (listTypeLength(sobj) == 0) {
            dbDelete(c->db,touchedkey);
A
antirez 已提交
593
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
594
                                touchedkey,c->db->id);
595
        }
596 597
        signalModifiedKey(c->db,touchedkey);
        decrRefCount(touchedkey);
598
        server.dirty++;
599
        if (c->cmd->proc == brpoplpushCommand) {
600 601
            rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]);
        }
602 603 604 605 606 607 608
    }
}

/*-----------------------------------------------------------------------------
 * Blocking POP operations
 *----------------------------------------------------------------------------*/

609
/* This is a helper function for handleClientsBlockedOnKeys(). It's work
610 611
 * is to serve a specific client (receiver) that is blocked on 'key'
 * in the context of the specified 'db', doing the following:
612
 *
613 614
 * 1) Provide the client with the 'value' element.
 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
G
guiquanz 已提交
615
 *    'value' element on the destination list (the LPUSH side of the command).
616 617 618
 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
 *    the AOF and replication channel.
 *
A
antirez 已提交
619
 * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
620
 * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
621 622
 * we can propagate the command properly.
 *
623 624
 * The function returns C_OK if we are able to serve the client, otherwise
 * C_ERR is returned to signal the caller that the list POP operation
G
guiquanz 已提交
625
 * should be undone as the client was not served: This only happens for
626 627
 * BRPOPLPUSH that fails to push the value to the destination key as it is
 * of the wrong type. */
628
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
629 630 631 632 633
{
    robj *argv[3];

    if (dstkey == NULL) {
        /* Propagate the [LR]POP operation. */
A
antirez 已提交
634
        argv[0] = (where == LIST_HEAD) ? shared.lpop :
635 636
                                          shared.rpop;
        argv[1] = key;
A
antirez 已提交
637
        propagate((where == LIST_HEAD) ?
638
            server.lpopCommand : server.rpopCommand,
A
antirez 已提交
639
            db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
640 641

        /* BRPOP/BLPOP */
642
        addReplyArrayLen(receiver,2);
643 644
        addReplyBulk(receiver,key);
        addReplyBulk(receiver,value);
645

646 647 648
        /* Notify event. */
        char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
        notifyKeyspaceEvent(NOTIFY_LIST,event,key,receiver->db->id);
649 650 651 652 653
    } else {
        /* BRPOPLPUSH */
        robj *dstobj =
            lookupKeyWrite(receiver->db,dstkey);
        if (!(dstobj &&
654
             checkType(receiver,dstobj,OBJ_LIST)))
655 656 657
        {
            rpoplpushHandlePush(receiver,dstkey,dstobj,
                value);
658 659 660 661 662
            /* Propagate the RPOPLPUSH operation. */
            argv[0] = shared.rpoplpush;
            argv[1] = key;
            argv[2] = dstkey;
            propagate(server.rpoplpushCommand,
663
                db->id,argv,3,
A
antirez 已提交
664 665
                PROPAGATE_AOF|
                PROPAGATE_REPL);
666 667 668

            /* Notify event ("lpush" was notified by rpoplpushHandlePush). */
            notifyKeyspaceEvent(NOTIFY_LIST,"rpop",key,receiver->db->id);
669
        } else {
670 671
            /* BRPOPLPUSH failed because of wrong
             * destination type. */
672
            return C_ERR;
673
        }
D
Damian Janowski &amp; Michel Martens 已提交
674
    }
675
    return C_OK;
676
}
677 678

/* Blocking RPOP/LPOP */
679
void blockingPopGenericCommand(client *c, int where) {
680
    robj *o;
681
    mstime_t timeout;
682 683
    int j;

684
    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
685
        != C_OK) return;
686

687 688 689
    for (j = 1; j < c->argc-1; j++) {
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
690
            if (o->type != OBJ_LIST) {
691 692 693 694
                addReply(c,shared.wrongtypeerr);
                return;
            } else {
                if (listTypeLength(o) != 0) {
695
                    /* Non empty list, this is like a non normal [LR]POP. */
A
antirez 已提交
696
                    char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
697
                    robj *value = listTypePop(o,where);
A
antirez 已提交
698
                    serverAssert(value != NULL);
D
Damian Janowski &amp; Michel Martens 已提交
699

700
                    addReplyArrayLen(c,2);
701 702 703
                    addReplyBulk(c,c->argv[j]);
                    addReplyBulk(c,value);
                    decrRefCount(value);
A
antirez 已提交
704
                    notifyKeyspaceEvent(NOTIFY_LIST,event,
705
                                        c->argv[j],c->db->id);
706 707
                    if (listTypeLength(o) == 0) {
                        dbDelete(c->db,c->argv[j]);
A
antirez 已提交
708
                        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
709
                                            c->argv[j],c->db->id);
710
                    }
711 712 713 714 715
                    signalModifiedKey(c->db,c->argv[j]);
                    server.dirty++;

                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
                    rewriteClientCommandVector(c,2,
A
antirez 已提交
716
                        (where == LIST_HEAD) ? shared.lpop : shared.rpop,
717
                        c->argv[j]);
718 719 720 721 722
                    return;
                }
            }
        }
    }
723

724 725
    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
A
antirez 已提交
726
    if (c->flags & CLIENT_MULTI) {
727
        addReplyNullArray(c);
728 729 730
        return;
    }

731
    /* If the list is empty or the key does not exists we must block */
732
    blockForKeys(c,BLOCKED_LIST,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
733 734
}

735
void blpopCommand(client *c) {
A
antirez 已提交
736
    blockingPopGenericCommand(c,LIST_HEAD);
737 738
}

739
void brpopCommand(client *c) {
A
antirez 已提交
740
    blockingPopGenericCommand(c,LIST_TAIL);
741
}
D
Damian Janowski &amp; Michel Martens 已提交
742

743
void brpoplpushCommand(client *c) {
744
    mstime_t timeout;
D
Damian Janowski &amp; Michel Martens 已提交
745

746
    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
747
        != C_OK) return;
748 749 750 751

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key == NULL) {
A
antirez 已提交
752
        if (c->flags & CLIENT_MULTI) {
753 754
            /* Blocking against an empty list in a multi state
             * returns immediately. */
A
antirez 已提交
755
            addReplyNull(c);
756
        } else {
757
            /* The list is empty and the client blocks. */
758
            blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,timeout,c->argv[2],NULL);
759 760
        }
    } else {
761
        if (key->type != OBJ_LIST) {
762 763 764 765
            addReply(c, shared.wrongtypeerr);
        } else {
            /* The list exists and has elements, so
             * the regular rpoplpushCommand is executed. */
A
antirez 已提交
766
            serverAssertWithInfo(c,key,listTypeLength(key) > 0);
767 768
            rpoplpushCommand(c);
        }
769
    }
D
Damian Janowski &amp; Michel Martens 已提交
770
}