提交 6d1a7cec 编写于 作者: A artix

Cluster Manager: rebalance command

上级 adebee8a
......@@ -71,6 +71,7 @@
#define CLUSTER_MANAGER_SLOTS 16384
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
#define CLUSTER_MANAGER_MIGRATE_PIPELINE 10
#define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2
#define CLUSTER_MANAGER_INVALID_HOST_ARG \
"Invalid arguments: you need to pass either a valid " \
......@@ -108,10 +109,13 @@
#define CLUSTER_MANAGER_FLAG_DISCONNECT 1 << 4
#define CLUSTER_MANAGER_FLAG_FAIL 1 << 5
#define CLUSTER_MANAGER_CMD_FLAG_FIX 1 << 0
#define CLUSTER_MANAGER_CMD_FLAG_SLAVE 1 << 1
#define CLUSTER_MANAGER_CMD_FLAG_YES 1 << 2
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 7
#define CLUSTER_MANAGER_CMD_FLAG_FIX 1 << 0
#define CLUSTER_MANAGER_CMD_FLAG_SLAVE 1 << 1
#define CLUSTER_MANAGER_CMD_FLAG_YES 1 << 2
#define CLUSTER_MANAGER_CMD_FLAG_AUTOWEIGHTS 1 << 3
#define CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER 1 << 4
#define CLUSTER_MANAGER_CMD_FLAG_SIMULATE 1 << 5
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 7
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
#define CLUSTER_MANAGER_OPT_COLD 1 << 1
......@@ -157,9 +161,12 @@ typedef struct clusterManagerCommand {
int replicas;
char *from;
char *to;
char **weight;
int weight_argc;
int slots;
int timeout;
int pipeline;
float threshold;
} clusterManagerCommand;
static void createClusterManagerCommand(char *cmdname, int argc, char **argv);
......@@ -206,6 +213,7 @@ static struct config {
int eval_ldb_end; /* Lua debugging session ended. */
int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */
int last_cmd_type;
int verbose;
clusterManagerCommand cluster_manager_command;
} config;
......@@ -1266,6 +1274,8 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"-d") && !lastarg) {
sdsfree(config.mb_delim);
config.mb_delim = sdsnew(argv[++i]);
} else if (!strcmp(argv[i],"--verbose")) {
config.verbose = 1;
} else if (!strcmp(argv[i],"--cluster") && !lastarg) {
if (CLUSTER_MANAGER_MODE()) usage();
char *cmd = argv[++i];
......@@ -1282,15 +1292,35 @@ static int parseOptions(int argc, char **argv) {
config.cluster_manager_command.from = argv[++i];
} else if (!strcmp(argv[i],"--cluster-to") && !lastarg) {
config.cluster_manager_command.to = argv[++i];
} else if (!strcmp(argv[i],"--cluster-weight") && !lastarg) {
int widx = i + 1;
char **weight = argv + widx;
int wargc = 0;
for (; widx < argc; widx++) {
if (strstr(argv[widx], "--") == argv[widx]) break;
wargc++;
}
if (wargc > 0) {
config.cluster_manager_command.weight = weight;
config.cluster_manager_command.weight_argc = wargc;
}
} else if (!strcmp(argv[i],"--cluster-slots") && !lastarg) {
config.cluster_manager_command.slots = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-timeout") && !lastarg) {
config.cluster_manager_command.timeout = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-pipeline") && !lastarg) {
config.cluster_manager_command.pipeline = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-threshold") && !lastarg) {
config.cluster_manager_command.threshold = atof(argv[++i]);
} else if (!strcmp(argv[i],"--cluster-yes")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_YES;
} else if (!strcmp(argv[i],"--cluster-simulate")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_SIMULATE;
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
sds version = cliVersion();
printf("redis-cli %s\n", version);
......@@ -1390,6 +1420,7 @@ static void usage(void) {
" are not rolled back from the server memory.\n"
" --cluster <command> [args...] [opts...]\n"
" Cluster Manager command and arguments (see below).\n"
" --verbose Verbose mode.\n"
" --help Output this help and exit.\n"
" --version Output version and exit.\n"
"\n"
......@@ -1749,6 +1780,8 @@ typedef struct clusterManagerNode {
sds *importing;
int migrating_count;
int importing_count;
float weight; /* Weight used by rebalance */
int balance; /* Used by rebalance */
} clusterManagerNode;
/* Data structure used to represent a sequence of nodes. */
......@@ -1780,6 +1813,7 @@ typedef int clusterManagerCommandProc(int argc, char **argv);
static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
static clusterManagerNode *clusterManagerNodeByName(const char *name);
static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n);
static void clusterManagerNodeResetSlots(clusterManagerNode *node);
static int clusterManagerNodeIsCluster(clusterManagerNode *node, char **err);
static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
......@@ -1813,6 +1847,7 @@ static int clusterManagerCommandCreate(int argc, char **argv);
static int clusterManagerCommandInfo(int argc, char **argv);
static int clusterManagerCommandCheck(int argc, char **argv);
static int clusterManagerCommandReshard(int argc, char **argv);
static int clusterManagerCommandRebalance(int argc, char **argv);
static int clusterManagerCommandCall(int argc, char **argv);
static int clusterManagerCommandHelp(int argc, char **argv);
......@@ -1831,6 +1866,9 @@ clusterManagerCommandDef clusterManagerCommands[] = {
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
{"reshard", clusterManagerCommandReshard, -1, "host:port",
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
{"rebalance", clusterManagerCommandRebalance, -1, "host:port",
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>"},
{"call", clusterManagerCommandCall, -2,
"host:port command arg arg .. arg", NULL},
{"help", clusterManagerCommandHelp, 0, NULL, NULL}
......@@ -1970,10 +2008,13 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
node->migrating_count = 0;
node->importing_count = 0;
node->replicas_count = 0;
node->weight = 1.0f;
node->balance = 0;
clusterManagerNodeResetSlots(node);
return node;
}
/* Return the node with the specified ID or NULL. */
static clusterManagerNode *clusterManagerNodeByName(const char *name) {
if (cluster_manager.nodes == NULL) return NULL;
clusterManagerNode *found = NULL;
......@@ -1994,6 +2035,32 @@ static clusterManagerNode *clusterManagerNodeByName(const char *name) {
return found;
}
/* Like get_node_by_name but the specified name can be just the first
* part of the node ID as long as the prefix in unique across the
* cluster.
*/
static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char*name)
{
if (cluster_manager.nodes == NULL) return NULL;
clusterManagerNode *found = NULL;
sds lcname = sdsempty();
lcname = sdscpy(lcname, name);
sdstolower(lcname);
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->name &&
strstr(n->name, lcname) == n->name) {
found = n;
break;
}
}
sdsfree(lcname);
return found;
}
static void clusterManagerNodeResetSlots(clusterManagerNode *node) {
memset(node->slots, 0, sizeof(node->slots));
node->slots_count = 0;
......@@ -2898,6 +2965,12 @@ int clusterManagerSlotCountCompareDesc(const void *n1, const void *n2) {
return node2->slots_count - node1->slots_count;
}
int clusterManagerCompareNodeBalance(const void *n1, const void *n2) {
clusterManagerNode *node1 = *((clusterManagerNode **) n1);
clusterManagerNode *node2 = *((clusterManagerNode **) n2);
return node1->balance - node2->balance;
}
static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
sds signature = NULL;
int node_count = 0, i = 0, name_len = 0;
......@@ -3200,6 +3273,19 @@ static void clusterManagerShowReshardTable(list *table) {
}
}
static void clusterManagerReleaseReshardTable(list *table) {
if (table != NULL) {
listIter li;
listNode *ln;
listRewind(table, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
zfree(item);
}
listRelease(table);
}
}
static void clusterManagerLog(int level, const char* fmt, ...) {
int use_colors =
(config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_COLOR);
......@@ -3775,14 +3861,199 @@ static int clusterManagerCommandReshard(int argc, char **argv) {
}
cleanup:
listRelease(sources);
if (table) {
listRewind(table, &li);
clusterManagerReleaseReshardTable(table);
return result;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
static int clusterManagerCommandRebalance(int argc, char **argv) {
int port = 0;
char *ip = NULL;
clusterManagerNode **weightedNodes = NULL;
list *involved = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!clusterManagerLoadInfoFromNode(node, 0)) return 0;
int result = 1, i;
if (config.cluster_manager_command.weight != NULL) {
for (i = 0; i < config.cluster_manager_command.weight_argc; i++) {
char *name = config.cluster_manager_command.weight[i];
char *p = strchr(name, '=');
if (p == NULL) {
result = 0;
goto cleanup;
}
*p = '\0';
float w = atof(++p);
clusterManagerNode *n = clusterManagerNodeByAbbreviatedName(name);
if (n == NULL) {
clusterManagerLogErr("*** No such master node %s\n", name);
result = 0;
goto cleanup;
}
n->weight = w;
}
}
float total_weight = 0;
int nodes_involved = 0;
int use_empty = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
involved = listCreate();
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
/* Compute the total cluster weight. */
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
continue;
if (!use_empty && n->slots_count == 0) {
n->weight = 0;
continue;
}
total_weight += n->weight;
nodes_involved++;
listAddNodeTail(involved, n);
}
weightedNodes = zmalloc(nodes_involved *
sizeof(clusterManagerNode *));
if (weightedNodes == NULL) goto cleanup;
/* Check cluster, only proceed if it looks sane. */
clusterManagerCheckCluster(1);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
clusterManagerLogErr("*** Please fix your cluster problems "
"before rebalancing" );
result = 0;
goto cleanup;
}
/* Calculate the slots balance for each node. It's the number of
* slots the node should lose (if positive) or gain (if negative)
* in order to be balanced. */
int threshold_reached = 0, total_balance = 0;
float threshold = config.cluster_manager_command.threshold;
i = 0;
listRewind(involved, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
weightedNodes[i++] = n;
int expected = (((float)CLUSTER_MANAGER_SLOTS / total_weight) *
(int) n->weight);
n->balance = n->slots_count - expected;
total_balance += n->balance;
/* Compute the percentage of difference between the
* expected number of slots and the real one, to see
* if it's over the threshold specified by the user. */
int over_threshold = 0;
if (config.cluster_manager_command.threshold > 0) {
if (n->slots_count > 0) {
float err_perc = fabs((100-(100.0*expected/n->slots_count)));
if (err_perc > threshold) over_threshold = 1;
} else if (expected > 1) {
over_threshold = 1;
}
}
if (over_threshold) threshold_reached = 1;
}
if (!threshold_reached) {
clusterManagerLogErr("*** No rebalancing needed! "
"All nodes are within the %.2f%% threshold.\n",
config.cluster_manager_command.threshold);
result = 0;
goto cleanup;
}
/* Because of rounding, it is possible that the balance of all nodes
* summed does not give 0. Make sure that nodes that have to provide
* slots are always matched by nodes receiving slots. */
while (total_balance > 0) {
listRewind(involved, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
zfree(item);
clusterManagerNode *n = ln->value;
if (n->balance < 0 && total_balance > 0) {
n->balance--;
total_balance--;
}
}
listRelease(table);
}
/* Sort nodes by their slots balance. */
qsort(weightedNodes, nodes_involved, sizeof(clusterManagerNode *),
clusterManagerCompareNodeBalance);
clusterManagerLogInfo(">>> Rebalancing across %d nodes. "
"Total weight = %.2f\n",
nodes_involved, total_weight);
if (config.verbose) {
for (i = 0; i < nodes_involved; i++) {
clusterManagerNode *n = weightedNodes[i];
printf("%s:%d balance is %d slots\n", n->ip, n->port, n->balance);
}
}
/* Now we have at the start of the 'sn' array nodes that should get
* slots, at the end nodes that must give slots.
* We take two indexes, one at the start, and one at the end,
* incrementing or decrementing the indexes accordingly til we
* find nodes that need to get/provide slots. */
int dst_idx = 0;
int src_idx = nodes_involved - 1;
int simulate = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_SIMULATE;
while (dst_idx < src_idx) {
clusterManagerNode *dst = weightedNodes[dst_idx];
clusterManagerNode *src = weightedNodes[src_idx];
int db = abs(dst->balance);
int sb = abs(src->balance);
int numslots = (db < sb ? db : sb);
if (numslots > 0) {
printf("Moving %d slots from %s:%d to %s:%d\n", numslots,
src->ip,
src->port,
dst->ip,
dst->port);
/* Actaully move the slots. */
list *lsrc = listCreate(), *table = NULL;
listAddNodeTail(lsrc, src);
table = clusterManagerComputeReshardTable(lsrc, numslots);
listRelease(lsrc);
int table_len = (int) listLength(table);
if (!table || table_len != numslots) {
clusterManagerLogErr("*** Assertio failed: Reshard table "
"!= number of slots");
result = 0;
goto end_move;
}
if (simulate) {
for (i = 0; i < table_len; i++) printf("#");
} else {
int opts = CLUSTER_MANAGER_OPT_QUIET |
CLUSTER_MANAGER_OPT_UPDATE;
listRewind(table, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
result = clusterManagerMoveSlot(item->source,
dst,
item->slot,
opts, NULL);
if (!result) goto end_move;
printf("#");
fflush(stdout);
}
}
printf("\n");
end_move:
clusterManagerReleaseReshardTable(table);
if (!result) goto cleanup;
}
/* Update nodes balance. */
dst->balance += numslots;
src->balance -= numslots;
if (dst->balance == 0) dst_idx++;
if (src->balance == 0) src_idx --;
}
cleanup:
if (involved != NULL) listRelease(involved);
if (weightedNodes != NULL) zfree(weightedNodes);
return result;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
......@@ -5169,6 +5440,7 @@ int main(int argc, char **argv) {
config.eval_ldb_sync = 0;
config.enable_ldb_on_eval = 0;
config.last_cmd_type = -1;
config.verbose = 0;
config.cluster_manager_command.name = NULL;
config.cluster_manager_command.argc = 0;
config.cluster_manager_command.argv = NULL;
......@@ -5176,9 +5448,12 @@ int main(int argc, char **argv) {
config.cluster_manager_command.replicas = 0;
config.cluster_manager_command.from = NULL;
config.cluster_manager_command.to = NULL;
config.cluster_manager_command.weight = NULL;
config.cluster_manager_command.slots = 0;
config.cluster_manager_command.timeout = CLUSTER_MANAGER_MIGRATE_TIMEOUT;
config.cluster_manager_command.pipeline = CLUSTER_MANAGER_MIGRATE_PIPELINE;
config.cluster_manager_command.threshold =
CLUSTER_MANAGER_REBALANCE_THRESHOLD;
pref.hints = 1;
spectrum_palette = spectrum_palette_color;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册