文件4: message_router.cpp - 消息路由和分发

/**
 * @file message_router.cpp
 * @brief 消息路由器实现
 * @author IoT Platform Team
 * @version 1.0
 * @date 2024
 * 
 * 实现责任链模式和策略模式
 * 处理消息解析、路由、转换和分发
 */
​
#include "config.h"
#include "device_manager.h"
#include <memory>
#include <vector>
#include <queue>
#include <unordered_map>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <sstream>
#include <iomanip>
​
namespace iot {
​
// 前向声明
class WebSocketConnection;
class DeviceManager;
​
/**
 * @struct RouteConfig
 * @brief 路由配置结构
 * @details 定义消息路由规则和策略
 */
struct RouteConfig {
    MessageType message_type;          ///< 消息类型
    std::string source_pattern;        ///< 源设备模式(支持通配符)
    std::string target_pattern;        ///< 目标设备模式
    uint32_t priority;                 ///< 路由优先级(0-100,越高越优先)
    bool require_auth;                 ///< 是否需要认证
    size_t rate_limit;                 ///< 速率限制(消息/秒)
    bool enable_logging;               ///< 是否启用日志
    
    RouteConfig() : message_type(MessageType::DEVICE_DATA),
                   priority(50),
                   require_auth(true),
                   rate_limit(1000),
                   enable_logging(false) {}
};
​
/**
 * @class MessageContext
 * @brief 消息上下文类
 * @details 封装消息处理过程中的所有上下文信息
 * 
 * 设计模式:上下文模式(Context Pattern)
 * - 处理突发事件:消息丢失、处理超时、依赖解析失败
 * - 工程作用:传递处理状态,支持中间件链式调用
 */
class MessageContext {
private:
    WebSocketMessage original_message_;           ///< 原始消息
    std::vector<uint8_t> processed_payload_;      ///< 处理后的负载
    std::string source_device_id_;                ///< 源设备ID
    std::string target_device_id_;                ///< 目标设备ID
    RouteConfig route_config_;                    ///< 路由配置
    
    // 处理状态
    std::atomic<bool> processed_{false};          ///< 是否已处理
    std::atomic<bool> successful_{false};         ///< 是否处理成功
    std::atomic<uint32_t> error_code_{0};         ///< 错误码
    std::string error_message_;                   ///< 错误信息
    
    // 时间戳记录
    uint64_t receive_timestamp_;                  ///< 接收时间戳
    uint64_t start_process_timestamp_;            ///< 开始处理时间戳
    uint64_t end_process_timestamp_;              ///< 结束处理时间戳
    
    // 扩展数据(用于中间件传递数据)
    std::unordered_map<std::string, std::any> extra_data_; ///< 额外数据存储
    
    mutable std::mutex context_mutex_;            ///< 上下文互斥锁
    
public:
    /**
     * @brief 构造函数
     * @param message 原始WebSocket消息
     * @param source_id 源设备ID
     */
    MessageContext(const WebSocketMessage& message, const std::string& source_id)
        : original_message_(message),
          source_device_id_(source_id),
          receive_timestamp_(WebSocketConnection::getCurrentTimestamp()) {
        
        processed_payload_ = message.payload;  ///< 初始化为原始负载
    }
    
    /**
     * @brief 设置路由配置
     * @param config 路由配置
     */
    void setRouteConfig(const RouteConfig& config) {
        std::lock_guard<std::mutex> lock(context_mutex_);
        route_config_ = config;
    }
    
    /**
     * @brief 获取路由配置
     * @return 路由配置引用
     */
    const RouteConfig& getRouteConfig() const {
        std::lock_guard<std::mutex> lock(context_mutex_);
        return route_config_;
    }
    
    /**
     * @brief 设置目标设备ID
     * @param target_id 目标设备ID
     */
    void setTargetDeviceId(const std::string& target_id) {
        std::lock_guard<std::mutex> lock(context_mutex_);
        target_device_id_ = target_id;
    }
    
    /**
     * @brief 获取目标设备ID
     * @return 目标设备ID
     */
    std::string getTargetDeviceId() const {
        std::lock_guard<std::mutex> lock(context_mutex_);
        return target_device_id_;
    }
    
    /**
     * @brief 获取源设备ID
     * @return 源设备ID
     */
    std::string getSourceDeviceId() const {
        std::lock_guard<std::mutex> lock(context_mutex_);
        return source_device_id_;
    }
    
    /**
     * @brief 获取原始消息
     * @return 原始消息引用
     */
    const WebSocketMessage& getOriginalMessage() const {
        return original_message_;
    }
    
    /**
     * @brief 获取处理后的负载
     * @return 负载数据引用
     */
    const std::vector<uint8_t>& getProcessedPayload() const {
        std::lock_guard<std::mutex> lock(context_mutex_);
        return processed_payload_;
    }
    
    /**
     * @brief 设置处理后的负载
     * @param payload 新的负载数据
     */
    void setProcessedPayload(const std::vector<uint8_t>& payload) {
        std::lock_guard<std::mutex> lock(context_mutex_);
        processed_payload_ = payload;
    }
    
    /**
     * @brief 添加额外数据
     * @param key 数据键
     * @param value 数据值
     */
    template<typename T>
    void setExtraData(const std::string& key, const T& value) {
        std::lock_guard<std::mutex> lock(context_mutex_);
        extra_data_[key] = value;
    }
    
