摘要:针对多任务共享队列导致的阻塞问题,提出了三种解决方案:1)统一使用xQueueReceive避免数据残留;2)采用事件组实现任务同步;3)推荐使用共享数据区+互斥锁机制。方案3通过全局共享变量结合时间戳判断数据新旧,配合事件组通知,从根本上解决了队列饥饿和数据锁定问题。所有方案均需在主函数初始化系统资源,避免句柄丢失。三种方案均有效,其中方案3在实时性和内存效率方面表现最优,特别适合多消费者场景。

提出问题:在任务A初始化创建队列1,且由任务A为写队列任务,即生产者。任务B、任务C为读同一队列1,xQueuePeekxQueueReceive混合使用导致的数据锁定和队列饥饿。

        队列饥饿:是指某个或多个任务因为无法获得队列的使用权(例如,尝试发送但队列已满,或尝试接收但队列为空)而长期处于阻塞状态,而其他任务却持续占用队列资源的情况。

        数据锁定:指的是队列的发送和接收操作中的阻塞行为,即任务在访问队列时被阻塞,直到队列可用(有空位或有数据)。这种阻塞机制实际上是一种同步机制,也可以看作是对数据的锁定(因为任务在等待数据时被挂起)。

分析问题:        

 1、如果任务C先运行,它使用xQueuePeek读取数据,数据仍然留在队列中,然后延迟50ms。

 2、接着任务B运行,它使用xQueueReceive读取同一个队列,会成功读取并移除数据,然后延迟100ms。

 3、然后任务C再次运行,再次调用xQueuePeek,但此时队列为空(因为数据已被任务B移除),所以它会阻塞等待100ms(超时时间)。

 4、而任务B运行时会再次调用xQueueReceive,但队列为空,且没有生产者,所以会无限阻塞。

        这样,两个任务可能因为队列为空而阻塞,但如果没有生产者任务向队列写入数据,就会一直卡死

总结:当队列为空时,任务C使用xQueuePeek会阻塞100ms,而任务B使用xQueueReceive会无限阻塞(因为超时时间为portMAX_DELAY)。如果没有生产者,两个任务都会阻塞,但任务C会在100ms后超时,然后继续运行,而任务B会一直阻塞。注释掉任务C就不卡了呢?因为这样任务B就可以独占比特队列,如果队列中有数据,它就能读取,然后显示,然后延迟100ms。但是如果没有生产者,它还是会卡在xQueueReceive上。

解决方案:

方案1:对于上一个问题来说,任务C采用xQueuePeek读取数据,只偷看数据不移除。可以将xQueuePeek换为xQueueReceive,最后最好在void MX_FREERTOS_Init(void)里创建队列,而不是在任务A里面创建。

        方案1会让任务B、任务C所用(消费)的不是同一个数据。

方案2:采用事件组通知任务进行同步

该方案占用内存较大,延时较长。!!!这里声明:初始化Init_System();一定要放在主函数中初始化,否则会导致事件组句柄丢失,致使,事件组未创建成功。!!!通常会出现卡死在下面地方:

/* Check the user is not attempting to wait on the bits used by the kernel itself, and that at least one bit is being requested. */

configASSERT( xEventGroup ); #define configASSERT( x ) if ((x) == 0) {taskDISABLE_INTERRUPTS(); for( ;; );}

// 定义事件组用于同步 
EventGroupHandle_t xRDataEventGroup ;
QueueHandle_t queue1;
//  初始化
void Init_System(void)
{
    // 队列1创建
    queue1= xQueueCreate(10, sizeof(struct DataStructure));
    
    // 新增事件组
    xRDataEventGroup = xEventGroupCreate();
}

//  修改任务A 
void TaskA(void *para)
{
    struct DataStructure input;
    while(1)
    {
        /*
            数据处理
        */
        
        // 发送到队列
        if(xQueueSend(queue1, &input, 0) == pdPASS)
        {
            // 发送成功后,设置事件标志
            xEventGroupSetBits(xRDataEventGroup , 0x01);
        }
        vTaskDelay(10);
    }
}

//  修改任务C
void TaskC(void *para)
{
    struct DataStructure input;
    BaseType_t xResult;
    
    while(1)
    {
        // 尝试从队列读取数据(不阻塞太久)
        xResult = xQueueReceive(queue1, &input, pdMS_TO_TICKS(100));
        
        if(xResult == pdPASS)
        {
            // 成功读取数据
            // 数据处理
        }
        else
        {
            // 队列为空,可以检查事件组看看是否有新数据被其他任务取走了
            EventBits_t uxBits = xEventGroupGetBits(xRDataEventGroup );
            if((uxBits & 0x01) != 0)
            {
                // 有新数据产生,但被其他任务取走了
                // 可以尝试Peek查看(但不移除)
                if(xQueuePeek(queue1, &input, 0) == pdPASS)
                {
                  // 数据处理
                }
            }
        }
        
        vTaskDelay(50);
    }
}

