diff --git a/daemon/Makefile.am b/daemon/Makefile.am index 8d9f00f66a0c9614bc7e712e832764525d5970f8..14f2e6ac5d8005667d918983db2aeb70acfc393b 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -5,6 +5,7 @@ DAEMON_SOURCES = \ libvirtd.c libvirtd.h \ remote.c remote.h \ dispatch.c dispatch.h \ + stream.c stream.h \ remote_dispatch_prototypes.h \ remote_dispatch_table.h \ remote_dispatch_args.h \ diff --git a/daemon/dispatch.c b/daemon/dispatch.c index a60f2f47d164f34cb408fe0748f957ec36beaebd..1934d244e34714f73509b7ca91bb6af5cdfb7126 100644 --- a/daemon/dispatch.c +++ b/daemon/dispatch.c @@ -104,7 +104,7 @@ void remoteDispatchOOMError (remote_error *rerr) { remoteDispatchStringError(rerr, VIR_ERR_NO_MEMORY, - NULL); + "out of memory"); } @@ -136,6 +136,10 @@ remoteSerializeError(struct qemud_client *client, unsigned int len; struct qemud_client_message *msg = NULL; + DEBUG("prog=%d ver=%d proc=%d type=%d serial=%d, msg=%s", + program, version, procedure, type, serial, + rerr->message ? *rerr->message : "(none)"); + if (VIR_ALLOC(msg) < 0) goto fatal_error; @@ -206,19 +210,38 @@ fatal_error: * * Returns 0 if the error was sent, -1 upon fatal error */ -static int +int remoteSerializeReplyError(struct qemud_client *client, remote_error *rerr, remote_message_header *req) { + /* + * For data streams, errors are sent back as data streams + * For method calls, errors are sent back as method replies + */ return remoteSerializeError(client, rerr, req->prog, req->vers, req->proc, - REMOTE_REPLY, + req->type == REMOTE_STREAM ? REMOTE_STREAM : REMOTE_REPLY, req->serial); } +int +remoteSerializeStreamError(struct qemud_client *client, + remote_error *rerr, + int proc, + int serial) +{ + return remoteSerializeError(client, + rerr, + REMOTE_PROGRAM, + REMOTE_PROTOCOL_VERSION, + proc, + REMOTE_STREAM, + serial); +} + /* * @msg: the complete incoming message, whose header to decode * @@ -338,6 +361,10 @@ remoteDispatchClientRequest (struct qemud_server *server, { remote_error rerr; + DEBUG("prog=%d ver=%d type=%d satus=%d serial=%d proc=%d", + msg->hdr.prog, msg->hdr.vers, msg->hdr.type, + msg->hdr.status, msg->hdr.serial, msg->hdr.proc); + memset(&rerr, 0, sizeof rerr); /* Check version, etc. */ @@ -358,11 +385,24 @@ remoteDispatchClientRequest (struct qemud_server *server, case REMOTE_CALL: return remoteDispatchClientCall(server, client, msg); + case REMOTE_STREAM: + /* Since stream data is non-acked, async, we may continue to received + * stream packets after we closed down a stream. Just drop & ignore + * these. + */ + VIR_INFO("Ignoring unexpected stream data serial=%d proc=%d status=%d", + msg->hdr.serial, msg->hdr.proc, msg->hdr.status); + qemudClientMessageRelease(client, msg); + break; + default: remoteDispatchFormatError (&rerr, _("type (%d) != REMOTE_CALL"), (int) msg->hdr.type); + goto error; } + return 0; + error: return remoteSerializeReplyError(client, &rerr, &msg->hdr); } @@ -532,3 +572,84 @@ xdr_error: fatal_error: return -1; } + + +int +remoteSendStreamData(struct qemud_client *client, + struct qemud_client_stream *stream, + const char *data, + size_t len) +{ + struct qemud_client_message *msg; + XDR xdr; + + DEBUG("client=%p stream=%p data=%p len=%d", client, stream, data, len); + + if (VIR_ALLOC(msg) < 0) { + return -1; + } + + /* Return header. We're re-using same message object, so + * only need to tweak type/status fields */ + msg->hdr.prog = REMOTE_PROGRAM; + msg->hdr.vers = REMOTE_PROTOCOL_VERSION; + msg->hdr.proc = stream->procedure; + msg->hdr.type = REMOTE_STREAM; + msg->hdr.serial = stream->serial; + /* + * NB + * data != NULL + len > 0 => REMOTE_CONTINUE (Sending back data) + * data != NULL + len == 0 => REMOTE_CONTINUE (Sending read EOF) + * data == NULL => REMOTE_OK (Sending finish handshake confirmation) + */ + msg->hdr.status = data ? REMOTE_CONTINUE : REMOTE_OK; + + if (remoteEncodeClientMessageHeader(msg) < 0) + goto fatal_error; + + if (data && len) { + if ((msg->bufferLength - msg->bufferOffset) < len) + goto fatal_error; + + /* Now for the payload */ + xdrmem_create (&xdr, + msg->buffer, + msg->bufferLength, + XDR_ENCODE); + + /* Skip over existing header already written */ + if (xdr_setpos(&xdr, msg->bufferOffset) == 0) + goto xdr_error; + + memcpy(msg->buffer + msg->bufferOffset, data, len); + msg->bufferOffset += len; + + /* Update the length word. */ + len = msg->bufferOffset; + if (xdr_setpos (&xdr, 0) == 0) + goto xdr_error; + + if (!xdr_u_int (&xdr, &len)) + goto xdr_error; + + xdr_destroy (&xdr); + + DEBUG("Total %d", msg->bufferOffset); + } + + /* Reset ready for I/O */ + msg->bufferLength = msg->bufferOffset; + msg->bufferOffset = 0; + + /* Put reply on end of tx queue to send out */ + qemudClientMessageQueuePush(&client->tx, msg); + qemudUpdateClientEvent(client); + + return 0; + +xdr_error: + xdr_destroy (&xdr); +fatal_error: + VIR_FREE(msg); + return -1; +} diff --git a/daemon/dispatch.h b/daemon/dispatch.h index 30de65887bad96c9568f9b9e1e81a2fa91066107..a5bf4741d05223fd417bd8c695dbfc0fdee8abe3 100644 --- a/daemon/dispatch.h +++ b/daemon/dispatch.h @@ -49,6 +49,17 @@ void remoteDispatchOOMError (remote_error *rerr); void remoteDispatchConnError (remote_error *rerr, virConnectPtr conn); + +int +remoteSerializeReplyError(struct qemud_client *client, + remote_error *rerr, + remote_message_header *req); +int +remoteSerializeStreamError(struct qemud_client *client, + remote_error *rerr, + int proc, + int serial); + /* Having this here is dubious. It should be in remote.h * but qemud.c shouldn't depend on that header directly. * Refactor this later to deal with this properly. @@ -60,4 +71,10 @@ int remoteRelayDomainEvent (virConnectPtr conn ATTRIBUTE_UNUSED, void *opaque); +int +remoteSendStreamData(struct qemud_client *client, + struct qemud_client_stream *stream, + const char *data, + size_t len); + #endif /* __LIBVIRTD_DISPATCH_H__ */ diff --git a/daemon/libvirtd.c b/daemon/libvirtd.c index 2bae782e7c2263e98315000e803eae1f6013cdd2..e36151f6fad99f764a74e2fd3940b41996e2e2d7 100644 --- a/daemon/libvirtd.c +++ b/daemon/libvirtd.c @@ -61,6 +61,7 @@ #include "conf.h" #include "event.h" #include "memory.h" +#include "stream.h" #ifdef HAVE_AVAHI #include "mdns.h" #endif @@ -1723,10 +1724,15 @@ readmore: /* Check if any filters match this message */ filter = client->filters; while (filter) { - if ((filter->query)(msg, filter->opaque)) { - qemudClientMessageQueuePush(&filter->dx, msg); + int ret; + ret = (filter->query)(client, msg, filter->opaque); + if (ret == 1) { msg = NULL; break; + } else if (ret == -1) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); + return; } filter = filter->next; } @@ -1888,6 +1894,29 @@ static ssize_t qemudClientWrite(struct qemud_client *client) { } +void +qemudClientMessageRelease(struct qemud_client *client, + struct qemud_client_message *msg) +{ + if (!msg->async) + client->nrequests--; + + /* See if the recv queue is currently throttled */ + if (!client->rx && + client->nrequests < max_client_requests) { + /* Reset message record for next RX attempt */ + memset(msg, 0, sizeof(*msg)); + client->rx = msg; + /* Get ready to receive next message */ + client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; + } else { + VIR_FREE(msg); + } + + qemudUpdateClientEvent(client); +} + + /* * Process all queued client->tx messages until * we would block on I/O @@ -1911,26 +1940,10 @@ qemudDispatchClientWrite(struct qemud_client *client) { /* Get finished reply from head of tx queue */ reply = qemudClientMessageQueueServe(&client->tx); - /* If its not an async message, then we have - * just completed an RPC request */ - if (!reply->async) - client->nrequests--; - - /* Move record to end of 'rx' ist */ - if (!client->rx && - client->nrequests < max_client_requests) { - /* Reset message record for next RX attempt */ - client->rx = reply; - client->rx->bufferOffset = 0; - client->rx->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN; - } else { - VIR_FREE(reply); - } + qemudClientMessageRelease(client, reply); if (client->closing) qemudDispatchClientFailure(client); - else - qemudUpdateClientEvent(client); } } } @@ -2142,6 +2155,9 @@ static void qemudFreeClient(struct qemud_client *client) { VIR_FREE(msg); } + while (client->streams) + remoteRemoveClientStream(client, client->streams); + if (client->conn) virConnectClose(client->conn); virMutexDestroy(&client->lock); diff --git a/daemon/libvirtd.h b/daemon/libvirtd.h index 241c637b4b15bf97af4ccbe8db36bd37aa51f422..aae23fcacff50487a2684dd082fe7ef489c49986 100644 --- a/daemon/libvirtd.h +++ b/daemon/libvirtd.h @@ -129,26 +129,43 @@ struct qemud_client_message { unsigned int bufferLength; unsigned int bufferOffset; - int async : 1; + unsigned int async : 1; remote_message_header hdr; struct qemud_client_message *next; }; +struct qemud_client; + /* Allow for filtering of incoming messages to a custom * dispatch processing queue, instead of client->dx. */ -typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque); +typedef int (*qemud_client_filter_func)(struct qemud_client *client, + struct qemud_client_message *msg, void *opaque); struct qemud_client_filter { qemud_client_filter_func query; void *opaque; - struct qemud_client_message *dx; - struct qemud_client_filter *next; }; +struct qemud_client_stream { + virStreamPtr st; + int procedure; + int serial; + + unsigned int recvEOF : 1; + unsigned int closed : 1; + + struct qemud_client_filter filter; + + struct qemud_client_message *rx; + int tx; + + struct qemud_client_stream *next; +}; + /* Stores the per-client connection state */ struct qemud_client { virMutex lock; @@ -197,6 +214,10 @@ struct qemud_client { * end up on the 'dx' queue */ struct qemud_client_filter *filters; + /* Data streams */ + struct qemud_client_stream *streams; + + /* This is only valid if a remote open call has been made on this * connection, otherwise it will be NULL. Also if remote close is * called, it will be set back to NULL if that succeeds. @@ -275,6 +296,9 @@ qemudClientMessageQueuePush(struct qemud_client_message **queue, struct qemud_client_message * qemudClientMessageQueueServe(struct qemud_client_message **queue); +void +qemudClientMessageRelease(struct qemud_client *client, + struct qemud_client_message *msg); #if HAVE_POLKIT diff --git a/daemon/stream.c b/daemon/stream.c new file mode 100644 index 0000000000000000000000000000000000000000..1644a1bb0e9cf290aa8568dc4d8a2aeb775741ae --- /dev/null +++ b/daemon/stream.c @@ -0,0 +1,210 @@ +/* + * stream.c: APIs for managing client streams + * + * Copyright (C) 2009 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Daniel P. Berrange + */ + + +#include + +#include "stream.h" +#include "memory.h" +#include "dispatch.h" +#include "logging.h" + + +/* + * @client: a locked client object + * + * Invoked by the main loop when filtering incoming messages. + * + * Returns 1 if the message was processed, 0 if skipped, + * -1 on fatal client error + */ +static int +remoteStreamFilter(struct qemud_client *client ATTRIBUTE_UNUSED, + struct qemud_client_message *msg ATTRIBUTE_UNUSED, + void *opaque ATTRIBUTE_UNUSED) +{ + return 0; +} + + +/* + * @conn: a connection object to associate the stream with + * @hdr: the method call to associate with the stram + * + * Creates a new stream for this conn + * + * Returns a new stream object, or NULL upon OOM + */ +struct qemud_client_stream * +remoteCreateClientStream(virConnectPtr conn, + remote_message_header *hdr) +{ + struct qemud_client_stream *stream; + + DEBUG("proc=%d serial=%d", hdr->proc, hdr->serial); + + if (VIR_ALLOC(stream) < 0) + return NULL; + + stream->procedure = hdr->proc; + stream->serial = hdr->serial; + + stream->st = virStreamNew(conn, VIR_STREAM_NONBLOCK); + if (!stream->st) { + VIR_FREE(stream); + return NULL; + } + + stream->filter.query = remoteStreamFilter; + stream->filter.opaque = stream; + + return stream; +} + +/* + * @stream: an unused client stream + * + * Frees the memory associated with this inactive client + * stream + */ +void remoteFreeClientStream(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + struct qemud_client_message *msg; + + if (!stream) + return; + + DEBUG("proc=%d serial=%d", stream->procedure, stream->serial); + + msg = stream->rx; + while (msg) { + struct qemud_client_message *tmp = msg->next; + qemudClientMessageRelease(client, msg); + msg = tmp; + } + + virStreamFree(stream->st); + VIR_FREE(stream); +} + + +/* + * @client: a locked client to add the stream to + * @stream: a stream to add + */ +int remoteAddClientStream(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + struct qemud_client_stream *tmp = client->streams; + + DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial); + + if (tmp) { + while (tmp->next) + tmp = tmp->next; + tmp->next = stream; + } else { + client->streams = stream; + } + + stream->filter.next = client->filters; + client->filters = &stream->filter; + + stream->tx = 1; + + return 0; +} + + +/* + * @client: a locked client object + * @procedure: procedure associated with the stream + * @serial: serial number associated with the stream + * + * Finds a existing active stream + * + * Returns a stream object matching the procedure+serial number, or NULL + */ +struct qemud_client_stream * +remoteFindClientStream(struct qemud_client *client, + virStreamPtr st) +{ + struct qemud_client_stream *stream = client->streams; + + while (stream) { + if (stream->st == st) + return stream; + stream = stream->next; + } + + return NULL; +} + + +/* + * @client: a locked client object + * @stream: an inactive, closed stream object + * + * Removes a stream from the list of active streams for the client + * + * Returns 0 if the stream was removd, -1 if it doesn't exist + */ +int +remoteRemoveClientStream(struct qemud_client *client, + struct qemud_client_stream *stream) +{ + DEBUG("client=%p proc=%d serial=%d", client, stream->procedure, stream->serial); + + struct qemud_client_stream *curr = client->streams; + struct qemud_client_stream *prev = NULL; + struct qemud_client_filter *filter = NULL; + + if (client->filters == &stream->filter) { + client->filters = client->filters->next; + } else { + filter = client->filters; + while (filter) { + if (filter->next == &stream->filter) { + filter->next = filter->next->next; + break; + } + } + } + + if (!stream->closed) + virStreamAbort(stream->st); + + while (curr) { + if (curr == stream) { + if (prev) + prev->next = curr->next; + else + client->streams = curr->next; + remoteFreeClientStream(client, stream); + return 0; + } + prev = curr; + curr = curr->next; + } + return -1; +} diff --git a/daemon/stream.h b/daemon/stream.h new file mode 100644 index 0000000000000000000000000000000000000000..250498472eeb6c280bcc4e0570a6e06e8af8bb96 --- /dev/null +++ b/daemon/stream.h @@ -0,0 +1,49 @@ +/* + * stream.h: APIs for managing client streams + * + * Copyright (C) 2009 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Author: Daniel P. Berrange + */ + + +#ifndef __LIBVIRTD_STREAM_H__ +#define __LIBVIRTD_STREAM_H__ + +#include "libvirtd.h" + + + +struct qemud_client_stream * +remoteCreateClientStream(virConnectPtr conn, + remote_message_header *hdr); + +void remoteFreeClientStream(struct qemud_client *client, + struct qemud_client_stream *stream); + +int remoteAddClientStream(struct qemud_client *client, + struct qemud_client_stream *stream); + +struct qemud_client_stream * +remoteFindClientStream(struct qemud_client *client, + virStreamPtr stream); + +int +remoteRemoveClientStream(struct qemud_client *client, + struct qemud_client_stream *stream); + +#endif /* __LIBVIRTD_STREAM_H__ */ diff --git a/src/remote/remote_protocol.h b/src/remote/remote_protocol.h index ceaf82c90796c2b32fced52da43f4fa9c6695e88..64da9fa88bb94bf866bbc6de69b32658033694ab 100644 --- a/src/remote/remote_protocol.h +++ b/src/remote/remote_protocol.h @@ -16,6 +16,8 @@ extern "C" { #include "internal.h" #include #define REMOTE_MESSAGE_MAX 262144 +#define REMOTE_MESSAGE_HEADER_MAX 24 +#define REMOTE_MESSAGE_PAYLOAD_MAX 262120 #define REMOTE_STRING_MAX 65536 typedef char *remote_nonnull_string; @@ -1684,12 +1686,14 @@ enum remote_message_type { REMOTE_CALL = 0, REMOTE_REPLY = 1, REMOTE_MESSAGE = 2, + REMOTE_STREAM = 3, }; typedef enum remote_message_type remote_message_type; enum remote_message_status { REMOTE_OK = 0, REMOTE_ERROR = 1, + REMOTE_CONTINUE = 2, }; typedef enum remote_message_status remote_message_status; #define REMOTE_MESSAGE_HEADER_XDR_LEN 4 diff --git a/src/remote/remote_protocol.x b/src/remote/remote_protocol.x index 29abdb7df8eb5aa55354587ba3c6ea6789053793..6b0a7842877fd2ffe714984be62fcc9df7651ce4 100644 --- a/src/remote/remote_protocol.x +++ b/src/remote/remote_protocol.x @@ -44,6 +44,12 @@ /* Maximum total message size (serialised). */ const REMOTE_MESSAGE_MAX = 262144; +/* Size of struct remote_message_header (serialized)*/ +const REMOTE_MESSAGE_HEADER_MAX = 24; + +/* Size of message payload */ +const REMOTE_MESSAGE_PAYLOAD_MAX = 262120; + /* Length of long, but not unbounded, strings. * This is an arbitrary limit designed to stop the decoder from trying * to allocate unbounded amounts of memory when fed with a bad message. @@ -1541,8 +1547,27 @@ enum remote_procedure { * * serial matches that from the corresponding REMOTE_CALL * * - type == REMOTE_MESSAGE - * * serial matches that from the corresponding REMOTE_CALL, or zero + * * serial is always zero + * + * - type == REMOTE_STREAM + * * serial matches that from the corresponding REMOTE_CALL * + * and the 'status' field varies according to: + * + * - type == REMOTE_CALL + * * REMOTE_OK always + * + * - type == REMOTE_REPLY + * * REMOTE_OK if RPC finished successfully + * * REMOTE_ERROR if something failed + * + * - type == REMOTE_MESSAGE + * * REMOTE_OK always + * + * - type == REMOTE_STREAM + * * REMOTE_CONTINUE if more data is following + * * REMOTE_OK if stream is complete + * * REMOTE_ERROR if stream had an error * * Payload varies according to type and status: * @@ -1561,6 +1586,13 @@ enum remote_procedure { * * status == REMOTE_ERROR * remote_error Error information * + * - type == REMOTE_STREAM + * * status == REMOTE_CONTINUE + * byte[] raw stream data + * * status == REMOTE_ERROR + * remote_error error information + * * status == REMOTE_OK + * */ enum remote_message_type { /* client -> server. args from a method call */ @@ -1568,7 +1600,9 @@ enum remote_message_type { /* server -> client. reply/error from a method call */ REMOTE_REPLY = 1, /* either direction. async notification */ - REMOTE_MESSAGE = 2 + REMOTE_MESSAGE = 2, + /* either direction. stream data packet */ + REMOTE_STREAM = 3 }; enum remote_message_status { @@ -1580,7 +1614,11 @@ enum remote_message_status { /* For replies, indicates that an error happened, and a struct * remote_error follows. */ - REMOTE_ERROR = 1 + REMOTE_ERROR = 1, + + /* For streams, indicates that more data is still expected + */ + REMOTE_CONTINUE = 2 }; /* 4 byte length word per header */