/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

struct qemu_laiocb {
    BlockAIOCB common;
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    AioContext *aio_context;

    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

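/* Fold the two status fields of a completed io_event into a single signed
 * result: a byte count on success, a negative errno on failure. */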
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -ENOSPC;
            }
        }
    }

    laiocb->ret = ret;
    if (laiocb->co) {
        qemu_coroutine_enter(laiocb->co);
    } else {
        laiocb->common.cb(laiocb->common.opaque, ret);
        qemu_aio_unref(laiocb);
    }
}

/**
 * aio_ring buffer which is shared between userspace and kernel.
 *
 * This is copied from linux/fs/aio.c; a common header does not exist,
 * but AIO has existed for ages so we assume the ABI is stable.
 */
struct aio_ring {
    unsigned    id;    /* kernel internal index number */
    unsigned    nr;    /* number of io_events */
    unsigned    head;  /* Written to by userland or by kernel. */
    unsigned    tail;

    unsigned    magic;
    unsigned    compat_features;
    unsigned    incompat_features;
    unsigned    header_length;  /* size of aio_ring */

    struct io_event io_events[0];
};

/**
 * io_getevents_peek:
 * @ctx: AIO context
 * @events: pointer to the events array, output value
 *
 * Returns the number of completed events and sets a pointer
 * to the events array.  This function does not update the internal
 * ring buffer; it only reads head and tail.  When @events has been
 * processed io_getevents_commit() must be called.
 */
static inline unsigned int io_getevents_peek(io_context_t ctx,
                                             struct io_event **events)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;
    unsigned int head = ring->head, tail = ring->tail;
    unsigned int nr;

    nr = tail >= head ? tail - head : ring->nr - head;
    *events = ring->io_events + head;
    /* To avoid speculative loads of s->events[i] before observing tail.
       Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
    smp_rmb();

    return nr;
}

/**
 * io_getevents_commit:
 * @ctx: AIO context
 * @nr: the number of events on which head should be advanced
 *
 * Advances head of a ring buffer.
 */
static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
{
    struct aio_ring *ring = (struct aio_ring *)ctx;

    if (nr) {
        ring->head = (ring->head + nr) % ring->nr;
    }
}

/**
 * io_getevents_advance_and_peek:
 * @ctx: AIO context
 * @events: pointer on events array, output value
 * @nr: the number of events on which head should be advanced
 *
 * Advances head of a ring buffer and returns number of elements left.
 */
static inline unsigned int
io_getevents_advance_and_peek(io_context_t ctx,
                              struct io_event **events,
                              unsigned int nr)
{
    io_getevents_commit(ctx, nr);
    return io_getevents_peek(ctx, events);
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in LinuxAioState.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;
    struct io_event *events;

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
                                                         s->event_idx))) {
        for (s->event_idx = 0; s->event_idx < s->event_max; ) {
            struct iocb *iocb = events[s->event_idx].obj;
            struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

            laiocb->ret = io_event_ret(&events[s->event_idx]);

            /* Change counters one-by-one because we can be nested. */
            s->io_q.in_flight--;
            s->event_idx++;
            qemu_laio_process_completion(laiocb);
        }
    }

    qemu_bh_cancel(s->completion_bh);

    /* If we are nested we have to notify the level above that we are done
     * by setting event_max to zero, upper level will then jump out of its
     * own `for` loop.  If we are the last, all counters have dropped to
     * zero. */
    s->event_max = 0;
    s->event_idx = 0;

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

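/* Called when the completion eventfd fires: drain the counter and process
 * completed requests via qemu_laio_completion_bh(). */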
static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_completion_bh(s);
    }
}

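/* Attempt to cancel an in-flight request.  If io_cancel() fails, the
 * request is left to complete through the normal event path; otherwise it
 * is completed here with -ECANCELED. */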
static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

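/* Submit pending requests in batches, keeping at most MAX_EVENTS in flight.
 * On a submission error the first queued request is failed and the rest are
 * retried; if requests remain queued afterwards, the queue is marked
 * blocked until completions make room. */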
static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_EVENTS];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        if (s->io_q.in_flight >= MAX_EVENTS) {
            break;
        }
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (s->io_q.in_flight + len >= MAX_EVENTS) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            /* Fail the first request, retry the rest */
            aiocb = QSIMPLEQ_FIRST(&s->io_q.pending);
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.pending, next);
            s->io_q.in_queue--;
            aiocb->ret = ret;
            qemu_laio_process_completion(aiocb);
            continue;
        }

        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.in_queue > 0);
}

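/* Batching interface: while the queue is plugged, laio_do_submit() defers
 * io_submit() until the matching unplug (or until MAX_EVENTS requests have
 * accumulated).  Typical use (sketch):
 *
 *     laio_io_plug(bs, s);
 *     ... queue requests with laio_submit() / laio_co_submit() ...
 *     laio_io_unplug(bs, s);    (submits the accumulated batch)
 */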
void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    s->io_q.plugged++;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(s->io_q.plugged);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

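/* Prepare the iocb for the requested operation and append it to the pending
 * queue.  Submission is kicked immediately unless the queue is blocked or
 * plugged; a plugged queue is still flushed once MAX_EVENTS requests are
 * outstanding. */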
static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.in_queue++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_EVENTS)) {
        ioq_submit(s);
    }

    return 0;
}

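/* Coroutine interface: queue the request, then yield until it completes.
 * The completion path re-enters the coroutine, so the request state can
 * live on this function's stack. */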
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = qiov->size,
        .ctx        = s,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type);
    if (ret < 0) {
        return ret;
    }

    qemu_coroutine_yield();
    return laiocb.ret;
}

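/* Callback interface: allocate an ACB, queue the request, and return.  The
 * callback fires from the completion BH once the request finishes. */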
BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laiocb *laiocb;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    ret = laio_do_submit(fd, laiocb, offset, type);
    if (ret < 0) {
        qemu_aio_unref(laiocb);
        return NULL;
    }

    return &laiocb->common;
}

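/* Stop completion processing in @old_context: unregister the eventfd
 * handler and delete the completion BH. */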
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL);
    qemu_bh_delete(s->completion_bh);
}

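/* Start completion processing in @new_context: create the completion BH
 * there and register the eventfd handler. */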
void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb);
}

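/* Allocate the Linux AIO state: an eventfd for completion signalling plus a
 * kernel AIO context sized for MAX_EVENTS requests.  Returns NULL on error. */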
LinuxAioState *laio_init(void)
{
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

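/* Release the eventfd and destroy the kernel AIO context. */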
void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}