stream.c 22.5 KB
Newer Older
1 2 3
/*
 * stream.c: APIs for managing client streams
 *
4
 * Copyright (C) 2009-2014 Red Hat, Inc.
5 6 7 8 9 10 11 12 13 14 15 16
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
17
 * License along with this library.  If not, see
O
Osier Yang 已提交
18
 * <http://www.gnu.org/licenses/>.
19 20 21 22 23 24 25 26
 *
 * Author: Daniel P. Berrange <berrange@redhat.com>
 */


#include <config.h>

#include "stream.h"
27
#include "remote.h"
28
#include "viralloc.h"
29
#include "virlog.h"
30
#include "virnetserverclient.h"
31
#include "virerror.h"
32 33

#define VIR_FROM_THIS VIR_FROM_STREAMS
34

35 36
VIR_LOG_INIT("daemon.stream");

37 38
struct daemonClientStream {
    daemonClientPrivatePtr priv;
39
    int refs;
40 41 42 43 44

    virNetServerProgramPtr prog;

    virStreamPtr st;
    int procedure;
45
    unsigned int serial;
46

47 48
    bool recvEOF;
    bool closed;
49 50 51 52

    int filterID;

    virNetMessagePtr rx;
53
    bool tx;
54 55 56 57

    daemonClientStreamPtr next;
};

58
static int
59 60
daemonStreamHandleWrite(virNetServerClientPtr client,
                        daemonClientStream *stream);
61
static int
62 63
daemonStreamHandleRead(virNetServerClientPtr client,
                       daemonClientStream *stream);
64
static int
65 66 67
daemonStreamHandleFinish(virNetServerClientPtr client,
                         daemonClientStream *stream,
                         virNetMessagePtr msg);
68
static int
69 70 71
daemonStreamHandleAbort(virNetServerClientPtr client,
                        daemonClientStream *stream,
                        virNetMessagePtr msg);
72 73 74 75



static void
76
daemonStreamUpdateEvents(daemonClientStream *stream)
77 78
{
    int newEvents = 0;
79 80
    if (stream->closed)
        return;
81 82
    if (stream->rx)
        newEvents |= VIR_STREAM_EVENT_WRITABLE;
83 84
    if (stream->tx && !stream->recvEOF)
        newEvents |= VIR_STREAM_EVENT_READABLE;
85 86 87 88

    virStreamEventUpdateCallback(stream->st, newEvents);
}

89 90 91 92 93 94 95 96
/*
 * Invoked when an outgoing data packet message has been fully sent.
 * This simply re-enables TX of further data.
 *
 * The idea is to stop the daemon growing without bound due to
 * fast stream, but slow client
 */
static void
97
daemonStreamMessageFinished(virNetMessagePtr msg,
98 99 100
                            void *opaque)
{
    daemonClientStream *stream = opaque;
101
    VIR_DEBUG("stream=%p proc=%d serial=%u",
102 103
              stream, msg->header.proc, msg->header.serial);

104
    stream->tx = true;
105
    daemonStreamUpdateEvents(stream);
106 107

    daemonFreeClientStream(NULL, stream);
108
}
109

110

111 112 113 114
/*
 * Callback that gets invoked when a stream becomes writable/readable
 */
static void
115
daemonStreamEvent(virStreamPtr st, int events, void *opaque)
116
{
117 118 119
    virNetServerClientPtr client = opaque;
    daemonClientStream *stream;
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
120

121
    virMutexLock(&priv->lock);
122

123 124 125 126 127 128
    stream = priv->streams;
    while (stream) {
        if (stream->st == st)
            break;
        stream = stream->next;
    }
129 130 131 132 133 134 135

    if (!stream) {
        VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st);
        virStreamEventRemoveCallback(st);
        goto cleanup;
    }

136
    VIR_DEBUG("st=%p events=%d EOF=%d closed=%d", st, events, stream->recvEOF, stream->closed);
137

