提交 07a9f854 编写于 作者: A antirez

Blocking POP: use a dictionary to store keys clinet side.

To store the keys we block for during a blocking pop operation, in the
case the client is blocked for more data to arrive, we used a simple
linear array of redis objects, in the blockingState structure:

    robj **keys;
    int count;

However in order to fix issue #801 we also use a dictionary in order to
avoid to end in the blocked clients queue for the same key multiple
times with the same client.

The dictionary was only temporary, just to avoid duplicates, but since
we create / destroy it there is no point in doing this duplicated work,
so this commit simply use a dictionary as the main structure to store
the keys we are blocked for. So instead of the previous fields we now
just have:

    dict *keys;

This simplifies the code and reduces the work done by the server during
a blocking POP operation.
上级 fbf3e33d
...@@ -90,8 +90,7 @@ redisClient *createClient(int fd) { ...@@ -90,8 +90,7 @@ redisClient *createClient(int fd) {
c->obuf_soft_limit_reached_time = 0; c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,decrRefCount); listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue);
c->bpop.keys = NULL; c->bpop.keys = dictCreate(&setDictType,NULL);
c->bpop.count = 0;
c->bpop.timeout = 0; c->bpop.timeout = 0;
c->bpop.target = NULL; c->bpop.target = NULL;
c->io_keys = listCreate(); c->io_keys = listCreate();
......
...@@ -350,9 +350,8 @@ typedef struct multiState { ...@@ -350,9 +350,8 @@ typedef struct multiState {
} multiState; } multiState;
typedef struct blockingState { typedef struct blockingState {
robj **keys; /* The key we are waiting to terminate a blocking dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */ * operation such as BLPOP. Otherwise NULL. */
int count; /* Number of blocking keys */
time_t timeout; /* Blocking operation timeout. If UNIX current time time_t timeout; /* Blocking operation timeout. If UNIX current time
* is >= timeout then the operation timed out. */ * is >= timeout then the operation timed out. */
robj *target; /* The key that should receive the element, robj *target; /* The key that should receive the element,
......
...@@ -753,33 +753,18 @@ void rpoplpushCommand(redisClient *c) { ...@@ -753,33 +753,18 @@ void rpoplpushCommand(redisClient *c) {
/* Set a client in blocking mode for the specified key, with the specified /* Set a client in blocking mode for the specified key, with the specified
* timeout */ * timeout */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
dict *added;
dictEntry *de; dictEntry *de;
list *l; list *l;
int j, i; int j;
c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
c->bpop.timeout = timeout; c->bpop.timeout = timeout;
c->bpop.target = target; c->bpop.target = target;
if (target != NULL) incrRefCount(target); if (target != NULL) incrRefCount(target);
/* Create a dictionary that we use to avoid adding duplicated keys
* in case the user calls something like: "BLPOP foo foo foo 0".
* The rest of the implementation is simpler if we know there are no
* duplications in the key waiting list. */
added = dictCreate(&setDictType,NULL);
i = 0; /* The index for c->bpop.keys[...], we can't use the j loop
variable as the list of keys may have duplicated elements. */
for (j = 0; j < numkeys; j++) { for (j = 0; j < numkeys; j++) {
/* Add the key in the "added" dictionary to make sure there are /* If the key already exists in the dict ignore it. */
* no duplicated keys. */ if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
if (dictAdd(added,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* Add the key in the client structure, to map clients -> keys */
c->bpop.keys[i++] = keys[j];
incrRefCount(keys[j]); incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */ /* And in the other "side", to map keys -> clients */
...@@ -797,39 +782,40 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj ...@@ -797,39 +782,40 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
} }
listAddNodeTail(l,c); listAddNodeTail(l,c);
} }
c->bpop.count = i;
/* Mark the client as a blocked client */ /* Mark the client as a blocked client */
c->flags |= REDIS_BLOCKED; c->flags |= REDIS_BLOCKED;
server.bpop_blocked_clients++; server.bpop_blocked_clients++;
dictRelease(added);
} }
/* Unblock a client that's waiting in a blocking operation such as BLPOP */ /* Unblock a client that's waiting in a blocking operation such as BLPOP */
void unblockClientWaitingData(redisClient *c) { void unblockClientWaitingData(redisClient *c) {
dictEntry *de; dictEntry *de;
dictIterator *di;
list *l; list *l;
int j;
redisAssertWithInfo(c,NULL,c->bpop.keys != NULL); redisAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
di = dictGetIterator(c->bpop.keys);
/* The client may wait for multiple keys, so unblock it for every key. */ /* The client may wait for multiple keys, so unblock it for every key. */
for (j = 0; j < c->bpop.count; j++) { while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
/* Remove this client from the list of clients waiting for this key. */ /* Remove this client from the list of clients waiting for this key. */
de = dictFind(c->db->blocking_keys,c->bpop.keys[j]); l = dictFetchValue(c->db->blocking_keys,key);
redisAssertWithInfo(c,c->bpop.keys[j],de != NULL); redisAssertWithInfo(c,key,l != NULL);
l = dictGetVal(de);
listDelNode(l,listSearchKey(l,c)); listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */ /* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0) if (listLength(l) == 0)
dictDelete(c->db->blocking_keys,c->bpop.keys[j]); dictDelete(c->db->blocking_keys,key);
decrRefCount(c->bpop.keys[j]);
} }
dictReleaseIterator(di);
/* Cleanup the client structure */ /* Cleanup the client structure */
zfree(c->bpop.keys); dictEmpty(c->bpop.keys);
c->bpop.keys = NULL; if (c->bpop.target) {
if (c->bpop.target) decrRefCount(c->bpop.target); decrRefCount(c->bpop.target);
c->bpop.target = NULL; c->bpop.target = NULL;
}
c->flags &= ~REDIS_BLOCKED; c->flags &= ~REDIS_BLOCKED;
c->flags |= REDIS_UNBLOCKED; c->flags |= REDIS_UNBLOCKED;
server.bpop_blocked_clients--; server.bpop_blocked_clients--;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册