提交 9ab84470 编写于 作者: P Peter Xu 提交者: Markus Armbruster

monitor: Suspend monitor instead dropping commands

When a QMP client sends in-band commands more quickly that we can
process them, we can either queue them without limit (QUEUE), drop
commands when the queue is full (DROP), or suspend receiving commands
when the queue is full (SUSPEND).  None of them is ideal:

* QUEUE lets a misbehaving client make QEMU eat memory without bounds.
Not such a hot idea.

* With DROP, the client has to cope with dropped in-band commands.  To
inform the client, we send a COMMAND_DROPPED event then.  The event is
flawed by design in two ways: it's ambiguous (see commit d621cfe0),
and it brings back the "eat memory without bounds" problem.

* With SUSPEND, the client has to manage the flow of in-band commands to
keep the monitor available for out-of-band commands.

We currently DROP.  Switch to SUSPEND.

Managing the flow of in-band commands to keep the monitor available for
out-of-band commands isn't really hard: just count the number of
"outstanding" in-band commands (commands sent minus replies received),
and if it exceeds the limit, hold back additional ones until it drops
below the limit again.

Note that we need to be careful pairing the suspend with a resume, or
else the monitor will hang, possibly forever.  And here since we need to
make sure both:

     (1) popping request from the req queue, and
     (2) reading length of the req queue

