| | |
| | | import com.ruoyi.integration.rocket.model.BillingModeVerifyMessage; |
| | | import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.rocketmq.spring.annotation.MessageModel; |
| | | import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
| | | import org.apache.rocketmq.spring.core.RocketMQListener; |
| | | import org.springframework.beans.BeanUtils; |
| | |
| | | @Slf4j |
| | | @Component |
| | | @RocketMQMessageListener( |
| | | consumerGroup = "enhance_consumer_group", |
| | | topic = "rocket_enhance", |
| | | selectorExpression = "*", |
| | | messageModel = MessageModel.CLUSTERING, |
| | | consumerGroup = "charge_billing_mode_verify", |
| | | topic = "charge_billing_mode_verify", |
| | | selectorExpression = "billing_mode_verify", |
| | | consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 |
| | | ) |
| | | public class BillingModeVerifyMessageListener extends EnhanceMessageHandler<BillingModeVerifyMessage> implements RocketMQListener<BillingModeVerifyMessage> { |
| | | |
// Persists the verification record copied from the incoming message (see handleMessage).
| | | @Autowired |
| | | private BillingModeVerifyService billingModeVerifyService; |
// Sends the verification reply, addressed by charging pile code.
| | | @Autowired |
| | | private IotMessageProduce iotMessageProduce; |
// Builds the outgoing payload from a BillingModeVerifyReply.
| | | @Autowired |
| | | private MessageUtil messageUtil; |
// Remote client used to check the billing model code reported in the message.
| | | @Autowired |
| | | private AccountingStrategyDetailClient accountingStrategyDetailClient; |
| | | |
| | | @Override |
| | | protected void handleMessage(BillingModeVerifyMessage message) throws Exception { |
| | |
| | | BeanUtils.copyProperties(message,billingModeVerify); |
| | | billingModeVerifyService.create(billingModeVerify); |
| | | // 业务处理 |
| | | BillingModeVerifyReply billingModeVerifyReply = new BillingModeVerifyReply(); |
| | | if(message.getBilling_model_code().equals("0")){ |
| | | // 首次 |
| | | billingModeVerifyReply.setCharging_pile_code(billingModeVerify.getCharging_pile_code()); |
| | | billingModeVerifyReply.setBilling_model_code("0"); |
| | | billingModeVerifyReply.setBilling_model_result(1); |
| | | }else { |
| | | // 查询桩使用的模版 |
| | | CheckChargingStrategyDTO dto = new CheckChargingStrategyDTO(); |
| | | dto.setCode(message.getBilling_model_code()); |
| | | dto.setStrategyDetailId(Integer.valueOf(message.getBilling_model_code())); |
| | | Boolean check = accountingStrategyDetailClient.checkChargingStrategy(dto).getData(); |
| | | // 校验计费模版是否准确 |
| | | billingModeVerifyReply.setCharging_pile_code(billingModeVerify.getCharging_pile_code()); |
| | | billingModeVerifyReply.setBilling_model_code(message.getBilling_model_code()); |
| | | if(check){ |
| | | billingModeVerifyReply.setBilling_model_result(0); |
| | | }else { |
| | | billingModeVerifyReply.setBilling_model_result(1); |
| | | } |
| | | } |
| | | iotMessageProduce.sendMessage(billingModeVerifyReply.getCharging_pile_code(), ServiceIdMenu.BILLING_MODE_VERIFY_REPLY.getKey(),messageUtil.billingModeVerifyReply(billingModeVerifyReply)); |
| | | } |
| | | |
| | | @Override |