/*
 * Copyright (c) 2006-2021, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2012-09-30     Bernard      first version.
 * 2016-10-31     armink       fix some resume push and pop thread bugs
 */

#include <rtthread.h>
#include <rtdevice.h>
#include <rthw.h>

/*
 * Marker stored in the queue's magic field by rt_data_queue_init(). The other
 * data queue functions assert on it, and rt_data_queue_deinit() clears it.
 */
#define DATAQUEUE_MAGIC  0xbead0e0e

18 19 20 21 22 23
struct rt_data_item
{
    const void *data_ptr;
    rt_size_t data_size;
};

O
ousugo 已提交
24
/**
O
ousugo 已提交
25
 * @brief    This function will initialize the data queue. Calling this function will
O
ousugo 已提交
26
 *           initialize the data queue control block and set the notification callback function.
O
ousugo 已提交
27
 *
O
ousugo 已提交
28
 * @param    queue is a pointer to the data queue object.
O
ousugo 已提交
29
 *
O
ousugo 已提交
30 31 32
 * @param    size is the maximum number of data in the data queue.
 *
 * @param    lwm is low water mark.
O
ousugo 已提交
33
 *           When the number of data in the data queue is less than this value, this function will
O
ousugo 已提交
34 35 36 37 38 39
 *           wake up the thread waiting for write data.
 *
 * @param    evt_notify is the notification callback function.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           When the return value is RT_ENOMEM, it means insufficient memory allocation failed.
O
ousugo 已提交
40
 */
41 42 43 44 45 46 47
rt_err_t
rt_data_queue_init(struct rt_data_queue *queue,
                   rt_uint16_t size,
                   rt_uint16_t lwm,
                   void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
{
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
48
    RT_ASSERT(size > 0);
49 50 51

    queue->evt_notify = evt_notify;

O
ousugo 已提交
52
    queue->magic = DATAQUEUE_MAGIC;
53 54 55 56 57
    queue->size = size;
    queue->lwm = lwm;

    queue->get_index = 0;
    queue->put_index = 0;
Q
qiyongzhong0 已提交
58 59
    queue->is_empty = 1;
    queue->is_full = 0;
60 61 62

    rt_list_init(&(queue->suspended_push_list));
    rt_list_init(&(queue->suspended_pop_list));
Q
qiyongzhong0 已提交
63

64 65 66 67 68 69 70 71 72 73
    queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
    if (queue->queue == RT_NULL)
    {
        return -RT_ENOMEM;
    }

    return RT_EOK;
}
RTM_EXPORT(rt_data_queue_init);

O
ousugo 已提交
74
/**
O
ousugo 已提交
75 76
 * @brief    This function will write data to the data queue. If the data queue is full,
 *           the thread will suspend for the specified amount of time.
O
ousugo 已提交
77
 *
O
ousugo 已提交
78
 * @param    queue is a pointer to the data queue object.
O
ousugo 已提交
79 80
 * .
 * @param    data_ptr is the buffer pointer of the data to be written.
O
ousugo 已提交
81
 *
O
ousugo 已提交
82 83 84 85 86
 * @param    size is the size in bytes of the data to be written.
 *
 * @param    timeout is the waiting time.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
87
 *           When the return value is RT_ETIMEOUT, it means the specified time out.
O
ousugo 已提交
88
 */
89 90 91 92 93 94 95 96
rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
                            const void *data_ptr,
                            rt_size_t data_size,
                            rt_int32_t timeout)
{
    rt_ubase_t  level;
    rt_thread_t thread;
    rt_err_t    result;
97

98
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
99
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
100

101
    /* current context checking */
H
Henson 已提交
102
    RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
103

104 105 106 107
    result = RT_EOK;
    thread = rt_thread_self();

    level = rt_hw_interrupt_disable();
Q
qiyongzhong0 已提交
108
    while (queue->is_full)
109 110 111 112 113 114 115 116 117 118 119
    {
        /* queue is full */
        if (timeout == 0)
        {
            result = -RT_ETIMEOUT;

            goto __exit;
        }

        /* reset thread error number */
        thread->error = RT_EOK;
120

121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
        /* suspend thread on the push list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do schedule */
        rt_schedule();

        /* thread is waked up */
        result = thread->error;
        level = rt_hw_interrupt_disable();
143
        if (result != RT_EOK) goto __exit;
144 145
    }

Q
qiyongzhong0 已提交
146 147
    queue->queue[queue->put_index].data_ptr  = data_ptr;
    queue->queue[queue->put_index].data_size = data_size;
148
    queue->put_index += 1;
Q
qiyongzhong0 已提交
149 150 151 152 153 154 155 156 157
    if (queue->put_index == queue->size)
    {
        queue->put_index = 0;
    }
    queue->is_empty = 0;
    if (queue->put_index == queue->get_index)
    {
        queue->is_full = 1;
    }
158

159
    /* there is at least one thread in suspended list */
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
    if (!rt_list_isempty(&(queue->suspended_pop_list)))
    {
        /* get thread entry */
        thread = rt_list_entry(queue->suspended_pop_list.next,
                               struct rt_thread,
                               tlist);

        /* resume it */
        rt_thread_resume(thread);
        rt_hw_interrupt_enable(level);

        /* perform a schedule */
        rt_schedule();

        return result;
    }

__exit:
    rt_hw_interrupt_enable(level);
    if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
    {
        queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
    }

    return result;
}
RTM_EXPORT(rt_data_queue_push);

