linux-aio.c
/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

struct qemu_laiocb {
    BlockAIOCB common;
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    AioContext *aio_context;

    io_context_t ctx;
    EventNotifier e;

    /* Queue of requests awaiting batch submission to the kernel */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

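/*
 * Fold the res/res2 words of a completed io_event into one signed value:
 * a negative result is a -errno code, a non-negative result is the byte
 * count (checked against nbytes in qemu_laio_process_completion()).
 */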
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -ENOSPC;
            }
        }
    }

    laiocb->ret = ret;
    if (laiocb->co) {
        /* If the coroutine is already entered it must be in ioq_submit() and
         * will notice laiocb->ret has been filled in when it eventually runs
         * later.  Coroutines cannot be entered recursively so avoid doing
         * that!
         */
        if (!qemu_coroutine_entered(laiocb->co)) {
            qemu_coroutine_enter(laiocb->co);
        }
    } else {
        laiocb->common.cb(laiocb->common.opaque, ret);
        qemu_aio_unref(laiocb);
    }
}

/**
 * aio_ring buffer which is shared between userspace and kernel.
 *
 * This is copied from linux/fs/aio.c; a common header does not exist,
 * but AIO has been around for ages so we assume the ABI is stable.
 */
struct aio_ring {
    unsigned    id;    /* kernel internal index number */
    unsigned    nr;    /* number of io_events */
    unsigned    head;  /* Written to by userland or by kernel. */
    unsigned    tail;

    unsigned    magic;
    unsigned    compat_features;
    unsigned    incompat_features;
    unsigned    header_length;  /* size of aio_ring */

    struct io_event io_events[0];
};

/**
 * io_getevents_peek:
 * @ctx: AIO context
 * @events: pointer to the events array, output value
 *
 * Returns the number of completed events and sets a pointer
 * to the events array.  This function does not update the internal
 * ring buffer; it only reads head and tail.  When @events has been
 * processed, io_getevents_commit() must be called.
 */
static inline unsigned int io_getevents_peek(io_context_t ctx,
                                             struct io_event **events)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;
    unsigned int head = ring->head, tail = ring->tail;
    unsigned int nr;

    nr = tail >= head ? tail - head : ring->nr - head;
    *events = ring->io_events + head;
    /* To avoid speculative loads of s->events[i] before observing tail.
       Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
    smp_rmb();

    return nr;
}

/**
 * io_getevents_commit:
 * @ctx: AIO context
 * @nr: the number of events by which head should be advanced
 *
 * Advances head of a ring buffer.
 */
static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;

    if (nr) {
        ring->head = (ring->head + nr) % ring->nr;
    }
}

/**
 * io_getevents_advance_and_peek:
 * @ctx: AIO context
 * @events: pointer to the events array, output value
 * @nr: the number of events by which head should be advanced
 *
 * Advances head of a ring buffer and returns number of elements left.
 */
static inline unsigned int
io_getevents_advance_and_peek(io_context_t ctx,
                              struct io_event **events,
                              unsigned int nr)
{
    io_getevents_commit(ctx, nr);
    return io_getevents_peek(ctx, events);
}
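
/*
 * Typical consumption pattern (qemu_laio_process_completions() below is the
 * real user; this is just a sketch):
 *
 *     nr = io_getevents_peek(ctx, &events);
 *     ...handle events[0..nr-1]...
 *     nr = io_getevents_advance_and_peek(ctx, &events, nr);
 *
 * Each advance-and-peek call retires the events already handled and returns
 * the next batch, so completions that arrive during processing are picked up
 * without an io_getevents() syscall.
 */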

/**
 * qemu_laio_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests and invokes their callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops,
 * for example when a request callback invokes aio_poll().  In order to do
 * this, the event indices are kept in LinuxAioState.  The completion BH is
 * scheduled so the function can be called again from a nested event loop;
 * when there are no events left to complete, the BH is canceled.
 */
static void qemu_laio_process_completions(LinuxAioState *s)
{
    struct io_event *events;

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
                                                         s->event_idx))) {
        for (s->event_idx = 0; s->event_idx < s->event_max; ) {
            struct iocb *iocb = events[s->event_idx].obj;
            struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[s->event_idx]);

            /* Change counters one-by-one because we can be nested. */
            s->io_q.in_flight--;
            s->event_idx++;
            qemu_laio_process_completion(laiocb);
        }
    }

    qemu_bh_cancel(s->completion_bh);

    /* If we are nested we have to notify the level above that we are done
     * by setting event_max to zero, the upper level will then jump out of
     * its own `for` loop.  If we are the last, all counters have dropped
     * to zero. */
    s->event_max = 0;
    s->event_idx = 0;
}

static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
    qemu_laio_process_completions(s);
    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    qemu_laio_process_completions_and_submit(s);
}

static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_process_completions_and_submit(s);
    }
}

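/*
 * Polling callback, registered alongside qemu_laio_completion_cb() in
 * laio_attach_aio_context(): returns true (and processes completions) if
 * the completion ring already has events, without waiting on the eventfd.
 */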
static bool qemu_laio_poll_cb(void *opaque)
{
    EventNotifier *e = opaque;
    LinuxAioState *s = container_of(e, LinuxAioState, e);
    struct io_event *events;

    if (!io_getevents_peek(s->ctx, &events)) {
        return false;
    }

    qemu_laio_process_completions_and_submit(s);
    return true;
}

static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

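/*
 * Flush io_q.pending to the kernel.  Up to MAX_EVENTS iocbs are passed to
 * each io_submit() call; if the kernel accepts fewer than requested (or
 * returns -EAGAIN), the remainder stays queued and the queue is marked
 * blocked until completions make room.
 */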
static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_EVENTS];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        if (s->io_q.in_flight >= MAX_EVENTS) {
            break;
        }
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (s->io_q.in_flight + len >= MAX_EVENTS) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            /* Fail the first request, retry the rest */
            aiocb = QSIMPLEQ_FIRST(&s->io_q.pending);
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
            s->io_q.in_queue--;
            aiocb->ret = ret;
            qemu_laio_process_completion(aiocb);
            continue;
        }

        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /* We can try to complete something right away if there are
         * still requests in flight. */
        qemu_laio_process_completions(s);
        /*
         * Even if we have completed everything (in_flight == 0), the queue
         * can still have pending requests (in_queue > 0).  We do not attempt
         * to repeat submission, in order to avoid an I/O hang.  The reason is
         * simple: s->e is still set and the completion callback will be
         * called shortly, so all pending requests will be submitted from
         * there.
         */
    }
}

void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    s->io_q.plugged++;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(s->io_q.plugged);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}
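
/*
 * Illustrative caller pattern for the plug/unplug batching above (a sketch,
 * not code from this file; fds, offsets and callbacks are placeholders):
 *
 *     laio_io_plug(bs, s);
 *     laio_submit(bs, s, fd, sector0, &qiov0, nb0, cb, opaque, QEMU_AIO_READ);
 *     laio_submit(bs, s, fd, sector1, &qiov1, nb1, cb, opaque, QEMU_AIO_WRITE);
 *     laio_io_unplug(bs, s);
 *
 * While plugged, requests accumulate in io_q.pending; the unplug call (or
 * reaching MAX_EVENTS) flushes them to the kernel in one io_submit().
 */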

static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.in_queue++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
        ioq_submit(s);
    }

    return 0;
}

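/*
 * Coroutine interface: queue the request and yield; the completion path
 * (qemu_laio_process_completion()) fills in laiocb.ret and re-enters the
 * coroutine.
 */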
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = qiov->size,
        .ctx        = s,
        .ret        = -EINPROGRESS,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type);
    if (ret < 0) {
        return ret;
    }

    if (laiocb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return laiocb.ret;
}

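/*
 * Callback interface: returns immediately with a BlockAIOCB (or NULL if the
 * request could not be prepared); cb(opaque, ret) is invoked from the event
 * loop once the request completes.
 */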
BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laiocb *laiocb;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    ret = laio_do_submit(fd, laiocb, offset, type);
    if (ret < 0) {
        qemu_aio_unref(laiocb);
        return NULL;
    }

    return &laiocb->common;
}

void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb,
                           qemu_laio_poll_cb);
}

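/*
 * Lifecycle sketch for a LinuxAioState (assumed caller, e.g. the raw POSIX
 * block driver; names other than the laio_* functions are illustrative):
 *
 *     LinuxAioState *s = laio_init();
 *     laio_attach_aio_context(s, some_aio_context);
 *     ...issue I/O with laio_submit() / laio_co_submit()...
 *     laio_detach_aio_context(s, some_aio_context);
 *     laio_cleanup(s);
 */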
LinuxAioState *laio_init(void)
{
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}