/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"
#include "qemu/coroutine.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockAIOCB common;
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int n;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

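/*
 * Reassemble the request's return value, which the completion event
 * carries split across res (low 32 bits) and res2 (high 32 bits).
 */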
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -EINVAL;
            }
        }
    }

    laiocb->ret = ret;
    if (laiocb->co) {
        qemu_coroutine_enter(laiocb->co, NULL);
    } else {
        laiocb->common.cb(laiocb->common.opaque, ret);
        qemu_aio_unref(laiocb);
    }
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in LinuxAioState.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
        do {
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(laiocb);
    }

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

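/* Called when the eventfd becomes readable: defer the actual completion
 * processing to the BH so that it also works from nested event loops.
 */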
static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_bh_schedule(s->completion_bh);
    }
}

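/* Try to cancel an in-flight request.  If the kernel cannot cancel the
 * iocb, the callback still runs later from the completion path with
 * ret == -ECANCELED.
 */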
static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

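/* Set up an empty, unplugged submission queue. */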
static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->n = 0;
    io_q->blocked = false;
}

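/* Flush the pending queue, issuing up to MAX_QUEUED_IO iocbs per
 * io_submit() call.  On EAGAIN or a partial submission the queue is
 * marked blocked; the leftover requests are retried from the completion
 * BH once events have been reaped.
 */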
static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_QUEUED_IO];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (len == MAX_QUEUED_IO) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            abort();
        }

        s->io_q.n -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.n > 0);
}

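/* Plugging defers submission so that back-to-back requests can be pushed
 * to the kernel in one io_submit() batch; laio_io_unplug() flushes the
 * queue again.
 */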
void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(!s->io_q.plugged);
    s->io_q.plugged = 1;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(s->io_q.plugged);
    s->io_q.plugged = 0;
    if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

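/* Prepare the iocb for the request type, queue it, and flush the queue
 * unless it is blocked or plugged (a plugged queue is still flushed once
 * MAX_QUEUED_IO requests have accumulated).
 */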
static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.n++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
        ioq_submit(s);
    }

    return 0;
}

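/* Coroutine entry point: submit the request and yield until the
 * completion handler re-enters the coroutine with the result.
 */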
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                int64_t sector_num, QEMUIOVector *qiov,
                                int nb_sectors, int type)
{
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = nb_sectors * BDRV_SECTOR_SIZE,
        .ctx        = s,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type);
    if (ret < 0) {
        return ret;
    }

    qemu_coroutine_yield();
    return laiocb.ret;
}

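/* Callback entry point: allocate an ACB and invoke cb(opaque, ret) when
 * the request completes.
 */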
BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laiocb *laiocb;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    ret = laio_do_submit(fd, laiocb, offset, type);
    if (ret < 0) {
        qemu_aio_unref(laiocb);
        return NULL;
    }

    return &laiocb->common;
}

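/* Stop watching the eventfd in the old AioContext and delete the
 * completion BH; laio_attach_aio_context() recreates them in the new one.
 */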
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb);
}

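/*
 * A minimal usage sketch (illustrative only; 'ctx', 'bs', 'fd', 'qiov',
 * 'sector_num' and 'nb_sectors' are assumed to be set up by the caller,
 * and laio_co_submit() must run in coroutine context):
 *
 *     LinuxAioState *aio = laio_init();
 *     if (aio) {
 *         laio_attach_aio_context(aio, ctx);
 *         ret = laio_co_submit(bs, aio, fd, sector_num, qiov, nb_sectors,
 *                              QEMU_AIO_READ);
 *         ...
 *         laio_detach_aio_context(aio, ctx);
 *         laio_cleanup(aio);
 *     }
 */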
LinuxAioState *laio_init(void)
{
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

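/* Tear down: release the eventfd and destroy the kernel AIO context. */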
void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}