t_list.c 33.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   * Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *   * Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *   * Neither the name of Redis nor the names of its contributors may be used
 *     to endorse or promote products derived from this software without
 *     specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

30
#include "server.h"
31 32 33 34 35

/*-----------------------------------------------------------------------------
 * List API
 *----------------------------------------------------------------------------*/

G
guiquanz 已提交
36
/* The function pushes an element to the specified list object 'subject',
37 38
 * at head or tail position as specified by 'where'.
 *
G
guiquanz 已提交
39
 * There is no need for the caller to increment the refcount of 'value' as
40
 * the function takes care of it if needed. */
41
void listTypePush(robj *subject, robj *value, int where) {
42
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
A
antirez 已提交
43
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
44
        value = getDecodedObject(value);
M
Matt Stancliff 已提交
45
        size_t len = sdslen(value->ptr);
46
        quicklistPush(subject->ptr, value->ptr, len, pos);
47 48
        decrRefCount(value);
    } else {
A
antirez 已提交
49
        serverPanic("Unknown list encoding");
50 51 52
    }
}

M
Matt Stancliff 已提交
53 54 55 56
void *listPopSaver(unsigned char *data, unsigned int sz) {
    return createStringObject((char*)data,sz);
}

57
/* Pop one element from the list object 'subject', from the head when
 * 'where' is LIST_HEAD and from the tail otherwise.
 *
 * Returns the popped element as a string object; when quicklistPopCustom()
 * reports success but stores no object, the element is built from the
 * integer it handed back instead. Returns NULL when quicklistPopCustom()
 * reports that nothing was popped.
 *
 * Panics if 'subject' is not quicklist-encoded. */
robj *listTypePop(robj *subject, int where) {
    robj *popped = NULL;
    long long num;
    int ql_end = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return popped;
    }

    int got = quicklistPopCustom(subject->ptr, ql_end,
                                 (unsigned char **)&popped, NULL, &num,
                                 listPopSaver);
    if (got && !popped)
        popped = createStringObjectFromLongLong(num);
    return popped;
}

/* Return the number of elements in the list object 'subject', as reported
 * by quicklistCount(). Panics if 'subject' is not quicklist-encoded. */
unsigned long listTypeLength(robj *subject) {
    if (subject->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        return quicklistCount(subject->ptr);
    }
}

/* Initialize an iterator at the specified index. */
M
Matt Stancliff 已提交
83 84
listTypeIterator *listTypeInitIterator(robj *subject, long index,
                                       unsigned char direction) {
85 86 87 88
    listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
    li->subject = subject;
    li->encoding = subject->encoding;
    li->direction = direction;
M
Matt Stancliff 已提交
89
    li->iter = NULL;
A
antirez 已提交
90 91
    /* LIST_HEAD means start at TAIL and move *towards* head.
     * LIST_TAIL means start at HEAD and move *towards tail. */
M
Matt Stancliff 已提交
92
    int iter_direction =
A
antirez 已提交
93
        direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD;
94
    if (li->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
95 96
        li->iter = quicklistGetIteratorAtIdx(li->subject->ptr,
                                             iter_direction, index);
97
    } else {
A
antirez 已提交
98
        serverPanic("Unknown list encoding");
99 100 101 102 103 104
    }
    return li;
}

/* Release an iterator created by listTypeInitIterator(): frees the wrapped
 * iterator stored in li->iter first, then 'li' itself. */
void listTypeReleaseIterator(listTypeIterator *li) {
    void *inner = li->iter;

    zfree(inner);
    zfree(li);
}

/* Store the current entry into the provided 'entry' structure and advance
 * the position of the iterator. The result is whatever quicklistNext()
 * returns: 1 when the current entry is in fact an entry, 0 otherwise. */
int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
    /* The list must not change encoding while it is being iterated. */
    serverAssert(li->subject->encoding == li->encoding);

    entry->li = li;
    if (li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return 0;
    }
    return quicklistNext(li->iter, &entry->entry);
}

/* Return a newly created string object holding the element 'entry' refers
 * to: built from the entry's byte buffer when it has one, from its integer
 * value otherwise. Panics if the iterator is not quicklist-encoded. */
robj *listTypeGet(listTypeEntry *entry) {
    if (entry->li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return NULL;
    }

    quicklistEntry *qe = &entry->entry;
    if (qe->value)
        return createStringObject((char *)qe->value, qe->sz);
    return createStringObjectFromLongLong(qe->longval);
}

