From faa95a5b183a42a6c3fcf1d6a41d81caa33da3bc Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期三, 30 七月 2025 17:42:00 +0800 Subject: [PATCH] 修改定时器抢占 --- src/main/java/com/linghu/timeTask/ScheduledTasks.java | 73 ++++++++++++++++++++++++++++++++---- 1 files changed, 64 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java index 4e00c6a..9312445 100644 --- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -6,8 +6,10 @@ import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; +import com.linghu.model.entity.Keyword; import com.linghu.model.entity.KeywordTask; import com.linghu.model.entity.Question; +import com.linghu.service.KeywordService; import com.linghu.service.KeywordTaskService; import com.linghu.service.QuestionService; import lombok.extern.slf4j.Slf4j; @@ -25,6 +27,7 @@ import java.time.Duration; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.stream.Collectors; @Component @@ -40,6 +43,8 @@ private WebClient webClient; // 假设已配置WebClient @Autowired private QuestionService questionService; + @Autowired + private KeywordService keywordService; private volatile boolean isHealthy = true; // 健康状态标识 private volatile boolean initialCheckComplete = false; // 初始检查完成标志 @@ -159,9 +164,26 @@ List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); log.info("查询到 {} 个待处理任务", tasks.size()); + // 先标记成 processing,避免下一轮又被调度发现 + if (!tasks.isEmpty()) { + // 提取查询到的任务id列表 + List<Integer> taskIds = tasks.stream() + .map(KeywordTask::getId) // 假设任务有唯一id字段 + .collect(Collectors.toList()); + + // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务 + boolean updatedCount = keywordTaskService.update( + new LambdaUpdateWrapper<KeywordTask>() + .in(KeywordTask::getId, taskIds) // 限定为查询到的任务 + .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改 + .set(KeywordTask::getStatus, "processing") + ); + log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size()); + } for (KeywordTask task : tasks) { if (task.getTask_id() != null) { + processTaskStatus(task) .subscribeOn(Schedulers.boundedElastic()) .subscribe( @@ -216,13 +238,19 @@ && !"running".equalsIgnoreCase(statusResponse.getStatus()) && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { task.setStatus("false"); - return Mono.just(task); + // 新增:处理status为false时的关键词状态更新 + return updateKeywordStatusWhenTaskFinished(task) + .then(Mono.just(task)); }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { + // 改回 pending,进行下一轮查询 + task.setStatus("pending"); //更新每个提问词的状态 return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 - }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ + }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){ task.setStatus("nonentity"); - return Mono.just(task); + // 更新关键词状态 + return updateKeywordStatusWhenTaskFinished(task) + .then(Mono.just(task)); } else { // 任务仍在进行中,不更新状态 @@ -231,13 +259,11 @@ }) .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 .flatMap(t -> { - if (!"pending".equalsIgnoreCase(t.getStatus())) { - // 修改这里:将updateById的结果包装成Mono - return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) - .thenReturn(t); - } - return Mono.just(t); + // 修改这里:将updateById的结果包装成Mono + return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) + .thenReturn(t); + }) .onErrorResume(e -> { log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage()); @@ -273,4 +299,33 @@ }) .then(Mono.just(task)); } + /** + * 当任务状态为false/nonentity时,更新关键词状态 + */ + private Mono<Void> updateKeywordStatusWhenTaskFinished(KeywordTask task) { + return Mono.fromSupplier(() -> { + Keyword keyword = keywordService.getById(task.getKeyword_id()); + LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>(); + keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id()); + return keywordTaskService.list(keywordTaskWrapper); + }).flatMap(keywordTasks -> { + // 检查所有关联任务是否都已完成(包括各种结束状态) + boolean allCompleted = keywordTasks.stream().allMatch(t -> + "completed".equals(t.getStatus()) || + "false".equals(t.getStatus()) || + "cancelled".equals(t.getStatus()) || + "canceled".equals(t.getStatus()) || + "nonentity".equals(t.getStatus()) + ); + + if (allCompleted) { + Keyword keyword = keywordService.getById(task.getKeyword_id()); + keyword.setStatus("completed"); + // + return Mono.fromSupplier(() -> keywordService.updateById(keyword)) + .then(); // 转换为Mono<Void> + } + return Mono.empty(); + }); + } } \ No newline at end of file -- Gitblit v1.7.1