| | |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
| | | import com.linghu.config.FinalStatus; |
| | | import com.linghu.controller.CollectController; |
| | | import com.linghu.model.dto.TaskResultResponse; |
| | | import com.linghu.model.dto.TaskStatusResponse; |
| | |
| | | try { |
| | | // 查询所有状态为pending的任务 |
| | | LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | queryWrapper.eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()); |
| | | |
| | | List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); |
| | | log.info("查询到 {} 个待处理任务", tasks.size()); |
| | |
| | | boolean updatedCount = keywordTaskService.update( |
| | | new LambdaUpdateWrapper<KeywordTask>() |
| | | .in(KeywordTask::getId, taskIds) // 限定为查询到的任务 |
| | | .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改 |
| | | .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue()) // 确保状态未被其他进程修改 |
| | | .set(KeywordTask::getStatus, "processing") |
| | | ); |
| | | log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size()); |
| | |
| | | private Mono<KeywordTask> processTaskStatus(KeywordTask task) { |
| | | return collectController.getTaskStatus(task.getTask_id()) |
| | | .flatMap(statusResponse -> { |
| | | if ("completed".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | if (FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus())) { |
| | | log.info("任务 {} 已完成,获取结果", task.getTask_id()); |
| | | return collectController.getTaskResult(task.getTask_id()) |
| | | .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id())) |
| | | .thenReturn(task) |
| | | .map(t -> { |
| | | t.setStatus("completed"); |
| | | t.setStatus(FinalStatus.COMPLETED.getValue()); |
| | | return t; |
| | | }); |
| | | } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !"running".equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | task.setStatus("false"); |
| | | } else if (!FinalStatus.SUBMITTED.getValue().equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus()) |
| | | && !FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())) { |
| | | task.setStatus(FinalStatus.FALSE.getValue()); |
| | | // 新增:处理status为false时的关键词状态更新 |
| | | return updateKeywordStatusWhenTaskFinished(task) |
| | | .then(Mono.just(task)); |
| | | }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | }else if ( FinalStatus.RUNNING.getValue().equalsIgnoreCase(statusResponse.getStatus())) { |
| | | // 改回 pending,进行下一轮查询 |
| | | task.setStatus("pending"); |
| | | task.setStatus(FinalStatus.PENDING.getValue()); |
| | | //更新每个提问词的状态 |
| | | return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 |
| | | }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ |
| | | task.setStatus("nonentity"); |
| | | }else if(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ |
| | | task.setStatus(FinalStatus.NONENTITY.getValue()); |
| | | // 更新关键词状态 |
| | | return updateKeywordStatusWhenTaskFinished(task) |
| | | .then(Mono.just(task)); |
| | |
| | | }) |
| | | .onErrorResume(e -> { |
| | | log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); |
| | | task.setStatus("error"); |
| | | task.setStatus(FinalStatus.ERROR.getValue()); |
| | | |
| | | // 将updateById的结果包装成Mono |
| | | return Mono.fromSupplier(() -> keywordTaskService.updateById(task)) |
| | |
| | | }).flatMap(keywordTasks -> { |
| | | // 检查所有关联任务是否都已完成(包括各种结束状态) |
| | | boolean allCompleted = keywordTasks.stream().allMatch(t -> |
| | | "completed".equals(t.getStatus()) || |
| | | "false".equals(t.getStatus()) || |
| | | "cancelled".equals(t.getStatus()) || |
| | | "canceled".equals(t.getStatus()) || |
| | | "nonentity".equals(t.getStatus()) |
| | | FinalStatus.COMPLETED.getValue().equals(t.getStatus()) || |
| | | FinalStatus.FALSE.getValue().equals(t.getStatus()) || |
| | | FinalStatus.CANCELLED.getValue().equals(t.getStatus()) || |
| | | FinalStatus.NONENTITY.getValue().equals(t.getStatus()) |
| | | ); |
| | | |
| | | if (allCompleted) { |
| | | Keyword keyword = keywordService.getById(task.getKeyword_id()); |
| | | keyword.setStatus("completed"); |
| | | keyword.setStatus(FinalStatus.COMPLETED.getValue()); |
| | | // |
| | | return Mono.fromSupplier(() -> keywordService.updateById(keyword)) |
| | | .then(); // 转换为Mono<Void> |