/* Insert 'value' next to the element 'entry' refers to: after it when
 * 'where' is LIST_TAIL, before it when 'where' is LIST_HEAD. Any other
 * 'where' inserts nothing.
 *
 * The function works on the object returned by getDecodedObject() and
 * releases that reference before returning. Panics if the iterator is not
 * quicklist-encoded. */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    if (entry->li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    robj *decoded = getDecodedObject(value);
    sds payload = decoded->ptr;
    size_t payload_len = sdslen(payload);

    if (where == LIST_TAIL) {
        quicklistInsertAfter((quicklist *)entry->entry.quicklist,
                             &entry->entry, payload, payload_len);
    } else if (where == LIST_HEAD) {
        quicklistInsertBefore((quicklist *)entry->entry.quicklist,
                              &entry->entry, payload, payload_len);
    }
    decrRefCount(decoded);
}

/* Compare the object 'o' with the entry at the current position and return
 * the result of quicklistCompare(). 'o' must be an sds-encoded object (this
 * is asserted). Panics if the iterator is not quicklist-encoded. */
int listTypeEqual(listTypeEntry *entry, robj *o) {
    if (entry->li->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
        return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
    }
}

/* Delete the element pointed to. */
M
Matt Stancliff 已提交
170
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
171
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
M
Matt Stancliff 已提交
172
        quicklistDelEntry(iter->iter, &entry->entry);
173
    } else {
A
antirez 已提交
174
        serverPanic("Unknown list encoding");
175 176 177
    }
}

M
Matt Stancliff 已提交
178
/* Create a quicklist from a single ziplist */
179
void listTypeConvert(robj *subject, int enc) {
A
antirez 已提交
180 181
    serverAssertWithInfo(NULL,subject,subject->type==OBJ_LIST);
    serverAssertWithInfo(NULL,subject,subject->encoding==OBJ_ENCODING_ZIPLIST);
182

183
    if (enc == OBJ_ENCODING_QUICKLIST) {
184 185 186
        size_t zlen = server.list_max_ziplist_size;
        int depth = server.list_compress_depth;
        subject->ptr = quicklistCreateFromZiplist(zlen, depth, subject->ptr);
187
        subject->encoding = OBJ_ENCODING_QUICKLIST;
188
    } else {
A
antirez 已提交
189
        serverPanic("Unsupported list conversion");
190 191 192 193 194 195 196
    }
}

/*-----------------------------------------------------------------------------
 * List Commands
 *----------------------------------------------------------------------------*/

197
/* Shared implementation of LPUSH / RPUSH: push argv[2..argc-1] onto the
 * list stored at argv[1], at the end selected by 'where'.
 *
 * Replies with a wrong-type error when the key holds a non-list value.
 * The list is created only when there is at least one element to push.
 * The reply is the list length after the pushes (0 when no list exists). */