    /**
     * @brief 获取额外数据
     * @param key 数据键
     * @return 数据值,不存在抛出异常
     */
    template<typename T>
    T getExtraData(const std::string& key) const {
        std::lock_guard<std::mutex> lock(context_mutex_);
        auto it = extra_data_.find(key);
        if (it == extra_data_.end()) {
            throw std::runtime_error("Extra data not found: " + key);
        }
        return std::any_cast<T>(it->second);
    }
    
    /**
     * @brief 开始处理消息
     */
    void startProcessing() {
        start_process_timestamp_ = WebSocketConnection::getCurrentTimestamp();
    }
    
    /**
     * @brief 标记消息处理完成
     * @param success 是否成功
     * @param error_code 错误码(成功时为0)
     * @param error_msg 错误信息
     */
    void finishProcessing(bool success, uint32_t error_code = 0, 
                         const std::string& error_msg = "") {
        end_process_timestamp_ = WebSocketConnection::getCurrentTimestamp();
        processed_ = true;
        successful_ = success;
        error_code_ = error_code;
        error_message_ = error_msg;
    }
    
    /**
     * @brief 检查消息是否已处理
     * @return 已处理返回true
     */
    bool isProcessed() const {
        return processed_;
    }
    
    /**
     * @brief 检查处理是否成功
     * @return 成功返回true
     */
    bool isSuccessful() const {
        return successful_;
    }
    
    /**
     * @brief 获取处理延迟(毫秒)
     * @return 处理延迟
     */
    uint64_t getProcessingDelay() const {
        if (!processed_) {
            return 0;
        }
        return end_process_timestamp_ - start_process_timestamp_;
    }
    
    /**
     * @brief 获取总延迟(从接收到完成)
     * @return 总延迟
     */
    uint64_t getTotalDelay() const {
        if (!processed_) {
            return WebSocketConnection::getCurrentTimestamp() - receive_timestamp_;
        }
        return end_process_timestamp_ - receive_timestamp_;
    }
    
    /**
     * @brief 生成消息处理报告
     * @return 报告字符串
     */
    std::string generateReport() const {
        std::lock_guard<std::mutex> lock(context_mutex_);
        
        std::stringstream ss;
        ss << "=== Message Processing Report ===\n"
           << "Message ID: " << original_message_.message_id << "\n"
           << "Message Type: " << static_cast<int>(original_message_.type) << "\n"
           << "Source: " << source_device_id_ << "\n"
           << "Target: " << target_device_id_ << "\n"
           << "Payload Size: " << original_message_.payload.size() << " bytes\n"
           << "Status: " << (successful_ ? "SUCCESS" : "FAILED") << "\n";
        
        if (!successful_) {
            ss << "Error Code: " << error_code_ << "\n"
               << "Error Message: " << error_message_ << "\n";
        }
        
        ss << "Receive Time: " << receive_timestamp_ << "\n"
           << "Processing Delay: " << getProcessingDelay() << "ms\n"
           << "Total Delay: " << getTotalDelay() << "ms\n";
        
        return ss.str();
    }
};
​
/**
 * @class MessageHandler
 * @brief 消息处理器基类
 * @details 定义消息处理接口,支持责任链模式
 * 
 * 设计模式:责任链模式(Chain of Responsibility Pattern)
 * - 处理突发事件:处理器故障、消息格式错误、依赖服务不可用
 * - 工程作用:动态组合处理逻辑,支持灵活扩展
 */
class MessageHandler {
protected:
    std::shared_ptr<MessageHandler> next_handler_; ///< 下一个处理器
    std::string handler_name_;                    ///< 处理器名称
    std::atomic<uint64_t> processed_count_{0};    ///< 处理消息计数
    std::atomic<uint64_t> error_count_{0};        ///< 错误计数
    
public:
    /**
     * @brief 构造函数
     * @param name 处理器名称
     */
    explicit MessageHandler(const std::string& name) : handler_name_(name) {}
    
    virtual ~MessageHandler() = default;
    
    /**
     * @brief 设置下一个处理器
     * @param handler 下一个处理器指针
     */
    void setNextHandler(std::shared_ptr<MessageHandler> handler) {
        next_handler_ = handler;
    }
    
    /**
     * @brief 处理消息(模板方法模式)
     * @param context 消息上下文
     * @return 处理成功返回true
     * 
     * 实现责任链模式的传递机制
     */
    bool handleMessage(std::shared_ptr<MessageContext> context) {
        // 记录开始时间
        if (!context->isProcessed()) {
            context->startProcessing();
        }
        
        bool success = false;
        try {
            // 调用具体处理逻辑
            success = processMessage(context);
            
            if (success) {
                processed_count_++;
            } else {
                error_count_++;
                context->finishProcessing(false, 1000, 
                                         handler_name_ + " processing failed");
            }
        } catch (const std::exception& e) {
            error_count_++;
            context->finishProcessing(false, 1001, 
                                     handler_name_ + " exception: " + e.what());
            success = false;
        }
        
        // 如果处理成功且有下一个处理器,继续传递
        if (success && next_handler_) {
            return next_handler_->handleMessage(context);
        }
        
        // 如果没有下一个处理器,标记处理完成
        if (!context->isProcessed()) {
            context->finishProcessing(success);
        }
        
        return success;
    }
    
    /**
     * @brief 具体处理逻辑(子类实现)
     * @param context 消息上下文
     * @return 处理成功返回true
     */
    virtual bool processMessage(std::shared_ptr<MessageContext> context) = 0;
    
