| | |
| | | import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient; |
| | | import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient; |
| | | import com.ruoyi.chargingPile.api.model.TAccountingStrategyDetail; |
| | | import com.ruoyi.chargingPile.api.model.TChargingGun; |
| | | import com.ruoyi.chargingPile.api.vo.UpdateChargingPileStatusVo; |
| | | import com.ruoyi.integration.api.model.*; |
| | | import com.ruoyi.integration.drainage.TCECPushUtil; |
| | | import com.ruoyi.integration.iotda.constant.SendTagConstant; |
| | | import com.ruoyi.integration.mongodb.service.*; |
| | | import com.ruoyi.integration.rocket.model.*; |
| | | import com.ruoyi.integration.rocket.util.EnhanceMessageHandler; |
| | | import com.ruoyi.order.api.feignClient.ChargingOrderClient; |
| | | import com.ruoyi.order.api.model.TChargingOrder; |
| | | import com.ruoyi.order.api.query.UploadRealTimeMonitoringDataQuery; |
| | |
| | | import com.ruoyi.order.api.vo.SecurityDetectionVO; |
| | | import com.ruoyi.order.api.vo.TransactionRecordMessageVO; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.rocketmq.spring.annotation.MessageModel; |
| | | import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
| | | import org.apache.rocketmq.spring.core.RocketMQListener; |
| | | import org.apache.rocketmq.client.producer.SendResult; |
| | | import org.springframework.beans.BeanUtils; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.cloud.stream.annotation.StreamListener; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.StringUtils; |
| | |
| | | private QrCodeDeliveryReplyService qrCodeDeliveryReplyService; |
| | | @Autowired |
| | | private SecurityDetectionService securityDetectionService; |
| | | // NOTE(review): every tcecPushUtil call visible in this excerpt is commented out; the handler code |
| | | // shown here calls enhanceProduce instead. Confirm this injection is still needed before removing it. |
| | | @Autowired |
| | | private TCECPushUtil tcecPushUtil; |
| | | |
| | | @Resource |
| | | private ChargingPileClient chargingPileClient; |
| | |
| | | |
| | | @Resource |
| | | private RedisTemplate redisTemplate; |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | @Autowired |
| | | private EnhanceProduce enhanceProduce; |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | public void handleMessage(com.ruoyi.integration.rocket.model.ChargingMessage message){ |
| | | log.info("rocket收到的消息内容:{}",message); |
| | | log.info("收到的消息内容:{}",message); |
| | | String serviceId = message.getServiceId(); |
| | | if(!StringUtils.hasLength(serviceId)){ |
| | | return; |
| | | } |
| | | log.info("rocket收到的消息内容:{} {}", serviceId,message); |
| | | switch (serviceId){ |
| | | case SendTagConstant.ONLINE: |
| | | OnlineMessage onlineMessage = message.getOnlineMessage(); |
| | |
| | | pingService.save(ping); |
| | | //存储缓存中,5分钟有效 |
| | | redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); |
| | | |
| | | |
| | | UpdateChargingPileStatusVo vo1 = new UpdateChargingPileStatusVo(); |
| | | vo1.setGun_code(pingMessage.getCharging_gun_code()); |
| | | vo1.setPile_code(pingMessage.getCharging_pile_code()); |
| | | vo1.setStatus(pingMessage.getCharging_gun_status()); |
| | | chargingPileClient.updateChargingPileStatus(vo1); |
| | | |
| | | // ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); |
| | | // threadPoolExecutor.execute(new Runnable() { |
| | | // @Override |
| | | // public void run() { |
| | | // |
| | | // try { |
| | | //// tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); |
| | | // }catch (Exception e){ |
| | | // e.printStackTrace(); |
| | | // System.out.println("设备状态推送监管平台失败:"+e.getMessage()); |
| | | // } |
| | | // } |
| | | // }); |
| | | // 监管平台推送充电设备状态 |
| | | String gunCode = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(); |
| | | ChargingMessage chargingMessage = new ChargingMessage(); |
| | | chargingMessage.setServiceId(SendTagConstant.GUN_STATUS); |
| | | GunStatusMessage gunStatusMessage = new GunStatusMessage(); |
| | | gunStatusMessage.setFullNumber(gunCode); |
| | | chargingMessage.setGunStatusMessage(gunStatusMessage); |
| | | SendResult sendResult = enhanceProduce.gunStatusMessage(chargingMessage); |
| | | break; |
| | | case SendTagConstant.END_CHARGE: |
| | | EndChargeMessage endChargeMessage = message.getEndChargeMessage(); |
| | |
| | | endChargeService.create(endCharge); |
| | | // 业务处理 |
| | | chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); |
| | | // ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); |
| | | // threadPoolExecutor1.execute(new Runnable() { |
| | | // @Override |
| | | // public void run() { |
| | | // try { |
| | | // TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); |
| | | // tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); |
| | | // tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); |
| | | // }catch (Exception e){ |
| | | // e.printStackTrace(); |
| | | // System.out.println("充电结束推送监管平台失败:"+e.getMessage()); |
| | | // } |
| | | // } |
| | | // }); |
| | | // 订单id |
| | | String transactionSerialNumber = endCharge.getTransaction_serial_number(); |
| | | ChargingOrderMessage chargingOrderMessage = new ChargingOrderMessage(); |
| | | chargingOrderMessage.setOrderNumber(transactionSerialNumber); |
| | | // 推送充电订单信息 |
| | | ChargingMessage chargingMessage1 = new ChargingMessage(); |
| | | chargingMessage1.setServiceId(SendTagConstant.ORDER_INFO); |
| | | chargingMessage1.setOrderMessage(chargingOrderMessage); |
| | | enhanceProduce.orderInfoMessage(chargingMessage1); |
| | | // 推送充电订单状态 |
| | | ChargingMessage chargingMessage2 = new ChargingMessage(); |
| | | chargingMessage2.setServiceId(SendTagConstant.ORDER_STATUS); |
| | | chargingMessage2.setOrderMessage(chargingOrderMessage); |
| | | enhanceProduce.orderStatusMessage(chargingMessage2); |
| | | break; |
| | | case SendTagConstant.ERROR_MESSAGE: |
| | | ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); |
| | |
| | | acquisitionBillingModeService.create(acquisitionBillingMode); |
| | | break; |
| | | case SendTagConstant.UPLOAD_REAL_TIME_MONITORING_DATA: |
| | | UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); |
| | | try { |
| | | UploadRealTimeMonitoringDataMessage uploadRealTimeMonitoringDataMessage = message.getUploadRealTimeMonitoringDataMessage(); |
| | | log.info("上传实时监测数据-业务消息处理:{}",uploadRealTimeMonitoringDataMessage); |
| | | // 持久化消息 |
| | | UploadRealTimeMonitoringData uploadRealTimeMonitoringData = new UploadRealTimeMonitoringData(); |
| | |
| | | uploadRealTimeMonitoringData.setOrderType(chargingOrder.getOrderType()); |
| | | uploadRealTimeMonitoringData.setSiteId(chargingOrder.getSiteId()); |
| | | uploadRealTimeMonitoringData.setStatus(chargingOrder.getStatus()); |
| | | // uploadRealTimeMonitoringData.setStartTime(chargingOrder.getStartTime()); |
| | | // uploadRealTimeMonitoringData.setEndTime(chargingOrder.getEndTime()); |
| | | int i = uploadRealTimeMonitoringDataService.create(uploadRealTimeMonitoringData); |
| | | if(i == 0){ |
| | | log.error("数据存储mongo失败"); |
| | | } |
| | | |
| | | |
| | | // 业务处理 |
| | | UploadRealTimeMonitoringDataQuery query = new UploadRealTimeMonitoringDataQuery(); |
| | | BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); |
| | | chargingOrderClient.chargeMonitoring(query); |
| | | // ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); |
| | | // threadPoolExecutor2.execute(new Runnable() { |
| | | // @Override |
| | | // public void run() { |
| | | // chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); |
| | | // |
| | | // tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); |
| | | // } |
| | | // }); |
| | | // 订单id |
| | | ChargingOrderMessage chargingOrderMessage3 = new ChargingOrderMessage(); |
| | | chargingOrderMessage3.setOrderNumber(chargingOrder.getCode()); |
| | | // 推送充电订单信息 |
| | | ChargingMessage chargingMessage4 = new ChargingMessage(); |
| | | chargingMessage4.setServiceId(SendTagConstant.ORDER_STATUS); |
| | | chargingMessage4.setOrderMessage(chargingOrderMessage3); |
| | | enhanceProduce.orderInfoMessage(chargingMessage4); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | break; |
| | | case SendTagConstant.PARAMETER_SETTING: |
| | | ParameterSettingMessage parameterSettingMessage = message.getParameterSettingMessage(); |
| | | log.info("业务消息处理:{}",parameterSettingMessage); |
| | | log.info("参数配置-业务消息处理:{}",parameterSettingMessage); |
| | | // 持久化消息 |
| | | ParameterSetting parameterSetting = new ParameterSetting(); |
| | | BeanUtils.copyProperties(parameterSettingMessage,parameterSetting); |
| | |
| | | BmsAbort bmsAbort = new BmsAbort(); |
| | | BeanUtils.copyProperties(bmsAbortMessage,bmsAbort); |
| | | bmsAbortService.create(bmsAbort); |
| | | |
| | | ThreadPoolExecutor threadPoolExecutor3 = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); |
| | | threadPoolExecutor3.execute(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | // 业务处理 |
| | | chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); |
| | | } |
| | | }); |
| | | // 业务处理 |
| | | chargingOrderClient.excelEndCharge(bmsAbort.getTransaction_serial_number()); |
| | | break; |
| | | case SendTagConstant.MOTOR_ABORT: |
| | | MotorAbortMessage motorAbortMessage = message.getMotorAbortMessage(); |
| | |
| | | case SendTagConstant.TRANSACTION_RECORD: |
| | | TransactionRecordMessage transactionRecordMessage = message.getTransactionRecordMessage(); |
| | | log.info("交易记录-业务消息处理:{}",transactionRecordMessage); |
| | | transactionRecordMessage.setResult(JSONObject.toJSONString(message)); |
| | | transactionRecordMessage.setResult(JSONObject.toJSONString(transactionRecordMessage)); |
| | | // 持久化消息 |
| | | TransactionRecord transactionRecord = new TransactionRecord(); |
| | | BeanUtils.copyProperties(transactionRecordMessage,transactionRecord); |
| | | transactionRecord.setResult(transactionRecordMessage.getResult()); |
| | | transactionRecordService.create(transactionRecord); |
| | | |
| | | // 业务处理 |
| | |
| | | //失败后添加到队列中继续处理数据 |
| | | redisTemplate.opsForSet().add(SendTagConstant.TRANSACTION_RECORD, transactionRecordMessage.getTransaction_serial_number()); |
| | | } |
| | | |
| | | |
| | | // 添加实时上传记录结束记录 |
| | | // Query the previous record in MongoDB |
| | |
| | | break; |
| | | default: |
| | | PlatformRemoteUpdateReplyMessage platformRemoteUpdateReplyMessage = message.getPlatformRemoteUpdateReplyMessage(); |
| | | log.info("远程更新应答-业务消息处理:{}",platformRemoteUpdateReplyMessage); |
| | | log.info("业务消息处理:{}",platformRemoteUpdateReplyMessage); |
| | | // 持久化消息 |
| | | PlatformRemoteUpdateReply platformRemoteUpdateReply = new PlatformRemoteUpdateReply(); |
| | | BeanUtils.copyProperties(platformRemoteUpdateReplyMessage,platformRemoteUpdateReply); |
| | | platformRemoteUpdateReplyService.create(platformRemoteUpdateReply); |