| | |
| | | import com.ruoyi.integration.api.feignClient.TCECClient; |
| | | import com.ruoyi.integration.api.model.Online; |
| | | import com.ruoyi.integration.api.model.UploadRealTimeMonitoringData; |
| | | import com.ruoyi.integration.iotda.constant.SendTagConstant; |
| | | import com.ruoyi.integration.mongodb.service.UploadRealTimeMonitoringDataService; |
| | | import com.ruoyi.integration.rocket.model.ChargingMessage; |
| | | import com.ruoyi.integration.rocket.model.ChargingOrderMessage; |
| | | import com.ruoyi.integration.rocket.model.UploadRealTimeMonitoringDataMessage; |
| | | import com.ruoyi.integration.rocket.produce.EnhanceProduce; |
| | | import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; |
| | | import com.ruoyi.order.api.feignClient.ChargingOrderClient; |
| | | import com.ruoyi.order.api.model.TChargingOrder; |
| | |
| | | messageModel = MessageModel.CLUSTERING, |
| | | consumerGroup = "charge_upload_real_time_monitoring_data", |
| | | topic = "charge_upload_real_time_monitoring_data", |
| | | selectorExpression = "upload_real_time_monitoring_data", |
| | | consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 |
| | | selectorExpression = "upload_real_time_monitoring_data" |
| | | ) |
| | | public class UploadRealTimeMonitoringDataMessageListener extends EnhanceMessageHandler<UploadRealTimeMonitoringDataMessage> implements RocketMQListener<UploadRealTimeMonitoringDataMessage> { |
| | | public class UploadRealTimeMonitoringDataMessageListener implements RocketMQListener<UploadRealTimeMonitoringDataMessage> { |
| | | |
| | | @Autowired |
| | | private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; |
| | |
| | | private ChargingOrderClient chargingOrderClient; |
| | | @Resource |
| | | private AccountingStrategyDetailClient accountingStrategyDetailClient; |
| | | @Resource |
| | | private ChargingGunClient chargingGunClient; |
| | | @Resource |
| | | private FaultMessageClient faultMessageClient; |
| | | @Autowired |
| | | private EnhanceProduce enhanceProduce; |
| | | |
| | | |
| | | |
| | | @Resource |
| | | private TCECClient tcecClient; |
| | | |
| | | |
| | | @Override |
| | | protected void handleMessage(UploadRealTimeMonitoringDataMessage message) throws Exception { |
| | | // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 |
| | | log.info("上传实时监测数据-业务消息处理:{}",message); |
| | | // 持久化消息 |
| | | UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); |
| | | BeanUtils.copyProperties(message,uploadRealTimeMonitoringData); |
| | | // 查询mogondb上一条数据 |
| | | UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); |
| | | // 查询订单 |
| | | TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); |
| | | // 查询当前时间段的计费策略 |
| | | TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); |
| | | uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); |
| | | uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); |
| | | if (Objects.nonNull(data)) { |
| | | uploadRealTimeMonitoringData.setLast_time(data.getLast_time()); |
| | | uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().divide(data.getPaid_amount())); |
| | | uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().divide(data.getCharging_degree())); |
| | | uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); |
| | | }else { |
| | | log.info("首次上传实时监测数据"); |
| | | uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount()); |
| | | uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree()); |
| | | uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); |
| | | } |
| | | uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); |
| | | // 业务处理 |
| | | UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); |
| | | BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); |
| | | chargingOrderClient.chargeMonitoring(query); |
| | | GetChargingGunByCode code = new GetChargingGunByCode(); |
| | | code.setCharging_pile_code(message.getCharging_pile_code()); |
| | | code.setCharging_gun_code(message.getCharging_gun_code()); |
| | | TChargingGun chargingGun = chargingGunClient.getChargingGunByCode(code).getData(); |
| | | if(Objects.nonNull(chargingGun)){ |
| | | // 存储状态信息 |
| | | TFaultMessage faultMessage = new TFaultMessage(); |
| | | if(message.getCharging_gun_status().equals(0) || message.getCharging_gun_status().equals(1)){ |
| | | faultMessage.setSiteId(chargingGun.getSiteId()); |
| | | faultMessage.setChargingPileId(chargingGun.getChargingPileId()); |
| | | faultMessage.setChargingGunId(chargingGun.getId()); |
| | | switch (message.getCharging_gun_status()){ |
| | | case 0: |
| | | faultMessage.setStatus(1); |
| | | chargingGun.setStatus(1); |
| | | break; |
| | | case 1: |
| | | faultMessage.setStatus(2); |
| | | chargingGun.setStatus(7); |
| | | break; |
| | | } |
| | | faultMessage.setDownTime(LocalDateTime.now()); |
| | | faultMessageClient.createFaultMessage(faultMessage); |
| | | }else { |
| | | switch (message.getCharging_gun_status()){ |
| | | case 2: |
| | | chargingGun.setStatus(2); |
| | | break; |
| | | case 3: |
| | | chargingGun.setStatus(4); |
| | | break; |
| | | } |
| | | // 空闲 充电 查询是否该设备之前存在离线记录或者故障记录 |
| | | faultMessage = faultMessageClient.getFaultMessageByGunId(chargingGun.getId()).getData(); |
| | | if(Objects.nonNull(faultMessage)){ |
| | | faultMessage.setEndTime(LocalDateTime.now()); |
| | | faultMessageClient.updateFaultMessage(faultMessage); |
| | | } |
| | | } |
| | | chargingGunClient.updateChargingGunById(chargingGun); |
| | | //推送状态给三方平台 |
| | | tcecClient.pushChargingGunStatus(chargingGun.getFullNumber(), chargingGun.getStatus()); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | protected void handleMaxRetriesExceeded(UploadRealTimeMonitoringDataMessage message) { |
| | | // 当超过指定重试次数消息时此处方法会被调用 |
| | | // 生产中可以进行回退或其他业务操作 |
| | | log.error("消息消费失败,请执行后续处理"); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 是否执行重试机制 |
| | | */ |
| | | @Override |
| | | protected boolean isRetry() { |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | protected boolean throwException() { |
| | | // 是否抛出异常,false搭配retry自行处理异常 |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理 |
| | | * @param message 待处理消息 |
| | | * @return true: 本次消息被过滤,false:不过滤 |
| | | */ |
| | | @Override |
| | | protected boolean filter(UploadRealTimeMonitoringDataMessage message) { |
| | | // 此处可做消息过滤 |
| | | return false; |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 |
| | | */ |
| | | @Override |
| | | public void onMessage(UploadRealTimeMonitoringDataMessage message) { |
| | | super.dispatchMessage(message); |
| | | try { |
| | | log.info("上传实时监测数据-业务消息处理:{}",message); |
| | | // 持久化消息 |
| | | UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); |
| | | BeanUtils.copyProperties(message,uploadRealTimeMonitoringData); |
| | | // 查询mogondb上一条数据 |
| | | UploadRealTimeMonitoringData data = uploadRealTimeMonitoringDataService.getLastDataById(message.getTransaction_serial_number()); |
| | | // 查询订单 |
| | | TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(message.getTransaction_serial_number()).getData(); |
| | | // 查询当前时间段的计费策略 |
| | | TAccountingStrategyDetail accountingStrategyDetail = accountingStrategyDetailClient.getDetailBySiteId(chargingOrder.getSiteId()).getData(); |
| | | uploadRealTimeMonitoringData.setElectrovalence_all(accountingStrategyDetail.getElectrovalence()); |
| | | uploadRealTimeMonitoringData.setService_charge(accountingStrategyDetail.getServiceCharge()); |
| | | if (Objects.nonNull(data)) { |
| | | uploadRealTimeMonitoringDataService.updateById(data.getId()); |
| | | uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount().subtract(data.getPaid_amount())); |
| | | uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree().subtract(data.getCharging_degree())); |
| | | uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); |
| | | }else { |
| | | log.info("首次上传实时监测数据"); |
| | | uploadRealTimeMonitoringData.setPeriod_electric_price(message.getPaid_amount()); |
| | | uploadRealTimeMonitoringData.setPeriod_charging_degree(message.getCharging_degree()); |
| | | uploadRealTimeMonitoringData.setPeriod_service_price(message.getCharging_degree().multiply(accountingStrategyDetail.getServiceCharge()).setScale(4, RoundingMode.HALF_UP)); |
| | | } |
| | | uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); |
| | | uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); |
| | | uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); |
| | | int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); |
| | | if(i == 0){ |
| | | log.error("数据存储mongo失败"); |
| | | } |
| | | |
| | | // 业务处理 |
| | | UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); |
| | | BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); |
| | | chargingOrderClient.chargeMonitoring(query); |
| | | // 订单id |
| | | ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); |
| | | chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); |
| | | // 推送充电订单信息 |
| | | ChargingMessage chargingMessage4 = new ChargingMessage(); |
| | | chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); |
| | | chargingMessage4.setOrderMessage(chargingOrderMessage3); |
| | | enhanceProduce.orderInfoMessage(chargingMessage4); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |