提交 a0775e28 编写于 作者: P Peter Maydell

Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

Pull request

v2:
 * Rebased to resolve scsi conflicts

# gpg: Signature made Tue 21 Feb 2017 11:56:24 GMT
# gpg:                using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request: (24 commits)
  coroutine-lock: make CoRwlock thread-safe and fair
  coroutine-lock: add mutex argument to CoQueue APIs
  coroutine-lock: place CoMutex before CoQueue in header
  test-aio-multithread: add performance comparison with thread-based mutexes
  coroutine-lock: add limited spinning to CoMutex
  coroutine-lock: make CoMutex thread-safe
  block: document fields protected by AioContext lock
  async: remove unnecessary inc/dec pairs
  aio-posix: partially inline aio_dispatch into aio_poll
  block: explicitly acquire aiocontext in aio callbacks that need it
  block: explicitly acquire aiocontext in bottom halves that need it
  block: explicitly acquire aiocontext in callbacks that need it
  block: explicitly acquire aiocontext in timers that need it
  aio: push aio_context_acquire/release down to dispatching
  qed: introduce qed_aio_start_io and qed_aio_next_io_cb
  blkdebug: reschedule coroutine on the AioContext it is running on
  coroutine-lock: reschedule coroutine on the AioContext it was running on
  nbd: convert to use qio_channel_yield
  io: make qio_channel_yield aware of AioContexts
  io: add methods to set I/O handlers on AioContext
  ...