// 修改任务B
void TaskB(void *para)
{
    struct DataStructure input;
    EventBits_t uxBits;
    
    while(1)
    {

        // 检查是否有新的速度数据
        uxBits = xEventGroupWaitBits(
            xVelocityEventGroup,
            0x01,                    // 等待bit 0
            pdTRUE,                  // 清除bit 0
            pdFALSE,                 // 不等待所有位
            0                        // 不等待,立即返回
        );
        
        if((uxBits & 0x01) != 0)
        {
            // 有新数据,从队列读取
            if(xQueueReceive(queue1, &input, 0) == pdPASS)
            {
                // 成功获取最新数据
            }
            else
            {
                // 数据被其他任务取走了,使用Peek查看但不移除
                xQueuePeek(queue1, &input, 0);
            }
        }
        else
        {
            // 没有新数据,保持上次的值或使用Peek查看当前数据
            xQueuePeek(queue1, &input, 0);
        }
        
        // 显示代码...或数据处理
        vTaskDelay(100);
    }
}

方案3:完全使用共享数据区(推荐,解决根本问题)

对比分析:

方案 优点 缺点 适用场景
共享数据区+事件组 1. 数据只存储一份
2. 多个任务可同时访问
3. 避免队列满/空阻塞
4. 实时性好
1. 需要互斥锁保护
2. 数据可能被覆盖
广播式数据分发
队列发送 1. 自带缓冲
2. 天然线程安全
3. 支持阻塞等待
1. 多个任务需要多个副本
2. 可能队列满
3. 内存占用多
点对点数据传输

可以解决上面两个方案导致任务B、任务C所用(消费)的不是同一个数据。

需要避免阻塞消费者不会因为队列空而阻塞,没有队列满/空的竞争,同时,总是能获取最新数据

//  全局定义 
typedef struct {
    float A;
    float B;
    uint32_t timestamp;  // 添加时间戳,用于判断数据新旧
} DataStructure;

EventGroupHandle_t xRDataEventGroup;
SemaphoreHandle_t xDataMutex;
DataStructure xSharedData;

// 初始化函数 
void System_Init(void)
{
    // 创建事件组
    xRDataEventGroup = xEventGroupCreate();
    
    // 创建互斥锁
    xDataMutex = xSemaphoreCreateMutex();
    
    // 初始化数据
    xSharedData.A = 0.0f;
    xSharedData.B = 0.0f;
    xSharedData.timestamp = 0;
}

//  修改任务A 
void TaskA(void *para)
{
    DataStructure newData;

    while(1)
    {
        // 采集数据
        略
        // 填充新数据
        newData.A = A;
        newData.B = B;
        newData.timestamp = xTaskGetTickCount();  // 记录时间戳
        
        // 写入共享数据区(加锁保护)
        xSemaphoreTake(xDataMutex, portMAX_DELAY);
        xSharedData = newData;
        xSemaphoreGive(xDataMutex);
        
        // 设置事件标志,通知所有消费者
        xEventGroupSetBits(xRDataEventGroup, 0x01);
        
        vTaskDelay(10);  // 100Hz更新频率
    }
}

//  TaskC(使用共享数据)
void TaskC(void *para)
{
    DataStructure input;
    EventBits_t uxBits;
    uint32_t lastTimestamp = 0;  // 记录上次处理的时间戳
    
    while(1)
    {
        // 等待数据就绪事件(100ms超时)
        uxBits = xEventGroupWaitBits(
            xRDataEventGroup,
            0x01,
            pdTRUE,  // 清除事件标志
            pdFALSE,
            pdMS_TO_TICKS(100)
        );
        
        if((uxBits & 0x01) != 0)
        {
            // 从共享数据区读取
            xSemaphoreTake(xDataMutex, portMAX_DELAY);
            input = xSharedData;
            xSemaphoreGive(xDataMutex);
            
            // 检查是否是新的数据(避免重复处理)
            if(input.timestamp != lastTimestamp)
            {
                lastTimestamp = input.timestamp;
                //数据处理
            }
        }
        vTaskDelay(50);
    }
}

// TaskB(使用共享数据)
void TaskB(void *para)
{
    DataStructure RecData;
    EventBits_t uxBits;
    static uint32_t lastDisplayTimestamp = 0;
    OLED_Init();
    OLED_Clear();
    
    while(1)
    {
        
        // 检查速度数据是否更新(非阻塞)
        uxBits = xEventGroupGetBits(xRDataEventGroup);
        
        if((uxBits & 0x01) != 0)
        {
            // 有新数据,清除事件标志
            xEventGroupClearBits(xRDataEventGroup, 0x01);
            
            // 从共享数据区读取
            xSemaphoreTake(xDataMutex, portMAX_DELAY);
            RecData = xSharedData;
            xSemaphoreGive(xDataMutex);
            
            lastDisplayTimestamp = RecData.timestamp;
        }
        else
        {
            // 没有新事件,但可以读取当前数据(可能和上次一样)
            xSemaphoreTake(xDataMutex, portMAX_DELAY);
            RecData = xSharedData;
            xSemaphoreGive(xDataMutex);
        }
	
	//数据处理
        
        vTaskDelay(100);
    }
}

        这里声明:初始化System_Init();一定要放在主函数中初始化,否则会导致事件组句柄丢失,致使,事件组未创建成功。!!!通常会出现卡死在下面地方:

/* Check the user is not attempting to wait on the bits used by the kernel itself, and that at least one bit is being requested. */

configASSERT( xEventGroup ); #define configASSERT( x ) if ((x) == 0) {taskDISABLE_INTERRUPTS(); for( ;; );}

最后,三个方案亲测有效,可以根据实际情况修改。

Logo

openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。

更多推荐