/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this, we will get EAGAIN from io_submit, which is communicated
 *      to the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockAIOCB common;
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;    /* non-zero while laio_io_plug() batching is active */
    unsigned int n; /* number of queued, not yet submitted requests */
    bool blocked;   /* the kernel did not accept all pending requests */
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

/* Combine ev->res2 (high 32 bits) and ev->res (low 32 bits) into one
 * signed 64-bit result so that negative errno values are preserved. */
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}
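
/*
 * Worked example (illustrative only, assuming res/res2 carry the low/high
 * halves as io_event_ret() expects): a request failing with -EIO (-5) would
 * arrive as res = 0xfffffffb, res2 = 0xffffffff; the expression above
 * rebuilds 0xfffffffffffffffb, i.e. (ssize_t)-5.
 */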

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -ENOSPC;
            }
        }
    }

    laiocb->ret = ret;
    if (laiocb->co) {
        qemu_coroutine_enter(laiocb->co);
    } else {
        laiocb->common.cb(laiocb->common.opaque, ret);
        qemu_aio_unref(laiocb);
    }
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in LinuxAioState.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
        do {
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(laiocb);
    }

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }

    qemu_bh_cancel(s->completion_bh);
}
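
/*
 * Illustrative nesting scenario: if a request callback above calls
 * aio_poll() to wait for another request, the rescheduled BH runs inside
 * that nested loop and keeps draining s->events[s->event_idx..event_max-1]
 * there; the outer invocation's loop then sees event_idx == event_max and
 * finishes without processing any event twice.
 */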

static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_completion_bh(s);
    }
}

static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->n = 0;
    io_q->blocked = false;
}

/*
 * Submit everything on io_q.pending in batches of up to MAX_QUEUED_IO
 * iocbs.  The loop continues while the kernel accepts full batches; if
 * io_submit() accepts only part of a batch or returns EAGAIN, io_q.blocked
 * is set and the remaining requests are resubmitted later from the
 * completion BH, once completions have freed up kernel resources.
 */
static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_QUEUED_IO];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (len == MAX_QUEUED_IO) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            abort();
        }

        s->io_q.n -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.n > 0);
}

void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(!s->io_q.plugged);
    s->io_q.plugged = 1;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(s->io_q.plugged);
    s->io_q.plugged = 0;
    if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}
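
/*
 * Typical usage (illustrative sketch, not taken from this file): a caller
 * about to issue a burst of requests brackets them so they reach the kernel
 * in as few io_submit() calls as possible:
 *
 *     laio_io_plug(bs, s);
 *     ... queue several requests with laio_submit() ...
 *     laio_io_unplug(bs, s);
 *
 * Note that laio_do_submit() below flushes early even while plugged once
 * MAX_QUEUED_IO requests have accumulated.
 */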

static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.n++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
        ioq_submit(s);
    }

    return 0;
}

int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = qiov->size,
        .ctx        = s,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type);
    if (ret < 0) {
        return ret;
    }

    qemu_coroutine_yield();
    return laiocb.ret;
}
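
/*
 * Illustrative call site (hypothetical, for orientation only): from
 * coroutine context a driver could read 4 KiB at byte offset 0 with:
 *
 *     QEMUIOVector qiov;
 *     qemu_iovec_init(&qiov, 1);
 *     qemu_iovec_add(&qiov, buf, 4096);
 *     ret = laio_co_submit(bs, s, fd, 0, &qiov, QEMU_AIO_READ);
 *
 * The coroutine yields inside laio_co_submit() and is re-entered by
 * qemu_laio_process_completion() once the kernel reports completion.
 */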

BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laiocb *laiocb;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    ret = laio_do_submit(fd, laiocb, offset, type);
    if (ret < 0) {
        qemu_aio_unref(laiocb);
        return NULL;
    }

    return &laiocb->common;
}
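
/*
 * Illustrative call site (hypothetical names): laio_submit() is the
 * callback-based counterpart of laio_co_submit() for callers outside
 * coroutine context:
 *
 *     static void my_cb(void *opaque, int ret)
 *     {
 *         ... ret is 0 on success or a negative errno value ...
 *     }
 *
 *     acb = laio_submit(bs, s, fd, sector_num, qiov, nb_sectors,
 *                       my_cb, NULL, QEMU_AIO_WRITE);
 *
 * A NULL return means the request could not be queued; otherwise the
 * callback runs exactly once when the request completes or is cancelled.
 */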

void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb);
}

LinuxAioState *laio_init(void)
{
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}
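
/*
 * Lifecycle sketch (illustrative, based only on the functions above): a
 * user of this module creates the state once, binds it to an AioContext,
 * submits I/O, and tears everything down in reverse order:
 *
 *     LinuxAioState *s = laio_init();
 *     laio_attach_aio_context(s, ctx);
 *     ... laio_co_submit() / laio_submit() calls ...
 *     laio_detach_aio_context(s, ctx);
 *     laio_cleanup(s);
 */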