From f362f7a18a64a21e2feef3851d6c6b4dee06de50 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 20 Apr 2016 12:38:14 +0200 Subject: [PATCH] Modules: sorted set iterators WIP #3. --- src/module.c | 124 ++++++++++++++++++++++++++++++++------- src/modules/helloworld.c | 29 +++++++-- src/redismodule.h | 4 ++ src/server.h | 1 + 4 files changed, 134 insertions(+), 24 deletions(-) diff --git a/src/module.c b/src/module.c index 3ab9da71e..8b64fb9e5 100644 --- a/src/module.c +++ b/src/module.c @@ -1010,13 +1010,13 @@ int RM_ZsetRangeEndReached(RedisModuleKey *key) { return key->zer; } -/* Setup a sorted set iterator seeking the first element in the specified - * range. Returns REDISMODULE_OK if the iterator was correctly initialized - * otherwise REDISMODULE_ERR is returned in the following conditions: - * - * 1. The value stored at key is not a sorted set or the key is empty. - * 2. The iterator type is unrecognized. */ -int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) { +/* Helper function for RM_ZsetFirstInRange() and RM_ZsetLastInRange(). + * Setup the sorted set iteration according to the specified range + * (see the functions calling it for more info). If first is true the + * first element in the range is used as a starting point for the iterator + * otherwise the last. Return REDISMODULE_OK on success otherwise + * REDISMODULE_ERR. */ +int zsetInitRange(RedisModuleKey *key, RedisModuleZsetRange *zr, int first) { if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; key->zr = zr; key->zcurrent = NULL; @@ -1032,11 +1032,13 @@ int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) { zrs.maxex = (zr->flags & REDISMODULE_ZSET_RANGE_END_EX) != 0; if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { - key->zcurrent = zzlFirstInRange(key->value->ptr,&zrs); + key->zcurrent = first ? zzlFirstInRange(key->value->ptr,&zrs) : + zzlLastInRange(key->value->ptr,&zrs); } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = key->value->ptr; zskiplist *zsl = zs->zsl; - key->zcurrent = zslFirstInRange(zsl,&zrs); + key->zcurrent = first ? zslFirstInRange(zsl,&zrs) : + zslLastInRange(zsl,&zrs); } else { serverPanic("Unsupported zset encoding"); } @@ -1047,6 +1049,22 @@ int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) { } } +/* Setup a sorted set iterator seeking the first element in the specified + * range. Returns REDISMODULE_OK if the iterator was correctly initialized + * otherwise REDISMODULE_ERR is returned in the following conditions: + * + * 1. The value stored at key is not a sorted set or the key is empty. + * 2. The iterator type is unrecognized. */ +int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) { + return zsetInitRange(key,zr,1); +} + +/* Exactly like RM_ZsetFirstInRange() but the last element of the range + * is seeked instead. */ +int RM_ZsetLastInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) { + return zsetInitRange(key,zr,0); +} + /* Return the current sorted set element of an active sorted set iterator * or NULL if the range specified in the iterator does not include any * element. */ @@ -1100,19 +1118,20 @@ int RM_ZsetRangeNext(RedisModuleKey *key) { key->zer = 1; return 0; } else { - /* Fetch the next element score for the - * range check. */ - unsigned char *saved_next = next; - next = ziplistNext(zl,next); /* Skip next element. */ - double score = zzlGetScore(next); /* Obtain the next score. */ /* Are we still within the range? */ - if (zr->type == REDISMODULE_ZSET_RANGE_SCORE && - !zslValueLteMax(score,&zrs)) - { - key->zer = 1; - return 0; + if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) { + /* Fetch the next element score for the + * range check. */ + unsigned char *saved_next = next; + next = ziplistNext(zl,next); /* Skip next element. */ + double score = zzlGetScore(next); /* Obtain the next score. */ + if (!zslValueLteMax(score,&zrs)) { + key->zer = 1; + return 0; + } + next = saved_next; } - key->zcurrent = saved_next; + key->zcurrent = next; return 1; } } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { @@ -1136,6 +1155,69 @@ int RM_ZsetRangeNext(RedisModuleKey *key) { } } +/* Go to the previous element of the sorted set iterator. Returns 1 if there was + * a previous element, 0 if we are already at the first element or the range + * does not include any item at all. */ +int RM_ZsetRangePrev(RedisModuleKey *key) { + if (!key->zr || !key->zcurrent) return 0; /* No active iterator. */ + zrangespec zrs; + + /* Convert to core range structure. */ + RedisModuleZsetRange *zr = key->zr; + if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) { + zrs.min = zr->score_start; + zrs.max = zr->score_end; + zrs.minex = (zr->flags & REDISMODULE_ZSET_RANGE_START_EX) != 0; + zrs.maxex = (zr->flags & REDISMODULE_ZSET_RANGE_END_EX) != 0; + } + + if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { + unsigned char *zl = key->value->ptr; + unsigned char *eptr = key->zcurrent; + unsigned char *prev; + prev = ziplistPrev(zl,eptr); /* Go back to previous score. */ + if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */ + if (prev == NULL) { + key->zer = 1; + return 0; + } else { + /* Are we still within the range? */ + if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) { + /* Fetch the previous element score for the + * range check. */ + unsigned char *saved_prev = prev; + prev = ziplistNext(zl,prev); /* Skip element to get the score. */ + double score = zzlGetScore(prev); /* Obtain the prev score. */ + if (!zslValueGteMin(score,&zrs)) { + key->zer = 1; + return 0; + } + prev = saved_prev; + } + key->zcurrent = prev; + return 1; + } + } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { + zskiplistNode *ln = key->zcurrent, *prev = ln->backward; + if (prev == NULL) { + key->zer = 1; + return 0; + } else { + /* Are we still within the range? */ + if (zr->type == REDISMODULE_ZSET_RANGE_SCORE && + !zslValueGteMin(ln->score,&zrs)) + { + key->zer = 1; + return 0; + } + key->zcurrent = prev; + return 1; + } + } else { + serverPanic("Unsupported zset encoding"); + } +} + /* -------------------------------------------------------------------------- * Redis <-> Modules generic Call() API * -------------------------------------------------------------------------- */ @@ -1589,8 +1671,10 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ZsetRem); REGISTER_API(ZsetRangeStop); REGISTER_API(ZsetFirstInRange); + REGISTER_API(ZsetLastInRange); REGISTER_API(ZsetRangeCurrentElement); REGISTER_API(ZsetRangeNext); + REGISTER_API(ZsetRangePrev); REGISTER_API(ZsetRangeEndReached); } diff --git a/src/modules/helloworld.c b/src/modules/helloworld.c index 5f4366091..6ff1643cb 100644 --- a/src/modules/helloworld.c +++ b/src/modules/helloworld.c @@ -327,7 +327,11 @@ int HelloMoreExpire_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, } /* HELLO.ZSUMRANGE key startscore endscore - * Return the sum of all the scores elements between startscore and endscore. */ + * Return the sum of all the scores elements between startscore and endscore. + * + * The computation is performed two times, one time from start to end and + * another time backward. The two scores, returned as a two element array, + * should match.*/ int HelloZsumRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleZsetRange zrange = REDISMODULE_ZSET_RANGE_INIT; zrange.type = REDISMODULE_ZSET_RANGE_SCORE; @@ -344,18 +348,35 @@ int HelloZsumRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, i return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE); } + double scoresum_a = 0; + double scoresum_b = 0; + RedisModule_ZsetFirstInRange(key,&zrange); - double scoresum = 0; while(!RedisModule_ZsetRangeEndReached(key)) { double score; RedisModuleString *ele = RedisModule_ZsetRangeCurrentElement(key,&score); RedisModule_FreeString(ctx,ele); - scoresum += score; + scoresum_a += score; RedisModule_ZsetRangeNext(key); } RedisModule_ZsetRangeStop(key); + + RedisModule_ZsetLastInRange(key,&zrange); + while(!RedisModule_ZsetRangeEndReached(key)) { + double score; + RedisModuleString *ele = RedisModule_ZsetRangeCurrentElement(key,&score); + RedisModule_FreeString(ctx,ele); + scoresum_b += score; + RedisModule_ZsetRangePrev(key); + } + + RedisModule_ZsetRangeStop(key); + RedisModule_CloseKey(key); - RedisModule_ReplyWithDouble(ctx,scoresum); + + RedisModule_ReplyWithArray(ctx,2); + RedisModule_ReplyWithDouble(ctx,scoresum_a); + RedisModule_ReplyWithDouble(ctx,scoresum_b); return REDISMODULE_OK; } diff --git a/src/redismodule.h b/src/redismodule.h index 42058b992..b3c702e47 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -145,8 +145,10 @@ int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModule int REDISMODULE_API_FUNC(RedisModule_ZsetRem)(RedisModuleKey *key, RedisModuleString *ele, int *deleted); void REDISMODULE_API_FUNC(RedisModule_ZsetRangeStop)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInRange)(RedisModuleKey *key, RedisModuleZsetRange *zr); +int REDISMODULE_API_FUNC(RedisModule_ZsetLastInRange)(RedisModuleKey *key, RedisModuleZsetRange *zr); RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ZsetRangeCurrentElement)(RedisModuleKey *key, double *score); int REDISMODULE_API_FUNC(RedisModule_ZsetRangeNext)(RedisModuleKey *key); +int REDISMODULE_API_FUNC(RedisModule_ZsetRangePrev)(RedisModuleKey *key); int REDISMODULE_API_FUNC(RedisModule_ZsetRangeEndReached)(RedisModuleKey *key); /* This is included inline inside each Redis module. */ @@ -203,8 +205,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ZsetRem); REDISMODULE_GET_API(ZsetRangeStop); REDISMODULE_GET_API(ZsetFirstInRange); + REDISMODULE_GET_API(ZsetLastInRange); REDISMODULE_GET_API(ZsetRangeCurrentElement); REDISMODULE_GET_API(ZsetRangeNext); + REDISMODULE_GET_API(ZsetRangePrev); REDISMODULE_GET_API(ZsetRangeEndReached); RedisModule_SetModuleAttribs(ctx,name,ver,apiver); diff --git a/src/server.h b/src/server.h index d6d1fd18e..9015e3373 100644 --- a/src/server.h +++ b/src/server.h @@ -1335,6 +1335,7 @@ double zzlGetScore(unsigned char *sptr); void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range); +unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range); unsigned int zsetLength(robj *zobj); void zsetConvert(robj *zobj, int encoding); void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen); -- GitLab