package com.ruoyi.jianguan.governmentCloud; import com.alibaba.fastjson.JSONObject; import com.ruoyi.common.core.utils.DateUtils; import com.ruoyi.integration.api.model.*; import com.ruoyi.jianguan.mongodb.service.*; import com.ruoyi.jianguan.mqtt.config.TopicConstants; import com.ruoyi.jianguan.mqtt.util.MqttPushUtil; import com.ruoyi.jianguan.rocket.model.ErrorMessageMessage; import io.netty.util.concurrent.DefaultThreadFactory; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Date; import java.util.List; import java.util.concurrent.*; /** * 定时上传政务云数据 */ @Slf4j @Component public class UploadDataTaskUtil { @Autowired private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; @Autowired private MqttPushUtil mqttPushUtil; /** * 每天的9点执行的任务 */ // @Scheduled(cron = "0 0 9 * * *") @Scheduled(fixedRate = 60000) public void taskDay(){ try { // 传输mongodb的硬件数据 createCustomThreadPool(); }catch (Exception e){ e.printStackTrace(); } } /** * 创建自定义线程池 * 特点: * 1. 支持定时及周期性任务 * 2. 核心线程数固定,但可以不断创建新线程执行后续任务 * 3. 适用于需要定时执行或周期性执行的场景 */ @SneakyThrows public void createCustomThreadPool() { /* 创建自定义线程池 字段: 1. corePoolSize:核心线程池数量 2. maximumPoolSize: 最大线程池数量 3. keepAliveTime: 线程空闲时间 4. unit: 时间单位 5. workQueue: 阻塞队列 5. threadFactory: 线程工厂 5. handler: 拒绝策略 */ ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor( 5, // 根据CPU核心数设置 10, // 最大应急线程数 30, TimeUnit.SECONDS, // 空闲线程存活时间 new ArrayBlockingQueue<>(100), // 有界队列防止内存溢出 new DefaultThreadFactory("custom-thread-pool"), // 自定义线程命名 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 ); try { LocalDate localDate = LocalDate.now().minusDays(1); String start = localDate + " 00:00:00"; String end = localDate + " 23:59:59"; Date startTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(start); Date endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(end); // 查询所有的mango数据 List uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime); customthreadPoolExecutor.execute(() -> { if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) { for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) { JSONObject jsonObject = new JSONObject(); jsonObject.put("device_code", uploadRealTimeMonitoringData.getCharging_pile_code()); jsonObject.put("report_time", System.currentTimeMillis()); jsonObject.put("properties", uploadRealTimeMonitoringData); mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), jsonObject.toJSONString()); } } }); TimeUnit.MILLISECONDS.sleep(1); // Future future = customthreadPoolExecutor.submit(() -> { // log.info("线程:{},办理业务", Thread.currentThread().getName()); // return "业务办理完成"; // }); // log.info(future.get()); } finally { gracefulShutdown(customthreadPoolExecutor); } } /** * 优雅关闭线程池通用方法 * * @param pool 需要关闭的线程池 */ private static void gracefulShutdown(ExecutorService pool) { pool.shutdown(); // 拒绝新任务提交 try { // 等待现有任务完成 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务 // 再次等待任务响应中断 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { log.error("线程池未完全关闭"); } } } catch (InterruptedException e) { // 重新尝试关闭 pool.shutdownNow(); Thread.currentThread().interrupt(); } finally { log.info("线程池是否执行完成:{}", pool.isTerminated()); } } public static void main(String[] args) { System.err.println(System.currentTimeMillis()); } }