/*
 * QEMU aio implementation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu-common.h"
#include "block/block.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"

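/* State for one registered file descriptor */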
struct AioHandler
{
    GPollFD pfd;
    IOHandler *io_read;
    IOHandler *io_write;
    int deleted;
    void *opaque;
    QLIST_ENTRY(AioHandler) node;
};

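/* Return the handler registered for @fd, or NULL if there is none or
 * it has already been marked as deleted.
 */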
static AioHandler *find_aio_handler(AioContext *ctx, int fd)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->pfd.fd == fd && !node->deleted) {
            return node;
        }
    }

    return NULL;
}

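/* Register, update or remove the handlers for a file descriptor.
 * Passing NULL for both io_read and io_write removes any existing
 * handler.  A hypothetical caller might register a read callback as:
 *
 *     aio_set_fd_handler(ctx, fd, my_read_cb, NULL, my_state);
 *
 * where my_read_cb and my_state stand for the caller's own callback
 * and opaque pointer.
 */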
void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        void *opaque)
{
    AioHandler *node;

    node = find_aio_handler(ctx, fd);

    /* Are we deleting the fd handler? */
    if (!io_read && !io_write) {
        if (node) {
            g_source_remove_poll(&ctx->source, &node->pfd);

            /* If the lock is held, just mark the node as deleted */
            if (ctx->walking_handlers) {
                node->deleted = 1;
                node->pfd.revents = 0;
            } else {
                /* Otherwise, delete it for real.  We can't just mark it as
                 * deleted because deleted nodes are only cleaned up after
                 * releasing the walking_handlers lock.
                 */
                QLIST_REMOVE(node, node);
                g_free(node);
            }
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->pfd.fd = fd;
            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);

            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_read = io_read;
        node->io_write = io_write;
        node->opaque = opaque;

        node->pfd.events = (io_read ? G_IO_IN | G_IO_HUP | G_IO_ERR : 0);
        node->pfd.events |= (io_write ? G_IO_OUT | G_IO_ERR : 0);
    }

    aio_notify(ctx);
}

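/* Convenience wrapper: an EventNotifier is polled through its file
 * descriptor, so registration reduces to aio_set_fd_handler().
 */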
void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *notifier,
                            EventNotifierHandler *io_read)
{
    aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
                       (IOHandler *)io_read, NULL, notifier);
}

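/* Nothing to prepare on POSIX; aio_poll() gathers events itself. */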
bool aio_prepare(AioContext *ctx)
{
    return false;
}

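/* Return true if any registered handler has events waiting to be
 * dispatched.
 */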
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;

    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        int revents;

        revents = node->pfd.revents & node->pfd.events;
        if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read) {
            return true;
        }
        if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write) {
            return true;
        }
    }

    return false;
}

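/* Run queued bottom halves, invoke the callbacks of handlers whose
 * events are pending, then run expired timers.  Returns true if any
 * progress was made.
 */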
bool aio_dispatch(AioContext *ctx)
{
    AioHandler *node;
    bool progress = false;

    /*
     * If there are callbacks left that have been queued, we need to call them.
     * Do not call select in this case, because it is possible that the caller
     * does not need a complete flush (as is the case for aio_poll loops).
     */
    if (aio_bh_poll(ctx)) {
        progress = true;
    }

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    node = QLIST_FIRST(&ctx->aio_handlers);
    while (node) {
        AioHandler *tmp;
        int revents;

        ctx->walking_handlers++;

        revents = node->pfd.revents & node->pfd.events;
        node->pfd.revents = 0;

        if (!node->deleted &&
            (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) &&
            node->io_read) {
            node->io_read(node->opaque);

            /* aio_notify() does not count as progress */
            if (node->opaque != &ctx->notifier) {
                progress = true;
            }
        }
        if (!node->deleted &&
            (revents & (G_IO_OUT | G_IO_ERR)) &&
            node->io_write) {
            node->io_write(node->opaque);
            progress = true;
        }

        tmp = node;
        node = QLIST_NEXT(node, node);

        ctx->walking_handlers--;

        if (!ctx->walking_handlers && tmp->deleted) {
            QLIST_REMOVE(tmp, node);
            g_free(tmp);
        }
    }

    /* Run our timers */
    progress |= timerlistgroup_run_timers(&ctx->tlg);

    return progress;
}

/* These thread-local variables are used only in a small part of aio_poll
 * around the call to the poll() system call.  In particular they are not
 * used while aio_poll is performing callbacks, which makes it much easier
 * to think about reentrancy!
 *
 * Stack-allocated arrays would be perfect but they have size limitations;
 * heap allocation is expensive enough that we want to reuse arrays across
 * calls to aio_poll().  And because poll() has to be called without holding
 * any lock, the arrays cannot be stored in AioContext.  Thread-local data
 * has none of the disadvantages of these three options.
 */
static __thread GPollFD *pollfds;
static __thread AioHandler **nodes;
static __thread unsigned npfd, nalloc;
static __thread Notifier pollfds_cleanup_notifier;

static void pollfds_cleanup(Notifier *n, void *unused)
{
    g_assert(npfd == 0);
    g_free(pollfds);
    g_free(nodes);
    nalloc = 0;
}

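/* Append one handler to the thread-local pollfds/nodes arrays,
 * doubling their capacity when they are full.
 */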
static void add_pollfd(AioHandler *node)
{
    if (npfd == nalloc) {
        if (nalloc == 0) {
            pollfds_cleanup_notifier.notify = pollfds_cleanup;
            qemu_thread_atexit_add(&pollfds_cleanup_notifier);
            nalloc = 8;
        } else {
            g_assert(nalloc <= INT_MAX);
            nalloc *= 2;
        }
        pollfds = g_renew(GPollFD, pollfds, nalloc);
        nodes = g_renew(AioHandler *, nodes, nalloc);
    }
    nodes[npfd] = node;
    pollfds[npfd] = (GPollFD) {
        .fd = node->pfd.fd,
        .events = node->pfd.events,
    };
    npfd++;
}

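/* Poll the AioContext once.  If @blocking is true, wait until at
 * least one handler is ready to run or a timer expires; otherwise
 * return immediately.  Returns true if any progress was made.
 */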
bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    int i, ret;
    bool progress;
    int64_t timeout;

    aio_context_acquire(ctx);
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        atomic_add(&ctx->notify_me, 2);
    }

    ctx->walking_handlers++;

    assert(npfd == 0);

    /* fill pollfds */
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->pfd.events) {
            add_pollfd(node);
        }
    }

    timeout = blocking ? aio_compute_timeout(ctx) : 0;

    /* wait until next event */
    if (timeout) {
        aio_context_release(ctx);
    }
    ret = qemu_poll_ns((GPollFD *)pollfds, npfd, timeout);
    if (blocking) {
        atomic_sub(&ctx->notify_me, 2);
    }
    if (timeout) {
        aio_context_acquire(ctx);
    }

    aio_notify_accept(ctx);

    /* if we have any readable fds, dispatch event */
    if (ret > 0) {
        for (i = 0; i < npfd; i++) {
            nodes[i]->pfd.revents = pollfds[i].revents;
        }
    }

    npfd = 0;
    ctx->walking_handlers--;

    /* Run dispatch even if there were no readable fds to run timers */
    if (aio_dispatch(ctx)) {
        progress = true;
    }

    aio_context_release(ctx);

    return progress;
}