D
Daniel P. Berrange 已提交
138 139
    if (!stream->closed &&
        (events & VIR_STREAM_EVENT_WRITABLE)) {
140 141 142
        if (daemonStreamHandleWrite(client, stream) < 0) {
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
143 144 145 146
            goto cleanup;
        }
    }

D
Daniel P. Berrange 已提交
147 148 149
    if (!stream->closed && !stream->recvEOF &&
        (events & (VIR_STREAM_EVENT_READABLE))) {
        events = events & ~(VIR_STREAM_EVENT_READABLE);
150 151 152
        if (daemonStreamHandleRead(client, stream) < 0) {
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
153 154
            goto cleanup;
        }
155 156 157 158 159 160 161 162
        /* If we detected EOF during read processing,
         * then clear hangup/error conditions, since
         * we want the client to see the EOF message
         * we just sent them
         */
        if (stream->recvEOF)
            events = events & ~(VIR_STREAM_EVENT_HANGUP |
                                VIR_STREAM_EVENT_ERROR);
163 164
    }

165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
    /* If we have a completion/abort message, always process it */
    if (stream->rx) {
        virNetMessagePtr msg = stream->rx;
        switch (msg->header.status) {
        case VIR_NET_CONTINUE:
            /* nada */
            break;
        case VIR_NET_OK:
            virNetMessageQueueServe(&stream->rx);
            if (daemonStreamHandleFinish(client, stream, msg) < 0) {
                virNetMessageFree(msg);
                daemonRemoveClientStream(client, stream);
                virNetServerClientClose(client);
                goto cleanup;
            }
            break;
        case VIR_NET_ERROR:
        default:
            virNetMessageQueueServe(&stream->rx);
            if (daemonStreamHandleAbort(client, stream, msg) < 0) {
                virNetMessageFree(msg);
                daemonRemoveClientStream(client, stream);
                virNetServerClientClose(client);
                goto cleanup;
            }
            break;
        }
    }

D
Daniel P. Berrange 已提交
194 195 196 197 198 199 200 201

    /* If we got HANGUP, we need to only send an empty
     * packet so the client sees an EOF and cleans up
     */
    if (!stream->closed && !stream->recvEOF &&
        (events & VIR_STREAM_EVENT_HANGUP)) {
        virNetMessagePtr msg;
        events &= ~(VIR_STREAM_EVENT_HANGUP);
202
        stream->tx = false;
203
        stream->recvEOF = true;
D
Daniel P. Berrange 已提交
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
        if (!(msg = virNetMessageNew(false))) {
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
            goto cleanup;
        }
        msg->cb = daemonStreamMessageFinished;
        msg->opaque = stream;
        stream->refs++;
        if (virNetServerProgramSendStreamData(remoteProgram,
                                              client,
                                              msg,
                                              stream->procedure,
                                              stream->serial,
                                              "", 0) < 0) {
            virNetMessageFree(msg);
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
            goto cleanup;
        }
    }

225 226 227
    if (!stream->closed &&
        (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
        int ret;
228 229 230 231
        virNetMessagePtr msg;
        virNetMessageError rerr;

        memset(&rerr, 0, sizeof(rerr));
232
        stream->closed = true;
233
        virStreamEventRemoveCallback(stream->st);
234 235
        virStreamAbort(stream->st);
        if (events & VIR_STREAM_EVENT_HANGUP)
236 237
            virReportError(VIR_ERR_RPC,
                           "%s", _("stream had unexpected termination"));
238
        else
239 240
            virReportError(VIR_ERR_RPC,
                           "%s", _("stream had I/O failure"));
241

242
        msg = virNetMessageNew(false);
243 244 245 246 247 248 249 250 251 252 253
        if (!msg) {
            ret = -1;
        } else {
            ret = virNetServerProgramSendStreamError(remoteProgram,
                                                     client,
                                                     msg,
                                                     &rerr,
                                                     stream->procedure,
                                                     stream->serial);
        }
        daemonRemoveClientStream(client, stream);
254
        if (ret < 0)
255
            virNetServerClientClose(client);
256 257 258 259
        goto cleanup;
    }

    if (stream->closed) {
260
        daemonRemoveClientStream(client, stream);
261
    } else {
262
        daemonStreamUpdateEvents(stream);
263 264
    }

265
 cleanup:
266
    virMutexUnlock(&priv->lock);
267 268
}

