/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"
#include "trace-root.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

/* Forward declaration: worker_thread() calls this before its definition. */
static void do_spawn_thread(ThreadPool *pool);

/* Forward typedef so the element type can be named before its definition. */
typedef struct ThreadPoolElement ThreadPoolElement;

/* Lifecycle of a request; stored in ThreadPoolElement.state. */
enum ThreadState {
    THREAD_QUEUED,  /* on the pool's request_list, not yet picked up */
    THREAD_ACTIVE,  /* dequeued by a worker, which is about to run / running func */
    THREAD_DONE,    /* ret is final: func returned or the request was cancelled */
};

struct ThreadPoolElement {
37
    BlockAIOCB common;
38
    ThreadPool *pool;
39 40
    ThreadPoolFunc *func;
    void *arg;
41 42 43 44 45

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
46 47 48 49 50 51 52 53 54 55
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
57
    AioContext *ctx;
58
    QEMUBH *completion_bh;
59
    QemuMutex lock;
60
    QemuCond worker_stopped;
61 62 63 64 65 66 67 68 69 70 71 72 73
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
74
    bool stopping;
75 76 77
};

static void *worker_thread(void *opaque)
78
{
79 80 81 82 83
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);
84

85
    while (!pool->stopping) {
86 87 88 89
        ThreadPoolElement *req;
        int ret;

        do {
90 91 92 93 94 95
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
96
        if (ret == -1 || pool->stopping) {
97 98 99
            break;
        }

100 101
        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
102
        req->state = THREAD_ACTIVE;
103
        qemu_mutex_unlock(&pool->lock);
104 105 106 107

        ret = req->func(req->arg);

        req->ret = ret;
108 109 110 111
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

112
        qemu_mutex_lock(&pool->lock);
113

114
        qemu_bh_schedule(pool->completion_bh);
115 116
    }

117
    pool->cur_threads--;
118
    qemu_cond_signal(&pool->worker_stopped);
119
    qemu_mutex_unlock(&pool->lock);
120 121 122
    return NULL;
}

/*
 * Create one worker thread if the backlog (pool->new_threads) is non-empty.
 *
 * Must be called with pool->lock held.  The new thread is counted as
 * pending until worker_thread() starts running and decrements
 * pool->pending_threads.
 */
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    /* Move one thread from "to be created" to "created, not running yet". */
    pool->new_threads--;
    pool->pending_threads++;

    /* The thread is detached, so the handle in t is not kept.  */
    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
140 141 142 143 144
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
145 146
}

/*
 * Request one more worker thread.
 *
 * Called with pool->lock held (see thread_pool_submit_aio()).  The thread is
 * only added to the backlog here; the actual creation happens in
 * do_spawn_thread(), run either from new_thread_bh or from a worker that is
 * just starting up.
 */
static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}

static void thread_pool_completion_bh(void *opaque)
164
{
165
    ThreadPool *pool = opaque;
166 167 168
    ThreadPoolElement *elem, *next;

restart:
169
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
170
        if (elem->state != THREAD_DONE) {
171 172
            continue;
        }
173 174 175 176 177 178

        trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                   elem->ret);
        QLIST_REMOVE(elem, all);

        if (elem->common.cb) {
179 180
            /* Read state before ret.  */
            smp_rmb();
181 182 183 184 185 186

            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
             * wait for another request that completed at the same time.
             */
            qemu_bh_schedule(pool->completion_bh);

187
            elem->common.cb(elem->common.opaque, elem->ret);
188
            qemu_aio_unref(elem);
189 190
            goto restart;
        } else {
191
            qemu_aio_unref(elem);
192 193 194 195
        }
    }
}

/*
 * cancel_async callback: try to cancel a request that has not started yet.
 *
 * @acb: the request; it is the 'common' member of a ThreadPoolElement.
 *
 * If the request is still THREAD_QUEUED and a semaphore token can be taken
 * without blocking, the request is removed from the queue and completed with
 * -ECANCELED through the normal completion bottom half.  Otherwise nothing is
 * done and the request finishes as usual.
 */
static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    qemu_mutex_lock(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem.  We can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }

    qemu_mutex_unlock(&pool->lock);
}

/*
 * get_aio_context callback.
 *
 * @acb: the request; it is the 'common' member of a ThreadPoolElement.
 *
 * Returns the AioContext of the pool the request was submitted to.
 */
static AioContext *thread_pool_get_aio_context(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;
    return pool->ctx;
}

static const AIOCBInfo thread_pool_aiocb_info = {
229
    .aiocb_size         = sizeof(ThreadPoolElement),
230 231
    .cancel_async       = thread_pool_cancel,
    .get_aio_context    = thread_pool_get_aio_context,
232 233
};

BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
235
        ThreadPoolFunc *func, void *arg,
236
        BlockCompletionFunc *cb, void *opaque)
237 238 239
{
    ThreadPoolElement *req;

S
Stefan Hajnoczi 已提交
240
    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
241 242 243
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
244
    req->pool = pool;
245

246
    QLIST_INSERT_HEAD(&pool->head, req, all);
247

248
    trace_thread_pool_submit(pool, req, arg);
249

250 251 252
    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
253
    }
254 255 256
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
257 258 259 260 261 262 263 264 265 266 267 268 269
    return &req->common;
}

/* State shared between thread_pool_submit_co() and thread_pool_co_cb(). */
typedef struct ThreadPoolCo {
    Coroutine *co;  /* coroutine to re-enter when the request completes */
    int ret;        /* request result, filled in by the completion callback */
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
270
    qemu_coroutine_enter(co->co);
271 272
}

/*
 * Run @func(@arg) in a worker thread of @pool and wait for it from a
 * coroutine.
 *
 * Must be called from coroutine context (asserted).  The calling coroutine
 * yields after submitting and is re-entered by thread_pool_co_cb() once the
 * request has completed.
 *
 * Returns the result passed to the completion callback.
 */
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    /* Lives on this coroutine's stack; valid until we resume and return. */
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}

/*
 * Fire-and-forget submission: run @func(@arg) in a worker thread of @pool
 * without a completion callback.  The returned AIOCB is discarded.
 */
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

/*
 * Initialise an already allocated ThreadPool.
 *
 * @pool: storage to initialise; any previous contents are overwritten.
 * @ctx:  AioContext for the pool's bottom halves; when NULL, the context
 *        returned by qemu_get_aio_context() is used.
 */
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    /* Zeroes all counters and the stopping flag.  */
    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    /* Hard upper bound on the number of worker threads.  */
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);
}

/*
 * Allocate a ThreadPool and initialise it for @ctx.
 *
 * The result is released with thread_pool_free().
 */
ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *new_pool;

    new_pool = g_new(ThreadPool, 1);
    thread_pool_init_one(new_pool, ctx);

    return new_pool;
}

/*
 * Stop all worker threads of @pool and release it.
 *
 * @pool: pool to destroy; NULL is accepted and ignored.
 *
 * No request may be outstanding (asserted via pool->head being empty).
 * Blocks until every worker thread has announced its exit.
 */
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    /* Backlog entries were counted in cur_threads by spawn_thread() but
     * will never be created now, so take them out of the count.
     */
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        /* Wake a worker blocked on the semaphore so it sees 'stopping'.  */
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}