#28 后端连接rabbitmq导致报错崩溃

Open
opened 4 days ago by lyq · 3 comments
天问 commented 4 days ago

满足性能指标要求

满足性能指标要求
天问 commented 11 hours ago
Owner

RabbitMQ中间件

使用RabbitMQ的原因(而非直接推送到后端)

  1. 提高可靠性

    • 当后端系统暂时不可用时,消息仍然可以发送到RabbitMQ并存储
    • 后端恢复后可以继续处理这些消息,避免数据丢失
  2. 简化通信协议

    • 物联网设备通常使用MQTT协议(轻量级、低带宽)
    • 后端系统通常使用AMQP协议(功能丰富)
    • RabbitMQ支持两种协议,充当协议转换器
  3. 实现消息广播

    • 同一消息可能需要被多个服务消费(如数据处理、监控、分析等)
    • 消息队列支持一对多的发布/订阅模式
  4. 更灵活的部署架构

    • RabbitMQ可以部署在云端或边缘节点
    • 更容易实现跨网络、跨区域的数据传输

为何RabbitMQ存储的消息还需保存到数据库

  1. 数据访问需求

    • RabbitMQ中的消息一旦被消费,通常会从队列中移除
    • 数据库支持复杂的查询和分析功能(SQL查询、索引、统计等)

      // 保存设备状态历史数据
      if (!needUpdateList.isEmpty()) {
      List<DeviceStatusHistory> deviceStatusHistories = new ArrayList<>();
      // 创建历史记录...
      deviceStatusHistoryDao.saveAll(deviceStatusHistories);
      }
      
  2. 事务和一致性

    • 数据库提供完善的事务机制,保证数据一致性
    • 多表操作可以在一个事务中完成

总结

  • RabbitMQ:作为通信中间件,处理消息传输、暂存和分发,提高系统的可靠性和扩展性
  • 数据库:作为持久存储,支持数据的长期保存、分析和业务价值挖掘

这种架构结合了消息队列的高吞吐和数据库的持久性存储,是大规模物联网数据处理系统的常见最佳实践。

# RabbitMQ中间件 ## 使用RabbitMQ的原因(而非直接推送到后端) 3. **提高可靠性**: - 当后端系统暂时不可用时,消息仍然可以发送到RabbitMQ并存储 - 后端恢复后可以继续处理这些消息,避免数据丢失 5. **简化通信协议**: - 物联网设备通常使用MQTT协议(轻量级、低带宽) - 后端系统通常使用AMQP协议(功能丰富) - RabbitMQ支持两种协议,充当协议转换器 6. **实现消息广播**: - 同一消息可能需要被多个服务消费(如数据处理、监控、分析等) - 消息队列支持一对多的发布/订阅模式 7. **更灵活的部署架构**: - RabbitMQ可以部署在云端或边缘节点 - 更容易实现跨网络、跨区域的数据传输 ## 为何RabbitMQ存储的消息还需保存到数据库 2. **数据访问需求**: - RabbitMQ中的消息一旦被消费,通常会从队列中移除 - 数据库支持复杂的查询和分析功能(SQL查询、索引、统计等) ```java // 保存设备状态历史数据 if (!needUpdateList.isEmpty()) { List<DeviceStatusHistory> deviceStatusHistories = new ArrayList<>(); // 创建历史记录... deviceStatusHistoryDao.saveAll(deviceStatusHistories); } ``` 6. **事务和一致性**: - 数据库提供完善的事务机制,保证数据一致性 - 多表操作可以在一个事务中完成 ## 总结 - **RabbitMQ**:作为**通信中间件**,处理消息传输、暂存和分发,提高系统的可靠性和扩展性 - **数据库**:作为**持久存储**,支持数据的长期保存、分析和业务价值挖掘 这种架构结合了消息队列的高吞吐和数据库的持久性存储,是大规模物联网数据处理系统的常见最佳实践。
天问 commented 11 hours ago
Owner

MQ容器与后端项目的关系及消息流转流程

根据您提供的端口信息和项目代码,我可以详细解释MQ容器、设备、项目之间的关系和消息处理流程。

MQ容器与项目的关系

您提到的端口信息表明您正在运行一个RabbitMQ容器,该容器暴露了多个端口:

  • 1883: MQTT协议端口,用于物联网设备连接
  • 5672: AMQP协议端口,用于应用程序连接RabbitMQ
  • 15672: RabbitMQ管理界面端口

