/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

struct qemu_laiocb {
    BlockAIOCB common;
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;                 /* nesting level of laio_io_plug() calls */
    unsigned int in_queue;       /* requests queued but not yet submitted */
    unsigned int in_flight;      /* requests submitted to the kernel */
    bool blocked;                /* last ioq_submit() left requests queued */
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    AioContext *aio_context;

    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

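/*
 * Merge the res2 (high 32 bits) and res (low 32 bits) fields of a completed
 * io_event into a single signed result value.
 */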
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -ENOSPC;
            }
        }
    }

    laiocb->ret = ret;
    if (laiocb->co) {
        /* If the coroutine is already entered it must be in ioq_submit() and
         * will notice laiocb->ret has been filled in when it eventually runs
         * later.  Coroutines cannot be entered recursively so avoid doing
         * that!
         */
        if (!qemu_coroutine_entered(laiocb->co)) {
            qemu_coroutine_enter(laiocb->co);
        }
    } else {
        laiocb->common.cb(laiocb->common.opaque, ret);
        qemu_aio_unref(laiocb);
    }
}

/**
 * The aio_ring buffer, which is shared between userspace and the kernel.
 *
 * This is copied from linux/fs/aio.c; a common header does not exist,
 * but AIO has been around for ages so we assume the ABI is stable.
 */
struct aio_ring {
    unsigned    id;    /* kernel internal index number */
    unsigned    nr;    /* number of io_events */
    unsigned    head;  /* Written to by userland or by kernel. */
    unsigned    tail;

    unsigned    magic;
    unsigned    compat_features;
    unsigned    incompat_features;
    unsigned    header_length;  /* size of aio_ring */

    struct io_event io_events[0];
};

/**
 * io_getevents_peek:
 * @ctx: AIO context
 * @events: pointer to the events array, output value
 *
 * Returns the number of completed events and sets a pointer
 * to the events array.  This function does not update the internal
 * ring buffer; it only reads head and tail.  When @events has been
 * processed, io_getevents_commit() must be called.
 */
static inline unsigned int io_getevents_peek(io_context_t ctx,
                                             struct io_event **events)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;
    unsigned int head = ring->head, tail = ring->tail;
    unsigned int nr;

    nr = tail >= head ? tail - head : ring->nr - head;
    *events = ring->io_events + head;
    /* To avoid speculative loads of s->events[i] before observing tail.
       Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
    smp_rmb();

    return nr;
}

/**
 * io_getevents_commit:
 * @ctx: AIO context
 * @nr: the number of events by which the head should be advanced
 *
 * Advances the head of the ring buffer.
 */
static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;

    if (nr) {
        ring->head = (ring->head + nr) % ring->nr;
    }
}

/**
 * io_getevents_advance_and_peek:
 * @ctx: AIO context
 * @events: pointer to the events array, output value
 * @nr: the number of events by which the head should be advanced
 *
 * Advances the head of the ring buffer and returns the number of elements
 * left to process.
 */
static inline unsigned int
io_getevents_advance_and_peek(io_context_t ctx,
                              struct io_event **events,
                              unsigned int nr)
{
    io_getevents_commit(ctx, nr);
    return io_getevents_peek(ctx, events);
}

/**
 * qemu_laio_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests and invokes their callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion indices are kept in LinuxAioState.  The function schedules a
 * BH so that it can be called again from a nested event loop; when there are
 * no events left to complete, the BH is cancelled.
 */
static void qemu_laio_process_completions(LinuxAioState *s)
{
    struct io_event *events;

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
                                                         s->event_idx))) {
        for (s->event_idx = 0; s->event_idx < s->event_max; ) {
            struct iocb *iocb = events[s->event_idx].obj;
            struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[s->event_idx]);

            /* Change counters one-by-one because we can be nested. */
            s->io_q.in_flight--;
            s->event_idx++;
            qemu_laio_process_completion(laiocb);
        }
    }

    qemu_bh_cancel(s->completion_bh);

    /* If we are nested we have to notify the level above that we are done
     * by setting event_max to zero, the upper level will then jump out of
     * its own `for` loop.  If we are the last level, all counters have
     * dropped to zero. */
    s->event_max = 0;
    s->event_idx = 0;
}

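/* Drain completed events, then flush any requests that are still waiting to
 * be submitted (unless submission is currently plugged). */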
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
    qemu_laio_process_completions(s);
    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    qemu_laio_process_completions_and_submit(s);
}

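/* EventNotifier callback: the kernel has signalled the eventfd, so process
 * completions and submit any queued requests. */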
static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_process_completions_and_submit(s);
    }
}

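/* Try to cancel an in-flight request with io_cancel().  If the kernel cannot
 * cancel it, the normal completion path runs from the event loop later. */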
static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

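/* Submit pending requests with io_submit() in batches, keeping at most
 * MAX_EVENTS requests in flight.  On errors other than -EAGAIN the head
 * request is failed and submission of the remaining requests is retried. */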
static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_EVENTS];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        if (s->io_q.in_flight >= MAX_EVENTS) {
            break;
        }
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (s->io_q.in_flight + len >= MAX_EVENTS) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            /* Fail the first request, retry the rest */
            aiocb = QSIMPLEQ_FIRST(&s->io_q.pending);
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
            s->io_q.in_queue--;
            aiocb->ret = ret;
            qemu_laio_process_completion(aiocb);
            continue;
        }

        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /* We can try to complete something right away since there are
         * still requests in flight. */
        qemu_laio_process_completions(s);
        /*
         * Even if we have completed everything (in_flight == 0), the queue
         * can still have pending requests (in_queue > 0).  We do not attempt
         * to repeat the submission to avoid an I/O hang.  The reason is
         * simple: s->e is still set and the completion callback will be
         * called shortly, and all pending requests will be submitted from
         * there.
         */
    }
}

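/* laio_io_plug()/laio_io_unplug() let callers batch submissions: while the
 * queue is plugged, requests accumulate in io_q.pending and are flushed on
 * the final unplug (or earlier, once the queue fills up; see
 * laio_do_submit()). */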
void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    s->io_q.plugged++;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(s->io_q.plugged);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

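/* Prepare the iocb for a read or write request and queue it.  The request is
 * submitted immediately unless submission is blocked or plugged; a plugged
 * queue is still flushed once in_flight + in_queue reaches MAX_EVENTS. */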
static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.in_queue++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
        ioq_submit(s);
    }

    return 0;
}

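/* Coroutine interface: submit one request and yield until the completion
 * handler fills in laiocb.ret (unless the request completed during
 * submission). */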
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = qiov->size,
        .ctx        = s,
        .ret        = -EINPROGRESS,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type);
    if (ret < 0) {
        return ret;
    }

    if (laiocb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return laiocb.ret;
}

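/* Callback interface: allocate a qemu_laiocb, submit the request and return
 * a BlockAIOCB, or NULL if submission failed. */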
BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laiocb *laiocb;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    ret = laio_do_submit(fd, laiocb, offset, type);
    if (ret < 0) {
        qemu_aio_unref(laiocb);
        return NULL;
    }

    return &laiocb->common;
}

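/* laio_detach_aio_context()/laio_attach_aio_context() move completion
 * handling between AioContexts: they unregister/register the eventfd handler
 * and delete/recreate the completion bottom half. */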
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb, NULL);
}

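/*
 * Allocate a LinuxAioState and create its eventfd and kernel AIO context.
 * Returns NULL on failure.
 *
 * A rough usage sketch (the real call sites live elsewhere in the block
 * layer; error handling omitted, and laio_co_submit() must run in coroutine
 * context):
 *
 *     LinuxAioState *aio = laio_init();
 *     laio_attach_aio_context(aio, ctx);
 *     ...
 *     ret = laio_co_submit(bs, aio, fd, offset, qiov, QEMU_AIO_READ);
 *     ...
 *     laio_detach_aio_context(aio, ctx);
 *     laio_cleanup(aio);
 */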
LinuxAioState *laio_init(void)
{
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

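/* Destroy the eventfd and the kernel AIO context, then free the state. */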
void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}