guyue
2025-09-03 dd028e18a12ad9ae7c43ed09b15ddd6bde1a43e9
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -3,6 +3,7 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.config.FinalStatus;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
@@ -39,8 +40,6 @@
    private KeywordTaskService keywordTaskService;
    @Autowired
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Autowired
    private QuestionService questionService;
    @Autowired
@@ -160,7 +159,7 @@
        try {
            // 查询所有状态为pending的任务
            LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(KeywordTask::getStatus, "pending");
            queryWrapper.eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue());
            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
            log.info("查询到 {} 个待处理任务", tasks.size());
@@ -175,7 +174,7 @@
                boolean updatedCount = keywordTaskService.update(
                        new LambdaUpdateWrapper<KeywordTask>()
                                .in(KeywordTask::getId, taskIds) // 限定为查询到的任务
                                .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改
                                .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()) // 确保状态未被其他进程修改
                                .set(KeywordTask::getStatus, "processing")
                );
                log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size());
@@ -197,57 +196,35 @@
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     * 实际的任务处理方法,
     */
//    public void executeTaskProcessing() {
//        if (!isHealthy) {
//            log.debug("系统不健康,跳过任务处理");
//            return;
//        }
//
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic())
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
                .flatMap(statusResponse -> {
                    if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
                    if (FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus())) {
                        log.info("任务 {} 已完成,获取结果", task.getTask_id());
                        return collectController.getTaskResult(task.getTask_id())
                                .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
                                .thenReturn(task)
                                .map(t -> {
                                    t.setStatus("completed");
                                    t.setStatus(FinalStatus.COMPLETED.getValue());
                                    return t;
                                });
                    } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                    } else if (!FinalStatus.SUBMITTED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus(FinalStatus.FALSE.getValue());
                        // 新增:处理status为false时的关键词状态更新
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                    }else if ( FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus())) {
                        // 改回 pending,进行下一轮查询
                        task.setStatus("pending");
                        task.setStatus(FinalStatus.PENDING.getValue());
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
                        task.setStatus("nonentity");
                    }else if(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
                        task.setStatus(FinalStatus.NONENTITY.getValue());
                        // 更新关键词状态
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
@@ -267,7 +244,7 @@
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                    task.setStatus("error");
                    task.setStatus(FinalStatus.ERROR.getValue());
                    // 将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
@@ -311,16 +288,15 @@
        }).flatMap(keywordTasks -> {
            // 检查所有关联任务是否都已完成(包括各种结束状态)
            boolean allCompleted = keywordTasks.stream().allMatch(t ->
                    "completed".equals(t.getStatus()) ||
                            "false".equals(t.getStatus()) ||
                            "cancelled".equals(t.getStatus()) ||
                            "canceled".equals(t.getStatus()) ||
                            "nonentity".equals(t.getStatus())
                    FinalStatus.COMPLETED.getValue().equals(t.getStatus()) ||
                            FinalStatus.FALSE.getValue().equals(t.getStatus()) ||
                            FinalStatus.CANCELLED.getValue().equals(t.getStatus()) ||
                            FinalStatus.NONENTITY.getValue().equals(t.getStatus())
            );
            if (allCompleted) {
                Keyword keyword = keywordService.getById(task.getKeyword_id());
                keyword.setStatus("completed");
                keyword.setStatus(FinalStatus.COMPLETED.getValue());
                //
                return Mono.fromSupplier(() -> keywordService.updateById(keyword))
                        .then(); // 转换为Mono<Void>