From faa95a5b183a42a6c3fcf1d6a41d81caa33da3bc Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期三, 30 七月 2025 17:42:00 +0800
Subject: [PATCH] 修改定时器抢占

---
 src/main/java/com/linghu/controller/CollectController.java |   24 +++++++++++++++++++++++-
 src/main/java/com/linghu/timeTask/ScheduledTasks.java      |   30 ++++++++++++++++++++++++------
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/src/main/java/com/linghu/controller/CollectController.java b/src/main/java/com/linghu/controller/CollectController.java
index 0fd9d17..0200878 100644
--- a/src/main/java/com/linghu/controller/CollectController.java
+++ b/src/main/java/com/linghu/controller/CollectController.java
@@ -1183,6 +1183,22 @@
             log.info("无有效引用数据,跳过批量保存");
             return;
         }
+//        // 在合并到 compositeKeyMap 之前
+//        validReferences = validReferences.stream()
+//                .collect(Collectors.toMap(
+//                        ref -> ref.getQuestion_id()
+//                                + "|" + ref.getPlatform_id()
+//                                + "|" + ref.getType_id()
+//                                + "|" + ref.getUrl()
+//                                + "|" + ref.getDomain()
+//                                + "|" + ref.getKeyword_id()
+//                                + "|" + ref.getNum(),
+//                        ref -> ref,
+//                        (r1, r2) -> r1  // 碰到同 key 就保第一个
+//                ))
+//                .values()
+//                .stream()
+//                .collect(Collectors.toList());
 
         // 2. 查询数据库中已存在的引用并过滤 null
         LambdaQueryWrapper<Reference> dbRefWrapper = new LambdaQueryWrapper<>();
@@ -1250,7 +1266,13 @@
             log.warn("引用核心字段为空,跳过映射:{}", ref);
             return;
         }
-        String key = ref.getQuestion_id() + "|" + ref.getTitle() + "|" + ref.getUrl() + "|" + ref.getDomain() + "|"  + ref.getNum();
+        String key = ref.getQuestion_id() + "|"
+                + ref.getPlatform_id() + "|"
+                + ref.getType_id() + "|"
+                + ref.getTitle() + "|"
+                + ref.getUrl() + "|"
+                + ref.getDomain() + "|"
+                + ref.getNum();
         map.computeIfAbsent(key, k -> new ArrayList<>()).add(ref);
     }
 
diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
index cb6e4a3..9312445 100644
--- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java
+++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -27,6 +27,7 @@
 import java.time.Duration;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
+import java.util.stream.Collectors;
 
 
 @Component
@@ -163,9 +164,26 @@
 
             List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
             log.info("查询到 {} 个待处理任务", tasks.size());
+            // 先标记成 processing,避免下一轮又被调度发现
+            if (!tasks.isEmpty()) {
+                // 提取查询到的任务id列表
+                List<Integer> taskIds = tasks.stream()
+                        .map(KeywordTask::getId) // 假设任务有唯一id字段
+                        .collect(Collectors.toList());
+
+                // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务
+                boolean updatedCount = keywordTaskService.update(
+                        new LambdaUpdateWrapper<KeywordTask>()
+                                .in(KeywordTask::getId, taskIds) // 限定为查询到的任务
+                                .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改
+                                .set(KeywordTask::getStatus, "processing")
+                );
+                log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size());
+            }
 
             for (KeywordTask task : tasks) {
                 if (task.getTask_id() != null) {
+
                     processTaskStatus(task)
                             .subscribeOn(Schedulers.boundedElastic())
                             .subscribe(
@@ -224,6 +242,8 @@
                         return updateKeywordStatusWhenTaskFinished(task)
                                 .then(Mono.just(task));
                     }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
+                        // 改回 pending,进行下一轮查询
+                        task.setStatus("pending");
                         //更新每个提问词的状态
                         return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                     }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
@@ -239,13 +259,11 @@
                 })
                 .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
                 .flatMap(t -> {
-                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
 
-                        // 修改这里:将updateById的结果包装成Mono
-                        return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
-                                .thenReturn(t);
-                    }
-                    return Mono.just(t);
+                    // 修改这里:将updateById的结果包装成Mono
+                    return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
+                            .thenReturn(t);
+
                 })
                 .onErrorResume(e -> {
                     log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());

--
Gitblit v1.7.1