前言

前面介绍完了FreeRTOS的一些核心功能,如任务切换,创建任务等等,并将煮包从ARM内核以及内存的视角的相关思考进行了分享,从这里开始介绍FreeRTOS的另外一个板块,就是任务间通信机制,如队列、信号量、互斥量、事件组这些,今天分享的是FreeRTOS队列的学习笔记;是关于标题中的互斥访问、休眠唤醒以及环形缓冲区都会在文章中一一介绍到~~~~

一、FreeRTOS队列的基础知识

FreeRTOS队列的核心:

关于队列,数据结构里面的描述就不做介绍啦~,最重要的就是队列是FIFO结构,即数据先进先出,对于FreeRTOS的队列,它有如下几个核心知识点:

1. 关中断(对应标题的实现互斥访问);

2.环形缓冲区(用于存放队列的数据)

3.链表(分为等待发送链表和等待获取链表,具体的后面细说)

介绍队列结构体(含有ringbuffer):

下面对队列的结构体进行一些介绍:

typedef struct QueueDefinition
{
    int8_t * pcHead;
    int8_t * pcWriteTo;

    union
    {
        QueuePointers_t xQueue;
        SemaphoreData_t xSemaphore;
    } u;

    List_t xTasksWaitingToSend;
    List_t xTasksWaitingToReceive;

    volatile UBaseType_t uxMessagesWaiting;
    UBaseType_t uxLength
    UBaseType_t uxItemSize;

    volatile int8_t cRxLock;
    volatile int8_t cTxLock;

    #if ( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
        uint8_t ucStaticallyAllocated; 
    #endif

    #if ( configUSE_QUEUE_SETS == 1 )
        struct QueueDefinition * pxQueueSetContainer;
    #endif

    #if ( configUSE_TRACE_FACILITY == 1 )
        UBaseType_t uxQueueNumber;
        uint8_t ucQueueType;
    #endif
} xQUEUE;

通过上面两个图可以看到,FreeRTOS的队列结构体里面含有头尾指针以及写指针和读指针;

那么pcReadFrom和pcWriteTo两个指针就构成环形缓冲区,即ringbuffer;用来存放队列项,可以上网查一下环形缓冲区是啥嘿嘿~,这里就不介绍啦~~~

下面就是两个链表,一个是xTasksWaitingToSend,用来存放当队列满时,如果有任务想要写队列,就会将该任务挂载到此链表;一个是xTasksWaitingToReceive,用来存放当队列空时,如果有任务想读队列,就会将该任务挂载到此链表;

然后就是这三个,uxMessagesWaiting是一个计数值,用来表示队列里面Item的数量,uxLength是指的队列的长度,uxItemSize是队列每一个队列项的大小,由此可知,队列环形缓冲区的总大小为:uxLength*uxItemSize;

其他参数就是关于上锁和可以配置的参数,暂时不介绍了,因为有很多其他博主写的非常完善和全面嘿嘿;

二、FreeRTOS队列的源码解析

创建队列:

其实队列的创建也很简单,就是先在FreeRTOS管理的堆栈空间中申请一片内存,包含了队列结构体头和环形缓冲区的大小,然后就是给队列结构体头赋初始值;让我们详细看一下这个队列创建的函数~~~

#if ( configSUPPORT_DYNAMIC_ALLOCATION == 1 )

    QueueHandle_t xQueueGenericCreate( const UBaseType_t uxQueueLength,
                                       const UBaseType_t uxItemSize,
                                       const uint8_t ucQueueType )
    {
        Queue_t * pxNewQueue;
        size_t xQueueSizeInBytes;
        uint8_t * pucQueueStorage;

        configASSERT( uxQueueLength > ( UBaseType_t ) 0 );

        xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize ); /*lint !e961 MISRA exception as the casts are only redundant for some ports. */

        /* Check for multiplication overflow. */
        configASSERT( ( uxItemSize == 0 ) || ( uxQueueLength == ( xQueueSizeInBytes / uxItemSize ) ) );

        /* Check for addition overflow. */
        configASSERT( ( sizeof( Queue_t ) + xQueueSizeInBytes ) >  xQueueSizeInBytes );

        pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes ); /*lint !e9087 !e9079 see comment above. */

        if( pxNewQueue != NULL )
        {
            pucQueueStorage = ( uint8_t * ) pxNewQueue;
            pucQueueStorage += sizeof( Queue_t );

            #if ( configSUPPORT_STATIC_ALLOCATION == 1 )
                {
                    pxNewQueue->ucStaticallyAllocated = pdFALSE;
                }
            #endif /* configSUPPORT_STATIC_ALLOCATION */

            prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
        }
        else
        {
            traceQUEUE_CREATE_FAILED( ucQueueType );
            mtCOVERAGE_TEST_MARKER();
        }

        return pxNewQueue;
    }

