| | |
| | | import java.time.Duration; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.List; |
| | | import java.util.stream.Collectors; |
| | | |
| | | |
| | | @Component |
| | |
| | | |
| | | List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); |
| | | log.info("查询到 {} 个待处理任务", tasks.size()); |
| | | // 先标记成 processing,避免下一轮又被调度发现 |
| | | if (!tasks.isEmpty()) { |
| | | // 提取查询到的任务id列表 |
| | | List<Integer> taskIds = tasks.stream() |
| | | .map(KeywordTask::getId) // 假设任务有唯一id字段 |
| | | .collect(Collectors.toList()); |
| | | |
| | | // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务 |
| | | boolean updatedCount = keywordTaskService.update( |
| | | new LambdaUpdateWrapper<KeywordTask>() |
| | | .in(KeywordTask::getId, taskIds) // 限定为查询到的任务 |
| | | .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改 |
| | | .set(KeywordTask::getStatus, "processing") |
| | | ); |
| | | log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size()); |
| | | } |
| | | |
| | | for (KeywordTask task : tasks) { |
| | | if (task.getTask_id() != null) { |
| | | |
| | | processTaskStatus(task) |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .subscribe( |
| | |
| | | return updateKeywordStatusWhenTaskFinished(task) |
| | | .then(Mono.just(task)); |
| | | }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | // 改回 pending,进行下一轮查询 |
| | | task.setStatus("pending"); |
| | | //更新每个提问词的状态 |
| | | return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 |
| | | }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ |
| | |
| | | }) |
| | | .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 |
| | | .flatMap(t -> { |
| | | if (!"pending".equalsIgnoreCase(t.getStatus())) { |
| | | |
| | | // 修改这里:将updateById的结果包装成Mono |
| | | return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) |
| | | .thenReturn(t); |
| | | } |
| | | return Mono.just(t); |
| | | |
| | | }) |
| | | .onErrorResume(e -> { |
| | | log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); |