diff --git a/src/cluster.c b/src/cluster.c index 33a267589ff945d589c64d508323de8d41e9183f..3d758276ad9739ff36d9c0221f685521dd53d7c0 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1595,15 +1595,27 @@ void restoreCommand(redisClient *c) { server.dirty++; } -/* MIGRATE host port key dbid timeout */ +/* MIGRATE host port key dbid timeout [COPY | REPLACE] */ void migrateCommand(redisClient *c) { - int fd; + int fd, copy = 0, replace = 0, j; long timeout; long dbid; long long ttl = 0, expireat; robj *o; rio cmd, payload; + /* Parse additional options */ + for (j = 6; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"copy")) { + copy = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + /* Sanity check */ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) return; @@ -1643,19 +1655,24 @@ void migrateCommand(redisClient *c) { ttl = expireat-mstime(); if (ttl < 1) ttl = 1; } - redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); - /* Finally the last argument that is the serailized object payload - * in the DUMP format. */ + /* Emit the payload argument, that is the serailized object using + * the DUMP format. */ createDumpPayload(&payload,o); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); sdsfree(payload.io.buffer.ptr); + /* Add the REPLACE option to the RESTORE command if it was specified + * as a MIGRATE option. */ + if (replace) + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); + /* Tranfer the query to the other node in 64K chunks. */ { sds buf = cmd.io.buffer.ptr; @@ -1686,8 +1703,11 @@ void migrateCommand(redisClient *c) { } else { robj *aux; - dbDelete(c->db,c->argv[3]); - signalModifiedKey(c->db,c->argv[3]); + if (!copy) { + /* No COPY option: remove the local key, signal the change. */ + dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); + } addReply(c,shared.ok); server.dirty++; diff --git a/src/redis.c b/src/redis.c index 0780e8876b4f5bbb2e618d1de50709106f8988f6..1dcca1fd25fb57c6df8ef6c62550d376dec3e3a8 100644 --- a/src/redis.c +++ b/src/redis.c @@ -241,7 +241,7 @@ struct redisCommand redisCommandTable[] = { {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, {"restore",restoreCommand,-4,"awm",0,NULL,1,1,1,0,0}, - {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0}, + {"migrate",migrateCommand,-6,"aw",0,NULL,0,0,0,0,0}, {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0}, {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0}, {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0}, diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl index 1eb91eb21842571067e84474cbd817d9d94efe96..202098da2b5ef8a473952b0d378aeb40026b06d9 100644 --- a/tests/unit/dump.tcl +++ b/tests/unit/dump.tcl @@ -62,6 +62,47 @@ start_server {tags {"dump"}} { } } + test {MIGRATE is able to copy a key between two instances} { + set first [srv 0 client] + r del list + r lpush list a b c d + start_server {tags {"repl"}} { + set second [srv 0 client] + set second_host [srv 0 host] + set second_port [srv 0 port] + + assert {[$first exists list] == 1} + assert {[$second exists list] == 0} + set ret [r -1 migrate $second_host $second_port list 9 5000 copy] + assert {$ret eq {OK}} + assert {[$first exists list] == 1} + assert {[$second exists list] == 1} + assert {[$first lrange list 0 -1] eq [$second lrange list 0 -1]} + } + } + + test {MIGRATE will not overwrite existing keys, unless REPLACE is used} { + set first [srv 0 client] + r del list + r lpush list a b c d + start_server {tags {"repl"}} { + set second [srv 0 client] + set second_host [srv 0 host] + set second_port [srv 0 port] + + assert {[$first exists list] == 1} + assert {[$second exists list] == 0} + $second set list somevalue + catch {r -1 migrate $second_host $second_port list 9 5000 copy} e + assert_match {ERR*} $e + set res [r -1 migrate $second_host $second_port list 9 5000 copy replace] + assert {$ret eq {OK}} + assert {[$first exists list] == 1} + assert {[$second exists list] == 1} + assert {[$first lrange list 0 -1] eq [$second lrange list 0 -1]} + } + } + test {MIGRATE propagates TTL correctly} { set first [srv 0 client] r set key "Some Value"