stream.c 22.5 KB
Newer Older
1 2 3
/*
 * stream.c: APIs for managing client streams
 *
 * Copyright (C) 2009, 2011 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
 *
 * Author: Daniel P. Berrange <berrange@redhat.com>
 */


#include <config.h>

#include "stream.h"
27
#include "remote.h"
28 29
#include "memory.h"
#include "logging.h"
30
#include "virnetserverclient.h"
31 32 33
#include "virterror_internal.h"

#define VIR_FROM_THIS VIR_FROM_STREAMS
34

35 36 37 38 39 40
/* Report an error in this file's error domain (VIR_FROM_THIS), recording
 * the file, function and line at which the macro is expanded. */
#define virNetError(code, ...)                                          \
    virReportErrorHelper(VIR_FROM_THIS, code,                           \
                         __FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)

struct daemonClientStream {
    daemonClientPrivatePtr priv;
41
    int refs;
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59

    virNetServerProgramPtr prog;

    virStreamPtr st;
    int procedure;
    int serial;

    unsigned int recvEOF : 1;
    unsigned int closed : 1;

    int filterID;

    virNetMessagePtr rx;
    int tx;

    daemonClientStreamPtr next;
};

60
static int
61 62
daemonStreamHandleWrite(virNetServerClientPtr client,
                        daemonClientStream *stream);
63
static int
64 65
daemonStreamHandleRead(virNetServerClientPtr client,
                       daemonClientStream *stream);
66
static int
67 68 69
daemonStreamHandleFinish(virNetServerClientPtr client,
                         daemonClientStream *stream,
                         virNetMessagePtr msg);
70
static int
71 72 73
daemonStreamHandleAbort(virNetServerClientPtr client,
                        daemonClientStream *stream,
                        virNetMessagePtr msg);
74 75 76 77



static void
78
daemonStreamUpdateEvents(daemonClientStream *stream)
79 80 81 82
{
    int newEvents = 0;
    if (stream->rx)
        newEvents |= VIR_STREAM_EVENT_WRITABLE;
83 84
    if (stream->tx && !stream->recvEOF)
        newEvents |= VIR_STREAM_EVENT_READABLE;
85 86 87 88

    virStreamEventUpdateCallback(stream->st, newEvents);
}

89 90 91 92 93 94 95 96
/*
 * Invoked when an outgoing data packet message has been fully sent.
 * This simply re-enables TX of further data.
 *
 * The idea is to stop the daemon growing without bound due to
 * fast stream, but slow client
 */
static void
97
daemonStreamMessageFinished(virNetMessagePtr msg ATTRIBUTE_UNUSED,
98 99 100 101 102 103 104 105
                            void *opaque)
{
    daemonClientStream *stream = opaque;
    VIR_DEBUG("stream=%p proc=%d serial=%d",
              stream, msg->header.proc, msg->header.serial);

    stream->tx = 1;
    daemonStreamUpdateEvents(stream);
106 107

    daemonFreeClientStream(NULL, stream);
108
}
109

110 111 112 113 114 115 116 117 118

static void
daemonStreamEventFreeFunc(void *opaque)
{
    virNetServerClientPtr client = opaque;

    virNetServerClientFree(client);
}

119 120 121 122
/*
 * Callback that gets invoked when a stream becomes writable/readable
 */
