From 1d22a73ebd5cb8bd420e8ab55e18d5bd19bfdc1e Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期三, 23 七月 2025 10:22:34 +0800
Subject: [PATCH] 更新判断新平台

---
 src/main/java/com/linghu/timeTask/ScheduledTasks.java |  229 +++++++++++++++++++++++++++++++++------------------------
 1 files changed, 132 insertions(+), 97 deletions(-)

diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
index 9c5d31f..4e00c6a 100644
--- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java
+++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -22,14 +22,10 @@
 import reactor.core.scheduler.Schedulers;
 
 import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import java.time.LocalDateTime;
+import java.time.Duration;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+
 
 @Component
 @Slf4j
@@ -45,124 +41,163 @@
     @Autowired
     private QuestionService questionService;
 
-    private ScheduledFuture<?> scheduledTask; // 定时任务引用
-    // 健康检查专用调度器(单线程足够)
-    private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
-    // 任务处理专用调度器(可根据任务量调整线程数)
-    private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程
     private volatile boolean isHealthy = true; // 健康状态标识
+    private volatile boolean initialCheckComplete = false; // 初始检查完成标志
+    private volatile boolean taskEnabled = true; // 任务启用开关
     @PostConstruct
     public void init() {
         // 启动健康检查定时任务,每10分钟执行一次
-        healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES);
+        checkInitialHealth();
         // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
-        if (isHealthy) {
-            startTaskProcessing();
+
+    }
+
+    /**
+     * 执行初始健康检查,确保系统启动时任务状态正确
+     */
+    private void checkInitialHealth() {
+        log.info("执行系统启动初始健康检查...");
+
+        try {
+            // 同步执行健康检查,最多等待30秒
+            Boolean healthCheckResult = collectController.checkThirdPartyHealth()
+                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
+                    .block(Duration.ofSeconds(30));
+
+            isHealthy = Boolean.TRUE.equals(healthCheckResult);
+            taskEnabled = isHealthy;
+
+            if (isHealthy) {
+                log.info("系统启动时健康检查通过,任务处理将正常执行");
+            } else {
+                log.warn("系统启动时健康检查失败,任务处理将暂停");
+            }
+        } catch (Exception e) {
+            log.error("初始健康检查失败: {}", e.getMessage());
+            isHealthy = false;
+            taskEnabled = false;
+        } finally {
+            initialCheckComplete = true;
         }
     }
 
     /**
-     * 健康检查方法,调用第三方服务健康检查接口
+     * 健康检查定时任务,每10分钟执行一次
      */
