xuhy
2025-05-06 4ce9ac31e35aca8ebf075d0d9604ee5c6005c39a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package com.ruoyi.jianguan.mqtt.util;
 
 
import com.ruoyi.common.core.domain.R;
import com.ruoyi.jianguan.mqtt.callback.PushCallback;
import com.ruoyi.jianguan.mqtt.client.ClientMQTT;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class MqttPushUtil {
 
    private final static String HOST = "tcp://127.0.0.1:9882";
    private final static String CLIENT_ID = "mqttx_3267186711111fdsasdsa";
    private final static String USER_NAME = "admin";
    private final static String PASS_WORD = "mingxingdianli123";
    private final static Integer TIME_OUT = 30000;
    private final static Integer KEEP_ALIVE = 60000;
    private MqttClient mqttClient;
 
    /**
     * 推送充电桩数据
     * @param topic
     * @param message
     * @return
     */
    public R<String> pushChargePileData(String topic, String message){
        try {
//            MqttPushUtil mqttConnect = new MqttPushUtil();
//            mqttConnect.start();
            //发布消息
            this.publish(topic,message);
            return R.ok("["+topic+"]-推送成功;内容为:["+message+"]");
        } catch (MqttException e) {
            return R.fail("推送失败:["+e.getMessage()+"]");
        }
    }
 
    /**
     * 测试订阅消息
     */
    public static void main(String[] args) throws Exception {
        MqttPushUtil mqttConnect = new MqttPushUtil();
        mqttConnect.start();
        //订阅消息
        mqttConnect.publish("/cmt/IoT/pub/8/6700/status/silent","message11消息");
    }
 
    @Bean
    public MqttPushUtil getMqttPushClient() {
        //连接至mqtt服务器,获取mqtt连接
        MqttPushUtil mqttConnect = new MqttPushUtil();
        mqttConnect.start();
        return this;
    }
    /**
     * 初始化mqtt链接
     * @throws MqttException
     */
    public void start(){
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            mqttClient = new MqttClient(HOST, CLIENT_ID + System.currentTimeMillis(), new MemoryPersistence());
            // MQTT的连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(USER_NAME);
            options.setPassword(PASS_WORD.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(TIME_OUT);///默认:30
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(false);//默认:true
            // 设置断开后重新连接(设置为true时将启用自动重新连接)
            options.setAutomaticReconnect(true);//默认:false
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(KEEP_ALIVE);//默认:60
            // 设置回调
            mqttClient.setCallback(new PushCallback());
            mqttClient.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 自定义mqtt连接
     * @param host
     * @param clientId
     * @param userName
     * @param passWord
     * @param connectionTimeout
     * @param cleanSession
     * @param automaticReconnect
     * @param keepAliveInterval
     * @param mqttCallback
     * @throws MqttException
     */
    public void start(String host,String clientId, String userName, String passWord,
                      int connectionTimeout, boolean cleanSession,boolean automaticReconnect,
                      int keepAliveInterval,MqttCallback mqttCallback) throws MqttException {
        // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
        mqttClient = new MqttClient(host, clientId + System.currentTimeMillis(), new MemoryPersistence());
        // MQTT的连接设置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(connectionTimeout);///默认:30
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(cleanSession);//默认:true
        // 设置断开后重新连接(设置为true时将启用自动重新连接)
        options.setAutomaticReconnect(automaticReconnect);//默认:false
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(keepAliveInterval);//默认:60
        // 设置回调
        mqttClient.setCallback(mqttCallback);
        mqttClient.connect(options);
    }
 
    /**
     * 发布,默认qos为0,非持久化
     */
    public void publish(String topic, String message1) throws MqttException {
        publish(0, false, topic, message1);
    }
 
    /**
     * 发布
     */
    public void publish(int qos, boolean retained, String topic, String message1) throws MqttException {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(message1.getBytes());
        MqttTopic mTopic = mqttClient.getTopic(topic);
        if (null == mTopic) {
            System.err.println("topic not exist");
            log.error("topic not exist");
        }
        MqttDeliveryToken token;
        token = mTopic.publish(message);
        token.waitForCompletion();
    }
 
    /**
     * 订阅某个主题,qos默认为0
     */
    public void subscribe(String topic) {
        subscribe(topic, 0);
    }
 
    /**
     * 订阅某个主题
     */
    public void subscribe(String topic, int qos) {
        try {
            mqttClient.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
 
 
}