diff --git a/src/anet.c b/src/anet.c index 2851318e23a7c22b5742b9fbd06dc4ebb330da4a..91545a0d5d267975fadd8099eaf72c7b29aee2fc 100644 --- a/src/anet.c +++ b/src/anet.c @@ -234,11 +234,12 @@ static int anetCreateSocket(char *err, int domain) { #define ANET_CONNECT_NONE 0 #define ANET_CONNECT_NONBLOCK 1 -static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) +static int anetTcpGenericConnect(char *err, char *addr, int port, + char *source_addr, int flags) { int s = ANET_ERR, rv; char portstr[6]; /* strlen("65535") + 1; */ - struct addrinfo hints, *servinfo, *p; + struct addrinfo hints, *servinfo, *bservinfo, *p, *b; snprintf(portstr,sizeof(portstr),"%d",port); memset(&hints,0,sizeof(hints)); @@ -258,6 +259,24 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) if (anetSetReuseAddr(err,s) == ANET_ERR) goto error; if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK) goto error; + if (source_addr) { + int bound = 0; + /* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */ + if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0) { + anetSetError(err, "%s", gai_strerror(rv)); + goto end; + } + for (b = bservinfo; b != NULL; b = b->ai_next) { + if (bind(s,b->ai_addr,b->ai_addrlen) != -1) { + bound = 1; + break; + } + } + if (!bound) { + anetSetError(err, "bind: %s", strerror(errno)); + goto end; + } + } if (connect(s,p->ai_addr,p->ai_addrlen) == -1) { /* If the socket is non-blocking, it is ok for connect() to * return an EINPROGRESS error here. */ @@ -287,12 +306,17 @@ end: int anetTcpConnect(char *err, char *addr, int port) { - return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE); + return anetTcpGenericConnect(err,addr,port,NULL,ANET_CONNECT_NONE); } int anetTcpNonBlockConnect(char *err, char *addr, int port) { - return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK); + return anetTcpGenericConnect(err,addr,port,NULL,ANET_CONNECT_NONBLOCK); +} + +int anetTcpNonBlockBindConnect(char *err, char *addr, int port, char *source_addr) +{ + return anetTcpGenericConnect(err,addr,port,source_addr,ANET_CONNECT_NONBLOCK); } int anetUnixGenericConnect(char *err, char *path, int flags) diff --git a/src/anet.h b/src/anet.h index 3f893be2dd36829b51973e839ab0f8422080b28a..c4659cd35951d616b9860bc77148c02ad4d2291a 100644 --- a/src/anet.h +++ b/src/anet.h @@ -45,6 +45,7 @@ int anetTcpConnect(char *err, char *addr, int port); int anetTcpNonBlockConnect(char *err, char *addr, int port); +int anetTcpNonBlockBindConnect(char *err, char *addr, int port, char *source_addr); int anetUnixConnect(char *err, char *path); int anetUnixNonBlockConnect(char *err, char *path); int anetRead(int fd, char *buf, int count); diff --git a/src/cluster.c b/src/cluster.c index 15afd8e5422487c389282cdd7521a3608a6bfcad..e4e8dbf95b059e56f196fa4c22456c29ccf584f7 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2424,9 +2424,14 @@ void clusterCron(void) { mstime_t old_ping_sent; clusterLink *link; - fd = anetTcpNonBlockConnect(server.neterr, node->ip, - node->port+REDIS_CLUSTER_PORT_INCR); - if (fd == -1) continue; + fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, + node->port+REDIS_CLUSTER_PORT_INCR, server.bindaddr[0]); + if (fd == -1) { + redisLog(REDIS_DEBUG, "Unable to connect to " + "Cluster Client [%s]:%d", node->ip, + node->port+REDIS_CLUSTER_PORT_INCR); + continue; + } link = createClusterLink(node); link->fd = fd; node->link = link;