这个MQ容器与您的项目之间的关系是:

  1. 消息中间件服务提供者:RabbitMQ容器作为独立的服务,提供消息队列功能
  2. 连接桥梁:它连接了物联网设备(通过MQTT协议)和您的后端系统(通过AMQP协议)
  3. 消息临时存储:接收到的消息先存储在RabbitMQ中,然后由您的后端系统消费

完整的消息流转处理流程

1. 设备消息推送阶段

设备通过MQTT协议(端口1883)将数据推送到RabbitMQ服务器。根据您的配置文件:

# application-testn.yml配置
mqtt:
  url: tcp://101.37.83.72:1883
  username: TraneAdmin
  password: Trane8888
  client:
    id: consumer-id
  default:
    topic: topic

设备会连接到MQTT服务器,并发布消息到指定的主题(可能是像 x.request 这样的主题)。

2. 消息路由阶段

RabbitMQ服务器接收到MQTT消息后,会根据配置的交换机和路由规则,将消息路由到相应的队列,如 q.requestq.respond 等。这个过程是在RabbitMQ内部完成的,基于您项目中设置的交换机和队列绑定关系:

// 在MQUtils类中创建交换机、队列和绑定关系
amqpAdmin.declareExchange(new FanoutExchange(exname));
amqpAdmin.declareQueue(new Queue(qname));
amqpAdmin.declareBinding(new Binding(qname, Binding.DestinationType.QUEUE, exname, rkey, null));

3. 后端消息接收阶段

您的Java后端项目通过AMQP协议(端口5672)连接到RabbitMQ,并通过监听器监听相应的队列:

// 在RabbitMqUseMqttHander类中的监听方法(实际使用时会启用@RabbitListener注解)
public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
    try {
        // 处理消息
        putdata(messageString);
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        // 异常处理...
    }
}

您的项目配置了连接到RabbitMQ的相关参数:

# application-testn.yml中RabbitMQ配置
rabbitmq:
  host: 101.37.83.72
  port: 5672
  username: TraneAdmin
  password: Trane8888
  template:
    routing-key: python
    exchange-name: x.request
    queue-name: q.request

4. 消息处理与解析阶段

后端项目接收到消息后,在RabbitMqUseMqttHander类的putdata方法中进行处理:

public void putdata(String messageString) throws Exception {
    // 解析JSON消息
    JSONObject jsonObject = JSONObject.parseObject(messageString);
    
    // 获取项目ID和相关数据
    String projectId = jsonObject.getString("projectID");
    String tjId = jsonObject.getString("tjID");
    
    // 查询相关数据
    List<DeviceStatus> deviceStatusList = deviceStatusDao.findAll(projectId, tjId);
    List<ProjectSystemInfo> projectSystemInfos = projectSystemInfoRepository.findAll(projectId, tjId);
    
    // 创建待更新列表
    List<DeviceStatus> needUpdateList = new ArrayList<>();
    List<ProjectSystemInfo> needUpdateProjectList = new ArrayList<>();
    
    // 处理实时数据
    JSONObject actual = jsonObject.getJSONObject("actual");
    if (actual != null) {
        setActualValue(actual, deviceStatusList, needUpdateList, projectSystemInfos, 
                       needUpdateProjectList, projectId, tjId, projectTraneList, dateTime);
    }
    
    // 处理预测数据
    JSONObject prediction = jsonObject.getJSONObject("prediction");
    if (prediction != null) {
        setPredictionValue(prediction, projectSystemInfos, needUpdateProjectList, 
                           projectId, tjId, projectTraneList);
    }
    
    // 处理优化数据
    JSONObject optimization = jsonObject.getJSONObject("optimization");
    if (optimization != null) {
        setOptimizationValue(optimization, deviceStatusList, needUpdateList, projectSystemInfos, 
                             needUpdateProjectList, projectId, tjId, projectTraneList, dateTime);
    }
    
    // 处理冷却器数据
    JSONObject chiller = jsonObject.getJSONObject("chiller");
    if (chiller != null) {
        chiller(chiller, projectId, dateTime);
    }
}

5. 数据保存阶段

解析后的数据会保存到不同的数据库表中:

// 保存设备状态数据
deviceStatusDao.saveAll(needUpdateList);

// 保存项目系统信息
projectSystemInfoRepository.saveAll(needUpdateProjectList);

// 保存历史数据
if (!needUpdateList.isEmpty()) {
    List<DeviceStatusHistory> deviceStatusHistories = new ArrayList<>();
    // 创建历史记录...
    deviceStatusHistoryDao.saveAll(deviceStatusHistories);
}

