提交 ef8807bd 编写于 作者: E Enrico Giordani

[Fix] Use overlapped sockets for cluster failover communication.

上级 2388de80
...@@ -42,6 +42,8 @@ POSIX_ONLY(#include <sys/socket.h>) ...@@ -42,6 +42,8 @@ POSIX_ONLY(#include <sys/socket.h>)
POSIX_ONLY(#include <sys/file.h>) POSIX_ONLY(#include <sys/file.h>)
#include <math.h> #include <math.h>
WIN32_ONLY(extern int WSIOCP_QueueAccept(int listenfd);)
/* A global reference to myself is handy to make code more clear. /* A global reference to myself is handy to make code more clear.
* Myself always points to server.cluster->myself, that is, the clusterNode * Myself always points to server.cluster->myself, that is, the clusterNode
* that represents this node. */ * that represents this node. */
...@@ -598,7 +600,10 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ...@@ -598,7 +600,10 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* If the server is starting up, don't accept cluster connections: /* If the server is starting up, don't accept cluster connections:
* UPDATE messages may interact with the database content. */ * UPDATE messages may interact with the database content. */
if (server.masterhost == NULL && server.loading) return; if (server.masterhost == NULL && server.loading) {
WIN32_ONLY(WSIOCP_QueueAccept(fd);)
return;
}
while(max--) { while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
...@@ -606,6 +611,12 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ...@@ -606,6 +611,12 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (errno != EWOULDBLOCK) if (errno != EWOULDBLOCK)
redisLog(REDIS_VERBOSE, redisLog(REDIS_VERBOSE,
"Error accepting cluster node: %s", server.neterr); "Error accepting cluster node: %s", server.neterr);
#ifdef _WIN32
if (WSIOCP_QueueAccept(fd) == -1) {
redisLog(REDIS_WARNING,
"acceptTcpHandler: failed to queue another accept.");
}
#endif
return; return;
} }
anetNonBlock(NULL,cfd); anetNonBlock(NULL,cfd);
...@@ -1994,6 +2005,42 @@ void handleLinkIOError(clusterLink *link) { ...@@ -1994,6 +2005,42 @@ void handleLinkIOError(clusterLink *link) {
freeClusterLink(link); freeClusterLink(link);
} }
#ifdef _WIN32
void clusterWriteDone(aeEventLoop *el, int fd, void *privdata, int written) {
WSIOCP_Request *req = (WSIOCP_Request *) privdata;
clusterLink *link = (clusterLink *) req->client;
REDIS_NOTUSED(el);
REDIS_NOTUSED(fd);
if (sdslen(link->sndbuf) == written) {
sdsrange(link->sndbuf, written, -1);
aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
redisLog(REDIS_WARNING, "clusterWriteDone written %d fd %d", written, link->fd);
}
}
void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
clusterLink *link = (clusterLink*) privdata;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
int result = WSIOCP_SocketSend(fd,
(char*) link->sndbuf,
(int) (sdslen(link->sndbuf)),
el,
link,
NULL,
clusterWriteDone);
if (errno == WSA_IO_PENDING)
redisLog(REDIS_WARNING, "WSA_IO_PENDING writing to socket fd %d", link->fd);
if (result == SOCKET_ERROR && errno != WSA_IO_PENDING) {
redisLog(REDIS_WARNING, "Error writing to socket fd", link->fd);
handleLinkIOError(link);
return;
}
}
#else
/* Send data. This is handled using a trivial send buffer that gets /* Send data. This is handled using a trivial send buffer that gets
* consumed by write(). We don't try to optimize this for speed too much * consumed by write(). We don't try to optimize this for speed too much
* as this is a very low traffic channel. */ * as this is a very low traffic channel. */
...@@ -2010,10 +2057,11 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ...@@ -2010,10 +2057,11 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
handleLinkIOError(link); handleLinkIOError(link);
return; return;
} }
sdsrange(link->sndbuf,(int)nwritten,-1); WIN_PORT_FIX /* cast (int) */ sdsrange(link->sndbuf,nwritten,-1);
if (sdslen(link->sndbuf) == 0) if (sdslen(link->sndbuf) == 0)
aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE); aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
} }
#endif
/* Read data. Try to read the first field of the header first to check the /* Read data. Try to read the first field of the header first to check the
* full length of the packet. When a whole packet is in memory this function * full length of the packet. When a whole packet is in memory this function
...@@ -2046,7 +2094,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ...@@ -2046,7 +2094,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
"Bad message length or signature received " "Bad message length or signature received "
"from Cluster bus."); "from Cluster bus.");
handleLinkIOError(link); handleLinkIOError(link);
IF_WIN32(goto done,return); return;
} }
} }
readlen = ntohl(hdr->totlen) - rcvbuflen; readlen = ntohl(hdr->totlen) - rcvbuflen;
...@@ -2054,14 +2102,14 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ...@@ -2054,14 +2102,14 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
nread = read(fd,buf,readlen); nread = read(fd,buf,readlen);
if (nread == -1 && errno == EAGAIN) IF_WIN32(goto done, return); /* No more data ready. */ if (nread == -1 && errno == EAGAIN) { WIN32_ONLY(WSIOCP_QueueNextRead(fd);) return; } /* No more data ready. */
if (nread <= 0) { if (nread <= 0) {
/* I/O error... */ /* I/O error... */
redisLog(REDIS_DEBUG,"I/O error reading from node link: %s", redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno)); (nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link); handleLinkIOError(link);
IF_WIN32(goto done, return); return;
} else { } else {
/* Read data and recast the pointer to the new buffer. */ /* Read data and recast the pointer to the new buffer. */
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread); link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
...@@ -2075,11 +2123,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ...@@ -2075,11 +2123,10 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
sdsfree(link->rcvbuf); sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty(); link->rcvbuf = sdsempty();
} else { } else {
IF_WIN32(goto done, return); /* Link no longer valid. */ return; /* Link no longer valid. */
} }
} }
} }
WIN32_ONLY(done:)
WIN32_ONLY(WSIOCP_QueueNextRead(fd);) WIN32_ONLY(WSIOCP_QueueNextRead(fd);)
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册