FreeRTOS的队列介绍以及怎么实现互斥访问,休眠唤醒以及保存数据(环形缓冲区)
前面介绍完了FreeRTOS的一些核心功能,如任务切换,创建任务等等,并将煮包从ARM内核以及内存的视角的相关思考进行了分享,从这里开始介绍FreeRTOS的另外一个板块,就是任务间通信机制,如队列、信号量、互斥量、事件组这些,今天分享的是FreeRTOS队列的学习笔记;是关于标题中的互斥访问、休眠唤醒以及环形缓冲区都会在文章中一一介绍到~~~~关于队列,数据结构里面的描述就不做介绍啦~,最重要的
前言
前面介绍完了FreeRTOS的一些核心功能,如任务切换,创建任务等等,并将煮包从ARM内核以及内存的视角的相关思考进行了分享,从这里开始介绍FreeRTOS的另外一个板块,就是任务间通信机制,如队列、信号量、互斥量、事件组这些,今天分享的是FreeRTOS队列的学习笔记;是关于标题中的互斥访问、休眠唤醒以及环形缓冲区都会在文章中一一介绍到~~~~
一、FreeRTOS队列的基础知识
FreeRTOS队列的核心:
关于队列,数据结构里面的描述就不做介绍啦~,最重要的就是队列是FIFO结构,即数据先进先出,对于FreeRTOS的队列,它有如下几个核心知识点:
1. 关中断(对应标题的实现互斥访问);
2.环形缓冲区(用于存放队列的数据)
3.链表(分为等待发送链表和等待获取链表,具体的后面细说)
介绍队列结构体(含有ringbuffer):
下面对队列的结构体进行一些介绍:
typedef struct QueueDefinition
{
int8_t * pcHead;
int8_t * pcWriteTo;
union
{
QueuePointers_t xQueue;
SemaphoreData_t xSemaphore;
} u;
List_t xTasksWaitingToSend;
List_t xTasksWaitingToReceive;
volatile UBaseType_t uxMessagesWaiting;
UBaseType_t uxLength
UBaseType_t uxItemSize;
volatile int8_t cRxLock;
volatile int8_t cTxLock;
#if ( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
uint8_t ucStaticallyAllocated;
#endif
#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition * pxQueueSetContainer;
#endif
#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber;
uint8_t ucQueueType;
#endif
} xQUEUE;

通过上面两个图可以看到,FreeRTOS的队列结构体里面含有头尾指针以及写指针和读指针;
那么pcReadFrom和pcWriteTo两个指针就构成环形缓冲区,即ringbuffer;用来存放队列项,可以上网查一下环形缓冲区是啥嘿嘿~,这里就不介绍啦~~~
下面就是两个链表,一个是xTasksWaitingToSend,用来存放当队列满时,如果有任务想要写队列,就会将该任务挂载到此链表;一个是xTasksWaitingToReceive,用来存放当队列空时,如果有任务想读队列,就会将该任务挂载到此链表;

然后就是这三个,uxMessagesWaiting是一个计数值,用来表示队列里面Item的数量,uxLength是指的队列的长度,uxItemSize是队列每一个队列项的大小,由此可知,队列环形缓冲区的总大小为:uxLength*uxItemSize;

