Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
openeuler
libvirt
提交
61674cc1
L
libvirt
项目概览
openeuler
/
libvirt
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
L
libvirt
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
61674cc1
编写于
1月 20, 2009
作者:
D
Daniel P. Berrange
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Make RPC call dispatch threaded
上级
458a673c
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
891 addition
and
453 deletion
+891
-453
ChangeLog
ChangeLog
+9
-0
src/libvirt_private.syms
src/libvirt_private.syms
+1
-0
src/remote_internal.c
src/remote_internal.c
+857
-442
src/util.c
src/util.c
+22
-11
src/util.h
src/util.h
+2
-0
未找到文件。
ChangeLog
浏览文件 @
61674cc1
Tue Jan 20 16:36:53 GMT 2009 Daniel P. Berrange <berrange@redhat.com>
Make RPC call dispatch threaded
* src/libvirt_private.syms, src/util.h, src/util.c: Add
a general virSetNonBlock() helper with portability to
Win32
* src/remote_internal.c: Re-factor I/O to allow RPC calls
from multiple threads to be handled concurrently.
Tue Jan 20 17:08:20 CET 2009 Daniel Veillard <veillard@redhat.com>
* src/domain_conf.h src/lxc_driver.c src/uml_driver.c: virDomainObj
...
...
src/libvirt_private.syms
浏览文件 @
61674cc1
...
...
@@ -290,6 +290,7 @@ virEnumToString;
virEventAddHandle;
virEventRemoveHandle;
virExec;
virSetNonBlock;
virFormatMacAddr;
virGetHostname;
virParseMacAddr;
...
...
src/remote_internal.c
浏览文件 @
61674cc1
...
...
@@ -68,6 +68,8 @@
#include <netdb.h>
#include <poll.h>
/* AI_ADDRCONFIG is missing on some systems. */
#ifndef AI_ADDRCONFIG
# define AI_ADDRCONFIG 0
...
...
@@ -86,8 +88,44 @@
#include "util.h"
#include "event.h"
#ifdef WIN32
#define pipe(fds) _pipe(fds,4096, _O_BINARY)
#endif
static
int
inside_daemon
=
0
;
struct
remote_thread_call
;
enum
{
REMOTE_MODE_WAIT_TX
,
REMOTE_MODE_WAIT_RX
,
REMOTE_MODE_COMPLETE
,
REMOTE_MODE_ERROR
,
};
struct
remote_thread_call
{
int
mode
;
/* 4 byte length, followed by RPC message header+body */
char
buffer
[
4
+
REMOTE_MESSAGE_MAX
];
unsigned
int
bufferLength
;
unsigned
int
bufferOffset
;
unsigned
int
serial
;
unsigned
int
proc_nr
;
virCond
cond
;
xdrproc_t
ret_filter
;
char
*
ret
;
remote_error
err
;
struct
remote_thread_call
*
next
;
};
struct
private_data
{
virMutex
lock
;
...
...
@@ -101,12 +139,24 @@ struct private_data {
int
localUses
;
/* Ref count for private data */
char
*
hostname
;
/* Original hostname */
FILE
*
debugLog
;
/* Debug remote protocol */
#if HAVE_SASL
sasl_conn_t
*
saslconn
;
/* SASL context */
const
char
*
saslDecoded
;
unsigned
int
saslDecodedLength
;
unsigned
int
saslDecodedOffset
;
const
char
*
saslEncoded
;
unsigned
int
saslEncodedLength
;
unsigned
int
saslEncodedOffset
;
#endif
/* 4 byte length, followed by RPC message header+body */
char
buffer
[
4
+
REMOTE_MESSAGE_MAX
];
unsigned
int
bufferLength
;
unsigned
int
bufferOffset
;
/* The list of domain event callbacks */
virDomainEventCallbackListPtr
callbackList
;
/* The queue of domain events generated
...
...
@@ -114,6 +164,13 @@ struct private_data {
virDomainEventQueuePtr
domainEvents
;
/* Timer for flushing domainEvents queue */
int
eventFlushTimer
;
/* Self-pipe to wakeup threads waiting in poll() */
int
wakeupSendFD
;
int
wakeupReadFD
;
/* List of threads currently waiting for dispatch */
struct
remote_thread_call
*
waitDispatch
;
};
enum
{
...
...
@@ -160,7 +217,6 @@ static void make_nonnull_network (remote_nonnull_network *net_dst, virNetworkPtr
static
void
make_nonnull_storage_pool
(
remote_nonnull_storage_pool
*
pool_dst
,
virStoragePoolPtr
vol_src
);
static
void
make_nonnull_storage_vol
(
remote_nonnull_storage_vol
*
vol_dst
,
virStorageVolPtr
vol_src
);
void
remoteDomainEventFired
(
int
watch
,
int
fd
,
int
event
,
void
*
data
);
static
void
remoteDomainProcessEvent
(
virConnectPtr
conn
,
XDR
*
xdr
);
static
void
remoteDomainQueueEvent
(
virConnectPtr
conn
,
XDR
*
xdr
);
void
remoteDomainEventQueueFlush
(
int
timer
,
void
*
opaque
);
/*----------------------------------------------------------------------*/
...
...
@@ -274,6 +330,7 @@ doRemoteOpen (virConnectPtr conn,
virConnectAuthPtr
auth
ATTRIBUTE_UNUSED
,
int
flags
)
{
int
wakeupFD
[
2
];
char
*
transport_str
=
NULL
;
if
(
conn
->
uri
)
{
...
...
@@ -696,6 +753,21 @@ doRemoteOpen (virConnectPtr conn,
}
/* switch (transport) */
if
(
virSetNonBlock
(
priv
->
sock
)
<
0
)
{
errorf
(
conn
,
VIR_ERR_SYSTEM_ERROR
,
_
(
"unable to make socket non-blocking %s"
),
strerror
(
errno
));
goto
failed
;
}
if
(
pipe
(
wakeupFD
)
<
0
)
{
errorf
(
conn
,
VIR_ERR_SYSTEM_ERROR
,
_
(
"unable to make pipe %s"
),
strerror
(
errno
));
goto
failed
;
}
priv
->
wakeupReadFD
=
wakeupFD
[
0
];
priv
->
wakeupSendFD
=
wakeupFD
[
1
];
/* Try and authenticate with server */
if
(
remoteAuthenticate
(
conn
,
priv
,
1
,
auth
,
authtype
)
==
-
1
)
...
...
@@ -768,6 +840,7 @@ doRemoteOpen (virConnectPtr conn,
DEBUG0
(
"virEventAddTimeout failed: No addTimeoutImpl defined. "
"continuing without events."
);
virEventRemoveHandle
(
priv
->
watch
);
priv
->
watch
=
-
1
;
}
}
/* Successful. */
...
...
@@ -848,6 +921,7 @@ remoteOpen (virConnectPtr conn,
}
remoteDriverLock
(
priv
);
priv
->
localUses
=
1
;
priv
->
watch
=
-
1
;
if
(
flags
&
VIR_CONNECT_RO
)
rflags
|=
VIR_DRV_OPEN_REMOTE_RO
;
...
...
@@ -1220,6 +1294,7 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
virEventRemoveTimeout
(
priv
->
eventFlushTimer
);
/* Remove handle for remote events */
virEventRemoveHandle
(
priv
->
watch
);
priv
->
watch
=
-
1
;
}
/* Close socket. */
...
...
@@ -5537,90 +5612,373 @@ done:
/*----------------------------------------------------------------------*/
static
int
really_write
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
,
char
*
bytes
,
int
len
);
static
int
really_read
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
,
char
*
bytes
,
int
len
);
/* This function performs a remote procedure call to procedure PROC_NR.
*
* NB. This does not free the args structure (not desirable, since you
* often want this allocated on the stack or else it contains strings
* which come from the user). It does however free any intermediate
* results, eg. the error structure if there is one.
*
* NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
* else Bad Things will happen in the XDR code.
*/
static
int
doCall
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
flags
/* if we are in virConnectOpen */
,
static
struct
remote_thread_call
*
prepareCall
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
flags
,
int
proc_nr
,
xdrproc_t
args_filter
,
char
*
args
,
xdrproc_t
ret_filter
,
char
*
ret
)
{
char
buffer
[
REMOTE_MESSAGE_MAX
];
char
buffer2
[
4
];
struct
remote_message_header
hdr
;
XDR
xdr
;
int
len
;
struct
remote_error
rerror
;
struct
remote_message_header
hdr
;
struct
remote_thread_call
*
rv
;
if
(
VIR_ALLOC
(
rv
)
<
0
)
return
NULL
;
if
(
virCondInit
(
&
rv
->
cond
)
<
0
)
{
VIR_FREE
(
rv
);
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
VIR_ERR_INTERNAL_ERROR
,
_
(
"cannot initialize mutex"
));
return
NULL
;
}
/* Get a unique serial number for this message. */
int
serial
=
priv
->
counter
++
;
rv
->
serial
=
priv
->
counter
++
;
rv
->
proc_nr
=
proc_nr
;
rv
->
ret_filter
=
ret_filter
;
rv
->
ret
=
ret
;
hdr
.
prog
=
REMOTE_PROGRAM
;
hdr
.
vers
=
REMOTE_PROTOCOL_VERSION
;
hdr
.
proc
=
proc_nr
;
hdr
.
direction
=
REMOTE_CALL
;
hdr
.
serial
=
serial
;
hdr
.
serial
=
rv
->
serial
;
hdr
.
status
=
REMOTE_OK
;
/* Serialise header followed by args. */
xdrmem_create
(
&
xdr
,
buffer
,
sizeof
buffer
,
XDR_ENCODE
);
xdrmem_create
(
&
xdr
,
rv
->
buffer
+
4
,
REMOTE_MESSAGE_MAX
,
XDR_ENCODE
);
if
(
!
xdr_remote_message_header
(
&
xdr
,
&
hdr
))
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"xdr_remote_message_header failed"
));
return
-
1
;
goto
error
;
}
if
(
!
(
*
args_filter
)
(
&
xdr
,
args
))
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"marshalling args"
));
return
-
1
;
goto
error
;
}
/* Get the length stored in buffer. */
len
=
xdr_getpos
(
&
xdr
);
rv
->
bufferLength
=
xdr_getpos
(
&
xdr
);
xdr_destroy
(
&
xdr
);
/* Length must include the length word itself (always encoded in
* 4 bytes as per RFC 4506).
*/
len
+=
4
;
rv
->
bufferLength
+=
4
;
/* Encode the length word. */
xdrmem_create
(
&
xdr
,
buffer2
,
sizeof
buffer2
,
XDR_ENCODE
);
if
(
!
xdr_int
(
&
xdr
,
&
len
))
{
xdrmem_create
(
&
xdr
,
rv
->
buffer
,
4
,
XDR_ENCODE
);
if
(
!
xdr_int
(
&
xdr
,
(
int
*
)
&
rv
->
bufferLength
))
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"xdr_int (length word)"
));
return
-
1
;
goto
error
;
}
xdr_destroy
(
&
xdr
);
/* Send length word followed by header+args. */
if
(
really_write
(
conn
,
priv
,
flags
&
REMOTE_CALL_IN_OPEN
,
buffer2
,
sizeof
buffer2
)
==
-
1
||
really_write
(
conn
,
priv
,
flags
&
REMOTE_CALL_IN_OPEN
,
buffer
,
len
-
4
)
==
-
1
)
return
rv
;
error:
xdr_destroy
(
&
xdr
);
VIR_FREE
(
rv
);
return
NULL
;
}
static
int
processCallWrite
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
const
char
*
bytes
,
int
len
)
{
int
ret
;
if
(
priv
->
uses_tls
)
{
tls_resend:
ret
=
gnutls_record_send
(
priv
->
session
,
bytes
,
len
);
if
(
ret
<
0
)
{
if
(
ret
==
GNUTLS_E_INTERRUPTED
)
goto
tls_resend
;
if
(
ret
==
GNUTLS_E_AGAIN
)
return
0
;
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_GNUTLS_ERROR
,
gnutls_strerror
(
ret
));
return
-
1
;
}
}
else
{
resend:
ret
=
send
(
priv
->
sock
,
bytes
,
len
,
0
);
if
(
ret
==
-
1
)
{
if
(
errno
==
EINTR
)
goto
resend
;
if
(
errno
==
EWOULDBLOCK
)
return
0
;
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_SYSTEM_ERROR
,
strerror
(
errno
));
return
-
1
;
}
}
return
ret
;
}
static
int
processCallRead
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
{
int
ret
;
if
(
priv
->
uses_tls
)
{
tls_resend:
ret
=
gnutls_record_recv
(
priv
->
session
,
bytes
,
len
);
if
(
ret
==
GNUTLS_E_INTERRUPTED
)
goto
tls_resend
;
if
(
ret
==
GNUTLS_E_AGAIN
)
return
0
;
/* Treat 0 == EOF as an error */
if
(
ret
<=
0
)
{
if
(
ret
<
0
)
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_GNUTLS_ERROR
,
_
(
"failed to read from TLS socket %s"
),
gnutls_strerror
(
ret
));
else
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_SYSTEM_ERROR
,
"%s"
,
_
(
"server closed connection"
));
return
-
1
;
}
}
else
{
resend:
ret
=
recv
(
priv
->
sock
,
bytes
,
len
,
0
);
if
(
ret
<=
0
)
{
if
(
ret
==
-
1
)
{
if
(
errno
==
EINTR
)
goto
resend
;
if
(
errno
==
EWOULDBLOCK
)
return
0
;
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_SYSTEM_ERROR
,
_
(
"failed to read from socket %s"
),
strerror
(
errno
));
}
else
{
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_SYSTEM_ERROR
,
"%s"
,
_
(
"server closed connection"
));
}
return
-
1
;
}
}
return
ret
;
}
static
int
processCallSendOne
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
,
struct
remote_thread_call
*
thecall
)
{
#if HAVE_SASL
if
(
priv
->
saslconn
)
{
const
char
*
output
;
unsigned
int
outputlen
;
int
err
,
ret
;
if
(
!
priv
->
saslEncoded
)
{
err
=
sasl_encode
(
priv
->
saslconn
,
thecall
->
buffer
+
thecall
->
bufferOffset
,
thecall
->
bufferLength
-
thecall
->
bufferOffset
,
&
output
,
&
outputlen
);
if
(
err
!=
SASL_OK
)
{
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_INTERNAL_ERROR
,
_
(
"failed to encode SASL data: %s"
),
sasl_errstring
(
err
,
NULL
,
NULL
));
return
-
1
;
}
priv
->
saslEncoded
=
output
;
priv
->
saslEncodedLength
=
outputlen
;
priv
->
saslEncodedOffset
=
0
;
thecall
->
bufferOffset
=
thecall
->
bufferLength
;
}
ret
=
processCallWrite
(
conn
,
priv
,
in_open
,
priv
->
saslEncoded
+
priv
->
saslEncodedOffset
,
priv
->
saslEncodedLength
-
priv
->
saslEncodedOffset
);
if
(
ret
<
0
)
return
ret
;
priv
->
saslEncodedOffset
+=
ret
;
if
(
priv
->
saslEncodedOffset
==
priv
->
saslEncodedLength
)
{
priv
->
saslEncoded
=
NULL
;
priv
->
saslEncodedOffset
=
priv
->
saslEncodedLength
=
0
;
thecall
->
mode
=
REMOTE_MODE_WAIT_RX
;
}
}
else
{
#endif
int
ret
;
ret
=
processCallWrite
(
conn
,
priv
,
in_open
,
thecall
->
buffer
+
thecall
->
bufferOffset
,
thecall
->
bufferLength
-
thecall
->
bufferOffset
);
if
(
ret
<
0
)
return
ret
;
thecall
->
bufferOffset
+=
ret
;
if
(
thecall
->
bufferOffset
==
thecall
->
bufferLength
)
{
thecall
->
bufferOffset
=
thecall
->
bufferLength
=
0
;
thecall
->
mode
=
REMOTE_MODE_WAIT_RX
;
}
#if HAVE_SASL
}
#endif
return
0
;
}
static
int
processCallSend
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
)
{
struct
remote_thread_call
*
thecall
=
priv
->
waitDispatch
;
while
(
thecall
&&
thecall
->
mode
!=
REMOTE_MODE_WAIT_TX
)
thecall
=
thecall
->
next
;
if
(
!
thecall
)
return
-
1
;
/* Shouldn't happen, but you never know... */
while
(
thecall
)
{
int
ret
=
processCallSendOne
(
conn
,
priv
,
in_open
,
thecall
);
if
(
ret
<
0
)
return
ret
;
if
(
thecall
->
mode
==
REMOTE_MODE_WAIT_TX
)
return
0
;
/* Blocking write, to back to event loop */
thecall
=
thecall
->
next
;
}
return
0
;
/* No more calls to send, all done */
}
static
int
processCallRecvSome
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
)
{
unsigned
int
wantData
;
/* Start by reading length word */
if
(
priv
->
bufferLength
==
0
)
priv
->
bufferLength
=
4
;
wantData
=
priv
->
bufferLength
-
priv
->
bufferOffset
;
#if HAVE_SASL
if
(
priv
->
saslconn
)
{
if
(
priv
->
saslDecoded
==
NULL
)
{
char
encoded
[
8192
];
unsigned
int
encodedLen
=
sizeof
(
encoded
);
int
ret
,
err
;
ret
=
processCallRead
(
conn
,
priv
,
in_open
,
encoded
,
encodedLen
);
if
(
ret
<
0
)
return
-
1
;
if
(
ret
==
0
)
return
0
;
err
=
sasl_decode
(
priv
->
saslconn
,
encoded
,
ret
,
&
priv
->
saslDecoded
,
&
priv
->
saslDecodedLength
);
if
(
err
!=
SASL_OK
)
{
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_INTERNAL_ERROR
,
_
(
"failed to decode SASL data: %s"
),
sasl_errstring
(
err
,
NULL
,
NULL
));
return
-
1
;
}
priv
->
saslDecodedOffset
=
0
;
}
if
((
priv
->
saslDecodedLength
-
priv
->
saslDecodedOffset
)
<
wantData
)
wantData
=
(
priv
->
saslDecodedLength
-
priv
->
saslDecodedOffset
);
memcpy
(
priv
->
buffer
+
priv
->
bufferOffset
,
priv
->
saslDecoded
+
priv
->
saslDecodedOffset
,
wantData
);
priv
->
saslDecodedOffset
+=
wantData
;
priv
->
bufferOffset
+=
wantData
;
if
(
priv
->
saslDecodedOffset
==
priv
->
saslDecodedLength
)
{
priv
->
saslDecodedLength
=
priv
->
saslDecodedLength
=
0
;
priv
->
saslDecoded
=
NULL
;
}
return
wantData
;
}
else
{
#endif
int
ret
;
retry_read:
/* Read and deserialise length word. */
if
(
really_read
(
conn
,
priv
,
flags
&
REMOTE_CALL_IN_OPEN
,
buffer2
,
sizeof
buffer2
)
==
-
1
)
ret
=
processCallRead
(
conn
,
priv
,
in_open
,
priv
->
buffer
+
priv
->
bufferOffset
,
wantData
);
if
(
ret
<
0
)
return
-
1
;
if
(
ret
==
0
)
return
0
;
priv
->
bufferOffset
+=
ret
;
return
ret
;
#if HAVE_SASL
}
#endif
}
static
void
processCallAsyncEvent
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
,
remote_message_header
*
hdr
,
XDR
*
xdr
)
{
/* An async message has come in while we were waiting for the
* response. Process it to pull it off the wire, and try again
*/
DEBUG0
(
"Encountered an event while waiting for a response"
);
if
(
in_open
)
{
DEBUG
(
"Ignoring bogus event %d received while in open"
,
hdr
->
proc
);
return
;
}
if
(
hdr
->
proc
==
REMOTE_PROC_DOMAIN_EVENT
)
{
remoteDomainQueueEvent
(
conn
,
xdr
);
virEventUpdateTimeout
(
priv
->
eventFlushTimer
,
0
);
}
else
{
DEBUG
(
"Unexpected event proc %d"
,
hdr
->
proc
);
}
}
static
int
processCallRecvLen
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
)
{
XDR
xdr
;
int
len
;
xdrmem_create
(
&
xdr
,
buffer2
,
sizeof
buffer2
,
XDR_DECODE
);
xdrmem_create
(
&
xdr
,
priv
->
buffer
,
priv
->
bufferLength
,
XDR_DECODE
);
if
(
!
xdr_int
(
&
xdr
,
&
len
))
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"xdr_int (length word, reply)"
));
return
-
1
;
}
...
...
@@ -5630,26 +5988,38 @@ retry_read:
len
-=
4
;
if
(
len
<
0
||
len
>
REMOTE_MESSAGE_MAX
)
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"packet received from server too large"
));
return
-
1
;
}
/* Read reply header and what follows (either a ret or an error). */
if
(
really_read
(
conn
,
priv
,
flags
&
REMOTE_CALL_IN_OPEN
,
buffer
,
len
)
==
-
1
)
return
-
1
;
/* Extend our declared buffer length and carry
on reading the header + payload */
priv
->
bufferLength
+=
len
;
DEBUG
(
"Got length, now need %d total (%d more)"
,
priv
->
bufferLength
,
len
);
return
0
;
}
static
int
processCallRecvMsg
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
)
{
XDR
xdr
;
struct
remote_message_header
hdr
;
int
len
=
priv
->
bufferLength
-
4
;
struct
remote_thread_call
*
thecall
;
/* Deserialise reply header. */
xdrmem_create
(
&
xdr
,
buffer
,
len
,
XDR_DECODE
);
xdrmem_create
(
&
xdr
,
priv
->
buffer
+
4
,
len
,
XDR_DECODE
);
if
(
!
xdr_remote_message_header
(
&
xdr
,
&
hdr
))
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"invalid header in reply"
));
return
-
1
;
}
/* Check program, version, etc. are what we expect. */
if
(
hdr
.
prog
!=
REMOTE_PROGRAM
)
{
virRaiseError
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
virRaiseError
(
in_open
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
VIR_ERR_RPC
,
VIR_ERR_ERROR
,
NULL
,
NULL
,
NULL
,
0
,
0
,
_
(
"unknown program (received %x, expected %x)"
),
...
...
@@ -5657,7 +6027,7 @@ retry_read:
return
-
1
;
}
if
(
hdr
.
vers
!=
REMOTE_PROTOCOL_VERSION
)
{
virRaiseError
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
virRaiseError
(
in_open
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
VIR_ERR_RPC
,
VIR_ERR_ERROR
,
NULL
,
NULL
,
NULL
,
0
,
0
,
_
(
"unknown protocol version (received %x, expected %x)"
),
...
...
@@ -5665,41 +6035,49 @@ retry_read:
return
-
1
;
}
if
(
hdr
.
proc
==
REMOTE_PROC_DOMAIN_EVENT
&&
hdr
.
direction
==
REMOTE_MESSAGE
)
{
/* An async message has come in while we were waiting for the
* response. Process it to pull it off the wire, and try again
*/
DEBUG0
(
"Encountered an event while waiting for a response"
);
remoteDomainQueueEvent
(
conn
,
&
xdr
);
virEventUpdateTimeout
(
priv
->
eventFlushTimer
,
0
);
DEBUG0
(
"Retrying read"
);
xdr_destroy
(
&
xdr
);
goto
retry_read
;
/* Async events from server need special handling */
if
(
hdr
.
direction
==
REMOTE_MESSAGE
)
{
processCallAsyncEvent
(
conn
,
priv
,
in_open
,
&
hdr
,
&
xdr
);
xdr_destroy
(
&
xdr
);
return
0
;
}
if
(
hdr
.
proc
!=
proc_nr
)
{
virRaiseError
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
if
(
hdr
.
direction
!=
REMOTE_REPLY
)
{
virRaiseError
(
in_open
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
VIR_ERR_RPC
,
VIR_ERR_ERROR
,
NULL
,
NULL
,
NULL
,
0
,
0
,
_
(
"unknown procedure (received %x, expected %x)"
),
hdr
.
proc
,
proc_nr
);
_
(
"got unexpected RPC call %d from server"
),
hdr
.
proc
);
xdr_destroy
(
&
xdr
);
return
-
1
;
}
if
(
hdr
.
direction
!=
REMOTE_REPLY
)
{
virRaiseError
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
/* Ok, definitely got an RPC reply now find
out who's been waiting for it */
thecall
=
priv
->
waitDispatch
;
while
(
thecall
&&
thecall
->
serial
!=
hdr
.
serial
)
thecall
=
thecall
->
next
;
if
(
!
thecall
)
{
virRaiseError
(
in_open
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
VIR_ERR_RPC
,
VIR_ERR_ERROR
,
NULL
,
NULL
,
NULL
,
0
,
0
,
_
(
"unknown direction (received %x, expected %x)"
),
hdr
.
direction
,
REMOTE_REPLY
);
_
(
"no call waiting for reply with serial %d"
),
hdr
.
serial
);
xdr_destroy
(
&
xdr
);
return
-
1
;
}
if
(
hdr
.
serial
!=
serial
)
{
virRaiseError
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
if
(
hdr
.
proc
!=
thecall
->
proc_nr
)
{
virRaiseError
(
in_open
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
VIR_ERR_RPC
,
VIR_ERR_ERROR
,
NULL
,
NULL
,
NULL
,
0
,
0
,
_
(
"unknown serial (received %x, expected %x)"
),
hdr
.
serial
,
serial
);
_
(
"unknown procedure (received %x, expected %x)"
),
hdr
.
proc
,
thecall
->
proc_nr
);
xdr_destroy
(
&
xdr
);
return
-
1
;
}
...
...
@@ -5709,37 +6087,28 @@ retry_read:
*/
switch
(
hdr
.
status
)
{
case
REMOTE_OK
:
if
(
!
(
*
ret_filter
)
(
&
xdr
,
ret
))
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
VIR_ERR_RPC
,
if
(
!
(
*
thecall
->
ret_filter
)
(
&
xdr
,
thecall
->
ret
))
{
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"unmarshalling ret"
));
return
-
1
;
}
thecall
->
mode
=
REMOTE_MODE_COMPLETE
;
xdr_destroy
(
&
xdr
);
return
0
;
case
REMOTE_ERROR
:
memset
(
&
rerror
,
0
,
sizeof
rerro
r
);
if
(
!
xdr_remote_error
(
&
xdr
,
&
rerro
r
))
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
memset
(
&
thecall
->
err
,
0
,
sizeof
thecall
->
er
r
);
if
(
!
xdr_remote_error
(
&
xdr
,
&
thecall
->
er
r
))
{
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"unmarshalling remote_error"
));
return
-
1
;
}
xdr_destroy
(
&
xdr
);
/* See if caller asked us to keep quiet about missing RPCs
* eg for interop with older servers */
if
(
flags
&
REMOTE_CALL_QUIET_MISSING_RPC
&&
rerror
.
domain
==
VIR_FROM_REMOTE
&&
rerror
.
code
==
VIR_ERR_RPC
&&
rerror
.
level
==
VIR_ERR_ERROR
&&
STRPREFIX
(
*
rerror
.
message
,
"unknown procedure"
))
{
return
-
2
;
}
server_error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
&
rerror
);
xdr_free
((
xdrproc_t
)
xdr_remote_error
,
(
char
*
)
&
rerror
);
return
-
1
;
thecall
->
mode
=
REMOTE_MODE_ERROR
;
return
0
;
default:
virRaiseError
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
virRaiseError
(
in_open
?
NULL
:
conn
,
NULL
,
NULL
,
VIR_FROM_REMOTE
,
VIR_ERR_RPC
,
VIR_ERR_ERROR
,
NULL
,
NULL
,
NULL
,
0
,
0
,
_
(
"unknown status (received %x)"
),
hdr
.
status
);
...
...
@@ -5750,225 +6119,429 @@ retry_read:
static
int
call
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
flags
/* if we are in virConnectOpen */
,
int
proc_nr
,
xdrproc_t
args_filter
,
char
*
args
,
xdrproc_t
ret_filter
,
char
*
ret
)
{
int
rv
;
/*
* Avoid needless wake-ups of the event loop in the
* case where this call is being made from a different
* thread than the event loop. These wake-ups would
* cause the event loop thread to be blocked on the
* mutex for the duration of the call
processCallRecv
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
)
{
int
ret
;
/* Read as much data as is available, until we get
* EGAIN
*/
if
(
priv
->
watch
>=
0
)
virEventUpdateHandle
(
priv
->
watch
,
0
);
for
(;;)
{
ret
=
processCallRecvSome
(
conn
,
priv
,
in_open
);
rv
=
doCall
(
conn
,
priv
,
flags
,
proc_nr
,
args_filter
,
args
,
ret_filter
,
ret
);
if
(
ret
<
0
)
return
-
1
;
if
(
ret
==
0
)
return
0
;
/* Blocking on read */
if
(
priv
->
watch
>=
0
)
virEventUpdateHandle
(
priv
->
watch
,
VIR_EVENT_HANDLE_READABLE
);
return
rv
;
/* Check for completion of our goal */
if
(
priv
->
bufferOffset
==
priv
->
bufferLength
)
{
if
(
priv
->
bufferOffset
==
4
)
{
ret
=
processCallRecvLen
(
conn
,
priv
,
in_open
);
}
else
{
ret
=
processCallRecvMsg
(
conn
,
priv
,
in_open
);
priv
->
bufferOffset
=
priv
->
bufferLength
=
0
;
}
if
(
ret
<
0
)
return
-
1
;
}
}
}
/*
* Process all calls pending dispatch/receive until we
* get a reply to our own call. Then quit and pass the buck
* to someone else.
*/
static
int
really_write_buf
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
const
char
*
bytes
,
int
len
)
processCalls
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
,
struct
remote_thread_call
*
thiscall
)
{
const
char
*
p
;
int
err
;
struct
pollfd
fds
[
2
]
;
int
ret
;
p
=
bytes
;
if
(
priv
->
uses_tls
)
{
do
{
err
=
gnutls_record_send
(
priv
->
session
,
p
,
len
);
if
(
err
<
0
)
{
if
(
err
==
GNUTLS_E_INTERRUPTED
||
err
==
GNUTLS_E_AGAIN
)
fds
[
0
].
fd
=
priv
->
sock
;
fds
[
1
].
fd
=
priv
->
wakeupReadFD
;
for
(;;)
{
struct
remote_thread_call
*
tmp
=
priv
->
waitDispatch
;
struct
remote_thread_call
*
prev
;
char
ignore
;
fds
[
0
].
events
=
fds
[
0
].
revents
=
0
;
fds
[
1
].
events
=
fds
[
1
].
revents
=
0
;
fds
[
1
].
events
=
POLLIN
;
while
(
tmp
)
{
if
(
tmp
->
mode
==
REMOTE_MODE_WAIT_RX
)
fds
[
0
].
events
|=
POLLIN
;
if
(
tmp
->
mode
==
REMOTE_MODE_WAIT_TX
)
fds
[
0
].
events
|=
POLLOUT
;
tmp
=
tmp
->
next
;
}
/* Release lock while poll'ing so other threads
* can stuff themselves on the queue */
remoteDriverUnlock
(
priv
);
repoll:
ret
=
poll
(
fds
,
ARRAY_CARDINALITY
(
fds
),
-
1
);
if
(
ret
<
0
&&
errno
==
EINTR
)
goto
repoll
;
remoteDriverLock
(
priv
);
if
(
fds
[
1
].
revents
)
{
DEBUG0
(
"Woken up from poll by other thread"
);
saferead
(
priv
->
wakeupReadFD
,
&
ignore
,
sizeof
(
ignore
));
}
if
(
ret
<
0
)
{
if
(
errno
==
EWOULDBLOCK
)
continue
;
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_GNUTLS_ERROR
,
gnutls_strerror
(
err
));
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_INTERNAL_ERROR
,
_
(
"poll on socket failed %s"
),
strerror
(
errno
));
return
-
1
;
}
len
-=
err
;
p
+=
err
;
if
(
fds
[
0
].
revents
&
POLLOUT
)
{
if
(
processCallSend
(
conn
,
priv
,
in_open
)
<
0
)
return
-
1
;
}
while
(
len
>
0
);
}
else
{
do
{
err
=
send
(
priv
->
sock
,
p
,
len
,
0
);
if
(
err
==
-
1
)
{
if
(
errno
==
EINTR
||
errno
==
EAGAIN
)
continue
;
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_SYSTEM_ERROR
,
strerror
(
errno
));
if
(
fds
[
0
].
revents
&
POLLIN
)
{
if
(
processCallRecv
(
conn
,
priv
,
in_open
)
<
0
)
return
-
1
;
}
len
-=
err
;
p
+=
err
;
/* Iterate through waiting threads and if
* any are complete then tell 'em to wakeup
*/
tmp
=
priv
->
waitDispatch
;
prev
=
NULL
;
while
(
tmp
)
{
if
(
tmp
!=
thiscall
&&
(
tmp
->
mode
==
REMOTE_MODE_COMPLETE
||
tmp
->
mode
==
REMOTE_MODE_ERROR
))
{
/* Take them out of the list */
if
(
prev
)
prev
->
next
=
tmp
->
next
;
else
priv
->
waitDispatch
=
tmp
->
next
;
/* And wake them up....
* ...they won't actually wakeup until
* we release our mutex a short while
* later...
*/
DEBUG
(
"Waking up sleep %d %p %p"
,
tmp
->
proc_nr
,
tmp
,
priv
->
waitDispatch
);
virCondSignal
(
&
tmp
->
cond
);
}
while
(
len
>
0
);
prev
=
tmp
;
tmp
=
tmp
->
next
;
}
/* Now see if *we* are done */
if
(
thiscall
->
mode
==
REMOTE_MODE_COMPLETE
||
thiscall
->
mode
==
REMOTE_MODE_ERROR
)
{
/* We're at head of the list already, so
* remove us
*/
priv
->
waitDispatch
=
thiscall
->
next
;
DEBUG
(
"Giving up the buck %d %p %p"
,
thiscall
->
proc_nr
,
thiscall
,
priv
->
waitDispatch
);
/* See if someone else is still waiting
* and if so, then pass the buck ! */
if
(
priv
->
waitDispatch
)
{
DEBUG
(
"Passing the buck to %d %p"
,
priv
->
waitDispatch
->
proc_nr
,
priv
->
waitDispatch
);
virCondSignal
(
&
priv
->
waitDispatch
->
cond
);
}
return
0
;
}
}
static
int
really_write_plain
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
{
return
really_write_buf
(
conn
,
priv
,
in_open
,
bytes
,
len
);
if
(
fds
[
0
].
revents
&
(
POLLHUP
|
POLLERR
))
{
errorf
(
in_open
?
NULL
:
conn
,
VIR_ERR_INTERNAL_ERROR
,
"%s"
,
_
(
"received hangup / error event on socket"
));
return
-
1
;
}
}
}
#if HAVE_SASL
/*
* This function performs a remote procedure call to procedure PROC_NR.
*
* NB. This does not free the args structure (not desirable, since you
* often want this allocated on the stack or else it contains strings
* which come from the user). It does however free any intermediate
* results, eg. the error structure if there is one.
*
* NB(2). Make sure to memset (&ret, 0, sizeof ret) before calling,
* else Bad Things will happen in the XDR code.
*
* NB(3) You must have the private_data lock before calling this
*
* NB(4) This is very complicated. Due to connection cloning, multiple
* threads can want to use the socket at once. Obviously only one of
* them can. So if someone's using the socket, other threads are put
* to sleep on condition variables. THe existing thread may completely
* send & receive their RPC call/reply while they're asleep. Or it
* may only get around to dealing with sending the call. Or it may
* get around to neither. So upon waking up from slumber, the other
* thread may or may not have more work todo.
*
* We call this dance 'passing the buck'
*
* http://en.wikipedia.org/wiki/Passing_the_buck
*
* "Buck passing or passing the buck is the action of transferring
* responsibility or blame unto another person. It is also used as
* a strategy in power politics when the actions of one country/
* nation are blamed on another, providing an opportunity for war."
*
* NB(5) Don't Panic!
*/
static
int
really_write_sasl
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
call
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
flags
/* if we are in virConnectOpen */
,
int
proc_nr
,
xdrproc_t
args_filter
,
char
*
args
,
xdrproc_t
ret_filter
,
char
*
ret
)
{
const
char
*
output
;
unsigned
int
outputlen
;
int
err
;
int
rv
;
struct
remote_thread_call
*
thiscall
;
err
=
sasl_encode
(
priv
->
saslconn
,
bytes
,
len
,
&
output
,
&
outputlen
);
if
(
err
!=
SASL_OK
)
{
DEBUG
(
"Doing call %d %p"
,
proc_nr
,
priv
->
waitDispatch
);
thiscall
=
prepareCall
(
conn
,
priv
,
flags
,
proc_nr
,
args_filter
,
args
,
ret_filter
,
ret
);
if
(
!
thiscall
)
{
error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
VIR_ERR_NO_MEMORY
,
NULL
);
return
-
1
;
}
return
really_write_buf
(
conn
,
priv
,
in_open
,
output
,
outputlen
);
}
#endif
static
int
really_write
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
{
#if HAVE_SASL
if
(
priv
->
saslconn
)
return
really_write_sasl
(
conn
,
priv
,
in_open
,
bytes
,
len
);
/* Check to see if another thread is dispatching */
if
(
priv
->
waitDispatch
)
{
/* Stick ourselves on the end of the wait queue */
struct
remote_thread_call
*
tmp
=
priv
->
waitDispatch
;
char
ignore
=
1
;
while
(
tmp
&&
tmp
->
next
)
tmp
=
tmp
->
next
;
if
(
tmp
)
tmp
->
next
=
thiscall
;
else
#endif
return
really_write_plain
(
conn
,
priv
,
in_open
,
bytes
,
len
);
}
priv
->
waitDispatch
=
thiscall
;
static
int
really_read_buf
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
{
int
err
;
/* Force other thread to wakup from poll */
safewrite
(
priv
->
wakeupSendFD
,
&
ignore
,
sizeof
(
ignore
));
if
(
priv
->
uses_tls
)
{
tlsreread:
err
=
gnutls_record_recv
(
priv
->
session
,
bytes
,
len
);
if
(
err
<
0
)
{
if
(
err
==
GNUTLS_E_INTERRUPTED
)
goto
tlsreread
;
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_GNUTLS_ERROR
,
gnutls_strerror
(
err
));
DEBUG
(
"Going to sleep %d %p %p"
,
proc_nr
,
priv
->
waitDispatch
,
thiscall
);
/* Go to sleep while other thread is working... */
if
(
virCondWait
(
&
thiscall
->
cond
,
&
priv
->
lock
)
<
0
)
{
if
(
priv
->
waitDispatch
==
thiscall
)
{
priv
->
waitDispatch
=
thiscall
->
next
;
}
else
{
tmp
=
priv
->
waitDispatch
;
while
(
tmp
&&
tmp
->
next
&&
tmp
->
next
!=
thiscall
)
{
tmp
=
tmp
->
next
;
}
if
(
tmp
&&
tmp
->
next
==
thiscall
)
tmp
->
next
=
thiscall
->
next
;
}
errorf
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
VIR_ERR_INTERNAL_ERROR
,
"%s"
,
_
(
"failed to wait on condition"
));
VIR_FREE
(
thiscall
);
return
-
1
;
}
if
(
err
==
0
)
{
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"socket closed unexpectedly"
));
return
-
1
;
DEBUG
(
"Wokeup from sleep %d %p %p"
,
proc_nr
,
priv
->
waitDispatch
,
thiscall
);
/* Two reasons we can be woken up
* 1. Other thread has got our reply ready for us
* 2. Other thread is all done, and it is our turn to
* be the dispatcher to finish waiting for
* our reply
*/
if
(
thiscall
->
mode
==
REMOTE_MODE_COMPLETE
||
thiscall
->
mode
==
REMOTE_MODE_ERROR
)
{
/*
* We avoided catching the buck and our reply is ready !
* We've already had 'thiscall' removed from the list
* so just need to (maybe) handle errors & free it
*/
goto
cleanup
;
}
/* Grr, someone passed the buck onto us ... */
}
else
{
reread:
err
=
recv
(
priv
->
sock
,
bytes
,
len
,
0
);
if
(
err
==
-
1
)
{
if
(
errno
==
EINTR
)
goto
reread
;
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_SYSTEM_ERROR
,
strerror
(
errno
));
return
-
1
;
/* We're first to catch the buck */
priv
->
waitDispatch
=
thiscall
;
}
if
(
err
==
0
)
{
error
(
in_open
?
NULL
:
conn
,
VIR_ERR_RPC
,
_
(
"socket closed unexpectedly"
));
DEBUG
(
"We have the buck %d %p %p"
,
proc_nr
,
priv
->
waitDispatch
,
thiscall
);
/*
* The buck stops here!
*
* At this point we're about to own the dispatch
* process...
*/
/*
* Avoid needless wake-ups of the event loop in the
* case where this call is being made from a different
* thread than the event loop. These wake-ups would
* cause the event loop thread to be blocked on the
* mutex for the duration of the call
*/
if
(
priv
->
watch
>=
0
)
virEventUpdateHandle
(
priv
->
watch
,
0
);
rv
=
processCalls
(
conn
,
priv
,
flags
&
REMOTE_CALL_IN_OPEN
?
1
:
0
,
thiscall
);
if
(
priv
->
watch
>=
0
)
virEventUpdateHandle
(
priv
->
watch
,
VIR_EVENT_HANDLE_READABLE
);
if
(
rv
<
0
)
{
VIR_FREE
(
thiscall
);
return
-
1
;
}
}
return
err
;
cleanup:
DEBUG
(
"All done with our call %d %p %p"
,
proc_nr
,
priv
->
waitDispatch
,
thiscall
);
if
(
thiscall
->
mode
==
REMOTE_MODE_ERROR
)
{
/* See if caller asked us to keep quiet about missing RPCs
* eg for interop with older servers */
if
(
flags
&
REMOTE_CALL_QUIET_MISSING_RPC
&&
thiscall
->
err
.
domain
==
VIR_FROM_REMOTE
&&
thiscall
->
err
.
code
==
VIR_ERR_RPC
&&
thiscall
->
err
.
level
==
VIR_ERR_ERROR
&&
STRPREFIX
(
*
thiscall
->
err
.
message
,
"unknown procedure"
))
{
rv
=
-
2
;
}
else
{
server_error
(
flags
&
REMOTE_CALL_IN_OPEN
?
NULL
:
conn
,
&
thiscall
->
err
);
rv
=
-
1
;
}
}
else
{
rv
=
0
;
}
VIR_FREE
(
thiscall
);
return
rv
;
}
static
int
really_read_plain
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
/**
* remoteDomainReadEvent
*
* Read the event data off the wire
*/
static
virDomainEventPtr
remoteDomainReadEvent
(
virConnectPtr
conn
,
XDR
*
xdr
)
{
do
{
int
ret
=
really_read_buf
(
conn
,
priv
,
in_open
,
bytes
,
len
)
;
if
(
ret
<
0
)
return
-
1
;
remote_domain_event_ret
ret
;
virDomainPtr
dom
;
virDomainEventPtr
event
=
NULL
;
memset
(
&
ret
,
0
,
sizeof
ret
)
;
len
-=
ret
;
bytes
+=
ret
;
}
while
(
len
>
0
);
/* unmarshall parameters, and process it*/
if
(
!
xdr_remote_domain_event_ret
(
xdr
,
&
ret
)
)
{
error
(
conn
,
VIR_ERR_RPC
,
_
(
"remoteDomainProcessEvent: unmarshalling ret"
));
return
NULL
;
}
return
0
;
dom
=
get_nonnull_domain
(
conn
,
ret
.
dom
);
if
(
!
dom
)
return
NULL
;
event
=
virDomainEventNewFromDom
(
dom
,
ret
.
event
,
ret
.
detail
);
virDomainFree
(
dom
);
return
event
;
}
#if HAVE_SASL
static
int
really_read_sasl
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
static
void
remoteDomainQueueEvent
(
virConnectPtr
conn
,
XDR
*
xdr
)
{
do
{
int
want
,
got
;
if
(
priv
->
saslDecoded
==
NULL
)
{
char
encoded
[
8192
];
int
encodedLen
=
sizeof
(
encoded
);
int
err
,
ret
;
ret
=
really_read_buf
(
conn
,
priv
,
in_open
,
encoded
,
encodedLen
);
if
(
ret
<
0
)
return
-
1
;
struct
private_data
*
priv
=
conn
->
privateData
;
virDomainEventPtr
event
;
err
=
sasl_decode
(
priv
->
saslconn
,
encoded
,
ret
,
&
priv
->
saslDecoded
,
&
priv
->
saslDecodedLength
);
event
=
remoteDomainReadEvent
(
conn
,
xdr
);
if
(
!
event
)
return
;
if
(
virDomainEventQueuePush
(
priv
->
domainEvents
,
event
)
<
0
)
{
DEBUG0
(
"Error adding event to queue"
);
virDomainEventFree
(
event
);
}
}
got
=
priv
->
saslDecodedLength
-
priv
->
saslDecodedOffset
;
want
=
len
;
if
(
want
>
got
)
want
=
got
;
/** remoteDomainEventFired:
*
* The callback for monitoring the remote socket
* for event data
*/
void
remoteDomainEventFired
(
int
watch
,
int
fd
,
int
event
,
void
*
opaque
)
{
virConnectPtr
conn
=
opaque
;
struct
private_data
*
priv
=
conn
->
privateData
;
memcpy
(
bytes
,
priv
->
saslDecoded
+
priv
->
saslDecodedOffset
,
want
);
priv
->
saslDecodedOffset
+=
want
;
if
(
priv
->
saslDecodedOffset
==
priv
->
saslDecodedLength
)
{
priv
->
saslDecoded
=
NULL
;
priv
->
saslDecodedOffset
=
priv
->
saslDecodedLength
=
0
;
remoteDriverLock
(
priv
);
/* This should be impossible, but it doesn't hurt to check */
if
(
priv
->
waitDispatch
)
goto
done
;
DEBUG
(
"Event fired %d %d %d %X"
,
watch
,
fd
,
event
,
event
);
if
(
event
&
(
VIR_EVENT_HANDLE_HANGUP
|
VIR_EVENT_HANDLE_ERROR
))
{
DEBUG
(
"%s : VIR_EVENT_HANDLE_HANGUP or "
"VIR_EVENT_HANDLE_ERROR encountered"
,
__FUNCTION__
);
virEventRemoveHandle
(
watch
);
priv
->
watch
=
-
1
;
goto
done
;
}
bytes
+=
want
;
len
-=
want
;
}
while
(
len
>
0
);
return
0
;
if
(
fd
!=
priv
->
sock
)
{
virEventRemoveHandle
(
watch
);
priv
->
watch
=
-
1
;
goto
done
;
}
if
(
processCallRecv
(
conn
,
priv
,
0
)
<
0
)
DEBUG0
(
"Something went wrong during async message processing"
);
done:
remoteDriverUnlock
(
priv
);
}
#endif
static
int
really_read
(
virConnectPtr
conn
,
struct
private_data
*
priv
,
int
in_open
/* if we are in virConnectOpen */
,
char
*
bytes
,
int
len
)
void
remoteDomainEventQueueFlush
(
int
timer
ATTRIBUTE_UNUSED
,
void
*
opaque
)
{
#if HAVE_SASL
if
(
priv
->
saslconn
)
return
really_read_sasl
(
conn
,
priv
,
in_open
,
bytes
,
len
);
else
#endif
return
really_read_plain
(
conn
,
priv
,
in_open
,
bytes
,
len
);
virConnectPtr
conn
=
opaque
;
struct
private_data
*
priv
=
conn
->
privateData
;
remoteDriverLock
(
priv
);
virDomainEventQueueDispatch
(
priv
->
domainEvents
,
priv
->
callbackList
,
virDomainEventDispatchDefaultFunc
,
NULL
);
virEventUpdateTimeout
(
priv
->
eventFlushTimer
,
-
1
);
remoteDriverUnlock
(
priv
);
}
/* For errors internal to this library. */
static
void
error
(
virConnectPtr
conn
,
virErrorNumber
code
,
const
char
*
info
)
...
...
@@ -6267,161 +6840,3 @@ remoteRegister (void)
return
0
;
}
/**
* remoteDomainReadEvent
*
* Read the event data off the wire
*/
static
virDomainEventPtr
remoteDomainReadEvent
(
virConnectPtr
conn
,
XDR
*
xdr
)
{
remote_domain_event_ret
ret
;
virDomainPtr
dom
;
virDomainEventPtr
event
=
NULL
;
memset
(
&
ret
,
0
,
sizeof
ret
);
/* unmarshall parameters, and process it*/
if
(
!
xdr_remote_domain_event_ret
(
xdr
,
&
ret
)
)
{
error
(
conn
,
VIR_ERR_RPC
,
_
(
"remoteDomainProcessEvent: unmarshalling ret"
));
return
NULL
;
}
dom
=
get_nonnull_domain
(
conn
,
ret
.
dom
);
if
(
!
dom
)
return
NULL
;
event
=
virDomainEventNewFromDom
(
dom
,
ret
.
event
,
ret
.
detail
);
virDomainFree
(
dom
);
return
event
;
}
static
void
remoteDomainProcessEvent
(
virConnectPtr
conn
,
XDR
*
xdr
)
{
struct
private_data
*
priv
=
conn
->
privateData
;
virDomainEventPtr
event
;
event
=
remoteDomainReadEvent
(
conn
,
xdr
);
if
(
!
event
)
return
;
DEBUG0
(
"Calling domain event callbacks (no queue)"
);
virDomainEventDispatch
(
event
,
priv
->
callbackList
,
virDomainEventDispatchDefaultFunc
,
NULL
);
virDomainEventFree
(
event
);
}
static
void
remoteDomainQueueEvent
(
virConnectPtr
conn
,
XDR
*
xdr
)
{
struct
private_data
*
priv
=
conn
->
privateData
;
virDomainEventPtr
event
;
event
=
remoteDomainReadEvent
(
conn
,
xdr
);
if
(
!
event
)
return
;
if
(
virDomainEventQueuePush
(
priv
->
domainEvents
,
event
)
<
0
)
{
DEBUG0
(
"Error adding event to queue"
);
virDomainEventFree
(
event
);
}
}
/** remoteDomainEventFired:
*
* The callback for monitoring the remote socket
* for event data
*/
void
remoteDomainEventFired
(
int
watch
,
int
fd
,
int
event
,
void
*
opaque
)
{
char
buffer
[
REMOTE_MESSAGE_MAX
];
char
buffer2
[
4
];
struct
remote_message_header
hdr
;
XDR
xdr
;
int
len
;
virConnectPtr
conn
=
opaque
;
struct
private_data
*
priv
=
conn
->
privateData
;
remoteDriverLock
(
priv
);
DEBUG
(
"Event fired %d %d %d %X"
,
watch
,
fd
,
event
,
event
);
if
(
event
&
(
VIR_EVENT_HANDLE_HANGUP
|
VIR_EVENT_HANDLE_ERROR
))
{
DEBUG
(
"%s : VIR_EVENT_HANDLE_HANGUP or "
"VIR_EVENT_HANDLE_ERROR encountered"
,
__FUNCTION__
);
virEventRemoveHandle
(
watch
);
goto
done
;
}
if
(
fd
!=
priv
->
sock
)
{
virEventRemoveHandle
(
watch
);
goto
done
;
}
/* Read and deserialise length word. */
if
(
really_read
(
conn
,
priv
,
0
,
buffer2
,
sizeof
buffer2
)
==
-
1
)
goto
done
;
xdrmem_create
(
&
xdr
,
buffer2
,
sizeof
buffer2
,
XDR_DECODE
);
if
(
!
xdr_int
(
&
xdr
,
&
len
))
{
error
(
conn
,
VIR_ERR_RPC
,
_
(
"xdr_int (length word, reply)"
));
goto
done
;
}
xdr_destroy
(
&
xdr
);
/* Length includes length word - adjust to real length to read. */
len
-=
4
;
if
(
len
<
0
||
len
>
REMOTE_MESSAGE_MAX
)
{
error
(
conn
,
VIR_ERR_RPC
,
_
(
"packet received from server too large"
));
goto
done
;
}
/* Read reply header and what follows (either a ret or an error). */
if
(
really_read
(
conn
,
priv
,
0
,
buffer
,
len
)
==
-
1
)
{
error
(
conn
,
VIR_ERR_RPC
,
_
(
"error reading buffer from memory"
));
goto
done
;
}
/* Deserialise reply header. */
xdrmem_create
(
&
xdr
,
buffer
,
len
,
XDR_DECODE
);
if
(
!
xdr_remote_message_header
(
&
xdr
,
&
hdr
))
{
error
(
conn
,
VIR_ERR_RPC
,
_
(
"invalid header in event firing"
));
goto
done
;
}
if
(
hdr
.
proc
==
REMOTE_PROC_DOMAIN_EVENT
&&
hdr
.
direction
==
REMOTE_MESSAGE
)
{
DEBUG0
(
"Encountered an async event"
);
remoteDomainProcessEvent
(
conn
,
&
xdr
);
}
else
{
DEBUG0
(
"invalid proc in event firing"
);
error
(
conn
,
VIR_ERR_RPC
,
_
(
"invalid proc in event firing"
));
}
done:
remoteDriverUnlock
(
priv
);
}
void
remoteDomainEventQueueFlush
(
int
timer
ATTRIBUTE_UNUSED
,
void
*
opaque
)
{
virConnectPtr
conn
=
opaque
;
struct
private_data
*
priv
=
conn
->
privateData
;
remoteDriverLock
(
priv
);
virDomainEventQueueDispatch
(
priv
->
domainEvents
,
priv
->
callbackList
,
virDomainEventDispatchDefaultFunc
,
NULL
);
virEventUpdateTimeout
(
priv
->
eventFlushTimer
,
-
1
);
remoteDriverUnlock
(
priv
);
}
src/util.c
浏览文件 @
61674cc1
...
...
@@ -34,6 +34,7 @@
#include <poll.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#if HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
...
...
@@ -155,25 +156,35 @@ virArgvToString(const char *const *argv)
return
ret
;
}
#ifndef __MINGW32__
static
int
virSetCloseExec
(
int
fd
)
{
int
virSetNonBlock
(
int
fd
)
{
#ifndef WIN32
int
flags
;
if
((
flags
=
fcntl
(
fd
,
F_GETF
D
))
<
0
)
if
((
flags
=
fcntl
(
fd
,
F_GETF
L
))
<
0
)
return
-
1
;
flags
|=
FD_CLOEXEC
;
if
((
fcntl
(
fd
,
F_SETF
D
,
flags
))
<
0
)
flags
|=
O_NONBLOCK
;
if
((
fcntl
(
fd
,
F_SETF
L
,
flags
))
<
0
)
return
-
1
;
#else
unsigned
long
flag
=
1
;
/* This is actually Gnulib's replacement rpl_ioctl function.
* We can't call ioctlsocket directly in any case.
*/
if
(
ioctl
(
fd
,
FIONBIO
,
(
void
*
)
&
flag
)
==
-
1
)
return
-
1
;
#endif
return
0
;
}
static
int
virSetNonBlock
(
int
fd
)
{
#ifndef WIN32
static
int
virSetCloseExec
(
int
fd
)
{
int
flags
;
if
((
flags
=
fcntl
(
fd
,
F_GETF
L
))
<
0
)
if
((
flags
=
fcntl
(
fd
,
F_GETF
D
))
<
0
)
return
-
1
;
flags
|=
O_NONBLOCK
;
if
((
fcntl
(
fd
,
F_SETF
L
,
flags
))
<
0
)
flags
|=
FD_CLOEXEC
;
if
((
fcntl
(
fd
,
F_SETF
D
,
flags
))
<
0
)
return
-
1
;
return
0
;
}
...
...
src/util.h
浏览文件 @
61674cc1
...
...
@@ -38,6 +38,8 @@ enum {
VIR_EXEC_DAEMON
=
(
1
<<
1
),
};
int
virSetNonBlock
(
int
fd
);
int
virExec
(
virConnectPtr
conn
,
const
char
*
const
*
argv
,
const
char
*
const
*
envp
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录