void pushGenericCommand(client *c, int where) {
    int pushed = 0;
    robj *list = lookupKeyWrite(c->db,c->argv[1]);

    if (list != NULL && list->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Create the list lazily: only if something is going to be pushed. */
    if (list == NULL && c->argc > 2) {
        list = createQuicklistObject();
        quicklistSetOptions(list->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        dbAdd(c->db,c->argv[1],list);
    }

    for (int i = 2; i < c->argc; i++) {
        listTypePush(list,c->argv[i],where);
        pushed++;
    }

    addReplyLongLong(c, (list ? listTypeLength(list) : 0));
    if (pushed) {
        char *event = "rpush";
        if (where == LIST_HEAD) event = "lpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

226
/* LPUSH entry point: pushes at the head through pushGenericCommand(). */
void lpushCommand(client *c) {
    pushGenericCommand(c, LIST_HEAD);
}

230
/* RPUSH entry point: pushes at the tail through pushGenericCommand(). */
void rpushCommand(client *c) {
    pushGenericCommand(c, LIST_TAIL);
}

234
/* Shared implementation of LPUSHX / RPUSHX: push argv[2..argc-1] onto the
 * list at argv[1] only if the key already exists.
 *
 * When the key is missing, lookupKeyWriteOrReply() is given shared.czero
 * as the reply and nothing is created. Otherwise the reply is the list
 * length after the pushes. */
void pushxGenericCommand(client *c, int where) {
    robj *list = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);

    if (list == NULL) return;
    if (checkType(c,list,OBJ_LIST)) return;

    int pushed = 0;
    for (int i = 2; i < c->argc; i++) {
        listTypePush(list,c->argv[i],where);
        pushed++;
    }

    addReplyLongLong(c,listTypeLength(list));

    if (pushed) {
        char *event = "rpush";
        if (where == LIST_HEAD) event = "lpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

256
/* LPUSHX entry point: pushes at the head through pushxGenericCommand(). */
void lpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_HEAD);
}

260
/* RPUSHX entry point: pushes at the tail through pushxGenericCommand(). */
void rpushxCommand(client *c) {
    pushxGenericCommand(c, LIST_TAIL);
}

264
/* LINSERT key BEFORE|AFTER pivot value
 *
 * Insert argv[4] before or after the first element equal to the pivot
 * argv[3], scanning the list from head to tail. Replies with:
 *   - a syntax error when argv[2] is neither "before" nor "after"
 *     (compared case-insensitively);
 *   - shared.cnegone when the pivot is not found;
 *   - the new list length after a successful insert. */
void linsertCommand(client *c) {
    const char *placement = c->argv[2]->ptr;
    int where;

    if (strcasecmp(placement,"after") == 0) {
        where = LIST_TAIL;
    } else if (strcasecmp(placement,"before") == 0) {
        where = LIST_HEAD;
    } else {
        addReply(c,shared.syntaxerr);
        return;
    }

    robj *list = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (list == NULL || checkType(c,list,OBJ_LIST)) return;

    /* Look for the pivot walking from head to tail; stop at first match. */
    listTypeEntry cursor;
    int inserted = 0;
    listTypeIterator *it = listTypeInitIterator(list,0,LIST_TAIL);
    while (!inserted && listTypeNext(it,&cursor)) {
        if (listTypeEqual(&cursor,c->argv[3])) {
            listTypeInsert(&cursor,c->argv[4],where);
            inserted = 1;
        }
    }
    listTypeReleaseIterator(it);

    if (!inserted) {
        /* Notify client of a failed insert */
        addReply(c,shared.cnegone);
        return;
    }

    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
                        c->argv[1],c->db->id);
    server.dirty++;
    addReplyLongLong(c,listTypeLength(list));
}

308
/* LLEN key: reply with the length of the list at argv[1]. A missing key is
 * handled by lookupKeyReadOrReply(), which is given shared.czero as the
 * reply; a non-list value is handled by checkType(). */
void llenCommand(client *c) {
    robj *list = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (list == NULL) return;
    if (checkType(c,list,OBJ_LIST)) return;
    addReplyLongLong(c,listTypeLength(list));
}

314
/* LINDEX key index: reply with the element at 'index' as a bulk string, or
 * with shared.nullbulk when quicklistIndex() finds no element there.
 *
 * The key lookup and type check happen before the index argument is
 * parsed. Panics if the list is not quicklist-encoded. */
void lindexCommand(client *c) {
    robj *list = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (list == NULL || checkType(c,list,OBJ_LIST)) return;

    long index;
    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
        return;

    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklistEntry hit;
    if (!quicklistIndex(list->ptr, index, &hit)) {
        addReply(c,shared.nullbulk);
        return;
    }

    /* Entries carry either a byte buffer or an integer value. */
    robj *element;
    if (hit.value) {
        element = createStringObject((char*)hit.value,hit.sz);
    } else {
        element = createStringObjectFromLongLong(hit.longval);
    }
    addReplyBulk(c,element);
    decrRefCount(element);
}

341
/* LSET key index value: replace the element at 'index' with argv[3].
 *
 * Replies with shared.outofrangeerr when quicklistReplaceAtIndex() reports
 * that nothing was replaced, shared.ok otherwise. A missing key is handled
 * by lookupKeyWriteOrReply(), which is given shared.nokeyerr as the reply.
 * Panics if the list is not quicklist-encoded. */
void lsetCommand(client *c) {
    robj *list = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
    if (list == NULL || checkType(c,list,OBJ_LIST)) return;

    long index;
    robj *newval = c->argv[3];
    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
        return;

    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
        return;
    }

    quicklist *ql = list->ptr;
    if (!quicklistReplaceAtIndex(ql, index,
                                 newval->ptr, sdslen(newval->ptr))) {
        addReply(c,shared.outofrangeerr);
        return;
    }

    addReply(c,shared.ok);
    signalModifiedKey(c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
    server.dirty++;
}

367
/* Shared implementation of LPOP / RPOP: pop one element from the end of
 * the list at argv[1] selected by 'where' and reply with it.
 *
 * Replies with shared.nullbulk when listTypePop() returns no element. When
 * the pop leaves the list empty the key is deleted and a "del" event is
 * emitted in addition to the "lpop"/"rpop" event. */
void popGenericCommand(client *c, int where) {
    robj *list = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (list == NULL || checkType(c,list,OBJ_LIST)) return;

    robj *popped = listTypePop(list,where);
    if (popped == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    char *event = "rpop";
    if (where == LIST_HEAD) event = "lpop";

    addReplyBulk(c,popped);
    decrRefCount(popped);
    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    if (listTypeLength(list) == 0) {
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                            c->argv[1],c->db->id);
        dbDelete(c->db,c->argv[1]);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
}

390
/* LPOP entry point: pops from the head through popGenericCommand(). */
void lpopCommand(client *c) {
    popGenericCommand(c, LIST_HEAD);
}

394
/* RPOP entry point: pops from the tail through popGenericCommand(). */
void rpopCommand(client *c) {
    popGenericCommand(c, LIST_TAIL);
}

398
/* LRANGE key start end: reply with the elements in the inclusive range
 * [start, end] as a multi-bulk reply.
 *
 * Negative indexes count from the end of the list. A range that is empty
 * after normalization yields shared.emptymultibulk. Both index arguments
 * are parsed before the key is looked up. Panics if the list is not
 * quicklist-encoded. */
void lrangeCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK)
        return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)
        return;

    robj *list = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk);
    if (list == NULL || checkType(c,list,OBJ_LIST)) return;

    long llen = listTypeLength(list);

    /* Turn negative indexes into offsets from the head, clamping the
     * start at the first element. */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        addReply(c,shared.emptymultibulk);
        return;
    }
    if (end >= llen) end = llen-1;
    long count = (end-start)+1;

    /* Return the result in form of a multi-bulk reply */
    addReplyMultiBulkLen(c,count);
    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("List encoding is not QUICKLIST!");
        return;
    }

    listTypeIterator *it = listTypeInitIterator(list, start, LIST_TAIL);
    for (long emitted = 0; emitted < count; emitted++) {
        listTypeEntry cursor;
        /* NOTE(review): the result of listTypeNext() is not checked; this
         * relies on 'count' never exceeding the elements left from
         * 'start', which the clamping above is meant to guarantee. */
        listTypeNext(it, &cursor);
        quicklistEntry *qe = &cursor.entry;
        if (qe->value) {
            addReplyBulkCBuffer(c,qe->value,qe->sz);
        } else {
            addReplyBulkLongLong(c,qe->longval);
        }
    }
    listTypeReleaseIterator(it);
}

