diff --git a/src/redis-cli.c b/src/redis-cli.c index a93bd9b10572f1a309a0664fa4da1ec8a4e78895..b0a12ebb9ee41a558edae6cd2315f1670852a472 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2934,6 +2934,68 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner, return success; } +/* Get the hash for the values of the specified keys in *keys_reply for the + * specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command + * on both nodes. Every key with same name on both nodes but having different + * values will be added to the *diffs list. Return 0 in case of reply + * error. */ +static int clusterManagerCompareKeysValues(clusterManagerNode *n1, + clusterManagerNode *n2, + redisReply *keys_reply, + list *diffs) +{ + size_t i, argc = keys_reply->elements + 2; + static const char *hash_zero = "0000000000000000000000000000000000000000"; + char **argv = zcalloc(argc * sizeof(char *)); + size_t *argv_len = zcalloc(argc * sizeof(size_t)); + argv[0] = "DEBUG"; + argv_len[0] = 5; + argv[1] = "DIGEST-VALUE"; + argv_len[1] = 12; + for (i = 0; i < keys_reply->elements; i++) { + redisReply *entry = keys_reply->element[i]; + int idx = i + 2; + argv[idx] = entry->str; + argv_len[idx] = entry->len; + } + int success = 0; + void *_reply1 = NULL, *_reply2 = NULL; + redisReply *r1 = NULL, *r2 = NULL; + redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n1->context, &_reply1) == REDIS_OK); + if (!success) goto cleanup; + r1 = (redisReply *) _reply1; + redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len); + success = (redisGetReply(n2->context, &_reply2) == REDIS_OK); + if (!success) goto cleanup; + r2 = (redisReply *) _reply2; + success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR); + if (r1->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str); + success = 0; + } + if (r2->type == REDIS_REPLY_ERROR) { + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str); + success = 0; + } + if (!success) goto cleanup; + assert(keys_reply->elements == r1->elements && + keys_reply->elements == r2->elements); + for (i = 0; i < keys_reply->elements; i++) { + char *key = keys_reply->element[i]->str; + char *hash1 = r1->element[i]->str; + char *hash2 = r2->element[i]->str; + /* Ignore keys that don't exist in both nodes. */ + if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0) + continue; + if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key); + } +cleanup: + if (r1) freeReplyObject(r1); + if (r2) freeReplyObject(r2); + return success; +} + /* Migrate keys taken from reply->elements. It returns the reply from the * MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * is not NULL, a dot will be printed for every migrated key. */ @@ -3014,8 +3076,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, char **err) { int success = 1; - int retry = (config.cluster_manager_command.flags & - (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); + int do_fix = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_FIX; + int do_replace = config.cluster_manager_command.flags & + CLUSTER_MANAGER_CMD_FLAG_REPLACE; while (1) { char *dots = NULL; redisReply *reply = NULL, *migrate_reply = NULL; @@ -3049,6 +3113,8 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; int not_served = 0; if (!is_busy) { + /* Check if the slot is unassigned (not served) in the + * source node's configuration. */ char *get_owner_err = NULL; clusterManagerNode *served_by = clusterManagerGetSlotOwner(source, slot, &get_owner_err); @@ -3061,20 +3127,69 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, } } } - if (retry && (is_busy || not_served)) { - /* If the key already exists, try to migrate keys - * adding REPLACE option. - * If the key's slot is not served, try to assign slot + /* Try to handle errors. */ + if (is_busy || not_served) { + /* If the key's slot is not served, try to assign slot * to the target node. */ - if (not_served) { + if (do_fix && not_served) { clusterManagerLogWarn("*** Slot was not served, setting " "owner to node %s:%d.\n", target->ip, target->port); clusterManagerSetSlot(source, target, slot, "node", NULL); } + /* If the key already exists in the target node (BUSYKEY), + * check whether its value is the same in both nodes. + * In case of equal values, retry migration with the + * REPLACE option. + * In case of different values: + * - If the migration is requested by the fix command, stop + * and warn the user. + * - In other cases (ie. reshard), proceed only if the user + * launched the command with the --cluster-replace option.*/ if (is_busy) { - clusterManagerLogWarn("*** Target key exists. " - "Replacing it for FIX.\n"); + clusterManagerLogWarn("\n*** Target key exists, " + "checking values...\n"); + list *diffs = listCreate(); + success = clusterManagerCompareKeysValues(source, + target, reply, diffs); + if (!success && (do_fix || !do_replace)) { + listRelease(diffs); + clusterManagerLogErr("*** Value check failed!\n"); + goto next; + } + if (listLength(diffs) > 0 && (do_fix || !do_replace)) { + success = 0; + clusterManagerLogErr( + "*** Found %d key(s) in both source node and " + "target node having different values.\n" + " Source node: %s:%d\n" + " Target node: %s:%d\n" + " Keys(s):\n", + listLength(diffs), + source->ip, source->port, + target->ip, target->port); + listIter dli; + listNode *dln; + listRewind(diffs, &dli); + while((dln = listNext(&dli)) != NULL) { + char *k = dln->value; + clusterManagerLogErr(" - %s\n", k); + } + clusterManagerLogErr("Please fix the above key(s) " + "manually "); + if (do_fix) + clusterManagerLogErr("and try again!\n"); + else { + clusterManagerLogErr("or relaunch the command " + "with --cluster-replace " + "option to force key " + "overriding.\n"); + } + listRelease(diffs); + goto next; + } + listRelease(diffs); + clusterManagerLogWarn("*** Replacing target keys...\n"); } freeReplyObject(migrate_reply); migrate_reply = clusterManagerMigrateKeysInReply(source,