From 71e051cddfba05a4e7876a92650cc8d7b5fabf28 Mon Sep 17 00:00:00 2001 From: huliguo <2023611923@qq.com> Date: 星期五, 23 五月 2025 20:59:58 +0800 Subject: [PATCH] Merge branch 'dev' of http://120.76.84.145:10101/gitblit/r/java/mx_charging_pile into dev --- ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/callback/PushCallback.java | 70 +++++++++++++++++++++++++++++++++++ 1 files changed, 70 insertions(+), 0 deletions(-) diff --git a/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/callback/PushCallback.java b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/callback/PushCallback.java new file mode 100644 index 0000000..14c0bd2 --- /dev/null +++ b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/mqtt/callback/PushCallback.java @@ -0,0 +1,70 @@ +package com.ruoyi.jianguan.mqtt.callback; + +import com.alibaba.fastjson.JSONObject; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import com.ruoyi.common.core.utils.HttpUtils; +import com.ruoyi.common.core.utils.StringUtils; +import com.ruoyi.jianguan.mqtt.config.TopicConstants; +import com.ruoyi.system.api.domain.SysRole; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import javax.annotation.PostConstruct; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * 消费监听 + */ +@Slf4j +@Component +public class PushCallback implements MqttCallback { + private static MqttConnectOptions options; + private static MqttClient client; + private static final double EARTH_RADIUS = 6378137.0; + + + @Override + public void connectionLost(Throwable throwable) { + if (client == null || !client.isConnected()) { + System.out.println("连接断开,正在重连...."); + int[] Qos = {0}; + String[] topic1 = {TopicConstants.CHARGE_PILE_CODE}; + try { + client.connect(options); + client.subscribe(topic1, Qos); + log.info("连接失败重连成功"); + } catch (MqttException e) { + log.info("连接失败重连失败"); + log.info("connectionLost e:{}", e.getMessage()); + } + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + System.err.println("接收消息主题 : " + topic); + System.err.println("接收消息Qos : " + message.getQos()); + System.err.println("接收消息内容 : " + new String(message.getPayload())); + + String silentMessage = new String(message.getPayload()); + JSONObject jsonObject = JSONObject.parseObject(silentMessage); + String messageType = jsonObject.getString("type"); + if(StringUtils.isEmpty(messageType)){ + log.warn("接收消息类型为空"); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + + } + +} -- Gitblit v1.7.1