444
/* LTRIM key start end: keep only the elements in the inclusive range
 * [start, end], removing everything else.
 *
 * Negative indexes count from the end of the list. An empty range removes
 * every element, in which case the key is deleted and a "del" event is
 * emitted. The reply is always shared.ok, including when the key does not
 * exist. Panics if the list is not quicklist-encoded. */
void ltrimCommand(client *c) {
    long start, end;

    if (getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK)
        return;
    if (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)
        return;

    robj *list = lookupKeyWriteOrReply(c,c->argv[1],shared.ok);
    if (list == NULL || checkType(c,list,OBJ_LIST)) return;

    long llen = listTypeLength(list);

    /* Turn negative indexes into offsets from the head, clamping the
     * start at the first element. */
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;

    /* Default: out of range start or start > end result in empty list,
     * i.e. everything is trimmed from the left. Since start >= 0 here,
     * end < 0 also falls in this case. */
    long ltrim = llen;
    long rtrim = 0;
    if (start <= end && start < llen) {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }

    /* Remove list elements to perform the trim */
    if (list->encoding != OBJ_ENCODING_QUICKLIST) {
        serverPanic("Unknown list encoding");
    } else {
        quicklistDelRange(list->ptr,0,ltrim);
        quicklistDelRange(list->ptr,-rtrim,rtrim);
    }

    notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
    if (listTypeLength(list) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
    addReply(c,shared.ok);
}

490
/* LREM key count value: remove elements equal to argv[3] from the list.
 *
 *   count > 0: remove up to 'count' matches scanning from head to tail.
 *   count < 0: remove up to |count| matches scanning from tail to head.
 *   count = 0: remove every match.
 *
 * Replies with the number of removed elements. A missing key is handled by
 * lookupKeyWriteOrReply(), which is given shared.czero as the reply. When
 * the list ends up empty the key is deleted and a "del" event is emitted. */
void lremCommand(client *c) {
    robj *obj = c->argv[3];
    long toremove;
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
        return;

    robj *subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    /* Compute |count| in unsigned arithmetic: negating the signed value
     * directly would overflow (undefined behavior) for LONG_MIN. */
    listTypeIterator *li;
    unsigned long limit;
    if (toremove < 0) {
        limit = 0UL - (unsigned long)toremove;
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
    } else {
        limit = (unsigned long)toremove;
        li = listTypeInitIterator(subject,0,LIST_TAIL);
    }

    listTypeEntry entry;
    while (listTypeNext(li,&entry)) {
        if (!listTypeEqual(&entry,obj)) continue;
        listTypeDelete(li, &entry);
        server.dirty++;
        removed++;
        /* A limit of zero means "no limit". */
        if (limit && (unsigned long)removed == limit) break;
    }
    listTypeReleaseIterator(li);

    if (removed) {
        signalModifiedKey(c->db,c->argv[1]);
        /* "lrem" is a list event: report it under NOTIFY_LIST like the
         * other list commands in this file, so that subscribers of the
         * list event class receive it. */
        notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
    }

    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }

    addReplyLongLong(c,removed);
}

