/*
 * Copyright (c) 2006-2018, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2017-02-27     bernard      fix the re-work issue.
 */

#include <rthw.h>
#include <rtthread.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP

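/* signal completion of the current work: wake up any thread waiting on the
 * queue semaphore while keeping the semaphore count at zero */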
rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
    rt_err_t result;

    rt_enter_critical();
    while (1)
    {
        /* try to take condition semaphore */
        result = rt_sem_trytake(&(queue->sem));
        if (result == -RT_ETIMEOUT)
        {
            /* it timed out; release the semaphore */
            rt_sem_release(&(queue->sem));
        }
        else if (result == RT_EOK)
        {
            /* keep the sem value = 0 */
            result = RT_EOK;
            break;
        }
        else
        {
            result = -RT_ERROR;
            break;
        }
    }
    rt_exit_critical();

    return result;
}

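/* worker thread entry: suspends itself while the work list is empty,
 * otherwise takes works from the head of the list and runs their callbacks */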
static void _workqueue_thread_entry(void *parameter)
{
    rt_base_t level;
    struct rt_work *work;
    struct rt_workqueue *queue;

    queue = (struct rt_workqueue *) parameter;
    RT_ASSERT(queue != RT_NULL);

    while (1)
    {
        if (rt_list_isempty(&(queue->work_list)))
        {
            /* no work in the queue, suspend self. */
            rt_thread_suspend(rt_thread_self());
            rt_schedule();
        }

        /* we have work to do. */
        level = rt_hw_interrupt_disable();
        work = rt_list_entry(queue->work_list.next, struct rt_work, list);
        rt_list_remove(&(work->list));
        queue->work_current = work;
        work->flags &= ~RT_WORK_STATE_PENDING;
        rt_hw_interrupt_enable(level);

        /* do work */
        work->work_func(work, work->work_data);
        level = rt_hw_interrupt_disable();
        /* clean current work */
        queue->work_current = RT_NULL;
        rt_hw_interrupt_enable(level);

        /* ack work completion */
        _workqueue_work_completion(queue);
    }
}

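/* append a work to the tail of the work list and wake the worker thread if
 * it is idle; returns -RT_EBUSY when the work is already pending or is
 * currently being executed */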
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    level = rt_hw_interrupt_disable();
    if (work->flags & RT_WORK_STATE_PENDING)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST have been initialized before it is submitted */
    rt_list_remove(&(work->list));

    rt_list_insert_after(queue->work_list.prev, &(work->list));
    work->flags |= RT_WORK_STATE_PENDING;

    /* whether the workqueue is doing work */
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}

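/* remove a pending work from the queue; returns -RT_EBUSY if the work is
 * currently being executed */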
static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }
    rt_list_remove(&(work->list));
    work->flags &= ~RT_WORK_STATE_PENDING;
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}

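/* cancel a delayed work: dequeue it if it was already submitted, otherwise
 * stop its timer, then detach it from the workqueue */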
static rt_err_t _workqueue_cancel_delayed_work(struct rt_delayed_work *work)
{
    rt_base_t level;
    int ret = RT_EOK;

    if (!work->workqueue)
    {
        ret = -RT_EINVAL;
        goto __exit;
    }

    if (work->work.flags & RT_WORK_STATE_PENDING)
    {
        /* Remove from the queue if already submitted */
        ret = rt_workqueue_cancel_work(work->workqueue, &(work->work));
        if (ret)
        {
            goto __exit;
        }
    }
    else
    {
        rt_timer_stop(&(work->timer));
    }

    level = rt_hw_interrupt_disable();
    /* Detach from workqueue */
    work->workqueue = RT_NULL;
    work->work.flags &= ~(RT_WORK_STATE_PENDING);
    rt_hw_interrupt_enable(level);

__exit:
    return ret;
}

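/* submit a delayed work: queue it immediately when ticks is 0, otherwise
 * start the one-shot timer that will queue it later */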
static rt_err_t _workqueue_submit_delayed_work(struct rt_workqueue *queue,
        struct rt_delayed_work *work, rt_tick_t ticks)
{
    rt_base_t level;
    int ret = RT_EOK;

    /* Work cannot be active in multiple queues */
    if (work->workqueue && work->workqueue != queue)
    {
        ret = -RT_EINVAL;
        goto __exit;
    }

    /* Cancel if work has been submitted */
    if (work->workqueue == queue)
    {
        ret = _workqueue_cancel_delayed_work(work);
        if (ret < 0)
        {
            goto __exit;
        }
    }

    level = rt_hw_interrupt_disable();
    /* Attach workqueue so the timeout callback can submit it */
    work->workqueue = queue;
    rt_hw_interrupt_enable(level);

    if (!ticks)
    {
        /* submit the work immediately when ticks is 0 */
        _workqueue_submit_work(work->workqueue, &(work->work));
    }
    else
    {
        /* Add timeout */
        rt_timer_control(&(work->timer), RT_TIMER_CTRL_SET_TIME, &ticks);
        rt_timer_start(&(work->timer));
    }

__exit:
    return ret;
}

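/* one-shot timer callback: stop the timer and queue the delayed work on its
 * workqueue */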
static void _delayed_work_timeout_handler(void *parameter)
{
    struct rt_delayed_work *delayed_work;

    delayed_work = (struct rt_delayed_work *)parameter;
    rt_timer_stop(&(delayed_work->timer));
    _workqueue_submit_work(delayed_work->workqueue, &(delayed_work->work));
}

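/* create a workqueue with a dedicated worker thread; returns RT_NULL when
 * the control block or the thread cannot be allocated */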
struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
{
    struct rt_workqueue *queue = RT_NULL;

    queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
    if (queue != RT_NULL)
    {
        /* initialize work list */
        rt_list_init(&(queue->work_list));
        queue->work_current = RT_NULL;
        rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);

        /* create the work thread */
        queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
        if (queue->work_thread == RT_NULL)
        {
            RT_KERNEL_FREE(queue);
            return RT_NULL;
        }

        rt_thread_startup(queue->work_thread);
    }

    return queue;
}

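/* destroy a workqueue: delete its worker thread and free the control block */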
rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
{
    RT_ASSERT(queue != RT_NULL);

    rt_thread_delete(queue->work_thread);
    RT_KERNEL_FREE(queue);

    return RT_EOK;
}

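/* submit a work to the workqueue for immediate execution */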
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    return _workqueue_submit_work(queue, work);
}

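/* submit a work to the workqueue; for delayed works the time argument is the
 * delay in ticks, for normal works it is ignored */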
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t time)
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    if (work->type & RT_WORK_TYPE_DELAYED)
    {
        return _workqueue_submit_delayed_work(queue, (struct rt_delayed_work *)work, time);
    }
    else
    {
        return _workqueue_submit_work(queue, work);
    }
}

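/* queue a work without checking the pending flag: the work is (re)inserted
 * at the tail of the list and the worker thread is woken if it is idle */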
rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work MUST have been initialized before it is queued */
    rt_list_remove(&(work->list));

    rt_list_insert_after(queue->work_list.prev, &(work->list));
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}

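/* cancel a work; delayed works are routed to the delayed-work cancel path,
 * which also stops the timer */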
rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    if (work->type & RT_WORK_TYPE_DELAYED)
    {
        return _workqueue_cancel_delayed_work((struct rt_delayed_work *)work);
    }
    else
    {
        return _workqueue_cancel_work(queue, work);
    }
}

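/* cancel a work and, if it is currently running, block until the worker
 * thread signals its completion through the queue semaphore */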
rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work) /* the work is currently being executed */
    {
        /* wait for work completion */
        rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
    }
    else
    {
        rt_list_remove(&(work->list));
    }
    work->flags &= ~RT_WORK_STATE_PENDING;
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}

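/* remove every pending work from the queue; the work currently being
 * executed is not affected */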
rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
{
    struct rt_list_node *node, *next;
    RT_ASSERT(queue != RT_NULL);

    rt_enter_critical();
    for (node = queue->work_list.next; node != &(queue->work_list); node = next)
    {
        struct rt_work *work = rt_list_entry(node, struct rt_work, list);

        next = node->next;
        rt_list_remove(node);
        /* clear the pending flag so that the work can be submitted again */
        work->flags &= ~RT_WORK_STATE_PENDING;
    }
    rt_exit_critical();

    return RT_EOK;
}

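/* initialize a delayed work: set up the embedded work, mark it as delayed
 * and initialize its one-shot soft timer */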
void rt_delayed_work_init(struct rt_delayed_work *work,
                          void (*work_func)(struct rt_work *work, void *work_data),
                          void *work_data)
{
    rt_work_init(&(work->work), work_func, work_data);
    work->work.type = RT_WORK_TYPE_DELAYED;
    rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, work, 0,
                  RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER);
}
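
/*
 * Usage sketch (illustrative only): the names example_work, example_handler
 * and the stack size/priority values are hypothetical and not part of this
 * file; error checking is omitted.
 *
 *     static struct rt_work example_work;
 *
 *     static void example_handler(struct rt_work *work, void *work_data)
 *     {
 *         // do the deferred job here
 *     }
 *
 *     void example(void)
 *     {
 *         struct rt_workqueue *wq = rt_workqueue_create("wq", 2048, 20);
 *
 *         rt_work_init(&example_work, example_handler, RT_NULL);
 *         rt_workqueue_submit_work(wq, &example_work, 0);
 *     }
 */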

#ifdef RT_USING_SYSTEM_WORKQUEUE
static struct rt_workqueue *sys_workq;

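/* submit a work to the system workqueue */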
rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t time)
{
    return rt_workqueue_submit_work(sys_workq, work, time);
}

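/* cancel a work on the system workqueue */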
rt_err_t rt_work_cancel(struct rt_work *work)
{
    return rt_workqueue_cancel_work(sys_workq, work);
}

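/* create the system workqueue at device-initialization time */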
static int rt_work_sys_workqueue_init(void)
{
    sys_workq = rt_workqueue_create("sys_work", RT_SYSTEM_WORKQUEUE_STACKSIZE * 4,
                                    RT_SYSTEM_WORKQUEUE_PRIORITY);

    return RT_EOK;
}

INIT_DEVICE_EXPORT(rt_work_sys_workqueue_init);
#endif /* RT_USING_SYSTEM_WORKQUEUE */
#endif /* RT_USING_HEAP */