From a9e958ce3675c4950ceddd3fd6f939cdf0d2bc5a Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期五, 25 七月 2025 19:00:20 +0800
Subject: [PATCH] 任务不存在时,更新关键词状态,更新创建订单id逻辑

---
 src/main/java/com/linghu/timeTask/ScheduledTasks.java |  263 +++++++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 239 insertions(+), 24 deletions(-)

diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
index 4c45557..cb6e4a3 100644
--- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java
+++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -2,53 +2,207 @@
 
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.linghu.controller.CollectController;
 import com.linghu.model.dto.TaskResultResponse;
 import com.linghu.model.dto.TaskStatusResponse;
+import com.linghu.model.entity.Keyword;
 import com.linghu.model.entity.KeywordTask;
+import com.linghu.model.entity.Question;
+import com.linghu.service.KeywordService;
 import com.linghu.service.KeywordTaskService;
+import com.linghu.service.QuestionService;
+import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-import java.time.LocalDateTime;
+import javax.annotation.PostConstruct;
+import java.time.Duration;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+
 
 @Component
+@Slf4j
 public class ScheduledTasks {
-    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
     private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
 
     @Autowired
     private KeywordTaskService keywordTaskService;
-
     @Autowired
     private CollectController collectController;
+    @Autowired
+    private WebClient webClient; // 假设已配置WebClient
+    @Autowired
+    private QuestionService questionService;
+    @Autowired
+    private KeywordService keywordService;
 
-    @Scheduled(fixedRate = 10000) // 每5秒执行一次
-    public void scheduleFixedRateTask() {
-        // 查询所有状态为pending的任务
-        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
-        queryWrapper.eq(KeywordTask::getStatus, "pending");
+    private volatile boolean isHealthy = true; // 健康状态标识
+    private volatile boolean initialCheckComplete = false; // 初始检查完成标志
+    private volatile boolean taskEnabled = true; // 任务启用开关
+    @PostConstruct
+    public void init() {
+        // 启动健康检查定时任务,每10分钟执行一次
+        checkInitialHealth();
+        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
 
-        keywordTaskService.list(queryWrapper)
-                .stream()
-                .filter(task -> task.getTask_id() != null)
-                .forEach(task -> processTaskStatus(task)
-                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
-                        .subscribe(
-                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
-                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
-                        )
-                );
     }
+
+    /**
+     * 执行初始健康检查,确保系统启动时任务状态正确
+     */
+    private void checkInitialHealth() {
+        log.info("执行系统启动初始健康检查...");
+
+        try {
+            // 同步执行健康检查,最多等待30秒
+            Boolean healthCheckResult = collectController.checkThirdPartyHealth()
+                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
+                    .block(Duration.ofSeconds(30));
+
+            isHealthy = Boolean.TRUE.equals(healthCheckResult);
+            taskEnabled = isHealthy;
+
+            if (isHealthy) {
+                log.info("系统启动时健康检查通过,任务处理将正常执行");
+            } else {
+                log.warn("系统启动时健康检查失败,任务处理将暂停");
+            }
+        } catch (Exception e) {
+            log.error("初始健康检查失败: {}", e.getMessage());
+            isHealthy = false;
+            taskEnabled = false;
+        } finally {
+            initialCheckComplete = true;
+        }
+    }
+
+    /**
+     * 健康检查定时任务,每10分钟执行一次
+     */
+    @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒
+    public void checkHealth() {
+        // 等待初始检查完成
+        if (!initialCheckComplete) {
+            log.debug("初始健康检查未完成,跳过本次健康检查");
+            return;
+        }
+
+        log.info("开始执行健康检查...");
+        try {
+            collectController.checkThirdPartyHealth()
+                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
+                    .subscribe(
+                            healthStatus -> {
+                                boolean previousHealthStatus = isHealthy;
+                                isHealthy = healthStatus;
+
+                                // 状态变化时更新任务开关
+                                if (previousHealthStatus != isHealthy) {
+                                    taskEnabled = isHealthy;
+                                    if (isHealthy) {
+                                        log.info("健康检查通过,恢复任务处理");
+                                    } else {
+                                        log.warn("健康检查失败,暂停任务处理");
+                                    }
+                                } else {
+                                    log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康");
+                                }
+                            },
+                            error -> {
+                                log.error("健康检查请求失败: {}", error.getMessage());
+                                if (isHealthy) {
+                                    isHealthy = false;
+                                    taskEnabled = false;
+                                    log.warn("因健康检查请求失败,暂停任务处理");
+                                }
+                            }
+                    );
+        } catch (Exception e) {
+            log.error("健康检查执行异常: {}", e.getMessage());
+            if (isHealthy) {
+                isHealthy = false;
+                taskEnabled = false;
+            }
+        }
+    }
+
+    /**
+     * 任务处理定时任务,每10秒执行一次
+     */
+    @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒
+    public void executeTaskProcessing() {
+        // 检查初始检查是否完成和任务是否启用
+        if (!initialCheckComplete) {
+            log.debug("初始检查未完成,跳过本次任务处理");
+            return;
+        }
+
+        if (!taskEnabled) {
+            log.debug("任务已被禁用,跳过本次任务处理");
+            return;
+        }
+
+        if (!isHealthy) {
+            log.debug("系统不健康,跳过任务处理");
+            return;
+        }
+
+        try {
+            // 查询所有状态为pending的任务
+            LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
+            queryWrapper.eq(KeywordTask::getStatus, "pending");
+
+            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
+            log.info("查询到 {} 个待处理任务", tasks.size());
+
+            for (KeywordTask task : tasks) {
+                if (task.getTask_id() != null) {
+                    processTaskStatus(task)
+                            .subscribeOn(Schedulers.boundedElastic())
+                            .subscribe(
+                                    updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
+                                    error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
+                            );
+                }
+            }
+        } catch (Exception e) {
+            log.error("任务处理执行异常: {}", e.getMessage());
+        }
+    }
+    /**
+     * 实际的任务处理方法,替代原@Scheduled注解方法
+     */
+//    public void executeTaskProcessing() {
+//        if (!isHealthy) {
+//            log.debug("系统不健康,跳过任务处理");
+//            return;
+//        }
+//
+//        // 查询所有状态为pending的任务
+//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
+//        queryWrapper.eq(KeywordTask::getStatus, "pending");
+//
+//        keywordTaskService.list(queryWrapper)
+//                .stream()
+//                .filter(task -> task.getTask_id() != null)
+//                .forEach(task -> processTaskStatus(task)
+//                        .subscribeOn(Schedulers.boundedElastic())
+//                        .subscribe(
+//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
+//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
+//                        )
+//                );
+//    }
+
 
     private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
         return collectController.getTaskStatus(task.getTask_id())
@@ -66,10 +220,17 @@
                             && !"running".equalsIgnoreCase(statusResponse.getStatus())
                             && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                         task.setStatus("false");
-                        return Mono.just(task);
-                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
-                        task.setStatus("false");
-                        return Mono.just(task);
+                        // 新增:处理status为false时的关键词状态更新
+                        return updateKeywordStatusWhenTaskFinished(task)
+                                .then(Mono.just(task));
+                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
+                        //更新每个提问词的状态
+                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
+                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
+                        task.setStatus("nonentity");
+                        // 更新关键词状态
+                        return updateKeywordStatusWhenTaskFinished(task)
+                                .then(Mono.just(task));
                     }
                     else {
                         // 任务仍在进行中,不更新状态
@@ -90,9 +251,63 @@
                     log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                     task.setStatus("error");
 
-                    // 修改这里:将updateById的结果包装成Mono
+                    // 将updateById的结果包装成Mono
                     return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
                             .thenReturn(task);
                 });
     }
