一,同步互斥与通信

        各类 RTOS 都会涉及的概念:任务通知(task notification)、队列(queue)、事件组(event group)、信号量(semaphoe)、互斥量(mutex)。

1,同步与互斥的概念

通过场景对比同步互斥的差别:

        比如说你等我用完厕所的场景,厕所是临界资源  (同一时间只能一人访问)

  • 同步:我在用厕所,你等会(有依赖)
  • 互斥:我在用厕所,你不能进来(拒绝访问)

        “互斥”操作可以用“同步”来实现(你“等”我用完厕所,你再用厕所)。

        裸机程序里,可以用一个全局变量或静态变量实现互斥操作,如下:

int LCD_PrintString(int x, int y, char *str)
{
    static int bCanUse = 1;

    if (bCanUse)
    {
        bCanUse = 0;
        /* 使用LCD */
        bCanUse = 1;
        return 0;
    }

    return -1;
}

        然而在 RTOS 里,使用上述代码实现互斥操作却有缺陷:假设A、B两个任务都想调用 LCD_PrintString ,任务A发现互斥值为 1 进入 if 语句的瞬间切换到任务B,此时任务B也进入了 if 语句,此时没能实现互斥操作

        于是尝试换一个逻辑,通过自加自减实现互斥避免以上问题:

01 int LCD_PrintString(int x, int y, char *str)
02 {
03     static int bCanUse = 1;
04     bCanUse--;
05     if (bCanUse == 0)
06     {
07         /* 使用LCD */
08         bCanUse++;
09         return 0;
10     }
11     else
12     {
13         bCanUse++;
14         return -1;
15     }
16 }

第四行代码的汇编指令如下:

        04.1 LDR R0, [bCanUse]         // 读取bCanUse的值,存入寄存器R0
        04.2 DEC R0, #1                     // 把R0的值减一
        04.3 STR R0, [bCanUse]         // 把R0写入变量bCanUse

        发现此代码还是有缺陷:假设任务 A 执行到第 04.1 行代码时读到的互斥值为 1,存入寄存器后就被切换出去了,任务B 得到的互斥值也为 1 经过第4步操作后进入 if 语句可调用LED,此时切换到任务A 继续根据栈中保持的寄存器值处理 04.2 行代码,此时也没能实现互斥操作

        综上发现:两种代码的逻辑缺陷在于被 RTOS 切换操作打断了判断过程,可以通过关闭中断来保证万无一失。代码改进如下:

// 示例 1 的代码改进如下:在 if判断 前关闭中断
int LCD_PrintString(int x, int y, char *str)
{
    static int bCanUse = 1;
    
    disable_irq();  // 关中断
    if (bCanUse)
    {
        bCanUse = 0;
        enable_irq();  // 开中断
        /* 使用LCD */
        bCanUse = 1;
        return 0;
    }

    return -1;
}

// 示例 2 的代码改进如下:在第 4 行前关闭中断
01 int LCD_PrintString(int x, int y, char *str)
02 {
03     static int bCanUse = 1;
       disable_irq();  // 关中断
04     bCanUse--;
       enable_irq();   // 开中断
05     if (bCanUse == 0)
06     {
07         /* 使用LCD */
08         bCanUse++;
09         return 0;
10     }
11     else
12     {
           disable_irq();  // 关中断
13         bCanUse++;
           enable_irq();   // 开中断
14         return -1;
15     }
16 }

2,各类方法的对比

        能实现同步、互斥的内核方法有:任务通知(task notification)、队列(queue)、事件组(event group)、信号量(semaphoe)、互斥量(mutex)。它们都有类似的方法:获取/释放、阻塞/唤醒、超时。比如:

  • 任务A 获取资源,用完后任务A 释放资源
  • 任务A 获取不到资源则阻塞,任务B 释放资源并把任务A 唤醒
  • 任务A 获取不到资源则阻塞并定时;要么超时返回,要么被唤醒
内核对象 生产者 消费者 数据 / 状态 说明
队列 ALL ALL 数据:若干个数据谁都可以往队列里扔数据,谁都可以从队列里读数据 用来传递数据,发送者、接收者无限制,一个数据只能唤醒一个接收者
事件组 ALL ALL 多个位:或、与谁都可以设置 (生产) 多个位,谁都可以等待某个位、若干位 用来传递事件,可以是 N 个事件,发送者、接受者无限制,可以唤醒多个接收者:像广播
信号量 ALL ALL 数量:0~n谁都可以增加一个数量,谁都可消耗一个数量 用来维持资源的个数,生产者、消费者无限制,1 个资源只能唤醒 1 个接收者
任务通知 ALL 只有我 数据、状态都可以传输,使用任务通知时,必须指定接受者 N 对 1 的关系:发送者无限制,接收者只能是这个任务
互斥量 只能 A 开锁 A 上锁 位:0、1我上锁:1 变为 0,只能由我开锁:0 变为 1 就像一个空厕所,谁使用谁上锁,也只能由他开锁

二,队列(queue)

        队列可以用于“任务到任务”、“任务到中断”、“中断到任务”直接传输信息。

        对于队列需要关注的内容有:创建 清除 删除操作,队列中消息的保持,队列数据的发送 读取 覆盖,(单/多)队列的阻塞,读写队列如何影响任务的优先级。

队列的引入:

        环形缓冲区:可在两个任务间传输数据,且能保证数据正确(注意:为了区分缓冲区空\满逻辑的判断,实际存入的数据比缓冲区大小少1)【写任务只修改 w,读任务只修改 r,确保没有被同时访问的变量(只适合两个任务,不考虑阻塞-唤醒的效率问题)】

    环形缓冲区不引入 num 的记录,会导致 读/写 两个任务修改同一个变量【需要保护机制】

  • 队列的本质:环形缓冲区的基础上增加了保护措施、阻塞-环形机制
  • 信号量的本质:队列不传输数据,只调整“数据个数”
  • 互斥量的本质:信号量中“数据个数”最大值为1

