提交 833b901c 编写于 作者: M Michal Privoznik

stream: Check for stream EOF

If client stream does not have any data to sink and neither received
EOF, a dummy packet is sent to the daemon signalising client is ready to
sink some data. However, after we added event loop to client a race may
occur:

Thread 1 calls virNetClientStreamRecvPacket and since no data are cached
nor stream has EOF, it decides to send dummy packet to server which will
sent some data in turn. However, during this decision and actual message
exchange with server -

Thread 2 receives last stream data from server. Therefore an EOF is set
on stream and if there is a call waiting (which is not yet) it is woken
up. However, Thread 1 haven't sent anything so far, so there is no call
to be woken up. So this thread sent dummy packet to daemon, which
ignores that as no stream is associated with such packet and therefore
no reply will ever come.

This race causes client to hang indefinitely.
上级 74e701f8
...@@ -1703,8 +1703,6 @@ static int virNetClientSendInternal(virNetClientPtr client, ...@@ -1703,8 +1703,6 @@ static int virNetClientSendInternal(virNetClientPtr client,
return -1; return -1;
} }
virNetClientLock(client);
if (!client->sock || client->wantClose) { if (!client->sock || client->wantClose) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s", virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("client socket is closed")); _("client socket is closed"));
...@@ -1739,7 +1737,6 @@ static int virNetClientSendInternal(virNetClientPtr client, ...@@ -1739,7 +1737,6 @@ static int virNetClientSendInternal(virNetClientPtr client,
cleanup: cleanup:
if (ret != 1) if (ret != 1)
VIR_FREE(call); VIR_FREE(call);
virNetClientUnlock(client);
return ret; return ret;
} }
...@@ -1757,7 +1754,10 @@ cleanup: ...@@ -1757,7 +1754,10 @@ cleanup:
int virNetClientSendWithReply(virNetClientPtr client, int virNetClientSendWithReply(virNetClientPtr client,
virNetMessagePtr msg) virNetMessagePtr msg)
{ {
int ret = virNetClientSendInternal(client, msg, true, false); int ret;
virNetClientLock(client);
ret = virNetClientSendInternal(client, msg, true, false);
virNetClientUnlock(client);
if (ret < 0) if (ret < 0)
return -1; return -1;
return 0; return 0;
...@@ -1777,7 +1777,10 @@ int virNetClientSendWithReply(virNetClientPtr client, ...@@ -1777,7 +1777,10 @@ int virNetClientSendWithReply(virNetClientPtr client,
int virNetClientSendNoReply(virNetClientPtr client, int virNetClientSendNoReply(virNetClientPtr client,
virNetMessagePtr msg) virNetMessagePtr msg)
{ {
int ret = virNetClientSendInternal(client, msg, false, false); int ret;
virNetClientLock(client);
ret = virNetClientSendInternal(client, msg, false, false);
virNetClientUnlock(client);
if (ret < 0) if (ret < 0)
return -1; return -1;
return 0; return 0;
...@@ -1796,5 +1799,41 @@ int virNetClientSendNoReply(virNetClientPtr client, ...@@ -1796,5 +1799,41 @@ int virNetClientSendNoReply(virNetClientPtr client,
int virNetClientSendNonBlock(virNetClientPtr client, int virNetClientSendNonBlock(virNetClientPtr client,
virNetMessagePtr msg) virNetMessagePtr msg)
{ {
return virNetClientSendInternal(client, msg, false, true); int ret;
virNetClientLock(client);
ret = virNetClientSendInternal(client, msg, false, true);
virNetClientUnlock(client);
return ret;
}
/*
* @msg: a message allocated on heap or stack
*
* Send a message synchronously, and wait for the reply synchronously
*
* The caller is responsible for free'ing @msg if it was allocated
* on the heap
*
* Returns 0 on success, -1 on failure
*/
int virNetClientSendWithReplyStream(virNetClientPtr client,
virNetMessagePtr msg,
virNetClientStreamPtr st)
{
int ret;
virNetClientLock(client);
/* Other thread might have already received
* stream EOF so we don't want sent anything.
* Server won't respond anyway.
*/
if (virNetClientStreamEOF(st)) {
virNetClientUnlock(client);
return 0;
}
ret = virNetClientSendInternal(client, msg, true, false);
virNetClientUnlock(client);
if (ret < 0)
return -1;
return 0;
} }
...@@ -76,6 +76,9 @@ int virNetClientSendNoReply(virNetClientPtr client, ...@@ -76,6 +76,9 @@ int virNetClientSendNoReply(virNetClientPtr client,
int virNetClientSendNonBlock(virNetClientPtr client, int virNetClientSendNonBlock(virNetClientPtr client,
virNetMessagePtr msg); virNetMessagePtr msg);
int virNetClientSendWithReplyStream(virNetClientPtr client,
virNetMessagePtr msg,
virNetClientStreamPtr st);
# ifdef HAVE_SASL # ifdef HAVE_SASL
void virNetClientSetSASLSession(virNetClientPtr client, void virNetClientSetSASLSession(virNetClientPtr client,
......
...@@ -408,7 +408,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st, ...@@ -408,7 +408,7 @@ int virNetClientStreamRecvPacket(virNetClientStreamPtr st,
VIR_DEBUG("Dummy packet to wait for stream data"); VIR_DEBUG("Dummy packet to wait for stream data");
virMutexUnlock(&st->lock); virMutexUnlock(&st->lock);
ret = virNetClientSendWithReply(client, msg); ret = virNetClientSendWithReplyStream(client, msg, st);
virMutexLock(&st->lock); virMutexLock(&st->lock);
virNetMessageFree(msg); virNetMessageFree(msg);
...@@ -530,3 +530,8 @@ cleanup: ...@@ -530,3 +530,8 @@ cleanup:
virMutexUnlock(&st->lock); virMutexUnlock(&st->lock);
return ret; return ret;
} }
bool virNetClientStreamEOF(virNetClientStreamPtr st)
{
return st->incomingEOF;
}
...@@ -72,5 +72,7 @@ int virNetClientStreamEventUpdateCallback(virNetClientStreamPtr st, ...@@ -72,5 +72,7 @@ int virNetClientStreamEventUpdateCallback(virNetClientStreamPtr st,
int events); int events);
int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st); int virNetClientStreamEventRemoveCallback(virNetClientStreamPtr st);
bool virNetClientStreamEOF(virNetClientStreamPtr st)
ATTRIBUTE_NONNULL(1);
#endif /* __VIR_NET_CLIENT_STREAM_H__ */ #endif /* __VIR_NET_CLIENT_STREAM_H__ */
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册