/* This is the semantic of this command:
 *  RPOPLPUSH srclist dstlist:
536 537 538 539 540 541 542
 *    IF LLEN(srclist) > 0
 *      element = RPOP srclist
 *      LPUSH dstlist element
 *      RETURN element
 *    ELSE
 *      RETURN nil
 *    END
543 544 545 546 547 548
 *  END
 *
 * The idea is to be able to get an element from a list in a reliable way
 * since the element is not just returned but pushed against another list
 * as well. This command was originally proposed by Ezra Zygmuntowicz.
 */
549

550
/* Push side of RPOPLPUSH / BRPOPLPUSH: push 'value' at the head of the
 * list stored at 'dstkey' and send 'value' to the client as a bulk reply.
 *
 * 'dstobj' is the object currently stored at 'dstkey', or NULL when the
 * key does not exist, in which case a new list is created and added to
 * the client's db. The key is signaled as modified before the push, and an
 * "lpush" event is emitted after it. */
void rpoplpushHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value) {
    robj *list = dstobj;

    /* Create the list if the key does not exist */
    if (list == NULL) {
        list = createQuicklistObject();
        quicklistSetOptions(list->ptr, server.list_max_ziplist_size,
                            server.list_compress_depth);
        dbAdd(c->db,dstkey,list);
    }

    signalModifiedKey(c->db,dstkey);
    listTypePush(list,value,LIST_HEAD);
    notifyKeyspaceEvent(NOTIFY_LIST,"lpush",dstkey,c->db->id);

    /* Always send the pushed value to the client. */
    addReplyBulk(c,value);
}

565
/* RPOPLPUSH srclist dstlist: pop the tail element of the list at argv[1]
 * and push it at the head of the list at argv[2], replying with the moved
 * element (the reply is sent by rpoplpushHandlePush()).
 *
 * Nothing is popped when the destination holds a non-list value. When the
 * pop leaves the source list empty, the source key is deleted and a "del"
 * event is emitted. */
void rpoplpushCommand(client *c) {
    robj *src = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (src == NULL || checkType(c,src,OBJ_LIST)) return;

    if (listTypeLength(src) == 0) {
        /* This may only happen after loading very old RDB files. Recent
         * versions of Redis delete keys of empty lists. */
        addReply(c,shared.nullbulk);
        return;
    }

    robj *dst = lookupKeyWrite(c->db,c->argv[2]);
    robj *touchedkey = c->argv[1];

    /* Validate the destination before touching the source list. */
    if (dst && checkType(c,dst,OBJ_LIST)) return;
    robj *moved = listTypePop(src,LIST_TAIL);

    /* We saved touched key, and protect it, since rpoplpushHandlePush
     * may change the client command argument vector (it does not
     * currently). */
    incrRefCount(touchedkey);
    rpoplpushHandlePush(c,c->argv[2],dst,moved);

    /* Drop our reference to the object returned by listTypePop(). */
    decrRefCount(moved);

    /* Delete the source list when it is empty */
    notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
    if (listTypeLength(src) == 0) {
        dbDelete(c->db,touchedkey);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                            touchedkey,c->db->id);
    }
    signalModifiedKey(c->db,touchedkey);
    decrRefCount(touchedkey);
    server.dirty++;
}

/*-----------------------------------------------------------------------------
 * Blocking POP operations
 *----------------------------------------------------------------------------*/