+
+    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
+        // 1. 先执行同步查询,获取 List<Question>
+        List<Question> questions = questionService.lambdaQuery()
+                .eq(Question::getKeyword_id, task.getKeyword_id())
+                .list();
+
+        // 2. 将 List 转为 Flux,再进行响应式处理
+        return Flux.fromIterable(questions)
+                .flatMap(question -> {
+                    // 更新逻辑...
+                    String newStatus = statusResponse.getQuestions_status().stream()
+                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
+                            .findFirst()
+                            .map(qs -> qs.getStatus())
+                            .orElse(question.getStatus());
+                    question.setStatus(newStatus);
+
+                    return Mono.fromSupplier(() -> questionService.updateById(question))
+                            // 此时可以调用 doOnError 处理异常
+                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
+                            .onErrorReturn(false); // 异常时返回 false
+                })
+                .then(Mono.just(task));
+    }
+    /**
+     * 当任务状态为false/nonentity时,更新关键词状态
+     */
+    private Mono<Void> updateKeywordStatusWhenTaskFinished(KeywordTask task) {
+        return Mono.fromSupplier(() -> {
+            Keyword keyword = keywordService.getById(task.getKeyword_id());
+            LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
+            keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id());
+            return keywordTaskService.list(keywordTaskWrapper);
+        }).flatMap(keywordTasks -> {
+            // 检查所有关联任务是否都已完成(包括各种结束状态)
+            boolean allCompleted = keywordTasks.stream().allMatch(t ->
+                    "completed".equals(t.getStatus()) ||
+                            "false".equals(t.getStatus()) ||
+                            "cancelled".equals(t.getStatus()) ||
+                            "canceled".equals(t.getStatus()) ||
+                            "nonentity".equals(t.getStatus())
+            );
+
+            if (allCompleted) {
+                Keyword keyword = keywordService.getById(task.getKeyword_id());
+                keyword.setStatus("completed");
+                //
+                return Mono.fromSupplier(() -> keywordService.updateById(keyword))
+                        .then(); // 转换为Mono<Void>
+            }
+            return Mono.empty();
+        });
+    }
 }
\ No newline at end of file

--
Gitblit v1.7.1