1,队列的特性

1)常规操作:

  • 队列可以包含若干数据(队列的项的个数称为“长度” length)
  • 每个数据大小固定
  • 创建队列时就要指定长度、数据大小
  • 数据的操作采用先进先出 (FIFO) 方法【写数据时放尾部,读数据时从头部】
  • 可以强制写队列头部:即覆盖头部数据

        图示如下:

        队列包含环形 Buffer 和 两个链表(Sender\Receiver List),通过链表互相唤醒

        如果任务B 想读队列却读不到数据且愿意等待时,从 Ready链表 中移除后放入队列的 接收链表和Delay链表。【情况1】任务A 写队列后会从 接收链表 中取出第一个任务唤醒(把任务B 从两个链表中删除,重新放入 Ready链表);任务B 有几乎运行时会 处理后续操作。【情况2】tick 中断会判断任务是否超时,当超时时会把任务B 从两个链表中删除后重新放入 Ready链表;xQueueReceive 函数返回值时错误(可知被超时唤醒)

2)传输数据的两种方法:

  • 拷贝:把数据、变量的值复制进队列里
  • 引用:把数据、变量的地址复制进队列里

FreeRTOS  使用拷贝值的方法,更简单:

  • 局部变量的值可以发送到队列中,即时局部变量被回收也不会影响队列
  • 无需分配 buffer 来保存数据,队列中有 buffer
  • 发送、接收任务解耦(接受任务不需要知道数据的来源 与 发送者)
  • 数据太大时可用队列传输其地址
  • 队列的空间由 FreeRTOS 内核分配,无需操心
  • 对于内存有保护功能的系统,若队列使用引用方法(即使用地址),必须确保双方任务对这个地址都有访问权限。而拷贝方法无此限制(内核有足够的权限复制数据)。

3)队列的阻塞访问

        只需知道队列的句柄,谁都可以读/写该队列(任务/ISR 都可以),可以多个任务读写队列。

        读写队列时:如果读写不成功,则阻塞,可以指定超时时间。

        任务读队列时,若队列没有数据则该任务可进入阻塞状态;队列有数据后阻塞的任务变为就绪态。(可指定阻塞时间,超时后也会进入就绪态)。多个任务等待同一队列数据时:优先级高的先进入就绪态,优先级相同时等待时间最久的先进入就绪态。

        类似,写队列时,若队列已满则该任务进入阻塞状态,当队列有空间后阻塞的任务变为就绪态,也可以指定阻塞时间。多个任务写“满队列“ 时:优先级高的先进入就绪态,优先级相同时等待时间最久的先进入就绪态。

2,队列函数

1)创建

  • 动态分配内存:xQueueCreate
QueueHandle_t xQueueCreate( UBaseType_t uxQueueLength, UBaseType_t uxItemSize );
// uxQueueLength队列长度,最多能存放多少个item
// uxItemSize每个item的大小,以字节为单位
// 返回值:非 0 表示成功,返回句柄(以后可用句柄来操作队列);内存不足失败返回 NULL
  • 静态分配内存:xQueueCreateStatic
QueueHandle_t xQueueCreateStatic(
                        UBaseType_t uxQueueLength,  // 队列长度
                        UBaseType_t uxItemSize,     // 每个item的大小
                        uint8_t *pucQueueStorageBuffer,   // 指向存放队列的 uint8_t数组
                        StaticQueue_t *pxQueueBuffer   // 必须指向一个结构体,保存队列数据结构
);
// pucQueueStorageBuffer 数组大小为"uxQueueLength * uxItemSize"
// 返回值:非 0 表示成功,返回句柄(以后可用句柄来操作队列);内存不足失败返回 NULL
// 示例代码
#define QUEUE_LENGTH  10
#define ITEM_SIZE sizeof( uint32_t )

StaticQueue_t xQueueBuffer;  //保存队列结构体,存储队列的管理信息(如队列状态、头尾指针等)
uint8_t ucQueueStorage[QUEUE_LENGTH * ITEM_SIZE ];  // 对应大小的数组 (内存缓冲区)

void vATask( void *pvParameters )
{
    QueueHandle_t xQueue1;
    // 创建队列(静态)
    xQueue1 = xQueueCreateStatic(QUEUE_LENGTH, ITEM_SIZE, ucQueueStorage, &xQueueBuffer);
}

2)复位

        队列刚被创建时,里面没有数据

        使用过程中可把队列恢复为初始状态,用:xQueueReset()

/* pxQueue : 复位哪个队列;
* 返回值: pdPASS(宏定义的1,必定成功)
*/
BaseType_t xQueueReset( QueueHandle_t pxQueue);

3)删除

        只能删除用动态方法创建的队列,会释放内存。

void vQueueDelete( QueueHandle_t xQueue );

4)写队列

        可以把数据写到队列头部,也可以写到尾部。函数有两个版本:任务中使用,ISR 中使用。

/*
 * 等同于xQueueSendToBack
 * 往队列尾部写入数据,如果没有空间,阻塞时间为xTicksToWait
 */
BaseType_t xQueueSend(
                      QueueHandle_t xQueue,
                      const void   *pvItemToQueue,
                      TickType_t   xTicksToWait
                    );

/*
 * 往队列尾部写入数据,如果没有空间,阻塞时间为xTicksToWait
 */
BaseType_t xQueueSendToBack(
                      QueueHandle_t xQueue,
                      const void   *pvItemToQueue,
                      TickType_t   xTicksToWait
                    );