O
ousugo 已提交
188
/**
O
ousugo 已提交
189 190 191
 * @brief    This function will pop data from the data queue. If the data queue is empty,the thread
 *           will suspend for the specified amount of time.
 *
O
ousugo 已提交
192
 * @note     When the number of data in the data queue is less than lwm(low water mark), will
O
ousugo 已提交
193
 *           wake up the thread waiting for write data.
O
ousugo 已提交
194
 *
O
ousugo 已提交
195
 * @param    queue is a pointer to the data queue object.
O
ousugo 已提交
196
 *
O
ousugo 已提交
197
 * @param    data_ptr is the buffer pointer of the data to be fetched.
O
ousugo 已提交
198
 *
O
ousugo 已提交
199 200 201 202 203
 * @param    size is the size in bytes of the data to be fetched.
 *
 * @param    timeout is the waiting time.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
204
 *           When the return value is RT_ETIMEOUT, it means the specified time out.
O
ousugo 已提交
205
 */
206 207
rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
                           const void** data_ptr,
208
                           rt_size_t *size,
209 210 211 212 213
                           rt_int32_t timeout)
{
    rt_ubase_t  level;
    rt_thread_t thread;
    rt_err_t    result;
214

215
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
216
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
217 218 219
    RT_ASSERT(data_ptr != RT_NULL);
    RT_ASSERT(size != RT_NULL);

220
    /* current context checking */
H
Henson 已提交
221
    RT_DEBUG_SCHEDULER_AVAILABLE(timeout != 0);
222

223 224 225 226
    result = RT_EOK;
    thread = rt_thread_self();

    level = rt_hw_interrupt_disable();
Q
qiyongzhong0 已提交
227
    while (queue->is_empty)
228 229 230 231 232 233 234 235 236 237
    {
        /* queue is empty */
        if (timeout == 0)
        {
            result = -RT_ETIMEOUT;
            goto __exit;
        }

        /* reset thread error number */
        thread->error = RT_EOK;
238

239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
        /* suspend thread on the pop list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do schedule */
        rt_schedule();

        /* thread is waked up */
        result = thread->error;
        level  = rt_hw_interrupt_disable();
        if (result != RT_EOK)
            goto __exit;
    }

Q
qiyongzhong0 已提交
265 266
    *data_ptr = queue->queue[queue->get_index].data_ptr;
    *size     = queue->queue[queue->get_index].data_size;
267
    queue->get_index += 1;
Q
qiyongzhong0 已提交
268 269 270 271 272 273 274 275 276
    if (queue->get_index == queue->size)
    {
        queue->get_index = 0;
    }
    queue->is_full = 0;
    if (queue->put_index == queue->get_index)
    {
        queue->is_empty = 1;
    }
277

Q
qiyongzhong0 已提交
278
    if (rt_data_queue_len(queue) <= queue->lwm)
279
    {
280
        /* there is at least one thread in suspended list */
281 282 283 284 285 286 287 288 289 290 291 292 293 294
        if (!rt_list_isempty(&(queue->suspended_push_list)))
        {
            /* get thread entry */
            thread = rt_list_entry(queue->suspended_push_list.next,
                                   struct rt_thread,
                                   tlist);

            /* resume it */
            rt_thread_resume(thread);
            rt_hw_interrupt_enable(level);

            /* perform a schedule */
            rt_schedule();
        }
295 296 297 298
        else
        {
            rt_hw_interrupt_enable(level);
        }
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316

        if (queue->evt_notify != RT_NULL)
            queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);

        return result;
    }

__exit:
    rt_hw_interrupt_enable(level);
    if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
    {
        queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
    }

    return result;
}
RTM_EXPORT(rt_data_queue_pop);

O
ousugo 已提交
317
/**
O
ousugo 已提交
318
 * @brief    This function will fetch but retaining data in the data queue.
O
ousugo 已提交
319
 *
O
ousugo 已提交
320
 * @param    queue is a pointer to the data queue object.
O
ousugo 已提交
321
 *
O
ousugo 已提交
322
 * @param    data_ptr is the buffer pointer of the data to be fetched.
O
ousugo 已提交
323
 *
O
ousugo 已提交
324 325 326
 * @param    size is the size in bytes of the data to be fetched.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
327
 *           When the return value is -RT_EEMPTY, it means the data queue is empty.
O
ousugo 已提交
328
 */
329
rt_err_t rt_data_queue_peek(struct rt_data_queue *queue,
330 331 332 333
                            const void** data_ptr,
                            rt_size_t *size)
{
    rt_ubase_t  level;
334

335
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
336
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
337

338
    if (queue->is_empty)
339 340 341 342
    {
        return -RT_EEMPTY;
    }

Q
qiyongzhong0 已提交
343 344 345 346
    level = rt_hw_interrupt_disable();

    *data_ptr = queue->queue[queue->get_index].data_ptr;
    *size     = queue->queue[queue->get_index].data_size;
347 348 349 350 351

    rt_hw_interrupt_enable(level);

    return RT_EOK;
}
352
RTM_EXPORT(rt_data_queue_peek);
353

