提交 78602c4e 编写于 作者: J Jiri Denemark

client rpc: Don't drop non-blocking calls

So far, we were dropping non-blocking calls whenever sending them would
block. In case a client is sending lots of stream calls (which are not
supposed to generate any reply), the assumption that having other calls
in a queue is sufficient to get a reply from the server doesn't work. I
tried to fix this in b1e374a7 but
failed and reverted that commit.

With this patch, non-blocking calls are never dropped (unless the
connection is being closed) and will always be sent.
上级 9e747e5c
...@@ -58,7 +58,6 @@ struct _virNetClientCall { ...@@ -58,7 +58,6 @@ struct _virNetClientCall {
bool expectReply; bool expectReply;
bool nonBlock; bool nonBlock;
bool haveThread; bool haveThread;
bool sentSomeData;
virCond cond; virCond cond;
...@@ -108,6 +107,10 @@ struct _virNetClient { ...@@ -108,6 +107,10 @@ struct _virNetClient {
}; };
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall);
static void virNetClientLock(virNetClientPtr client) static void virNetClientLock(virNetClientPtr client)
{ {
virMutexLock(&client->lock); virMutexLock(&client->lock);
...@@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client) ...@@ -525,19 +528,21 @@ void virNetClientClose(virNetClientPtr client)
virNetClientLock(client); virNetClientLock(client);
/* If there is a thread polling for data on the socket, set wantClose flag client->wantClose = true;
* and wake the thread up or just immediately close the socket when no-one
* is polling on it. /* If there is a thread polling for data on the socket, wake the thread up
* otherwise try to pass the buck to a possibly waiting thread. If no
* thread is waiting, virNetClientIOEventLoopPassTheBuck will clean the
* queue and close the client because we set client->wantClose.
*/ */
if (client->waitDispatch) { if (client->haveTheBuck) {
char ignore = 1; char ignore = 1;
size_t len = sizeof(ignore); size_t len = sizeof(ignore);
client->wantClose = true;
if (safewrite(client->wakeupSendFD, &ignore, len) != len) if (safewrite(client->wakeupSendFD, &ignore, len) != len)
VIR_ERROR(_("failed to wake up polling thread")); VIR_ERROR(_("failed to wake up polling thread"));
} else { } else {
virNetClientCloseLocked(client); virNetClientIOEventLoopPassTheBuck(client, NULL);
} }
virNetClientUnlock(client); virNetClientUnlock(client);
...@@ -972,8 +977,6 @@ virNetClientIOWriteMessage(virNetClientPtr client, ...@@ -972,8 +977,6 @@ virNetClientIOWriteMessage(virNetClientPtr client,
ret = virNetSocketWrite(client->sock, ret = virNetSocketWrite(client->sock,
thecall->msg->buffer + thecall->msg->bufferOffset, thecall->msg->buffer + thecall->msg->bufferOffset,
thecall->msg->bufferLength - thecall->msg->bufferOffset); thecall->msg->bufferLength - thecall->msg->bufferOffset);
if (ret > 0 || virNetSocketHasPendingData(client->sock))
thecall->sentSomeData = true;
if (ret <= 0) if (ret <= 0)
return ret; return ret;
...@@ -1197,71 +1200,43 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call, ...@@ -1197,71 +1200,43 @@ static bool virNetClientIOEventLoopRemoveDone(virNetClientCallPtr call,
} }
static bool virNetClientIOEventLoopRemoveNonBlocking(virNetClientCallPtr call, static bool
void *opaque) virNetClientIOEventLoopDetachNonBlocking(virNetClientCallPtr call,
void *opaque)
{ {
virNetClientCallPtr thiscall = opaque; virNetClientCallPtr thiscall = opaque;
if (call == thiscall) if (call != thiscall && call->nonBlock && call->haveThread) {
return false; VIR_DEBUG("Waking up sleep %p", call);
call->haveThread = false;
if (!call->nonBlock) virCondSignal(&call->cond);
return false;
if (call->sentSomeData) {
/*
* If some data has been sent we must keep it in the list,
* but still wakeup any thread
*/
if (call->haveThread) {
VIR_DEBUG("Waking up sleep %p", call);
virCondSignal(&call->cond);
} else {
VIR_DEBUG("Keeping unfinished call %p in the list", call);
}
return false;
} else {
/*
* If no data has been sent, we can remove it from the list.
* Wakup any thread, otherwise free the caller ourselves
*/
if (call->haveThread) {
VIR_DEBUG("Waking up sleep %p", call);
virCondSignal(&call->cond);
} else {
VIR_DEBUG("Removing call %p", call);
if (call->expectReply)
VIR_WARN("Got a call expecting a reply but without a waiting thread");
ignore_value(virCondDestroy(&call->cond));
VIR_FREE(call->msg);
VIR_FREE(call);
}
return true; return true;
} }
return false;
} }
static void static bool
virNetClientIOEventLoopRemoveAll(virNetClientPtr client, virNetClientIOEventLoopRemoveAll(virNetClientCallPtr call,
virNetClientCallPtr thiscall) void *opaque)
{ {
if (!client->waitDispatch) virNetClientCallPtr thiscall = opaque;
return;
if (client->waitDispatch == thiscall) { if (call == thiscall)
/* just pretend nothing was sent and the caller will free the call */ return false;
thiscall->sentSomeData = false;
} else { VIR_DEBUG("Removing call %p", call);
virNetClientCallPtr call = client->waitDispatch; ignore_value(virCondDestroy(&call->cond));
virNetClientCallRemove(&client->waitDispatch, call); VIR_FREE(call->msg);
ignore_value(virCondDestroy(&call->cond)); VIR_FREE(call);
VIR_FREE(call->msg); return true;
VIR_FREE(call);
}
} }
static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetClientCallPtr thiscall) static void
virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
virNetClientCallPtr thiscall)
{ {
VIR_DEBUG("Giving up the buck %p", thiscall); VIR_DEBUG("Giving up the buck %p", thiscall);
virNetClientCallPtr tmp = client->waitDispatch; virNetClientCallPtr tmp = client->waitDispatch;
...@@ -1280,14 +1255,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli ...@@ -1280,14 +1255,18 @@ static void virNetClientIOEventLoopPassTheBuck(virNetClientPtr client, virNetCli
VIR_DEBUG("No thread to pass the buck to"); VIR_DEBUG("No thread to pass the buck to");
if (client->wantClose) { if (client->wantClose) {
virNetClientCloseLocked(client); virNetClientCloseLocked(client);
virNetClientIOEventLoopRemoveAll(client, thiscall); virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveAll,
thiscall);
} }
} }
static bool virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call, void *opaque ATTRIBUTE_UNUSED) static bool
virNetClientIOEventLoopWantNonBlock(virNetClientCallPtr call,
void *opaque ATTRIBUTE_UNUSED)
{ {
return call->nonBlock; return call->nonBlock && call->haveThread;
} }
/* /*
...@@ -1320,8 +1299,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ...@@ -1320,8 +1299,8 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
if (virNetSocketHasCachedData(client->sock) || client->wantClose) if (virNetSocketHasCachedData(client->sock) || client->wantClose)
timeout = 0; timeout = 0;
/* If there are any non-blocking calls in the queue, /* If there are any non-blocking calls with an associated thread
* then we don't want to sleep in poll() * in the queue, then we don't want to sleep in poll()
*/ */
if (virNetClientCallMatchPredicate(client->waitDispatch, if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock, virNetClientIOEventLoopWantNonBlock,
...@@ -1394,12 +1373,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ...@@ -1394,12 +1373,15 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
} }
/* If we were woken up because a new non-blocking call was queued, /* If we were woken up because a new non-blocking call was queued,
* we need to re-poll to check if we can send it. * we need to re-poll to check if we can send it. To be precise, we
* will re-poll even if a blocking call arrived when unhandled
* non-blocking calls are still in the queue. But this can't hurt.
*/ */
if (virNetClientCallMatchPredicate(client->waitDispatch, if (virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopWantNonBlock, virNetClientIOEventLoopWantNonBlock,
NULL)) { NULL)) {
VIR_DEBUG("New non-blocking call arrived; repolling"); VIR_DEBUG("The queue contains new non-blocking call(s);"
" repolling");
continue; continue;
} }
} }
...@@ -1424,18 +1406,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ...@@ -1424,18 +1406,18 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
} }
/* Iterate through waiting calls and if any are /* Iterate through waiting calls and if any are
* complete, remove them from the dispatch list.. * complete, remove them from the dispatch list.
*/ */
virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientCallRemovePredicate(&client->waitDispatch,
virNetClientIOEventLoopRemoveDone, virNetClientIOEventLoopRemoveDone,
thiscall); thiscall);
/* Iterate through waiting calls and if any are /* Iterate through waiting calls and wake up and detach threads
* non-blocking, remove them from the dispatch list... * attached to non-blocking calls.
*/ */
virNetClientCallRemovePredicate(&client->waitDispatch, virNetClientCallMatchPredicate(client->waitDispatch,
virNetClientIOEventLoopRemoveNonBlocking, virNetClientIOEventLoopDetachNonBlocking,
thiscall); thiscall);
/* Now see if *we* are done */ /* Now see if *we* are done */
if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) { if (thiscall->mode == VIR_NET_CLIENT_MODE_COMPLETE) {
...@@ -1444,15 +1426,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ...@@ -1444,15 +1426,11 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
return 2; return 2;
} }
/* We're not done, but we're non-blocking */ /* We're not done, but we're non-blocking; keep the call queued */
if (thiscall->nonBlock) { if (thiscall->nonBlock) {
thiscall->haveThread = false;
virNetClientIOEventLoopPassTheBuck(client, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall);
if (thiscall->sentSomeData) { return 1;
return 1;
} else {
virNetClientCallRemove(&client->waitDispatch, thiscall);
return 0;
}
} }
if (fds[0].revents & (POLLHUP | POLLERR)) { if (fds[0].revents & (POLLHUP | POLLERR)) {
...@@ -1462,7 +1440,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client, ...@@ -1462,7 +1440,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
} }
} }
error: error:
virNetClientCallRemove(&client->waitDispatch, thiscall); virNetClientCallRemove(&client->waitDispatch, thiscall);
virNetClientIOEventLoopPassTheBuck(client, thiscall); virNetClientIOEventLoopPassTheBuck(client, thiscall);
...@@ -1614,9 +1591,11 @@ static int virNetClientIO(virNetClientPtr client, ...@@ -1614,9 +1591,11 @@ static int virNetClientIO(virNetClientPtr client,
goto cleanup; goto cleanup;
} }
/* If we're non-blocking, get outta here */ /* If we're non-blocking, we were either queued (and detached) or the
* call was not sent because of an error.
*/
if (thiscall->nonBlock) { if (thiscall->nonBlock) {
if (thiscall->sentSomeData) if (!thiscall->haveThread)
rv = 1; /* In progress */ rv = 1; /* In progress */
else else
rv = 0; /* none at all */ rv = 0; /* none at all */
...@@ -1708,7 +1687,7 @@ done: ...@@ -1708,7 +1687,7 @@ done:
/* /*
* Returns 2 if fully sent, 1 if partially sent (only for nonBlock==true), * Returns 2 if fully sent, 1 if queued (only for nonBlock==true),
* 0 if nothing sent (only for nonBlock==true) and -1 on error * 0 if nothing sent (only for nonBlock==true) and -1 on error
*/ */
static int virNetClientSendInternal(virNetClientPtr client, static int virNetClientSendInternal(virNetClientPtr client,
...@@ -1768,16 +1747,15 @@ static int virNetClientSendInternal(virNetClientPtr client, ...@@ -1768,16 +1747,15 @@ static int virNetClientSendInternal(virNetClientPtr client,
ret = virNetClientIO(client, call); ret = virNetClientIO(client, call);
/* If partially sent, then the call is still on the dispatch queue */ /* If queued, the call will be finished and freed later by another thread;
if (ret == 1) { * we're done. */
call->haveThread = false; if (ret == 1)
} else { return 1;
ignore_value(virCondDestroy(&call->cond));
} ignore_value(virCondDestroy(&call->cond));
cleanup: cleanup:
if (ret != 1) VIR_FREE(call);
VIR_FREE(call);
return ret; return ret;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册