/*
 * Copyright (c) 2006-2022, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2017-02-27     Bernard      fix the re-work issue.
 * 2021-08-01     Meco Man     remove rt_delayed_work_init()
 * 2021-08-14     Jackistang   add comments for function interface
 * 2022-01-16     Meco Man     add rt_work_urgent()
 */

#include <rthw.h>
15 16 17 18
#include <rtthread.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP
19

20 21
static void _delayed_work_timeout_handler(void *parameter);

22 23 24
/**
 * @brief Acknowledge that the worker thread has finished a work item.
 *
 * The queue semaphore is the one rt_workqueue_cancel_work_sync() blocks on.
 * While a non-blocking take reports -RT_ETIMEOUT the semaphore is released
 * once more; the loop stops as soon as a take succeeds (the original note
 * states the intent is to leave the semaphore value at 0) or fails with any
 * other code. The whole sequence runs between rt_enter_critical() and
 * rt_exit_critical().
 *
 * @param queue is a pointer to the workqueue object.
 *
 * @return RT_EOK       The final non-blocking take succeeded.
 *         -RT_ERROR    rt_sem_trytake() failed with a code other than -RT_ETIMEOUT.
 */
rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
    rt_err_t status;

    rt_enter_critical();

    /* post the semaphore for as long as a try-take cannot obtain it */
    while ((status = rt_sem_trytake(&(queue->sem))) == -RT_ETIMEOUT)
    {
        rt_sem_release(&(queue->sem));
    }

    /* anything other than a successful take is reported as a generic error */
    if (status != RT_EOK)
    {
        status = -RT_ERROR;
    }

    rt_exit_critical();

    return status;
}

53
static void _workqueue_thread_entry(void *parameter)
54
{
55
    rt_base_t level;
56 57
    struct rt_work *work;
    struct rt_workqueue *queue;
58

59
    queue = (struct rt_workqueue *) parameter;
60 61 62 63
    RT_ASSERT(queue != RT_NULL);

    while (1)
    {
64
        level = rt_hw_interrupt_disable();
65 66 67
        if (rt_list_isempty(&(queue->work_list)))
        {
            /* no software timer exist, suspend self. */
G
guo 已提交
68
            rt_thread_suspend_with_flag(rt_thread_self(), RT_UNINTERRUPTIBLE);
69
            rt_hw_interrupt_enable(level);
70
            rt_schedule();
71
            continue;
72 73 74 75 76 77
        }

        /* we have work to do with. */
        work = rt_list_entry(queue->work_list.next, struct rt_work, list);
        rt_list_remove(&(work->list));
        queue->work_current = work;
78
        work->flags &= ~RT_WORK_STATE_PENDING;
79
        work->workqueue = RT_NULL;
80 81 82 83 84 85
        rt_hw_interrupt_enable(level);

        /* do work */
        work->work_func(work, work->work_data);
        /* clean current work */
        queue->work_current = RT_NULL;
86 87 88

        /* ack work completion */
        _workqueue_work_completion(queue);
89
    }
90 91
}

92
/**
 * @brief Queue a work item, either immediately or after a delay.
 *
 * The item is first unlinked from whatever list currently holds it.
 *  - ticks == 0: any delay timer still armed from an earlier delayed submit
 *    is stopped and detached, the item is appended to queue->work_list and
 *    the worker thread is resumed if it is idle and suspended.
 *  - 0 < ticks < RT_TICK_MAX / 2: a one-shot soft timer is (re)armed and the
 *    item is appended to queue->delayed_list; the timer callback moves it
 *    to the work list later.
 *
 * @param queue is a pointer to the workqueue object.
 *
 * @param work is a pointer to the work item object.
 *
 * @param ticks is the delay in OS ticks; 0 means "run as soon as possible".
 *
 * @return RT_EOK       The item was queued or its timer was started.
 *         -RT_ERROR    ticks is not smaller than RT_TICK_MAX / 2.
 */
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue,
                                       struct rt_work *work, rt_tick_t ticks)
{
    rt_base_t level;

