提交 2fb13628 编写于 作者: J Jiri Denemark

Implement keepalive protocol in remote driver

上级 673adba5
...@@ -68,6 +68,7 @@ ...@@ -68,6 +68,7 @@
#endif #endif
static int inside_daemon = 0; static int inside_daemon = 0;
static virDriverPtr remoteDriver = NULL;
struct private_data { struct private_data {
virMutex lock; virMutex lock;
...@@ -84,6 +85,7 @@ struct private_data { ...@@ -84,6 +85,7 @@ struct private_data {
char *type; /* Cached return from remoteType. */ char *type; /* Cached return from remoteType. */
int localUses; /* Ref count for private data */ int localUses; /* Ref count for private data */
char *hostname; /* Original hostname */ char *hostname; /* Original hostname */
bool serverKeepAlive; /* Does server support keepalive protocol? */
virDomainEventStatePtr domainEventState; virDomainEventStatePtr domainEventState;
}; };
...@@ -667,6 +669,26 @@ doRemoteOpen (virConnectPtr conn, ...@@ -667,6 +669,26 @@ doRemoteOpen (virConnectPtr conn,
if (remoteAuthenticate(conn, priv, auth, authtype) == -1) if (remoteAuthenticate(conn, priv, auth, authtype) == -1)
goto failed; goto failed;
if (virNetClientKeepAliveIsSupported(priv->client)) {
remote_supports_feature_args args =
{ VIR_DRV_FEATURE_PROGRAM_KEEPALIVE };
remote_supports_feature_ret ret = { 0 };
int rc;
rc = call(conn, priv, 0, REMOTE_PROC_SUPPORTS_FEATURE,
(xdrproc_t)xdr_remote_supports_feature_args, (char *) &args,
(xdrproc_t)xdr_remote_supports_feature_ret, (char *) &ret);
if (rc == -1)
goto failed;
if (ret.supported) {
priv->serverKeepAlive = true;
} else {
VIR_INFO("Disabling keepalive protocol since it is not supported"
" by the server");
}
}
/* Finally we can call the remote side's open function. */ /* Finally we can call the remote side's open function. */
{ {
remote_open_args args = { &name, flags }; remote_open_args args = { &name, flags };
...@@ -4180,6 +4202,33 @@ done: ...@@ -4180,6 +4202,33 @@ done:
} }
static int
remoteSetKeepAlive(virConnectPtr conn, int interval, unsigned int count)
{
struct private_data *priv = conn->privateData;
int ret = -1;
remoteDriverLock(priv);
if (!virNetClientKeepAliveIsSupported(priv->client)) {
remoteError(VIR_ERR_INTERNAL_ERROR, "%s",
_("the caller doesn't support keepalive protocol;"
" perhaps it's missing event loop implementation"));
goto cleanup;
}
if (!priv->serverKeepAlive) {
ret = 1;
goto cleanup;
}
ret = virNetClientKeepAliveStart(priv->client, interval, count);
cleanup:
remoteDriverUnlock(priv);
return ret;
}
#include "remote_client_bodies.h" #include "remote_client_bodies.h"
#include "qemu_client_bodies.h" #include "qemu_client_bodies.h"
...@@ -4550,6 +4599,7 @@ static virDriver remote_driver = { ...@@ -4550,6 +4599,7 @@ static virDriver remote_driver = {
.domainGetBlockJobInfo = remoteDomainGetBlockJobInfo, /* 0.9.4 */ .domainGetBlockJobInfo = remoteDomainGetBlockJobInfo, /* 0.9.4 */
.domainBlockJobSetSpeed = remoteDomainBlockJobSetSpeed, /* 0.9.4 */ .domainBlockJobSetSpeed = remoteDomainBlockJobSetSpeed, /* 0.9.4 */
.domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */ .domainBlockPull = remoteDomainBlockPull, /* 0.9.4 */
.setKeepAlive = remoteSetKeepAlive, /* 0.9.7 */
}; };
static virNetworkDriver network_driver = { static virNetworkDriver network_driver = {
...@@ -4700,6 +4750,8 @@ static virStateDriver state_driver = { ...@@ -4700,6 +4750,8 @@ static virStateDriver state_driver = {
int int
remoteRegister (void) remoteRegister (void)
{ {
remoteDriver = &remote_driver;
if (virRegisterDriver (&remote_driver) == -1) return -1; if (virRegisterDriver (&remote_driver) == -1) return -1;
if (virRegisterNetworkDriver (&network_driver) == -1) return -1; if (virRegisterNetworkDriver (&network_driver) == -1) return -1;
if (virRegisterInterfaceDriver (&interface_driver) == -1) return -1; if (virRegisterInterfaceDriver (&interface_driver) == -1) return -1;
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "virnetclient.h" #include "virnetclient.h"
#include "virnetsocket.h" #include "virnetsocket.h"
#include "virkeepalive.h"
#include "memory.h" #include "memory.h"
#include "threads.h" #include "threads.h"
#include "virfile.h" #include "virfile.h"
...@@ -102,11 +103,12 @@ struct _virNetClient { ...@@ -102,11 +103,12 @@ struct _virNetClient {
size_t nstreams; size_t nstreams;
virNetClientStreamPtr *streams; virNetClientStreamPtr *streams;
virKeepAlivePtr keepalive;
bool wantClose; bool wantClose;
}; };
void virNetClientRequestClose(virNetClientPtr client); static void virNetClientRequestClose(virNetClientPtr client);
static void virNetClientLock(virNetClientPtr client) static void virNetClientLock(virNetClientPtr client)
{ {
...@@ -222,11 +224,56 @@ static void virNetClientEventFree(void *opaque) ...@@ -222,11 +224,56 @@ static void virNetClientEventFree(void *opaque)
virNetClientFree(client); virNetClientFree(client);
} }
bool
virNetClientKeepAliveIsSupported(virNetClientPtr client)
{
bool supported;
virNetClientLock(client);
supported = !!client->keepalive;
virNetClientUnlock(client);
return supported;
}
int
virNetClientKeepAliveStart(virNetClientPtr client,
int interval,
unsigned int count)
{
int ret;
virNetClientLock(client);
ret = virKeepAliveStart(client->keepalive, interval, count);
virNetClientUnlock(client);
return ret;
}
static void
virNetClientKeepAliveDeadCB(void *opaque)
{
virNetClientRequestClose(opaque);
}
static int
virNetClientKeepAliveSendCB(void *opaque,
virNetMessagePtr msg)
{
int ret;
ret = virNetClientSendNonBlock(opaque, msg);
if (ret != -1 && ret != 1)
virNetMessageFree(msg);
return ret;
}
static virNetClientPtr virNetClientNew(virNetSocketPtr sock, static virNetClientPtr virNetClientNew(virNetSocketPtr sock,
const char *hostname) const char *hostname)
{ {
virNetClientPtr client = NULL; virNetClientPtr client = NULL;
int wakeupFD[2] = { -1, -1 }; int wakeupFD[2] = { -1, -1 };
virKeepAlivePtr ka = NULL;
if (pipe2(wakeupFD, O_CLOEXEC) < 0) { if (pipe2(wakeupFD, O_CLOEXEC) < 0) {
virReportSystemError(errno, "%s", virReportSystemError(errno, "%s",
...@@ -259,13 +306,24 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock, ...@@ -259,13 +306,24 @@ static virNetClientPtr virNetClientNew(virNetSocketPtr sock,
client, client,
virNetClientEventFree) < 0) { virNetClientEventFree) < 0) {
client->refs--; client->refs--;
VIR_DEBUG("Failed to add event watch, disabling events"); VIR_DEBUG("Failed to add event watch, disabling events and support for"
" keepalive messages");
} else {
/* Keepalive protocol consists of async messages so it can only be used
* if the client supports them */
if (!(ka = virKeepAliveNew(-1, 0, client,
virNetClientKeepAliveSendCB,
virNetClientKeepAliveDeadCB,
virNetClientEventFree)))
goto error;
/* keepalive object has a reference to client */
client->refs++;
} }
client->keepalive = ka;
PROBE(RPC_CLIENT_NEW, PROBE(RPC_CLIENT_NEW,
"client=%p refs=%d sock=%p", "client=%p refs=%d sock=%p",
client, client->refs, client->sock); client, client->refs, client->sock);
return client; return client;
no_memory: no_memory:
...@@ -273,6 +331,10 @@ no_memory: ...@@ -273,6 +331,10 @@ no_memory:
error: error:
VIR_FORCE_CLOSE(wakeupFD[0]); VIR_FORCE_CLOSE(wakeupFD[0]);
VIR_FORCE_CLOSE(wakeupFD[1]); VIR_FORCE_CLOSE(wakeupFD[1]);
if (ka) {
virKeepAliveStop(ka);
virKeepAliveFree(ka);
}
virNetClientFree(client); virNetClientFree(client);
return NULL; return NULL;
} }
...@@ -416,6 +478,8 @@ void virNetClientFree(virNetClientPtr client) ...@@ -416,6 +478,8 @@ void virNetClientFree(virNetClientPtr client)
static void static void
virNetClientCloseLocked(virNetClientPtr client) virNetClientCloseLocked(virNetClientPtr client)
{ {
virKeepAlivePtr ka;
VIR_DEBUG("client=%p, sock=%p", client, client->sock); VIR_DEBUG("client=%p, sock=%p", client, client->sock);
if (!client->sock) if (!client->sock)
...@@ -430,7 +494,20 @@ virNetClientCloseLocked(virNetClientPtr client) ...@@ -430,7 +494,20 @@ virNetClientCloseLocked(virNetClientPtr client)
virNetSASLSessionFree(client->sasl); virNetSASLSessionFree(client->sasl);
client->sasl = NULL; client->sasl = NULL;
#endif #endif
ka = client->keepalive;
client->keepalive = NULL;
client->wantClose = false; client->wantClose = false;
if (ka) {
client->refs++;
virNetClientUnlock(client);
virKeepAliveStop(ka);
virKeepAliveFree(ka);
virNetClientLock(client);
client->refs--;
}
} }
void virNetClientClose(virNetClientPtr client) void virNetClientClose(virNetClientPtr client)
...@@ -443,7 +520,7 @@ void virNetClientClose(virNetClientPtr client) ...@@ -443,7 +520,7 @@ void virNetClientClose(virNetClientPtr client)
virNetClientUnlock(client); virNetClientUnlock(client);
} }
void static void
virNetClientRequestClose(virNetClientPtr client) virNetClientRequestClose(virNetClientPtr client)
{ {
VIR_DEBUG("client=%p", client); VIR_DEBUG("client=%p", client);
...@@ -844,6 +921,9 @@ virNetClientCallDispatch(virNetClientPtr client) ...@@ -844,6 +921,9 @@ virNetClientCallDispatch(virNetClientPtr client)
client->msg.header.prog, client->msg.header.vers, client->msg.header.proc, client->msg.header.prog, client->msg.header.vers, client->msg.header.proc,
client->msg.header.type, client->msg.header.status, client->msg.header.serial); client->msg.header.type, client->msg.header.status, client->msg.header.serial);
if (virKeepAliveCheckMessage(client->keepalive, &client->msg))
return 0;
switch (client->msg.header.type) { switch (client->msg.header.type) {
case VIR_NET_REPLY: /* Normal RPC replies */ case VIR_NET_REPLY: /* Normal RPC replies */
case VIR_NET_REPLY_WITH_FDS: /* Normal RPC replies with FDs */ case VIR_NET_REPLY_WITH_FDS: /* Normal RPC replies with FDs */
......
...@@ -95,4 +95,9 @@ int virNetClientGetTLSKeySize(virNetClientPtr client); ...@@ -95,4 +95,9 @@ int virNetClientGetTLSKeySize(virNetClientPtr client);
void virNetClientFree(virNetClientPtr client); void virNetClientFree(virNetClientPtr client);
void virNetClientClose(virNetClientPtr client); void virNetClientClose(virNetClientPtr client);
bool virNetClientKeepAliveIsSupported(virNetClientPtr client);
int virNetClientKeepAliveStart(virNetClientPtr client,
int interval,
unsigned int count);
#endif /* __VIR_NET_CLIENT_H__ */ #endif /* __VIR_NET_CLIENT_H__ */
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册