提交 86b53e59 编写于 作者: D Daniel P. Berrange

Rewrite LXC I/O forwarding to use main event loop

The current I/O code for LXC uses a hand crafted event loop
to forward I/O between the container & host app, based on
epoll to handle EOF on PTYs. This event loop is not easily
extensible to add more consoles, or monitor other types of
file descriptors.

Remove the custom event loop and replace it with a normal
libvirt event loop. When detecting EOF on a PTY, disable
the event watch on that FD, and fork off a background thread
that does a edge-triggered epoll() on the FD. When the FD
finally shows new incoming data, the thread re-enables the
watch on the FD and exits.

When getting EOF from a read() on the PTY, the existing code
would do waitpid(WNOHANG) to see if the container had exited.
Unfortunately there is a race condition, because even though
the process has closed its stdio handles, it might still
exist.

To deal with this the new event loop uses a SIG_CHILD handler
to perform the waitpid only when the container is known to
have actually exited.

* src/lxc/lxc_controller.c: Rewrite the event loop to use
  the standard APIs.
上级 5990d921
......@@ -676,7 +676,7 @@ $(srcdir)/src/remote/remote_client_bodies.h: $(srcdir)/src/remote/remote_protoco
# List all syntax-check exemptions:
exclude_file_name_regexp--sc_avoid_strcase = ^tools/virsh\.c$$
_src1=libvirt|fdstream|qemu/qemu_monitor|util/(command|util)|xen/xend_internal|rpc/virnetsocket
_src1=libvirt|fdstream|qemu/qemu_monitor|util/(command|util)|xen/xend_internal|rpc/virnetsocket|lxc/lxc_controller
exclude_file_name_regexp--sc_avoid_write = \
^(src/($(_src1))|daemon/libvirtd|tools/console|tests/(shunload|virnettlscontext)test)\.c$$
......
......@@ -438,45 +438,6 @@ error:
return -1;
}
/**
* lxcFdForward:
* @readFd: file descriptor to read
* @writeFd: file desriptor to write
*
* Reads 1 byte of data from readFd and writes to writeFd.
*
* Returns 0 on success, EAGAIN if returned on read, or -1 in case of error
*/
static int lxcFdForward(int readFd, int writeFd)
{
int rc = -1;
char buf[2];
if (1 != (saferead(readFd, buf, 1))) {
if (EAGAIN == errno) {
rc = EAGAIN;
goto cleanup;
}
virReportSystemError(errno,
_("read of fd %d failed"),
readFd);
goto cleanup;
}
if (1 != (safewrite(writeFd, buf, 1))) {
virReportSystemError(errno,
_("write to fd %d failed"),
writeFd);
goto cleanup;
}
rc = 0;
cleanup:
return rc;
}
static int lxcControllerClearCapabilities(void)
{
......@@ -496,15 +457,10 @@ static int lxcControllerClearCapabilities(void)
return 0;
}
typedef struct _lxcTtyForwardFd_t {
int fd;
int active;
} lxcTtyForwardFd_t;
/* Return true if it is ok to ignore an accept-after-epoll syscall
that fails with the specified errno value. Else false. */
static bool
ignorable_epoll_accept_errno(int errnum)
ignorable_accept_errno(int errnum)
{
return (errnum == EINVAL
|| errnum == ECONNABORTED
......@@ -512,202 +468,441 @@ ignorable_epoll_accept_errno(int errnum)
|| errnum == EWOULDBLOCK);
}
static bool
lxcPidGone(pid_t container)
static bool quit = false;
static virMutex lock;
static int sigpipe[2];
static void lxcSignalChildHandler(int signum ATTRIBUTE_UNUSED)
{
ignore_value(write(sigpipe[1], "1", 1));
}
static void lxcSignalChildIO(int watch ATTRIBUTE_UNUSED,
int fd ATTRIBUTE_UNUSED,
int events ATTRIBUTE_UNUSED, void *opaque)
{
waitpid(container, NULL, WNOHANG);
char buf[1];
int ret;
int *container = opaque;
ignore_value(read(sigpipe[0], buf, 1));
ret = waitpid(-1, NULL, WNOHANG);
if (ret == *container) {
virMutexLock(&lock);
quit = true;
virMutexUnlock(&lock);
}
}
struct lxcConsole {
int hostWatch;
int hostFd; /* PTY FD in the host OS */
bool hostClosed;
int contWatch;
int contFd; /* PTY FD in the container */
bool contClosed;
size_t fromHostLen;
char fromHostBuf[1024];
size_t fromContLen;
char fromContBuf[1024];
};
struct lxcMonitor {
int serverWatch;
int serverFd; /* Server listen socket */
int clientWatch;
int clientFd; /* Current client FD (if any) */
};
if (kill(container, 0) < 0 &&
errno == ESRCH)
return true;
return false;
static void lxcClientIO(int watch ATTRIBUTE_UNUSED, int fd, int events, void *opaque)
{
struct lxcMonitor *monitor = opaque;
char buf[1024];
ssize_t ret;
if (events & (VIR_EVENT_HANDLE_HANGUP |
VIR_EVENT_HANDLE_ERROR)) {
virEventRemoveHandle(monitor->clientWatch);
monitor->clientWatch = -1;
return;
}
reread:
ret = read(fd, buf, sizeof(buf));
if (ret == -1 && errno == EINTR)
goto reread;
if (ret == -1 && errno == EAGAIN)
return;
if (ret == -1) {
lxcError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to read from monitor client"));
virMutexLock(&lock);
quit = true;
virMutexUnlock(&lock);
return;
}
if (ret == 0) {
VIR_DEBUG("Client %d gone", fd);
VIR_FORCE_CLOSE(monitor->clientFd);
virEventRemoveHandle(monitor->clientWatch);
monitor->clientWatch = -1;
}
}
static void lxcServerAccept(int watch ATTRIBUTE_UNUSED, int fd, int events ATTRIBUTE_UNUSED, void *opaque)
{
struct lxcMonitor *monitor = opaque;
int client;
if ((client = accept(fd, NULL, NULL)) < 0) {
/* First reflex may be simply to declare accept failure
to be a fatal error. However, accept may fail when
a client quits between the above poll and here.
That case is not fatal, but rather to be expected,
if not common, so ignore it. */
if (ignorable_accept_errno(errno))
return;
virReportSystemError(errno, "%s",
_("Unable to accept monitor client"));
virMutexLock(&lock);
quit = true;
virMutexUnlock(&lock);
return;
}
VIR_DEBUG("New client %d (old %d)\n", client, monitor->clientFd);
VIR_FORCE_CLOSE(monitor->clientFd);
virEventRemoveHandle(monitor->clientWatch);
monitor->clientFd = client;
if ((monitor->clientWatch = virEventAddHandle(monitor->clientFd,
VIR_EVENT_HANDLE_READABLE,
lxcClientIO,
monitor,
NULL)) < 0) {
lxcError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to watch client socket"));
virMutexLock(&lock);
quit = true;
virMutexUnlock(&lock);
return;
}
}
static void lxcConsoleUpdateWatch(struct lxcConsole *console)
{
int hostEvents = 0;
int contEvents = 0;
if (!console->hostClosed) {
if (console->fromHostLen < sizeof(console->fromHostBuf))
hostEvents |= VIR_EVENT_HANDLE_READABLE;
if (console->fromContLen)
hostEvents |= VIR_EVENT_HANDLE_WRITABLE;
}
if (!console->contClosed) {
if (console->fromContLen < sizeof(console->fromContBuf))
contEvents |= VIR_EVENT_HANDLE_READABLE;
if (console->fromHostLen)
contEvents |= VIR_EVENT_HANDLE_WRITABLE;
}
virEventUpdateHandle(console->contWatch, contEvents);
virEventUpdateHandle(console->hostWatch, hostEvents);
}
struct lxcConsoleEOFData {
struct lxcConsole *console;
int fd;
};
static void lxcConsoleEOFThread(void *opaque)
{
struct lxcConsoleEOFData *data = opaque;
int ret;
int epollfd = -1;
struct epoll_event event;
if ((epollfd = epoll_create(2)) < 0) {
virReportSystemError(errno, "%s",
_("Unable to create epoll fd"));
goto cleanup;
}
event.events = EPOLLIN | EPOLLET;
event.data.fd = data->fd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, data->fd, &event) < 0) {
virReportSystemError(errno, "%s",
_("Unable to add epoll fd"));
goto cleanup;
}
for (;;) {
ret = epoll_wait(epollfd, &event, 1, -1);
if (ret < 0) {
if (ret == EINTR)
continue;
virReportSystemError(errno, "%s",
_("Unable to wait on epoll"));
virMutexLock(&lock);
quit = true;
virMutexUnlock(&lock);
goto cleanup;
}
/* If we get HUP+dead PID, we just re-enable the main loop
* which will see the PID has died and exit */
if ((event.events & EPOLLIN)) {
virMutexLock(&lock);
if (event.data.fd == data->console->hostFd) {
data->console->hostClosed = false;
} else {
data->console->contClosed = false;
}
lxcConsoleUpdateWatch(data->console);
virMutexUnlock(&lock);
break;
}
}
cleanup:
VIR_FORCE_CLOSE(epollfd);
VIR_FREE(data);
}
static int lxcCheckEOF(struct lxcConsole *console, int fd)
{
struct lxcConsoleEOFData *data;
virThread thread;
if (VIR_ALLOC(data) < 0) {
virReportOOMError();
return -1;
}
data->console = console;
data->fd = fd;
if (virThreadCreate(&thread, false, lxcConsoleEOFThread, data) < 0) {
VIR_FREE(data);
return -1;
}
return 0;
}
static void lxcConsoleIO(int watch, int fd, int events, void *opaque)
{
struct lxcConsole *console = opaque;
virMutexLock(&lock);
if (events & VIR_EVENT_HANDLE_READABLE) {
char *buf;
size_t *len;
size_t avail;
ssize_t done;
if (watch == console->hostWatch) {
buf = console->fromHostBuf;
len = &console->fromHostLen;
avail = sizeof(console->fromHostBuf) - *len;
} else {
buf = console->fromContBuf;
len = &console->fromContLen;
avail = sizeof(console->fromContBuf) - *len;
}
reread:
done = read(fd, buf + *len, avail);
if (done == -1 && errno == EINTR)
goto reread;
if (done == -1 && errno != EAGAIN) {
virReportSystemError(errno, "%s",
_("Unable to read container pty"));
goto error;
}
if (done > 0) {
*len += done;
} else {
VIR_DEBUG("Read fd %d done %d errno %d", fd, (int)done, errno);
}
}
if (events & VIR_EVENT_HANDLE_WRITABLE) {
char *buf;
size_t *len;
ssize_t done;
if (watch == console->hostWatch) {
buf = console->fromContBuf;
len = &console->fromContLen;
} else {
buf = console->fromHostBuf;
len = &console->fromHostLen;
}
rewrite:
done = write(fd, buf, *len);
if (done == -1 && errno == EINTR)
goto rewrite;
if (done == -1 && errno != EAGAIN) {
virReportSystemError(errno, "%s",
_("Unable to write to container pty"));
goto error;
}
if (done > 0) {
memmove(buf, buf + done, (*len - done));
*len -= done;
} else {
VIR_DEBUG("Write fd %d done %d errno %d", fd, (int)done, errno);
}
}
if (events & VIR_EVENT_HANDLE_HANGUP) {
if (watch == console->hostWatch) {
console->hostClosed = true;
} else {
console->contClosed = true;
}
VIR_DEBUG("Got EOF on %d %d", watch, fd);
if (lxcCheckEOF(console, fd) < 0)
goto error;
}
lxcConsoleUpdateWatch(console);
virMutexUnlock(&lock);
return;
error:
virEventRemoveHandle(console->contWatch);
virEventRemoveHandle(console->hostWatch);
console->contWatch = console->hostWatch = -1;
quit = true;
virMutexUnlock(&lock);
}
/**
* lxcControllerMain
* @monitor: server socket fd to accept client requests
* @client: initial client which is the libvirtd daemon
* @appPty: open fd for application facing Pty
* @contPty: open fd for container facing Pty
* @serverFd: server socket fd to accept client requests
* @clientFd: initial client which is the libvirtd daemon
* @hostFd: open fd for application facing Pty
* @contFd: open fd for container facing Pty
*
* Forwards traffic between fds. Data read from appPty will be written to contPty
* This process loops forever.
* This uses epoll in edge triggered mode to avoid a hard loop on POLLHUP
* events when the user disconnects the virsh console via ctrl-]
* Processes I/O on consoles and the monitor
*
* Returns 0 on success or -1 in case of error
*/
static int lxcControllerMain(int monitor,
int client,
int appPty,
int contPty,
static int lxcControllerMain(int serverFd,
int clientFd,
int hostFd,
int contFd,
pid_t container)
{
struct lxcConsole console = {
.hostFd = hostFd,
.contFd = contFd,
};
struct lxcMonitor monitor = {
.serverFd = serverFd,
.clientFd = clientFd,
};
virErrorPtr err;
int rc = -1;
int epollFd;
struct epoll_event epollEvent;
int numEvents;
int numActive = 0;
lxcTtyForwardFd_t fdArray[2];
int timeout = -1;
int curFdOff = 0;
int writeFdOff = 0;
fdArray[0].fd = appPty;
fdArray[0].active = 0;
fdArray[1].fd = contPty;
fdArray[1].active = 0;
VIR_DEBUG("monitor=%d client=%d appPty=%d contPty=%d",
monitor, client, appPty, contPty);
/* create the epoll fild descriptor */
epollFd = epoll_create(2);
if (0 > epollFd) {
if (virMutexInit(&lock) < 0)
goto cleanup2;
if (pipe2(sigpipe, O_CLOEXEC|O_NONBLOCK) < 0) {
virReportSystemError(errno, "%s",
_("epoll_create(2) failed"));
_("Cannot create signal pipe"));
goto cleanup;
}
/* add the file descriptors the epoll fd */
memset(&epollEvent, 0x00, sizeof(epollEvent));
epollEvent.events = EPOLLIN|EPOLLET; /* edge triggered */
epollEvent.data.fd = appPty;
if (0 > epoll_ctl(epollFd, EPOLL_CTL_ADD, appPty, &epollEvent)) {
virReportSystemError(errno, "%s",
_("epoll_ctl(appPty) failed"));
if (virEventAddHandle(sigpipe[0],
VIR_EVENT_HANDLE_READABLE,
lxcSignalChildIO,
&container,
NULL) < 0) {
lxcError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to watch signal pipe"));
goto cleanup;
}
epollEvent.data.fd = contPty;
if (0 > epoll_ctl(epollFd, EPOLL_CTL_ADD, contPty, &epollEvent)) {
if (signal(SIGCHLD, lxcSignalChildHandler) == SIG_ERR) {
virReportSystemError(errno, "%s",
_("epoll_ctl(contPty) failed"));
_("Cannot install signal handler"));
goto cleanup;
}
epollEvent.events = EPOLLIN;
epollEvent.data.fd = monitor;
if (0 > epoll_ctl(epollFd, EPOLL_CTL_ADD, monitor, &epollEvent)) {
virReportSystemError(errno, "%s",
_("epoll_ctl(monitor) failed"));
VIR_DEBUG("serverFd=%d clientFd=%d hostFd=%d contFd=%d",
serverFd, clientFd, hostFd, contFd);
virResetLastError();
if ((monitor.serverWatch = virEventAddHandle(monitor.serverFd,
VIR_EVENT_HANDLE_READABLE,
lxcServerAccept,
&monitor,
NULL)) < 0) {
lxcError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to watch monitor socket"));
goto cleanup;
}
epollEvent.events = EPOLLHUP;
epollEvent.data.fd = client;
if (0 > epoll_ctl(epollFd, EPOLL_CTL_ADD, client, &epollEvent)) {
virReportSystemError(errno, "%s",
_("epoll_ctl(client) failed"));
if (monitor.clientFd != -1 &&
(monitor.clientWatch = virEventAddHandle(monitor.clientFd,
VIR_EVENT_HANDLE_READABLE,
lxcClientIO,
&monitor,
NULL)) < 0) {
lxcError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to watch client socket"));
goto cleanup;
}
while (1) {
/* if active fd's, return if no events, else wait forever */
timeout = (numActive > 0) ? 0 : -1;
numEvents = epoll_wait(epollFd, &epollEvent, 1, timeout);
if (numEvents > 0) {
if (epollEvent.data.fd == monitor) {
int fd = accept(monitor, NULL, 0);
if (fd < 0) {
/* First reflex may be simply to declare accept failure
to be a fatal error. However, accept may fail when
a client quits between the above epoll_wait and here.
That case is not fatal, but rather to be expected,
if not common, so ignore it. */
if (ignorable_epoll_accept_errno(errno))
continue;
virReportSystemError(errno, "%s",
_("accept(monitor,...) failed"));
goto cleanup;
}
if (client != -1) { /* Already connected, so kick new one out */
VIR_FORCE_CLOSE(fd);
continue;
}
client = fd;
epollEvent.events = EPOLLHUP;
epollEvent.data.fd = client;
if (0 > epoll_ctl(epollFd, EPOLL_CTL_ADD, client, &epollEvent)) {
virReportSystemError(errno, "%s",
_("epoll_ctl(client) failed"));
goto cleanup;
}
} else if (client != -1 && epollEvent.data.fd == client) {
if (0 > epoll_ctl(epollFd, EPOLL_CTL_DEL, client, &epollEvent)) {
virReportSystemError(errno, "%s",
_("epoll_ctl(client) failed"));
goto cleanup;
}
VIR_FORCE_CLOSE(client);
} else {
if (epollEvent.events & EPOLLIN) {
curFdOff = epollEvent.data.fd == appPty ? 0 : 1;
if (!fdArray[curFdOff].active) {
fdArray[curFdOff].active = 1;
++numActive;
}
} else if (epollEvent.events & EPOLLHUP) {
if (lxcPidGone(container))
goto cleanup;
curFdOff = epollEvent.data.fd == appPty ? 0 : 1;
if (fdArray[curFdOff].active) {
fdArray[curFdOff].active = 0;
--numActive;
}
continue;
} else {
lxcError(VIR_ERR_INTERNAL_ERROR,
_("error event %d"), epollEvent.events);
goto cleanup;
}
}
} else if (0 == numEvents) {
if (2 == numActive) {
/* both fds active, toggle between the two */
curFdOff ^= 1;
} else {
/* only one active, if current is active, use it, else it */
/* must be the other one (ie. curFd just went inactive) */
curFdOff = fdArray[curFdOff].active ? curFdOff : curFdOff ^ 1;
}
if ((console.hostWatch = virEventAddHandle(console.hostFd,
VIR_EVENT_HANDLE_READABLE,
lxcConsoleIO,
&console,
NULL)) < 0) {
lxcError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to watch host console PTY"));
goto cleanup;
}
} else {
if (EINTR == errno) {
continue;
}
if ((console.contWatch = virEventAddHandle(console.contFd,
VIR_EVENT_HANDLE_READABLE,
lxcConsoleIO,
&console,
NULL)) < 0) {
lxcError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to watch host console PTY"));
goto cleanup;
}
/* error */
virReportSystemError(errno, "%s",
_("epoll_wait() failed"));
virMutexLock(&lock);
while (!quit) {
virMutexUnlock(&lock);
if (virEventRunDefaultImpl() < 0)
goto cleanup;
}
if (0 < numActive) {
writeFdOff = curFdOff ^ 1;
rc = lxcFdForward(fdArray[curFdOff].fd, fdArray[writeFdOff].fd);
if (EAGAIN == rc) {
/* this fd no longer has data, set it as inactive */
--numActive;
fdArray[curFdOff].active = 0;
} else if (-1 == rc) {
if (lxcPidGone(container))
goto cleanup;
continue;
}
}
virMutexLock(&lock);
}
virMutexUnlock(&lock);
rc = 0;
err = virGetLastError();
if (!err || err->code == VIR_ERR_OK)
rc = 0;
cleanup:
VIR_FORCE_CLOSE(appPty);
VIR_FORCE_CLOSE(contPty);
VIR_FORCE_CLOSE(epollFd);
virMutexDestroy(&lock);
signal(SIGCHLD, SIG_DFL);
cleanup2:
VIR_FORCE_CLOSE(console.hostFd);
VIR_FORCE_CLOSE(console.contFd);
VIR_FORCE_CLOSE(monitor.serverFd);
VIR_FORCE_CLOSE(monitor.clientFd);
return rc;
}
......@@ -1004,7 +1199,17 @@ lxcControllerRun(virDomainDefPtr def,
}
VIR_FORCE_CLOSE(handshakefd);
if (virSetBlocking(monitor, false) < 0 ||
virSetBlocking(client, false) < 0 ||
virSetBlocking(appPty, false) < 0 ||
virSetBlocking(containerPty, false) < 0) {
virReportSystemError(errno, "%s",
_("Unable to set file descriptor non blocking"));
goto cleanup;
}
rc = lxcControllerMain(monitor, client, appPty, containerPty, container);
monitor = client = appPty = containerPty = -1;
cleanup:
VIR_FREE(devptmx);
......@@ -1156,6 +1361,8 @@ int main(int argc, char *argv[])
goto cleanup;
}
virEventRegisterDefaultImpl();
if ((caps = lxcCapsInit()) == NULL)
goto cleanup;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册