will be in the same critical section, we let the pop function take the
corresponding queue lock when there is a request, then we release the
lock from the caller.
Reviewed-by: NMarc-André Lureau <marcandre.lureau@redhat.com>
Signed-off-by: NPeter Xu <peterx@redhat.com>
Message-Id: <20181009062718.1914-2-peterx@redhat.com>
Signed-off-by: NMarkus Armbruster <armbru@redhat.com>
上级 34f1f3e0
...@@ -130,8 +130,9 @@ to pass "id" with out-of-band commands. Passing it with all commands ...@@ -130,8 +130,9 @@ to pass "id" with out-of-band commands. Passing it with all commands
is recommended for clients that accept capability "oob". is recommended for clients that accept capability "oob".
If the client sends in-band commands faster than the server can If the client sends in-band commands faster than the server can
execute them, the server will eventually drop commands to limit the execute them, the server will stop reading the requests from the QMP
queue length. The sever sends event COMMAND_DROPPED then. channel until the request queue length is reduced to an acceptable
range.
Only a few commands support out-of-band execution. The ones that do Only a few commands support out-of-band execution. The ones that do
have "allow-oob": true in output of query-qmp-schema. have "allow-oob": true in output of query-qmp-schema.
......
...@@ -15,6 +15,8 @@ extern __thread Monitor *cur_mon; ...@@ -15,6 +15,8 @@ extern __thread Monitor *cur_mon;
#define MONITOR_USE_PRETTY 0x08 #define MONITOR_USE_PRETTY 0x08
#define MONITOR_USE_OOB 0x10 #define MONITOR_USE_OOB 0x10
#define QMP_REQ_QUEUE_LEN_MAX 8
bool monitor_cur_is_qmp(void); bool monitor_cur_is_qmp(void);
void monitor_init_globals(void); void monitor_init_globals(void);
......
...@@ -4110,8 +4110,12 @@ static void monitor_qmp_dispatch(Monitor *mon, QObject *req, QObject *id) ...@@ -4110,8 +4110,12 @@ static void monitor_qmp_dispatch(Monitor *mon, QObject *req, QObject *id)
* processing commands only on a very busy monitor. To achieve that, * processing commands only on a very busy monitor. To achieve that,
* when we process one request on a specific monitor, we put that * when we process one request on a specific monitor, we put that
* monitor to the end of mon_list queue. * monitor to the end of mon_list queue.
*
* Note: if the function returned with non-NULL, then the caller will
* be with mon->qmp.qmp_queue_lock held, and the caller is responsible
* to release it.
*/ */
static QMPRequest *monitor_qmp_requests_pop_any(void) static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
{ {
QMPRequest *req_obj = NULL; QMPRequest *req_obj = NULL;
Monitor *mon; Monitor *mon;
...@@ -4121,10 +4125,11 @@ static QMPRequest *monitor_qmp_requests_pop_any(void) ...@@ -4121,10 +4125,11 @@ static QMPRequest *monitor_qmp_requests_pop_any(void)
QTAILQ_FOREACH(mon, &mon_list, entry) { QTAILQ_FOREACH(mon, &mon_list, entry) {
qemu_mutex_lock(&mon->qmp.qmp_queue_lock); qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
req_obj = g_queue_pop_head(mon->qmp.qmp_requests); req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
if (req_obj) { if (req_obj) {
/* With the lock of corresponding queue held */
break; break;
} }
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
} }
if (req_obj) { if (req_obj) {
...@@ -4143,30 +4148,34 @@ static QMPRequest *monitor_qmp_requests_pop_any(void) ...@@ -4143,30 +4148,34 @@ static QMPRequest *monitor_qmp_requests_pop_any(void)
static void monitor_qmp_bh_dispatcher(void *data) static void monitor_qmp_bh_dispatcher(void *data)
{ {
QMPRequest *req_obj = monitor_qmp_requests_pop_any(); QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
QDict *rsp; QDict *rsp;
bool need_resume; bool need_resume;
Monitor *mon;
if (!req_obj) { if (!req_obj) {
return; return;
} }
mon = req_obj->mon;
/* qmp_oob_enabled() might change after "qmp_capabilities" */ /* qmp_oob_enabled() might change after "qmp_capabilities" */
need_resume = !qmp_oob_enabled(req_obj->mon); need_resume = !qmp_oob_enabled(mon) ||
mon->qmp.qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
if (req_obj->req) { if (req_obj->req) {
trace_monitor_qmp_cmd_in_band(qobject_get_try_str(req_obj->id) ?: ""); trace_monitor_qmp_cmd_in_band(qobject_get_try_str(req_obj->id) ?: "");
monitor_qmp_dispatch(req_obj->mon, req_obj->req, req_obj->id); monitor_qmp_dispatch(mon, req_obj->req, req_obj->id);
} else { } else {
assert(req_obj->err); assert(req_obj->err);
rsp = qmp_error_response(req_obj->err); rsp = qmp_error_response(req_obj->err);
req_obj->err = NULL; req_obj->err = NULL;
monitor_qmp_respond(req_obj->mon, rsp, NULL); monitor_qmp_respond(mon, rsp, NULL);
qobject_unref(rsp); qobject_unref(rsp);
} }
if (need_resume) { if (need_resume) {
/* Pairs with the monitor_suspend() in handle_qmp_command() */ /* Pairs with the monitor_suspend() in handle_qmp_command() */
monitor_resume(req_obj->mon); monitor_resume(mon);
} }
qmp_request_free(req_obj); qmp_request_free(req_obj);
...@@ -4174,8 +4183,6 @@ static void monitor_qmp_bh_dispatcher(void *data) ...@@ -4174,8 +4183,6 @@ static void monitor_qmp_bh_dispatcher(void *data)
qemu_bh_schedule(qmp_dispatcher_bh); qemu_bh_schedule(qmp_dispatcher_bh);
} }
#define QMP_REQ_QUEUE_LEN_MAX (8)
static void handle_qmp_command(void *opaque, QObject *req, Error *err) static void handle_qmp_command(void *opaque, QObject *req, Error *err)
{ {
Monitor *mon = opaque; Monitor *mon = opaque;
...@@ -4217,28 +4224,14 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err) ...@@ -4217,28 +4224,14 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
qemu_mutex_lock(&mon->qmp.qmp_queue_lock); qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
/* /*
* If OOB is not enabled on the current monitor, we'll emulate the * Suspend the monitor when we can't queue more requests after
* old behavior that we won't process the current monitor any more * this one. Dequeuing in monitor_qmp_bh_dispatcher() will resume
* until it has responded. This helps make sure that as long as * it. Note that when OOB is disabled, we queue at most one
* OOB is not enabled, the server will never drop any command. * command, for backward compatibility.
*/ */
if (!qmp_oob_enabled(mon)) { if (!qmp_oob_enabled(mon) ||
mon->qmp.qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1) {
monitor_suspend(mon); monitor_suspend(mon);
} else {
/* Drop the request if queue is full. */
if (mon->qmp.qmp_requests->length >= QMP_REQ_QUEUE_LEN_MAX) {
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
/*
* FIXME @id's scope is just @mon, and broadcasting it is
* wrong. If another monitor's client has a command with
* the same ID in flight, the event will incorrectly claim
* that command was dropped.
*/
qapi_event_send_command_dropped(id,
COMMAND_DROP_REASON_QUEUE_FULL);
qmp_request_free(req_obj);
return;
}
} }
/* /*
...@@ -4246,6 +4239,7 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err) ...@@ -4246,6 +4239,7 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
* handled in time order. Ownership for req_obj, req, id, * handled in time order. Ownership for req_obj, req, id,
* etc. will be delivered to the handler side. * etc. will be delivered to the handler side.
*/ */
assert(mon->qmp.qmp_requests->length < QMP_REQ_QUEUE_LEN_MAX);
g_queue_push_tail(mon->qmp.qmp_requests, req_obj); g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
......
...@@ -3444,46 +3444,6 @@ ...@@ -3444,46 +3444,6 @@
## ##
{ 'command': 'query-sev-capabilities', 'returns': 'SevCapability' } { 'command': 'query-sev-capabilities', 'returns': 'SevCapability' }
##
# @CommandDropReason:
#
# Reasons that caused one command to be dropped.
#
# @queue-full: the command queue is full. This can only occur when
# the client sends a new non-oob command before the
# response to the previous non-oob command has been
# received.
#
# Since: 2.12
##
{ 'enum': 'CommandDropReason',
'data': [ 'queue-full' ] }
##
# @COMMAND_DROPPED:
#
# Emitted when a command is dropped due to some reason. Commands can
# only be dropped when the oob capability is enabled.
#
# @id: The dropped command's "id" field.
# FIXME Broken by design. Events are broadcast to all monitors. If
# another monitor's client has a command with the same ID in flight,
# the event will incorrectly claim that command was dropped.
#
# @reason: The reason why the command is dropped.
#
# Since: 2.12
#
# Example:
#
# { "event": "COMMAND_DROPPED",
# "data": {"result": {"id": "libvirt-102",
# "reason": "queue-full" } } }
#
##
{ 'event': 'COMMAND_DROPPED' ,
'data': { 'id': 'any', 'reason': 'CommandDropReason' } }
## ##
# @set-numa-node: # @set-numa-node:
# #
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册