static void
123
daemonStreamEvent(virStreamPtr st, int events, void *opaque)
124
{
125 126 127
    virNetServerClientPtr client = opaque;
    daemonClientStream *stream;
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
128

129
    virMutexLock(&priv->lock);
130

131 132 133 134 135 136
    stream = priv->streams;
    while (stream) {
        if (stream->st == st)
            break;
        stream = stream->next;
    }
137 138 139 140 141 142 143

    if (!stream) {
        VIR_WARN("event for client=%p stream st=%p, but missing stream state", client, st);
        virStreamEventRemoveCallback(st);
        goto cleanup;
    }

144
    VIR_DEBUG("st=%p events=%d EOF=%d closed=%d", st, events, stream->recvEOF, stream->closed);
145

D
Daniel P. Berrange 已提交
146 147
    if (!stream->closed &&
        (events & VIR_STREAM_EVENT_WRITABLE)) {
148 149 150
        if (daemonStreamHandleWrite(client, stream) < 0) {
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
151 152 153 154
            goto cleanup;
        }
    }

D
Daniel P. Berrange 已提交
155 156 157
    if (!stream->closed && !stream->recvEOF &&
        (events & (VIR_STREAM_EVENT_READABLE))) {
        events = events & ~(VIR_STREAM_EVENT_READABLE);
158 159 160
        if (daemonStreamHandleRead(client, stream) < 0) {
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
161 162 163 164
            goto cleanup;
        }
    }

165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
    /* If we have a completion/abort message, always process it */
    if (stream->rx) {
        virNetMessagePtr msg = stream->rx;
        switch (msg->header.status) {
        case VIR_NET_CONTINUE:
            /* nada */
            break;
        case VIR_NET_OK:
            virNetMessageQueueServe(&stream->rx);
            if (daemonStreamHandleFinish(client, stream, msg) < 0) {
                virNetMessageFree(msg);
                daemonRemoveClientStream(client, stream);
                virNetServerClientClose(client);
                goto cleanup;
            }
            break;
        case VIR_NET_ERROR:
        default:
            virNetMessageQueueServe(&stream->rx);
            if (daemonStreamHandleAbort(client, stream, msg) < 0) {
                virNetMessageFree(msg);
                daemonRemoveClientStream(client, stream);
                virNetServerClientClose(client);
                goto cleanup;
            }
            break;
        }
    }

D
Daniel P. Berrange 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224

    /* If we got HANGUP, we need to only send an empty
     * packet so the client sees an EOF and cleans up
     */
    if (!stream->closed && !stream->recvEOF &&
        (events & VIR_STREAM_EVENT_HANGUP)) {
        virNetMessagePtr msg;
        events &= ~(VIR_STREAM_EVENT_HANGUP);
        stream->tx = 0;
        stream->recvEOF = 1;
        if (!(msg = virNetMessageNew(false))) {
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
            goto cleanup;
        }
        msg->cb = daemonStreamMessageFinished;
        msg->opaque = stream;
        stream->refs++;
        if (virNetServerProgramSendStreamData(remoteProgram,
                                              client,
                                              msg,
                                              stream->procedure,
                                              stream->serial,
                                              "", 0) < 0) {
            virNetMessageFree(msg);
            daemonRemoveClientStream(client, stream);
            virNetServerClientClose(client);
            goto cleanup;
        }
    }

225 226 227
    if (!stream->closed &&
        (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
        int ret;
228 229 230 231
        virNetMessagePtr msg;
        virNetMessageError rerr;

        memset(&rerr, 0, sizeof(rerr));
232
        stream->closed = 1;
233
        virStreamEventRemoveCallback(stream->st);
234 235
        virStreamAbort(stream->st);
        if (events & VIR_STREAM_EVENT_HANGUP)
236 237
            virNetError(VIR_ERR_RPC,
                        "%s", _("stream had unexpected termination"));
238
        else
239 240 241
            virNetError(VIR_ERR_RPC,
                        "%s", _("stream had I/O failure"));

242
        msg = virNetMessageNew(false);
243 244 245 246 247 248 249 250 251 252 253
        if (!msg) {
            ret = -1;
        } else {
            ret = virNetServerProgramSendStreamError(remoteProgram,
                                                     client,
                                                     msg,
                                                     &rerr,
                                                     stream->procedure,
                                                     stream->serial);
        }
        daemonRemoveClientStream(client, stream);
254
        if (ret < 0)
255
            virNetServerClientClose(client);
256 257 258 259
        goto cleanup;
    }

    if (stream->closed) {
260
        daemonRemoveClientStream(client, stream);
261
    } else {
262
        daemonStreamUpdateEvents(stream);
263 264 265
    }

cleanup:
266
    virMutexUnlock(&priv->lock);
267 268
}

269 270 271 272 273 274 275 276 277 278

/*
 * @client: a locked client object
 *
 * Invoked by the main loop when filtering incoming messages.
 *
 * Returns 1 if the message was processed, 0 if skipped,
 * -1 on fatal client error
 */
static int
279
daemonStreamFilter(virNetServerClientPtr client ATTRIBUTE_UNUSED,
280 281
                   virNetMessagePtr msg,
                   void *opaque)
282
{
283 284
    daemonClientStream *stream = opaque;
    int ret = 0;
285

286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
    virMutexLock(&stream->priv->lock);

    if (msg->header.type != VIR_NET_STREAM)
        goto cleanup;

    if (!virNetServerProgramMatches(stream->prog, msg))
        goto cleanup;

    if (msg->header.proc != stream->procedure ||
        msg->header.serial != stream->serial)
        goto cleanup;

    VIR_DEBUG("Incoming client=%p, rx=%p, serial=%d, proc=%d, status=%d",
              client, stream->rx, msg->header.proc,
              msg->header.serial, msg->header.status);

    virNetMessageQueuePush(&stream->rx, msg);
    daemonStreamUpdateEvents(stream);
    ret = 1;

cleanup:
    virMutexUnlock(&stream->priv->lock);
    return ret;
309 310 311 312 313
}


/*
 * @conn: a connection object to associate the stream with
314
 * @header: the method call to associate with the stream
315 316 317 318 319
 *
 * Creates a new stream for this conn
 *
 * Returns a new stream object, or NULL upon OOM
 */
320 321 322 323 324
daemonClientStream *
daemonCreateClientStream(virNetServerClientPtr client,
                         virStreamPtr st,
                         virNetServerProgramPtr prog,
                         virNetMessageHeaderPtr header)
325
{
326 327
    daemonClientStream *stream;
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
328

329 330
    VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p",
              client, header->proc, header->serial, st);
331

332 333
    if (VIR_ALLOC(stream) < 0) {
        virReportOOMError();
334
        return NULL;
335
    }
336

337
    stream->refs = 1;
338 339 340 341 342 343
    stream->priv = priv;
    stream->prog = prog;
    stream->procedure = header->proc;
    stream->serial = header->serial;
    stream->filterID = -1;
    stream->st = st;
344

345
    virNetServerProgramRef(prog);
346 347 348 349 350 351 352 353 354 355

    return stream;
}