if (!needUpdateProjectList.isEmpty()) {
    List<ProjectSystemInfoHistory> projectSystemInfoHistories = new ArrayList<>();
    // 创建历史记录...
    projectSystemInfoHistoryRepository.saveAll(projectSystemInfoHistories);
}

6. 数据处理与分析示例

以处理设备状态的实时数据为例:

private void setActualDeviceStatusData(JSONObject object, List<DeviceStatus> deviceStatusList,
                                   List<DeviceStatus> needUpdateList, String deviceType,
                                   String projectId, String tjId, Date dateTime) {
    for (Map.Entry<String, Object> entry : object.entrySet()) {
        String key = entry.getKey();
        JSONObject value = (JSONObject) entry.getValue();
        
        // 查找现有设备记录
        Optional<DeviceStatus> device = deviceStatusList.stream()
                .filter(v -> v.getDeviceId().equals(key) && v.getDataType() == 0)
                .findFirst();
        
        if (device.isPresent()) {
            // 更新现有记录
            DeviceStatus deviceStatus = device.get();
            deviceStatus.setDateTime(dateTime);
            deviceStatus.setPower(Float.valueOf(value.get("power").toString()));
            deviceStatus.setFrequency(Float.valueOf(value.get("frequency").toString()));
            deviceStatus.setRunningStatus(Float.valueOf(value.get("running_status").toString()));
            needUpdateList.add(deviceStatus);
        } else {
            // 创建新记录
            DeviceStatus deviceStatus = new DeviceStatus();
            deviceStatus.setDateTime(dateTime);
            deviceStatus.setId(UUID.randomUUID().toString());
            deviceStatus.setName(key);
            deviceStatus.setDeviceId(key);
            deviceStatus.setDeviceType(deviceType);
            deviceStatus.setIsDel("0");
            deviceStatus.setVersion("0");
            deviceStatus.setProjectId(projectId);
            deviceStatus.setTjId(Integer.valueOf(tjId));
            deviceStatus.setDataType(0);
            deviceStatus.setPower(Float.valueOf(value.get("power").toString()));
            deviceStatus.setFrequency(Float.valueOf(value.get("frequency").toString()));
            deviceStatus.setRunningStatus(Float.valueOf(value.get("running_status").toString()));
            needUpdateList.add(deviceStatus);
        }
    }
}

消息格式及处理详解

根据您的代码,设备推送的消息格式是JSON格式,包含以下主要部分:

{
  "projectID": "项目ID",
  "tjID": "统计ID",
  "actual": {
    "dateTime": "2023-01-01 12:00:00",
    "PP": {
      "设备ID1": {
        "power": 100.0,
        "frequency": 50.0,
        "running_status": 1.0
      }
    },
    "CT": {...},
    "CP": {...},
    "CH": {...},
    "SYS": {...},
    "spt": {...},
    "strategy": {...}
  },
  "prediction": {...},
  "optimization": {...},
  "chiller": {...}
}

各模块处理逻辑:

  • actual: 表示设备实时数据,包含PP、CT、CP、CH等设备类型
  • prediction: 表示预测数据
  • optimization: 表示优化数据
  • chiller: 表示冷却器数据

系统架构与组件关系图

+----------------+       +------------------+      +------------------+
|                |  MQTT |                  | AMQP |                  |
| 物联网设备      +------->  RabbitMQ容器    +------>  Java后端项目     |
| (传感器/控制器)  | 1883  | (消息中间件)      | 5672 | (数据处理与存储)   |
|                |       |                  |      |                  |
+----------------+       +------------------+      +--------+---------+
                                                           |
                                                           | JDBC
                                                           v
                                                  +------------------+
                                                  |                  |
                                                  |   数据库          |
                                                  | (MySQL)          |
                                                  |                  |
                                                  +------------------+

总结

  1. 设备通过MQTT协议将数据推送到RabbitMQ服务器的指定主题
  2. RabbitMQ服务器接收数据并根据路由规则将消息存储在相应队列中
  3. Java后端项目通过AMQP协议连接RabbitMQ,监听队列接收消息
  4. 后端项目解析消息,将其分解为实时数据、预测数据、优化数据等
  5. 数据持久化到数据库,同时保存历史记录
  6. 异常处理机制确保消息处理的可靠性,包括消息确认和异常重试

这个架构实现了从物联网设备到数据存储的完整链路,通过消息中间件解耦了数据采集和数据处理,提高了系统的可扩展性和可靠性。