606
/* This is how the current blocking POP works, we use BLPOP as example:
607 608
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
609
 *   if blocking is not required.
610 611 612 613 614 615 616
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping each key to the list of
 *   clients blocked on it.
 * - If a PUSH operation against a key with blocked clients waiting is
617 618 619 620
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, accordingly
 *   to the number of elements we have in the ready list.
621 622 623 624
 */

/* Set a client in blocking mode for the specified key, with the specified
 * timeout */
625
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
626 627
    dictEntry *de;
    list *l;
628
    int j;
629

630 631
    c->bpop.timeout = timeout;
    c->bpop.target = target;
632

633 634
    if (target != NULL) incrRefCount(target);

635
    for (j = 0; j < numkeys; j++) {
636 637
        /* If the key already exists in the dict ignore it. */
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
638 639 640 641 642 643 644 645 646 647 648
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
A
antirez 已提交
649
            serverAssertWithInfo(c,keys[j],retval == DICT_OK);
650
        } else {
651
            l = dictGetVal(de);
652 653 654
        }
        listAddNodeTail(l,c);
    }
A
antirez 已提交
655
    blockClient(c,BLOCKED_LIST);
656 657
}

658 659
/* Unblock a client that's waiting in a blocking operation such as BLPOP.
 * You should never call this function directly, but unblockClient() instead. */
660
void unblockClientWaitingData(client *c) {
661
    dictEntry *de;
662
    dictIterator *di;
663 664
    list *l;

A
antirez 已提交
665
    serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
666
    di = dictGetIterator(c->bpop.keys);
667
    /* The client may wait for multiple keys, so unblock it for every key. */
668 669 670
    while((de = dictNext(di)) != NULL) {
        robj *key = dictGetKey(de);

671
        /* Remove this client from the list of clients waiting for this key. */
672
        l = dictFetchValue(c->db->blocking_keys,key);
A
antirez 已提交
673
        serverAssertWithInfo(c,key,l != NULL);
674 675 676
        listDelNode(l,listSearchKey(l,c));
        /* If the list is empty we need to remove it to avoid wasting memory */
        if (listLength(l) == 0)
677
            dictDelete(c->db->blocking_keys,key);
678
    }
679
    dictReleaseIterator(di);
680

681
    /* Cleanup the client structure */
682
    dictEmpty(c->bpop.keys,NULL);
683 684 685 686
    if (c->bpop.target) {
        decrRefCount(c->bpop.target);
        c->bpop.target = NULL;
    }
687 688
}

689 690
/* If the specified key has clients blocked waiting for list pushes, this
 * function will put the key reference into the server.ready_keys list.
691
 * Note that db->ready_keys is a hash table that allows us to avoid putting
G
guiquanz 已提交
692
 * the same key again and again in the list in case of multiple pushes
693
 * made by a script or in the context of MULTI/EXEC.
694
 *
695
 * The list will be finally processed by handleClientsBlockedOnLists() */
696
void signalListAsReady(redisDb *db, robj *key) {
697 698 699
    readyList *rl;

    /* No clients blocking for this key? No need to queue it. */
700
    if (dictFind(db->blocking_keys,key) == NULL) return;
701 702

    /* Key was already signaled? No need to queue it again. */
703
    if (dictFind(db->ready_keys,key) != NULL) return;
704 705 706 707

    /* Ok, we need to queue this key into server.ready_keys. */
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
708
    rl->db = db;
709 710 711 712 713 714 715
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    /* We also add the key in the db->ready_keys dictionary in order
     * to avoid adding it multiple times into a list with a simple O(1)
     * check. */
    incrRefCount(key);
A
antirez 已提交
716
    serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
717 718
}

719
/* This is a helper function for handleClientsBlockedOnLists(). It's work
720 721
 * is to serve a specific client (receiver) that is blocked on 'key'
 * in the context of the specified 'db', doing the following:
722
 *
723 724
 * 1) Provide the client with the 'value' element.
 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
G
guiquanz 已提交
725
 *    'value' element on the destination list (the LPUSH side of the command).
726 727 728
 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
 *    the AOF and replication channel.
 *
A
antirez 已提交
729
 * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the
730 731 732
 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
 * we can propagate the command properly.
 *
733 734
 * The function returns C_OK if we are able to serve the client, otherwise
 * C_ERR is returned to signal the caller that the list POP operation
G
guiquanz 已提交
735
 * should be undone as the client was not served: This only happens for
736 737
 * BRPOPLPUSH that fails to push the value to the destination key as it is
 * of the wrong type. */
