diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 7fc8c73a6dfceea51272c5eba3fa0e9ce8955a76..1c5bef86a169396657d5681232ff1e70fd6dca63 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -21,7 +21,6 @@ #include #include -#include #include #include @@ -36,6 +35,7 @@ #include "virerror.h" #include "virprobe.h" #include "virstring.h" +#include "vireventglibwatch.h" #define VIR_FROM_THIS VIR_FROM_RPC @@ -83,9 +83,8 @@ struct _virNetClient { virNetSASLSessionPtr sasl; #endif - /* Self-pipe to wakeup threads waiting in poll() */ - int wakeupSendFD; - int wakeupReadFD; + GMainLoop *eventLoop; + GMainContext *eventCtx; /* * List of calls currently waiting for dispatch @@ -294,25 +293,18 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock, const char *hostname) { virNetClientPtr client = NULL; - int wakeupFD[2] = { -1, -1 }; if (virNetClientInitialize() < 0) goto error; - if (pipe2(wakeupFD, O_CLOEXEC) < 0) { - virReportSystemError(errno, "%s", - _("unable to make pipe")); - goto error; - } - if (!(client = virObjectLockableNew(virNetClientClass))) goto error; client->sock = sock; sock = NULL; - client->wakeupReadFD = wakeupFD[0]; - client->wakeupSendFD = wakeupFD[1]; - wakeupFD[0] = wakeupFD[1] = -1; + + client->eventCtx = g_main_context_new(); + client->eventLoop = g_main_loop_new(client->eventCtx, FALSE); client->hostname = g_strdup(hostname); @@ -322,8 +314,6 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock, return client; error: - VIR_FORCE_CLOSE(wakeupFD[0]); - VIR_FORCE_CLOSE(wakeupFD[1]); virObjectUnref(client); virObjectUnref(sock); return NULL; @@ -698,8 +688,8 @@ void virNetClientDispose(void *obj) virObjectUnref(client->programs[i]); VIR_FREE(client->programs); - VIR_FORCE_CLOSE(client->wakeupSendFD); - VIR_FORCE_CLOSE(client->wakeupReadFD); + g_main_loop_unref(client->eventLoop); + g_main_context_unref(client->eventCtx); VIR_FREE(client->hostname); @@ -778,6 +768,7 @@ virNetClientCloseLocked(virNetClientPtr client) } } + static void virNetClientCloseInternal(virNetClientPtr client, int reason) { @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client, * queue and close the client because we set client->wantClose. */ if (client->haveTheBuck) { - char ignore = 1; - size_t len = sizeof(ignore); - - if (safewrite(client->wakeupSendFD, &ignore, len) != len) - VIR_ERROR(_("failed to wake up polling thread")); + g_main_loop_quit(client->eventLoop); } else { virNetClientIOEventLoopPassTheBuck(client, NULL); } @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client, #endif +static gboolean +virNetClientIOEventTLS(int fd, + GIOCondition ev, + gpointer opaque); + +static gboolean +virNetClientTLSHandshake(virNetClientPtr client) +{ + GIOCondition ev; + int ret; + + ret = virNetTLSSessionHandshake(client->tls); + + if (ret <= 0) + return FALSE; + + if (virNetTLSSessionGetHandshakeStatus(client->tls) == + VIR_NET_TLS_HANDSHAKE_RECVING) + ev = G_IO_IN; + else + ev = G_IO_OUT; + + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), + ev, + client->eventCtx, + virNetClientIOEventTLS, client, NULL); + + return TRUE; +} + + +static gboolean +virNetClientIOEventTLS(int fd G_GNUC_UNUSED, + GIOCondition ev G_GNUC_UNUSED, + gpointer opaque) +{ + virNetClientPtr client = opaque; + + if (!virNetClientTLSHandshake(client)) + g_main_loop_quit(client->eventLoop); + + return G_SOURCE_REMOVE; +} + + +static gboolean +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED, + GIOCondition ev G_GNUC_UNUSED, + gpointer opaque) +{ + virNetClientPtr client = opaque; + + g_main_loop_quit(client->eventLoop); + + return G_SOURCE_REMOVE; +} + + int virNetClientSetTLSSession(virNetClientPtr client, virNetTLSContextPtr tls) { int ret; char buf[1]; int len; - struct pollfd fds[1]; #ifndef WIN32 sigset_t oldmask, blockedsigs; @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client, virNetSocketSetTLSSession(client->sock, client->tls); - for (;;) { - ret = virNetTLSSessionHandshake(client->tls); - - if (ret < 0) - goto error; - if (ret == 0) - break; - - fds[0].fd = virNetSocketGetFD(client->sock); - fds[0].revents = 0; - if (virNetTLSSessionGetHandshakeStatus(client->tls) == - VIR_NET_TLS_HANDSHAKE_RECVING) - fds[0].events = POLLIN; - else - fds[0].events = POLLOUT; - + virResetLastError(); + if (virNetClientTLSHandshake(client)) { #ifndef WIN32 /* Block SIGWINCH from interrupting poll in curses programs, * then restore the original signal mask again immediately @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client, ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); #endif /* !WIN32 */ - repoll: - ret = poll(fds, G_N_ELEMENTS(fds), -1); - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) - goto repoll; + g_main_loop_run(client->eventLoop); #ifndef WIN32 ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); #endif /* !WIN32 */ } + if (virGetLastErrorCode() != VIR_ERR_OK) + goto error; + ret = virNetTLSContextCheckCertificate(tls, client->tls); if (ret < 0) @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client, * etc. If we make the grade, it will send us a '\1' byte. */ - fds[0].fd = virNetSocketGetFD(client->sock); - fds[0].revents = 0; - fds[0].events = POLLIN; + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), + G_IO_IN, + client->eventCtx, + virNetClientIOEventTLSConfirm, client, NULL); #ifndef WIN32 /* Block SIGWINCH from interrupting poll in curses programs */ ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); #endif /* !WIN32 */ - repoll2: - ret = poll(fds, G_N_ELEMENTS(fds), -1); - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) - goto repoll2; + g_main_loop_run(client->eventLoop); #ifndef WIN32 ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client) static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call, void *opaque) { - struct pollfd *fd = opaque; + GIOCondition *ev = opaque; if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX) - fd->events |= POLLIN; + *ev |= G_IO_IN; if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - fd->events |= POLLOUT; + *ev |= G_IO_OUT; return false; } @@ -1552,6 +1580,23 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, } +struct virNetClientIOEventData { + virNetClientPtr client; + GIOCondition rev; +}; + +static gboolean +virNetClientIOEventFD(int fd G_GNUC_UNUSED, + GIOCondition ev, + gpointer opaque) +{ + struct virNetClientIOEventData *data = opaque; + data->rev = ev; + g_main_loop_quit(data->client->eventLoop); + return G_SOURCE_REMOVE; +} + + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck @@ -1563,21 +1608,20 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, static int virNetClientIOEventLoop(virNetClientPtr client, virNetClientCallPtr thiscall) { - struct pollfd fds[2]; bool error = false; int closeReason; - int ret; - - fds[0].fd = virNetSocketGetFD(client->sock); - fds[1].fd = client->wakeupReadFD; for (;;) { - char ignore; #ifndef WIN32 sigset_t oldmask, blockedsigs; #endif /* !WIN32 */ int timeout = -1; virNetMessagePtr msg = NULL; + GIOCondition ev = 0; + struct virNetClientIOEventData data = { + .client = client, + .rev = 0, + }; /* If we have existing SASL decoded data we don't want to sleep in * the poll(), just check if any other FDs are also ready. @@ -1595,22 +1639,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (timeout == -1) timeout = virKeepAliveTimeout(client->keepalive); - fds[0].events = fds[0].revents = 0; - fds[1].events = fds[1].revents = 0; - - fds[1].events = POLLIN; - /* Calculate poll events for calls */ virNetClientCallMatchPredicate(client->waitDispatch, virNetClientIOEventLoopPollEvents, - &fds[0]); + &ev); /* We have to be prepared to receive stream data * regardless of whether any of the calls waiting * for dispatch are for streams. */ if (client->nstreams) - fds[0].events |= POLLIN; + ev |= G_IO_IN; + + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock), + ev, + client->eventCtx, + virNetClientIOEventFD, &data, NULL); /* Release lock while poll'ing so other threads * can stuff themselves on the queue */ @@ -1630,13 +1674,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, sigaddset(&blockedsigs, SIGCHLD); # endif sigaddset(&blockedsigs, SIGPIPE); + ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask)); #endif /* !WIN32 */ - repoll: - ret = poll(fds, G_N_ELEMENTS(fds), timeout); - if (ret < 0 && (errno == EAGAIN || errno == EINTR)) - goto repoll; + g_main_loop_run(client->eventLoop); #ifndef WIN32 ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL)); @@ -1644,12 +1686,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, virObjectLock(client); - if (ret < 0) { - virReportSystemError(errno, - "%s", _("poll on socket failed")); - goto error; - } - if (virKeepAliveTrigger(client->keepalive, &msg)) { virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_KEEPALIVE); } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) { @@ -1661,7 +1697,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, * the socket became readable so we consume it */ if (virNetSocketHasCachedData(client->sock)) - fds[0].revents |= POLLIN; + data.rev |= G_IO_IN; /* If wantClose flag is set, pretend there was an error on the socket, * but still read and process any data we received so far. @@ -1669,23 +1705,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (client->wantClose) error = true; - if (fds[1].revents) { - VIR_DEBUG("Woken up from poll by other thread"); - if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - virReportSystemError(errno, "%s", - _("read on wakeup fd failed")); - virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_ERROR); - error = true; - /* Fall through to process any pending data. */ - } - } - - if (fds[0].revents & POLLHUP) + if (data.rev & G_IO_HUP) closeReason = VIR_CONNECT_CLOSE_REASON_EOF; else closeReason = VIR_CONNECT_CLOSE_REASON_ERROR; - if (fds[0].revents & POLLOUT) { + if (data.rev & G_IO_OUT) { if (virNetClientIOHandleOutput(client) < 0) { virNetClientMarkClose(client, closeReason); error = true; @@ -1693,7 +1718,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, } } - if (fds[0].revents & POLLIN) { + if (data.rev & G_IO_IN) { if (virNetClientIOHandleInput(client) < 0) { virNetClientMarkClose(client, closeReason); error = true; @@ -1725,13 +1750,13 @@ static int virNetClientIOEventLoop(virNetClientPtr client, if (error) goto error; - if (fds[0].revents & POLLHUP) { + if (data.rev & G_IO_HUP) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("received hangup event on socket")); virNetClientMarkClose(client, closeReason); goto error; } - if (fds[0].revents & POLLERR) { + if (data.rev & G_IO_ERR) { virReportError(VIR_ERR_INTERNAL_ERROR, "%s", _("received error event on socket")); virNetClientMarkClose(client, closeReason); @@ -1858,15 +1883,8 @@ static int virNetClientIO(virNetClientPtr client, /* Check to see if another thread is dispatching */ if (client->haveTheBuck) { - char ignore = 1; - /* Force other thread to wakeup from poll */ - if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - virNetClientCallRemove(&client->waitDispatch, thiscall); - virReportSystemError(errno, "%s", - _("failed to wake up polling thread")); - return -1; - } + g_main_loop_quit(client->eventLoop); /* If we are non-blocking, detach the thread and keep the call in the * queue. */