From 71e051cddfba05a4e7876a92650cc8d7b5fabf28 Mon Sep 17 00:00:00 2001 From: huliguo <2023611923@qq.com> Date: 星期五, 23 五月 2025 20:59:58 +0800 Subject: [PATCH] Merge branch 'dev' of http://120.76.84.145:10101/gitblit/r/java/mx_charging_pile into dev --- ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java | 144 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 144 insertions(+), 0 deletions(-) diff --git a/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java new file mode 100644 index 0000000..7b39d9a --- /dev/null +++ b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java @@ -0,0 +1,144 @@ +package com.ruoyi.jianguan.governmentCloud; + + +import com.alibaba.fastjson.JSONObject; +import com.ruoyi.common.core.utils.DateUtils; +import com.ruoyi.integration.api.model.*; +import com.ruoyi.jianguan.mongodb.service.*; +import com.ruoyi.jianguan.mqtt.config.TopicConstants; +import com.ruoyi.jianguan.mqtt.util.MqttPushUtil; +import com.ruoyi.jianguan.rocket.model.ErrorMessageMessage; +import io.netty.util.concurrent.DefaultThreadFactory; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; + +/** + * 定时上传政务云数据 + */ +@Slf4j +@Component +public class UploadDataTaskUtil { + + @Autowired + private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService; + @Autowired + private MqttPushUtil mqttPushUtil; + + /** + * 每天的9点执行的任务 + */ +// @Scheduled(cron = "0 0 9 * * *") + @Scheduled(fixedRate = 60000) + public void taskDay(){ + try { + // 传输mongodb的硬件数据 + createCustomThreadPool(); + }catch (Exception e){ + e.printStackTrace(); + } + } + + + /** + * 创建自定义线程池 + * 特点: + * 1. 支持定时及周期性任务 + * 2. 核心线程数固定,但可以不断创建新线程执行后续任务 + * 3. 适用于需要定时执行或周期性执行的场景 + */ + @SneakyThrows + public void createCustomThreadPool() { + /* + 创建自定义线程池 + 字段: + 1. corePoolSize:核心线程池数量 + 2. maximumPoolSize: 最大线程池数量 + 3. keepAliveTime: 线程空闲时间 + 4. unit: 时间单位 + 5. workQueue: 阻塞队列 + 5. threadFactory: 线程工厂 + 5. handler: 拒绝策略 + */ + ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor( + 5, // 根据CPU核心数设置 + 10, // 最大应急线程数 + 30, TimeUnit.SECONDS, // 空闲线程存活时间 + new ArrayBlockingQueue<>(100), // 有界队列防止内存溢出 + new DefaultThreadFactory("custom-thread-pool"), // 自定义线程命名 + new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略 + ); + + try { + LocalDate localDate = LocalDate.now().minusDays(1); + String start = localDate + " 00:00:00"; + String end = localDate + " 23:59:59"; + Date startTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(start); + Date endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(end); + // 查询所有的mango数据 + List<UploadRealTimeMonitoringData> uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime); + customthreadPoolExecutor.execute(() -> { + if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) { + for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("device_code", uploadRealTimeMonitoringData.getCharging_pile_code()); + jsonObject.put("report_time", System.currentTimeMillis()); + jsonObject.put("properties", uploadRealTimeMonitoringData); + mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), jsonObject.toJSONString()); + } + } + }); + + TimeUnit.MILLISECONDS.sleep(1); + +// Future<String> future = customthreadPoolExecutor.submit(() -> { +// log.info("线程:{},办理业务", Thread.currentThread().getName()); +// return "业务办理完成"; +// }); +// log.info(future.get()); + } finally { + gracefulShutdown(customthreadPoolExecutor); + } + } + + + /** + * 优雅关闭线程池通用方法 + * + * @param pool 需要关闭的线程池 + */ + private static void gracefulShutdown(ExecutorService pool) { + pool.shutdown(); // 拒绝新任务提交 + try { + // 等待现有任务完成 + if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { + pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务 + // 再次等待任务响应中断 + if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { + log.error("线程池未完全关闭"); + } + } + } catch (InterruptedException e) { + // 重新尝试关闭 + pool.shutdownNow(); + Thread.currentThread().interrupt(); + } finally { + log.info("线程池是否执行完成:{}", pool.isTerminated()); + } + } + + public static void main(String[] args) { + System.err.println(System.currentTimeMillis()); + } + +} -- Gitblit v1.7.1