package com.ruoyi.dataInterchange.util.mqtt;
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageHandler;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
|
|
/**
|
* @author zhibing.pu
|
* @Date 2025/5/23 17:26
|
*/
|
@Configuration
|
public class MqttConfiguration {
|
|
@Resource
|
private MqttConfigurationProperties mqttConfigurationProperties;
|
|
|
|
/**
|
* 连接参数
|
*
|
* @return
|
*/
|
@Bean
|
public MqttConnectOptions mqttConnectOptions() {
|
MqttConnectOptions options = new MqttConnectOptions();
|
options.setUserName(mqttConfigurationProperties.getUsername());
|
options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());
|
options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});
|
options.setConnectionTimeout(mqttConfigurationProperties.getConnectionTimeout());
|
options.setKeepAliveInterval(mqttConfigurationProperties.getKeepAlive());
|
options.setCleanSession(true); // 设置为false以便断线重连后恢复会话
|
options.setAutomaticReconnect(true);
|
return options;
|
}
|
/**
|
* 连接工厂
|
*
|
* @param options
|
* @return
|
*/
|
@Bean
|
public MqttPahoClientFactory mqttClientFactory(MqttConnectOptions options) {
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
factory.setConnectionOptions(options);
|
return factory;
|
}
|
|
|
|
/**
|
* 配置入站适配器
|
*
|
* @param mqttClientFactory
|
* @return
|
*/
|
@Bean
|
public MqttPahoMessageDrivenChannelAdapter messageDrivenChannelAdapter(MqttPahoClientFactory mqttClientFactory) {
|
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getSubClientId(), mqttClientFactory);
|
// adapter.addTopic("pub/300119110099"); 订阅主题,也可以放在初始化动态配置
|
adapter.setOutputChannel(mqttInputChannel());
|
return adapter;
|
}
|
|
/**
|
* 支持多线程并发处理消息的输入通道
|
*
|
* @return
|
*/
|
@Bean
|
public ExecutorChannel mqttInputChannel() {
|
return new ExecutorChannel(Executors.newFixedThreadPool(10)); // 线程池大小可以调整
|
}
|
|
/**
|
* 配置消息处理器
|
*
|
* @return
|
*/
|
@Bean
|
@ServiceActivator(inputChannel = "mqttInputChannel") // 指定通道
|
public MessageHandler messageHandler() {
|
return new MqttReceiverMessageHandler();
|
}
|
|
|
|
}
|