Pu Zhibing
昨天 50dde470ec18f292e8e58547ef1c4a4cbd4138b2
ruoyi-service/ruoyi-integration/src/main/java/com/ruoyi/integration/iotda/utils/listener/IotMessageListener.java
@@ -10,6 +10,8 @@
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.web.domain.AjaxResult;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.integration.drainage.TCECUtil;
import com.ruoyi.integration.drainage.model.NotificationChargeOrderInfoResult;
import com.ruoyi.integration.iotda.constant.SendTagConstant;
import com.ruoyi.integration.iotda.enums.ServiceIdMenu;
import com.ruoyi.integration.iotda.model.*;
@@ -29,6 +31,8 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -54,7 +58,9 @@
    
    @Resource
    private ChargingMessageUtil chargingMessageUtil;
    private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 500, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    
    
    
@@ -116,23 +122,43 @@
                break;
            case SendTagConstant.PING:
                long PING = System.currentTimeMillis();
                PingMessage pingMessage = JSON.parseObject(content.toJSONString(),PingMessage.class);
                //存储缓存中,5分钟有效
//                redisTemplate.opsForValue().set("ping:" + pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code(), pingMessage, 5, TimeUnit.MINUTES);
                String fullName = pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code();
                //主动丢弃一次心跳数据,用于降低调华为接口评率,防止被限流
                boolean heartRate = redisTemplate.hasKey("HeartRate:" + fullName);
                if(heartRate){
                    redisTemplate.delete("HeartRate:" + fullName);
                    break;
                }
                redisTemplate.opsForValue().set("HeartRate:" + fullName, System.currentTimeMillis());
                // 响应硬件
                Pong pong = new Pong();
                pong.setCharging_pile_code(pingMessage.getCharging_pile_code());
                pong.setCharging_gun_code(pingMessage.getCharging_gun_code());
                pong.setCharging_gun_status(0);
                iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong));
                Long time = (Long) redisTemplate.opsForHash().get("charging_gun_online", (pingMessage.getCharging_pile_code() + pingMessage.getCharging_gun_code()));
                //缓解高并发调华为接口触发限流
//                threadPoolExecutor.execute(()->{
//                    long longValue = Double.valueOf(Math.random() * 1000).longValue();
//                    System.err.println("随机等待" + longValue + "毫秒:" + fullName);
//                    try {
//                        Thread.sleep(longValue);
//                    } catch (InterruptedException e) {
//                        throw new RuntimeException(e);
//                    }
//
//                    // 响应硬件
//                    Pong pong = new Pong();
//                    pong.setCharging_pile_code(pingMessage.getCharging_pile_code());
//                    pong.setCharging_gun_code(pingMessage.getCharging_gun_code());
//                    pong.setCharging_gun_status(0);
//                    iotMessageProduce.sendMessage(pong.getCharging_pile_code(), ServiceIdMenu.PONG.getKey(), messageUtil.pong(pong));
//                });
                Long time = (Long) redisTemplate.opsForHash().get("charging_gun_online", fullName);
                //小于1分钟才处理数据,防止频繁查询数据
                if(null != time && (System.currentTimeMillis() - time) < 60000){
                    log.warn("PING消息处理:{} 毫秒", System.currentTimeMillis() - PING);
                    break;
                }