diff --git a/src/t_zset.c b/src/t_zset.c index 8c417ac7dd206bcb3803b0271f9ebb3f899acfc1..6b2b6d9f74b45d9760bf4d4f55762153524add1a 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -666,6 +666,22 @@ unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) { return deleted; } +/*----------------------------------------------------------------------------- + * Common sorted set API + *----------------------------------------------------------------------------*/ + +int zsLength(robj *zobj) { + int length = -1; + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + length = zzlLength(zobj); + } else if (zobj->encoding == REDIS_ENCODING_RAW) { + length = ((zset*)zobj->ptr)->zsl->length; + } else { + redisPanic("Unknown sorted set encoding"); + } + return length; +} + /*----------------------------------------------------------------------------- * Sorted set commands *----------------------------------------------------------------------------*/ @@ -1135,16 +1151,13 @@ void zinterstoreCommand(redisClient *c) { } void zrangeGenericCommand(redisClient *c, int reverse) { - robj *o; + robj *key = c->argv[1]; + robj *zobj; + int withscores = 0; long start; long end; - int withscores = 0; int llen; - int rangelen, j; - zset *zsetobj; - zskiplist *zsl; - zskiplistNode *ln; - robj *ele; + int rangelen; if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return; @@ -1156,13 +1169,11 @@ void zrangeGenericCommand(redisClient *c, int reverse) { return; } - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL - || checkType(c,o,REDIS_ZSET)) return; - zsetobj = o->ptr; - zsl = zsetobj->zsl; - llen = zsl->length; + if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL + || checkType(c,zobj,REDIS_ZSET)) return; - /* convert negative indexes */ + /* Sanitize indexes. */ + llen = zsLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -1176,23 +1187,79 @@ void zrangeGenericCommand(redisClient *c, int reverse) { if (end >= llen) end = llen-1; rangelen = (end-start)+1; - /* check if starting point is trivial, before searching - * the element in log(N) time */ - if (reverse) { - ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start); - } else { - ln = start == 0 ? - zsl->header->level[0].forward : zslGetElementByRank(zsl, start+1); - } - /* Return the result in form of a multi-bulk reply */ - addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen); - for (j = 0; j < rangelen; j++) { - ele = ln->obj; - addReplyBulk(c,ele); - if (withscores) - addReplyDouble(c,ln->score); - ln = reverse ? ln->backward : ln->level[0].forward; + addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen); + + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (reverse) + eptr = ziplistIndex(zl,-2-(2*start)); + else + eptr = ziplistIndex(zl,2*start); + + while (rangelen--) { + redisAssert(eptr != NULL); + redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + addReplyBulkLongLong(c,vlong); + else + addReplyBulkCBuffer(c,vstr,vlen); + + if (withscores) { + sptr = ziplistNext(zl,eptr); + redisAssert(sptr != NULL); + addReplyDouble(c,zzlGetScore(sptr)); + } + + if (reverse) { + /* Move to previous element by moving to the score of previous + * element. When NULL, we know there also is no element. */ + sptr = ziplistPrev(zl,eptr); + if (sptr != NULL) { + eptr = ziplistPrev(zl,sptr); + redisAssert(eptr != NULL); + } else { + eptr = NULL; + } + } else { + sptr = ziplistNext(zl,eptr); + redisAssert(sptr != NULL); + eptr = ziplistNext(zl,sptr); + } + } + + } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + robj *ele; + + /* Check if starting point is trivial, before doing log(N) lookup. */ + if (reverse) { + ln = zsl->tail; + if (start > 0) + ln = zslGetElementByRank(zsl,llen-start); + } else { + ln = zsl->header->level[0].forward; + if (start > 0) + ln = zslGetElementByRank(zsl,start+1); + } + + while(rangelen--) { + redisAssert(ln != NULL); + ele = ln->obj; + addReplyBulk(c,ele); + if (withscores) + addReplyDouble(c,ln->score); + ln = reverse ? ln->backward : ln->level[0].forward; + } + } else { + redisPanic("Unknown sorted set encoding"); } }