From 8a88c368edbc12540eee3d129b8a017bd6a84cac Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:04:10 +0100 Subject: [PATCH] Check other blocked clients when value could not be pushed --- src/t_list.c | 64 +++++++++++++++++++++++++--------------- tests/unit/type/list.tcl | 14 +++++++++ 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index 3ce0f992..866a6a3e 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -780,35 +780,53 @@ void unblockClientWaitingData(redisClient *c) { int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { struct dictEntry *de; redisClient *receiver; - list *l; + int numclients; + list *clients; listNode *ln; + robj *dstkey, *dstobj; de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; - l = dictGetEntryVal(de); - ln = listFirst(l); - redisAssert(ln != NULL); - receiver = ln->value; - - robj *target = receiver->bpop.target; - - unblockClientWaitingData(receiver); - - if (target == NULL) { - /* BRPOP/BLPOP return a multi-bulk with the name - * of the popped list */ - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); - } else { - /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - rpoplpushHandlePush(receiver,target,dobj,ele); - decrRefCount(target); + clients = dictGetEntryVal(de); + numclients = listLength(clients); + + /* Try to handle the push as long as there are clients waiting for a push. + * Note that "numclients" is used because the list of clients waiting for a + * push on "key" is deleted by unblockClient() when empty. + * + * This loop will have more than 1 iteration when there is a BRPOPLPUSH + * that cannot push the target list because it does not contain a list. If + * this happens, it simply tries the next client waiting for a push. */ + while (numclients--) { + ln = listFirst(clients); + redisAssert(ln != NULL); + receiver = ln->value; + dstkey = receiver->bpop.target; + + /* This should remove the first element of the "clients" list. */ + unblockClientWaitingData(receiver); + redisAssert(ln != listFirst(clients)); + + if (dstkey == NULL) { + /* BRPOP/BLPOP */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + return 1; + } else { + /* BRPOPLPUSH */ + dstobj = lookupKeyWrite(receiver->db,dstkey); + if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { + decrRefCount(dstkey); + } else { + rpoplpushHandlePush(receiver,dstkey,dstobj,ele); + decrRefCount(dstkey); + return 1; + } + } } - return 1; + return 0; } int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 36164433..8ac128c5 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -191,6 +191,20 @@ start_server { assert_equal {foo} [r lrange blist 0 -1] } + test "BRPOPLPUSH with multiple blocked clients" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r del blist target1 target2 + r set target1 nolist + $rd1 brpoplpush blist target1 0 + $rd2 brpoplpush blist target2 0 + r lpush blist foo + + assert_error "ERR*wrong kind*" {$rd1 read} + assert_equal {foo} [$rd2 read] + assert_equal {foo} [r lrange target2 0 -1] + } + test "linked BRPOPLPUSH" { set rd1 [redis_deferring_client] set rd2 [redis_deferring_client] -- GitLab