From be06cd6731dd2a5d1a9bbbd3d12a2f7b3a00c966 Mon Sep 17 00:00:00 2001 From: Pu Zhibing <393733352@qq.com> Date: 星期一, 14 七月 2025 09:30:00 +0800 Subject: [PATCH] 移除mq --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java | 106 +--------------------------------------------------- 1 files changed, 3 insertions(+), 103 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java index 048678b..67e8379 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java @@ -12,25 +12,20 @@ import com.ruoyi.integration.api.model.*; import com.ruoyi.integration.iotda.constant.SendTagConstant; import com.ruoyi.integration.iotda.enums.ServiceIdMenu; +import com.ruoyi.integration.iotda.model.*; import com.ruoyi.integration.iotda.utils.produce.IotMessageProduce; import com.ruoyi.integration.iotda.utils.tools.CP56Time2aConverter; import com.ruoyi.integration.iotda.utils.tools.MessageUtil; import com.ruoyi.integration.iotda.utils.tools.StrategyUtil; -import com.ruoyi.integration.rocket.model.*; -import com.ruoyi.integration.rocket.produce.ChargingMessageUtil; -import com.ruoyi.integration.rocket.produce.EnhanceProduce; +import com.ruoyi.integration.iotda.utils.tools.ChargingMessageUtil; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendResult; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; import java.io.IOException; import java.math.BigDecimal; import java.util.*; @@ -44,8 +39,6 @@ @RequestMapping("/iot") public class IotMessageListener { - @Autowired - private EnhanceProduce enhanceProduce; @Autowired private MessageUtil messageUtil; @Autowired @@ -93,7 +86,6 @@ String productId = header.getString("product_id"); // 产品id String service_id = content.getString("service_id"); log.info("服务id:{}",service_id); - SendResult sendResult = null; ChargingMessage chargingMessage = new ChargingMessage(); chargingMessage.setServiceId(service_id); // 设备消息下发 @@ -101,9 +93,6 @@ switch (service_id){ case SendTagConstant.ONLINE: OnlineMessage onlineMessage = JSON.parseObject(content.toJSONString(),OnlineMessage.class); - //走rocketmq -// sendResult = enhanceProduce.onlineMessage(onlineMessage); - //直连 chargingMessage.setOnlineMessage(onlineMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 @@ -123,6 +112,7 @@ PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class); //存储缓存中,5分钟有效 redisTemplate.opsForValue().set("ping:" + pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(), pingMessage, 5, TimeUnit.MINUTES); + // 响应硬件 Pong pong = new Pong(); pong.setCharging_pile_code(pingMessage.getCharging_pile_code()); @@ -135,36 +125,19 @@ vo1.setPile_code(pingMessage.getCharging_pile_code()); vo1.setStatus(pingMessage.getCharging_gun_status()); chargingPileClient.updateChargingPileStatus(vo1); - // 监管平台推送充电设备状态 - String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(); - ChargingMessage chargingMessage1 = new ChargingMessage(); - chargingMessage1.setServiceId(SendTagConstant.GUN_STATUS); - GunStatusMessage gunStatusMessage = new GunStatusMessage(); - gunStatusMessage.setFullNumber(gunCode); - chargingMessage1.setGunStatusMessage(gunStatusMessage); - sendResult = enhanceProduce.gunStatusMessage(chargingMessage1); break; case SendTagConstant.END_CHARGE: EndChargeMessage endChargeMessage = JSON.parseObject(content.toJSONString(),EndChargeMessage.class); - //走rocketmq -// sendResult = enhanceProduce.endChargeMessage(endChargeMessage); - //直连 chargingMessage.setEndChargeMessage(endChargeMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.ERROR_MESSAGE: ErrorMessageMessage errorMessageMessage = JSON.parseObject(content.toJSONString(),ErrorMessageMessage.class); - //走rocketmq -// sendResult = enhanceProduce.errorMessageMessage(errorMessageMessage); - //直连 chargingMessage.setErrorMessageMessage(errorMessageMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.BILLING_MODE_VERIFY: BillingModeVerifyMessage billingModeVerifyMessage = JSON.parseObject(content.toJSONString(),BillingModeVerifyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.billingModeVerifyMessage(billingModeVerifyMessage); - //直连 chargingMessage.setBillingModeVerifyMessage(billingModeVerifyMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 @@ -193,9 +166,6 @@ break; case SendTagConstant.ACQUISITION_BILLING_MODE: AcquisitionBillingModeMessage acquisitionBillingModeMessage = JSON.parseObject(content.toJSONString(),AcquisitionBillingModeMessage.class); - //走rocketmq -// sendResult = enhanceProduce.acquisitionBillingModeMessage(acquisitionBillingModeMessage); - //直连 chargingMessage.setAcquisitionBillingModeMessage(acquisitionBillingModeMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 计费模型请求应答 1=尖阶段,2=峰阶段,3=平阶段,4=谷阶段 @@ -215,65 +185,41 @@ case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: log.info("充电实时数据上传"); UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = JSON.parseObject(content.toJSONString(),UploadRealTimeMonitoringDataMessage.class); - //走rocketmq -// sendResult = enhanceProduce.uploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); - //直连 chargingMessage.setUploadRealTimeMonitoringDataMessage(uploadRealTimeMonitoringDataMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.CHARGING_HANDSHAKE: ChargingHandshakeMessage chargingHandshakeMessage = JSON.parseObject(content.toJSONString(),ChargingHandshakeMessage.class); - //走rocketmq -// sendResult = enhanceProduce.chargingHandshakeMessage(chargingHandshakeMessage); - //直连 chargingMessage.setChargingHandshakeMessage(chargingHandshakeMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.PARAMETER_SETTING: ParameterSettingMessage parameterSettingMessage = JSON.parseObject(content.toJSONString(),ParameterSettingMessage.class); - //走rocketmq -// sendResult = enhanceProduce.parameterSettingMessage(parameterSettingMessage); - //直连 chargingMessage.setParameterSettingMessage(parameterSettingMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.BMS_ABORT: BmsAbortMessage bmsAbortMessage = JSON.parseObject(content.toJSONString(),BmsAbortMessage.class); - //走rocketmq -// sendResult = enhanceProduce.bmsAbortMessage(bmsAbortMessage); - //直连 chargingMessage.setBmsAbortMessage(bmsAbortMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.MOTOR_ABORT: MotorAbortMessage motorAbortMessage = JSON.parseObject(content.toJSONString(),MotorAbortMessage.class); - //走rocketmq -// sendResult = enhanceProduce.motorAbortMessage(motorAbortMessage); - //直连 chargingMessage.setMotorAbortMessage(motorAbortMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION: BmsDemandAndChargerExportationMessage bmsDemandAndChargerExportationMessage = JSON.parseObject(content.toJSONString(),BmsDemandAndChargerExportationMessage.class); - //走rocketmq -// sendResult = enhanceProduce.bmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); - //直连 chargingMessage.setBmsDemandAndChargerExportationMessage(bmsDemandAndChargerExportationMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.BMS_INFORMATION: BmsInformationMessage bmsInformationMessage = JSON.parseObject(content.toJSONString(),BmsInformationMessage.class); - //走rocketmq -// sendResult = enhanceProduce.bmsInformationMessage(bmsInformationMessage); - //直连 chargingMessage.setBmsInformationMessage(bmsInformationMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.CHARGING_PILE_STARTS_CHARGING: ChargingPileStartsChargingMessage chargingPileStartsChargingMessage = JSON.parseObject(content.toJSONString(),ChargingPileStartsChargingMessage.class); - //走rocketmq -// sendResult = enhanceProduce.chargingPileStartsChargingMessage(chargingPileStartsChargingMessage); - //直连 chargingMessage.setChargingPileStartsChargingMessage(chargingPileStartsChargingMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 @@ -288,26 +234,17 @@ break; case SendTagConstant.PLATFORM_START_CHARGING_REPLY: PlatformStartChargingReplyMessage platformStartChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStartChargingReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.platformStartChargingReplyMessage(platformStartChargingReplyMessage); - //直连 chargingMessage.setPlatformStartChargingReplyMessage(platformStartChargingReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.PLATFORM_STOP_CHARGING_REPLY: PlatformStopChargingReplyMessage platformStopChargingReplyMessage = JSON.parseObject(content.toJSONString(),PlatformStopChargingReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.platformStopChargingReplyMessage(platformStopChargingReplyMessage); - //直连 chargingMessage.setPlatformStopChargingReplyMessage(platformStopChargingReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.TRANSACTION_RECORD: TransactionRecordMessage transactionRecordMessage = JSON.parseObject(content.toJSONString(),TransactionRecordMessage.class); transactionRecordMessage.setResult(content.toJSONString()); - //走rocketmq -// sendResult = enhanceProduce.transactionRecordMessage(transactionRecordMessage); - //直连 chargingMessage.setTransactionRecordMessage(transactionRecordMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 @@ -318,41 +255,26 @@ break; case SendTagConstant.UPDATE_BALANCE_REPLY: UpdateBalanceReplyMessage updateBalanceReplyMessage = JSON.parseObject(content.toJSONString(),UpdateBalanceReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.updateBalanceReplyMessage(updateBalanceReplyMessage); - //直连 chargingMessage.setUpdateBalanceReplyMessage(updateBalanceReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY: SynchronizeOfflineCardReplyMessage synchronizeOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),SynchronizeOfflineCardReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.synchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); - //直连 chargingMessage.setSynchronizeOfflineCardReplyMessage(synchronizeOfflineCardReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.CLEAR_OFFLINE_CARD_REPLY: ClearOfflineCardReplyMessage clearOfflineCardReplyMessage = JSON.parseObject(content.toJSONString(),ClearOfflineCardReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.clearOfflineCardReplyMessage(clearOfflineCardReplyMessage); - //直连 chargingMessage.setClearOfflineCardReplyMessage(clearOfflineCardReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.WORKING_PARAMETER_SETTING_REPLY: WorkingParameterSettingReplyMessage workingParameterSettingReplyMessage = JSON.parseObject(content.toJSONString(),WorkingParameterSettingReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.workingParameterSettingReplyMessage(workingParameterSettingReplyMessage); - //直连 chargingMessage.setWorkingParameterSettingReplyMessage(workingParameterSettingReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.TIMING_SETTING: TimingSettingMessage timingSettingMessage = JSON.parseObject(content.toJSONString(),TimingSettingMessage.class); - //走rocketmq -// sendResult = enhanceProduce.timingSettingMessage(timingSettingMessage); - //直连 chargingMessage.setTimingSettingMessage(timingSettingMessage); chargingMessageUtil.handleMessage(chargingMessage); // 响应硬件 对时设置应答 @@ -363,62 +285,40 @@ break; case SendTagConstant.SETUP_BILLING_MODEL_REPLY: SetupBillingModelReplyMessage setupBillingModelReplyMessage = JSON.parseObject(content.toJSONString(),SetupBillingModelReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.setupBillingModelReplyMessage(setupBillingModelReplyMessage); - //直连 chargingMessage.setSetupBillingModelReplyMessage(setupBillingModelReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.GROUND_LOCK_REAL_TIME_DATA: GroundLockRealTimeDataMessage groundLockRealTimeDataMessage = JSON.parseObject(content.toJSONString(),GroundLockRealTimeDataMessage.class); - //走rocketmq -// sendResult = enhanceProduce.groundLockRealTimeDataMessage(groundLockRealTimeDataMessage); - //直连 chargingMessage.setGroundLockRealTimeDataMessage(groundLockRealTimeDataMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA: ChargingPileReturnsGroundLockDataMessage chargingPileReturnsGroundLockDataMessage = JSON.parseObject(content.toJSONString(),ChargingPileReturnsGroundLockDataMessage.class); - //走rocketmq -// sendResult = enhanceProduce.chargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); - //直连 chargingMessage.setChargingPileReturnsGroundLockDataMessage(chargingPileReturnsGroundLockDataMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.PLATFORM_RESTART_REPLY: PlatformRestartReplyMessage platformRestartReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRestartReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.platformRestartReplyMessage(platformRestartReplyMessage); - //直连 chargingMessage.setPlatformRestartReplyMessage(platformRestartReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.QR_CODE_DELIVERY_REPLY: QrCodeDeliveryReplyMessage qrCodeDeliveryReplyMessage = JSON.parseObject(content.toJSONString(),QrCodeDeliveryReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.qrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); - //直连 chargingMessage.setQrCodeDeliveryReplyMessage(qrCodeDeliveryReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; case SendTagConstant.SECURITY_DETECTION: SecurityDetectionMessage securityDetectionMessage = JSON.parseObject(content.toJSONString(),SecurityDetectionMessage.class); - //走rocketmq -// sendResult = enhanceProduce.securityDetectionMessage(securityDetectionMessage); - //直连 chargingMessage.setSecurityDetectionMessage(securityDetectionMessage); chargingMessageUtil.handleMessage(chargingMessage); break; default: PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = JSON.parseObject(content.toJSONString(),PlatformRemoteUpdateReplyMessage.class); - //走rocketmq -// sendResult = enhanceProduce.platformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); - //直连 chargingMessage.setPlatformRemoteUpdateReplyMessage(platformRemoteUpdateReplyMessage); chargingMessageUtil.handleMessage(chargingMessage); break; } - log.info("rocketmq消息下发结果:{}",sendResult); return AjaxResult.success(); } -- Gitblit v1.7.1