diff --git a/daemon/dispatch.c b/daemon/dispatch.c index 1934d244e34714f73509b7ca91bb6af5cdfb7126..7417001e1422b4cf192372f48b7b12bde0ed7ea1 100644 --- a/daemon/dispatch.c +++ b/daemon/dispatch.c @@ -636,6 +636,8 @@ remoteSendStreamData(struct qemud_client *client, DEBUG("Total %d", msg->bufferOffset); } + if (data) + msg->streamTX = 1; /* Reset ready for I/O */ msg->bufferLength = msg->bufferOffset; diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index e36151f6fad99f764a74e2fd3940b41996e2e2d7..78dfb2d092ac37f632bda630e2039b33fc09e4b9 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -1898,7 +1898,9 @@ void qemudClientMessageRelease(struct qemud_client *client, struct qemud_client_message *msg) { - if (!msg->async) + if (msg->streamTX) { + remoteStreamMessageFinished(client, msg); + } else if (!msg->async) client->nrequests--; /* See if the recv queue is currently throttled */ diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h index aae23fcacff50487a2684dd082fe7ef489c49986..579e1c4144ae6b295801db00b04bf5e1b721aa01 100644 --- a/daemon/libvirtd.h +++ b/daemon/libvirtd.h @@ -130,6 +130,7 @@ struct qemud_client_message { unsigned int bufferOffset; unsigned int async : 1; + unsigned int streamTX : 1; remote_message_header hdr; diff --git a/daemon/stream.c b/daemon/stream.c index 1fe0e58db6e31795e0bf3f2168b3d9757c1435a2..584268dc4256cd072586e5fa5d3a5cd1b3d26855 100644 --- a/daemon/stream.c +++ b/daemon/stream.c @@ -32,6 +32,9 @@ static int remoteStreamHandleWrite(struct qemud_client *client, struct qemud_client_stream *stream); static int +remoteStreamHandleRead(struct qemud_client *client, + struct qemud_client_stream *stream); +static int remoteStreamHandleFinish(struct qemud_client *client, struct qemud_client_stream *stream, struct qemud_client_message *msg); @@ -48,6 +51,8 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream) int newEvents = 0; if (stream->rx) newEvents |= VIR_STREAM_EVENT_WRITABLE; + if (stream->tx && !stream->recvEOF) + newEvents |= VIR_STREAM_EVENT_READABLE; virStreamEventUpdateCallback(stream->st, newEvents); } @@ -87,6 +92,16 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque) } } + if (!stream->recvEOF && + (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) { + events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP); + if (remoteStreamHandleRead(client, stream) < 0) { + remoteRemoveClientStream(client, stream); + qemudDispatchClientFailure(client); + goto cleanup; + } + } + if (!stream->closed && (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) { int ret; @@ -507,3 +522,84 @@ remoteStreamHandleWrite(struct qemud_client *client, return 0; } + + + +/* + * Invoked when a stream is signalled as having data + * available to read. This reads upto one message + * worth of data, and then queues that for transmission + * to the client. + * + * Returns 0 if data was queued for TX, or a error RPC + * was sent, or -1 on fatal error, indicating client should + * be killed + */ +static int +remoteStreamHandleRead(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + char *buffer; + size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX; + int ret; + + DEBUG("stream=%p", stream); + + /* Shouldn't ever be called unless we're marked able to + * transmit, but doesn't hurt to check */ + if (!stream->tx) + return 0; + + if (VIR_ALLOC_N(buffer, bufferLen) < 0) + return -1; + + ret = virStreamRecv(stream->st, buffer, bufferLen); + if (ret == -2) { + /* Should never get this, since we're only called when we know + * we're readable, but hey things change... */ + ret = 0; + } else if (ret < 0) { + remote_error rerr; + memset(&rerr, 0, sizeof rerr); + remoteDispatchConnError(&rerr, NULL); + + ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial); + } else { + stream->tx = 0; + if (ret == 0) + stream->recvEOF = 1; + ret = remoteSendStreamData(client, stream, buffer, ret); + } + + VIR_FREE(buffer); + return ret; +} + + +/* + * Invoked when an outgoing data packet message has been fully sent. + * This simply re-enables TX of further data. + * + * The idea is to stop the daemon growing without bound due to + * fast stream, but slow client + */ +void +remoteStreamMessageFinished(struct qemud_client *client, + struct qemud_client_message *msg) +{ + struct qemud_client_stream *stream = client->streams; + + while (stream) { + if (msg->hdr.proc == stream->procedure && + msg->hdr.serial == stream->serial) + break; + stream = stream->next; + } + + DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial); + + if (stream) { + stream->tx = 1; + remoteStreamUpdateEvents(stream); + } +} diff --git a/daemon/stream.h b/daemon/stream.h index 250498472eeb6c280bcc4e0570a6e06e8af8bb96..2e2d249517fdb298e6a13134aecea5460d7a50a4 100644 --- a/daemon/stream.h +++ b/daemon/stream.h @@ -46,4 +46,8 @@ int remoteRemoveClientStream(struct qemud_client *client, struct qemud_client_stream *stream); +void +remoteStreamMessageFinished(struct qemud_client *client, + struct qemud_client_message *msg); + #endif /* __LIBVIRTD_STREAM_H__ */