/*
 * Copyright (c) 2006-2018, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2017-02-27     bernard      fix the re-work issue.
 */

#include <rthw.h>
#include <rtthread.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP
/* forward declaration: one-shot timer callback used by delayed submission */
static void _delayed_work_timeout_handler(void *parameter);

/**
 * Signal completion of the work item that just finished executing.
 *
 * The queue's semaphore (initialized with count 0) doubles as a completion
 * signal for rt_workqueue_cancel_work_sync(): when no canceller is waiting,
 * rt_sem_trytake() fails with -RT_ETIMEOUT, the release bumps the count to 1
 * (waking a waiting canceller, if any), and the next loop iteration takes it
 * straight back so the count stays at 0.  The scheduler is locked around the
 * loop so no other thread can interleave between the release and the re-take.
 *
 * @param queue the workqueue whose current work just completed
 *
 * @return RT_EOK on success, -RT_ERROR on an unexpected semaphore error
 */
rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
    rt_err_t result;

    rt_enter_critical();
    while (1)
    {
        /* try to take condition semaphore */
        result = rt_sem_trytake(&(queue->sem));
        if (result == -RT_ETIMEOUT)
        {
            /* it's timeout, release this semaphore */
            rt_sem_release(&(queue->sem));
        }
        else if (result == RT_EOK)
        {
            /* keep the sem value = 0 */
            result = RT_EOK;
            break;
        }
        else
        {
            result = -RT_ERROR;
            break;
        }
    }
    rt_exit_critical();

    return result;
}

/**
 * Worker thread entry: drains the queue's work list forever.
 *
 * Suspends itself when the list is empty; submitters resume it via
 * rt_thread_resume().  Each work item is detached from the list and from
 * its queue (flags/workqueue cleared) BEFORE its callback runs, so the
 * callback may legally resubmit the same rt_work.
 *
 * @param parameter the owning struct rt_workqueue (passed at thread creation)
 */
static void _workqueue_thread_entry(void *parameter)
{
    rt_base_t level;
    struct rt_work *work;
    struct rt_workqueue *queue;

    queue = (struct rt_workqueue *) parameter;
    RT_ASSERT(queue != RT_NULL);

    while (1)
    {
        if (rt_list_isempty(&(queue->work_list)))
        {
            /* work list is empty: suspend until a submitter resumes us */
            rt_thread_suspend(rt_thread_self());
            rt_schedule();
        }

        /* we have work to do with. */
        level = rt_hw_interrupt_disable();
        work = rt_list_entry(queue->work_list.next, struct rt_work, list);
        rt_list_remove(&(work->list));
        queue->work_current = work;
        /* clear pending/queue linkage before running, so the callback can
         * resubmit this work without hitting the -RT_EBUSY pending check */
        work->flags &= ~RT_WORK_STATE_PENDING;
        work->workqueue = RT_NULL;
        rt_hw_interrupt_enable(level);

        /* do work */
        work->work_func(work, work->work_data);
        level = rt_hw_interrupt_disable();
        /* clean current work */
        queue->work_current = RT_NULL;
        rt_hw_interrupt_enable(level);

        /* ack work completion (wakes rt_workqueue_cancel_work_sync waiters) */
        _workqueue_work_completion(queue);
    }
}
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    level = rt_hw_interrupt_disable();
    if (work->flags & RT_WORK_STATE_PENDING)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST be initialized firstly */
    rt_list_remove(&(work->list));

    rt_list_insert_after(queue->work_list.prev, &(work->list));
    work->flags |= RT_WORK_STATE_PENDING;

    /* whether the workqueue is doing work */
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}

/**
 * Remove a queued (not yet running) work item from a workqueue.
 *
 * @param queue workqueue the item was submitted to
 * @param work  the work item to remove
 *
 * @return RT_EOK on success; -RT_EBUSY if the item is executing right now
 */
static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t irq_state;
    rt_err_t rc = RT_EOK;

    irq_state = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        /* cannot cancel the item while its callback is running */
        rc = -RT_EBUSY;
    }
    else
    {
        rt_list_remove(&(work->list));
        work->flags &= ~RT_WORK_STATE_PENDING;
    }
    rt_hw_interrupt_enable(irq_state);

    return rc;
}

