guyue
2025-07-16 8964e3f22fb3a10835c88aaa23d6a522d650c304
定时任务健康检查
1个文件已修改
137 ■■■■■ 已修改文件
src/main/java/com/linghu/timeTask/ScheduledTasks.java 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -7,23 +7,31 @@
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.KeywordTask;
import com.linghu.service.KeywordTaskService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class ScheduledTasks {
    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
//    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
    @Autowired
@@ -31,9 +39,113 @@
    @Autowired
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Scheduled(fixedRate = 10000) // 每5秒执行一次
    public void scheduleFixedRateTask() {
    private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL
    private ScheduledFuture<?> scheduledTask; // 定时任务引用
    // 健康检查专用调度器(单线程足够)
    private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
    // 任务处理专用调度器(可根据任务量调整线程数)
    private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程
    private volatile boolean isHealthy = true; // 健康状态标识
    @PostConstruct
    public void init() {
        // 启动健康检查定时任务,每10分钟执行一次
        healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES);
        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
        if (isHealthy) {
            startTaskProcessing();
        }
    }
    /**
     * 健康检查方法,调用第三方服务健康检查接口
     */
    private void checkHealth() {
        log.info("开始执行健康检查...");
        collectController.checkThirdPartyHealth()
                .subscribe(
                        response -> {
                            boolean previousHealthStatus = isHealthy;
                            isHealthy = "healthy".equalsIgnoreCase(response.getStatus());
                            // 状态变化时记录日志并控制任务执行
                            if (previousHealthStatus != isHealthy) {
                                if (isHealthy) {
                                    log.info("健康检查通过,恢复任务处理");
                                    startTaskProcessing();
                                } else {
                                    log.warn("健康检查失败,暂停任务处理");
                                    stopTaskProcessing();
                                }
                            }
                        },
                        error -> {
                            log.error("健康检查请求失败: {}", error.getMessage());
                            if (isHealthy) { // 仅在健康状态变化时记录并停止任务
                                isHealthy = false;
                                log.warn("因健康检查请求失败,暂停任务处理");
                                stopTaskProcessing();
                            }
                        }
                );
    }
    /**
     * 启动任务处理定时执行
     */
    private synchronized void startTaskProcessing() {
        if (scheduledTask == null || scheduledTask.isCancelled()) {
            scheduledTask = taskScheduler.scheduleAtFixedRate(
                    this::executeTaskProcessing,
                    0,
                    10,
                    TimeUnit.SECONDS
            );
            System.out.println("nima");
        }
    }
    /**
     * 停止任务处理定时执行
     */
    private synchronized void stopTaskProcessing() {
        if (scheduledTask != null && !scheduledTask.isCancelled()) {
            scheduledTask.cancel(false);
        }
    }
    @PreDestroy
    public void destroy() {
        // 关闭健康检查调度器
        healthCheckScheduler.shutdown();
        try {
            if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                healthCheckScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            healthCheckScheduler.shutdownNow();
        }
        // 关闭任务处理调度器
        taskScheduler.shutdown();
        try {
            if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                taskScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            taskScheduler.shutdownNow();
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     */
    public void executeTaskProcessing() {
        if (!isHealthy) {
            log.debug("系统不健康,跳过任务处理");
            return;
        }
        // 查询所有状态为pending的任务
        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(KeywordTask::getStatus, "pending");
@@ -42,13 +154,30 @@
                .stream()
                .filter(task -> task.getTask_id() != null)
                .forEach(task -> processTaskStatus(task)
                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
                        .subscribeOn(Schedulers.boundedElastic())
                        .subscribe(
                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                        )
                );
    }
//    @Scheduled(fixedRate = 10000) // 每5秒执行一次
//    public void scheduleFixedRateTask() {
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())