    /**
     * @brief 获取处理器统计信息
     * @return 统计信息字符串
     */
    virtual std::string getStats() const {
        char buffer[256];
        snprintf(buffer, sizeof(buffer),
                "Handler: %s\n"
                "Processed: %llu\n"
                "Errors: %llu\n"
                "Error Rate: %.2f%%\n",
                handler_name_.c_str(),
                processed_count_.load(),
                error_count_.load(),
                processed_count_ > 0 ? 
                    (error_count_.load() * 100.0 / processed_count_.load()) : 0.0);
        return std::string(buffer);
    }
    
    /**
     * @brief 获取处理器名称
     * @return 处理器名称
     */
    std::string getName() const { return handler_name_; }
};
​
/**
 * @class AuthenticationHandler
 * @brief 认证处理器
 * @details 验证消息发送者的身份和权限
 */
class AuthenticationHandler : public MessageHandler {
private:
    std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器引用
    
public:
    /**
     * @brief 构造函数
     * @param device_mgr 设备管理器
     */
    AuthenticationHandler(std::shared_ptr<DeviceManager> device_mgr)
        : MessageHandler("AuthenticationHandler"), device_manager_(device_mgr) {}
    
    /**
     * @brief 处理认证逻辑
     * @param context 消息上下文
     * @return 认证成功返回true
     * 
     * 策略模式:支持多种认证方式
     */
    bool processMessage(std::shared_ptr<MessageContext> context) override {
        const std::string& device_id = context->getSourceDeviceId();
        
        // 检查设备是否在线
        // 这里简化处理,实际应该检查设备证书、令牌等
        int conn_fd = device_manager_->getConnectionByDeviceId(device_id);
        if (conn_fd < 0) {
            // 设备未注册或离线
            return false;
        }
        
        // 检查路由配置是否需要认证
        const RouteConfig& config = context->getRouteConfig();
        if (config.require_auth) {
            // 这里应该实现具体的认证逻辑
            // 例如:检查API密钥、JWT令牌等
            // 简化实现:所有已连接设备都认为已认证
            return true;
        }
        
        // 不需要认证,直接通过
        return true;
    }
};
​
/**
 * @class ValidationHandler
 * @brief 消息验证处理器
 * @details 验证消息格式和内容
 */
class ValidationHandler : public MessageHandler {
public:
    ValidationHandler() : MessageHandler("ValidationHandler") {}
    
    /**
     * @brief 验证消息
     * @param context 消息上下文
     * @return 验证通过返回true
     */
    bool processMessage(std::shared_ptr<MessageContext> context) override {
        const WebSocketMessage& msg = context->getOriginalMessage();
        
        // 验证消息类型
        if (msg.type == MessageType::ERROR_RESPONSE) {
            // 错误消息不需要进一步处理
            return false;
        }
        
        // 验证负载大小
        if (msg.payload.size() > MAX_DEVICE_MESSAGE_SIZE) {
            // 消息太大
            return false;
        }
        
        // 验证消息时间戳(防止重放攻击)
        uint64_t now = WebSocketConnection::getCurrentTimestamp();
        if (msg.timestamp > now + 60000) {
            // 消息时间戳比当前时间晚1分钟以上
            return false;
        }
        if (now - msg.timestamp > 300000) {
            // 消息时间戳比当前时间早5分钟以上
            return false;
        }
        
        // 根据消息类型进行具体验证
        switch (msg.type) {
            case MessageType::DEVICE_DATA:
                return validateDeviceData(msg.payload);
                
            case MessageType::CONTROL_COMMAND:
                return validateControlCommand(msg.payload);
                
            case MessageType::DEVICE_REGISTER:
                return validateDeviceRegister(msg.payload);
                
            default:
                // 其他消息类型暂时通过
                return true;
        }
    }
    
private:
    /**
     * @brief 验证设备数据
     * @param payload 负载数据
     * @return 验证通过返回true
     */
    bool validateDeviceData(const std::vector<uint8_t>& payload) {
        if (payload.size() < sizeof(SensorData)) {
            return false;
        }
        
        // 验证传感器数据格式
        // 这里可以添加更复杂的验证逻辑
        return true;
    }
    
    /**
     * @brief 验证控制命令
     * @param payload 负载数据
     * @return 验证通过返回true
     */
    bool validateControlCommand(const std::vector<uint8_t>& payload) {
        if (payload.size() < sizeof(ControlCommand)) {
            return false;
        }
        
        // 验证命令格式
        const ControlCommand* cmd = 
            reinterpret_cast<const ControlCommand*>(payload.data());
        
        // 检查校验和
        uint8_t calculated = cmd->calculateChecksum();
        if (calculated != cmd->checksum) {
            return false;
        }
        
        return true;
    }
    
