package com.ruoyi.jianguan.mqtt.util; import com.ruoyi.common.core.domain.R; import com.ruoyi.jianguan.mqtt.callback.PushCallback; import com.ruoyi.jianguan.mqtt.client.ClientMQTT; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttPushUtil { // private final static String HOST = "tcp://127.0.0.1:9882"; // private final static String CLIENT_ID = "mqttx_3267186711111fdsasdsa"; // private final static String USER_NAME = "admin"; // private final static String PASS_WORD = "mingxingdianli123"; private final static String HOST = "tcp://183.220.129.95:1883"; private final static String CLIENT_ID = "mqttx_3267186711111mxcdszyun"; private final static String USER_NAME = "mxEmqx"; private final static String PASS_WORD = "admin"; private final static Integer TIME_OUT = 30000; private final static Integer KEEP_ALIVE = 60000; private MqttClient mqttClient; /** * 推送充电桩数据 * @param topic * @param message * @return */ public R pushChargePileData(String topic, String message){ try { //发布消息 this.publish(topic,message); return R.ok("["+topic+"]-推送成功;内容为:["+message+"]"); } catch (MqttException e) { return R.fail("推送失败:["+e.getMessage()+"]"); } } /** * 测试订阅消息 */ // public static void main(String[] args) throws Exception { // MqttPushUtil mqttConnect = new MqttPushUtil(); // mqttConnect.start(); // //订阅消息 // mqttConnect.publish("/cmt/IoT/pub/8/6700/status/silent","message11消息"); // } @Bean public MqttPushUtil getMqttPushClient() { //连接至mqtt服务器,获取mqtt连接 this.start(); return this; } /** * 初始化mqtt链接 * @throws MqttException */ public void start(){ try { // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 mqttClient = new MqttClient(HOST, CLIENT_ID + System.currentTimeMillis(), new MemoryPersistence()); // MQTT的连接设置 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(USER_NAME); options.setPassword(PASS_WORD.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(TIME_OUT);///默认:30 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(false);//默认:true // 设置断开后重新连接(设置为true时将启用自动重新连接) options.setAutomaticReconnect(true);//默认:false // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(KEEP_ALIVE);//默认:60 // 设置回调 mqttClient.setCallback(new PushCallback()); mqttClient.connect(options); } catch (Exception e) { e.printStackTrace(); } } /** * 自定义mqtt连接 * @param host * @param clientId * @param userName * @param passWord * @param connectionTimeout * @param cleanSession * @param automaticReconnect * @param keepAliveInterval * @param mqttCallback * @throws MqttException */ public void start(String host,String clientId, String userName, String passWord, int connectionTimeout, boolean cleanSession,boolean automaticReconnect, int keepAliveInterval,MqttCallback mqttCallback) throws MqttException { // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 mqttClient = new MqttClient(host, clientId + System.currentTimeMillis(), new MemoryPersistence()); // MQTT的连接设置 MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(connectionTimeout);///默认:30 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(cleanSession);//默认:true // 设置断开后重新连接(设置为true时将启用自动重新连接) options.setAutomaticReconnect(automaticReconnect);//默认:false // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(keepAliveInterval);//默认:60 // 设置回调 mqttClient.setCallback(mqttCallback); mqttClient.connect(options); } /** * 发布,默认qos为0,非持久化 */ public void publish(String topic, String message1) throws MqttException { publish(0, false, topic, message1); } /** * 发布 */ public void publish(int qos, boolean retained, String topic, String message1) throws MqttException { MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(message1.getBytes()); MqttTopic mTopic = mqttClient.getTopic(topic); if (null == mTopic) { System.err.println("topic not exist"); log.error("topic not exist"); } MqttDeliveryToken token; token = mTopic.publish(message); token.waitForCompletion(); } /** * 订阅某个主题,qos默认为0 */ public void subscribe(String topic) { subscribe(topic, 0); } /** * 订阅某个主题 */ public void subscribe(String topic, int qos) { try { mqttClient.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } }