From ba2e6c5fa2e21f535da27f29e54da2d6c7eb9aaa Mon Sep 17 00:00:00 2001 From: xuhy <3313886187@qq.com> Date: 星期二, 15 七月 2025 10:47:14 +0800 Subject: [PATCH] 硬件数据推送 --- ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/util/MqttPushUtil.java | 73 +++++++++++++++++++++++++----------- 1 files changed, 51 insertions(+), 22 deletions(-) diff --git a/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/util/MqttPushUtil.java b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/util/MqttPushUtil.java index 2f78687..a76477a 100644 --- a/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/util/MqttPushUtil.java +++ b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/util/MqttPushUtil.java @@ -11,12 +11,17 @@ import org.springframework.stereotype.Component; @Slf4j +@Component public class MqttPushUtil { - private final static String HOST = "tcp://127.0.0.1:9882"; - private final static String CLIENT_ID = "mqttx_3267186711111fdsasdsa"; - private final static String USER_NAME = "admin"; - private final static String PASS_WORD = "mingxingdianli123"; +// private final static String HOST = "tcp://127.0.0.1:9882"; +// private final static String CLIENT_ID = "mqttx_3267186711111fdsasdsa"; +// private final static String USER_NAME = "admin"; +// private final static String PASS_WORD = "mingxingdianli123"; + private final static String HOST = "tcp://182.129.202.2:1883"; + private final static String CLIENT_ID = "mqttx_3267186711111mxcdszyun"; + private final static String USER_NAME = "mxEmqx"; + private final static String PASS_WORD = "csDN_wlwpt"; private final static Integer TIME_OUT = 30000; private final static Integer KEEP_ALIVE = 60000; private MqttClient mqttClient; @@ -27,12 +32,10 @@ * @param message * @return */ - public static R<String> pushChargePileData(String topic, String message){ + public R<String> pushChargePileData(String topic, String message){ try { - MqttPushUtil mqttConnect = new MqttPushUtil(); - mqttConnect.start(); //发布消息 - mqttConnect.publish(topic,message); + this.publish(topic,message); return R.ok("["+topic+"]-推送成功;内容为:["+message+"]"); } catch (MqttException e) { return R.fail("推送失败:["+e.getMessage()+"]"); @@ -42,19 +45,19 @@ /** * 测试订阅消息 */ - public static void main(String[] args) throws Exception { - MqttPushUtil mqttConnect = new MqttPushUtil(); - mqttConnect.start(); - //订阅消息 - mqttConnect.publish("/cmt/IoT/pub/8/6700/status/silent","message11消息"); - } - -// @Bean -// public MqttPushUtil getMqttPushClient() { -// //连接至mqtt服务器,获取mqtt连接 -// this.start(); -// return this; +// public static void main(String[] args) throws Exception { +// MqttPushUtil mqttConnect = new MqttPushUtil(); +// mqttConnect.start(); +// //订阅消息 +// mqttConnect.publish("/cmt/IoT/pub/8/6700/status/silent","message11消息"); // } + + @Bean + public MqttPushUtil getMqttPushClient() { + //连接至mqtt服务器,获取mqtt连接 + this.start(); + return this; + } /** * 初始化mqtt链接 * @throws MqttException @@ -70,13 +73,29 @@ // 设置超时时间 单位为秒 options.setConnectionTimeout(TIME_OUT);///默认:30 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 - options.setCleanSession(false);//默认:true + options.setCleanSession(true);//默认:true // 设置断开后重新连接(设置为true时将启用自动重新连接) options.setAutomaticReconnect(true);//默认:false // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(KEEP_ALIVE);//默认:60 // 设置回调 - mqttClient.setCallback(new PushCallback()); + mqttClient.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + System.out.println("Connection lost, reconnecting..."); + reconnect(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + System.out.println("Message arrived: " + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("Delivery complete"); + } + }); mqttClient.connect(options); } catch (Exception e) { e.printStackTrace(); @@ -118,6 +137,16 @@ mqttClient.connect(options); } + public void reconnect() { + while (!mqttClient.isConnected()) { + try { + mqttClient.connect(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + } + /** * 发布,默认qos为0,非持久化 */ -- Gitblit v1.7.1