package com.ruoyi.jianguan.governmentCloud;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.ruoyi.common.core.utils.DateUtils;
|
import com.ruoyi.integration.api.model.*;
|
import com.ruoyi.jianguan.mongodb.service.*;
|
import com.ruoyi.jianguan.mqtt.config.TopicConstants;
|
import com.ruoyi.jianguan.mqtt.util.MqttPushUtil;
|
import com.ruoyi.jianguan.rocket.model.ErrorMessageMessage;
|
import io.netty.util.concurrent.DefaultThreadFactory;
|
import lombok.SneakyThrows;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.time.LocalDate;
|
import java.time.LocalDateTime;
|
import java.util.List;
|
import java.util.concurrent.*;
|
|
/**
|
* 定时上传政务云数据
|
*/
|
@Slf4j
|
@Component
|
public class UploadDataTaskUtil {
|
|
@Autowired
|
private AcquisitionBillingModeService acquisitionBillingModeService;
|
@Autowired
|
private BillingModeVerifyService billingModeVerifyService;
|
@Autowired
|
private BmsAbortService bmsAbortService;
|
@Autowired
|
private BmsDemandAndChargerExportationService bmsDemandAndChargerExportationService;
|
@Autowired
|
private OnlineService onlineService;
|
@Autowired
|
private EndChargeService endChargeService;
|
@Autowired
|
private ErrorMessageMessageService errorMessageMessageService;
|
@Autowired
|
private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
|
@Autowired
|
private ChargingHandshakeService chargingHandshakeService;
|
@Autowired
|
private ParameterSettingService parameterSettingService;
|
@Autowired
|
private MotorAbortService motorAbortService;
|
@Autowired
|
private BmsInformationService bmsInformationService;
|
@Autowired
|
private ChargingPileStartsChargingService chargingPileStartsChargingService;
|
@Autowired
|
private PlatformStartChargingReplyService platformStartChargingReplyService;
|
@Autowired
|
private PlatformStopChargingReplyService platformStopChargingReplyService;
|
@Autowired
|
private TransactionRecordService transactionRecordService;
|
@Autowired
|
private UpdateBalanceReplyService updateBalanceReplyService;
|
@Autowired
|
private SynchronizeOfflineCardReplyService synchronizeOfflineCardReplyService;
|
@Autowired
|
private ClearOfflineCardReplyService clearOfflineCardReplyService;
|
@Autowired
|
private WorkingParameterSettingReplyService workingParameterSettingReplyService;
|
@Autowired
|
private TimingSettingService timingSettingService;
|
@Autowired
|
private SetupBillingModelReplyService setupBillingModelReplyService;
|
@Autowired
|
private GroundLockRealTimeDataService groundLockRealTimeDataService;
|
@Autowired
|
private ChargingPileReturnsGroundLockDataService chargingPileReturnsGroundLockDataService;
|
@Autowired
|
private PlatformRestartReplyService platformRestartReplyService;
|
@Autowired
|
private PlatformRemoteUpdateReplyService platformRemoteUpdateReplyService;
|
@Autowired
|
private QrCodeDeliveryReplyService qrCodeDeliveryReplyService;
|
@Autowired
|
private SecurityDetectionService securityDetectionService;
|
|
/**
|
* 每天的9点执行的任务
|
*/
|
@Scheduled(cron = "0 0 9 * * *")
|
// @Scheduled(fixedRate = 60000)
|
public void taskDay(){
|
try {
|
// 传输mongodb的硬件数据
|
createCustomThreadPool();
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
}
|
|
|
/**
|
* 创建自定义线程池
|
* 特点:
|
* 1. 支持定时及周期性任务
|
* 2. 核心线程数固定,但可以不断创建新线程执行后续任务
|
* 3. 适用于需要定时执行或周期性执行的场景
|
*/
|
@SneakyThrows
|
public void createCustomThreadPool() {
|
/*
|
创建自定义线程池
|
字段:
|
1. corePoolSize:核心线程池数量
|
2. maximumPoolSize: 最大线程池数量
|
3. keepAliveTime: 线程空闲时间
|
4. unit: 时间单位
|
5. workQueue: 阻塞队列
|
5. threadFactory: 线程工厂
|
5. handler: 拒绝策略
|
*/
|
ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor(
|
5, // 根据CPU核心数设置
|
10, // 最大应急线程数
|
30, TimeUnit.SECONDS, // 空闲线程存活时间
|
new ArrayBlockingQueue<>(100), // 有界队列防止内存溢出
|
new DefaultThreadFactory("custom-thread-pool"), // 自定义线程命名
|
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
|
);
|
|
try {
|
LocalDate localDate = LocalDate.now().minusDays(1);
|
String startTime = localDate + " 00:00:00";
|
String endTime = localDate + " 23:59:59";
|
long l = System.currentTimeMillis();
|
System.err.println("开始时间:"+l);
|
// 查询所有的mango数据
|
List<AcquisitionBillingMode> acquisitionBillingModeList = acquisitionBillingModeService.getRangeTimeData(startTime, endTime);
|
List<BillingModeVerify> billingModeVerifyList = billingModeVerifyService.getRangeTimeData(startTime, endTime);
|
List<BmsAbort> bmsAbortList = bmsAbortService.getRangeTimeData(startTime, endTime);
|
List<BmsDemandAndChargerExportation> bmsDemandAndChargerExportationList = bmsDemandAndChargerExportationService.getRangeTimeData(startTime, endTime);
|
List<Online> onlineList = onlineService.getRangeTimeData(startTime, endTime);
|
List<EndCharge> endChargeList = endChargeService.getRangeTimeData(startTime, endTime);
|
List<ErrorMessageMessage> errorMessageMessageList = errorMessageMessageService.getRangeTimeData(startTime, endTime);
|
List<UploadRealTimeMonitoringData> uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime);
|
List<ChargingHandshake> chargingHandshakeList = chargingHandshakeService.getRangeTimeData(startTime, endTime);
|
List<ParameterSetting> parameterSettingList = parameterSettingService.getRangeTimeData(startTime, endTime);
|
List<MotorAbort> motorAbortList = motorAbortService.getRangeTimeData(startTime, endTime);
|
List<BmsInformation> bmsInformationList = bmsInformationService.getRangeTimeData(startTime, endTime);
|
List<ChargingPileStartsCharging> chargingPileStartsChargingList = chargingPileStartsChargingService.getRangeTimeData(startTime, endTime);
|
List<PlatformStartChargingReply> platformStartChargingReplyList = platformStartChargingReplyService.getRangeTimeData(startTime, endTime);
|
List<PlatformStopChargingReply> platformStopChargingReplyList = platformStopChargingReplyService.getRangeTimeData(startTime, endTime);
|
List<TransactionRecord> transactionRecordList = transactionRecordService.getRangeTimeData(startTime, endTime);
|
List<UpdateBalanceReply> updateBalanceReplyList = updateBalanceReplyService.getRangeTimeData(startTime, endTime);
|
List<SynchronizeOfflineCardReply> synchronizeOfflineCardReplyList = synchronizeOfflineCardReplyService.getRangeTimeData(startTime, endTime);
|
List<ClearOfflineCardReply> clearOfflineCardReplyList = clearOfflineCardReplyService.getRangeTimeData(startTime, endTime);
|
List<WorkingParameterSettingReply> workingParameterSettingReplyList = workingParameterSettingReplyService.getRangeTimeData(startTime, endTime);
|
List<TimingSetting> timingSettingList = timingSettingService.getRangeTimeData(startTime, endTime);
|
List<SetupBillingModelReply> setupBillingModelReplyList = setupBillingModelReplyService.getRangeTimeData(startTime, endTime);
|
List<GroundLockRealTimeData> groundLockRealTimeDataList = groundLockRealTimeDataService.getRangeTimeData(startTime, endTime);
|
List<ChargingPileReturnsGroundLockData> chargingPileReturnsGroundLockDataList = chargingPileReturnsGroundLockDataService.getRangeTimeData(startTime, endTime);
|
List<PlatformRestartReply> platformRestartReplyList = platformRestartReplyService.getRangeTimeData(startTime, endTime);
|
List<PlatformRemoteUpdateReply> platformRemoteUpdateReplyList = platformRemoteUpdateReplyService.getRangeTimeData(startTime, endTime);
|
List<QrCodeDeliveryReply> qrCodeDeliveryReplyList = qrCodeDeliveryReplyService.getRangeTimeData(startTime, endTime);
|
List<SecurityDetection> securityDetectionList = securityDetectionService.getRangeTimeData(startTime, endTime);
|
System.err.println("结束时间:"+(System.currentTimeMillis()-l));
|
customthreadPoolExecutor.execute(() -> {
|
// if (acquisitionBillingModeList != null && acquisitionBillingModeList.size() > 0) {
|
// for (AcquisitionBillingMode acquisitionBillingMode : acquisitionBillingModeList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", acquisitionBillingMode.getCharging_pile_code()), JSONObject.toJSONString(acquisitionBillingMode));
|
// }
|
// }
|
// if (billingModeVerifyList != null && billingModeVerifyList.size() > 0) {
|
// for (BillingModeVerify billingModeVerify : billingModeVerifyList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", billingModeVerify.getCharging_pile_code()), JSONObject.toJSONString(billingModeVerify));
|
// }
|
// }
|
// if (bmsAbortList != null && bmsAbortList.size() > 0) {
|
// for (BmsAbort bmsAbort : bmsAbortList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", bmsAbort.getCharging_pile_code()), JSONObject.toJSONString(bmsAbort));
|
// }
|
// }
|
// if (bmsDemandAndChargerExportationList != null && bmsDemandAndChargerExportationList.size() > 0) {
|
// for (BmsDemandAndChargerExportation bmsDemandAndChargerExportation : bmsDemandAndChargerExportationList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", bmsDemandAndChargerExportation.getCharging_pile_code()), JSONObject.toJSONString(bmsDemandAndChargerExportation));
|
// }
|
// }
|
// if (onlineList != null && onlineList.size() > 0) {
|
// for (Online online : onlineList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", online.getCharging_pile_code()), JSONObject.toJSONString(online));
|
// }
|
// }
|
// if (endChargeList != null && endChargeList.size() > 0) {
|
// for (EndCharge endCharge : endChargeList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", endCharge.getCharging_pile_code()), JSONObject.toJSONString(endCharge));
|
// }
|
// }
|
// if (errorMessageMessageList != null && errorMessageMessageList.size() > 0) {
|
// for (ErrorMessageMessage errorMessageMessage : errorMessageMessageList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", errorMessageMessage.getCharging_pile_code()), JSONObject.toJSONString(errorMessageMessage));
|
// }
|
// }
|
// if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) {
|
// for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), JSONObject.toJSONString(uploadRealTimeMonitoringData));
|
// }
|
// }
|
// if (chargingHandshakeList != null && chargingHandshakeList.size() > 0) {
|
// for (ChargingHandshake chargingHandshake : chargingHandshakeList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", chargingHandshake.getCharging_pile_code()), JSONObject.toJSONString(chargingHandshake));
|
// }
|
// }
|
// if (parameterSettingList != null && parameterSettingList.size() > 0) {
|
// for (ParameterSetting parameterSetting : parameterSettingList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", parameterSetting.getCharging_pile_code()), JSONObject.toJSONString(parameterSetting));
|
// }
|
// }
|
// if (motorAbortList != null && motorAbortList.size() > 0) {
|
// for (MotorAbort motorAbort : motorAbortList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", motorAbort.getCharging_pile_code()), JSONObject.toJSONString(motorAbort));
|
// }
|
// }
|
// if (bmsInformationList != null && bmsInformationList.size() > 0) {
|
// for (BmsInformation bmsInformation : bmsInformationList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", bmsInformation.getCharging_pile_code()), JSONObject.toJSONString(bmsInformation));
|
// }
|
// }
|
// if (chargingPileStartsChargingList != null && chargingPileStartsChargingList.size() > 0) {
|
// for (ChargingPileStartsCharging chargingPileStartsCharging : chargingPileStartsChargingList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", chargingPileStartsCharging.getCharging_pile_code()),JSONObject.toJSONString(chargingPileStartsCharging));
|
// }
|
// }
|
// if (platformStartChargingReplyList != null && platformStartChargingReplyList.size() > 0) {
|
// for (PlatformStartChargingReply platformStartChargingReply : platformStartChargingReplyList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformStartChargingReply.getCharging_pile_code()), JSONObject.toJSONString(platformStartChargingReply));
|
// }
|
// }
|
// if (platformStopChargingReplyList != null && platformStopChargingReplyList.size() > 0) {
|
// for (PlatformStopChargingReply platformStopChargingReply : platformStopChargingReplyList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformStopChargingReply.getCharging_pile_code()), JSONObject.toJSONString(platformStopChargingReply));
|
// }
|
// }
|
// if (transactionRecordList != null && transactionRecordList.size() > 0) {
|
// for (TransactionRecord transactionRecord : transactionRecordList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", transactionRecord.getCharging_pile_code()), JSONObject.toJSONString(transactionRecord));
|
// }
|
// }
|
//// if (updateBalanceReplyList != null && updateBalanceReplyList.size() > 0) {
|
//// for (UpdateBalanceReply updateBalanceReply : updateBalanceReplyList) {
|
//// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", updateBalanceReply.getCharging_pile_code()), JSONObject.toJSONString(updateBalanceReply));
|
//// }
|
//// }
|
//// if (synchronizeOfflineCardReplyList != null && synchronizeOfflineCardReplyList.size() > 0) {
|
//// for (SynchronizeOfflineCardReply synchronizeOfflineCardReply : synchronizeOfflineCardReplyList) {
|
//// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", synchronizeOfflineCardReply.getCharging_pile_code()), JSONObject.toJSONString(synchronizeOfflineCardReply));
|
//// }
|
//// }
|
//// if (clearOfflineCardReplyList != null && clearOfflineCardReplyList.size() > 0) {
|
//// for (ClearOfflineCardReply clearOfflineCardReply : clearOfflineCardReplyList) {
|
//// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", clearOfflineCardReply.getCharging_pile_code()), JSONObject.toJSONString(clearOfflineCardReply));
|
//// }
|
//// }
|
// if (workingParameterSettingReplyList != null && workingParameterSettingReplyList.size() > 0) {
|
// for (WorkingParameterSettingReply workingParameterSettingReply : workingParameterSettingReplyList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", workingParameterSettingReply.getCharging_pile_code()), JSONObject.toJSONString(workingParameterSettingReply));
|
// }
|
// }
|
// if (timingSettingList != null && timingSettingList.size() > 0) {
|
// for (TimingSetting timingSetting : timingSettingList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", timingSetting.getCharging_pile_code()), JSONObject.toJSONString(timingSetting));
|
// }
|
// }
|
//// if (setupBillingModelReplyList != null && setupBillingModelReplyList.size() > 0) {
|
//// for (SetupBillingModelReply setupBillingModelReply : setupBillingModelReplyList) {
|
//// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", setupBillingModelReply.getCharging_pile_code()), JSONObject.toJSONString(setupBillingModelReply));
|
//// }
|
//// }
|
// if (groundLockRealTimeDataList != null && groundLockRealTimeDataList.size() > 0) {
|
// for (GroundLockRealTimeData groundLockRealTimeData : groundLockRealTimeDataList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", groundLockRealTimeData.getCharging_pile_code()), JSONObject.toJSONString(groundLockRealTimeData));
|
// }
|
// }
|
// if (chargingPileReturnsGroundLockDataList != null && chargingPileReturnsGroundLockDataList.size() > 0) {
|
// for (ChargingPileReturnsGroundLockData chargingPileReturnsGroundLockData : chargingPileReturnsGroundLockDataList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", chargingPileReturnsGroundLockData.getCharging_pile_code()), JSONObject.toJSONString(chargingPileReturnsGroundLockData));
|
// }
|
// }
|
// if (platformRestartReplyList != null && platformRestartReplyList.size() > 0) {
|
// for (PlatformRestartReply platformRestartReply : platformRestartReplyList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformRestartReply.getCharging_pile_code()), JSONObject.toJSONString(platformRestartReply));
|
// }
|
// }
|
// if (platformRemoteUpdateReplyList != null && platformRemoteUpdateReplyList.size() > 0) {
|
// for (PlatformRemoteUpdateReply platformRemoteUpdateReply : platformRemoteUpdateReplyList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", platformRemoteUpdateReply.getCharging_pile_code()), JSONObject.toJSONString(platformRemoteUpdateReply));
|
// }
|
// }
|
// if (qrCodeDeliveryReplyList != null && qrCodeDeliveryReplyList.size() > 0) {
|
// for (QrCodeDeliveryReply qrCodeDeliveryReply : qrCodeDeliveryReplyList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", qrCodeDeliveryReply.getCharging_pile_code()), JSONObject.toJSONString(qrCodeDeliveryReply));
|
// }
|
// }
|
// if (securityDetectionList != null && securityDetectionList.size() > 0) {
|
// for (SecurityDetection securityDetection : securityDetectionList) {
|
// MqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", securityDetection.getCharging_pile_code()), JSONObject.toJSONString(securityDetection));
|
// }
|
// }
|
|
});
|
|
TimeUnit.MILLISECONDS.sleep(1);
|
|
// Future<String> future = customthreadPoolExecutor.submit(() -> {
|
// log.info("线程:{},办理业务", Thread.currentThread().getName());
|
// return "业务办理完成";
|
// });
|
// log.info(future.get());
|
} finally {
|
gracefulShutdown(customthreadPoolExecutor);
|
}
|
}
|
|
|
/**
|
* 优雅关闭线程池通用方法
|
*
|
* @param pool 需要关闭的线程池
|
*/
|
private static void gracefulShutdown(ExecutorService pool) {
|
pool.shutdown(); // 拒绝新任务提交
|
try {
|
// 等待现有任务完成
|
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
|
pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务
|
// 再次等待任务响应中断
|
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
|
log.error("线程池未完全关闭");
|
}
|
}
|
} catch (InterruptedException e) {
|
// 重新尝试关闭
|
pool.shutdownNow();
|
Thread.currentThread().interrupt();
|
} finally {
|
log.info("线程池是否执行完成:{}", pool.isTerminated());
|
}
|
}
|
|
}
|