-    private void checkHealth() {
-        log.info("开始执行健康检查...");
-        collectController.checkThirdPartyHealth()
-                .subscribe(
-                        response -> {
-                            boolean previousHealthStatus = isHealthy;
-                            isHealthy = "healthy".equalsIgnoreCase(response.getStatus());
+    @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒
+    public void checkHealth() {
+        // 等待初始检查完成
+        if (!initialCheckComplete) {
+            log.debug("初始健康检查未完成,跳过本次健康检查");
+            return;
+        }
 
-                            // 状态变化时记录日志并控制任务执行
-                            if (previousHealthStatus != isHealthy) {
-                                if (isHealthy) {
-                                    log.info("健康检查通过,恢复任务处理");
-                                    startTaskProcessing();
+        log.info("开始执行健康检查...");
+        try {
+            collectController.checkThirdPartyHealth()
+                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
+                    .subscribe(
+                            healthStatus -> {
+                                boolean previousHealthStatus = isHealthy;
+                                isHealthy = healthStatus;
+
+                                // 状态变化时更新任务开关
+                                if (previousHealthStatus != isHealthy) {
+                                    taskEnabled = isHealthy;
+                                    if (isHealthy) {
+                                        log.info("健康检查通过,恢复任务处理");
+                                    } else {
+                                        log.warn("健康检查失败,暂停任务处理");
+                                    }
                                 } else {
-                                    log.warn("健康检查失败,暂停任务处理");
-                                    stopTaskProcessing();
+                                    log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康");
+                                }
+                            },
+                            error -> {
+                                log.error("健康检查请求失败: {}", error.getMessage());
+                                if (isHealthy) {
+                                    isHealthy = false;
+                                    taskEnabled = false;
+                                    log.warn("因健康检查请求失败,暂停任务处理");
                                 }
                             }
-                        },
-                        error -> {
-                            log.error("健康检查请求失败: {}", error.getMessage());
-                            if (isHealthy) { // 仅在健康状态变化时记录并停止任务
-                                isHealthy = false;
-                                log.warn("因健康检查请求失败,暂停任务处理");
-                                stopTaskProcessing();
-                            }
-                        }
-                );
-    }
-
-    /**
-     * 启动任务处理定时执行
-     */
-    private synchronized void startTaskProcessing() {
-        if (scheduledTask == null || scheduledTask.isCancelled()) {
-            scheduledTask = taskScheduler.scheduleAtFixedRate(
-                    this::executeTaskProcessing,
-                    0,
-                    10,
-                    TimeUnit.SECONDS
-            );
-            System.out.println("nima");
-        }
-    }
-
-    /**
-     * 停止任务处理定时执行
-     */
-    private synchronized void stopTaskProcessing() {
-        if (scheduledTask != null && !scheduledTask.isCancelled()) {
-            scheduledTask.cancel(false);
-        }
-    }
-    @PreDestroy
-    public void destroy() {
-        // 关闭健康检查调度器
-        healthCheckScheduler.shutdown();
-        try {
-            if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
-                healthCheckScheduler.shutdownNow();
+                    );
+        } catch (Exception e) {
+            log.error("健康检查执行异常: {}", e.getMessage());
+            if (isHealthy) {
+                isHealthy = false;
+                taskEnabled = false;
             }
-        } catch (InterruptedException e) {
-            healthCheckScheduler.shutdownNow();
-        }
-
-        // 关闭任务处理调度器
-        taskScheduler.shutdown();
-        try {
-            if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
-                taskScheduler.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            taskScheduler.shutdownNow();
         }
     }
+
     /**
-     * 实际的任务处理方法,替代原@Scheduled注解方法
+     * 任务处理定时任务,每10秒执行一次
      */
+    @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒
     public void executeTaskProcessing() {
+        // 检查初始检查是否完成和任务是否启用
+        if (!initialCheckComplete) {
+            log.debug("初始检查未完成,跳过本次任务处理");
+            return;
+        }
+
+        if (!taskEnabled) {
+            log.debug("任务已被禁用,跳过本次任务处理");
+            return;
+        }
+
         if (!isHealthy) {
             log.debug("系统不健康,跳过任务处理");
             return;
         }
 
-        // 查询所有状态为pending的任务
-        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
-        queryWrapper.eq(KeywordTask::getStatus, "pending");
+        try {
+            // 查询所有状态为pending的任务
+            LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
+            queryWrapper.eq(KeywordTask::getStatus, "pending");
 
-        keywordTaskService.list(queryWrapper)
-                .stream()
-                .filter(task -> task.getTask_id() != null)
-                .forEach(task -> processTaskStatus(task)
-                        .subscribeOn(Schedulers.boundedElastic())
-                        .subscribe(
-                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
-                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
-                        )
-                );
+            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
+            log.info("查询到 {} 个待处理任务", tasks.size());
+
+            for (KeywordTask task : tasks) {
+                if (task.getTask_id() != null) {
+                    processTaskStatus(task)
+                            .subscribeOn(Schedulers.boundedElastic())
+                            .subscribe(
+                                    updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
+                                    error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
+                            );
+                }
+            }
+        } catch (Exception e) {
+            log.error("任务处理执行异常: {}", e.getMessage());
+        }
     }
+    /**
+     * 实际的任务处理方法,替代原@Scheduled注解方法
+     */
+//    public void executeTaskProcessing() {
+//        if (!isHealthy) {
+//            log.debug("系统不健康,跳过任务处理");
+//            return;
+//        }
+//
+//        // 查询所有状态为pending的任务
+//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
+//        queryWrapper.eq(KeywordTask::getStatus, "pending");
+//
+//        keywordTaskService.list(queryWrapper)
+//                .stream()
+//                .filter(task -> task.getTask_id() != null)
+//                .forEach(task -> processTaskStatus(task)
+//                        .subscribeOn(Schedulers.boundedElastic())
+//                        .subscribe(
+//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
+//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
+//                        )
+//                );
+//    }
 
 
     private Mono<KeywordTask> processTaskStatus(KeywordTask task) {

--
Gitblit v1.7.1