FreeRTOS多任务读取同一队列问题,数据共享问题
摘要:针对多任务共享队列导致的阻塞问题,提出了三种解决方案:1)统一使用xQueueReceive避免数据残留;2)采用事件组实现任务同步;3)推荐使用共享数据区+互斥锁机制。方案3通过全局共享变量结合时间戳判断数据新旧,配合事件组通知,从根本上解决了队列饥饿和数据锁定问题。所有方案均需在主函数初始化系统资源,避免句柄丢失。三种方案均有效,其中方案3在实时性和内存效率方面表现最优,特别适合多消费者
摘要:针对多任务共享队列导致的阻塞问题,提出了三种解决方案:1)统一使用xQueueReceive避免数据残留;2)采用事件组实现任务同步;3)推荐使用共享数据区+互斥锁机制。方案3通过全局共享变量结合时间戳判断数据新旧,配合事件组通知,从根本上解决了队列饥饿和数据锁定问题。所有方案均需在主函数初始化系统资源,避免句柄丢失。三种方案均有效,其中方案3在实时性和内存效率方面表现最优,特别适合多消费者场景。
提出问题:在任务A初始化创建队列1,且由任务A为写队列任务,即生产者。任务B、任务C为读同一队列1,xQueuePeek和xQueueReceive混合使用导致的数据锁定和队列饥饿。
队列饥饿:是指某个或多个任务因为无法获得队列的使用权(例如,尝试发送但队列已满,或尝试接收但队列为空)而长期处于阻塞状态,而其他任务却持续占用队列资源的情况。
数据锁定:指的是队列的发送和接收操作中的阻塞行为,即任务在访问队列时被阻塞,直到队列可用(有空位或有数据)。这种阻塞机制实际上是一种同步机制,也可以看作是对数据的锁定(因为任务在等待数据时被挂起)。
分析问题:
1、如果任务C先运行,它使用xQueuePeek读取数据,数据仍然留在队列中,然后延迟50ms。
2、接着任务B运行,它使用xQueueReceive读取同一个队列,会成功读取并移除数据,然后延迟100ms。
3、然后任务C再次运行,再次调用xQueuePeek,但此时队列为空(因为数据已被任务B移除),所以它会阻塞等待100ms(超时时间)。
4、而任务B运行时会再次调用xQueueReceive,但队列为空,且没有生产者,所以会无限阻塞。
这样,两个任务可能因为队列为空而阻塞,但如果没有生产者任务向队列写入数据,就会一直卡死
总结:当队列为空时,任务C使用xQueuePeek会阻塞100ms,而任务B使用xQueueReceive会无限阻塞(因为超时时间为portMAX_DELAY)。如果没有生产者,两个任务都会阻塞,但任务C会在100ms后超时,然后继续运行,而任务B会一直阻塞。注释掉任务C就不卡了呢?因为这样任务B就可以独占比特队列,如果队列中有数据,它就能读取,然后显示,然后延迟100ms。但是如果没有生产者,它还是会卡在xQueueReceive上。
解决方案:
方案1:对于上一个问题来说,任务C采用xQueuePeek读取数据,只偷看数据不移除。可以将xQueuePeek换为xQueueReceive,最后最好在void MX_FREERTOS_Init(void)里创建队列,而不是在任务A里面创建。
方案1会让任务B、任务C所用(消费)的不是同一个数据。
方案2:采用事件组通知任务进行同步
该方案占用内存较大,延时较长。!!!这里声明:初始化Init_System();一定要放在主函数中初始化,否则会导致事件组句柄丢失,致使,事件组未创建成功。!!!通常会出现卡死在下面地方:
/* Check the user is not attempting to wait on the bits used by the kernel itself, and that at least one bit is being requested. */
configASSERT( xEventGroup ); #define configASSERT( x ) if ((x) == 0) {taskDISABLE_INTERRUPTS(); for( ;; );}
// 定义事件组用于同步
EventGroupHandle_t xRDataEventGroup ;
QueueHandle_t queue1;
// 初始化
void Init_System(void)
{
// 队列1创建
queue1= xQueueCreate(10, sizeof(struct DataStructure));
// 新增事件组
xRDataEventGroup = xEventGroupCreate();
}
// 修改任务A
void TaskA(void *para)
{
struct DataStructure input;
while(1)
{
/*
数据处理
*/
// 发送到队列
if(xQueueSend(queue1, &input, 0) == pdPASS)
{
// 发送成功后,设置事件标志
xEventGroupSetBits(xRDataEventGroup , 0x01);
}
vTaskDelay(10);
}
}
// 修改任务C
void TaskC(void *para)
{
struct DataStructure input;
BaseType_t xResult;
while(1)
{
// 尝试从队列读取数据(不阻塞太久)
xResult = xQueueReceive(queue1, &input, pdMS_TO_TICKS(100));
if(xResult == pdPASS)
{
// 成功读取数据
// 数据处理
}
else
{
// 队列为空,可以检查事件组看看是否有新数据被其他任务取走了
EventBits_t uxBits = xEventGroupGetBits(xRDataEventGroup );
if((uxBits & 0x01) != 0)
{
// 有新数据产生,但被其他任务取走了
// 可以尝试Peek查看(但不移除)
if(xQueuePeek(queue1, &input, 0) == pdPASS)
{
// 数据处理
}
}
}
vTaskDelay(50);
}
}
// 修改任务B
void TaskB(void *para)
{
struct DataStructure input;
EventBits_t uxBits;
while(1)
{
// 检查是否有新的速度数据
uxBits = xEventGroupWaitBits(
xVelocityEventGroup,
0x01, // 等待bit 0
pdTRUE, // 清除bit 0
pdFALSE, // 不等待所有位
0 // 不等待,立即返回
);
if((uxBits & 0x01) != 0)
{
// 有新数据,从队列读取
if(xQueueReceive(queue1, &input, 0) == pdPASS)
{
// 成功获取最新数据
}
else
{
// 数据被其他任务取走了,使用Peek查看但不移除
xQueuePeek(queue1, &input, 0);
}
}
else
{
// 没有新数据,保持上次的值或使用Peek查看当前数据
xQueuePeek(queue1, &input, 0);
}
// 显示代码...或数据处理
vTaskDelay(100);
}
}
方案3:完全使用共享数据区(推荐,解决根本问题)
对比分析:
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 共享数据区+事件组 | 1. 数据只存储一份 2. 多个任务可同时访问 3. 避免队列满/空阻塞 4. 实时性好 |
1. 需要互斥锁保护 2. 数据可能被覆盖 |
广播式数据分发 |
| 队列发送 | 1. 自带缓冲 2. 天然线程安全 3. 支持阻塞等待 |
1. 多个任务需要多个副本 2. 可能队列满 3. 内存占用多 |
点对点数据传输 |
可以解决上面两个方案导致任务B、任务C所用(消费)的不是同一个数据。
需要避免阻塞消费者不会因为队列空而阻塞,没有队列满/空的竞争,同时,总是能获取最新数据
// 全局定义
typedef struct {
float A;
float B;
uint32_t timestamp; // 添加时间戳,用于判断数据新旧
} DataStructure;
EventGroupHandle_t xRDataEventGroup;
SemaphoreHandle_t xDataMutex;
DataStructure xSharedData;
// 初始化函数
void System_Init(void)
{
// 创建事件组
xRDataEventGroup = xEventGroupCreate();
// 创建互斥锁
xDataMutex = xSemaphoreCreateMutex();
// 初始化数据
xSharedData.A = 0.0f;
xSharedData.B = 0.0f;
xSharedData.timestamp = 0;
}
// 修改任务A
void TaskA(void *para)
{
DataStructure newData;
while(1)
{
// 采集数据
略
// 填充新数据
newData.A = A;
newData.B = B;
newData.timestamp = xTaskGetTickCount(); // 记录时间戳
// 写入共享数据区(加锁保护)
xSemaphoreTake(xDataMutex, portMAX_DELAY);
xSharedData = newData;
xSemaphoreGive(xDataMutex);
// 设置事件标志,通知所有消费者
xEventGroupSetBits(xRDataEventGroup, 0x01);
vTaskDelay(10); // 100Hz更新频率
}
}
// TaskC(使用共享数据)
void TaskC(void *para)
{
DataStructure input;
EventBits_t uxBits;
uint32_t lastTimestamp = 0; // 记录上次处理的时间戳
while(1)
{
// 等待数据就绪事件(100ms超时)
uxBits = xEventGroupWaitBits(
xRDataEventGroup,
0x01,
pdTRUE, // 清除事件标志
pdFALSE,
pdMS_TO_TICKS(100)
);
if((uxBits & 0x01) != 0)
{
// 从共享数据区读取
xSemaphoreTake(xDataMutex, portMAX_DELAY);
input = xSharedData;
xSemaphoreGive(xDataMutex);
// 检查是否是新的数据(避免重复处理)
if(input.timestamp != lastTimestamp)
{
lastTimestamp = input.timestamp;
//数据处理
}
}
vTaskDelay(50);
}
}
// TaskB(使用共享数据)
void TaskB(void *para)
{
DataStructure RecData;
EventBits_t uxBits;
static uint32_t lastDisplayTimestamp = 0;
OLED_Init();
OLED_Clear();
while(1)
{
// 检查速度数据是否更新(非阻塞)
uxBits = xEventGroupGetBits(xRDataEventGroup);
if((uxBits & 0x01) != 0)
{
// 有新数据,清除事件标志
xEventGroupClearBits(xRDataEventGroup, 0x01);
// 从共享数据区读取
xSemaphoreTake(xDataMutex, portMAX_DELAY);
RecData = xSharedData;
xSemaphoreGive(xDataMutex);
lastDisplayTimestamp = RecData.timestamp;
}
else
{
// 没有新事件,但可以读取当前数据(可能和上次一样)
xSemaphoreTake(xDataMutex, portMAX_DELAY);
RecData = xSharedData;
xSemaphoreGive(xDataMutex);
}
//数据处理
vTaskDelay(100);
}
}
这里声明:初始化System_Init();一定要放在主函数中初始化,否则会导致事件组句柄丢失,致使,事件组未创建成功。!!!通常会出现卡死在下面地方:
/* Check the user is not attempting to wait on the bits used by the kernel itself, and that at least one bit is being requested. */
configASSERT( xEventGroup ); #define configASSERT( x ) if ((x) == 0) {taskDISABLE_INTERRUPTS(); for( ;; );}
最后,三个方案亲测有效,可以根据实际情况修改。
openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。
更多推荐
所有评论(0)