Signed-off-by: NPeter Maydell <peter.maydell@linaro.org>
......@@ -9,12 +9,8 @@ chardev-obj-y = chardev/
#######################################################################
# block-obj-y is code used by both qemu system emulation and qemu-img
block-obj-y = async.o thread-pool.o
block-obj-y += nbd/
block-obj-y += block.o blockjob.o
block-obj-y += main-loop.o iohandler.o qemu-timer.o
block-obj-$(CONFIG_POSIX) += aio-posix.o
block-obj-$(CONFIG_WIN32) += aio-win32.o
block-obj-y += block/
block-obj-y += qemu-io-cmds.o
block-obj-$(CONFIG_REPLICATION) += replication.o
......
......@@ -64,7 +64,7 @@ static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job,
retry = false;
QLIST_FOREACH(req, &job->inflight_reqs, list) {
if (end > req->start && start < req->end) {
qemu_co_queue_wait(&req->wait_queue);
qemu_co_queue_wait(&req->wait_queue, NULL);
retry = true;
break;
}
......
......@@ -405,12 +405,6 @@ out:
return ret;
}
static void error_callback_bh(void *opaque)
{
Coroutine *co = opaque;
qemu_coroutine_enter(co);
}
static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
{
BDRVBlkdebugState *s = bs->opaque;
......@@ -423,8 +417,7 @@ static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
}
if (!immediately) {
aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), error_callback_bh,
qemu_coroutine_self());
aio_co_schedule(qemu_get_current_aio_context(), qemu_coroutine_self());
qemu_coroutine_yield();
}
......
......@@ -60,7 +60,7 @@ static int64_t blkreplay_getlength(BlockDriverState *bs)
static void blkreplay_bh_cb(void *opaque)
{
Request *req = opaque;
qemu_coroutine_enter(req->co);
aio_co_wake(req->co);
qemu_bh_delete(req->bh);
g_free(req);
}
......
......@@ -880,7 +880,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
{
QEMUIOVector qiov;
struct iovec iov;
Coroutine *co;
BlkRwCo rwco;
iov = (struct iovec) {
......@@ -897,9 +896,14 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
.ret = NOT_DONE,
};
co = qemu_coroutine_create(co_entry, &rwco);
qemu_coroutine_enter(co);
BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
if (qemu_in_coroutine()) {
/* Fast-path if already in coroutine context */
co_entry(&rwco);
} else {
Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
qemu_coroutine_enter(co);
BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
}
return rwco.ret;
}
......@@ -979,7 +983,6 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb)
static void blk_aio_complete_bh(void *opaque)
{
BlkAioEmAIOCB *acb = opaque;
assert(acb->has_returned);
blk_aio_complete(acb);
}
......
......@@ -386,9 +386,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
}
}
static void curl_multi_do(void *arg)
static void curl_multi_do_locked(CURLState *s)
{
CURLState *s = (CURLState *)arg;
CURLSocket *socket, *next_socket;
int running;
int r;
......@@ -406,12 +405,23 @@ static void curl_multi_do(void *arg)
}
}
static void curl_multi_do(void *arg)
{
CURLState *s = (CURLState *)arg;
aio_context_acquire(s->s->aio_context);
curl_multi_do_locked(s);
aio_context_release(s->s->aio_context);
}
static void curl_multi_read(void *arg)
{
CURLState *s = (CURLState *)arg;
curl_multi_do(arg);
aio_context_acquire(s->s->aio_context);
curl_multi_do_locked(s);
curl_multi_check_completion(s->s);
aio_context_release(s->s->aio_context);
}
static void curl_multi_timeout_do(void *arg)
......@@ -424,9 +434,11 @@ static void curl_multi_timeout_do(void *arg)
return;
}
aio_context_acquire(s->aio_context);
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
curl_multi_check_completion(s);
aio_context_release(s->aio_context);
#else
abort();
#endif
......@@ -784,13 +796,18 @@ static void curl_readv_bh_cb(void *p)
{
CURLState *state;
int running;
int ret = -EINPROGRESS;
CURLAIOCB *acb = p;
BDRVCURLState *s = acb->common.bs->opaque;
BlockDriverState *bs = acb->common.bs;
BDRVCURLState *s = bs->opaque;
AioContext *ctx = bdrv_get_aio_context(bs);
size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
size_t end;
aio_context_acquire(ctx);
// In case we have the requested data already (e.g. read-ahead),
// we can just call the callback and be done.
switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
......@@ -798,7 +815,7 @@ static void curl_readv_bh_cb(void *p)
qemu_aio_unref(acb);
// fall through
case FIND_RET_WAIT:
return;
goto out;
default:
break;
}
......@@ -806,9 +823,8 @@ static void curl_readv_bh_cb(void *p)
// No cache found, so let's start a new request
state = curl_init_state(acb->common.bs, s);
if (!state) {
acb->common.cb(acb->common.opaque, -EIO);
qemu_aio_unref(acb);
return;
ret = -EIO;
goto out;
}
acb->start = 0;
......@@ -822,9 +838,8 @@ static void curl_readv_bh_cb(void *p)
state->orig_buf = g_try_malloc(state->buf_len);
if (state->buf_len && state->orig_buf == NULL) {
curl_clean_state(state);
acb->common.cb(acb->common.opaque, -ENOMEM);
qemu_aio_unref(acb);
return;
ret = -ENOMEM;
goto out;
}
state->acb[0] = acb;
......@@ -837,6 +852,13 @@ static void curl_readv_bh_cb(void *p)
/* Tell curl it needs to kick things off */
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
out:
aio_context_release(ctx);
if (ret != -EINPROGRESS) {
acb->common.cb(acb->common.opaque, ret);
qemu_aio_unref(acb);
}
}
static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
......
......@@ -698,13 +698,6 @@ static struct glfs *qemu_gluster_init(BlockdevOptionsGluster *gconf,
return qemu_gluster_glfs_init(gconf, errp);
}
static void qemu_gluster_complete_aio(void *opaque)
{
GlusterAIOCB *acb = (GlusterAIOCB *)opaque;
qemu_coroutine_enter(acb->coroutine);
}
/*
* AIO callback routine called from GlusterFS thread.
*/
......@@ -720,7 +713,7 @@ static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
acb->ret = -EIO; /* Partial read/write - fail it */
}
aio_bh_schedule_oneshot(acb->aio_context, qemu_gluster_complete_aio, acb);
aio_co_schedule(acb->aio_context, acb->coroutine);
}
static void qemu_gluster_parse_flags(int bdrv_flags, int *open_flags)
......
......@@ -189,7 +189,7 @@ static void bdrv_co_drain_bh_cb(void *opaque)
bdrv_dec_in_flight(bs);
bdrv_drained_begin(bs);
data->done = true;
qemu_coroutine_enter(co);
aio_co_wake(co);
}
static void coroutine_fn bdrv_co_yield_to_drain(BlockDriverState *bs)
......@@ -539,7 +539,7 @@ static bool coroutine_fn wait_serialising_requests(BdrvTrackedRequest *self)
* (instead of producing a deadlock in the former case). */
if (!req->waiting_for) {
self->waiting_for = req;
qemu_co_queue_wait(&req->wait_queue);
qemu_co_queue_wait(&req->wait_queue, NULL);
self->waiting_for = NULL;
retry = true;
waited = true;
......@@ -813,7 +813,7 @@ static void bdrv_co_io_em_complete(void *opaque, int ret)
CoroutineIOCompletion *co = opaque;
co->ret = ret;
qemu_coroutine_enter(co->coroutine);
aio_co_wake(co->coroutine);
}
static int coroutine_fn bdrv_driver_preadv(BlockDriverState *bs,
......@@ -2080,6 +2080,11 @@ void bdrv_aio_cancel(BlockAIOCB *acb)
if (acb->aiocb_info->get_aio_context) {
aio_poll(acb->aiocb_info->get_aio_context(acb), true);
} else if (acb->bs) {
/* qemu_aio_ref and qemu_aio_unref are not thread-safe, so
* assert that we're not using an I/O thread. Thread-safe
* code should use bdrv_aio_cancel_async exclusively.
*/
assert(bdrv_get_aio_context(acb->bs) == qemu_get_aio_context());
aio_poll(bdrv_get_aio_context(acb->bs), true);
} else {
abort();
......@@ -2239,35 +2244,6 @@ BlockAIOCB *bdrv_aio_flush(BlockDriverState *bs,
return &acb->common;
}
void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
BlockCompletionFunc *cb, void *opaque)
{
BlockAIOCB *acb;
acb = g_malloc(aiocb_info->aiocb_size);
acb->aiocb_info = aiocb_info;
acb->bs = bs;
acb->cb = cb;
acb->opaque = opaque;
acb->refcnt = 1;
return acb;
}
void qemu_aio_ref(void *p)
{
BlockAIOCB *acb = p;
acb->refcnt++;
}
void qemu_aio_unref(void *p)
{
BlockAIOCB *acb = p;
assert(acb->refcnt > 0);
if (--acb->refcnt == 0) {
g_free(acb);
}
}
/**************************************************************/
/* Coroutine block device emulation */
......@@ -2299,7 +2275,7 @@ int coroutine_fn bdrv_co_flush(BlockDriverState *bs)
/* Wait until any previous flushes are completed */
while (bs->active_flush_req) {
qemu_co_queue_wait(&bs->flush_queue);
qemu_co_queue_wait(&bs->flush_queue, NULL);
}
bs->active_flush_req = true;
......
......@@ -165,8 +165,9 @@ iscsi_schedule_bh(IscsiAIOCB *acb)
static void iscsi_co_generic_bh_cb(void *opaque)
{
struct IscsiTask *iTask = opaque;
iTask->complete = 1;
qemu_coroutine_enter(iTask->co);
aio_co_wake(iTask->co);
}
static void iscsi_retry_timer_expired(void *opaque)
......@@ -174,7 +175,7 @@ static void iscsi_retry_timer_expired(void *opaque)
struct IscsiTask *iTask = opaque;
iTask->complete = 1;
if (iTask->co) {
qemu_coroutine_enter(iTask->co);
aio_co_wake(iTask->co);
}
}
......@@ -394,8 +395,10 @@ iscsi_process_read(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;
aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLIN);
iscsi_set_events(iscsilun);
aio_context_release(iscsilun->aio_context);
}
static void
......@@ -404,8 +407,10 @@ iscsi_process_write(void *arg)
IscsiLun *iscsilun = arg;
struct iscsi_context *iscsi = iscsilun->iscsi;
aio_context_acquire(iscsilun->aio_context);
iscsi_service(iscsi, POLLOUT);
iscsi_set_events(iscsilun);
aio_context_release(iscsilun->aio_context);
}
static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
......@@ -1392,16 +1397,20 @@ static void iscsi_nop_timed_event(void *opaque)
{
IscsiLun *iscsilun = opaque;
aio_context_acquire(iscsilun->aio_context);
if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) {
error_report("iSCSI: NOP timeout. Reconnecting...");
iscsilun->request_timed_out = true;
} else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) {
error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages.");
return;
goto out;
}
timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL);
iscsi_set_events(iscsilun);
out:
aio_context_release(iscsilun->aio_context);
}
static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp)
......
......@@ -54,10 +54,10 @@ struct LinuxAioState {
io_context_t ctx;
EventNotifier e;
/* io queue for submit at batch */
/* io queue for submit at batch. Protected by AioContext lock. */
LaioQueue io_q;
/* I/O completion processing */
/* I/O completion processing. Only runs in I/O thread. */
QEMUBH *completion_bh;
int event_idx;
int event_max;
......@@ -100,7 +100,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
* that!
*/
if (!qemu_coroutine_entered(laiocb->co)) {
qemu_coroutine_enter(laiocb->co);
aio_co_wake(laiocb->co);
}
} else {
laiocb->common.cb(laiocb->common.opaque, ret);
......@@ -234,9 +234,12 @@ static void qemu_laio_process_completions(LinuxAioState *s)
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
qemu_laio_process_completions(s);
aio_context_acquire(s->aio_context);
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ioq_submit(s);
}
aio_context_release(s->aio_context);
}
static void qemu_laio_completion_bh(void *opaque)
......@@ -455,6 +458,7 @@ void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
qemu_bh_delete(s->completion_bh);
s->aio_context = NULL;
}
void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
......
......@@ -132,6 +132,8 @@ static void mirror_write_complete(void *opaque, int ret)
{
MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
aio_context_acquire(blk_get_aio_context(s->common.blk));
if (ret < 0) {
BlockErrorAction action;
......@@ -142,12 +144,15 @@ static void mirror_write_complete(void *opaque, int ret)
}
}
mirror_iteration_done(op, ret);
aio_context_release(blk_get_aio_context(s->common.blk));
}
static void mirror_read_complete(void *opaque, int ret)
{
MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
aio_context_acquire(blk_get_aio_context(s->common.blk));
if (ret < 0) {
BlockErrorAction action;
......@@ -158,10 +163,11 @@ static void mirror_read_complete(void *opaque, int ret)
}
mirror_iteration_done(op, ret);
return;
} else {
blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
0, mirror_write_complete, op);
}
blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
0, mirror_write_complete, op);
aio_context_release(blk_get_aio_context(s->common.blk));
}
static inline void mirror_clip_sectors(MirrorBlockJob *s,
......
......@@ -33,8 +33,9 @@
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs))
static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
{
NBDClientSession *s = nbd_get_client_session(bs);
int i;
for (i = 0; i < MAX_NBD_REQUESTS; i++) {
......@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
qemu_coroutine_enter(s->recv_coroutine[i]);
}
}
BDRV_POLL_WHILE(bs, s->read_reply_co);
}
static void nbd_teardown_connection(BlockDriverState *bs)
......@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
qio_channel_shutdown(client->ioc,
QIO_CHANNEL_SHUTDOWN_BOTH,
NULL);
nbd_recv_coroutines_enter_all(client);
nbd_recv_coroutines_enter_all(bs);
nbd_client_detach_aio_context(bs);
object_unref(OBJECT(client->sioc));
......@@ -65,54 +67,43 @@ static void nbd_teardown_connection(BlockDriverState *bs)
client->ioc = NULL;
}
static void nbd_reply_ready(void *opaque)
static coroutine_fn void nbd_read_reply_entry(void *opaque)
{
BlockDriverState *bs = opaque;
NBDClientSession *s = nbd_get_client_session(bs);
NBDClientSession *s = opaque;
uint64_t i;
int ret;
if (!s->ioc) { /* Already closed */
return;
}
if (s->reply.handle == 0) {
/* No reply already in flight. Fetch a header. It is possible
* that another thread has done the same thing in parallel, so
* the socket is not readable anymore.
*/
for (;;) {
assert(s->reply.handle == 0);
ret = nbd_receive_reply(s->ioc, &s->reply);
if (ret == -EAGAIN) {
return;
}
if (ret < 0) {
s->reply.handle = 0;
goto fail;
break;
}
}
/* There's no need for a mutex on the receive side, because the
* handler acts as a synchronization point and ensures that only
* one coroutine is called until the reply finishes. */
i = HANDLE_TO_INDEX(s, s->reply.handle);
if (i >= MAX_NBD_REQUESTS) {
goto fail;
}
/* There's no need for a mutex on the receive side, because the
* handler acts as a synchronization point and ensures that only
* one coroutine is called until the reply finishes.
*/
i = HANDLE_TO_INDEX(s, s->reply.handle);
if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
break;
}
if (s->recv_coroutine[i]) {
qemu_coroutine_enter(s->recv_coroutine[i]);
return;
/* We're woken up by the recv_coroutine itself. Note that there
* is no race between yielding and reentering read_reply_co. This
* is because:
*
* - if recv_coroutine[i] runs on the same AioContext, it is only
* entered after we yield
*
* - if recv_coroutine[i] runs on a different AioContext, reentering
* read_reply_co happens through a bottom half, which can only
* run after we yield.
*/
aio_co_wake(s->recv_coroutine[i]);
qemu_coroutine_yield();
}
fail:
nbd_teardown_connection(bs);
}
static void nbd_restart_write(void *opaque)
{
BlockDriverState *bs = opaque;
qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
s->read_reply_co = NULL;
}
static int nbd_co_send_request(BlockDriverState *bs,
......@@ -120,7 +111,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
QEMUIOVector *qiov)
{
NBDClientSession *s = nbd_get_client_session(bs);
AioContext *aio_context;
int rc, ret, i;
qemu_co_mutex_lock(&s->send_mutex);
......@@ -141,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
return -EPIPE;
}
s->send_coroutine = qemu_coroutine_self();
aio_context = bdrv_get_aio_context(bs);
aio_set_fd_handler(aio_context, s->sioc->fd, false,
nbd_reply_ready, nbd_restart_write, NULL, bs);
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
......@@ -160,9 +145,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
} else {
rc = nbd_send_request(s->ioc, request);
}
aio_set_fd_handler(aio_context, s->sioc->fd, false,
nbd_reply_ready, NULL, NULL, bs);
s->send_coroutine = NULL;
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
......@@ -174,8 +156,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
{
int ret;
/* Wait until we're woken up by the read handler. TODO: perhaps
* peek at the next reply and avoid yielding if it's ours? */
/* Wait until we're woken up by nbd_read_reply_entry. */
qemu_coroutine_yield();
*reply = s->reply;
if (reply->handle != request->handle ||
......@@ -201,7 +182,7 @@ static void nbd_coroutine_start(NBDClientSession *s,
/* Poor man semaphore. The free_sema is locked when no other request
* can be accepted, and unlocked after receiving one reply. */
if (s->in_flight == MAX_NBD_REQUESTS) {
qemu_co_queue_wait(&s->free_sema);
qemu_co_queue_wait(&s->free_sema, NULL);
assert(s->in_flight < MAX_NBD_REQUESTS);
}
s->in_flight++;
......@@ -209,13 +190,19 @@ static void nbd_coroutine_start(NBDClientSession *s,
/* s->recv_coroutine[i] is set as soon as we get the send_lock. */
}
static void nbd_coroutine_end(NBDClientSession *s,
static void nbd_coroutine_end(BlockDriverState *bs,
NBDRequest *request)
{
NBDClientSession *s = nbd_get_client_session(bs);
int i = HANDLE_TO_INDEX(s, request->handle);
s->recv_coroutine[i] = NULL;
if (s->in_flight-- == MAX_NBD_REQUESTS) {
qemu_co_queue_next(&s->free_sema);
s->in_flight--;
qemu_co_queue_next(&s->free_sema);
/* Kick the read_reply_co to get the next reply. */
if (s->read_reply_co) {
aio_co_wake(s->read_reply_co);
}
}
......@@ -241,7 +228,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, qiov);
}
nbd_coroutine_end(client, &request);
nbd_coroutine_end(bs, &request);
return -reply.error;
}
......@@ -271,7 +258,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
nbd_coroutine_end(client, &request);
nbd_coroutine_end(bs, &request);
return -reply.error;
}
......@@ -306,7 +293,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
nbd_coroutine_end(client, &request);
nbd_coroutine_end(bs, &request);
return -reply.error;
}
......@@ -331,7 +318,7 @@ int nbd_client_co_flush(BlockDriverState *bs)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
nbd_coroutine_end(client, &request);
nbd_coroutine_end(bs, &request);
return -reply.error;
}
......@@ -357,23 +344,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
} else {
nbd_co_receive_reply(client, &request, &reply, NULL);
}
nbd_coroutine_end(client, &request);
nbd_coroutine_end(bs, &request);
return -reply.error;
}
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
aio_set_fd_handler(bdrv_get_aio_context(bs),
nbd_get_client_session(bs)->sioc->fd,
false, NULL, NULL, NULL, NULL);
NBDClientSession *client = nbd_get_client_session(bs);
qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
}
void nbd_client_attach_aio_context(BlockDriverState *bs,
AioContext *new_context)
{
aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
false, nbd_reply_ready, NULL, NULL, bs);
NBDClientSession *client = nbd_get_client_session(bs);
qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
aio_co_schedule(new_context, client->read_reply_co);
}
void nbd_client_close(BlockDriverState *bs)
......@@ -434,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs,
/* Now that we're connected, set the socket to be non-blocking and
* kick the reply mechanism. */
qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
logout("Established connection with NBD server\n");
......
......@@ -25,7 +25,7 @@ typedef struct NBDClientSession {
CoMutex send_mutex;
CoQueue free_sema;
Coroutine *send_coroutine;
Coroutine *read_reply_co;
int in_flight;
Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
......
......@@ -208,15 +208,21 @@ static void nfs_set_events(NFSClient *client)
static void nfs_process_read(void *arg)
{
NFSClient *client = arg;
aio_context_acquire(client->aio_context);
nfs_service(client->context, POLLIN);
nfs_set_events(client);
aio_context_release(client->aio_context);
}
static void nfs_process_write(void *arg)
{
NFSClient *client = arg;
aio_context_acquire(client->aio_context);
nfs_service(client->context, POLLOUT);
nfs_set_events(client);
aio_context_release(client->aio_context);
}
static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
......@@ -231,8 +237,9 @@ static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
static void nfs_co_generic_bh_cb(void *opaque)
{
NFSRPC *task = opaque;
task->complete = 1;
qemu_coroutine_enter(task->co);
aio_co_wake(task->co);
}
static void
......
......@@ -932,9 +932,7 @@ static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset,
if (bytes == 0) {
/* Wait for the dependency to complete. We need to recheck
* the free/allocated clusters when we continue. */
qemu_co_mutex_unlock(&s->lock);
qemu_co_queue_wait(&old_alloc->dependent_requests);
qemu_co_mutex_lock(&s->lock);
qemu_co_queue_wait(&old_alloc->dependent_requests, &s->lock);
return -EAGAIN;
}
}
......
......@@ -83,6 +83,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
unsigned int index;
unsigned int n;
qed_acquire(s);
if (ret) {
goto out;
}
......@@ -109,6 +110,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
out:
find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
qed_release(s);
g_free(find_cluster_cb);
}
......
......@@ -31,6 +31,7 @@ static void qed_read_table_cb(void *opaque, int ret)
{
QEDReadTableCB *read_table_cb = opaque;
QEDTable *table = read_table_cb->table;
BDRVQEDState *s = read_table_cb->s;
int noffsets = read_table_cb->qiov.size / sizeof(uint64_t);
int i;
......@@ -40,13 +41,15 @@ static void qed_read_table_cb(void *opaque, int ret)
}
/* Byteswap offsets */
qed_acquire(s);
for (i = 0; i < noffsets; i++) {
table->offsets[i] = le64_to_cpu(table->offsets[i]);
}
qed_release(s);
out:
/* Completion */
trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
trace_qed_read_table_cb(s, read_table_cb->table, ret);
gencb_complete(&read_table_cb->gencb, ret);
}
......@@ -84,8 +87,9 @@ typedef struct {
static void qed_write_table_cb(void *opaque, int ret)
{
QEDWriteTableCB *write_table_cb = opaque;
BDRVQEDState *s = write_table_cb->s;
trace_qed_write_table_cb(write_table_cb->s,
trace_qed_write_table_cb(s,
write_table_cb->orig_table,
write_table_cb->flush,
ret);
......@@ -97,8 +101,10 @@ static void qed_write_table_cb(void *opaque, int ret)
if (write_table_cb->flush) {
/* We still need to flush first */
write_table_cb->flush = false;
qed_acquire(s);
bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
write_table_cb);
qed_release(s);
return;
}
......@@ -213,6 +219,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
CachedL2Table *l2_table = request->l2_table;
uint64_t l2_offset = read_l2_table_cb->l2_offset;
qed_acquire(s);
if (ret) {
/* can't trust loaded L2 table anymore */
qed_unref_l2_cache_entry(l2_table);
......@@ -228,6 +235,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
assert(request->l2_table != NULL);
}
qed_release(s);
gencb_complete(&read_l2_table_cb->gencb, ret);
}
......
......@@ -273,7 +273,19 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
return l2_table;
}
static void qed_aio_next_io(void *opaque, int ret);
static void qed_aio_next_io(QEDAIOCB *acb, int ret);
static void qed_aio_start_io(QEDAIOCB *acb)
{
qed_aio_next_io(acb, 0);
}
static void qed_aio_next_io_cb(void *opaque, int ret)
{
QEDAIOCB *acb = opaque;
qed_aio_next_io(acb, ret);
}
static void qed_plug_allocating_write_reqs(BDRVQEDState *s)
{
......@@ -292,7 +304,7 @@ static void qed_unplug_allocating_write_reqs(BDRVQEDState *s)
acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
if (acb) {
qed_aio_next_io(acb, 0);
qed_aio_start_io(acb);
}
}
......@@ -333,10 +345,22 @@ static void qed_need_check_timer_cb(void *opaque)
trace_qed_need_check_timer_cb(s);
qed_acquire(s);
qed_plug_allocating_write_reqs(s);
/* Ensure writes are on disk before clearing flag */
bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s);
qed_release(s);
}
void qed_acquire(BDRVQEDState *s)
{
aio_context_acquire(bdrv_get_aio_context(s->bs));
}
void qed_release(BDRVQEDState *s)
{
aio_context_release(bdrv_get_aio_context(s->bs));
}
static void qed_start_need_check_timer(BDRVQEDState *s)
......@@ -721,7 +745,7 @@ static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t l
}
if (cb->co) {
qemu_coroutine_enter(cb->co);
aio_co_wake(cb->co);
}
}
......@@ -918,6 +942,7 @@ static void qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index,
static void qed_aio_complete_bh(void *opaque)
{
QEDAIOCB *acb = opaque;
BDRVQEDState *s = acb_to_s(acb);
BlockCompletionFunc *cb = acb->common.cb;
void *user_opaque = acb->common.opaque;
int ret = acb->bh_ret;
......@@ -925,7 +950,9 @@ static void qed_aio_complete_bh(void *opaque)
qemu_aio_unref(acb);
/* Invoke callback */
qed_acquire(s);
cb(user_opaque, ret);
qed_release(s);
}
static void qed_aio_complete(QEDAIOCB *acb, int ret)
......@@ -959,7 +986,7 @@ static void qed_aio_complete(QEDAIOCB *acb, int ret)
QSIMPLEQ_REMOVE_HEAD(&s->allocating_write_reqs, next);
acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
if (acb) {
qed_aio_next_io(acb, 0);
qed_aio_start_io(acb);
} else if (s->header.features & QED_F_NEED_CHECK) {
qed_start_need_check_timer(s);
}
......@@ -984,7 +1011,7 @@ static void qed_commit_l2_update(void *opaque, int ret)
acb->request.l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
assert(acb->request.l2_table != NULL);
qed_aio_next_io(opaque, ret);
qed_aio_next_io(acb, ret);
}
/**
......@@ -1032,11 +1059,11 @@ static void qed_aio_write_l2_update(QEDAIOCB *acb, int ret, uint64_t offset)
if (need_alloc) {
/* Write out the whole new L2 table */
qed_write_l2_table(s, &acb->request, 0, s->table_nelems, true,
qed_aio_write_l1_update, acb);
qed_aio_write_l1_update, acb);
} else {
/* Write out only the updated part of the L2 table */
qed_write_l2_table(s, &acb->request, index, acb->cur_nclusters, false,
qed_aio_next_io, acb);
qed_aio_next_io_cb, acb);
}
return;
......@@ -1088,7 +1115,7 @@ static void qed_aio_write_main(void *opaque, int ret)
}
if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
next_fn = qed_aio_next_io;
next_fn = qed_aio_next_io_cb;
} else {
if (s->bs->backing) {
next_fn = qed_aio_write_flush_before_l2_update;
......@@ -1201,7 +1228,7 @@ static void qed_aio_write_alloc(QEDAIOCB *acb, size_t len)
if (acb->flags & QED_AIOCB_ZERO) {
/* Skip ahead if the clusters are already zero */
if (acb->find_cluster_ret == QED_CLUSTER_ZERO) {
qed_aio_next_io(acb, 0);
qed_aio_start_io(acb);
return;
}
......@@ -1321,18 +1348,18 @@ static void qed_aio_read_data(void *opaque, int ret,
/* Handle zero cluster and backing file reads */
if (ret == QED_CLUSTER_ZERO) {
qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size);
qed_aio_next_io(acb, 0);
qed_aio_start_io(acb);
return;
} else if (ret != QED_CLUSTER_FOUND) {
qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov,
&acb->backing_qiov, qed_aio_next_io, acb);
&acb->backing_qiov, qed_aio_next_io_cb, acb);
return;
}
BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
bdrv_aio_readv(bs->file, offset / BDRV_SECTOR_SIZE,
&acb->cur_qiov, acb->cur_qiov.size / BDRV_SECTOR_SIZE,
qed_aio_next_io, acb);
qed_aio_next_io_cb, acb);
return;
err:
......@@ -1342,9 +1369,8 @@ err:
/**
* Begin next I/O or complete the request
*/
static void qed_aio_next_io(void *opaque, int ret)
static void qed_aio_next_io(QEDAIOCB *acb, int ret)
{
QEDAIOCB *acb = opaque;
BDRVQEDState *s = acb_to_s(acb);
QEDFindClusterFunc *io_fn = (acb->flags & QED_AIOCB_WRITE) ?
qed_aio_write_data : qed_aio_read_data;
......@@ -1400,7 +1426,7 @@ static BlockAIOCB *qed_aio_setup(BlockDriverState *bs,
qemu_iovec_init(&acb->cur_qiov, qiov->niov);
/* Start request */
qed_aio_next_io(acb, 0);
qed_aio_start_io(acb);
return &acb->common;
}
......@@ -1436,7 +1462,7 @@ static void coroutine_fn qed_co_pwrite_zeroes_cb(void *opaque, int ret)
cb->done = true;
cb->ret = ret;
if (cb->co) {
qemu_coroutine_enter(cb->co);
aio_co_wake(cb->co);
}
}
......
......@@ -198,6 +198,9 @@ enum {
*/
typedef void QEDFindClusterFunc(void *opaque, int ret, uint64_t offset, size_t len);
void qed_acquire(BDRVQEDState *s);
void qed_release(BDRVQEDState *s);
/**
* Generic callback for chaining async callbacks
*/
......
......@@ -486,7 +486,7 @@ static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
retry:
QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
if (AIOCBOverlapping(acb, cb)) {
qemu_co_queue_wait(&s->overlapping_queue);
qemu_co_queue_wait(&s->overlapping_queue, NULL);
goto retry;
}
}
......@@ -575,13 +575,6 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
return ret;
}
static void restart_co_req(void *opaque)
{
Coroutine *co = opaque;
qemu_coroutine_enter(co);
}
typedef struct SheepdogReqCo {
int sockfd;
BlockDriverState *bs;
......@@ -592,12 +585,19 @@ typedef struct SheepdogReqCo {
unsigned int *rlen;
int ret;
bool finished;
Coroutine *co;
} SheepdogReqCo;
static void restart_co_req(void *opaque)
{
SheepdogReqCo *srco = opaque;
aio_co_wake(srco->co);
}
static coroutine_fn void do_co_req(void *opaque)
{
int ret;
Coroutine *co;
SheepdogReqCo *srco = opaque;
int sockfd = srco->sockfd;
SheepdogReq *hdr = srco->hdr;
......@@ -605,9 +605,9 @@ static coroutine_fn void do_co_req(void *opaque)
unsigned int *wlen = srco->wlen;
unsigned int *rlen = srco->rlen;
co = qemu_coroutine_self();
srco->co = qemu_coroutine_self();
aio_set_fd_handler(srco->aio_context, sockfd, false,
NULL, restart_co_req, NULL, co);
NULL, restart_co_req, NULL, srco);
ret = send_co_req(sockfd, hdr, data, wlen);
if (ret < 0) {
......@@ -615,7 +615,7 @@ static coroutine_fn void do_co_req(void *opaque)
}
aio_set_fd_handler(srco->aio_context, sockfd, false,
restart_co_req, NULL, NULL, co);
restart_co_req, NULL, NULL, srco);
ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
if (ret != sizeof(*hdr)) {
......@@ -643,6 +643,7 @@ out:
aio_set_fd_handler(srco->aio_context, sockfd, false,
NULL, NULL, NULL, NULL);
srco->co = NULL;
srco->ret = ret;
srco->finished = true;
if (srco->bs) {
......@@ -866,7 +867,7 @@ static void coroutine_fn aio_read_response(void *opaque)
* We've finished all requests which belong to the AIOCB, so
* we can switch back to sd_co_readv/writev now.
*/
qemu_coroutine_enter(acb->coroutine);
aio_co_wake(acb->coroutine);
}
return;
......@@ -883,14 +884,14 @@ static void co_read_response(void *opaque)
s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
}
qemu_coroutine_enter(s->co_recv);
aio_co_wake(s->co_recv);
}
static void co_write_request(void *opaque)
{
BDRVSheepdogState *s = opaque;
qemu_coroutine_enter(s->co_send);
aio_co_wake(s->co_send);
}
/*
......
......@@ -889,10 +889,14 @@ static void restart_coroutine(void *opaque)
DPRINTF("co=%p", co);
qemu_coroutine_enter(co);
aio_co_wake(co);
}
static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
/* A non-blocking call returned EAGAIN, so yield, ensuring the
* handlers are set up so that we'll be rescheduled when there is an
* interesting event on the socket.
*/
static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
{
int r;
IOHandler *rd_handler = NULL, *wr_handler = NULL;
......@@ -912,25 +916,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
false, rd_handler, wr_handler, NULL, co);
}
static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
BlockDriverState *bs)
{
DPRINTF("s->sock=%d", s->sock);
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
false, NULL, NULL, NULL, NULL);
}
/* A non-blocking call returned EAGAIN, so yield, ensuring the
* handlers are set up so that we'll be rescheduled when there is an
* interesting event on the socket.
*/
static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
{
set_fd_handler(s, bs);
qemu_coroutine_yield();
clear_fd_handler(s, bs);
DPRINTF("s->sock=%d - back", s->sock);
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false,
NULL, NULL, NULL, NULL);
}
/* SFTP has a function `libssh2_sftp_seek64' which seeks to a position
......
......@@ -326,7 +326,7 @@ void coroutine_fn throttle_group_co_io_limits_intercept(BlockBackend *blk,
if (must_wait || blkp->pending_reqs[is_write]) {
blkp->pending_reqs[is_write]++;
qemu_mutex_unlock(&tg->lock);
qemu_co_queue_wait(&blkp->throttled_reqs[is_write]);
qemu_co_queue_wait(&blkp->throttled_reqs[is_write], NULL);
qemu_mutex_lock(&tg->lock);
blkp->pending_reqs[is_write]--;
}
......@@ -416,7 +416,9 @@ static void timer_cb(BlockBackend *blk, bool is_write)
qemu_mutex_unlock(&tg->lock);
/* Run the request that was waiting for this timer */
aio_context_acquire(blk_get_aio_context(blk));
empty_queue = !qemu_co_enter_next(&blkp->throttled_reqs[is_write]);
aio_context_release(blk_get_aio_context(blk));
/* If the request queue was empty then we have to take care of
* scheduling the next one */
......
......@@ -41,7 +41,7 @@ struct QEMUWin32AIOState {
HANDLE hIOCP;
EventNotifier e;
int count;
bool is_aio_context_attached;
AioContext *aio_ctx;
};
typedef struct QEMUWin32AIOCB {
......@@ -87,7 +87,6 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
qemu_vfree(waiocb->buf);
}
waiocb->common.cb(waiocb->common.opaque, ret);
qemu_aio_unref(waiocb);
}
......@@ -176,13 +175,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
AioContext *old_context)
{
aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL);
aio->is_aio_context_attached = false;
aio->aio_ctx = NULL;
}
void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
AioContext *new_context)
{
aio->is_aio_context_attached = true;
aio->aio_ctx = new_context;
aio_set_event_notifier(new_context, &aio->e, false,
win32_aio_completion_cb, NULL);
}
......@@ -212,7 +211,7 @@ out_free_state:
void win32_aio_cleanup(QEMUWin32AIOState *aio)
{
assert(!aio->is_aio_context_attached);
assert(!aio->aio_ctx);
CloseHandle(aio->hIOCP);
event_notifier_cleanup(&aio->e);
g_free(aio);
......
......@@ -166,8 +166,10 @@ static void dma_blk_cb(void *opaque, int ret)
QEMU_ALIGN_DOWN(dbs->iov.size, dbs->align));
}
aio_context_acquire(dbs->ctx);
dbs->acb = dbs->io_func(dbs->offset, &dbs->iov,
dma_blk_cb, dbs, dbs->io_func_opaque);
aio_context_release(dbs->ctx);
assert(dbs->acb);
}
......
......@@ -2374,7 +2374,7 @@ static void coroutine_fn v9fs_flush(void *opaque)
/*
* Wait for pdu to complete.
*/
qemu_co_queue_wait(&cancel_pdu->complete);
qemu_co_queue_wait(&cancel_pdu->complete, NULL);
cancel_pdu->cancelled = 0;
pdu_free(cancel_pdu);
}
......
......@@ -89,7 +89,9 @@ static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error,
static void virtio_blk_rw_complete(void *opaque, int ret)
{
VirtIOBlockReq *next = opaque;
VirtIOBlock *s = next->dev;
aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
while (next) {
VirtIOBlockReq *req = next;
next = req->mr_next;
......@@ -122,21 +124,27 @@ static void virtio_blk_rw_complete(void *opaque, int ret)
block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
virtio_blk_free_request(req);
}
aio_context_release(blk_get_aio_context(s->conf.conf.blk));
}
static void virtio_blk_flush_complete(void *opaque, int ret)
{
VirtIOBlockReq *req = opaque;
VirtIOBlock *s = req->dev;
aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
if (ret) {
if (virtio_blk_handle_rw_error(req, -ret, 0)) {
return;
goto out;
}
}
virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
block_acct_done(blk_get_stats(req->dev->blk), &req->acct);
virtio_blk_free_request(req);
out:
aio_context_release(blk_get_aio_context(s->conf.conf.blk));
}
#ifdef __linux__
......@@ -150,7 +158,8 @@ static void virtio_blk_ioctl_complete(void *opaque, int status)
{
VirtIOBlockIoctlReq *ioctl_req = opaque;
VirtIOBlockReq *req = ioctl_req->req;
VirtIODevice *vdev = VIRTIO_DEVICE(req->dev);
VirtIOBlock *s = req->dev;
VirtIODevice *vdev = VIRTIO_DEVICE(s);
struct virtio_scsi_inhdr *scsi;
struct sg_io_hdr *hdr;
......@@ -182,8 +191,10 @@ static void virtio_blk_ioctl_complete(void *opaque, int status)
virtio_stl_p(vdev, &scsi->data_len, hdr->dxfer_len);
out:
aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
virtio_blk_req_complete(req, status);
virtio_blk_free_request(req);
aio_context_release(blk_get_aio_context(s->conf.conf.blk));
g_free(ioctl_req);
}
......@@ -587,6 +598,7 @@ bool virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
MultiReqBuffer mrb = {};
bool progress = false;
aio_context_acquire(blk_get_aio_context(s->blk));
blk_io_plug(s->blk);
do {
......@@ -609,6 +621,7 @@ bool virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
}
blk_io_unplug(s->blk);
aio_context_release(blk_get_aio_context(s->blk));
return progress;
}
......@@ -644,6 +657,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
s->rq = NULL;
aio_context_acquire(blk_get_aio_context(s->conf.conf.blk));
while (req) {
VirtIOBlockReq *next = req->next;
if (virtio_blk_handle_request(req, &mrb)) {
......@@ -664,6 +678,7 @@ static void virtio_blk_dma_restart_bh(void *opaque)
if (mrb.num_reqs) {
virtio_blk_submit_multireq(s->blk, &mrb);
}
aio_context_release(blk_get_aio_context(s->conf.conf.blk));
}
static void virtio_blk_dma_restart_cb(void *opaque, int running,
......
......@@ -105,6 +105,7 @@ static void scsi_dma_restart_bh(void *opaque)
qemu_bh_delete(s->bh);
s->bh = NULL;
aio_context_acquire(blk_get_aio_context(s->conf.blk));
QTAILQ_FOREACH_SAFE(req, &s->requests, next, next) {
scsi_req_ref(req);
if (req->retry) {
......@@ -122,6 +123,7 @@ static void scsi_dma_restart_bh(void *opaque)
}
scsi_req_unref(req);
}
aio_context_release(blk_get_aio_context(s->conf.blk));
}
void scsi_req_retry(SCSIRequest *req)
......
......@@ -207,6 +207,7 @@ static void scsi_aio_complete(void *opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (scsi_disk_req_check_error(r, ret, true)) {
goto done;
}
......@@ -215,6 +216,7 @@ static void scsi_aio_complete(void *opaque, int ret)
scsi_req_complete(&r->req, GOOD);
done:
aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
scsi_req_unref(&r->req);
}
......@@ -290,12 +292,14 @@ static void scsi_dma_complete(void *opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (ret < 0) {
block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
} else {
block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
}
scsi_dma_complete_noio(r, ret);
aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_read_complete(void * opaque, int ret)
......@@ -306,6 +310,7 @@ static void scsi_read_complete(void * opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (scsi_disk_req_check_error(r, ret, true)) {
goto done;
}
......@@ -320,6 +325,7 @@ static void scsi_read_complete(void * opaque, int ret)
done:
scsi_req_unref(&r->req);
aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
/* Actually issue a read to the block device. */
......@@ -364,12 +370,14 @@ static void scsi_do_read_cb(void *opaque, int ret)
assert (r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (ret < 0) {
block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
} else {
block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
}
scsi_do_read(opaque, ret);
aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
/* Read more data from scsi device into buffer. */
......@@ -489,12 +497,14 @@ static void scsi_write_complete(void * opaque, int ret)
assert (r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (ret < 0) {
block_acct_failed(blk_get_stats(s->qdev.conf.blk), &r->acct);
} else {
block_acct_done(blk_get_stats(s->qdev.conf.blk), &r->acct);
}
scsi_write_complete_noio(r, ret);
aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_write_data(SCSIRequest *req)
......@@ -1625,11 +1635,14 @@ static void scsi_unmap_complete(void *opaque, int ret)
{
UnmapCBData *data = opaque;
SCSIDiskReq *r = data->r;
SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
scsi_unmap_complete_noio(data, ret);
aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_disk_emulate_unmap(SCSIDiskReq *r, uint8_t *inbuf)
......@@ -1696,6 +1709,7 @@ static void scsi_write_same_complete(void *opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->qdev.conf.blk));
if (scsi_disk_req_check_error(r, ret, true)) {
goto done;
}
......@@ -1724,6 +1738,7 @@ done:
scsi_req_unref(&r->req);
qemu_vfree(data->iov.iov_base);
g_free(data);
aio_context_release(blk_get_aio_context(s->qdev.conf.blk));
}
static void scsi_disk_emulate_write_same(SCSIDiskReq *r, uint8_t *inbuf)
......
......@@ -143,10 +143,14 @@ done:
static void scsi_command_complete(void *opaque, int ret)
{
SCSIGenericReq *r = (SCSIGenericReq *)opaque;
SCSIDevice *s = r->req.dev;
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->conf.blk));
scsi_command_complete_noio(r, ret);
aio_context_release(blk_get_aio_context(s->conf.blk));
}
static int execute_command(BlockBackend *blk,
......@@ -182,9 +186,11 @@ static void scsi_read_complete(void * opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->conf.blk));
if (ret || r->req.io_canceled) {
scsi_command_complete_noio(r, ret);
return;
goto done;
}
len = r->io_header.dxfer_len - r->io_header.resid;
......@@ -193,7 +199,7 @@ static void scsi_read_complete(void * opaque, int ret)
r->len = -1;
if (len == 0) {
scsi_command_complete_noio(r, 0);
return;
goto done;
}
/* Snoop READ CAPACITY output to set the blocksize. */
......@@ -237,6 +243,9 @@ static void scsi_read_complete(void * opaque, int ret)
}
scsi_req_data(&r->req, len);
scsi_req_unref(&r->req);
done:
aio_context_release(blk_get_aio_context(s->conf.blk));
}
/* Read more data from scsi device into buffer. */
......@@ -272,9 +281,11 @@ static void scsi_write_complete(void * opaque, int ret)
assert(r->req.aiocb != NULL);
r->req.aiocb = NULL;
aio_context_acquire(blk_get_aio_context(s->conf.blk));
if (ret || r->req.io_canceled) {
scsi_command_complete_noio(r, ret);
return;
goto done;
}
if (r->req.cmd.buf[0] == MODE_SELECT && r->req.cmd.buf[4] == 12 &&
......@@ -284,6 +295,9 @@ static void scsi_write_complete(void * opaque, int ret)
}
scsi_command_complete_noio(r, ret);
done:
aio_context_release(blk_get_aio_context(s->conf.blk));
}
/* Write data to a scsi device. Returns nonzero on failure.
......
......@@ -441,10 +441,12 @@ bool virtio_scsi_handle_ctrl_vq(VirtIOSCSI *s, VirtQueue *vq)
VirtIOSCSIReq *req;
bool progress = false;
virtio_scsi_acquire(s);
while ((req = virtio_scsi_pop_req(s, vq))) {
progress = true;
virtio_scsi_handle_ctrl_req(s, req);
}
virtio_scsi_release(s);
return progress;
}
......@@ -602,6 +604,7 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
virtio_scsi_acquire(s);
do {
virtio_queue_set_notification(vq, 0);
......@@ -629,6 +632,7 @@ bool virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
virtio_scsi_handle_cmd_req_submit(s, req);
}
virtio_scsi_release(s);
return progress;
}
......@@ -760,10 +764,13 @@ out:
bool virtio_scsi_handle_event_vq(VirtIOSCSI *s, VirtQueue *vq)
{
virtio_scsi_acquire(s);
if (s->events_dropped) {
virtio_scsi_push_event(s, NULL, VIRTIO_SCSI_T_NO_EVENT, 0);
virtio_scsi_release(s);
return true;
}
virtio_scsi_release(s);
return false;
}
......
......@@ -47,6 +47,7 @@ typedef void QEMUBHFunc(void *opaque);
typedef bool AioPollFn(void *opaque);
typedef void IOHandler(void *opaque);
struct Coroutine;
struct ThreadPool;
struct LinuxAioState;
......@@ -108,6 +109,9 @@ struct AioContext {
bool notified;
EventNotifier notifier;
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
QEMUBH *co_schedule_bh;
/* Thread pool for performing work and receiving completion callbacks.
* Has its own locking.
*/
......@@ -306,12 +310,8 @@ bool aio_pending(AioContext *ctx);
/* Dispatch any pending callbacks from the GSource attached to the AioContext.
*
* This is used internally in the implementation of the GSource.
*
* @dispatch_fds: true to process fds, false to skip them
* (can be used as an optimization by callers that know there
* are no fds ready)
*/
bool aio_dispatch(AioContext *ctx, bool dispatch_fds);
void aio_dispatch(AioContext *ctx);
/* Progress in completing AIO work to occur. This can issue new pending
* aio as a result of executing I/O completion or bh callbacks.
......@@ -482,6 +482,34 @@ static inline bool aio_node_check(AioContext *ctx, bool is_external)
return !is_external || !atomic_read(&ctx->external_disable_cnt);
}
/**
* aio_co_schedule:
* @ctx: the aio context
* @co: the coroutine
*
* Start a coroutine on a remote AioContext.
*
* The coroutine must not be entered by anyone else while aio_co_schedule()
* is active. In addition the coroutine must have yielded unless ctx
* is the context in which the coroutine is running (i.e. the value of
* qemu_get_current_aio_context() from the coroutine itself).
*/
void aio_co_schedule(AioContext *ctx, struct Coroutine *co);
/**
* aio_co_wake:
* @co: the coroutine
*
* Restart a coroutine on the AioContext where it was running last, thus
* preventing coroutines from jumping from one context to another when they
* go to sleep.
*
* aio_co_wake may be executed either in coroutine or non-coroutine
* context. The coroutine must not be entered by anyone else while
* aio_co_wake() is active.
*/
void aio_co_wake(struct Coroutine *co);
/**
* Return the AioContext whose event loop runs in the current thread.
*
......
......@@ -430,8 +430,9 @@ struct BdrvChild {
* copied as well.
*/
struct BlockDriverState {
int64_t total_sectors; /* if we are reading a disk image, give its
size in sectors */
/* Protected by big QEMU lock or read-only after opening. No special
* locking needed during I/O...
*/
int open_flags; /* flags used to open the file, re-used for re-open */
bool read_only; /* if true, the media is read only */
bool encrypted; /* if true, the media is encrypted */
......@@ -439,14 +440,6 @@ struct BlockDriverState {
bool sg; /* if true, the device is a /dev/sg* */
bool probed; /* if true, format was probed rather than specified */
int copy_on_read; /* if nonzero, copy read backing sectors into image.
note this is a reference count */
CoQueue flush_queue; /* Serializing flush queue */
bool active_flush_req; /* Flush request in flight? */
unsigned int write_gen; /* Current data generation */
unsigned int flushed_gen; /* Flushed write generation */
BlockDriver *drv; /* NULL means no media */
void *opaque;
......@@ -468,18 +461,6 @@ struct BlockDriverState {
BdrvChild *backing;
BdrvChild *file;
/* Callback before write request is processed */
NotifierWithReturnList before_write_notifiers;
/* number of in-flight requests; overall and serialising */
unsigned int in_flight;
unsigned int serialising_in_flight;
bool wakeup;
/* Offset after the highest byte written to */
uint64_t wr_highest_offset;
/* I/O Limits */
BlockLimits bl;
......@@ -497,11 +478,8 @@ struct BlockDriverState {
QTAILQ_ENTRY(BlockDriverState) bs_list;
/* element of the list of monitor-owned BDS */
QTAILQ_ENTRY(BlockDriverState) monitor_list;
QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
int refcnt;
QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
/* operation blockers */
QLIST_HEAD(, BdrvOpBlocker) op_blockers[BLOCK_OP_TYPE_MAX];
......@@ -522,6 +500,31 @@ struct BlockDriverState {
/* The error object in use for blocking operations on backing_hd */
Error *backing_blocker;
/* Protected by AioContext lock */
/* If true, copy read backing sectors into image. Can be >1 if more
* than one client has requested copy-on-read.
*/
int copy_on_read;
/* If we are reading a disk image, give its size in sectors.
* Generally read-only; it is written to by load_vmstate and save_vmstate,
* but the block layer is quiescent during those.
*/
int64_t total_sectors;
/* Callback before write request is processed */
NotifierWithReturnList before_write_notifiers;
/* number of in-flight requests; overall and serialising */
unsigned int in_flight;
unsigned int serialising_in_flight;
bool wakeup;
/* Offset after the highest byte written to */
uint64_t wr_highest_offset;
/* threshold limit for writes, in bytes. "High water mark". */
uint64_t write_threshold_offset;
NotifierWithReturn write_threshold_notifier;
......@@ -529,6 +532,17 @@ struct BlockDriverState {
/* counter for nested bdrv_io_plug */
unsigned io_plugged;
QLIST_HEAD(, BdrvTrackedRequest) tracked_requests;
CoQueue flush_queue; /* Serializing flush queue */
bool active_flush_req; /* Flush request in flight? */
unsigned int write_gen; /* Current data generation */
unsigned int flushed_gen; /* Flushed write generation */
QLIST_HEAD(, BdrvDirtyBitmap) dirty_bitmaps;
/* do we need to tell the quest if we have a volatile write cache? */
int enable_write_cache;
int quiesce_counter;
};
......
......@@ -23,6 +23,8 @@
#include "qemu-common.h"
#include "qom/object.h"
#include "qemu/coroutine.h"
#include "block/aio.h"
#define TYPE_QIO_CHANNEL "qio-channel"
#define QIO_CHANNEL(obj) \
......@@ -80,6 +82,9 @@ struct QIOChannel {
Object parent;
unsigned int features; /* bitmask of QIOChannelFeatures */
char *name;
AioContext *ctx;
Coroutine *read_coroutine;
Coroutine *write_coroutine;
#ifdef _WIN32
HANDLE event; /* For use with GSource on Win32 */
#endif
......@@ -132,6 +137,11 @@ struct QIOChannelClass {
off_t offset,
int whence,
Error **errp);
void (*io_set_aio_fd_handler)(QIOChannel *ioc,
AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
};
/* General I/O handling functions */
......@@ -496,14 +506,51 @@ guint qio_channel_add_watch(QIOChannel *ioc,
GDestroyNotify notify);
/**
* qio_channel_attach_aio_context:
* @ioc: the channel object
* @ctx: the #AioContext to set the handlers on
*
* Request that qio_channel_yield() sets I/O handlers on
* the given #AioContext. If @ctx is %NULL, qio_channel_yield()
* uses QEMU's main thread event loop.
*
* You can move a #QIOChannel from one #AioContext to another even if
* I/O handlers are set for a coroutine. However, #QIOChannel provides
* no synchronization between the calls to qio_channel_yield() and
* qio_channel_attach_aio_context().
*
* Therefore you should first call qio_channel_detach_aio_context()
* to ensure that the coroutine is not entered concurrently. Then,
* while the coroutine has yielded, call qio_channel_attach_aio_context(),
* and then aio_co_schedule() to place the coroutine on the new
* #AioContext. The calls to qio_channel_detach_aio_context()
* and qio_channel_attach_aio_context() should be protected with
* aio_context_acquire() and aio_context_release().
*/
void qio_channel_attach_aio_context(QIOChannel *ioc,
AioContext *ctx);
/**
* qio_channel_detach_aio_context:
* @ioc: the channel object
*
* Disable any I/O handlers set by qio_channel_yield(). With the
* help of aio_co_schedule(), this allows moving a coroutine that was
* paused by qio_channel_yield() to another context.
*/
void qio_channel_detach_aio_context(QIOChannel *ioc);
/**
* qio_channel_yield:
* @ioc: the channel object
* @condition: the I/O condition to wait for
*
* Yields execution from the current coroutine until
* the condition indicated by @condition becomes
* available.
* Yields execution from the current coroutine until the condition
* indicated by @condition becomes available. @condition must
* be either %G_IO_IN or %G_IO_OUT; it cannot contain both. In
* addition, no two coroutine can be waiting on the same condition
* and channel at the same time.
*
* This must only be called from coroutine context
*/
......@@ -525,4 +572,23 @@ void qio_channel_yield(QIOChannel *ioc,
void qio_channel_wait(QIOChannel *ioc,
GIOCondition condition);
/**
* qio_channel_set_aio_fd_handler:
* @ioc: the channel object
* @ctx: the AioContext to set the handlers on
* @io_read: the read handler
* @io_write: the write handler
* @opaque: the opaque value passed to the handler
*
* This is used internally by qio_channel_yield(). It can
* be used by channel implementations to forward the handlers
* to another channel (e.g. from #QIOChannelTLS to the
* underlying socket).
*/
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque);
#endif /* QIO_CHANNEL_H */
......@@ -112,11 +112,56 @@ bool qemu_in_coroutine(void);
*/
bool qemu_coroutine_entered(Coroutine *co);
/**
* Provides a mutex that can be used to synchronise coroutines
*/
struct CoWaitRecord;
typedef struct CoMutex {
/* Count of pending lockers; 0 for a free mutex, 1 for an
* uncontended mutex.
*/
unsigned locked;
/* Context that is holding the lock. Useful to avoid spinning
* when two coroutines on the same AioContext try to get the lock. :)
*/
AioContext *ctx;
/* A queue of waiters. Elements are added atomically in front of
* from_push. to_pop is only populated, and popped from, by whoever
* is in charge of the next wakeup. This can be an unlocker or,
* through the handoff protocol, a locker that is about to go to sleep.
*/
QSLIST_HEAD(, CoWaitRecord) from_push, to_pop;
unsigned handoff, sequence;
Coroutine *holder;
} CoMutex;
/**
* Initialises a CoMutex. This must be called before any other operation is used
* on the CoMutex.
*/
void qemu_co_mutex_init(CoMutex *mutex);
/**
* Locks the mutex. If the lock cannot be taken immediately, control is
* transferred to the caller of the current coroutine.
*/
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex);
/**
* Unlocks the mutex and schedules the next coroutine that was waiting for this
* lock to be run.
*/
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
/**
* CoQueues are a mechanism to queue coroutines in order to continue executing
* them later. They provide the fundamental primitives on which coroutine locks
* are built.
* them later. They are similar to condition variables, but they need help
* from an external mutex in order to maintain thread-safety.
*/
typedef struct CoQueue {
QSIMPLEQ_HEAD(, Coroutine) entries;
......@@ -130,9 +175,10 @@ void qemu_co_queue_init(CoQueue *queue);
/**
* Adds the current coroutine to the CoQueue and transfers control to the
* caller of the coroutine.
* caller of the coroutine. The mutex is unlocked during the wait and
* locked again afterwards.
*/
void coroutine_fn qemu_co_queue_wait(CoQueue *queue);
void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex);
/**
* Restarts the next coroutine in the CoQueue and removes it from the queue.
......@@ -157,36 +203,10 @@ bool qemu_co_enter_next(CoQueue *queue);
bool qemu_co_queue_empty(CoQueue *queue);
/**
* Provides a mutex that can be used to synchronise coroutines
*/
typedef struct CoMutex {
bool locked;
Coroutine *holder;
CoQueue queue;
} CoMutex;
/**
* Initialises a CoMutex. This must be called before any other operation is used
* on the CoMutex.
*/
void qemu_co_mutex_init(CoMutex *mutex);
/**
* Locks the mutex. If the lock cannot be taken immediately, control is
* transferred to the caller of the current coroutine.
*/
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex);
/**
* Unlocks the mutex and schedules the next coroutine that was waiting for this
* lock to be run.
*/
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
typedef struct CoRwlock {
bool writer;
int pending_writer;
int reader;
CoMutex mutex;
CoQueue queue;
} CoRwlock;
......
......@@ -40,12 +40,21 @@ struct Coroutine {
CoroutineEntry *entry;
void *entry_arg;
Coroutine *caller;
/* Only used when the coroutine has terminated. */
QSLIST_ENTRY(Coroutine) pool_next;
size_t locks_held;
/* Coroutines that should be woken up when we yield or terminate */
/* Coroutines that should be woken up when we yield or terminate.
* Only used when the coroutine is running.
*/
QSIMPLEQ_HEAD(, Coroutine) co_queue_wakeup;
/* Only used when the coroutine has yielded. */
AioContext *ctx;
QSIMPLEQ_ENTRY(Coroutine) co_queue_next;
QSLIST_ENTRY(Coroutine) co_scheduled_next;
};
Coroutine *qemu_coroutine_new(void);
......
......@@ -64,14 +64,20 @@ typedef struct BlockDevOps {
* fields that must be public. This is in particular for QLIST_ENTRY() and
* friends so that BlockBackends can be kept in lists outside block-backend.c */
typedef struct BlockBackendPublic {
/* I/O throttling.
* throttle_state tells us if this BlockBackend has I/O limits configured.
* io_limits_disabled tells us if they are currently being enforced */
/* I/O throttling has its own locking, but also some fields are
* protected by the AioContext lock.
*/
/* Protected by AioContext lock. */
CoQueue throttled_reqs[2];
/* Nonzero if the I/O limits are currently being ignored; generally
* it is zero. */
unsigned int io_limits_disabled;
/* The following fields are protected by the ThrottleGroup lock.
* See the ThrottleGroup documentation for details. */
* See the ThrottleGroup documentation for details.
* throttle_state tells us if I/O limits are configured. */
ThrottleState *throttle_state;
ThrottleTimers throttle_timers;
unsigned pending_reqs[2];
......
......@@ -328,6 +328,18 @@ static int qio_channel_command_close(QIOChannel *ioc,
}
static void qio_channel_command_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque)
{
QIOChannelCommand *cioc = QIO_CHANNEL_COMMAND(ioc);
aio_set_fd_handler(ctx, cioc->readfd, false, io_read, NULL, NULL, opaque);
aio_set_fd_handler(ctx, cioc->writefd, false, NULL, io_write, NULL, opaque);
}
static GSource *qio_channel_command_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
......@@ -349,6 +361,7 @@ static void qio_channel_command_class_init(ObjectClass *klass,
ioc_klass->io_set_blocking = qio_channel_command_set_blocking;
ioc_klass->io_close = qio_channel_command_close;
ioc_klass->io_create_watch = qio_channel_command_create_watch;
ioc_klass->io_set_aio_fd_handler = qio_channel_command_set_aio_fd_handler;
}
static const TypeInfo qio_channel_command_info = {
......
......@@ -186,6 +186,16 @@ static int qio_channel_file_close(QIOChannel *ioc,
}
static void qio_channel_file_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque)
{
QIOChannelFile *fioc = QIO_CHANNEL_FILE(ioc);
aio_set_fd_handler(ctx, fioc->fd, false, io_read, io_write, NULL, opaque);
}
static GSource *qio_channel_file_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
......@@ -206,6 +216,7 @@ static void qio_channel_file_class_init(ObjectClass *klass,
ioc_klass->io_seek = qio_channel_file_seek;
ioc_klass->io_close = qio_channel_file_close;
ioc_klass->io_create_watch = qio_channel_file_create_watch;
ioc_klass->io_set_aio_fd_handler = qio_channel_file_set_aio_fd_handler;
}
static const TypeInfo qio_channel_file_info = {
......
......@@ -649,11 +649,6 @@ qio_channel_socket_set_blocking(QIOChannel *ioc,
qemu_set_block(sioc->fd);
} else {
qemu_set_nonblock(sioc->fd);
#ifdef WIN32
WSAEventSelect(sioc->fd, ioc->event,
FD_READ | FD_ACCEPT | FD_CLOSE |
FD_CONNECT | FD_WRITE | FD_OOB);
#endif
}
return 0;
}
......@@ -733,6 +728,16 @@ qio_channel_socket_shutdown(QIOChannel *ioc,
return 0;
}
static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
aio_set_fd_handler(ctx, sioc->fd, false, io_read, io_write, NULL, opaque);
}
static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
......@@ -755,6 +760,7 @@ static void qio_channel_socket_class_init(ObjectClass *klass,
ioc_klass->io_set_cork = qio_channel_socket_set_cork;
ioc_klass->io_set_delay = qio_channel_socket_set_delay;
ioc_klass->io_create_watch = qio_channel_socket_create_watch;
ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
}
static const TypeInfo qio_channel_socket_info = {
......
......@@ -345,6 +345,17 @@ static int qio_channel_tls_close(QIOChannel *ioc,
return qio_channel_close(tioc->master, errp);
}
static void qio_channel_tls_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque)
{
QIOChannelTLS *tioc = QIO_CHANNEL_TLS(ioc);
qio_channel_set_aio_fd_handler(tioc->master, ctx, io_read, io_write, opaque);
}
static GSource *qio_channel_tls_create_watch(QIOChannel *ioc,
GIOCondition condition)
{
......@@ -372,6 +383,7 @@ static void qio_channel_tls_class_init(ObjectClass *klass,
ioc_klass->io_close = qio_channel_tls_close;
ioc_klass->io_shutdown = qio_channel_tls_shutdown;
ioc_klass->io_create_watch = qio_channel_tls_create_watch;
ioc_klass->io_set_aio_fd_handler = qio_channel_tls_set_aio_fd_handler;
}
static const TypeInfo qio_channel_tls_info = {
......
......@@ -285,6 +285,12 @@ GSource *qio_channel_create_socket_watch(QIOChannel *ioc,
GSource *source;
QIOChannelSocketSource *ssource;
#ifdef WIN32
WSAEventSelect(socket, ioc->event,
FD_READ | FD_ACCEPT | FD_CLOSE |
FD_CONNECT | FD_WRITE | FD_OOB);
#endif
source = g_source_new(&qio_channel_socket_source_funcs,
sizeof(QIOChannelSocketSource));
ssource = (QIOChannelSocketSource *)source;
......
......@@ -21,7 +21,7 @@
#include "qemu/osdep.h"
#include "io/channel.h"
#include "qapi/error.h"
#include "qemu/coroutine.h"
#include "qemu/main-loop.h"
bool qio_channel_has_feature(QIOChannel *ioc,
QIOChannelFeature feature)
......@@ -154,6 +154,17 @@ GSource *qio_channel_create_watch(QIOChannel *ioc,
}
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
AioContext *ctx,
IOHandler *io_read,
IOHandler *io_write,
void *opaque)
{
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
}
guint qio_channel_add_watch(QIOChannel *ioc,
GIOCondition condition,
QIOChannelFunc func,
......@@ -227,36 +238,80 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
}
typedef struct QIOChannelYieldData QIOChannelYieldData;
struct QIOChannelYieldData {
QIOChannel *ioc;
Coroutine *co;
};
static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc);
static void qio_channel_restart_read(void *opaque)
{
QIOChannel *ioc = opaque;
Coroutine *co = ioc->read_coroutine;
ioc->read_coroutine = NULL;
qio_channel_set_aio_fd_handlers(ioc);
aio_co_wake(co);
}
static gboolean qio_channel_yield_enter(QIOChannel *ioc,
GIOCondition condition,
gpointer opaque)
static void qio_channel_restart_write(void *opaque)
{
QIOChannelYieldData *data = opaque;
qemu_coroutine_enter(data->co);
return FALSE;
QIOChannel *ioc = opaque;
Coroutine *co = ioc->write_coroutine;
ioc->write_coroutine = NULL;
qio_channel_set_aio_fd_handlers(ioc);
aio_co_wake(co);
}
static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
{
IOHandler *rd_handler = NULL, *wr_handler = NULL;
AioContext *ctx;
if (ioc->read_coroutine) {
rd_handler = qio_channel_restart_read;
}
if (ioc->write_coroutine) {
wr_handler = qio_channel_restart_write;
}
ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
}
void qio_channel_attach_aio_context(QIOChannel *ioc,
AioContext *ctx)
{
AioContext *old_ctx;
if (ioc->ctx == ctx) {
return;
}
old_ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
qio_channel_set_aio_fd_handler(ioc, old_ctx, NULL, NULL, NULL);
ioc->ctx = ctx;
qio_channel_set_aio_fd_handlers(ioc);
}
void qio_channel_detach_aio_context(QIOChannel *ioc)
{
ioc->read_coroutine = NULL;
ioc->write_coroutine = NULL;
qio_channel_set_aio_fd_handlers(ioc);
ioc->ctx = NULL;
}
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
GIOCondition condition)
{
QIOChannelYieldData data;
assert(qemu_in_coroutine());
data.ioc = ioc;
data.co = qemu_coroutine_self();
qio_channel_add_watch(ioc,
condition,
qio_channel_yield_enter,
&data,
NULL);
if (condition == G_IO_IN) {
assert(!ioc->read_coroutine);
ioc->read_coroutine = qemu_coroutine_self();
} else if (condition == G_IO_OUT) {
assert(!ioc->write_coroutine);
ioc->write_coroutine = qemu_coroutine_self();
} else {
abort();
}
qio_channel_set_aio_fd_handlers(ioc);
qemu_coroutine_yield();
}
......
......@@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply *reply)
ssize_t ret;
ret = read_sync(ioc, buf, sizeof(buf));
if (ret < 0) {
if (ret <= 0) {
return ret;
}
......
......@@ -43,14 +43,7 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc,
}
if (len == QIO_CHANNEL_ERR_BLOCK) {
if (qemu_in_coroutine()) {
/* XXX figure out if we can create a variant on
* qio_channel_yield() that works with AIO contexts
* and consider using that in this branch */
qemu_coroutine_yield();
} else if (done) {
/* XXX this is needed by nbd_reply_ready. */
qio_channel_wait(ioc,
do_read ? G_IO_IN : G_IO_OUT);
qio_channel_yield(ioc, do_read ? G_IO_IN : G_IO_OUT);
} else {
return -EAGAIN;
}
......
......@@ -95,8 +95,6 @@ struct NBDClient {
CoMutex send_lock;
Coroutine *send_coroutine;
bool can_read;
QTAILQ_ENTRY(NBDClient) next;
int nb_requests;
bool closing;
......@@ -104,9 +102,7 @@ struct NBDClient {
/* That's all folks */
static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client);
static void nbd_client_receive_next_request(NBDClient *client);
static gboolean nbd_negotiate_continue(QIOChannel *ioc,
GIOCondition condition,
......@@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client)
*/
assert(client->closing);
nbd_unset_handlers(client);
qio_channel_detach_aio_context(client->ioc);
object_unref(OBJECT(client->sioc));
object_unref(OBJECT(client->ioc));
if (client->tlscreds) {
......@@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
client->nb_requests++;
nbd_update_can_read(client);
req = g_new0(NBDRequestData, 1);
nbd_client_get(client);
......@@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req)
g_free(req);
client->nb_requests--;
nbd_update_can_read(client);
nbd_client_receive_next_request(client);
nbd_client_put(client);
}
......@@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
exp->ctx = ctx;
QTAILQ_FOREACH(client, &exp->clients, next) {
nbd_set_handlers(client);
qio_channel_attach_aio_context(client->ioc, ctx);
if (client->recv_coroutine) {
aio_co_schedule(ctx, client->recv_coroutine);
}
if (client->send_coroutine) {
aio_co_schedule(ctx, client->send_coroutine);
}
}
}
......@@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque)
TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
QTAILQ_FOREACH(client, &exp->clients, next) {
nbd_unset_handlers(client);
qio_channel_detach_aio_context(client->ioc);
}
exp->ctx = NULL;
......@@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
g_assert(qemu_in_coroutine());
qemu_co_mutex_lock(&client->send_lock);
client->send_coroutine = qemu_coroutine_self();
nbd_set_handlers(client);
if (!len) {
rc = nbd_send_reply(client->ioc, reply);
......@@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
}
client->send_coroutine = NULL;
nbd_set_handlers(client);
qemu_co_mutex_unlock(&client->send_lock);
return rc;
}
......@@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
ssize_t rc;
g_assert(qemu_in_coroutine());
client->recv_coroutine = qemu_coroutine_self();
nbd_update_can_read(client);
assert(client->recv_coroutine == qemu_coroutine_self());
rc = nbd_receive_request(client->ioc, request);
if (rc < 0) {
if (rc != -EAGAIN) {
......@@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
out:
client->recv_coroutine = NULL;
nbd_update_can_read(client);
nbd_client_receive_next_request(client);
return rc;
}
static void nbd_trip(void *opaque)
/* Owns a reference to the NBDClient passed as opaque. */
static coroutine_fn void nbd_trip(void *opaque)
{
NBDClient *client = opaque;
NBDExport *exp = client->exp;
NBDRequestData *req;
NBDRequest request;
NBDRequest request = { 0 }; /* GCC thinks it can be used uninitialized */
NBDReply reply;
ssize_t ret;
int flags;
TRACE("Reading request.");
if (client->closing) {
nbd_client_put(client);
return;
}
......@@ -1338,60 +1338,21 @@ static void nbd_trip(void *opaque)
done:
nbd_request_put(req);
nbd_client_put(client);
return;
out:
nbd_request_put(req);
client_close(client);
nbd_client_put(client);
}
static void nbd_read(void *opaque)
{
NBDClient *client = opaque;
if (client->recv_coroutine) {
qemu_coroutine_enter(client->recv_coroutine);
} else {
qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client));
}
}
static void nbd_restart_write(void *opaque)
{
NBDClient *client = opaque;
qemu_coroutine_enter(client->send_coroutine);
}
static void nbd_set_handlers(NBDClient *client)
{
if (client->exp && client->exp->ctx) {
aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true,
client->can_read ? nbd_read : NULL,
client->send_coroutine ? nbd_restart_write : NULL,
NULL, client);
}
}
static void nbd_unset_handlers(NBDClient *client)
{
if (client->exp && client->exp->ctx) {
aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL,
NULL, NULL, NULL);
}
}
static void nbd_update_can_read(NBDClient *client)
static void nbd_client_receive_next_request(NBDClient *client)
{
bool can_read = client->recv_coroutine ||
client->nb_requests < MAX_NBD_REQUESTS;
if (can_read != client->can_read) {
client->can_read = can_read;
nbd_set_handlers(client);
/* There is no need to invoke aio_notify(), since aio_set_fd_handler()
* in nbd_set_handlers() will have taken care of that */
if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
nbd_client_get(client);
client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
aio_co_schedule(client->exp->ctx, client->recv_coroutine);
}
}
......@@ -1409,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
goto out;
}
qemu_co_mutex_init(&client->send_lock);
nbd_set_handlers(client);
if (exp) {
QTAILQ_INSERT_TAIL(&exp->clients, client, next);
}
nbd_client_receive_next_request(client);
out:
g_free(data);
}
......@@ -1439,7 +1402,6 @@ void nbd_client_new(NBDExport *exp,
object_ref(OBJECT(client->sioc));
client->ioc = QIO_CHANNEL(sioc);
object_ref(OBJECT(client->ioc));
client->can_read = true;
client->close = close_fn;
data->client = client;
......
......@@ -16,6 +16,7 @@ stub-obj-y += get-vm-name.o
stub-obj-y += iothread.o
stub-obj-y += iothread-lock.o
stub-obj-y += is-daemonized.o
stub-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
stub-obj-y += machine-init-done.o
stub-obj-y += migr-blocker.o
stub-obj-y += monitor.o
......
/*
* Linux native AIO support.
*
* Copyright (C) 2009 IBM, Corp.
* Copyright (C) 2009 Red Hat, Inc.
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#include "qemu/osdep.h"
#include "block/aio.h"
#include "block/raw-aio.h"
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
abort();
}
void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
abort();
}
LinuxAioState *laio_init(void)
{
abort();
}
void laio_cleanup(LinuxAioState *s)
{
abort();
}
......@@ -9,14 +9,3 @@ void qemu_set_fd_handler(int fd,
{
abort();
}
void aio_set_fd_handler(AioContext *ctx,
int fd,
bool is_external,
IOHandler *io_read,
IOHandler *io_write,
AioPollFn *io_poll,
void *opaque)
{
abort();
}
......@@ -45,9 +45,13 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
check-unit-y += tests/test-iov$(EXESUF)
gcov-files-test-iov-y = util/iov.c
check-unit-y += tests/test-aio$(EXESUF)
gcov-files-test-aio-y = util/async.c util/qemu-timer.o
gcov-files-test-aio-$(CONFIG_WIN32) += util/aio-win32.c
gcov-files-test-aio-$(CONFIG_POSIX) += util/aio-posix.c
check-unit-y += tests/test-aio-multithread$(EXESUF)
gcov-files-test-aio-multithread-y = $(gcov-files-test-aio-y)
gcov-files-test-aio-multithread-y += util/qemu-coroutine.c tests/iothread.c
check-unit-y += tests/test-throttle$(EXESUF)
gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
check-unit-y += tests/test-thread-pool$(EXESUF)
gcov-files-test-thread-pool-y = thread-pool.c
gcov-files-test-hbitmap-y = util/hbitmap.c
......@@ -505,7 +509,7 @@ test-qapi-obj-y = tests/test-qapi-visit.o tests/test-qapi-types.o \
$(test-qom-obj-y)
test-crypto-obj-y = $(crypto-obj-y) $(test-qom-obj-y)
test-io-obj-y = $(io-obj-y) $(test-crypto-obj-y)
test-block-obj-y = $(block-obj-y) $(test-io-obj-y)
test-block-obj-y = $(block-obj-y) $(test-io-obj-y) tests/iothread.o
tests/check-qint$(EXESUF): tests/check-qint.o $(test-util-obj-y)
tests/check-qstring$(EXESUF): tests/check-qstring.o $(test-util-obj-y)
......@@ -517,10 +521,10 @@ tests/check-qjson$(EXESUF): tests/check-qjson.o $(test-util-obj-y)
tests/check-qom-interface$(EXESUF): tests/check-qom-interface.o $(test-qom-obj-y)
tests/check-qom-proplist$(EXESUF): tests/check-qom-proplist.o $(test-qom-obj-y)
tests/test-char$(EXESUF): tests/test-char.o qemu-timer.o \
$(test-util-obj-y) $(qtest-obj-y) $(test-block-obj-y) $(chardev-obj-y)
tests/test-char$(EXESUF): tests/test-char.o $(test-util-obj-y) $(qtest-obj-y) $(test-io-obj-y) $(chardev-obj-y)
tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(test-block-obj-y)
tests/test-aio$(EXESUF): tests/test-aio.o $(test-block-obj-y)
tests/test-aio-multithread$(EXESUF): tests/test-aio-multithread.o $(test-block-obj-y)
tests/test-throttle$(EXESUF): tests/test-throttle.o $(test-block-obj-y)
tests/test-blockjob$(EXESUF): tests/test-blockjob.o $(test-block-obj-y) $(test-util-obj-y)
tests/test-blockjob-txn$(EXESUF): tests/test-blockjob-txn.o $(test-block-obj-y) $(test-util-obj-y)
......@@ -551,8 +555,7 @@ tests/test-vmstate$(EXESUF): tests/test-vmstate.o \
migration/vmstate.o migration/qemu-file.o \
migration/qemu-file-channel.o migration/qjson.o \
$(test-io-obj-y)
tests/test-timed-average$(EXESUF): tests/test-timed-average.o qemu-timer.o \
$(test-util-obj-y)
tests/test-timed-average$(EXESUF): tests/test-timed-average.o $(test-util-obj-y)
tests/test-base64$(EXESUF): tests/test-base64.o \
libqemuutil.a libqemustub.a
tests/ptimer-test$(EXESUF): tests/ptimer-test.o tests/ptimer-test-stubs.o hw/core/ptimer.o libqemustub.a
......@@ -712,7 +715,7 @@ tests/usb-hcd-ehci-test$(EXESUF): tests/usb-hcd-ehci-test.o $(libqos-usb-obj-y)
tests/usb-hcd-xhci-test$(EXESUF): tests/usb-hcd-xhci-test.o $(libqos-usb-obj-y)
tests/pc-cpu-test$(EXESUF): tests/pc-cpu-test.o
tests/postcopy-test$(EXESUF): tests/postcopy-test.o
tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o qemu-timer.o \
tests/vhost-user-test$(EXESUF): tests/vhost-user-test.o $(test-util-obj-y) \
$(qtest-obj-y) $(test-io-obj-y) $(libqos-virtio-obj-y) $(libqos-pc-obj-y) \
$(chardev-obj-y)
tests/qemu-iotests/socket_scm_helper$(EXESUF): tests/qemu-iotests/socket_scm_helper.o
......
/*
* Event loop thread implementation for unit tests
*
* Copyright Red Hat Inc., 2013, 2016
*
* Authors:
* Stefan Hajnoczi <stefanha@redhat.com>
* Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*
*/
#include "qemu/osdep.h"
#include "qapi/error.h"
#include "block/aio.h"
#include "qemu/main-loop.h"
#include "qemu/rcu.h"
#include "iothread.h"
struct IOThread {
AioContext *ctx;
QemuThread thread;
QemuMutex init_done_lock;
QemuCond init_done_cond; /* is thread initialization done? */
bool stopping;
};
static __thread IOThread *my_iothread;
AioContext *qemu_get_current_aio_context(void)
{
return my_iothread ? my_iothread->ctx : qemu_get_aio_context();
}
static void *iothread_run(void *opaque)
{
IOThread *iothread = opaque;
rcu_register_thread();
my_iothread = iothread;
qemu_mutex_lock(&iothread->init_done_lock);
iothread->ctx = aio_context_new(&error_abort);
qemu_cond_signal(&iothread->init_done_cond);
qemu_mutex_unlock(&iothread->init_done_lock);
while (!atomic_read(&iothread->stopping)) {
aio_poll(iothread->ctx, true);
}
rcu_unregister_thread();
return NULL;
}
void iothread_join(IOThread *iothread)
{
iothread->stopping = true;
aio_notify(iothread->ctx);
qemu_thread_join(&iothread->thread);
qemu_cond_destroy(&iothread->init_done_cond);
qemu_mutex_destroy(&iothread->init_done_lock);
aio_context_unref(iothread->ctx);
g_free(iothread);
}
IOThread *iothread_new(void)
{
IOThread *iothread = g_new0(IOThread, 1);
qemu_mutex_init(&iothread->init_done_lock);
qemu_cond_init(&iothread->init_done_cond);
qemu_thread_create(&iothread->thread, NULL, iothread_run,
iothread, QEMU_THREAD_JOINABLE);
/* Wait for initialization to complete */
qemu_mutex_lock(&iothread->init_done_lock);
while (iothread->ctx == NULL) {
qemu_cond_wait(&iothread->init_done_cond,
&iothread->init_done_lock);
}
qemu_mutex_unlock(&iothread->init_done_lock);
return iothread;
}
AioContext *iothread_get_aio_context(IOThread *iothread)
{
return iothread->ctx;
}
/*
* Event loop thread implementation for unit tests
*
* Copyright Red Hat Inc., 2013, 2016
*
* Authors:
* Stefan Hajnoczi <stefanha@redhat.com>
* Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or later.
* See the COPYING file in the top-level directory.
*/
#ifndef TEST_IOTHREAD_H
#define TEST_IOTHREAD_H
#include "block/aio.h"
#include "qemu/thread.h"
typedef struct IOThread IOThread;
IOThread *iothread_new(void);
void iothread_join(IOThread *iothread);
AioContext *iothread_get_aio_context(IOThread *iothread);
#endif
/*
* AioContext multithreading tests
*
* Copyright Red Hat, Inc. 2016
*
* Authors:
* Paolo Bonzini <pbonzini@redhat.com>
*
* This work is licensed under the terms of the GNU LGPL, version 2 or later.
* See the COPYING.LIB file in the top-level directory.
*/
#include "qemu/osdep.h"
#include <glib.h>
#include "block/aio.h"
#include "qapi/error.h"
#include "qemu/coroutine.h"
#include "qemu/thread.h"
#include "qemu/error-report.h"
#include "iothread.h"
/* AioContext management */
#define NUM_CONTEXTS 5
static IOThread *threads[NUM_CONTEXTS];
static AioContext *ctx[NUM_CONTEXTS];
static __thread int id = -1;
static QemuEvent done_event;
/* Run a function synchronously on a remote iothread. */
typedef struct CtxRunData {
QEMUBHFunc *cb;
void *arg;
} CtxRunData;
static void ctx_run_bh_cb(void *opaque)
{
CtxRunData *data = opaque;
data->cb(data->arg);
qemu_event_set(&done_event);
}
static void ctx_run(int i, QEMUBHFunc *cb, void *opaque)
{
CtxRunData data = {
.cb = cb,
.arg = opaque
};
qemu_event_reset(&done_event);
aio_bh_schedule_oneshot(ctx[i], ctx_run_bh_cb, &data);
qemu_event_wait(&done_event);
}
/* Starting the iothreads. */
static void set_id_cb(void *opaque)
{
int *i = opaque;
id = *i;
}
static void create_aio_contexts(void)
{
int i;
for (i = 0; i < NUM_CONTEXTS; i++) {
threads[i] = iothread_new();
ctx[i] = iothread_get_aio_context(threads[i]);
}
qemu_event_init(&done_event, false);
for (i = 0; i < NUM_CONTEXTS; i++) {
ctx_run(i, set_id_cb, &i);
}
}
/* Stopping the iothreads. */
static void join_aio_contexts(void)
{
int i;
for (i = 0; i < NUM_CONTEXTS; i++) {
aio_context_ref(ctx[i]);
}
for (i = 0; i < NUM_CONTEXTS; i++) {
iothread_join(threads[i]);
}
for (i = 0; i < NUM_CONTEXTS; i++) {
aio_context_unref(ctx[i]);
}
qemu_event_destroy(&done_event);
}
/* Basic test for the stuff above. */
static void test_lifecycle(void)
{
create_aio_contexts();
join_aio_contexts();
}
/* aio_co_schedule test. */
static Coroutine *to_schedule[NUM_CONTEXTS];
static bool now_stopping;
static int count_retry;
static int count_here;
static int count_other;
static bool schedule_next(int n)
{
Coroutine *co;
co = atomic_xchg(&to_schedule[n], NULL);
if (!co) {
atomic_inc(&count_retry);
return false;
}
if (n == id) {
atomic_inc(&count_here);
} else {
atomic_inc(&count_other);
}
aio_co_schedule(ctx[n], co);
return true;
}
static void finish_cb(void *opaque)
{
schedule_next(id);
}
static coroutine_fn void test_multi_co_schedule_entry(void *opaque)
{
g_assert(to_schedule[id] == NULL);
atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
while (!atomic_mb_read(&now_stopping)) {
int n;
n = g_test_rand_int_range(0, NUM_CONTEXTS);
schedule_next(n);
qemu_coroutine_yield();
g_assert(to_schedule[id] == NULL);
atomic_mb_set(&to_schedule[id], qemu_coroutine_self());
}
}
static void test_multi_co_schedule(int seconds)
{
int i;
count_here = count_other = count_retry = 0;
now_stopping = false;
create_aio_contexts();
for (i = 0; i < NUM_CONTEXTS; i++) {
Coroutine *co1 = qemu_coroutine_create(test_multi_co_schedule_entry, NULL);
aio_co_schedule(ctx[i], co1);
}
g_usleep(seconds * 1000000);
atomic_mb_set(&now_stopping, true);
for (i = 0; i < NUM_CONTEXTS; i++) {
ctx_run(i, finish_cb, NULL);
to_schedule[i] = NULL;
}
join_aio_contexts();
g_test_message("scheduled %d, queued %d, retry %d, total %d\n",
count_other, count_here, count_retry,
count_here + count_other + count_retry);
}
static void test_multi_co_schedule_1(void)
{
test_multi_co_schedule(1);
}
static void test_multi_co_schedule_10(void)
{
test_multi_co_schedule(10);
}
/* CoMutex thread-safety. */
static uint32_t atomic_counter;
static uint32_t running;
static uint32_t counter;
static CoMutex comutex;
static void coroutine_fn test_multi_co_mutex_entry(void *opaque)
{
while (!atomic_mb_read(&now_stopping)) {
qemu_co_mutex_lock(&comutex);
counter++;
qemu_co_mutex_unlock(&comutex);
/* Increase atomic_counter *after* releasing the mutex. Otherwise
* there is a chance (it happens about 1 in 3 runs) that the iothread
* exits before the coroutine is woken up, causing a spurious
* assertion failure.
*/
atomic_inc(&atomic_counter);
}
atomic_dec(&running);
}
static void test_multi_co_mutex(int threads, int seconds)
{
int i;
qemu_co_mutex_init(&comutex);
counter = 0;
atomic_counter = 0;
now_stopping = false;
create_aio_contexts();
assert(threads <= NUM_CONTEXTS);
running = threads;
for (i = 0; i < threads; i++) {
Coroutine *co1 = qemu_coroutine_create(test_multi_co_mutex_entry, NULL);
aio_co_schedule(ctx[i], co1);
}
g_usleep(seconds * 1000000);
atomic_mb_set(&now_stopping, true);
while (running > 0) {
g_usleep(100000);
}
join_aio_contexts();
g_test_message("%d iterations/second\n", counter / seconds);
g_assert_cmpint(counter, ==, atomic_counter);
}
/* Testing with NUM_CONTEXTS threads focuses on the queue. The mutex however
* is too contended (and the threads spend too much time in aio_poll)
* to actually stress the handoff protocol.
*/
static void test_multi_co_mutex_1(void)
{
test_multi_co_mutex(NUM_CONTEXTS, 1);
}
static void test_multi_co_mutex_10(void)
{
test_multi_co_mutex(NUM_CONTEXTS, 10);
}
/* Testing with fewer threads stresses the handoff protocol too. Still, the
* case where the locker _can_ pick up a handoff is very rare, happening
* about 10 times in 1 million, so increase the runtime a bit compared to
* other "quick" testcases that only run for 1 second.
*/
static void test_multi_co_mutex_2_3(void)
{
test_multi_co_mutex(2, 3);
}
static void test_multi_co_mutex_2_30(void)
{
test_multi_co_mutex(2, 30);
}
/* Same test with fair mutexes, for performance comparison. */
#ifdef CONFIG_LINUX
#include "qemu/futex.h"
/* The nodes for the mutex reside in this structure (on which we try to avoid
* false sharing). The head of the mutex is in the "mutex_head" variable.
*/
static struct {
int next, locked;
int padding[14];
} nodes[NUM_CONTEXTS] __attribute__((__aligned__(64)));
static int mutex_head = -1;
static void mcs_mutex_lock(void)
{
int prev;
nodes[id].next = -1;
nodes[id].locked = 1;
prev = atomic_xchg(&mutex_head, id);
if (prev != -1) {
atomic_set(&nodes[prev].next, id);
qemu_futex_wait(&nodes[id].locked, 1);
}
}
static void mcs_mutex_unlock(void)
{
int next;
if (nodes[id].next == -1) {
if (atomic_read(&mutex_head) == id &&
atomic_cmpxchg(&mutex_head, id, -1) == id) {
/* Last item in the list, exit. */
return;
}
while (atomic_read(&nodes[id].next) == -1) {
/* mcs_mutex_lock did the xchg, but has not updated
* nodes[prev].next yet.
*/
}
}
/* Wake up the next in line. */
next = nodes[id].next;
nodes[next].locked = 0;
qemu_futex_wake(&nodes[next].locked, 1);
}
static void test_multi_fair_mutex_entry(void *opaque)
{
while (!atomic_mb_read(&now_stopping)) {
mcs_mutex_lock();
counter++;
mcs_mutex_unlock();
atomic_inc(&atomic_counter);
}
atomic_dec(&running);
}
static void test_multi_fair_mutex(int threads, int seconds)
{
int i;
assert(mutex_head == -1);
counter = 0;
atomic_counter = 0;
now_stopping = false;
create_aio_contexts();
assert(threads <= NUM_CONTEXTS);
running = threads;
for (i = 0; i < threads; i++) {
Coroutine *co1 = qemu_coroutine_create(test_multi_fair_mutex_entry, NULL);
aio_co_schedule(ctx[i], co1);
}
g_usleep(seconds * 1000000);
atomic_mb_set(&now_stopping, true);
while (running > 0) {
g_usleep(100000);
}
join_aio_contexts();
g_test_message("%d iterations/second\n", counter / seconds);
g_assert_cmpint(counter, ==, atomic_counter);
}
static void test_multi_fair_mutex_1(void)
{
test_multi_fair_mutex(NUM_CONTEXTS, 1);
}
static void test_multi_fair_mutex_10(void)
{
test_multi_fair_mutex(NUM_CONTEXTS, 10);
}
#endif
/* Same test with pthread mutexes, for performance comparison and
* portability. */
static QemuMutex mutex;
static void test_multi_mutex_entry(void *opaque)
{
while (!atomic_mb_read(&now_stopping)) {
qemu_mutex_lock(&mutex);
counter++;
qemu_mutex_unlock(&mutex);
atomic_inc(&atomic_counter);
}
atomic_dec(&running);
}
static void test_multi_mutex(int threads, int seconds)
{
int i;
qemu_mutex_init(&mutex);
counter = 0;
atomic_counter = 0;
now_stopping = false;
create_aio_contexts();
assert(threads <= NUM_CONTEXTS);
running = threads;
for (i = 0; i < threads; i++) {
Coroutine *co1 = qemu_coroutine_create(test_multi_mutex_entry, NULL);
aio_co_schedule(ctx[i], co1);
}
g_usleep(seconds * 1000000);
atomic_mb_set(&now_stopping, true);
while (running > 0) {
g_usleep(100000);
}
join_aio_contexts();
g_test_message("%d iterations/second\n", counter / seconds);
g_assert_cmpint(counter, ==, atomic_counter);
}
static void test_multi_mutex_1(void)
{
test_multi_mutex(NUM_CONTEXTS, 1);
}
static void test_multi_mutex_10(void)
{
test_multi_mutex(NUM_CONTEXTS, 10);
}
/* End of tests. */
int main(int argc, char **argv)
{
init_clocks();
g_test_init(&argc, &argv, NULL);
g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
if (g_test_quick()) {
g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_1);
g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_3);
#ifdef CONFIG_LINUX
g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_1);
#endif
g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_1);
} else {
g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_10);
g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_30);
#ifdef CONFIG_LINUX
g_test_add_func("/aio/multi/mutex/mcs", test_multi_fair_mutex_10);
#endif
g_test_add_func("/aio/multi/mutex/pthread", test_multi_mutex_10);
}
return g_test_run();
}
......@@ -6,6 +6,7 @@
#include "qapi/error.h"
#include "qemu/timer.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
static AioContext *ctx;
static ThreadPool *pool;
......@@ -224,15 +225,9 @@ static void test_cancel_async(void)
int main(int argc, char **argv)
{
int ret;
Error *local_error = NULL;
init_clocks();
ctx = aio_context_new(&local_error);
if (!ctx) {
error_reportf_err(local_error, "Failed to create AIO Context: ");
exit(1);
}
qemu_init_main_loop(&error_abort);
ctx = qemu_get_current_aio_context();
pool = aio_get_thread_pool(ctx);
g_test_init(&argc, &argv, NULL);
......@@ -245,6 +240,5 @@ int main(int argc, char **argv)
ret = g_test_run();
aio_context_unref(ctx);
return ret;
}
......@@ -25,17 +25,6 @@
#
# The <format-string> should be a sprintf()-compatible format string.
# aio-posix.c
run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64
run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
# thread-pool.c
thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
# ioport.c
cpu_in(unsigned int addr, char size, unsigned int val) "addr %#x(%c) value %u"
cpu_out(unsigned int addr, char size, unsigned int val) "addr %#x(%c) value %u"
......
util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o
util-obj-y += bufferiszero.o
util-obj-y += lockcnt.o
util-obj-y += aiocb.o async.o thread-pool.o qemu-timer.o
util-obj-y += main-loop.o iohandler.o
util-obj-$(CONFIG_POSIX) += aio-posix.o
util-obj-$(CONFIG_POSIX) += compatfd.o
util-obj-$(CONFIG_POSIX) += event_notifier-posix.o
util-obj-$(CONFIG_POSIX) += mmap-alloc.o
util-obj-$(CONFIG_POSIX) += oslib-posix.o
util-obj-$(CONFIG_POSIX) += qemu-openpty.o
util-obj-$(CONFIG_POSIX) += qemu-thread-posix.o
util-obj-$(CONFIG_WIN32) += event_notifier-win32.o
util-obj-$(CONFIG_POSIX) += memfd.o
util-obj-$(CONFIG_WIN32) += aio-win32.o
util-obj-$(CONFIG_WIN32) += event_notifier-win32.o
util-obj-$(CONFIG_WIN32) += oslib-win32.o
util-obj-$(CONFIG_WIN32) += qemu-thread-win32.o
util-obj-y += envlist.o path.o module.o
......
......@@ -19,7 +19,7 @@
#include "qemu/rcu_queue.h"
#include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace-root.h"
#include "trace.h"
#ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h>
#endif
......@@ -386,12 +386,6 @@ static bool aio_dispatch_handlers(AioContext *ctx)
AioHandler *node, *tmp;
bool progress = false;
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
*/
qemu_lockcnt_inc(&ctx->list_lock);
QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
int revents;
......@@ -426,33 +420,17 @@ static bool aio_dispatch_handlers(AioContext *ctx)
}
}
qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
/*
* Note that dispatch_fds == false has the side-effect of post-poning the
* freeing of deleted handlers.
*/
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
void aio_dispatch(AioContext *ctx)
{
bool progress;
/*
* If there are callbacks left that have been queued, we need to call them.
* Do not call select in this case, because it is possible that the caller
* does not need a complete flush (as is the case for aio_poll loops).
*/
progress = aio_bh_poll(ctx);
if (dispatch_fds) {
progress |= aio_dispatch_handlers(ctx);
}
/* Run our timers */
progress |= timerlistgroup_run_timers(&ctx->tlg);
qemu_lockcnt_inc(&ctx->list_lock);
aio_bh_poll(ctx);
aio_dispatch_handlers(ctx);
qemu_lockcnt_dec(&ctx->list_lock);
return progress;
timerlistgroup_run_timers(&ctx->tlg);
}
/* These thread-local variables are used only in a small part of aio_poll
......@@ -597,9 +575,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
int64_t timeout;
int64_t start = 0;
aio_context_acquire(ctx);
progress = false;
/* aio_notify can avoid the expensive event_notifier_set if
* everything (file descriptors, bottom halves, timers) will
* be re-evaluated before the next blocking poll(). This is
......@@ -617,9 +592,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
}
if (try_poll_mode(ctx, blocking)) {
progress = true;
} else {
progress = try_poll_mode(ctx, blocking);
if (!progress) {
assert(npfd == 0);
/* fill pollfds */
......@@ -636,9 +610,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
timeout = blocking ? aio_compute_timeout(ctx) : 0;
/* wait until next event */
if (timeout) {
aio_context_release(ctx);
}
if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
AioHandler epoll_handler;
......@@ -650,9 +621,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
} else {
ret = qemu_poll_ns(pollfds, npfd, timeout);
}
if (timeout) {
aio_context_acquire(ctx);
}
}
if (blocking) {
......@@ -710,14 +678,16 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
npfd = 0;
qemu_lockcnt_dec(&ctx->list_lock);
/* Run dispatch even if there were no readable fds to run timers */
if (aio_dispatch(ctx, ret > 0)) {
progress = true;
progress |= aio_bh_poll(ctx);
if (ret > 0) {
progress |= aio_dispatch_handlers(ctx);
}
aio_context_release(ctx);
qemu_lockcnt_dec(&ctx->list_lock);
progress |= timerlistgroup_run_timers(&ctx->tlg);
return progress;
}
......
......@@ -253,8 +253,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
bool progress = false;
AioHandler *tmp;
qemu_lockcnt_inc(&ctx->list_lock);
/*
* We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking.
......@@ -305,20 +303,16 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
}
}
qemu_lockcnt_dec(&ctx->list_lock);
return progress;
}
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
void aio_dispatch(AioContext *ctx)
{
bool progress;
progress = aio_bh_poll(ctx);
if (dispatch_fds) {
progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
}
progress |= timerlistgroup_run_timers(&ctx->tlg);
return progress;
qemu_lockcnt_inc(&ctx->list_lock);
aio_bh_poll(ctx);
aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
qemu_lockcnt_dec(&ctx->list_lock);
timerlistgroup_run_timers(&ctx->tlg);
}
bool aio_poll(AioContext *ctx, bool blocking)
......@@ -329,7 +323,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
int count;
int timeout;
aio_context_acquire(ctx);
progress = false;
/* aio_notify can avoid the expensive event_notifier_set if
......@@ -355,7 +348,6 @@ bool aio_poll(AioContext *ctx, bool blocking)
}
}
qemu_lockcnt_dec(&ctx->list_lock);
first = true;
/* ctx->notifier is always registered. */
......@@ -371,17 +363,11 @@ bool aio_poll(AioContext *ctx, bool blocking)
timeout = blocking && !have_select_revents
? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
if (timeout) {
aio_context_release(ctx);
}
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
if (blocking) {
assert(first);
atomic_sub(&ctx->notify_me, 2);
}
if (timeout) {
aio_context_acquire(ctx);
}
if (first) {
aio_notify_accept(ctx);
......@@ -404,9 +390,9 @@ bool aio_poll(AioContext *ctx, bool blocking)
progress |= aio_dispatch_handlers(ctx, event);
} while (count > 0);
progress |= timerlistgroup_run_timers(&ctx->tlg);
qemu_lockcnt_dec(&ctx->list_lock);
aio_context_release(ctx);
progress |= timerlistgroup_run_timers(&ctx->tlg);
return progress;
}
......
/*
* BlockAIOCB allocation
*
* Copyright (c) 2003-2017 Fabrice Bellard and other QEMU contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "qemu/osdep.h"
#include "block/aio.h"
void *qemu_aio_get(const AIOCBInfo *aiocb_info, BlockDriverState *bs,
BlockCompletionFunc *cb, void *opaque)
{
BlockAIOCB *acb;
acb = g_malloc(aiocb_info->aiocb_size);
acb->aiocb_info = aiocb_info;
acb->bs = bs;
acb->cb = cb;
acb->opaque = opaque;
acb->refcnt = 1;
return acb;
}
void qemu_aio_ref(void *p)
{
BlockAIOCB *acb = p;
acb->refcnt++;
}
void qemu_aio_unref(void *p)
{
BlockAIOCB *acb = p;
assert(acb->refcnt > 0);
if (--acb->refcnt == 0) {
g_free(acb);
}
}
/*
* QEMU System Emulator
* Data plane event loop
*
* Copyright (c) 2003-2008 Fabrice Bellard
* Copyright (c) 2009-2017 QEMU contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
......@@ -30,6 +31,8 @@
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"
/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */
......@@ -87,15 +90,16 @@ void aio_bh_call(QEMUBH *bh)
bh->cb(bh->opaque);
}
/* Multiple occurrences of aio_bh_poll cannot be called concurrently */
/* Multiple occurrences of aio_bh_poll cannot be called concurrently.
* The count in ctx->list_lock is incremented before the call, and is
* not affected by the call.
*/
int aio_bh_poll(AioContext *ctx)
{
QEMUBH *bh, **bhp, *next;
int ret;
bool deleted = false;
qemu_lockcnt_inc(&ctx->list_lock);
ret = 0;
for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
next = atomic_rcu_read(&bh->next);
......@@ -120,11 +124,10 @@ int aio_bh_poll(AioContext *ctx)
/* remove deleted bhs */
if (!deleted) {
qemu_lockcnt_dec(&ctx->list_lock);
return ret;
}
if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
bhp = &ctx->first_bh;
while (*bhp) {
bh = *bhp;
......@@ -135,7 +138,7 @@ int aio_bh_poll(AioContext *ctx)
bhp = &bh->next;
}
}
qemu_lockcnt_unlock(&ctx->list_lock);
qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
}
return ret;
}
......@@ -255,7 +258,7 @@ aio_ctx_dispatch(GSource *source,
AioContext *ctx = (AioContext *) source;
assert(callback == NULL);
aio_dispatch(ctx, true);
aio_dispatch(ctx);
return true;
}
......@@ -274,6 +277,9 @@ aio_ctx_finalize(GSource *source)
}
#endif
assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
qemu_bh_delete(ctx->co_schedule_bh);
qemu_lockcnt_lock(&ctx->list_lock);
assert(!qemu_lockcnt_count(&ctx->list_lock));
while (ctx->first_bh) {
......@@ -363,6 +369,30 @@ static bool event_notifier_poll(void *opaque)
return atomic_read(&ctx->notified);
}
static void co_schedule_bh_cb(void *opaque)
{
AioContext *ctx = opaque;
QSLIST_HEAD(, Coroutine) straight, reversed;
QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
QSLIST_INIT(&straight);
while (!QSLIST_EMPTY(&reversed)) {
Coroutine *co = QSLIST_FIRST(&reversed);
QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
}
while (!QSLIST_EMPTY(&straight)) {
Coroutine *co = QSLIST_FIRST(&straight);
QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
trace_aio_co_schedule_bh_cb(ctx, co);
aio_context_acquire(ctx);
qemu_coroutine_enter(co);
aio_context_release(ctx);
}
}
AioContext *aio_context_new(Error **errp)
{
int ret;
......@@ -378,6 +408,10 @@ AioContext *aio_context_new(Error **errp)
}
g_source_set_can_recurse(&ctx->source, true);
qemu_lockcnt_init(&ctx->list_lock);
ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
QSLIST_INIT(&ctx->scheduled_coroutines);
aio_set_event_notifier(ctx, &ctx->notifier,
false,
(EventNotifierHandler *)
......@@ -401,6 +435,40 @@ fail:
return NULL;
}
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
trace_aio_co_schedule(ctx, co);
QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
co, co_scheduled_next);
qemu_bh_schedule(ctx->co_schedule_bh);
}
void aio_co_wake(struct Coroutine *co)
{
AioContext *ctx;
/* Read coroutine before co->ctx. Matches smp_wmb in
* qemu_coroutine_enter.
*/
smp_read_barrier_depends();
ctx = atomic_read(&co->ctx);
if (ctx != qemu_get_current_aio_context()) {
aio_co_schedule(ctx, co);
return;
}
if (qemu_in_coroutine()) {
Coroutine *self = qemu_coroutine_self();
assert(self != co);
QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
} else {
aio_context_acquire(ctx);
qemu_coroutine_enter(co);
aio_context_release(ctx);
}
}
void aio_context_ref(AioContext *ctx)
{
g_source_ref(&ctx->source);
......
......@@ -20,13 +20,19 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* The lock-free mutex implementation is based on OSv
* (core/lfmutex.cc, include/lockfree/mutex.hh).
* Copyright (C) 2013 Cloudius Systems, Ltd.
*/
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"
void qemu_co_queue_init(CoQueue *queue)
......@@ -34,12 +40,30 @@ void qemu_co_queue_init(CoQueue *queue)
QSIMPLEQ_INIT(&queue->entries);
}
void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
void coroutine_fn qemu_co_queue_wait(CoQueue *queue, CoMutex *mutex)
{
Coroutine *self = qemu_coroutine_self();
QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
if (mutex) {
qemu_co_mutex_unlock(mutex);
}
/* There is no race condition here. Other threads will call
* aio_co_schedule on our AioContext, which can reenter this
* coroutine but only after this yield and after the main loop
* has gone through the next iteration.
*/
qemu_coroutine_yield();
assert(qemu_in_coroutine());
/* TODO: OSv implements wait morphing here, where the wakeup
* primitive automatically places the woken coroutine on the
* mutex's queue. This avoids the thundering herd effect.
*/
if (mutex) {
qemu_co_mutex_lock(mutex);
}
}
/**
......@@ -63,7 +87,6 @@ void qemu_co_queue_run_restart(Coroutine *co)
static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
{
Coroutine *self = qemu_coroutine_self();
Coroutine *next;
if (QSIMPLEQ_EMPTY(&queue->entries)) {
......@@ -72,8 +95,7 @@ static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
while ((next = QSIMPLEQ_FIRST(&queue->entries)) != NULL) {
QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, next, co_queue_next);
trace_qemu_co_queue_next(next);
aio_co_wake(next);
if (single) {
break;
}
......@@ -112,27 +134,157 @@ bool qemu_co_queue_empty(CoQueue *queue)
return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}
/* The wait records are handled with a multiple-producer, single-consumer
* lock-free queue. There cannot be two concurrent pop_waiter() calls
* because pop_waiter() can only be called while mutex->handoff is zero.
* This can happen in three cases:
* - in qemu_co_mutex_unlock, before the hand-off protocol has started.
* In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
* not take part in the handoff.
* - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
* qemu_co_mutex_unlock. In this case, qemu_co_mutex_unlock will fail
* the cmpxchg (it will see either 0 or the next sequence value) and
* exit. The next hand-off cannot begin until qemu_co_mutex_lock has
* woken up someone.
* - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
* In this case another iteration starts with mutex->handoff == 0;
* a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
* qemu_co_mutex_unlock will go back to case (1).
*
* The following functions manage this queue.
*/
typedef struct CoWaitRecord {
Coroutine *co;
QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;
static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
w->co = qemu_coroutine_self();
QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}
static void move_waiters(CoMutex *mutex)
{
QSLIST_HEAD(, CoWaitRecord) reversed;
QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
while (!QSLIST_EMPTY(&reversed)) {
CoWaitRecord *w = QSLIST_FIRST(&reversed);
QSLIST_REMOVE_HEAD(&reversed, next);
QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
}
}
static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
CoWaitRecord *w;
if (QSLIST_EMPTY(&mutex->to_pop)) {
move_waiters(mutex);
if (QSLIST_EMPTY(&mutex->to_pop)) {
return NULL;
}
}
w = QSLIST_FIRST(&mutex->to_pop);
QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
return w;
}
static bool has_waiters(CoMutex *mutex)
{
return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
}
void qemu_co_mutex_init(CoMutex *mutex)
{
memset(mutex, 0, sizeof(*mutex));
qemu_co_queue_init(&mutex->queue);
}
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
/* Read co before co->ctx; pairs with smp_wmb() in
* qemu_coroutine_enter().
*/
smp_read_barrier_depends();
mutex->ctx = co->ctx;
aio_co_wake(co);
}
static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
CoMutex *mutex)
{
Coroutine *self = qemu_coroutine_self();
CoWaitRecord w;
unsigned old_handoff;
trace_qemu_co_mutex_lock_entry(mutex, self);
w.co = self;
push_waiter(mutex, &w);
/* This is the "Responsibility Hand-Off" protocol; a lock() picks from
* a concurrent unlock() the responsibility of waking somebody up.
*/
old_handoff = atomic_mb_read(&mutex->handoff);
if (old_handoff &&
has_waiters(mutex) &&
atomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
/* There can be no concurrent pops, because there can be only
* one active handoff at a time.
*/
CoWaitRecord *to_wake = pop_waiter(mutex);
Coroutine *co = to_wake->co;
if (co == self) {
/* We got the lock ourselves! */
assert(to_wake == &w);
mutex->ctx = ctx;
return;
}
qemu_co_mutex_wake(mutex, co);
}
while (mutex->locked) {
qemu_co_queue_wait(&mutex->queue);
qemu_coroutine_yield();
trace_qemu_co_mutex_lock_return(mutex, self);
}
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
AioContext *ctx = qemu_get_current_aio_context();
Coroutine *self = qemu_coroutine_self();
int waiters, i;
/* Running a very small critical section on pthread_mutex_t and CoMutex
* shows that pthread_mutex_t is much faster because it doesn't actually
* go to sleep. What happens is that the critical section is shorter
* than the latency of entering the kernel and thus FUTEX_WAIT always
* fails. With CoMutex there is no such latency but you still want to
* avoid wait and wakeup. So introduce it artificially.
*/
i = 0;
retry_fast_path:
waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
if (waiters != 0) {
while (waiters == 1 && ++i < 1000) {
if (atomic_read(&mutex->ctx) == ctx) {
break;
}
if (atomic_read(&mutex->locked) == 0) {
goto retry_fast_path;
}
cpu_relax();
}
waiters = atomic_fetch_inc(&mutex->locked);
}
mutex->locked = true;
if (waiters == 0) {
/* Uncontended. */
trace_qemu_co_mutex_lock_uncontended(mutex, self);
mutex->ctx = ctx;
} else {
qemu_co_mutex_lock_slowpath(ctx, mutex);
}
mutex->holder = self;
self->locks_held++;
trace_qemu_co_mutex_lock_return(mutex, self);
}
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
......@@ -141,14 +293,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
trace_qemu_co_mutex_unlock_entry(mutex, self);
assert(mutex->locked == true);
assert(mutex->locked);
assert(mutex->holder == self);
assert(qemu_in_coroutine());
mutex->locked = false;
mutex->ctx = NULL;
mutex->holder = NULL;
self->locks_held--;
qemu_co_queue_next(&mutex->queue);
if (atomic_fetch_dec(&mutex->locked) == 1) {
/* No waiting qemu_co_mutex_lock(). Pfew, that was easy! */
return;
}
for (;;) {
CoWaitRecord *to_wake = pop_waiter(mutex);
unsigned our_handoff;
if (to_wake) {
qemu_co_mutex_wake(mutex, to_wake->co);
break;
}
/* Some concurrent lock() is in progress (we know this because
* mutex->locked was >1) but it hasn't yet put itself on the wait
* queue. Pick a sequence number for the handoff protocol (not 0).
*/
if (++mutex->sequence == 0) {
mutex->sequence = 1;
}
our_handoff = mutex->sequence;
atomic_mb_set(&mutex->handoff, our_handoff);
if (!has_waiters(mutex)) {
/* The concurrent lock has not added itself yet, so it
* will be able to pick our handoff.
*/
break;
}
/* Try to do the handoff protocol ourselves; if somebody else has
* already taken it, however, we're done and they're responsible.
*/
if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
break;
}
}
trace_qemu_co_mutex_unlock_return(mutex, self);
}
......@@ -157,16 +346,22 @@ void qemu_co_rwlock_init(CoRwlock *lock)
{
memset(lock, 0, sizeof(*lock));
qemu_co_queue_init(&lock->queue);
qemu_co_mutex_init(&lock->mutex);
}
void qemu_co_rwlock_rdlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
while (lock->writer) {
qemu_co_queue_wait(&lock->queue);
qemu_co_mutex_lock(&lock->mutex);
/* For fairness, wait if a writer is in line. */
while (lock->pending_writer) {
qemu_co_queue_wait(&lock->queue, &lock->mutex);
}
lock->reader++;
qemu_co_mutex_unlock(&lock->mutex);
/* The rest of the read-side critical section is run without the mutex. */
self->locks_held++;
}
......@@ -175,10 +370,13 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
Coroutine *self = qemu_coroutine_self();
assert(qemu_in_coroutine());
if (lock->writer) {
lock->writer = false;
if (!lock->reader) {
/* The critical section started in qemu_co_rwlock_wrlock. */
qemu_co_queue_restart_all(&lock->queue);
} else {
self->locks_held--;
qemu_co_mutex_lock(&lock->mutex);
lock->reader--;
assert(lock->reader >= 0);
/* Wakeup only one waiting writer */
......@@ -186,16 +384,20 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
qemu_co_queue_next(&lock->queue);
}
}
self->locks_held--;
qemu_co_mutex_unlock(&lock->mutex);
}
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
Coroutine *self = qemu_coroutine_self();
while (lock->writer || lock->reader) {
qemu_co_queue_wait(&lock->queue);
qemu_co_mutex_lock(&lock->mutex);
lock->pending_writer++;
while (lock->reader) {
qemu_co_queue_wait(&lock->queue, &lock->mutex);
}
lock->writer = true;
self->locks_held++;
lock->pending_writer--;
/* The rest of the write-side critical section is run with
* the mutex taken, so that lock->reader remains zero.
* There is no need to update self->locks_held.
*/
}
......@@ -25,7 +25,7 @@ static void co_sleep_cb(void *opaque)
{
CoSleepCB *sleep_cb = opaque;
qemu_coroutine_enter(sleep_cb->co);
aio_co_wake(sleep_cb->co);
}
void coroutine_fn co_aio_sleep_ns(AioContext *ctx, QEMUClockType type,
......
......@@ -19,6 +19,7 @@
#include "qemu/atomic.h"
#include "qemu/coroutine.h"
#include "qemu/coroutine_int.h"
#include "block/aio.h"
enum {
POOL_BATCH_SIZE = 64,
......@@ -114,6 +115,13 @@ void qemu_coroutine_enter(Coroutine *co)
}
co->caller = self;
co->ctx = qemu_get_current_aio_context();
/* Store co->ctx before anything that stores co. Matches
* barrier in aio_co_wake and qemu_co_mutex_wake.
*/
smp_wmb();
ret = qemu_coroutine_switch(self, co, COROUTINE_ENTER);
qemu_co_queue_run_restart(co);
......
......@@ -19,7 +19,7 @@
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace-root.h"
#include "trace.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
......@@ -165,6 +165,7 @@ static void thread_pool_completion_bh(void *opaque)
ThreadPool *pool = opaque;
ThreadPoolElement *elem, *next;
aio_context_acquire(pool->ctx);
restart:
QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
if (elem->state != THREAD_DONE) {
......@@ -184,13 +185,16 @@ restart:
*/
qemu_bh_schedule(pool->completion_bh);
aio_context_release(pool->ctx);
elem->common.cb(elem->common.opaque, elem->ret);
aio_context_acquire(pool->ctx);
qemu_aio_unref(elem);
goto restart;
} else {
qemu_aio_unref(elem);
}
}
aio_context_release(pool->ctx);
}
static void thread_pool_cancel(BlockAIOCB *acb)
......@@ -267,7 +271,7 @@ static void thread_pool_co_cb(void *opaque, int ret)
ThreadPoolCo *co = opaque;
co->ret = ret;
qemu_coroutine_enter(co->co);
aio_co_wake(co->co);
}
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
......
# See docs/tracing.txt for syntax documentation.
# util/aio-posix.c
run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64
run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
# util/async.c
aio_co_schedule(void *ctx, void *co) "ctx %p co %p"
aio_co_schedule_bh_cb(void *ctx, void *co) "ctx %p co %p"
# util/thread-pool.c
thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
# util/buffer.c
buffer_resize(const char *buf, size_t olen, size_t len) "%s: old %zd, new %zd"
buffer_move_empty(const char *buf, size_t len, const char *from) "%s: %zd bytes from %s"
......@@ -13,7 +28,7 @@ qemu_coroutine_terminate(void *co) "self %p"
# util/qemu-coroutine-lock.c
qemu_co_queue_run_restart(void *co) "co %p"
qemu_co_queue_next(void *nxt) "next %p"
qemu_co_mutex_lock_uncontended(void *mutex, void *self) "mutex %p self %p"
qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p"
qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p"
qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册