diff --git a/src/rpc/virnetclientstream.c b/src/rpc/virnetclientstream.c index b428f4b4d575c234abcbe160eb324fa157a23d39..34989a9220c7da335e19942be2663b300e66a107 100644 --- a/src/rpc/virnetclientstream.c +++ b/src/rpc/virnetclientstream.c @@ -49,9 +49,7 @@ struct _virNetClientStream { * time by stopping consuming any incoming data * off the socket.... */ - char *incoming; - size_t incomingOffset; - size_t incomingLength; + virNetMessagePtr rx; bool incomingEOF; virNetClientStreamEventCallback cb; @@ -86,9 +84,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) if (!st->cb) return; - VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); + VIR_DEBUG("Check timer rx=%p cbEvents=%d", st->rx, st->cbEvents); - if (((st->incomingOffset || st->incomingEOF) && + if (((st->rx || st->incomingEOF) && (st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { VIR_DEBUG("Enabling event timer"); @@ -110,13 +108,13 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_READABLE) && - (st->incomingOffset || st->incomingEOF)) + (st->rx || st->incomingEOF)) events |= VIR_STREAM_EVENT_READABLE; if (st->cb && (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) events |= VIR_STREAM_EVENT_WRITABLE; - VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); + VIR_DEBUG("Got Timer dispatch events=%d cbEvents=%d rx=%p", events, st->cbEvents, st->rx); if (events) { virNetClientStreamEventCallback cb = st->cb; void *cbOpaque = st->cbOpaque; @@ -161,7 +159,11 @@ void virNetClientStreamDispose(void *obj) virNetClientStreamPtr st = obj; virResetError(&st->err); - VIR_FREE(st->incoming); + while (st->rx) { + virNetMessagePtr msg = st->rx; + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); + } virObjectUnref(st->prog); } @@ -264,41 +266,34 @@ int virNetClientStreamSetError(virNetClientStreamPtr st, int virNetClientStreamQueuePacket(virNetClientStreamPtr st, virNetMessagePtr msg) { - int ret = -1; - size_t need; + virNetMessagePtr tmp_msg; + + VIR_DEBUG("Incoming stream message: stream=%p message=%p", st, msg); + + /* Unfortunately, we must allocate new message as the one we + * get in @msg is going to be cleared later in the process. */ + + if (!(tmp_msg = virNetMessageNew(false))) + return -1; + + /* Copy header */ + memcpy(&tmp_msg->header, &msg->header, sizeof(msg->header)); + + /* Steal message buffer */ + tmp_msg->buffer = msg->buffer; + tmp_msg->bufferLength = msg->bufferLength; + tmp_msg->bufferOffset = msg->bufferOffset; + msg->buffer = NULL; + msg->bufferLength = msg->bufferOffset = 0; virObjectLock(st); - need = msg->bufferLength - msg->bufferOffset; - if (need) { - size_t avail = st->incomingLength - st->incomingOffset; - if (need > avail) { - size_t extra = need - avail; - if (VIR_REALLOC_N(st->incoming, - st->incomingLength + extra) < 0) { - VIR_DEBUG("Out of memory handling stream data"); - goto cleanup; - } - st->incomingLength += extra; - } - memcpy(st->incoming + st->incomingOffset, - msg->buffer + msg->bufferOffset, - msg->bufferLength - msg->bufferOffset); - st->incomingOffset += (msg->bufferLength - msg->bufferOffset); - } else { - st->incomingEOF = true; - } + virNetMessageQueuePush(&st->rx, tmp_msg); - VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", - st->incomingOffset, st->incomingLength, - st->incomingEOF); virNetClientStreamEventTimerUpdate(st); - ret = 0; - - cleanup: virObjectUnlock(st); - return ret; + return 0; } @@ -362,10 +357,12 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, bool nonblock) { int rv = -1; + size_t want; + VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", st, client, data, nbytes, nonblock); virObjectLock(st); - if (!st->incomingOffset && !st->incomingEOF) { + if (!st->rx && !st->incomingEOF) { virNetMessagePtr msg; int ret; @@ -395,23 +392,28 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, goto cleanup; } - VIR_DEBUG("After IO %zu", st->incomingOffset); - if (st->incomingOffset) { - int want = st->incomingOffset; - if (want > nbytes) - want = nbytes; - memcpy(data, st->incoming, want); - if (want < st->incomingOffset) { - memmove(st->incoming, st->incoming + want, st->incomingOffset - want); - st->incomingOffset -= want; - } else { - VIR_FREE(st->incoming); - st->incomingOffset = st->incomingLength = 0; + VIR_DEBUG("After IO rx=%p", st->rx); + want = nbytes; + while (want && st->rx) { + virNetMessagePtr msg = st->rx; + size_t len = want; + + if (len > msg->bufferLength - msg->bufferOffset) + len = msg->bufferLength - msg->bufferOffset; + + if (!len) + break; + + memcpy(data + (nbytes - want), msg->buffer + msg->bufferOffset, len); + want -= len; + msg->bufferOffset += len; + + if (msg->bufferOffset == msg->bufferLength) { + virNetMessageQueueServe(&st->rx); + virNetMessageFree(msg); } - rv = want; - } else { - rv = 0; } + rv = nbytes - want; virNetClientStreamEventTimerUpdate(st);