dataqueue.c 12.4 KB
Newer Older
1
/*
2
 * Copyright (c) 2006-2021, RT-Thread Development Team
3
 *
4
 * SPDX-License-Identifier: Apache-2.0
5 6 7 8
 *
 * Change Logs:
 * Date           Author       Notes
 * 2012-09-30     Bernard      first version.
9
 * 2016-10-31     armink       fix some resume push and pop thread bugs
10 11 12 13 14 15
 */

#include <rtthread.h>
#include <rtdevice.h>
#include <rthw.h>

16 17
#define DATAQUEUE_MAGIC  0xbead0e0e

18 19 20 21 22 23
struct rt_data_item
{
    const void *data_ptr;
    rt_size_t data_size;
};

O
ousugo 已提交
24 25 26 27 28 29 30 31 32 33 34 35 36
/**
 * @brief This function will initialize a data queue.Calling this function will 
 * initialize the data queue control block and set the notification callback function.
 * 
 * @param queue The data queue object
 * @param size The maximum number of data in the data queue
 * @param lwm  Low water mark, when the number of data in the data queue is less than this value,
 * will wake up the thread waiting for write data.
 * @param evt_notify The notification callback function
 * 
 * @return the operation status, RT_EOK on successful, 
 * RT_ENOMEM on insufficient memory allocation failed.
 */
37 38 39 40 41 42 43
rt_err_t
rt_data_queue_init(struct rt_data_queue *queue,
                   rt_uint16_t size,
                   rt_uint16_t lwm,
                   void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
{
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
44
    RT_ASSERT(size > 0);
45 46 47

    queue->evt_notify = evt_notify;

O
ousugo 已提交
48
    queue->magic = DATAQUEUE_MAGIC; 
49 50 51 52 53
    queue->size = size;
    queue->lwm = lwm;

    queue->get_index = 0;
    queue->put_index = 0;
Q
qiyongzhong0 已提交
54 55
    queue->is_empty = 1;
    queue->is_full = 0;
56 57 58

    rt_list_init(&(queue->suspended_push_list));
    rt_list_init(&(queue->suspended_pop_list));
Q
qiyongzhong0 已提交
59

60 61 62 63 64 65 66 67 68 69
    queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
    if (queue->queue == RT_NULL)
    {
        return -RT_ENOMEM;
    }

    return RT_EOK;
}
RTM_EXPORT(rt_data_queue_init);

O
ousugo 已提交
70 71 72 73 74 75 76 77 78 79 80
/**
 * @brief This function will write data to the data queue. If the data queue is full,
 * the thread will suspend for the specified amount of time.
 * 
 * @param queue The data queue object
 * @param data_ptr The buffer pointer of the data to be written
 * @param size The size in bytes of the data to be written
 * @param timeout The waiting time
 * 
 * @return  the operation status, RT_EOK on successful 
 */
81 82 83 84 85 86 87 88
rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
                            const void *data_ptr,
                            rt_size_t data_size,
                            rt_int32_t timeout)
{
    rt_ubase_t  level;
    rt_thread_t thread;
    rt_err_t    result;
89

90
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
91
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
92 93 94 95 96

    result = RT_EOK;
    thread = rt_thread_self();

    level = rt_hw_interrupt_disable();
Q
qiyongzhong0 已提交
97
    while (queue->is_full)
98 99 100 101 102 103 104 105 106 107 108 109 110 111
    {
        /* queue is full */
        if (timeout == 0)
        {
            result = -RT_ETIMEOUT;

            goto __exit;
        }

        /* current context checking */
        RT_DEBUG_NOT_IN_INTERRUPT;

        /* reset thread error number */
        thread->error = RT_EOK;
112

113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
        /* suspend thread on the push list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do schedule */
        rt_schedule();

        /* thread is waked up */
        result = thread->error;
        level = rt_hw_interrupt_disable();
135
        if (result != RT_EOK) goto __exit;
136 137
    }

Q
qiyongzhong0 已提交
138 139
    queue->queue[queue->put_index].data_ptr  = data_ptr;
    queue->queue[queue->put_index].data_size = data_size;
140
    queue->put_index += 1;
Q
qiyongzhong0 已提交
141 142 143 144 145 146 147 148 149
    if (queue->put_index == queue->size)
    {
        queue->put_index = 0;
    }
    queue->is_empty = 0;
    if (queue->put_index == queue->get_index)
    {
        queue->is_full = 1;
    }
150

151
    /* there is at least one thread in suspended list */
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
    if (!rt_list_isempty(&(queue->suspended_pop_list)))
    {
        /* get thread entry */
        thread = rt_list_entry(queue->suspended_pop_list.next,
                               struct rt_thread,
                               tlist);

        /* resume it */
        rt_thread_resume(thread);
        rt_hw_interrupt_enable(level);

        /* perform a schedule */
        rt_schedule();

        return result;
    }

__exit:
    rt_hw_interrupt_enable(level);
    if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
    {
        queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
    }

    return result;
}
RTM_EXPORT(rt_data_queue_push);

