Pu Zhibing
2025-03-14 3c66b754ee314ae87d0f2eda2fa86a30ea2304e7
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java
@@ -11,17 +11,17 @@
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
import com.ruoyi.dataInterchange.wapper.UPConnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.time.ZoneOffset;
import java.util.List;
/**
@@ -41,8 +41,16 @@
   @Resource
   private UPWarnMsgAdptInfoDao upWarnMsgAdptInfoDao;
   
   @Resource
   private RedisTemplate redisTemplate;
   
   public void up_warn_msg(ChannelHandlerContext ctx, OuterPacket out) {
      if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
         log.error("链路还未登录校验,拒绝连接");
         ctx.close();
         return;
      }
      WarnMsg warnMsg = getWarnMsg(out);
      DataType dataType = DataType.getDataType(warnMsg.getDataType());
      switch (dataType) {
@@ -53,18 +61,6 @@
         case UP_WARN_MSG_ADPT_INFO:
            log.info("上报报警信息({}):{}", DataType.UP_WARN_MSG_ADPT_INFO.getCode(), out);
            up_warn_msg_adpt_info(ctx, out.getGnsscenterId(), warnMsg);
            break;
         case UP_CTRL_MSG_MONITOR_VEHICLE_ACK:
            log.info("车辆单向监听应答消息({}):{}", DataType.UP_CTRL_MSG_MONITOR_VEHICLE_ACK.getCode(), out);
            break;
         case UP_CTRL_MSG_TAKE_PHOTO_ACK:
            log.info("车辆牌照应答消息({}):{}", DataType.UP_CTRL_MSG_TAKE_PHOTO_ACK.getCode(), out);
            break;
         case UP_CTRL_MSG_TEXT_INFO_ACK:
            log.info("下发车辆报文应答消息({}):{}", DataType.UP_CTRL_MSG_TEXT_INFO_ACK.getCode(), out);
            break;
         case UP_CTRL_MSG_EMERGENCY_MONITORING_ACK:
            log.info("车辆应急接入监管平台应答消息({}):{}", DataType.UP_CTRL_MSG_EMERGENCY_MONITORING_ACK.getCode(), out);
            break;
         default:
            break;
@@ -112,7 +108,7 @@
   public void up_warn_msg_urge_todo_ack(ChannelHandlerContext ctx, int inferiorPlatformId, WarnMsg warnMsg) {
      UPWarnMsgUrgeTodoAck upWarnMsgUrgeTodoAck = new UPWarnMsgUrgeTodoAck().decode(warnMsg);
      upWarnMsgUrgeTodoAck.setInferiorPlatformId(inferiorPlatformId);
      upWarnMsgUrgeTodoAck.setCreateTime(LocalDateTime.now());
      upWarnMsgUrgeTodoAck.setCreateTime(LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)));
      upWarnMsgUrgeTodoAckDao.save(upWarnMsgUrgeTodoAck);
   }
   
@@ -125,8 +121,9 @@
    */
   public void up_warn_msg_adpt_info(ChannelHandlerContext ctx, int inferiorPlatformId, WarnMsg warnMsg) {
      UPWarnMsgAdptInfo upWarnMsgAdptInfo = new UPWarnMsgAdptInfo().decode(warnMsg);
      upWarnMsgAdptInfo.setResult(0x00);
      upWarnMsgAdptInfo.setInferiorPlatformId(inferiorPlatformId);
      upWarnMsgAdptInfo.setCreateTime(LocalDateTime.now());
      upWarnMsgAdptInfo.setCreateTime(LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)));
      upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
   }
   
@@ -135,8 +132,7 @@
    * 定时任务督办报警请求
    */
   public void taskUrgeTodo() {
      upWarnMsgAdptInfoDao.findAll()
      List<UPWarnMsgAdptInfo> list = new ArrayList<>();
      List<UPWarnMsgAdptInfo> list = upWarnMsgAdptInfoDao.findByResultIsAndPushTimeBefore(0x00, LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)));
      for (UPWarnMsgAdptInfo upWarnMsgAdptInfo : list) {
         down_warn_msg_urge_todo_req(upWarnMsgAdptInfo);
      }
@@ -147,21 +143,23 @@
    * 报警督办请求
    */
   public void down_warn_msg_urge_todo_req(UPWarnMsgAdptInfo upWarnMsgAdptInfo) {
      if (!redisTemplate.hasKey("login:" + upWarnMsgAdptInfo.getInferiorPlatformId())) {
         log.error("链路还未登录校验,拒绝连接");
         return;
      }
      int inferiorPlatformId = upWarnMsgAdptInfo.getInferiorPlatformId();
      DOWNWarnMsgUrgeTodoReq downWarnMsgUrgeTodoReq = new DOWNWarnMsgUrgeTodoReq().build(upWarnMsgAdptInfo);
      log.info("报警督办请求({}):{}", DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), JSON.toJSONString(downWarnMsgUrgeTodoReq));
      downWarnMsgUrgeTodoReq.setDataType(DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode());
      downWarnMsgUrgeTodoReq.setDataLength(92);
      byte[] body = downWarnMsgUrgeTodoReq.encode();
      OuterPacket out = new OuterPacket(DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), body);
      //获取从链路通道
      Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
      if (null != channel && channel.isActive()) {
         channel.writeAndFlush(out);
         channel.flush();
      } else {
         //重新连接从链路
         UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
         connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
         log.info("报警督办请求({}):{}", DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), JSON.toJSONString(downWarnMsgUrgeTodoReq));
      }
      upWarnMsgAdptInfo.setPushTime(LocalDateTime.now().plusDays(7).toEpochSecond(ZoneOffset.ofHours(8)));
      upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
   }
}