From 565e8ac5039eb2c7c63695f9e3494ef00cb47185 Mon Sep 17 00:00:00 2001 From: guyue <1721849008@qq.com> Date: 星期五, 18 七月 2025 15:15:23 +0800 Subject: [PATCH] 更改为传json --- src/main/java/com/linghu/timeTask/ScheduledTasks.java | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 158 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/linghu/timeTask/ScheduledTasks.java b/src/main/java/com/linghu/timeTask/ScheduledTasks.java index 95c2a50..701c2fd 100644 --- a/src/main/java/com/linghu/timeTask/ScheduledTasks.java +++ b/src/main/java/com/linghu/timeTask/ScheduledTasks.java @@ -2,28 +2,39 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.linghu.controller.CollectController; import com.linghu.model.dto.TaskResultResponse; import com.linghu.model.dto.TaskStatusResponse; import com.linghu.model.entity.KeywordTask; +import com.linghu.model.entity.Question; import com.linghu.service.KeywordTaskService; +import com.linghu.service.QuestionService; +import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @Component +@Slf4j public class ScheduledTasks { - private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); +// private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); @Autowired @@ -31,9 +42,115 @@ @Autowired private CollectController collectController; + @Autowired + private WebClient webClient; // 假设已配置WebClient + @Autowired + private QuestionService questionService; - @Scheduled(fixedRate = 5000) // 每5秒执行一次 - public void scheduleFixedRateTask() { + private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL + private ScheduledFuture<?> scheduledTask; // 定时任务引用 + // 健康检查专用调度器(单线程足够) + private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); + // 任务处理专用调度器(可根据任务量调整线程数) + private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程 + private volatile boolean isHealthy = true; // 健康状态标识 + @PostConstruct + public void init() { + // 启动健康检查定时任务,每10分钟执行一次 + healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES); + // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) + if (isHealthy) { + startTaskProcessing(); + } + } + + /** + * 健康检查方法,调用第三方服务健康检查接口 + */ + private void checkHealth() { + log.info("开始执行健康检查..."); + collectController.checkThirdPartyHealth() + .subscribe( + response -> { + boolean previousHealthStatus = isHealthy; + isHealthy = "healthy".equalsIgnoreCase(response.getStatus()); + + // 状态变化时记录日志并控制任务执行 + if (previousHealthStatus != isHealthy) { + if (isHealthy) { + log.info("健康检查通过,恢复任务处理"); + startTaskProcessing(); + } else { + log.warn("健康检查失败,暂停任务处理"); + stopTaskProcessing(); + } + } + }, + error -> { + log.error("健康检查请求失败: {}", error.getMessage()); + if (isHealthy) { // 仅在健康状态变化时记录并停止任务 + isHealthy = false; + log.warn("因健康检查请求失败,暂停任务处理"); + stopTaskProcessing(); + } + } + ); + } + + /** + * 启动任务处理定时执行 + */ + private synchronized void startTaskProcessing() { + if (scheduledTask == null || scheduledTask.isCancelled()) { + scheduledTask = taskScheduler.scheduleAtFixedRate( + this::executeTaskProcessing, + 0, + 10, + TimeUnit.SECONDS + ); + System.out.println("nima"); + } + } + + /** + * 停止任务处理定时执行 + */ + private synchronized void stopTaskProcessing() { + if (scheduledTask != null && !scheduledTask.isCancelled()) { + scheduledTask.cancel(false); + } + } + @PreDestroy + public void destroy() { + // 关闭健康检查调度器 + healthCheckScheduler.shutdown(); + try { + if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + healthCheckScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + healthCheckScheduler.shutdownNow(); + } + + // 关闭任务处理调度器 + taskScheduler.shutdown(); + try { + if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + taskScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + taskScheduler.shutdownNow(); + } + } + /** + * 实际的任务处理方法,替代原@Scheduled注解方法 + */ + public void executeTaskProcessing() { + if (!isHealthy) { + log.debug("系统不健康,跳过任务处理"); + return; + } + // 查询所有状态为pending的任务 LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(KeywordTask::getStatus, "pending"); @@ -42,13 +159,14 @@ .stream() .filter(task -> task.getTask_id() != null) .forEach(task -> processTaskStatus(task) - .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 + .subscribeOn(Schedulers.boundedElastic()) .subscribe( updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) ) ); } + private Mono<KeywordTask> processTaskStatus(KeywordTask task) { return collectController.getTaskStatus(task.getTask_id()) @@ -62,11 +180,19 @@ t.setStatus("completed"); return t; }); - } else if (!"submit".equalsIgnoreCase(statusResponse.getStatus()) - && !"running".equalsIgnoreCase(statusResponse.getStatus())) { + } else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus()) + && !"running".equalsIgnoreCase(statusResponse.getStatus()) + && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { task.setStatus("false"); return Mono.just(task); - } else { + }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { + //更新每个提问词的状态 + return updateQuestionStatus(task, statusResponse); // 抽取为独立方法 + }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ + task.setStatus("nonentity"); + return Mono.just(task); + } + else { // 任务仍在进行中,不更新状态 return Mono.empty(); } @@ -74,7 +200,6 @@ .switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务 .flatMap(t -> { if (!"pending".equalsIgnoreCase(t.getStatus())) { - // 修改这里:将updateById的结果包装成Mono return Mono.fromSupplier(() -> keywordTaskService.updateById(t)) @@ -91,4 +216,29 @@ .thenReturn(task); }); } + + private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { + // 1. 先执行同步查询,获取 List<Question> + List<Question> questions = questionService.lambdaQuery() + .eq(Question::getKeyword_id, task.getKeyword_id()) + .list(); + + // 2. 将 List 转为 Flux,再进行响应式处理 + return Flux.fromIterable(questions) + .flatMap(question -> { + // 更新逻辑... + String newStatus = statusResponse.getQuestions_status().stream() + .filter(qs -> qs.getQuestion().equals(question.getQuestion())) + .findFirst() + .map(qs -> qs.getStatus()) + .orElse(question.getStatus()); + question.setStatus(newStatus); + + return Mono.fromSupplier(() -> questionService.updateById(question)) + // 此时可以调用 doOnError 处理异常 + .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) + .onErrorReturn(false); // 异常时返回 false + }) + .then(Mono.just(task)); + } } \ No newline at end of file -- Gitblit v1.7.1