diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c index 4b7d4a9863a5170ffa179ea4cbd31822fdc87052..93e9d8aa12c2ba44fdc6fcaf092deb0c1fd5bd7b 100644 --- a/src/rpc/virnetclient.c +++ b/src/rpc/virnetclient.c @@ -110,6 +110,97 @@ static void virNetClientIncomingEvent(virNetSocketPtr sock, int events, void *opaque); +/* Append a call to the end of the list */ +static void virNetClientCallQueue(virNetClientCallPtr *head, + virNetClientCallPtr call) +{ + virNetClientCallPtr tmp = *head; + while (tmp && tmp->next) { + tmp = tmp->next; + } + if (tmp) + tmp->next = call; + else + *head = call; + call->next = NULL; +} + +#if 0 +/* Obtain a call from the head of the list */ +static virNetClientCallPtr virNetClientCallServe(virNetClientCallPtr *head) +{ + virNetClientCallPtr tmp = *head; + if (tmp) + *head = tmp->next; + else + *head = NULL; + tmp->next = NULL; + return tmp; +} +#endif + +/* Remove a call from anywhere in the list */ +static void virNetClientCallRemove(virNetClientCallPtr *head, + virNetClientCallPtr call) +{ + virNetClientCallPtr tmp = *head; + virNetClientCallPtr prev = NULL; + while (tmp) { + if (tmp == call) { + if (prev) + prev->next = tmp->next; + else + *head = tmp->next; + tmp->next = NULL; + return; + } + prev = tmp; + tmp = tmp->next; + } +} + +/* Predicate returns true if matches */ +typedef bool (*virNetClientCallPredicate)(virNetClientCallPtr call, void *opaque); + +/* Remove a list of calls from the list based on a predicate */ +static void virNetClientCallRemovePredicate(virNetClientCallPtr *head, + virNetClientCallPredicate pred, + void *opaque) +{ + virNetClientCallPtr tmp = *head; + virNetClientCallPtr prev = NULL; + while (tmp) { + virNetClientCallPtr next = tmp->next; + tmp->next = NULL; /* Temp unlink */ + if (pred(tmp, opaque)) { + if (prev) + prev->next = next; + else + *head = next; + } else { + tmp->next = next; /* Reverse temp unlink */ + prev = tmp; + } + tmp = next; + } +} + +/* Returns true if the predicate matches at least one call in the list */ +static bool virNetClientCallMatchPredicate(virNetClientCallPtr head, + virNetClientCallPredicate pred, + void *opaque) +{ + virNetClientCallPtr tmp = head; + while (tmp) { + if (pred(tmp, opaque)) { + return true; + } + tmp = tmp->next; + } + return false; +} + + static void virNetClientEventFree(void *opaque) { virNetClientPtr client = opaque; @@ -896,6 +987,42 @@ virNetClientIOHandleInput(virNetClientPtr client) } +static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call, + void *opaque) +{ + struct pollfd *fd = opaque; + + if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX) + fd->events |= POLLIN; + if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX) + fd->events |= POLLOUT; + + return false; +} + + +static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, + void *opaque) +{ + virNetClientCallPtr thiscall = opaque; + + if (call == thiscall) + return false; + + if (call->mode != VIR_NET_CLIENT_MODE_COMPLETE) + return false; + + /* + * ...they won't actually wakeup until + * we release our mutex a short while + * later... + */ + VIR_DEBUG("Waking up sleeping call %p", call); + virCondSignal(&call->cond); + + return true; +} + /* * Process all calls pending dispatch/receive until we * get a reply to our own call. Then quit and pass the buck @@ -911,8 +1038,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].fd = client->wakeupReadFD; for (;;) { - virNetClientCallPtr tmp = client->waitDispatch; - virNetClientCallPtr prev; char ignore; sigset_t oldmask, blockedsigs; int timeout = -1; @@ -928,14 +1053,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, fds[1].events = fds[1].revents = 0; fds[1].events = POLLIN; - while (tmp) { - if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_RX) - fds[0].events |= POLLIN; - if (tmp->mode == VIR_NET_CLIENT_MODE_WAIT_TX) - fds[0].events |= POLLOUT; - tmp = tmp->next; - } + /* Calculate poll events for calls */ + virNetClientCallMatchPredicate(client->waitDispatch, + virNetClientIOEventLoopPollEvents, + &fds[0]); /* We have to be prepared to receive stream data * regardless of whether any of the calls waiting @@ -1008,37 +1130,16 @@ static int virNetClientIOEventLoop(virNetClientPtr client, /* Iterate through waiting threads and if * any are complete then tell 'em to wakeup */ - tmp = client->waitDispatch; - prev = NULL; - while (tmp) { - if (tmp != thiscall && - tmp->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - /* Take them out of the list */ - if (prev) - prev->next = tmp->next; - else - client->waitDispatch = tmp->next; - - /* And wake them up.... - * ...they won't actually wakeup until - * we release our mutex a short while - * later... - */ - VIR_DEBUG("Waking up sleep %p %p", tmp, client->waitDispatch); - virCondSignal(&tmp->cond); - } else { - prev = tmp; - } - tmp = tmp->next; - } + virNetClientCallRemovePredicate(&client->waitDispatch, + virNetClientIOEventLoopRemoveDone, + thiscall); /* Now see if *we* are done */ if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { - /* We're at head of the list already, so - * remove us - */ - client->waitDispatch = thiscall->next; + virNetClientCallRemove(&client->waitDispatch, thiscall); + VIR_DEBUG("Giving up the buck %p %p", thiscall, client->waitDispatch); + /* See if someone else is still waiting * and if so, then pass the buck ! */ if (client->waitDispatch) { @@ -1058,7 +1159,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client, error: - client->waitDispatch = thiscall->next; + virNetClientCallRemove(&client->waitDispatch, thiscall); VIR_DEBUG("Giving up the buck due to I/O error %p %p", thiscall, client->waitDispatch); /* See if someone else is still waiting * and if so, then pass the buck ! */ @@ -1119,22 +1220,13 @@ static int virNetClientIO(virNetClientPtr client, /* Check to see if another thread is dispatching */ if (client->waitDispatch) { - /* Stick ourselves on the end of the wait queue */ - virNetClientCallPtr tmp = client->waitDispatch; char ignore = 1; - while (tmp && tmp->next) - tmp = tmp->next; - if (tmp) - tmp->next = thiscall; - else - client->waitDispatch = thiscall; + /* Stick ourselves on the end of the wait queue */ + virNetClientCallQueue(&client->waitDispatch, thiscall); /* Force other thread to wakeup from poll */ if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) { - if (tmp) - tmp->next = NULL; - else - client->waitDispatch = NULL; + virNetClientCallRemove(&client->waitDispatch, thiscall); virReportSystemError(errno, "%s", _("failed to wake up polling thread")); return -1; @@ -1143,17 +1235,7 @@ static int virNetClientIO(virNetClientPtr client, VIR_DEBUG("Going to sleep %p %p", client->waitDispatch, thiscall); /* Go to sleep while other thread is working... */ if (virCondWait(&thiscall->cond, &client->lock) < 0) { - if (client->waitDispatch == thiscall) { - client->waitDispatch = thiscall->next; - } else { - tmp = client->waitDispatch; - while (tmp && tmp->next && - tmp->next != thiscall) { - tmp = tmp->next; - } - if (tmp && tmp->next == thiscall) - tmp->next = thiscall->next; - } + virNetClientCallRemove(&client->waitDispatch, thiscall); virNetError(VIR_ERR_INTERNAL_ERROR, "%s", _("failed to wait on condition")); return -1; @@ -1177,10 +1259,9 @@ static int virNetClientIO(virNetClientPtr client, } /* Grr, someone passed the buck onto us ... */ - } else { - /* We're first to catch the buck */ - client->waitDispatch = thiscall; + /* We're the first to arrive */ + virNetClientCallQueue(&client->waitDispatch, thiscall); } VIR_DEBUG("We have the buck %p %p", client->waitDispatch, thiscall);