/*
 * Copyright (c) 2006-2021, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2012-09-30     Bernard      first version.
 * 2016-10-31     armink       fix some resume push and pop thread bugs
 */

#include <rtthread.h>
#include <rtdevice.h>
#include <rthw.h>

16 17
/* Marker written to queue->magic by rt_data_queue_init(), checked by the
 * RT_ASSERTs of the other queue functions, and cleared by rt_data_queue_deinit(). */
#define DATAQUEUE_MAGIC  0xbead0e0e

18 19 20 21 22 23
/* One slot of the data queue: a buffer recorded by reference (pointer and size);
 * rt_data_queue_push() stores these two values, it does not copy the buffer. */
struct rt_data_item
{
    const void *data_ptr;   /* buffer pointer handed to rt_data_queue_push() */
    rt_size_t data_size;    /* size value handed to rt_data_queue_push() */
};

O
ousugo 已提交
24
/**
O
ousugo 已提交
25 26
 * @brief    This function will initialize a data queue.Calling this function will
 *           initialize the data queue control block and set the notification callback function.
O
ousugo 已提交
27
 *
O
ousugo 已提交
28
 * @param    queue is a pointer to a data queue object.
O
ousugo 已提交
29
 *
O
ousugo 已提交
30 31 32 33 34 35 36 37 38 39
 * @param    size is the maximum number of data in the data queue.
 *
 * @param    lwm is low water mark.
 *           When the number of data in the data queue is less than this value,will
 *           wake up the thread waiting for write data.
 *
 * @param    evt_notify is the notification callback function.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the initialization is successful.
 *           When the return value is RT_ENOMEM, it means insufficient memory allocation failed.
O
ousugo 已提交
40
 */
41 42 43 44 45 46 47
rt_err_t
rt_data_queue_init(struct rt_data_queue *queue,
                   rt_uint16_t size,
                   rt_uint16_t lwm,
                   void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
{
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
48
    RT_ASSERT(size > 0);
49 50 51

    queue->evt_notify = evt_notify;

O
ousugo 已提交
52
    queue->magic = DATAQUEUE_MAGIC;
53 54 55 56 57
    queue->size = size;
    queue->lwm = lwm;

    queue->get_index = 0;
    queue->put_index = 0;
Q
qiyongzhong0 已提交
58 59
    queue->is_empty = 1;
    queue->is_full = 0;
60 61 62

    rt_list_init(&(queue->suspended_push_list));
    rt_list_init(&(queue->suspended_pop_list));
Q
qiyongzhong0 已提交
63

64 65 66 67 68 69 70 71 72 73
    queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
    if (queue->queue == RT_NULL)
    {
        return -RT_ENOMEM;
    }

    return RT_EOK;
}
RTM_EXPORT(rt_data_queue_init);

O
ousugo 已提交
74
/**
O
ousugo 已提交
75 76
 * @brief    This function will write data to the data queue. If the data queue is full,
 *           the thread will suspend for the specified amount of time.
O
ousugo 已提交
77
 *
O
ousugo 已提交
78 79 80
 * @param    queue is a pointer to a data queue object.
 * .
 * @param    data_ptr is the buffer pointer of the data to be written.
O
ousugo 已提交
81
 *
O
ousugo 已提交
82 83 84 85 86
 * @param    size is the size in bytes of the data to be written.
 *
 * @param    timeout is the waiting time.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
87
 */
88 89 90 91 92 93 94 95
rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
                            const void *data_ptr,
                            rt_size_t data_size,
                            rt_int32_t timeout)
{
    rt_ubase_t  level;
    rt_thread_t thread;
    rt_err_t    result;
96

97
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
98
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
99 100 101 102 103

    result = RT_EOK;
    thread = rt_thread_self();

    level = rt_hw_interrupt_disable();
Q
qiyongzhong0 已提交
104
    while (queue->is_full)
105 106 107 108 109 110 111 112 113 114 115 116 117 118
    {
        /* queue is full */
        if (timeout == 0)
        {
            result = -RT_ETIMEOUT;

            goto __exit;
        }

        /* current context checking */
        RT_DEBUG_NOT_IN_INTERRUPT;

        /* reset thread error number */
        thread->error = RT_EOK;
119

120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
        /* suspend thread on the push list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do schedule */
        rt_schedule();

        /* thread is waked up */
        result = thread->error;
        level = rt_hw_interrupt_disable();
142
        if (result != RT_EOK) goto __exit;
143 144
    }

Q
qiyongzhong0 已提交
145 146
    queue->queue[queue->put_index].data_ptr  = data_ptr;
    queue->queue[queue->put_index].data_size = data_size;
147
    queue->put_index += 1;
Q
qiyongzhong0 已提交
148 149 150 151 152 153 154 155 156
    if (queue->put_index == queue->size)
    {
        queue->put_index = 0;
    }
    queue->is_empty = 0;
    if (queue->put_index == queue->get_index)
    {
        queue->is_full = 1;
    }
157

158
    /* there is at least one thread in suspended list */
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
    if (!rt_list_isempty(&(queue->suspended_pop_list)))
    {
        /* get thread entry */
        thread = rt_list_entry(queue->suspended_pop_list.next,
                               struct rt_thread,
                               tlist);

        /* resume it */
        rt_thread_resume(thread);
        rt_hw_interrupt_enable(level);

        /* perform a schedule */
        rt_schedule();

        return result;
    }

__exit:
    rt_hw_interrupt_enable(level);
    if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
    {
        queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
    }

    return result;
}
RTM_EXPORT(rt_data_queue_push);

