package com.linghu.timeTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.config.FinalStatus; import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.entity.Keyword; import com.linghu.model.entity.KeywordTask; import com.linghu.model.entity.Question; import com.linghu.service.KeywordService; import com.linghu.service.KeywordTaskService; import com.linghu.service.QuestionService; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import javax.annotation.PostConstruct; import java.time.Duration; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.stream.Collectors; @Component @Slf4j public class ScheduledTasks { private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); @Autowired private KeywordTaskService keywordTaskService; @Autowired private CollectController collectController; @Autowired private QuestionService questionService; @Autowired private KeywordService keywordService; private volatile boolean isHealthy = true; // 健康状态标识 private volatile boolean initialCheckComplete = false; // 初始检查完成标志 private volatile boolean taskEnabled = true; // 任务启用开关 @PostConstruct public void init() { // 启动健康检查定时任务,每10分钟执行一次 checkInitialHealth(); // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) } /** * 执行初始健康检查,确保系统启动时任务状态正确 */ private void checkInitialHealth() { log.info("执行系统启动初始健康检查..."); try { // 同步执行健康检查,最多等待30秒 Boolean healthCheckResult = collectController.checkThirdPartyHealth() .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) .block(Duration.ofSeconds(30)); isHealthy = Boolean.TRUE.equals(healthCheckResult); taskEnabled = isHealthy; if (isHealthy) { log.info("系统启动时健康检查通过,任务处理将正常执行"); } else { log.warn("系统启动时健康检查失败,任务处理将暂停"); } } catch (Exception e) { log.error("初始健康检查失败: {}", e.getMessage()); isHealthy = false; taskEnabled = false; } finally { initialCheckComplete = true; } } /** * 健康检查定时任务,每10分钟执行一次 */ @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒 public void checkHealth() { // 等待初始检查完成 if (!initialCheckComplete) { log.debug("初始健康检查未完成,跳过本次健康检查"); return; } log.info("开始执行健康检查..."); try { collectController.checkThirdPartyHealth() .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) .subscribe( healthStatus -> { boolean previousHealthStatus = isHealthy; isHealthy = healthStatus; // 状态变化时更新任务开关 if (previousHealthStatus != isHealthy) { taskEnabled = isHealthy; if (isHealthy) { log.info("健康检查通过,恢复任务处理"); } else { log.warn("健康检查失败,暂停任务处理"); } } else { log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康"); } }, error -> { log.error("健康检查请求失败: {}", error.getMessage()); if (isHealthy) { isHealthy = false; taskEnabled = false; log.warn("因健康检查请求失败,暂停任务处理"); } } ); } catch (Exception e) { log.error("健康检查执行异常: {}", e.getMessage()); if (isHealthy) { isHealthy = false; taskEnabled = false; } } } /** * 任务处理定时任务,每10秒执行一次 */ @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒 public void executeTaskProcessing() { // 检查初始检查是否完成和任务是否启用 if (!initialCheckComplete) { log.debug("初始检查未完成,跳过本次任务处理"); return; } if (!taskEnabled) { log.debug("任务已被禁用,跳过本次任务处理"); return; } if (!isHealthy) { log.debug("系统不健康,跳过任务处理"); return; } try { // 查询所有状态为pending的任务 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()); List tasks = keywordTaskService.list(queryWrapper); log.info("查询到 {} 个待处理任务", tasks.size()); // 先标记成 processing,避免下一轮又被调度发现 if (!tasks.isEmpty()) { // 提取查询到的任务id列表 List taskIds = tasks.stream() .map(KeywordTask::getId) // 假设任务有唯一id字段 .collect(Collectors.toList()); // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务 boolean updatedCount = keywordTaskService.update( new LambdaUpdateWrapper() .in(KeywordTask::getId, taskIds) // 限定为查询到的任务 .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()) // 确保状态未被其他进程修改 .set(KeywordTask::getStatus, "processing") ); log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size()); } for (KeywordTask task : tasks) { if (task.getTask_id() != null) { processTaskStatus(task) .subscribeOn(Schedulers.boundedElastic()) .subscribe( updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) ); } } } catch (Exception e) { log.error("任务处理执行异常: {}", e.getMessage()); } } /** * 实际的任务处理方法, */ private Mono processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) .flatMap(statusResponse -> { if (FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus())) { log.info("任务 {} 已完成,获取结果", task.getTask_id()); return collectController.getTaskResult(task.getTask_id()) .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id())) .thenReturn(task) .map(t -> { t.setStatus(FinalStatus.COMPLETED.getValue()); return t; }); } else if (!FinalStatus.SUBMITTED.getValue().equalsIgnoreCase(statusResponse.getStatus()) && !FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus()) && !FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())) { task.setStatus(FinalStatus.FALSE.getValue()); // 新增:处理status为false时的关键词状态更新 return updateKeywordStatusWhenTaskFinished(task) .then(Mono.just(task)); }else if ( FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus())) { // 改回 pending,进行下一轮查询 task.setStatus(FinalStatus.PENDING.getValue()); //更新每个提问词的状态 return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 }else if(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ task.setStatus(FinalStatus.NONENTITY.getValue()); // 更新关键词状态 return updateKeywordStatusWhenTaskFinished(task) .then(Mono.just(task)); } else { // 任务仍在进行中,不更新状态 return Mono.empty(); } }) .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 .flatMap(t -> { // 修改这里:将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) .thenReturn(t); }) .onErrorResume(e -> { log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); task.setStatus(FinalStatus.ERROR.getValue()); // 将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) .thenReturn(task); }); } private Mono updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { // 1. 先执行同步查询,获取 List List questions = questionService.lambdaQuery() .eq(Question::getKeyword_id, task.getKeyword_id()) .list(); // 2. 将 List 转为 Flux,再进行响应式处理 return Flux.fromIterable(questions) .flatMap(question -> { // 更新逻辑... String newStatus = statusResponse.getQuestions_status().stream() .filter(qs -> qs.getQuestion().equals(question.getQuestion())) .findFirst() .map(qs -> qs.getStatus()) .orElse(question.getStatus()); question.setStatus(newStatus); return Mono.fromSupplier(() -> questionService.updateById(question)) // 此时可以调用 doOnError 处理异常 .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) .onErrorReturn(false); // 异常时返回 false }) .then(Mono.just(task)); } /** * 当任务状态为false/nonentity时,更新关键词状态 */ private Mono updateKeywordStatusWhenTaskFinished(KeywordTask task) { return Mono.fromSupplier(() -> { Keyword keyword = keywordService.getById(task.getKeyword_id()); LambdaQueryWrapper keywordTaskWrapper = new LambdaQueryWrapper<>(); keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); return keywordTaskService.list(keywordTaskWrapper); }).flatMap(keywordTasks -> { // 检查所有关联任务是否都已完成(包括各种结束状态) boolean allCompleted = keywordTasks.stream().allMatch(t -> FinalStatus.COMPLETED.getValue().equals(t.getStatus()) || FinalStatus.FALSE.getValue().equals(t.getStatus()) || FinalStatus.CANCELLED.getValue().equals(t.getStatus()) || FinalStatus.NONENTITY.getValue().equals(t.getStatus()) ); if (allCompleted) { Keyword keyword = keywordService.getById(task.getKeyword_id()); keyword.setStatus(FinalStatus.COMPLETED.getValue()); // return Mono.fromSupplier(() -> keywordService.updateById(keyword)) .then(); // 转换为Mono } return Mono.empty(); }); } }