From d381770738b4862608620faa6fa78c7cea60b920 Mon Sep 17 00:00:00 2001
From: guyue <1721849008@qq.com>
Date: 星期四, 17 七月 2025 23:58:38 +0800
Subject: [PATCH] jacksion版本更换

---
 src/main/java/com/linghu/timeTask/ScheduledTasks.java |  156 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 151 insertions(+), 5 deletions(-)

diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
index 4c45557..701c2fd 100644
--- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java
+++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -2,28 +2,39 @@
 
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.linghu.controller.CollectController;
 import com.linghu.model.dto.TaskResultResponse;
 import com.linghu.model.dto.TaskStatusResponse;
 import com.linghu.model.entity.KeywordTask;
+import com.linghu.model.entity.Question;
 import com.linghu.service.KeywordTaskService;
+import com.linghu.service.QuestionService;
+import lombok.extern.slf4j.Slf4j;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 @Component
+@Slf4j
 public class ScheduledTasks {
-    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
+//    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
     private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
 
     @Autowired
@@ -31,9 +42,115 @@
 
     @Autowired
     private CollectController collectController;
+    @Autowired
+    private WebClient webClient; // 假设已配置WebClient
+    @Autowired
+    private QuestionService questionService;
 
-    @Scheduled(fixedRate = 10000) // 每5秒执行一次
-    public void scheduleFixedRateTask() {
+    private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL
+    private ScheduledFuture<?> scheduledTask; // 定时任务引用
+    // 健康检查专用调度器(单线程足够)
+    private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
+    // 任务处理专用调度器(可根据任务量调整线程数)
+    private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程
+    private volatile boolean isHealthy = true; // 健康状态标识
+    @PostConstruct
+    public void init() {
+        // 启动健康检查定时任务,每10分钟执行一次
+        healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES);
+        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
+        if (isHealthy) {
+            startTaskProcessing();
+        }
+    }
+
+    /**
+     * 健康检查方法,调用第三方服务健康检查接口
+     */
+    private void checkHealth() {
+        log.info("开始执行健康检查...");
+        collectController.checkThirdPartyHealth()
+                .subscribe(
+                        response -> {
+                            boolean previousHealthStatus = isHealthy;
+                            isHealthy = "healthy".equalsIgnoreCase(response.getStatus());
+
+                            // 状态变化时记录日志并控制任务执行
+                            if (previousHealthStatus != isHealthy) {
+                                if (isHealthy) {
+                                    log.info("健康检查通过,恢复任务处理");
+                                    startTaskProcessing();
+                                } else {
+                                    log.warn("健康检查失败,暂停任务处理");
+                                    stopTaskProcessing();
+                                }
+                            }
+                        },
+                        error -> {
+                            log.error("健康检查请求失败: {}", error.getMessage());
+                            if (isHealthy) { // 仅在健康状态变化时记录并停止任务
+                                isHealthy = false;
+                                log.warn("因健康检查请求失败,暂停任务处理");
+                                stopTaskProcessing();
+                            }
+                        }
+                );
+    }
+
+    /**
+     * 启动任务处理定时执行
+     */
+    private synchronized void startTaskProcessing() {
+        if (scheduledTask == null || scheduledTask.isCancelled()) {
+            scheduledTask = taskScheduler.scheduleAtFixedRate(
+                    this::executeTaskProcessing,
+                    0,
+                    10,
+                    TimeUnit.SECONDS
+            );
+            System.out.println("nima");
+        }
+    }
+
+    /**
+     * 停止任务处理定时执行
+     */
+    private synchronized void stopTaskProcessing() {
+        if (scheduledTask != null && !scheduledTask.isCancelled()) {
+            scheduledTask.cancel(false);
+        }
+    }
+    @PreDestroy
+    public void destroy() {
+        // 关闭健康检查调度器
+        healthCheckScheduler.shutdown();
+        try {
+            if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                healthCheckScheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            healthCheckScheduler.shutdownNow();
+        }
+
+        // 关闭任务处理调度器
+        taskScheduler.shutdown();
+        try {
+            if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                taskScheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            taskScheduler.shutdownNow();
+        }
+    }
+    /**
+     * 实际的任务处理方法,替代原@Scheduled注解方法
+     */
+    public void executeTaskProcessing() {
+        if (!isHealthy) {
+            log.debug("系统不健康,跳过任务处理");
+            return;
+        }
+
         // 查询所有状态为pending的任务
         LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(KeywordTask::getStatus, "pending");
@@ -42,13 +159,14 @@
                 .stream()
                 .filter(task -> task.getTask_id() != null)
                 .forEach(task -> processTaskStatus(task)
-                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
+                        .subscribeOn(Schedulers.boundedElastic())
                         .subscribe(
                                 updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                 error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                         )
                 );
     }
+
 
     private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
         return collectController.getTaskStatus(task.getTask_id())
@@ -67,8 +185,11 @@
                             && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                         task.setStatus("false");
                         return Mono.just(task);
+                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
+                        //更新每个提问词的状态
+                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                     }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
-                        task.setStatus("false");
+                        task.setStatus("nonentity");
                         return Mono.just(task);
                     }
                     else {
@@ -95,4 +216,29 @@
                             .thenReturn(task);
                 });
     }
+
+    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
+        // 1. 先执行同步查询,获取 List<Question>
+        List<Question> questions = questionService.lambdaQuery()
+                .eq(Question::getKeyword_id, task.getKeyword_id())
+                .list();
+
+        // 2. 将 List 转为 Flux,再进行响应式处理
+        return Flux.fromIterable(questions)
+                .flatMap(question -> {
+                    // 更新逻辑...
+                    String newStatus = statusResponse.getQuestions_status().stream()
+                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
+                            .findFirst()
+                            .map(qs -> qs.getStatus())
+                            .orElse(question.getStatus());
+                    question.setStatus(newStatus);
+
+                    return Mono.fromSupplier(() -> questionService.updateById(question))
+                            // 此时可以调用 doOnError 处理异常
+                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
+                            .onErrorReturn(false); // 异常时返回 false
+                })
+                .then(Mono.just(task));
+    }
 }
\ No newline at end of file

--
Gitblit v1.7.1