diff --git a/redis.c b/redis.c index 1ead01f8841b41d9ebe0ef98c14bf6eed7e0b03c..1abfd96b544206dfb7b3d731d575d4115e944993 100644 --- a/redis.c +++ b/redis.c @@ -456,6 +456,7 @@ typedef struct _redisSortOperation { typedef struct zskiplistNode { struct zskiplistNode **forward; struct zskiplistNode *backward; + unsigned long *span; double score; robj *obj; } zskiplistNode; @@ -658,6 +659,7 @@ static void discardCommand(redisClient *c); static void blpopCommand(redisClient *c); static void brpopCommand(redisClient *c); static void appendCommand(redisClient *c); +static void zrankCommand(redisClient *c); /*================================= Globals ================================= */ @@ -710,6 +712,7 @@ static struct redisCommand cmdTable[] = { {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE,1,1,1}, {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1}, {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, + {"zrank",zrankCommand,3,REDIS_CMD_INLINE,1,1,1}, {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1}, {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1}, {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1}, @@ -4795,6 +4798,7 @@ static zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)); zn->forward = zmalloc(sizeof(zskiplistNode*) * level); + zn->span = zmalloc(sizeof(unsigned long) * level); zn->score = score; zn->obj = obj; return zn; @@ -4808,8 +4812,10 @@ static zskiplist *zslCreate(void) { zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); - for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) + for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->forward[j] = NULL; + zsl->header->span[j] = 0; + } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; @@ -4818,6 +4824,7 @@ static zskiplist *zslCreate(void) { static void zslFreeNode(zskiplistNode *node) { decrRefCount(node->obj); zfree(node->forward); + zfree(node->span); zfree(node); } @@ -4825,6 +4832,7 @@ static void zslFree(zskiplist *zsl) { zskiplistNode *node = zsl->header->forward[0], *next; zfree(zsl->header->forward); + zfree(zsl->header->span); zfree(zsl->header); while(node) { next = node->forward[0]; @@ -4843,15 +4851,21 @@ static int zslRandomLevel(void) { static void zslInsert(zskiplist *zsl, double score, robj *obj) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; + unsigned long span[ZSKIPLIST_MAXLEVEL]; int i, level; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { + /* store span that is crossed to reach the insert position */ + span[i] = i == (zsl->level-1) ? 0 : span[i+1]; + while (x->forward[i] && (x->forward[i]->score < score || (x->forward[i]->score == score && - compareStringObjects(x->forward[i]->obj,obj) < 0))) + compareStringObjects(x->forward[i]->obj,obj) < 0))) { + span[i] += x->span[i]; x = x->forward[i]; + } update[i] = x; } /* we assume the key is not already inside, since we allow duplicated @@ -4860,15 +4874,28 @@ static void zslInsert(zskiplist *zsl, double score, robj *obj) { * if the element is already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) { - for (i = zsl->level; i < level; i++) + for (i = zsl->level; i < level; i++) { + span[i] = 0; update[i] = zsl->header; + update[i]->span[i] = zsl->length; + } zsl->level = level; } x = zslCreateNode(level,score,obj); for (i = 0; i < level; i++) { x->forward[i] = update[i]->forward[i]; update[i]->forward[i] = x; + + /* update span covered by update[i] as x is inserted here */ + x->span[i] = update[i]->span[i] - (span[0] - span[i]); + update[i]->span[i] = (span[0] - span[i]) + 1; } + + /* increment span for untouched levels */ + for (i = level; i < zsl->level; i++) { + update[i]->span[i]++; + } + x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->forward[0]) x->forward[0]->backward = x; @@ -4896,8 +4923,12 @@ static int zslDelete(zskiplist *zsl, double score, robj *obj) { x = x->forward[0]; if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) { for (i = 0; i < zsl->level; i++) { - if (update[i]->forward[i] != x) break; - update[i]->forward[i] = x->forward[i]; + if (update[i]->forward[i] == x) { + update[i]->span[i] += x->span[i] - 1; + update[i]->forward[i] = x->forward[i]; + } else { + update[i]->span[i] -= 1; + } } if (x->forward[0]) { x->forward[0]->backward = (x->backward == zsl->header) ? @@ -4938,8 +4969,12 @@ static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict zskiplistNode *next; for (i = 0; i < zsl->level; i++) { - if (update[i]->forward[i] != x) break; - update[i]->forward[i] = x->forward[i]; + if (update[i]->forward[i] == x) { + update[i]->span[i] += x->span[i] - 1; + update[i]->forward[i] = x->forward[i]; + } else { + update[i]->span[i] -= 1; + } } if (x->forward[0]) { x->forward[0]->backward = (x->backward == zsl->header) ? @@ -5379,6 +5414,50 @@ static void zscoreCommand(redisClient *c) { } } +static void zrankCommand(redisClient *c) { + robj *o; + o = lookupKeyRead(c->db,c->argv[1]); + if (o == NULL) { + addReply(c,shared.nullbulk); + return; + } + if (o->type != REDIS_ZSET) { + addReply(c,shared.wrongtypeerr); + return; + } + + zset *zs = o->ptr; + zskiplist *zsl = zs->zsl; + dictEntry *de = dictFind(zs->dict,c->argv[2]); + if (!de) { + addReply(c,shared.nullbulk); + return; + } + + double *score = dictGetEntryVal(de); + zskiplistNode *x; + unsigned long rank = 0; + int i; + + x = zsl->header; + for (i = zsl->level-1; i >= 0; i--) { + while (x->forward[i] && + (x->forward[i]->score < *score || + (x->forward[i]->score == *score && + compareStringObjects(x->forward[i]->obj,c->argv[2]) < 0))) { + rank += x->span[i]; + x = x->forward[i]; + } + + if (x->forward[i] && compareStringObjects(x->forward[i]->obj,c->argv[2]) == 0) { + addReplyLong(c, rank); + return; + } + } + + addReply(c,shared.nullbulk); +} + /* ========================= Non type-specific commands ==================== */ static void flushdbCommand(redisClient *c) { diff --git a/test-redis.tcl b/test-redis.tcl index 6eea1189ff23757506af1dc1886162332329a71c..d0b664be66b96da2fb6df8abbb7337ebbd26411e 100644 --- a/test-redis.tcl +++ b/test-redis.tcl @@ -1192,6 +1192,18 @@ proc main {server port} { $r zcard ztmp-blabla } {0} + test {ZRANK basics} { + $r zadd zranktmp 10 x + $r zadd zranktmp 20 y + $r zadd zranktmp 30 z + list [$r zrank zranktmp x] [$r zrank zranktmp y] [$r zrank zranktmp z] + } {0 1 2} + + test {ZRANK - after deletion} { + $r zrem zranktmp y + list [$r zrank zranktmp x] [$r zrank zranktmp z] + } {0 1} + test {ZSCORE} { set aux {} set err {}