AliOS Things 事件服务与 lwIP 套接字集成架构Reactor事件服务系统分析
本文分析了AliOS事件服务与lwIP套接字集成架构的设计与实现。该系统采用分层架构设计,包含应用层、事件服务层、lwIP套接字适配层和硬件抽象层。核心的事件服务采用单例模式实现,通过RPC机制和Reactor模式处理文件描述符事件和定时事件。重点介绍了事件订阅、发布机制以及select任务的核心循环流程,展现了如何通过微服务架构实现跨任务事件通信。系统通过接口隔离、回调机制和消息传递等设计实现了
·
源码:
/*
*
*
*/
/* 标准C库头文件 */
#include <stdlib.h> // 标准库函数:malloc、free等
#include <stdint.h> // 标准整数类型:uint32_t等
#include <string.h> // 字符串操作函数
/* AliOS Things 系统头文件 */
#include <aos/debug.h> // 调试断言宏:aos_assert等
#include <aos/list.h> // 双向链表实现:dlist_t等
#include <yoc/uservice.h> // 微服务框架:uservice_t、rpc_t等
#include <yoc/event.h> // 事件系统接口
#include <devices/driver.h> // 设备驱动框架
#include <sys/socket.h> // 套接字接口
#include <sys/select.h> // select多路复用IO
/* 内部头文件 */
#include "internal.h" // 模块内部定义
/* 常量定义 */
#define FD_MAX_STEMP 8 // FD最大步进值,用于select循环检测
/*
* 全局事件服务控制结构体 - 单例模式
* 总大小估算:约40-80字节(依赖指针大小和对齐)
*/
static struct event_call {
uservice_t *svr; // 微服务实例指针,4-8字节
event_list_t event; // 事件列表结构体,大小依赖实现,约20-40字节
dlist_t timeouts; // 超时事件链表头,16字节(两个指针)
int event_id; // 事件ID,4字节
void *data; // 事件数据指针,4-8字节
aos_task_t select_task; // select任务句柄,4-8字节
aos_sem_t select_sem; // 选择信号量,4-8字节
aos_event_t wait_event; // 等待事件,4-8字节
} ev_service; // 静态全局变量,整个模块唯一实例
/*
* 事件参数结构体 - 用于RPC通信
* 总大小:约24-32字节(考虑对齐)
*/
struct event_param {
uint32_t event_id; // 事件ID,4字节
union { // 联合体,节省内存
event_callback_t cb; // 事件回调函数指针,4-8字节
uint32_t timeout; // 超时时间,4字节
};
void *data; // 用户数据指针,4-8字节
dlist_t next; // 链表节点,16字节(两个指针)
};
/*
* RPC命令枚举 - 命令模式实现
* 用于区分不同的事件操作类型
*/
enum {
CMD_SUB_FD_EVENT, // 订阅文件描述符事件:0
CMD_SUB_EVENT, // 订阅普通事件:1
CMD_REMOVE_FD_EVENT, // 移除FD事件订阅:2
CMD_REMOVE_EVENT, // 移除普通事件订阅:3
CMD_PUBLISH_EVENT, // 发布普通事件:4
CMD_PUBLISH_FD_EVENT, // 发布FD事件:5
};
/* 事件操作掩码定义 */
#define EVENT_SUBSCRIBE 0x0000FF00 // 订阅掩码
#define EVENT_UNSUBSCRIBE 0x000000FF // 取消订阅掩码
/* 函数前置声明 */
static void select_task_entry(void *arg); // select任务入口函数
/*
* RPC请求处理函数 - 命令模式的核心分发器
* 参数:
* context - 上下文指针(未使用)
* rpc - RPC消息结构体
* 返回值:处理状态,0表示成功
* 设计模式:命令模式 + 工厂方法模式
* 性能:在微服务任务上下文中执行,避免竞态条件
*/
static int process_rpc(void *context, rpc_t *rpc)
{
int size = -1; // 缓冲区大小变量,4字节
/* 从RPC消息中提取事件参数 */
struct event_param *param = (struct event_param *)rpc_get_buffer(rpc, &size);
// param指向共享内存区,生命周期由RPC框架管理
/* 安全性检查:确保参数大小正确 */
aos_assert(size == sizeof(struct event_param)); // 断言保护,防止内存越界
/* 命令分发器 - 基于命令ID路由到不同处理方法 */
switch (rpc->cmd_id) { // cmd_id为枚举值,4字节
case CMD_SUB_FD_EVENT: // 订阅FD事件命令
/* 功能:向事件列表注册文件描述符事件回调 */
eventlist_subscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
/* 通知select任务有新的FD事件注册 */
aos_sem_signal(&ev_service.select_sem); // 信号量操作,线程安全
break;
case CMD_REMOVE_FD_EVENT: // 移除FD事件订阅
/* 功能:从事件列表取消文件描述符事件注册 */
eventlist_unsubscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
break;
case CMD_PUBLISH_FD_EVENT: // 发布FD事件
/* 功能:触发文件描述符相关的事件回调 */
eventlist_publish_fd(&ev_service.event, param->event_id, param->data);
break;
case CMD_SUB_EVENT: // 订阅普通事件
/* 功能:注册普通事件回调函数 */
eventlist_subscribe(&ev_service.event, param->event_id, param->cb, param->data);
break;
case CMD_REMOVE_EVENT: // 移除普通事件订阅
/* 功能:取消普通事件回调注册 */
eventlist_unsubscribe(&ev_service.event, param->event_id, param->cb, param->data);
break;
case CMD_PUBLISH_EVENT: // 发布普通事件(支持延迟发布)
/* 延迟事件处理:如果设置了超时时间 */
if (param->timeout > 0) { // 超时时间检查,4字节比较
/* 动态分配超时事件参数 - 生命周期由超时链表管理 */
struct event_param *timer = aos_malloc(sizeof(struct event_param)); // 分配24-32字节
if (timer == NULL) // 内存分配失败检查
break; // 优雅降级,不处理内存分配失败
/* 设置超时事件参数 */
timer->timeout = aos_now_ms() + param->timeout; // 计算绝对超时时间
timer->event_id = param->event_id; // 复制事件ID
timer->data = param->data; // 复制事件数据
/* 在超时链表中按时间顺序插入 - 有序链表维护 */
struct event_param *node; // 链表遍历指针
dlist_for_each_entry(&ev_service.timeouts, node, struct event_param, next) {
/* 找到第一个超时时间大于当前节点的时间位置 */
if (timer->timeout < node->timeout) // 时间比较
break; // 找到插入位置
}
/* 在找到的节点前插入新节点 */
dlist_add_tail(&timer->next, &node->next); // 链表操作,O(n)复杂度
/* 通知select任务有新的超时事件 */
aos_sem_signal(&ev_service.select_sem); // 唤醒select任务重新计算超时
} else {
/* 立即发布事件:无延迟 */
eventlist_publish(&ev_service.event, param->event_id, param->data);
}
break;
}
/* RPC响应回复:通知调用方处理完成 */
rpc_reply(rpc); // 发送响应消息
return 0; // 成功返回
}
/*
* 事件服务初始化函数 - 单例初始化模式
* 参数:task - 微服务任务上下文,如果为NULL则创建新任务
* 返回值:初始化状态,0成功,-1失败
* 设计模式:工厂方法模式
* 性能:系统启动时调用一次,初始化所有资源
*/
int event_service_init(utask_t *task)
{
/* 参数检查:如果传入任务为空,创建新任务 */
if (task == NULL)
/* 创建专用事件服务任务 */
task = utask_new("event_svr", 2*1024, QUEUE_MSG_COUNT * 5, AOS_DEFAULT_APP_PRI);
// 任务参数:名称"event_svr",栈大小2KB,消息队列容量扩大5倍,默认优先级
/* 任务创建失败检查 */
if (task == NULL)
return -1; // 返回错误码
/* 初始化事件列表数据结构 */
eventlist_init(&ev_service.event); // 初始化事件回调列表
dlist_init(&ev_service.timeouts); // 初始化超时事件链表
aos_sem_new(&ev_service.select_sem, 0); // 创建二进制信号量,初始值为0
/* 创建事件微服务实例 */
ev_service.svr = uservice_new("event_svr", process_rpc, NULL);
// 参数:服务名,RPC处理函数,上下文(NULL)
/* 初始化等待事件对象 */
aos_event_new(&ev_service.wait_event, 0); // 创建事件标志,初始状态为0
/* 创建select专用任务 - 用于FD事件监听 */
aos_task_new_ext(&ev_service.select_task, "select", select_task_entry, NULL,
1024, AOS_DEFAULT_APP_PRI);
// 参数:任务句柄,任务名,入口函数,参数,栈大小1KB,默认优先级
/* 将事件服务添加到任务中管理 */
utask_add(task, ev_service.svr); // 建立任务-服务关联
return 0; // 成功返回
}
/*
* 事件调用封装函数 - 门面模式(Facade Pattern)
* 功能:统一封装RPC调用过程
* 参数:
* param - 事件参数结构体指针
* cmd_id - 命令ID
* sync - 是否同步调用标志
* 设计模式:门面模式 + 模板方法模式
* 性能:同步调用会阻塞,异步调用立即返回
*/
static void event_call(struct event_param *param, int cmd_id, int sync)
{
rpc_t rpc; // RPC消息结构体,栈变量,大小约32-64字节
/* 初始化RPC消息 */
if (rpc_init(&rpc, cmd_id, sync ? AOS_WAIT_FOREVER : 0) == 0) {
/* 将事件参数复制到RPC缓冲区 */
rpc_put_buffer(&rpc, param, sizeof(struct event_param )); // 数据拷贝,24-32字节
/* 调用微服务处理请求 */
uservice_call(ev_service.svr, &rpc); // 跨任务消息传递
/* 释放RPC资源 */
rpc_deinit(&rpc); // 清理内部资源
}
}
/*
* 发布文件描述符事件 - 观察者模式通知
* 参数:
* event_id - 文件描述符(作为事件ID)
* data - 事件数据
* sync - 同步标志
* 性能:通过RPC异步通信,不阻塞调用者
*/
void event_publish_fd(uint32_t event_id, void *data, int sync)
{
struct event_param param; // 栈变量,24-32字节
/* 设置事件参数 */
param.event_id = event_id; // FD作为事件ID
param.data = data; // 用户数据
param.cb = NULL; // 回调函数为空(发布时不需要)
/* 调用RPC封装函数 */
event_call(¶m, CMD_PUBLISH_FD_EVENT, sync); // 命令:发布FD事件
}
/*
* 订阅文件描述符事件 - 观察者模式注册
* 参数:
* fd - 文件描述符
* cb - 事件回调函数
* context - 用户上下文
* 设计模式:观察者模式
*/
void event_subscribe_fd(uint32_t fd, event_callback_t cb, void *context)
{
aos_assert(cb); // 断言:回调函数不能为空
struct event_param param; // 栈变量
/* 设置订阅参数 */
param.event_id = fd; // 文件描述符作为事件ID
param.cb = cb; // 事件发生时的回调函数
param.data = context; // 回调时的用户上下文
/* 异步调用订阅命令 */
event_call(¶m, CMD_SUB_FD_EVENT, 0); // 命令:订阅FD事件
}
/*
* 取消订阅文件描述符事件
* 参数:
* event_id - 事件ID(文件描述符)
* cb - 要移除的回调函数
* context - 用户上下文
*/
void event_unsubscribe_fd(uint32_t event_id, event_callback_t cb, void *context)
{
struct event_param param; // 栈变量
param.event_id = event_id; // 目标文件描述符
param.cb = cb; // 要移除的回调函数
param.data = context; // 对应的用户上下文
event_call(¶m, CMD_REMOVE_FD_EVENT, 0); // 命令:移除FD事件订阅
}
/*
* 发布普通事件(立即执行)
* 参数:
* event_id - 事件ID
* data - 事件数据
*/
void event_publish(uint32_t event_id, void *data)
{
struct event_param param; // 栈变量
param.event_id = event_id; // 事件标识符
param.data = data; // 事件关联数据
param.timeout = 0; // 超时时间为0,表示立即发布
event_call(¶m, CMD_PUBLISH_EVENT, 0); // 命令:发布普通事件
}
/*
* 发布延迟事件 - 定时器模式
* 参数:
* event_id - 事件ID
* data - 事件数据
* timeout - 延迟时间(毫秒)
* 设计模式:定时器模式 + 延迟执行模式
*/
void event_publish_delay(uint32_t event_id, void *data, int timeout)
{
struct event_param param; // 栈变量
param.event_id = event_id; // 事件标识符
param.data = data; // 事件数据
param.timeout = timeout; // 延迟时间(毫秒)
event_call(¶m, CMD_PUBLISH_EVENT, 0); // 命令:发布延迟事件
}
/*
* 订阅普通事件
* 参数:
* event_id - 事件ID
* cb - 事件回调函数
* context - 用户上下文
*/
void event_subscribe(uint32_t event_id, event_callback_t cb, void *context)
{
aos_assert(cb); // 断言:回调函数不能为空
struct event_param param; // 栈变量
param.event_id = event_id; // 要监听的事件类型
param.cb = cb; // 事件触发时的处理函数
param.data = context; // 传递给回调函数的上下文
event_call(¶m, CMD_SUB_EVENT, 0); // 命令:订阅普通事件
}
/*
* 取消订阅普通事件
* 参数:
* event_id - 事件ID
* cb - 要移除的回调函数
* context - 用户上下文
*/
void event_unsubscribe(uint32_t event_id, event_callback_t cb, void *context)
{
aos_assert(cb); // 断言:回调函数不能为空
struct event_param param; // 栈变量
param.event_id = event_id; // 目标事件类型
param.cb = cb; // 要移除的特定回调函数
param.data = context; // 对应的用户上下文
event_call(¶m, CMD_REMOVE_EVENT, 0); // 命令:移除事件订阅
}
/*
* 处理超时事件函数 - 定时器管理核心
* 功能:检查并触发已到期的延迟事件
* 返回值:下一个超时事件的剩余时间(毫秒),-1表示无超时事件
* 设计模式:定时器轮询模式
* 性能:O(n)复杂度,n为超时事件数量
*/
static int do_time_event()
{
int delayed_ms = -1; // 返回值,默认-1表示无超时事件
struct event_param *node; // 当前节点指针
dlist_t *tmp; // 临时指针用于安全删除
/* 加锁保护超时链表 - 临界区开始 */
uservice_lock(ev_service.svr); // 获取微服务锁
/* 安全遍历超时链表(支持在遍历时删除) */
dlist_for_each_entry_safe(&ev_service.timeouts, tmp, node, struct event_param, next) {
long long now = aos_now_ms(); // 获取当前系统时间(毫秒)
/* 检查事件是否超时 */
if (now >= node->timeout) { // 当前时间 >= 超时时间
/* 触发超时事件:发布对应的事件 */
event_publish(node->event_id, node->data); // 发布事件到事件系统
/* 从链表中移除已触发的节点 */
dlist_del(&node->next); // 链表节点删除操作
/* 释放节点内存 */
aos_free(node); // 释放动态分配的event_param
} else {
/* 计算下一个超时事件的剩余时间 */
delayed_ms = node->timeout - now; // 剩余时间 = 超时时间 - 当前时间
break; // 链表按时间排序,第一个未超时节点就是下一个要处理的
}
}
/* 释放锁 - 临界区结束 */
uservice_unlock(ev_service.svr);
return delayed_ms; // 返回下一个超时事件的等待时间
}
/* select超时时间常量 */
#define SELECT_TIMEOUT (10) // 默认select超时时间10毫秒
/*
* select任务入口函数 - Reactor模式核心
* 功能:监听文件描述符事件和超时事件
* 参数:arg - 任务参数(未使用)
* 设计模式:Reactor模式 + 多路复用IO
* 性能:单线程处理所有IO事件,高效的事件驱动架构
*/
static void select_task_entry(void *arg)
{
event_list_t *evlist = &ev_service.event; // 获取事件列表指针
utask_t *task = ev_service.svr->task; // 获取微服务任务上下文
/* 系统初始化 */
sys_init(); // 底层系统初始化
/* 主事件循环 - 持续处理IO和超时事件 */
while (1) {
fd_set readfds; // 读文件描述符集合,大小依赖FD_SETSIZE
struct timeval timeout; // select超时时间结构体,8字节
/* 处理超时事件并获取下一个超时时间 */
int time_ms = do_time_event(); // 返回下一个超时等待时间
/* 设置要监听的文件描述符集合 */
int max_fd = eventlist_setfd(evlist, &readfds); // 返回最大文件描述符
/* 设置select超时时间 */
timeout.tv_sec = time_ms / 1000; // 秒部分
timeout.tv_usec = (time_ms % 1000) * 1000; // 微秒部分
/*
* 执行select系统调用 - 多路复用IO核心
* 参数:最大fd+1,读集合,写集合,错误集合,超时时间,唤醒信号量
* 功能:阻塞等待文件描述符就绪或超时
*/
int ret = select2(max_fd + 1, &readfds, NULL, NULL,
time_ms == -1 ? NULL : &timeout, &ev_service.select_sem);
/*
* select2返回值:
* >0 : 就绪的文件描述符数量
* =0 : 超时
* <0 : 错误
*/
/* 有文件描述符就绪 */
if (ret > 0) {
/* 遍历所有可能的文件描述符 */
for (int fd = 0; fd <= max_fd; fd++) {
/* 检查该文件描述符是否在就绪集合中 */
if (FD_ISSET(fd, &readfds)) {
/*
* 流量控制:检查任务消息队列是否接近满负荷
* 避免事件风暴导致消息队列溢出
*/
if(aos_queue_get_count(&task->queue) < (task->queue_count*3/4)) {
/* 发布FD事件 - 同步方式确保顺序 */
event_publish_fd(fd, NULL, 1); // 同步发布,阻塞直到处理完成
// 注意:这里传递的data为NULL,实际使用时可传递FD相关数据
}
// 注释掉的代码:eventlist_remove_fd(evlist, fd);
// 原设计考虑:处理一次后移除监听,但当前实现是持久监听
}
}
}
// 注意:select返回0(超时)或负数(错误)时,继续循环等待
}
/* 任务退出(实际上不会执行到这里) */
aos_task_exit(0); // 安全退出任务
}
第一部分:系统集成架构树形分析
AliOS事件服务 + lwIP套接字集成架构 │ ├── 应用层 (Application Layer) │ ├── 网络应用程序 │ │ ├── 调用标准BSD套接字API: socket(), bind(), connect(), select() │ │ └── 通过lwip_xxx()函数映射到lwIP实现 │ └── 其他应用程序 │ ├── 使用event_subscribe_fd()注册文件描述符事件 │ └── 通过事件回调接收网络数据 │ ├── 事件服务层 (Event Service Layer) - 当前分析的核心 │ ├── 事件分发核心 (event_service.c) │ │ ├── ev_service全局单例 - 事件服务控制中心 │ │ ├── process_rpc() - RPC命令分发器 │ │ └── select_task_entry() - Reactor事件循环 │ ├── 事件列表管理 (event_list.c) │ │ ├── eventlist_subscribe_fd() - FD事件注册 │ │ ├── eventlist_publish_fd() - FD事件发布 │ │ └── eventlist_setfd() - 设置select监听集合 │ └── 定时器管理 │ ├── 超时事件链表管理 │ └── do_time_event() - 定时事件处理 │ ├── lwIP套接字适配层 (Socket Adapter Layer) │ ├── lwip_sockets.c - BSD套接字API实现 │ │ ├── lwip_select() - 多路复用实现 │ │ ├── lwip_fcntl() - 文件控制 │ │ └── lwip_ioctl() - IO控制 │ ├── 网络连接管理 │ │ ├── netconn结构体 - lwIP网络连接抽象 │ │ └── API消息传递机制 │ └── 协议栈接口 │ ├── TCP/IP协议处理 │ └── 数据包收发队列 │ └── 硬件抽象层 (Hardware Abstraction Layer) ├── 网络设备驱动 ├── 系统调用适配 └── 中断处理机制
第二部分:事件服务与lwIP select集成数据流分析
/*
* lwIP select 与 AliOS 事件服务集成数据流
* 关键函数调用链分析
*/
/* lwIP套接字层 - select系统调用实现 */
int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset,
fd_set *exceptset, struct timeval *timeout)
{
/*
* 数据流路径:
* 1. 参数验证和初始化
* 2. 将BSD套接字描述符转换为lwIP内部描述符
* 3. 检查每个套接字的状态
* 4. 如果有就绪套接字,立即返回
* 5. 否则调用sys_arch_wait()进入等待
*/
/* 关键:将BSD套接字FD转换为lwIP内部FD */
for (int i = 0; i < maxfdp1; i++) {
if (FD_ISSET(i, readset)) {
int lwip_fd = i - LWIP_SOCKET_OFFSET; /* FD转换 */
if (lwip_fd >= 0 && lwip_fd < MEMP_NUM_NETCONN) {
/* 检查套接字状态 */
if (lwip_socket_readable(lwip_fd)) {
FD_SET(i, &ready_set);
}
}
}
}
/* 设计模式:Bridge模式 - 连接BSD API和lwIP内部实现 */
return num_ready;
}
/*
* sys_arch_wait() - 系统架构等待函数
* 这是lwIP与AliOS事件系统集成的关键点
*/
err_t sys_arch_wait(sys_sem_t *sem, u32_t timeout)
{
/*
* 集成数据流:
* lwIP协议栈 → sys_arch_wait() → AliOS信号量 → 事件服务任务
*
* 当lwIP有网络数据到达时,会通过此函数唤醒等待的select调用
*/
return aos_sem_wait(sem, timeout); /* 委托给AliOS信号量系统 */
}
第三部分:事件服务RPC机制与lwIP集成分析
/*
* RPC命令处理与lwIP文件描述符事件集成
* 在process_rpc()函数中的关键处理逻辑
*/
static int process_rpc(void *context, rpc_t *rpc)
{
struct event_param *param = (struct event_param *)rpc_get_buffer(rpc, &size);
switch (rpc->cmd_id) {
case CMD_SUB_FD_EVENT: /* 文件描述符事件订阅 */
/*
* 关键集成点:将lwIP套接字FD注册到事件系统
* 数据流:应用层 → RPC → 事件服务 → lwIP套接字
*/
eventlist_subscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
/*
* param->event_id 实际上是lwIP套接字文件描述符
* 这个FD会被添加到事件系统的监听列表中
*/
aos_sem_signal(&ev_service.select_sem); /* 唤醒select任务 */
break;
case CMD_PUBLISH_FD_EVENT: /* 文件描述符事件发布 */
/*
* 当lwIP套接字有数据可读时,通过此命令发布事件
* 数据流:select任务 → RPC → 事件服务 → 应用回调
*/
eventlist_publish_fd(&ev_service.event, param->event_id, param->data);
break;
}
}
/*
* select任务中的lwIP套接字事件检测
* 这是事件系统与lwIP集成的核心循环
*/
static void select_task_entry(void *arg)
{
while (1) {
fd_set readfds;
struct timeval timeout;
/* 1. 处理超时事件 */
int time_ms = do_time_event();
/* 2. 设置要监听的FD集合 - 关键集成点 */
int max_fd = eventlist_setfd(&ev_service.event, &readfds);
/*
* eventlist_setfd() 会遍历所有注册的FD(包括lwIP套接字)
* 并将它们设置到readfds集合中供select监听
*/
/* 3. 调用select等待事件 - 这里集成了lwIP的select实现 */
int ret = select2(max_fd + 1, &readfds, NULL, NULL,
time_ms == -1 ? NULL : &timeout, &ev_service.select_sem);
/* 4. 处理就绪的lwIP套接字 */
if (ret > 0) {
for (int fd = 0; fd <= max_fd; fd++) {
if (FD_ISSET(fd, &readfds)) {
/* 流量控制检查 */
if(aos_queue_get_count(&task->queue) < (task->queue_count*3/4)) {
/*
* 关键:发布FD事件,通知相关应用
* 这个fd可能是lwIP套接字,当有网络数据到达时触发
*/
event_publish_fd(fd, NULL, 1); /* 同步发布确保顺序 */
}
}
}
}
}
}
第四部分:内存数据结构和大小详细分析
事件服务 + lwIP集成内存结构树形分析: │ ├── 全局事件服务实例 (40-80字节) │ ├── ev_service (静态全局单例) │ │ ├── uservice_t *svr: 4-8字节 - 微服务实例 │ │ ├── event_list_t event: 20-40字节 - 事件列表 │ │ ├── dlist_t timeouts: 16字节 - 超时链表头 │ │ ├── aos_task_t select_task: 4-8字节 - select任务 │ │ ├── aos_sem_t select_sem: 4-8字节 - 选择信号量 │ │ └── aos_event_t wait_event: 4-8字节 - 等待事件 │ │ │ └── 设计模式:单例模式 - 确保全局唯一事件服务实例 │ ├── 事件参数结构体 (24-32字节) - RPC通信载体 │ ├── struct event_param │ │ ├── event_id: 4字节 - 事件ID(lwIP套接字FD) │ │ ├── union: 4-8字节 - 回调函数或超时时间 │ │ ├── data: 4-8字节 - 用户数据指针 │ │ └── dlist_t next: 16字节 - 链表节点 │ │ │ └── 设计模式:数据传输对象 - 统一RPC参数格式 │ ├── lwIP套接字描述符映射 │ ├── FD转换机制:lwip_fd = bsdfd - LWIP_SOCKET_OFFSET │ ├── FD_SETSIZE = MEMP_NUM_NETCONN = 16(默认) │ └── fd_set大小:(16+7)/8 = 2字节位图 │ └── 网络连接内存池 ├── MEMP_NUM_NETCONN: 16个网络连接 ├── 每个netconn结构体: ~64字节 └── 总网络连接内存: 16 * 64 = 1024字节
第五部分:关键集成点函数调用流程分析
lwIP网络数据到达事件完整处理流程: │ ├── 阶段1:网络数据到达 │ ├── 硬件中断 → 网络驱动 → lwIP协议栈 │ ├── lwIP处理数据包,标记套接字可读 │ └→ sys_sem_signal() 唤醒等待的select │ ├── 阶段2:select检测到就绪套接字 │ ├── select2() 返回 > 0,指示有FD就绪 │ ├── select任务遍历readfds集合 │ ├→ 找到就绪的lwIP套接字FD │ └→ event_publish_fd(fd, NULL, 1) │ ├── 阶段3:RPC事件发布 │ ├── event_call(¶m, CMD_PUBLISH_FD_EVENT, 1) │ ├── RPC消息发送到事件服务任务 │ ├→ process_rpc() 处理CMD_PUBLISH_FD_EVENT │ └→ eventlist_publish_fd() 触发回调 │ ├── 阶段4:应用层回调执行 │ ├── 事件系统调用注册的回调函数 │ ├── 应用层从lwIP套接字读取数据 │ └→ 业务逻辑处理网络数据 │ └── 性能优化点: ├── 流量控制:检查消息队列负载 ├── 同步发布:确保事件处理顺序 └── 位图操作:高效FD集合管理
第六部分:解耦设计思想分析
/* * 高内聚低耦合设计体现: */ /* 1. 接口隔离 - 事件服务提供统一接口,不依赖具体协议 */ void event_subscribe_fd(uint32_t fd, event_callback_t cb, void *context); /* 应用层无需知道fd是lwIP套接字还是其他类型文件描述符 */ /* 2. 依赖倒置 - 通过回调函数解耦 */ typedef void (*event_callback_t)(uint32_t event_id, void *data, void *context); /* 事件服务不关心具体业务逻辑,只负责调用注册的回调 */ /* 3. 单一职责 - 每个模块职责清晰 */ /* * - lwIP: 纯TCP/IP协议栈实现 * - 事件服务: 纯事件分发机制 * - 应用层: 纯业务逻辑处理 * 各层通过标准接口协作,内部实现可独立变化 */ /* 4. 消息传递而非共享内存 - 避免竞态条件 */ /* 通过RPC消息在任务间传递事件,天然线程安全 */ /* 5. 配置驱动 - 通过编译时配置控制功能 */ /* * LWIP_SOCKET, MEMP_NUM_NETCONN等配置选项 * 允许根据不同应用场景定制系统行为 */
第七部分:事件服务初始化流程详细分析
/*
* 事件服务初始化完整流程分析
* 严格按照 event_service_init() 函数逐行分析
*/
int event_service_init(utask_t *task)
{
/* 第1行:任务参数检查 */
if (task == NULL)
/* 第2-3行:创建专用事件服务任务 */
task = utask_new("event_svr", 2*1024, QUEUE_MSG_COUNT * 5, AOS_DEFAULT_APP_PRI);
/*
* 参数详解:
* - "event_svr": 任务名称标识符,用于调试和管理
* - 2*1024: 任务栈大小2048字节,足够处理事件和RPC消息
* - QUEUE_MSG_COUNT * 5: 消息队列容量,扩大5倍避免队列满
* - AOS_DEFAULT_APP_PRI: 默认应用优先级,平衡响应性和系统负载
* 数据流:分配TCB(128B) + 栈(2KB) + 消息队列(可变)
*/
/* 第4-5行:任务创建失败检查 */
if (task == NULL)
return -1; /* 错误码-1表示任务创建失败 */
/* 第6行:初始化事件列表数据结构 */
eventlist_init(&ev_service.event);
/*
* 内部操作:
* - 初始化事件回调链表头
* - 重置事件计数器
* - 准备FD监听集合
* 数据结构大小:约20-40字节
*/
/* 第7行:初始化超时事件链表 */
dlist_init(&ev_service.timeouts);
/*
* 内部操作:
* - 设置链表头指针:prev和next都指向自身
* - 初始化链表节点计数
* 数据结构大小:16字节(两个指针)
*/
/* 第8行:创建二进制信号量 */
aos_sem_new(&ev_service.select_sem, 0);
/*
* 参数详解:
* - &ev_service.select_sem: 信号量句柄地址
* - 0: 初始值0,表示初始时无信号
* 功能:用于唤醒select任务处理新的事件注册
* 数据结构大小:4-8字节
*/
/* 第9-10行:创建事件微服务实例 */
ev_service.svr = uservice_new("event_svr", process_rpc, NULL);
/*
* 参数详解:
* - "event_svr": 服务名称,与任务名一致
* - process_rpc: RPC请求处理函数指针
* - NULL: 上下文参数,此处未使用
* 功能:创建微服务,用于处理跨任务事件请求
* 数据结构大小:约32-64字节(包含消息队列等)
*/
/* 第11行:初始化等待事件对象 */
aos_event_new(&ev_service.wait_event, 0);
/*
* 参数详解:
* - &ev_service.wait_event: 事件对象句柄地址
* - 0: 初始事件标志值
* 功能:用于复杂的多事件等待场景
* 数据结构大小:4-8字节
*/
/* 第12-13行:创建select专用任务 */
aos_task_new_ext(&ev_service.select_task, "select", select_task_entry, NULL,
1024, AOS_DEFAULT_APP_PRI);
/*
* 参数详解:
* - &ev_service.select_task: 任务句柄输出参数
* - "select": 任务名称,标识为select事件处理任务
* - select_task_entry: 任务入口函数指针
* - NULL: 任务参数,此处未使用
* - 1024: 栈大小1KB,足够处理select系统调用
* - AOS_DEFAULT_APP_PRI: 默认优先级
* 关键设计:专用任务处理select,避免阻塞主事件服务
*/
/* 第14行:将事件服务添加到任务中管理 */
utask_add(task, ev_service.svr);
/*
* 功能:建立任务与服务的关系
* - task: 宿主任务,负责调度和执行
* - ev_service.svr: 微服务实例,处理具体业务
* 数据流:将服务添加到任务的服务链表中
*/
/* 第15行:成功返回 */
return 0; /* 返回0表示初始化成功 */
}
第八部分:RPC命令处理详细流程分析
/*
* RPC命令处理函数 process_rpc() 逐行分析
* 这是事件分发的核心引擎
*/
static int process_rpc(void *context, rpc_t *rpc)
{
/* 第1行:缓冲区大小变量初始化 */
int size = -1; /* 初始值-1,表示未知大小 */
/* 第2行:从RPC消息中提取事件参数 */
struct event_param *param = (struct event_param *)rpc_get_buffer(rpc, &size);
/*
* 函数调用:rpc_get_buffer(rpc, &size)
* - 输入:rpc消息指针,大小变量地址
* - 输出:参数缓冲区指针,更新size为实际大小
* 数据流:从RPC消息中提取24-32字节的参数数据
*/
/* 第3行:安全性检查 - 参数大小验证 */
aos_assert(size == sizeof(struct event_param));
/*
* 断言检查:确保接收到的参数大小与预期一致
* - 防止内存越界和缓冲区溢出
* - 如果断言失败,系统会触发错误处理
* 保护机制:确保数据完整性
*/
/* 第4行:命令分发器 - 基于命令ID路由 */
switch (rpc->cmd_id) { /* rpc->cmd_id 来自RPC消息头 */
/* 第5-8行:CMD_SUB_FD_EVENT 文件描述符订阅 */
case CMD_SUB_FD_EVENT:
/* 调用事件列表订阅函数 */
eventlist_subscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
/*
* 参数传递:
* - &ev_service.event: 事件列表指针
* - param->event_id: 文件描述符(作为事件ID)
* - param->cb: 事件回调函数指针
* - param->data: 用户上下文数据指针
* 内部操作:将FD回调注册到事件列表中
*/
/* 通知select任务 */
aos_sem_signal(&ev_service.select_sem);
/*
* 功能:唤醒select任务,使其重新计算监听集合
* 必要性:新FD注册后,select需要更新监听位图
* 性能:避免select任务无谓的等待
*/
break; /* 跳出switch case */
/* 第9-11行:CMD_REMOVE_FD_EVENT FD事件取消订阅 */
case CMD_REMOVE_FD_EVENT:
eventlist_unsubscribe_fd(&ev_service.event, param->event_id, param->cb, param->data);
/*
* 功能:从事件列表中移除指定的FD回调
* 参数与订阅时一致,用于精确匹配要移除的条目
* 内部操作:遍历链表,找到匹配项并删除
*/
break;
/* 第12-14行:CMD_PUBLISH_FD_EVENT FD事件发布 */
case CMD_PUBLISH_FD_EVENT:
eventlist_publish_fd(&ev_service.event, param->event_id, param->data);
/*
* 功能:触发指定FD的所有注册回调
* 调用场景:当select检测到FD就绪时调用
* 数据流:遍历FD的回调链表,逐个调用回调函数
*/
break;
/* 第15-17行:CMD_SUB_EVENT 普通事件订阅 */
case CMD_SUB_EVENT:
eventlist_subscribe(&ev_service.event, param->event_id, param->cb, param->data);
/*
* 功能:注册普通事件回调(非FD事件)
* 区别:不涉及select监听,纯回调机制
* 使用场景:自定义应用层事件
*/
break;
/* 第18-20行:CMD_REMOVE_EVENT 普通事件取消订阅 */
case CMD_REMOVE_EVENT:
eventlist_unsubscribe(&ev_service.event, param->event_id, param->cb, param->data);
/* 功能:移除普通事件回调注册 */
break;
/* 第21-44行:CMD_PUBLISH_EVENT 事件发布(支持延迟)*/
case CMD_PUBLISH_EVENT:
/* 第22行:延迟事件检查 */
if (param->timeout > 0) { /* 超时时间大于0表示延迟发布 */
/* 第23-24行:动态分配超时事件参数 */
struct event_param *timer = aos_malloc(sizeof(struct event_param));
/* 内存分配:申请24-32字节内存块 */
/* 第25-26行:内存分配失败检查 */
if (timer == NULL)
break; /* 内存不足,优雅降级,跳过此事件 */
/* 第27-29行:设置超时事件参数 */
timer->timeout = aos_now_ms() + param->timeout; /* 计算绝对超时时间 */
timer->event_id = param->event_id; /* 复制事件ID */
timer->data = param->data; /* 复制事件数据 */
/* 数据流:从param复制到timer,准备加入超时链表 */
/* 第30-37行:在超时链表中按时间顺序插入 */
struct event_param *node; /* 链表遍历指针 */
dlist_for_each_entry(&ev_service.timeouts, node, struct event_param, next) {
/* 遍历超时链表,找到合适的插入位置 */
if (timer->timeout < node->timeout) /* 时间比较 */
break; /* 找到插入位置:在node节点之前 */
}
/* 第38行:在找到的节点前插入新节点 */
dlist_add_tail(&timer->next, &node->next);
/*
* 链表操作:将timer节点插入到node节点之前
* 效果:保持链表按超时时间升序排列
* 性能:O(n)复杂度,n为当前超时事件数量
*/
/* 第39行:通知select任务 */
aos_sem_signal(&ev_service.select_sem);
/* 必要性:新的超时事件可能影响select的超时计算 */
} else {
/* 第41-43行:立即发布事件 */
eventlist_publish(&ev_service.event, param->event_id, param->data);
/* 功能:无延迟,立即触发事件回调 */
}
break; /* 结束CMD_PUBLISH_EVENT case */
} /* 结束switch语句 */
/* 第45行:RPC响应回复 */
rpc_reply(rpc);
/*
* 功能:向RPC调用方发送响应,表示处理完成
* 同步机制:对于同步调用,这会唤醒等待的调用方
* 异步机制:对于异步调用,这只是清理资源
*/
/* 第46行:成功返回 */
return 0; /* 返回0表示RPC处理成功 */
} /* 结束process_rpc函数 */
第九部分:事件调用封装函数详细分析
/*
* 事件调用封装函数 event_call() 逐行分析
* 门面模式:统一封装RPC调用过程
*/
static void event_call(struct event_param *param, int cmd_id, int sync)
{
/* 第1行:声明RPC消息结构体 */
rpc_t rpc; /* 栈变量,大小约32-64字节,包含消息头和缓冲区 */
/* 第2行:初始化RPC消息 */
if (rpc_init(&rpc, cmd_id, sync ? AOS_WAIT_FOREVER : 0) == 0) {
/*
* rpc_init参数:
* - &rpc: RPC消息结构体指针
* - cmd_id: 命令ID,决定在process_rpc中如何路由
* - sync ? AOS_WAIT_FOREVER : 0: 超时时间
* - sync为1:同步调用,无限等待
* - sync为0:异步调用,立即返回
* 返回值0表示初始化成功
*/
/* 第3行:将事件参数复制到RPC缓冲区 */
rpc_put_buffer(&rpc, param, sizeof(struct event_param));
/*
* 数据拷贝:将param的24-32字节数据复制到RPC消息缓冲区
* 内存操作:可能涉及内存分配或缓冲区管理
* 序列化:将结构体数据序列化为字节流
*/
/* 第4行:调用微服务处理请求 */
uservice_call(ev_service.svr, &rpc);
/*
* 函数功能:跨任务消息传递
* 内部操作:
* 1. 将RPC消息发送到事件服务任务的消息队列
* 2. 如果同步调用,阻塞等待响应
* 3. 如果异步调用,立即返回
* 线程安全:通过消息队列实现任务间通信
*/
/* 第5行:释放RPC资源 */
rpc_deinit(&rpc);
/*
* 功能:清理RPC消息占用的资源
* 包括:释放缓冲区、重置状态等
* 必要性:防止内存泄漏
*/
} /* 结束if(rpc_init成功) */
/* 注意:如果rpc_init失败,则跳过整个RPC调用过程 */
} /* 结束event_call函数 */
第十部分:文件描述符事件接口详细分析
/*
* 文件描述符事件订阅函数 event_subscribe_fd() 逐行分析
*/
void event_subscribe_fd(uint32_t fd, event_callback_t cb, void *context)
{
/* 第1行:断言检查 - 回调函数不能为空 */
aos_assert(cb);
/*
* 安全性检查:确保有有效的回调函数
* 如果cb为NULL,断言失败,系统报错
* 必要性:避免注册无效的事件处理
*/
/* 第2行:声明事件参数结构体(栈变量) */
struct event_param param; /* 24-32字节栈变量 */
/* 第3-5行:设置订阅参数 */
param.event_id = fd; /* 文件描述符作为事件ID */
param.cb = cb; /* 事件发生时的回调函数指针 */
param.data = context; /* 回调时的用户上下文指针 */
/* 数据组装:将调用参数打包成统一的事件参数格式 */
/* 第6行:调用RPC封装函数 */
event_call(¶m, CMD_SUB_FD_EVENT, 0);
/*
* 参数详解:
* - ¶m: 事件参数指针
* - CMD_SUB_FD_EVENT: 命令ID,表示订阅FD事件
* - 0: 异步调用,不等待处理完成
* 设计选择:订阅操作通常是异步的,避免阻塞调用方
*/
} /* 结束event_subscribe_fd函数 */
/*
* 文件描述符事件发布函数 event_publish_fd() 逐行分析
*/
void event_publish_fd(uint32_t event_id, void *data, int sync)
{
/* 第1行:声明事件参数结构体 */
struct event_param param; /* 栈变量 */
/* 第2-4行:设置事件参数 */
param.event_id = event_id; /* 文件描述符(事件ID) */
param.data = data; /* 事件关联数据指针 */
param.cb = NULL; /* 回调函数为空(发布时不需要)*/
/*
* 关键设计:发布时cb为NULL,因为在process_rpc中不需要回调信息
* 事件发布是触发已注册的回调,而不是注册新回调
*/
/* 第5行:调用RPC封装函数 */
event_call(¶m, CMD_PUBLISH_FD_EVENT, sync);
/*
* 参数详解:
* - ¶m: 事件参数指针
* - CMD_PUBLISH_FD_EVENT: 命令ID,表示发布FD事件
* - sync: 同步标志,由调用方指定
* 同步选择:通常select任务使用同步发布确保顺序
*/
} /* 结束event_publish_fd函数 */
第十一部分:超时事件处理函数详细分析
/*
* 处理超时事件函数 do_time_event() 逐行分析
* 功能:检查并触发已到期的延迟事件
*/
static int do_time_event()
{
/* 第1行:返回值初始化 */
int delayed_ms = -1; /* 默认-1表示无超时事件需要处理 */
/* 第2-3行:声明链表遍历变量 */
struct event_param *node; /* 当前节点指针,用于遍历链表 */
dlist_t *tmp; /* 临时指针,用于安全删除操作 */
/* 第4行:加锁保护超时链表 - 临界区开始 */
uservice_lock(ev_service.svr);
/*
* 功能:获取微服务锁,保护共享资源
* 必要性:防止多任务同时操作超时链表导致数据损坏
* 锁范围:保护ev_service.timeouts链表
*/
/* 第5行:安全遍历超时链表 */
dlist_for_each_entry_safe(&ev_service.timeouts, tmp, node, struct event_param, next) {
/*
* 宏展开:安全遍历链表,支持在遍历时删除节点
* 参数详解:
* - &ev_service.timeouts: 链表头指针
* - tmp: 临时指针,用于保存下一个节点
* - node: 当前节点指针
* - struct event_param: 节点类型
* - next: 链表节点中的next字段名
* 遍历机制:从链表头开始,按顺序遍历每个节点
*/
/* 第6行:获取当前系统时间 */
long long now = aos_now_ms(); /* 获取当前系统时间,单位毫秒 */
/* 第7-8行:检查事件是否超时 */
if (now >= node->timeout) { /* 当前时间 >= 超时时间 */
/* 第9行:触发超时事件 */
event_publish(node->event_id, node->data);
/*
* 功能:发布超时事件,触发对应的回调函数
* 参数:
* - node->event_id: 超时事件ID
* - node->data: 事件关联数据
* 内部调用:通过event_call发布普通事件
*/
/* 第10行:从链表中移除已触发的节点 */
dlist_del(&node->next);
/*
* 功能:将节点从超时链表中删除
* 链表操作:调整前后节点的指针,跳过当前节点
* 效果:node节点从链表中脱离
*/
/* 第11行:释放节点内存 */
aos_free(node);
/*
* 功能:释放动态分配的event_param内存
* 内存管理:回收24-32字节内存
* 必要性:防止内存泄漏
*/
} else {
/* 第13-14行:计算下一个超时事件的剩余时间 */
delayed_ms = node->timeout - now; /* 剩余时间 = 超时时间 - 当前时间 */
/*
* 计算原理:链表按超时时间升序排列
* 第一个未超时节点就是下一个要处理的事件
* 返回值:用于设置select的超时时间
*/
/* 第15行:跳出循环 */
break; /* 由于链表有序,后续节点超时时间更晚,无需继续检查 */
} /* 结束if-else条件判断 */
} /* 结束链表遍历循环 */
/* 第16行:释放锁 - 临界区结束 */
uservice_unlock(ev_service.svr);
/*
* 功能:释放微服务锁,允许其他任务访问共享资源
* 配对操作:与第4行的uservice_lock配对使用
* 重要性:确保锁的及时释放,避免死锁
*/
/* 第17行:返回下一个超时事件的等待时间 */
return delayed_ms; /*
* 返回值含义:
* - >0: 下一个超时事件的剩余毫秒数
* - -1: 无超时事件需要处理
* 用途:作为select系统调用的超时参数
*/
} /* 结束do_time_event函数 */
第十二部分:select任务核心循环详细分析
/*
* select任务入口函数 select_task_entry() 逐行分析
* Reactor模式核心:监听文件描述符事件和超时事件
*/
static void select_task_entry(void *arg)
{
/* 第1行:获取事件列表指针 */
event_list_t *evlist = &ev_service.event; /* 指向全局事件列表 */
/* 第2行:获取微服务任务上下文 */
utask_t *task = ev_service.svr->task; /* 获取关联的任务控制块 */
/* 第3行:系统初始化 */
sys_init(); /* 底层系统初始化,确保select相关资源就绪 */
/* 第4行:主事件循环 - 无限循环 */
while (1) {
/* 第5-6行:声明select相关变量 */
fd_set readfds; /* 读文件描述符集合,大小依赖FD_SETSIZE */
struct timeval timeout; /* select超时时间结构体,8字节 */
/* 第7-8行:处理超时事件并获取下一个超时时间 */
int time_ms = do_time_event(); /* 返回下一个超时等待时间 */
/*
* 函数调用:执行超时事件检查
* 返回值:
* - time_ms > 0: 下一个超时事件的剩余时间
* - time_ms = -1: 无超时事件
*/
/* 第9-10行:设置要监听的文件描述符集合 */
int max_fd = eventlist_setfd(evlist, &readfds); /* 返回最大文件描述符 */
/*
* 功能:将事件列表中所有注册的FD设置到readfds集合中
* 内部操作:
* 1. 清空readfds集合
* 2. 遍历所有注册的FD回调
* 3. 将每个FD添加到readfds位图中
* 4. 记录最大的FD值
* 返回值max_fd:用于select的第一个参数
*/
/* 第11-12行:设置select超时时间 */
timeout.tv_sec = time_ms / 1000; /* 秒部分:整除1000 */
timeout.tv_usec = (time_ms % 1000) * 1000; /* 微秒部分:取余后乘1000 */
/*
* 时间转换:将毫秒转换为struct timeval格式
* 示例:time_ms=1500 → tv_sec=1, tv_usec=500000
* 格式要求:select系统调用需要的参数格式
*/
/* 第13-16行:执行select系统调用 */
int ret = select2(max_fd + 1, &readfds, NULL, NULL,
time_ms == -1 ? NULL : &timeout, &ev_service.select_sem);
/*
* 函数参数详解:
* - max_fd + 1: 最大FD值加1,select规范要求
* - &readfds: 读集合指针,监听可读事件
* - NULL: 写集合,此处不监听
* - NULL: 异常集合,此处不监听
* - time_ms == -1 ? NULL : &timeout: 超时参数
* - time_ms=-1时传NULL,表示无限等待
* - 否则传递&timeout,指定超时时间
* - &ev_service.select_sem: 唤醒信号量,允许其他任务唤醒select
*
* 返回值ret:
* - >0: 就绪的文件描述符数量
* - =0: 超时,无文件描述符就绪
* - <0: 错误,如被信号中断等
*/
/* 第17行:检查select返回值 - 有文件描述符就绪 */
if (ret > 0) {
/* 第18行:遍历所有可能的文件描述符 */
for (int fd = 0; fd <= max_fd; fd++) {
/* 第19行:检查该文件描述符是否在就绪集合中 */
if (FD_ISSET(fd, &readfds)) {
/*
* 宏功能:检查fd是否在readfds位图中被设置
* 实现:通过位操作检查对应bit是否为1
* 性能:O(1)复杂度,快速判断
*/
/* 第20-22行:流量控制检查 */
if(aos_queue_get_count(&task->queue) < (task->queue_count*3/4)) {
/*
* 功能:检查任务消息队列负载
* 条件:当前队列消息数 < 队列容量的75%
* 目的:避免事件风暴导致消息队列溢出
* 保护机制:在系统繁忙时丢弃部分事件,保证系统稳定
*/
/* 第23行:发布FD事件 - 同步方式 */
event_publish_fd(fd, NULL, 1); /* 同步发布,阻塞直到处理完成 */
/*
* 参数详解:
* - fd: 就绪的文件描述符
* - NULL: 事件数据,此处未使用
* - 1: 同步标志,等待事件处理完成
* 设计选择:同步确保事件处理顺序,避免竞态条件
*/
} /* 结束流量控制条件 */
/* 第24-25行:注释掉的代码 */
// eventlist_remove_fd(evlist, fd);
/*
* 原设计考虑:处理一次后移除监听
* 当前实现:持久监听,FD保持注册状态
* 设计选择:保持监听允许重复事件处理
*/
} /* 结束FD_ISSET条件 */
} /* 结束FD遍历循环 */
} /* 结束ret>0条件 */
/*
* 注意:当select返回0(超时)或负数(错误)时
* 直接继续循环,重新开始事件处理
* 超时处理:在下一轮循环中通过do_time_event()处理
* 错误处理:忽略错误,继续运行,保持系统韧性
*/
} /* 结束主事件循环while(1) */
/* 第26行:任务退出(实际上不会执行到这里) */
aos_task_exit(0); /* 安全退出任务,参数0表示正常退出 */
/*
* 功能:任务清理和资源释放
* 实际执行:由于while(1)无限循环,此代码不会被执行
* 保留原因:代码完整性,遵循任务编程规范
*/
} /* 结束select_task_entry函数 */
第十三部分:普通事件接口函数详细分析
/*
* 发布普通事件函数 event_publish() 逐行分析
* 功能:立即发布普通事件,无延迟
*/
void event_publish(uint32_t event_id, void *data)
{
/* 第1行:声明事件参数结构体 */
struct event_param param; /* 栈变量,24-32字节 */
/* 第2-4行:设置事件参数 */
param.event_id = event_id; /* 事件标识符 */
param.data = data; /* 事件关联数据指针 */
param.timeout = 0; /* 超时时间为0,表示立即发布 */
/*
* 参数设置:明确指定timeout=0,区别于延迟发布
* 数据流:将调用参数打包成统一格式
*/
/* 第5行:调用RPC封装函数 */
event_call(¶m, CMD_PUBLISH_EVENT, 0); /* 命令:发布普通事件,异步调用 */
/*
* 参数详解:
* - ¶m: 事件参数指针
* - CMD_PUBLISH_EVENT: 命令ID,在process_rpc中路由到相应处理
* - 0: 异步调用标志,不等待处理完成
* 设计选择:普通事件发布通常异步,避免阻塞调用方
*/
} /* 结束event_publish函数 */
/*
* 发布延迟事件函数 event_publish_delay() 逐行分析
* 功能:发布延迟执行的事件
*/
void event_publish_delay(uint32_t event_id, void *data, int timeout)
{
/* 第1行:声明事件参数结构体 */
struct event_param param; /* 栈变量 */
/* 第2-4行:设置事件参数 */
param.event_id = event_id; /* 事件标识符 */
param.data = data; /* 事件数据指针 */
param.timeout = timeout; /* 延迟时间(毫秒) */
/*
* 关键参数:timeout指定延迟时间
* 与event_publish的区别:timeout>0表示延迟发布
* 数据流:参数打包,准备通过RPC传递
*/
/* 第5行:调用RPC封装函数 */
event_call(¶m, CMD_PUBLISH_EVENT, 0); /* 命令:发布事件,支持延迟 */
/*
* 注意:使用相同的CMD_PUBLISH_EVENT命令
* 在process_rpc中通过param->timeout区分立即发布和延迟发布
* 设计统一:相同命令不同行为,简化接口设计
*/
} /* 结束event_publish_delay函数 */
/*
* 订阅普通事件函数 event_subscribe() 逐行分析
*/
void event_subscribe(uint32_t event_id, event_callback_t cb, void *context)
{
/* 第1行:断言检查 - 回调函数不能为空 */
aos_assert(cb); /* 安全性检查,确保有有效的回调处理函数 */
/* 第2行:声明事件参数结构体 */
struct event_param param; /* 栈变量 */
/* 第3-5行:设置订阅参数 */
param.event_id = event_id; /* 要监听的事件类型 */
param.cb = cb; /* 事件触发时的处理函数指针 */
param.data = context; /* 传递给回调函数的上下文指针 */
/*
* 参数组装:与FD订阅类似,但用于普通事件
* 事件类型:event_id可以是任何应用定义的事件标识
*/
/* 第6行:调用RPC封装函数 */
event_call(¶m, CMD_SUB_EVENT, 0); /* 命令:订阅普通事件,异步调用 */
} /* 结束event_subscribe函数 */
/*
* 取消订阅普通事件函数 event_unsubscribe() 逐行分析
*/
void event_unsubscribe(uint32_t event_id, event_callback_t cb, void *context)
{
/* 第1行:断言检查 - 回调函数不能为空 */
aos_assert(cb); /* 确保指定要移除的具体回调函数 */
/* 第2行:声明事件参数结构体 */
struct event_param param; /* 栈变量 */
/* 第3-5行:设置取消订阅参数 */
param.event_id = event_id; /* 目标事件类型 */
param.cb = cb; /* 要移除的特定回调函数指针 */
param.data = context; /* 对应的用户上下文指针 */
/*
* 精确匹配:需要event_id、cb、context三者完全匹配
* 设计考虑:确保准确移除指定的回调,避免误删
*/
/* 第6行:调用RPC封装函数 */
event_call(¶m, CMD_REMOVE_EVENT, 0); /* 命令:移除事件订阅,异步调用 */
} /* 结束event_unsubscribe函数 */
第十四部分:取消订阅FD事件函数详细分析
/*
* 取消订阅文件描述符事件函数 event_unsubscribe_fd() 逐行分析
*/
void event_unsubscribe_fd(uint32_t event_id, event_callback_t cb, void *context)
{
/* 第1行:声明事件参数结构体 */
struct event_param param; /* 栈变量,24-32字节 */
/* 第2-4行:设置取消订阅参数 */
param.event_id = event_id; /* 目标文件描述符 */
param.cb = cb; /* 要移除的回调函数指针 */
param.data = context; /* 对应的用户上下文指针 */
/*
* 参数要求:必须与订阅时使用的参数完全一致
* 匹配机制:通过event_id、cb、context三元组精确识别要移除的回调
* 设计原理:支持同一个FD注册多个不同回调的场景
*/
/* 第5行:调用RPC封装函数 */
event_call(¶m, CMD_REMOVE_FD_EVENT, 0); /* 命令:移除FD事件订阅,异步调用 */
/*
* 参数详解:
* - ¶m: 事件参数指针
* - CMD_REMOVE_FD_EVENT: 命令ID,在process_rpc中路由到移除处理
* - 0: 异步调用标志,不等待处理完成
* 设计选择:取消订阅操作通常是异步的,避免阻塞调用方
*/
} /* 结束event_unsubscribe_fd函数 */
openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。
更多推荐

所有评论(0)