From e298c9a3d7dc23624bf0b79a0bc18ebd1b8219ce Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 06 八月 2025 11:05:47 +0800 Subject: [PATCH] 删掉之前的注释 --- src/main/java/com/linghu/timeTask/ScheduledTasks.java | 56 +++++++++++++++++++++++++------------------------------- 1 files changed, 25 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java index cb6e4a3..413d277 100644 --- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -27,6 +27,7 @@ import java.time.Duration; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.stream.Collectors; @Component @@ -38,8 +39,6 @@ private KeywordTaskService keywordTaskService; @Autowired private CollectController collectController; - @Autowired - private WebClient webClient; // 假设已配置WebClient @Autowired private QuestionService questionService; @Autowired @@ -163,9 +162,26 @@ List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); log.info("查询到 {} 个待处理任务", tasks.size()); + // 先标记成 processing,避免下一轮又被调度发现 + if (!tasks.isEmpty()) { + // 提取查询到的任务id列表 + List<Integer> taskIds = tasks.stream() + .map(KeywordTask::getId) // 假设任务有唯一id字段 + .collect(Collectors.toList()); + + // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务 + boolean updatedCount = keywordTaskService.update( + new LambdaUpdateWrapper<KeywordTask>() + .in(KeywordTask::getId, taskIds) // 限定为查询到的任务 + .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改 + .set(KeywordTask::getStatus, "processing") + ); + log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size()); + } for (KeywordTask task : tasks) { if (task.getTask_id() != null) { + processTaskStatus(task) .subscribeOn(Schedulers.boundedElastic()) .subscribe( @@ -179,30 +195,8 @@ } } /** - * 实际的任务处理方法,替代原@Scheduled注解方法 + * 实际的任务处理方法, */ -// public void executeTaskProcessing() { -// if (!isHealthy) { -// log.debug("系统不健康,跳过任务处理"); -// return; -// } -// -// // 查询所有状态为pending的任务 -// LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); -// queryWrapper.eq(KeywordTask::getStatus, "pending"); -// -// keywordTaskService.list(queryWrapper) -// .stream() -// .filter(task -> task.getTask_id() != null) -// .forEach(task -> processTaskStatus(task) -// .subscribeOn(Schedulers.boundedElastic()) -// .subscribe( -// updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), -// error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) -// ) -// ); -// } - private Mono<KeywordTask> processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) @@ -224,6 +218,8 @@ return updateKeywordStatusWhenTaskFinished(task) .then(Mono.just(task)); }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { + // 改回 pending,进行下一轮查询 + task.setStatus("pending"); //更新每个提问词的状态 return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ @@ -239,13 +235,11 @@ }) .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 .flatMap(t -> { - if (!"pending".equalsIgnoreCase(t.getStatus())) { - // 修改这里:将updateById的结果包装成Mono - return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) - .thenReturn(t); - } - return Mono.just(t); + // 修改这里:将updateById的结果包装成Mono + return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) + .thenReturn(t); + }) .onErrorResume(e -> { log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); -- Gitblit v1.7.1