guyue
3 天以前 faa95a5b183a42a6c3fcf1d6a41d81caa33da3bc
src/main/java/com/linghu/timeTask/ScheduledTasks.java
@@ -2,53 +2,225 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.Keyword;
import com.linghu.model.entity.KeywordTask;
import com.linghu.model.entity.Question;
import com.linghu.service.KeywordService;
import com.linghu.service.KeywordTaskService;
import com.linghu.service.QuestionService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.LocalDateTime;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Component
@Slf4j
public class ScheduledTasks {
    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
    @Autowired
    private KeywordTaskService keywordTaskService;
    @Autowired
    private CollectController collectController;
    @Autowired
    private WebClient webClient; // 假设已配置WebClient
    @Autowired
    private QuestionService questionService;
    @Autowired
    private KeywordService keywordService;
    @Scheduled(fixedRate = 5000) // 每5秒执行一次
    public void scheduleFixedRateTask() {
        // 查询所有状态为pending的任务
        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(KeywordTask::getStatus, "pending");
    private volatile boolean isHealthy = true; // 健康状态标识
    private volatile boolean initialCheckComplete = false; // 初始检查完成标志
    private volatile boolean taskEnabled = true; // 任务启用开关
    @PostConstruct
    public void init() {
        // 启动健康检查定时任务,每10分钟执行一次
        checkInitialHealth();
        // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
        keywordTaskService.list(queryWrapper)
                .stream()
                .filter(task -> task.getTask_id() != null)
                .forEach(task -> processTaskStatus(task)
                        .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
                        .subscribe(
                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                        )
                );
    }
    /**
     * 执行初始健康检查,确保系统启动时任务状态正确
     */
    private void checkInitialHealth() {
        log.info("执行系统启动初始健康检查...");
        try {
            // 同步执行健康检查,最多等待30秒
            Boolean healthCheckResult = collectController.checkThirdPartyHealth()
                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
                    .block(Duration.ofSeconds(30));
            isHealthy = Boolean.TRUE.equals(healthCheckResult);
            taskEnabled = isHealthy;
            if (isHealthy) {
                log.info("系统启动时健康检查通过,任务处理将正常执行");
            } else {
                log.warn("系统启动时健康检查失败,任务处理将暂停");
            }
        } catch (Exception e) {
            log.error("初始健康检查失败: {}", e.getMessage());
            isHealthy = false;
            taskEnabled = false;
        } finally {
            initialCheckComplete = true;
        }
    }
    /**
     * 健康检查定时任务,每10分钟执行一次
     */
    @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒
    public void checkHealth() {
        // 等待初始检查完成
        if (!initialCheckComplete) {
            log.debug("初始健康检查未完成,跳过本次健康检查");
            return;
        }
        log.info("开始执行健康检查...");
        try {
            collectController.checkThirdPartyHealth()
                    .map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
                    .subscribe(
                            healthStatus -> {
                                boolean previousHealthStatus = isHealthy;
                                isHealthy = healthStatus;
                                // 状态变化时更新任务开关
                                if (previousHealthStatus != isHealthy) {
                                    taskEnabled = isHealthy;
                                    if (isHealthy) {
                                        log.info("健康检查通过,恢复任务处理");
                                    } else {
                                        log.warn("健康检查失败,暂停任务处理");
                                    }
                                } else {
                                    log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康");
                                }
                            },
                            error -> {
                                log.error("健康检查请求失败: {}", error.getMessage());
                                if (isHealthy) {
                                    isHealthy = false;
                                    taskEnabled = false;
                                    log.warn("因健康检查请求失败,暂停任务处理");
                                }
                            }
                    );
        } catch (Exception e) {
            log.error("健康检查执行异常: {}", e.getMessage());
            if (isHealthy) {
                isHealthy = false;
                taskEnabled = false;
            }
        }
    }
    /**
     * 任务处理定时任务,每10秒执行一次
     */
    @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒
    public void executeTaskProcessing() {
        // 检查初始检查是否完成和任务是否启用
        if (!initialCheckComplete) {
            log.debug("初始检查未完成,跳过本次任务处理");
            return;
        }
        if (!taskEnabled) {
            log.debug("任务已被禁用,跳过本次任务处理");
            return;
        }
        if (!isHealthy) {
            log.debug("系统不健康,跳过任务处理");
            return;
        }
        try {
            // 查询所有状态为pending的任务
            LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
            queryWrapper.eq(KeywordTask::getStatus, "pending");
            List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
            log.info("查询到 {} 个待处理任务", tasks.size());
            // 先标记成 processing,避免下一轮又被调度发现
            if (!tasks.isEmpty()) {
                // 提取查询到的任务id列表
                List<Integer> taskIds = tasks.stream()
                        .map(KeywordTask::getId) // 假设任务有唯一id字段
                        .collect(Collectors.toList());
                // 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务
                boolean updatedCount = keywordTaskService.update(
                        new LambdaUpdateWrapper<KeywordTask>()
                                .in(KeywordTask::getId, taskIds) // 限定为查询到的任务
                                .eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改
                                .set(KeywordTask::getStatus, "processing")
                );
                log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size());
            }
            for (KeywordTask task : tasks) {
                if (task.getTask_id() != null) {
                    processTaskStatus(task)
                            .subscribeOn(Schedulers.boundedElastic())
                            .subscribe(
                                    updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                    error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
                            );
                }
            }
        } catch (Exception e) {
            log.error("任务处理执行异常: {}", e.getMessage());
        }
    }
    /**
     * 实际的任务处理方法,替代原@Scheduled注解方法
     */
//    public void executeTaskProcessing() {
//        if (!isHealthy) {
//            log.debug("系统不健康,跳过任务处理");
//            return;
//        }
//
//        // 查询所有状态为pending的任务
//        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//        queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//        keywordTaskService.list(queryWrapper)
//                .stream()
//                .filter(task -> task.getTask_id() != null)
//                .forEach(task -> processTaskStatus(task)
//                        .subscribeOn(Schedulers.boundedElastic())
//                        .subscribe(
//                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                        )
//                );
//    }
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
@@ -62,33 +234,98 @@
                                    t.setStatus("completed");
                                    return t;
                                });
                    } else if (!"submit".equalsIgnoreCase(statusResponse.getStatus())
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())) {
                    } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
                            && !"running".equalsIgnoreCase(statusResponse.getStatus())
                            && !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
                        task.setStatus("false");
                        return Mono.just(task);
                    } else {
                        // 新增:处理status为false时的关键词状态更新
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
                    }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
                        // 改回 pending,进行下一轮查询
                        task.setStatus("pending");
                        //更新每个提问词的状态
                        return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
                    }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
                        task.setStatus("nonentity");
                        // 更新关键词状态
                        return updateKeywordStatusWhenTaskFinished(task)
                                .then(Mono.just(task));
                    }
                    else {
                        // 任务仍在进行中,不更新状态
                        return Mono.empty();
                    }
                })
                .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
                .flatMap(t -> {
                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
                    // 修改这里:将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
                            .thenReturn(t);
                        // 修改这里:将updateById的结果包装成Mono
                        return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
                                .thenReturn(t);
                    }
                    return Mono.just(t);
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
                    task.setStatus("error");
                    // 修改这里:将updateById的结果包装成Mono
                    // 将updateById的结果包装成Mono
                    return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
                            .thenReturn(task);
                });
    }
    private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
        // 1. 先执行同步查询,获取 List<Question>
        List<Question> questions = questionService.lambdaQuery()
                .eq(Question::getKeyword_id, task.getKeyword_id())
                .list();
        // 2. 将 List 转为 Flux,再进行响应式处理
        return Flux.fromIterable(questions)
                .flatMap(question -> {
                    // 更新逻辑...
                    String newStatus = statusResponse.getQuestions_status().stream()
                            .filter(qs -> qs.getQuestion().equals(question.getQuestion()))
                            .findFirst()
                            .map(qs -> qs.getStatus())
                            .orElse(question.getStatus());
                    question.setStatus(newStatus);
                    return Mono.fromSupplier(() -> questionService.updateById(question))
                            // 此时可以调用 doOnError 处理异常
                            .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
                            .onErrorReturn(false); // 异常时返回 false
                })
                .then(Mono.just(task));
    }
    /**
     * 当任务状态为false/nonentity时,更新关键词状态
     */
    private Mono<Void> updateKeywordStatusWhenTaskFinished(KeywordTask task) {
        return Mono.fromSupplier(() -> {
            Keyword keyword = keywordService.getById(task.getKeyword_id());
            LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
            keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id());
            return keywordTaskService.list(keywordTaskWrapper);
        }).flatMap(keywordTasks -> {
            // 检查所有关联任务是否都已完成(包括各种结束状态)
            boolean allCompleted = keywordTasks.stream().allMatch(t ->
                    "completed".equals(t.getStatus()) ||
                            "false".equals(t.getStatus()) ||
                            "cancelled".equals(t.getStatus()) ||
                            "canceled".equals(t.getStatus()) ||
                            "nonentity".equals(t.getStatus())
            );
            if (allCompleted) {
                Keyword keyword = keywordService.getById(task.getKeyword_id());
                keyword.setStatus("completed");
                //
                return Mono.fromSupplier(() -> keywordService.updateById(keyword))
                        .then(); // 转换为Mono<Void>
            }
            return Mono.empty();
        });
    }
}