From 71e051cddfba05a4e7876a92650cc8d7b5fabf28 Mon Sep 17 00:00:00 2001
From: huliguo <2023611923@qq.com>
Date: 星期五, 23 五月 2025 20:59:58 +0800
Subject: [PATCH] Merge branch 'dev' of http://120.76.84.145:10101/gitblit/r/java/mx_charging_pile into dev

---
 ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java |  144 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 144 insertions(+), 0 deletions(-)

diff --git a/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java
new file mode 100644
index 0000000..7b39d9a
--- /dev/null
+++ b/ruoyi-service/ruoyi-jianguan/src/main/java/com/ruoyi/jianguan/governmentCloud/UploadDataTaskUtil.java
@@ -0,0 +1,144 @@
+package com.ruoyi.jianguan.governmentCloud;
+
+
+import com.alibaba.fastjson.JSONObject;
+import com.ruoyi.common.core.utils.DateUtils;
+import com.ruoyi.integration.api.model.*;
+import com.ruoyi.jianguan.mongodb.service.*;
+import com.ruoyi.jianguan.mqtt.config.TopicConstants;
+import com.ruoyi.jianguan.mqtt.util.MqttPushUtil;
+import com.ruoyi.jianguan.rocket.model.ErrorMessageMessage;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * 定时上传政务云数据
+ */
+@Slf4j
+@Component
+public class UploadDataTaskUtil {
+
+    @Autowired
+    private UploadRealTimeMonitoringDataService uploadRealTimeMonitoringDataService;
+    @Autowired
+    private MqttPushUtil mqttPushUtil;
+
+    /**
+     * 每天的9点执行的任务
+     */
+//    @Scheduled(cron = "0 0 9 * * *")
+    @Scheduled(fixedRate = 60000)
+    public void taskDay(){
+        try {
+            // 传输mongodb的硬件数据
+            createCustomThreadPool();
+        }catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+
+    /**
+     * 创建自定义线程池
+     * 特点:
+     * 1. 支持定时及周期性任务
+     * 2. 核心线程数固定,但可以不断创建新线程执行后续任务
+     * 3. 适用于需要定时执行或周期性执行的场景
+     */
+    @SneakyThrows
+    public void createCustomThreadPool() {
+        /*
+          创建自定义线程池
+          字段:
+          1. corePoolSize:核心线程池数量
+          2. maximumPoolSize: 最大线程池数量
+          3. keepAliveTime: 线程空闲时间
+          4. unit: 时间单位
+          5. workQueue: 阻塞队列
+          5. threadFactory: 线程工厂
+          5. handler: 拒绝策略
+         */
+        ThreadPoolExecutor customthreadPoolExecutor = new ThreadPoolExecutor(
+                5,                                              // 根据CPU核心数设置
+                10,                                                         // 最大应急线程数
+                30, TimeUnit.SECONDS,                                       // 空闲线程存活时间
+                new ArrayBlockingQueue<>(100),                      // 有界队列防止内存溢出
+                new DefaultThreadFactory("custom-thread-pool"),   // 自定义线程命名
+                new ThreadPoolExecutor.CallerRunsPolicy()                   // 拒绝策略
+        );
+
+        try {
+            LocalDate localDate = LocalDate.now().minusDays(1);
+            String start = localDate + " 00:00:00";
+            String end = localDate + " 23:59:59";
+            Date startTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(start);
+            Date endTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(end);
+            // 查询所有的mango数据
+            List<UploadRealTimeMonitoringData> uploadRealTimeMonitoringDataList = uploadRealTimeMonitoringDataService.getRangeTimeData(startTime, endTime);
+            customthreadPoolExecutor.execute(() -> {
+                if (uploadRealTimeMonitoringDataList != null && uploadRealTimeMonitoringDataList.size() > 0) {
+                    for (UploadRealTimeMonitoringData uploadRealTimeMonitoringData : uploadRealTimeMonitoringDataList) {
+                        JSONObject jsonObject = new JSONObject();
+                        jsonObject.put("device_code", uploadRealTimeMonitoringData.getCharging_pile_code());
+                        jsonObject.put("report_time", System.currentTimeMillis());
+                        jsonObject.put("properties", uploadRealTimeMonitoringData);
+                        mqttPushUtil.pushChargePileData(TopicConstants.CHARGE_PILE_CODE.replace("CHARGE_PILE_CODE", uploadRealTimeMonitoringData.getCharging_pile_code()), jsonObject.toJSONString());
+                    }
+                }
+            });
+
+            TimeUnit.MILLISECONDS.sleep(1);
+
+//            Future<String> future = customthreadPoolExecutor.submit(() -> {
+//                log.info("线程:{},办理业务", Thread.currentThread().getName());
+//                return "业务办理完成";
+//            });
+//            log.info(future.get());
+        } finally {
+            gracefulShutdown(customthreadPoolExecutor);
+        }
+    }
+
+
+    /**
+     * 优雅关闭线程池通用方法
+     *
+     * @param pool 需要关闭的线程池
+     */
+    private static void gracefulShutdown(ExecutorService pool) {
+        pool.shutdown(); // 拒绝新任务提交
+        try {
+            // 等待现有任务完成
+            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // 取消等待中的任务 只等待运行中的任务
+                // 再次等待任务响应中断
+                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+                    log.error("线程池未完全关闭");
+                }
+            }
+        } catch (InterruptedException e) {
+            // 重新尝试关闭
+            pool.shutdownNow();
+            Thread.currentThread().interrupt();
+        } finally {
+            log.info("线程池是否执行完成:{}", pool.isTerminated());
+        }
+    }
+
+    public static void main(String[] args) {
+        System.err.println(System.currentTimeMillis());
+    }
+
+}

--
Gitblit v1.7.1