提交 400a5a92 编写于 作者: J Jiri Denemark 提交者: Cole Robinson

client rpc: Separate call creation from running IO loop

This makes it possible to create and queue new calls while we are
running IO loop.
(cherry picked from commit c57103e5)
上级 e4859677
......@@ -1630,53 +1630,38 @@ done:
}
/*
* Returns 1 if the call was queued and will be completed later (only
* for nonBlock==true), 0 if the call was completed and -1 on error.
*/
static int virNetClientSendInternal(virNetClientPtr client,
virNetMessagePtr msg,
bool expectReply,
bool nonBlock)
static virNetClientCallPtr
virNetClientCallNew(virNetMessagePtr msg,
bool expectReply,
bool nonBlock)
{
virNetClientCallPtr call;
int ret = -1;
PROBE(RPC_CLIENT_MSG_TX_QUEUE,
"client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
client, msg->bufferLength,
msg->header.prog, msg->header.vers, msg->header.proc,
msg->header.type, msg->header.status, msg->header.serial);
virNetClientCallPtr call = NULL;
if (expectReply &&
(msg->bufferLength != 0) &&
(msg->header.status == VIR_NET_CONTINUE)) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Attempt to send an asynchronous message with a synchronous reply"));
return -1;
_("Attempt to send an asynchronous message with"
" a synchronous reply"));
goto error;
}
if (expectReply && nonBlock) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Attempt to send a non-blocking message with a synchronous reply"));
return -1;
_("Attempt to send a non-blocking message with"
" a synchronous reply"));
goto error;
}
if (VIR_ALLOC(call) < 0) {
virReportOOMError();
return -1;
}
if (!client->sock || client->wantClose) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("client socket is closed"));
goto cleanup;
goto error;
}
if (virCondInit(&call->cond) < 0) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("cannot initialize condition variable"));
goto cleanup;
goto error;
}
msg->donefds = 0;
......@@ -1687,8 +1672,48 @@ static int virNetClientSendInternal(virNetClientPtr client,
call->msg = msg;
call->expectReply = expectReply;
call->nonBlock = nonBlock;
call->haveThread = true;
VIR_DEBUG("New call %p: msg=%p, expectReply=%d, nonBlock=%d",
call, msg, expectReply, nonBlock);
return call;
error:
VIR_FREE(call);
return NULL;
}
/*
* Returns 1 if the call was queued and will be completed later (only
* for nonBlock==true), 0 if the call was completed and -1 on error.
*/
static int virNetClientSendInternal(virNetClientPtr client,
virNetMessagePtr msg,
bool expectReply,
bool nonBlock)
{
virNetClientCallPtr call;
int ret = -1;
PROBE(RPC_CLIENT_MSG_TX_QUEUE,
"client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
client, msg->bufferLength,
msg->header.prog, msg->header.vers, msg->header.proc,
msg->header.type, msg->header.status, msg->header.serial);
if (!client->sock || client->wantClose) {
virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
_("client socket is closed"));
return -1;
}
if (!(call = virNetClientCallNew(msg, expectReply, nonBlock))) {
virReportOOMError();
return -1;
}
call->haveThread = true;
ret = virNetClientIO(client, call);
/* If queued, the call will be finished and freed later by another thread;
......@@ -1697,8 +1722,6 @@ static int virNetClientSendInternal(virNetClientPtr client,
return 1;
ignore_value(virCondDestroy(&call->cond));
cleanup:
VIR_FREE(call);
return ret;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册