    level = rt_hw_interrupt_disable();

    /* remove the item from the list it is currently linked into */
    rt_list_remove(&(work->list));
    work->flags &= ~RT_WORK_STATE_PENDING;

    if (ticks == 0)
    {
        /*
         * A previous delayed submit may have left its timer armed. If it
         * were kept, it would fire after the worker has already run the
         * item and cleared work->workqueue, so the timeout handler would
         * operate on a NULL queue. Cancel it the same way
         * _workqueue_cancel_work() does.
         */
        if (work->flags & RT_WORK_STATE_SUBMITTING)
        {
            rt_timer_stop(&(work->timer));
            rt_timer_detach(&(work->timer));
            work->flags &= ~RT_WORK_STATE_SUBMITTING;
        }

        rt_list_insert_after(queue->work_list.prev, &(work->list));
        work->flags |= RT_WORK_STATE_PENDING;
        work->workqueue = queue;

        /* wake the worker only when it is idle and actually suspended */
        if (queue->work_current == RT_NULL &&
                ((queue->work_thread->stat & RT_THREAD_STAT_MASK) == RT_THREAD_SUSPEND))
        {
            /* resume work thread */
            rt_thread_resume(queue->work_thread);
            rt_hw_interrupt_enable(level);
            rt_schedule();
        }
        else
        {
            rt_hw_interrupt_enable(level);
        }
        return RT_EOK;
    }
    else if (ticks < RT_TICK_MAX / 2)
    {
        if (work->flags & RT_WORK_STATE_SUBMITTING)
        {
            /* timer already initialised: stop it and load the new delay */
            rt_timer_stop(&work->timer);
            rt_timer_control(&work->timer, RT_TIMER_CTRL_SET_TIME, &ticks);
        }
        else
        {
            rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler,
                          work, ticks, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER);
            work->flags |= RT_WORK_STATE_SUBMITTING;
        }
        work->workqueue = queue;
        /* insert delay work list */
        rt_list_insert_after(queue->delayed_list.prev, &(work->list));
        rt_hw_interrupt_enable(level);
        rt_timer_start(&(work->timer));
        return RT_EOK;
    }

    /* delay out of range */
    rt_hw_interrupt_enable(level);
    return -RT_ERROR;
}

149
static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
150 151
{
    rt_base_t level;
152
    rt_err_t err;
153 154 155 156

    level = rt_hw_interrupt_disable();
    rt_list_remove(&(work->list));
    work->flags &= ~RT_WORK_STATE_PENDING;
157 158
    /* Timer started */
    if (work->flags & RT_WORK_STATE_SUBMITTING)
159
    {
160 161 162
        rt_timer_stop(&(work->timer));
        rt_timer_detach(&(work->timer));
        work->flags &= ~RT_WORK_STATE_SUBMITTING;
163
    }
164
    err = queue->work_current != work ? RT_EOK : -RT_EBUSY;
165 166
    work->workqueue = RT_NULL;
    rt_hw_interrupt_enable(level);
167
    return err;
168 169 170 171
}

static void _delayed_work_timeout_handler(void *parameter)
{
172 173
    struct rt_work *work;
    struct rt_workqueue *queue;
174
    rt_base_t level;
175

176 177 178 179
    work = (struct rt_work *)parameter;
    queue = work->workqueue;
    RT_ASSERT(queue != RT_NULL);

180
    level = rt_hw_interrupt_disable();
181 182
    rt_timer_detach(&(work->timer));
    work->flags &= ~RT_WORK_STATE_SUBMITTING;
183 184
    /* remove delay list */
    rt_list_remove(&(work->list));
185 186 187 188 189 190 191 192
    /* insert work queue */
    if (queue->work_current != work)
    {
        rt_list_insert_after(queue->work_list.prev, &(work->list));
        work->flags |= RT_WORK_STATE_PENDING;
    }
    /* whether the workqueue is doing work */
    if (queue->work_current == RT_NULL &&
193
            ((queue->work_thread->stat & RT_THREAD_STAT_MASK) == RT_THREAD_SUSPEND))
194 195 196
    {
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
197
        rt_hw_interrupt_enable(level);
198 199 200 201 202 203
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }
204 205
}

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
/**
 * @brief Prepare a work item for use and bind it to its callback.
 *
 * The item starts out unlinked, bound to no work queue, with flags and
 * type cleared.
 *
 * @param work is a pointer to the work item object.
 *
 * @param work_func is the callback invoked when the work item is executed.
 *
 * @param work_data is user data handed to the callback as its second argument.
 */
