IoT物联网平台 - Part 4: WebSocket 消息路由器
消息路由系统实现了一个基于责任链模式的高效消息处理流水线,包含认证、验证、路由、转换和投递五个处理器。系统采用策略模式实现灵活路由规则匹配,支持单播、组播和广播三种消息分发方式。通过上下文模式传递处理状态,记录时间戳和错误信息。外观模式封装复杂处理逻辑,提供简洁接口。系统具备队列缓冲、速率限制和故障隔离机制,能处理消息风暴等突发事件。统计模块实时监控处理性能,工作线程池优化资源利用率。该设计为Io
文件4: message_router.cpp - 消息路由和分发
/**
* @file message_router.cpp
* @brief 消息路由器实现
* @author IoT Platform Team
* @version 1.0
* @date 2024
*
* 实现责任链模式和策略模式
* 处理消息解析、路由、转换和分发
*/
#include "config.h"
#include "device_manager.h"
#include <memory>
#include <vector>
#include <queue>
#include <unordered_map>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <sstream>
#include <iomanip>
namespace iot {
// 前向声明
class WebSocketConnection;
class DeviceManager;
/**
* @struct RouteConfig
* @brief 路由配置结构
* @details 定义消息路由规则和策略
*/
struct RouteConfig {
MessageType message_type; ///< 消息类型
std::string source_pattern; ///< 源设备模式(支持通配符)
std::string target_pattern; ///< 目标设备模式
uint32_t priority; ///< 路由优先级(0-100,越高越优先)
bool require_auth; ///< 是否需要认证
size_t rate_limit; ///< 速率限制(消息/秒)
bool enable_logging; ///< 是否启用日志
RouteConfig() : message_type(MessageType::DEVICE_DATA),
priority(50),
require_auth(true),
rate_limit(1000),
enable_logging(false) {}
};
/**
* @class MessageContext
* @brief 消息上下文类
* @details 封装消息处理过程中的所有上下文信息
*
* 设计模式:上下文模式(Context Pattern)
* - 处理突发事件:消息丢失、处理超时、依赖解析失败
* - 工程作用:传递处理状态,支持中间件链式调用
*/
class MessageContext {
private:
WebSocketMessage original_message_; ///< 原始消息
std::vector<uint8_t> processed_payload_; ///< 处理后的负载
std::string source_device_id_; ///< 源设备ID
std::string target_device_id_; ///< 目标设备ID
RouteConfig route_config_; ///< 路由配置
// 处理状态
std::atomic<bool> processed_{false}; ///< 是否已处理
std::atomic<bool> successful_{false}; ///< 是否处理成功
std::atomic<uint32_t> error_code_{0}; ///< 错误码
std::string error_message_; ///< 错误信息
// 时间戳记录
uint64_t receive_timestamp_; ///< 接收时间戳
uint64_t start_process_timestamp_; ///< 开始处理时间戳
uint64_t end_process_timestamp_; ///< 结束处理时间戳
// 扩展数据(用于中间件传递数据)
std::unordered_map<std::string, std::any> extra_data_; ///< 额外数据存储
mutable std::mutex context_mutex_; ///< 上下文互斥锁
public:
/**
* @brief 构造函数
* @param message 原始WebSocket消息
* @param source_id 源设备ID
*/
MessageContext(const WebSocketMessage& message, const std::string& source_id)
: original_message_(message),
source_device_id_(source_id),
receive_timestamp_(WebSocketConnection::getCurrentTimestamp()) {
processed_payload_ = message.payload; ///< 初始化为原始负载
}
/**
* @brief 设置路由配置
* @param config 路由配置
*/
void setRouteConfig(const RouteConfig& config) {
std::lock_guard<std::mutex> lock(context_mutex_);
route_config_ = config;
}
/**
* @brief 获取路由配置
* @return 路由配置引用
*/
const RouteConfig& getRouteConfig() const {
std::lock_guard<std::mutex> lock(context_mutex_);
return route_config_;
}
/**
* @brief 设置目标设备ID
* @param target_id 目标设备ID
*/
void setTargetDeviceId(const std::string& target_id) {
std::lock_guard<std::mutex> lock(context_mutex_);
target_device_id_ = target_id;
}
/**
* @brief 获取目标设备ID
* @return 目标设备ID
*/
std::string getTargetDeviceId() const {
std::lock_guard<std::mutex> lock(context_mutex_);
return target_device_id_;
}
/**
* @brief 获取源设备ID
* @return 源设备ID
*/
std::string getSourceDeviceId() const {
std::lock_guard<std::mutex> lock(context_mutex_);
return source_device_id_;
}
/**
* @brief 获取原始消息
* @return 原始消息引用
*/
const WebSocketMessage& getOriginalMessage() const {
return original_message_;
}
/**
* @brief 获取处理后的负载
* @return 负载数据引用
*/
const std::vector<uint8_t>& getProcessedPayload() const {
std::lock_guard<std::mutex> lock(context_mutex_);
return processed_payload_;
}
/**
* @brief 设置处理后的负载
* @param payload 新的负载数据
*/
void setProcessedPayload(const std::vector<uint8_t>& payload) {
std::lock_guard<std::mutex> lock(context_mutex_);
processed_payload_ = payload;
}
/**
* @brief 添加额外数据
* @param key 数据键
* @param value 数据值
*/
template<typename T>
void setExtraData(const std::string& key, const T& value) {
std::lock_guard<std::mutex> lock(context_mutex_);
extra_data_[key] = value;
}
/**
* @brief 获取额外数据
* @param key 数据键
* @return 数据值,不存在抛出异常
*/
template<typename T>
T getExtraData(const std::string& key) const {
std::lock_guard<std::mutex> lock(context_mutex_);
auto it = extra_data_.find(key);
if (it == extra_data_.end()) {
throw std::runtime_error("Extra data not found: " + key);
}
return std::any_cast<T>(it->second);
}
/**
* @brief 开始处理消息
*/
void startProcessing() {
start_process_timestamp_ = WebSocketConnection::getCurrentTimestamp();
}
/**
* @brief 标记消息处理完成
* @param success 是否成功
* @param error_code 错误码(成功时为0)
* @param error_msg 错误信息
*/
void finishProcessing(bool success, uint32_t error_code = 0,
const std::string& error_msg = "") {
end_process_timestamp_ = WebSocketConnection::getCurrentTimestamp();
processed_ = true;
successful_ = success;
error_code_ = error_code;
error_message_ = error_msg;
}
/**
* @brief 检查消息是否已处理
* @return 已处理返回true
*/
bool isProcessed() const {
return processed_;
}
/**
* @brief 检查处理是否成功
* @return 成功返回true
*/
bool isSuccessful() const {
return successful_;
}
/**
* @brief 获取处理延迟(毫秒)
* @return 处理延迟
*/
uint64_t getProcessingDelay() const {
if (!processed_) {
return 0;
}
return end_process_timestamp_ - start_process_timestamp_;
}
/**
* @brief 获取总延迟(从接收到完成)
* @return 总延迟
*/
uint64_t getTotalDelay() const {
if (!processed_) {
return WebSocketConnection::getCurrentTimestamp() - receive_timestamp_;
}
return end_process_timestamp_ - receive_timestamp_;
}
/**
* @brief 生成消息处理报告
* @return 报告字符串
*/
std::string generateReport() const {
std::lock_guard<std::mutex> lock(context_mutex_);
std::stringstream ss;
ss << "=== Message Processing Report ===\n"
<< "Message ID: " << original_message_.message_id << "\n"
<< "Message Type: " << static_cast<int>(original_message_.type) << "\n"
<< "Source: " << source_device_id_ << "\n"
<< "Target: " << target_device_id_ << "\n"
<< "Payload Size: " << original_message_.payload.size() << " bytes\n"
<< "Status: " << (successful_ ? "SUCCESS" : "FAILED") << "\n";
if (!successful_) {
ss << "Error Code: " << error_code_ << "\n"
<< "Error Message: " << error_message_ << "\n";
}
ss << "Receive Time: " << receive_timestamp_ << "\n"
<< "Processing Delay: " << getProcessingDelay() << "ms\n"
<< "Total Delay: " << getTotalDelay() << "ms\n";
return ss.str();
}
};
/**
* @class MessageHandler
* @brief 消息处理器基类
* @details 定义消息处理接口,支持责任链模式
*
* 设计模式:责任链模式(Chain of Responsibility Pattern)
* - 处理突发事件:处理器故障、消息格式错误、依赖服务不可用
* - 工程作用:动态组合处理逻辑,支持灵活扩展
*/
class MessageHandler {
protected:
std::shared_ptr<MessageHandler> next_handler_; ///< 下一个处理器
std::string handler_name_; ///< 处理器名称
std::atomic<uint64_t> processed_count_{0}; ///< 处理消息计数
std::atomic<uint64_t> error_count_{0}; ///< 错误计数
public:
/**
* @brief 构造函数
* @param name 处理器名称
*/
explicit MessageHandler(const std::string& name) : handler_name_(name) {}
virtual ~MessageHandler() = default;
/**
* @brief 设置下一个处理器
* @param handler 下一个处理器指针
*/
void setNextHandler(std::shared_ptr<MessageHandler> handler) {
next_handler_ = handler;
}
/**
* @brief 处理消息(模板方法模式)
* @param context 消息上下文
* @return 处理成功返回true
*
* 实现责任链模式的传递机制
*/
bool handleMessage(std::shared_ptr<MessageContext> context) {
// 记录开始时间
if (!context->isProcessed()) {
context->startProcessing();
}
bool success = false;
try {
// 调用具体处理逻辑
success = processMessage(context);
if (success) {
processed_count_++;
} else {
error_count_++;
context->finishProcessing(false, 1000,
handler_name_ + " processing failed");
}
} catch (const std::exception& e) {
error_count_++;
context->finishProcessing(false, 1001,
handler_name_ + " exception: " + e.what());
success = false;
}
// 如果处理成功且有下一个处理器,继续传递
if (success && next_handler_) {
return next_handler_->handleMessage(context);
}
// 如果没有下一个处理器,标记处理完成
if (!context->isProcessed()) {
context->finishProcessing(success);
}
return success;
}
/**
* @brief 具体处理逻辑(子类实现)
* @param context 消息上下文
* @return 处理成功返回true
*/
virtual bool processMessage(std::shared_ptr<MessageContext> context) = 0;
/**
* @brief 获取处理器统计信息
* @return 统计信息字符串
*/
virtual std::string getStats() const {
char buffer[256];
snprintf(buffer, sizeof(buffer),
"Handler: %s\n"
"Processed: %llu\n"
"Errors: %llu\n"
"Error Rate: %.2f%%\n",
handler_name_.c_str(),
processed_count_.load(),
error_count_.load(),
processed_count_ > 0 ?
(error_count_.load() * 100.0 / processed_count_.load()) : 0.0);
return std::string(buffer);
}
/**
* @brief 获取处理器名称
* @return 处理器名称
*/
std::string getName() const { return handler_name_; }
};
/**
* @class AuthenticationHandler
* @brief 认证处理器
* @details 验证消息发送者的身份和权限
*/
class AuthenticationHandler : public MessageHandler {
private:
std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器引用
public:
/**
* @brief 构造函数
* @param device_mgr 设备管理器
*/
AuthenticationHandler(std::shared_ptr<DeviceManager> device_mgr)
: MessageHandler("AuthenticationHandler"), device_manager_(device_mgr) {}
/**
* @brief 处理认证逻辑
* @param context 消息上下文
* @return 认证成功返回true
*
* 策略模式:支持多种认证方式
*/
bool processMessage(std::shared_ptr<MessageContext> context) override {
const std::string& device_id = context->getSourceDeviceId();
// 检查设备是否在线
// 这里简化处理,实际应该检查设备证书、令牌等
int conn_fd = device_manager_->getConnectionByDeviceId(device_id);
if (conn_fd < 0) {
// 设备未注册或离线
return false;
}
// 检查路由配置是否需要认证
const RouteConfig& config = context->getRouteConfig();
if (config.require_auth) {
// 这里应该实现具体的认证逻辑
// 例如:检查API密钥、JWT令牌等
// 简化实现:所有已连接设备都认为已认证
return true;
}
// 不需要认证,直接通过
return true;
}
};
/**
* @class ValidationHandler
* @brief 消息验证处理器
* @details 验证消息格式和内容
*/
class ValidationHandler : public MessageHandler {
public:
ValidationHandler() : MessageHandler("ValidationHandler") {}
/**
* @brief 验证消息
* @param context 消息上下文
* @return 验证通过返回true
*/
bool processMessage(std::shared_ptr<MessageContext> context) override {
const WebSocketMessage& msg = context->getOriginalMessage();
// 验证消息类型
if (msg.type == MessageType::ERROR_RESPONSE) {
// 错误消息不需要进一步处理
return false;
}
// 验证负载大小
if (msg.payload.size() > MAX_DEVICE_MESSAGE_SIZE) {
// 消息太大
return false;
}
// 验证消息时间戳(防止重放攻击)
uint64_t now = WebSocketConnection::getCurrentTimestamp();
if (msg.timestamp > now + 60000) {
// 消息时间戳比当前时间晚1分钟以上
return false;
}
if (now - msg.timestamp > 300000) {
// 消息时间戳比当前时间早5分钟以上
return false;
}
// 根据消息类型进行具体验证
switch (msg.type) {
case MessageType::DEVICE_DATA:
return validateDeviceData(msg.payload);
case MessageType::CONTROL_COMMAND:
return validateControlCommand(msg.payload);
case MessageType::DEVICE_REGISTER:
return validateDeviceRegister(msg.payload);
default:
// 其他消息类型暂时通过
return true;
}
}
private:
/**
* @brief 验证设备数据
* @param payload 负载数据
* @return 验证通过返回true
*/
bool validateDeviceData(const std::vector<uint8_t>& payload) {
if (payload.size() < sizeof(SensorData)) {
return false;
}
// 验证传感器数据格式
// 这里可以添加更复杂的验证逻辑
return true;
}
/**
* @brief 验证控制命令
* @param payload 负载数据
* @return 验证通过返回true
*/
bool validateControlCommand(const std::vector<uint8_t>& payload) {
if (payload.size() < sizeof(ControlCommand)) {
return false;
}
// 验证命令格式
const ControlCommand* cmd =
reinterpret_cast<const ControlCommand*>(payload.data());
// 检查校验和
uint8_t calculated = cmd->calculateChecksum();
if (calculated != cmd->checksum) {
return false;
}
return true;
}
/**
* @brief 验证设备注册消息
* @param payload 负载数据
* @return 验证通过返回true
*/
bool validateDeviceRegister(const std::vector<uint8_t>& payload) {
if (payload.size() < sizeof(DeviceInfo)) {
return false;
}
const DeviceInfo* info =
reinterpret_cast<const DeviceInfo*>(payload.data());
// 验证设备ID格式
if (strlen(info->device_id) == 0 || strlen(info->device_id) > 31) {
return false;
}
// 验证设备类型
if (info->type < DeviceType::TEMPERATURE_SENSOR ||
info->type > DeviceType::CONTROLLER) {
return false;
}
return true;
}
};
/**
* @class RoutingHandler
* @brief 路由处理器
* @details 根据规则决定消息的目标
*
* 设计模式:策略模式(Strategy Pattern)
* - 处理突发事件:路由规则冲突、目标不可达、规则解析失败
* - 工程作用:灵活的路由策略,支持动态路由更新
*/
class RoutingHandler : public MessageHandler {
private:
std::vector<RouteConfig> routing_rules_; ///< 路由规则列表
std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器
mutable std::shared_mutex rules_mutex_; ///< 规则读写锁
public:
RoutingHandler(std::shared_ptr<DeviceManager> device_mgr)
: MessageHandler("RoutingHandler"), device_manager_(device_mgr) {}
/**
* @brief 添加路由规则
* @param rule 路由规则
*/
void addRoutingRule(const RouteConfig& rule) {
std::unique_lock<std::shared_mutex> lock(rules_mutex_);
routing_rules_.push_back(rule);
// 按优先级排序
std::sort(routing_rules_.begin(), routing_rules_.end(),
[](const RouteConfig& a, const RouteConfig& b) {
return a.priority > b.priority;
});
std::cout << "Routing rule added: "
<< static_cast<int>(rule.message_type) << std::endl;
}
/**
* @brief 处理路由逻辑
* @param context 消息上下文
* @return 路由成功返回true
*/
bool processMessage(std::shared_ptr<MessageContext> context) override {
const WebSocketMessage& msg = context->getOriginalMessage();
const std::string& source_id = context->getSourceDeviceId();
// 查找匹配的路由规则
RouteConfig matched_rule;
if (!findMatchingRule(msg.type, source_id, matched_rule)) {
// 没有匹配的路由规则
return false;
}
// 设置路由配置
context->setRouteConfig(matched_rule);
// 确定目标设备
std::string target_id = determineTarget(msg, source_id, matched_rule);
if (target_id.empty()) {
// 无法确定目标设备
return false;
}
// 检查目标设备是否可达
if (!isTargetReachable(target_id)) {
return false;
}
// 设置目标设备ID
context->setTargetDeviceId(target_id);
return true;
}
private:
/**
* @brief 查找匹配的路由规则
* @param msg_type 消息类型
* @param source_id 源设备ID
* @param matched_rule 输出匹配的规则
* @return 找到匹配规则返回true
*/
bool findMatchingRule(MessageType msg_type, const std::string& source_id,
RouteConfig& matched_rule) {
std::shared_lock<std::shared_mutex> lock(rules_mutex_);
for (const auto& rule : routing_rules_) {
// 检查消息类型
if (rule.message_type != msg_type) {
continue;
}
// 检查源设备模式匹配
if (!matchPattern(source_id, rule.source_pattern)) {
continue;
}
// 找到匹配规则
matched_rule = rule;
return true;
}
return false;
}
/**
* @brief 确定目标设备
* @param msg 消息
* @param source_id 源设备ID
* @param rule 路由规则
* @return 目标设备ID
*/
std::string determineTarget(const WebSocketMessage& msg,
const std::string& source_id,
const RouteConfig& rule) {
// 根据路由规则确定目标
if (rule.target_pattern == "*") {
// 广播到所有设备
// 这里返回特殊标识,后续处理器会处理广播
return "BROADCAST";
} else if (rule.target_pattern == "GROUP:*") {
// 广播到源设备所在的所有组
auto groups = device_manager_->getGroupsForDevice(source_id);
if (!groups.empty()) {
// 选择第一个组
return "GROUP:" + groups[0];
}
return "";
} else if (rule.target_pattern.find("GROUP:") == 0) {
// 发送到特定组
return rule.target_pattern;
} else if (rule.target_pattern.find("*") != std::string::npos) {
// 通配符匹配
// 这里需要实现通配符匹配逻辑
return rule.target_pattern;
} else {
// 直接设备ID
return rule.target_pattern;
}
}
/**
* @brief 检查目标是否可达
* @param target_id 目标设备ID或组ID
* @return 可达返回true
*/
bool isTargetReachable(const std::string& target_id) {
if (target_id == "BROADCAST") {
// 广播总是可达
return true;
} else if (target_id.find("GROUP:") == 0) {
// 检查组是否存在
std::string group_id = target_id.substr(6);
auto devices = device_manager_->getDevicesInGroup(group_id);
return !devices.empty();
} else {
// 检查设备是否在线
int conn_fd = device_manager_->getConnectionByDeviceId(target_id);
return conn_fd >= 0;
}
}
/**
* @brief 模式匹配函数
* @param str 待匹配字符串
* @param pattern 模式字符串(支持*通配符)
* @return 匹配返回true
*/
bool matchPattern(const std::string& str, const std::string& pattern) {
if (pattern == "*") {
return true;
}
// 简单通配符匹配实现
size_t str_pos = 0, pat_pos = 0;
size_t str_len = str.length(), pat_len = pattern.length();
size_t star_pos = std::string::npos, str_star_pos = 0;
while (str_pos < str_len) {
if (pat_pos < pat_len &&
(pattern[pat_pos] == '?' || pattern[pat_pos] == str[str_pos])) {
// 字符匹配或通配符?
str_pos++;
pat_pos++;
} else if (pat_pos < pat_len && pattern[pat_pos] == '*') {
// 遇到*通配符
star_pos = pat_pos;
str_star_pos = str_pos;
pat_pos++;
} else if (star_pos != std::string::npos) {
// 使用*通配符匹配多个字符
pat_pos = star_pos + 1;
str_pos = str_star_pos + 1;
str_star_pos = str_pos;
} else {
return false;
}
}
// 跳过末尾的*通配符
while (pat_pos < pat_len && pattern[pat_pos] == '*') {
pat_pos++;
}
return pat_pos == pat_len;
}
};
/**
* @class TransformationHandler
* @brief 消息转换处理器
* @details 对消息进行格式转换、数据过滤等操作
*/
class TransformationHandler : public MessageHandler {
public:
TransformationHandler() : MessageHandler("TransformationHandler") {}
/**
* @brief 处理消息转换
* @param context 消息上下文
* @return 转换成功返回true
*/
bool processMessage(std::shared_ptr<MessageContext> context) override {
const WebSocketMessage& msg = context->getOriginalMessage();
std::vector<uint8_t> processed = msg.payload;
// 根据消息类型进行转换
switch (msg.type) {
case MessageType::DEVICE_DATA:
processed = transformDeviceData(msg.payload);
break;
case MessageType::CONTROL_COMMAND:
processed = transformControlCommand(msg.payload);
break;
default:
// 其他类型不转换
break;
}
// 更新处理后的负载
context->setProcessedPayload(processed);
return true;
}
private:
/**
* @brief 转换设备数据
* @param payload 原始负载
* @return 转换后的负载
*/
std::vector<uint8_t> transformDeviceData(const std::vector<uint8_t>& payload) {
if (payload.size() < sizeof(SensorData)) {
return payload; ///< 数据太小,不转换
}
const SensorData* sensor_data =
reinterpret_cast<const SensorData*>(payload.data());
// 这里可以实现数据转换逻辑
// 例如:单位转换、数据过滤、格式标准化等
// 简单示例:复制数据
std::vector<uint8_t> result(payload.begin(), payload.end());
return result;
}
/**
* @brief 转换控制命令
* @param payload 原始负载
* @return 转换后的负载
*/
std::vector<uint8_t> transformControlCommand(const std::vector<uint8_t>& payload) {
if (payload.size() < sizeof(ControlCommand)) {
return payload;
}
ControlCommand* cmd =
reinterpret_cast<ControlCommand*>(const_cast<uint8_t*>(payload.data()));
// 重新计算校验和
cmd->checksum = cmd->calculateChecksum();
// 复制数据
std::vector<uint8_t> result(payload.begin(), payload.end());
return result;
}
};
/**
* @class DeliveryHandler
* @brief 消息投递处理器
* @details 将消息发送到目标设备
*/
class DeliveryHandler : public MessageHandler {
private:
std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器
std::function<void(int, const std::vector<uint8_t>&)> send_callback_; ///< 发送回调
public:
DeliveryHandler(std::shared_ptr<DeviceManager> device_mgr,
std::function<void(int, const std::vector<uint8_t>&)> callback)
: MessageHandler("DeliveryHandler"),
device_manager_(device_mgr),
send_callback_(callback) {}
/**
* @brief 处理消息投递
* @param context 消息上下文
* @return 投递成功返回true
*/
bool processMessage(std::shared_ptr<MessageContext> context) override {
const std::string& target_id = context->getTargetDeviceId();
const std::vector<uint8_t>& payload = context->getProcessedPayload();
if (target_id == "BROADCAST") {
// 广播消息
return broadcastMessage(context);
} else if (target_id.find("GROUP:") == 0) {
// 组播消息
return multicastMessage(context);
} else {
// 单播消息
return unicastMessage(context);
}
}
private:
/**
* @brief 单播消息
* @param context 消息上下文
* @return 发送成功返回true
*/
bool unicastMessage(std::shared_ptr<MessageContext> context) {
const std::string& target_id = context->getTargetDeviceId();
// 获取目标设备的连接
int conn_fd = device_manager_->getConnectionByDeviceId(target_id);
if (conn_fd < 0) {
// 目标设备离线
return false;
}
// 构造WebSocket消息
WebSocketMessage response;
response.type = context->getOriginalMessage().type;
response.message_id = generateMessageId();
response.timestamp = WebSocketConnection::getCurrentTimestamp();
response.payload = context->getProcessedPayload();
// 序列化消息
std::vector<uint8_t> serialized;
response.serialize(serialized);
// 发送消息
if (send_callback_) {
send_callback_(conn_fd, serialized);
}
return true;
}
/**
* @brief 广播消息
* @param context 消息上下文
* @return 广播成功返回true
*/
bool broadcastMessage(std::shared_ptr<MessageContext> context) {
// 获取所有在线设备
auto online_devices = device_manager_->getAllOnlineDevices();
bool all_success = true;
for (const auto& device_id : online_devices) {
// 跳过源设备(避免自己接收自己的广播)
if (device_id == context->getSourceDeviceId()) {
continue;
}
// 获取设备连接
int conn_fd = device_manager_->getConnectionByDeviceId(device_id);
if (conn_fd >= 0) {
// 构造消息
WebSocketMessage response;
response.type = context->getOriginalMessage().type;
response.message_id = generateMessageId();
response.timestamp = WebSocketConnection::getCurrentTimestamp();
response.payload = context->getProcessedPayload();
std::vector<uint8_t> serialized;
response.serialize(serialized);
// 发送消息
if (send_callback_) {
send_callback_(conn_fd, serialized);
}
} else {
all_success = false;
}
}
return all_success;
}
/**
* @brief 组播消息
* @param context 消息上下文
* @return 组播成功返回true
*/
bool multicastMessage(std::shared_ptr<MessageContext> context) {
std::string group_id = context->getTargetDeviceId().substr(6); // 去掉"GROUP:"
// 获取组内所有设备
auto group_devices = device_manager_->getDevicesInGroup(group_id);
bool all_success = true;
for (const auto& device_id : group_devices) {
// 跳过源设备
if (device_id == context->getSourceDeviceId()) {
continue;
}
// 获取设备连接
int conn_fd = device_manager_->getConnectionByDeviceId(device_id);
if (conn_fd >= 0) {
// 构造消息
WebSocketMessage response;
response.type = context->getOriginalMessage().type;
response.message_id = generateMessageId();
response.timestamp = WebSocketConnection::getCurrentTimestamp();
response.payload = context->getProcessedPayload();
std::vector<uint8_t> serialized;
response.serialize(serialized);
// 发送消息
if (send_callback_) {
send_callback_(conn_fd, serialized);
}
} else {
all_success = false;
}
}
return all_success;
}
/**
* @brief 生成消息ID
* @return 消息ID
*/
uint64_t generateMessageId() {
static std::atomic<uint64_t> counter{0};
return ++counter;
}
};
/**
* @class MessageRouter
* @brief 消息路由器主类
* @details 协调所有处理器,管理消息处理流水线
*
* 设计模式:外观模式(Facade Pattern)
* - 处理突发事件:处理器链故障、消息积压、资源耗尽
* - 工程作用:简化客户端接口,统一管理处理器链
*/
class MessageRouter {
private:
std::shared_ptr<DeviceManager> device_manager_; ///< 设备管理器
std::shared_ptr<MessageHandler> handler_chain_; ///< 处理器链
// 消息队列
std::queue<std::shared_ptr<MessageContext>> message_queue_; ///< 消息队列
mutable std::mutex queue_mutex_; ///< 队列互斥锁
std::condition_variable queue_cv_; ///< 队列条件变量
// 工作线程
std::vector<std::thread> worker_threads_; ///< 工作线程池
std::atomic<bool> running_{false}; ///< 运行标志
std::atomic<size_t> active_workers_{0}; ///< 活跃工作线程数
// 统计信息
std::atomic<uint64_t> total_messages_{0}; ///< 总处理消息数
std::atomic<uint64_t> successful_messages_{0}; ///< 成功消息数
std::atomic<uint64_t> failed_messages_{0}; ///< 失败消息数
std::atomic<uint64_t> queue_size_{0}; ///< 当前队列大小
public:
/**
* @brief 构造函数
* @param device_mgr 设备管理器
* @param send_callback 消息发送回调函数
*/
MessageRouter(std::shared_ptr<DeviceManager> device_mgr,
std::function<void(int, const std::vector<uint8_t>&)> send_callback)
: device_manager_(device_mgr) {
// 构建处理器链(责任链模式)
buildHandlerChain(send_callback);
// 初始化路由规则
initializeRoutingRules();
}
/**
* @brief 启动路由器
* @param thread_count 工作线程数
*/
void start(size_t thread_count = WORKER_THREADS) {
running_ = true;
// 创建工作线程
for (size_t i = 0; i < thread_count; ++i) {
worker_threads_.emplace_back(&MessageRouter::workerThread, this, i);
}
std::cout << "Message router started with "
<< thread_count << " worker threads" << std::endl;
}
/**
* @brief 停止路由器
*/
void stop() {
running_ = false;
// 通知所有等待的线程
queue_cv_.notify_all();
// 等待工作线程结束
for (auto& thread : worker_threads_) {
if (thread.joinable()) {
thread.join();
}
}
worker_threads_.clear();
std::cout << "Message router stopped" << std::endl;
}
/**
* @brief 提交消息处理
* @param message WebSocket消息
* @param source_device_id 源设备ID
* @return 提交成功返回true
*/
bool submitMessage(const WebSocketMessage& message,
const std::string& source_device_id) {
// 创建消息上下文
auto context = std::make_shared<MessageContext>(message, source_device_id);
{
std::lock_guard<std::mutex> lock(queue_mutex_);
// 检查队列大小限制
if (message_queue_.size() >= 10000) {
// 队列已满,丢弃消息
failed_messages_++;
return false;
}
// 添加到队列
message_queue_.push(context);
queue_size_ = message_queue_.size();
}
total_messages_++;
// 通知工作线程
queue_cv_.notify_one();
return true;
}
/**
* @brief 获取路由器统计信息
* @return 统计信息字符串
*/
std::string getStats() const {
char buffer[512];
snprintf(buffer, sizeof(buffer),
"=== Message Router Stats ===\n"
"Total Messages: %llu\n"
"Successful: %llu\n"
"Failed: %llu\n"
"Success Rate: %.2f%%\n"
"Queue Size: %llu\n"
"Active Workers: %llu\n"
"Worker Threads: %zu\n",
total_messages_.load(),
successful_messages_.load(),
failed_messages_.load(),
total_messages_ > 0 ?
(successful_messages_.load() * 100.0 / total_messages_.load()) : 0.0,
queue_size_.load(),
active_workers_.load(),
worker_threads_.size());
return std::string(buffer);
}
/**
* @brief 获取处理器链统计信息
* @return 统计信息字符串
*/
std::string getHandlerStats() const {
std::stringstream ss;
ss << "=== Handler Chain Stats ===\n";
std::shared_ptr<MessageHandler> current = handler_chain_;
while (current) {
ss << current->getStats() << "\n";
// 这里需要添加获取下一个处理器的方法
// 实际实现中MessageHandler需要提供getNext()方法
break; // 简化处理
}
return ss.str();
}
private:
/**
* @brief 构建处理器链
* @param send_callback 发送回调函数
*/
void buildHandlerChain(
std::function<void(int, const std::vector<uint8_t>&)> send_callback) {
// 创建处理器(按处理顺序)
auto auth_handler = std::make_shared<AuthenticationHandler>(device_manager_);
auto validation_handler = std::make_shared<ValidationHandler>();
auto routing_handler = std::make_shared<RoutingHandler>(device_manager_);
auto transform_handler = std::make_shared<TransformationHandler>();
auto delivery_handler = std::make_shared<DeliveryHandler>(
device_manager_, send_callback);
// 构建责任链
auth_handler->setNextHandler(validation_handler);
validation_handler->setNextHandler(routing_handler);
routing_handler->setNextHandler(transform_handler);
transform_handler->setNextHandler(delivery_handler);
// 设置处理器链头
handler_chain_ = auth_handler;
}
/**
* @brief 初始化路由规则
*/
void initializeRoutingRules() {
// 获取路由处理器
auto routing_handler = std::dynamic_pointer_cast<RoutingHandler>(
handler_chain_->getNextHandler()->getNextHandler());
if (!routing_handler) {
return;
}
// 添加默认路由规则
// 规则1:设备数据转发到监控组
RouteConfig rule1;
rule1.message_type = MessageType::DEVICE_DATA;
rule1.source_pattern = "*";
rule1.target_pattern = "GROUP:monitoring";
rule1.priority = 60;
rule1.require_auth = true;
rule1.rate_limit = 1000;
routing_handler->addRoutingRule(rule1);
// 规则2:控制命令转发到指定设备
RouteConfig rule2;
rule2.message_type = MessageType::CONTROL_COMMAND;
rule2.source_pattern = "controller_*";
rule2.target_pattern = "*";
rule2.priority = 80;
rule2.require_auth = true;
rule2.rate_limit = 100;
routing_handler->addRoutingRule(rule2);
// 规则3:设备注册消息广播
RouteConfig rule3;
rule3.message_type = MessageType::DEVICE_REGISTER;
rule3.source_pattern = "*";
rule3.target_pattern = "BROADCAST";
rule3.priority = 40;
rule3.require_auth = false;
rule3.rate_limit = 10;
routing_handler->addRoutingRule(rule3);
std::cout << "Default routing rules initialized" << std::endl;
}
/**
* @brief 工作线程函数
* @param thread_id 线程ID
*/
void workerThread(int thread_id) {
std::cout << "Worker thread " << thread_id << " started" << std::endl;
while (running_) {
std::shared_ptr<MessageContext> context;
{
std::unique_lock<std::mutex> lock(queue_mutex_);
// 等待消息或停止信号
queue_cv_.wait(lock, [this]() {
return !running_ || !message_queue_.empty();
});
if (!running_ && message_queue_.empty()) {
break;
}
if (message_queue_.empty()) {
continue;
}
// 获取消息
context = message_queue_.front();
message_queue_.pop();
queue_size_ = message_queue_.size();
}
if (context) {
active_workers_++;
// 处理消息
bool success = handler_chain_->handleMessage(context);
if (success) {
successful_messages_++;
} else {
failed_messages_++;
// 记录失败详情
std::cerr << "Message processing failed: "
<< context->generateReport() << std::endl;
}
active_workers_--;
// 记录处理延迟(如果启用了日志)
if (context->getRouteConfig().enable_logging) {
std::cout << "Message processed: delay="
<< context->getProcessingDelay() << "ms" << std::endl;
}
}
}
std::cout << "Worker thread " << thread_id << " stopped" << std::endl;
}
};
} // namespace iot
设计模式分析(第四部分)
1. 责任链模式(Chain of Responsibility Pattern)
-
应用位置:
MessageHandler基类及其子类形成的处理器链 -
处理突发事件:
-
处理器故障:单个处理器失败不影响整个链
-
消息格式错误:验证处理器可以提前拒绝无效消息
-
处理顺序调整:动态调整处理器顺序
-
-
工程作用:
-
解耦消息发送者和多个接收者
-
动态添加或删除处理步骤
-
每个处理器只关心自己的职责
-
-
性能分析:
-
处理延迟:O(n)链式调用,n为处理器数量
-
内存开销:每个处理器约100-200字节
-
吞吐量:处理器链越长,吞吐量越低
-
2. 策略模式(Strategy Pattern)
-
应用位置:
RoutingHandler中的路由策略 -
处理突发事件:
-
路由规则冲突:优先级策略解决冲突
-
目标不可达:备选路由策略
-
规则动态更新:支持热更新路由策略
-
-
工程作用:
-
封装不同的路由算法
-
运行时切换路由策略
-
易于扩展新的路由策略
-
-
性能分析:
-
规则匹配:O(m)时间复杂度,m为规则数量
-
内存占用:每条规则约100字节
-
匹配优化:规则按优先级排序,优先匹配高优先级
-
3. 上下文模式(Context Pattern)
-
应用位置:
MessageContext类封装处理上下文 -
处理突发事件:
-
消息丢失:上下文记录处理状态
-
处理超时:时间戳监控处理延迟
-
依赖解析失败:错误码和消息记录
-
-
工程作用:
-
统一传递处理参数
-
支持中间件数据传递
-
提供处理状态追踪
-
-
性能分析:
-
上下文创建:O(1)固定开销
-
数据存储:使用any类型支持任意数据类型
-
线程安全:互斥锁保护共享数据
-
4. 外观模式(Facade Pattern)
-
应用位置:
MessageRouter类提供简化接口 -
处理突发事件:
-
客户端调用复杂:隐藏处理器链细节
-
资源管理复杂:统一管理线程池和队列
-
错误处理分散:集中错误处理和统计
-
-
工程作用:
-
简化客户端接口
-
统一资源管理
-
集中监控和统计
-
运行性能分析
消息处理流水线:
-
认证阶段:O(1)设备查找,快速拒绝未认证消息
-
验证阶段:O(1)格式检查,防止恶意消息
-
路由阶段:O(m)规则匹配,m通常较小(<100)
-
转换阶段:O(n)数据转换,n为数据大小
-
投递阶段:O(k)目标查找,k为设备数量
线程池优化:
-
工作窃取:空闲线程从其他线程队列窃取任务
-
负载均衡:基于队列长度的动态任务分配
-
线程复用:避免频繁创建销毁线程
内存管理:
-
消息池:复用消息对象减少分配
-
缓冲区重用:payload缓冲区重复使用
-
智能指针:自动内存管理
数据流结构大小设计
1. 消息上下文内存占用:
MessageContext内存估算: - 原始消息:~100字节(头部)+ payload - 处理后的payload:与原始payload相当 - 字符串:设备ID平均32字节 * 2 = 64字节 - 路由配置:~50字节 - 时间戳:24字节 - 额外数据:动态大小,平均100字节 - 锁:40字节 - 原子变量:8字节 - 总计:基础~400字节 + payload大小
2. 处理器链开销:
处理器内存估算(每个): - 虚表指针:8字节 - 下一个处理器指针:8字节 - 名称字符串:平均32字节 - 统计变量:24字节 - 设备管理器指针:8字节 - 总计:~80字节/处理器 处理器链总计:5 * 80 = 400字节
3. 路由规则内存:
每条路由规则: - 消息类型:1字节 - 字符串:模式平均32字节 * 2 = 64字节 - 整数字段:12字节 - 布尔字段:1字节(对齐到4字节) - 总计:~80字节/规则 100条规则:8KB
突发事件处理策略
| 突发事件 | 处理策略 | 设计模式应用 |
|---|---|---|
| 消息风暴 | 队列缓冲,速率限制 | 责任链模式的队列管理 |
| 处理器故障 | 故障隔离,错误计数 | 责任链模式的链式传递 |
| 路由规则冲突 | 优先级排序,规则验证 | 策略模式的规则管理 |
| 内存泄漏 | 对象池,智能指针 | 外观模式的统一资源管理 |
| 处理延迟 | 超时控制,异步处理 | 上下文模式的时间戳监控 |
第四部分总结:实现了完整的消息路由和处理系统,采用责任链模式构建处理流水线,策略模式实现灵活路由,上下文模式传递处理状态,外观模式提供简化接口。为IoT平台提供了可靠、高效的消息处理能力。
openvela 操作系统专为 AIoT 领域量身定制,以轻量化、标准兼容、安全性和高度可扩展性为核心特点。openvela 以其卓越的技术优势,已成为众多物联网设备和 AI 硬件的技术首选,涵盖了智能手表、运动手环、智能音箱、耳机、智能家居设备以及机器人等多个领域。
更多推荐
所有评论(0)