diff --git a/components/drivers/include/ipc/workqueue.h b/components/drivers/include/ipc/workqueue.h index 36eb825dc99f99252e0698304ada8d3a28011e8b..0b044b5ad31215c10a27091c4f9f5bb50c80d475 100644 --- a/components/drivers/include/ipc/workqueue.h +++ b/components/drivers/include/ipc/workqueue.h @@ -8,6 +8,8 @@ struct rt_workqueue { rt_list_t work_list; struct rt_work *work_current; /* current work */ + + struct rt_semaphore sem; rt_thread_t work_thread; }; @@ -27,6 +29,7 @@ struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_siz rt_err_t rt_workqueue_destroy(struct rt_workqueue* queue); rt_err_t rt_workqueue_dowork(struct rt_workqueue* queue, struct rt_work* work); rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* work); +rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work); rt_inline void rt_work_init(struct rt_work* work, void (*work_func)(struct rt_work* work, void* work_data), void* work_data) diff --git a/components/drivers/src/workqueue.c b/components/drivers/src/workqueue.c index 767c991d94cbded01842976874f921c6ffffb9ee..b83b76a88be317a7db701416f16d2ff462618947 100644 --- a/components/drivers/src/workqueue.c +++ b/components/drivers/src/workqueue.c @@ -27,6 +27,37 @@ #include #ifdef RT_USING_HEAP +rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue) +{ + rt_err_t result; + + rt_enter_critical(); + while (1) + { + /* try to take condition semaphore */ + result = rt_sem_trytake(&(queue->sem)); + if (result == -RT_ETIMEOUT) + { + /* it's timeout, release this semaphore */ + rt_sem_release(&(queue->sem)); + } + else if (result == RT_EOK) + { + /* keep the sem value = 0 */ + result = RT_EOK; + break; + } + else + { + result = -RT_ERROR; + break; + } + } + rt_exit_critical(); + + return result; +} + static void _workqueue_thread_entry(void* parameter) { rt_base_t level; @@ -58,6 +89,9 @@ static void _workqueue_thread_entry(void* parameter) /* clean current work */ queue->work_current = RT_NULL; rt_hw_interrupt_enable(level); + + /* ack work completion */ + _workqueue_work_completion(queue); } } @@ -71,6 +105,7 @@ struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_siz /* initialize work list */ rt_list_init(&(queue->work_list)); queue->work_current = RT_NULL; + rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO); /* create the work thread */ queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10); @@ -174,6 +209,30 @@ rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* wo return RT_EOK; } +rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work) +{ + rt_base_t level; + + RT_ASSERT(queue != RT_NULL); + RT_ASSERT(work != RT_NULL); + + level = rt_hw_interrupt_disable(); + if (queue->work_current == work) /* it's current work in the queue */ + { + rt_uint32_t recv; + + /* wait for work completion */ + rt_sem_take(&(queue->sem), RT_WAITING_FOREVER); + } + else + { + rt_list_remove(&(work->list)); + } + rt_hw_interrupt_enable(level); + + return RT_EOK; +} + rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue* queue) { struct rt_list_node *node, *next;