提交 4df3af4e 编写于 作者: J jonathan pickett

issue 146: changes to network layer between 2.8.9 and 2.8.12 broke IOCP async connect code

上级 128f5b53
...@@ -167,12 +167,24 @@ static void __redisAsyncCopyError(redisAsyncContext *ac) { ...@@ -167,12 +167,24 @@ static void __redisAsyncCopyError(redisAsyncContext *ac) {
#ifdef WIN32_IOCP #ifdef WIN32_IOCP
redisAsyncContext *redisAsyncConnect(const char *ip, int port) { redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
struct sockaddr_in sa; SOCKADDR_STORAGE ss;
redisContext *c = redisPreConnectNonBlock(ip, port, &sa); redisContext *c = redisPreConnectNonBlock(ip, port, &ss);
redisAsyncContext *ac = redisAsyncInitialize(c); redisAsyncContext *ac = redisAsyncInitialize(c);
if (aeWinSocketConnect(ac->c.fd, (struct sockaddr *)&sa, sizeof(sa)) != 0) { if (aeWinSocketConnect(ac->c.fd, &ss) != 0) {
ac->c.err = errno;
strerror_r(errno, ac->c.errstr, sizeof(ac->c.errstr));
}
__redisAsyncCopyError(ac);
return ac;
}
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr) {
SOCKADDR_STORAGE ss;
redisContext *c = redisPreConnectNonBlock(ip, port, &ss);
redisAsyncContext *ac = redisAsyncInitialize(c);
if (aeWinSocketConnectBind(ac->c.fd, &ss, source_addr) != 0) {
ac->c.err = errno; ac->c.err = errno;
strerror_r(errno,ac->c.errstr,sizeof(ac->c.errstr)); strerror_r(errno, ac->c.errstr, sizeof(ac->c.errstr));
} }
__redisAsyncCopyError(ac); __redisAsyncCopyError(ac);
return ac; return ac;
...@@ -195,7 +207,6 @@ redisAsyncContext *redisAsyncConnect(const char *ip, int port) { ...@@ -195,7 +207,6 @@ redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
__redisAsyncCopyError(ac); __redisAsyncCopyError(ac);
return ac; return ac;
} }
#endif
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
const char *source_addr) { const char *source_addr) {
...@@ -204,6 +215,7 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, ...@@ -204,6 +215,7 @@ redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
__redisAsyncCopyError(ac); __redisAsyncCopyError(ac);
return ac; return ac;
} }
#endif
redisAsyncContext *redisAsyncConnectUnix(const char *path) { redisAsyncContext *redisAsyncConnectUnix(const char *path) {
redisContext *c; redisContext *c;
......
...@@ -1135,11 +1135,11 @@ redisContext *redisConnectFd(int fd) { ...@@ -1135,11 +1135,11 @@ redisContext *redisConnectFd(int fd) {
} }
#ifdef _WIN32 #ifdef _WIN32
redisContext *redisPreConnectNonBlock(const char *ip, int port, struct sockaddr_in *sa) { redisContext *redisPreConnectNonBlock(const char *ip, int port, SOCKADDR_STORAGE *ss) {
redisContext *c = redisContextInit(); redisContext *c = redisContextInit();
c->fd = -1; c->fd = -1;
c->flags &= ~REDIS_BLOCK; c->flags &= ~REDIS_BLOCK;
redisContextPreConnectTcp(c, ip, port, NULL, sa); redisContextPreConnectTcp(c, ip, port, NULL, ss);
return c; return c;
} }
#endif #endif
......
...@@ -197,7 +197,7 @@ int redisFreeKeepFd(redisContext *c); ...@@ -197,7 +197,7 @@ int redisFreeKeepFd(redisContext *c);
int redisBufferRead(redisContext *c); int redisBufferRead(redisContext *c);
int redisBufferWrite(redisContext *c, int *done); int redisBufferWrite(redisContext *c, int *done);
#ifdef _WIN32 #ifdef _WIN32
redisContext *redisPreConnectNonBlock(const char *ip, int port, struct sockaddr_in *sa); redisContext *redisPreConnectNonBlock(const char *ip, int port, SOCKADDR_STORAGE *sa);
int redisBufferReadDone(redisContext *c, char *buf, int nread); int redisBufferReadDone(redisContext *c, char *buf, int nread);
int redisBufferWriteDone(redisContext *c, int nwritten, int *done); int redisBufferWriteDone(redisContext *c, int nwritten, int *done);
#endif #endif
......
...@@ -287,32 +287,21 @@ int redisContextSetTimeout(redisContext *c, const struct timeval tv) { ...@@ -287,32 +287,21 @@ int redisContextSetTimeout(redisContext *c, const struct timeval tv) {
} }
#ifdef _WIN32 #ifdef _WIN32
int redisContextPreConnectTcp(redisContext *c, const char *addr, int port,
struct timeval *timeout, struct sockaddr_in *sa) { int redisContextPreConnectTcp(
redisContext *c,
const char *addr,
int port,
struct timeval *timeout,
SOCKADDR_STORAGE* ss) {
int blocking = (c->flags & REDIS_BLOCK); int blocking = (c->flags & REDIS_BLOCK);
unsigned long inAddress;
if (REDIS_OK != redisCreateSocket(c, AF_INET)) { if (ParseStorageAddress(addr, port, ss) == FALSE) {
return REDIS_ERR; DebugBreak();
} }
sa->sin_family = AF_INET; if (REDIS_OK != redisCreateSocket(c, ss->ss_family)) {
sa->sin_port = htons(port); return REDIS_ERR;
inAddress = inet_addr(addr);
if (inAddress == INADDR_NONE || inAddress == INADDR_ANY) {
struct hostent *he;
he = gethostbyname(addr);
if (he == NULL) {
__redisSetError(c, REDIS_ERR_OTHER,
sdscatprintf(sdsempty(), "can't resolve: %s\n", addr));
close(c->fd);
return REDIS_ERR;
}
memcpy(&sa->sin_addr, he->h_addr, sizeof(struct in_addr));
} else {
sa->sin_addr.s_addr = inAddress;
} }
if (redisSetTcpNoDelay(c) != REDIS_OK) if (redisSetTcpNoDelay(c) != REDIS_OK)
......
...@@ -49,7 +49,7 @@ int redisContextConnectUnix(redisContext *c, const char *path, const struct time ...@@ -49,7 +49,7 @@ int redisContextConnectUnix(redisContext *c, const char *path, const struct time
int redisKeepAlive(redisContext *c, int interval); int redisKeepAlive(redisContext *c, int interval);
#ifdef _WIN32 #ifdef _WIN32
int redisContextPreConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout, struct sockaddr_in *sa); int redisContextPreConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout, SOCKADDR_STORAGE *ss);
#endif #endif
#endif #endif
...@@ -1022,6 +1022,43 @@ const char* redis_inet_ntop_impl(int af, const void *src, char *dst, size_t size ...@@ -1022,6 +1022,43 @@ const char* redis_inet_ntop_impl(int af, const void *src, char *dst, size_t size
} }
} }
BOOL ParseStorageAddress(const char *ip, int port, SOCKADDR_STORAGE* pSotrageAddr) {
struct addrinfo hints, *res;
int status;
char port_buffer[6];
sprintf(port_buffer, "%hu", port);
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
/* Setting AI_PASSIVE will give you a wildcard address if addr is NULL */
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV | AI_PASSIVE;
if ((status = getaddrinfo(ip, port_buffer, &hints, &res) != 0)) {
fprintf(stderr, "getaddrinfo: %S\n", gai_strerror(status));
return FALSE;
}
/* Note, we're taking the first valid address, there may be more than one */
memcpy(pSotrageAddr, res->ai_addr, res->ai_addrlen);
freeaddrinfo(res);
return TRUE;
}
int StorageSize(SOCKADDR_STORAGE *ss) {
switch (ss->ss_family) {
case AF_INET:
return sizeof(SOCKADDR_IN);
case AF_INET6:
return sizeof(SOCKADDR_IN6);
default:
return -1;
}
}
class Win32_FDSockMap { class Win32_FDSockMap {
public: public:
static Win32_FDSockMap& getInstance() { static Win32_FDSockMap& getInstance() {
......
...@@ -249,6 +249,10 @@ BOOL FDAPI_ConnectEx(int fd,const struct sockaddr *name,int namelen,PVOID lpSend ...@@ -249,6 +249,10 @@ BOOL FDAPI_ConnectEx(int fd,const struct sockaddr *name,int namelen,PVOID lpSend
void FDAPI_GetAcceptExSockaddrs(int fd, PVOID lpOutputBuffer,DWORD dwReceiveDataLength,DWORD dwLocalAddressLength,DWORD dwRemoteAddressLength,LPSOCKADDR *LocalSockaddr,LPINT LocalSockaddrLength,LPSOCKADDR *RemoteSockaddr,LPINT RemoteSockaddrLength); void FDAPI_GetAcceptExSockaddrs(int fd, PVOID lpOutputBuffer,DWORD dwReceiveDataLength,DWORD dwLocalAddressLength,DWORD dwRemoteAddressLength,LPSOCKADDR *LocalSockaddr,LPINT LocalSockaddrLength,LPSOCKADDR *RemoteSockaddr,LPINT RemoteSockaddrLength);
int FDAPI_UpdateAcceptContext( int fd ); int FDAPI_UpdateAcceptContext( int fd );
// other networking functions
BOOL ParseStorageAddress(const char *ip, int port, SOCKADDR_STORAGE* pSotrageAddr);
int StorageSize(SOCKADDR_STORAGE *ss);
// macroize CRT definitions to point to our own // macroize CRT definitions to point to our own
#ifndef FDAPI_NOCRTREDEFS #ifndef FDAPI_NOCRTREDEFS
#define close(fd) fdapi_close(fd) #define close(fd) fdapi_close(fd)
......
...@@ -281,12 +281,14 @@ int aeWinSocketSend(int fd, char *buf, int len, ...@@ -281,12 +281,14 @@ int aeWinSocketSend(int fd, char *buf, int len,
return SOCKET_ERROR; return SOCKET_ERROR;
} }
/* for non-blocking connect with IOCP */ /* for non-blocking connect with IOCP */
int aeWinSocketConnect(int fd, const struct sockaddr *sa, int len) { int aeWinSocketConnect(int fd, const SOCKADDR_STORAGE *ss) {
const GUID wsaid_connectex = WSAID_CONNECTEX; const GUID wsaid_connectex = WSAID_CONNECTEX;
DWORD result; DWORD result;
aeSockState *sockstate; aeSockState *sockstate;
struct sockaddr_in addr;
if ((sockstate = aeGetSockState(iocpState, fd)) == NULL) { if ((sockstate = aeGetSockState(iocpState, fd)) == NULL) {
errno = WSAEINVAL; errno = WSAEINVAL;
...@@ -298,14 +300,97 @@ int aeWinSocketConnect(int fd, const struct sockaddr *sa, int len) { ...@@ -298,14 +300,97 @@ int aeWinSocketConnect(int fd, const struct sockaddr *sa, int len) {
} }
memset(&sockstate->ov_read, 0, sizeof(sockstate->ov_read)); memset(&sockstate->ov_read, 0, sizeof(sockstate->ov_read));
/* need to bind sock before connectex */ /* need to bind sock before connectex */
memset(&addr, 0, sizeof(addr)); switch (ss->ss_family) {
addr.sin_family = AF_INET; case AF_INET:
addr.sin_addr.s_addr = INADDR_ANY; {
addr.sin_port = 0; SOCKADDR_IN addr;
result = bind(fd, (struct sockaddr *)&addr, sizeof(addr)); memset(&addr, 0, sizeof(SOCKADDR_IN));
addr.sin_family = ss->ss_family;
addr.sin_addr.S_un.S_addr = INADDR_ANY;
addr.sin_port = 0;
result = bind(fd, (SOCKADDR*)&addr, sizeof(addr));
result = FDAPI_ConnectEx(fd, (SOCKADDR*)ss, sizeof(SOCKADDR_IN), NULL, 0, NULL, &sockstate->ov_read);
break;
}
case AF_INET6:
{
SOCKADDR_IN6 addr;
memset(&addr, 0, sizeof(SOCKADDR_IN6));
addr.sin6_family = ss->ss_family;
memset(&(addr.sin6_addr.u.Byte), 0, 16);
addr.sin6_port = 0;
result = bind(fd, (SOCKADDR*)&addr, sizeof(addr));
result = FDAPI_ConnectEx(fd, (SOCKADDR*)ss, sizeof(SOCKADDR_IN6), NULL, 0, NULL, &sockstate->ov_read);
break;
}
default:
{
DebugBreak();
}
}
if (result != TRUE) {
result = WSAGetLastError();
if (result == ERROR_IO_PENDING) {
errno = WSA_IO_PENDING;
sockstate->masks |= CONNECT_PENDING;
} else {
errno = result;
return SOCKET_ERROR;
}
}
return 0;
}
int aeWinSocketConnectBind(int fd, const SOCKADDR_STORAGE *ss, const char* source_addr) {
const GUID wsaid_connectex = WSAID_CONNECTEX;
DWORD result;
aeSockState *sockstate;
if ((sockstate = aeGetSockState(iocpState, fd)) == NULL) {
errno = WSAEINVAL;
return SOCKET_ERROR;
}
if (aeWinSocketAttach(fd) != 0) {
return SOCKET_ERROR;
}
memset(&sockstate->ov_read, 0, sizeof(sockstate->ov_read));
/* need to bind sock before connectex */
switch (ss->ss_family) {
case AF_INET:
{
SOCKADDR_IN addr;
memset(&addr, 0, sizeof(SOCKADDR_IN));
addr.sin_family = ss->ss_family;
addr.sin_addr.S_un.S_addr = INADDR_ANY;
addr.sin_port = 0;
result = bind(fd, (SOCKADDR*)&addr, sizeof(addr));
break;
}
case AF_INET6:
{
SOCKADDR_IN6 addr;
memset(&addr, 0, sizeof(SOCKADDR_IN6));
addr.sin6_family = ss->ss_family;
memset(&(addr.sin6_addr.u.Byte), 0, 16);
addr.sin6_port = 0;
result = bind(fd, (SOCKADDR*)&addr, sizeof(addr));
break;
}
default:
{
DebugBreak();
}
}
result = FDAPI_ConnectEx(fd, sa, len, NULL, 0, NULL, &sockstate->ov_read); result = FDAPI_ConnectEx(fd, (const LPSOCKADDR)ss, StorageSize(ss), NULL, 0, NULL, &sockstate->ov_read);
if (result != TRUE) { if (result != TRUE) {
result = WSAGetLastError(); result = WSAGetLastError();
if (result == ERROR_IO_PENDING) { if (result == ERROR_IO_PENDING) {
......
...@@ -299,7 +299,8 @@ int aeWinSocketSend(int fd, char *buf, int len, ...@@ -299,7 +299,8 @@ int aeWinSocketSend(int fd, char *buf, int len,
void *eventLoop, void *client, void *data, void *proc); void *eventLoop, void *client, void *data, void *proc);
int aeWinListen(int rfd, int backlog); int aeWinListen(int rfd, int backlog);
int aeWinAccept(int fd, struct sockaddr *sa, socklen_t *len); int aeWinAccept(int fd, struct sockaddr *sa, socklen_t *len);
int aeWinSocketConnect(int fd, const struct sockaddr *sa, int len); int aeWinSocketConnect(int fd, const SOCKADDR_STORAGE *ss);
int aeWinSocketConnectBind(int fd, const SOCKADDR_STORAGE *ss, const char* source_addr);
int strerror_r(int err, char* buf, size_t buflen); int strerror_r(int err, char* buf, size_t buflen);
char *wsa_strerror(int err); char *wsa_strerror(int err);
......
...@@ -260,31 +260,14 @@ static int anetCreateSocket(char *err, int domain) { ...@@ -260,31 +260,14 @@ static int anetCreateSocket(char *err, int domain) {
#define ANET_CONNECT_NONBLOCK 1 #define ANET_CONNECT_NONBLOCK 1
static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) { static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) {
int rfd; int rfd;
struct sockaddr_in sa; SOCKADDR_STORAGE ss;
unsigned long inAddress;
if ((rfd = anetCreateSocket(err,AF_INET)) == ANET_ERR) { ParseStorageAddress(addr, port, &ss);
return ANET_ERR;
}
sa.sin_family = AF_INET;
sa.sin_port = htons((u_short)port);
inAddress = inet_addr(addr);
if (inAddress == INADDR_NONE || inAddress == INADDR_ANY) {
struct hostent *he;
he = gethostbyname(addr); if ((rfd = anetCreateSocket(err,ss.ss_family)) == ANET_ERR) {
if (he == NULL) { return ANET_ERR;
anetSetError(err, "can't resolve: %s\n", addr);
close(rfd);
return ANET_ERR;
}
memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
}
else {
sa.sin_addr.s_addr = inAddress;
} }
if (aeWinSocketConnect(rfd, &ss ) == SOCKET_ERROR) {
if (aeWinSocketConnect(rfd, (struct sockaddr*)&sa, sizeof(sa)) == SOCKET_ERROR) {
if ((errno == WSAEWOULDBLOCK || errno == WSA_IO_PENDING)) errno = EINPROGRESS; if ((errno == WSAEWOULDBLOCK || errno == WSA_IO_PENDING)) errno = EINPROGRESS;
if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK) { if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK) {
return rfd; return rfd;
......
...@@ -364,9 +364,9 @@ static client createClient(char *cmd, size_t len, client from) { ...@@ -364,9 +364,9 @@ static client createClient(char *cmd, size_t len, client from) {
if (config.hostsocket == NULL) { if (config.hostsocket == NULL) {
#ifdef WIN32_IOCP #ifdef WIN32_IOCP
struct sockaddr_in sa; SOCKADDR_STORAGE ss;
c->context = redisPreConnectNonBlock(config.hostip,config.hostport, &sa); c->context = redisPreConnectNonBlock(config.hostip,config.hostport, &ss);
if (aeWinSocketConnect(c->context->fd, (struct sockaddr *)&sa, sizeof(sa)) != 0) { if (aeWinSocketConnect(c->context->fd, &ss) != 0) {
c->context->err = errno; c->context->err = errno;
strerror_r(errno,c->context->errstr,sizeof(c->context->errstr)); strerror_r(errno,c->context->errstr,sizeof(c->context->errstr));
} }
......
...@@ -406,7 +406,7 @@ proc restart_instance {type id} { ...@@ -406,7 +406,7 @@ proc restart_instance {type id} {
# Check that the instance is running # Check that the instance is running
if {[server_is_up 127.0.0.1 $port 100] == 0} { if {[server_is_up 127.0.0.1 $port 100] == 0} {
abort_sentinel_test "Problems starting $type #$j: ping timeout" abort_sentinel_test "Problems starting $type: ping timeout"
} }
# Connect with it with a fresh link # Connect with it with a fresh link
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册