738
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
739 740 741 742 743
{
    robj *argv[3];

    if (dstkey == NULL) {
        /* Propagate the [LR]POP operation. */
A
antirez 已提交
744
        argv[0] = (where == LIST_HEAD) ? shared.lpop :
745 746
                                          shared.rpop;
        argv[1] = key;
A
antirez 已提交
747
        propagate((where == LIST_HEAD) ?
748
            server.lpopCommand : server.rpopCommand,
A
antirez 已提交
749
            db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
750 751 752 753 754 755 756 757 758 759

        /* BRPOP/BLPOP */
        addReplyMultiBulkLen(receiver,2);
        addReplyBulk(receiver,key);
        addReplyBulk(receiver,value);
    } else {
        /* BRPOPLPUSH */
        robj *dstobj =
            lookupKeyWrite(receiver->db,dstkey);
        if (!(dstobj &&
760
             checkType(receiver,dstobj,OBJ_LIST)))
761 762 763 764 765 766
        {
            /* Propagate the RPOP operation. */
            argv[0] = shared.rpop;
            argv[1] = key;
            propagate(server.rpopCommand,
                db->id,argv,2,
A
antirez 已提交
767 768
                PROPAGATE_AOF|
                PROPAGATE_REPL);
769 770 771 772 773 774 775 776
            rpoplpushHandlePush(receiver,dstkey,dstobj,
                value);
            /* Propagate the LPUSH operation. */
            argv[0] = shared.lpush;
            argv[1] = dstkey;
            argv[2] = value;
            propagate(server.lpushCommand,
                db->id,argv,3,
A
antirez 已提交
777 778
                PROPAGATE_AOF|
                PROPAGATE_REPL);
779
        } else {
780 781
            /* BRPOPLPUSH failed because of wrong
             * destination type. */
782
            return C_ERR;
783
        }
D
Damian Janowski &amp; Michel Martens 已提交
784
    }
785
    return C_OK;
786
}
787

788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
/* This function should be called by Redis every time a single command,
 * a MULTI/EXEC block, or a Lua script, terminated its execution after
 * being called by a client.
 *
 * All the keys with at least one client blocked that received at least
 * one new element via some PUSH operation are accumulated into
 * the server.ready_keys list. This function will run the list and will
 * serve clients accordingly. Note that the function will iterate again and
 * again as a result of serving BRPOPLPUSH we can have new blocking clients
 * to serve because of the PUSH side of BRPOPLPUSH. */
void handleClientsBlockedOnLists(void) {
    while(listLength(server.ready_keys) != 0) {
        list *l;

        /* Point server.ready_keys to a fresh list and save the current one
         * locally. This way as we run the old list we are free to call
         * signalListAsReady() that may push new elements in server.ready_keys
         * when handling clients blocked into BRPOPLPUSH. */
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) {
            listNode *ln = listFirst(l);
            readyList *rl = ln->value;

            /* First of all remove this key from db->ready_keys so that
             * we can safely call signalListAsReady() against this key. */
            dictDelete(rl->db->ready_keys,rl->key);

            /* If the key exists and it's a list, serve blocked clients
             * with data. */
            robj *o = lookupKeyWrite(rl->db,rl->key);
820
            if (o != NULL && o->type == OBJ_LIST) {
821 822 823 824 825 826 827 828 829 830 831
                dictEntry *de;

                /* We serve clients in the same order they blocked for
                 * this key, from the first blocked to the last. */
                de = dictFind(rl->db->blocking_keys,rl->key);
                if (de) {
                    list *clients = dictGetVal(de);
                    int numclients = listLength(clients);

                    while(numclients--) {
                        listNode *clientnode = listFirst(clients);
832
                        client *receiver = clientnode->value;
833 834 835
                        robj *dstkey = receiver->bpop.target;
                        int where = (receiver->lastcmd &&
                                     receiver->lastcmd->proc == blpopCommand) ?
A
antirez 已提交
836
                                    LIST_HEAD : LIST_TAIL;
837 838 839 840
                        robj *value = listTypePop(o,where);

                        if (value) {
                            /* Protect receiver->bpop.target, that will be
841
                             * freed by the next unblockClient()
842 843
                             * call. */
                            if (dstkey) incrRefCount(dstkey);
844
                            unblockClient(receiver);
845 846 847

                            if (serveClientBlockedOnList(receiver,
                                rl->key,dstkey,rl->db,value,
848
                                where) == C_ERR)
849 850 851 852 853 854 855 856 857 858 859 860 861
                            {
                                /* If we failed serving the client we need
                                 * to also undo the POP operation. */
                                    listTypePush(o,value,where);
                            }

                            if (dstkey) decrRefCount(dstkey);
                            decrRefCount(value);
                        } else {
                            break;
                        }
                    }
                }
862

M
Matt Stancliff 已提交
863 864 865
                if (listTypeLength(o) == 0) {
                    dbDelete(rl->db,rl->key);
                }
866 867 868 869 870 871 872 873 874 875 876
                /* We don't call signalModifiedKey() as it was already called
                 * when an element was pushed on the list. */
            }

            /* Free this item. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); /* We have the new list on place at this point. */
    }
877 878 879
}

/* Blocking RPOP/LPOP */
880
void blockingPopGenericCommand(client *c, int where) {
881
    robj *o;
882
    mstime_t timeout;
883 884
    int j;

885
    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
886
        != C_OK) return;
887

888 889 890
    for (j = 1; j < c->argc-1; j++) {
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
891
            if (o->type != OBJ_LIST) {
892 893 894 895
                addReply(c,shared.wrongtypeerr);
                return;
            } else {
                if (listTypeLength(o) != 0) {
896
                    /* Non empty list, this is like a non normal [LR]POP. */
A
antirez 已提交
897
                    char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
898
                    robj *value = listTypePop(o,where);
A
antirez 已提交
899
                    serverAssert(value != NULL);
D
Damian Janowski &amp; Michel Martens 已提交
900

901 902 903 904
                    addReplyMultiBulkLen(c,2);
                    addReplyBulk(c,c->argv[j]);
                    addReplyBulk(c,value);
                    decrRefCount(value);
A
antirez 已提交
905
                    notifyKeyspaceEvent(NOTIFY_LIST,event,
906
                                        c->argv[j],c->db->id);
907 908
                    if (listTypeLength(o) == 0) {
                        dbDelete(c->db,c->argv[j]);
A
antirez 已提交
909
                        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
910
                                            c->argv[j],c->db->id);
911
                    }
912 913 914 915 916
                    signalModifiedKey(c->db,c->argv[j]);
                    server.dirty++;

                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
                    rewriteClientCommandVector(c,2,
A
antirez 已提交
917
                        (where == LIST_HEAD) ? shared.lpop : shared.rpop,
918
                        c->argv[j]);
919 920 921 922 923
                    return;
                }
            }
        }
    }