/*
 * @stream: an unused client stream
 *
 * Frees the memory associated with this inactive client
 * stream
 */
356 357
int daemonFreeClientStream(virNetServerClientPtr client,
                           daemonClientStream *stream)
358
{
359 360
    virNetMessagePtr msg;
    int ret = 0;
361 362

    if (!stream)
363 364
        return 0;

365 366 367 368
    stream->refs--;
    if (stream->refs)
        return 0;

369 370
    VIR_DEBUG("client=%p, proc=%d, serial=%d",
              client, stream->procedure, stream->serial);
371

372
    virNetServerProgramFree(stream->prog);
373 374 375

    msg = stream->rx;
    while (msg) {
376
        virNetMessagePtr tmp = msg->next;
377 378
        if (client) {
            /* Send a dummy reply to free up 'msg' & unblock client rx */
379
            virNetMessageClear(msg);
380 381 382 383 384 385 386
            msg->header.type = VIR_NET_REPLY;
            if (virNetServerClientSendMessage(client, msg) < 0) {
                virNetServerClientImmediateClose(client);
                virNetMessageFree(msg);
                ret = -1;
            }
        } else {
387 388
            virNetMessageFree(msg);
        }
389 390 391 392 393
        msg = tmp;
    }

    virStreamFree(stream->st);
    VIR_FREE(stream);
394 395

    return ret;
396 397 398 399 400 401 402
}


/*
 * @client: a locked client to add the stream to
 * @stream: a stream to add
 */
403 404 405
int daemonAddClientStream(virNetServerClientPtr client,
                          daemonClientStream *stream,
                          bool transmit)
406
{
407 408 409
    VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p, transmit=%d",
              client, stream->procedure, stream->serial, stream->st, transmit);
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
410

