/*
 * Copyright (c) 2006-2018, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2017-02-27     bernard      fix the re-work issue.
 */

#include <rthw.h>
#include <rtthread.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP

/* forward declaration: referenced by _workqueue_submit_delayed_work() */
static void _delayed_work_timeout_handler(void *parameter);

19 20 21
rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
    rt_err_t result;
22 23

    rt_enter_critical();
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
    while (1)
    {
        /* try to take condition semaphore */
        result = rt_sem_trytake(&(queue->sem));
        if (result == -RT_ETIMEOUT)
        {
            /* it's timeout, release this semaphore */
            rt_sem_release(&(queue->sem));
        }
        else if (result == RT_EOK)
        {
            /* keep the sem value = 0 */
            result = RT_EOK;
            break;
        }
        else
        {
            result = -RT_ERROR;
            break;
        }
    }
    rt_exit_critical();
46

47 48 49
    return result;
}

50
static void _workqueue_thread_entry(void *parameter)
51
{
52
    rt_base_t level;
53 54
    struct rt_work *work;
    struct rt_workqueue *queue;
55

56
    queue = (struct rt_workqueue *) parameter;
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
    RT_ASSERT(queue != RT_NULL);

    while (1)
    {
        if (rt_list_isempty(&(queue->work_list)))
        {
            /* no software timer exist, suspend self. */
            rt_thread_suspend(rt_thread_self());
            rt_schedule();
        }

        /* we have work to do with. */
        level = rt_hw_interrupt_disable();
        work = rt_list_entry(queue->work_list.next, struct rt_work, list);
        rt_list_remove(&(work->list));
        queue->work_current = work;
73
        work->flags &= ~RT_WORK_STATE_PENDING;
74 75 76 77 78 79 80 81
        rt_hw_interrupt_enable(level);

        /* do work */
        work->work_func(work, work->work_data);
        level = rt_hw_interrupt_disable();
        /* clean current work */
        queue->work_current = RT_NULL;
        rt_hw_interrupt_enable(level);
82 83 84

        /* ack work completion */
        _workqueue_work_completion(queue);
85
    }
86 87
}

88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    level = rt_hw_interrupt_disable();
    if (work->flags & RT_WORK_STATE_PENDING)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST be initialized firstly */
    rt_list_remove(&(work->list));

    rt_list_insert_after(queue->work_list.prev, &(work->list));
    work->flags |= RT_WORK_STATE_PENDING;

    /* whether the workqueue is doing work */
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}

static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }
    rt_list_remove(&(work->list));
    work->flags &= ~RT_WORK_STATE_PENDING;
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}

static rt_err_t _workqueue_cancel_delayed_work(struct rt_delayed_work *work)
{
    rt_base_t level;
    int ret = RT_EOK;

    if (!work->workqueue)
    {
        ret = -EINVAL;
        goto __exit;
    }

    if (work->work.flags & RT_WORK_STATE_PENDING)
    {
        /* Remove from the queue if already submitted */
        ret = rt_workqueue_cancel_work(work->workqueue, &(work->work));
        if (ret)
        {
            goto __exit;
        }
    }
    else
    {
166 167 168 169 170 171 172 173
        if (work->work.flags & RT_WORK_STATE_SUBMITTING)
        {
            level = rt_hw_interrupt_disable();
            rt_timer_stop(&(work->timer));
            rt_timer_detach(&(work->timer));
            work->work.flags &= ~RT_WORK_STATE_SUBMITTING;
            rt_hw_interrupt_enable(level);
        }
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
    }

    level = rt_hw_interrupt_disable();
    /* Detach from workqueue */
    work->workqueue = RT_NULL;
    work->work.flags &= ~(RT_WORK_STATE_PENDING);
    rt_hw_interrupt_enable(level);

__exit:
    return ret;
}

static rt_err_t _workqueue_submit_delayed_work(struct rt_workqueue *queue,
        struct rt_delayed_work *work, rt_tick_t ticks)
{
    rt_base_t level;
    int ret = RT_EOK;


    /* Work cannot be active in multiple queues */
    if (work->workqueue && work->workqueue != queue)
    {
        ret = -RT_EINVAL;
        goto __exit;
    }

    /* Cancel if work has been submitted */
    if (work->workqueue == queue)
    {
        ret = _workqueue_cancel_delayed_work(work);
        if (ret < 0)
        {
            goto __exit;
        }
    }

    level = rt_hw_interrupt_disable();
    /* Attach workqueue so the timeout callback can submit it */
    work->workqueue = queue;
    rt_hw_interrupt_enable(level);