269 270 271 272 273 274 275 276 277 278

/*
 * @client: a locked client object
 *
 * Invoked by the main loop when filtering incoming messages.
 *
 * Returns 1 if the message was processed, 0 if skipped,
 * -1 on fatal client error
 */
static int
279
daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED,
280 281
                   virNetMessagePtr msg,
                   void *opaque)
282
{
283 284
    daemonClientStream *stream = opaque;
    int ret = 0;
285

286 287 288 289 290 291 292 293 294 295 296 297
    virMutexLock(&stream->priv->lock);

    if (msg->header.type != VIR_NET_STREAM)
        goto cleanup;

    if (!virNetServerProgramMatches(stream->prog, msg))
        goto cleanup;

    if (msg->header.proc != stream->procedure ||
        msg->header.serial != stream->serial)
        goto cleanup;

298
    VIR_DEBUG("Incoming client=%p, rx=%p, serial=%u, proc=%d, status=%d",
299 300 301 302 303 304 305
              client, stream->rx, msg->header.proc,
              msg->header.serial, msg->header.status);

    virNetMessageQueuePush(&stream->rx, msg);
    daemonStreamUpdateEvents(stream);
    ret = 1;

306
 cleanup:
307 308
    virMutexUnlock(&stream->priv->lock);
    return ret;
309 310 311 312 313
}


/*
 * @conn: a connection object to associate the stream with
314
 * @header: the method call to associate with the stream
315 316 317 318 319
 *
 * Creates a new stream for this conn
 *
 * Returns a new stream object, or NULL upon OOM
 */
320 321 322 323 324
daemonClientStream *
daemonCreateClientStream(virNetServerClientPtr client,
                         virStreamPtr st,
                         virNetServerProgramPtr prog,
                         virNetMessageHeaderPtr header)
325
{
326 327
    daemonClientStream *stream;
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
328

329
    VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p",
330
              client, header->proc, header->serial, st);
331

332
    if (VIR_ALLOC(stream) < 0)
333 334
        return NULL;

335
    stream->refs = 1;
336
    stream->priv = priv;
337
    stream->prog = virObjectRef(prog);
338 339 340 341
    stream->procedure = header->proc;
    stream->serial = header->serial;
    stream->filterID = -1;
    stream->st = st;
342 343 344 345 346 347 348 349 350 351

    return stream;
}

/*
 * @stream: an unused client stream
 *
 * Frees the memory associated with this inactive client
 * stream
 */
352 353
int daemonFreeClientStream(virNetServerClientPtr client,
                           daemonClientStream *stream)
354
{
355 356
    virNetMessagePtr msg;
    int ret = 0;
357 358

    if (!stream)
359 360
        return 0;

361 362 363 364
    stream->refs--;
    if (stream->refs)
        return 0;

365
    VIR_DEBUG("client=%p, proc=%d, serial=%u",
366
              client, stream->procedure, stream->serial);
367

368
    virObjectUnref(stream->prog);
369 370 371

    msg = stream->rx;
    while (msg) {
372
        virNetMessagePtr tmp = msg->next;
373 374
        if (client) {
            /* Send a dummy reply to free up 'msg' & unblock client rx */
375
            virNetMessageClear(msg);
376 377 378 379 380 381 382
            msg->header.type = VIR_NET_REPLY;
            if (virNetServerClientSendMessage(client, msg) < 0) {
                virNetServerClientImmediateClose(client);
                virNetMessageFree(msg);
                ret = -1;
            }
        } else {
383 384
            virNetMessageFree(msg);
        }
385 386 387
        msg = tmp;
    }

388
    virObjectUnref(stream->st);
389
    VIR_FREE(stream);
390 391

    return ret;
392 393 394 395 396 397 398
}