O
ousugo 已提交
187
/**
O
ousugo 已提交
188 189 190 191 192
 * @brief    This function will pop data from the data queue. If the data queue is empty,the thread
 *           will suspend for the specified amount of time.
 *
 * @note     when the number of data in the data queue is less than lwm(low water mark),will
 *           wake up the thread waiting for write data.
O
ousugo 已提交
193
 *
O
ousugo 已提交
194
 * @param    queue is a pointer to a data queue object.
O
ousugo 已提交
195
 *
O
ousugo 已提交
196
 * @param    data_ptr is the buffer pointer of the data to be fetched.
O
ousugo 已提交
197
 *
O
ousugo 已提交
198 199 200 201 202
 * @param    size is the size in bytes of the data to be fetched.
 *
 * @param    timeout is the waiting time.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
203
 */
204 205
rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
                           const void** data_ptr,
206
                           rt_size_t *size,
207 208 209 210 211
                           rt_int32_t timeout)
{
    rt_ubase_t  level;
    rt_thread_t thread;
    rt_err_t    result;
212

213
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
214
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
215 216 217 218 219 220 221
    RT_ASSERT(data_ptr != RT_NULL);
    RT_ASSERT(size != RT_NULL);

    result = RT_EOK;
    thread = rt_thread_self();

    level = rt_hw_interrupt_disable();
Q
qiyongzhong0 已提交
222
    while (queue->is_empty)
223 224 225 226 227 228 229 230 231 232 233 234 235
    {
        /* queue is empty */
        if (timeout == 0)
        {
            result = -RT_ETIMEOUT;
            goto __exit;
        }

        /* current context checking */
        RT_DEBUG_NOT_IN_INTERRUPT;

        /* reset thread error number */
        thread->error = RT_EOK;
236

237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
        /* suspend thread on the pop list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do schedule */
        rt_schedule();

        /* thread is waked up */
        result = thread->error;
        level  = rt_hw_interrupt_disable();
        if (result != RT_EOK)
            goto __exit;
    }

Q
qiyongzhong0 已提交
263 264
    *data_ptr = queue->queue[queue->get_index].data_ptr;
    *size     = queue->queue[queue->get_index].data_size;
265
    queue->get_index += 1;
Q
qiyongzhong0 已提交
266 267 268 269 270 271 272 273 274
    if (queue->get_index == queue->size)
    {
        queue->get_index = 0;
    }
    queue->is_full = 0;
    if (queue->put_index == queue->get_index)
    {
        queue->is_empty = 1;
    }
275

Q
qiyongzhong0 已提交
276
    if (rt_data_queue_len(queue) <= queue->lwm)
277
    {
278
        /* there is at least one thread in suspended list */
279 280 281 282 283 284 285 286 287 288 289 290 291 292
        if (!rt_list_isempty(&(queue->suspended_push_list)))
        {
            /* get thread entry */
            thread = rt_list_entry(queue->suspended_push_list.next,
                                   struct rt_thread,
                                   tlist);

            /* resume it */
            rt_thread_resume(thread);
            rt_hw_interrupt_enable(level);

            /* perform a schedule */
            rt_schedule();
        }
293 294 295 296
        else
        {
            rt_hw_interrupt_enable(level);
        }
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314

        if (queue->evt_notify != RT_NULL)
            queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);

        return result;
    }

__exit:
    rt_hw_interrupt_enable(level);
    if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
    {
        queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
    }

    return result;
}
RTM_EXPORT(rt_data_queue_pop);

O
ousugo 已提交
315
/**
O
ousugo 已提交
316 317 318
 * @brief    This function will fetching but retaining data in the data queue.
 *
 * @param    queue is a pointer to a data queue object.
O
ousugo 已提交
319
 *
O
ousugo 已提交
320
 * @param    data_ptr is the buffer pointer of the data to be fetched.
O
ousugo 已提交
321
 *
O
ousugo 已提交
322 323 324
 * @param    size is the size in bytes of the data to be fetched.
 *
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
325
 */
326
rt_err_t rt_data_queue_peek(struct rt_data_queue *queue,
327 328 329 330
                            const void** data_ptr,
                            rt_size_t *size)
{
    rt_ubase_t  level;
331

332
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
333
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
334

335
    if (queue->is_empty)
336 337 338 339
    {
        return -RT_EEMPTY;
    }

Q
qiyongzhong0 已提交
340 341 342 343
    level = rt_hw_interrupt_disable();

    *data_ptr = queue->queue[queue->get_index].data_ptr;
    *size     = queue->queue[queue->get_index].data_size;
344 345 346 347 348

    rt_hw_interrupt_enable(level);

    return RT_EOK;
}
349
RTM_EXPORT(rt_data_queue_peek);
350

