guyue
3 天以前 e298c9a3d7dc23624bf0b79a0bc18ebd1b8219ce
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -27,6 +27,7 @@
import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;
@Component
@@ -38,8 +39,6 @@
    private KeywordTaskService keywordTaskService;
    @Autowired
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Autowired
    private QuestionService questionService;
    @Autowired
@@ -163,9 +162,26 @@
            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
            log.info("查询到 {} 个待处理任务", tasks.size());
            // 先标记成 processing,避免下一轮又被调度发现
            if (!tasks.isEmpty()) {
                // 提取查询到的任务id列表
                List<Integer> taskIds = tasks.stream()
                        .map(KeywordTask::getId) // 假设任务有唯一id字段
                        .collect(Collectors.toList());
                // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务
                boolean updatedCount = keywordTaskService.update(
                        new LambdaUpdateWrapper<KeywordTask>()
                                .in(KeywordTask::getId, taskIds) // 限定为查询到的任务
                                .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改
                                .set(KeywordTask::getStatus, "processing")
                );
                log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size());
            }
            for (KeywordTask task : tasks) {
                if (task.getTask_id() != null) {
                    processTaskStatus(task)
                            .subscribeOn(Schedulers.boundedElastic())
                            .subscribe(
@@ -179,30 +195,8 @@
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     * 实际的任务处理方法,
     */
//    public void executeTaskProcessing() {
//        if (!isHealthy) {
//            log.debug("系统不健康,跳过任务处理");
//            return;
//        }
//
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic())
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
@@ -224,6 +218,8 @@
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                        // 改回 pending,进行下一轮查询
                        task.setStatus("pending");
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
@@ -239,13 +235,11 @@
                })
                .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
                .flatMap(t -> {
                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
                        // 修改这里:将updateById的结果包装成Mono
                        return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
                                .thenReturn(t);
                    }
                    return Mono.just(t);
                    // 修改这里:将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
                            .thenReturn(t);
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());