/*
 * @client: a locked client to add the stream to
 * @stream: a stream to add
 */
399 400 401
int daemonAddClientStream(virNetServerClientPtr client,
                          daemonClientStream *stream,
                          bool transmit)
402
{
403
    VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p, transmit=%d",
404 405
              client, stream->procedure, stream->serial, stream->st, transmit);
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
406

407 408 409 410
    if (stream->filterID != -1) {
        VIR_WARN("Filter already added to client %p", client);
        return -1;
    }
411

412
    if (virStreamEventAddCallback(stream->st, 0,
413
                                  daemonStreamEvent, client,
414
                                  virObjectFreeCallback) < 0)
415 416
        return -1;

417 418
    virObjectRef(client);

419 420 421 422 423
    if ((stream->filterID = virNetServerClientAddFilter(client,
                                                        daemonStreamFilter,
                                                        stream)) < 0) {
        virStreamEventRemoveCallback(stream->st);
        return -1;
424 425
    }

426
    if (transmit)
427
        stream->tx = true;
428

429 430 431
    virMutexLock(&priv->lock);
    stream->next = priv->streams;
    priv->streams = stream;
432

433
    daemonStreamUpdateEvents(stream);
434

435
    virMutexUnlock(&priv->lock);
436

437
    return 0;
438 439 440 441 442 443 444 445 446 447 448 449
}


/*
 * @client: a locked client object
 * @stream: an inactive, closed stream object
 *
 * Removes a stream from the list of active streams for the client
 *
 * Returns 0 if the stream was removd, -1 if it doesn't exist
 */
int
450 451
daemonRemoveClientStream(virNetServerClientPtr client,
                         daemonClientStream *stream)
452
{
453
    VIR_DEBUG("client=%p, proc=%d, serial=%u, st=%p",
454 455 456 457 458 459 460 461 462
              client, stream->procedure, stream->serial, stream->st);
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
    daemonClientStream *curr = priv->streams;
    daemonClientStream *prev = NULL;

    if (stream->filterID != -1) {
        virNetServerClientRemoveFilter(client,
                                       stream->filterID);
        stream->filterID = -1;
463 464
    }

465 466
    if (!stream->closed) {
        virStreamEventRemoveCallback(stream->st);
467
        virStreamAbort(stream->st);
468
    }
469 470 471 472 473 474

    while (curr) {
        if (curr == stream) {
            if (prev)
                prev->next = curr->next;
            else
475 476
                priv->streams = curr->next;
            return daemonFreeClientStream(client, stream);
477 478 479 480 481 482
        }
        prev = curr;
        curr = curr->next;
    }
    return -1;
}
483 484


485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506
void
daemonRemoveAllClientStreams(daemonClientStream *stream)
{
    daemonClientStream *tmp;

    VIR_DEBUG("stream=%p", stream);

    while (stream) {
        tmp = stream->next;

        if (!stream->closed) {
            virStreamEventRemoveCallback(stream->st);
            virStreamAbort(stream->st);
        }

        daemonFreeClientStream(NULL, stream);

        VIR_DEBUG("next stream=%p", tmp);
        stream = tmp;
    }
}

507 508 509 510 511 512 513
/*
 * Returns:
 *   -1  if fatal error occurred
 *    0  if message was fully processed
 *    1  if message is still being processed
 */
static int
514 515 516
daemonStreamHandleWriteData(virNetServerClientPtr client,
                            daemonClientStream *stream,
                            virNetMessagePtr msg)
