From 1d22a73ebd5cb8bd420e8ab55e18d5bd19bfdc1e Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 23 七月 2025 10:22:34 +0800 Subject: [PATCH] 更新判断新平台 --- src/main/java/com/linghu/timeTask/ScheduledTasks.java | 230 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 206 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java index 95c2a50..4e00c6a 100644 --- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -2,53 +2,203 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.entity.KeywordTask; +import com.linghu.model.entity.Question; import com.linghu.service.KeywordTaskService; +import com.linghu.service.QuestionService; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import java.time.LocalDateTime; +import javax.annotation.PostConstruct; +import java.time.Duration; import java.time.format.DateTimeFormatter; import java.util.List; -import java.util.concurrent.TimeUnit; + @Component +@Slf4j public class ScheduledTasks { - private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); @Autowired private KeywordTaskService keywordTaskService; - @Autowired private CollectController collectController; + @Autowired + private WebClient webClient; // 假设已配置WebClient + @Autowired + private QuestionService questionService; - @Scheduled(fixedRate = 5000) // 每5秒执行一次 - public void scheduleFixedRateTask() { - // 查询所有状态为pending的任务 - LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(KeywordTask::getStatus, "pending"); + private volatile boolean isHealthy = true; // 健康状态标识 + private volatile boolean initialCheckComplete = false; // 初始检查完成标志 + private volatile boolean taskEnabled = true; // 任务启用开关 + @PostConstruct + public void init() { + // 启动健康检查定时任务,每10分钟执行一次 + checkInitialHealth(); + // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) - keywordTaskService.list(queryWrapper) - .stream() - .filter(task -> task.getTask_id() != null) - .forEach(task -> processTaskStatus(task) - .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 - .subscribe( - updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), - error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) - ) - ); } + + /** + * 执行初始健康检查,确保系统启动时任务状态正确 + */ + private void checkInitialHealth() { + log.info("执行系统启动初始健康检查..."); + + try { + // 同步执行健康检查,最多等待30秒 + Boolean healthCheckResult = collectController.checkThirdPartyHealth() + .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) + .block(Duration.ofSeconds(30)); + + isHealthy = Boolean.TRUE.equals(healthCheckResult); + taskEnabled = isHealthy; + + if (isHealthy) { + log.info("系统启动时健康检查通过,任务处理将正常执行"); + } else { + log.warn("系统启动时健康检查失败,任务处理将暂停"); + } + } catch (Exception e) { + log.error("初始健康检查失败: {}", e.getMessage()); + isHealthy = false; + taskEnabled = false; + } finally { + initialCheckComplete = true; + } + } + + /** + * 健康检查定时任务,每10分钟执行一次 + */ + @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒 + public void checkHealth() { + // 等待初始检查完成 + if (!initialCheckComplete) { + log.debug("初始健康检查未完成,跳过本次健康检查"); + return; + } + + log.info("开始执行健康检查..."); + try { + collectController.checkThirdPartyHealth() + .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) + .subscribe( + healthStatus -> { + boolean previousHealthStatus = isHealthy; + isHealthy = healthStatus; + + // 状态变化时更新任务开关 + if (previousHealthStatus != isHealthy) { + taskEnabled = isHealthy; + if (isHealthy) { + log.info("健康检查通过,恢复任务处理"); + } else { + log.warn("健康检查失败,暂停任务处理"); + } + } else { + log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康"); + } + }, + error -> { + log.error("健康检查请求失败: {}", error.getMessage()); + if (isHealthy) { + isHealthy = false; + taskEnabled = false; + log.warn("因健康检查请求失败,暂停任务处理"); + } + } + ); + } catch (Exception e) { + log.error("健康检查执行异常: {}", e.getMessage()); + if (isHealthy) { + isHealthy = false; + taskEnabled = false; + } + } + } + + /** + * 任务处理定时任务,每10秒执行一次 + */ + @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒 + public void executeTaskProcessing() { + // 检查初始检查是否完成和任务是否启用 + if (!initialCheckComplete) { + log.debug("初始检查未完成,跳过本次任务处理"); + return; + } + + if (!taskEnabled) { + log.debug("任务已被禁用,跳过本次任务处理"); + return; + } + + if (!isHealthy) { + log.debug("系统不健康,跳过任务处理"); + return; + } + + try { + // 查询所有状态为pending的任务 + LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(KeywordTask::getStatus, "pending"); + + List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); + log.info("查询到 {} 个待处理任务", tasks.size()); + + for (KeywordTask task : tasks) { + if (task.getTask_id() != null) { + processTaskStatus(task) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe( + updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), + error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) + ); + } + } + } catch (Exception e) { + log.error("任务处理执行异常: {}", e.getMessage()); + } + } + /** + * 实际的任务处理方法,替代原@Scheduled注解方法 + */ +// public void executeTaskProcessing() { +// if (!isHealthy) { +// log.debug("系统不健康,跳过任务处理"); +// return; +// } +// +// // 查询所有状态为pending的任务 +// LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); +// queryWrapper.eq(KeywordTask::getStatus, "pending"); +// +// keywordTaskService.list(queryWrapper) +// .stream() +// .filter(task -> task.getTask_id() != null) +// .forEach(task -> processTaskStatus(task) +// .subscribeOn(Schedulers.boundedElastic()) +// .subscribe( +// updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), +// error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) +// ) +// ); +// } + private Mono<KeywordTask> processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) @@ -62,11 +212,19 @@ t.setStatus("completed"); return t; }); - } else if (!"submit".equalsIgnoreCase(statusResponse.getStatus()) - && !"running".equalsIgnoreCase(statusResponse.getStatus())) { + } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus()) + && !"running".equalsIgnoreCase(statusResponse.getStatus()) + && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { task.setStatus("false"); return Mono.just(task); - } else { + }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { + //更新每个提问词的状态 + return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 + }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ + task.setStatus("nonentity"); + return Mono.just(task); + } + else { // 任务仍在进行中,不更新状态 return Mono.empty(); } @@ -74,7 +232,6 @@ .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 .flatMap(t -> { if (!"pending".equalsIgnoreCase(t.getStatus())) { - // 修改这里:将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) @@ -86,9 +243,34 @@ log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); task.setStatus("error"); - // 修改这里:将updateById的结果包装成Mono + // 将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) .thenReturn(task); }); } + + private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { + // 1. 先执行同步查询,获取 List<Question> + List<Question> questions = questionService.lambdaQuery() + .eq(Question::getKeyword_id, task.getKeyword_id()) + .list(); + + // 2. 将 List 转为 Flux,再进行响应式处理 + return Flux.fromIterable(questions) + .flatMap(question -> { + // 更新逻辑... + String newStatus = statusResponse.getQuestions_status().stream() + .filter(qs -> qs.getQuestion().equals(question.getQuestion())) + .findFirst() + .map(qs -> qs.getStatus()) + .orElse(question.getStatus()); + question.setStatus(newStatus); + + return Mono.fromSupplier(() -> questionService.updateById(question)) + // 此时可以调用 doOnError 处理异常 + .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) + .onErrorReturn(false); // 异常时返回 false + }) + .then(Mono.just(task)); + } } \ No newline at end of file -- Gitblit v1.7.1