huliguo
2025-05-23 71e051cddfba05a4e7876a92650cc8d7b5fabf28
ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java
New file
@@ -0,0 +1,144 @@
package com.ruoyi.jianguan.governmentCloud;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.utils.DateUtils;
import com.ruoyi.integration.api.model.*;
import com.ruoyi.jianguan.mongodb.service.*;
import com.ruoyi.jianguan.mqtt.config.TopicConstants;
import com.ruoyi.jianguan.mqtt.util.MqttPushUtil;
import com.ruoyi.jianguan.rocket.model.ErrorMessageMessage;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
 * 定时上传政务云数据
 */
@Slf4j
@Component
public class UploadDataTaskUtil {
    @Autowired
    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
    @Autowired
    private MqttPushUtil mqttPushUtil;
    /**
     * 每天的9点执行的任务
     */
//    @Scheduled(cron = "0 0 9 * * *")
    @Scheduled(fixedRate = 60000)
    public void taskDay(){
        try {
            // 传输mongodb的硬件数据
            createCustomThreadPool();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /**
     * 创建自定义线程池
     * 特点:
     * 1. 支持定时及周期性任务
     * 2. 核心线程数固定,但可以不断创建新线程执行后续任务
     * 3. 适用于需要定时执行或周期性执行的场景
     */
    @SneakyThrows
    public void createCustomThreadPool() {
        /*
          创建自定义线程池
          字段:
          1. corePoolSize:核心线程池数量
          2. maximumPoolSize: 最大线程池数量
          3. keepAliveTime: 线程空闲时间
          4. unit: 时间单位
          5. workQueue: 阻塞队列
          5. threadFactory: 线程工厂
          5. handler: 拒绝策略
         */
        ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor(
                5,                                              // 根据CPU核心数设置
                10,                                                         // 最大应急线程数
                30, TimeUnit.SECONDS,                                       // 空闲线程存活时间
                new ArrayBlockingQueue<>(100),                      // 有界队列防止内存溢出
                new DefaultThreadFactory("custom-thread-pool"),   // 自定义线程命名
                new ThreadPoolExecutor.CallerRunsPolicy()                   // 拒绝策略
        );
        try {
            LocalDate localDate = LocalDate.now().minusDays(1);
            String start = localDate + " 00:00:00";
            String end = localDate + " 23:59:59";
            Date startTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(start);
            Date endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(end);
            // 查询所有的mango数据
            List<UploadRealTimeMonitoringData> uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime);
            customthreadPoolExecutor.execute(() -> {
                if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) {
                    for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) {
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("device_code", uploadRealTimeMonitoringData.getCharging_pile_code());
                        jsonObject.put("report_time", System.currentTimeMillis());
                        jsonObject.put("properties", uploadRealTimeMonitoringData);
                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), jsonObject.toJSONString());
                    }
                }
            });
            TimeUnit.MILLISECONDS.sleep(1);
//            Future<String> future = customthreadPoolExecutor.submit(() -> {
//                log.info("线程:{},办理业务", Thread.currentThread().getName());
//                return "业务办理完成";
//            });
//            log.info(future.get());
        } finally {
            gracefulShutdown(customthreadPoolExecutor);
        }
    }
    /**
     * 优雅关闭线程池通用方法
     *
     * @param pool 需要关闭的线程池
     */
    private static void gracefulShutdown(ExecutorService pool) {
        pool.shutdown(); // 拒绝新任务提交
        try {
            // 等待现有任务完成
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务
                // 再次等待任务响应中断
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    log.error("线程池未完全关闭");
                }
            }
        } catch (InterruptedException e) {
            // 重新尝试关闭
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            log.info("线程池是否执行完成:{}", pool.isTerminated());
        }
    }
    public static void main(String[] args) {
        System.err.println(System.currentTimeMillis());
    }
}