# MQ容器与后端项目的关系及消息流转流程 根据您提供的端口信息和项目代码,我可以详细解释MQ容器、设备、项目之间的关系和消息处理流程。 ## MQ容器与项目的关系 您提到的端口信息表明您正在运行一个RabbitMQ容器,该容器暴露了多个端口: - `1883`: MQTT协议端口,用于物联网设备连接 - `5672`: AMQP协议端口,用于应用程序连接RabbitMQ - `15672`: RabbitMQ管理界面端口 这个MQ容器与您的项目之间的关系是: 1. **消息中间件服务提供者**:RabbitMQ容器作为独立的服务,提供消息队列功能 2. **连接桥梁**:它连接了物联网设备(通过MQTT协议)和您的后端系统(通过AMQP协议) 3. **消息临时存储**:接收到的消息先存储在RabbitMQ中,然后由您的后端系统消费 ## 完整的消息流转处理流程 ### 1. 设备消息推送阶段 设备通过MQTT协议(端口1883)将数据推送到RabbitMQ服务器。根据您的配置文件: ```yaml # application-testn.yml配置 mqtt: url: tcp://101.37.83.72:1883 username: TraneAdmin password: Trane8888 client: id: consumer-id default: topic: topic ``` 设备会连接到MQTT服务器,并发布消息到指定的主题(可能是像 `x.request` 这样的主题)。 ### 2. 消息路由阶段 RabbitMQ服务器接收到MQTT消息后,会根据配置的交换机和路由规则,将消息路由到相应的队列,如 `q.request`、`q.respond` 等。这个过程是在RabbitMQ内部完成的,基于您项目中设置的交换机和队列绑定关系: ```java // 在MQUtils类中创建交换机、队列和绑定关系 amqpAdmin.declareExchange(new FanoutExchange(exname)); amqpAdmin.declareQueue(new Queue(qname)); amqpAdmin.declareBinding(new Binding(qname, Binding.DestinationType.QUEUE, exname, rkey, null)); ``` ### 3. 后端消息接收阶段 您的Java后端项目通过AMQP协议(端口5672)连接到RabbitMQ,并通过监听器监听相应的队列: ```java // 在RabbitMqUseMqttHander类中的监听方法(实际使用时会启用@RabbitListener注解) public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { String messageString = new String(message.getBody(), StandardCharsets.UTF_8); try { // 处理消息 putdata(messageString); channel.basicAck(deliveryTag, false); } catch (Exception e) { // 异常处理... } } ``` 您的项目配置了连接到RabbitMQ的相关参数: ```yaml # application-testn.yml中RabbitMQ配置 rabbitmq: host: 101.37.83.72 port: 5672 username: TraneAdmin password: Trane8888 template: routing-key: python exchange-name: x.request queue-name: q.request ``` ### 4. 消息处理与解析阶段 后端项目接收到消息后,在`RabbitMqUseMqttHander`类的`putdata`方法中进行处理: ```java public void putdata(String messageString) throws Exception { // 解析JSON消息 JSONObject jsonObject = JSONObject.parseObject(messageString); // 获取项目ID和相关数据 String projectId = jsonObject.getString("projectID"); String tjId = jsonObject.getString("tjID"); // 查询相关数据 List<DeviceStatus> deviceStatusList = deviceStatusDao.findAll(projectId, tjId); List<ProjectSystemInfo> projectSystemInfos = projectSystemInfoRepository.findAll(projectId, tjId); // 创建待更新列表 List<DeviceStatus> needUpdateList = new ArrayList<>(); List<ProjectSystemInfo> needUpdateProjectList = new ArrayList<>(); // 处理实时数据 JSONObject actual = jsonObject.getJSONObject("actual"); if (actual != null) { setActualValue(actual, deviceStatusList, needUpdateList, projectSystemInfos, needUpdateProjectList, projectId, tjId, projectTraneList, dateTime); } // 处理预测数据 JSONObject prediction = jsonObject.getJSONObject("prediction"); if (prediction != null) { setPredictionValue(prediction, projectSystemInfos, needUpdateProjectList, projectId, tjId, projectTraneList); } // 处理优化数据 JSONObject optimization = jsonObject.getJSONObject("optimization"); if (optimization != null) { setOptimizationValue(optimization, deviceStatusList, needUpdateList, projectSystemInfos, needUpdateProjectList, projectId, tjId, projectTraneList, dateTime); } // 处理冷却器数据 JSONObject chiller = jsonObject.getJSONObject("chiller"); if (chiller != null) { chiller(chiller, projectId, dateTime); } } ``` ### 5. 数据保存阶段 解析后的数据会保存到不同的数据库表中: ```java // 保存设备状态数据 deviceStatusDao.saveAll(needUpdateList); // 保存项目系统信息 projectSystemInfoRepository.saveAll(needUpdateProjectList); // 保存历史数据 if (!needUpdateList.isEmpty()) { List<DeviceStatusHistory> deviceStatusHistories = new ArrayList<>(); // 创建历史记录... deviceStatusHistoryDao.saveAll(deviceStatusHistories); } if (!needUpdateProjectList.isEmpty()) { List<ProjectSystemInfoHistory> projectSystemInfoHistories = new ArrayList<>(); // 创建历史记录... projectSystemInfoHistoryRepository.saveAll(projectSystemInfoHistories); } ``` ### 6. 数据处理与分析示例 以处理设备状态的实时数据为例: ```java private void setActualDeviceStatusData(JSONObject object, List<DeviceStatus> deviceStatusList, List<DeviceStatus> needUpdateList, String deviceType, String projectId, String tjId, Date dateTime) { for (Map.Entry<String, Object> entry : object.entrySet()) { String key = entry.getKey(); JSONObject value = (JSONObject) entry.getValue(); // 查找现有设备记录 Optional<DeviceStatus> device = deviceStatusList.stream() .filter(v -> v.getDeviceId().equals(key) && v.getDataType() == 0) .findFirst(); if (device.isPresent()) { // 更新现有记录 DeviceStatus deviceStatus = device.get(); deviceStatus.setDateTime(dateTime); deviceStatus.setPower(Float.valueOf(value.get("power").toString())); deviceStatus.setFrequency(Float.valueOf(value.get("frequency").toString())); deviceStatus.setRunningStatus(Float.valueOf(value.get("running_status").toString())); needUpdateList.add(deviceStatus); } else { // 创建新记录 DeviceStatus deviceStatus = new DeviceStatus(); deviceStatus.setDateTime(dateTime); deviceStatus.setId(UUID.randomUUID().toString()); deviceStatus.setName(key); deviceStatus.setDeviceId(key); deviceStatus.setDeviceType(deviceType); deviceStatus.setIsDel("0"); deviceStatus.setVersion("0"); deviceStatus.setProjectId(projectId); deviceStatus.setTjId(Integer.valueOf(tjId)); deviceStatus.setDataType(0); deviceStatus.setPower(Float.valueOf(value.get("power").toString())); deviceStatus.setFrequency(Float.valueOf(value.get("frequency").toString())); deviceStatus.setRunningStatus(Float.valueOf(value.get("running_status").toString())); needUpdateList.add(deviceStatus); } } } ``` ## 消息格式及处理详解 根据您的代码,设备推送的消息格式是JSON格式,包含以下主要部分: ```json { "projectID": "项目ID", "tjID": "统计ID", "actual": { "dateTime": "2023-01-01 12:00:00", "PP": { "设备ID1": { "power": 100.0, "frequency": 50.0, "running_status": 1.0 } }, "CT": {...}, "CP": {...}, "CH": {...}, "SYS": {...}, "spt": {...}, "strategy": {...} }, "prediction": {...}, "optimization": {...}, "chiller": {...} } ``` 各模块处理逻辑: - **actual**: 表示设备实时数据,包含PP、CT、CP、CH等设备类型 - **prediction**: 表示预测数据 - **optimization**: 表示优化数据 - **chiller**: 表示冷却器数据 ## 系统架构与组件关系图 ``` +----------------+ +------------------+ +------------------+ | | MQTT | | AMQP | | | 物联网设备 +-------> RabbitMQ容器 +------> Java后端项目 | | (传感器/控制器) | 1883 | (消息中间件) | 5672 | (数据处理与存储) | | | | | | | +----------------+ +------------------+ +--------+---------+ | | JDBC v +------------------+ | | | 数据库 | | (MySQL) | | | +------------------+ ``` ## 总结 1. **设备通过MQTT协议**将数据推送到RabbitMQ服务器的指定主题 2. **RabbitMQ服务器**接收数据并根据路由规则将消息存储在相应队列中 3. **Java后端项目**通过AMQP协议连接RabbitMQ,监听队列接收消息 4. **后端项目解析消息**,将其分解为实时数据、预测数据、优化数据等 5. **数据持久化**到数据库,同时保存历史记录 6. **异常处理机制**确保消息处理的可靠性,包括消息确认和异常重试 这个架构实现了从物联网设备到数据存储的完整链路,通过消息中间件解耦了数据采集和数据处理,提高了系统的可扩展性和可靠性。
天问 commented 11 hours ago
Owner