/*
 * 往队列尾部写入数据,此函数可以在中断函数中使用,不可阻塞
 */
BaseType_t xQueueSendToBackFromISR(
                      QueueHandle_t xQueue,
                      const void   *pvItemToQueue,
                      BaseType_t   *pxHigherPriorityTaskWoken
                    );

/*
 * 往队列头部写入数据,如果没有空间,阻塞时间为xTicksToWait
 */
BaseType_t xQueueSendToFront(
                      QueueHandle_t xQueue,
                      const void   *pvItemToQueue,
                      TickType_t   xTicksToWait
                    );

/*
 * 往队列头部写入数据,此函数可以在中断函数中使用,不可阻塞
 */
BaseType_t xQueueSendToFrontFromISR(
                      QueueHandle_t xQueue,
                      const void   *pvItemToQueue,
                      BaseType_t   *pxHigherPriorityTaskWoken
                    );
参数 说明
xQueue
队列句柄,要写哪个队列
pvItemToQueue
数据指针,这个数据的值会被复制进队列,
复制多大的数据?在创建队列时已经指定了数据大小
xTicksToWait
xTicksToWait 表示阻塞的最大时间(Tick Count)
如果被设为 0,无法写入数据时函数会立刻返回;
如果被设为 portMAX_DELAY,则会一直阻塞直到有空间可写。
pxHigherPriorityTaskWoken

标记 “是否有更高优先级的任务因为本次操作需要被唤醒”

pdTRUE表示 “是”,pdFALSE表示 “否”

返回值
pdPASS:数据成功写入了队列
errQUEUE_FULL:写入失败,因为队列满了

5)读队列

        读到一个数据后,队列中该数据会被移除。Receive 函数有两个版本,任务/中断。

BaseType_t xQueueReceive( QueueHandle_t xQueue,
                          void* const pvBuffer,
                          TickType_t xTicksToWait );

BaseType_t xQueueReceiveFromISR( QueueHandle_t xQueue,
                                 void* const pvBuffer,
                                 BaseType_t *pxTaskWoken );

6)查询

        可查询队列中有多少个可用数据、有多少空余空间

/*
* 返回队列中可用数据的个数
*/
UBaseType_t uxQueueMessagesWaiting( const QueueHandle_t xQueue );

/*
* 返回队列中可用空间的个数
*/
UBaseType_t uxQueueSpacesAvailable( const QueueHandle_t xQueue );

7)覆盖 / 偷看

        当队列长度为 1 时【必须】,可用 Overwrite 来覆盖数据,这也意味着函数不会被阻塞。

/* 覆盖队列
* xQueue: 写哪个队列
* pvItemToQueue: 数据地址
* 返回值: pdTRUE表示成功, pdFALSE表示失败
*/
BaseType_t xQueueOverwrite( QueueHandle_t xQueue,
                            const void * pvItemToQueue);

BaseType_t xQueueOverwriteFromISR( QueueHandle_t xQueue,
                                   const void * pvItemToQueue,
                                   BaseType_t *pxHigherPriorityTaskWoken);

        如果想让队列中的数据供多方读取,即读取时不移除数据,留给后面的人。可以用窥视 Peek,从队列中复制数据但不移除。若队列没有数据,Peek 时也会导致阻塞。

/* 偷看队列
* xQueue: 偷看哪个队列
* pvItemToQueue: 数据地址, 用来保存复制出来的数据
* xTicksToWait: 没有数据的话阻塞一会
* 返回值: pdTRUE表示成功, pdFALSE表示失败
*/
BaseType_t xQueuePeek( QueueHandle_t xQueue,
                       void * const pvBuffer,
                       TickType_t xTicksToWait);

BaseType_t xQueuePeekFromISR( QueueHandle_t xQueue,
                              void *pvBuffer,);

3,队列的基本使用

1)程序框架

        基本方法是用环形缓冲区传输红外遥控的数据(轮询的方式从中读取键值),现把环形缓冲区改为队列,框架如下:

        game任务主要逻辑是进行球的轨迹控制,进而调整移动方向、砖块、分数;platform任务轨迹遥控器移动挡球板;IRReceiver解析遥控器键值后写队列。

2)源码分析

        IRReceiver_IRQ_Callback中断回调函数里,识别出红外遥控键值后,构造一个struct input_data结构体,然后使用xQueueSendFromISR函数把它写入队列g_xQueuePlatform。

        写队列代码如下:

struct input_data idata;
idata.dev = 0;
idata.val = 0;
xQueueSendToBackFromISR(g_xQueuePlatform, &idata, NULL);

        挡球板中的从全局队列中读取数据的示例如下:

// 挡球板任务
static void platform_task(void* params)
{
    byte platformXtmp = platoformX;
    uint8_t dev, data, last_data;
    struct input_data idata;

    // Draw platform
    draw_bitmap(platformXtmp, g_yres - 8, platform, 12, 8, NOINVERT, 0);
    draw_flushArea(platformXtmp, g_yres - 8, 12, 8);

    while(1)
    {
        // 读取红外遥控器
        // if(0 == IRReceiver_Read(&dev, &data))
        if(pdPass == xQueueReceive(g_xQueuePlatform, &idata, portMAX_DELAY))
        {
            data = idata.val;
            if (data == 0x00) // 重复码传输时间较短(长按控制方便)
            {
            data = last_data;
            }

            (data == 0xe0) /* Left */
            {
            btnLeft();
            }

            if (data == 0x90) /* Right */
            {
            btnRight();
            }
            last_data = data;
        }
    }
}

        注释掉的 if 是初始代码用轮询的方式读取遥控键值,效率很低。修改后的读取队列当没有数据时,挡球板任务阻塞,当IRReceiver_IRQ_Callback中断回调函数把数据写入队列后,挡球板任务马上被唤醒,继续执行后面代码