    /**
     * @brief 验证设备注册消息
     * @param payload 负载数据
     * @return 验证通过返回true
     */
    bool validateDeviceRegister(const std::vector<uint8_t>& payload) {
        if (payload.size() < sizeof(DeviceInfo)) {
            return false;
        }
        
        const DeviceInfo* info = 
            reinterpret_cast<const DeviceInfo*>(payload.data());
        
        // 验证设备ID格式
        if (strlen(info->device_id) == 0 || strlen(info->device_id) > 31) {
            return false;
        }
        
        // 验证设备类型
        if (info->type < DeviceType::TEMPERATURE_SENSOR || 
            info->type > DeviceType::CONTROLLER) {
            return false;
        }
        
        return true;
    }
};
​
/**
 * @class RoutingHandler
 * @brief 路由处理器
 * @details 根据规则决定消息的目标
 * 
 * 设计模式:策略模式(Strategy Pattern)
 * - 处理突发事件:路由规则冲突、目标不可达、规则解析失败
 * - 工程作用:灵活的路由策略,支持动态路由更新
 */
class RoutingHandler : public MessageHandler {
private:
    std::vector<RouteConfig> routing_rules_;     ///< 路由规则列表
    std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器
    mutable std::shared_mutex rules_mutex_;      ///< 规则读写锁
    
public:
    RoutingHandler(std::shared_ptr<DeviceManager> device_mgr)
        : MessageHandler("RoutingHandler"), device_manager_(device_mgr) {}
    
    /**
     * @brief 添加路由规则
     * @param rule 路由规则
     */
    void addRoutingRule(const RouteConfig& rule) {
        std::unique_lock<std::shared_mutex> lock(rules_mutex_);
        routing_rules_.push_back(rule);
        
        // 按优先级排序
        std::sort(routing_rules_.begin(), routing_rules_.end(),
                 [](const RouteConfig& a, const RouteConfig& b) {
                     return a.priority > b.priority;
                 });
        
        std::cout << "Routing rule added: " 
                  << static_cast<int>(rule.message_type) << std::endl;
    }
    
    /**
     * @brief 处理路由逻辑
     * @param context 消息上下文
     * @return 路由成功返回true
     */
    bool processMessage(std::shared_ptr<MessageContext> context) override {
        const WebSocketMessage& msg = context->getOriginalMessage();
        const std::string& source_id = context->getSourceDeviceId();
        
        // 查找匹配的路由规则
        RouteConfig matched_rule;
        if (!findMatchingRule(msg.type, source_id, matched_rule)) {
            // 没有匹配的路由规则
            return false;
        }
        
        // 设置路由配置
        context->setRouteConfig(matched_rule);
        
        // 确定目标设备
        std::string target_id = determineTarget(msg, source_id, matched_rule);
        if (target_id.empty()) {
            // 无法确定目标设备
            return false;
        }
        
        // 检查目标设备是否可达
        if (!isTargetReachable(target_id)) {
            return false;
        }
        
        // 设置目标设备ID
        context->setTargetDeviceId(target_id);
        
        return true;
    }
    
private:
    /**
     * @brief 查找匹配的路由规则
     * @param msg_type 消息类型
     * @param source_id 源设备ID
     * @param matched_rule 输出匹配的规则
     * @return 找到匹配规则返回true
     */
    bool findMatchingRule(MessageType msg_type, const std::string& source_id,
                         RouteConfig& matched_rule) {
        std::shared_lock<std::shared_mutex> lock(rules_mutex_);
        
        for (const auto& rule : routing_rules_) {
            // 检查消息类型
            if (rule.message_type != msg_type) {
                continue;
            }
            
            // 检查源设备模式匹配
            if (!matchPattern(source_id, rule.source_pattern)) {
                continue;
            }
            
            // 找到匹配规则
            matched_rule = rule;
            return true;
        }
        
        return false;
    }
    
    /**
     * @brief 确定目标设备
     * @param msg 消息
     * @param source_id 源设备ID
     * @param rule 路由规则
     * @return 目标设备ID
     */
    std::string determineTarget(const WebSocketMessage& msg,
                               const std::string& source_id,
                               const RouteConfig& rule) {
        // 根据路由规则确定目标
        if (rule.target_pattern == "*") {
            // 广播到所有设备
            // 这里返回特殊标识,后续处理器会处理广播
            return "BROADCAST";
        } else if (rule.target_pattern == "GROUP:*") {
            // 广播到源设备所在的所有组
            auto groups = device_manager_->getGroupsForDevice(source_id);
            if (!groups.empty()) {
                // 选择第一个组
                return "GROUP:" + groups[0];
            }
            return "";
        } else if (rule.target_pattern.find("GROUP:") == 0) {
            // 发送到特定组
            return rule.target_pattern;
        } else if (rule.target_pattern.find("*") != std::string::npos) {
            // 通配符匹配
            // 这里需要实现通配符匹配逻辑
            return rule.target_pattern;
        } else {
            // 直接设备ID
            return rule.target_pattern;
        }
    }
    
    /**
     * @brief 检查目标是否可达
     * @param target_id 目标设备ID或组ID
     * @return 可达返回true
     */
    bool isTargetReachable(const std::string& target_id) {
        if (target_id == "BROADCAST") {
            // 广播总是可达
            return true;
        } else if (target_id.find("GROUP:") == 0) {
            // 检查组是否存在
            std::string group_id = target_id.substr(6);
            auto devices = device_manager_->getDevicesInGroup(group_id);
            return !devices.empty();
        } else {
            // 检查设备是否在线
            int conn_fd = device_manager_->getConnectionByDeviceId(target_id);
            return conn_fd >= 0;
        }
    }
    