void rt_work_init(struct rt_work *work,
                  void (*work_func)(struct rt_work *work, void *work_data),
                  void *work_data)
{
    RT_ASSERT(work != RT_NULL);
    RT_ASSERT(work_func != RT_NULL);

    /* callback and its argument */
    work->work_func = work_func;
    work->work_data = work_data;

    /* bookkeeping: not queued anywhere, no state bits set */
    work->workqueue = RT_NULL;
    work->flags = 0;
    work->type = 0;
    rt_list_init(&(work->list));
}

230
/**
231
 * @brief Create a work queue with a thread inside.
J
Jackistang 已提交
232
 *
mysterywolf's avatar
mysterywolf 已提交
233
 * @param name is a name of the work queue thread.
J
Jackistang 已提交
234
 *
mysterywolf's avatar
mysterywolf 已提交
235 236 237 238 239
 * @param stack_size is stack size of the work queue thread.
 *
 * @param priority is a priority of the work queue thread.
 *
 * @return Return a pointer to the workqueue object. It will return RT_NULL if failed.
240
 */
241
struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
242
{
243
    struct rt_workqueue *queue = RT_NULL;
244

245
    queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
246 247
    if (queue != RT_NULL)
    {
248 249
        /* initialize work list */
        rt_list_init(&(queue->work_list));
250
        rt_list_init(&(queue->delayed_list));
251
        queue->work_current = RT_NULL;
252
        rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
253 254 255 256 257 258 259 260 261 262 263 264 265

        /* create the work thread */
        queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
        if (queue->work_thread == RT_NULL)
        {
            RT_KERNEL_FREE(queue);
            return RT_NULL;
        }

        rt_thread_startup(queue->work_thread);
    }

    return queue;
266 267
}

268 269
/**
 * @brief Destroy a work queue.
J
Jackistang 已提交
270
 *
mysterywolf's avatar
mysterywolf 已提交
271
 * @param queue is a pointer to the workqueue object.
J
Jackistang 已提交
272
 *
mysterywolf's avatar
mysterywolf 已提交
273
 * @return RT_EOK     Success.
274
 */
275
rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
276
{
277
    RT_ASSERT(queue != RT_NULL);
278

279
    rt_workqueue_cancel_all_work(queue);
280
    rt_thread_delete(queue->work_thread);
281
    rt_sem_detach(&(queue->sem));
282
    RT_KERNEL_FREE(queue);
283

284
    return RT_EOK;
285 286
}

287
/**
288
 * @brief Submit a work item to the work queue without delay.
J
Jackistang 已提交
289
 *
mysterywolf's avatar
mysterywolf 已提交
290 291 292
 * @param queue is a pointer to the workqueue object.
 *
 * @param work is a pointer to the work item object.
J
Jackistang 已提交
293
 *
294
 * @return RT_EOK       Success.
mysterywolf's avatar
mysterywolf 已提交
295
 *         -RT_EBUSY    This work item is executing.
296
 */
297
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
298
{
299 300 301
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

302
    return _workqueue_submit_work(queue, work, 0);
303
}
304

