guyue
2025-08-12 a271ed5a232236383bc96c8c1e380a5c91dc1c3c
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -3,6 +3,7 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.config.FinalStatus;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
@@ -158,7 +159,7 @@
        try {
            // 查询所有状态为pending的任务
            LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(KeywordTask::getStatus, "pending");
            queryWrapper.eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue());
            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
            log.info("查询到 {} 个待处理任务", tasks.size());
@@ -173,7 +174,7 @@
                boolean updatedCount = keywordTaskService.update(
                        new LambdaUpdateWrapper<KeywordTask>()
                                .in(KeywordTask::getId, taskIds) // 限定为查询到的任务
                                .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改
                                .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()) // 确保状态未被其他进程修改
                                .set(KeywordTask::getStatus, "processing")
                );
                log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size());
@@ -201,29 +202,29 @@
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
                .flatMap(statusResponse -> {
                    if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
                    if (FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus())) {
                        log.info("任务 {} 已完成,获取结果", task.getTask_id());
                        return collectController.getTaskResult(task.getTask_id())
                                .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
                                .thenReturn(task)
                                .map(t -> {
                                    t.setStatus("completed");
                                    t.setStatus(FinalStatus.COMPLETED.getValue());
                                    return t;
                                });
                    } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                    } else if (!FinalStatus.SUBMITTED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus())
                            && !FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus(FinalStatus.FALSE.getValue());
                        // 新增:处理status为false时的关键词状态更新
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                    }else if ( FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus())) {
                        // 改回 pending,进行下一轮查询
                        task.setStatus("pending");
                        task.setStatus(FinalStatus.PENDING.getValue());
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
                        task.setStatus("nonentity");
                    }else if(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
                        task.setStatus(FinalStatus.NONENTITY.getValue());
                        // 更新关键词状态
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
@@ -243,7 +244,7 @@
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                    task.setStatus("error");
                    task.setStatus(FinalStatus.ERROR.getValue());
                    // 将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
@@ -287,16 +288,15 @@
        }).flatMap(keywordTasks -> {
            // 检查所有关联任务是否都已完成(包括各种结束状态)
            boolean allCompleted = keywordTasks.stream().allMatch(t ->
                    "completed".equals(t.getStatus()) ||
                            "false".equals(t.getStatus()) ||
                            "cancelled".equals(t.getStatus()) ||
                            "canceled".equals(t.getStatus()) ||
                            "nonentity".equals(t.getStatus())
                    FinalStatus.COMPLETED.getValue().equals(t.getStatus()) ||
                            FinalStatus.FALSE.getValue().equals(t.getStatus()) ||
                            FinalStatus.CANCELLED.getValue().equals(t.getStatus()) ||
                            FinalStatus.NONENTITY.getValue().equals(t.getStatus())
            );
            if (allCompleted) {
                Keyword keyword = keywordService.getById(task.getKeyword_id());
                keyword.setStatus("completed");
                keyword.setStatus(FinalStatus.COMPLETED.getValue());
                //
                return Mono.fromSupplier(() -> keywordService.updateById(keyword))
                        .then(); // 转换为Mono<Void>