其他参数就是关于上锁和可以配置的参数,暂时不介绍了,因为有很多其他博主写的非常完善和全面嘿嘿;
二、FreeRTOS队列的源码解析
创建队列:
其实队列的创建也很简单,就是先在FreeRTOS管理的堆栈空间中申请一片内存,包含了队列结构体头和环形缓冲区的大小,然后就是给队列结构体头赋初始值;让我们详细看一下这个队列创建的函数~~~
#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
const uint8_t ucQueueType )
{
Queue_t * pxNewQueue;
size_t xQueueSizeInBytes;
uint8_t * pucQueueStorage;
configASSERT( uxQueueLength > ( UBaseType_t ) 0 );
xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); /*lint !e961 MISRA exception as the casts are only redundant for some ports. */
/* Check for multiplication overflow. */
configASSERT( ( uxItemSize == 0 ) || ( uxQueueLength == ( xQueueSizeInBytes / uxItemSize ) ) );
/* Check for addition overflow. */
configASSERT( ( sizeof( Queue_t ) + xQueueSizeInBytes ) > xQueueSizeInBytes );
pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes ); /*lint !e9087 !e9079 see comment above. */
if( pxNewQueue != NULL )
{
pucQueueStorage = ( uint8_t * ) pxNewQueue;
pucQueueStorage += sizeof( Queue_t );
#if ( configSUPPORT_STATIC_ALLOCATION == 1 )
{
pxNewQueue->ucStaticallyAllocated = pdFALSE;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */
prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
}
else
{
traceQUEUE_CREATE_FAILED( ucQueueType );
mtCOVERAGE_TEST_MARKER();
}
return pxNewQueue;
}
#endif /* configSUPPORT_STATIC_ALLOCATION */
可以看到这两句就是计算了环形缓冲区的大小,然后从堆中malloc一片内存~
1. 计算环形缓冲区大小:xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
2. 从堆中申请内存,包括队列结构体头和ringbuffer:pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );
如果内存申请成功,也就是pxNewQueue不为NULL,则让pucQueueStorage指向环形缓冲区的开头;并调用prvInitialiseNewQueue来初始化其他结构体头的字段;否则执行else部分的测试呀或者其他的,通常功能需要我们自己实现;
那我们来看一下prvInitialiseNewQueue这个函数具体做了什么吧~
static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t * pucQueueStorage,
const uint8_t ucQueueType,
Queue_t * pxNewQueue )
{
( void ) ucQueueType;
if( uxItemSize == ( UBaseType_t ) 0 )
{
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else
{
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
#if ( configUSE_TRACE_FACILITY == 1 )
{
pxNewQueue->ucQueueType = ucQueueType;
}
#endif /* configUSE_TRACE_FACILITY */
#if ( configUSE_QUEUE_SETS == 1 )
{
pxNewQueue->pxQueueSetContainer = NULL;
}
#endif /* configUSE_QUEUE_SETS */
traceQUEUE_CREATE( pxNewQueue );
}
这个函数也很简单,主要就是判断每个队列项的大小是否是0,如果是,就把pcHead指向结构体头的起始位置,如果不为0,则pcHead指向环形缓冲区的起始位置;然后调用( void ) xQueueGenericReset( pxNewQueue, pdTRUE );函数,此函数如下:
BaseType_t xQueueGenericReset( QueueHandle_t xQueue,
BaseType_t xNewQueue )
{
Queue_t * const pxQueue = xQueue;
configASSERT( pxQueue );
taskENTER_CRITICAL();
{
pxQueue->u.xQueue.pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
pxQueue->pcWriteTo = pxQueue->pcHead;
pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - 1U ) * pxQueue->uxItemSize );
pxQueue->cRxLock = queueUNLOCKED;
pxQueue->cTxLock = queueUNLOCKED;
if( xNewQueue == pdFALSE )
{
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
/* Ensure the event queues start in the correct state. */
vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
}
}
taskEXIT_CRITICAL();
return pdPASS;
}
这个函数也很简单,主要还是对结构体头里面的字段进行初始化~,值得注意的是,这里在进行初始化操作之前需要先关中断,也就是实现了标题里面的互斥访问~;
1. 阅读代码,很容易可以理解pcTail 指向pcHead偏移环形缓冲区大小的位置,若每个队列项的大小是0,则pcTail和pcHead都指向结构体头的位置;若不为0,则pcHead指向ringbuffer的起始位置,pcTail指向ringbuffer的结束位置;
2. uxMessagesWaiting参数初始化为0;
3. pcWriteTo初始化是指向pcHead的,也就是当uxItemSize 为0时,pcWriteTo和pcHead和pcTail都指向结构体头的位置,若不为零,pcWriteTo指向ringbuffer的起始位置,也就是我们是从ringbuffer的开头开始写入数据,逻辑也合理嘿嘿;
4. pcReadFrom初始化指向队列最后一个队列项的起始位置,也就是我们从ringbuffer的最后一项开始读取数据;代码如下:
pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - 1U ) * pxQueue->uxItemSize );
5. 最后给两个上锁的参数初始值,就是queueUNLOCKED(-1),最后初始化两个链表
到此整个队列的创建工作就完成了~~,是不是和创建任务及其相似哈哈哈,还比创建任务简单,最后附图煮包在正点原子上截图的一张图片:

