/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/raw-aio.h"
#include "qemu/event_notifier.h"

#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this, we will get EAGAIN from io_submit, which is communicated
 *      to the guest as an I/O error.
 */
#define MAX_EVENTS 128

#define MAX_QUEUED_IO  128

struct qemu_laiocb {
    BlockDriverAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
    QEMUIOVector *qiov;
    bool is_read;
    QLIST_ENTRY(qemu_laiocb) node;
};

typedef struct {
    struct iocb *iocbs[MAX_QUEUED_IO];
    int plugged;
    unsigned int size;
    unsigned int idx;
} LaioQueue;

struct qemu_laio_state {
    io_context_t ctx;
    EventNotifier e;

    /* I/O queue for batched submission */
    LaioQueue io_q;

    /* I/O completion processing */
    QEMUBH *completion_bh;
    struct io_event events[MAX_EVENTS];
    int event_idx;
    int event_max;
};

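/*
 * Combine the two completion status fields of an io_event into a single
 * ssize_t: res carries the byte count or a negative errno, and res2 is a
 * secondary status that is zero on success.  Folding res2 into the upper
 * bits makes any auxiliary error show up as an error value to the caller.
 */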
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

/*
 * Completes an AIO request (calls the callback and frees the ACB).
 */
static void qemu_laio_process_completion(struct qemu_laio_state *s,
    struct qemu_laiocb *laiocb)
{
    int ret;

    ret = laiocb->ret;
    if (ret != -ECANCELED) {
        if (ret == laiocb->nbytes) {
            ret = 0;
        } else if (ret >= 0) {
            /* Short reads mean EOF, pad with zeros. */
            if (laiocb->is_read) {
                qemu_iovec_memset(laiocb->qiov, ret, 0,
                    laiocb->qiov->size - ret);
            } else {
                ret = -EINVAL;
            }
        }

        laiocb->common.cb(laiocb->common.opaque, ret);
    }

    qemu_aio_release(laiocb);
}

/* The completion BH fetches completed I/O requests and invokes their
 * callbacks.
 *
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().  In order to do this,
 * the completion events array and index are kept in qemu_laio_state.  The BH
 * reschedules itself as long as there are completions pending so it will
 * either be called again in a nested event loop or will be called after all
 * events have been completed.  When there are no events left to complete, the
 * BH returns without rescheduling.
 */
static void qemu_laio_completion_bh(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    /* Fetch more completion events when empty */
    if (s->event_idx == s->event_max) {
        do {
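            /* a zero timeout makes io_getevents() poll without blocking */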
            struct timespec ts = { 0 };
            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
                                        s->events, &ts);
        } while (s->event_max == -EINTR);

        s->event_idx = 0;
        if (s->event_max <= 0) {
            s->event_max = 0;
            return; /* no more events */
        }
    }

    /* Reschedule so nested event loops see currently pending completions */
    qemu_bh_schedule(s->completion_bh);

    /* Process completion events */
    while (s->event_idx < s->event_max) {
        struct iocb *iocb = s->events[s->event_idx].obj;
        struct qemu_laiocb *laiocb =
                container_of(iocb, struct qemu_laiocb, iocb);

        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
        s->event_idx++;

        qemu_laio_process_completion(s, laiocb);
    }
}

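/*
 * Called when the completion eventfd becomes readable; the actual
 * processing is deferred to the completion BH (see
 * qemu_laio_completion_bh() above) so it also works from nested event
 * loops.
 */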
static void qemu_laio_completion_cb(EventNotifier *e)
{
    struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);

    if (event_notifier_test_and_clear(&s->e)) {
        qemu_bh_schedule(s->completion_bh);
    }
}

static void laio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS) {
        return;
    }

    /*
     * Note that as of Linux 2.6.31 neither the block device code nor any
     * filesystem implements cancellation of AIO requests.
     * Thus the polling loop below is the normal code path.
     */
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    if (ret == 0) {
        laiocb->ret = -ECANCELED;
        return;
    }

    /*
     * We have to wait for the iocb to finish.
     *
     * The only way to get the iocb status update is by polling the io context.
     * We might be able to do this slightly more optimally by removing the
     * O_NONBLOCK flag.
     */
    while (laiocb->ret == -EINPROGRESS) {
        qemu_laio_completion_cb(&laiocb->ctx->e);
    }
}

static const AIOCBInfo laio_aiocb_info = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel             = laio_cancel,
};

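/* Reset the submission queue to empty and unplugged */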
static void ioq_init(LaioQueue *io_q)
{
    io_q->size = MAX_QUEUED_IO;
    io_q->idx = 0;
    io_q->plugged = 0;
}

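/*
 * Flush the queued iocbs to the kernel.  io_submit() is retried a few
 * times on -EAGAIN; any request the kernel did not accept is completed
 * with an error (the io_submit() error code, or -EIO after a partial
 * submission).
 */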
static int ioq_submit(struct qemu_laio_state *s)
{
    int ret, i = 0;
    int len = s->io_q.idx;

    do {
        ret = io_submit(s->ctx, len, s->io_q.iocbs);
    } while (i++ < 3 && ret == -EAGAIN);

    /* empty io queue */
    s->io_q.idx = 0;

    if (ret < 0) {
        i = 0;
    } else {
        i = ret;
    }

    for (; i < len; i++) {
        struct qemu_laiocb *laiocb =
            container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);

        laiocb->ret = (ret < 0) ? ret : -EIO;
        qemu_laio_process_completion(s, laiocb);
    }
    return ret;
}

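/* Queue an iocb for later submission, flushing immediately once full */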
static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
{
    unsigned int idx = s->io_q.idx;

    s->io_q.iocbs[idx++] = iocb;
    s->io_q.idx = idx;

    /* submit immediately if queue is full */
    if (idx == s->io_q.size) {
        ioq_submit(s);
    }
}

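/*
 * laio_io_plug()/laio_io_unplug() batch request submission: while at least
 * one plug is outstanding, laio_submit() only queues iocbs, and the final
 * unplug pushes the whole batch to the kernel in one io_submit() call.
 */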
void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
{
    struct qemu_laio_state *s = aio_ctx;

    s->io_q.plugged++;
}

int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, bool unplug)
{
    struct qemu_laio_state *s = aio_ctx;
    int ret = 0;

    assert(s->io_q.plugged > 0 || !unplug);

    if (unplug && --s->io_q.plugged > 0) {
        return 0;
    }

    if (s->io_q.idx > 0) {
        ret = ioq_submit(s);
    }

    return ret;
}

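/*
 * Build and submit (or queue, when plugged) a single read or write request.
 * sector_num and nb_sectors are in 512-byte sectors; completion is signalled
 * through the state's EventNotifier, set up with io_set_eventfd() below.
 */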
BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_aiocb_info, bs, cb, opaque);
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;
    laiocb->is_read = (type == QEMU_AIO_READ);
    laiocb->qiov = qiov;

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    /* Currently the Linux kernel does not support other operations */
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        goto out_free_aiocb;
    }
    io_set_eventfd(&laiocb->iocb, event_notifier_get_fd(&s->e));

    if (!s->io_q.plugged) {
        if (io_submit(s->ctx, 1, &iocbs) < 0) {
            goto out_free_aiocb;
        }
    } else {
        ioq_enqueue(s, iocbs);
    }
    return &laiocb->common;

out_free_aiocb:
    qemu_aio_release(laiocb);
    return NULL;
}

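/*
 * Stop monitoring the event notifier and delete the completion BH before
 * the state is moved to another AioContext; laio_attach_aio_context()
 * recreates them in the new context.
 */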
void laio_detach_aio_context(void *s_, AioContext *old_context)
{
    struct qemu_laio_state *s = s_;

    aio_set_event_notifier(old_context, &s->e, NULL);
    qemu_bh_delete(s->completion_bh);
}

void laio_attach_aio_context(void *s_, AioContext *new_context)
{
    struct qemu_laio_state *s = s_;

    s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
    aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}

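/*
 * Allocate the per-device state: an eventfd for completion signalling and
 * a kernel AIO context sized for MAX_EVENTS in-flight requests.  Returns
 * NULL on failure.
 */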
void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = g_malloc0(sizeof(*s));
    if (event_notifier_init(&s->e, false) < 0) {
        goto out_free_state;
    }

    if (io_setup(MAX_EVENTS, &s->ctx) != 0) {
        goto out_close_efd;
    }

    ioq_init(&s->io_q);

    return s;

out_close_efd:
    event_notifier_cleanup(&s->e);
out_free_state:
    g_free(s);
    return NULL;
}

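/* Release the eventfd and the kernel AIO context created by laio_init() */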
void laio_cleanup(void *s_)
{
    struct qemu_laio_state *s = s_;

    event_notifier_cleanup(&s->e);

    if (io_destroy(s->ctx) != 0) {
        fprintf(stderr, "%s: destroy AIO context %p failed\n",
                        __func__, &s->ctx);
    }
    g_free(s);
}