924

925 926
    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
A
antirez 已提交
927
    if (c->flags & CLIENT_MULTI) {
928 929 930 931
        addReply(c,shared.nullmultibulk);
        return;
    }

932
    /* If the list is empty or the key does not exists we must block */
933
    blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
934 935
}

936
/* BLPOP command entry point: runs the generic blocking pop against the
 * head of the list (LIST_HEAD). */
void blpopCommand(client *c)
{
    blockingPopGenericCommand(c, LIST_HEAD);
}

940
/* BRPOP command entry point: runs the generic blocking pop against the
 * tail of the list (LIST_TAIL). */
void brpopCommand(client *c)
{
    blockingPopGenericCommand(c, LIST_TAIL);
}
D
Damian Janowski &amp; Michel Martens 已提交
943

944
/* BRPOPLPUSH command entry point.
 *
 * argv[1] is the source key, argv[2] the destination key and argv[3] the
 * timeout, expressed in seconds.
 *
 * - Source key holding a list: the regular rpoplpushCommand() is executed.
 * - Source key holding another type: a wrong type error is returned.
 * - Source key missing: the client blocks on the source key with argv[2]
 *   recorded as the target, unless it is inside a MULTI/EXEC, in which case
 *   a null bulk reply is sent immediately. */
void brpoplpushCommand(client *c) {
    mstime_t timeout;

    /* Nothing else to do here if the timeout argument cannot be parsed. */
    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
        != C_OK)
    {
        return;
    }

    robj *key = lookupKeyWrite(c->db, c->argv[1]);

    if (key != NULL) {
        if (key->type != OBJ_LIST) {
            addReply(c, shared.wrongtypeerr);
            return;
        }

        /* The list exists and has elements, so the regular
         * rpoplpushCommand is executed. */
        serverAssertWithInfo(c,key,listTypeLength(key) > 0);
        rpoplpushCommand(c);
        return;
    }

    /* From here on the source list does not exist. */
    if (c->flags & CLIENT_MULTI) {
        /* Blocking against an empty list in a multi state
         * returns immediately. */
        addReply(c, shared.nullbulk);
        return;
    }

    /* The list is empty and the client blocks. */
    blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
}