package com.linghu.timeTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.entity.KeywordTask; import com.linghu.model.entity.Question; import com.linghu.service.KeywordTaskService; import com.linghu.service.QuestionService; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Component @Slf4j public class ScheduledTasks { // private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); @Autowired private KeywordTaskService keywordTaskService; @Autowired private CollectController collectController; @Autowired private WebClient webClient; // 假设已配置WebClient @Autowired private QuestionService questionService; private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL private ScheduledFuture scheduledTask; // 定时任务引用 // 健康检查专用调度器(单线程足够) private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); // 任务处理专用调度器(可根据任务量调整线程数) private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程 private volatile boolean isHealthy = true; // 健康状态标识 @PostConstruct public void init() { // 启动健康检查定时任务,每10分钟执行一次 healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES); // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) if (isHealthy) { startTaskProcessing(); } } /** * 健康检查方法,调用第三方服务健康检查接口 */ private void checkHealth() { log.info("开始执行健康检查..."); collectController.checkThirdPartyHealth() .subscribe( response -> { boolean previousHealthStatus = isHealthy; isHealthy = "healthy".equalsIgnoreCase(response.getStatus()); // 状态变化时记录日志并控制任务执行 if (previousHealthStatus != isHealthy) { if (isHealthy) { log.info("健康检查通过,恢复任务处理"); startTaskProcessing(); } else { log.warn("健康检查失败,暂停任务处理"); stopTaskProcessing(); } } }, error -> { log.error("健康检查请求失败: {}", error.getMessage()); if (isHealthy) { // 仅在健康状态变化时记录并停止任务 isHealthy = false; log.warn("因健康检查请求失败,暂停任务处理"); stopTaskProcessing(); } } ); } /** * 启动任务处理定时执行 */ private synchronized void startTaskProcessing() { if (scheduledTask == null || scheduledTask.isCancelled()) { scheduledTask = taskScheduler.scheduleAtFixedRate( this::executeTaskProcessing, 0, 10, TimeUnit.SECONDS ); System.out.println("nima"); } } /** * 停止任务处理定时执行 */ private synchronized void stopTaskProcessing() { if (scheduledTask != null && !scheduledTask.isCancelled()) { scheduledTask.cancel(false); } } @PreDestroy public void destroy() { // 关闭健康检查调度器 healthCheckScheduler.shutdown(); try { if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) { healthCheckScheduler.shutdownNow(); } } catch (InterruptedException e) { healthCheckScheduler.shutdownNow(); } // 关闭任务处理调度器 taskScheduler.shutdown(); try { if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { taskScheduler.shutdownNow(); } } catch (InterruptedException e) { taskScheduler.shutdownNow(); } } /** * 实际的任务处理方法,替代原@Scheduled注解方法 */ public void executeTaskProcessing() { if (!isHealthy) { log.debug("系统不健康,跳过任务处理"); return; } // 查询所有状态为pending的任务 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(KeywordTask::getStatus, "pending"); keywordTaskService.list(queryWrapper) .stream() .filter(task -> task.getTask_id() != null) .forEach(task -> processTaskStatus(task) .subscribeOn(Schedulers.boundedElastic()) .subscribe( updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) ) ); } private Mono processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) .flatMap(statusResponse -> { if ("completed".equalsIgnoreCase(statusResponse.getStatus())) { log.info("任务 {} 已完成,获取结果", task.getTask_id()); return collectController.getTaskResult(task.getTask_id()) .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id())) .thenReturn(task) .map(t -> { t.setStatus("completed"); return t; }); } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus()) && !"running".equalsIgnoreCase(statusResponse.getStatus()) && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { task.setStatus("false"); return Mono.just(task); }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { //更新每个提问词的状态 return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ task.setStatus("nonentity"); return Mono.just(task); } else { // 任务仍在进行中,不更新状态 return Mono.empty(); } }) .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 .flatMap(t -> { if (!"pending".equalsIgnoreCase(t.getStatus())) { // 修改这里:将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) .thenReturn(t); } return Mono.just(t); }) .onErrorResume(e -> { log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); task.setStatus("error"); // 修改这里:将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) .thenReturn(task); }); } private Mono updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { // 1. 先执行同步查询,获取 List List questions = questionService.lambdaQuery() .eq(Question::getKeyword_id, task.getKeyword_id()) .list(); // 2. 将 List 转为 Flux,再进行响应式处理 return Flux.fromIterable(questions) .flatMap(question -> { // 更新逻辑... String newStatus = statusResponse.getQuestions_status().stream() .filter(qs -> qs.getQuestion().equals(question.getQuestion())) .findFirst() .map(qs -> qs.getStatus()) .orElse(question.getStatus()); question.setStatus(newStatus); return Mono.fromSupplier(() -> questionService.updateById(question)) // 此时可以调用 doOnError 处理异常 .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) .onErrorReturn(false); // 异常时返回 false }) .then(Mono.just(task)); } }