提交 182eba1b 编写于 作者: D Daniel P. Berrange

Add public API definition for data stream handling

* include/libvirt/libvirt.h.in: Public API contract for
  virStreamPtr object
* src/libvirt_public.syms: Export data stream APIs
* src/libvirt_private.syms: Export internal helper APIs
* src/libvirt.c: Data stream API driver dispatch
* src/datatypes.h, src/datatypes.c: Internal helpers for virStreamPtr
  object
* src/driver.h: Define internal driver API for streams
* .x-sc_avoid_write: Ignore src/libvirt.c because it trips
  up on comments including write()
* python/Makefile.am: Add libvirt-override-virStream.py
* python/generator.py: Add rules for virStreamPtr class
* python/typewrappers.h, python/typewrappers.c: Wrapper
  for virStreamPtr
* docs/libvirt-api.xml, docs/libvirt-refs.xml: Regenerate
  with new APIs
上级 d9e66a62
^src/libvirt\.c$
^src/util/util\.c$
^src/xen/xend_internal\.c$
^daemon/libvirtd.c$
......
此差异已折叠。
此差异已折叠。
......@@ -110,6 +110,24 @@ typedef enum {
VIR_DOMAIN_NONE = 0
} virDomainCreateFlags;
/**
* virStream:
*
* a virStream is a private structure representing a data stream.
*/
typedef struct _virStream virStream;
/**
* virStreamPtr:
*
* a virStreamPtr is pointer to a virStream private structure, this is the
* type used to reference a data stream in the API.
*/
typedef virStream *virStreamPtr;
/**
* VIR_SECURITY_LABEL_BUFLEN:
*
......@@ -1502,6 +1520,127 @@ int virSecretUndefine (virSecretPtr secret);
int virSecretRef (virSecretPtr secret);
int virSecretFree (virSecretPtr secret);
typedef enum {
VIR_STREAM_NONBLOCK = (1 << 0),
} virStreamFlags;
virStreamPtr virStreamNew(virConnectPtr conn,
unsigned int flags);
int virStreamRef(virStreamPtr st);
int virStreamSend(virStreamPtr st,
const char *data,
size_t nbytes);
int virStreamRecv(virStreamPtr st,
char *data,
size_t nbytes);
/**
* virStreamSourceFunc:
*
* @st: the stream object
* @data: preallocated array to be filled with data
* @nbytes: size of the data array
* @opaque: optional application provided data
*
* The virStreamSourceFunc callback is used together
* with the virStreamSendAll function for libvirt to
* obtain the data that is to be sent.
*
* The callback will be invoked multiple times,
* fetching data in small chunks. The application
* should fill the 'data' array with upto 'nbytes'
* of data and then return the number actual number
* of bytes. The callback will continue to be
* invoked until it indicates the end of the source
* has been reached by returning 0. A return value
* of -1 at any time will abort the send operation
*
* Returns the number of bytes filled, 0 upon end
* of file, or -1 upon error
*/
typedef int (*virStreamSourceFunc)(virStreamPtr st,
char *data,
size_t nbytes,
void *opaque);
int virStreamSendAll(virStreamPtr st,
virStreamSourceFunc handler,
void *opaque);
/**
* virStreamSinkFunc:
*
* @st: the stream object
* @data: preallocated array to be filled with data
* @nbytes: size of the data array
* @opaque: optional application provided data
*
* The virStreamSinkFunc callback is used together
* with the virStreamRecvAll function for libvirt to
* provide the data that has been received.
*
* The callback will be invoked multiple times,
* providing data in small chunks. The application
* should consume up 'nbytes' from the 'data' array
* of data and then return the number actual number
* of bytes consumed. The callback will continue to be
* invoked until it indicates the end of the stream
* has been reached. A return value of -1 at any time
* will abort the receive operation
*
* Returns the number of bytes consumed or -1 upon
* error
*/
typedef int (*virStreamSinkFunc)(virStreamPtr st,
const char *data,
size_t nbytes,
void *opaque);
int virStreamRecvAll(virStreamPtr st,
virStreamSinkFunc handler,
void *opaque);
typedef enum {
VIR_STREAM_EVENT_READABLE = (1 << 0),
VIR_STREAM_EVENT_WRITABLE = (1 << 1),
VIR_STREAM_EVENT_ERROR = (1 << 2),
VIR_STREAM_EVENT_HANGUP = (1 << 3),
} virStreamEventType;
/**
* virStreamEventCallback:
*
* @stream: stream on which the event occurred
* @events: bitset of events from virEventHandleType constants
* @opaque: user data registered with handle
*
* Callback for receiving stream events. The callback will
* be invoked once for each event which is pending.
*/
typedef void (*virStreamEventCallback)(virStreamPtr stream, int events, void *opaque);
int virStreamEventAddCallback(virStreamPtr stream,
int events,
virStreamEventCallback cb,
void *opaque,
virFreeCallback ff);
int virStreamEventUpdateCallback(virStreamPtr stream,
int events);
int virStreamEventRemoveCallback(virStreamPtr stream);
int virStreamFinish(virStreamPtr st);
int virStreamAbort(virStreamPtr st);
int virStreamFree(virStreamPtr st);
#ifdef __cplusplus
}
#endif
......
......@@ -13,7 +13,9 @@ DOCS_DIR = $(datadir)/doc/libvirt-python-$(LIBVIRT_VERSION)
DOCS = ${srcdir}/TODO
CLASSES_EXTRA = libvirt-override-virConnect.py
CLASSES_EXTRA = \
libvirt-override-virConnect.py \
libvirt-override-virStream.py
EXTRA_DIST = \
generator.py \
......
......@@ -282,6 +282,11 @@ py_types = {
'const virSecretPtr': ('O', "virSecret", "virSecretPtr", "virSecretPtr"),
'virSecret *': ('O', "virSecret", "virSecretPtr", "virSecretPtr"),
'const virSecret *': ('O', "virSecret", "virSecretPtr", "virSecretPtr"),
'virStreamPtr': ('O', "virStream", "virStreamPtr", "virStreamPtr"),
'const virStreamPtr': ('O', "virStream", "virStreamPtr", "virStreamPtr"),
'virStream *': ('O', "virStream", "virStreamPtr", "virStreamPtr"),
'const virStream *': ('O', "virStream", "virStreamPtr", "virStreamPtr"),
}
py_return_types = {
......@@ -338,6 +343,8 @@ skip_impl = (
'virSecretGetUUID',
'virSecretGetUUIDString',
'virSecretLookupByUUID',
'virStreamRecv',
'virStreamSend',
'virStoragePoolGetUUID',
'virStoragePoolGetUUIDString',
'virStoragePoolLookupByUUID',
......@@ -373,6 +380,11 @@ skip_function = (
'virConnectDomainEventDeregister', # overridden in virConnect.py
'virSaveLastError', # We have our own python error wrapper
'virFreeError', # Only needed if we use virSaveLastError
'virStreamEventAddCallback',
'virStreamRecvAll',
'virStreamSendAll',
'virStreamRef',
'virStreamFree',
)
......@@ -643,6 +655,8 @@ classes_type = {
"virNodeDevice *": ("._o", "virNodeDevice(self, _obj=%s)", "virNodeDevice"),
"virSecretPtr": ("._o", "virSecret(self, _obj=%s)", "virSecret"),
"virSecret *": ("._o", "virSecret(self, _obj=%s)", "virSecret"),
"virStreamPtr": ("._o", "virStream(self, _obj=%s)", "virStream"),
"virStream *": ("._o", "virStream(self, _obj=%s)", "virStream"),
"virConnectPtr": ("._o", "virConnect(_obj=%s)", "virConnect"),
"virConnect *": ("._o", "virConnect(_obj=%s)", "virConnect"),
}
......@@ -652,7 +666,8 @@ converter_type = {
primary_classes = ["virDomain", "virNetwork", "virInterface",
"virStoragePool", "virStorageVol",
"virConnect", "virNodeDevice", "virSecret" ]
"virConnect", "virNodeDevice", "virSecret",
"virStream"]
classes_ancestor = {
}
......@@ -663,7 +678,9 @@ classes_destructors = {
"virStoragePool": "virStoragePoolFree",
"virStorageVol": "virStorageVolFree",
"virNodeDevice" : "virNodeDeviceFree",
"virSecret": "virSecretFree"
"virSecret": "virSecretFree",
# We hand-craft __del__ for this one
#"virStream": "virStreamFree",
}
functions_noexcept = {
......@@ -782,6 +799,11 @@ def nameFixup(name, classe, type, file):
elif name[0:9] == 'virSecret':
func = name[9:]
func = string.lower(func[0:1]) + func[1:]
elif name[0:12] == 'virStreamNew':
func = "newStream"
elif name[0:9] == 'virStream':
func = name[9:]
func = string.lower(func[0:1]) + func[1:]
elif name[0:17] == "virStoragePoolGet":
func = name[17:]
func = string.lower(func[0:1]) + func[1:]
......@@ -1059,7 +1081,8 @@ def buildWrappers():
classes_ancestor[classname]))
else:
classes.write("class %s:\n" % (classname))
if classname in [ "virDomain", "virNetwork", "virInterface", "virStoragePool", "virStorageVol", "virNodeDevice", "virSecret" ]:
if classname in [ "virDomain", "virNetwork", "virInterface", "virStoragePool",
"virStorageVol", "virNodeDevice", "virSecret","virStream" ]:
classes.write(" def __init__(self, conn, _obj=None):\n")
else:
classes.write(" def __init__(self, _obj=None):\n")
......@@ -1067,7 +1090,8 @@ def buildWrappers():
list = reference_keepers[classname]
for ref in list:
classes.write(" self.%s = None\n" % ref[1])
if classname in [ "virDomain", "virNetwork", "virInterface", "virNodeDevice", "virSecret" ]:
if classname in [ "virDomain", "virNetwork", "virInterface",
"virNodeDevice", "virSecret", "virStream" ]:
classes.write(" self._conn = conn\n")
elif classname in [ "virStorageVol", "virStoragePool" ]:
classes.write(" self._conn = conn\n" + \
......
def __del__(self):
try:
if self.cb:
libvirtmod.virStreamEventRemoveCallback(self._o)
except AttributeError:
pass
if self._o != None:
libvirtmod.virStreamFree(self._o)
self._o = None
def eventAddCallback(self, cb, opaque):
""" """
try:
self.cb = cb
self.opaque = opaque
ret = libvirtmod.virStreamEventAddCallback(self._o, self)
if ret == -1: raise libvirtError ('virStreamEventAddCallback() failed', conn=self._conn)
except AttributeError:
pass
......@@ -206,6 +206,19 @@ libvirt_virSecretPtrWrap(virSecretPtr node)
return (ret);
}
PyObject *
libvirt_virStreamPtrWrap(virStreamPtr node)
{
PyObject *ret;
if (node == NULL) {
Py_INCREF(Py_None);
return Py_None;
}
ret = PyCObject_FromVoidPtrAndDesc(node, (char *) "virStreamPtr", NULL);
return (ret);
}
PyObject *
libvirt_virEventHandleCallbackWrap(virEventHandleCallback node)
{
......
......@@ -92,6 +92,15 @@ typedef struct {
} PyvirSecret_Object;
#define PyvirStream_Get(v) (((v) == Py_None) ? NULL : \
(((PyvirStream_Object *)(v))->obj))
typedef struct {
PyObject_HEAD
virStreamPtr obj;
} PyvirStream_Object;
#define PyvirEventHandleCallback_Get(v) (((v) == Py_None) ? NULL : \
(((PyvirEventHandleCallback_Object *)(v))->obj))
......@@ -144,6 +153,7 @@ PyObject * libvirt_virFreeCallbackWrap(virFreeCallback node);
PyObject * libvirt_virVoidPtrWrap(void* node);
PyObject * libvirt_virNodeDevicePtrWrap(virNodeDevicePtr node);
PyObject * libvirt_virSecretPtrWrap(virSecretPtr node);
PyObject * libvirt_virStreamPtrWrap(virStreamPtr node);
/* Provide simple macro statement wrappers (adapted from GLib, in turn from Perl):
......
......@@ -1162,6 +1162,7 @@ virUnrefNodeDevice(virNodeDevicePtr dev) {
return (refs);
}
/**
* virGetSecret:
* @conn: the hypervisor connection
......@@ -1297,3 +1298,61 @@ virUnrefSecret(virSecretPtr secret) {
virMutexUnlock(&secret->conn->lock);
return refs;
}
virStreamPtr virGetStream(virConnectPtr conn) {
virStreamPtr ret = NULL;
virMutexLock(&conn->lock);
if (VIR_ALLOC(ret) < 0) {
virReportOOMError(conn);
goto error;
}
ret->magic = VIR_STREAM_MAGIC;
ret->conn = conn;
conn->refs++;
ret->refs++;
virMutexUnlock(&conn->lock);
return(ret);
error:
virMutexUnlock(&conn->lock);
VIR_FREE(ret);
return(NULL);
}
static void
virReleaseStream(virStreamPtr st) {
virConnectPtr conn = st->conn;
DEBUG("release dev %p", st);
st->magic = -1;
VIR_FREE(st);
DEBUG("unref connection %p %d", conn, conn->refs);
conn->refs--;
if (conn->refs == 0) {
virReleaseConnect(conn);
/* Already unlocked mutex */
return;
}
virMutexUnlock(&conn->lock);
}
int virUnrefStream(virStreamPtr st) {
int refs;
virMutexLock(&st->conn->lock);
DEBUG("unref stream %p %d", st, st->refs);
st->refs--;
refs = st->refs;
if (refs == 0) {
virReleaseStream(st);
/* Already unlocked mutex */
return (0);
}
virMutexUnlock(&st->conn->lock);
return (refs);
}
......@@ -109,6 +109,17 @@
#define VIR_IS_CONNECTED_SECRET(obj) (VIR_IS_SECRET(obj) && VIR_IS_CONNECT((obj)->conn))
/**
* VIR_STREAM_MAGIC:
*
* magic value used to protect the API when pointers to stream structures
* are passed down by the users.
*/
#define VIR_STREAM_MAGIC 0x1DEAD666
#define VIR_IS_STREAM(obj) ((obj) && (obj)->magic==VIR_STREAM_MAGIC)
#define VIR_IS_CONNECTED_STREAM(obj) (VIR_IS_STREAM(obj) && VIR_IS_CONNECT((obj)->conn))
/**
* _virConnect:
*
......@@ -261,6 +272,25 @@ struct _virSecret {
};
typedef int (*virStreamAbortFunc)(virStreamPtr, void *opaque);
typedef int (*virStreamFinishFunc)(virStreamPtr, void *opaque);
/**
* _virStream:
*
* Internal structure associated with an input stream
*/
struct _virStream {
unsigned int magic;
virConnectPtr conn;
int refs;
int flags;
virStreamDriverPtr driver;
void *privateData;
};
/************************************************************************
* *
* API for domain/connections (de)allocations and lookups *
......@@ -303,4 +333,7 @@ virSecretPtr virGetSecret(virConnectPtr conn,
const char *usageID);
int virUnrefSecret(virSecretPtr secret);
virStreamPtr virGetStream(virConnectPtr conn);
int virUnrefStream(virStreamPtr st);
#endif
......@@ -879,6 +879,41 @@ struct _virSecretDriver {
virDrvSecretUndefine undefine;
};
typedef struct _virStreamDriver virStreamDriver;
typedef virStreamDriver *virStreamDriverPtr;
typedef int (*virDrvStreamSend)(virStreamPtr st,
const char *data,
size_t nbytes);
typedef int (*virDrvStreamRecv)(virStreamPtr st,
char *data,
size_t nbytes);
typedef int (*virDrvStreamEventAddCallback)(virStreamPtr stream,
int events,
virStreamEventCallback cb,
void *opaque,
virFreeCallback ff);
typedef int (*virDrvStreamEventUpdateCallback)(virStreamPtr stream,
int events);
typedef int (*virDrvStreamEventRemoveCallback)(virStreamPtr stream);
typedef int (*virDrvStreamFinish)(virStreamPtr st);
typedef int (*virDrvStreamAbort)(virStreamPtr st);
struct _virStreamDriver {
virDrvStreamSend streamSend;
virDrvStreamRecv streamRecv;
virDrvStreamEventAddCallback streamAddCallback;
virDrvStreamEventUpdateCallback streamUpdateCallback;
virDrvStreamEventRemoveCallback streamRemoveCallback;
virDrvStreamFinish streamFinish;
virDrvStreamAbort streamAbort;
};
/*
* Registration
* TODO: also need ways to (des)activate a given driver
......
......@@ -561,6 +561,10 @@ virLibNodeDeviceError(virNodeDevicePtr dev, virErrorNumber error,
errmsg, info, NULL, 0, 0, errmsg, info);
}
#define virLibStreamError(conn, code, fmt...) \
virReportErrorHelper(conn, VIR_FROM_NONE, code, __FILE__, \
__FUNCTION__, __LINE__, fmt)
/**
* virLibSecretError:
* @secret: the secret if available
......@@ -9394,3 +9398,700 @@ virSecretFree(virSecretPtr secret)
return -1;
return 0;
}
/**
* virStreamNew:
* @conn: pointer to the connection
* @flags: control features of the stream
*
* Creates a new stream object which can be used to perform
* streamed I/O with other public API function.
*
* When no longer needed, a stream object must be released
* with virStreamFree. If a data stream has been used,
* then the application must call virStreamFinish or
* virStreamAbort before free'ing to, in order to notify
* the driver of termination.
*
* If a non-blocking data stream is required passed
* VIR_STREAM_NONBLOCK for flags, otherwise pass 0.
*
* Returns the new stream, or NULL upon error
*/
virStreamPtr
virStreamNew(virConnectPtr conn,
unsigned int flags)
{
virStreamPtr st;
DEBUG("conn=%p, flags=%u", conn, flags);
virResetLastError();
if (!VIR_IS_CONNECT(conn)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (NULL);
}
st = virGetStream(conn);
if (st)
st->flags = flags;
return st;
}
/**
* virStreamRef:
* @stream: pointer to the stream
*
* Increment the reference count on the stream. For each
* additional call to this method, there shall be a corresponding
* call to virStreamFree to release the reference count, once
* the caller no longer needs the reference to this object.
*
* Returns 0 in case of success, -1 in case of failure
*/
int
virStreamRef(virStreamPtr stream)
{
if ((!VIR_IS_CONNECTED_STREAM(stream))) {
virLibConnError(NULL, VIR_ERR_INVALID_ARG, __FUNCTION__);
return(-1);
}
virMutexLock(&stream->conn->lock);
DEBUG("stream=%p refs=%d", stream, stream->refs);
stream->refs++;
virMutexUnlock(&stream->conn->lock);
return 0;
}
/**
* virStreamSend:
* @stream: pointer to the stream object
* @data: buffer to write to stream
* @nbytes: size of @data buffer
*
* Write a series of bytes to the stream. This method may
* block the calling application for an arbitrary amount
* of time. Once an application has finished sending data
* it should call virStreamFinish to wait for succesful
* confirmation from the driver, or detect any error
*
* This method may not be used if a stream source has been
* registered
*
* Errors are not guaranteed to be reported synchronously
* with the call, but may instead be delayed until a
* subsequent call.
*
* A example using this with a hypothetical file upload
* API looks like
*
* virStreamPtr st = virStreamNew(conn, 0);
* int fd = open("demo.iso", O_RDONLY)
*
* virConnectUploadFile(conn, "demo.iso", st);
*
* while (1) {
* char buf[1024];
* int got = read(fd, buf, 1024);
* if (got < 0) {
* virStreamAbort(st);
* break;
* }
* if (got == 0) {
* virStreamFinish(st);
* break;
* }
* int offset = 0;
* while (offset < got) {
* int sent = virStreamSend(st, buf+offset, got-offset)
* if (sent < 0) {
* virStreamAbort(st);
* goto done;
* }
* offset += sent;
* }
* }
* if (virStreamFinish(st) < 0)
* ... report an error ....
* done:
* virStreamFree(st);
* close(fd);
*
* Returns the number of bytes written, which may be less
* than requested.
*
* Returns -1 upon error, at which time the stream will
* be marked as aborted, and the caller should now release
* the stream with virStreamFree.
*
* Returns -2 if the outgoing transmit buffers are full &
* the stream is marked as non-blocking.
*/
int virStreamSend(virStreamPtr stream,
const char *data,
size_t nbytes)
{
DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->driver &&
stream->driver->streamSend) {
int ret;
ret = (stream->driver->streamSend)(stream, data, nbytes);
if (ret == -2)
return -2;
if (ret < 0)
goto error;
return ret;
}
virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
error:
/* Copy to connection error object for back compatability */
virSetConnError(stream->conn);
return -1;
}
/**
* virStreamRecv:
* @stream: pointer to the stream object
* @data: buffer to write to stream
* @nbytes: size of @data buffer
*
* Write a series of bytes to the stream. This method may
* block the calling application for an arbitrary amount
* of time.
*
* Errors are not guaranteed to be reported synchronously
* with the call, but may instead be delayed until a
* subsequent call.
*
* A example using this with a hypothetical file download
* API looks like
*
* virStreamPtr st = virStreamNew(conn, 0);
* int fd = open("demo.iso", O_WRONLY, 0600)
*
* virConnectDownloadFile(conn, "demo.iso", st);
*
* while (1) {
* char buf[1024];
* int got = virStreamRecv(st, buf, 1024);
* if (got < 0)
* break;
* if (got == 0) {
* virStreamFinish(st);
* break;
* }
* int offset = 0;
* while (offset < got) {
* int sent = write(fd, buf+offset, got-offset)
* if (sent < 0) {
* virStreamAbort(st);
* goto done;
* }
* offset += sent;
* }
* }
* if (virStreamFinish(st) < 0)
* ... report an error ....
* done:
* virStreamFree(st);
* close(fd);
*
*
* Returns the number of bytes read, which may be less
* than requested.
*
* Returns 0 when the end of the stream is reached, at
* which time the caller should invoke virStreamFinish()
* to get confirmation of stream completion.
*
* Returns -1 upon error, at which time the stream will
* be marked as aborted, and the caller should now release
* the stream with virStreamFree.
*
* Returns -2 if there is no data pending to be read & the
* stream is marked as non-blocking.
*/
int virStreamRecv(virStreamPtr stream,
char *data,
size_t nbytes)
{
DEBUG("stream=%p, data=%p, nbytes=%zi", stream, data, nbytes);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->driver &&
stream->driver->streamRecv) {
int ret;
ret = (stream->driver->streamRecv)(stream, data, nbytes);
if (ret == -2)
return -2;
if (ret < 0)
goto error;
return ret;
}
virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
error:
/* Copy to connection error object for back compatability */
virSetConnError(stream->conn);
return -1;
}
/**
* virStreamSendAll:
* @stream: pointer to the stream object
* @handler: source callback for reading data from application
* @opaque: application defined data
*
* Send the entire data stream, reading the data from the
* requested data source. This is simply a convenient alternative
* to virStreamSend, for apps that do blocking-I/o.
*
* A example using this with a hypothetical file upload
* API looks like
*
* int mysource(virStreamPtr st, char *buf, int nbytes, void *opaque) {
* int *fd = opaque;
*
* return read(*fd, buf, nbytes);
* }
*
* virStreamPtr st = virStreamNew(conn, 0);
* int fd = open("demo.iso", O_RDONLY)
*
* virConnectUploadFile(conn, st);
* if (virStreamSendAll(st, mysource, &fd) < 0) {
* ...report an error ...
* goto done;
* }
* if (virStreamFinish(st) < 0)
* ...report an error...
* virStreamFree(st);
* close(fd);
*
* Returns 0 if all the data was succesfully sent. The caller
* should invoke virStreamFinish(st) to flush the stream upon
* success and then virStreamFree
*
* Returns -1 upon any error, with virStreamAbort() already
* having been called, so the caller need only call
* virStreamFree()
*/
int virStreamSendAll(virStreamPtr stream,
virStreamSourceFunc handler,
void *opaque)
{
char *bytes = NULL;
int want = 1024*64;
int ret = -1;
DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->flags & VIR_STREAM_NONBLOCK) {
virLibConnError(NULL, VIR_ERR_OPERATION_INVALID,
_("data sources cannot be used for non-blocking streams"));
goto cleanup;
}
if (VIR_ALLOC_N(bytes, want) < 0) {
virReportOOMError(stream->conn);
goto cleanup;
}
for (;;) {
int got, offset = 0;
got = (handler)(stream, bytes, want, opaque);
if (got < 0) {
virStreamAbort(stream);
goto cleanup;
}
if (got == 0)
break;
while (offset < got) {
int done;
done = virStreamSend(stream, bytes + offset, got - offset);
if (done < 0)
goto cleanup;
offset += done;
}
}
ret = 0;
cleanup:
VIR_FREE(bytes);
/* Copy to connection error object for back compatability */
if (ret != 0)
virSetConnError(stream->conn);
return ret;
}
/**
* virStreamRecvAll:
* @stream: pointer to the stream object
* @handler: sink callback for writing data to application
* @opaque: application defined data
*
* Receive the entire data stream, sending the data to the
* requested data sink. This is simply a convenient alternative
* to virStreamRecv, for apps that do blocking-I/o.
*
* A example using this with a hypothetical file download
* API looks like
*
* int mysink(virStreamPtr st, const char *buf, int nbytes, void *opaque) {
* int *fd = opaque;
*
* return write(*fd, buf, nbytes);
* }
*
* virStreamPtr st = virStreamNew(conn, 0);
* int fd = open("demo.iso", O_WRONLY)
*
* virConnectUploadFile(conn, st);
* if (virStreamRecvAll(st, mysink, &fd) < 0) {
* ...report an error ...
* goto done;
* }
* if (virStreamFinish(st) < 0)
* ...report an error...
* virStreamFree(st);
* close(fd);
*
* Returns 0 if all the data was succesfully received. The caller
* should invoke virStreamFinish(st) to flush the stream upon
* success and then virStreamFree
*
* Returns -1 upon any error, with virStreamAbort() already
* having been called, so the caller need only call
* virStreamFree()
*/
int virStreamRecvAll(virStreamPtr stream,
virStreamSinkFunc handler,
void *opaque)
{
char *bytes = NULL;
int want = 1024*64;
int ret = -1;
DEBUG("stream=%p, handler=%p, opaque=%p", stream, handler, opaque);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->flags & VIR_STREAM_NONBLOCK) {
virLibConnError(NULL, VIR_ERR_OPERATION_INVALID,
_("data sinks cannot be used for non-blocking streams"));
goto cleanup;
}
if (VIR_ALLOC_N(bytes, want) < 0) {
virReportOOMError(stream->conn);
goto cleanup;
}
for (;;) {
int got, offset = 0;
got = virStreamRecv(stream, bytes, want);
if (got < 0)
goto cleanup;
if (got == 0)
break;
while (offset < got) {
int done;
done = (handler)(stream, bytes + offset, got - offset, opaque);
if (done < 0) {
virStreamAbort(stream);
goto cleanup;
}
offset += done;
}
}
ret = 0;
cleanup:
VIR_FREE(bytes);
/* Copy to connection error object for back compatability */
if (ret != 0)
virSetConnError(stream->conn);
return ret;
}
/**
* virStreamEventAddCallback
* @stream: pointer to the stream object
* @events: set of events to monitor
* @cb: callback to invoke when an event occurs
* @opaque: application defined data
* @ff: callback to free @opaque data
*
* Register a callback to be notified when a stream
* becomes writable, or readable. This is most commonly
* used in conjunction with non-blocking data streams
* to integrate into an event loop
*
* Returns 0 on success, -1 upon error
*/
int virStreamEventAddCallback(virStreamPtr stream,
int events,
virStreamEventCallback cb,
void *opaque,
virFreeCallback ff)
{
DEBUG("stream=%p, events=%d, cb=%p, opaque=%p, ff=%p", stream, events, cb, opaque, ff);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->driver &&
stream->driver->streamAddCallback) {
int ret;
ret = (stream->driver->streamAddCallback)(stream, events, cb, opaque, ff);
if (ret < 0)
goto error;
return ret;
}
virLibConnError(stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
error:
/* Copy to connection error object for back compatability */
virSetConnError(stream->conn);
return -1;
}
/**
* virStreamEventUpdateCallback
* @stream: pointer to the stream object
* @events: set of events to monitor
*
* Changes the set of events to monitor for a stream. This allows
* for event notification to be changed without having to
* unregister & register the callback completely. This method
* is guarenteed to succeed if a callback is already registered
*
* Returns 0 on success, -1 if no callback is registered
*/
int virStreamEventUpdateCallback(virStreamPtr stream,
int events)
{
DEBUG("stream=%p, events=%d", stream, events);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->driver &&
stream->driver->streamUpdateCallback) {
int ret;
ret = (stream->driver->streamUpdateCallback)(stream, events);
if (ret < 0)
goto error;
return ret;
}
virLibConnError (stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
error:
/* Copy to connection error object for back compatability */
virSetConnError(stream->conn);
return -1;
}
/**
* virStreamEventRemoveCallback
* @stream: pointer to the stream object
*
* Remove a event callback from the stream
*
* Returns 0 on success, -1 on error
*/
int virStreamEventRemoveCallback(virStreamPtr stream)
{
DEBUG("stream=%p", stream);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->driver &&
stream->driver->streamRemoveCallback) {
int ret;
ret = (stream->driver->streamRemoveCallback)(stream);
if (ret < 0)
goto error;
return ret;
}
virLibConnError (stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
error:
/* Copy to connection error object for back compatability */
virSetConnError(stream->conn);
return -1;
}
/**
* virStreamFinish:
* @stream: pointer to the stream object
*
* Indicate that there is no further data is to be transmitted
* on the stream. For output streams this should be called once
* all data has been written. For input streams this should be
* called once virStreamRecv returns end-of-file.
*
* This method is a synchronization point for all asynchronous
* errors, so if this returns a success code the application can
* be sure that all data has been successfully processed.
*
* Returns 0 on success, -1 upon error
*/
int virStreamFinish(virStreamPtr stream)
{
DEBUG("stream=%p", stream);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->driver &&
stream->driver->streamFinish) {
int ret;
ret = (stream->driver->streamFinish)(stream);
if (ret < 0)
goto error;
return ret;
}
virLibConnError (stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
error:
/* Copy to connection error object for back compatability */
virSetConnError(stream->conn);
return -1;
}
/**
* virStreamAbort:
* @stream: pointer to the stream object
*
* Request that the in progress data transfer be cancelled
* abnormally before the end of the stream has been reached.
* For output streams this can be used to inform the driver
* that the stream is being terminated early. For input
* streams this can be used to inform the driver that it
* should stop sending data.
*
* Returns 0 on success, -1 upon error
*/
int virStreamAbort(virStreamPtr stream)
{
DEBUG("stream=%p", stream);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
if (stream->driver &&
stream->driver->streamAbort) {
int ret;
ret = (stream->driver->streamAbort)(stream);
if (ret < 0)
goto error;
return ret;
}
virLibConnError (stream->conn, VIR_ERR_NO_SUPPORT, __FUNCTION__);
error:
/* Copy to connection error object for back compatability */
virSetConnError(stream->conn);
return -1;
}
/**
* virStreamFree:
* @stream: pointer to the stream object
*
* Decrement the reference count on a stream, releasing
* the stream object if the reference count has hit zero.
*
* There must not be a active data transfer in progress
* when releasing the stream. If a stream needs to be
* disposed of prior to end of stream being reached, then
* the virStreamAbort function should be called first.
*
* Returns 0 upon success, or -1 on error
*/
int virStreamFree(virStreamPtr stream)
{
DEBUG("stream=%p", stream);
virResetLastError();
if (!VIR_IS_CONNECTED_STREAM(stream)) {
virLibConnError(NULL, VIR_ERR_INVALID_CONN, __FUNCTION__);
return (-1);
}
/* XXX Enforce shutdown before free'ing resources ? */
if (virUnrefStream(stream) < 0)
return (-1);
return (0);
}
......@@ -78,6 +78,8 @@ virGetNodeDevice;
virUnrefDomain;
virUnrefConnect;
virUnrefSecret;
virGetStream;
virUnrefStream;
# domain_conf.h
......
......@@ -312,4 +312,20 @@ LIBVIRT_0.7.1 {
virSecretFree;
} LIBVIRT_0.7.0;
LIBVIRT_0.7.2 {
global:
virStreamNew;
virStreamRef;
virStreamSend;
virStreamRecv;
virStreamSendAll;
virStreamRecvAll;
virStreamEventAddCallback;
virStreamEventUpdateCallback;
virStreamEventRemoveCallback;
virStreamFinish;
virStreamAbort;
virStreamFree;
} LIBVIRT_0.7.1;
# .... define new API here using predicted next version number ....
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册