提交 c772d9c6 编写于 作者: A antirez

take a hashslot -> keys index, will be used for cluster rehasing

上级 45b0f6fb
......@@ -202,6 +202,7 @@ void clusterInit(void) {
}
if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
server.cluster.slots_to_keys = zslCreate();
}
/* -----------------------------------------------------------------------------
......
......@@ -2,6 +2,9 @@
#include <signal.h>
void SlotToKeyAdd(robj *key);
void SlotToKeyDel(robj *key);
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
......@@ -131,6 +134,7 @@ int dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
dictAdd(db->dict, copy, val);
if (server.ds_enabled) cacheSetKeyMayExist(db,key);
if (server.cluster_enabled) SlotToKeyAdd(key);
return REDIS_OK;
}
}
......@@ -146,6 +150,7 @@ int dbReplace(redisDb *db, robj *key, robj *val) {
if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
sds copy = sdsdup(key->ptr);
dictAdd(db->dict, copy, val);
if (server.cluster_enabled) SlotToKeyAdd(key);
retval = 1;
} else {
dictReplace(db->dict, key->ptr, val);
......@@ -198,7 +203,12 @@ int dbDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
return dictDelete(db->dict,key->ptr) == DICT_OK;
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) SlotToKeyDel(key);
return 1;
} else {
return 0;
}
}
/* Empty the whole database.
......@@ -698,3 +708,31 @@ int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *num
*numkeys = num;
return keys;
}
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
* a fast way a key that belongs to a specified hash slot. This is useful
* while rehashing the cluster. */
void SlotToKeyAdd(robj *key) {
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
zslInsert(server.cluster.slots_to_keys,hashslot,key);
incrRefCount(key);
}
void SlotToKeyDel(robj *key) {
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
zslDelete(server.cluster.slots_to_keys,hashslot,key);
}
robj *GetKeyInSlot(unsigned int hashslot) {
zskiplistNode *n;
zrangespec range;
range.min = range.max = hashslot;
range.minex = range.maxex = 0;
n = zslFirstInRange(server.cluster.slots_to_keys, range);
if (!n) return NULL;
return n->obj;
}
......@@ -367,6 +367,28 @@ struct sharedObjectsStruct {
*integers[REDIS_SHARED_INTEGERS];
};
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
robj *obj;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
/*-----------------------------------------------------------------------------
* Redis cluster data structures
*----------------------------------------------------------------------------*/
......@@ -425,6 +447,7 @@ typedef struct {
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
clusterNode *slots[REDIS_CLUSTER_SLOTS];
zskiplist *slots_to_keys;
} clusterState;
/* Redis cluster messages header */
......@@ -627,6 +650,7 @@ struct redisServer {
/* Misc */
unsigned lruclock:22; /* clock incrementing every minute, for LRU */
unsigned lruclock_padding:10;
/* Cluster */
int cluster_enabled;
clusterState cluster;
};
......@@ -671,28 +695,6 @@ typedef struct _redisSortOperation {
robj *pattern;
} redisSortOperation;
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
robj *obj;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
/* DIsk store threaded I/O request message */
#define REDIS_IOJOB_LOAD 0
#define REDIS_IOJOB_SAVE 1
......@@ -921,10 +923,19 @@ int startAppendOnly(void);
void backgroundRewriteDoneHandler(int exitcode, int bysignal);
/* Sorted sets data type */
/* Struct to hold a inclusive/exclusive range spec. */
typedef struct {
double min, max;
int minex, maxex; /* are min or max exclusive? */
} zrangespec;
zskiplist *zslCreate(void);
void zslFree(zskiplist *zsl);
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score);
int zslDelete(zskiplist *zsl, double score, robj *obj);
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range);
double zzlGetScore(unsigned char *sptr);
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr);
......
......@@ -174,12 +174,6 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) {
return 0; /* not found */
}
/* Struct to hold a inclusive/exclusive range spec. */
typedef struct {
double min, max;
int minex, maxex; /* are min or max exclusive? */
} zrangespec;
static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册