#endif /* configSUPPORT_STATIC_ALLOCATION */

可以看到这两句就是计算了环形缓冲区的大小,然后从堆中malloc一片内存~

        1. 计算环形缓冲区大小:xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );
        2. 从堆中申请内存,包括队列结构体头和ringbuffer:pxNewQueue = ( Queue_t * ) pvPortMalloc( sizeof( Queue_t ) + xQueueSizeInBytes );

如果内存申请成功,也就是pxNewQueue不为NULL,则让pucQueueStorage指向环形缓冲区的开头;并调用prvInitialiseNewQueue来初始化其他结构体头的字段;否则执行else部分的测试呀或者其他的,通常功能需要我们自己实现;

那我们来看一下prvInitialiseNewQueue这个函数具体做了什么吧~

static void prvInitialiseNewQueue( const UBaseType_t uxQueueLength,
                                   const UBaseType_t uxItemSize,
                                   uint8_t * pucQueueStorage,
                                   const uint8_t ucQueueType,
                                   Queue_t * pxNewQueue )
{
    ( void ) ucQueueType;

    if( uxItemSize == ( UBaseType_t ) 0 )
    {
        pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
    }
    else
    {
        pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
    }

    pxNewQueue->uxLength = uxQueueLength;
    pxNewQueue->uxItemSize = uxItemSize;

    ( void ) xQueueGenericReset( pxNewQueue, pdTRUE );

    #if ( configUSE_TRACE_FACILITY == 1 )
        {
            pxNewQueue->ucQueueType = ucQueueType;
        }
    #endif /* configUSE_TRACE_FACILITY */

    #if ( configUSE_QUEUE_SETS == 1 )
        {
            pxNewQueue->pxQueueSetContainer = NULL;
        }
    #endif /* configUSE_QUEUE_SETS */

    traceQUEUE_CREATE( pxNewQueue );
}

这个函数也很简单,主要就是判断每个队列项的大小是否是0,如果是,就把pcHead指向结构体头的起始位置,如果不为0,则pcHead指向环形缓冲区的起始位置;然后调用( void ) xQueueGenericReset( pxNewQueue, pdTRUE );函数,此函数如下:

BaseType_t xQueueGenericReset( QueueHandle_t xQueue,
                               BaseType_t xNewQueue )
{
    Queue_t * const pxQueue = xQueue;

    configASSERT( pxQueue );

    taskENTER_CRITICAL();
    {
        pxQueue->u.xQueue.pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize ); 

        pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;
        pxQueue->pcWriteTo = pxQueue->pcHead;
        pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - 1U ) * pxQueue->uxItemSize ); 

        pxQueue->cRxLock = queueUNLOCKED;
        pxQueue->cTxLock = queueUNLOCKED;

        if( xNewQueue == pdFALSE )
        {
            if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
            {
                if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
                {
                    queueYIELD_IF_USING_PREEMPTION();
                }
                else
                {
                    mtCOVERAGE_TEST_MARKER();
                }
            }
            else
            {
                mtCOVERAGE_TEST_MARKER();
            }
        }
        else
        {
            /* Ensure the event queues start in the correct state. */
            vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
            vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
        }
    }
    taskEXIT_CRITICAL();

    return pdPASS;
}

这个函数也很简单,主要还是对结构体头里面的字段进行初始化~,值得注意的是,这里在进行初始化操作之前需要先关中断,也就是实现了标题里面的互斥访问~;

1. 阅读代码,很容易可以理解pcTail 指向pcHead偏移环形缓冲区大小的位置,若每个队列项的大小是0,则pcTail和pcHead都指向结构体头的位置;若不为0,则pcHead指向ringbuffer的起始位置,pcTail指向ringbuffer的结束位置;

2. uxMessagesWaiting参数初始化为0;

3. pcWriteTo初始化是指向pcHead的,也就是当uxItemSize 为0时,pcWriteTo和pcHead和pcTail都指向结构体头的位置,若不为零,pcWriteTo指向ringbuffer的起始位置,也就是我们是从ringbuffer的开头开始写入数据,逻辑也合理嘿嘿;

4. pcReadFrom初始化指向队列最后一个队列项的起始位置,也就是我们从ringbuffer的最后一项开始读取数据;代码如下:

pxQueue->u.xQueue.pcReadFrom = pxQueue->pcHead + ( ( pxQueue->uxLength - 1U ) * pxQueue->uxItemSize );

5. 最后给两个上锁的参数初始值,就是queueUNLOCKED(-1),最后初始化两个链表

到此整个队列的创建工作就完成了~~,是不是和创建任务及其相似哈哈哈,还比创建任务简单,最后附图煮包在正点原子上截图的一张图片:

队列读:

队列读取,也是比较简单的,就分为三步,先关中断(实现互斥),再分有数据和无数据两种情况,有数据就copy数据,然后如果有任务在等待写队列,就唤醒队列,没有数据就返回ERR或者将任务休眠,接下来我们来看一下FreeRTOS的源码时怎么实现的吧~,由于FreeRTOS的队列获取函数实在太长,这里就放函数定义,然后截取关键的函数实现部分来分析啦~

BaseType_t xQueueReceive( QueueHandle_t xQueue,
                          void * const pvBuffer,
                          TickType_t xTicksToWait )

从定义看,我们调用这个接口后,就会从xQueue队列中读取数据并存放到pvBuffer里面,如果队列为空,就会将读取队列的任务阻塞xTicksToWait 长的时间;

然后看一下主要的函数实现部分:

有数据:

以下这个for循环里面就实现了队列读的有数据时的功能,嗯~~~应该能看懂吧~哈哈哈,就是判断一下uxMessagesWaiting 是否大于0,是就是有数据,copy数据到pvBuffer ,然后uxMessagesWaiting减1;

    for( ; ; )
    {
        taskENTER_CRITICAL();
        {
            const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;

            if( uxMessagesWaiting > ( UBaseType_t ) 0 )
            {
                prvCopyDataFromQueue( pxQueue, pvBuffer );
                traceQUEUE_RECEIVE( pxQueue );
                pxQueue->uxMessagesWaiting = uxMessagesWaiting - ( UBaseType_t ) 1;

                if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE )
                {
                    if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE )
                    {
                        queueYIELD_IF_USING_PREEMPTION();
                    }
                    else
                    {
                        mtCOVERAGE_TEST_MARKER();
                    }
                }
                else
                {
                    mtCOVERAGE_TEST_MARKER();
                }

                taskEXIT_CRITICAL();
                return pdPASS;
            }
            else
            {
                if( xTicksToWait == ( TickType_t ) 0 )
                {
                    taskEXIT_CRITICAL();
                    traceQUEUE_RECEIVE_FAILED( pxQueue );
                    return errQUEUE_EMPTY;
                }
                else if( xEntryTimeSet == pdFALSE )
                {
                    vTaskInternalSetTimeOutState( &xTimeOut );
                    xEntryTimeSet = pdTRUE;
                }
                else
                {
                    /* Entry time was already set. */
                    mtCOVERAGE_TEST_MARKER();
                }
            }
        }
        taskEXIT_CRITICAL();

如果等待写这个队列的链表不为空,就唤醒一下链表里面的第一个任务(从等待写队列链表中移除,添加到对应优先级的就绪链表);具体的实现就是下面这个函数里面的如图所示内容;

( void ) uxListRemove( &( pxUnblockedTCB->xEventListItem ) );

更加详细的可以下载FreeRTOS源码进行查看啊哈哈哈哈~~~

无数据(实现真正的任务休眠):

当队列中没有数据的时候呢?就看以下代码:

            else
            {
                if( xTicksToWait == ( TickType_t ) 0 )
                {
                    taskEXIT_CRITICAL();
                    traceQUEUE_RECEIVE_FAILED( pxQueue );
                    return errQUEUE_EMPTY;
                }
                else if( xEntryTimeSet == pdFALSE )
                {
                    vTaskInternalSetTimeOutState( &xTimeOut );
                    xEntryTimeSet = pdTRUE;
                }
                else
                {
                    mtCOVERAGE_TEST_MARKER();
                }
            }

