提交 4a00119a 编写于 作者: D Daniel P. Berrange

Use a thread pool for RPC dispatch

上级 f8a51923
Thu Dec 4 22:16:41 GMT 2008 Daniel P. Berrange <berrange@redhat.com>
Make daemon use a thread pool for handling RPC calls
from multiple clients.
* qemud/THREADING.txt: Describe threading rules
* qemud/qemud.h, qemud/qemud.c, src/remote.c: Add a pool
of threads for handling RPC call processing. Use main
thread only for event loop
* qemud/remote_generate_stubs.pl: Pass virConnectPtr object
into dispatch stubs to avoid need to access client struct
* qemud/remote_dispatch_prototypes.h: Re-generated
Thu Dec 4 22:14:41 GMT 2008 Daniel P. Berrange <berrange@redhat.com>
* src/event.c, src/event.h: Make all event handling thread
......
Threading: the RULES.
====================
If you don't understand this, don't touch the code. Ask for
further advice / explanation on the mailing list first.
- the top level lock is on 'struct qemud_server'. This must be
held before acquiring any other lock
- Each 'struct qemud_client' object has a lock. The server lock
must be held before acquiring it. Once the client lock is acquired
the server lock can (optionally) be dropped.
- The event loop has its own self-contained lock. You can ignore
this as a caller of virEvent APIs.
The server lock is only needed / used once the daemon has entered
its main loop, which is the qemudRunLoop() . The initial thread
acquires the lock upon entering this method.
It immediatelty spawns 'n' worker threads, whose main loop is
the qemudWorker() method. The workers will immediately try to
acquire the server lock, and thus block since its held by the
initial thread.
When the initial thread enters the poll() call, it drops the
server lock. The worker locks now each wakeup, acquire the
server lock and go into a condition wait on the 'job' condition
variable. The workers are now all 'primed' for incoming RPC
calls.
A file descriptor event now occurrs, causing the initial thread
to exit poll(). It invokes the registered callback associated
with the file descriptors on which the event occurrs. The callbacks
are required to immediately acquire the server lock.
If the callback is dealing with a client event, it will then
acquire the client lock, and drop the server lock.
The callback will now handle the I/O event, reading or writing
a RPC message. Once a complete RPC message has been read the
client is marked as being in state QEMUD_MODE_WAIT_DISPATCH,
and the 'job' condition variable is signaled. The callback
now drops the client lock and goes back into the poll() loop
waiting for more I/O events.
Meanwhile one of the worker threads wakes up from its condition
variable sleep, holding the server lock. It now searches for a
client in state QEMUD_MODE_WAIT_DISPATCH. If it doesn't find
one, it goes back to sleep. If it does find one, then it calls
into the remoteDispatchClientRequest() method de-serialize the
incoming message into an XDR object and invoke the helper method
for the associated RPC call.
While the helper method is executing, no locks are held on either
the client or server, but the ref count on the 'struct qemud_client'
object is incremented to ensure its not deleted. The helper can
now safely invoke the neccessary libvirt API call.
......@@ -263,9 +263,12 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
siginfo_t siginfo;
int ret;
pthread_mutex_lock(&server->lock);
if (saferead(server->sigread, &siginfo, sizeof(siginfo)) != sizeof(siginfo)) {
qemudLog(QEMUD_ERR, _("Failed to read from signal pipe: %s"),
strerror(errno));
pthread_mutex_unlock(&server->lock);
return;
}
......@@ -294,6 +297,8 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
if (ret != 0)
server->shutdown = 1;
pthread_mutex_unlock(&server->lock);
}
int qemudSetCloseExec(int fd) {
......@@ -751,6 +756,11 @@ static struct qemud_server *qemudInitialize(int sigread) {
return NULL;
}
if (pthread_mutex_init(&server->lock, NULL) != 0) {
VIR_FREE(server);
return NULL;
}
server->sigread = sigread;
if (virEventInit() < 0) {
......@@ -1182,6 +1192,9 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
if (VIR_ALLOC(client) < 0)
goto cleanup;
if (pthread_mutex_init(&client->lock, NULL) != 0)
goto cleanup;
client->magic = QEMUD_CLIENT_MAGIC;
client->fd = fd;
client->readonly = sock->readonly;
......@@ -1255,31 +1268,23 @@ static int qemudDispatchServer(struct qemud_server *server, struct qemud_socket
return 0;
cleanup:
if (client->tlssession) gnutls_deinit (client->tlssession);
if (client &&
client->tlssession) gnutls_deinit (client->tlssession);
close (fd);
free (client);
return -1;
}
static void qemudDispatchClientFailure(struct qemud_server *server, struct qemud_client *client) {
int i, n = -1;
for (i = 0 ; i < server->nclients ; i++) {
if (server->clients[i] == client) {
n = i;
break;
}
}
if (n != -1) {
if (n < (server->nclients-1))
memmove(server->clients + n,
server->clients + n + 1,
server->nclients - (n + 1));
server->nclients--;
}
/*
* You must hold lock for at least the client
* We don't free stuff here, merely disconnect the client's
* network socket & resources.
* We keep the libvirt connection open until any async
* jobs have finished, then clean it up elsehwere
*/
static void qemudDispatchClientFailure(struct qemud_server *server ATTRIBUTE_UNUSED,
struct qemud_client *client) {
virEventRemoveHandleImpl(client->watch);
/* Deregister event delivery callback */
......@@ -1288,18 +1293,63 @@ static void qemudDispatchClientFailure(struct qemud_server *server, struct qemud
virConnectDomainEventDeregister(client->conn, remoteRelayDomainEvent);
}
if (client->conn)
virConnectClose(client->conn);
#if HAVE_SASL
if (client->saslconn) sasl_dispose(&client->saslconn);
free(client->saslUsername);
#endif
if (client->tlssession) gnutls_deinit (client->tlssession);
close(client->fd);
free(client);
client->fd = -1;
}
/* Caller must hold server lock */
static struct qemud_client *qemudPendingJob(struct qemud_server *server)
{
int i;
for (i = 0 ; i < server->nclients ; i++) {
pthread_mutex_lock(&server->clients[i]->lock);
if (server->clients[i]->mode == QEMUD_MODE_WAIT_DISPATCH) {
/* Delibrately don't unlock client - caller wants the lock */
return server->clients[i];
}
pthread_mutex_unlock(&server->clients[i]->lock);
}
return NULL;
}
static void *qemudWorker(void *data)
{
struct qemud_server *server = data;
while (1) {
struct qemud_client *client;
int len;
pthread_mutex_lock(&server->lock);
while ((client = qemudPendingJob(server)) == NULL)
pthread_cond_wait(&server->job, &server->lock);
pthread_mutex_unlock(&server->lock);
/* We own a locked client now... */
client->mode = QEMUD_MODE_IN_DISPATCH;
client->refs++;
if ((len = remoteDispatchClientRequest (server, client)) == 0)
qemudDispatchClientFailure(server, client);
/* Set up the output buffer. */
client->mode = QEMUD_MODE_TX_PACKET;
client->bufferLength = len;
client->bufferOffset = 0;
if (qemudRegisterClientEvent(server, client, 1) < 0)
qemudDispatchClientFailure(server, client);
client->refs--;
pthread_mutex_unlock(&client->lock);
pthread_mutex_unlock(&server->lock);
}
}
static int qemudClientReadBuf(struct qemud_server *server,
......@@ -1465,17 +1515,12 @@ static void qemudDispatchClientRead(struct qemud_server *server, struct qemud_cl
if (client->bufferOffset < client->bufferLength)
return; /* Not read enough */
if ((len = remoteDispatchClientRequest (server, client)) == 0)
qemudDispatchClientFailure(server, client);
/* Set up the output buffer. */
client->mode = QEMUD_MODE_TX_PACKET;
client->bufferLength = len;
client->bufferOffset = 0;
client->mode = QEMUD_MODE_WAIT_DISPATCH;
if (qemudRegisterClientEvent(server, client, 1) < 0)
qemudDispatchClientFailure(server, client);
pthread_cond_signal(&server->job);
break;
}
......@@ -1607,13 +1652,17 @@ qemudDispatchClientWrite(struct qemud_server *server,
return;
if (client->bufferOffset == client->bufferLength) {
/* Done writing, switch back to receive */
client->mode = QEMUD_MODE_RX_HEADER;
client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
client->bufferOffset = 0;
if (qemudRegisterClientEvent (server, client, 1) < 0)
if (client->closing) {
qemudDispatchClientFailure (server, client);
} else {
/* Done writing, switch back to receive */
client->mode = QEMUD_MODE_RX_HEADER;
client->bufferLength = REMOTE_MESSAGE_HEADER_XDR_LEN;
client->bufferOffset = 0;
if (qemudRegisterClientEvent (server, client, 1) < 0)
qemudDispatchClientFailure (server, client);
}
}
/* Still writing */
break;
......@@ -1655,6 +1704,8 @@ qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
struct qemud_client *client = NULL;
int i;
pthread_mutex_lock(&server->lock);
for (i = 0 ; i < server->nclients ; i++) {
if (server->clients[i]->watch == watch) {
client = server->clients[i];
......@@ -1662,8 +1713,13 @@ qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
}
}
if (!client)
if (!client) {
pthread_mutex_unlock(&server->lock);
return;
}
pthread_mutex_lock(&client->lock);
pthread_mutex_unlock(&server->lock);
if (client->fd != fd)
return;
......@@ -1674,11 +1730,12 @@ qemudDispatchClientEvent(int watch, int fd, int events, void *opaque) {
qemudDispatchClientRead(server, client);
else
qemudDispatchClientFailure(server, client);
pthread_mutex_unlock(&client->lock);
}
static int qemudRegisterClientEvent(struct qemud_server *server,
struct qemud_client *client,
int removeFirst) {
int update) {
int mode;
switch (client->mode) {
case QEMUD_MODE_TLS_HANDSHAKE:
......@@ -1697,20 +1754,23 @@ static int qemudRegisterClientEvent(struct qemud_server *server,
mode = VIR_EVENT_HANDLE_WRITABLE;
break;
case QEMUD_MODE_WAIT_DISPATCH:
mode = 0;
break;
default:
return -1;
}
if (removeFirst)
if (virEventRemoveHandleImpl(client->watch) < 0)
return -1;
if ((client->watch = virEventAddHandleImpl(client->fd,
mode | VIR_EVENT_HANDLE_ERROR |
VIR_EVENT_HANDLE_HANGUP,
qemudDispatchClientEvent,
server, NULL)) < 0)
if (update) {
virEventUpdateHandleImpl(client->watch, mode);
} else {
if ((client->watch = virEventAddHandleImpl(client->fd,
mode,
qemudDispatchClientEvent,
server, NULL)) < 0)
return -1;
}
return 0;
}
......@@ -1718,7 +1778,11 @@ static int qemudRegisterClientEvent(struct qemud_server *server,
static void
qemudDispatchServerEvent(int watch, int fd, int events, void *opaque) {
struct qemud_server *server = (struct qemud_server *)opaque;
struct qemud_socket *sock = server->sockets;
struct qemud_socket *sock;
pthread_mutex_lock(&server->lock);
sock = server->sockets;
while (sock) {
if (sock->watch == watch)
......@@ -1727,14 +1791,10 @@ qemudDispatchServerEvent(int watch, int fd, int events, void *opaque) {
sock = sock->next;
}
if (!sock)
return;
if (sock->fd != fd)
return;
if (events)
if (sock && sock->fd == fd && events)
qemudDispatchServer(server, sock);
pthread_mutex_unlock(&server->lock);
}
......@@ -1768,6 +1828,26 @@ static void qemudInactiveTimer(int timer ATTRIBUTE_UNUSED, void *data) {
static int qemudRunLoop(struct qemud_server *server) {
int timerid = -1;
int ret = -1, i;
pthread_mutex_lock(&server->lock);
server->nworkers = 10;
if (VIR_ALLOC_N(server->workers, server->nworkers) < 0) {
qemudLog(QEMUD_ERR, "%s", _("Failed to allocate workers"));
return -1;
}
for (i = 0 ; i < server->nworkers ; i++) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, 1);
pthread_create(&server->workers[i],
&attr,
qemudWorker,
server);
}
for (;;) {
/* A shutdown timeout is specified, so check
......@@ -1781,8 +1861,31 @@ static int qemudRunLoop(struct qemud_server *server) {
qemudDebug("Scheduling shutdown timer %d", timerid);
}
pthread_mutex_unlock(&server->lock);
if (qemudOneLoop() < 0)
break;
pthread_mutex_lock(&server->lock);
reprocess:
for (i = 0 ; i < server->nclients ; i++) {
int inactive;
pthread_mutex_lock(&server->clients[i]->lock);
inactive = server->clients[i]->fd == -1
&& server->clients[i]->refs == 0;
pthread_mutex_unlock(&server->clients[i]->lock);
if (inactive) {
if (server->clients[i]->conn)
virConnectClose(server->clients[i]->conn);
VIR_FREE(server->clients[i]);
server->nclients--;
if (i < server->nclients) {
memmove(server->clients + i,
server->clients + i + 1,
server->nclients - i);
goto reprocess;
}
}
}
/* Unregister any timeout that's active, since we
* just had an event processed
......@@ -1793,11 +1896,21 @@ static int qemudRunLoop(struct qemud_server *server) {
timerid = -1;
}
if (server->shutdown)
return 0;
if (server->shutdown) {
ret = 0;
break;
}
}
return -1;
for (i = 0 ; i < server->nworkers ; i++) {
pthread_t thread = server->workers[i];
pthread_mutex_unlock(&server->lock);
pthread_join(thread, NULL);
pthread_mutex_lock(&server->lock);
}
pthread_mutex_unlock(&server->lock);
return ret;
}
static void qemudCleanup(struct qemud_server *server) {
......
......@@ -72,10 +72,12 @@ typedef enum {
enum qemud_mode {
QEMUD_MODE_RX_HEADER,
QEMUD_MODE_RX_PAYLOAD,
QEMUD_MODE_TX_PACKET,
QEMUD_MODE_TLS_HANDSHAKE,
QEMUD_MODE_RX_HEADER, /* Receiving the fixed length RPC header data */
QEMUD_MODE_RX_PAYLOAD, /* Receiving the variable length RPC payload data */
QEMUD_MODE_WAIT_DISPATCH, /* Message received, waiting for worker to process */
QEMUD_MODE_IN_DISPATCH, /* RPC call being processed */
QEMUD_MODE_TX_PACKET, /* Transmitting reply to RPC call */
QEMUD_MODE_TLS_HANDSHAKE, /* Performing TLS handshake */
};
/* Whether we're passing reads & writes through a sasl SSF */
......@@ -93,11 +95,14 @@ enum qemud_sock_type {
/* Stores the per-client connection state */
struct qemud_client {
PTHREAD_MUTEX_T(lock);
int magic;
int fd;
int watch;
int readonly;
int readonly:1;
int closing:1;
enum qemud_mode mode;
struct sockaddr_storage addr;
......@@ -130,6 +135,7 @@ struct qemud_client {
* called, it will be set back to NULL if that succeeds.
*/
virConnectPtr conn;
int refs;
/* back-pointer to our server */
struct qemud_server *server;
......@@ -150,10 +156,16 @@ struct qemud_socket {
/* Main server state */
struct qemud_server {
pthread_mutex_t lock;
pthread_cond_t job;
int nworkers;
pthread_t *workers;
int nsockets;
struct qemud_socket *sockets;
int nclients;
struct qemud_client **clients;
int sigread;
char logDir[PATH_MAX];
unsigned int shutdown : 1;
......
此差异已折叠。
此差异已折叠。
......@@ -100,7 +100,13 @@ if ($opt_d) {
elsif ($opt_p) {
my @keys = sort (keys %calls);
foreach (@keys) {
print "static int remoteDispatch$calls{$_}->{ProcName} (struct qemud_server *server, struct qemud_client *client, remote_error *err, $calls{$_}->{args} *args, $calls{$_}->{ret} *ret);\n";
print "static int remoteDispatch$calls{$_}->{ProcName}(\n";
print " struct qemud_server *server,\n";
print " struct qemud_client *client,\n";
print " virConnectPtr conn,\n";
print " remote_error *err,\n";
print " $calls{$_}->{args} *args,\n";
print " $calls{$_}->{ret} *ret);\n";
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册