/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QLIST_ENTRY(qemu_laiocb) node;
};

typedef struct {
    struct iocb *iocbs[MAX_QUEUED_IO];
    int plugged;
    unsigned int size;
    unsigned int idx;
} LaioQueue;

struct qemu_laio_state {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched request submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

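/*
 * Combine the res2/res fields of a completed io_event into the request's
 * return value (the byte count, or a negative errno on failure).
 */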
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -EINVAL;
            }
        }
    }
    laiocb->common.cb(laiocb->common.opaque, ret);

    qemu_aio_unref(laiocb);
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in qemu_laio_state.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
        do {
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(s, laiocb);
    }
}

static void qemu_laio_completion_cb(EventNotifier *e)
{
    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_bh_schedule(s->completion_bh);
    }
}

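/*
 * Try to cancel an in-flight request with io_cancel().  On success the
 * callback is invoked here with -ECANCELED; otherwise the request stays
 * pending and its callback runs from the completion path when it finishes.
 */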
static void laio_cancel(BlockAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    laiocb->ret = -ECANCELED;
    if (ret != 0) {
        /* iocb is not cancelled, cb will be called by the event loop later */
        return;
    }

    laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel_async       = laio_cancel,
};

static void ioq_init(LaioQueue *io_q)
{
    io_q->size = MAX_QUEUED_IO;
    io_q->idx = 0;
    io_q->plugged = 0;
}

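/*
 * Submit the queued iocbs with a single io_submit() call, retrying a few
 * times on -EAGAIN.  Requests that the kernel did not accept are completed
 * with an error.
 */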
static int ioq_submit(struct qemu_laio_state *s)
{
    int ret, i = 0;
    int len = s->io_q.idx;

    do {
        ret = io_submit(s->ctx, len, s->io_q.iocbs);
    } while (i++ < 3 && ret == -EAGAIN);

    /* empty the I/O queue */
    s->io_q.idx = 0;

    if (ret < 0) {
        i = 0;
    } else {
        i = ret;
    }

    for (; i < len; i++) {
        struct qemu_laiocb *laiocb =
            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);

        laiocb->ret = (ret < 0) ? ret : -EIO;
        qemu_laio_process_completion(s, laiocb);
    }
    return ret;
}

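/*
 * Queue an iocb for batched submission, flushing the queue as soon as it
 * becomes full.
 */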
static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
{
    unsigned int idx = s->io_q.idx;

    s->io_q.iocbs[idx++] = iocb;
    s->io_q.idx = idx;

    /* submit immediately if queue is full */
    if (idx == s->io_q.size) {
        ioq_submit(s);
    }
}

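/*
 * Start batching: while the queue is plugged, laio_submit() enqueues requests
 * instead of submitting them to the kernel immediately.
 */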
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
{
    struct qemu_laio_state *s = aio_ctx;

    s->io_q.plugged++;
}

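/*
 * Drop one plug reference when unplug is true and submit any queued requests
 * once the queue is no longer plugged.  With unplug == false the queue is
 * flushed without changing the plug count.
 */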
int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
{
    struct qemu_laio_state *s = aio_ctx;
    int ret = 0;

    assert(s->io_q.plugged > 0 || !unplug);

    if (unplug && --s->io_q.plugged > 0) {
        return 0;
    }

    if (s->io_q.idx > 0) {
        ret = ioq_submit(s);
    }

    return ret;
}

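/*
 * Prepare an iocb for a read or write of nb_sectors 512-byte sectors and
 * either submit it immediately or, while the queue is plugged, add it to the
 * batch queue.  Returns NULL on failure.
 */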
BlockAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        goto out_free_aiocb;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    if (!s->io_q.plugged) {
        if (io_submit(s->ctx, 1, &iocbs) < 0) {
            goto out_free_aiocb;
        }
    } else {
        ioq_enqueue(s, iocbs);
    }
    return &laiocb->common;

out_free_aiocb:
    qemu_aio_unref(laiocb);
    return NULL;
}

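/* Unregister the completion event notifier and delete the completion BH. */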
void laio_detach_aio_context(void *s_, AioContext *old_context)
{
    struct qemu_laio_state *s = s_;

    aio_set_event_notifier(old_context, &s->e, NULL);
    qemu_bh_delete(s->completion_bh);
}

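/* Create the completion BH and register the completion event notifier. */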
void laio_attach_aio_context(void *s_, AioContext *new_context)
{
    struct qemu_laio_state *s = s_;

    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}

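/*
 * Allocate the Linux AIO state: an event notifier for completion signalling
 * and a kernel AIO context sized for MAX_EVENTS in-flight requests.  Returns
 * NULL on failure.
 */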
void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

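/*
 * Tear down the state created by laio_init(): clean up the event notifier
 * and destroy the kernel AIO context.
 */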
void laio_cleanup(void *s_)
{
    struct qemu_laio_state *s = s_;

    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}