145
static rt_err_t _workqueue_cancel_delayed_work(struct rt_work *work)
146 147 148 149 150 151 152 153 154 155
{
    rt_base_t level;
    int ret = RT_EOK;

    if (!work->workqueue)
    {
        ret = -EINVAL;
        goto __exit;
    }

156
    if (work->flags & RT_WORK_STATE_PENDING)
157 158
    {
        /* Remove from the queue if already submitted */
159
        ret = _workqueue_cancel_work(work->workqueue, work);
160 161 162 163 164 165 166
        if (ret)
        {
            goto __exit;
        }
    }
    else
    {
167
        if (work->flags & RT_WORK_STATE_SUBMITTING)
168 169 170 171
        {
            level = rt_hw_interrupt_disable();
            rt_timer_stop(&(work->timer));
            rt_timer_detach(&(work->timer));
172
            work->flags &= ~RT_WORK_STATE_SUBMITTING;
173 174
            rt_hw_interrupt_enable(level);
        }
175 176 177 178 179
    }

    level = rt_hw_interrupt_disable();
    /* Detach from workqueue */
    work->workqueue = RT_NULL;
180
    work->flags &= ~(RT_WORK_STATE_PENDING);
181 182 183 184 185 186 187
    rt_hw_interrupt_enable(level);

__exit:
    return ret;
}

/**
 * Submit a work item to run after `ticks` system ticks.
 *
 * With ticks == 0 the work is queued immediately; otherwise a one-shot
 * soft timer is armed whose callback (_delayed_work_timeout_handler)
 * performs the real submission.  A work item already attached to this
 * queue is cancelled first (re-submit semantics); a work item attached
 * to a DIFFERENT queue is rejected.
 *
 * @param queue target workqueue
 * @param work  initialized work item
 * @param ticks delay in OS ticks; 0 means submit now
 *
 * @return RT_EOK on success; -RT_EINVAL if attached to another queue;
 *         other negative codes propagated from cancel/submit
 */
static rt_err_t _workqueue_submit_delayed_work(struct rt_workqueue *queue,
        struct rt_work *work, rt_tick_t ticks)
{
    rt_base_t level;
    rt_err_t ret = RT_EOK;

    /* Work cannot be active in multiple queues */
    if (work->workqueue && work->workqueue != queue)
    {
        ret = -RT_EINVAL;
        goto __exit;
    }

    /* Cancel if work has been submitted */
    if (work->workqueue == queue)
    {
        ret = _workqueue_cancel_delayed_work(work);
        if (ret < 0)
        {
            goto __exit;
        }
    }

    level = rt_hw_interrupt_disable();
    /* Attach workqueue so the timeout callback can submit it */
    work->workqueue = queue;
    rt_hw_interrupt_enable(level);

    if (!ticks)
    {
        /* no delay requested: submit the work immediately */
        ret = _workqueue_submit_work(work->workqueue, work);
    }
    else
    {
        level = rt_hw_interrupt_disable();
        /* Add timeout: mark as arming and set up the one-shot soft timer */
        work->flags |= RT_WORK_STATE_SUBMITTING;
        rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, work, ticks,
                      RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER);
        rt_hw_interrupt_enable(level);
        rt_timer_start(&(work->timer));
    }

__exit:
    return ret;
}

/**
 * One-shot soft-timer callback for delayed work: tear down the timer and
 * hand the work item to its attached workqueue.
 *
 * Clearing RT_WORK_TYPE_DELAYED here means the NEXT rt_workqueue_submit_work
 * call with time == 0 takes the immediate path again.
 *
 * @param parameter the struct rt_work that was armed (set at rt_timer_init)
 */
static void _delayed_work_timeout_handler(void *parameter)
{
    struct rt_work *delayed_work;
    rt_base_t level;

    delayed_work = (struct rt_work *)parameter;
    level = rt_hw_interrupt_disable();
    /* the timer is one-shot and statically embedded in the work: stop and
     * detach it before the work can be rearmed by a resubmission */
    rt_timer_stop(&(delayed_work->timer));
    rt_timer_detach(&(delayed_work->timer));
    delayed_work->flags &= ~RT_WORK_STATE_SUBMITTING;
    delayed_work->type &= ~RT_WORK_TYPE_DELAYED;
    rt_hw_interrupt_enable(level);
    _workqueue_submit_work(delayed_work->workqueue, delayed_work);
}

/**
 * Create a workqueue backed by a dedicated worker thread.
 *
 * @param name       worker thread name
 * @param stack_size worker thread stack size in bytes
 * @param priority   worker thread priority
 *
 * @return the new workqueue, or RT_NULL on allocation/thread failure
 */
struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
{
    struct rt_workqueue *queue = RT_NULL;

    queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
    if (queue != RT_NULL)
    {
        /* initialize work list */
        rt_list_init(&(queue->work_list));
        queue->work_current = RT_NULL;
        /* completion semaphore used by rt_workqueue_cancel_work_sync() */
        rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);

        /* create the work thread */
        queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
        if (queue->work_thread == RT_NULL)
        {
            /* fix: detach the statically initialized semaphore before
             * freeing the queue, otherwise the kernel object leaks */
            rt_sem_detach(&(queue->sem));
            RT_KERNEL_FREE(queue);
            return RT_NULL;
        }

        rt_thread_startup(queue->work_thread);
    }

    return queue;
}