411 412 413 414
    if (stream->filterID != -1) {
        VIR_WARN("Filter already added to client %p", client);
        return -1;
    }
415

416
    if (virStreamEventAddCallback(stream->st, 0,
417 418
                                  daemonStreamEvent, client,
                                  daemonStreamEventFreeFunc) < 0)
419 420
        return -1;

421
    virNetServerClientRef(client);
422 423 424 425 426
    if ((stream->filterID = virNetServerClientAddFilter(client,
                                                        daemonStreamFilter,
                                                        stream)) < 0) {
        virStreamEventRemoveCallback(stream->st);
        return -1;
427 428
    }

429 430
    if (transmit)
        stream->tx = 1;
431

432 433 434
    virMutexLock(&priv->lock);
    stream->next = priv->streams;
    priv->streams = stream;
435

436
    daemonStreamUpdateEvents(stream);
437

438
    virMutexUnlock(&priv->lock);
439

440
    return 0;
441 442 443 444 445 446 447 448 449 450 451 452
}


/*
 * @client: a locked client object
 * @stream: an inactive, closed stream object
 *
 * Removes a stream from the list of active streams for the client
 *
 * Returns 0 if the stream was removd, -1 if it doesn't exist
 */
int
453 454
daemonRemoveClientStream(virNetServerClientPtr client,
                         daemonClientStream *stream)
455
{
456 457 458 459 460 461 462 463 464 465
    VIR_DEBUG("client=%p, proc=%d, serial=%d, st=%p",
              client, stream->procedure, stream->serial, stream->st);
    daemonClientPrivatePtr priv = virNetServerClientGetPrivateData(client);
    daemonClientStream *curr = priv->streams;
    daemonClientStream *prev = NULL;

    if (stream->filterID != -1) {
        virNetServerClientRemoveFilter(client,
                                       stream->filterID);
        stream->filterID = -1;
466 467
    }

468 469
    if (!stream->closed) {
        virStreamEventRemoveCallback(stream->st);
470
        virStreamAbort(stream->st);
471
    }
472 473 474 475 476 477

    while (curr) {
        if (curr == stream) {
            if (prev)
                prev->next = curr->next;
            else
478 479
                priv->streams = curr->next;
            return daemonFreeClientStream(client, stream);
480 481 482 483 484 485
        }
        prev = curr;
        curr = curr->next;
    }
    return -1;
}
486 487


488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
/*
 * @stream: head of a list of client streams (may be NULL)
 *
 * Walks the whole list, tearing down every stream that is not yet
 * closed (event callback removed, stream aborted) and dropping one
 * reference on each. No client is passed to daemonFreeClientStream(),
 * so queued messages are freed rather than answered.
 */
void
daemonRemoveAllClientStreams(daemonClientStream *stream)
{
    daemonClientStream *tmp;

    VIR_DEBUG("stream=%p", stream);

    while (stream) {
        /* Fetch the link first: the stream may be freed below */
        tmp = stream->next;

        if (!stream->closed) {
            /* Record the teardown, as every other close path in this
             * file does: an in-flight message may keep the stream
             * object alive past the free call below. */
            stream->closed = 1;
            virStreamEventRemoveCallback(stream->st);
            virStreamAbort(stream->st);
        }

        daemonFreeClientStream(NULL, stream);

        VIR_DEBUG("next stream=%p", tmp);
        stream = tmp;
    }
}

510 511 512 513 514 515 516
/*
 * Returns:
 *   -1  if fatal error occurred
 *    0  if message was fully processed
 *    1  if message is still being processed
 */
static int
517 518 519
daemonStreamHandleWriteData(virNetServerClientPtr client,
                            daemonClientStream *stream,
                            virNetMessagePtr msg)
