| | |
| | | |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
| | | import com.linghu.controller.CollectController; |
| | | import com.linghu.model.dto.TaskResultResponse; |
| | | import com.linghu.model.dto.TaskStatusResponse; |
| | | import com.linghu.model.entity.Keyword; |
| | | import com.linghu.model.entity.KeywordTask; |
| | | import com.linghu.model.entity.Question; |
| | | import com.linghu.service.KeywordService; |
| | | import com.linghu.service.KeywordTaskService; |
| | | import com.linghu.service.QuestionService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.web.reactive.function.client.WebClient; |
| | | import reactor.core.publisher.Flux; |
| | | import reactor.core.publisher.Mono; |
| | | import reactor.core.scheduler.Schedulers; |
| | | |
| | | import java.time.LocalDateTime; |
| | | import javax.annotation.PostConstruct; |
| | | import java.time.Duration; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.List; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.stream.Collectors; |
| | | |
| | | |
| | | @Component |
| | | @Slf4j |
| | | public class ScheduledTasks { |
| | | private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); |
| | | private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); |
| | | |
| | | @Autowired |
| | | private KeywordTaskService keywordTaskService; |
| | | |
| | | @Autowired |
| | | private CollectController collectController; |
| | | @Autowired |
| | | private WebClient webClient; // 假设已配置WebClient |
| | | @Autowired |
| | | private QuestionService questionService; |
| | | @Autowired |
| | | private KeywordService keywordService; |
| | | |
| | | @Scheduled(fixedRate = 5000) // 每5秒执行一次 |
| | | public void scheduleFixedRateTask() { |
| | | // 查询所有状态为pending的任务 |
| | | LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | private volatile boolean isHealthy = true; // 健康状态标识 |
| | | private volatile boolean initialCheckComplete = false; // 初始检查完成标志 |
| | | private volatile boolean taskEnabled = true; // 任务启用开关 |
| | | @PostConstruct |
| | | public void init() { |
| | | // 启动健康检查定时任务,每10分钟执行一次 |
| | | checkInitialHealth(); |
| | | // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) |
| | | |
| | | keywordTaskService.list(queryWrapper) |
| | | .stream() |
| | | .filter(task -> task.getTask_id() != null) |
| | | .forEach(task -> processTaskStatus(task) |
| | | .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 |
| | | .subscribe( |
| | | updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | ) |
| | | ); |
| | | } |
| | | |
| | | /** |
| | | * 执行初始健康检查,确保系统启动时任务状态正确 |
| | | */ |
| | | private void checkInitialHealth() { |
| | | log.info("执行系统启动初始健康检查..."); |
| | | |
| | | try { |
| | | // 同步执行健康检查,最多等待30秒 |
| | | Boolean healthCheckResult = collectController.checkThirdPartyHealth() |
| | | .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) |
| | | .block(Duration.ofSeconds(30)); |
| | | |
| | | isHealthy = Boolean.TRUE.equals(healthCheckResult); |
| | | taskEnabled = isHealthy; |
| | | |
| | | if (isHealthy) { |
| | | log.info("系统启动时健康检查通过,任务处理将正常执行"); |
| | | } else { |
| | | log.warn("系统启动时健康检查失败,任务处理将暂停"); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("初始健康检查失败: {}", e.getMessage()); |
| | | isHealthy = false; |
| | | taskEnabled = false; |
| | | } finally { |
| | | initialCheckComplete = true; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 健康检查定时任务,每10分钟执行一次 |
| | | */ |
| | | @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒 |
| | | public void checkHealth() { |
| | | // 等待初始检查完成 |
| | | if (!initialCheckComplete) { |
| | | log.debug("初始健康检查未完成,跳过本次健康检查"); |
| | | return; |
| | | } |
| | | |
| | | log.info("开始执行健康检查..."); |
| | | try { |
| | | collectController.checkThirdPartyHealth() |
| | | .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) |
| | | .subscribe( |
| | | healthStatus -> { |
| | | boolean previousHealthStatus = isHealthy; |
| | | isHealthy = healthStatus; |
| | | |
| | | // 状态变化时更新任务开关 |
| | | if (previousHealthStatus != isHealthy) { |
| | | taskEnabled = isHealthy; |
| | | if (isHealthy) { |
| | | log.info("健康检查通过,恢复任务处理"); |
| | | } else { |
| | | log.warn("健康检查失败,暂停任务处理"); |
| | | } |
| | | } else { |
| | | log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康"); |
| | | } |
| | | }, |
| | | error -> { |
| | | log.error("健康检查请求失败: {}", error.getMessage()); |
| | | if (isHealthy) { |
| | | isHealthy = false; |
| | | taskEnabled = false; |
| | | log.warn("因健康检查请求失败,暂停任务处理"); |
| | | } |
| | | } |
| | | ); |
| | | } catch (Exception e) { |
| | | log.error("健康检查执行异常: {}", e.getMessage()); |
| | | if (isHealthy) { |
| | | isHealthy = false; |
| | | taskEnabled = false; |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 任务处理定时任务,每10秒执行一次 |
| | | */ |
| | | @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒 |
| | | public void executeTaskProcessing() { |
| | | // 检查初始检查是否完成和任务是否启用 |
| | | if (!initialCheckComplete) { |
| | | log.debug("初始检查未完成,跳过本次任务处理"); |
| | | return; |
| | | } |
| | | |
| | | if (!taskEnabled) { |
| | | log.debug("任务已被禁用,跳过本次任务处理"); |
| | | return; |
| | | } |
| | | |
| | | if (!isHealthy) { |
| | | log.debug("系统不健康,跳过任务处理"); |
| | | return; |
| | | } |
| | | |
| | | try { |
| | | // 查询所有状态为pending的任务 |
| | | LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | |
| | | List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); |
| | | log.info("查询到 {} 个待处理任务", tasks.size()); |
| | | // 先标记成 processing,避免下一轮又被调度发现 |
| | | if (!tasks.isEmpty()) { |
| | | // 提取查询到的任务id列表 |
| | | List<Integer> taskIds = tasks.stream() |
| | | .map(KeywordTask::getId) // 假设任务有唯一id字段 |
| | | .collect(Collectors.toList()); |
| | | |
| | | // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务 |
| | | boolean updatedCount = keywordTaskService.update( |
| | | new LambdaUpdateWrapper<KeywordTask>() |
| | | .in(KeywordTask::getId, taskIds) // 限定为查询到的任务 |
| | | .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改 |
| | | .set(KeywordTask::getStatus, "processing") |
| | | ); |
| | | log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size()); |
| | | } |
| | | |
| | | for (KeywordTask task : tasks) { |
| | | if (task.getTask_id() != null) { |
| | | |
| | | processTaskStatus(task) |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .subscribe( |
| | | updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | ); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("任务处理执行异常: {}", e.getMessage()); |
| | | } |
| | | } |
| | | /** |
| | | * 实际的任务处理方法,替代原@Scheduled注解方法 |
| | | */ |
| | | // public void executeTaskProcessing() { |
| | | // if (!isHealthy) { |
| | | // log.debug("系统不健康,跳过任务处理"); |
| | | // return; |
| | | // } |
| | | // |
| | | // // 查询所有状态为pending的任务 |
| | | // LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | // queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | // |
| | | // keywordTaskService.list(queryWrapper) |
| | | // .stream() |
| | | // .filter(task -> task.getTask_id() != null) |
| | | // .forEach(task -> processTaskStatus(task) |
| | | // .subscribeOn(Schedulers.boundedElastic()) |
| | | // .subscribe( |
| | | // updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | // error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | // ) |
| | | // ); |
| | | // } |
| | | |
| | | |
| | | private Mono<KeywordTask> processTaskStatus(KeywordTask task) { |
| | | return collectController.getTaskStatus(task.getTask_id()) |
| | |
| | | t.setStatus("completed"); |
| | | return t; |
| | | }); |
| | | } else if (!"submit".equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !"running".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !"running".equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | task.setStatus("false"); |
| | | return Mono.just(task); |
| | | } else { |
| | | // 新增:处理status为false时的关键词状态更新 |
| | | return updateKeywordStatusWhenTaskFinished(task) |
| | | .then(Mono.just(task)); |
| | | }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | // 改回 pending,进行下一轮查询 |
| | | task.setStatus("pending"); |
| | | //更新每个提问词的状态 |
| | | return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 |
| | | }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ |
| | | task.setStatus("nonentity"); |
| | | // 更新关键词状态 |
| | | return updateKeywordStatusWhenTaskFinished(task) |
| | | .then(Mono.just(task)); |
| | | } |
| | | else { |
| | | // 任务仍在进行中,不更新状态 |
| | | return Mono.empty(); |
| | | } |
| | | }) |
| | | .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 |
| | | .flatMap(t -> { |
| | | if (!"pending".equalsIgnoreCase(t.getStatus())) { |
| | | |
| | | // 修改这里:将updateById的结果包装成Mono |
| | | return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) |
| | | .thenReturn(t); |
| | | |
| | | // 修改这里:将updateById的结果包装成Mono |
| | | return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) |
| | | .thenReturn(t); |
| | | } |
| | | return Mono.just(t); |
| | | }) |
| | | .onErrorResume(e -> { |
| | | log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); |
| | | task.setStatus("error"); |
| | | |
| | | // 修改这里:将updateById的结果包装成Mono |
| | | // 将updateById的结果包装成Mono |
| | | return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) |
| | | .thenReturn(task); |
| | | }); |
| | | } |
| | | |
| | | private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { |
| | | // 1. 先执行同步查询,获取 List<Question> |
| | | List<Question> questions = questionService.lambdaQuery() |
| | | .eq(Question::getKeyword_id, task.getKeyword_id()) |
| | | .list(); |
| | | |
| | | // 2. 将 List 转为 Flux,再进行响应式处理 |
| | | return Flux.fromIterable(questions) |
| | | .flatMap(question -> { |
| | | // 更新逻辑... |
| | | String newStatus = statusResponse.getQuestions_status().stream() |
| | | .filter(qs -> qs.getQuestion().equals(question.getQuestion())) |
| | | .findFirst() |
| | | .map(qs -> qs.getStatus()) |
| | | .orElse(question.getStatus()); |
| | | question.setStatus(newStatus); |
| | | |
| | | return Mono.fromSupplier(() -> questionService.updateById(question)) |
| | | // 此时可以调用 doOnError 处理异常 |
| | | .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) |
| | | .onErrorReturn(false); // 异常时返回 false |
| | | }) |
| | | .then(Mono.just(task)); |
| | | } |
| | | /** |
| | | * 当任务状态为false/nonentity时,更新关键词状态 |
| | | */ |
| | | private Mono<Void> updateKeywordStatusWhenTaskFinished(KeywordTask task) { |
| | | return Mono.fromSupplier(() -> { |
| | | Keyword keyword = keywordService.getById(task.getKeyword_id()); |
| | | LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); |
| | | keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); |
| | | return keywordTaskService.list(keywordTaskWrapper); |
| | | }).flatMap(keywordTasks -> { |
| | | // 检查所有关联任务是否都已完成(包括各种结束状态) |
| | | boolean allCompleted = keywordTasks.stream().allMatch(t -> |
| | | "completed".equals(t.getStatus()) || |
| | | "false".equals(t.getStatus()) || |
| | | "cancelled".equals(t.getStatus()) || |
| | | "canceled".equals(t.getStatus()) || |
| | | "nonentity".equals(t.getStatus()) |
| | | ); |
| | | |
| | | if (allCompleted) { |
| | | Keyword keyword = keywordService.getById(task.getKeyword_id()); |
| | | keyword.setStatus("completed"); |
| | | // |
| | | return Mono.fromSupplier(() -> keywordService.updateById(keyword)) |
| | | .then(); // 转换为Mono<Void> |
| | | } |
| | | return Mono.empty(); |
| | | }); |
| | | } |
| | | } |