276
rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
277
{
278
    RT_ASSERT(queue != RT_NULL);
279

280 281
    rt_thread_delete(queue->work_thread);
    RT_KERNEL_FREE(queue);
282

283
    return RT_EOK;
284 285
}

/**
 * Submit a work item for immediate execution on a workqueue.
 * Thin wrapper around _workqueue_submit_work().
 *
 * @param queue target workqueue (must not be RT_NULL)
 * @param work  initialized work item (must not be RT_NULL)
 *
 * @return RT_EOK on success; -RT_EBUSY if already pending or running
 */
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    return _workqueue_submit_work(queue, work);
}
293

294 295 296 297
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time)
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);
298

299 300 301 302 303
    if (time > 0)
    {
        work->type |= RT_WORK_TYPE_DELAYED;
    }

304
    if (work->type & RT_WORK_TYPE_DELAYED)
305
    {
306
        return _workqueue_submit_delayed_work(queue, work, time);
307 308 309 310
    }
    else
    {
        return _workqueue_submit_work(queue, work);
311
    }
312 313
}

314
rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
315
{
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
    rt_base_t level;
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST be initialized firstly */
    rt_list_remove(&(work->list));

    rt_list_insert_after(queue->work_list.prev, &(work->list));
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else rt_hw_interrupt_enable(level);

    return RT_EOK;
341 342
}

343
rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
344
{
345 346
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);
347

348
    if (work->type & RT_WORK_TYPE_DELAYED)
349
    {
350
        return _workqueue_cancel_delayed_work(work);
351 352 353 354
    }
    else
    {
        return _workqueue_cancel_work(queue, work);
355
    }
356 357
}

/**
 * Cancel a work item, waiting for it to finish if it is running right now.
 *
 * If the item is the queue's current work, block on the completion
 * semaphore released by _workqueue_work_completion(); otherwise simply
 * unlink it from the work list.
 *
 * NOTE(review): rt_sem_take(RT_WAITING_FOREVER) is called with interrupts
 * disabled; this appears to rely on the context switch restoring interrupt
 * state when this thread suspends — confirm against the port's scheduler.
 *
 * @param queue workqueue the item was submitted to (must not be RT_NULL)
 * @param work  the work item to cancel (must not be RT_NULL)
 *
 * @return RT_EOK always
 */
rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work) /* it's current work in the queue */
    {
        /* wait for work completion */
        rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
    }
    else
    {
        rt_list_remove(&(work->list));
    }
    work->flags &= ~RT_WORK_STATE_PENDING;
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}

381
rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
382
{
383 384 385 386 387 388 389 390 391 392 393 394
    struct rt_list_node *node, *next;
    RT_ASSERT(queue != RT_NULL);

    rt_enter_critical();
    for (node = queue->work_list.next; node != &(queue->work_list); node = next)
    {
        next = node->next;
        rt_list_remove(node);
    }
    rt_exit_critical();

    return RT_EOK;
395 396
}

/**
 * Initialize a delayed work item.
 *
 * Initializes only the embedded rt_work; the delayed type bit is applied
 * later by rt_workqueue_submit_work() when a positive delay is given.
 *
 * @param work      the delayed work container to initialize
 * @param work_func callback invoked on the worker thread
 * @param work_data user data passed to the callback
 */
void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work,
                          void *work_data), void *work_data)
{
    rt_work_init(&work->work, work_func, work_data);
}

#ifdef RT_USING_SYSTEM_WORKQUEUE
/* the shared system workqueue, created once by rt_work_sys_workqueue_init() */
static struct rt_workqueue *sys_workq;

/**
 * Submit a work item to the system workqueue, optionally delayed.
 *
 * @param work initialized work item
 * @param time delay in OS ticks; 0 submits immediately
 *
 * @return RT_EOK on success, a negative error code otherwise
 */
rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t time)
{
    return rt_workqueue_submit_work(sys_workq, work, time);
}

/**
 * Cancel a work item submitted to the system workqueue.
 *
 * @param work the work item to cancel
 *
 * @return RT_EOK on success, a negative error code otherwise
 */
rt_err_t rt_work_cancel(struct rt_work *work)
{
    return rt_workqueue_cancel_work(sys_workq, work);
}

416
int rt_work_sys_workqueue_init(void)
417
{
418 419 420
    if (sys_workq != RT_NULL)
        return 0;

421
    sys_workq = rt_workqueue_create("sys_work", RT_SYSTEM_WORKQUEUE_STACKSIZE,
422 423 424 425 426
                                    RT_SYSTEM_WORKQUEUE_PRIORITY);

    return RT_EOK;
}

427
INIT_PREV_EXPORT(rt_work_sys_workqueue_init);
428 429
#endif
#endif