520 521 522
{
    int ret;

523 524 525
    VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d, len=%zu, offset=%zu",
              client, stream, msg->header.proc, msg->header.serial,
              msg->bufferLength, msg->bufferOffset);
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540

    ret = virStreamSend(stream->st,
                        msg->buffer + msg->bufferOffset,
                        msg->bufferLength - msg->bufferOffset);

    if (ret > 0) {
        msg->bufferOffset += ret;

        /* Partial write, so indicate we have more todo later */
        if (msg->bufferOffset < msg->bufferLength)
            return 1;
    } else if (ret == -2) {
        /* Blocking, so indicate we have more todo later */
        return 1;
    } else {
541 542 543 544
        virNetMessageError rerr;

        memset(&rerr, 0, sizeof(rerr));

545
        VIR_INFO("Stream send failed");
546
        stream->closed = 1;
547 548 549 550 551
        return virNetServerProgramSendReplyError(stream->prog,
                                                 client,
                                                 msg,
                                                 &rerr,
                                                 &msg->header);
552 553 554 555 556 557 558
    }

    return 0;
}


/*
E
Eric Blake 已提交
559
 * Process a finish handshake from the client.
560
 *
561
 * Returns a VIR_NET_OK confirmation if successful, or a VIR_NET_ERROR
562 563 564 565 566
 * if there was a stream error
 *
 * Returns 0 if successfully sent RPC reply, -1 upon fatal error
 */
static int
567 568 569
daemonStreamHandleFinish(virNetServerClientPtr client,
                         daemonClientStream *stream,
                         virNetMessagePtr msg)
570 571 572
{
    int ret;

573 574
    VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d",
              client, stream, msg->header.proc, msg->header.serial);
575 576

    stream->closed = 1;
577
    virStreamEventRemoveCallback(stream->st);
578 579 580
    ret = virStreamFinish(stream->st);

    if (ret < 0) {
581 582 583 584 585 586 587
        virNetMessageError rerr;
        memset(&rerr, 0, sizeof(rerr));
        return virNetServerProgramSendReplyError(stream->prog,
                                                 client,
                                                 msg,
                                                 &rerr,
                                                 &msg->header);
588 589
    } else {
        /* Send zero-length confirm */
590 591 592 593 594 595
        return virNetServerProgramSendStreamData(stream->prog,
                                                 client,
                                                 msg,
                                                 stream->procedure,
                                                 stream->serial,
                                                 NULL, 0);
596 597 598 599 600 601 602 603 604 605
    }
}


/*
 * Process an abort request from the client.
 *
 * Returns 0 if successfully aborted, -1 upon error
 */
static int
606 607 608
daemonStreamHandleAbort(virNetServerClientPtr client,
                        daemonClientStream *stream,
                        virNetMessagePtr msg)
609
{
610 611 612
    VIR_DEBUG("client=%p, stream=%p, proc=%d, serial=%d",
              client, stream, msg->header.proc, msg->header.serial);
    virNetMessageError rerr;
613

614
    memset(&rerr, 0, sizeof(rerr));
615 616

    stream->closed = 1;
617
    virStreamEventRemoveCallback(stream->st);
618 619
    virStreamAbort(stream->st);

620 621 622
    if (msg->header.status == VIR_NET_ERROR)
        virNetError(VIR_ERR_RPC,
                    "%s", _("stream aborted at client request"));
623
    else {
624 625 626 627
        VIR_WARN("unexpected stream status %d", msg->header.status);
        virNetError(VIR_ERR_RPC,
                    _("stream aborted with unexpected status %d"),
                    msg->header.status);
628 629
    }

630 631 632 633 634
    return virNetServerProgramSendReplyError(remoteProgram,
                                             client,
                                             msg,
                                             &rerr,
                                             &msg->header);
635 636 637 638 639 640 641 642 643 644 645 646
}



/*
 * Called when the stream is signalled has being able to accept
 * data writes. Will process all pending incoming messages
 * until they're all gone, or I/O blocks
 *
 * Returns 0 on success, or -1 upon fatal error
 */
static int
647 648
daemonStreamHandleWrite(virNetServerClientPtr client,
                        daemonClientStream *stream)