    /**
     * @brief 模式匹配函数
     * @param str 待匹配字符串
     * @param pattern 模式字符串(支持*通配符)
     * @return 匹配返回true
     */
    bool matchPattern(const std::string& str, const std::string& pattern) {
        if (pattern == "*") {
            return true;
        }
        
        // 简单通配符匹配实现
        size_t str_pos = 0, pat_pos = 0;
        size_t str_len = str.length(), pat_len = pattern.length();
        size_t star_pos = std::string::npos, str_star_pos = 0;
        
        while (str_pos < str_len) {
            if (pat_pos < pat_len && 
                (pattern[pat_pos] == '?' || pattern[pat_pos] == str[str_pos])) {
                // 字符匹配或通配符?
                str_pos++;
                pat_pos++;
            } else if (pat_pos < pat_len && pattern[pat_pos] == '*') {
                // 遇到*通配符
                star_pos = pat_pos;
                str_star_pos = str_pos;
                pat_pos++;
            } else if (star_pos != std::string::npos) {
                // 使用*通配符匹配多个字符
                pat_pos = star_pos + 1;
                str_pos = str_star_pos + 1;
                str_star_pos = str_pos;
            } else {
                return false;
            }
        }
        
        // 跳过末尾的*通配符
        while (pat_pos < pat_len && pattern[pat_pos] == '*') {
            pat_pos++;
        }
        
        return pat_pos == pat_len;
    }
};
​
/**
 * @class TransformationHandler
 * @brief 消息转换处理器
 * @details 对消息进行格式转换、数据过滤等操作
 */
class TransformationHandler : public MessageHandler {
public:
    TransformationHandler() : MessageHandler("TransformationHandler") {}
    
    /**
     * @brief 处理消息转换
     * @param context 消息上下文
     * @return 转换成功返回true
     */
    bool processMessage(std::shared_ptr<MessageContext> context) override {
        const WebSocketMessage& msg = context->getOriginalMessage();
        std::vector<uint8_t> processed = msg.payload;
        
        // 根据消息类型进行转换
        switch (msg.type) {
            case MessageType::DEVICE_DATA:
                processed = transformDeviceData(msg.payload);
                break;
                
            case MessageType::CONTROL_COMMAND:
                processed = transformControlCommand(msg.payload);
                break;
                
            default:
                // 其他类型不转换
                break;
        }
        
        // 更新处理后的负载
        context->setProcessedPayload(processed);
        
        return true;
    }
    
private:
    /**
     * @brief 转换设备数据
     * @param payload 原始负载
     * @return 转换后的负载
     */
    std::vector<uint8_t> transformDeviceData(const std::vector<uint8_t>& payload) {
        if (payload.size() < sizeof(SensorData)) {
            return payload;  ///< 数据太小,不转换
        }
        
        const SensorData* sensor_data = 
            reinterpret_cast<const SensorData*>(payload.data());
        
        // 这里可以实现数据转换逻辑
        // 例如:单位转换、数据过滤、格式标准化等
        
        // 简单示例:复制数据
        std::vector<uint8_t> result(payload.begin(), payload.end());
        return result;
    }
    
    /**
     * @brief 转换控制命令
     * @param payload 原始负载
     * @return 转换后的负载
     */
    std::vector<uint8_t> transformControlCommand(const std::vector<uint8_t>& payload) {
        if (payload.size() < sizeof(ControlCommand)) {
            return payload;
        }
        
        ControlCommand* cmd = 
            reinterpret_cast<ControlCommand*>(const_cast<uint8_t*>(payload.data()));
        
        // 重新计算校验和
        cmd->checksum = cmd->calculateChecksum();
        
        // 复制数据
        std::vector<uint8_t> result(payload.begin(), payload.end());
        return result;
    }
};
​
/**
 * @class DeliveryHandler
 * @brief 消息投递处理器
 * @details 将消息发送到目标设备
 */
class DeliveryHandler : public MessageHandler {
private:
    std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器
    std::function<void(int, const std::vector<uint8_t>&)> send_callback_; ///< 发送回调
    
public:
    DeliveryHandler(std::shared_ptr<DeviceManager> device_mgr,
                   std::function<void(int, const std::vector<uint8_t>&)> callback)
        : MessageHandler("DeliveryHandler"), 
          device_manager_(device_mgr),
          send_callback_(callback) {}
    
    /**
     * @brief 处理消息投递
     * @param context 消息上下文
     * @return 投递成功返回true
     */
    bool processMessage(std::shared_ptr<MessageContext> context) override {
        const std::string& target_id = context->getTargetDeviceId();
        const std::vector<uint8_t>& payload = context->getProcessedPayload();
        
        if (target_id == "BROADCAST") {
            // 广播消息
            return broadcastMessage(context);
        } else if (target_id.find("GROUP:") == 0) {
            // 组播消息
            return multicastMessage(context);
        } else {
            // 单播消息
            return unicastMessage(context);
        }
    }
    
private:
    /**
     * @brief 单播消息
     * @param context 消息上下文
     * @return 发送成功返回true
     */
    bool unicastMessage(std::shared_ptr<MessageContext> context) {
        const std::string& target_id = context->getTargetDeviceId();
        
        // 获取目标设备的连接
        int conn_fd = device_manager_->getConnectionByDeviceId(target_id);
        if (conn_fd < 0) {
            // 目标设备离线
            return false;
        }
        
        // 构造WebSocket消息
        WebSocketMessage response;
        response.type = context->getOriginalMessage().type;
        response.message_id = generateMessageId();
        response.timestamp = WebSocketConnection::getCurrentTimestamp();
        response.payload = context->getProcessedPayload();
        
        // 序列化消息
        std::vector<uint8_t> serialized;
        response.serialize(serialized);
        
        // 发送消息
        if (send_callback_) {
            send_callback_(conn_fd, serialized);
        }
        
        return true;
    }
    
