提交 a669d5e9 编写于 作者: P Pieter Noordhuis

Convert encoding when thresholds overflow

上级 3ea204e1
......@@ -728,6 +728,85 @@ int zsLength(robj *zobj) {
return length;
}
void zsConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
robj *ele;
double score;
if (zobj->encoding == encoding) return;
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (encoding != REDIS_ENCODING_RAW)
redisPanic("Unknown target encoding");
zs = zmalloc(sizeof(*zs));
zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate();
eptr = ziplistIndex(zl,0);
redisAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
redisAssert(sptr != NULL);
while (eptr != NULL) {
score = zzlGetScore(sptr);
redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = createStringObjectFromLongLong(vlong);
else
ele = createStringObject((char*)vstr,vlen);
/* Has incremented refcount since it was just created. */
node = zslInsert(zs->zsl,score,ele);
redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
zzlNext(zl,&eptr,&sptr);
}
zfree(zobj->ptr);
zobj->ptr = zs;
zobj->encoding = REDIS_ENCODING_RAW;
} else if (zobj->encoding == REDIS_ENCODING_RAW) {
unsigned char *zl = ziplistNew();
if (encoding != REDIS_ENCODING_ZIPLIST)
redisPanic("Unknown target encoding");
/* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the ziplist. */
zs = zobj->ptr;
dictRelease(zs->dict);
node = zs->zsl->header->level[0].forward;
zfree(zs->zsl->header);
zfree(zs->zsl);
/* Immediately store pointer to ziplist in object because it will
* change because of reallocations when pushing to the ziplist. */
zobj->ptr = zl;
while (node) {
ele = getDecodedObject(node->obj);
redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK);
decrRefCount(ele);
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zs);
zobj->encoding = REDIS_ENCODING_ZIPLIST;
} else {
redisPanic("Unknown sorted set encoding");
}
}
/*-----------------------------------------------------------------------------
* Sorted set commands
*----------------------------------------------------------------------------*/
......@@ -746,7 +825,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
zobj = createZsetZiplistObject();
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != REDIS_ZSET) {
......@@ -785,7 +870,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
else /* ZADD */
addReply(c,shared.czero);
} else {
/* Optimize: check if the element is too large or the list becomes
* too long *before* executing zzlInsert. */
redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
if (zzlLength(zobj) > server.zset_max_ziplist_entries)
zsConvert(zobj,REDIS_ENCODING_RAW);
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
zsConvert(zobj,REDIS_ENCODING_RAW);
signalModifiedKey(c->db,key);
server.dirty++;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册