ruoyi-service/ruoyi-dataInterchange/pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttConfiguration.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttConfigurationProperties.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttInit.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttReceiverMessageHandler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
ruoyi-service/ruoyi-dataInterchange/pom.xml
@@ -124,6 +124,15 @@ <version>1.1.13.RELEASE</version> <systemPath>${project.basedir}/lib/artemis-http-client-1.1.13.RELEASE.jar</systemPath> </dependency> <!--mqtt依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> </dependencies> <build> ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttConfiguration.java
New file @@ -0,0 +1,97 @@ package com.ruoyi.dataInterchange.util.mqtt; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.messaging.MessageHandler; import javax.annotation.Resource; import java.util.concurrent.Executors; /** * @author zhibing.pu * @Date 2025/5/23 17:26 */ @Configuration public class MqttConfiguration { @Resource private MqttConfigurationProperties mqttConfigurationProperties; /** * 连接参数 * * @return */ @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttConfigurationProperties.getUsername()); options.setPassword(mqttConfigurationProperties.getPassword().toCharArray()); options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()}); options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout()); options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive()); options.setCleanSession(true); // 设置为false以便断线重连后恢复会话 options.setAutomaticReconnect(true); return options; } /** * 连接工厂 * * @param options * @return */ @Bean public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(options); return factory; } /** * 配置入站适配器 * * @param mqttClientFactory * @return */ @Bean public MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory); // adapter.addTopic("pub/300119110099"); 订阅主题,也可以放在初始化动态配置 adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 支持多线程并发处理消息的输入通道 * * @return */ @Bean public ExecutorChannel mqttInputChannel() { return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 线程池大小可以调整 } /** * 配置消息处理器 * * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道 public MessageHandler messageHandler() { return new MqttReceiverMessageHandler(); } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttConfigurationProperties.java
New file @@ -0,0 +1,22 @@ package com.ruoyi.dataInterchange.util.mqtt; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * @author zhibing.pu * @Date 2025/5/23 17:24 */ @Data @Component @ConfigurationProperties(prefix = "spring.mqtt") public class MqttConfigurationProperties { private String username; private String password; private String url; private String subClientId; private String pubClientId; private int connectionTimeout; private int keepAlive; } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttInit.java
New file @@ -0,0 +1,27 @@ package com.ruoyi.dataInterchange.util.mqtt; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @author zhibing.pu * @Date 2025/5/23 17:35 */ @Slf4j @Component public class MqttInit { @Autowired private MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter; @PostConstruct public void init() { subscribeAllTopics(); } public void subscribeAllTopics() { messageDrivenChannelAdapter.addTopic("artemis/event_msa_alarm/5201154049/admin"); } } ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttReceiverMessageHandler.java
New file @@ -0,0 +1,26 @@ package com.ruoyi.dataInterchange.util.mqtt; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; /** * @author zhibing.pu * @Date 2025/5/23 17:30 */ @Slf4j @Component public class MqttReceiverMessageHandler implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { MessageHeaders headers = message.getHeaders(); log.info("线程名称:{},收到消息,主题:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload()); // log.info("收到消息主题:{}", headers.get("mqtt_receivedTopic").toString()); // log.info("收到消息:{}", message.getPayload()); // 消息保存到内存队列里面,定时批量入库,也可以在这里直接入库 } }