guyue
2025-07-21 ad835011afaf88624e5b5f27b248c6b1089b7d8a
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -2,11 +2,14 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.KeywordTask;
import com.linghu.model.entity.Question;
import com.linghu.service.KeywordTaskService;
import com.linghu.service.QuestionService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,18 +34,17 @@
@Component
@Slf4j
public class ScheduledTasks {
//    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
    @Autowired
    private KeywordTaskService keywordTaskService;
    @Autowired
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Autowired
    private QuestionService questionService;
    private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL
    private ScheduledFuture<?> scheduledTask; // 定时任务引用
    // 健康检查专用调度器(单线程足够)
    private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
@@ -161,23 +163,7 @@
                        )
                );
    }
//    @Scheduled(fixedRate = 10000) // 每5秒执行一次
//    public void scheduleFixedRateTask() {
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
@@ -196,8 +182,11 @@
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                        return Mono.just(task);
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
                        task.setStatus("false");
                        task.setStatus("nonentity");
                        return Mono.just(task);
                    }
                    else {
@@ -219,9 +208,34 @@
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                    task.setStatus("error");
                    // 修改这里:将updateById的结果包装成Mono
                    // 将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
                            .thenReturn(task);
                });
    }
    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
        // 1. 先执行同步查询,获取 List<Question>
        List<Question> questions = questionService.lambdaQuery()
                .eq(Question::getKeyword_id, task.getKeyword_id())
                .list();
        // 2. 将 List 转为 Flux,再进行响应式处理
        return Flux.fromIterable(questions)
                .flatMap(question -> {
                    // 更新逻辑...
                    String newStatus = statusResponse.getQuestions_status().stream()
                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
                            .findFirst()
                            .map(qs -> qs.getStatus())
                            .orElse(question.getStatus());
                    question.setStatus(newStatus);
                    return Mono.fromSupplier(() -> questionService.updateById(question))
                            // 此时可以调用 doOnError 处理异常
                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
                            .onErrorReturn(false); // 异常时返回 false
                })
                .then(Mono.just(task));
    }
}