队列读:
队列读取,也是比较简单的,就分为三步,先关中断(实现互斥),再分有数据和无数据两种情况,有数据就copy数据,然后如果有任务在等待写队列,就唤醒队列,没有数据就返回ERR或者将任务休眠,接下来我们来看一下FreeRTOS的源码时怎么实现的吧~,由于FreeRTOS的队列获取函数实在太长,这里就放函数定义,然后截取关键的函数实现部分来分析啦~
BaseType_t xQueueReceive( QueueHandle_t xQueue,
void * const pvBuffer,
TickType_t xTicksToWait )
从定义看,我们调用这个接口后,就会从xQueue队列中读取数据并存放到pvBuffer里面,如果队列为空,就会将读取队列的任务阻塞xTicksToWait 长的时间;
然后看一下主要的函数实现部分:
有数据:
以下这个for循环里面就实现了队列读的有数据时的功能,嗯~~~应该能看懂吧~哈哈哈,就是判断一下uxMessagesWaiting 是否大于0,是就是有数据,copy数据到pvBuffer ,然后uxMessagesWaiting减1;
for( ; ; )
{
taskENTER_CRITICAL();
{
const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;
if( uxMessagesWaiting > ( UBaseType_t ) 0 )
{
prvCopyDataFromQueue( pxQueue, pvBuffer );
traceQUEUE_RECEIVE( pxQueue );
pxQueue->uxMessagesWaiting = uxMessagesWaiting - ( UBaseType_t ) 1;
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
{
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
mtCOVERAGE_TEST_MARKER();
}
taskEXIT_CRITICAL();
return pdPASS;
}
else
{
if( xTicksToWait == ( TickType_t ) 0 )
{
taskEXIT_CRITICAL();
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else if( xEntryTimeSet == pdFALSE )
{
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
else
{
/* Entry time was already set. */
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL();
如果等待写这个队列的链表不为空,就唤醒一下链表里面的第一个任务(从等待写队列链表中移除,添加到对应优先级的就绪链表);具体的实现就是下面这个函数里面的如图所示内容;
( void ) uxListRemove( &( pxUnblockedTCB->xEventListItem ) );

更加详细的可以下载FreeRTOS源码进行查看啊哈哈哈哈~~~
无数据(实现真正的任务休眠):
当队列中没有数据的时候呢?就看以下代码:
else
{
if( xTicksToWait == ( TickType_t ) 0 )
{
taskEXIT_CRITICAL();
traceQUEUE_RECEIVE_FAILED( pxQueue );
return errQUEUE_EMPTY;
}
else if( xEntryTimeSet == pdFALSE )
{
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
当任务传入的等待参数为0的时候,直接返回errQUEUE_EMPTY,如果不是0,则执行如下代码,将当前任务插入到队列的xTasksWaitingToReceive链表,最后再将当前任务添加到阻塞链表,阻塞时间就是传入的xTicksToWait;
void vTaskPlaceOnEventList( List_t * const pxEventList,
const TickType_t xTicksToWait )
{
configASSERT( pxEventList );
vListInsert( pxEventList, &( pxCurrentTCB->xEventListItem ) );
prvAddCurrentTaskToDelayedList( xTicksToWait, pdTRUE );
}
当任务被添加到阻塞链表后就实现了真正的任务休眠啦~,休眠位置如下:
具体来说就是执行任务切换的地方;

当后续任务被唤醒后,又会从这里开始往下执行,最后回到for循环开头,再判断uxMessagesWaiting,有没有数据,此时肯定是有的啦,因为有数据才会唤醒相应的任务呀~
队列写:
队列写和队列读基本是完全对称的行为,首先就是关中断,其次分为有空间或者无空间,有空间直接往队列里面copy数据,copy完后判断一下有没有任务等待读取数据,有就把它唤醒(唤醒和队列读基本一样,就是将任务从xTasksWaitingToReceive移除,再添加到对应优先级的就绪列表),如果没有空间了,就将当前任务挂载到队列的xTasksWaitingToSend链表,然后将当前任务从就绪链表中移动到阻塞链表;因为基本和队列读一样,这里就只放一段源码作为标记了~~
BaseType_t xQueueGenericSend( QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition )
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = xQueue;
configASSERT( pxQueue );
configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) );
#if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
{
configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
}
#endif
/*lint -save -e904 This function relaxes the coding standard somewhat to
* allow return statements within the function itself. This is done in the
* interest of execution time efficiency. */
for( ; ; )
{
taskENTER_CRITICAL();
{
/* Is there room on the queue now? The running task must be the
* highest priority task wanting to access the queue. If the head item
* in the queue is to be overwritten then it does not matter if the
* queue is full. */
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
traceQUEUE_SEND( pxQueue );
#if ( configUSE_QUEUE_SETS == 1 )
{
const UBaseType_t uxPreviousMessagesWaiting = pxQueue->uxMessagesWaiting;
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
if( pxQueue->pxQueueSetContainer != NULL )
{
if( ( xCopyPosition == queueOVERWRITE ) && ( uxPreviousMessagesWaiting != ( UBaseType_t ) 0 ) )
{
/* Do not notify the queue set as an existing item
* was overwritten in the queue so the number of items
* in the queue has not changed. */
mtCOVERAGE_TEST_MARKER();
}
else if( prvNotifyQueueSetContainer( pxQueue ) != pdFALSE )
{
/* The queue is a member of a queue set, and posting
* to the queue set caused a higher priority task to
* unblock. A context switch is required. */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else
{
/* If there was a task waiting for data to arrive on the
* queue then unblock it now. */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* The unblocked task has a priority higher than
* our own so yield immediately. Yes it is ok to
* do this from within the critical section - the
* kernel takes care of that. */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else if( xYieldRequired != pdFALSE )
{
/* This path is a special case that will only get
* executed if the task was holding multiple mutexes
* and the mutexes were given back in an order that is
* different to that in which they were taken. */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
}
#else /* configUSE_QUEUE_SETS */
{
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
/* If there was a task waiting for data to arrive on the
* queue then unblock it now. */
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* The unblocked task has a priority higher than
* our own so yield immediately. Yes it is ok to do
* this from within the critical section - the kernel
* takes care of that. */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
else if( xYieldRequired != pdFALSE )
{
/* This path is a special case that will only get
* executed if the task was holding multiple mutexes and
* the mutexes were given back in an order that is
* different to that in which they were taken. */
queueYIELD_IF_USING_PREEMPTION();
}
else
{
mtCOVERAGE_TEST_MARKER();
}
}
#endif /* configUSE_QUEUE_SETS */
taskEXIT_CRITICAL();
return pdPASS;
}
else
{
if( xTicksToWait == ( TickType_t ) 0 )
{
/* The queue was full and no block time is specified (or
* the block time has expired) so leave now. */
taskEXIT_CRITICAL();
/* Return to the original privilege level before exiting
* the function. */
traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
else if( xEntryTimeSet == pdFALSE )
{
/* The queue was full and a block time was specified so
* configure the timeout structure. */
vTaskInternalSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
else
{
/* Entry time was already set. */
mtCOVERAGE_TEST_MARKER();
}
}
}
taskEXIT_CRITICAL();
/* Interrupts and other tasks can send to and receive from the queue
* now the critical section has been exited. */
vTaskSuspendAll();
prvLockQueue( pxQueue );
/* Update the timeout state to see if it has expired yet. */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
if( prvIsQueueFull( pxQueue ) != pdFALSE )
{
traceBLOCKING_ON_QUEUE_SEND( pxQueue );
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );
/* Unlocking the queue means queue events can effect the
* event list. It is possible that interrupts occurring now
* remove this task from the event list again - but as the
* scheduler is suspended the task will go onto the pending
* ready last instead of the actual ready list. */
prvUnlockQueue( pxQueue );
/* Resuming the scheduler will move tasks from the pending
* ready list into the ready list - so it is feasible that this
* task is already in a ready list before it yields - in which
* case the yield will not cause a context switch unless there
* is also a higher priority task in the pending ready list. */
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
}
else
{
/* Try again. */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else
{
/* The timeout has expired. */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
} /*lint -restore */
}
好啦,今天煮包分享的内容就到这里啦~~~希望对大家有帮助~
openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。
更多推荐


所有评论(0)