提交 513eb572 编写于 作者: A artix

Cluster Manager: auth support (-a argument).

上级 f3980bb9
......@@ -79,10 +79,8 @@
"and port (ie. 120.0.0.1 7000)\n"
#define CLUSTER_MANAGER_MODE() (config.cluster_manager_command.name != NULL)
#define CLUSTER_MANAGER_MASTERS_COUNT(nodes, replicas) (nodes/(replicas + 1))
#define CLUSTER_MANAGER_NODE_CONNECT(n) \
(n->context = redisConnect(n->ip, n->port));
#define CLUSTER_MANAGER_COMMAND(n,...) \
(reconnectingRedisCommand(n->context, __VA_ARGS__))
(redisCommand(n->context, __VA_ARGS__))
#define CLUSTER_MANAGER_NODE_ARRAY_FREE(array) zfree(array->alloc)
......@@ -2171,6 +2169,31 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
return 1;
}
static int clusterManagerNodeConnect(clusterManagerNode *node) {
if (node->context) redisFree(node->context);
node->context = redisConnect(node->ip, node->port);
if (node->context->err) {
fprintf(stderr,"Could not connect to Redis at ");
fprintf(stderr,"%s:%d: %s\n", node->ip, node->port,
node->context->errstr);
redisFree(node->context);
node->context = NULL;
return 0;
}
/* Set aggressive KEEP_ALIVE socket option in the Redis context socket
* in order to prevent timeouts caused by the execution of long
* commands. At the same time this improves the detection of real
* errors. */
anetKeepAlive(NULL, node->context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
if (config.auth) {
redisReply *reply = redisCommand(node->context,"AUTH %s",config.auth);
int ok = clusterManagerCheckRedisReply(node, reply, NULL);
if (reply != NULL) freeReplyObject(reply);
if (!ok) return 0;
}
return 1;
}
static void clusterManagerRemoveNodeFromList(list *nodelist,
clusterManagerNode *node) {
listIter li;
......@@ -2718,6 +2741,7 @@ static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
char **argv = NULL;
size_t *argv_len = NULL;
int c = (replace ? 8 : 7);
if (config.auth) c += 2;
size_t argc = c + reply->elements;
size_t i, offset = 6; // Keys Offset
argv = zcalloc(argc * sizeof(char *));
......@@ -2743,6 +2767,14 @@ static redisReply *clusterManagerMigrateKeysInReply(clusterManagerNode *source,
argv_len[offset] = 7;
offset++;
}
if (config.auth) {
argv[offset] = "AUTH";
argv_len[offset] = 4;
offset++;
argv[offset] = config.auth;
argv_len[offset] = strlen(config.auth);
offset++;
}
argv[offset] = "KEYS";
argv_len[offset] = 4;
offset++;
......@@ -3146,12 +3178,7 @@ cleanup:
* Warning: if something goes wrong, it will free the starting node before
* returning 0. */
static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts) {
if (node->context == NULL)
CLUSTER_MANAGER_NODE_CONNECT(node);
if (node->context->err) {
fprintf(stderr,"Could not connect to Redis at ");
fprintf(stderr,"%s:%d: %s\n", node->ip, node->port,
node->context->errstr);
if (node->context == NULL && !clusterManagerNodeConnect(node)) {
freeClusterManagerNode(node);
return 0;
}
......@@ -3187,9 +3214,8 @@ static int clusterManagerLoadInfoFromNode(clusterManagerNode *node, int opts) {
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *friend = ln->value;
if (!friend->ip || !friend->port) goto invalid_friend;
if (!friend->context)
CLUSTER_MANAGER_NODE_CONNECT(friend);
if (friend->context->err) goto invalid_friend;
if (!friend->context && !clusterManagerNodeConnect(friend))
goto invalid_friend;
e = NULL;
if (clusterManagerNodeLoadInfo(friend, 0, &e)) {
if (friend->flags & (CLUSTER_MANAGER_FLAG_NOADDR |
......@@ -4184,10 +4210,7 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
char *ip = addr;
int port = atoi(++c);
clusterManagerNode *node = clusterManagerNewNode(ip, port);
CLUSTER_MANAGER_NODE_CONNECT(node);
if (node->context->err) {
fprintf(stderr,"Could not connect to Redis at ");
fprintf(stderr,"%s:%d: %s\n", ip, port, node->context->errstr);
if (!clusterManagerNodeConnect(node)) {
freeClusterManagerNode(node);
return 0;
}
......@@ -4478,8 +4501,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
// Add the new node
clusterManagerNode *new_node = clusterManagerNewNode(ip, port);
int added = 0;
CLUSTER_MANAGER_NODE_CONNECT(new_node);
if (new_node->context->err) {
if (!clusterManagerNodeConnect(new_node)) {
clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n",
ip, port);
success = 0;
......@@ -5225,7 +5247,7 @@ static int clusterManagerCommandCall(int argc, char **argv) {
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (!n->context) CLUSTER_MANAGER_NODE_CONNECT(n);
if (!n->context && !clusterManagerNodeConnect(n)) continue;
redisReply *reply = NULL;
redisAppendCommandArgv(n->context, argc, (const char **) argv, argvlen);
int status = redisGetReply(n->context, (void **)(&reply));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册