    /**
     * @brief 广播消息
     * @param context 消息上下文
     * @return 广播成功返回true
     */
    bool broadcastMessage(std::shared_ptr<MessageContext> context) {
        // 获取所有在线设备
        auto online_devices = device_manager_->getAllOnlineDevices();
        
        bool all_success = true;
        for (const auto& device_id : online_devices) {
            // 跳过源设备(避免自己接收自己的广播)
            if (device_id == context->getSourceDeviceId()) {
                continue;
            }
            
            // 获取设备连接
            int conn_fd = device_manager_->getConnectionByDeviceId(device_id);
            if (conn_fd >= 0) {
                // 构造消息
                WebSocketMessage response;
                response.type = context->getOriginalMessage().type;
                response.message_id = generateMessageId();
                response.timestamp = WebSocketConnection::getCurrentTimestamp();
                response.payload = context->getProcessedPayload();
                
                std::vector<uint8_t> serialized;
                response.serialize(serialized);
                
                // 发送消息
                if (send_callback_) {
                    send_callback_(conn_fd, serialized);
                }
            } else {
                all_success = false;
            }
        }
        
        return all_success;
    }
    
    /**
     * @brief 组播消息
     * @param context 消息上下文
     * @return 组播成功返回true
     */
    bool multicastMessage(std::shared_ptr<MessageContext> context) {
        std::string group_id = context->getTargetDeviceId().substr(6); // 去掉"GROUP:"
        
        // 获取组内所有设备
        auto group_devices = device_manager_->getDevicesInGroup(group_id);
        
        bool all_success = true;
        for (const auto& device_id : group_devices) {
            // 跳过源设备
            if (device_id == context->getSourceDeviceId()) {
                continue;
            }
            
            // 获取设备连接
            int conn_fd = device_manager_->getConnectionByDeviceId(device_id);
            if (conn_fd >= 0) {
                // 构造消息
                WebSocketMessage response;
                response.type = context->getOriginalMessage().type;
                response.message_id = generateMessageId();
                response.timestamp = WebSocketConnection::getCurrentTimestamp();
                response.payload = context->getProcessedPayload();
                
                std::vector<uint8_t> serialized;
                response.serialize(serialized);
                
                // 发送消息
                if (send_callback_) {
                    send_callback_(conn_fd, serialized);
                }
            } else {
                all_success = false;
            }
        }
        
        return all_success;
    }
    
    /**
     * @brief 生成消息ID
     * @return 消息ID
     */
    uint64_t generateMessageId() {
        static std::atomic<uint64_t> counter{0};
        return ++counter;
    }
};
​
/**
 * @class MessageRouter
 * @brief 消息路由器主类
 * @details 协调所有处理器,管理消息处理流水线
 * 
 * 设计模式:外观模式(Facade Pattern)
 * - 处理突发事件:处理器链故障、消息积压、资源耗尽
 * - 工程作用:简化客户端接口,统一管理处理器链
 */
class MessageRouter {
private:
    std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器
    std::shared_ptr<MessageHandler> handler_chain_; ///< 处理器链
    
    // 消息队列
    std::queue<std::shared_ptr<MessageContext>> message_queue_; ///< 消息队列
    mutable std::mutex queue_mutex_;                           ///< 队列互斥锁
    std::condition_variable queue_cv_;                         ///< 队列条件变量
    
    // 工作线程
    std::vector<std::thread> worker_threads_;                  ///< 工作线程池
    std::atomic<bool> running_{false};                         ///< 运行标志
    std::atomic<size_t> active_workers_{0};                    ///< 活跃工作线程数
    
    // 统计信息
    std::atomic<uint64_t> total_messages_{0};                  ///< 总处理消息数
    std::atomic<uint64_t> successful_messages_{0};             ///< 成功消息数
    std::atomic<uint64_t> failed_messages_{0};                 ///< 失败消息数
    std::atomic<uint64_t> queue_size_{0};                      ///< 当前队列大小
    
public:
    /**
     * @brief 构造函数
     * @param device_mgr 设备管理器
     * @param send_callback 消息发送回调函数
     */
    MessageRouter(std::shared_ptr<DeviceManager> device_mgr,
                 std::function<void(int, const std::vector<uint8_t>&)> send_callback)
        : device_manager_(device_mgr) {
        
        // 构建处理器链(责任链模式)
        buildHandlerChain(send_callback);
        
        // 初始化路由规则
        initializeRoutingRules();
    }
    
    /**
     * @brief 启动路由器
     * @param thread_count 工作线程数
     */
    void start(size_t thread_count = WORKER_THREADS) {
        running_ = true;
        
        // 创建工作线程
        for (size_t i = 0; i < thread_count; ++i) {
            worker_threads_.emplace_back(&MessageRouter::workerThread, this, i);
        }
        
        std::cout << "Message router started with " 
                  << thread_count << " worker threads" << std::endl;
    }
    
