From 8964e3f22fb3a10835c88aaa23d6a522d650c304 Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 16 七月 2025 00:38:02 +0800 Subject: [PATCH] 定时任务健康检查 --- src/main/java/com/linghu/timeTask/ScheduledTasks.java | 137 ++++++++++++++++++++++++++++++++++++++++++++- 1 files changed, 133 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java index 4c45557..7ec4f3c 100644 --- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -7,23 +7,31 @@ import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.entity.KeywordTask; import com.linghu.service.KeywordTaskService; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Component +@Slf4j public class ScheduledTasks { - private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); +// private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); @Autowired @@ -31,9 +39,113 @@ @Autowired private CollectController collectController; + @Autowired + private WebClient webClient; // 假设已配置WebClient - @Scheduled(fixedRate = 10000) // 每5秒执行一次 - public void scheduleFixedRateTask() { + private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL + private ScheduledFuture<?> scheduledTask; // 定时任务引用 + // 健康检查专用调度器(单线程足够) + private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); + // 任务处理专用调度器(可根据任务量调整线程数) + private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程 + private volatile boolean isHealthy = true; // 健康状态标识 + @PostConstruct + public void init() { + // 启动健康检查定时任务,每10分钟执行一次 + healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES); + // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) + if (isHealthy) { + startTaskProcessing(); + } + } + + /** + * 健康检查方法,调用第三方服务健康检查接口 + */ + private void checkHealth() { + log.info("开始执行健康检查..."); + collectController.checkThirdPartyHealth() + .subscribe( + response -> { + boolean previousHealthStatus = isHealthy; + isHealthy = "healthy".equalsIgnoreCase(response.getStatus()); + + // 状态变化时记录日志并控制任务执行 + if (previousHealthStatus != isHealthy) { + if (isHealthy) { + log.info("健康检查通过,恢复任务处理"); + startTaskProcessing(); + } else { + log.warn("健康检查失败,暂停任务处理"); + stopTaskProcessing(); + } + } + }, + error -> { + log.error("健康检查请求失败: {}", error.getMessage()); + if (isHealthy) { // 仅在健康状态变化时记录并停止任务 + isHealthy = false; + log.warn("因健康检查请求失败,暂停任务处理"); + stopTaskProcessing(); + } + } + ); + } + + /** + * 启动任务处理定时执行 + */ + private synchronized void startTaskProcessing() { + if (scheduledTask == null || scheduledTask.isCancelled()) { + scheduledTask = taskScheduler.scheduleAtFixedRate( + this::executeTaskProcessing, + 0, + 10, + TimeUnit.SECONDS + ); + System.out.println("nima"); + } + } + + /** + * 停止任务处理定时执行 + */ + private synchronized void stopTaskProcessing() { + if (scheduledTask != null && !scheduledTask.isCancelled()) { + scheduledTask.cancel(false); + } + } + @PreDestroy + public void destroy() { + // 关闭健康检查调度器 + healthCheckScheduler.shutdown(); + try { + if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + healthCheckScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + healthCheckScheduler.shutdownNow(); + } + + // 关闭任务处理调度器 + taskScheduler.shutdown(); + try { + if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + taskScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + taskScheduler.shutdownNow(); + } + } + /** + * 实际的任务处理方法,替代原@Scheduled注解方法 + */ + public void executeTaskProcessing() { + if (!isHealthy) { + log.debug("系统不健康,跳过任务处理"); + return; + } + // 查询所有状态为pending的任务 LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(KeywordTask::getStatus, "pending"); @@ -42,13 +154,30 @@ .stream() .filter(task -> task.getTask_id() != null) .forEach(task -> processTaskStatus(task) - .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 + .subscribeOn(Schedulers.boundedElastic()) .subscribe( updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) ) ); } +// @Scheduled(fixedRate = 10000) // 每5秒执行一次 +// public void scheduleFixedRateTask() { +// // 查询所有状态为pending的任务 +// LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); +// queryWrapper.eq(KeywordTask::getStatus, "pending"); +// +// keywordTaskService.list(queryWrapper) +// .stream() +// .filter(task -> task.getTask_id() != null) +// .forEach(task -> processTaskStatus(task) +// .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 +// .subscribe( +// updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), +// error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) +// ) +// ); +// } private Mono<KeywordTask> processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) -- Gitblit v1.7.1