/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
11
#include "qemu-common.h"
12
#include "block/aio.h"
13
#include "qemu/queue.h"
14
#include "block/block.h"
15
#include "block/raw-aio.h"
16
#include "qemu/event_notifier.h"
17
#include "qemu/coroutine.h"
18 19 20 21 22 23 24 25 26 27 28 29 30

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockAIOCB common;
    Coroutine *co;
    LinuxAioState *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(qemu_laiocb) next;
};

typedef struct {
    int plugged;
    unsigned int n;
    bool blocked;
    QSIMPLEQ_HEAD(, qemu_laiocb) pending;
} LaioQueue;

struct LinuxAioState {
    AioContext *aio_context;

    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

static void ioq_submit(LinuxAioState *s);

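/*
 * Reassemble a request's return value from the res2 (high) and res (low)
 * halves of the completion event.
 */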
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -ENOSPC;
            }
        }
    }

    laiocb->ret = ret;
    if (laiocb->co) {
        qemu_coroutine_enter(laiocb->co);
    } else {
        laiocb->common.cb(laiocb->common.opaque, ret);
        qemu_aio_unref(laiocb);
    }
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in LinuxAioState.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    LinuxAioState *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
        do {
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(laiocb);
    }

    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }

    qemu_bh_cancel(s->completion_bh);
}

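/* Eventfd callback: clear the notifier and process pending completions */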
static void qemu_laio_completion_cb(EventNotifier *e)
{
    LinuxAioState *s = container_of(e, LinuxAioState, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_laio_completion_bh(s);
    }
}

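/*
 * Ask the kernel to cancel an in-flight request.  If io_cancel() fails, the
 * request is left to complete through the event loop; either way its
 * callback eventually sees -ECANCELED.
 */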
static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->pending);
    io_q->plugged = 0;
    io_q->n = 0;
    io_q->blocked = false;
}

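/*
 * Flush the pending queue to the kernel in batches of up to MAX_QUEUED_IO
 * iocbs.  On -EAGAIN the queue is marked blocked and retried from the
 * completion BH; any other io_submit() failure aborts.
 */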
static void ioq_submit(LinuxAioState *s)
{
    int ret, len;
    struct qemu_laiocb *aiocb;
    struct iocb *iocbs[MAX_QUEUED_IO];
    QSIMPLEQ_HEAD(, qemu_laiocb) completed;

    do {
        len = 0;
        QSIMPLEQ_FOREACH(aiocb, &s->io_q.pending, next) {
            iocbs[len++] = &aiocb->iocb;
            if (len == MAX_QUEUED_IO) {
                break;
            }
        }

        ret = io_submit(s->ctx, len, iocbs);
        if (ret == -EAGAIN) {
            break;
        }
        if (ret < 0) {
            abort();
        }

        s->io_q.n -= ret;
        aiocb = container_of(iocbs[ret - 1], struct qemu_laiocb, iocb);
        QSIMPLEQ_SPLIT_AFTER(&s->io_q.pending, aiocb, next, &completed);
    } while (ret == len && !QSIMPLEQ_EMPTY(&s->io_q.pending));
    s->io_q.blocked = (s->io_q.n > 0);
}

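/*
 * While plugged, new requests are only queued; laio_io_unplug() flushes the
 * queue so that several requests can be batched into one io_submit() call.
 */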
void laio_io_plug(BlockDriverState *bs, LinuxAioState *s)
{
    s->io_q.plugged++;
}

void laio_io_unplug(BlockDriverState *bs, LinuxAioState *s)
{
    assert(s->io_q.plugged);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
        ioq_submit(s);
    }
}

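/*
 * Prepare the iocb for the given request type, arm the completion eventfd,
 * and queue the request.  The queue is flushed at once unless it is blocked
 * by an earlier -EAGAIN or plugged (and not yet full).
 */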
static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset,
                          int type)
{
    LinuxAioState *s = laiocb->ctx;
    struct iocb *iocbs = &laiocb->iocb;
    QEMUIOVector *qiov = laiocb->qiov;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        return -EIO;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next);
    s->io_q.n++;
    if (!s->io_q.blocked &&
        (!s->io_q.plugged || s->io_q.n >= MAX_QUEUED_IO)) {
        ioq_submit(s);
    }

    return 0;
}

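/*
 * Coroutine interface: the ACB lives on the coroutine stack; submit, then
 * yield until qemu_laio_process_completion() re-enters us with the result.
 */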
int coroutine_fn laio_co_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
                                uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    struct qemu_laiocb laiocb = {
        .co         = qemu_coroutine_self(),
        .nbytes     = qiov->size,
        .ctx        = s,
        .is_read    = (type == QEMU_AIO_READ),
        .qiov       = qiov,
    };

    ret = laio_do_submit(fd, &laiocb, offset, type);
    if (ret < 0) {
        return ret;
    }

    qemu_coroutine_yield();
    return laiocb.ret;
}

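/*
 * Callback interface: the ACB is heap-allocated and returned to the caller;
 * completion invokes cb(opaque, ret) from the event loop.
 */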
BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laiocb *laiocb;
    off_t offset = sector_num * BDRV_SECTOR_SIZE;
    int ret;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * BDRV_SECTOR_SIZE;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    ret = laio_do_submit(fd, laiocb, offset, type);
    if (ret < 0) {
        qemu_aio_unref(laiocb);
        return NULL;
    }

    return &laiocb->common;
}

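/* Unregister the eventfd handler and delete the BH when leaving a context */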
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{
    aio_set_event_notifier(old_context, &s->e, false, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, false,
                           qemu_laio_completion_cb);
}

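/*
 * Allocate the LinuxAioState: an eventfd for completion signalling and a
 * kernel AIO context able to hold MAX_EVENTS in-flight requests.
 */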
LinuxAioState *laio_init(void)
{
    LinuxAioState *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

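/* Release the eventfd and destroy the kernel AIO context */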
void laio_cleanup(LinuxAioState *s)
{
    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}