From 09d010da12e5dd22766cf0979f8f34c968ab6ffd Mon Sep 17 00:00:00 2001 From: xuhy <3313886187@qq.com> Date: 星期二, 10 九月 2024 11:39:36 +0800 Subject: [PATCH] mq修改 --- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java | 3 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java | 57 +++++----- ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java | 11 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/configuration/RocketMQEnhanceAutoConfiguration.java | 2 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java | 2 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java | 2 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java | 8 + ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java | 1 ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java | 8 + 31 files changed, 165 insertions(+), 105 deletions(-) diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java index 51caba0..9ebc93b 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java @@ -22,7 +22,6 @@ @Autowired private EnhanceProduce enhanceProduce; - /** * 设备消息监听 * @param jsonObject diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/configuration/RocketMQEnhanceAutoConfiguration.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/configuration/RocketMQEnhanceAutoConfiguration.java index bffae6a..378c5d4 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/configuration/RocketMQEnhanceAutoConfiguration.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/configuration/RocketMQEnhanceAutoConfiguration.java @@ -29,7 +29,7 @@ /** * 解决RocketMQ Jackson不支持Java时间类型配置 - * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration} + * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure} */ @Bean @Primary diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java index cc9a8cc..6286373 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/AcquisitionBillingModeMessageListener.java @@ -13,6 +13,7 @@ import com.ruoyi.integration.rocket.model.AcquisitionBillingModeMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -26,9 +27,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_acquisition_billing_mode", + topic = "charge_acquisition_billing_mode", + selectorExpression = "acquisition_billing_mode", // 明确指定标签 consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class AcquisitionBillingModeMessageListener extends EnhanceMessageHandler<AcquisitionBillingModeMessage> implements RocketMQListener<AcquisitionBillingModeMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java index 6f6e9ad..685aece 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BillingModeVerifyMessageListener.java @@ -14,6 +14,7 @@ import com.ruoyi.integration.rocket.model.BillingModeVerifyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -23,9 +24,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_billing_mode_verify", + topic = "charge_billing_mode_verify", + selectorExpression = "billing_mode_verify", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class BillingModeVerifyMessageListener extends EnhanceMessageHandler<BillingModeVerifyMessage> implements RocketMQListener<BillingModeVerifyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java index 9d6f37c..ceeebfc 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsAbortMessageListener.java @@ -7,6 +7,7 @@ import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -18,9 +19,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_bms_abort", + topic = "charge_bms_abort", + selectorExpression = "bms_abort", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class BmsAbortMessageListener extends EnhanceMessageHandler<BmsAbortMessage> implements RocketMQListener<BmsAbortMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java index b30a74e..e537b29 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsDemandAndChargerExportationMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.BmsDemandAndChargerExportationMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_bms_demand_and_charger_exportation", + topic = "charge_bms_demand_and_charger_exportation", + selectorExpression = "bms_demand_and_charger_exportation", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class BmsDemandAndChargerExportationMessageListener extends EnhanceMessageHandler<BmsDemandAndChargerExportationMessage> implements RocketMQListener<BmsDemandAndChargerExportationMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java index 920823d..b90a685 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/BmsInformationMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.BmsInformationMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_bms_information", + topic = "charge_bms_information", + selectorExpression = "bms_information", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class BmsInformationMessageListener extends EnhanceMessageHandler<BmsInformationMessage> implements RocketMQListener<BmsInformationMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java index a307eb1..2f5439e 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingHandshakeMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.ChargingHandshakeMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_charging_handshake", + topic = "charge_charging_handshake", + selectorExpression = "charging_handshake", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ChargingHandshakeMessageListener extends EnhanceMessageHandler<ChargingHandshakeMessage> implements RocketMQListener<ChargingHandshakeMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java index 5bb08b4..9964786 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileReturnsGroundLockDataMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.ChargingPileReturnsGroundLockDataMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_charging_pile_returns_ground_lock_data", + topic = "charge_charging_pile_returns_ground_lock_data", + selectorExpression = "charging_pile_returns_ground_lock_data", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ChargingPileReturnsGroundLockDataMessageListener extends EnhanceMessageHandler<ChargingPileReturnsGroundLockDataMessage> implements RocketMQListener<ChargingPileReturnsGroundLockDataMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java index 4aff733..903deaa 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ChargingPileStartsChargingMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.ChargingPileStartsChargingMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_charging_pile_starts_charging", + topic = "charge_charging_pile_starts_charging", + selectorExpression = "charging_pile_starts_charging", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ChargingPileStartsChargingMessageListener extends EnhanceMessageHandler<ChargingPileStartsChargingMessage> implements RocketMQListener<ChargingPileStartsChargingMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java index 3993ec9..8a82427 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ClearOfflineCardReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.ClearOfflineCardReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_clear_offline_card_reply", + topic = "charge_clear_offline_card_reply", + selectorExpression = "clear_offline_card_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class ClearOfflineCardReplyMessageListener extends EnhanceMessageHandler<ClearOfflineCardReplyMessage> implements RocketMQListener<ClearOfflineCardReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java index 819768c..19a3046 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/EndChargeMessageListener.java @@ -13,6 +13,7 @@ import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -24,9 +25,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_end_charge", + topic = "charge_end_charge", + selectorExpression = "end_charge", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class EndChargeMessageListener extends EnhanceMessageHandler<EndChargeMessage> implements RocketMQListener<EndChargeMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java index cdf1114..91768db 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/GroundLockRealTimeDataMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.GroundLockRealTimeDataMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_ground_lock_real_time_data", + topic = "charge_ground_lock_real_time_data", + selectorExpression = "ground_lock_real_time_data", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class GroundLockRealTimeDataMessageListener extends EnhanceMessageHandler<GroundLockRealTimeDataMessage> implements RocketMQListener<GroundLockRealTimeDataMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java index 8b96993..e6c6dc8 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/MotorAbortMessageListener.java @@ -7,6 +7,7 @@ import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import com.ruoyi.order.api.feignClient.ChargingOrderClient; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -18,9 +19,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_motor_abort", + topic = "charge_motor_abort", + selectorExpression = "motor_abort", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class MotorAbortMessageListener extends EnhanceMessageHandler<MotorAbortMessage> implements RocketMQListener<MotorAbortMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java index 6d3b5c1..47343b7 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/OnlineMessageListener.java @@ -10,6 +10,7 @@ import com.ruoyi.integration.rocket.model.OnlineMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -19,9 +20,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_online", + topic = "charge_online", + selectorExpression = "online", // 明确指定标签 consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class OnlineMessageListener extends EnhanceMessageHandler<OnlineMessage> implements RocketMQListener<OnlineMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java index e006852..d5d2c33 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/ParameterSettingMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.ParameterSettingMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,6 +16,7 @@ @Slf4j @Component @RocketMQMessageListener( + messageModel = MessageModel.CLUSTERING, consumerGroup = "enhance_consumer_group", topic = "rocket_enhance", selectorExpression = "*", diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java index f44ef06..6c0bc92 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PingMessageListener.java @@ -11,6 +11,7 @@ import com.ruoyi.integration.rocket.model.PingMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -20,9 +21,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_ping", + topic = "charge_ping", + selectorExpression = "ping", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class PingMessageListener extends EnhanceMessageHandler<PingMessage> implements RocketMQListener<PingMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java index 3d56522..74f0e41 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRemoteUpdateReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.PlatformRemoteUpdateReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_platform_remote_update_reply", + topic = "charge_platform_remote_update_reply", + selectorExpression = "platform_remote_update_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class PlatformRemoteUpdateReplyMessageListener extends EnhanceMessageHandler<PlatformRemoteUpdateReplyMessage> implements RocketMQListener<PlatformRemoteUpdateReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java index 5d422d5..4da5259 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformRestartReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.PlatformRestartReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_platform_restart_reply", + topic = "charge_platform_restart_reply", + selectorExpression = "platform_restart_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class PlatformRestartReplyMessageListener extends EnhanceMessageHandler<PlatformRestartReplyMessage> implements RocketMQListener<PlatformRestartReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java index 3ae7a6b..d60ef45 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStartChargingReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.PlatformStartChargingReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_platform_start_charging_reply", + topic = "charge_platform_start_charging_reply", + selectorExpression = "platform_start_charging_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class PlatformStartChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStartChargingReplyMessage> implements RocketMQListener<PlatformStartChargingReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java index 3222881..6fb3c54 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/PlatformStopChargingReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.PlatformStopChargingReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_platform_stop_charging_reply", + topic = "charge_platform_stop_charging_reply", + selectorExpression = "platform_stop_charging_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class PlatformStopChargingReplyMessageListener extends EnhanceMessageHandler<PlatformStopChargingReplyMessage> implements RocketMQListener<PlatformStopChargingReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java index b740303..8f569da 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/QueryOfflineCardReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.QueryOfflineCardReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,6 +16,7 @@ @Slf4j @Component @RocketMQMessageListener( + messageModel = MessageModel.CLUSTERING, consumerGroup = "enhance_consumer_group", topic = "rocket_enhance", selectorExpression = "*", diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java index 7a2501c..1866383 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SetupBillingModelReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.SetupBillingModelReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_setup_billing_model_reply", + topic = "charge_setup_billing_model_reply", + selectorExpression = "setup_billing_model_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class SetupBillingModelReplyMessageListener extends EnhanceMessageHandler<SetupBillingModelReplyMessage> implements RocketMQListener<SetupBillingModelReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java index 70b14e2..52212cd 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/SynchronizeOfflineCardReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.SynchronizeOfflineCardReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_synchronize_offline_card_reply", + topic = "charge_synchronize_offline_card_reply", + selectorExpression = "synchronize_offline_card_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class SynchronizeOfflineCardReplyMessageListener extends EnhanceMessageHandler<SynchronizeOfflineCardReplyMessage> implements RocketMQListener<SynchronizeOfflineCardReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java index 4269c93..68e1fcc 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TimingSettingMessageListener.java @@ -11,6 +11,7 @@ import com.ruoyi.integration.rocket.model.TimingSettingMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -22,9 +23,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_timing_setting", + topic = "charge_timing_setting", + selectorExpression = "timing_setting", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class TimingSettingMessageListener extends EnhanceMessageHandler<TimingSettingMessage> implements RocketMQListener<TimingSettingMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java index d0c6380..03ddbc6 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/TransactionRecordMessageListener.java @@ -10,6 +10,7 @@ import com.ruoyi.integration.rocket.model.TransactionRecordMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -19,9 +20,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_transaction_record", + topic = "charge_transaction_record", + selectorExpression = "transaction_record", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class TransactionRecordMessageListener extends EnhanceMessageHandler<TransactionRecordMessage> implements RocketMQListener<TransactionRecordMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java index 23ff0cb..23eb6bb 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UpdateBalanceReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.UpdateBalanceReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_update_balance_reply", + topic = "charge_update_balance_reply", + selectorExpression = "update_balance_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class UpdateBalanceReplyMessageListener extends EnhanceMessageHandler<UpdateBalanceReplyMessage> implements RocketMQListener<UpdateBalanceReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java index dc6342c..6fd4244 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/UploadRealTimeMonitoringDataMessageListener.java @@ -11,6 +11,7 @@ import com.ruoyi.order.api.model.TChargingOrder; import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -25,9 +26,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_upload_real_time_monitoring_data", + topic = "charge_upload_real_time_monitoring_data", + selectorExpression = "upload_real_time_monitoring_data", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class UploadRealTimeMonitoringDataMessageListener extends EnhanceMessageHandler<UploadRealTimeMonitoringDataMessage> implements RocketMQListener<UploadRealTimeMonitoringDataMessage> { @@ -73,6 +75,9 @@ BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); chargingOrderClient.chargeMonitoring(query); + // 存储状态信息 + + } @Override diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java index 212b38b..3e4f3fb 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/listener/WorkingParameterSettingReplyMessageListener.java @@ -6,6 +6,7 @@ import com.ruoyi.integration.rocket.model.WorkingParameterSettingReplyMessage; import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.BeanUtils; @@ -15,9 +16,10 @@ @Slf4j @Component @RocketMQMessageListener( - consumerGroup = "enhance_consumer_group", - topic = "rocket_enhance", - selectorExpression = "*", + messageModel = MessageModel.CLUSTERING, + consumerGroup = "charge_working_parameter_setting_reply", + topic = "charge_working_parameter_setting_reply", + selectorExpression = "working_parameter_setting_reply", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class WorkingParameterSettingReplyMessageListener extends EnhanceMessageHandler<WorkingParameterSettingReplyMessage> implements RocketMQListener<WorkingParameterSettingReplyMessage> { diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java index a851ff7..762cdbd 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/produce/EnhanceProduce.java @@ -7,11 +7,12 @@ import com.ruoyi.integration.rocket.util.RocketMQEnhanceTemplate; import lombok.Setter; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @@ -23,7 +24,7 @@ @Setter(onMethod_ = @Autowired) private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; - private static final String TOPIC = "rocket_enhance"; + private static final String TOPIC = "charge_"; /** * 充电桩登录认证 @@ -34,7 +35,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.ONLINE); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.ONLINE, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.ONLINE, SendTagConstant.ONLINE, message); } /** @@ -46,7 +47,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.PING); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.PING, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.PING, SendTagConstant.PING, message); } /** * 充电结束 @@ -57,7 +58,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.END_CHARGE); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.END_CHARGE, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.END_CHARGE, SendTagConstant.END_CHARGE, message); } /** @@ -69,7 +70,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.BILLING_MODE_VERIFY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.BILLING_MODE_VERIFY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.BILLING_MODE_VERIFY, SendTagConstant.BILLING_MODE_VERIFY, message); } /** @@ -81,7 +82,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.ACQUISITION_BILLING_MODE); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.ACQUISITION_BILLING_MODE, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.ACQUISITION_BILLING_MODE, SendTagConstant.ACQUISITION_BILLING_MODE, message); } /** @@ -93,7 +94,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA, SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA, message); } /** @@ -105,7 +106,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.CHARGING_HANDSHAKE); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.CHARGING_HANDSHAKE, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CHARGING_HANDSHAKE, SendTagConstant.CHARGING_HANDSHAKE, message); } /** @@ -117,7 +118,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.BMS_ABORT); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.BMS_ABORT, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.BMS_ABORT, SendTagConstant.BMS_ABORT, message); } /** @@ -129,7 +130,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.MOTOR_ABORT); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.MOTOR_ABORT, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.MOTOR_ABORT, SendTagConstant.MOTOR_ABORT, message); } /** @@ -141,7 +142,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION, SendTagConstant.BMS_DEMAND_AND_CHARGER_EXPORTATION, message); } /** @@ -153,7 +154,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.BMS_INFORMATION); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.BMS_INFORMATION, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.BMS_INFORMATION, SendTagConstant.BMS_INFORMATION, message); } /** @@ -165,7 +166,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.CHARGING_PILE_STARTS_CHARGING); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.CHARGING_PILE_STARTS_CHARGING, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CHARGING_PILE_STARTS_CHARGING, SendTagConstant.CHARGING_PILE_STARTS_CHARGING, message); } /** @@ -177,7 +178,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.PLATFORM_START_CHARGING_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.PLATFORM_START_CHARGING_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.PLATFORM_START_CHARGING_REPLY, SendTagConstant.PLATFORM_START_CHARGING_REPLY, message); } /** @@ -189,7 +190,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.PLATFORM_STOP_CHARGING_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.PLATFORM_STOP_CHARGING_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.PLATFORM_STOP_CHARGING_REPLY, SendTagConstant.PLATFORM_STOP_CHARGING_REPLY, message); } /** @@ -201,7 +202,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.TRANSACTION_RECORD); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.TRANSACTION_RECORD, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.TRANSACTION_RECORD, SendTagConstant.TRANSACTION_RECORD, message); } /** @@ -213,7 +214,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.UPDATE_BALANCE_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.UPDATE_BALANCE_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.UPDATE_BALANCE_REPLY, SendTagConstant.UPDATE_BALANCE_REPLY, message); } /** @@ -225,7 +226,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY, SendTagConstant.SYNCHRONIZE_OFFLINE_CARD_REPLY, message); } /** @@ -237,7 +238,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.CLEAR_OFFLINE_CARD_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.CLEAR_OFFLINE_CARD_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CLEAR_OFFLINE_CARD_REPLY, SendTagConstant.CLEAR_OFFLINE_CARD_REPLY, message); } /** @@ -249,7 +250,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.WORKING_PARAMETER_SETTING_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.WORKING_PARAMETER_SETTING_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.WORKING_PARAMETER_SETTING_REPLY, SendTagConstant.WORKING_PARAMETER_SETTING_REPLY, message); } /** @@ -261,7 +262,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.TIMING_SETTING); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.TIMING_SETTING, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.TIMING_SETTING, SendTagConstant.TIMING_SETTING, message); } /** @@ -273,7 +274,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.SETUP_BILLING_MODEL_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.SETUP_BILLING_MODEL_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.SETUP_BILLING_MODEL_REPLY, SendTagConstant.SETUP_BILLING_MODEL_REPLY, message); } /** @@ -285,7 +286,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.GROUND_LOCK_REAL_TIME_DATA); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.GROUND_LOCK_REAL_TIME_DATA, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.GROUND_LOCK_REAL_TIME_DATA, SendTagConstant.GROUND_LOCK_REAL_TIME_DATA, message); } /** @@ -297,7 +298,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA, SendTagConstant.CHARGING_PILE_RETURNS_GROUND_LOCK_DATA, message); } /** @@ -309,7 +310,7 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.PLATFORM_RESTART_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.PLATFORM_RESTART_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.PLATFORM_RESTART_REPLY, SendTagConstant.PLATFORM_RESTART_REPLY, message); } /** @@ -321,6 +322,6 @@ message.setKey(UUID.randomUUID().toString()); // 设置消息来源,便于查询 message.setSource(SendTagConstant.PLATFORM_REMOTE_UPDATE_REPLY); - return rocketMQEnhanceTemplate.send(TOPIC, SendTagConstant.PLATFORM_REMOTE_UPDATE_REPLY, message); + return rocketMQEnhanceTemplate.send(TOPIC+SendTagConstant.PLATFORM_REMOTE_UPDATE_REPLY, SendTagConstant.PLATFORM_REMOTE_UPDATE_REPLY, message); } } \ No newline at end of file diff --git a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java index dad0f38..69773c3 100644 --- a/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java +++ b/ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/rocket/util/RocketMQEnhanceTemplate.java @@ -24,6 +24,9 @@ private RocketEnhanceProperties rocketEnhanceProperties; public RocketMQTemplate getTemplate() { +// DefaultMQProducer producer = new DefaultMQProducer(); +// producer.setProducerGroup("enhance_consumer_group"); +// template.setProducer(producer); return template; } -- Gitblit v1.7.1