From dd028e18a12ad9ae7c43ed09b15ddd6bde1a43e9 Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期三, 03 九月 2025 11:27:50 +0800
Subject: [PATCH] 采集中状态修改提前,统计数据合并

---
 src/main/java/com/linghu/service/impl/CollectionServiceImpl.java |   31 ++++++++++++++++++-------------
 1 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java b/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java
index 60953c6..772a0c1 100644
--- a/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java
+++ b/src/main/java/com/linghu/service/impl/CollectionServiceImpl.java
@@ -30,7 +30,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
-
 @Slf4j
 @Service
 public class CollectionServiceImpl implements CollectionService {
@@ -85,7 +84,12 @@
 
                     // 将新的任务请求加入队列
                     taskQueue.add(searchTaskRequest);
+                    //更新状态为采集中
+                    LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
+                    updateWrapper.eq(Keyword::getKeyword_id, searchTaskRequest.getKeyword_id());
+                    updateWrapper.set(Keyword::getStatus, FinalStatus.SUBMITTED.getValue());
 
+                    keywordService.update(updateWrapper);
                     // 如果当前没有任务在处理中,则启动任务队列的处理
                     if (!isProcessing) {
                         processNextTaskInQueue();
@@ -181,7 +185,7 @@
         return Flux.fromIterable(tasksToCancelRemotely)
                 .flatMap(task -> {
                     // 创建状态更新和远程取消的组合操作
-                    Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), "cancelled");
+                    Mono<Void> updateStatus = updateTaskStatus(task.getTask_id(), FinalStatus.CANCELLED.getValue());
                     Mono<ResponseResult<?>> cancelOp = cancelRemoteTask(task.getTask_id())
                             .onErrorResume(e -> {
                                 log.error("取消任务 {} 失败: {}", task.getTask_id(), e.getMessage());
@@ -347,7 +351,7 @@
                     // 如果任务状态是"submitted"或"running",继续轮询
                     if (!FinalStatus.COMPLETED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                             && !FinalStatus.FAILED.getValue().equalsIgnoreCase(statusResponse.getStatus())
-                            && !FinalStatus.CANCELED.getValue().equalsIgnoreCase(statusResponse.getStatus())
+                            && !FinalStatus.CANCELLED.getValue().equalsIgnoreCase(statusResponse.getStatus())
                             && !(FinalStatus.ERROR.getValue().equalsIgnoreCase(statusResponse.getStatus())
                             && statusResponse.getMessage().contains("Task not found")) ) {
                         return Mono.delay(Duration.ofSeconds(5))  // 延迟 5 秒后再次查询
@@ -397,6 +401,7 @@
             KeywordTask keywordTask = new KeywordTask();
             keywordTask.setKeyword_id(keywordId);
             keywordTask.setTask_id(null);  // 任务ID为空
+            keywordTask.setCreate_time( LocalDateTime.now());
 
             keywordTask.setNum(keyword.getNum());
             keywordTaskService.save(keywordTask);  // 保存 KeywordTask
@@ -439,13 +444,12 @@
                                     //更新关键词状态
                                     LambdaUpdateWrapper<Keyword> updateWrapper = new LambdaUpdateWrapper<>();
                                     updateWrapper.eq(Keyword::getKeyword_id, batchRequest.getKeyword_id());
-                                    updateWrapper.set(Keyword::getStatus, FinalStatus.COMPLETED.getValue());
                                     updateWrapper.set(Keyword::getTask_id, taskResponse.getTask_id());
                                     keywordService.update(updateWrapper);
                                     //设置轮数
                                     Keyword keyword = keywordService.getById(batchRequest.getKeyword_id());
                                     // 更新关键词任务与任务ID的关联
-                                    // 获取与关键词相关的任务,task_id 为 null,确保只取一个任务
+                                    // 获取与关键词相关的任务
                                     List<KeywordTask> keywordTasks = keywordTaskService.list(new LambdaQueryWrapper<KeywordTask>()
                                             .eq(KeywordTask::getKeyword_id, keyword.getKeyword_id())
                                             .eq(KeywordTask::getNum, keyword.getNum())
@@ -453,12 +457,12 @@
                                     if (keywordTasks.size() > 0) {
                                         KeywordTask keywordTask = keywordTasks.get(0);
                                         keywordTask.setTask_id(taskResponse.getTask_id());
-                                        keywordTask.setStatus("pending");
+                                        keywordTask.setStatus(FinalStatus.PENDING.getValue());
                                         keywordTaskService.updateById(keywordTask);
                                     }
                                     //将提问词列表的状态转为pending
                                     for (String questionName : batchRequest.getQuestions()) {
-                                        questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, "pending"));
+                                        questionService.update(new LambdaUpdateWrapper<Question>().eq(Question::getKeyword_id, keyword.getKeyword_id()).eq(Question::getQuestion,questionName).set(Question::getStatus, FinalStatus.PENDING.getValue()));
 
                                     }
                                     //所有关键词都在采集中或者已完成或者错误设置订单进入采集状态
@@ -497,7 +501,7 @@
                 new LambdaQueryWrapper<KeywordTask>()
                         .eq(KeywordTask::getKeyword_id, keywordId)
                         .isNotNull(KeywordTask::getTask_id)
-                        .eq(KeywordTask::getStatus, "pending")
+                        .eq(KeywordTask::getStatus, FinalStatus.PENDING.getValue())
         );
     }
 
@@ -524,7 +528,7 @@
                 List<KeywordTask> keywordTasks = keywordTaskService.list(keywordTaskWrapper);
 
                 // 更新关键词状态
-                keyword.setStatus("completed");
+                keyword.setStatus(FinalStatus.COMPLETED.getValue());
                 keywordService.updateById(keyword);
 
                 //更新提问词状态为取消
@@ -588,7 +592,7 @@
     private Mono<ResponseResult<?>> cancelRemoteTask(String taskId) {
         // 使用Collections.singletonMap或手动创建Map
         Map<String, Object> requestBody = new HashMap<>();
-        requestBody.put("status", "pending");
+        requestBody.put("status", FinalStatus.PENDING.getValue());
 
         return webClient.post()
                 .uri(baseUrl + "/api/v1/tasks/" + taskId + "/cancel")
@@ -762,7 +766,6 @@
                 .allMatch(task -> FinalStatus.COMPLETED.getValue().equals(task.getStatus())
                         || FinalStatus.FALSE.getValue().equals(task.getStatus())
                         || FinalStatus.CANCELLED.getValue().equals(task.getStatus())
-                        || FinalStatus.CANCELED.getValue().equals(task.getStatus())
                         || FinalStatus.NONENTITY.getValue().equals(task.getStatus()));
 
         if (allCompletedOrFailed) {
@@ -908,7 +911,9 @@
             List<Reference> validRefGroup = refGroup.stream()
                     .filter(Objects::nonNull)
                     .collect(Collectors.toList());
-            if (validRefGroup.isEmpty()) return;
+            if (validRefGroup.isEmpty()){
+                return;
+            }
 
             Optional<Reference> existingRef = validRefGroup.stream()
                     .filter(ref -> ref.getReference_id() != null)
@@ -983,7 +988,7 @@
         // 批量更新问题状态
         List<Question> questionsToUpdate = new ArrayList<>();
         questions.forEach(question -> {
-            List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), io.jsonwebtoken.lang.Collections.emptyList());
+            List<QuestionResultList> results = questionResultsMap.getOrDefault(question.getQuestion(), Collections.emptyList());
             FinalStatus finalStatus = determineFinalStatus(results);
 
             // 使用枚举进行switch判断

--
Gitblit v1.7.1