4,使用队列实现多设备输入

        同时用红外遥控器、旋转编码器玩游戏:用两种写队列的方式(中断 任务);通过读写队列(有阻塞)提高CPU运行效率。【注意:中断或任务中用的读写队列的函数有所不同】

    旋转编码器控制略复杂(需分辨不同的旋转速度),需要中间增加一级队列与处理数据的任务。

static void RotaryEncoder_task(void *params)
{
    struct rotary_data rdata;
    struct input_data idata;
    int left, i, cnt;

    while(1)
    {
        // 读旋转编码器队列
        xQueueReceive(g_xQueueRotary, &rdata, portMAX_DELAY);

        // 处理数据
        // 判断速度:负数表示向左转,反之之间右转
        if(rdata.speed < 0)
        {
            left = 1;
            rdata.speed = 0 - rdata.speed;
        }
        else
        {
            left = 0;
        }

        cnt = rdata.speed / 10;
        if(!cnt)
        {
            cnt = 1;
        }

        // 旋转编码器模拟红外遥控
        idata.dev = 1;
        idata.val = left ? 0xe0 : 0x90;

        // 写挡球板队列(可能多次)
        for(i=0; i < cnt; i++)
        {
            xQueueSend(g_xQueueFlat, &idata, 0);
        }
    }
}

        使用队列后多线程性能有所改进(测试:增加音乐播放任务后运行流程)。

问题:明明是旋转编码器,为什么要通过转换成红外控制的键值来控制程序?

原因:挡球板队列沿用了早期的红外遥控的格式。

改造:val值不对应键值而是创建宏后 统一用代表左右的宏给 val 赋值。       

          【初始程序业务切换时按键意义改变(都用红外键值),修改后增加中间任务处理数据,但仍然有问题:每个设备对应一个任务解析导致内存占用大浪费资源】

       新框架设计:通过多个队列保存写入数据(只和硬件相关),通过同一任务 inputTask 修改目标队列,inputTask可以用轮询方式(不推荐,多个receive的轮询不可设置阻塞时间,会占用cpu资源)和 队列集【推荐,容易添加硬件对队列数量无限制,高效且框架结构清晰】。

5,队列集

        假设有两个输入设备(红外遥控器、旋转编码器),它们的驱动程序应该专注于“产生硬件数据”,不应该跟“业务有任何连续”。【驱动程序只应该把 键值\操作 记录下来,不应该转换为游戏的控制键,即驱动程序不应该有游戏相关代码(游戏的任务把各种输入操作解析为游戏控制)】。当场景变化时这个驱动程序还可以继续使用。

        要支持多个输入设备,需实现一个“InputTask”读取各个设备的队列【用队列集】,得到数据后再分别转换为游戏的控制。

  队列集的本质也是队列,但是里面存放的是“队列句柄”,过程如下:
        a. 创建队列 A,它的长度是 n1
        b. 创建队列 B,它的长度是 n2
        c. 创建队列集 S,它的长度是“n1+n2”
        d. 把队列 A、B 加入队列集 S
        e. 这样,写队列 A 的时候,会顺便把队列 A 的句柄写入队列集 S
        f. 这样,写队列 B 的时候,会顺便把队列 B 的句柄写入队列集 S
        g. InputTask 先读取队列集 S,它的返回值是一个队列句柄,这样就可以知道哪个队列有数据了;然后 InputTask 再读取这个队列句柄得到数据。

1)创建队列集

QueueSetHandle_t xQueueCreateSet( const UBaseType_t uxEventQueueLength );
// uxEventQueueLength 是队列集长度,最多能放的数据个数(队列句柄)
// 返回值非0表成功对应句柄,返回NULL表内存不足失败

2)把队列加入队列集

BaseType_t xQueueAddToSet( QueueSetMemberHandle_t xQueueOrSemaphore,
                           QueueSetHandle_t xQueueSet );
// xQueueOeSemaphore 队列句柄,要加入队列集的队列
// xQueueSet 队列集句柄
// 返回值:pdTRUE / pdFALSE

3)读取队列集

QueueSetMemberHandle_t xQueueSelectFromSet( QueueSetHandle_t xQueueSet,
                                            TickType_t const xTicksToWait );
// xQueueSet队列集句柄
// xTicksToWait 表示阻塞的最大时间(Tick Count)。
// 若设为 0,无法读出数据时函数会立刻返回;若为 portMAX_DELAY,则会一直阻塞直到有数据可写

        读取成功后返回读取到的 队列句柄

6,数据分发给多个任务

        实现另一个游戏:用红外遥控器不同按钮控制 3 辆汽车,框架如下:

        三辆汽车分别创建自己的队列,并注册给 dev_irda.c 读取队列,根据遥控器键值移动汽车。

static void CarTask(void *params)
{
    struct car *pcar = params;
    struct ir_data idata;

    /* **创建自己的队列** */
    QueueHandle_t xQueueIR = xQueueCreate(10, sizeof(struct ir_data));

    /* **注册队列** */
    RegisterQueueHandle(xQueueIR);

    /* 显示汽车 */
    ShowCar(pcar);

    while (1)
    {
        /* 读取按键值:读队列 */
        xQueueReceive(xQueueIR, &idata, portMAX_DELAY);

        /* 控制汽车往右移动 */
        if (idata.val == pcar->control_key)
        {
            if (pcar->x < g_xres - CAR_LENGTH)
            {
                /* 隐藏汽车 */
                HideCar(pcar);

                /* 调整位置 */
                pcar->x += 20;
                if (pcar->x > g_xres - CAR_LENGTH)
                {
                    pcar->x = g_xres - CAR_LENGTH;
                }

                /* 重新显示汽车 */
                ShowCar(pcar);
            }
        }
    }
}