649
{
650
    VIR_DEBUG("client=%p, stream=%p", client, stream);
651

652 653
    while (stream->rx && !stream->closed) {
        virNetMessagePtr msg = stream->rx;
654
        int ret;
655 656 657 658

        switch (msg->header.status) {
        case VIR_NET_OK:
            ret = daemonStreamHandleFinish(client, stream, msg);
659 660
            break;

661 662
        case VIR_NET_CONTINUE:
            ret = daemonStreamHandleWriteData(client, stream, msg);
663 664
            break;

665
        case VIR_NET_ERROR:
666
        default:
667
            ret = daemonStreamHandleAbort(client, stream, msg);
668 669 670
            break;
        }

671 672
        if (ret > 0)
            break;  /* still processing data from msg */
673

674 675 676
        virNetMessageQueueServe(&stream->rx);
        if (ret < 0) {
            virNetMessageFree(msg);
677
            virNetServerClientImmediateClose(client);
678 679
            return -1;
        }
680 681 682 683 684 685 686 687

        /* 'CONTINUE' messages don't send a reply (unless error
         * occurred), so to release the 'msg' object we need to
         * send a fake zero-length reply. Nothing actually gets
         * onto the wire, but this causes the client to reset
         * its active request count / throttling
         */
        if (msg->header.status == VIR_NET_CONTINUE) {
688
            virNetMessageClear(msg);
689 690 691
            msg->header.type = VIR_NET_REPLY;
            if (virNetServerClientSendMessage(client, msg) < 0) {
                virNetMessageFree(msg);
692
                virNetServerClientImmediateClose(client);
693 694 695
                return -1;
            }
        }
696 697 698 699
    }

    return 0;
}
700 701 702 703 704 705 706 707 708 709 710 711 712 713



/*
 * Invoked when a stream is signalled as having data
 * available to read. This reads upto one message
 * worth of data, and then queues that for transmission
 * to the client.
 *
 * Returns 0 if data was queued for TX, or a error RPC
 * was sent, or -1 on fatal error, indicating client should
 * be killed
 */
static int
714 715
daemonStreamHandleRead(virNetServerClientPtr client,
                       daemonClientStream *stream)
716 717
{
    char *buffer;
718
    size_t bufferLen = VIR_NET_MESSAGE_PAYLOAD_MAX;
719 720
    int ret;

721 722 723 724 725 726 727 728 729
    VIR_DEBUG("client=%p, stream=%p tx=%d closed=%d",
              client, stream, stream->tx, stream->closed);

    /* We might have had an event pending before we shut
     * down the stream, so if we're marked as closed,
     * then do nothing
     */
    if (stream->closed)
        return 0;
730 731 732 733 734 735 736 737 738 739 740 741 742 743 744

    /* Shouldn't ever be called unless we're marked able to
     * transmit, but doesn't hurt to check */
    if (!stream->tx)
        return 0;

    if (VIR_ALLOC_N(buffer, bufferLen) < 0)
        return -1;

    ret = virStreamRecv(stream->st, buffer, bufferLen);
    if (ret == -2) {
        /* Should never get this, since we're only called when we know
         * we're readable, but hey things change... */
        ret = 0;
    } else if (ret < 0) {
745 746 747 748
        virNetMessagePtr msg;
        virNetMessageError rerr;

        memset(&rerr, 0, sizeof(rerr));
749

750
        if (!(msg = virNetMessageNew(false)))
751 752 753 754 755 756 757 758
            ret = -1;
        else
            ret = virNetServerProgramSendStreamError(remoteProgram,
                                                     client,
                                                     msg,
                                                     &rerr,
                                                     stream->procedure,
                                                     stream->serial);
759
    } else {
760
        virNetMessagePtr msg;
761 762 763
        stream->tx = 0;
        if (ret == 0)
            stream->recvEOF = 1;
764
        if (!(msg = virNetMessageNew(false)))
765 766 767 768 769
            ret = -1;

        if (msg) {
            msg->cb = daemonStreamMessageFinished;
            msg->opaque = stream;
770
            stream->refs++;
771 772 773 774 775 776 777
            ret = virNetServerProgramSendStreamData(remoteProgram,
                                                    client,
                                                    msg,
                                                    stream->procedure,
                                                    stream->serial,
                                                    buffer, ret);
        }
778 779 780 781 782
    }

    VIR_FREE(buffer);
    return ret;
}