517 518 519
{
    int ret;

520
    VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u, len=%zu, offset=%zu",
521 522
              client, stream, msg->header.proc, msg->header.serial,
              msg->bufferLength, msg->bufferOffset);
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537

    ret = virStreamSend(stream->st,
                        msg->buffer + msg->bufferOffset,
                        msg->bufferLength - msg->bufferOffset);

    if (ret > 0) {
        msg->bufferOffset += ret;

        /* Partial write, so indicate we have more todo later */
        if (msg->bufferOffset < msg->bufferLength)
            return 1;
    } else if (ret == -2) {
        /* Blocking, so indicate we have more todo later */
        return 1;
    } else {
538 539 540 541
        virNetMessageError rerr;

        memset(&rerr, 0, sizeof(rerr));

542
        VIR_INFO("Stream send failed");
543
        stream->closed = true;
544 545 546
        virStreamEventRemoveCallback(stream->st);
        virStreamAbort(stream->st);

547 548 549 550 551
        return virNetServerProgramSendReplyError(stream->prog,
                                                 client,
                                                 msg,
                                                 &rerr,
                                                 &msg->header);
552 553 554 555 556 557 558
    }

    return 0;
}


/*
E
Eric Blake 已提交
559
 * Process a finish handshake from the client.
560
 *
561
 * Returns a VIR_NET_OK confirmation if successful, or a VIR_NET_ERROR
562 563 564 565 566
 * if there was a stream error
 *
 * Returns 0 if successfully sent RPC reply, -1 upon fatal error
 */
static int
567 568 569
daemonStreamHandleFinish(virNetServerClientPtr client,
                         daemonClientStream *stream,
                         virNetMessagePtr msg)
570 571 572
{
    int ret;

573
    VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u",
574
              client, stream, msg->header.proc, msg->header.serial);
575

576
    stream->closed = true;
577
    virStreamEventRemoveCallback(stream->st);
578 579 580
    ret = virStreamFinish(stream->st);

    if (ret < 0) {
581 582 583 584 585 586 587
        virNetMessageError rerr;
        memset(&rerr, 0, sizeof(rerr));
        return virNetServerProgramSendReplyError(stream->prog,
                                                 client,
                                                 msg,
                                                 &rerr,
                                                 &msg->header);
588 589
    } else {
        /* Send zero-length confirm */
590 591 592 593 594 595
        return virNetServerProgramSendStreamData(stream->prog,
                                                 client,
                                                 msg,
                                                 stream->procedure,
                                                 stream->serial,
                                                 NULL, 0);
596 597 598 599 600 601 602 603 604 605
    }
}


/*
 * Process an abort request from the client.
 *
 * Returns 0 if successfully aborted, -1 upon error
 */
static int
606 607 608
daemonStreamHandleAbort(virNetServerClientPtr client,
                        daemonClientStream *stream,
                        virNetMessagePtr msg)
609
{
610
    VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%u",
611 612
              client, stream, msg->header.proc, msg->header.serial);
    virNetMessageError rerr;
613

614
    memset(&rerr, 0, sizeof(rerr));
615

616
    stream->closed = true;
617
    virStreamEventRemoveCallback(stream->st);
618 619
    virStreamAbort(stream->st);

620
    if (msg->header.status == VIR_NET_ERROR) {
621 622
        virReportError(VIR_ERR_RPC,
                       "%s", _("stream aborted at client request"));
623
    } else {
624
        VIR_WARN("unexpected stream status %d", msg->header.status);
625 626 627
        virReportError(VIR_ERR_RPC,
                       _("stream aborted with unexpected status %d"),
                       msg->header.status);
628 629
    }

630 631 632 633 634
    return virNetServerProgramSendReplyError(remoteProgram,
                                             client,
                                             msg,
                                             &rerr,
                                             &msg->header);
635 636 637 638 639 640 641 642 643 644 645 646
}



/*
 * Called when the stream is signalled has being able to accept
 * data writes. Will process all pending incoming messages
 * until they're all gone, or I/O blocks
 *
 * Returns 0 on success, or -1 upon fatal error
 */
static int
647 648
daemonStreamHandleWrite(virNetServerClientPtr client,
                        daemonClientStream *stream)