IRReceiver_IRQ_Callback解析出遥控器键值后,写多个队列。

/* 创建3个汽车任务 */
#if 0
    for (i = 0; i < 3; i++)
    {
        draw_bitmap(g_cars[i].x, g_cars[i].y, carImg, 15, 16, NOINVERT, 0);
        draw_flushArea(g_cars[i].x, g_cars[i].y, 15, 16);
    }
#endif
    xTaskCreate(CarTask, "car1", 128, &g_cars[0], osPriorityNormal, NULL);
    xTaskCreate(CarTask, "car2", 128, &g_cars[1], osPriorityNormal, NULL);
    xTaskCreate(CarTask, "car3", 128, &g_cars[2], osPriorityNormal, NULL);
}

三、信号量(semaphore)

        上一节的消息队列用于传输多个数据,但是有时候只需要传递状态,这个状态值需要用一个数值表示(停车后车位数减一,离开后车位数加1),这种情况下我们只需要维护一个数值【用信号量效率更高、更节省内存】

        涉及内容:创建、删除、发送、获得 信号量; 计数型、二进制信号量的区别。

1)信号量特性

  • 信号:提醒通知
  • 量:   计数(没限制时就是 计数型信号量(Counting Semaphores)二进制信号量(Binary Semaphores) 只有 0、1两个取值)
  • 动作:“give” 给出资源,计数值加 1;“take” 获得资源,计数值减 1

        【信号量的"give"、"take"双方并不需要相同,可以用于生产者-消费者场合

           二进制信号量跟计数型的差别,就是计数值的最大值被限定为1。(二进制型创建时初始值为0,计数型创建时初始值可以设定,其它操作一样。)

        信号量只有计数值,无法容纳其他数据(创建时只需要分配结构体);队列可容纳多个数据(创建时还要有存储数据的空间)。队列的生产者和消费者都可以阻塞,而信号量只有消费者可以阻塞(信号量的生产者发现计数值达到最大时返回失败)。

2)信号量函数

(1)创建(动态、静态)

        二进制信号量:

/* 创建一个二进制信号量,返回它的句柄。
* 此函数内部会分配信号量结构体
* 返回值: 返回句柄,非 NULL 表示成功
*/
SemaphoreHandle_t xSemaphoreCreateBinary( void );


/* 创建一个二进制信号量,返回它的句柄。
* 此函数无需动态分配内存,所以需要先有一个 StaticSemaphore_t 结构体,并传入它的
指针
* 返回值: 返回句柄,非 NULL 表示成功
*/
SemaphoreHandle_t xSemaphoreCreateBinaryStatic( StaticSemaphore_t *pxSemaphoreBuf
fer );

        计数型信号量:

/* 创建一个计数型信号量,返回它的句柄。
* 此函数内部会分配信号量结构体
* uxMaxCount: 最大计数值
* uxInitialCount: 初始计数值
* 返回值: 返回句柄,非 NULL 表示成功
*/
SemaphoreHandle_t xSemaphoreCreateCounting(UBaseType_t uxMaxCount, UBaseType_t
uxInitialCount);


/* 创建一个计数型信号量,返回它的句柄。
* 此函数无需动态分配内存,所以需要先有一个 StaticSemaphore_t 结构体,并传入它的指针
* uxMaxCount: 最大计数值
* uxInitialCount: 初始计数值
* pxSemaphoreBuffer: StaticSemaphore_t 结构体指针
* 返回值: 返回句柄,非 NULL 表示成功
*/
SemaphoreHandle_t xSemaphoreCreateCountingStatic( UBaseType_t uxMaxCount, UBaseType_t uxInitialCount, StaticSemaphore_t *pxSemaphoreBuffer );

(2)删除(可以用来删除二进制、计数型信号量)

/*
* xSemaphore: 信号量句柄,你要删除哪个信号量
*/
void vSemaphoreDelete( SemaphoreHandle_t xSemaphore )

(3)give / take(take可以阻塞,give不能阻塞(若计数值已经最大则返回失败))

3)计数型信号量案例

        3俩小车要进城,但是通行证只有2张,进城后就交还,其他车辆就可以得到通行证。

void car_game(void)
{
	int x;
	int i, j;
	g_framebuffer = LCD_GetFrameBuffer(&g_xres, &g_yres, &g_bpp);
	draw_init();
	draw_end();
	g_xSemTicks = xSemaphoreCreateCounting(3, 2);  // 先创建信号量(最大值为3,初始值为2)
...
}

// 汽车任务如下
static void CarTask(void *pvParameters)
{
    struct car *pcar = (struct car *)pvParameters;
    struct ir_data idata;

    /* 创建自己的队列 */
    xQueueHandle xQueueIR = xQueueCreate(10, sizeof(struct ir_data));

    /* 注册队列 */
    RegisterCarQueue(xQueueIR);

    /* 显示汽车 */
    ShowCar(pcar);

    /* 获得信号量 */
    xSemaphoreTake(g_SemTicks, portMAX_DELAY);

    while (1)
    {
       
        if (pcar->x < g_xres - CAR_LENGTH)
        {
            /* 隐藏汽车 */
            HideCar(pcar);

            /* 调整位置 */
            pcar->x += 1;
            if (pcar->x >= g_xres - CAR_LENGTH)
            {
                pcar->x = g_xres - CAR_LENGTH;
            }

            /* 重新显示汽车 */
            ShowCar(pcar);

            vTaskDelay(60);
        }

        if (pcar->x == g_xres - CAR_LENGTH)  // 汽车行驶倒最右边
        {
            /* 释放信号量 */
            xSemaphoreGive(g_SemTicks);
            vTaskDelete(NULL);
        }
    }
}
现象为:car1car2一起往右行驶,任何一辆到达右边后car3才开始往右行驶。
        (优先级相同时,按照调用函数 take 的时刻序列排序)

