/*
 * Data plane event loop
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2009-2017 QEMU contributors
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu-common.h"
#include "block/aio.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"
#include "block/raw-aio.h"
#include "qemu/coroutine_int.h"
#include "trace.h"

/***********************************************************/
/* bottom halves (can be seen as timers which expire ASAP) */

struct QEMUBH {
    AioContext *ctx;
    QEMUBHFunc *cb;
    void *opaque;
    QEMUBH *next;
    bool scheduled;
    bool idle;
    bool deleted;
};

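/* Create and schedule a one-shot bottom half: it is marked deleted up front,
 * so aio_bh_poll() frees it after the callback has run once.
 */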
void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    bh->scheduled = 1;
    bh->deleted = 1;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

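/* Allocate a bottom half and link it into ctx's list.  The caller schedules
 * it with qemu_bh_schedule() and releases it with qemu_bh_delete().
 */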
QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
{
    QEMUBH *bh;
    bh = g_new(QEMUBH, 1);
    *bh = (QEMUBH){
        .ctx = ctx,
        .cb = cb,
        .opaque = opaque,
    };
    qemu_lockcnt_lock(&ctx->list_lock);
    bh->next = ctx->first_bh;
    /* Make sure that the members are ready before putting bh into list */
    smp_wmb();
    ctx->first_bh = bh;
    qemu_lockcnt_unlock(&ctx->list_lock);
    return bh;
}

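/* Invoke a bottom half's callback. */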
void aio_bh_call(QEMUBH *bh)
{
    bh->cb(bh->opaque);
}

/* aio_bh_poll must not be called concurrently on the same AioContext */
int aio_bh_poll(AioContext *ctx)
{
    QEMUBH *bh, **bhp, *next;
    int ret;
    bool deleted = false;

    qemu_lockcnt_inc(&ctx->list_lock);

    ret = 0;
    for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) {
        next = atomic_rcu_read(&bh->next);
        /* The atomic_xchg is paired with the one in qemu_bh_schedule.  The
         * implicit memory barrier ensures that the callback sees all writes
         * done by the scheduling thread.  It also ensures that the scheduling
         * thread sees the zero before bh->cb has run, and thus will call
         * aio_notify again if necessary.
         */
        if (atomic_xchg(&bh->scheduled, 0)) {
            /* Idle BHs don't count as progress */
            if (!bh->idle) {
                ret = 1;
            }
            bh->idle = 0;
            aio_context_acquire(ctx);
            aio_bh_call(bh);
            aio_context_release(ctx);
        }
        if (bh->deleted) {
            deleted = true;
        }
    }

    /* remove deleted bhs */
    if (!deleted) {
        qemu_lockcnt_dec(&ctx->list_lock);
        return ret;
    }

    if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) {
        bhp = &ctx->first_bh;
        while (*bhp) {
            bh = *bhp;
            if (bh->deleted && !bh->scheduled) {
                *bhp = bh->next;
                g_free(bh);
            } else {
                bhp = &bh->next;
            }
        }
        qemu_lockcnt_unlock(&ctx->list_lock);
    }
    return ret;
}

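/* Schedule a bottom half as idle: it does not wake up the event loop
 * immediately and runs within roughly 10 ms (see aio_compute_timeout).
 */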
void qemu_bh_schedule_idle(QEMUBH *bh)
{
    bh->idle = 1;
    /* Make sure that idle & any writes needed by the callback are done
     * before the locations are read in the aio_bh_poll.
     */
    atomic_mb_set(&bh->scheduled, 1);
}

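/* Schedule a bottom half to run as soon as possible; safe to call from any
 * thread.  The event loop is only notified on the 0->1 transition of the
 * scheduled flag.
 */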
void qemu_bh_schedule(QEMUBH *bh)
{
    AioContext *ctx;

    ctx = bh->ctx;
    bh->idle = 0;
    /* The memory barrier implicit in atomic_xchg makes sure that:
     * 1. idle & any writes needed by the callback are done before the
     *    locations are read in the aio_bh_poll.
     * 2. ctx is loaded before scheduled is set and the callback has a chance
     *    to execute.
     */
    if (atomic_xchg(&bh->scheduled, 1) == 0) {
        aio_notify(ctx);
    }
}


/* This function is async: it only clears the scheduled flag, so a callback
 * that is already running is not interrupted.
 */
void qemu_bh_cancel(QEMUBH *bh)
{
    bh->scheduled = 0;
}

/* This function is async. The bottom half is not freed here; aio_bh_poll()
 * reclaims it once it is no longer scheduled.
 */
void qemu_bh_delete(QEMUBH *bh)
{
    bh->scheduled = 0;
    bh->deleted = 1;
}

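/* Compute how long the event loop may block, in nanoseconds: 0 if a non-idle
 * bottom half is pending, at most 10 ms if only idle ones are pending,
 * otherwise the deadline of the nearest timer (-1 if there is none).
 */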
int64_t
aio_compute_timeout(AioContext *ctx)
{
    int64_t deadline;
    int timeout = -1;
    QEMUBH *bh;

    for (bh = atomic_rcu_read(&ctx->first_bh); bh;
         bh = atomic_rcu_read(&bh->next)) {
        if (bh->scheduled) {
            if (bh->idle) {
                /* idle bottom halves will be polled at least
                 * every 10ms */
                timeout = 10000000;
            } else {
                /* non-idle bottom halves will be executed
                 * immediately */
                return 0;
            }
        }
    }

    deadline = timerlistgroup_deadline_ns(&ctx->tlg);
    if (deadline == 0) {
        return 0;
    } else {
        return qemu_soonest_timeout(timeout, deadline);
    }
}

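/* GSourceFuncs callbacks: they adapt an AioContext to the glib main loop so
 * that its file descriptors, bottom halves and timers are also serviced when
 * the context is attached to a GMainContext.
 */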
static gboolean
aio_ctx_prepare(GSource *source, gint    *timeout)
{
    AioContext *ctx = (AioContext *) source;

    atomic_or(&ctx->notify_me, 1);

    /* We assume there is no timeout already supplied */
    *timeout = qemu_timeout_ns_to_ms(aio_compute_timeout(ctx));

    if (aio_prepare(ctx)) {
        *timeout = 0;
    }

    return *timeout == 0;
}

static gboolean
aio_ctx_check(GSource *source)
{
    AioContext *ctx = (AioContext *) source;
    QEMUBH *bh;

    atomic_and(&ctx->notify_me, ~1);
    aio_notify_accept(ctx);

    for (bh = ctx->first_bh; bh; bh = bh->next) {
        if (bh->scheduled) {
            return true;
        }
    }
    return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
}

static gboolean
aio_ctx_dispatch(GSource     *source,
                 GSourceFunc  callback,
                 gpointer     user_data)
{
    AioContext *ctx = (AioContext *) source;

    assert(callback == NULL);
    aio_dispatch(ctx, true);
    return true;
}

static void
aio_ctx_finalize(GSource     *source)
{
    AioContext *ctx = (AioContext *) source;

    thread_pool_free(ctx->thread_pool);

#ifdef CONFIG_LINUX_AIO
    if (ctx->linux_aio) {
        laio_detach_aio_context(ctx->linux_aio, ctx);
        laio_cleanup(ctx->linux_aio);
        ctx->linux_aio = NULL;
    }
#endif

    assert(QSLIST_EMPTY(&ctx->scheduled_coroutines));
    qemu_bh_delete(ctx->co_schedule_bh);

    qemu_lockcnt_lock(&ctx->list_lock);
    assert(!qemu_lockcnt_count(&ctx->list_lock));
    while (ctx->first_bh) {
        QEMUBH *next = ctx->first_bh->next;

        /* qemu_bh_delete() must have been called on BHs in this AioContext */
        assert(ctx->first_bh->deleted);

        g_free(ctx->first_bh);
        ctx->first_bh = next;
    }
    qemu_lockcnt_unlock(&ctx->list_lock);

    aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
    event_notifier_cleanup(&ctx->notifier);
    qemu_rec_mutex_destroy(&ctx->lock);
    qemu_lockcnt_destroy(&ctx->list_lock);
    timerlistgroup_deinit(&ctx->tlg);
}

static GSourceFuncs aio_source_funcs = {
    aio_ctx_prepare,
    aio_ctx_check,
    aio_ctx_dispatch,
    aio_ctx_finalize
};

GSource *aio_get_g_source(AioContext *ctx)
{
    g_source_ref(&ctx->source);
    return &ctx->source;
}

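/* Return the thread pool for this AioContext, creating it on first use. */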
ThreadPool *aio_get_thread_pool(AioContext *ctx)
{
    if (!ctx->thread_pool) {
        ctx->thread_pool = thread_pool_new(ctx);
    }
    return ctx->thread_pool;
}

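/* Return the Linux AIO state for this AioContext, creating and attaching it
 * on first use.
 */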
#ifdef CONFIG_LINUX_AIO
LinuxAioState *aio_get_linux_aio(AioContext *ctx)
{
    if (!ctx->linux_aio) {
        ctx->linux_aio = laio_init();
        laio_attach_aio_context(ctx->linux_aio, ctx);
    }
    return ctx->linux_aio;
}
#endif

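/* Wake up an event loop that may be blocked in poll, so that it re-evaluates
 * pending bottom halves and other work.  The notifier is only written when
 * ctx->notify_me shows that someone is about to block.
 */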
void aio_notify(AioContext *ctx)
{
    /* Write e.g. bh->scheduled before reading ctx->notify_me.  Pairs
     * with atomic_or in aio_ctx_prepare or atomic_add in aio_poll.
     */
    smp_mb();
    if (ctx->notify_me) {
        event_notifier_set(&ctx->notifier);
        atomic_mb_set(&ctx->notified, true);
    }
}

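/* Consume a pending notification; pairs with the event_notifier_set() done
 * by aio_notify().
 */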
void aio_notify_accept(AioContext *ctx)
{
    if (atomic_xchg(&ctx->notified, false)) {
        event_notifier_test_and_clear(&ctx->notifier);
    }
}

static void aio_timerlist_notify(void *opaque)
{
    aio_notify(opaque);
}

static void event_notifier_dummy_cb(EventNotifier *e)
{
}

/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
    EventNotifier *e = opaque;
    AioContext *ctx = container_of(e, AioContext, notifier);

    return atomic_read(&ctx->notified);
}

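/* Bottom half that enters coroutines queued by aio_co_schedule().  The
 * lock-free list is in LIFO order, so it is reversed first to enter the
 * coroutines in the order they were scheduled.
 */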
static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    QSLIST_HEAD(, Coroutine) straight, reversed;

    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
        trace_aio_co_schedule_bh_cb(ctx, co);
        qemu_coroutine_enter(co);
    }
}

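/* Create a new AioContext.  It is embedded in a GSource so that it can also
 * be attached to a glib main loop.
 */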
AioContext *aio_context_new(Error **errp)
{
    int ret;
    AioContext *ctx;

    ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
    aio_context_setup(ctx);

    ret = event_notifier_init(&ctx->notifier, false);
    if (ret < 0) {
        error_setg_errno(errp, -ret, "Failed to initialize event notifier");
        goto fail;
    }
    g_source_set_can_recurse(&ctx->source, true);
    qemu_lockcnt_init(&ctx->list_lock);

    ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    QSLIST_INIT(&ctx->scheduled_coroutines);

    aio_set_event_notifier(ctx, &ctx->notifier,
                           false,
                           (EventNotifierHandler *)
                           event_notifier_dummy_cb,
                           event_notifier_poll);
#ifdef CONFIG_LINUX_AIO
    ctx->linux_aio = NULL;
#endif
    ctx->thread_pool = NULL;
    qemu_rec_mutex_init(&ctx->lock);
    timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);

    ctx->poll_ns = 0;
    ctx->poll_max_ns = 0;
    ctx->poll_grow = 0;
    ctx->poll_shrink = 0;

    return ctx;
fail:
    g_source_destroy(&ctx->source);
    return NULL;
}

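/* Queue a coroutine for execution in ctx's event loop.  Safe to call from any
 * thread; the coroutine is actually entered from co_schedule_bh_cb().
 */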
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines,
                              co, co_scheduled_next);
    qemu_bh_schedule(ctx->co_schedule_bh);
}

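/* Restart a coroutine in the AioContext it last ran in.  From a different
 * context it is scheduled there; from another coroutine in the same context
 * it is queued on the caller's wakeup queue; otherwise it is entered directly
 * with the context acquired.
 */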
void aio_co_wake(struct Coroutine *co)
{
    AioContext *ctx;

    /* Read coroutine before co->ctx.  Matches smp_wmb in
     * qemu_coroutine_enter.
     */
    smp_read_barrier_depends();
    ctx = atomic_read(&co->ctx);

    if (ctx != qemu_get_current_aio_context()) {
        aio_co_schedule(ctx, co);
        return;
    }

    if (qemu_in_coroutine()) {
        Coroutine *self = qemu_coroutine_self();
        assert(self != co);
        QSIMPLEQ_INSERT_TAIL(&self->co_queue_wakeup, co, co_queue_next);
    } else {
        aio_context_acquire(ctx);
        qemu_coroutine_enter(co);
        aio_context_release(ctx);
    }
}

void aio_context_ref(AioContext *ctx)
{
    g_source_ref(&ctx->source);
}

void aio_context_unref(AioContext *ctx)
{
    g_source_unref(&ctx->source);
}

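/* Acquire/release the per-context recursive lock that serializes the event
 * loop and other threads using this AioContext.
 */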
void aio_context_acquire(AioContext *ctx)
{
    qemu_rec_mutex_lock(&ctx->lock);
}

void aio_context_release(AioContext *ctx)
{
    qemu_rec_mutex_unlock(&ctx->lock);
}