O
ousugo 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193
/**
 * @brief This function will pop data from the data queue. If the data queue is empty,
 * the thread will suspend for the specified amount of time.
 * 
 * @attention when the number of data in the data queue is less than lwm(low water mark),
 * will wake up the thread waiting for write data.
 * 
 * @param queue The data queue object
 * @param data_ptr The buffer pointer of the data to be fetched
 * @param size The size in bytes of the data to be fetched
 * @param timeout The waiting time
 * 
 * @return Operation status, RT_EOK on successful, RT_ETIMEOUT on timeout.
 */
194 195
rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
                           const void** data_ptr,
196
                           rt_size_t *size,
197 198 199 200 201
                           rt_int32_t timeout)
{
    rt_ubase_t  level;
    rt_thread_t thread;
    rt_err_t    result;
202

203
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
204
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
205 206 207 208 209 210 211
    RT_ASSERT(data_ptr != RT_NULL);
    RT_ASSERT(size != RT_NULL);

    result = RT_EOK;
    thread = rt_thread_self();

    level = rt_hw_interrupt_disable();
Q
qiyongzhong0 已提交
212
    while (queue->is_empty)
213 214 215 216 217 218 219 220 221 222 223 224 225
    {
        /* queue is empty */
        if (timeout == 0)
        {
            result = -RT_ETIMEOUT;
            goto __exit;
        }

        /* current context checking */
        RT_DEBUG_NOT_IN_INTERRUPT;

        /* reset thread error number */
        thread->error = RT_EOK;
226

227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
        /* suspend thread on the pop list */
        rt_thread_suspend(thread);
        rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
        /* start timer */
        if (timeout > 0)
        {
            /* reset the timeout of thread timer and start it */
            rt_timer_control(&(thread->thread_timer),
                             RT_TIMER_CTRL_SET_TIME,
                             &timeout);
            rt_timer_start(&(thread->thread_timer));
        }

        /* enable interrupt */
        rt_hw_interrupt_enable(level);

        /* do schedule */
        rt_schedule();

        /* thread is waked up */
        result = thread->error;
        level  = rt_hw_interrupt_disable();
        if (result != RT_EOK)
            goto __exit;
    }

Q
qiyongzhong0 已提交
253 254
    *data_ptr = queue->queue[queue->get_index].data_ptr;
    *size     = queue->queue[queue->get_index].data_size;
255
    queue->get_index += 1;
Q
qiyongzhong0 已提交
256 257 258 259 260 261 262 263 264
    if (queue->get_index == queue->size)
    {
        queue->get_index = 0;
    }
    queue->is_full = 0;
    if (queue->put_index == queue->get_index)
    {
        queue->is_empty = 1;
    }
265

Q
qiyongzhong0 已提交
266
    if (rt_data_queue_len(queue) <= queue->lwm)
267
    {
268
        /* there is at least one thread in suspended list */
269 270 271 272 273 274 275 276 277 278 279 280 281 282
        if (!rt_list_isempty(&(queue->suspended_push_list)))
        {
            /* get thread entry */
            thread = rt_list_entry(queue->suspended_push_list.next,
                                   struct rt_thread,
                                   tlist);

            /* resume it */
            rt_thread_resume(thread);
            rt_hw_interrupt_enable(level);

            /* perform a schedule */
            rt_schedule();
        }
283 284 285 286
        else
        {
            rt_hw_interrupt_enable(level);
        }
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304

        if (queue->evt_notify != RT_NULL)
            queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);

        return result;
    }

__exit:
    rt_hw_interrupt_enable(level);
    if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
    {
        queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
    }

    return result;
}
RTM_EXPORT(rt_data_queue_pop);

O
ousugo 已提交
305 306 307 308 309 310 311 312 313
/**
 * @brief This function will fetching but retaining data in the data queue.
 * 
 * @param queue The data queue object
 * @param data_ptr The buffer pointer of the data to be fetched
 * @param size The size in bytes of the data to be fetched
 * 
 * @return The operation status, RT_EOK on successful 
 */
314
rt_err_t rt_data_queue_peek(struct rt_data_queue *queue,
315 316 317 318
                            const void** data_ptr,
                            rt_size_t *size)
{
    rt_ubase_t  level;
319

320
    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
321
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
322

323
    if (queue->is_empty)
324 325 326 327
    {
        return -RT_EEMPTY;
    }

Q
qiyongzhong0 已提交
328 329 330 331
    level = rt_hw_interrupt_disable();

    *data_ptr = queue->queue[queue->get_index].data_ptr;
    *size     = queue->queue[queue->get_index].data_size;
332 333 334 335 336

    rt_hw_interrupt_enable(level);

    return RT_EOK;
}
337
RTM_EXPORT(rt_data_queue_peek);
338

