diff --git a/ChangeLog b/ChangeLog index 2ebe11b6fbe491b09c6ea89305aec999d5157389..31784521c03d2486f40cd238d3857c6d8d3b0249 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +Tue Jan 20 16:36:53 GMT 2009 Daniel P. Berrange + + Make RPC call dispatch threaded + * src/libvirt_private.syms, src/util.h, src/util.c: Add + a general virSetNonBlock() helper with portability to + Win32 + * src/remote_internal.c: Re-factor I/O to allow RPC calls + from multiple threads to be handled concurrently. + Tue Jan 20 17:08:20 CET 2009 Daniel Veillard * src/domain_conf.h src/lxc_driver.c src/uml_driver.c: virDomainObj diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 3f83a07807ecee47e950c35ec894c5a7450aca4d..02f54f1d964a9318604b393dcbd8b80772dc65e8 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -290,6 +290,7 @@ virEnumToString; virEventAddHandle; virEventRemoveHandle; virExec; +virSetNonBlock; virFormatMacAddr; virGetHostname; virParseMacAddr; diff --git a/src/remote_internal.c b/src/remote_internal.c index 82ab7b22692b441ea3f3ffc9317c7759a0f6e0ed..d338844979cf8803ad40b8169ff1aab589abe4ae 100644 --- a/src/remote_internal.c +++ b/src/remote_internal.c @@ -68,6 +68,8 @@ #include +#include + /* AI_ADDRCONFIG is missing on some systems. */ #ifndef AI_ADDRCONFIG # define AI_ADDRCONFIG 0 @@ -86,8 +88,44 @@ #include "util.h" #include "event.h" +#ifdef WIN32 +#define pipe(fds) _pipe(fds,4096, _O_BINARY) +#endif + + static int inside_daemon = 0; +struct remote_thread_call; + + +enum { + REMOTE_MODE_WAIT_TX, + REMOTE_MODE_WAIT_RX, + REMOTE_MODE_COMPLETE, + REMOTE_MODE_ERROR, +}; + +struct remote_thread_call { + int mode; + + /* 4 byte length, followed by RPC message header+body */ + char buffer[4 + REMOTE_MESSAGE_MAX]; + unsigned int bufferLength; + unsigned int bufferOffset; + + unsigned int serial; + unsigned int proc_nr; + + virCond cond; + + xdrproc_t ret_filter; + char *ret; + + remote_error err; + + struct remote_thread_call *next; +}; + struct private_data { virMutex lock; @@ -101,12 +139,24 @@ struct private_data { int localUses; /* Ref count for private data */ char *hostname; /* Original hostname */ FILE *debugLog; /* Debug remote protocol */ + #if HAVE_SASL sasl_conn_t *saslconn; /* SASL context */ + const char *saslDecoded; unsigned int saslDecodedLength; unsigned int saslDecodedOffset; + + const char *saslEncoded; + unsigned int saslEncodedLength; + unsigned int saslEncodedOffset; #endif + + /* 4 byte length, followed by RPC message header+body */ + char buffer[4 + REMOTE_MESSAGE_MAX]; + unsigned int bufferLength; + unsigned int bufferOffset; + /* The list of domain event callbacks */ virDomainEventCallbackListPtr callbackList; /* The queue of domain events generated @@ -114,6 +164,13 @@ struct private_data { virDomainEventQueuePtr domainEvents; /* Timer for flushing domainEvents queue */ int eventFlushTimer; + + /* Self-pipe to wakeup threads waiting in poll() */ + int wakeupSendFD; + int wakeupReadFD; + + /* List of threads currently waiting for dispatch */ + struct remote_thread_call *waitDispatch; }; enum { @@ -160,7 +217,6 @@ static void make_nonnull_network (remote_nonnull_network *net_dst, virNetworkPtr static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src); static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src); void remoteDomainEventFired(int watch, int fd, int event, void *data); -static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr); static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr); void remoteDomainEventQueueFlush(int timer, void *opaque); /*----------------------------------------------------------------------*/ @@ -274,6 +330,7 @@ doRemoteOpen (virConnectPtr conn, virConnectAuthPtr auth ATTRIBUTE_UNUSED, int flags) { + int wakeupFD[2]; char *transport_str = NULL; if (conn->uri) { @@ -696,6 +753,21 @@ doRemoteOpen (virConnectPtr conn, } /* switch (transport) */ + if (virSetNonBlock(priv->sock) < 0) { + errorf (conn, VIR_ERR_SYSTEM_ERROR, + _("unable to make socket non-blocking %s"), + strerror(errno)); + goto failed; + } + + if (pipe(wakeupFD) < 0) { + errorf (conn, VIR_ERR_SYSTEM_ERROR, + _("unable to make pipe %s"), + strerror(errno)); + goto failed; + } + priv->wakeupReadFD = wakeupFD[0]; + priv->wakeupSendFD = wakeupFD[1]; /* Try and authenticate with server */ if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1) @@ -768,6 +840,7 @@ doRemoteOpen (virConnectPtr conn, DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. " "continuing without events."); virEventRemoveHandle(priv->watch); + priv->watch = -1; } } /* Successful. */ @@ -848,6 +921,7 @@ remoteOpen (virConnectPtr conn, } remoteDriverLock(priv); priv->localUses = 1; + priv->watch = -1; if (flags & VIR_CONNECT_RO) rflags |= VIR_DRV_OPEN_REMOTE_RO; @@ -1220,6 +1294,7 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv) virEventRemoveTimeout(priv->eventFlushTimer); /* Remove handle for remote events */ virEventRemoveHandle(priv->watch); + priv->watch = -1; } /* Close socket. */ @@ -5537,90 +5612,373 @@ done: /*----------------------------------------------------------------------*/ -static int really_write (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); -static int really_read (virConnectPtr conn, struct private_data *priv, - int in_open, char *bytes, int len); -/* This function performs a remote procedure call to procedure PROC_NR. - * - * NB. This does not free the args structure (not desirable, since you - * often want this allocated on the stack or else it contains strings - * which come from the user). It does however free any intermediate - * results, eg. the error structure if there is one. - * - * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling, - * else Bad Things will happen in the XDR code. - */ -static int -doCall (virConnectPtr conn, struct private_data *priv, - int flags /* if we are in virConnectOpen */, - int proc_nr, - xdrproc_t args_filter, char *args, - xdrproc_t ret_filter, char *ret) -{ - char buffer[REMOTE_MESSAGE_MAX]; - char buffer2[4]; - struct remote_message_header hdr; +static struct remote_thread_call * +prepareCall(virConnectPtr conn, + struct private_data *priv, + int flags, + int proc_nr, + xdrproc_t args_filter, char *args, + xdrproc_t ret_filter, char *ret) +{ XDR xdr; - int len; - struct remote_error rerror; + struct remote_message_header hdr; + struct remote_thread_call *rv; + + if (VIR_ALLOC(rv) < 0) + return NULL; + + if (virCondInit(&rv->cond) < 0) { + VIR_FREE(rv); + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_INTERNAL_ERROR, + _("cannot initialize mutex")); + return NULL; + } /* Get a unique serial number for this message. */ - int serial = priv->counter++; + rv->serial = priv->counter++; + rv->proc_nr = proc_nr; + rv->ret_filter = ret_filter; + rv->ret = ret; hdr.prog = REMOTE_PROGRAM; hdr.vers = REMOTE_PROTOCOL_VERSION; hdr.proc = proc_nr; hdr.direction = REMOTE_CALL; - hdr.serial = serial; + hdr.serial = rv->serial; hdr.status = REMOTE_OK; /* Serialise header followed by args. */ - xdrmem_create (&xdr, buffer, sizeof buffer, XDR_ENCODE); + xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE); if (!xdr_remote_message_header (&xdr, &hdr)) { error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, _("xdr_remote_message_header failed")); - return -1; + goto error; } if (!(*args_filter) (&xdr, args)) { error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, _("marshalling args")); - return -1; + goto error; } /* Get the length stored in buffer. */ - len = xdr_getpos (&xdr); + rv->bufferLength = xdr_getpos (&xdr); xdr_destroy (&xdr); /* Length must include the length word itself (always encoded in * 4 bytes as per RFC 4506). */ - len += 4; + rv->bufferLength += 4; /* Encode the length word. */ - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_ENCODE); - if (!xdr_int (&xdr, &len)) { + xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE); + if (!xdr_int (&xdr, (int *)&rv->bufferLength)) { error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, _("xdr_int (length word)")); - return -1; + goto error; } xdr_destroy (&xdr); - /* Send length word followed by header+args. */ - if (really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1 || - really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1) - return -1; + return rv; + +error: + xdr_destroy (&xdr); + VIR_FREE(rv); + return NULL; +} + + + +static int +processCallWrite(virConnectPtr conn, + struct private_data *priv, + int in_open /* if we are in virConnectOpen */, + const char *bytes, int len) +{ + int ret; + + if (priv->uses_tls) { + tls_resend: + ret = gnutls_record_send (priv->session, bytes, len); + if (ret < 0) { + if (ret == GNUTLS_E_INTERRUPTED) + goto tls_resend; + if (ret == GNUTLS_E_AGAIN) + return 0; + + error (in_open ? NULL : conn, + VIR_ERR_GNUTLS_ERROR, gnutls_strerror (ret)); + return -1; + } + } else { + resend: + ret = send (priv->sock, bytes, len, 0); + if (ret == -1) { + if (errno == EINTR) + goto resend; + if (errno == EWOULDBLOCK) + return 0; + + error (in_open ? NULL : conn, + VIR_ERR_SYSTEM_ERROR, strerror (errno)); + return -1; + + } + } + + return ret; +} + + +static int +processCallRead(virConnectPtr conn, + struct private_data *priv, + int in_open /* if we are in virConnectOpen */, + char *bytes, int len) +{ + int ret; + + if (priv->uses_tls) { + tls_resend: + ret = gnutls_record_recv (priv->session, bytes, len); + if (ret == GNUTLS_E_INTERRUPTED) + goto tls_resend; + if (ret == GNUTLS_E_AGAIN) + return 0; + + /* Treat 0 == EOF as an error */ + if (ret <= 0) { + if (ret < 0) + errorf (in_open ? NULL : conn, + VIR_ERR_GNUTLS_ERROR, + _("failed to read from TLS socket %s"), + gnutls_strerror (ret)); + else + errorf (in_open ? NULL : conn, + VIR_ERR_SYSTEM_ERROR, + "%s", _("server closed connection")); + return -1; + } + } else { + resend: + ret = recv (priv->sock, bytes, len, 0); + if (ret <= 0) { + if (ret == -1) { + if (errno == EINTR) + goto resend; + if (errno == EWOULDBLOCK) + return 0; + + errorf (in_open ? NULL : conn, + VIR_ERR_SYSTEM_ERROR, + _("failed to read from socket %s"), + strerror (errno)); + } else { + errorf (in_open ? NULL : conn, + VIR_ERR_SYSTEM_ERROR, + "%s", _("server closed connection")); + } + return -1; + } + } + + return ret; +} + + +static int +processCallSendOne(virConnectPtr conn, + struct private_data *priv, + int in_open, + struct remote_thread_call *thecall) +{ +#if HAVE_SASL + if (priv->saslconn) { + const char *output; + unsigned int outputlen; + int err, ret; + + if (!priv->saslEncoded) { + err = sasl_encode(priv->saslconn, + thecall->buffer + thecall->bufferOffset, + thecall->bufferLength - thecall->bufferOffset, + &output, &outputlen); + if (err != SASL_OK) { + errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR, + _("failed to encode SASL data: %s"), + sasl_errstring(err, NULL, NULL)); + return -1; + } + priv->saslEncoded = output; + priv->saslEncodedLength = outputlen; + priv->saslEncodedOffset = 0; + + thecall->bufferOffset = thecall->bufferLength; + } + + ret = processCallWrite(conn, priv, in_open, + priv->saslEncoded + priv->saslEncodedOffset, + priv->saslEncodedLength - priv->saslEncodedOffset); + if (ret < 0) + return ret; + priv->saslEncodedOffset += ret; + + if (priv->saslEncodedOffset == priv->saslEncodedLength) { + priv->saslEncoded = NULL; + priv->saslEncodedOffset = priv->saslEncodedLength = 0; + thecall->mode = REMOTE_MODE_WAIT_RX; + } + } else { +#endif + int ret; + ret = processCallWrite(conn, priv, in_open, + thecall->buffer + thecall->bufferOffset, + thecall->bufferLength - thecall->bufferOffset); + if (ret < 0) + return ret; + thecall->bufferOffset += ret; + + if (thecall->bufferOffset == thecall->bufferLength) { + thecall->bufferOffset = thecall->bufferLength = 0; + thecall->mode = REMOTE_MODE_WAIT_RX; + } +#if HAVE_SASL + } +#endif + return 0; +} + + +static int +processCallSend(virConnectPtr conn, struct private_data *priv, + int in_open) { + struct remote_thread_call *thecall = priv->waitDispatch; + + while (thecall && + thecall->mode != REMOTE_MODE_WAIT_TX) + thecall = thecall->next; + + if (!thecall) + return -1; /* Shouldn't happen, but you never know... */ + + while (thecall) { + int ret = processCallSendOne(conn, priv, in_open, thecall); + if (ret < 0) + return ret; + + if (thecall->mode == REMOTE_MODE_WAIT_TX) + return 0; /* Blocking write, to back to event loop */ + + thecall = thecall->next; + } + + return 0; /* No more calls to send, all done */ +} + +static int +processCallRecvSome(virConnectPtr conn, struct private_data *priv, + int in_open) { + unsigned int wantData; + + /* Start by reading length word */ + if (priv->bufferLength == 0) + priv->bufferLength = 4; + + wantData = priv->bufferLength - priv->bufferOffset; + +#if HAVE_SASL + if (priv->saslconn) { + if (priv->saslDecoded == NULL) { + char encoded[8192]; + unsigned int encodedLen = sizeof(encoded); + int ret, err; + ret = processCallRead(conn, priv, in_open, + encoded, encodedLen); + if (ret < 0) + return -1; + if (ret == 0) + return 0; + + err = sasl_decode(priv->saslconn, encoded, ret, + &priv->saslDecoded, &priv->saslDecodedLength); + if (err != SASL_OK) { + errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR, + _("failed to decode SASL data: %s"), + sasl_errstring(err, NULL, NULL)); + return -1; + } + priv->saslDecodedOffset = 0; + } + + if ((priv->saslDecodedLength - priv->saslDecodedOffset) < wantData) + wantData = (priv->saslDecodedLength - priv->saslDecodedOffset); + + memcpy(priv->buffer + priv->bufferOffset, + priv->saslDecoded + priv->saslDecodedOffset, + wantData); + priv->saslDecodedOffset += wantData; + priv->bufferOffset += wantData; + if (priv->saslDecodedOffset == priv->saslDecodedLength) { + priv->saslDecodedLength = priv->saslDecodedLength = 0; + priv->saslDecoded = NULL; + } + + return wantData; + } else { +#endif + int ret; + + ret = processCallRead(conn, priv, in_open, + priv->buffer + priv->bufferOffset, + wantData); + if (ret < 0) + return -1; + if (ret == 0) + return 0; + + priv->bufferOffset += ret; + + return ret; +#if HAVE_SASL + } +#endif +} -retry_read: - /* Read and deserialise length word. */ - if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1) - return -1; - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); +static void +processCallAsyncEvent(virConnectPtr conn, struct private_data *priv, + int in_open, + remote_message_header *hdr, + XDR *xdr) { + /* An async message has come in while we were waiting for the + * response. Process it to pull it off the wire, and try again + */ + DEBUG0("Encountered an event while waiting for a response"); + + if (in_open) { + DEBUG("Ignoring bogus event %d received while in open", hdr->proc); + return; + } + + if (hdr->proc == REMOTE_PROC_DOMAIN_EVENT) { + remoteDomainQueueEvent(conn, xdr); + virEventUpdateTimeout(priv->eventFlushTimer, 0); + } else { + DEBUG("Unexpected event proc %d", hdr->proc); + } +} + +static int +processCallRecvLen(virConnectPtr conn, struct private_data *priv, + int in_open) { + XDR xdr; + int len; + + xdrmem_create (&xdr, priv->buffer, priv->bufferLength, XDR_DECODE); if (!xdr_int (&xdr, &len)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + error (in_open ? NULL : conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); return -1; } @@ -5630,76 +5988,96 @@ retry_read: len -= 4; if (len < 0 || len > REMOTE_MESSAGE_MAX) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + error (in_open ? NULL : conn, VIR_ERR_RPC, _("packet received from server too large")); return -1; } - /* Read reply header and what follows (either a ret or an error). */ - if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1) - return -1; + /* Extend our declared buffer length and carry + on reading the header + payload */ + priv->bufferLength += len; + DEBUG("Got length, now need %d total (%d more)", priv->bufferLength, len); + return 0; +} + + +static int +processCallRecvMsg(virConnectPtr conn, struct private_data *priv, + int in_open) { + XDR xdr; + struct remote_message_header hdr; + int len = priv->bufferLength - 4; + struct remote_thread_call *thecall; /* Deserialise reply header. */ - xdrmem_create (&xdr, buffer, len, XDR_DECODE); + xdrmem_create (&xdr, priv->buffer + 4, len, XDR_DECODE); if (!xdr_remote_message_header (&xdr, &hdr)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + error (in_open ? NULL : conn, VIR_ERR_RPC, _("invalid header in reply")); return -1; } /* Check program, version, etc. are what we expect. */ if (hdr.prog != REMOTE_PROGRAM) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown program (received %x, expected %x)"), - hdr.prog, REMOTE_PROGRAM); + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown program (received %x, expected %x)"), + hdr.prog, REMOTE_PROGRAM); return -1; } if (hdr.vers != REMOTE_PROTOCOL_VERSION) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown protocol version (received %x, expected %x)"), - hdr.vers, REMOTE_PROTOCOL_VERSION); + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown protocol version (received %x, expected %x)"), + hdr.vers, REMOTE_PROTOCOL_VERSION); return -1; } - if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && - hdr.direction == REMOTE_MESSAGE) { - /* An async message has come in while we were waiting for the - * response. Process it to pull it off the wire, and try again - */ - DEBUG0("Encountered an event while waiting for a response"); - - remoteDomainQueueEvent(conn, &xdr); - virEventUpdateTimeout(priv->eventFlushTimer, 0); + /* Async events from server need special handling */ + if (hdr.direction == REMOTE_MESSAGE) { + processCallAsyncEvent(conn, priv, in_open, + &hdr, &xdr); + xdr_destroy(&xdr); + return 0; + } - DEBUG0("Retrying read"); - xdr_destroy (&xdr); - goto retry_read; - } - if (hdr.proc != proc_nr) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown procedure (received %x, expected %x)"), - hdr.proc, proc_nr); + if (hdr.direction != REMOTE_REPLY) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("got unexpected RPC call %d from server"), + hdr.proc); + xdr_destroy(&xdr); return -1; } - if (hdr.direction != REMOTE_REPLY) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, - NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown direction (received %x, expected %x)"), - hdr.direction, REMOTE_REPLY); + + /* Ok, definitely got an RPC reply now find + out who's been waiting for it */ + + thecall = priv->waitDispatch; + while (thecall && + thecall->serial != hdr.serial) + thecall = thecall->next; + + if (!thecall) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("no call waiting for reply with serial %d"), + hdr.serial); + xdr_destroy(&xdr); return -1; } - if (hdr.serial != serial) { - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown serial (received %x, expected %x)"), - hdr.serial, serial); + + if (hdr.proc != thecall->proc_nr) { + virRaiseError (in_open ? NULL : conn, + NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown procedure (received %x, expected %x)"), + hdr.proc, thecall->proc_nr); + xdr_destroy (&xdr); return -1; } @@ -5709,40 +6087,31 @@ retry_read: */ switch (hdr.status) { case REMOTE_OK: - if (!(*ret_filter) (&xdr, ret)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC, + if (!(*thecall->ret_filter) (&xdr, thecall->ret)) { + error (in_open ? NULL : conn, VIR_ERR_RPC, _("unmarshalling ret")); return -1; } + thecall->mode = REMOTE_MODE_COMPLETE; xdr_destroy (&xdr); return 0; case REMOTE_ERROR: - memset (&rerror, 0, sizeof rerror); - if (!xdr_remote_error (&xdr, &rerror)) { - error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + memset (&thecall->err, 0, sizeof thecall->err); + if (!xdr_remote_error (&xdr, &thecall->err)) { + error (in_open ? NULL : conn, VIR_ERR_RPC, _("unmarshalling remote_error")); return -1; } xdr_destroy (&xdr); - /* See if caller asked us to keep quiet about missing RPCs - * eg for interop with older servers */ - if (flags & REMOTE_CALL_QUIET_MISSING_RPC && - rerror.domain == VIR_FROM_REMOTE && - rerror.code == VIR_ERR_RPC && - rerror.level == VIR_ERR_ERROR && - STRPREFIX(*rerror.message, "unknown procedure")) { - return -2; - } - server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, &rerror); - xdr_free ((xdrproc_t) xdr_remote_error, (char *) &rerror); - return -1; + thecall->mode = REMOTE_MODE_ERROR; + return 0; default: - virRaiseError (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, - VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, - _("unknown status (received %x)"), - hdr.status); + virRaiseError (in_open ? NULL : conn, NULL, NULL, VIR_FROM_REMOTE, + VIR_ERR_RPC, VIR_ERR_ERROR, NULL, NULL, NULL, 0, 0, + _("unknown status (received %x)"), + hdr.status); xdr_destroy (&xdr); return -1; } @@ -5750,225 +6119,429 @@ retry_read: static int -call (virConnectPtr conn, struct private_data *priv, - int flags /* if we are in virConnectOpen */, - int proc_nr, - xdrproc_t args_filter, char *args, - xdrproc_t ret_filter, char *ret) -{ - int rv; - /* - * Avoid needless wake-ups of the event loop in the - * case where this call is being made from a different - * thread than the event loop. These wake-ups would - * cause the event loop thread to be blocked on the - * mutex for the duration of the call +processCallRecv(virConnectPtr conn, struct private_data *priv, + int in_open) { + int ret; + + /* Read as much data as is available, until we get + * EGAIN */ - if (priv->watch >= 0) - virEventUpdateHandle(priv->watch, 0); + for (;;) { + ret = processCallRecvSome(conn, priv, in_open); - rv = doCall(conn, priv,flags, proc_nr, - args_filter, args, - ret_filter, ret); + if (ret < 0) + return -1; + if (ret == 0) + return 0; /* Blocking on read */ - if (priv->watch >= 0) - virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE); - return rv; + /* Check for completion of our goal */ + if (priv->bufferOffset == priv->bufferLength) { + if (priv->bufferOffset == 4) { + ret = processCallRecvLen(conn, priv, in_open); + } else { + ret = processCallRecvMsg(conn, priv, in_open); + priv->bufferOffset = priv->bufferLength = 0; + } + if (ret < 0) + return -1; + } + } } +/* + * Process all calls pending dispatch/receive until we + * get a reply to our own call. Then quit and pass the buck + * to someone else. + */ static int -really_write_buf (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - const char *bytes, int len) +processCalls(virConnectPtr conn, + struct private_data *priv, + int in_open, + struct remote_thread_call *thiscall) { - const char *p; - int err; + struct pollfd fds[2]; + int ret; - p = bytes; - if (priv->uses_tls) { - do { - err = gnutls_record_send (priv->session, p, len); - if (err < 0) { - if (err == GNUTLS_E_INTERRUPTED || err == GNUTLS_E_AGAIN) - continue; - error (in_open ? NULL : conn, - VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err)); + fds[0].fd = priv->sock; + fds[1].fd = priv->wakeupReadFD; + + for (;;) { + struct remote_thread_call *tmp = priv->waitDispatch; + struct remote_thread_call *prev; + char ignore; + + fds[0].events = fds[0].revents = 0; + fds[1].events = fds[1].revents = 0; + + fds[1].events = POLLIN; + while (tmp) { + if (tmp->mode == REMOTE_MODE_WAIT_RX) + fds[0].events |= POLLIN; + if (tmp->mode == REMOTE_MODE_WAIT_TX) + fds[0].events |= POLLOUT; + + tmp = tmp->next; + } + + /* Release lock while poll'ing so other threads + * can stuff themselves on the queue */ + remoteDriverUnlock(priv); + + repoll: + ret = poll(fds, ARRAY_CARDINALITY(fds), -1); + if (ret < 0 && errno == EINTR) + goto repoll; + remoteDriverLock(priv); + + if (fds[1].revents) { + DEBUG0("Woken up from poll by other thread"); + saferead(priv->wakeupReadFD, &ignore, sizeof(ignore)); + } + + if (ret < 0) { + if (errno == EWOULDBLOCK) + continue; + errorf (in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR, + _("poll on socket failed %s"), strerror(errno)); + return -1; + } + + if (fds[0].revents & POLLOUT) { + if (processCallSend(conn, priv, in_open) < 0) return -1; - } - len -= err; - p += err; } - while (len > 0); - } else { - do { - err = send (priv->sock, p, len, 0); - if (err == -1) { - if (errno == EINTR || errno == EAGAIN) - continue; - error (in_open ? NULL : conn, - VIR_ERR_SYSTEM_ERROR, strerror (errno)); + + if (fds[0].revents & POLLIN) { + if (processCallRecv(conn, priv, in_open) < 0) return -1; + } + + /* Iterate through waiting threads and if + * any are complete then tell 'em to wakeup + */ + tmp = priv->waitDispatch; + prev = NULL; + while (tmp) { + if (tmp != thiscall && + (tmp->mode == REMOTE_MODE_COMPLETE || + tmp->mode == REMOTE_MODE_ERROR)) { + /* Take them out of the list */ + if (prev) + prev->next = tmp->next; + else + priv->waitDispatch = tmp->next; + + /* And wake them up.... + * ...they won't actually wakeup until + * we release our mutex a short while + * later... + */ + DEBUG("Waking up sleep %d %p %p", tmp->proc_nr, tmp, priv->waitDispatch); + virCondSignal(&tmp->cond); } - len -= err; - p += err; + prev = tmp; + tmp = tmp->next; } - while (len > 0); - } - return 0; -} + /* Now see if *we* are done */ + if (thiscall->mode == REMOTE_MODE_COMPLETE || + thiscall->mode == REMOTE_MODE_ERROR) { + /* We're at head of the list already, so + * remove us + */ + priv->waitDispatch = thiscall->next; + DEBUG("Giving up the buck %d %p %p", thiscall->proc_nr, thiscall, priv->waitDispatch); + /* See if someone else is still waiting + * and if so, then pass the buck ! */ + if (priv->waitDispatch) { + DEBUG("Passing the buck to %d %p", priv->waitDispatch->proc_nr, priv->waitDispatch); + virCondSignal(&priv->waitDispatch->cond); + } + return 0; + } -static int -really_write_plain (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - return really_write_buf(conn, priv, in_open, bytes, len); + + if (fds[0].revents & (POLLHUP | POLLERR)) { + errorf(in_open ? NULL : conn, VIR_ERR_INTERNAL_ERROR, + "%s", _("received hangup / error event on socket")); + return -1; + } + } } -#if HAVE_SASL +/* + * This function performs a remote procedure call to procedure PROC_NR. + * + * NB. This does not free the args structure (not desirable, since you + * often want this allocated on the stack or else it contains strings + * which come from the user). It does however free any intermediate + * results, eg. the error structure if there is one. + * + * NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling, + * else Bad Things will happen in the XDR code. + * + * NB(3) You must have the private_data lock before calling this + * + * NB(4) This is very complicated. Due to connection cloning, multiple + * threads can want to use the socket at once. Obviously only one of + * them can. So if someone's using the socket, other threads are put + * to sleep on condition variables. THe existing thread may completely + * send & receive their RPC call/reply while they're asleep. Or it + * may only get around to dealing with sending the call. Or it may + * get around to neither. So upon waking up from slumber, the other + * thread may or may not have more work todo. + * + * We call this dance 'passing the buck' + * + * http://en.wikipedia.org/wiki/Passing_the_buck + * + * "Buck passing or passing the buck is the action of transferring + * responsibility or blame unto another person. It is also used as + * a strategy in power politics when the actions of one country/ + * nation are blamed on another, providing an opportunity for war." + * + * NB(5) Don't Panic! + */ static int -really_write_sasl (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) +call (virConnectPtr conn, struct private_data *priv, + int flags /* if we are in virConnectOpen */, + int proc_nr, + xdrproc_t args_filter, char *args, + xdrproc_t ret_filter, char *ret) { - const char *output; - unsigned int outputlen; - int err; + int rv; + struct remote_thread_call *thiscall; - err = sasl_encode(priv->saslconn, bytes, len, &output, &outputlen); - if (err != SASL_OK) { + DEBUG("Doing call %d %p", proc_nr, priv->waitDispatch); + thiscall = prepareCall(conn, priv, flags, proc_nr, + args_filter, args, + ret_filter, ret); + + if (!thiscall) { + error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_NO_MEMORY, NULL); return -1; } - return really_write_buf(conn, priv, in_open, output, outputlen); -} -#endif - -static int -really_write (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ -#if HAVE_SASL - if (priv->saslconn) - return really_write_sasl(conn, priv, in_open, bytes, len); - else -#endif - return really_write_plain(conn, priv, in_open, bytes, len); -} + /* Check to see if another thread is dispatching */ + if (priv->waitDispatch) { + /* Stick ourselves on the end of the wait queue */ + struct remote_thread_call *tmp = priv->waitDispatch; + char ignore = 1; + while (tmp && tmp->next) + tmp = tmp->next; + if (tmp) + tmp->next = thiscall; + else + priv->waitDispatch = thiscall; -static int -really_read_buf (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) -{ - int err; + /* Force other thread to wakup from poll */ + safewrite(priv->wakeupSendFD, &ignore, sizeof(ignore)); - if (priv->uses_tls) { - tlsreread: - err = gnutls_record_recv (priv->session, bytes, len); - if (err < 0) { - if (err == GNUTLS_E_INTERRUPTED) - goto tlsreread; - error (in_open ? NULL : conn, - VIR_ERR_GNUTLS_ERROR, gnutls_strerror (err)); + DEBUG("Going to sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* Go to sleep while other thread is working... */ + if (virCondWait(&thiscall->cond, &priv->lock) < 0) { + if (priv->waitDispatch == thiscall) { + priv->waitDispatch = thiscall->next; + } else { + tmp = priv->waitDispatch; + while (tmp && tmp->next && + tmp->next != thiscall) { + tmp = tmp->next; + } + if (tmp && tmp->next == thiscall) + tmp->next = thiscall->next; + } + errorf(flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + VIR_ERR_INTERNAL_ERROR, "%s", + _("failed to wait on condition")); + VIR_FREE(thiscall); return -1; } - if (err == 0) { - error (in_open ? NULL : conn, - VIR_ERR_RPC, _("socket closed unexpectedly")); - return -1; + + DEBUG("Wokeup from sleep %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* Two reasons we can be woken up + * 1. Other thread has got our reply ready for us + * 2. Other thread is all done, and it is our turn to + * be the dispatcher to finish waiting for + * our reply + */ + if (thiscall->mode == REMOTE_MODE_COMPLETE || + thiscall->mode == REMOTE_MODE_ERROR) { + /* + * We avoided catching the buck and our reply is ready ! + * We've already had 'thiscall' removed from the list + * so just need to (maybe) handle errors & free it + */ + goto cleanup; } + + /* Grr, someone passed the buck onto us ... */ + } else { - reread: - err = recv (priv->sock, bytes, len, 0); - if (err == -1) { - if (errno == EINTR) - goto reread; - error (in_open ? NULL : conn, - VIR_ERR_SYSTEM_ERROR, strerror (errno)); - return -1; - } - if (err == 0) { - error (in_open ? NULL : conn, - VIR_ERR_RPC, _("socket closed unexpectedly")); - return -1; + /* We're first to catch the buck */ + priv->waitDispatch = thiscall; + } + + DEBUG("We have the buck %d %p %p", proc_nr, priv->waitDispatch, thiscall); + /* + * The buck stops here! + * + * At this point we're about to own the dispatch + * process... + */ + + /* + * Avoid needless wake-ups of the event loop in the + * case where this call is being made from a different + * thread than the event loop. These wake-ups would + * cause the event loop thread to be blocked on the + * mutex for the duration of the call + */ + if (priv->watch >= 0) + virEventUpdateHandle(priv->watch, 0); + + rv = processCalls(conn, priv, + flags & REMOTE_CALL_IN_OPEN ? 1 : 0, + thiscall); + + if (priv->watch >= 0) + virEventUpdateHandle(priv->watch, VIR_EVENT_HANDLE_READABLE); + + if (rv < 0) { + VIR_FREE(thiscall); + return -1; + } + +cleanup: + DEBUG("All done with our call %d %p %p", proc_nr, priv->waitDispatch, thiscall); + if (thiscall->mode == REMOTE_MODE_ERROR) { + /* See if caller asked us to keep quiet about missing RPCs + * eg for interop with older servers */ + if (flags & REMOTE_CALL_QUIET_MISSING_RPC && + thiscall->err.domain == VIR_FROM_REMOTE && + thiscall->err.code == VIR_ERR_RPC && + thiscall->err.level == VIR_ERR_ERROR && + STRPREFIX(*thiscall->err.message, "unknown procedure")) { + rv = -2; + } else { + server_error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, + &thiscall->err); + rv = -1; } + } else { + rv = 0; } + VIR_FREE(thiscall); + return rv; +} - return err; +/** + * remoteDomainReadEvent + * + * Read the event data off the wire + */ +static virDomainEventPtr +remoteDomainReadEvent(virConnectPtr conn, XDR *xdr) +{ + remote_domain_event_ret ret; + virDomainPtr dom; + virDomainEventPtr event = NULL; + memset (&ret, 0, sizeof ret); + + /* unmarshall parameters, and process it*/ + if (! xdr_remote_domain_event_ret(xdr, &ret) ) { + error (conn, VIR_ERR_RPC, + _("remoteDomainProcessEvent: unmarshalling ret")); + return NULL; + } + + dom = get_nonnull_domain(conn,ret.dom); + if (!dom) + return NULL; + + event = virDomainEventNewFromDom(dom, ret.event, ret.detail); + + virDomainFree(dom); + return event; } -static int -really_read_plain (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) +static void +remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) { - do { - int ret = really_read_buf (conn, priv, in_open, bytes, len); - if (ret < 0) - return -1; + struct private_data *priv = conn->privateData; + virDomainEventPtr event; - len -= ret; - bytes += ret; - } while (len > 0); + event = remoteDomainReadEvent(conn, xdr); + if (!event) + return; - return 0; + if (virDomainEventQueuePush(priv->domainEvents, + event) < 0) { + DEBUG0("Error adding event to queue"); + virDomainEventFree(event); + } } -#if HAVE_SASL -static int -really_read_sasl (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) +/** remoteDomainEventFired: + * + * The callback for monitoring the remote socket + * for event data + */ +void +remoteDomainEventFired(int watch, + int fd, + int event, + void *opaque) { - do { - int want, got; - if (priv->saslDecoded == NULL) { - char encoded[8192]; - int encodedLen = sizeof(encoded); - int err, ret; - ret = really_read_buf (conn, priv, in_open, encoded, encodedLen); - if (ret < 0) - return -1; + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; - err = sasl_decode(priv->saslconn, encoded, ret, - &priv->saslDecoded, &priv->saslDecodedLength); - } + remoteDriverLock(priv); - got = priv->saslDecodedLength - priv->saslDecodedOffset; - want = len; - if (want > got) - want = got; + /* This should be impossible, but it doesn't hurt to check */ + if (priv->waitDispatch) + goto done; - memcpy(bytes, priv->saslDecoded + priv->saslDecodedOffset, want); - priv->saslDecodedOffset += want; - if (priv->saslDecodedOffset == priv->saslDecodedLength) { - priv->saslDecoded = NULL; - priv->saslDecodedOffset = priv->saslDecodedLength = 0; - } - bytes += want; - len -= want; - } while (len > 0); + DEBUG("Event fired %d %d %d %X", watch, fd, event, event); - return 0; + if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) { + DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or " + "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__); + virEventRemoveHandle(watch); + priv->watch = -1; + goto done; + } + + if (fd != priv->sock) { + virEventRemoveHandle(watch); + priv->watch = -1; + goto done; + } + + if (processCallRecv(conn, priv, 0) < 0) + DEBUG0("Something went wrong during async message processing"); + +done: + remoteDriverUnlock(priv); } -#endif -static int -really_read (virConnectPtr conn, struct private_data *priv, - int in_open /* if we are in virConnectOpen */, - char *bytes, int len) +void +remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque) { -#if HAVE_SASL - if (priv->saslconn) - return really_read_sasl (conn, priv, in_open, bytes, len); - else -#endif - return really_read_plain (conn, priv, in_open, bytes, len); + virConnectPtr conn = opaque; + struct private_data *priv = conn->privateData; + + remoteDriverLock(priv); + + virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList, + virDomainEventDispatchDefaultFunc, NULL); + virEventUpdateTimeout(priv->eventFlushTimer, -1); + + remoteDriverUnlock(priv); } + /* For errors internal to this library. */ static void error (virConnectPtr conn, virErrorNumber code, const char *info) @@ -6267,161 +6840,3 @@ remoteRegister (void) return 0; } -/** - * remoteDomainReadEvent - * - * Read the event data off the wire - */ -static virDomainEventPtr -remoteDomainReadEvent(virConnectPtr conn, XDR *xdr) -{ - remote_domain_event_ret ret; - virDomainPtr dom; - virDomainEventPtr event = NULL; - memset (&ret, 0, sizeof ret); - - /* unmarshall parameters, and process it*/ - if (! xdr_remote_domain_event_ret(xdr, &ret) ) { - error (conn, VIR_ERR_RPC, - _("remoteDomainProcessEvent: unmarshalling ret")); - return NULL; - } - - dom = get_nonnull_domain(conn,ret.dom); - if (!dom) - return NULL; - - event = virDomainEventNewFromDom(dom, ret.event, ret.detail); - - virDomainFree(dom); - return event; -} - -static void -remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr) -{ - struct private_data *priv = conn->privateData; - virDomainEventPtr event; - - event = remoteDomainReadEvent(conn, xdr); - if (!event) - return; - - DEBUG0("Calling domain event callbacks (no queue)"); - virDomainEventDispatch(event, priv->callbackList, - virDomainEventDispatchDefaultFunc, NULL); - virDomainEventFree(event); -} - -static void -remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr) -{ - struct private_data *priv = conn->privateData; - virDomainEventPtr event; - - event = remoteDomainReadEvent(conn, xdr); - if (!event) - return; - - if (virDomainEventQueuePush(priv->domainEvents, - event) < 0) { - DEBUG0("Error adding event to queue"); - virDomainEventFree(event); - } -} - -/** remoteDomainEventFired: - * - * The callback for monitoring the remote socket - * for event data - */ -void -remoteDomainEventFired(int watch, - int fd, - int event, - void *opaque) -{ - char buffer[REMOTE_MESSAGE_MAX]; - char buffer2[4]; - struct remote_message_header hdr; - XDR xdr; - int len; - - virConnectPtr conn = opaque; - struct private_data *priv = conn->privateData; - - remoteDriverLock(priv); - - DEBUG("Event fired %d %d %d %X", watch, fd, event, event); - - if (event & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR)) { - DEBUG("%s : VIR_EVENT_HANDLE_HANGUP or " - "VIR_EVENT_HANDLE_ERROR encountered", __FUNCTION__); - virEventRemoveHandle(watch); - goto done; - } - - if (fd != priv->sock) { - virEventRemoveHandle(watch); - goto done; - } - - /* Read and deserialise length word. */ - if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1) - goto done; - - xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE); - if (!xdr_int (&xdr, &len)) { - error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)")); - goto done; - } - xdr_destroy (&xdr); - - /* Length includes length word - adjust to real length to read. */ - len -= 4; - - if (len < 0 || len > REMOTE_MESSAGE_MAX) { - error (conn, VIR_ERR_RPC, _("packet received from server too large")); - goto done; - } - - /* Read reply header and what follows (either a ret or an error). */ - if (really_read (conn, priv, 0, buffer, len) == -1) { - error (conn, VIR_ERR_RPC, _("error reading buffer from memory")); - goto done; - } - - /* Deserialise reply header. */ - xdrmem_create (&xdr, buffer, len, XDR_DECODE); - if (!xdr_remote_message_header (&xdr, &hdr)) { - error (conn, VIR_ERR_RPC, _("invalid header in event firing")); - goto done; - } - - if (hdr.proc == REMOTE_PROC_DOMAIN_EVENT && - hdr.direction == REMOTE_MESSAGE) { - DEBUG0("Encountered an async event"); - remoteDomainProcessEvent(conn, &xdr); - } else { - DEBUG0("invalid proc in event firing"); - error (conn, VIR_ERR_RPC, _("invalid proc in event firing")); - } - -done: - remoteDriverUnlock(priv); -} - -void -remoteDomainEventQueueFlush(int timer ATTRIBUTE_UNUSED, void *opaque) -{ - virConnectPtr conn = opaque; - struct private_data *priv = conn->privateData; - - remoteDriverLock(priv); - - virDomainEventQueueDispatch(priv->domainEvents, priv->callbackList, - virDomainEventDispatchDefaultFunc, NULL); - virEventUpdateTimeout(priv->eventFlushTimer, -1); - - remoteDriverUnlock(priv); -} diff --git a/src/util.c b/src/util.c index 6a8d4d36c21ecdcffa0fa7c38395e955b3feec84..e19500b90c5059bba2d5c97c6489067b38efd5f8 100644 --- a/src/util.c +++ b/src/util.c @@ -34,6 +34,7 @@ #include #include #include +#include #if HAVE_SYS_WAIT_H #include #endif @@ -155,25 +156,35 @@ virArgvToString(const char *const *argv) return ret; } - -#ifndef __MINGW32__ - -static int virSetCloseExec(int fd) { +int virSetNonBlock(int fd) { +#ifndef WIN32 int flags; - if ((flags = fcntl(fd, F_GETFD)) < 0) + if ((flags = fcntl(fd, F_GETFL)) < 0) return -1; - flags |= FD_CLOEXEC; - if ((fcntl(fd, F_SETFD, flags)) < 0) + flags |= O_NONBLOCK; + if ((fcntl(fd, F_SETFL, flags)) < 0) return -1; +#else + unsigned long flag = 1; + + /* This is actually Gnulib's replacement rpl_ioctl function. + * We can't call ioctlsocket directly in any case. + */ + if (ioctl (fd, FIONBIO, (void *) &flag) == -1) + return -1; +#endif return 0; } -static int virSetNonBlock(int fd) { + +#ifndef WIN32 + +static int virSetCloseExec(int fd) { int flags; - if ((flags = fcntl(fd, F_GETFL)) < 0) + if ((flags = fcntl(fd, F_GETFD)) < 0) return -1; - flags |= O_NONBLOCK; - if ((fcntl(fd, F_SETFL, flags)) < 0) + flags |= FD_CLOEXEC; + if ((fcntl(fd, F_SETFD, flags)) < 0) return -1; return 0; } diff --git a/src/util.h b/src/util.h index 4fad6df8a1aaefba9a29cface1a0cc13cb64a0d6..81cfad6b01ae3efd547db21dfdc9ac10ab85a417 100644 --- a/src/util.h +++ b/src/util.h @@ -38,6 +38,8 @@ enum { VIR_EXEC_DAEMON = (1 << 1), }; +int virSetNonBlock(int fd); + int virExec(virConnectPtr conn, const char *const*argv, const char *const*envp,