满足性能指标要求
提高可靠性:
简化通信协议:
实现消息广播:
更灵活的部署架构:
数据访问需求:
数据库支持复杂的查询和分析功能(SQL查询、索引、统计等)
// 保存设备状态历史数据 if (!needUpdateList.isEmpty()) { List<DeviceStatusHistory> deviceStatusHistories = new ArrayList<>(); // 创建历史记录... deviceStatusHistoryDao.saveAll(deviceStatusHistories); }
事务和一致性:
这种架构结合了消息队列的高吞吐和数据库的持久性存储,是大规模物联网数据处理系统的常见最佳实践。
根据您提供的端口信息和项目代码,我可以详细解释MQ容器、设备、项目之间的关系和消息处理流程。
您提到的端口信息表明您正在运行一个RabbitMQ容器,该容器暴露了多个端口:
1883
5672
15672
这个MQ容器与您的项目之间的关系是:
设备通过MQTT协议(端口1883)将数据推送到RabbitMQ服务器。根据您的配置文件:
# application-testn.yml配置 mqtt: url: tcp://101.37.83.72:1883 username: TraneAdmin password: Trane8888 client: id: consumer-id default: topic: topic
设备会连接到MQTT服务器,并发布消息到指定的主题(可能是像 x.request 这样的主题)。
x.request
RabbitMQ服务器接收到MQTT消息后,会根据配置的交换机和路由规则,将消息路由到相应的队列,如 q.request、q.respond 等。这个过程是在RabbitMQ内部完成的,基于您项目中设置的交换机和队列绑定关系:
q.request
q.respond
// 在MQUtils类中创建交换机、队列和绑定关系 amqpAdmin.declareExchange(new FanoutExchange(exname)); amqpAdmin.declareQueue(new Queue(qname)); amqpAdmin.declareBinding(new Binding(qname, Binding.DestinationType.QUEUE, exname, rkey, null));
您的Java后端项目通过AMQP协议(端口5672)连接到RabbitMQ,并通过监听器监听相应的队列:
// 在RabbitMqUseMqttHander类中的监听方法(实际使用时会启用@RabbitListener注解) public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { String messageString = new String(message.getBody(), StandardCharsets.UTF_8); try { // 处理消息 putdata(messageString); channel.basicAck(deliveryTag, false); } catch (Exception e) { // 异常处理... } }
您的项目配置了连接到RabbitMQ的相关参数:
# application-testn.yml中RabbitMQ配置 rabbitmq: host: 101.37.83.72 port: 5672 username: TraneAdmin password: Trane8888 template: routing-key: python exchange-name: x.request queue-name: q.request
后端项目接收到消息后,在RabbitMqUseMqttHander类的putdata方法中进行处理:
RabbitMqUseMqttHander
putdata
public void putdata(String messageString) throws Exception { // 解析JSON消息 JSONObject jsonObject = JSONObject.parseObject(messageString); // 获取项目ID和相关数据 String projectId = jsonObject.getString("projectID"); String tjId = jsonObject.getString("tjID"); // 查询相关数据 List<DeviceStatus> deviceStatusList = deviceStatusDao.findAll(projectId, tjId); List<ProjectSystemInfo> projectSystemInfos = projectSystemInfoRepository.findAll(projectId, tjId); // 创建待更新列表 List<DeviceStatus> needUpdateList = new ArrayList<>(); List<ProjectSystemInfo> needUpdateProjectList = new ArrayList<>(); // 处理实时数据 JSONObject actual = jsonObject.getJSONObject("actual"); if (actual != null) { setActualValue(actual, deviceStatusList, needUpdateList, projectSystemInfos, needUpdateProjectList, projectId, tjId, projectTraneList, dateTime); } // 处理预测数据 JSONObject prediction = jsonObject.getJSONObject("prediction"); if (prediction != null) { setPredictionValue(prediction, projectSystemInfos, needUpdateProjectList, projectId, tjId, projectTraneList); } // 处理优化数据 JSONObject optimization = jsonObject.getJSONObject("optimization"); if (optimization != null) { setOptimizationValue(optimization, deviceStatusList, needUpdateList, projectSystemInfos, needUpdateProjectList, projectId, tjId, projectTraneList, dateTime); } // 处理冷却器数据 JSONObject chiller = jsonObject.getJSONObject("chiller"); if (chiller != null) { chiller(chiller, projectId, dateTime); } }
解析后的数据会保存到不同的数据库表中:
// 保存设备状态数据 deviceStatusDao.saveAll(needUpdateList); // 保存项目系统信息 projectSystemInfoRepository.saveAll(needUpdateProjectList); // 保存历史数据 if (!needUpdateList.isEmpty()) { List<DeviceStatusHistory> deviceStatusHistories = new ArrayList<>(); // 创建历史记录... deviceStatusHistoryDao.saveAll(deviceStatusHistories); } if (!needUpdateProjectList.isEmpty()) { List<ProjectSystemInfoHistory> projectSystemInfoHistories = new ArrayList<>(); // 创建历史记录... projectSystemInfoHistoryRepository.saveAll(projectSystemInfoHistories); }
以处理设备状态的实时数据为例:
private void setActualDeviceStatusData(JSONObject object, List<DeviceStatus> deviceStatusList, List<DeviceStatus> needUpdateList, String deviceType, String projectId, String tjId, Date dateTime) { for (Map.Entry<String, Object> entry : object.entrySet()) { String key = entry.getKey(); JSONObject value = (JSONObject) entry.getValue(); // 查找现有设备记录 Optional<DeviceStatus> device = deviceStatusList.stream() .filter(v -> v.getDeviceId().equals(key) && v.getDataType() == 0) .findFirst(); if (device.isPresent()) { // 更新现有记录 DeviceStatus deviceStatus = device.get(); deviceStatus.setDateTime(dateTime); deviceStatus.setPower(Float.valueOf(value.get("power").toString())); deviceStatus.setFrequency(Float.valueOf(value.get("frequency").toString())); deviceStatus.setRunningStatus(Float.valueOf(value.get("running_status").toString())); needUpdateList.add(deviceStatus); } else { // 创建新记录 DeviceStatus deviceStatus = new DeviceStatus(); deviceStatus.setDateTime(dateTime); deviceStatus.setId(UUID.randomUUID().toString()); deviceStatus.setName(key); deviceStatus.setDeviceId(key); deviceStatus.setDeviceType(deviceType); deviceStatus.setIsDel("0"); deviceStatus.setVersion("0"); deviceStatus.setProjectId(projectId); deviceStatus.setTjId(Integer.valueOf(tjId)); deviceStatus.setDataType(0); deviceStatus.setPower(Float.valueOf(value.get("power").toString())); deviceStatus.setFrequency(Float.valueOf(value.get("frequency").toString())); deviceStatus.setRunningStatus(Float.valueOf(value.get("running_status").toString())); needUpdateList.add(deviceStatus); } } }
根据您的代码,设备推送的消息格式是JSON格式,包含以下主要部分:
{ "projectID": "项目ID", "tjID": "统计ID", "actual": { "dateTime": "2023-01-01 12:00:00", "PP": { "设备ID1": { "power": 100.0, "frequency": 50.0, "running_status": 1.0 } }, "CT": {...}, "CP": {...}, "CH": {...}, "SYS": {...}, "spt": {...}, "strategy": {...} }, "prediction": {...}, "optimization": {...}, "chiller": {...} }
各模块处理逻辑:
+----------------+ +------------------+ +------------------+ | | MQTT | | AMQP | | | 物联网设备 +-------> RabbitMQ容器 +------> Java后端项目 | | (传感器/控制器) | 1883 | (消息中间件) | 5672 | (数据处理与存储) | | | | | | | +----------------+ +------------------+ +--------+---------+ | | JDBC v +------------------+ | | | 数据库 | | (MySQL) | | | +------------------+
这个架构实现了从物联网设备到数据存储的完整链路,通过消息中间件解耦了数据采集和数据处理,提高了系统的可扩展性和可靠性。
根据您的项目代码,我将详细介绍系统中RabbitMQ和MQTT的使用情况。
您的项目使用了两种消息中间件:RabbitMQ和MQTT,主要用于数据收集、处理和分析的架构中。
RabbitMQ的配置在RabbitMqConfig类中定义:
RabbitMqConfig
@Configuration public class RabbitMqConfig { // ... existing code ... @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); // 设置连接超时 connectionFactory.setConnectionTimeout(30000); // 设置心跳时间间隔 connectionFactory.setRequestedHeartbeat(60); // 设置缓存模式 connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); // 设置通道缓存大小 connectionFactory.setChannelCacheSize(25); // 设置连接恢复间隔时间 connectionFactory.setRecoveryInterval(5000); return connectionFactory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 设置手动确认模式 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 降低并发消费者数量,避免过多线程消耗资源 factory.setConcurrentConsumers(2); // ... existing code ... } }
MQTT相关配置在应用配置文件中,例如application-testn.yml:
application-testn.yml
mqtt: url: tcp://101.37.83.72:1883 username: TraneAdmin password: Trane8888 client: id: consumer-id default: topic: topic
消息推送通过MQUtils类实现:
MQUtils
@Service public class MQUtils { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private AmqpAdmin amqpAdmin; public void sendmq(String projectid, String message, String qname, String exname, String rkey) { // 检查队列是否存在,不存在则创建 boolean b = true; if(qname != null && !"".equals(qname)) { try { Properties queueProperties = amqpAdmin.getQueueProperties(qname); if(queueProperties == null) { b = false; } } catch (Exception e) { b = false; } if(!b) { // 创建交换机、队列和绑定关系 amqpAdmin.declareExchange(new FanoutExchange(exname)); amqpAdmin.declareQueue(new Queue(qname)); amqpAdmin.declareBinding(new Binding(qname, Binding.DestinationType.QUEUE, exname, rkey, null)); } } // 发送消息 rabbitTemplate.convertAndSend(exname, rkey, message); } }
消息接收和处理主要由RabbitMqUseMqttHander类负责:
@Slf4j @Service public class RabbitMqUseMqttHander { // ... existing code ... // 监听RabbitMQ队列的消息 public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws ParseException, IOException { String messageString = new String(message.getBody(), StandardCharsets.UTF_8); // 检查异常计数和上一次异常时间 long currentTime = System.currentTimeMillis(); if (currentTime - lastExceptionTime.get() > EXCEPTION_RESET_INTERVAL) { exceptionCounter.set(0); } // 如果短时间内异常次数过多,停止处理一段时间 if (exceptionCounter.get() > MAX_EXCEPTIONS) { log.error("短时间内异常过多,拒绝处理消息: {}", message.getMessageProperties()); channel.basicReject(deliveryTag, false); // 拒绝消息,不重入队列 return; } try { // 同步处理消息,确保在确认消息前完成处理 putdata(messageString); channel.basicAck(deliveryTag, false); // 成功处理,重置异常计数 if (exceptionCounter.get() > 0) { exceptionCounter.decrementAndGet(); } } catch (Exception e) { // ... 异常处理 } } // 判断异常是否为临时性异常 private boolean isTransientException(Exception e) { // ... 异常判断逻辑 } }
putdata方法负责解析消息内容并存储到数据库:
public void putdata(String messageString) throws Exception { // 解析JSON消息 JSONObject jsonObject = JSONObject.parseObject(messageString); // 获取项目ID和相关数据 String projectId = jsonObject.getString("projectID"); String tjId = jsonObject.getString("tjID"); // 查询设备状态和系统信息 List<DeviceStatus> deviceStatusList = deviceStatusDao.findAll(projectId, tjId); List<ProjectSystemInfo> projectSystemInfos = projectSystemInfoRepository.findAll(projectId, tjId); // 存储各种类型的数据 List<DeviceStatus> needUpdateList = new ArrayList<>(); List<ProjectSystemInfo> needUpdateProjectList = new ArrayList<>(); // 处理不同类型的数据 JSONObject actual = jsonObject.getJSONObject("actual"); JSONObject prediction = jsonObject.getJSONObject("prediction"); JSONObject optimization = jsonObject.getJSONObject("optimization"); // 保存数据到数据库 deviceStatusDao.saveAll(needUpdateList); projectSystemInfoRepository.saveAll(needUpdateProjectList); // 保存历史记录 saveHistoryData(needUpdateList, needUpdateProjectList); }
项目使用Spring AMQP提供的@RabbitListener注解来监听指定队列的消息。在RabbitMqUseMqttHander类中,虽然当前注解被注释掉了,但实际上在项目运行时会激活这些监听器:
@RabbitListener
// @RabbitListener(queues = "test.q.respond") public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { // 消息处理逻辑 }
项目使用手动确认模式处理消息,确保消息被正确处理后才确认:
try { // 处理消息 putdata(messageString); // 确认消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { // 拒绝消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); }
项目处理的主要数据实体包括:
DeviceStatus
ProjectSystemInfo
ProjectTrane
ProjectLargeData
项目将MQTT和RabbitMQ结合使用:
您的项目中消息处理流程:
系统的设计考虑了异常处理、消息确认、连接恢复等关键因素,以确保在面对网络问题或系统故障时的稳定性。
满足性能指标要求
RabbitMQ中间件
使用RabbitMQ的原因(而非直接推送到后端)
提高可靠性:
简化通信协议:
实现消息广播:
更灵活的部署架构:
为何RabbitMQ存储的消息还需保存到数据库
数据访问需求:
数据库支持复杂的查询和分析功能(SQL查询、索引、统计等)
事务和一致性:
总结
这种架构结合了消息队列的高吞吐和数据库的持久性存储,是大规模物联网数据处理系统的常见最佳实践。
MQ容器与后端项目的关系及消息流转流程
根据您提供的端口信息和项目代码,我可以详细解释MQ容器、设备、项目之间的关系和消息处理流程。
MQ容器与项目的关系
您提到的端口信息表明您正在运行一个RabbitMQ容器,该容器暴露了多个端口:
1883
: MQTT协议端口,用于物联网设备连接5672
: AMQP协议端口,用于应用程序连接RabbitMQ15672
: RabbitMQ管理界面端口这个MQ容器与您的项目之间的关系是:
完整的消息流转处理流程
1. 设备消息推送阶段
设备通过MQTT协议(端口1883)将数据推送到RabbitMQ服务器。根据您的配置文件:
设备会连接到MQTT服务器,并发布消息到指定的主题(可能是像
x.request
这样的主题)。2. 消息路由阶段
RabbitMQ服务器接收到MQTT消息后,会根据配置的交换机和路由规则,将消息路由到相应的队列,如
q.request
、q.respond
等。这个过程是在RabbitMQ内部完成的,基于您项目中设置的交换机和队列绑定关系:3. 后端消息接收阶段
您的Java后端项目通过AMQP协议(端口5672)连接到RabbitMQ,并通过监听器监听相应的队列:
您的项目配置了连接到RabbitMQ的相关参数:
4. 消息处理与解析阶段
后端项目接收到消息后,在
RabbitMqUseMqttHander
类的putdata
方法中进行处理:5. 数据保存阶段
解析后的数据会保存到不同的数据库表中:
6. 数据处理与分析示例
以处理设备状态的实时数据为例:
消息格式及处理详解
根据您的代码,设备推送的消息格式是JSON格式,包含以下主要部分:
各模块处理逻辑:
系统架构与组件关系图
总结
这个架构实现了从物联网设备到数据存储的完整链路,通过消息中间件解耦了数据采集和数据处理,提高了系统的可扩展性和可靠性。
根据您的项目代码,我将详细介绍系统中RabbitMQ和MQTT的使用情况。
消息中间件概览
您的项目使用了两种消息中间件:RabbitMQ和MQTT,主要用于数据收集、处理和分析的架构中。
RabbitMQ配置
RabbitMQ的配置在
RabbitMqConfig
类中定义:MQTT配置
MQTT相关配置在应用配置文件中,例如
application-testn.yml
:消息处理流程
1. 消息推送
消息推送通过
MQUtils
类实现:2. 消息接收与处理
消息接收和处理主要由
RabbitMqUseMqttHander
类负责:3. 数据解析与存储
putdata
方法负责解析消息内容并存储到数据库:关键组件详解
1. RabbitMQ监听器
项目使用Spring AMQP提供的
@RabbitListener
注解来监听指定队列的消息。在RabbitMqUseMqttHander
类中,虽然当前注解被注释掉了,但实际上在项目运行时会激活这些监听器:2. 消息确认机制
项目使用手动确认模式处理消息,确保消息被正确处理后才确认:
3. 数据模型
项目处理的主要数据实体包括:
DeviceStatus
: 设备状态信息ProjectSystemInfo
: 项目系统信息ProjectTrane
: 项目基本信息ProjectLargeData
: 大数据信息4. MQTT与RabbitMQ结合
项目将MQTT和RabbitMQ结合使用:
RabbitMqUseMqttHander
类将两者结合起来处理各种场景的数据总结
您的项目中消息处理流程:
RabbitMqUseMqttHander
解析并处理消息系统的设计考虑了异常处理、消息确认、连接恢复等关键因素,以确保在面对网络问题或系统故障时的稳定性。