O
ousugo 已提交
354
/**
O
ousugo 已提交
355
 * @brief    This function will reset the data queue.
O
ousugo 已提交
356 357 358
 *
 * @note     Calling this function will wake up all threads on the data queue
 *           that are hanging and waiting.
O
ousugo 已提交
359
 *
O
ousugo 已提交
360
 * @param    queue is a pointer to the data queue object.
O
ousugo 已提交
361
 */
362 363
void rt_data_queue_reset(struct rt_data_queue *queue)
{
Q
qiyongzhong0 已提交
364
    rt_ubase_t  level;
365
    struct rt_thread *thread;
366

Q
qiyongzhong0 已提交
367
    RT_ASSERT(queue != RT_NULL);
368
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
Q
qiyongzhong0 已提交
369 370 371 372 373 374 375

    level = rt_hw_interrupt_disable();

    queue->get_index = 0;
    queue->put_index = 0;
    queue->is_empty = 1;
    queue->is_full = 0;
376

Q
qiyongzhong0 已提交
377
    rt_hw_interrupt_enable(level);
378

379 380 381 382 383 384 385
    rt_enter_critical();
    /* wakeup all suspend threads */

    /* resume on pop list */
    while (!rt_list_isempty(&(queue->suspended_pop_list)))
    {
        /* disable interrupt */
Q
qiyongzhong0 已提交
386
        level = rt_hw_interrupt_disable();
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402

        /* get next suspend thread */
        thread = rt_list_entry(queue->suspended_pop_list.next,
                               struct rt_thread,
                               tlist);
        /* set error code to RT_ERROR */
        thread->error = -RT_ERROR;

        /*
         * resume thread
         * In rt_thread_resume function, it will remove current thread from
         * suspend list
         */
        rt_thread_resume(thread);

        /* enable interrupt */
Q
qiyongzhong0 已提交
403
        rt_hw_interrupt_enable(level);
404 405 406 407 408 409
    }

    /* resume on push list */
    while (!rt_list_isempty(&(queue->suspended_push_list)))
    {
        /* disable interrupt */
Q
qiyongzhong0 已提交
410
        level = rt_hw_interrupt_disable();
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426

        /* get next suspend thread */
        thread = rt_list_entry(queue->suspended_push_list.next,
                               struct rt_thread,
                               tlist);
        /* set error code to RT_ERROR */
        thread->error = -RT_ERROR;

        /*
         * resume thread
         * In rt_thread_resume function, it will remove current thread from
         * suspend list
         */
        rt_thread_resume(thread);

        /* enable interrupt */
Q
qiyongzhong0 已提交
427
        rt_hw_interrupt_enable(level);
428 429 430 431 432 433
    }
    rt_exit_critical();

    rt_schedule();
}
RTM_EXPORT(rt_data_queue_reset);
434

O
ousugo 已提交
435
/**
O
ousugo 已提交
436
 * @brief    This function will deinit the data queue.
O
ousugo 已提交
437
 *
O
ousugo 已提交
438
 * @param    queue is a pointer to the data queue object.
O
ousugo 已提交
439
 *
O
ousugo 已提交
440
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
441
 */
442 443 444 445 446
rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue)
{
    rt_ubase_t level;

    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
447
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
448 449 450 451

    /* wakeup all suspend threads */
    rt_data_queue_reset(queue);

Q
qiyongzhong0 已提交
452
    level = rt_hw_interrupt_disable();
453
    queue->magic = 0;
454
    rt_hw_interrupt_enable(level);
455

Q
qiyongzhong0 已提交
456
    rt_free(queue->queue);
457 458 459 460

    return RT_EOK;
}
RTM_EXPORT(rt_data_queue_deinit);
Q
qiyongzhong0 已提交
461

O
ousugo 已提交
462
/**
O
ousugo 已提交
463 464
 * @brief    This function will get the number of data in the data queue.
 *
O
ousugo 已提交
465
 * @param    queue is a pointer to the data queue object.
O
ousugo 已提交
466
 *
O
ousugo 已提交
467
 * @return   Return the number of data in the data queue.
O
ousugo 已提交
468
 */
Q
qiyongzhong0 已提交
469 470 471 472
rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue)
{
    rt_ubase_t level;
    rt_int16_t len;
473

Q
qiyongzhong0 已提交
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);

    if (queue->is_empty)
    {
        return 0;
    }

    level = rt_hw_interrupt_disable();

    if (queue->put_index > queue->get_index)
    {
        len = queue->put_index - queue->get_index;
    }
    else
    {
        len = queue->size + queue->put_index - queue->get_index;
    }
492

Q
qiyongzhong0 已提交
493 494 495 496 497 498
    rt_hw_interrupt_enable(level);

    return len;
}
RTM_EXPORT(rt_data_queue_len);