提交 f362f7a1 编写于 作者: A antirez

Modules: sorted set iterators WIP #3.

上级 bdbb5a02
......@@ -1010,13 +1010,13 @@ int RM_ZsetRangeEndReached(RedisModuleKey *key) {
return key->zer;
}
/* Setup a sorted set iterator seeking the first element in the specified
* range. Returns REDISMODULE_OK if the iterator was correctly initialized
* otherwise REDISMODULE_ERR is returned in the following conditions:
*
* 1. The value stored at key is not a sorted set or the key is empty.
* 2. The iterator type is unrecognized. */
int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) {
/* Helper function for RM_ZsetFirstInRange() and RM_ZsetLastInRange().
* Setup the sorted set iteration according to the specified range
* (see the functions calling it for more info). If first is true the
* first element in the range is used as a starting point for the iterator
* otherwise the last. Return REDISMODULE_OK on success otherwise
* REDISMODULE_ERR. */
int zsetInitRange(RedisModuleKey *key, RedisModuleZsetRange *zr, int first) {
if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
key->zr = zr;
key->zcurrent = NULL;
......@@ -1032,11 +1032,13 @@ int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) {
zrs.maxex = (zr->flags & REDISMODULE_ZSET_RANGE_END_EX) != 0;
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
key->zcurrent = zzlFirstInRange(key->value->ptr,&zrs);
key->zcurrent = first ? zzlFirstInRange(key->value->ptr,&zrs) :
zzlLastInRange(key->value->ptr,&zrs);
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = key->value->ptr;
zskiplist *zsl = zs->zsl;
key->zcurrent = zslFirstInRange(zsl,&zrs);
key->zcurrent = first ? zslFirstInRange(zsl,&zrs) :
zslLastInRange(zsl,&zrs);
} else {
serverPanic("Unsupported zset encoding");
}
......@@ -1047,6 +1049,22 @@ int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) {
}
}
/* Setup a sorted set iterator seeking the first element in the specified
* range. Returns REDISMODULE_OK if the iterator was correctly initialized
* otherwise REDISMODULE_ERR is returned in the following conditions:
*
* 1. The value stored at key is not a sorted set or the key is empty.
* 2. The iterator type is unrecognized. */
int RM_ZsetFirstInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) {
return zsetInitRange(key,zr,1);
}
/* Exactly like RM_ZsetFirstInRange() but the last element of the range
* is seeked instead. */
int RM_ZsetLastInRange(RedisModuleKey *key, RedisModuleZsetRange *zr) {
return zsetInitRange(key,zr,0);
}
/* Return the current sorted set element of an active sorted set iterator
* or NULL if the range specified in the iterator does not include any
* element. */
......@@ -1100,19 +1118,20 @@ int RM_ZsetRangeNext(RedisModuleKey *key) {
key->zer = 1;
return 0;
} else {
/* Fetch the next element score for the
* range check. */
unsigned char *saved_next = next;
next = ziplistNext(zl,next); /* Skip next element. */
double score = zzlGetScore(next); /* Obtain the next score. */
/* Are we still within the range? */
if (zr->type == REDISMODULE_ZSET_RANGE_SCORE &&
!zslValueLteMax(score,&zrs))
{
key->zer = 1;
return 0;
if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) {
/* Fetch the next element score for the
* range check. */
unsigned char *saved_next = next;
next = ziplistNext(zl,next); /* Skip next element. */
double score = zzlGetScore(next); /* Obtain the next score. */
if (!zslValueLteMax(score,&zrs)) {
key->zer = 1;
return 0;
}
next = saved_next;
}
key->zcurrent = saved_next;
key->zcurrent = next;
return 1;
}
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
......@@ -1136,6 +1155,69 @@ int RM_ZsetRangeNext(RedisModuleKey *key) {
}
}
/* Go to the previous element of the sorted set iterator. Returns 1 if there was
* a previous element, 0 if we are already at the first element or the range
* does not include any item at all. */
int RM_ZsetRangePrev(RedisModuleKey *key) {
if (!key->zr || !key->zcurrent) return 0; /* No active iterator. */
zrangespec zrs;
/* Convert to core range structure. */
RedisModuleZsetRange *zr = key->zr;
if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) {
zrs.min = zr->score_start;
zrs.max = zr->score_end;
zrs.minex = (zr->flags & REDISMODULE_ZSET_RANGE_START_EX) != 0;
zrs.maxex = (zr->flags & REDISMODULE_ZSET_RANGE_END_EX) != 0;
}
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = key->value->ptr;
unsigned char *eptr = key->zcurrent;
unsigned char *prev;
prev = ziplistPrev(zl,eptr); /* Go back to previous score. */
if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */
if (prev == NULL) {
key->zer = 1;
return 0;
} else {
/* Are we still within the range? */
if (zr->type == REDISMODULE_ZSET_RANGE_SCORE) {
/* Fetch the previous element score for the
* range check. */
unsigned char *saved_prev = prev;
prev = ziplistNext(zl,prev); /* Skip element to get the score. */
double score = zzlGetScore(prev); /* Obtain the prev score. */
if (!zslValueGteMin(score,&zrs)) {
key->zer = 1;
return 0;
}
prev = saved_prev;
}
key->zcurrent = prev;
return 1;
}
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
zskiplistNode *ln = key->zcurrent, *prev = ln->backward;
if (prev == NULL) {
key->zer = 1;
return 0;
} else {
/* Are we still within the range? */
if (zr->type == REDISMODULE_ZSET_RANGE_SCORE &&
!zslValueGteMin(ln->score,&zrs))
{
key->zer = 1;
return 0;
}
key->zcurrent = prev;
return 1;
}
} else {
serverPanic("Unsupported zset encoding");
}
}
/* --------------------------------------------------------------------------
* Redis <-> Modules generic Call() API
* -------------------------------------------------------------------------- */
......@@ -1589,8 +1671,10 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ZsetRem);
REGISTER_API(ZsetRangeStop);
REGISTER_API(ZsetFirstInRange);
REGISTER_API(ZsetLastInRange);
REGISTER_API(ZsetRangeCurrentElement);
REGISTER_API(ZsetRangeNext);
REGISTER_API(ZsetRangePrev);
REGISTER_API(ZsetRangeEndReached);
}
......
......@@ -327,7 +327,11 @@ int HelloMoreExpire_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
}
/* HELLO.ZSUMRANGE key startscore endscore
* Return the sum of all the scores elements between startscore and endscore. */
* Return the sum of all the scores elements between startscore and endscore.
*
* The computation is performed two times, one time from start to end and
* another time backward. The two scores, returned as a two element array,
* should match.*/
int HelloZsumRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModuleZsetRange zrange = REDISMODULE_ZSET_RANGE_INIT;
zrange.type = REDISMODULE_ZSET_RANGE_SCORE;
......@@ -344,18 +348,35 @@ int HelloZsumRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, i
return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
}
double scoresum_a = 0;
double scoresum_b = 0;
RedisModule_ZsetFirstInRange(key,&zrange);
double scoresum = 0;
while(!RedisModule_ZsetRangeEndReached(key)) {
double score;
RedisModuleString *ele = RedisModule_ZsetRangeCurrentElement(key,&score);
RedisModule_FreeString(ctx,ele);
scoresum += score;
scoresum_a += score;
RedisModule_ZsetRangeNext(key);
}
RedisModule_ZsetRangeStop(key);
RedisModule_ZsetLastInRange(key,&zrange);
while(!RedisModule_ZsetRangeEndReached(key)) {
double score;
RedisModuleString *ele = RedisModule_ZsetRangeCurrentElement(key,&score);
RedisModule_FreeString(ctx,ele);
scoresum_b += score;
RedisModule_ZsetRangePrev(key);
}
RedisModule_ZsetRangeStop(key);
RedisModule_CloseKey(key);
RedisModule_ReplyWithDouble(ctx,scoresum);
RedisModule_ReplyWithArray(ctx,2);
RedisModule_ReplyWithDouble(ctx,scoresum_a);
RedisModule_ReplyWithDouble(ctx,scoresum_b);
return REDISMODULE_OK;
}
......
......@@ -145,8 +145,10 @@ int REDISMODULE_API_FUNC(RedisModule_ZsetScore)(RedisModuleKey *key, RedisModule
int REDISMODULE_API_FUNC(RedisModule_ZsetRem)(RedisModuleKey *key, RedisModuleString *ele, int *deleted);
void REDISMODULE_API_FUNC(RedisModule_ZsetRangeStop)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_ZsetFirstInRange)(RedisModuleKey *key, RedisModuleZsetRange *zr);
int REDISMODULE_API_FUNC(RedisModule_ZsetLastInRange)(RedisModuleKey *key, RedisModuleZsetRange *zr);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_ZsetRangeCurrentElement)(RedisModuleKey *key, double *score);
int REDISMODULE_API_FUNC(RedisModule_ZsetRangeNext)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_ZsetRangePrev)(RedisModuleKey *key);
int REDISMODULE_API_FUNC(RedisModule_ZsetRangeEndReached)(RedisModuleKey *key);
/* This is included inline inside each Redis module. */
......@@ -203,8 +205,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ZsetRem);
REDISMODULE_GET_API(ZsetRangeStop);
REDISMODULE_GET_API(ZsetFirstInRange);
REDISMODULE_GET_API(ZsetLastInRange);
REDISMODULE_GET_API(ZsetRangeCurrentElement);
REDISMODULE_GET_API(ZsetRangeNext);
REDISMODULE_GET_API(ZsetRangePrev);
REDISMODULE_GET_API(ZsetRangeEndReached);
RedisModule_SetModuleAttribs(ctx,name,ver,apiver);
......
......@@ -1335,6 +1335,7 @@ double zzlGetScore(unsigned char *sptr);
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec *range);
unsigned char *zzlLastInRange(unsigned char *zl, zrangespec *range);
unsigned int zsetLength(robj *zobj);
void zsetConvert(robj *zobj, int encoding);
void zsetConvertToZiplistIfNeeded(robj *zobj, size_t maxelelen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册