提交 6efb6c1e 编写于 作者: A antirez

ZPOP: renaming to have explicit MIN/MAX score idea.

This commit also adds a top comment about a subtle behavior of mixing
blocking operations of different types in the same key.
上级 6b026b70
......@@ -204,14 +204,25 @@ void disconnectAllBlockedClients(void) {
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client.
* being called by a client. It handles serving clients blocked in
* lists, streams, and sorted sets, via a blocking commands.
*
* All the keys with at least one client blocked that received at least
* one new element via some PUSH/XADD operation are accumulated into
* one new element via some write operation are accumulated into
* the server.ready_keys list. This function will run the list and will
* serve clients accordingly. Note that the function will iterate again and
* again as a result of serving BRPOPLPUSH we can have new blocking clients
* to serve because of the PUSH side of BRPOPLPUSH. */
* to serve because of the PUSH side of BRPOPLPUSH.
*
* This function is normally "fair", that is, it will server clients
* using a FIFO behavior. However this fairness is violated in certain
* edge cases, that is, when we have clients blocked at the same time
* in a sorted set and in a list, for the same key (a very odd thing to
* do client side, indeed!). Because mismatching clients (blocking for
* a different type compared to the current key type) are moved in the
* other side of the linked list. However as long as the key starts to
* be used only for a single type, like virtually any Redis application will
* do, the function is already fair. */
void handleClientsBlockedOnKeys(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
......@@ -316,14 +327,14 @@ void handleClientsBlockedOnKeys(void) {
continue;
}
int reverse = (receiver->lastcmd &&
receiver->lastcmd->proc == bzpopCommand) ?
0 : 1;
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
unblockClient(receiver);
genericZpopCommand(receiver,&rl->key,1,reverse);
genericZpopCommand(receiver,&rl->key,1,where);
propagate(reverse ?
server.zrevpopCommand : server.zpopCommand,
propagate(where == ZSET_MIN ?
server.zpopminCommand : server.zpopmaxCommand,
receiver->db->id,receiver->argv,receiver->argc,
PROPAGATE_AOF|PROPAGATE_REPL);
}
......
......@@ -198,10 +198,10 @@ struct redisCommand redisCommandTable[] = {
{"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
{"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
{"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
{"zpop",zpopCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"zrevpop",zrevpopCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"bzpop",bzpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
{"bzrevpop",bzrevpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
{"zpopmin",zpopminCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"zpopmax",zpopmaxCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"bzpopmin",bzpopminCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
{"bzpopmax",bzpopmaxCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
{"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
{"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
{"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
......@@ -1373,8 +1373,8 @@ void createSharedObjects(void) {
shared.rpop = createStringObject("RPOP",4);
shared.lpop = createStringObject("LPOP",4);
shared.lpush = createStringObject("LPUSH",5);
shared.zpop = createStringObject("ZPOP",4);
shared.zrevpop = createStringObject("ZREVPOP",7);
shared.zpopmin = createStringObject("ZPOPMIN",7);
shared.zpopmax = createStringObject("ZPOPMAX",7);
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
......@@ -1568,8 +1568,8 @@ void initServerConfig(void) {
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
server.zpopCommand = lookupCommandByCString("zpop");
server.zrevpopCommand = lookupCommandByCString("zrevpop");
server.zpopminCommand = lookupCommandByCString("zpopmin");
server.zpopmaxCommand = lookupCommandByCString("zpopmax");
server.sremCommand = lookupCommandByCString("srem");
server.execCommand = lookupCommandByCString("exec");
server.expireCommand = lookupCommandByCString("expire");
......
......@@ -316,6 +316,8 @@ typedef long long mstime_t; /* millisecond time type. */
/* List related stuff */
#define LIST_HEAD 0
#define LIST_TAIL 1
#define ZSET_MIN 0
#define ZSET_MAX 1
/* Sort operations */
#define SORT_OP_GET 0
......@@ -763,7 +765,7 @@ struct sharedObjectsStruct {
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
*rpop, *lpop, *lpush, *zpop, *zrevpop, *emptyscan,
*rpop, *lpop, *lpush, *zpopmin, *zpopmax, *emptyscan,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
......@@ -960,9 +962,10 @@ struct redisServer {
time_t loading_start_time;
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand, *zpopCommand, *zrevpopCommand, *sremCommand,
*execCommand, *expireCommand, *pexpireCommand, *xclaimCommand;
struct redisCommand *delCommand, *multiCommand, *lpushCommand,
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
......@@ -1970,10 +1973,10 @@ void zremCommand(client *c);
void zscoreCommand(client *c);
void zremrangebyscoreCommand(client *c);
void zremrangebylexCommand(client *c);
void zpopCommand(client *c);
void zrevpopCommand(client *c);
void bzpopCommand(client *c);
void bzrevpopCommand(client *c);
void zpopminCommand(client *c);
void zpopmaxCommand(client *c);
void bzpopminCommand(client *c);
void bzpopmaxCommand(client *c);
void multiCommand(client *c);
void execCommand(client *c);
void discardCommand(client *c);
......
......@@ -3070,16 +3070,15 @@ void zscanCommand(client *c) {
}
/* This command implements the generic zpop operation, used by:
* ZPOP, ZREVPOP, BZPOP and BZREVPOP */
void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
* ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX */
void genericZpopCommand(client *c, robj **keyv, int keyc, int where) {
int idx;
robj *key;
robj *zobj;
sds ele;
double score;
char *events[2] = {"zpop","zrevpop"};
// Check type and break on the first error, otherwise identify candidate
/* Check type and break on the first error, otherwise identify candidate. */
idx = 0;
while (idx < keyc) {
key = keyv[idx++];
......@@ -3089,7 +3088,7 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
break;
}
// No candidate for zpopping, return empty
/* No candidate for zpopping, return empty. */
if (!zobj) {
addReply(c,shared.emptymultibulk);
return;
......@@ -3102,11 +3101,8 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
unsigned int vlen;
long long vlong;
// Get the first or last element in the sorted set
eptr = ziplistIndex(zl,reverse ? -2 : 0);
serverAssertWithInfo(c,zobj,eptr != NULL);
// There must be an element in the sorted set
/* Get the first or last element in the sorted set. */
eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
serverAssertWithInfo(c,zobj,eptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
......@@ -3114,22 +3110,22 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
else
ele = sdsnewlen(vstr,vlen);
// Get the score
/* Get the score. */
sptr = ziplistNext(zl,eptr);
serverAssertWithInfo(c,zobj,sptr != NULL);
score = zzlGetScore(sptr);
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
zskiplistNode *zln;
// Get the first or last element in the sorted set
ln = (reverse ? zsl->tail : zsl->header->level[0].forward);
zln = (where == ZSET_MAX ? zsl->tail : zsl->header->level[0].forward);
// There must be an element in the sorted set
serverAssertWithInfo(c,zobj,ln != NULL);
ele = sdsdup(ln->ele);
score = ln->score;
serverAssertWithInfo(c,zobj,zln != NULL);
ele = sdsdup(zln->ele);
score = zln->score;
} else {
serverPanic("Unknown sorted set encoding");
}
......@@ -3138,7 +3134,8 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
server.dirty++;
signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,events[reverse],key,c->db->id);
char *events[2] = {"zpopmin","zpopmax"};
notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
// Remove the key, if indeed needed
if (zsetLength(zobj) == 0) {
......@@ -3153,18 +3150,18 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
sdsfree(ele);
}
// ZPOP key [key ...]
void zpopCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,0);
// ZPOPMIN key [key ...]
void zpopminCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN);
}
// ZREVPOP key [key ...]
void zrevpopCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,1);
// ZMAXPOP key [key ...]
void zpopmaxCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX);
}
/* Blocking Z[REV]POP */
void blockingGenericZpopCommand(client *c, int reverse) {
/* BZPOPMIN / BZPOPMAX actual implementation. */
void blockingGenericZpopCommand(client *c, int where) {
robj *o;
mstime_t timeout;
int j;
......@@ -3181,10 +3178,10 @@ void blockingGenericZpopCommand(client *c, int reverse) {
} else {
if (zsetLength(o) != 0) {
/* Non empty zset, this is like a normal Z[REV]POP. */
genericZpopCommand(c,&c->argv[j],1,reverse);
genericZpopCommand(c,&c->argv[j],1,where);
/* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */
rewriteClientCommandVector(c,2,
reverse ? shared.zrevpop : shared.zpop,
where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
c->argv[j]);
return;
}
......@@ -3203,12 +3200,12 @@ void blockingGenericZpopCommand(client *c, int reverse) {
blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
}
// BZPOP key [key ...] timeout
void bzpopCommand(client *c) {
blockingGenericZpopCommand(c,0);
// BZPOPMIN key [key ...] timeout
void bzpopminCommand(client *c) {
blockingGenericZpopCommand(c,ZSET_MIN);
}
// BZREVPOP key [key ...] timeout
void bzrevpopCommand(client *c) {
blockingGenericZpopCommand(c,1);
// BZPOPMAX key [key ...] timeout
void bzpopmaxCommand(client *c) {
blockingGenericZpopCommand(c,ZSET_MAX);
}
......@@ -649,74 +649,74 @@ start_server {tags {"zset"}} {
}
}
test "Basic Z\[REV\]POP with a single key - $encoding" {
test "Basic ZPOP with a single key - $encoding" {
r del zset
assert_equal {} [r zpop zset]
assert_equal {} [r zpopmin zset]
create_zset zset {-1 a 1 b 2 c 3 d 4 e}
assert_equal {zset -1 a} [r zpop zset]
assert_equal {zset 1 b} [r zpop zset]
assert_equal {zset 4 e} [r zrevpop zset]
assert_equal {zset 3 d} [r zrevpop zset]
assert_equal {zset 2 c} [r zpop zset]
assert_equal {zset -1 a} [r zpopmin zset]
assert_equal {zset 1 b} [r zpopmin zset]
assert_equal {zset 4 e} [r zpopmax zset]
assert_equal {zset 3 d} [r zpopmax zset]
assert_equal {zset 2 c} [r zpopmin zset]
assert_equal 0 [r exists zset]
r set foo bar
assert_error "*WRONGTYPE*" {r zpop foo}
assert_error "*WRONGTYPE*" {r zpopmin foo}
}
test "Z\[REV\]POP with multiple keys - $encoding" {
test "ZPOP with multiple keys - $encoding" {
r del z1 z2 z3 foo
r set foo bar
assert_equal {} [r zpop z1 z2 z3]
assert_error "*WRONGTYPE*" {r zpop z1 foo}
assert_equal {} [r zpopmin z1 z2 z3]
assert_error "*WRONGTYPE*" {r zpopmin z1 foo}
create_zset z1 {0 a 1 b 2 c}
assert_equal {z1 0 a} [r zpop z1 z2 z3]
assert_equal {z1 1 b} [r zpop z3 z2 z1]
assert_equal {z1 0 a} [r zpopmin z1 z2 z3]
assert_equal {z1 1 b} [r zpopmin z3 z2 z1]
create_zset z3 {0 a 1 b 2 c}
assert_equal {z3 2 c} [r zrevpop z3 z2 z1]
assert_equal {z3 2 c} [r zpopmax z3 z2 z1]
assert_equal 1 [r exists z1]
assert_equal 1 [r exists z3]
}
test "BZ\[REV\]POP with a single existing sorted set - $encoding" {
test "BZPOP with a single existing sorted set - $encoding" {
set rd [redis_deferring_client]
create_zset zset {0 a 1 b 2 c}
$rd bzpop zset 5
$rd bzpopmin zset 5
assert_equal {zset 0 a} [$rd read]
$rd bzpop zset 5
$rd bzpopmin zset 5
assert_equal {zset 1 b} [$rd read]
$rd bzrevpop zset 5
$rd bzpopmax zset 5
assert_equal {zset 2 c} [$rd read]
assert_equal 0 [r exists zset]
}
test "BZ\[REV\]POP with multiple existing sorted sets - $encoding" {
test "BZPOP with multiple existing sorted sets - $encoding" {
set rd [redis_deferring_client]
create_zset z1 {0 a 1 b 2 c}
create_zset z2 {3 d 4 e 5 f}
$rd bzpop z1 z2 5
$rd bzpopmin z1 z2 5
assert_equal {z1 0 a} [$rd read]
$rd bzrevpop z1 z2 5
$rd bzpopmax z1 z2 5
assert_equal {z1 2 c} [$rd read]
assert_equal 1 [r zcard z1]
assert_equal 3 [r zcard z2]
$rd bzrevpop z2 z1 5
$rd bzpopmax z2 z1 5
assert_equal {z2 5 f} [$rd read]
$rd bzpop z2 z1 5
$rd bzpopmin z2 z1 5
assert_equal {z2 3 d} [$rd read]
assert_equal 1 [r zcard z1]
assert_equal 1 [r zcard z2]
}
test "BZ\[REV\]POP second sorted set has members - $encoding" {
test "BZPOP second sorted set has members - $encoding" {
set rd [redis_deferring_client]
r del z1
create_zset z2 {3 d 4 e 5 f}
$rd bzrevpop z1 z2 5
$rd bzpopmax z1 z2 5
assert_equal {z2 5 f} [$rd read]
$rd bzpop z2 z1 5
$rd bzpopmin z2 z1 5
assert_equal {z2 3 d} [$rd read]
assert_equal 0 [r zcard z1]
assert_equal 1 [r zcard z2]
......@@ -1099,11 +1099,11 @@ start_server {tags {"zset"}} {
assert_equal {} $err
}
test "BZPOP, ZADD + DEL should not awake blocked client" {
test "BZPOPMIN, ZADD + DEL should not awake blocked client" {
set rd [redis_deferring_client]
r del zset
$rd bzpop zset 0
$rd bzpopmin zset 0
r multi
r zadd zset 0 foo
r del zset
......@@ -1113,13 +1113,13 @@ start_server {tags {"zset"}} {
$rd read
} {zset 1 bar}
test "BZPOP, ZADD + DEL + SET should not awake blocked client" {
test "BZPOPMIN, ZADD + DEL + SET should not awake blocked client" {
set rd [redis_deferring_client]
r del list
r del zset
$rd bzpop zset 0
$rd bzpopmin zset 0
r multi
r zadd zset 0 foo
r del zset
......@@ -1130,31 +1130,31 @@ start_server {tags {"zset"}} {
$rd read
} {zset 1 bar}
test "BZPOP with same key multiple times should work" {
test "BZPOPMIN with same key multiple times should work" {
set rd [redis_deferring_client]
r del z1 z2
# Data arriving after the BZPOP.
$rd bzpop z1 z2 z2 z1 0
# Data arriving after the BZPOPMIN.
$rd bzpopmin z1 z2 z2 z1 0
r zadd z1 0 a
assert_equal [$rd read] {z1 0 a}
$rd bzpop z1 z2 z2 z1 0
$rd bzpopmin z1 z2 z2 z1 0
r zadd z2 1 b
assert_equal [$rd read] {z2 1 b}
# Data already there.
r zadd z1 0 a
r zadd z2 1 b
$rd bzpop z1 z2 z2 z1 0
$rd bzpopmin z1 z2 z2 z1 0
assert_equal [$rd read] {z1 0 a}
$rd bzpop z1 z2 z2 z1 0
$rd bzpopmin z1 z2 z2 z1 0
assert_equal [$rd read] {z2 1 b}
}
test "MULTI/EXEC is isolated from the point of view of BZPOP" {
test "MULTI/EXEC is isolated from the point of view of BZPOPMIN" {
set rd [redis_deferring_client]
r del zset
$rd bzpop zset 0
$rd bzpopmin zset 0
r multi
r zadd zset 0 a
r zadd zset 1 b
......@@ -1163,11 +1163,11 @@ start_server {tags {"zset"}} {
$rd read
} {zset 0 a}
test "BZPOP with variadic ZADD" {
test "BZPOPMIN with variadic ZADD" {
set rd [redis_deferring_client]
r del zset
if {$::valgrind} {after 100}
$rd bzpop zset 0
$rd bzpopmin zset 0
if {$::valgrind} {after 100}
assert_equal 2 [r zadd zset -1 foo 1 bar]
if {$::valgrind} {after 100}
......@@ -1175,10 +1175,10 @@ start_server {tags {"zset"}} {
assert_equal {bar} [r zrange zset 0 -1]
}
test "BZPOP with zero timeout should block indefinitely" {
test "BZPOPMIN with zero timeout should block indefinitely" {
set rd [redis_deferring_client]
r del zset
$rd bzpop zset 0
$rd bzpopmin zset 0
after 1000
r zadd zset 0 foo
assert_equal {zset 0 foo} [$rd read]
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册