    if (!ticks)
    {
        /* Submit work if no ticks is 0 */
        _workqueue_submit_work(work->workqueue, &(work->work));
    }
    else
    {
222
        level = rt_hw_interrupt_disable();
223
        /* Add timeout */
224 225 226 227
        work->work.flags |= RT_WORK_STATE_SUBMITTING;
        rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, work, ticks,
                      RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER);
        rt_hw_interrupt_enable(level);
228 229 230 231 232 233 234 235 236 237
        rt_timer_start(&(work->timer));
    }

__exit:
    return ret;
}

static void _delayed_work_timeout_handler(void *parameter)
{
    struct rt_delayed_work *delayed_work;
238
    rt_base_t level;
239 240

    delayed_work = (struct rt_delayed_work *)parameter;
241
    level = rt_hw_interrupt_disable();
242
    rt_timer_stop(&(delayed_work->timer));
243 244 245
    rt_timer_detach(&(delayed_work->timer));
    delayed_work->work.flags &= ~RT_WORK_STATE_SUBMITTING;
    rt_hw_interrupt_enable(level);
246 247 248 249
    _workqueue_submit_work(delayed_work->workqueue, &(delayed_work->work));
}

struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
250
{
251
    struct rt_workqueue *queue = RT_NULL;
252

253
    queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
254 255
    if (queue != RT_NULL)
    {
256 257
        /* initialize work list */
        rt_list_init(&(queue->work_list));
258
        queue->work_current = RT_NULL;
259
        rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
260 261 262 263 264 265 266 267 268 269 270 271 272

        /* create the work thread */
        queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
        if (queue->work_thread == RT_NULL)
        {
            RT_KERNEL_FREE(queue);
            return RT_NULL;
        }

        rt_thread_startup(queue->work_thread);
    }

    return queue;
273 274
}

275
rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
276
{
277
    RT_ASSERT(queue != RT_NULL);
278

279 280
    rt_thread_delete(queue->work_thread);
    RT_KERNEL_FREE(queue);
281

282
    return RT_EOK;
283 284
}

285
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
286
{
287 288 289
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

290 291
    return _workqueue_submit_work(queue, work);
}
292

293 294 295 296
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time)
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);
297

298
    if (work->type & RT_WORK_TYPE_DELAYED)
299
    {
300 301 302 303 304
        return _workqueue_submit_delayed_work(queue, (struct rt_delayed_work *)work, time);
    }
    else
    {
        return _workqueue_submit_work(queue, work);
305
    }
306 307
}

308
rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
309
{
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
    rt_base_t level;
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST be initialized firstly */
    rt_list_remove(&(work->list));

    rt_list_insert_after(queue->work_list.prev, &(work->list));
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else rt_hw_interrupt_enable(level);

    return RT_EOK;
335 336
}

337
rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
338
{
339 340
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);
341

342
    if (work->type & RT_WORK_TYPE_DELAYED)
343
    {
344 345 346 347 348
        return _workqueue_cancel_delayed_work((struct rt_delayed_work *)work);
    }
    else
    {
        return _workqueue_cancel_work(queue, work);
349
    }
350 351
}

352
rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work) /* it's current work in the queue */
    {
        /* wait for work completion */
        rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
    }
    else
    {
        rt_list_remove(&(work->list));
    }
369
    work->flags &= ~RT_WORK_STATE_PENDING;
370 371 372 373 374
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}

375
rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
376
{
377 378 379 380 381 382 383 384 385 386 387 388
    struct rt_list_node *node, *next;
    RT_ASSERT(queue != RT_NULL);

    rt_enter_critical();
    for (node = queue->work_list.next; node != &(queue->work_list); node = next)
    {
        next = node->next;
        rt_list_remove(node);
    }
    rt_exit_critical();

    return RT_EOK;
389 390
}

391 392 393 394 395 396
void rt_delayed_work_init(struct rt_delayed_work *work, void (*work_func)(struct rt_work *work,
                          void *work_data), void *work_data)
{
    rt_work_init(&(work->work), work_func, work_data);
    work->work.type = RT_WORK_TYPE_DELAYED;
}
#ifdef RT_USING_SYSTEM_WORKQUEUE
static struct rt_workqueue *sys_workq;

rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t time)
{
    return rt_workqueue_submit_work(sys_workq, work, time);
}

rt_err_t rt_work_cancel(struct rt_work *work)
{
    return rt_workqueue_cancel_work(sys_workq, work);
}

static int rt_work_sys_workqueue_init(void)
{
413
    sys_workq = rt_workqueue_create("sys_work", RT_SYSTEM_WORKQUEUE_STACKSIZE,
414 415 416 417 418 419 420 421
                                    RT_SYSTEM_WORKQUEUE_PRIORITY);

    return RT_EOK;
}

INIT_DEVICE_EXPORT(rt_work_sys_workqueue_init);
#endif
#endif