提交 37ab76c9 编写于 作者: A antirez

WATCH for MULTI/EXEC (CAS alike concurrency)

上级 75a190ca
......@@ -190,6 +190,7 @@ static char* strencoding[] = {
#define REDIS_MULTI 8 /* This client is in a MULTI context */
#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
......@@ -285,8 +286,9 @@ typedef struct redisObject {
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
......@@ -324,13 +326,14 @@ typedef struct redisClient {
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
robj **blockingkeys; /* The key we are waiting to terminate a blocking
robj **blocking_keys; /* The key we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
int blockingkeysnum; /* Number of blocking keys */
int blocking_keys_num; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
} redisClient;
......@@ -633,6 +636,8 @@ static void usage();
static int rewriteAppendOnlyFileBackground(void);
static int vmSwapObjectBlocking(robj *key, robj *val);
static int prepareForShutdown();
static void touchWatchedKey(redisDb *db, robj *key);
static void unwatchAllKeys(redisClient *c);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
......@@ -739,6 +744,8 @@ static void unsubscribeCommand(redisClient *c);
static void psubscribeCommand(redisClient *c);
static void punsubscribeCommand(redisClient *c);
static void publishCommand(redisClient *c);
static void watchCommand(redisClient *c);
static void unwatchCommand(redisClient *c);
/*================================= Globals ================================= */
......@@ -851,6 +858,8 @@ static struct redisCommand cmdTable[] = {
{"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
{"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0},
{"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0},
{NULL,NULL,0,0,NULL,0,0,0}
};
......@@ -1745,7 +1754,8 @@ static void initServer() {
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
if (server.vm_enabled)
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
......@@ -2011,6 +2021,9 @@ static void freeClient(redisClient *c) {
if (c->flags & REDIS_BLOCKED)
unblockClientWaitingData(c);
/* UNWATCH all the keys */
unwatchAllKeys(c);
listRelease(c->watched_keys);
/* Unsubscribe from all the pubsub channels */
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
......@@ -2026,7 +2039,8 @@ static void freeClient(redisClient *c) {
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
/* Remove from the list of clients waiting for swapped keys */
/* Remove from the list of clients that are now ready to be restarted
* after waiting for swapped keys */
if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) {
ln = listSearchKey(server.io_ready_clients,c);
if (ln) {
......@@ -2034,6 +2048,7 @@ static void freeClient(redisClient *c) {
server.vm_blocked_clients--;
}
}
/* Remove from the list of clients waiting for swapped keys */
while (server.vm_enabled && listLength(c->io_keys)) {
ln = listFirst(c->io_keys);
dontWaitForSwappedKey(c,ln->value);
......@@ -2725,8 +2740,8 @@ static redisClient *createClient(int fd) {
c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
c->blockingkeys = NULL;
c->blockingkeysnum = 0;
c->blocking_keys = NULL;
c->blocking_keys_num = 0;
c->io_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
c->pubsub_channels = dictCreate(&setDictType,NULL);
......@@ -3096,6 +3111,7 @@ static robj *lookupKeyRead(redisDb *db, robj *key) {
static robj *lookupKeyWrite(redisDb *db, robj *key) {
deleteIfVolatile(db,key);
touchWatchedKey(db,key);
return lookupKey(db,key);
}
......@@ -4229,6 +4245,7 @@ static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj
}
}
touchWatchedKey(c->db,key);
if (nx) deleteIfVolatile(c->db,key);
retval = dictAdd(c->db->dict,key,val);
if (retval == DICT_ERR) {
......@@ -4499,6 +4516,7 @@ static void delCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) {
if (deleteKey(c->db,c->argv[j])) {
touchWatchedKey(c->db,c->argv[j]);
server.dirty++;
deleted++;
}
......@@ -7511,6 +7529,17 @@ static void execCommand(redisClient *c) {
return;
}
/* Check if we need to abort the EXEC if some WATCHed key was touched.
* A failed EXEC will return a multi bulk nil object. */
if (c->flags & REDIS_DIRTY_CAS) {
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
unwatchAllKeys(c);
addReply(c,shared.nullmultibulk);
return;
}
/* Replicate a MULTI request now that we are sure the block is executed.
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
......@@ -7531,6 +7560,7 @@ static void execCommand(redisClient *c) {
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= (~REDIS_MULTI);
unwatchAllKeys(c);
/* Make sure the EXEC command is always replicated / AOF, since we
* always send the MULTI command (we can't know beforehand if the
* next operations will contain at least a modification to the DB). */
......@@ -7557,7 +7587,7 @@ static void execCommand(redisClient *c) {
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* requests if the blocking request is not served). Also we put the client
* in a dictionary (db->blockingkeys) mapping keys to a list of clients
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
* performed, we serve the first in the list: basically instead to push
......@@ -7575,22 +7605,22 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou
list *l;
int j;
c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
c->blockingkeysnum = numkeys;
c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
c->blocking_keys_num = numkeys;
c->blockingto = timeout;
for (j = 0; j < numkeys; j++) {
/* Add the key in the client structure, to map clients -> keys */
c->blockingkeys[j] = keys[j];
c->blocking_keys[j] = keys[j];
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
de = dictFind(c->db->blockingkeys,keys[j]);
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blockingkeys,keys[j],l);
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
assert(retval == DICT_OK);
} else {
......@@ -7609,22 +7639,22 @@ static void unblockClientWaitingData(redisClient *c) {
list *l;
int j;
assert(c->blockingkeys != NULL);
assert(c->blocking_keys != NULL);
/* The client may wait for multiple keys, so unblock it for every key. */
for (j = 0; j < c->blockingkeysnum; j++) {
for (j = 0; j < c->blocking_keys_num; j++) {
/* Remove this client from the list of clients waiting for this key. */
de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
assert(de != NULL);
l = dictGetEntryVal(de);
listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */
if (listLength(l) == 0)
dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
decrRefCount(c->blockingkeys[j]);
dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
decrRefCount(c->blocking_keys[j]);
}
/* Cleanup the client structure */
zfree(c->blockingkeys);
c->blockingkeys = NULL;
zfree(c->blocking_keys);
c->blocking_keys = NULL;
c->flags &= (~REDIS_BLOCKED);
server.blpop_blocked_clients--;
/* We want to process data if there is some command waiting
......@@ -7651,7 +7681,7 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
list *l;
listNode *ln;
de = dictFind(c->db->blockingkeys,key);
de = dictFind(c->db->blocking_keys,key);
if (de == NULL) return 0;
l = dictGetEntryVal(de);
ln = listFirst(l);
......@@ -10348,6 +10378,116 @@ static void publishCommand(redisClient *c) {
addReplyLongLong(c,receivers);
}
/* ===================== WATCH (CAS alike for MULTI/EXEC) ===================
*
* The implementation uses a per-DB hash table mapping keys to list of clients
* WATCHing those keys, so that given a key that is going to be modified
* we can mark all the associated clients as dirty.
*
* Also every client contains a list of WATCHed keys so that's possible to
* un-watch such keys when the client is freed or when UNWATCH is called. */
/* In the client->watched_keys list we need to use watchedKey structures
* as in order to identify a key in Redis we need both the key name and the
* DB */
typedef struct watchedKey {
robj *key;
redisDb *db;
} watchedKey;
/* Watch for the specified key */
static void watchForKey(redisClient *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
/* Check if we are already watching for this key */
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return; /* Key already watched */
}
/* This key is not already watched in this DB. Let's add it */
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
listAddNodeTail(clients,c);
/* Add the new key to the lits of keys watched by this client */
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
}
/* Unwatch all the keys watched by this client. To clean the EXEC dirty
* flag is up to the caller. */
static void unwatchAllKeys(redisClient *c) {
listIter li;
listNode *ln;
if (listLength(c->watched_keys) == 0) return;
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
list *clients;
watchedKey *wk;
/* Lookup the watched key -> clients list and remove the client
* from the list */
wk = listNodeValue(ln);
clients = dictFetchValue(wk->db->watched_keys, wk->key);
assert(clients != NULL);
listDelNode(clients,listSearchKey(clients,c));
/* Kill the entry at all if this was the only client */
if (listLength(clients) == 0)
dictDelete(wk->db->watched_keys, wk->key);
/* Remove this watched key from the client->watched list */
listDelNode(c->watched_keys,ln);
decrRefCount(wk->key);
zfree(wk);
}
}
/* "Touch" a key, so that if this key is being WATCHed by soem client the
* next EXEC will fail. */
static void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as REDIS_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
redisClient *c = listNodeValue(ln);
c->flags |= REDIS_DIRTY_CAS;
}
}
static void watchCommand(redisClient *c) {
int j;
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
static void unwatchCommand(redisClient *c) {
unwatchAllKeys(c);
c->flags &= (~REDIS_DIRTY_CAS);
addReply(c,shared.ok);
}
/* ================================= Debugging ============================== */
/* Compute the sha1 of string at 's' with 'len' bytes long.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册