4)二进制信号量案例

        3俩小车要进城,但是通行证只有1张,进城后就可以交还同行证,其他车辆就可以得到
通行证。这个场景使用二进制信号量。
// 只是在创建信号量时的代码不一样
void car_game(void)
{
	int x;

	int i, j;
	g_framebuffer = LCD_GetFrameBuffer(&g_xres, &g_yres, &g_bpp);
	draw_init();
	draw_end();

	//g_xSemTicks = xSemaphoreCreateCounting(1, 1);
	g_xSemTicks = xSemaphoreCreateBinary();
    //【注意】:二进制信号量创建时初值为0,需要置1后车辆才能移动
	xSemaphoreGive(g_xSemTicks); /* 对于二进制信号量,它的最大值是1,后面两次give无效 */
	xSemaphoreGive(g_xSemTicks);
	xSemaphoreGive(g_xSemTicks);
...
}
实现现象:car1car2car3依次运行,前面的车行驶到最右边时,下一辆车才开始运行。

5)优先级反转问题

        低优先级任务比高优先级任务先运行(计数型、二进制信号量都有此问题

使用信号量时,有如下优先级反转现象:        

        第二个任务优先级中等,但没有获得信号量,导致信号量阻塞(运行第二个任务,第一个任务不运行也无法释放信号量);第三个高优先级任务信号量获取失败反而无法运行

        【互斥量可解决优先级反转问题(领导临时提拔你)】

四,互斥量 (mutex)

        互斥量是一种特殊的信号量

        场景:校长临时提拔学生,让学生在主任前运行,学生运行完后恢复级别的同时校长已被唤醒,校长达成了用超算的目的(有问题的2号程序被排在了最后)

        本质:L 的优先级临时被提升到 H 的优先级

    概念:互斥量 ≈ 带优先级继承 / 天花板的二进制信号量

        使用队列、信号量可以实现互斥访问(如信号量初值为 1,任务A take成功,任务B 等待,A处理完后give,进而任务B 使用)。但是有前提:任务B 在任务A 后再 give;期间没有别的任务 give。【谁拿谁放都行】

        使用互斥量可解决此问题,互斥量是实现互斥访问的量,值为 0 或 1。【谁上锁,谁开锁】

注意:FreeRTOS的互斥锁,并没有在代码上实现这点:
        ·  即使任务 A 获得了互斥锁,任务 竟然也可以释放互斥锁。
        ·  谁上锁、谁释放:只是约定。

      本章涉及:为什么要实现互斥操作、怎么使用互斥量、互斥量导致的优先级反转、优先级继承

1)使用场合

        任务 A 正在使用某个资源,还没用完的情况下任务 B 也来使用的话, 就可能导致问题。

        (如串口打印信息时被打断造成混乱)

  现象常见于:访问外设;读、修改、写操作导致的问题;对变量的非原子化访问;函数重入

        解决方法是:任务A访问这些全局变量、函数代码时,独占它(就是上个锁)。被独占的被称为临界资源。

    互斥量也被称为互斥锁,使用过程如下:

  • 互斥量初始值为 1
  • 任务 A 想访问临界资源,先获得并占有互斥量,然后开始访问
  • 任务 B 也想访问临界资源,也要先获得互斥量:被别人占有了,于是阻塞
  • 任务 A 使用完毕,释放互斥量;任务 B 被唤醒、得到并占有互斥量,然后开 始访问临界资源
  • 任务 B 使用完毕,释放互斥量
        FreeRTOS未实现:任务A占有互斥量的情况下,其他任务也可释放互斥量。 

2)互斥量函数

(1)创建
        要想使用互斥量,需要在配置文件FreeRTOSConfig.h中定义:
##define configUSE_MUTEXES 1
/* 创建一个互斥量,返回它的句柄。
* 此函数内部会分配互斥量结构体
* 返回值: 返回句柄,非 NULL 表示成功
*/
SemaphoreHandle_t xSemaphoreCreateMutex( void );


/* 创建一个互斥量,返回它的句柄。
* 此函数无需动态分配内存,所以需要先有一个 StaticSemaphore_t 结构体,并传入它的指针
* 返回值: 返回句柄,非 NULL 表示成功
*/
SemaphoreHandle_t xSemaphoreCreateMutexStatic( StaticSemaphore_t *pxMutexBuffer );
(2)其他函数
        注意:互斥量不能在 ISR 中使用
        各类操作函数(删除、give、take) 与信号量是一样的
/*
* xSemaphore: 信号量句柄,你要删除哪个信号量, 互斥量也是一种信号量
*/
void vSemaphoreDelete( SemaphoreHandle_t xSemaphore );


/* 释放 */
BaseType_t xSemaphoreGive( SemaphoreHandle_t xSemaphore );


/* 释放(ISR 版本) */
BaseType_t xSemaphoreGiveFromISR(
SemaphoreHandle_t xSemaphore, BaseType_t *pxHigherPriorityTaskWoken);


/* 获得 */
BaseType_t xSemaphoreTake(
SemaphoreHandle_t xSemaphore, TickType_t xTicksToWait);


/* 获得(ISR 版本) */
xSemaphoreGiveFromISR(
SemaphoreHandle_t xSemaphore, BaseType_t *pxHigherPriorityTaskWoken);

