提交 3d139127 编写于 作者: A antirez

Safer handling of MULTI/EXEC on errors.

After the transcation starts with a MULIT, the previous behavior was to
return an error on problems such as maxmemory limit reached. But still
to execute the transaction with the subset of queued commands on EXEC.

While it is true that the client was able to check for errors
distinguish QUEUED by an error reply, MULTI/EXEC in most client
implementations uses pipelining for speed, so all the commands and EXEC
are sent without caring about replies.

With this change:

1) EXEC fails if at least one command was not queued because of an
error. The EXECABORT error is used.
2) A generic error is always reported on EXEC.
3) The client DISCARDs the MULTI state after a failed EXEC, otherwise
pipelining multiple transactions would be basically impossible:
After a failed EXEC the next transaction would be simply queued as
the tail of the previous transaction.
上级 75369917
......@@ -72,10 +72,17 @@ void queueMultiCommand(redisClient *c) {
void discardTransaction(redisClient *c) {
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);;
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);;
unwatchAllKeys(c);
}
/* Flag the transacation as DIRTY_EXEC so that EXEC will fail.
* Should be called every time there is an error while queueing a command. */
void flagTransaction(redisClient *c) {
if (c->flags & REDIS_MULTI)
c->flags |= REDIS_DIRTY_EXEC;
}
void multiCommand(redisClient *c) {
if (c->flags & REDIS_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
......@@ -117,14 +124,19 @@ void execCommand(redisClient *c) {
return;
}
/* Check if we need to abort the EXEC if some WATCHed key was touched.
* A failed EXEC will return a multi bulk nil object. */
if (c->flags & REDIS_DIRTY_CAS) {
/* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands.
* A failed EXEC in the first case returns a multi bulk nil object
* (technically it is not an error but a special behavior), while
* in the second an EXECABORT error is returned. */
if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
shared.nullmultibulk);
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);
unwatchAllKeys(c);
addReply(c,shared.nullmultibulk);
goto handle_monitor;
}
......@@ -156,7 +168,7 @@ void execCommand(redisClient *c) {
c->cmd = orig_cmd;
freeClientMultiState(c);
initClientMultiState(c);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);
/* Make sure the EXEC command is always replicated / AOF, since we
* always send the MULTI command (we can't know beforehand if the
* next operations will contain at least a modification to the DB). */
......
......@@ -1070,6 +1070,8 @@ void createSharedObjects(void) {
"-READONLY You can't write against a read only slave.\r\n"));
shared.oomerr = createObject(REDIS_STRING,sdsnew(
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
shared.execaborterr = createObject(REDIS_STRING,sdsnew(
"-EXECABORT Transaction discarded because of previous errors.\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
......@@ -1590,11 +1592,13 @@ int processCommand(redisClient *c) {
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
......@@ -1603,6 +1607,7 @@ int processCommand(redisClient *c) {
/* Check if the user is authenticated */
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReplyError(c,"operation not permitted");
return REDIS_OK;
}
......@@ -1638,6 +1643,7 @@ int processCommand(redisClient *c) {
if (server.maxmemory) {
int retval = freeMemoryIfNeeded();
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return REDIS_OK;
}
......@@ -1649,6 +1655,7 @@ int processCommand(redisClient *c) {
&& server.lastbgsave_status == REDIS_ERR &&
c->cmd->flags & REDIS_CMD_WRITE)
{
flagTransaction(c);
addReply(c, shared.bgsaveerr);
return REDIS_OK;
}
......@@ -1680,6 +1687,7 @@ int processCommand(redisClient *c) {
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & REDIS_CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
......@@ -1701,6 +1709,7 @@ int processCommand(redisClient *c) {
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return REDIS_OK;
}
......
......@@ -169,19 +169,20 @@
#define REDIS_AOF_WAIT_REWRITE 2 /* AOF waits rewrite to start appending */
/* Client flags */
#define REDIS_SLAVE 1 /* This client is a slave server */
#define REDIS_MASTER 2 /* This client is a master server */
#define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 8 /* This client is in a MULTI context */
#define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */
#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
#define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */
#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in
server.unblocked_clients */
#define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */
#define REDIS_ASKING 1024 /* Client issued the ASKING command */
#define REDIS_CLOSE_ASAP 2048 /* Close this client ASAP */
#define REDIS_UNIX_SOCKET 4096 /* Client connected via Unix domain socket */
#define REDIS_SLAVE (1<<0) /* This client is a slave server */
#define REDIS_MASTER (1<<1) /* This client is a master server */
#define REDIS_MONITOR (1<<2) /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI (1<<3) /* This client is in a MULTI context */
#define REDIS_BLOCKED (1<<4) /* The client is waiting in a blocking operation */
#define REDIS_DIRTY_CAS (1<<5) /* Watched keys modified. EXEC will fail. */
#define REDIS_CLOSE_AFTER_REPLY (1<<6) /* Close after writing entire reply. */
#define REDIS_UNBLOCKED (1<<7) /* This client was unblocked and is stored in
server.unblocked_clients */
#define REDIS_LUA_CLIENT (1<<8) /* This is a non connected client used by Lua */
#define REDIS_ASKING (1<<9) /* Client issued the ASKING command */
#define REDIS_CLOSE_ASAP (1<<10)/* Close this client ASAP */
#define REDIS_UNIX_SOCKET (1<<11) /* Client connected via Unix domain socket */
#define REDIS_DIRTY_EXEC (1<<12) /* EXEC will fail for errors while queueing */
/* Client request types */
#define REDIS_REQ_INLINE 1
......@@ -425,7 +426,7 @@ struct sharedObjectsStruct {
*colon, *nullbulk, *nullmultibulk, *queued,
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
*masterdownerr, *roslaveerr,
*masterdownerr, *roslaveerr, *execaborterr,
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
*lpush,
......@@ -980,6 +981,7 @@ void queueMultiCommand(redisClient *c);
void touchWatchedKey(redisDb *db, robj *key);
void touchWatchedKeysOnFlush(int dbid);
void discardTransaction(redisClient *c);
void flagTransaction(redisClient *c);
/* Redis object implementation */
void decrRefCount(void *o);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册