提交 3178df9a 编写于 作者: M Michal Privoznik

virCommand: Don't misuse the eventloop for async IO

Currently, if a command wants to do asynchronous IO, a callback
is registered in the libvirtd eventloop to handle writes and
reads. However, there's a race in virCommandWait. The eventloop
may already be executing the callback, while virCommandWait is
mangling internal state of virCommand. To deal with it, we need
to either introduce locking or spawn a separate thread where we
poll() on stdio from the child. The former, however, requires
unlocking all mutexes held, as the event loop may execute other
callbacks which try to lock one of the mutexes, deadlock, and
thus never wake us up. So it's safer to spawn a separate thread.
上级 bbd09adb
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include "virpidfile.h" #include "virpidfile.h"
#include "virprocess.h" #include "virprocess.h"
#include "virbuffer.h" #include "virbuffer.h"
#include "virthread.h"
#define VIR_FROM_THIS VIR_FROM_NONE #define VIR_FROM_THIS VIR_FROM_NONE
...@@ -80,15 +81,13 @@ struct _virCommand { ...@@ -80,15 +81,13 @@ struct _virCommand {
char **errbuf; char **errbuf;
int infd; int infd;
int inpipe;
int outfd; int outfd;
int errfd; int errfd;
int *outfdptr; int *outfdptr;
int *errfdptr; int *errfdptr;
size_t inbufOffset; virThreadPtr asyncioThread;
int inWatch;
int outWatch;
int errWatch;
bool handshake; bool handshake;
int handshakeWait[2]; int handshakeWait[2];
...@@ -784,8 +783,7 @@ virCommandNewArgs(const char *const*args) ...@@ -784,8 +783,7 @@ virCommandNewArgs(const char *const*args)
cmd->handshakeNotify[0] = -1; cmd->handshakeNotify[0] = -1;
cmd->handshakeNotify[1] = -1; cmd->handshakeNotify[1] = -1;
cmd->infd = cmd->outfd = cmd->errfd = -1; cmd->infd = cmd->inpipe = cmd->outfd = cmd->errfd = -1;
cmd->inWatch = cmd->outWatch = cmd->errWatch = -1;
cmd->pid = -1; cmd->pid = -1;
virCommandAddArgSet(cmd, args); virCommandAddArgSet(cmd, args);
...@@ -1703,19 +1701,17 @@ virCommandToString(virCommandPtr cmd) ...@@ -1703,19 +1701,17 @@ virCommandToString(virCommandPtr cmd)
* Manage input and output to the child process. * Manage input and output to the child process.
*/ */
static int static int
virCommandProcessIO(virCommandPtr cmd, int *inpipe) virCommandProcessIO(virCommandPtr cmd)
{ {
int infd = -1, outfd = -1, errfd = -1; int outfd = -1, errfd = -1;
size_t inlen = 0, outlen = 0, errlen = 0; size_t inlen = 0, outlen = 0, errlen = 0;
size_t inoff = 0; size_t inoff = 0;
int ret = 0; int ret = 0;
/* With an input buffer, feed data to child /* With an input buffer, feed data to child
* via pipe */ * via pipe */
if (cmd->inbuf) { if (cmd->inbuf)
inlen = strlen(cmd->inbuf); inlen = strlen(cmd->inbuf);
infd = *inpipe;
}
/* With out/err buffer, the outfd/errfd have been filled with an /* With out/err buffer, the outfd/errfd have been filled with an
* FD for us. Guarantee an allocated string with partial results * FD for us. Guarantee an allocated string with partial results
...@@ -1744,8 +1740,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe) ...@@ -1744,8 +1740,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe)
struct pollfd fds[3]; struct pollfd fds[3];
int nfds = 0; int nfds = 0;
if (infd != -1) { if (cmd->inpipe != -1) {
fds[nfds].fd = infd; fds[nfds].fd = cmd->inpipe;
fds[nfds].events = POLLOUT; fds[nfds].events = POLLOUT;
fds[nfds].revents = 0; fds[nfds].revents = 0;
nfds++; nfds++;
...@@ -1817,21 +1813,19 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe) ...@@ -1817,21 +1813,19 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe)
} }
if (fds[i].revents & (POLLOUT | POLLERR) && if (fds[i].revents & (POLLOUT | POLLERR) &&
fds[i].fd == infd) { fds[i].fd == cmd->inpipe) {
int done; int done;
/* Coverity 5.3.0 can't see that we only get here if /* Coverity 5.3.0 can't see that we only get here if
* infd is in the set because it was non-negative. */ * infd is in the set because it was non-negative. */
sa_assert(infd != -1); sa_assert(infd != -1);
done = write(infd, cmd->inbuf + inoff, done = write(cmd->inpipe, cmd->inbuf + inoff,
inlen - inoff); inlen - inoff);
if (done < 0) { if (done < 0) {
if (errno == EPIPE) { if (errno == EPIPE) {
VIR_DEBUG("child closed stdin early, ignoring EPIPE " VIR_DEBUG("child closed stdin early, ignoring EPIPE "
"on fd %d", infd); "on fd %d", cmd->inpipe);
if (VIR_CLOSE(*inpipe) < 0) VIR_FORCE_CLOSE(cmd->inpipe);
VIR_DEBUG("ignoring failed close on fd %d", infd);
infd = -1;
} else if (errno != EINTR && errno != EAGAIN) { } else if (errno != EINTR && errno != EAGAIN) {
virReportSystemError(errno, "%s", virReportSystemError(errno, "%s",
_("unable to write to child input")); _("unable to write to child input"));
...@@ -1839,11 +1833,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe) ...@@ -1839,11 +1833,8 @@ virCommandProcessIO(virCommandPtr cmd, int *inpipe)
} }
} else { } else {
inoff += done; inoff += done;
if (inoff == inlen) { if (inoff == inlen)
if (VIR_CLOSE(*inpipe) < 0) VIR_FORCE_CLOSE(cmd->inpipe);
VIR_DEBUG("ignoring failed close on fd %d", infd);
infd = -1;
}
} }
} }
} }
...@@ -1914,7 +1905,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) ...@@ -1914,7 +1905,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
int ret = 0; int ret = 0;
char *outbuf = NULL; char *outbuf = NULL;
char *errbuf = NULL; char *errbuf = NULL;
int infd[2] = { -1, -1 };
struct stat st; struct stat st;
bool string_io; bool string_io;
bool async_io = false; bool async_io = false;
...@@ -1960,18 +1950,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) ...@@ -1960,18 +1950,6 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
} }
} }
/* If we have an input buffer, we need
* a pipe to feed the data to the child */
if (cmd->inbuf) {
if (pipe2(infd, O_CLOEXEC) < 0) {
virReportSystemError(errno, "%s",
_("unable to open pipe"));
cmd->has_error = -1;
return -1;
}
cmd->infd = infd[0];
}
/* If caller requested the same string for stdout and stderr, then /* If caller requested the same string for stdout and stderr, then
* merge those into one string. */ * merge those into one string. */
if (cmd->outbuf && cmd->outbuf == cmd->errbuf) { if (cmd->outbuf && cmd->outbuf == cmd->errbuf) {
...@@ -1999,23 +1977,14 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) ...@@ -1999,23 +1977,14 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
cmd->flags |= VIR_EXEC_RUN_SYNC; cmd->flags |= VIR_EXEC_RUN_SYNC;
if (virCommandRunAsync(cmd, NULL) < 0) { if (virCommandRunAsync(cmd, NULL) < 0) {
if (cmd->inbuf) {
tmpfd = infd[0];
if (VIR_CLOSE(infd[0]) < 0)
VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
tmpfd = infd[1];
if (VIR_CLOSE(infd[1]) < 0)
VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
}
cmd->has_error = -1; cmd->has_error = -1;
return -1; return -1;
} }
tmpfd = infd[0]; if (string_io) {
if (VIR_CLOSE(infd[0]) < 0) VIR_FORCE_CLOSE(cmd->infd);
VIR_DEBUG("ignoring failed close on fd %d", tmpfd); ret = virCommandProcessIO(cmd);
if (string_io) }
ret = virCommandProcessIO(cmd, &infd[1]);
if (virCommandWait(cmd, exitstatus) < 0) if (virCommandWait(cmd, exitstatus) < 0)
ret = -1; ret = -1;
...@@ -2031,11 +2000,7 @@ virCommandRun(virCommandPtr cmd, int *exitstatus) ...@@ -2031,11 +2000,7 @@ virCommandRun(virCommandPtr cmd, int *exitstatus)
/* Reset any capturing, in case caller runs /* Reset any capturing, in case caller runs
* this identical command again */ * this identical command again */
if (cmd->inbuf) { VIR_FORCE_CLOSE(cmd->inpipe);
tmpfd = infd[1];
if (VIR_CLOSE(infd[1]) < 0)
VIR_DEBUG("ignoring failed close on fd %d", tmpfd);
}
if (cmd->outbuf == &outbuf) { if (cmd->outbuf == &outbuf) {
tmpfd = cmd->outfd; tmpfd = cmd->outfd;
if (VIR_CLOSE(cmd->outfd) < 0) if (VIR_CLOSE(cmd->outfd) < 0)
...@@ -2135,177 +2100,13 @@ virCommandHook(void *data) ...@@ -2135,177 +2100,13 @@ virCommandHook(void *data)
static void static void
virCommandHandleReadWrite(int watch, int fd, int events, void *opaque) virCommandDoAsyncIOHelper(void *opaque)
{
virCommandPtr cmd = (virCommandPtr) opaque;
char ***bufptr = NULL;
char buf[1024];
ssize_t nread, nwritten;
size_t len = 0;
int *watchPtr = NULL;
bool eof = false;
int *fdptr = NULL, **fdptrptr = NULL;
VIR_DEBUG("watch=%d fd=%d events=%d", watch, fd, events);
errno = 0;
if (watch == cmd->inWatch) {
watchPtr = &cmd->inWatch;
fdptr = &cmd->infd;
if (events & VIR_EVENT_HANDLE_WRITABLE) {
len = strlen(cmd->inbuf);
while (true) {
nwritten = write(fd, cmd->inbuf + cmd->inbufOffset,
len - cmd->inbufOffset);
if (nwritten < 0) {
if (errno != EAGAIN && errno != EINTR) {
virReportSystemError(errno,
_("Unable to write command's "
"input to FD %d"),
fd);
eof = true;
}
break;
}
if (nwritten == 0) {
eof = true;
break;
}
cmd->inbufOffset += nwritten;
if (cmd->inbufOffset == len) {
VIR_FORCE_CLOSE(cmd->infd);
eof = true;
break;
}
}
}
} else {
if (watch == cmd->outWatch) {
watchPtr = &cmd->outWatch;
bufptr = &cmd->outbuf;
fdptr = &cmd->outfd;
fdptrptr = &cmd->outfdptr;
} else {
watchPtr = &cmd->errWatch;
bufptr = &cmd->errbuf;
fdptr = &cmd->errfd;
fdptrptr = &cmd->errfdptr;
}
if (events & VIR_EVENT_HANDLE_READABLE) {
if (**bufptr)
len = strlen(**bufptr);
while (true) {
nread = read(fd, buf, sizeof(buf));
if (nread < 0) {
if (errno != EAGAIN && errno != EINTR) {
virReportSystemError(errno,
_("unable to read command's "
"output from FD %d"),
fd);
eof = true;
}
break;
}
if (nread == 0) {
eof = true;
break;
}
if (VIR_REALLOC_N(**bufptr, len + nread + 1) < 0) {
virReportOOMError();
break;
}
memcpy(**bufptr + len, buf, nread);
(**bufptr)[len + nread] = '\0';
}
}
}
if (eof || (events & VIR_EVENT_HANDLE_HANGUP) ||
(events & VIR_EVENT_HANDLE_ERROR)) {
virEventRemoveHandle(watch);
*watchPtr = -1;
VIR_FORCE_CLOSE(*fdptr);
if (bufptr)
*bufptr = NULL;
if (fdptrptr)
*fdptrptr = NULL;
}
}
static int
virCommandRegisterEventLoop(virCommandPtr cmd)
{ {
int ret = -1; virCommandPtr cmd = opaque;
if (virCommandProcessIO(cmd) < 0) {
if (cmd->inbuf && /* If something went wrong, save errno or -1*/
(cmd->inWatch = virEventAddHandle(cmd->infd, cmd->has_error = errno ? errno : -1;
VIR_EVENT_HANDLE_WRITABLE |
VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR,
virCommandHandleReadWrite,
cmd, NULL)) < 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Unable to register infd %d in the event loop"),
cmd->infd);
goto cleanup;
}
if (cmd->outbuf && cmd->outfdptr == &cmd->outfd &&
(cmd->outWatch = virEventAddHandle(cmd->outfd,
VIR_EVENT_HANDLE_READABLE |
VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR,
virCommandHandleReadWrite,
cmd, NULL)) < 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Unable to register outfd %d in the event loop"),
cmd->outfd);
if (cmd->inWatch != -1) {
virEventRemoveHandle(cmd->inWatch);
cmd->inWatch = -1;
}
goto cleanup;
} }
if (cmd->errbuf && cmd->errfdptr == &cmd->errfd &&
(cmd->errWatch = virEventAddHandle(cmd->errfd,
VIR_EVENT_HANDLE_READABLE |
VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR,
virCommandHandleReadWrite,
cmd, NULL)) < 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
_("Unable to register errfd %d in the event loop"),
cmd->errfd);
if (cmd->inWatch != -1) {
virEventRemoveHandle(cmd->inWatch);
cmd->inWatch = -1;
}
if (cmd->outWatch != -1) {
virEventRemoveHandle(cmd->outWatch);
cmd->outWatch = -1;
}
goto cleanup;
}
ret = 0;
cleanup:
return ret;
} }
...@@ -2332,7 +2133,7 @@ cleanup: ...@@ -2332,7 +2133,7 @@ cleanup:
int int
virCommandRunAsync(virCommandPtr cmd, pid_t *pid) virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
{ {
int ret; int ret = -1;
char *str; char *str;
int i; int i;
bool synchronous = false; bool synchronous = false;
...@@ -2351,23 +2152,21 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) ...@@ -2351,23 +2152,21 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
synchronous = cmd->flags & VIR_EXEC_RUN_SYNC; synchronous = cmd->flags & VIR_EXEC_RUN_SYNC;
cmd->flags &= ~VIR_EXEC_RUN_SYNC; cmd->flags &= ~VIR_EXEC_RUN_SYNC;
/* Buffer management can only be requested via virCommandRun, unless help /* Buffer management can only be requested via virCommandRun or
* from the event loop has been requested via virCommandDoAsyncIO. */ * virCommandDoAsyncIO. */
if (cmd->flags & VIR_EXEC_ASYNC_IO) { if (cmd->inbuf && cmd->infd == -1 &&
/* If we have an input buffer, we need (synchronous || cmd->flags & VIR_EXEC_ASYNC_IO)) {
* a pipe to feed the data to the child */ if (pipe2(infd, O_CLOEXEC) < 0) {
if (cmd->inbuf && cmd->infd == -1) { virReportSystemError(errno, "%s",
if (pipe2(infd, O_CLOEXEC) < 0) { _("unable to open pipe"));
virReportSystemError(errno, "%s", cmd->has_error = -1;
_("unable to open pipe")); return -1;
cmd->has_error = -1;
return -1;
}
cmd->infd = infd[0];
} }
cmd->infd = infd[0];
cmd->inpipe = infd[1];
} else if ((cmd->inbuf && cmd->infd == -1) || } else if ((cmd->inbuf && cmd->infd == -1) ||
(cmd->outbuf && cmd->outfdptr != &cmd->outfd) || (cmd->outbuf && cmd->outfdptr != &cmd->outfd) ||
(cmd->errbuf && cmd->errfdptr != &cmd->errfd)) { (cmd->errbuf && cmd->errfdptr != &cmd->errfd)) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot mix string I/O with asynchronous command")); _("cannot mix string I/O with asynchronous command"));
return -1; return -1;
...@@ -2377,24 +2176,24 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) ...@@ -2377,24 +2176,24 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
virReportError(VIR_ERR_INTERNAL_ERROR, virReportError(VIR_ERR_INTERNAL_ERROR,
_("command is already running as pid %lld"), _("command is already running as pid %lld"),
(long long) cmd->pid); (long long) cmd->pid);
return -1; goto cleanup;
} }
if (!synchronous && (cmd->flags & VIR_EXEC_DAEMON)) { if (!synchronous && (cmd->flags & VIR_EXEC_DAEMON)) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("daemonized command cannot use virCommandRunAsync")); _("daemonized command cannot use virCommandRunAsync"));
return -1; goto cleanup;
} }
if (cmd->pwd && (cmd->flags & VIR_EXEC_DAEMON)) { if (cmd->pwd && (cmd->flags & VIR_EXEC_DAEMON)) {
virReportError(VIR_ERR_INTERNAL_ERROR, virReportError(VIR_ERR_INTERNAL_ERROR,
_("daemonized command cannot set working directory %s"), _("daemonized command cannot set working directory %s"),
cmd->pwd); cmd->pwd);
return -1; goto cleanup;
} }
if (cmd->pidfile && !(cmd->flags & VIR_EXEC_DAEMON)) { if (cmd->pidfile && !(cmd->flags & VIR_EXEC_DAEMON)) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s", virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("creation of pid file requires daemonized command")); _("creation of pid file requires daemonized command"));
return -1; goto cleanup;
} }
str = virCommandToString(cmd); str = virCommandToString(cmd);
...@@ -2430,15 +2229,27 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid) ...@@ -2430,15 +2229,27 @@ virCommandRunAsync(virCommandPtr cmd, pid_t *pid)
cmd->reap = true; cmd->reap = true;
if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) { if (ret == 0 && cmd->flags & VIR_EXEC_ASYNC_IO) {
cmd->flags &= ~VIR_EXEC_ASYNC_IO; if (cmd->inbuf)
if (cmd->inbuf && cmd->infd != -1) {
/* close the read end of infd and replace it with the write end */
VIR_FORCE_CLOSE(cmd->infd); VIR_FORCE_CLOSE(cmd->infd);
cmd->infd = infd[1]; /* clear any error so we can catch if the helper thread reports one */
cmd->has_error = 0;
if (VIR_ALLOC(cmd->asyncioThread) < 0 ||
virThreadCreate(cmd->asyncioThread, true,
virCommandDoAsyncIOHelper, cmd) < 0) {
virReportSystemError(errno, "%s",
_("Unable to create thread "
"to process command's IO"));
VIR_FREE(cmd->asyncioThread);
virCommandAbort(cmd);
ret = -1;
} }
ret = virCommandRegisterEventLoop(cmd);
} }
cleanup:
if (ret < 0) {
VIR_FORCE_CLOSE(cmd->infd);
VIR_FORCE_CLOSE(cmd->inpipe);
}
return ret; return ret;
} }
...@@ -2459,7 +2270,6 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) ...@@ -2459,7 +2270,6 @@ virCommandWait(virCommandPtr cmd, int *exitstatus)
{ {
int ret; int ret;
int status = 0; int status = 0;
const int events = VIR_EVENT_HANDLE_READABLE | VIR_EVENT_HANDLE_HANGUP;
if (!cmd ||cmd->has_error == ENOMEM) { if (!cmd ||cmd->has_error == ENOMEM) {
virReportOOMError(); virReportOOMError();
...@@ -2484,24 +2294,20 @@ virCommandWait(virCommandPtr cmd, int *exitstatus) ...@@ -2484,24 +2294,20 @@ virCommandWait(virCommandPtr cmd, int *exitstatus)
* guarantee that virProcessWait only fails due to failure to wait, * guarantee that virProcessWait only fails due to failure to wait,
* and repeat the exitstatus check code ourselves. */ * and repeat the exitstatus check code ourselves. */
ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status); ret = virProcessWait(cmd->pid, exitstatus ? exitstatus : &status);
if (cmd->flags & VIR_EXEC_ASYNC_IO) {
if (cmd->inWatch != -1) { cmd->flags &= ~VIR_EXEC_ASYNC_IO;
virEventRemoveHandle(cmd->inWatch); virThreadJoin(cmd->asyncioThread);
cmd->inWatch = -1; VIR_FREE(cmd->asyncioThread);
} VIR_FORCE_CLOSE(cmd->inpipe);
if (cmd->has_error) {
if (cmd->outWatch != -1) { const char *msg = _("Error while processing command's IO");
virEventRemoveHandle(cmd->outWatch); if (cmd->has_error < 0)
virCommandHandleReadWrite(cmd->outWatch, cmd->outfd, events, cmd); virReportError(VIR_ERR_INTERNAL_ERROR, "%s", msg);
cmd->outWatch = -1; else
} virReportSystemError(cmd->has_error, "%s", msg);
ret = -1;
if (cmd->errWatch != -1) { }
virEventRemoveHandle(cmd->errWatch);
virCommandHandleReadWrite(cmd->errWatch, cmd->errfd, events, cmd);
cmd->errWatch = -1;
} }
if (ret == 0) { if (ret == 0) {
cmd->pid = -1; cmd->pid = -1;
cmd->reap = false; cmd->reap = false;
...@@ -2719,6 +2525,10 @@ virCommandFree(virCommandPtr cmd) ...@@ -2719,6 +2525,10 @@ virCommandFree(virCommandPtr cmd)
VIR_FORCE_CLOSE(cmd->transfer[i]); VIR_FORCE_CLOSE(cmd->transfer[i]);
} }
if (cmd->asyncioThread) {
virThreadJoin(cmd->asyncioThread);
VIR_FREE(cmd->asyncioThread);
}
VIR_FREE(cmd->inbuf); VIR_FREE(cmd->inbuf);
VIR_FORCE_CLOSE(cmd->outfd); VIR_FORCE_CLOSE(cmd->outfd);
VIR_FORCE_CLOSE(cmd->errfd); VIR_FORCE_CLOSE(cmd->errfd);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册