O
ousugo 已提交
339 340 341 342 343 344
/**
 * @brief Reset a data queue. Calling this function will wake up all threads on the data queue
 * that are hanging and waiting.
 * 
 * @param queue The data queue object
 */
345 346
void rt_data_queue_reset(struct rt_data_queue *queue)
{
Q
qiyongzhong0 已提交
347
    rt_ubase_t  level;
348
    struct rt_thread *thread;
349

Q
qiyongzhong0 已提交
350
    RT_ASSERT(queue != RT_NULL);
351
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
Q
qiyongzhong0 已提交
352 353 354 355 356 357 358

    level = rt_hw_interrupt_disable();

    queue->get_index = 0;
    queue->put_index = 0;
    queue->is_empty = 1;
    queue->is_full = 0;
359

Q
qiyongzhong0 已提交
360
    rt_hw_interrupt_enable(level);
361

362 363 364 365 366 367 368
    rt_enter_critical();
    /* wakeup all suspend threads */

    /* resume on pop list */
    while (!rt_list_isempty(&(queue->suspended_pop_list)))
    {
        /* disable interrupt */
Q
qiyongzhong0 已提交
369
        level = rt_hw_interrupt_disable();
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385

        /* get next suspend thread */
        thread = rt_list_entry(queue->suspended_pop_list.next,
                               struct rt_thread,
                               tlist);
        /* set error code to RT_ERROR */
        thread->error = -RT_ERROR;

        /*
         * resume thread
         * In rt_thread_resume function, it will remove current thread from
         * suspend list
         */
        rt_thread_resume(thread);

        /* enable interrupt */
Q
qiyongzhong0 已提交
386
        rt_hw_interrupt_enable(level);
387 388 389 390 391 392
    }

    /* resume on push list */
    while (!rt_list_isempty(&(queue->suspended_push_list)))
    {
        /* disable interrupt */
Q
qiyongzhong0 已提交
393
        level = rt_hw_interrupt_disable();
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409

        /* get next suspend thread */
        thread = rt_list_entry(queue->suspended_push_list.next,
                               struct rt_thread,
                               tlist);
        /* set error code to RT_ERROR */
        thread->error = -RT_ERROR;

        /*
         * resume thread
         * In rt_thread_resume function, it will remove current thread from
         * suspend list
         */
        rt_thread_resume(thread);

        /* enable interrupt */
Q
qiyongzhong0 已提交
410
        rt_hw_interrupt_enable(level);
411 412 413 414 415 416
    }
    rt_exit_critical();

    rt_schedule();
}
RTM_EXPORT(rt_data_queue_reset);
417

O
ousugo 已提交
418 419 420 421 422 423 424
/**
 * @brief Deinit a data queue.
 * 
 * @param queue The data queue object
 * 
 * @return operation status, RT_EOK on successful.
 */
425 426 427 428 429
rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue)
{
    rt_ubase_t level;

    RT_ASSERT(queue != RT_NULL);
Q
qiyongzhong0 已提交
430
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);
431 432 433 434

    /* wakeup all suspend threads */
    rt_data_queue_reset(queue);

Q
qiyongzhong0 已提交
435
    level = rt_hw_interrupt_disable();
436
    queue->magic = 0;
437
    rt_hw_interrupt_enable(level);
438

Q
qiyongzhong0 已提交
439
    rt_free(queue->queue);
440 441 442 443

    return RT_EOK;
}
RTM_EXPORT(rt_data_queue_deinit);
Q
qiyongzhong0 已提交
444

O
ousugo 已提交
445 446 447 448 449 450
/**
 * @brief This function get the number of data in the data queue.
 * 
 * @param queue The data queue object
 * @return The number of data in the data queue 
 */
Q
qiyongzhong0 已提交
451 452 453 454
rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue)
{
    rt_ubase_t level;
    rt_int16_t len;
455

Q
qiyongzhong0 已提交
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(queue->magic == DATAQUEUE_MAGIC);

    if (queue->is_empty)
    {
        return 0;
    }

    level = rt_hw_interrupt_disable();

    if (queue->put_index > queue->get_index)
    {
        len = queue->put_index - queue->get_index;
    }
    else
    {
        len = queue->size + queue->put_index - queue->get_index;
    }
474

Q
qiyongzhong0 已提交
475 476 477 478 479 480
    rt_hw_interrupt_enable(level);

    return len;
}
RTM_EXPORT(rt_data_queue_len);