From 0b71af4620572229218a4f59b85f0a9c0090279b Mon Sep 17 00:00:00 2001
From: Pu Zhibing <393733352@qq.com>
Date: 星期一, 26 五月 2025 11:48:28 +0800
Subject: [PATCH] 集成MQTT对接公交主防数据
---
ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java | 312 ++++++++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 277 insertions(+), 35 deletions(-)
diff --git a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java
index c0064ce..2932e7e 100644
--- a/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java
+++ b/ruoyi-service/ruoyi-dataInterchange/src/main/java/com/ruoyi/dataInterchange/server/WarnMsgService.java
@@ -1,27 +1,33 @@
package com.ruoyi.dataInterchange.server;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.ruoyi.dataInterchange.dao.UPExgMsgRealLocationDao;
import com.ruoyi.dataInterchange.dao.UPWarnMsgAdptInfoDao;
import com.ruoyi.dataInterchange.dao.UPWarnMsgUrgeTodoAckDao;
-import com.ruoyi.dataInterchange.model.DOWNWarnMsgUrgeTodoReq;
-import com.ruoyi.dataInterchange.model.UPWarnMsgAdptInfo;
-import com.ruoyi.dataInterchange.model.UPWarnMsgUrgeTodoAck;
-import com.ruoyi.dataInterchange.model.WarnMsg;
+import com.ruoyi.dataInterchange.model.*;
import com.ruoyi.dataInterchange.model.enu.DataType;
import com.ruoyi.dataInterchange.netty.client.ChannelMap;
+import com.ruoyi.dataInterchange.util.haikang.model.Alarm;
+import com.ruoyi.dataInterchange.util.haikang.model.AlarmPic;
+import com.ruoyi.dataInterchange.util.haikang.model.Event;
+import com.ruoyi.dataInterchange.util.haikang.model.SubscriptionEvent;
import com.ruoyi.dataInterchange.util.jtt809.common.Jtt809Util;
import com.ruoyi.dataInterchange.util.jtt809.packet.common.OuterPacket;
-import com.ruoyi.dataInterchange.wapper.UPConnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.time.LocalDateTime;
-import java.util.ArrayList;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
import java.util.List;
/**
@@ -41,8 +47,20 @@
@Resource
private UPWarnMsgAdptInfoDao upWarnMsgAdptInfoDao;
+ @Resource
+ private RedisTemplate redisTemplate;
+
+ @Resource
+ private UPExgMsgRealLocationDao upExgMsgRealLocationDao;
+ //走川标的公司
+ private List<Integer> enterprise = Arrays.asList(17458936, 43984060, 13914394);
public void up_warn_msg(ChannelHandlerContext ctx, OuterPacket out) {
+ if (!redisTemplate.hasKey("login:" + out.getGnsscenterId())) {
+ log.error("链路还未登录校验,拒绝连接:{}", out.getGnsscenterId());
+ ctx.close();
+ return;
+ }
WarnMsg warnMsg = getWarnMsg(out);
DataType dataType = DataType.getDataType(warnMsg.getDataType());
switch (dataType) {
@@ -54,23 +72,10 @@
log.info("上报报警信息({}):{}", DataType.UP_WARN_MSG_ADPT_INFO.getCode(), out);
up_warn_msg_adpt_info(ctx, out.getGnsscenterId(), warnMsg);
break;
- case UP_CTRL_MSG_MONITOR_VEHICLE_ACK:
- log.info("车辆单向监听应答消息({}):{}", DataType.UP_CTRL_MSG_MONITOR_VEHICLE_ACK.getCode(), out);
- break;
- case UP_CTRL_MSG_TAKE_PHOTO_ACK:
- log.info("车辆牌照应答消息({}):{}", DataType.UP_CTRL_MSG_TAKE_PHOTO_ACK.getCode(), out);
- break;
- case UP_CTRL_MSG_TEXT_INFO_ACK:
- log.info("下发车辆报文应答消息({}):{}", DataType.UP_CTRL_MSG_TEXT_INFO_ACK.getCode(), out);
- break;
- case UP_CTRL_MSG_EMERGENCY_MONITORING_ACK:
- log.info("车辆应急接入监管平台应答消息({}):{}", DataType.UP_CTRL_MSG_EMERGENCY_MONITORING_ACK.getCode(), out);
- break;
default:
break;
}
}
-
/**
* 解析子业务数据
@@ -100,8 +105,7 @@
warnMsg.setData(data);
return warnMsg;
}
-
-
+
/**
* 报警督办应答消息
*
@@ -112,10 +116,10 @@
public void up_warn_msg_urge_todo_ack(ChannelHandlerContext ctx, int inferiorPlatformId, WarnMsg warnMsg) {
UPWarnMsgUrgeTodoAck upWarnMsgUrgeTodoAck = new UPWarnMsgUrgeTodoAck().decode(warnMsg);
upWarnMsgUrgeTodoAck.setInferiorPlatformId(inferiorPlatformId);
- upWarnMsgUrgeTodoAck.setCreateTime(LocalDateTime.now());
+ upWarnMsgUrgeTodoAck.setCreateTime(LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)));
upWarnMsgUrgeTodoAckDao.save(upWarnMsgUrgeTodoAck);
}
-
+
/**
* 上报报警信息
*
@@ -124,9 +128,21 @@
* @param warnMsg
*/
public void up_warn_msg_adpt_info(ChannelHandlerContext ctx, int inferiorPlatformId, WarnMsg warnMsg) {
- UPWarnMsgAdptInfo upWarnMsgAdptInfo = new UPWarnMsgAdptInfo().decode(warnMsg);
+ UPWarnMsgAdptInfo upWarnMsgAdptInfo = enterprise.contains(inferiorPlatformId) ? new UPWarnMsgAdptInfo().decode2(warnMsg) : new UPWarnMsgAdptInfo().decode1(warnMsg);
+ upWarnMsgAdptInfo.setResult(0x00);
upWarnMsgAdptInfo.setInferiorPlatformId(inferiorPlatformId);
- upWarnMsgAdptInfo.setCreateTime(LocalDateTime.now());
+ upWarnMsgAdptInfo.setCreateTime(LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)));
+ if(0 == upWarnMsgAdptInfo.getLongitude()){
+ UPExgMsgRealLocation upExgMsgRealLocation = upExgMsgRealLocationDao.findByVehicleNoOrderByCreateTimeDesc(upWarnMsgAdptInfo.getVehicleNo());
+ if (null != upExgMsgRealLocation) {
+ GnssData gnssData = upExgMsgRealLocation.getGnssData();
+ upWarnMsgAdptInfo.setSpeed(gnssData.getVec1());
+ upWarnMsgAdptInfo.setLongitude(gnssData.getLon());
+ upWarnMsgAdptInfo.setLatitude(gnssData.getLat());
+ }
+ }else if(2 != upWarnMsgAdptInfo.getLevel()){
+ return;
+ }
upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
}
@@ -135,8 +151,7 @@
* 定时任务督办报警请求
*/
public void taskUrgeTodo() {
- upWarnMsgAdptInfoDao.findAll()
- List<UPWarnMsgAdptInfo> list = new ArrayList<>();
+ List<UPWarnMsgAdptInfo> list = upWarnMsgAdptInfoDao.findByResultIsAndLevelNotNullAndPushTimeBefore(0x00, LocalDateTime.now().toEpochSecond(ZoneOffset.ofHours(8)));
for (UPWarnMsgAdptInfo upWarnMsgAdptInfo : list) {
down_warn_msg_urge_todo_req(upWarnMsgAdptInfo);
}
@@ -147,21 +162,248 @@
* 报警督办请求
*/
public void down_warn_msg_urge_todo_req(UPWarnMsgAdptInfo upWarnMsgAdptInfo) {
+ if (!redisTemplate.hasKey("login:" + upWarnMsgAdptInfo.getInferiorPlatformId())) {
+ log.error("链路还未登录校验,拒绝连接:{}", upWarnMsgAdptInfo.getInferiorPlatformId());
+ return;
+ }
int inferiorPlatformId = upWarnMsgAdptInfo.getInferiorPlatformId();
DOWNWarnMsgUrgeTodoReq downWarnMsgUrgeTodoReq = new DOWNWarnMsgUrgeTodoReq().build(upWarnMsgAdptInfo);
- log.info("报警督办请求({}):{}", DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), JSON.toJSONString(downWarnMsgUrgeTodoReq));
+ downWarnMsgUrgeTodoReq.setDataType(DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode());
+ downWarnMsgUrgeTodoReq.setDataLength(92);
byte[] body = downWarnMsgUrgeTodoReq.encode();
- OuterPacket out = new OuterPacket(DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), body);
+ OuterPacket out = new OuterPacket(DataType.DOWN_WARN_MSG.getCode(), inferiorPlatformId, body);
//获取从链路通道
Channel channel = ChannelMap.getClientChannel(inferiorPlatformId);
if (null != channel && channel.isActive()) {
channel.writeAndFlush(out);
- channel.flush();
- } else {
- //重新连接从链路
- UPConnect ipAndPort = ChannelMap.getIpAndPort(inferiorPlatformId);
- connectReqService.downConnect(inferiorPlatformId, ipAndPort.getDownLinkIp(), ipAndPort.getDownLinkPort(), ipAndPort.getVerifyCode());
+ log.info("报警督办请求({}):{}", DataType.DOWN_WARN_MSG_URGE_TODO_REQ.getCode(), JSON.toJSONString(downWarnMsgUrgeTodoReq));
}
-
+ upWarnMsgAdptInfo.setPushTime(LocalDateTime.now().plusDays(7).toEpochSecond(ZoneOffset.ofHours(8)));
+ upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
}
+
+
+ /**
+ * 存储mqtt协议报警信息
+ */
+ public void saveWarnMsgService(JSONObject jsonObject) {
+ SubscriptionEvent subscriptionEvent = jsonObject.getObject("", SubscriptionEvent.class);
+ List<Event> events = subscriptionEvent.getEvents();
+ events.forEach(event -> {
+ //车牌号
+ String srcName = event.getSrcName();
+ Alarm data = JSONObject.parseObject(event.getData(), Alarm.class);
+ UPWarnMsgAdptInfo upWarnMsgAdptInfo = upWarnMsgAdptInfoDao.findByInfoIdIs(data.getAlarmId());
+ if(null == upWarnMsgAdptInfo){
+ upWarnMsgAdptInfo = new UPWarnMsgAdptInfo();
+ }
+ upWarnMsgAdptInfo.setVehicleNo(srcName);
+ upWarnMsgAdptInfo.setVehicleColor(data.getVehicleLicenseColor());
+ upWarnMsgAdptInfo.setWarnSrc(data.getSourceType() + 1);
+ upWarnMsgAdptInfo.setWarnType(getAlarmType(data.getEventType()));
+ upWarnMsgAdptInfo.setWarnTime(LocalDateTime.parse(data.getAlarmTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).toEpochSecond(ZoneOffset.ofHours(8)));
+ upWarnMsgAdptInfo.setInfoId(data.getAlarmId());
+ upWarnMsgAdptInfo.setLevel(getLevel(data.getLevel()));
+ upWarnMsgAdptInfo.setLongitude(data.getLongitude().intValue());
+ upWarnMsgAdptInfo.setLatitude(data.getLatitude().intValue());
+ upWarnMsgAdptInfo.setAltitude(data.getHeight().intValue() / 10);
+ upWarnMsgAdptInfo.setSpeed(data.getSpeed().intValue() / 10000);
+ upWarnMsgAdptInfo.setVec2(data.getSpeed().intValue());
+ upWarnMsgAdptInfo.setStatus(data.getStatus());
+ upWarnMsgAdptInfo.setDirection(data.getDirection() / 100);
+ upWarnMsgAdptInfo.setInfoContent(data.getAlarmInfo());
+ upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
+ });
+ }
+
+ public int getAlarmType(long eventType){
+ switch ((int) eventType){
+ case 0x00020501:
+ return 0x0301;
+ case 0x00020502:
+ return 0x0201;
+ case 0x00020503:
+ return 0x0202;
+ case 0x00020504:
+ return 0x0202;
+ case 0x00020505:
+ return 0x0202;
+ case 0x00020506:
+ return 0x0202;
+ case 0x00020507:
+ return 0x0203;
+ case 0x00020508:
+ return 0x0203;
+ case 0x00020509:
+ return 0x0204;
+ case 0x00020510:
+ return 0x0204;
+ case 0x00037017:
+ return 0x00FF;
+ case 0x00020511:
+ return 0x00FF;
+ case 0x00037005:
+ return 0x00FF;
+ case 0x00020512:
+ return 0x0002;
+ case 0x00020513:
+ return 0x0002;
+ case 0x00020514:
+ return 0x00FF;
+ case 0x00020515:
+ return 0x00FF;
+ case 0x00020516:
+ return 0x0214;
+ case 0x00020517:
+ return 0x0213;
+ case 0x00020519:
+ return 0x0215;
+ case 0x00020520:
+ return 0x0234;
+ case 0x00020521:
+ return 0x0234;
+ case 0x00020522:
+ return 0x0234;
+ case 0x00020523:
+ return 0x0241;
+ case 0x00020524:
+ return 0x0242;
+ case 0x00020525:
+ return 0x0243;
+ case 0x00020526:
+ return 0x0243;
+ case 0x00020531:
+ return 0x00FF;
+ case 0x00020532:
+ return 0x00FF;
+ case 0x00020533:
+ return 0x00FF;
+ case 0x00020534:
+ return 0x0218;
+ case 0x00020518:
+ return 0xA0FF;
+ case 0x00037001:
+ return 0xA0FF;
+ case 0x00037002:
+ return 0x0216;
+ case 0x00037006:
+ return 0x0216;
+ case 0x00020535:
+ return 0x0001;
+ case 0x00020536:
+ return 0x0001;
+ case 0x00020537:
+ return 0x000D;
+ case 0x00020102:
+ return 0x0102;
+ case 0x00020538:
+ return 0x0005;
+ case 0x00020539:
+ return 0x0004;
+ case 0x00020540:
+ return 0x0304;
+ case 0x00020541:
+ return 0x0304;
+ case 0x00020542:
+ return 0x000B;
+ case 0x00020543:
+ return 0x0001;
+ case 0x00020544:
+ return 0x0001;
+ case 0x00020545:
+ return 0xA001;
+ case 0x00020546:
+ return 0x0305;
+ case 0x00020547:
+ return 0x0305;
+ case 0x00020548:
+ return 0xA0FF;
+ case 0x00010202:
+ return 0x0103;
+ case 0x00010206:
+ return 0x0103;
+ case 0x00020101:
+ return 0x0101;
+ case 0x00020103:
+ return 0x000C;
+ case 0x00020549:
+ return 0xA00A;
+ case 0x00020550:
+ return 0xA00A;
+ case 0x00020551:
+ return 0xA00A;
+ case 0x00020552:
+ return 0x0308;
+ case 0x00020553:
+ return 0x0308;
+ case 0x00020554:
+ return 0x0308;
+ case 0x00020555:
+ return 0x0308;
+ case 0x00020556:
+ return 0x0308;
+ case 0x00020557:
+ return 0x0308;
+ case 0x00020558:
+ return 0x0308;
+ case 0x00020559:
+ return 0x0003;
+ case 0x00020527:
+ return 0x0301;
+ case 0x00020528:
+ return 0x0201;
+ case 0x00020529:
+ return 0x0302;
+ case 0x00020530:
+ return 0x0302;
+ case 0x00020560:
+ return 0x0009;
+ case 0x00020561:
+ return 0x0306;
+ case 0x00020562:
+ return 0x0307;
+ case 0x00090001:
+ return 0x00FF;
+ default:
+ return 0x00FF;
+ }
+ }
+
+ public int getLevel(String level){
+ switch (level){
+ case "h":
+ case "m":
+ return 1;
+ case "l":
+ case "w":
+ return 2;
+ default:
+ return 0;
+ }
+ }
+
+ /**
+ * 存储mqtt协议报警图片信息
+ */
+ public void saveWarnMsgPicService(JSONObject jsonObject) {
+ SubscriptionEvent subscriptionEvent = jsonObject.getObject("", SubscriptionEvent.class);
+ List<Event> events = subscriptionEvent.getEvents();
+ events.forEach(event -> {
+ AlarmPic data = JSONObject.parseObject(event.getData(), AlarmPic.class);
+ if(StringUtils.hasLength(data.getAlarmId())){
+ UPWarnMsgAdptInfo upWarnMsgAdptInfo = upWarnMsgAdptInfoDao.findByInfoIdIs(data.getAlarmId());
+ if(null != upWarnMsgAdptInfo){
+ upWarnMsgAdptInfo.setPicUrl(data.getUrl());
+ upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
+ }else{
+ upWarnMsgAdptInfo = new UPWarnMsgAdptInfo();
+ upWarnMsgAdptInfo.setInfoId(data.getAlarmId());
+ upWarnMsgAdptInfoDao.save(upWarnMsgAdptInfo);
+ }
+ }
+ });
+ }
+
+
+
}
--
Gitblit v1.7.1