diff --git a/src/cluster.c b/src/cluster.c index 6e3fc8b004a59ad1fcc67d4f8ef3041bfec4449d..4d7b0502d63ba5a7c9d40b3840e4d579c74dd019 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4756,6 +4756,7 @@ void migrateCommand(client *c) { rio cmd, payload; int may_retry = 1; int write_error = 0; + int argv_rewritten = 0; /* To support the KEYS option we need the following additional state. */ int first_key = 3; /* Argument index of the first key. */ @@ -4939,12 +4940,20 @@ try_again: goto socket_err; /* A retry is guaranteed because of tested conditions.*/ } + /* On socket errors, close the migration socket now that we still have + * the original host/port in the ARGV. Later the original command may be + * rewritten to DEL and will be too later. */ + if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]); + if (!copy) { - /* Translate MIGRATE as DEL for replication/AOF. */ + /* Translate MIGRATE as DEL for replication/AOF. Note that we do + * this only for the keys for which we received an acknowledgement + * from the receiving Redis server, by using the del_idx index. */ if (del_idx > 1) { newargv[0] = createStringObject("DEL",3); /* Note that the following call takes ownership of newargv. */ replaceClientCommandVector(c,del_idx,newargv); + argv_rewritten = 1; } else { /* No key transfer acknowledged, no need to rewrite as DEL. */ zfree(newargv); @@ -4953,8 +4962,8 @@ try_again: } /* If we are here and a socket error happened, we don't want to retry. - * Just signal the problem to the client, but only do it if we don't - * already queued a different error reported by the destination server. */ + * Just signal the problem to the client, but only do it if we did not + * already queue a different error reported by the destination server. */ if (!error_from_target && socket_error) { may_retry = 0; goto socket_err; @@ -4962,7 +4971,11 @@ try_again: if (!error_from_target) { /* Success! Update the last_dbid in migrateCachedSocket, so that we can - * avoid SELECT the next time if the target DB is the same. Reply +OK. */ + * avoid SELECT the next time if the target DB is the same. Reply +OK. + * + * Note: If we reached this point, even if socket_error is true + * still the SELECT command succeeded (otherwise the code jumps to + * socket_err label. */ cs->last_dbid = dbid; addReply(c,shared.ok); } else { @@ -4972,7 +4985,6 @@ try_again: sdsfree(cmd.io.buffer.ptr); zfree(ov); zfree(kv); zfree(newargv); - if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]); return; /* On socket errors we try to close the cached socket and try again. @@ -4982,7 +4994,12 @@ socket_err: /* Cleanup we want to perform in both the retry and no retry case. * Note: Closing the migrate socket will also force SELECT next time. */ sdsfree(cmd.io.buffer.ptr); - migrateCloseSocket(c->argv[1],c->argv[2]); + + /* If the command was rewritten as DEL and there was a socket error, + * we already closed the socket earlier. While migrateCloseSocket() + * is idempotent, the host/port arguments are now gone, so don't do it + * again. */ + if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]); zfree(newargv); newargv = NULL; /* This will get reallocated on retry. */