    /**
     * @brief 停止路由器
     */
    void stop() {
        running_ = false;
        
        // 通知所有等待的线程
        queue_cv_.notify_all();
        
        // 等待工作线程结束
        for (auto& thread : worker_threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
        
        worker_threads_.clear();
        
        std::cout << "Message router stopped" << std::endl;
    }
    
    /**
     * @brief 提交消息处理
     * @param message WebSocket消息
     * @param source_device_id 源设备ID
     * @return 提交成功返回true
     */
    bool submitMessage(const WebSocketMessage& message, 
                      const std::string& source_device_id) {
        // 创建消息上下文
        auto context = std::make_shared<MessageContext>(message, source_device_id);
        
        {
            std::lock_guard<std::mutex> lock(queue_mutex_);
            
            // 检查队列大小限制
            if (message_queue_.size() >= 10000) {
                // 队列已满,丢弃消息
                failed_messages_++;
                return false;
            }
            
            // 添加到队列
            message_queue_.push(context);
            queue_size_ = message_queue_.size();
        }
        
        total_messages_++;
        
        // 通知工作线程
        queue_cv_.notify_one();
        
        return true;
    }
    
    /**
     * @brief 获取路由器统计信息
     * @return 统计信息字符串
     */
    std::string getStats() const {
        char buffer[512];
        snprintf(buffer, sizeof(buffer),
                "=== Message Router Stats ===\n"
                "Total Messages: %llu\n"
                "Successful: %llu\n"
                "Failed: %llu\n"
                "Success Rate: %.2f%%\n"
                "Queue Size: %llu\n"
                "Active Workers: %llu\n"
                "Worker Threads: %zu\n",
                total_messages_.load(),
                successful_messages_.load(),
                failed_messages_.load(),
                total_messages_ > 0 ? 
                    (successful_messages_.load() * 100.0 / total_messages_.load()) : 0.0,
                queue_size_.load(),
                active_workers_.load(),
                worker_threads_.size());
        
        return std::string(buffer);
    }
    
    /**
     * @brief 获取处理器链统计信息
     * @return 统计信息字符串
     */
    std::string getHandlerStats() const {
        std::stringstream ss;
        ss << "=== Handler Chain Stats ===\n";
        
        std::shared_ptr<MessageHandler> current = handler_chain_;
        while (current) {
            ss << current->getStats() << "\n";
            // 这里需要添加获取下一个处理器的方法
            // 实际实现中MessageHandler需要提供getNext()方法
            break;  // 简化处理
        }
        
        return ss.str();
    }
    
private:
    /**
     * @brief 构建处理器链
     * @param send_callback 发送回调函数
     */
    void buildHandlerChain(
        std::function<void(int, const std::vector<uint8_t>&)> send_callback) {
        
        // 创建处理器(按处理顺序)
        auto auth_handler = std::make_shared<AuthenticationHandler>(device_manager_);
        auto validation_handler = std::make_shared<ValidationHandler>();
        auto routing_handler = std::make_shared<RoutingHandler>(device_manager_);
        auto transform_handler = std::make_shared<TransformationHandler>();
        auto delivery_handler = std::make_shared<DeliveryHandler>(
            device_manager_, send_callback);
        
        // 构建责任链
        auth_handler->setNextHandler(validation_handler);
        validation_handler->setNextHandler(routing_handler);
        routing_handler->setNextHandler(transform_handler);
        transform_handler->setNextHandler(delivery_handler);
        
        // 设置处理器链头
        handler_chain_ = auth_handler;
    }
    
    /**
     * @brief 初始化路由规则
     */
    void initializeRoutingRules() {
        // 获取路由处理器
        auto routing_handler = std::dynamic_pointer_cast<RoutingHandler>(
            handler_chain_->getNextHandler()->getNextHandler());
        
        if (!routing_handler) {
            return;
        }
        
        // 添加默认路由规则
        
        // 规则1:设备数据转发到监控组
        RouteConfig rule1;
        rule1.message_type = MessageType::DEVICE_DATA;
        rule1.source_pattern = "*";
        rule1.target_pattern = "GROUP:monitoring";
        rule1.priority = 60;
        rule1.require_auth = true;
        rule1.rate_limit = 1000;
        routing_handler->addRoutingRule(rule1);
        
        // 规则2:控制命令转发到指定设备
        RouteConfig rule2;
        rule2.message_type = MessageType::CONTROL_COMMAND;
        rule2.source_pattern = "controller_*";
        rule2.target_pattern = "*";
        rule2.priority = 80;
        rule2.require_auth = true;
        rule2.rate_limit = 100;
        routing_handler->addRoutingRule(rule2);
        
        // 规则3:设备注册消息广播
        RouteConfig rule3;
        rule3.message_type = MessageType::DEVICE_REGISTER;
        rule3.source_pattern = "*";
        rule3.target_pattern = "BROADCAST";
        rule3.priority = 40;
        rule3.require_auth = false;
        rule3.rate_limit = 10;
        routing_handler->addRoutingRule(rule3);
        
        std::cout << "Default routing rules initialized" << std::endl;
    }
    
    /**
     * @brief 工作线程函数
     * @param thread_id 线程ID
     */
    void workerThread(int thread_id) {
        std::cout << "Worker thread " << thread_id << " started" << std::endl;
        
        while (running_) {
            std::shared_ptr<MessageContext> context;
            
            {
                std::unique_lock<std::mutex> lock(queue_mutex_);
                
                // 等待消息或停止信号
                queue_cv_.wait(lock, [this]() {
                    return !running_ || !message_queue_.empty();
                });
                
                if (!running_ && message_queue_.empty()) {
                    break;
                }
                
                if (message_queue_.empty()) {
                    continue;
                }
                
                // 获取消息
                context = message_queue_.front();
                message_queue_.pop();
                queue_size_ = message_queue_.size();
            }
            
            if (context) {
                active_workers_++;
                
                // 处理消息
                bool success = handler_chain_->handleMessage(context);
                
                if (success) {
                    successful_messages_++;
                } else {
                    failed_messages_++;
                    
                    // 记录失败详情
                    std::cerr << "Message processing failed: " 
                              << context->generateReport() << std::endl;
                }
                
                active_workers_--;
                
                // 记录处理延迟(如果启用了日志)
                if (context->getRouteConfig().enable_logging) {
                    std::cout << "Message processed: delay=" 
                              << context->getProcessingDelay() << "ms" << std::endl;
                }
            }
        }
        
        std::cout << "Worker thread " << thread_id << " stopped" << std::endl;
    }
};
​
} // namespace iot

设计模式分析(第四部分)

1. 责任链模式(Chain of Responsibility Pattern)