根据您的项目代码,我将详细介绍系统中RabbitMQ和MQTT的使用情况。

消息中间件概览

您的项目使用了两种消息中间件:RabbitMQ和MQTT,主要用于数据收集、处理和分析的架构中。

RabbitMQ配置

RabbitMQ的配置在RabbitMqConfig类中定义:

@Configuration
public class RabbitMqConfig {
    // ... existing code ...
    
    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        
        // 设置连接超时
        connectionFactory.setConnectionTimeout(30000);
        // 设置心跳时间间隔
        connectionFactory.setRequestedHeartbeat(60);
        // 设置缓存模式
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        // 设置通道缓存大小
        connectionFactory.setChannelCacheSize(25);
        // 设置连接恢复间隔时间
        connectionFactory.setRecoveryInterval(5000);
        return connectionFactory;
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置手动确认模式
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 降低并发消费者数量,避免过多线程消耗资源
        factory.setConcurrentConsumers(2);
        // ... existing code ...
    }
}

MQTT配置

MQTT相关配置在应用配置文件中,例如application-testn.yml

mqtt:
  url: tcp://101.37.83.72:1883
  username: TraneAdmin
  password: Trane8888
  client:
    id: consumer-id
  default:
    topic: topic

消息处理流程

1. 消息推送

消息推送通过MQUtils类实现:

@Service
public class MQUtils {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private AmqpAdmin amqpAdmin;

    public void sendmq(String projectid, String message, String qname, String exname, String rkey) {
        // 检查队列是否存在,不存在则创建
        boolean b = true;
        if(qname != null && !"".equals(qname)) {
            try {
                Properties queueProperties = amqpAdmin.getQueueProperties(qname);
                if(queueProperties == null) {
                    b = false;
                }
            } catch (Exception e) {
                b = false;
            }
            if(!b) {
                // 创建交换机、队列和绑定关系
                amqpAdmin.declareExchange(new FanoutExchange(exname));
                amqpAdmin.declareQueue(new Queue(qname));
                amqpAdmin.declareBinding(new Binding(qname, Binding.DestinationType.QUEUE, exname, rkey, null));
            }
        }

        // 发送消息
        rabbitTemplate.convertAndSend(exname, rkey, message);
    }
}

2. 消息接收与处理

消息接收和处理主要由RabbitMqUseMqttHander类负责:

@Slf4j
@Service
public class RabbitMqUseMqttHander {
    // ... existing code ...
    
    // 监听RabbitMQ队列的消息
    public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws ParseException, IOException {
        String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
        
        // 检查异常计数和上一次异常时间
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastExceptionTime.get() > EXCEPTION_RESET_INTERVAL) {
            exceptionCounter.set(0);
        }
        
        // 如果短时间内异常次数过多,停止处理一段时间
        if (exceptionCounter.get() > MAX_EXCEPTIONS) {
            log.error("短时间内异常过多,拒绝处理消息: {}", message.getMessageProperties());
            channel.basicReject(deliveryTag, false); // 拒绝消息,不重入队列
            return;
        }
        
        try {
            // 同步处理消息,确保在确认消息前完成处理
            putdata(messageString);
            channel.basicAck(deliveryTag, false);
            
            // 成功处理,重置异常计数
            if (exceptionCounter.get() > 0) {
                exceptionCounter.decrementAndGet();
            }
        } catch (Exception e) {
            // ... 异常处理
        }
    }
    
    // 判断异常是否为临时性异常
    private boolean isTransientException(Exception e) {
        // ... 异常判断逻辑
    }
}

3. 数据解析与存储

putdata方法负责解析消息内容并存储到数据库:

public void putdata(String messageString) throws Exception {
    // 解析JSON消息
    JSONObject jsonObject = JSONObject.parseObject(messageString);
    // 获取项目ID和相关数据
    String projectId = jsonObject.getString("projectID");
    String tjId = jsonObject.getString("tjID");
    
    // 查询设备状态和系统信息
    List<DeviceStatus> deviceStatusList = deviceStatusDao.findAll(projectId, tjId);
    List<ProjectSystemInfo> projectSystemInfos = projectSystemInfoRepository.findAll(projectId, tjId);
    
    // 存储各种类型的数据
    List<DeviceStatus> needUpdateList = new ArrayList<>();
    List<ProjectSystemInfo> needUpdateProjectList = new ArrayList<>();
    
    // 处理不同类型的数据
    JSONObject actual = jsonObject.getJSONObject("actual");
    JSONObject prediction = jsonObject.getJSONObject("prediction");
    JSONObject optimization = jsonObject.getJSONObject("optimization");
    
    // 保存数据到数据库
    deviceStatusDao.saveAll(needUpdateList);
    projectSystemInfoRepository.saveAll(needUpdateProjectList);
    
    // 保存历史记录
    saveHistoryData(needUpdateList, needUpdateProjectList);
}

