package com.ruoyi.jianguan.rocket.produce;
|
|
import com.ruoyi.chargingPile.api.feignClient.AccountingStrategyDetailClient;
|
import com.ruoyi.chargingPile.api.feignClient.ChargingGunClient;
|
import com.ruoyi.chargingPile.api.feignClient.ChargingPileClient;
|
import com.ruoyi.chargingPile.api.model.TChargingGun;
|
import com.ruoyi.integration.api.model.*;
|
import com.ruoyi.jianguan.model.ConnectorStatusInfo;
|
import com.ruoyi.jianguan.model.SupChargeOrderInfo;
|
import com.ruoyi.jianguan.model.SupEquipChargeStatus;
|
import com.ruoyi.jianguan.rocket.base.ChargingOrderMessage;
|
import com.ruoyi.jianguan.rocket.base.GunStatusMessage;
|
import com.ruoyi.jianguan.rocket.base.JianGuanMessage;
|
import com.ruoyi.jianguan.rocket.util.EnhanceMessageHandler;
|
import com.ruoyi.jianguan.util.SendTagConstant;
|
import com.ruoyi.jianguan.util.TCECSuperviseUtil;
|
import com.ruoyi.order.api.feignClient.ChargingOrderClient;
|
import com.ruoyi.order.api.model.TChargingOrder;
|
import com.ruoyi.other.api.domain.Operator;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.rocketmq.spring.annotation.MessageModel;
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
import org.springframework.beans.BeanUtils;
|
import org.springframework.cloud.stream.annotation.StreamListener;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.StringUtils;
|
|
import javax.annotation.Resource;
|
import java.math.BigDecimal;
|
import java.text.SimpleDateFormat;
|
import java.time.format.DateTimeFormatter;
|
import java.util.Date;
|
|
@Slf4j
|
@Component
|
@RocketMQMessageListener(
|
messageModel = MessageModel.CLUSTERING,
|
consumerGroup = "jianguan_message",
|
topic = "jianguan_message",
|
selectorExpression = "jianguan_message",
|
consumeThreadMax = 64 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
|
)
|
public class ChargingMessageListener extends EnhanceMessageHandler<JianGuanMessage> implements RocketMQListener<JianGuanMessage> {
|
|
@Resource
|
private ChargingOrderClient chargingOrderClient;
|
@Resource
|
private AccountingStrategyDetailClient accountingStrategyDetailClient;
|
|
@Resource
|
private ChargingPileClient chargingPileClient;
|
@Resource
|
private ChargingGunClient chargingGunClient;
|
|
@Resource
|
private RedisTemplate redisTemplate;
|
@Resource
|
private TCECSuperviseUtil tcecSuperviseUtil;
|
|
private final static String operatorId = "906171535";
|
|
@StreamListener("input")
|
@Override
|
protected void handleMessage(JianGuanMessage message) throws Exception {
|
log.info("rocket收到的消息内容:{}", message);
|
String serviceId = message.getServiceId();
|
if (!StringUtils.hasLength(serviceId)) {
|
return;
|
}
|
log.info("rocket收到的消息内容:{} {}", serviceId, message);
|
switch (serviceId) {
|
case SendTagConstant.GUN_STATUS:
|
GunStatusMessage gunStatusMessage = message.getGunStatusMessage();
|
log.info("推送充电设备接口状态:{}", gunStatusMessage);
|
TChargingGun chargingGun = chargingGunClient.getChargingGunByFullNumber(gunStatusMessage.getFullNumber()).getData();
|
if (chargingGun!=null){
|
ConnectorStatusInfo connectorStatusInfo = new ConnectorStatusInfo();
|
connectorStatusInfo.setOperatorID(operatorId);
|
connectorStatusInfo.setEquipmentOwnerID(operatorId);
|
connectorStatusInfo.setStationID(String.valueOf(chargingGun.getSiteId()));
|
connectorStatusInfo.setEquipmentID(String.valueOf(chargingGun.getChargingPileId()));
|
connectorStatusInfo.setConnectorID(chargingGun.getFullNumber());
|
if (chargingGun.getEquipmentClassification()!=null){
|
connectorStatusInfo.setEquipmentClassification(chargingGun.getEquipmentClassification());
|
}else {
|
connectorStatusInfo.setEquipmentClassification(1);
|
}
|
switch (chargingGun.getStatus()){
|
case 1:
|
connectorStatusInfo.setStatus(0);
|
break;
|
case 2:
|
connectorStatusInfo.setStatus(1);
|
break;
|
case 3:
|
connectorStatusInfo.setStatus(2);
|
break;
|
case 4:
|
connectorStatusInfo.setStatus(3);
|
break;
|
case 5:
|
connectorStatusInfo.setStatus(3);
|
break;
|
case 6:
|
connectorStatusInfo.setStatus(4);
|
break;
|
case 7:
|
connectorStatusInfo.setStatus(255);
|
break;
|
}
|
connectorStatusInfo.setUpdateTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
|
tcecSuperviseUtil.notificationStationStatus(new Operator(), connectorStatusInfo);
|
}
|
|
break;
|
case SendTagConstant.ORDER_INFO:
|
ChargingOrderMessage orderInfoMessage = message.getOrderMessage();
|
log.info("推送充电订单信息:{}", orderInfoMessage);
|
TChargingOrder chargingOrder1 = chargingOrderClient.getOrderByCode(orderInfoMessage.getOrderNumber()).getData();
|
SupChargeOrderInfo supChargeOrderInfo = new SupChargeOrderInfo();
|
supChargeOrderInfo.setOperatorID(operatorId);
|
supChargeOrderInfo.setEquipmentOwnerID(operatorId);
|
supChargeOrderInfo.setStationID(String.valueOf(chargingOrder1.getSiteId()));
|
supChargeOrderInfo.setEquipmentID(String.valueOf(chargingOrder1.getChargingPileId()));
|
supChargeOrderInfo.setOrderNo(operatorId+chargingOrder1.getCode());
|
TChargingGun chargingGun1 = chargingGunClient.getChargingGunById(chargingOrder1.getChargingGunId()).getData();
|
supChargeOrderInfo.setConnectorID(chargingGun1.getFullNumber());
|
supChargeOrderInfo.setEquipmentClassification(1);
|
supChargeOrderInfo.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
|
supChargeOrderInfo.setStartTime(chargingOrder1.getStartTime() != null ? chargingOrder1.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : "");
|
supChargeOrderInfo.setEndTime(chargingOrder1.getEndTime() != null ? chargingOrder1.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : "");
|
supChargeOrderInfo.setTotalPower(chargingOrder1.getElectrovalence());
|
supChargeOrderInfo.setTotalElecMoney(chargingOrder1.getElectrovalence());
|
supChargeOrderInfo.setTotalServiceMoney(chargingOrder1.getServiceCharge());
|
supChargeOrderInfo.setTotalMoney(chargingOrder1.getOrderAmount());
|
switch (chargingOrder1.getEndMode()){
|
case 0:
|
supChargeOrderInfo.setStopReason(5);
|
supChargeOrderInfo.setStopDesc("异常终止");
|
break;
|
case 1:
|
supChargeOrderInfo.setStopReason(0);
|
supChargeOrderInfo.setStopDesc("用户手动停止充电");
|
break;
|
case 2:
|
supChargeOrderInfo.setStopReason(1);
|
supChargeOrderInfo.setStopDesc("客户归属地运营商平台停止充电");
|
break;
|
case 3:
|
supChargeOrderInfo.setStopReason(1);
|
supChargeOrderInfo.setStopDesc("费用不足中止");
|
break;
|
}
|
tcecSuperviseUtil.notificationChargeOrderInfo(new Operator(), supChargeOrderInfo);
|
break;
|
case SendTagConstant.ORDER_STATUS:
|
ChargingOrderMessage orderStatusMessage = message.getOrderMessage();
|
log.info("推送充电订单状态:{}", orderStatusMessage);
|
TChargingOrder chargingOrder2 = chargingOrderClient.getOrderByCode(orderStatusMessage.getOrderNumber()).getData();
|
SupEquipChargeStatus supEquipChargeStatus = new SupEquipChargeStatus();
|
supEquipChargeStatus.setOperatorID(operatorId);
|
supEquipChargeStatus.setEquipmentOwnerID(operatorId);
|
supEquipChargeStatus.setStationID(String.valueOf(chargingOrder2.getSiteId()));
|
supEquipChargeStatus.setEquipmentID(String.valueOf(chargingOrder2.getChargingPileId()));
|
supEquipChargeStatus.setOrderNo(operatorId+chargingOrder2.getCode());
|
switch (chargingOrder2.getStatus()){
|
case 2:
|
supEquipChargeStatus.setConnectorStatus(1);
|
break;
|
case 3:
|
supEquipChargeStatus.setConnectorStatus(2);
|
break;
|
case 4:
|
supEquipChargeStatus.setConnectorStatus(3);
|
break;
|
case 5:
|
supEquipChargeStatus.setConnectorStatus(4);
|
break;
|
}
|
TChargingGun chargingGun2 = chargingGunClient.getChargingGunById(chargingOrder2.getChargingGunId()).getData();
|
supEquipChargeStatus.setConnectorID(chargingGun2.getFullNumber());
|
supEquipChargeStatus.setEquipmentClassification(1);
|
supEquipChargeStatus.setPushTimeStamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
|
switch (chargingGun2.getStatus()){
|
case 1:
|
supEquipChargeStatus.setConnectorStatus(0);
|
break;
|
case 2:
|
supEquipChargeStatus.setConnectorStatus(1);
|
break;
|
case 3:
|
supEquipChargeStatus.setConnectorStatus(2);
|
break;
|
case 4:
|
supEquipChargeStatus.setConnectorStatus(3);
|
break;
|
case 5:
|
supEquipChargeStatus.setConnectorStatus(3);
|
break;
|
case 6:
|
supEquipChargeStatus.setConnectorStatus(4);
|
break;
|
case 7:
|
supEquipChargeStatus.setConnectorStatus(255);
|
break;
|
}
|
supEquipChargeStatus.setCurrentA(chargingOrder2.getCurrent());
|
|
supEquipChargeStatus.setSOC(StringUtils.hasLength(orderStatusMessage.getSoc())?new BigDecimal(orderStatusMessage.getSoc()):new BigDecimal("1"));
|
supEquipChargeStatus.setStartTime(chargingOrder2.getStartTime() != null ? chargingOrder2.getStartTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) : "");
|
supEquipChargeStatus.setEndTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
|
supEquipChargeStatus.setTotalPower(chargingOrder2.getElectrovalence());
|
tcecSuperviseUtil.notificationSupEquipChargeStatus(new Operator(), supEquipChargeStatus);
|
break;
|
}
|
}
|
|
@Override
|
protected void handleMaxRetriesExceeded(JianGuanMessage message) {
|
// 当超过指定重试次数消息时此处方法会被调用
|
// 生产中可以进行回退或其他业务操作
|
log.error("消息消费失败,请执行后续处理");
|
}
|
|
|
/**
|
* 是否执行重试机制
|
*/
|
@Override
|
protected boolean isRetry() {
|
return true;
|
}
|
|
@Override
|
protected boolean throwException() {
|
// 是否抛出异常,false搭配retry自行处理异常
|
return false;
|
}
|
|
/**
|
* 若需要处理消息过滤,在父级中进行统一处理,或者在此处实现之后,自行处理
|
*
|
* @param message 待处理消息
|
* @return true: 本次消息被过滤,false:不过滤
|
*/
|
@Override
|
protected boolean filter(JianGuanMessage message) {
|
// 此处可做消息过滤
|
return false;
|
}
|
|
/**
|
* 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
|
*/
|
@Override
|
public void onMessage(JianGuanMessage message) {
|
super.dispatchMessage(message);
|
}
|
|
}
|