3)优先级继承

        之前优先级问题是:car1低优先级任务获得了锁,但是它优先级太低而无法运行。

        解决方法是:临时提升 car1 任务的优先级,让它能尽快运行、释放锁。【car3也想获得同一个互斥锁,不成功而阻塞时, car1的优先级提升得跟car3一样。

        具体场景见本节开始的讲解。

4)递归锁

    死锁的概念:
  • A 获得了互斥量 M1
  • B 获得了互斥量 M2
  • A 还要获得互斥量 M2 才能运行,结果 A 阻塞
  • B 还要获得互斥量 M1 才能运行,结果 B 阻塞
  • AB 都阻塞,再无法释放它们持有的互斥量
  • 死锁发生!

    自我死锁:

  • 任务 获得了互斥锁 M
  • 它调用一个库函数
  • 库函数要去获取同一个互斥锁 M,于是它阻塞:任务 A 休眠,等待任务 A 来释放互斥锁!
  • 死锁发生!        
// 普通互斥量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// 函数A:加锁后调用函数B
void funcA() {
    pthread_mutex_lock(&mutex); // 第一次加锁:成功(锁状态=1)
    funcB(); // 调用funcB
    pthread_mutex_unlock(&mutex);
}

// 函数B:又加同一把锁
void funcB() {
    pthread_mutex_lock(&mutex); // 第二次加锁:锁已被当前线程持有
    // 此处发生死锁:当前线程等待自己释放锁 → 永远阻塞
    do_something();
    pthread_mutex_unlock(&mutex);
}

// 主线程调用funcA → 死锁
funcA();

    递归锁(Recursive Mutexes) 可解决自死锁问题,无法解决交叉死锁问题

    递归锁(Recursive Mutex)在普通互斥量基础上,增加了计数机制所有权验证

  • 计数机制:记录同一线程对锁的 “加锁次数”(初始 0);
  • 所有权验证只有持有锁的线程能重复加锁
  • 解锁规则加锁几次,就要解锁几次,计数归 0 时才真正释放锁。
递归锁 一般互斥量
创建 xSemaphoreCreateRecursiveMutex xSemaphoreCreateMutex
获得 xSemaphoreTakeRecursive xSemaphoreTake
释放 xSemaphoreGiveRecursive xSemaphoreGive
// 递归互斥量(初始化时指定类型)
pthread_mutex_t rmutex;
pthread_mutexattr_t attr;
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); // 关键:设置为递归锁
pthread_mutex_init(&rmutex, &attr);

void funcA() {
    pthread_mutex_lock(&rmutex); // 第1次加锁:计数=1,成功
    funcB();
    pthread_mutex_unlock(&rmutex); // 第1次解锁:计数=0 → 真正释放锁
}

void funcB() {
    pthread_mutex_lock(&rmutex); // 第2次加锁:同一线程,计数=2,成功(无死锁)
    do_something();
    pthread_mutex_unlock(&rmutex); // 第2次解锁:计数=1
}

// 调用funcA → 正常执行,无死锁
funcA();

五,事件组 (event group)

    比如日常生活场景:

  • 出发:要等待 3 个人都到齐,他们是""的关系
  • 交报告:只需等待 3 人中的任何一个,他们是""的关系

   在 FreeRTOS 中,可以使用事件组(event group)来解决这些问题。

        涉及内容:概念,操作函数,优缺点,设置 等待 清除 事件组中的位,同步多个任务

1,概念与操作

    事件组可以简单的认为就是一个整数:

  • 每一位表示一个事件  
  • 每一位事件的含义由程序员决定,如:Bit0 表示用来串口是否就绪,Bit1 表示按键是否被按下
  • 这些位,值为 1 表示事件发生了,值为 0 表示事件没发生
  • 一个或多个任务、ISR 都可以去写这些位;一个或多个任务、ISR 都可以去读
  • 这些位可以等待某一位、某些位中的任意一个,也可以等待多位

        事件组用一个整数表示,其中高 8 位留给内核使用,其他位来表示事件。(设置configUSE_16_BIT_TICKS来区分该处理器用 16/32 位更高效,事件组也对应此位数分配

        事件组和队列、信号量的区别:
特性维度 队列 / 二进制信号量 事件组
唤醒机制 事件发生时仅唤醒一个任务(单点通知) 事件发生时唤醒所有符合条件的任务(广播通知)
事件 / 资源特性 消耗型:- 队列数据读走即消失- 信号量获取后计数减少 非消耗型:事件位是 “状态标记”,被唤醒的任务可选择:1. 清除事件位(仅自己生效)2. 保留事件位(供其他任务使用)
等待逻辑 只能等待 “资源可用”(队列有数据 / 信号量可获取) 支持灵活的等待逻辑:- 等待某 1 个位(如 bit0)- 等待多个位中任意一个(或关系,如 bit0|bit1)- 等待多个位全部满足(与关系,如 bit0&bit1)
核心用途 同步(一对一)、资源传递(队列带数据) 同步(一对多)、多条件状态通知

2,事件组函数

1)创建(动态 / 静态 分配内存)

        使用事件组前,先创建句柄,用句柄来区分不同事件组

/* 创建一个事件组,返回它的句柄。
* 此函数内部会分配事件组结构体
* 返回值: 返回句柄,非 NULL 表示成功
*/
EventGroupHandle_t xEventGroupCreate( void );


/* 创建一个事件组,返回它的句柄。
* 此函数无需动态分配内存,所以需要先有一个 StaticEventGroup_t 结构体,并传入它的指针
* 返回值: 返回句柄,非 NULL 表示成功
*/
EventGroupHandle_t xEventGroupCreateStatic( StaticEventGroup_t * pxEventGroupBuffer);

2)删除

        动态创建的事件组,不用时可回收内存