关键组件详解

1. RabbitMQ监听器

项目使用Spring AMQP提供的@RabbitListener注解来监听指定队列的消息。在RabbitMqUseMqttHander类中,虽然当前注解被注释掉了,但实际上在项目运行时会激活这些监听器:

// @RabbitListener(queues = "test.q.respond")
public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
    // 消息处理逻辑
}

2. 消息确认机制

项目使用手动确认模式处理消息,确保消息被正确处理后才确认:

try {
    // 处理消息
    putdata(messageString);
    // 确认消息
    channel.basicAck(deliveryTag, false);
} catch (Exception e) {
    // 拒绝消息
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}

3. 数据模型

项目处理的主要数据实体包括:

  • DeviceStatus: 设备状态信息
  • ProjectSystemInfo: 项目系统信息
  • ProjectTrane: 项目基本信息
  • ProjectLargeData: 大数据信息

4. MQTT与RabbitMQ结合

项目将MQTT和RabbitMQ结合使用:

  • MQTT主要用于接收来自物联网设备的数据
  • RabbitMQ用于系统内部组件间的消息传递
  • RabbitMqUseMqttHander类将两者结合起来处理各种场景的数据

总结

您的项目中消息处理流程:

  1. 数据采集: 通过MQTT从物联网设备收集数据
  2. 消息转发: 使用RabbitMQ在系统内部传递和处理消息
  3. 数据处理: 通过RabbitMqUseMqttHander解析并处理消息
  4. 数据存储: 将处理后的数据保存到数据库
  5. 历史记录: 同时保存数据历史记录供后续分析

系统的设计考虑了异常处理、消息确认、连接恢复等关键因素,以确保在面对网络问题或系统故障时的稳定性。

