From 8964e3f22fb3a10835c88aaa23d6a522d650c304 Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期三, 16 七月 2025 00:38:02 +0800
Subject: [PATCH] 定时任务健康检查

---
 src/main/java/com/linghu/timeTask/ScheduledTasks.java |  137 ++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 133 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
index 4c45557..7ec4f3c 100644
--- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java
+++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -7,23 +7,31 @@
 import com.linghu.model.dto.TaskStatusResponse;
 import com.linghu.model.entity.KeywordTask;
 import com.linghu.service.KeywordTaskService;
+import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 @Component
+@Slf4j
 public class ScheduledTasks {
-    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
+//    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
     private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
 
     @Autowired
@@ -31,9 +39,113 @@
 
     @Autowired
     private CollectController collectController;
+    @Autowired
+    private WebClient webClient; // 假设已配置WebClient
 
-    @Scheduled(fixedRate = 10000) // 每5秒执行一次
-    public void scheduleFixedRateTask() {
+    private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL
+    private ScheduledFuture<?> scheduledTask; // 定时任务引用
+    // 健康检查专用调度器(单线程足够)
+    private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
+    // 任务处理专用调度器(可根据任务量调整线程数)
+    private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程
+    private volatile boolean isHealthy = true; // 健康状态标识
+    @PostConstruct
+    public void init() {
+        // 启动健康检查定时任务,每10分钟执行一次
+        healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES);
+        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
+        if (isHealthy) {
+            startTaskProcessing();
+        }
+    }
+
+    /**
+     * 健康检查方法,调用第三方服务健康检查接口
+     */
+    private void checkHealth() {
+        log.info("开始执行健康检查...");
+        collectController.checkThirdPartyHealth()
+                .subscribe(
+                        response -> {
+                            boolean previousHealthStatus = isHealthy;
+                            isHealthy = "healthy".equalsIgnoreCase(response.getStatus());
+
+                            // 状态变化时记录日志并控制任务执行
+                            if (previousHealthStatus != isHealthy) {
+                                if (isHealthy) {
+                                    log.info("健康检查通过,恢复任务处理");
+                                    startTaskProcessing();
+                                } else {
+                                    log.warn("健康检查失败,暂停任务处理");
+                                    stopTaskProcessing();
+                                }
+                            }
+                        },
+                        error -> {
+                            log.error("健康检查请求失败: {}", error.getMessage());
+                            if (isHealthy) { // 仅在健康状态变化时记录并停止任务
+                                isHealthy = false;
+                                log.warn("因健康检查请求失败,暂停任务处理");
+                                stopTaskProcessing();
+                            }
+                        }
+                );
+    }
+
+    /**
+     * 启动任务处理定时执行
+     */
+    private synchronized void startTaskProcessing() {
+        if (scheduledTask == null || scheduledTask.isCancelled()) {
+            scheduledTask = taskScheduler.scheduleAtFixedRate(
+                    this::executeTaskProcessing,
+                    0,
+                    10,
+                    TimeUnit.SECONDS
+            );
+            System.out.println("nima");
+        }
+    }
+
+    /**
+     * 停止任务处理定时执行
+     */
+    private synchronized void stopTaskProcessing() {
+        if (scheduledTask != null && !scheduledTask.isCancelled()) {
+            scheduledTask.cancel(false);
+        }
+    }
+    @PreDestroy
+    public void destroy() {
+        // 关闭健康检查调度器
+        healthCheckScheduler.shutdown();
+        try {
+            if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                healthCheckScheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            healthCheckScheduler.shutdownNow();
+        }
+
+        // 关闭任务处理调度器
+        taskScheduler.shutdown();
+        try {
+            if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                taskScheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            taskScheduler.shutdownNow();
+        }
+    }
+    /**
+     * 实际的任务处理方法,替代原@Scheduled注解方法
+     */
+    public void executeTaskProcessing() {
+        if (!isHealthy) {
+            log.debug("系统不健康,跳过任务处理");
+            return;
+        }
+
         // 查询所有状态为pending的任务
         LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(KeywordTask::getStatus, "pending");
@@ -42,13 +154,30 @@
                 .stream()
                 .filter(task -> task.getTask_id() != null)
                 .forEach(task -> processTaskStatus(task)
-                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
+                        .subscribeOn(Schedulers.boundedElastic())
                         .subscribe(
                                 updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                 error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                         )
                 );
     }
+//    @Scheduled(fixedRate = 10000) // 每5秒执行一次
+//    public void scheduleFixedRateTask() {
+//        // 查询所有状态为pending的任务
+//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
+//        queryWrapper.eq(KeywordTask::getStatus, "pending");
+//
+//        keywordTaskService.list(queryWrapper)
+//                .stream()
+//                .filter(task -> task.getTask_id() != null)
+//                .forEach(task -> processTaskStatus(task)
+//                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
+//                        .subscribe(
+//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
+//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
+//                        )
+//                );
+//    }
 
     private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
         return collectController.getTaskStatus(task.getTask_id())

--
Gitblit v1.7.1