提交 e9555360 编写于 作者: H Henry Rawas 提交者: JonathanPickett

Improve non blocking opening and closing of sockets

上级 47bbccf9
Redis on Windows 2.6 prototype Redis on Windows 2.6 prototype
=== ===
## What's new in this release ## What's new in this release
...@@ -6,9 +6,10 @@ ...@@ -6,9 +6,10 @@
- The port is similar to the port of Redis 2.4, including the same background saving technology. - The port is similar to the port of Redis 2.4, including the same background saving technology.
- There is support for x64 version as well as 32 bit versions. - There is support for x64 version as well as 32 bit versions.
- For the 64 bit version, there is a limit of 2^32 objects in a structure, and a max length of 2^32 for any object - For the 64 bit version, there is a limit of 2^32 objects in a structure, and a max length of 2^32 for any object
- Version number now 2.6.8-pre1 to indicate prerelease and to enable changing - Version number now 2.6.8-pre2 to indicate prerelease and to enable changing
- Version 2.6.8-pre2 fixes several failures that existed in 2.6.8-pre1. Most of these were related to handling opening and closing of non-blocking sockets.
=== ##Acknowledgements
Special thanks to Dušan Majkic (https://github.com/dmajkic, https://github.com/dmajkic/redis/) for his project on GitHub that gave us the opportunity to quickly learn some on the intricacies of Redis code. His project also helped us to build our prototype quickly. Special thanks to Dušan Majkic (https://github.com/dmajkic, https://github.com/dmajkic/redis/) for his project on GitHub that gave us the opportunity to quickly learn some on the intricacies of Redis code. His project also helped us to build our prototype quickly.
## Repo branches ## Repo branches
...@@ -49,8 +50,5 @@ To run the Redis test suite requires some manual work: ...@@ -49,8 +50,5 @@ To run the Redis test suite requires some manual work:
If a Unix shell is not installed you may see the following error message: "couldn't execute "cat": no such file or directory". If a Unix shell is not installed you may see the following error message: "couldn't execute "cat": no such file or directory".
## Known issues ## Known issues
When the unit tests are run, a few failures are reported, mostly related to replication. The exact failures vary per run. The tests pass if they are run individually. We are still investigating if this is a timing problem with the tests or with the port of the Redis server.
The are 3 consistent failures in the protocol module, having to do with receiving an error message when the client connection is closed due to bad data. These tests are disabled for now. The are 3 consistent failures in the protocol module, having to do with receiving an error message when the client connection is closed due to bad data. These tests are disabled for now.
\ No newline at end of file
...@@ -132,6 +132,7 @@ aeSockState *aeGetExistingSockState(void *apistate, int fd) { ...@@ -132,6 +132,7 @@ aeSockState *aeGetExistingSockState(void *apistate, int fd) {
// find matching value in list and remove. If found return 1 // find matching value in list and remove. If found return 1
int removeMatchFromList(list *socklist, void *value) { int removeMatchFromList(list *socklist, void *value) {
listNode *node; listNode *node;
if (socklist == NULL) return 0;
node = listFirst(socklist); node = listFirst(socklist);
while (node != NULL) { while (node != NULL) {
if (listNodeValue(node) == value) { if (listNodeValue(node) == value) {
...@@ -143,14 +144,17 @@ int removeMatchFromList(list *socklist, void *value) { ...@@ -143,14 +144,17 @@ int removeMatchFromList(list *socklist, void *value) {
return 0; return 0;
} }
/* delete data for socket / fd being monitored */ /* delete data for socket / fd being monitored
or move to the closing queue if operations are pending.
Return 1 if deleted or not found, 0 if pending*/
void aeDelSockState(void *apistate, aeSockState *sockState) { void aeDelSockState(void *apistate, aeSockState *sockState) {
int sindex; int sindex;
list *socklist; list *socklist;
if (apistate == NULL) return; if (apistate == NULL) return;
if (sockState->wreqs == 0 && (sockState->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) { if (sockState->wreqs == 0 &&
(sockState->masks & (READ_QUEUED | CONNECT_PENDING | SOCKET_ATTACHED | CLOSE_PENDING)) == 0) {
// see if in active list // see if in active list
sindex = aeSocketIndex(sockState->fd); sindex = aeSocketIndex(sockState->fd);
socklist = &(((aeApiState *)apistate)->lookup[sindex]); socklist = &(((aeApiState *)apistate)->lookup[sindex]);
...@@ -228,31 +232,35 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { ...@@ -228,31 +232,35 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
if (mask & AE_READABLE) { if (mask & AE_READABLE) {
sockstate->masks |= AE_READABLE; sockstate->masks |= AE_READABLE;
if (sockstate->masks & LISTEN_SOCK) { if ((sockstate->masks & CONNECT_PENDING) == 0) {
/* actually a listen. Do not treat as read */ if (sockstate->masks & LISTEN_SOCK) {
} else { /* actually a listen. Do not treat as read */
if ((sockstate->masks & READ_QUEUED) == 0) { } else {
// queue up a 0 byte read if ((sockstate->masks & READ_QUEUED) == 0) {
aeWinReceiveDone(fd); // queue up a 0 byte read
aeWinReceiveDone(fd);
}
} }
} }
} }
if (mask & AE_WRITABLE) { if (mask & AE_WRITABLE) {
sockstate->masks |= AE_WRITABLE; sockstate->masks |= AE_WRITABLE;
// if no write active, then need to queue write ready if ((sockstate->masks & CONNECT_PENDING) == 0) {
if (sockstate->wreqs == 0) { // if no write active, then need to queue write ready
asendreq *areq = (asendreq *)zmalloc(sizeof(asendreq)); if (sockstate->wreqs == 0) {
memset(areq, 0, sizeof(asendreq)); asendreq *areq = (asendreq *)zmalloc(sizeof(asendreq));
if (PostQueuedCompletionStatus(state->iocp, memset(areq, 0, sizeof(asendreq));
0, if (PostQueuedCompletionStatus(state->iocp,
fd, 0,
&areq->ov) == 0) { fd,
errno = GetLastError(); &areq->ov) == 0) {
zfree(areq); errno = GetLastError();
return -1; zfree(areq);
return -1;
}
sockstate->wreqs++;
listAddNodeTail(&sockstate->wreqlist, areq);
} }
sockstate->wreqs++;
listAddNodeTail(&sockstate->wreqlist, areq);
} }
} }
return 0; return 0;
...@@ -327,88 +335,102 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { ...@@ -327,88 +335,102 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
/* the competion key is the socket */ /* the competion key is the socket */
SOCKET sock = (SOCKET)entry->lpCompletionKey; SOCKET sock = (SOCKET)entry->lpCompletionKey;
sockstate = aeGetExistingSockState(state, (int)sock); sockstate = aeGetExistingSockState(state, (int)sock);
if (sockstate == NULL) continue;
if (sockstate != NULL) {
if ((sockstate->masks & LISTEN_SOCK) && entry->lpOverlapped != NULL) { if ((sockstate->masks & LISTEN_SOCK) && entry->lpOverlapped != NULL) {
/* need to set event for listening */ /* need to set event for listening */
aacceptreq *areq = (aacceptreq *)entry->lpOverlapped; aacceptreq *areq = (aacceptreq *)entry->lpOverlapped;
areq->next = sockstate->reqs; areq->next = sockstate->reqs;
sockstate->reqs = areq; sockstate->reqs = areq;
sockstate->masks &= ~ACCEPT_PENDING; sockstate->masks &= ~ACCEPT_PENDING;
if (sockstate->masks & AE_READABLE) {
eventLoop->fired[numevents].fd = (int)sock;
eventLoop->fired[numevents].mask = AE_READABLE;
numevents++;
}
} else {
/* check if event is read complete (may be 0 length read) */
int matched = 0;
if (entry->lpOverlapped == &sockstate->ov_read) {
matched = 1;
sockstate->masks &= ~READ_QUEUED;
if (sockstate->masks & AE_READABLE) { if (sockstate->masks & AE_READABLE) {
eventLoop->fired[numevents].fd = (int)sock; eventLoop->fired[numevents].fd = (int)sock;
eventLoop->fired[numevents].mask = AE_READABLE; eventLoop->fired[numevents].mask = AE_READABLE;
numevents++; numevents++;
} }
} else if (sockstate->wreqs > 0 && entry->lpOverlapped != NULL) { } else if (sockstate->masks & CONNECT_PENDING) {
/* should be write complete. Get results */ /* check if connect complete */
asendreq *areq = (asendreq *)entry->lpOverlapped; if (entry->lpOverlapped == &sockstate->ov_read) {
matched = removeMatchFromList(&sockstate->wreqlist, areq); sockstate->masks &= ~CONNECT_PENDING;
if (matched) { /* enable read and write events for this connection */
/* call write complete callback so buffers can be freed */ aeApiAddEvent(eventLoop, (int)sock, sockstate->masks);
if (areq->proc != NULL) { }
DWORD written = 0; } else {
DWORD flags; int matched = 0;
WSAGetOverlappedResult(sock, &areq->ov, &written, FALSE, &flags); /* check if event is read complete (may be 0 length read) */
areq->proc(areq->eventLoop, (int)sock, &areq->req, (int)written); if (entry->lpOverlapped == &sockstate->ov_read) {
} matched = 1;
sockstate->wreqs--; sockstate->masks &= ~READ_QUEUED;
zfree(areq); if (sockstate->masks & AE_READABLE) {
/* if no active write requests, set ready to write */
if (sockstate->wreqs == 0 && sockstate->masks & AE_WRITABLE) {
eventLoop->fired[numevents].fd = (int)sock; eventLoop->fired[numevents].fd = (int)sock;
eventLoop->fired[numevents].mask = AE_WRITABLE; eventLoop->fired[numevents].mask = AE_READABLE;
numevents++; numevents++;
} }
} else if (sockstate->wreqs > 0 && entry->lpOverlapped != NULL) {
/* should be write complete. Get results */
asendreq *areq = (asendreq *)entry->lpOverlapped;
matched = removeMatchFromList(&sockstate->wreqlist, areq);
if (matched) {
/* call write complete callback so buffers can be freed */
if (areq->proc != NULL) {
DWORD written = 0;
DWORD flags;
WSAGetOverlappedResult(sock, &areq->ov, &written, FALSE, &flags);
areq->proc(areq->eventLoop, (int)sock, &areq->req, (int)written);
}
sockstate->wreqs--;
zfree(areq);
/* if no active write requests, set ready to write */
if (sockstate->wreqs == 0 && sockstate->masks & AE_WRITABLE) {
eventLoop->fired[numevents].fd = (int)sock;
eventLoop->fired[numevents].mask = AE_WRITABLE;
numevents++;
}
}
}
if (matched == 0) {
/* redisLog */printf("Sec:%lld Unknown complete (closed) on %d\n", gettimeofdaysecs(NULL), sock);
sockstate = NULL;
} }
} }
} else {
if (matched == 0) { // no match for active connection.
// no match for active connection. // Try the closing list.
// Try the closing list since socket value is reused. list *socklist = &(state->closing);
list *socklist = &(state->closing); listNode *node;
listNode *node; node = listFirst(socklist);
node = listFirst(socklist); while (node != NULL) {
while (node != NULL && matched == 0) { sockstate = (aeSockState *)listNodeValue(node);
sockstate = (aeSockState *)listNodeValue(node); if (sockstate->fd == sock) {
if (sockstate->fd == sock) { if (sockstate->masks & CONNECT_PENDING) {
/* check if connect complete */
if (entry->lpOverlapped == &sockstate->ov_read) { if (entry->lpOverlapped == &sockstate->ov_read) {
// read complete sockstate->masks &= ~CONNECT_PENDING;
sockstate->masks &= ~READ_QUEUED; }
matched = 1; } else if (entry->lpOverlapped == &sockstate->ov_read) {
} else { // read complete
// check pending writes sockstate->masks &= ~READ_QUEUED;
asendreq *areq = (asendreq *)entry->lpOverlapped; } else {
matched = removeMatchFromList(&sockstate->wreqlist, areq); // check pending writes
if (matched) { asendreq *areq = (asendreq *)entry->lpOverlapped;
sockstate->wreqs--; if (removeMatchFromList(&sockstate->wreqlist, areq)) {
zfree(areq); sockstate->wreqs--;
} zfree(areq);
} }
} }
node = listNextNode(node); if (sockstate->wreqs == 0 &&
(sockstate->masks & (CONNECT_PENDING | READ_QUEUED | SOCKET_ATTACHED)) == 0) {
if ((sockstate->masks & CLOSE_PENDING) != 0) {
closesocket(sock);
sockstate->masks &= ~(CLOSE_PENDING);
}
// safe to delete sockstate
aeDelSockState(state, sockstate);
}
break;
} }
node = listNextNode(node);
} }
if (matched == 0) {
sockstate = NULL;
}
}
if (sockstate != NULL && sockstate->wreqs == 0 &&
(sockstate->masks & (READ_QUEUED | SOCKET_ATTACHED)) == 0) {
// safe to delete sockstate
aeDelSockState(state, sockstate);
} }
} }
} }
......
...@@ -266,15 +266,9 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) { ...@@ -266,15 +266,9 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) {
sa.sin_addr.s_addr = inAddress; sa.sin_addr.s_addr = inAddress;
} }
if (flags & ANET_CONNECT_NONBLOCK) { if (aeWinSocketConnect(s, (struct sockaddr*)&sa, sizeof(sa)) == SOCKET_ERROR) {
if (anetNonBlock(err,s) != ANET_OK) if ((errno == WSAEWOULDBLOCK || errno == WSA_IO_PENDING)) errno = EINPROGRESS;
return ANET_ERR;
}
if (connect((SOCKET)s, (struct sockaddr*)&sa, sizeof(sa)) == SOCKET_ERROR) {
errno = WSAGetLastError();
if ((errno == WSAEWOULDBLOCK)) errno = EINPROGRESS;
if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK) { if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK) {
aeWinSocketAttach(s);
return s; return s;
} }
...@@ -282,9 +276,6 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) { ...@@ -282,9 +276,6 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags) {
closesocket(s); closesocket(s);
return ANET_ERR; return ANET_ERR;
} }
if (flags & ANET_CONNECT_NONBLOCK) {
aeWinSocketAttach(s);
}
return s; return s;
} }
...@@ -490,8 +481,7 @@ int anetTcpServer(char *err, int port, char *bindaddr) ...@@ -490,8 +481,7 @@ int anetTcpServer(char *err, int port, char *bindaddr)
inAddress = inet_addr(bindaddr); inAddress = inet_addr(bindaddr);
if (inAddress == INADDR_NONE || inAddress == INADDR_ANY) { if (inAddress == INADDR_NONE || inAddress == INADDR_ANY) {
anetSetError(err, "Invalid bind address\n"); anetSetError(err, "Invalid bind address\n");
aeWinSocketDetach(s, 0); aeWinCloseSocket((SOCKET)s);
closesocket((SOCKET)s);
return ANET_ERR; return ANET_ERR;
} }
else { else {
......
...@@ -159,8 +159,7 @@ void migrateCommand(redisClient *c) { ...@@ -159,8 +159,7 @@ void migrateCommand(redisClient *c) {
} }
if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) { if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 1); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
...@@ -236,8 +235,7 @@ void migrateCommand(redisClient *c) { ...@@ -236,8 +235,7 @@ void migrateCommand(redisClient *c) {
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 1); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
...@@ -247,8 +245,7 @@ socket_wr_err: ...@@ -247,8 +245,7 @@ socket_wr_err:
addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n")); addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 1); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
...@@ -258,8 +255,7 @@ socket_rd_err: ...@@ -258,8 +255,7 @@ socket_rd_err:
addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n")); addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 1); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
......
...@@ -67,8 +67,7 @@ redisClient *createClient(int fd) { ...@@ -67,8 +67,7 @@ redisClient *createClient(int fd) {
readQueryFromClient, c) == AE_ERR) readQueryFromClient, c) == AE_ERR)
{ {
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 0); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
...@@ -546,8 +545,7 @@ static void acceptCommonHandler(int fd, int flags) { ...@@ -546,8 +545,7 @@ static void acceptCommonHandler(int fd, int flags) {
"Error registering fd event for the new client: %s (fd=%d)", "Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd); strerror(errno),fd);
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 0); aeWinCloseSocket(fd); /* May be already closed, just ingore errors */
closesocket(fd); /* May be already closed, just ingore errors */
#else #else
close(fd); /* May be already closed, just ignore errors */ close(fd); /* May be already closed, just ignore errors */
#endif #endif
...@@ -637,9 +635,6 @@ void freeClient(redisClient *c) { ...@@ -637,9 +635,6 @@ void freeClient(redisClient *c) {
* unblockClientWaitingData() to avoid processInputBuffer() will get * unblockClientWaitingData() to avoid processInputBuffer() will get
* called. Also it is important to remove the file events after * called. Also it is important to remove the file events after
* this, because this call adds the READABLE event. */ * this, because this call adds the READABLE event. */
#ifdef _WIN32
aeWinSocketDetach(c->fd, 1);
#endif
sdsfree(c->querybuf); sdsfree(c->querybuf);
c->querybuf = NULL; c->querybuf = NULL;
if (c->flags & REDIS_BLOCKED) if (c->flags & REDIS_BLOCKED)
...@@ -660,7 +655,7 @@ void freeClient(redisClient *c) { ...@@ -660,7 +655,7 @@ void freeClient(redisClient *c) {
listRelease(c->reply); listRelease(c->reply);
freeClientArgv(c); freeClientArgv(c);
#ifdef _WIN32 #ifdef _WIN32
closesocket(c->fd); aeWinCloseSocket(c->fd);
#else #else
close(c->fd); close(c->fd);
#endif #endif
...@@ -755,7 +750,7 @@ void sendReplyBufferDone(aeEventLoop *el, int fd, void *privdata, int written) { ...@@ -755,7 +750,7 @@ void sendReplyBufferDone(aeEventLoop *el, int fd, void *privdata, int written) {
/* Close connection after entire reply has been sent. */ /* Close connection after entire reply has been sent. */
if (c->flags & REDIS_CLOSE_AFTER_REPLY) { if (c->flags & REDIS_CLOSE_AFTER_REPLY) {
freeClient(c); freeClientAsync(c);
} }
} }
} }
...@@ -775,7 +770,7 @@ void sendReplyListDone(aeEventLoop *el, int fd, void *privdata, int written) { ...@@ -775,7 +770,7 @@ void sendReplyListDone(aeEventLoop *el, int fd, void *privdata, int written) {
/* Close connection after entire reply has been sent. */ /* Close connection after entire reply has been sent. */
if (c->flags & REDIS_CLOSE_AFTER_REPLY){ if (c->flags & REDIS_CLOSE_AFTER_REPLY){
freeClient(c); freeClientAsync(c);
} }
} }
} }
......
...@@ -123,7 +123,8 @@ static void freeClient(client c) { ...@@ -123,7 +123,8 @@ static void freeClient(client c) {
aeDeleteFileEvent(config.el,(int)c->context->fd,AE_WRITABLE); aeDeleteFileEvent(config.el,(int)c->context->fd,AE_WRITABLE);
aeDeleteFileEvent(config.el,(int)c->context->fd,AE_READABLE); aeDeleteFileEvent(config.el,(int)c->context->fd,AE_READABLE);
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach((int)c->context->fd, 1); aeWinCloseSocket((int)c->context->fd);
c->context->fd = 0;
#endif #endif
redisFree(c->context); redisFree(c->context);
sdsfree(c->obuf); sdsfree(c->obuf);
......
...@@ -426,8 +426,7 @@ void replicationAbortSyncTransfer(void) { ...@@ -426,8 +426,7 @@ void replicationAbortSyncTransfer(void) {
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(server.repl_transfer_s, 1); aeWinCloseSocket(server.repl_transfer_s);
closesocket(server.repl_transfer_s);
#else #else
close(server.repl_transfer_s); close(server.repl_transfer_s);
#endif #endif
...@@ -783,8 +782,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { ...@@ -783,8 +782,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
error: error:
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 1); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
...@@ -807,8 +805,7 @@ int connectWithMaster(void) { ...@@ -807,8 +805,7 @@ int connectWithMaster(void) {
AE_ERR) AE_ERR)
{ {
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 1); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
...@@ -831,8 +828,7 @@ void undoConnectWithMaster(void) { ...@@ -831,8 +828,7 @@ void undoConnectWithMaster(void) {
server.repl_state == REDIS_REPL_RECEIVE_PONG); server.repl_state == REDIS_REPL_RECEIVE_PONG);
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
#ifdef _WIN32 #ifdef _WIN32
aeWinSocketDetach(fd, 1); aeWinCloseSocket(fd);
closesocket(fd);
#else #else
close(fd); close(fd);
#endif #endif
......
...@@ -35,6 +35,7 @@ static fnGetSockState * aeGetSockState; ...@@ -35,6 +35,7 @@ static fnGetSockState * aeGetSockState;
static fnDelSockState * aeDelSockState; static fnDelSockState * aeDelSockState;
static LPFN_ACCEPTEX acceptex; static LPFN_ACCEPTEX acceptex;
static LPFN_CONNECTEX connectex;
static LPFN_GETACCEPTEXSOCKADDRS getaddrs; static LPFN_GETACCEPTEXSOCKADDRS getaddrs;
#define SUCCEEDED_WITH_IOCP(result) \ #define SUCCEEDED_WITH_IOCP(result) \
...@@ -262,6 +263,12 @@ int aeWinSocketSend(int fd, char *buf, int len, int flags, ...@@ -262,6 +263,12 @@ int aeWinSocketSend(int fd, char *buf, int len, int flags,
asendreq *areq; asendreq *areq;
sockstate = aeGetSockState(iocpState, fd); sockstate = aeGetSockState(iocpState, fd);
if (sockstate != NULL &&
(sockstate->masks & CONNECT_PENDING)) {
aeWait(fd, AE_WRITABLE, 50);
}
/* if not an async socket, do normal send */ /* if not an async socket, do normal send */
if (sockstate == NULL || if (sockstate == NULL ||
(sockstate->masks & SOCKET_ATTACHED) == 0 || (sockstate->masks & SOCKET_ATTACHED) == 0 ||
...@@ -304,6 +311,61 @@ int aeWinSocketSend(int fd, char *buf, int len, int flags, ...@@ -304,6 +311,61 @@ int aeWinSocketSend(int fd, char *buf, int len, int flags,
return SOCKET_ERROR; return SOCKET_ERROR;
} }
/* for non-blocking connect with IOCP */
int aeWinSocketConnect(int fd, const struct sockaddr *sa, int len) {
const GUID wsaid_connectex = WSAID_CONNECTEX;
DWORD result, bytes;
SOCKET sock = (SOCKET)fd;
aeSockState *sockstate;
struct sockaddr_in addr;
if (connectex == NULL) {
result = WSAIoctl(sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
(void *)&wsaid_connectex,
sizeof(GUID),
&connectex,
sizeof(LPFN_CONNECTEX),
&bytes,
NULL,
NULL);
if (result == SOCKET_ERROR) {
connectex = NULL;
return SOCKET_ERROR;
}
}
if ((sockstate = aeGetSockState(iocpState, fd)) == NULL) {
errno = WSAEINVAL;
return SOCKET_ERROR;
}
if (aeWinSocketAttach(fd) != 0) {
return SOCKET_ERROR;
}
memset(&sockstate->ov_read, 0, sizeof(sockstate->ov_read));
/* need to bind sock before connectex */
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = 0;
result = bind(sock, (struct sockaddr *)&addr, sizeof(addr));
result = connectex(sock, sa, len, NULL, 0, NULL, &sockstate->ov_read);
if (result != TRUE) {
result = WSAGetLastError();
if (result == ERROR_IO_PENDING) {
errno = WSA_IO_PENDING;
sockstate->masks |= CONNECT_PENDING;
} else {
errno = result;
return SOCKET_ERROR;
}
}
return 0;
}
/* for each asynch socket, need to associate completion port */ /* for each asynch socket, need to associate completion port */
int aeWinSocketAttach(int fd) { int aeWinSocketAttach(int fd) {
DWORD yes = 1; DWORD yes = 1;
...@@ -341,45 +403,55 @@ int aeWinSocketAttach(int fd) { ...@@ -341,45 +403,55 @@ int aeWinSocketAttach(int fd) {
return 0; return 0;
} }
void aeShutdown(int fd) {
char rbuf[100];
struct timeval timenow;
long long waitmsecs = 50; /* wait up to 50 millisecs */
long long endms;
long long nowms;
/* wait for last item to complete up to tosecs seconds*/
gettimeofday(&timenow, NULL);
endms = ((long long)timenow.tv_sec * 1000) +
((long long)timenow.tv_usec / 1000) + waitmsecs;
if (shutdown(fd, SD_SEND) != SOCKET_ERROR) {
/* read data until no more or error to ensure shutdown completed */
while (1) {
int rc = recv(fd, rbuf, 100, 0);
if (rc == 0 || rc == SOCKET_ERROR)
break;
else {
gettimeofday(&timenow, NULL);
nowms = ((long long)timenow.tv_sec * 1000) +
((long long)timenow.tv_usec / 1000);
if (nowms > endms)
break;
}
}
}
}
/* when closing socket, need to unassociate completion port */ /* when closing socket, need to unassociate completion port */
int aeWinSocketDetach(int fd, int shutd) { int aeWinCloseSocket(int fd) {
aeSockState *sockstate; aeSockState *sockstate;
char rbuf[100];
if ((sockstate = aeGetSockState(iocpState, fd)) == NULL) { if ((sockstate = aeGetSockState(iocpState, fd)) == NULL) {
errno = WSAEINVAL; closesocket((SOCKET)fd);
return -1; return 0;
} }
if (shutd == 1) { aeShutdown(fd);
struct timeval timenow;
struct timeval timeend;
long tosecs = 5;
/* wait for last item to complete up to tosecs seconds*/
gettimeofday(&timenow, NULL);
timeend.tv_sec = timenow.tv_sec + tosecs;
timeend.tv_usec = timenow.tv_usec;
if (shutdown(fd, SD_SEND) != SOCKET_ERROR) {
/* read data until no more or error to ensure shutdown completed */
while (1) {
int rc = recv(fd, rbuf, 100, 0);
if (rc == 0 || rc == SOCKET_ERROR)
break;
else {
gettimeofday(&timenow, NULL);
if (timenow.tv_sec > timeend.tv_sec ||
(timenow.tv_sec == timeend.tv_sec && timenow.tv_usec > timeend.tv_usec))
break;
}
}
} else {
int err = WSAGetLastError();
}
}
sockstate->masks &= ~(SOCKET_ATTACHED | AE_WRITABLE | AE_READABLE); sockstate->masks &= ~(SOCKET_ATTACHED | AE_WRITABLE | AE_READABLE);
if (sockstate->wreqs == 0 &&
(sockstate->masks & (READ_QUEUED | CONNECT_PENDING | SOCKET_ATTACHED)) == 0) {
closesocket((SOCKET)fd);
} else {
sockstate->masks |= CLOSE_PENDING;
}
aeDelSockState(iocpState, sockstate); aeDelSockState(iocpState, sockstate);
return 0; return 0;
} }
......
...@@ -65,6 +65,8 @@ typedef void fnDelSockState(void *apistate, aeSockState *sockState); ...@@ -65,6 +65,8 @@ typedef void fnDelSockState(void *apistate, aeSockState *sockState);
#define SOCKET_ATTACHED 0x000400 #define SOCKET_ATTACHED 0x000400
#define ACCEPT_PENDING 0x000800 #define ACCEPT_PENDING 0x000800
#define LISTEN_SOCK 0x001000 #define LISTEN_SOCK 0x001000
#define CONNECT_PENDING 0x002000
#define CLOSE_PENDING 0x004000
void aeWinInit(void *state, HANDLE iocp, fnGetSockState *getSockState, fnDelSockState *delSockState); void aeWinInit(void *state, HANDLE iocp, fnGetSockState *getSockState, fnDelSockState *delSockState);
void aeWinCleanup(); void aeWinCleanup();
......
...@@ -240,7 +240,7 @@ int pthread_sigmask(int how, const sigset_t *set, sigset_t *oset) { ...@@ -240,7 +240,7 @@ int pthread_sigmask(int how, const sigset_t *set, sigset_t *oset) {
} }
errno = ENOSYS; errno = ENOSYS;
return -1; return 0;
} }
int win32_pthread_join(pthread_t *thread, void **value_ptr) { int win32_pthread_join(pthread_t *thread, void **value_ptr) {
......
...@@ -305,12 +305,13 @@ typedef struct aeWinSendReq { ...@@ -305,12 +305,13 @@ typedef struct aeWinSendReq {
int aeWinSocketAttach(int fd); int aeWinSocketAttach(int fd);
int aeWinSocketDetach(int fd, int shutd); int aeWinCloseSocket(int fd);
int aeWinReceiveDone(int fd); int aeWinReceiveDone(int fd);
int aeWinSocketSend(int fd, char *buf, int len, int flags, int aeWinSocketSend(int fd, char *buf, int len, int flags,
void *eventLoop, void *client, void *data, void *proc); void *eventLoop, void *client, void *data, void *proc);
int aeWinListen(SOCKET sock, int backlog); int aeWinListen(SOCKET sock, int backlog);
int aeWinAccept(int fd, struct sockaddr *sa, socklen_t *len); int aeWinAccept(int fd, struct sockaddr *sa, socklen_t *len);
int aeWinSocketConnect(int fd, const struct sockaddr *sa, int len);
int strerror_r(int err, char* buf, size_t buflen); int strerror_r(int err, char* buf, size_t buflen);
char *wsa_strerror(int err); char *wsa_strerror(int err);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册