当任务传入的等待参数为0的时候,直接返回errQUEUE_EMPTY,如果不是0,则执行如下代码,将当前任务插入到队列的xTasksWaitingToReceive链表,最后再将当前任务添加到阻塞链表,阻塞时间就是传入的xTicksToWait;

void vTaskPlaceOnEventList( List_t * const pxEventList,
                            const TickType_t xTicksToWait )
{
    configASSERT( pxEventList );

    vListInsert( pxEventList, &( pxCurrentTCB->xEventListItem ) );

    prvAddCurrentTaskToDelayedList( xTicksToWait, pdTRUE );
}

当任务被添加到阻塞链表后就实现了真正的任务休眠啦~,休眠位置如下:

具体来说就是执行任务切换的地方

当后续任务被唤醒后,又会从这里开始往下执行,最后回到for循环开头,再判断uxMessagesWaiting,有没有数据,此时肯定是有的啦,因为有数据才会唤醒相应的任务呀~

队列写:

队列写和队列读基本是完全对称的行为,首先就是关中断,其次分为有空间或者无空间,有空间直接往队列里面copy数据,copy完后判断一下有没有任务等待读取数据,有就把它唤醒(唤醒和队列读基本一样,就是将任务从xTasksWaitingToReceive移除,再添加到对应优先级的就绪列表),如果没有空间了,就将当前任务挂载到队列的xTasksWaitingToSend链表,然后将当前任务从就绪链表中移动到阻塞链表;因为基本和队列读一样,这里就只放一段源码作为标记了~~

