| | |
| | | import com.linghu.controller.CollectController; |
| | | import com.linghu.model.dto.TaskResultResponse; |
| | | import com.linghu.model.dto.TaskStatusResponse; |
| | | import com.linghu.model.entity.Keyword; |
| | | import com.linghu.model.entity.KeywordTask; |
| | | import com.linghu.model.entity.Question; |
| | | import com.linghu.service.KeywordService; |
| | | import com.linghu.service.KeywordTaskService; |
| | | import com.linghu.service.QuestionService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | |
| | | private WebClient webClient; // 假设已配置WebClient |
| | | @Autowired |
| | | private QuestionService questionService; |
| | | @Autowired |
| | | private KeywordService keywordService; |
| | | |
| | | private volatile boolean isHealthy = true; // 健康状态标识 |
| | | private volatile boolean initialCheckComplete = false; // 初始检查完成标志 |
| | |
| | | && !"running".equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | task.setStatus("false"); |
| | | return Mono.just(task); |
| | | // 新增:处理status为false时的关键词状态更新 |
| | | return updateKeywordStatusWhenTaskFinished(task) |
| | | .then(Mono.just(task)); |
| | | }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | //更新每个提问词的状态 |
| | | return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 |
| | | }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ |
| | | }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ |
| | | task.setStatus("nonentity"); |
| | | return Mono.just(task); |
| | | // 更新关键词状态 |
| | | return updateKeywordStatusWhenTaskFinished(task) |
| | | .then(Mono.just(task)); |
| | | } |
| | | else { |
| | | // 任务仍在进行中,不更新状态 |
| | |
| | | }) |
| | | .then(Mono.just(task)); |
| | | } |
| | | /** |
| | | * 当任务状态为false/nonentity时,更新关键词状态 |
| | | */ |
| | | private Mono<Void> updateKeywordStatusWhenTaskFinished(KeywordTask task) { |
| | | return Mono.fromSupplier(() -> { |
| | | Keyword keyword = keywordService.getById(task.getKeyword_id()); |
| | | LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); |
| | | keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); |
| | | return keywordTaskService.list(keywordTaskWrapper); |
| | | }).flatMap(keywordTasks -> { |
| | | // 检查所有关联任务是否都已完成(包括各种结束状态) |
| | | boolean allCompleted = keywordTasks.stream().allMatch(t -> |
| | | "completed".equals(t.getStatus()) || |
| | | "false".equals(t.getStatus()) || |
| | | "cancelled".equals(t.getStatus()) || |
| | | "canceled".equals(t.getStatus()) || |
| | | "nonentity".equals(t.getStatus()) |
| | | ); |
| | | |
| | | if (allCompleted) { |
| | | Keyword keyword = keywordService.getById(task.getKeyword_id()); |
| | | keyword.setStatus("completed"); |
| | | // |
| | | return Mono.fromSupplier(() -> keywordService.updateById(keyword)) |
| | | .then(); // 转换为Mono<Void> |
| | | } |
| | | return Mono.empty(); |
| | | }); |
| | | } |
| | | } |