提交 8ef5f263 编写于 作者: J Jiri Denemark

qemu: Avoid bogus error at the end of tunnelled migration

Once the qemu monitor reported that migration had completed, we just closed
our end of the pipe and let the migration tunnel die. This generated a bogus
error if we did so before the tunnel thread saw EOF on the pipe, and the
migration was aborted even though it had in fact succeeded.

With this patch we first wake up the tunnel thread, and only once it has
read all data from the pipe and finished the stream do we close the
file descriptor.

A small additional bonus of this patch is that real errors reported
inside qemuMigrationIOFunc are not overwritten by virStreamAbort any
more.
上级 25a63451
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <gnutls/gnutls.h> #include <gnutls/gnutls.h>
#include <gnutls/x509.h> #include <gnutls/x509.h>
#include <fcntl.h> #include <fcntl.h>
#include <poll.h>
#include "qemu_migration.h" #include "qemu_migration.h"
#include "qemu_monitor.h" #include "qemu_monitor.h"
...@@ -1585,49 +1586,113 @@ struct _qemuMigrationIOThread { ...@@ -1585,49 +1586,113 @@ struct _qemuMigrationIOThread {
virStreamPtr st; virStreamPtr st;
int sock; int sock;
virError err; virError err;
int wakeupRecvFD;
int wakeupSendFD;
}; };
static void qemuMigrationIOFunc(void *arg) static void qemuMigrationIOFunc(void *arg)
{ {
qemuMigrationIOThreadPtr data = arg; qemuMigrationIOThreadPtr data = arg;
char *buffer; char *buffer = NULL;
int nbytes = TUNNEL_SEND_BUF_SIZE; struct pollfd fds[2];
int timeout = -1;
virErrorPtr err = NULL;
VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d",
data->st, data->sock);
if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) { if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) {
virReportOOMError(); virReportOOMError();
virStreamAbort(data->st); goto abrt;
goto error;
} }
fds[0].fd = data->sock;
fds[1].fd = data->wakeupRecvFD;
for (;;) { for (;;) {
nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE); int ret;
if (nbytes < 0) {
fds[0].events = fds[1].events = POLLIN;
fds[0].revents = fds[1].revents = 0;
ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR)
continue;
virReportSystemError(errno, "%s", virReportSystemError(errno, "%s",
_("tunnelled migration failed to read from qemu")); _("poll failed in migration tunnel"));
virStreamAbort(data->st); goto abrt;
VIR_FREE(buffer);
goto error;
} }
else if (nbytes == 0)
/* EOF; get out of here */ if (ret == 0) {
/* We were asked to gracefully stop but reading would block. This
* can only happen if qemu told us migration finished but didn't
* close the migration fd. We handle this in the same way as EOF.
*/
VIR_DEBUG("QEMU forgot to close migration fd");
break; break;
}
if (virStreamSend(data->st, buffer, nbytes) < 0) { if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
VIR_FREE(buffer); char stop = 0;
goto error;
if (saferead(data->wakeupRecvFD, &stop, 1) != 1) {
virReportSystemError(errno, "%s",
_("failed to read from wakeup fd"));
goto abrt;
}
VIR_DEBUG("Migration tunnel was asked to %s",
stop ? "abort" : "finish");
if (stop) {
goto abrt;
} else {
timeout = 0;
}
} }
}
VIR_FREE(buffer); if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
int nbytes;
nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
if (nbytes > 0) {
if (virStreamSend(data->st, buffer, nbytes) < 0)
goto error;
} else if (nbytes < 0) {
virReportSystemError(errno, "%s",
_("tunnelled migration failed to read from qemu"));
goto abrt;
} else {
/* EOF; get out of here */
break;
}
}
}
if (virStreamFinish(data->st) < 0) if (virStreamFinish(data->st) < 0)
goto error; goto error;
VIR_FREE(buffer);
return; return;
abrt:
err = virSaveLastError();
if (err && err->code == VIR_ERR_OK) {
virFreeError(err);
err = NULL;
}
virStreamAbort(data->st);
if (err) {
virSetError(err);
virFreeError(err);
}
error: error:
virCopyLastError(&data->err); virCopyLastError(&data->err);
virResetLastError(); virResetLastError();
VIR_FREE(buffer);
} }
...@@ -1635,37 +1700,63 @@ static qemuMigrationIOThreadPtr ...@@ -1635,37 +1700,63 @@ static qemuMigrationIOThreadPtr
qemuMigrationStartTunnel(virStreamPtr st, qemuMigrationStartTunnel(virStreamPtr st,
int sock) int sock)
{ {
qemuMigrationIOThreadPtr io; qemuMigrationIOThreadPtr io = NULL;
int wakeupFD[2] = { -1, -1 };
if (VIR_ALLOC(io) < 0) { if (pipe2(wakeupFD, O_CLOEXEC) < 0) {
virReportOOMError(); virReportSystemError(errno, "%s",
return NULL; _("Unable to make pipe"));
goto error;
} }
if (VIR_ALLOC(io) < 0)
goto no_memory;
io->st = st; io->st = st;
io->sock = sock; io->sock = sock;
io->wakeupRecvFD = wakeupFD[0];
io->wakeupSendFD = wakeupFD[1];
if (virThreadCreate(&io->thread, true, if (virThreadCreate(&io->thread, true,
qemuMigrationIOFunc, qemuMigrationIOFunc,
io) < 0) { io) < 0) {
virReportSystemError(errno, "%s", virReportSystemError(errno, "%s",
_("Unable to create migration thread")); _("Unable to create migration thread"));
VIR_FREE(io); goto error;
return NULL;
} }
return io; return io;
no_memory:
virReportOOMError();
error:
VIR_FORCE_CLOSE(wakeupFD[0]);
VIR_FORCE_CLOSE(wakeupFD[1]);
VIR_FREE(io);
return NULL;
} }
static int static int
qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io) qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
{ {
int rv = -1; int rv = -1;
char stop = error ? 1 : 0;
/* make sure the thread finishes its job and is joinable */
if (safewrite(io->wakeupSendFD, &stop, 1) != 1) {
virReportSystemError(errno, "%s",
_("failed to wakeup migration tunnel"));
goto cleanup;
}
virThreadJoin(&io->thread); virThreadJoin(&io->thread);
/* Forward error from the IO thread, to this thread */ /* Forward error from the IO thread, to this thread */
if (io->err.code != VIR_ERR_OK) { if (io->err.code != VIR_ERR_OK) {
virSetError(&io->err); if (error)
rv = 0;
else
virSetError(&io->err);
virResetError(&io->err); virResetError(&io->err);
goto cleanup; goto cleanup;
} }
...@@ -1673,6 +1764,8 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io) ...@@ -1673,6 +1764,8 @@ qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io)
rv = 0; rv = 0;
cleanup: cleanup:
VIR_FORCE_CLOSE(io->wakeupSendFD);
VIR_FORCE_CLOSE(io->wakeupRecvFD);
VIR_FREE(io); VIR_FREE(io);
return rv; return rv;
} }
...@@ -1880,10 +1973,9 @@ cleanup: ...@@ -1880,10 +1973,9 @@ cleanup:
orig_err = virSaveLastError(); orig_err = virSaveLastError();
if (spec->fwdType != MIGRATION_FWD_DIRECT) { if (spec->fwdType != MIGRATION_FWD_DIRECT) {
/* Close now to ensure the IO thread quits & is joinable */ if (iothread && qemuMigrationStopTunnel(iothread, ret < 0) < 0)
VIR_FORCE_CLOSE(fd);
if (iothread && qemuMigrationStopTunnel(iothread) < 0)
ret = -1; ret = -1;
VIR_FORCE_CLOSE(fd);
} }
if (ret == 0 && if (ret == 0 &&
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册