O
ousugo 已提交
351
/**
O
ousugo 已提交
352 353 354 355
 * @brief    This function will reset a data queue.
 *
 * @note     Calling this function will wake up all threads on the data queue
 *           that are hanging and waiting.
O
ousugo 已提交
356
 *
O
ousugo 已提交
357
 * @param    queue is a pointer to a data queue object.
O
ousugo 已提交
358
 */
359 360
void rt_data_queue_reset(struct rt_data_queue *queue)
{
Q
qiyongzhong0 已提交
361
    rt_ubase_t  level;
362
    struct rt_thread *thread;
363

Q
qiyongzhong0 已提交
364
    RT_ASSERT(queue != RT_NULL);
365
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
Q
qiyongzhong0 已提交
366 367 368 369 370 371 372

    level = rt_hw_interrupt_disable();

    queue->get_index = 0;
    queue->put_index = 0;
    queue->is_empty = 1;
    queue->is_full = 0;
373

Q
qiyongzhong0 已提交
374
    rt_hw_interrupt_enable(level);
375

376 377 378 379 380 381 382
    rt_enter_critical();
    /* wakeup all suspend threads */

    /* resume on pop list */
    while (!rt_list_isempty(&(queue->suspended_pop_list)))
    {
        /* disable interrupt */
Q
qiyongzhong0 已提交
383
        level = rt_hw_interrupt_disable();
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399

        /* get next suspend thread */
        thread = rt_list_entry(queue->suspended_pop_list.next,
                               struct rt_thread,
                               tlist);
        /* set error code to RT_ERROR */
        thread->error = -RT_ERROR;

        /*
         * resume thread
         * In rt_thread_resume function, it will remove current thread from
         * suspend list
         */
        rt_thread_resume(thread);

        /* enable interrupt */
Q
qiyongzhong0 已提交
400
        rt_hw_interrupt_enable(level);
401 402 403 404 405 406
    }

    /* resume on push list */
    while (!rt_list_isempty(&(queue->suspended_push_list)))
    {
        /* disable interrupt */
Q
qiyongzhong0 已提交
407
        level = rt_hw_interrupt_disable();
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423

        /* get next suspend thread */
        thread = rt_list_entry(queue->suspended_push_list.next,
                               struct rt_thread,
                               tlist);
        /* set error code to RT_ERROR */
        thread->error = -RT_ERROR;

        /*
         * resume thread
         * In rt_thread_resume function, it will remove current thread from
         * suspend list
         */
        rt_thread_resume(thread);

        /* enable interrupt */
Q
qiyongzhong0 已提交
424
        rt_hw_interrupt_enable(level);
425 426 427 428 429 430
    }
    rt_exit_critical();

    rt_schedule();
}
RTM_EXPORT(rt_data_queue_reset);
431

O
ousugo 已提交
432
/**
O
ousugo 已提交
433
 * @brief    This function will deinit a data queue.
O
ousugo 已提交
434
 *
O
ousugo 已提交
435
 * @param    queue is a pointer to a data queue object.
O
ousugo 已提交
436
 *
O
ousugo 已提交
437
 * @return   Return the operation status. When the return value is RT_EOK, the operation is successful.
O
ousugo 已提交
438
 */
439 440 441 442 443
rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue)
{
    rt_ubase_t level;

    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
444
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
445 446 447 448

    /* wakeup all suspend threads */
    rt_data_queue_reset(queue);

Q
qiyongzhong0 已提交
449
    level = rt_hw_interrupt_disable();
450
    queue->magic = 0;
451
    rt_hw_interrupt_enable(level);
452

Q
qiyongzhong0 已提交
453
    rt_free(queue->queue);
454 455 456 457

    return RT_EOK;
}
RTM_EXPORT(rt_data_queue_deinit);
Q
qiyongzhong0 已提交
458

O
ousugo 已提交
459
/**
O
ousugo 已提交
460 461 462
 * @brief    This function will get the number of data in the data queue.
 *
 * @param    queue is a pointer to a data queue object.
O
ousugo 已提交
463
 *
O
ousugo 已提交
464
 * @return   Return the number of data in the data queue.
O
ousugo 已提交
465
 */
Q
qiyongzhong0 已提交
466 467 468 469
rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue)
{
    rt_ubase_t level;
    rt_int16_t len;
470

Q
qiyongzhong0 已提交
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);

    if (queue->is_empty)
    {
        return 0;
    }

    level = rt_hw_interrupt_disable();

    if (queue->put_index > queue->get_index)
    {
        len = queue->put_index - queue->get_index;
    }
    else
    {
        len = queue->size + queue->put_index - queue->get_index;
    }
489

Q
qiyongzhong0 已提交
490 491 492 493 494 495
    rt_hw_interrupt_enable(level);

    return len;
}
RTM_EXPORT(rt_data_queue_len);