guyue
8 天以前 a9e958ce3675c4950ceddd3fd6f939cdf0d2bc5a
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -6,8 +6,10 @@
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.Keyword;
import com.linghu.model.entity.KeywordTask;
import com.linghu.model.entity.Question;
import com.linghu.service.KeywordService;
import com.linghu.service.KeywordTaskService;
import com.linghu.service.QuestionService;
import lombok.extern.slf4j.Slf4j;
@@ -40,6 +42,8 @@
    private WebClient webClient; // Assumes a WebClient has already been configured
    @Autowired
    private QuestionService questionService;
    @Autowired
    private KeywordService keywordService;
    private volatile boolean isHealthy = true; // Health status flag
    private volatile boolean initialCheckComplete = false; // Set once the initial check has completed
@@ -216,13 +220,17 @@
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                        return Mono.just(task);
                        // 新增:处理status为false时的关键词状态更新
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
                        task.setStatus("nonentity");
                        return Mono.just(task);
                        // 更新关键词状态
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
                    }
                    else {
                        // 任务仍在进行中,不更新状态
@@ -273,4 +281,33 @@
                })
                .then(Mono.just(task));
    }
    /**
     * 当任务状态为false/nonentity时,更新关键词状态
     */
    private Mono<Void> updateKeywordStatusWhenTaskFinished(KeywordTask task) {
        return Mono.fromSupplier(() -> {
            Keyword keyword = keywordService.getById(task.getKeyword_id());
            LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
            keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id());
            return keywordTaskService.list(keywordTaskWrapper);
        }).flatMap(keywordTasks -> {
            // 检查所有关联任务是否都已完成(包括各种结束状态)
            boolean allCompleted = keywordTasks.stream().allMatch(t ->
                    "completed".equals(t.getStatus()) ||
                            "false".equals(t.getStatus()) ||
                            "cancelled".equals(t.getStatus()) ||
                            "canceled".equals(t.getStatus()) ||
                            "nonentity".equals(t.getStatus())
            );
            if (allCompleted) {
                Keyword keyword = keywordService.getById(task.getKeyword_id());
                keyword.setStatus("completed");
                //
                return Mono.fromSupplier(() -> keywordService.updateById(keyword))
                        .then(); // 转换为Mono<Void>
            }
            return Mono.empty();
        });
    }
}