package com.linghu.timeTask; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.entity.KeywordTask; import com.linghu.service.KeywordTaskService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.concurrent.TimeUnit; @Component public class ScheduledTasks { private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); @Autowired private KeywordTaskService keywordTaskService; @Autowired private CollectController collectController; @Scheduled(fixedRate = 10000) // 每5秒执行一次 public void scheduleFixedRateTask() { // 查询所有状态为pending的任务 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(KeywordTask::getStatus, "pending"); keywordTaskService.list(queryWrapper) .stream() .filter(task -> task.getTask_id() != null) .forEach(task -> processTaskStatus(task) .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 .subscribe( updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) ) ); } private Mono processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) .flatMap(statusResponse -> { if ("completed".equalsIgnoreCase(statusResponse.getStatus())) { log.info("任务 {} 已完成,获取结果", task.getTask_id()); return collectController.getTaskResult(task.getTask_id()) .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id())) .thenReturn(task) .map(t -> { t.setStatus("completed"); return t; }); } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus()) && !"running".equalsIgnoreCase(statusResponse.getStatus()) && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { task.setStatus("false"); return Mono.just(task); }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ task.setStatus("false"); return Mono.just(task); } else { // 任务仍在进行中,不更新状态 return Mono.empty(); } }) .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 .flatMap(t -> { if (!"pending".equalsIgnoreCase(t.getStatus())) { // 修改这里:将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) .thenReturn(t); } return Mono.just(t); }) .onErrorResume(e -> { log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); task.setStatus("error"); // 修改这里:将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) .thenReturn(task); }); } }