305
/**
306
 * @brief Submit a work item to the work queue with a delay.
J
Jackistang 已提交
307
 *
mysterywolf's avatar
mysterywolf 已提交
308 309 310 311
 * @param queue is a pointer to the workqueue object.
 *
 * @param work is a pointer to the work item object.
 *
312
 * @param ticks is the delay ticks for the work item to be submitted to the work queue.
mysterywolf's avatar
mysterywolf 已提交
313 314
 *
 *             NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
J
Jackistang 已提交
315
 *
316
 * @return RT_EOK       Success.
mysterywolf's avatar
mysterywolf 已提交
317
 *         -RT_EBUSY    This work item is executing.
318
 *         -RT_ERROR    The ticks parameter is invalid.
319
 */
320
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t ticks)
321 322 323
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);
324
    RT_ASSERT(ticks < RT_TICK_MAX / 2);
325

326
    return _workqueue_submit_work(queue, work, ticks);
327 328
}

329
/**
330
 * @brief Submit a work item to the work queue without delay. This work item will be executed after the current work item.
J
Jackistang 已提交
331
 *
mysterywolf's avatar
mysterywolf 已提交
332 333 334
 * @param queue is a pointer to the workqueue object.
 *
 * @param work is a pointer to the work item object.
J
Jackistang 已提交
335 336
 *
 * @return RT_EOK   Success.
337
 */
338
rt_err_t rt_workqueue_urgent_work(struct rt_workqueue *queue, struct rt_work *work)
339
{
340
    rt_base_t level;
mysterywolf's avatar
mysterywolf 已提交
341

342 343 344 345 346 347
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    /* NOTE: the work MUST be initialized firstly */
    rt_list_remove(&(work->list));
T
tangyuxin 已提交
348 349 350
    rt_list_insert_after(&queue->work_list, &(work->list));
    /* whether the workqueue is doing work */
    if (queue->work_current == RT_NULL &&
351
            ((queue->work_thread->stat & RT_THREAD_STAT_MASK) == RT_THREAD_SUSPEND))
352 353 354
    {
        /* resume work thread */
        rt_thread_resume(queue->work_thread);
355
        rt_hw_interrupt_enable(level);
356 357
        rt_schedule();
    }
T
tangyuxin 已提交
358 359 360 361
    else
    {
        rt_hw_interrupt_enable(level);
    }
362 363

    return RT_EOK;
364 365
}

366 367
/**
 * @brief Cancel a work item in the work queue.
J
Jackistang 已提交
368
 *
mysterywolf's avatar
mysterywolf 已提交
369 370 371
 * @param queue is a pointer to the workqueue object.
 *
 * @param work is a pointer to the work item object.
J
Jackistang 已提交
372
 *
373
 * @return RT_EOK       Success.
mysterywolf's avatar
mysterywolf 已提交
374
 *         -RT_EBUSY    This work item is executing.
375
 */
376
rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
377
{
378
    RT_ASSERT(work != RT_NULL);
379
    RT_ASSERT(queue != RT_NULL);
mysterywolf's avatar
mysterywolf 已提交
380

381
    return _workqueue_cancel_work(queue, work);
382 383
}

384
/**
385
 * @brief Cancel a work item in the work queue. If the work item is executing, this function will block until it is done.
J
Jackistang 已提交
386
 *
mysterywolf's avatar
mysterywolf 已提交
387 388 389
 * @param queue is a pointer to the workqueue object.
 *
 * @param work is a pointer to the work item object.
J
Jackistang 已提交
390
 *
391 392
 * @return RT_EOK       Success.
 */
393
rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
394 395 396 397 398 399 400 401 402 403 404
{
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    if (queue->work_current == work) /* it's current work in the queue */
    {
        /* wait for work completion */
        rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
    }
    else
    {
405
        _workqueue_cancel_work(queue, work);
406 407 408 409 410
    }

    return RT_EOK;
}

411
/**
412
 * @brief This function will cancel all work items in work queue.
J
Jackistang 已提交
413
 *
mysterywolf's avatar
mysterywolf 已提交
414
 * @param queue is a pointer to the workqueue object.
J
Jackistang 已提交
415
 *
416 417
 * @return RT_EOK       Success.
 */
