提交 df26a0ae 编写于 作者: P Pieter Noordhuis

Encode sorted set after loading from dump

上级 255eebe2
......@@ -756,11 +756,13 @@ robj *rdbLoadObject(int type, FILE *fp) {
} else if (type == REDIS_ZSET) {
/* Read list/set value */
size_t zsetlen;
size_t maxelelen = 0;
zset *zs;
if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
o = createZsetObject();
zs = o->ptr;
/* Load every single element of the list/set */
while(zsetlen--) {
robj *ele;
......@@ -770,10 +772,21 @@ robj *rdbLoadObject(int type, FILE *fp) {
if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
ele = tryObjectEncoding(ele);
if (rdbLoadDoubleValue(fp,&score) == -1) return NULL;
/* Don't care about integer-encoded strings. */
if (ele->encoding == REDIS_ENCODING_RAW &&
sdslen(ele->ptr) > maxelelen)
maxelelen = sdslen(ele->ptr);
znode = zslInsert(zs->zsl,score,ele);
dictAdd(zs->dict,ele,&znode->score);
incrRefCount(ele); /* added to skiplist */
}
/* Convert *after* loading, since sorted sets are not stored ordered. */
if (zsetLength(o) <= server.zset_max_ziplist_entries &&
maxelelen <= server.zset_max_ziplist_value)
zsetConvert(o,REDIS_ENCODING_ZIPLIST);
} else if (type == REDIS_HASH) {
size_t hashlen;
......@@ -861,6 +874,8 @@ robj *rdbLoadObject(int type, FILE *fp) {
case REDIS_ZSET_ZIPLIST:
o->type = REDIS_ZSET;
o->encoding = REDIS_ENCODING_ZIPLIST;
if (zsetLength(o) > server.zset_max_ziplist_entries)
zsetConvert(o,REDIS_ENCODING_RAW);
break;
default:
redisPanic("Unknown enoding");
......
......@@ -799,6 +799,9 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal);
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
int zzlInsert(robj *zobj, robj *ele, double score);
unsigned int zsetLength(robj *zobj);
void zsetConvert(robj *zobj, int encoding);
/* Core functions */
void freeMemoryIfNeeded(void);
......
......@@ -715,7 +715,7 @@ unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int
* Common sorted set API
*----------------------------------------------------------------------------*/
int zsLength(robj *zobj) {
unsigned int zsetLength(robj *zobj) {
int length = -1;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
length = zzlLength(zobj->ptr);
......@@ -727,7 +727,7 @@ int zsLength(robj *zobj) {
return length;
}
void zsConvert(robj *zobj, int encoding) {
void zsetConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
robj *ele;
......@@ -873,9 +873,9 @@ void zaddGenericCommand(redisClient *c, int incr) {
* too long *before* executing zzlInsert. */
redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
zsConvert(zobj,REDIS_ENCODING_RAW);
zsetConvert(zobj,REDIS_ENCODING_RAW);
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
zsConvert(zobj,REDIS_ENCODING_RAW);
zsetConvert(zobj,REDIS_ENCODING_RAW);
signalModifiedKey(c->db,key);
server.dirty++;
......@@ -1042,7 +1042,7 @@ void zremrangebyrankCommand(redisClient *c) {
checkType(c,zobj,REDIS_ZSET)) return;
/* Sanitize indexes. */
llen = zsLength(zobj);
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
......@@ -1598,10 +1598,10 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
/* Convert to ziplist when in limits. */
if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
maxelelen <= server.zset_max_ziplist_value)
zsConvert(dstobj,REDIS_ENCODING_ZIPLIST);
zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c,zsLength(dstobj));
addReplyLongLong(c,zsetLength(dstobj));
if (!touched) signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
......@@ -1642,7 +1642,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) {
|| checkType(c,zobj,REDIS_ZSET)) return;
/* Sanitize indexes. */
llen = zsLength(zobj);
llen = zsetLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
......@@ -1926,7 +1926,7 @@ void zcardCommand(redisClient *c) {
if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
addReplyLongLong(c,zsLength(zobj));
addReplyLongLong(c,zsetLength(zobj));
}
void zscoreCommand(redisClient *c) {
......@@ -1968,7 +1968,7 @@ void zrankGenericCommand(redisClient *c, int reverse) {
if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
checkType(c,zobj,REDIS_ZSET)) return;
llen = zsLength(zobj);
llen = zsetLength(zobj);
redisAssert(ele->encoding == REDIS_ENCODING_RAW);
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册