From 693f70da11701e777203e263d7da41abb648dd0f Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期三, 16 七月 2025 20:16:29 +0800
Subject: [PATCH] 对比采集,实时状态

---
 src/main/java/com/linghu/timeTask/ScheduledTasks.java |   53 +++++++++++++++++++++++++++++++++++------------------
 1 files changed, 35 insertions(+), 18 deletions(-)

diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
index 7ec4f3c..701c2fd 100644
--- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java
+++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -2,11 +2,14 @@
 
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.linghu.controller.CollectController;
 import com.linghu.model.dto.TaskResultResponse;
 import com.linghu.model.dto.TaskStatusResponse;
 import com.linghu.model.entity.KeywordTask;
+import com.linghu.model.entity.Question;
 import com.linghu.service.KeywordTaskService;
+import com.linghu.service.QuestionService;
 import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +44,8 @@
     private CollectController collectController;
     @Autowired
     private WebClient webClient; // 假设已配置WebClient
+    @Autowired
+    private QuestionService questionService;
 
     private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL
     private ScheduledFuture<?> scheduledTask; // 定时任务引用
@@ -161,23 +166,7 @@
                         )
                 );
     }
-//    @Scheduled(fixedRate = 10000) // 每5秒执行一次
-//    public void scheduleFixedRateTask() {
-//        // 查询所有状态为pending的任务
-//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
-//        queryWrapper.eq(KeywordTask::getStatus, "pending");
-//
-//        keywordTaskService.list(queryWrapper)
-//                .stream()
-//                .filter(task -> task.getTask_id() != null)
-//                .forEach(task -> processTaskStatus(task)
-//                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
-//                        .subscribe(
-//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
-//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
-//                        )
-//                );
-//    }
+
 
     private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
         return collectController.getTaskStatus(task.getTask_id())
@@ -196,8 +185,11 @@
                             && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                         task.setStatus("false");
                         return Mono.just(task);
+                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
+                        //更新每个提问词的状态
+                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                     }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
-                        task.setStatus("false");
+                        task.setStatus("nonentity");
                         return Mono.just(task);
                     }
                     else {
@@ -224,4 +216,29 @@
                             .thenReturn(task);
                 });
     }
+
+    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
+        // 1. 先执行同步查询,获取 List<Question>
+        List<Question> questions = questionService.lambdaQuery()
+                .eq(Question::getKeyword_id, task.getKeyword_id())
+                .list();
+
+        // 2. 将 List 转为 Flux,再进行响应式处理
+        return Flux.fromIterable(questions)
+                .flatMap(question -> {
+                    // 更新逻辑...
+                    String newStatus = statusResponse.getQuestions_status().stream()
+                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
+                            .findFirst()
+                            .map(qs -> qs.getStatus())
+                            .orElse(question.getStatus());
+                    question.setStatus(newStatus);
+
+                    return Mono.fromSupplier(() -> questionService.updateById(question))
+                            // 此时可以调用 doOnError 处理异常
+                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
+                            .onErrorReturn(false); // 异常时返回 false
+                })
+                .then(Mono.just(task));
+    }
 }
\ No newline at end of file

--
Gitblit v1.7.1