提交 a5249510 编写于 作者: M Michal Privoznik

Introduce virStreamSparseSendAll

This is just a wrapper over new function that have been just
introduced: virStreamSendHole() . It's very similar to
virStreamSendAll() except it handles sparse streams well.
Signed-off-by: NMichal Privoznik <mprivozn@redhat.com>
上级 3a1e2e92
......@@ -71,9 +71,9 @@ int virStreamRecvHole(virStreamPtr,
* @nbytes: size of the data array
* @opaque: optional application provided data
*
* The virStreamSourceFunc callback is used together
* with the virStreamSendAll function for libvirt to
* obtain the data that is to be sent.
* The virStreamSourceFunc callback is used together with
* the virStreamSendAll and virStreamSparseSendAll functions
* for libvirt to obtain the data that is to be sent.
*
* The callback will be invoked multiple times,
* fetching data in small chunks. The application
......@@ -96,6 +96,65 @@ int virStreamSendAll(virStreamPtr st,
virStreamSourceFunc handler,
void *opaque);
/**
* virStreamSourceHoleFunc:
* @st: the stream object
* @inData: are we in data section
* @length: how long is the section we are currently in
* @opaque: optional application provided data
*
* The virStreamSourceHoleFunc callback is used together with the
* virStreamSparseSendAll function for libvirt to obtain the
* length of section stream is currently in.
*
* Moreover, upon successful return, @length should be updated
* with how many bytes are left until the current section ends
* (either data section or hole section). Also the stream is
* currently in data section, @inData should be set to a non-zero
* value and vice versa.
*
* NB: there's an implicit hole at the end of each file. If
* that's the case, @inData and @length should be both set to 0.
*
* This function should not adjust the current position within
* the file.
*
* Returns 0 on success,
* -1 upon error
*/
typedef int (*virStreamSourceHoleFunc)(virStreamPtr st,
int *inData,
long long *length,
void *opaque);
/**
* virStreamSourceSkipFunc:
* @st: the stream object
* @length: stream hole size
* @opaque: optional application provided data
*
* This callback is used together with the virStreamSparseSendAll
* to skip holes in the underlying file as reported by
* virStreamSourceHoleFunc.
*
* The callback may be invoked multiple times as holes are found
* during processing a stream. The application should skip
* processing the hole in the stream source and then return.
* A return value of -1 at any time will abort the send operation.
*
* Returns 0 on success,
* -1 upon error.
*/
typedef int (*virStreamSourceSkipFunc)(virStreamPtr st,
long long length,
void *opaque);
int virStreamSparseSendAll(virStreamPtr st,
virStreamSourceFunc handler,
virStreamSourceHoleFunc holeHandler,
virStreamSourceSkipFunc skipHandler,
void *opaque);
/**
* virStreamSinkFunc:
*
......
......@@ -574,6 +574,165 @@ virStreamSendAll(virStreamPtr stream,
}
/**
* virStreamSparseSendAll:
* @stream: pointer to the stream object
* @handler: source callback for reading data from application
* @holeHandler: source callback for determining holes
* @skipHandler: skip holes as reported by @holeHandler
* @opaque: application defined data
*
* Send the entire data stream, reading the data from the
* requested data source. This is simply a convenient alternative
* to virStreamSend, for apps that do blocking-I/O.
*
* An example using this with a hypothetical file upload
* API looks like
*
* int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) {
* int *fd = opaque;
*
* return read(*fd, buf, nbytes);
* }
*
* int myskip(virStreamPtr st, long long offset, void *opaque) {
* int *fd = opaque;
*
* return lseek(*fd, offset, SEEK_CUR) == (off_t) -1 ? -1 : 0;
* }
*
* int myindata(virStreamPtr st, int *inData,
* long long *offset, void *opaque) {
* int *fd = opaque;
*
* if (@fd in hole) {
* *inData = 0;
* *offset = holeSize;
* } else {
* *inData = 1;
* *offset = dataSize;
* }
*
* return 0;
* }
*
* virStreamPtr st = virStreamNew(conn, 0);
* int fd = open("demo.iso", O_RDONLY);
*
* virConnectUploadSparseFile(conn, st);
* if (virStreamSparseSendAll(st,
* mysource,
* myindata,
* myskip,
* &fd) < 0) {
* ...report an error ...
* goto done;
* }
* if (virStreamFinish(st) < 0)
* ...report an error...
* virStreamFree(st);
* close(fd);
*
* Note that @opaque data are shared between @handler, @holeHandler and @skipHandler.
*
* Returns 0 if all the data was successfully sent. The caller
* should invoke virStreamFinish(st) to flush the stream upon
* success and then virStreamFree.
*
* Returns -1 upon any error, with virStreamAbort() already
* having been called, so the caller need only call
* virStreamFree().
*/
int virStreamSparseSendAll(virStreamPtr stream,
virStreamSourceFunc handler,
virStreamSourceHoleFunc holeHandler,
virStreamSourceSkipFunc skipHandler,
void *opaque)
{
char *bytes = NULL;
size_t want = VIR_NET_MESSAGE_LEGACY_PAYLOAD_MAX;
int ret = -1;
unsigned long long dataLen = 0;
VIR_DEBUG("stream=%p handler=%p holeHandler=%p opaque=%p",
stream, handler, holeHandler, opaque);
virResetLastError();
virCheckStreamReturn(stream, -1);
virCheckNonNullArgGoto(handler, cleanup);
virCheckNonNullArgGoto(holeHandler, cleanup);
virCheckNonNullArgGoto(skipHandler, cleanup);
if (stream->flags & VIR_STREAM_NONBLOCK) {
virReportError(VIR_ERR_OPERATION_INVALID, "%s",
_("data sources cannot be used for non-blocking streams"));
goto cleanup;
}
if (VIR_ALLOC_N(bytes, want) < 0)
goto cleanup;
for (;;) {
int inData, got, offset = 0;
long long sectionLen;
const unsigned int skipFlags = 0;
if (!dataLen) {
if (holeHandler(stream, &inData, &sectionLen, opaque) < 0) {
virStreamAbort(stream);
goto cleanup;
}
if (!inData && sectionLen) {
if (virStreamSendHole(stream, sectionLen, skipFlags) < 0) {
virStreamAbort(stream);
goto cleanup;
}
if (skipHandler(stream, sectionLen, opaque) < 0) {
virReportSystemError(errno, "%s",
_("unable to skip hole"));
virStreamAbort(stream);
goto cleanup;
}
continue;
} else {
dataLen = sectionLen;
}
}
if (want > dataLen)
want = dataLen;
got = (handler)(stream, bytes, want, opaque);
if (got < 0) {
virStreamAbort(stream);
goto cleanup;
}
if (got == 0)
break;
while (offset < got) {
int done;
done = virStreamSend(stream, bytes + offset, got - offset);
if (done < 0)
goto cleanup;
offset += done;
dataLen -= done;
}
}
ret = 0;
cleanup:
VIR_FREE(bytes);
if (ret != 0)
virDispatchError(stream->conn);
return ret;
}
/**
* virStreamRecvAll:
* @stream: pointer to the stream object
......
......@@ -765,6 +765,7 @@ LIBVIRT_3.4.0 {
virStreamRecvHole;
virStreamSendHole;
virStreamSparseRecvAll;
virStreamSparseSendAll;
} LIBVIRT_3.1.0;
# .... define new API here using predicted next version number ....
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册