  • 应用位置MessageHandler 基类及其子类形成的处理器链

  • 处理突发事件

    • 处理器故障:单个处理器失败不影响整个链

    • 消息格式错误:验证处理器可以提前拒绝无效消息

    • 处理顺序调整:动态调整处理器顺序

  • 工程作用

    • 解耦消息发送者和多个接收者

    • 动态添加或删除处理步骤

    • 每个处理器只关心自己的职责

  • 性能分析

    • 处理延迟:O(n)链式调用,n为处理器数量

    • 内存开销:每个处理器约100-200字节

    • 吞吐量:处理器链越长,吞吐量越低

2. 策略模式(Strategy Pattern)

  • 应用位置RoutingHandler 中的路由策略

  • 处理突发事件

    • 路由规则冲突:优先级策略解决冲突

    • 目标不可达:备选路由策略

    • 规则动态更新:支持热更新路由策略

  • 工程作用

    • 封装不同的路由算法

    • 运行时切换路由策略

    • 易于扩展新的路由策略

  • 性能分析

    • 规则匹配:O(m)时间复杂度,m为规则数量

    • 内存占用:每条规则约100字节

    • 匹配优化:规则按优先级排序,优先匹配高优先级

3. 上下文模式(Context Pattern)

  • 应用位置MessageContext 类封装处理上下文

  • 处理突发事件

    • 消息丢失:上下文记录处理状态

    • 处理超时:时间戳监控处理延迟

    • 依赖解析失败:错误码和消息记录

  • 工程作用

    • 统一传递处理参数

    • 支持中间件数据传递

    • 提供处理状态追踪

  • 性能分析

    • 上下文创建:O(1)固定开销

    • 数据存储:使用any类型支持任意数据类型

    • 线程安全:互斥锁保护共享数据

4. 外观模式(Facade Pattern)

  • 应用位置MessageRouter 类提供简化接口

  • 处理突发事件

    • 客户端调用复杂:隐藏处理器链细节

    • 资源管理复杂:统一管理线程池和队列

    • 错误处理分散:集中错误处理和统计

  • 工程作用

    • 简化客户端接口

    • 统一资源管理

    • 集中监控和统计

运行性能分析

消息处理流水线:

  1. 认证阶段:O(1)设备查找,快速拒绝未认证消息

  2. 验证阶段:O(1)格式检查,防止恶意消息

  3. 路由阶段:O(m)规则匹配,m通常较小(<100)

  4. 转换阶段:O(n)数据转换,n为数据大小

  5. 投递阶段:O(k)目标查找,k为设备数量

线程池优化:

  1. 工作窃取:空闲线程从其他线程队列窃取任务

  2. 负载均衡:基于队列长度的动态任务分配

  3. 线程复用:避免频繁创建销毁线程

内存管理:

  1. 消息池:复用消息对象减少分配

  2. 缓冲区重用:payload缓冲区重复使用

  3. 智能指针:自动内存管理

数据流结构大小设计

1. 消息上下文内存占用:

MessageContext内存估算:
- 原始消息:~100字节(头部)+ payload
- 处理后的payload:与原始payload相当
- 字符串:设备ID平均32字节 * 2 = 64字节
- 路由配置:~50字节
- 时间戳:24字节
- 额外数据:动态大小,平均100字节
- 锁:40字节
- 原子变量:8字节
- 总计:基础~400字节 + payload大小

2. 处理器链开销:

处理器内存估算(每个):
- 虚表指针:8字节
- 下一个处理器指针:8字节
- 名称字符串:平均32字节
- 统计变量:24字节
- 设备管理器指针:8字节
- 总计:~80字节/处理器
处理器链总计:5 * 80 = 400字节

3. 路由规则内存:

每条路由规则:
- 消息类型:1字节
- 字符串:模式平均32字节 * 2 = 64字节
- 整数字段:12字节
- 布尔字段:1字节(对齐到4字节)
- 总计:~80字节/规则
100条规则:8KB

突发事件处理策略

突发事件 处理策略 设计模式应用
消息风暴 队列缓冲,速率限制 责任链模式的队列管理
处理器故障 故障隔离,错误计数 责任链模式的链式传递
路由规则冲突 优先级排序,规则验证 策略模式的规则管理
内存泄漏 对象池,智能指针 外观模式的统一资源管理
处理延迟 超时控制,异步处理 上下文模式的时间戳监控

第四部分总结:实现了完整的消息路由和处理系统,采用责任链模式构建处理流水线,策略模式实现灵活路由,上下文模式传递处理状态,外观模式提供简化接口。为IoT平台提供了可靠、高效的消息处理能力。

Logo

openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。

更多推荐