| | |
| | | consumerGroup = "charge_charging_message", |
| | | topic = "charge_charging_message", |
| | | selectorExpression = "charging_message", |
| | | consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 |
| | | consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 |
| | | ) |
| | | public class ChargingMessageListener extends EnhanceMessageHandler<ChargingMessage> implements RocketMQListener<ChargingMessage> { |
| | | |
| | |
| | | @StreamListener("input") |
| | | @Override |
| | | protected void handleMessage(ChargingMessage message) throws Exception { |
| | | log.info("rocket收到的消息内容:{}",message); |
| | | String serviceId = message.getServiceId(); |
| | | if(!StringUtils.hasLength(serviceId)){ |
| | | return; |
| | | } |
| | | log.info("rocket收到的消息内容:{} {}", serviceId,message); |
| | | switch (serviceId){ |
| | | case SendTagConstant.ONLINE: |
| | | OnlineMessage onlineMessage = message.getOnlineMessage(); |
| | |
| | | // 持久化消息 |
| | | Ping ping = new Ping(); |
| | | BeanUtils.copyProperties(pingMessage,ping); |
| | | // pingService.create(ping); |
| | | pingService.save(ping); |
| | | //存储缓存中,5分钟有效 |
| | | redisTemplate.opsForValue().set("ping:" + ping.getCharging_pile_code() + ping.getCharging_gun_code(), ping, 5, TimeUnit.MINUTES); |
| | | |
| | |
| | | vo1.setPile_code(pingMessage.getCharging_pile_code()); |
| | | vo1.setStatus(pingMessage.getCharging_gun_status()); |
| | | chargingPileClient.updateChargingPileStatus(vo1); |
| | | try { |
| | | tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); |
| | | }catch (Exception e){ |
| | | System.out.println("设备状态推送监管平台失败:"+e.getMessage()); |
| | | } |
| | | new Thread(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | tcecPushUtil.pushSuperviseNotificationStationStatus(chargingGunClient.getChargingGunByFullNumber(pingMessage.getCharging_pile_code()+pingMessage.getCharging_gun_code()).getData()); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | System.out.println("设备状态推送监管平台失败:"+e.getMessage()); |
| | | } |
| | | } |
| | | }).start(); |
| | | break; |
| | | case SendTagConstant.END_CHARGE: |
| | | EndChargeMessage endChargeMessage = message.getEndChargeMessage(); |
| | |
| | | chargingOrderClient.endCharge(endCharge.getTransaction_serial_number()); |
| | | // 监管平台 |
| | | // 查询订单信息 |
| | | try { |
| | | TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); |
| | | tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); |
| | | tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); |
| | | }catch (Exception e){ |
| | | System.out.println("充电结束推送监管平台失败:"+e.getMessage()); |
| | | } |
| | | new Thread(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | try { |
| | | TChargingOrder chargingOrder = chargingOrderClient.getOrderByCode(endCharge.getTransaction_serial_number()).getData(); |
| | | tcecPushUtil.pushSuperviseNotificationChargeOrderInfo(chargingOrder); |
| | | tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | System.out.println("充电结束推送监管平台失败:"+e.getMessage()); |
| | | } |
| | | } |
| | | }).start(); |
| | | break; |
| | | case SendTagConstant.ERROR_MESSAGE: |
| | | ErrorMessageMessage errorMessageMessage1 = message.getErrorMessageMessage(); |
| | |
| | | BeanUtils.copyProperties(uploadRealTimeMonitoringData, query); |
| | | chargingOrderClient.chargeMonitoring(query); |
| | | chargingOrder.setEndSoc(uploadRealTimeMonitoringDataMessage.getSoc()+""); |
| | | tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); |
| | | new Thread(new Runnable() { |
| | | @Override |
| | | public void run() { |
| | | tcecPushUtil.pushSuperviseNotificationEquipChargeStatus(chargingOrder); |
| | | } |
| | | }).start(); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |