puzhibing
9 天以前 8c6849d98fc168369d6de2d94d2f42a6d67cd9c5
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/util/mqtt/MqttReceiverMessageHandler.java
@@ -1,11 +1,15 @@
package com.ruoyi.dataInterchange.util.mqtt;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.dataInterchange.server.WarnMsgService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author zhibing.pu
@@ -15,12 +19,27 @@
@Component
public class MqttReceiverMessageHandler implements MessageHandler {
   
   @Resource
   private WarnMsgService warnMsgService;
   @Override
   public void handleMessage(Message<?> message) throws MessagingException {
      MessageHeaders headers = message.getHeaders();
      log.info("线程名称:{},收到消息,主题:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());
      log.error("线程名称:{},收到消息,主题:{},消息:{}", Thread.currentThread().getName(), headers.get("mqtt_receivedTopic").toString(), message.getPayload());
      // log.info("收到消息主题:{}", headers.get("mqtt_receivedTopic").toString());
      // log.info("收到消息:{}", message.getPayload());
      // 消息保存到内存队列里面,定时批量入库,也可以在这里直接入库
      String s = message.getPayload().toString();
      JSONObject jsonObject = JSONObject.parseObject(s);
      String method = jsonObject.getString("method");
      JSONObject params = jsonObject.getJSONObject("params");
      String ability = params.getString("ability");
      switch (ability){
         case "event_msa_alarm":
            warnMsgService.saveWarnMsgService(jsonObject);
            break;
         case "event_msa_pic":
            warnMsgService.saveWarnMsgPicService(jsonObject);
            break;
      }
   }
}