提交 d9c9e138 编写于 作者: O Ossi Herrala 提交者: Martin Kletzander

rpc: Fix slow volume download (virsh vol-download)

Use an I/O vector (iovec) instead of one huge memory buffer, as suggested
in https://bugzilla.redhat.com/show_bug.cgi?id=1026137#c7. This avoids
doing memmove() on big buffers, so performance doesn't degrade when the
source (virNetClientStreamQueuePacket()) is faster than the sink
(virNetClientStreamRecvPacket()).

Resolves: http://bugzilla.redhat.com/1026137

Signed-off-by: Martin Kletzander <mkletzan@redhat.com>
上级 e7fef6d0
...@@ -49,9 +49,9 @@ struct _virNetClientStream { ...@@ -49,9 +49,9 @@ struct _virNetClientStream {
* time by stopping consuming any incoming data * time by stopping consuming any incoming data
* off the socket.... * off the socket....
*/ */
char *incoming; struct iovec *incomingVec; /* I/O Vector to hold data */
size_t incomingOffset; size_t writeVec; /* Vectors produced */
size_t incomingLength; size_t readVec; /* Vectors consumed */
bool incomingEOF; bool incomingEOF;
virNetClientStreamEventCallback cb; virNetClientStreamEventCallback cb;
...@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st) ...@@ -86,9 +86,9 @@ virNetClientStreamEventTimerUpdate(virNetClientStreamPtr st)
if (!st->cb) if (!st->cb)
return; return;
VIR_DEBUG("Check timer offset=%zu %d", st->incomingOffset, st->cbEvents); VIR_DEBUG("Check timer readVec %zu writeVec %zu %d", st->readVec, st->writeVec, st->cbEvents);
if (((st->incomingOffset || st->incomingEOF) && if ((((st->readVec < st->writeVec) || st->incomingEOF) &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE)) || (st->cbEvents & VIR_STREAM_EVENT_READABLE)) ||
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) { (st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) {
VIR_DEBUG("Enabling event timer"); VIR_DEBUG("Enabling event timer");
...@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque) ...@@ -110,13 +110,14 @@ virNetClientStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
if (st->cb && if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_READABLE) && (st->cbEvents & VIR_STREAM_EVENT_READABLE) &&
(st->incomingOffset || st->incomingEOF)) ((st->readVec < st->writeVec) || st->incomingEOF))
events |= VIR_STREAM_EVENT_READABLE; events |= VIR_STREAM_EVENT_READABLE;
if (st->cb && if (st->cb &&
(st->cbEvents & VIR_STREAM_EVENT_WRITABLE)) (st->cbEvents & VIR_STREAM_EVENT_WRITABLE))
events |= VIR_STREAM_EVENT_WRITABLE; events |= VIR_STREAM_EVENT_WRITABLE;
VIR_DEBUG("Got Timer dispatch %d %d offset=%zu", events, st->cbEvents, st->incomingOffset); VIR_DEBUG("Got Timer dispatch %d %d readVec %zu writeVec %zu", events, st->cbEvents,
st->readVec, st->writeVec);
if (events) { if (events) {
virNetClientStreamEventCallback cb = st->cb; virNetClientStreamEventCallback cb = st->cb;
void *cbOpaque = st->cbOpaque; void *cbOpaque = st->cbOpaque;
...@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj) ...@@ -161,7 +162,7 @@ void virNetClientStreamDispose(void *obj)
virNetClientStreamPtr st = obj; virNetClientStreamPtr st = obj;
virResetError(&st->err); virResetError(&st->err);
VIR_FREE(st->incoming); VIR_FREE(st->incomingVec);
virObjectUnref(st->prog); virObjectUnref(st->prog);
} }
...@@ -265,38 +266,50 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st, ...@@ -265,38 +266,50 @@ int virNetClientStreamQueuePacket(virNetClientStreamPtr st,
virNetMessagePtr msg) virNetMessagePtr msg)
{ {
int ret = -1; int ret = -1;
size_t need; struct iovec iov;
char *base;
size_t piece, pieces, length, offset = 0, size = 1024*1024;
virObjectLock(st); virObjectLock(st);
need = msg->bufferLength - msg->bufferOffset;
if (need) {
size_t avail = st->incomingLength - st->incomingOffset;
if (need > avail) {
size_t extra = need - avail;
if (VIR_REALLOC_N(st->incoming,
st->incomingLength + extra) < 0) {
VIR_DEBUG("Out of memory handling stream data");
goto cleanup;
}
st->incomingLength += extra;
}
memcpy(st->incoming + st->incomingOffset, length = msg->bufferLength - msg->bufferOffset;
msg->buffer + msg->bufferOffset,
msg->bufferLength - msg->bufferOffset); if (length == 0) {
st->incomingOffset += (msg->bufferLength - msg->bufferOffset);
} else {
st->incomingEOF = true; st->incomingEOF = true;
goto end;
} }
VIR_DEBUG("Stream incoming data offset %zu length %zu EOF %d", pieces = (length + size - 1) / size;
st->incomingOffset, st->incomingLength, for (piece = 0; piece < pieces; piece++) {
st->incomingEOF); if (size > length - offset)
virNetClientStreamEventTimerUpdate(st); size = length - offset;
if (VIR_ALLOC_N(base, size)) {
VIR_DEBUG("Allocation failed");
goto cleanup;
}
memcpy(base, msg->buffer + msg->bufferOffset + offset, size);
iov.iov_base = base;
iov.iov_len = size;
offset += size;
if (VIR_APPEND_ELEMENT(st->incomingVec, st->writeVec, iov) < 0) {
VIR_DEBUG("Append failed");
VIR_FREE(base);
goto cleanup;
}
VIR_DEBUG("Wrote piece of vector. readVec %zu, writeVec %zu size %zu",
st->readVec, st->writeVec, size);
}
end:
virNetClientStreamEventTimerUpdate(st);
ret = 0; ret = 0;
cleanup: cleanup:
VIR_DEBUG("Stream incoming data readVec %zu writeVec %zu EOF %d",
st->readVec, st->writeVec, st->incomingEOF);
virObjectUnlock(st); virObjectUnlock(st);
return ret; return ret;
} }
...@@ -361,17 +374,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, ...@@ -361,17 +374,21 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
size_t nbytes, size_t nbytes,
bool nonblock) bool nonblock)
{ {
int rv = -1; int ret = -1;
size_t partial, offset;
virObjectLock(st);
VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d", VIR_DEBUG("st=%p client=%p data=%p nbytes=%zu nonblock=%d",
st, client, data, nbytes, nonblock); st, client, data, nbytes, nonblock);
virObjectLock(st);
if (!st->incomingOffset && !st->incomingEOF) { if ((st->readVec >= st->writeVec) && !st->incomingEOF) {
virNetMessagePtr msg; virNetMessagePtr msg;
int ret; int rv;
if (nonblock) { if (nonblock) {
VIR_DEBUG("Non-blocking mode and no data available"); VIR_DEBUG("Non-blocking mode and no data available");
rv = -2; ret = -2;
goto cleanup; goto cleanup;
} }
...@@ -387,37 +404,66 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, ...@@ -387,37 +404,66 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
VIR_DEBUG("Dummy packet to wait for stream data"); VIR_DEBUG("Dummy packet to wait for stream data");
virObjectUnlock(st); virObjectUnlock(st);
ret = virNetClientSendWithReplyStream(client, msg, st); rv = virNetClientSendWithReplyStream(client, msg, st);
virObjectLock(st); virObjectLock(st);
virNetMessageFree(msg); virNetMessageFree(msg);
if (ret < 0) if (rv < 0)
goto cleanup; goto cleanup;
} }
VIR_DEBUG("After IO %zu", st->incomingOffset); offset = 0;
if (st->incomingOffset) { partial = nbytes;
int want = st->incomingOffset;
if (want > nbytes) while (st->incomingVec && (st->readVec < st->writeVec)) {
want = nbytes; struct iovec *iov = st->incomingVec + st->readVec;
memcpy(data, st->incoming, want);
if (want < st->incomingOffset) { if (!iov || !iov->iov_base) {
memmove(st->incoming, st->incoming + want, st->incomingOffset - want); virReportError(VIR_ERR_INTERNAL_ERROR,
st->incomingOffset -= want; "%s", _("NULL pointer encountered"));
} else { goto cleanup;
VIR_FREE(st->incoming);
st->incomingOffset = st->incomingLength = 0;
} }
rv = want;
} else { if (partial < iov->iov_len) {
rv = 0; memcpy(data+offset, iov->iov_base, partial);
memmove(iov->iov_base, (char*)iov->iov_base+partial,
iov->iov_len-partial);
iov->iov_len -= partial;
offset += partial;
VIR_DEBUG("Consumed %zu, left %zu", partial, iov->iov_len);
break;
}
memcpy(data+offset, iov->iov_base, iov->iov_len);
VIR_DEBUG("Consumed %zu. Moving to next piece", iov->iov_len);
partial -= iov->iov_len;
offset += iov->iov_len;
VIR_FREE(iov->iov_base);
iov->iov_len = 0;
st->readVec++;
VIR_DEBUG("Read piece of vector. read %zu, readVec %zu, writeVec %zu",
offset, st->readVec, st->writeVec);
} }
/* Shrink the I/O Vector buffer to free up memory. Do the
shrinking only when there is selected amount or more buffers to
free so it doesn't constantly memmove() and realloc() buffers.
*/
if (st->readVec >= 16) {
memmove(st->incomingVec, st->incomingVec + st->readVec,
sizeof(*st->incomingVec)*(st->writeVec - st->readVec));
VIR_SHRINK_N(st->incomingVec, st->writeVec, st->readVec);
VIR_DEBUG("shrink removed %zu, left %zu", st->readVec, st->writeVec);
st->readVec = 0;
}
ret = offset;
virNetClientStreamEventTimerUpdate(st); virNetClientStreamEventTimerUpdate(st);
cleanup: cleanup:
virObjectUnlock(st); virObjectUnlock(st);
return rv; return ret;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册