From 4f17809a36e25f1ab0b754e1ed13ca6ed9d0b4fb Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Mon, 24 Aug 2009 20:57:16 +0100 Subject: [PATCH] Handle outgoing data streams in libvirtd * daemon/dispatch.c: Set streamTX flag on outgoing data packets * daemon/qemud.h: Add streamTX flag to track outgoing data * daemon/qemud.c: Re-enable further TX when outgoing data packet has been fully sent. * daemon/stream.h, daemon/stream.c: Add method for enabling TX. Support reading from streams and transmitting data out to client --- daemon/dispatch.c | 2 + daemon/libvirtd.c | 4 +- daemon/libvirtd.h | 1 + daemon/stream.c | 96 +++++++++++++++++++++++++++++++++++++++++++++++ daemon/stream.h | 4 ++ 5 files changed, 106 insertions(+), 1 deletion(-) diff --git a/daemon/dispatch.c b/daemon/dispatch.c index 1934d244e3..7417001e14 100644 --- a/daemon/dispatch.c +++ b/daemon/dispatch.c @@ -636,6 +636,8 @@ remoteSendStreamData(struct qemud_client *client, DEBUG("Total %d", msg->bufferOffset); } + if (data) + msg->streamTX = 1; /* Reset ready for I/O */ msg->bufferLength = msg->bufferOffset; diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index e36151f6fa..78dfb2d092 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -1898,7 +1898,9 @@ void qemudClientMessageRelease(struct qemud_client *client, struct qemud_client_message *msg) { - if (!msg->async) + if (msg->streamTX) { + remoteStreamMessageFinished(client, msg); + } else if (!msg->async) client->nrequests--; /* See if the recv queue is currently throttled */ diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h index aae23fcacf..579e1c4144 100644 --- a/daemon/libvirtd.h +++ b/daemon/libvirtd.h @@ -130,6 +130,7 @@ struct qemud_client_message { unsigned int bufferOffset; unsigned int async : 1; + unsigned int streamTX : 1; remote_message_header hdr; diff --git a/daemon/stream.c b/daemon/stream.c index 1fe0e58db6..584268dc42 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -32,6 +32,9 @@ static int remoteStreamHandleWrite(struct qemud_client *client, struct qemud_client_stream *stream); static int +remoteStreamHandleRead(struct qemud_client *client, + struct qemud_client_stream *stream); +static int remoteStreamHandleFinish(struct qemud_client *client, struct qemud_client_stream *stream, struct qemud_client_message *msg); @@ -48,6 +51,8 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream) int newEvents = 0; if (stream->rx) newEvents |= VIR_STREAM_EVENT_WRITABLE; + if (stream->tx && !stream->recvEOF) + newEvents |= VIR_STREAM_EVENT_READABLE; virStreamEventUpdateCallback(stream->st, newEvents); } @@ -87,6 +92,16 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque) } } + if (!stream->recvEOF && + (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) { + events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP); + if (remoteStreamHandleRead(client, stream) < 0) { + remoteRemoveClientStream(client, stream); + qemudDispatchClientFailure(client); + goto cleanup; + } + } + if (!stream->closed && (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) { int ret; @@ -507,3 +522,84 @@ remoteStreamHandleWrite(struct qemud_client *client, return 0; } + + + +/* + * Invoked when a stream is signalled as having data + * available to read. This reads upto one message + * worth of data, and then queues that for transmission + * to the client. + * + * Returns 0 if data was queued for TX, or a error RPC + * was sent, or -1 on fatal error, indicating client should + * be killed + */ +static int +remoteStreamHandleRead(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + char *buffer; + size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX; + int ret; + + DEBUG("stream=%p", stream); + + /* Shouldn't ever be called unless we're marked able to + * transmit, but doesn't hurt to check */ + if (!stream->tx) + return 0; + + if (VIR_ALLOC_N(buffer, bufferLen) < 0) + return -1; + + ret = virStreamRecv(stream->st, buffer, bufferLen); + if (ret == -2) { + /* Should never get this, since we're only called when we know + * we're readable, but hey things change... */ + ret = 0; + } else if (ret < 0) { + remote_error rerr; + memset(&rerr, 0, sizeof rerr); + remoteDispatchConnError(&rerr, NULL); + + ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial); + } else { + stream->tx = 0; + if (ret == 0) + stream->recvEOF = 1; + ret = remoteSendStreamData(client, stream, buffer, ret); + } + + VIR_FREE(buffer); + return ret; +} + + +/* + * Invoked when an outgoing data packet message has been fully sent. + * This simply re-enables TX of further data. + * + * The idea is to stop the daemon growing without bound due to + * fast stream, but slow client + */ +void +remoteStreamMessageFinished(struct qemud_client *client, + struct qemud_client_message *msg) +{ + struct qemud_client_stream *stream = client->streams; + + while (stream) { + if (msg->hdr.proc == stream->procedure && + msg->hdr.serial == stream->serial) + break; + stream = stream->next; + } + + DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial); + + if (stream) { + stream->tx = 1; + remoteStreamUpdateEvents(stream); + } +} diff --git a/daemon/stream.h b/daemon/stream.h index 250498472e..2e2d249517 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -46,4 +46,8 @@ int remoteRemoveClientStream(struct qemud_client *client, struct qemud_client_stream *stream); +void +remoteStreamMessageFinished(struct qemud_client *client, + struct qemud_client_message *msg); + #endif /* __LIBVIRTD_STREAM_H__ */ -- GitLab