/*
* xEventGroup: 事件组句柄,你要删除哪个事件组
*/
void vEventGroupDelete( EventGroupHandle_t xEventGroup )

3)设置事件(某个位、某些位)

  • 在任务中使用 xEventGroupSetBits()
  • ISR 中使用 xEventGroupSetBitsFromISR()

   有一个或多个任务在等待事件,如果这些事件符合这些任务的期望,那么任务还会被唤醒。

/* 设置事件组中的位
* xEventGroup: 哪个事件组
* uxBitsToSet: 设置哪些位?
* 如果 uxBitsToSet 的 bitX, bitY 为 1, 那么事件组中的 bitX, bitY 被设置为 1
* 可以用来设置多个位,比如 0x15 就表示设置 bit4, bit2, bit0
* 返回值: 返回原来的事件值(没什么意义, 因为很可能已经被其他任务修改了) */
EventBits_t xEventGroupSetBits( EventGroupHandle_t xEventGroup,
                                const EventBits_tuxBitsToSet );

/* 设置事件组中的位
* xEventGroup: 哪个事件组
* uxBitsToSet: 设置哪些位?
* 如果 uxBitsToSet 的 bitX, bitY 为 1, 那么事件组中的 bitX, bitY 被设置为 1
* 可以用来设置多个位,比如 0x15 就表示设置 bit4, bit2, bit0
* pxHigherPriorityTaskWoken: 有没有导致更高优先级的任务进入就绪态? pdTRUE-有, pdFALSE-没有
* 返回值: pdPASS-成功, pdFALSE-失败
*/
BaseType_t xEventGroupSetBitsFromISR( EventGroupHandle_t xEventGroup, 
                 const EventBits_t uxBitsToSet, BaseType_t * pxHigherPriorityTaskWoken );

   注意:中断最多只能唤醒 1 个任务,所以 xEventGroupSetBitsFromISR 是给一个FreeRTOS后台任务(daemon task)发送队列数据,由这个任务来设置事件组。如果后台任务的优先级比当前被中断的任务优先级高,置 *pxHigherPriorityTaskWoken 为 pdTRUE。

4)等待事件

        使用 xEventGroupWaitBits 来等待事件,可以等待某一位、某些位中的任意一个,也可以等待多位;等到期望的事件后,还可以清除某些位。

EventBits_t xEventGroupWaitBits( EventGroupHandle_t xEventGroup, 
                                 const EventBits_t uxBitsToWaitFor, 
                                 const BaseType_t xClearOnExit, 
                                 const BaseType_t xWaitForAllBits, 
                                 TickType_t xTicksToWait );
特性维度 队列 / 二进制信号量 事件组
唤醒机制 事件发生时仅唤醒一个任务(单点通知) 事件发生时唤醒所有符合条件的任务(广播通知)
事件 / 资源特性 消耗型:- 队列数据读走即消失- 信号量获取后计数减少 非消耗型:事件位是 “状态标记”,被唤醒的任务可选择:1. 清除事件位(仅自己生效)2. 保留事件位(供其他任务使用)
等待逻辑 只能等待 “资源可用”(队列有数据 / 信号量可获取) 支持灵活的等待逻辑:- 等待某 1 个位(如 bit0)- 等待多个位中任意一个(或关系,如 bit0|bit1)- 等待多个位全部满足(与关系,如 bit0&bit1)
核心用途 同步(一对一)、资源传递(队列带数据) 同步(一对多)、多条件状态通知

        可 以 使 用 设 置 xClearOnExit pdTRUE , 使 得 对 事 件 组 的 测 试 、 清 零 都 在

xEventGroupWaitBits() 函数内部完成,这是一个原子操作

5)同步点

        使用 xEventGroupSync()函数可以同步多个任务:

     ABC 做好自己的事后,还要等别人做完;大家一起做完,才可开饭

EventBits_t xEventGroupSync( EventGroupHandle_t xEventGroup, 
                             const EventBits_t uxBitsToSet, 
                             const EventBits_t uxBitsToWaitFor, 
                             TickType_t xTicksToWait );
参数 / 返回值 说明
xEventGroup 哪个事件组?
uxBitsToSet 要设置哪些事件?我完成了哪些事件?比如 0x05(二进制为 0101)会导致事件组的 bit0、bit2 被设置为 1
uxBitsToWaitFor 等待哪个位、哪些位?比如 0x15(二进制 10101),表示要等待 bit0、bit2、bit4 都为 1
xTicksToWait 如果期待的事件未发生,阻塞多久。可设置为 0:判断后即刻返回;可设置为 portMAX_DELAY:一定等到成功才返回;可设置为期望的 Tick Count,一般用 pdMS_TO_TICKS 把 ms 转换为 Tick Count
返回值 返回的是事件值。如果期待的事件发生了,返回的是 “非阻塞条件成立” 时的事件值;如果是超时退出,返回的是超时时刻的事件值。

3,示例

        广播机制,等待任意一个事件,等待多个事件都发生 的小车示例见 手册14.3

        之前的程序是创建任务每50秒读一次mpu6050的i2c信号,获得操作指令。【缺陷在于板子平放时用其他方式控制挡球板的场景会浪费cpu资源】

        解决方法:用中断(事件组)唤醒任务 --> 读i2c数据 --> 写队列

        方案:加速度计用pb5作为中断(需要先在 CueMX 中设置对应的 gpio,HAL_GPIO_EXTI_Callback写入中断处理操作设置事件组MPU6050也要配置中断引脚并使能,中断过于频繁导致程序奔溃,等待事件后运行时加入vTaskDelay(50),freertos中打开挡球板游戏完善等待事件组及操作)

        效果:只有中断写了事件后才会读i2c(提升效率)

        

Logo

openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。

更多推荐