根据您的项目代码,我将详细介绍系统中RabbitMQ和MQTT的使用情况。 ## 消息中间件概览 您的项目使用了两种消息中间件:RabbitMQ和MQTT,主要用于数据收集、处理和分析的架构中。 ### RabbitMQ配置 RabbitMQ的配置在`RabbitMqConfig`类中定义: ```java:trane-tech-energy-strategy-optimization-platform-backend/src/main/java/org/example/config/RabbitMqConfig.java @Configuration public class RabbitMqConfig { // ... existing code ... @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); // 设置连接超时 connectionFactory.setConnectionTimeout(30000); // 设置心跳时间间隔 connectionFactory.setRequestedHeartbeat(60); // 设置缓存模式 connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); // 设置通道缓存大小 connectionFactory.setChannelCacheSize(25); // 设置连接恢复间隔时间 connectionFactory.setRecoveryInterval(5000); return connectionFactory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 设置手动确认模式 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 降低并发消费者数量,避免过多线程消耗资源 factory.setConcurrentConsumers(2); // ... existing code ... } } ``` ### MQTT配置 MQTT相关配置在应用配置文件中,例如`application-testn.yml`: ```yaml mqtt: url: tcp://101.37.83.72:1883 username: TraneAdmin password: Trane8888 client: id: consumer-id default: topic: topic ``` ## 消息处理流程 ### 1. 消息推送 消息推送通过`MQUtils`类实现: ```java @Service public class MQUtils { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private AmqpAdmin amqpAdmin; public void sendmq(String projectid, String message, String qname, String exname, String rkey) { // 检查队列是否存在,不存在则创建 boolean b = true; if(qname != null && !"".equals(qname)) { try { Properties queueProperties = amqpAdmin.getQueueProperties(qname); if(queueProperties == null) { b = false; } } catch (Exception e) { b = false; } if(!b) { // 创建交换机、队列和绑定关系 amqpAdmin.declareExchange(new FanoutExchange(exname)); amqpAdmin.declareQueue(new Queue(qname)); amqpAdmin.declareBinding(new Binding(qname, Binding.DestinationType.QUEUE, exname, rkey, null)); } } // 发送消息 rabbitTemplate.convertAndSend(exname, rkey, message); } } ``` ### 2. 消息接收与处理 消息接收和处理主要由`RabbitMqUseMqttHander`类负责: ```java:trane-tech-energy-strategy-optimization-platform-backend/src/main/java/org/example/handler/RabbitMqUseMqttHander.java @Slf4j @Service public class RabbitMqUseMqttHander { // ... existing code ... // 监听RabbitMQ队列的消息 public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws ParseException, IOException { String messageString = new String(message.getBody(), StandardCharsets.UTF_8); // 检查异常计数和上一次异常时间 long currentTime = System.currentTimeMillis(); if (currentTime - lastExceptionTime.get() > EXCEPTION_RESET_INTERVAL) { exceptionCounter.set(0); } // 如果短时间内异常次数过多,停止处理一段时间 if (exceptionCounter.get() > MAX_EXCEPTIONS) { log.error("短时间内异常过多,拒绝处理消息: {}", message.getMessageProperties()); channel.basicReject(deliveryTag, false); // 拒绝消息,不重入队列 return; } try { // 同步处理消息,确保在确认消息前完成处理 putdata(messageString); channel.basicAck(deliveryTag, false); // 成功处理,重置异常计数 if (exceptionCounter.get() > 0) { exceptionCounter.decrementAndGet(); } } catch (Exception e) { // ... 异常处理 } } // 判断异常是否为临时性异常 private boolean isTransientException(Exception e) { // ... 异常判断逻辑 } } ``` ### 3. 数据解析与存储 `putdata`方法负责解析消息内容并存储到数据库: ```java public void putdata(String messageString) throws Exception { // 解析JSON消息 JSONObject jsonObject = JSONObject.parseObject(messageString); // 获取项目ID和相关数据 String projectId = jsonObject.getString("projectID"); String tjId = jsonObject.getString("tjID"); // 查询设备状态和系统信息 List<DeviceStatus> deviceStatusList = deviceStatusDao.findAll(projectId, tjId); List<ProjectSystemInfo> projectSystemInfos = projectSystemInfoRepository.findAll(projectId, tjId); // 存储各种类型的数据 List<DeviceStatus> needUpdateList = new ArrayList<>(); List<ProjectSystemInfo> needUpdateProjectList = new ArrayList<>(); // 处理不同类型的数据 JSONObject actual = jsonObject.getJSONObject("actual"); JSONObject prediction = jsonObject.getJSONObject("prediction"); JSONObject optimization = jsonObject.getJSONObject("optimization"); // 保存数据到数据库 deviceStatusDao.saveAll(needUpdateList); projectSystemInfoRepository.saveAll(needUpdateProjectList); // 保存历史记录 saveHistoryData(needUpdateList, needUpdateProjectList); } ``` ## 关键组件详解 ### 1. RabbitMQ监听器 项目使用Spring AMQP提供的`@RabbitListener`注解来监听指定队列的消息。在`RabbitMqUseMqttHander`类中,虽然当前注解被注释掉了,但实际上在项目运行时会激活这些监听器: ```java // @RabbitListener(queues = "test.q.respond") public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { // 消息处理逻辑 } ``` ### 2. 消息确认机制 项目使用手动确认模式处理消息,确保消息被正确处理后才确认: ```java try { // 处理消息 putdata(messageString); // 确认消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { // 拒绝消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } ``` ### 3. 数据模型 项目处理的主要数据实体包括: - `DeviceStatus`: 设备状态信息 - `ProjectSystemInfo`: 项目系统信息 - `ProjectTrane`: 项目基本信息 - `ProjectLargeData`: 大数据信息 ### 4. MQTT与RabbitMQ结合 项目将MQTT和RabbitMQ结合使用: - MQTT主要用于接收来自物联网设备的数据 - RabbitMQ用于系统内部组件间的消息传递 - `RabbitMqUseMqttHander`类将两者结合起来处理各种场景的数据 ## 总结 您的项目中消息处理流程: 1. **数据采集**: 通过MQTT从物联网设备收集数据 2. **消息转发**: 使用RabbitMQ在系统内部传递和处理消息 3. **数据处理**: 通过`RabbitMqUseMqttHander`解析并处理消息 4. **数据存储**: 将处理后的数据保存到数据库 5. **历史记录**: 同时保存数据历史记录供后续分析 系统的设计考虑了异常处理、消息确认、连接恢复等关键因素,以确保在面对网络问题或系统故障时的稳定性。
Sign in to join this conversation.
No Milestone
No assignee
1 Participants
Loading...
Cancel
Save
There is no content yet.