418
rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
419
{
420 421
    struct rt_work *work;

422 423
    RT_ASSERT(queue != RT_NULL);

424
    /* cancel work */
425
    rt_enter_critical();
426
    while (rt_list_isempty(&queue->work_list) == RT_FALSE)
427
    {
428
        work = rt_list_first_entry(&queue->work_list, struct rt_work, list);
429
        _workqueue_cancel_work(queue, work);
430
    }
431
    /* cancel delay work */
432
    while (rt_list_isempty(&queue->delayed_list) == RT_FALSE)
433 434
    {
        work = rt_list_first_entry(&queue->delayed_list, struct rt_work, list);
435
        _workqueue_cancel_work(queue, work);
436 437 438 439
    }
    rt_exit_critical();

    return RT_EOK;
440 441
}

442
#ifdef RT_USING_SYSTEM_WORKQUEUE
443 444

/*
 * The shared system work queue. It is created by rt_work_sys_workqueue_init()
 * and used by rt_work_submit(), rt_work_urgent() and rt_work_cancel().
 */
static struct rt_workqueue *sys_workq; /* system work queue */
445

446
/**
447
 * @brief Submit a work item to the system work queue with a delay.
J
Jackistang 已提交
448
 *
mysterywolf's avatar
mysterywolf 已提交
449 450
 * @param work is a pointer to the work item object.
 *
451
 * @param ticks is the delay OS ticks for the work item to be submitted to the work queue.
mysterywolf's avatar
mysterywolf 已提交
452 453
 *
 *             NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
J
Jackistang 已提交
454
 *
455
 * @return RT_EOK       Success.
mysterywolf's avatar
mysterywolf 已提交
456
 *         -RT_EBUSY    This work item is executing.
457
 *         -RT_ERROR    The ticks parameter is invalid.
458
 */
459
rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t ticks)
460
{
461
    return rt_workqueue_submit_work(sys_workq, work, ticks);
462 463
}

mysterywolf's avatar
mysterywolf 已提交
464 465 466 467 468 469 470 471 472 473 474 475
/**
 * @brief Submit a work item to the system work queue without delay. This work item will be executed after the current work item.
 *
 * @param work is a pointer to the work item object.
 *
 * @return RT_EOK   Success.
 */
rt_err_t rt_work_urgent(struct rt_work *work)
{
    rt_err_t result;

    /* forward to the generic API using the system work queue */
    result = rt_workqueue_urgent_work(sys_workq, work);

    return result;
}

476
/**
477
 * @brief Cancel a work item in the system work queue.
J
Jackistang 已提交
478
 *
mysterywolf's avatar
mysterywolf 已提交
479
 * @param work is a pointer to the work item object.
J
Jackistang 已提交
480
 *
481
 * @return RT_EOK       Success.
mysterywolf's avatar
mysterywolf 已提交
482
 *         -RT_EBUSY    This work item is executing.
483
 */
484 485 486 487 488
rt_err_t rt_work_cancel(struct rt_work *work)
{
    return rt_workqueue_cancel_work(sys_workq, work);
}

489
static int rt_work_sys_workqueue_init(void)
490
{
491
    if (sys_workq != RT_NULL)
T
tangyuxin 已提交
492
        return RT_EOK;
493

mysterywolf's avatar
mysterywolf 已提交
494
    sys_workq = rt_workqueue_create("sys workq", RT_SYSTEM_WORKQUEUE_STACKSIZE,
495
                                    RT_SYSTEM_WORKQUEUE_PRIORITY);
T
tangyuxin 已提交
496
    RT_ASSERT(sys_workq != RT_NULL);
497 498 499

    return RT_EOK;
}
500
INIT_PREV_EXPORT(rt_work_sys_workqueue_init);
501 502
#endif /* RT_USING_SYSTEM_WORKQUEUE */
#endif /* RT_USING_HEAP */