BaseType_t xQueueGenericSend( QueueHandle_t xQueue,
                              const void * const pvItemToQueue,
                              TickType_t xTicksToWait,
                              const BaseType_t xCopyPosition )
{
    BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
    TimeOut_t xTimeOut;
    Queue_t * const pxQueue = xQueue;

    configASSERT( pxQueue );
    configASSERT( !( ( pvItemToQueue == NULL ) && ( pxQueue->uxItemSize != ( UBaseType_t ) 0U ) ) );
    configASSERT( !( ( xCopyPosition == queueOVERWRITE ) && ( pxQueue->uxLength != 1 ) ) );
    #if ( ( INCLUDE_xTaskGetSchedulerState == 1 ) || ( configUSE_TIMERS == 1 ) )
        {
            configASSERT( !( ( xTaskGetSchedulerState() == taskSCHEDULER_SUSPENDED ) && ( xTicksToWait != 0 ) ) );
        }
    #endif

    /*lint -save -e904 This function relaxes the coding standard somewhat to
     * allow return statements within the function itself.  This is done in the
     * interest of execution time efficiency. */
    for( ; ; )
    {
        taskENTER_CRITICAL();
        {
            /* Is there room on the queue now?  The running task must be the
             * highest priority task wanting to access the queue.  If the head item
             * in the queue is to be overwritten then it does not matter if the
             * queue is full. */
            if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
            {
                traceQUEUE_SEND( pxQueue );

                #if ( configUSE_QUEUE_SETS == 1 )
                    {
                        const UBaseType_t uxPreviousMessagesWaiting = pxQueue->uxMessagesWaiting;

                        xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );

                        if( pxQueue->pxQueueSetContainer != NULL )
                        {
                            if( ( xCopyPosition == queueOVERWRITE ) && ( uxPreviousMessagesWaiting != ( UBaseType_t ) 0 ) )
                            {
                                /* Do not notify the queue set as an existing item
                                 * was overwritten in the queue so the number of items
                                 * in the queue has not changed. */
                                mtCOVERAGE_TEST_MARKER();
                            }
                            else if( prvNotifyQueueSetContainer( pxQueue ) != pdFALSE )
                            {
                                /* The queue is a member of a queue set, and posting
                                 * to the queue set caused a higher priority task to
                                 * unblock. A context switch is required. */
                                queueYIELD_IF_USING_PREEMPTION();
                            }
                            else
                            {
                                mtCOVERAGE_TEST_MARKER();
                            }
                        }
                        else
                        {
                            /* If there was a task waiting for data to arrive on the
                             * queue then unblock it now. */
                            if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
                            {
                                if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
                                {
                                    /* The unblocked task has a priority higher than
                                     * our own so yield immediately.  Yes it is ok to
                                     * do this from within the critical section - the
                                     * kernel takes care of that. */
                                    queueYIELD_IF_USING_PREEMPTION();
                                }
                                else
                                {
                                    mtCOVERAGE_TEST_MARKER();
                                }
                            }
                            else if( xYieldRequired != pdFALSE )
                            {
                                /* This path is a special case that will only get
                                 * executed if the task was holding multiple mutexes
                                 * and the mutexes were given back in an order that is
                                 * different to that in which they were taken. */
                                queueYIELD_IF_USING_PREEMPTION();
                            }
                            else
                            {
                                mtCOVERAGE_TEST_MARKER();
                            }
                        }
                    }
                #else /* configUSE_QUEUE_SETS */
                    {
                        xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );

                        /* If there was a task waiting for data to arrive on the
                         * queue then unblock it now. */
                        if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
                        {
                            if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
                            {
                                /* The unblocked task has a priority higher than
                                 * our own so yield immediately.  Yes it is ok to do
                                 * this from within the critical section - the kernel
                                 * takes care of that. */
                                queueYIELD_IF_USING_PREEMPTION();
                            }
                            else
                            {
                                mtCOVERAGE_TEST_MARKER();
                            }
                        }
                        else if( xYieldRequired != pdFALSE )
                        {
                            /* This path is a special case that will only get
                             * executed if the task was holding multiple mutexes and
                             * the mutexes were given back in an order that is
                             * different to that in which they were taken. */
                            queueYIELD_IF_USING_PREEMPTION();
                        }
                        else
                        {
                            mtCOVERAGE_TEST_MARKER();
                        }
                    }
                #endif /* configUSE_QUEUE_SETS */

                taskEXIT_CRITICAL();
                return pdPASS;
            }
            else
            {
                if( xTicksToWait == ( TickType_t ) 0 )
                {
                    /* The queue was full and no block time is specified (or
                     * the block time has expired) so leave now. */
                    taskEXIT_CRITICAL();

                    /* Return to the original privilege level before exiting
                     * the function. */
                    traceQUEUE_SEND_FAILED( pxQueue );
                    return errQUEUE_FULL;
                }
                else if( xEntryTimeSet == pdFALSE )
                {
                    /* The queue was full and a block time was specified so
                     * configure the timeout structure. */
                    vTaskInternalSetTimeOutState( &xTimeOut );
                    xEntryTimeSet = pdTRUE;
                }
                else
                {
                    /* Entry time was already set. */
                    mtCOVERAGE_TEST_MARKER();
                }
            }
        }
        taskEXIT_CRITICAL();

        /* Interrupts and other tasks can send to and receive from the queue
         * now the critical section has been exited. */

        vTaskSuspendAll();
        prvLockQueue( pxQueue );

        /* Update the timeout state to see if it has expired yet. */
        if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
        {
            if( prvIsQueueFull( pxQueue ) != pdFALSE )
            {
                traceBLOCKING_ON_QUEUE_SEND( pxQueue );
                vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );

                /* Unlocking the queue means queue events can effect the
                 * event list.  It is possible that interrupts occurring now
                 * remove this task from the event list again - but as the
                 * scheduler is suspended the task will go onto the pending
                 * ready last instead of the actual ready list. */
                prvUnlockQueue( pxQueue );

                /* Resuming the scheduler will move tasks from the pending
                 * ready list into the ready list - so it is feasible that this
                 * task is already in a ready list before it yields - in which
                 * case the yield will not cause a context switch unless there
                 * is also a higher priority task in the pending ready list. */
                if( xTaskResumeAll() == pdFALSE )
                {
                    portYIELD_WITHIN_API();
                }
            }
            else
            {
                /* Try again. */
                prvUnlockQueue( pxQueue );
                ( void ) xTaskResumeAll();
            }
        }
        else
        {
            /* The timeout has expired. */
            prvUnlockQueue( pxQueue );
            ( void ) xTaskResumeAll();

            traceQUEUE_SEND_FAILED( pxQueue );
            return errQUEUE_FULL;
        }
    } /*lint -restore */
}

好啦,今天煮包分享的内容就到这里啦~~~希望对大家有帮助~

Logo

openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。

更多推荐