/* * File : pipe.c * This file is part of RT-Thread RTOS * COPYRIGHT (C) 2012, RT-Thread Development Team * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * * Change Logs: * Date Author Notes * 2012-09-30 Bernard first version. */ #include #include #include static void _rt_pipe_resume_writer(struct rt_pipe_device *pipe) { if (!rt_list_isempty(&pipe->suspended_write_list)) { rt_thread_t thread; RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_WR); /* get suspended thread */ thread = rt_list_entry(pipe->suspended_write_list.next, struct rt_thread, tlist); /* resume the write thread */ rt_thread_resume(thread); rt_schedule(); } } static rt_size_t rt_pipe_read(rt_device_t dev, rt_off_t pos, void *buffer, rt_size_t size) { rt_uint32_t level; rt_thread_t thread; struct rt_pipe_device *pipe; rt_size_t read_nbytes; pipe = PIPE_DEVICE(dev); RT_ASSERT(pipe != RT_NULL); if (!(pipe->flag & RT_PIPE_FLAG_BLOCK_RD)) { level = rt_hw_interrupt_disable(); read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size); /* if the ringbuffer is empty, there won't be any writer waiting */ if (read_nbytes) _rt_pipe_resume_writer(pipe); rt_hw_interrupt_enable(level); return read_nbytes; } thread = rt_thread_self(); /* current context checking */ RT_DEBUG_NOT_IN_INTERRUPT; do { level = rt_hw_interrupt_disable(); read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size); if (read_nbytes == 0) { rt_thread_suspend(thread); /* waiting on suspended read list */ rt_list_insert_before(&(pipe->suspended_read_list), &(thread->tlist)); rt_hw_interrupt_enable(level); rt_schedule(); } else { _rt_pipe_resume_writer(pipe); rt_hw_interrupt_enable(level); break; } } while (read_nbytes == 0); return read_nbytes; } static void _rt_pipe_resume_reader(struct rt_pipe_device *pipe) { if (pipe->parent.rx_indicate) pipe->parent.rx_indicate(&pipe->parent, rt_ringbuffer_data_len(&pipe->ringbuffer)); if (!rt_list_isempty(&pipe->suspended_read_list)) { rt_thread_t thread; RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_RD); /* get suspended thread */ thread = rt_list_entry(pipe->suspended_read_list.next, struct rt_thread, tlist); /* resume the read thread */ rt_thread_resume(thread); rt_schedule(); } } static rt_size_t rt_pipe_write(rt_device_t dev, rt_off_t pos, const void *buffer, rt_size_t size) { rt_uint32_t level; rt_thread_t thread; struct rt_pipe_device *pipe; rt_size_t write_nbytes; pipe = PIPE_DEVICE(dev); RT_ASSERT(pipe != RT_NULL); if ((pipe->flag & RT_PIPE_FLAG_FORCE_WR) || !(pipe->flag & RT_PIPE_FLAG_BLOCK_WR)) { level = rt_hw_interrupt_disable(); if (pipe->flag & RT_PIPE_FLAG_FORCE_WR) write_nbytes = rt_ringbuffer_put_force(&(pipe->ringbuffer), buffer, size); else write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size); _rt_pipe_resume_reader(pipe); rt_hw_interrupt_enable(level); return write_nbytes; } thread = rt_thread_self(); /* current context checking */ RT_DEBUG_NOT_IN_INTERRUPT; do { level = rt_hw_interrupt_disable(); write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size); if (write_nbytes == 0) { /* pipe full, waiting on suspended write list */ rt_thread_suspend(thread); /* waiting on suspended read list */ rt_list_insert_before(&(pipe->suspended_write_list), &(thread->tlist)); rt_hw_interrupt_enable(level); rt_schedule(); } else { _rt_pipe_resume_reader(pipe); rt_hw_interrupt_enable(level); break; } } while (write_nbytes == 0); return write_nbytes; } static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args) { return RT_EOK; } /** * This function will initialize a pipe device and put it under control of * resource management. * * @param pipe the pipe device * @param name the name of pipe device * @param flag the attribute of the pipe device * @param buf the buffer of pipe device * @param size the size of pipe device buffer * * @return the operation status, RT_EOK on successful */ rt_err_t rt_pipe_init(struct rt_pipe_device *pipe, const char *name, enum rt_pipe_flag flag, rt_uint8_t *buf, rt_size_t size) { RT_ASSERT(pipe); RT_ASSERT(buf); /* initialize suspended list */ rt_list_init(&pipe->suspended_read_list); rt_list_init(&pipe->suspended_write_list); /* initialize ring buffer */ rt_ringbuffer_init(&pipe->ringbuffer, buf, size); pipe->flag = flag; /* create pipe */ pipe->parent.type = RT_Device_Class_Char; pipe->parent.init = RT_NULL; pipe->parent.open = RT_NULL; pipe->parent.close = RT_NULL; pipe->parent.read = rt_pipe_read; pipe->parent.write = rt_pipe_write; pipe->parent.control = rt_pipe_control; return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR); } RTM_EXPORT(rt_pipe_init); /** * This function will detach a pipe device from resource management * * @param pipe the pipe device * * @return the operation status, RT_EOK on successful */ rt_err_t rt_pipe_detach(struct rt_pipe_device *pipe) { return rt_device_unregister(&pipe->parent); } RTM_EXPORT(rt_pipe_detach); #ifdef RT_USING_HEAP rt_err_t rt_pipe_create(const char *name, enum rt_pipe_flag flag, rt_size_t size) { rt_uint8_t *rb_memptr = RT_NULL; struct rt_pipe_device *pipe = RT_NULL; /* get aligned size */ size = RT_ALIGN(size, RT_ALIGN_SIZE); pipe = (struct rt_pipe_device *)rt_calloc(1, sizeof(struct rt_pipe_device)); if (pipe == RT_NULL) return -RT_ENOMEM; /* create ring buffer of pipe */ rb_memptr = rt_malloc(size); if (rb_memptr == RT_NULL) { rt_free(pipe); return -RT_ENOMEM; } return rt_pipe_init(pipe, name, flag, rb_memptr, size); } RTM_EXPORT(rt_pipe_create); void rt_pipe_destroy(struct rt_pipe_device *pipe) { if (pipe == RT_NULL) return; /* un-register pipe device */ rt_pipe_detach(pipe); /* release memory */ rt_free(pipe->ringbuffer.buffer_ptr); rt_free(pipe); return; } RTM_EXPORT(rt_pipe_destroy); #endif /* RT_USING_HEAP */