649
{
650
    VIR_DEBUG("client=%p, stream=%p", client, stream);
651

652 653
    while (stream->rx && !stream->closed) {
        virNetMessagePtr msg = stream->rx;
654
        int ret;
655 656 657 658

        switch (msg->header.status) {
        case VIR_NET_OK:
            ret = daemonStreamHandleFinish(client, stream, msg);
659 660
            break;

661 662
        case VIR_NET_CONTINUE:
            ret = daemonStreamHandleWriteData(client, stream, msg);
663 664
            break;

665
        case VIR_NET_ERROR:
666
        default:
667
            ret = daemonStreamHandleAbort(client, stream, msg);
668 669 670
            break;
        }

671 672
        if (ret > 0)
            break;  /* still processing data from msg */
673

674 675 676
        virNetMessageQueueServe(&stream->rx);
        if (ret < 0) {
            virNetMessageFree(msg);
677
            virNetServerClientImmediateClose(client);
678 679
            return -1;
        }
680 681 682 683 684 685 686 687

        /* 'CONTINUE' messages don't send a reply (unless error
         * occurred), so to release the 'msg' object we need to
         * send a fake zero-length reply. Nothing actually gets
         * onto the wire, but this causes the client to reset
         * its active request count / throttling
         */
        if (msg->header.status == VIR_NET_CONTINUE) {
688
            virNetMessageClear(msg);
689 690 691
            msg->header.type = VIR_NET_REPLY;
            if (virNetServerClientSendMessage(client, msg) < 0) {
                virNetMessageFree(msg);
692
                virNetServerClientImmediateClose(client);
693 694 695
                return -1;
            }
        }
696 697 698 699
    }

    return 0;
}
700 701 702 703 704



/*
 * Invoked when a stream is signalled as having data
J
Ján Tomko 已提交
705
 * available to read. This reads up to one message
706 707 708 709 710 711 712 713
 * worth of data, and then queues that for transmission
 * to the client.
 *
 * Returns 0 if data was queued for TX, or a error RPC
 * was sent, or -1 on fatal error, indicating client should
 * be killed
 */
static int
714 715
daemonStreamHandleRead(virNetServerClientPtr client,
                       daemonClientStream *stream)
716
{
717 718
    virNetMessagePtr msg = NULL;
    virNetMessageError rerr;
719
    char *buffer;
720
    size_t bufferLen = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
721 722
    int ret = -1;
    int rv;
723

724 725 726 727 728 729 730 731 732
    VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d",
              client, stream, stream->tx, stream->closed);

    /* We might have had an event pending before we shut
     * down the stream, so if we're marked as closed,
     * then do nothing
     */
    if (stream->closed)
        return 0;
733 734 735 736 737 738

    /* Shouldn't ever be called unless we're marked able to
     * transmit, but doesn't hurt to check */
    if (!stream->tx)
        return 0;

739 740
    memset(&rerr, 0, sizeof(rerr));

741 742 743
    if (VIR_ALLOC_N(buffer, bufferLen) < 0)
        return -1;

744 745 746 747 748
    if (!(msg = virNetMessageNew(false)))
        goto cleanup;

    rv = virStreamRecv(stream->st, buffer, bufferLen);
    if (rv == -2) {
749 750
        /* Should never get this, since we're only called when we know
         * we're readable, but hey things change... */
751 752 753 754 755 756 757 758 759
    } else if (rv < 0) {
        if (virNetServerProgramSendStreamError(remoteProgram,
                                               client,
                                               msg,
                                               &rerr,
                                               stream->procedure,
                                               stream->serial) < 0)
            goto cleanup;
        msg = NULL;
760
    } else {
761
        stream->tx = false;
762
        if (rv == 0)
763
            stream->recvEOF = true;
764

765 766 767 768 769 770 771 772 773 774 775
        msg->cb = daemonStreamMessageFinished;
        msg->opaque = stream;
        stream->refs++;
        if (virNetServerProgramSendStreamData(remoteProgram,
                                              client,
                                              msg,
                                              stream->procedure,
                                              stream->serial,
                                              buffer, rv) < 0)
            goto cleanup;
        msg = NULL;
776 777
    }

778 779
    ret = 0;
 cleanup:
780
    VIR_FREE(buffer);
781
    virNetMessageFree(msg);
782 783
    return ret;
}