| | |
| | | |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
| | | import com.linghu.controller.CollectController; |
| | | import com.linghu.model.dto.TaskResultResponse; |
| | | import com.linghu.model.dto.TaskStatusResponse; |
| | | import com.linghu.model.entity.KeywordTask; |
| | | import com.linghu.model.entity.Question; |
| | | import com.linghu.service.KeywordTaskService; |
| | | import com.linghu.service.QuestionService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.web.reactive.function.client.WebClient; |
| | | import reactor.core.publisher.Flux; |
| | | import reactor.core.publisher.Mono; |
| | | import reactor.core.scheduler.Schedulers; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import javax.annotation.PreDestroy; |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.List; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | @Component |
| | | @Slf4j |
| | | public class ScheduledTasks { |
| | | private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); |
| | | // NOTE(review): the class is also annotated with @Slf4j, which normally generates a logger field named log as well; confirm which of the two should stay. |
| | | private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); |
| | | |
| | | @Autowired /* NOTE(review): no field follows this annotation in this copy; the declaration appears to have been lost */ |
| | |
| | |
| | | @Autowired |
| | | private CollectController collectController; |
| | | @Autowired |
| | | private WebClient webClient; // assumes a WebClient bean is configured elsewhere |
| | | @Autowired |
| | | private QuestionService questionService; |
| | | |
| | | @Scheduled(fixedRate = 10000) // fixedRate is 10000 ms, i.e. every 10 seconds (the earlier note said 5). NOTE(review): this annotation and the method header on the next line have no body in this copy - likely leftover residue |
| | | public void scheduleFixedRateTask() { |
| | | private String baseUrl = "http://thirdparty-service"; // base URL of the third-party service |
| | | private ScheduledFuture<?> scheduledTask; // reference to the scheduled task-processing job |
| | | // Scheduler dedicated to health checks (a single thread is enough) |
| | | private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); |
| | | // Scheduler dedicated to task processing (thread count can be tuned to the workload) |
| | | private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 threads |
| | | private volatile boolean isHealthy = true; // health status flag |
| | | @PostConstruct |
| | | public void init() { |
| | | // 启动健康检查定时任务,每10分钟执行一次 |
| | | healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES); |
| | | // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) |
| | | if (isHealthy) { |
| | | startTaskProcessing(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 健康检查方法,调用第三方服务健康检查接口 |
| | | */ |
| | | private void checkHealth() { |
| | | log.info("开始执行健康检查..."); |
| | | collectController.checkThirdPartyHealth() |
| | | .subscribe( |
| | | response -> { |
| | | boolean previousHealthStatus = isHealthy; |
| | | isHealthy = "healthy".equalsIgnoreCase(response.getStatus()); |
| | | |
| | | // 状态变化时记录日志并控制任务执行 |
| | | if (previousHealthStatus != isHealthy) { |
| | | if (isHealthy) { |
| | | log.info("健康检查通过,恢复任务处理"); |
| | | startTaskProcessing(); |
| | | } else { |
| | | log.warn("健康检查失败,暂停任务处理"); |
| | | stopTaskProcessing(); |
| | | } |
| | | } |
| | | }, |
| | | error -> { |
| | | log.error("健康检查请求失败: {}", error.getMessage()); |
| | | if (isHealthy) { // 仅在健康状态变化时记录并停止任务 |
| | | isHealthy = false; |
| | | log.warn("因健康检查请求失败,暂停任务处理"); |
| | | stopTaskProcessing(); |
| | | } |
| | | } |
| | | ); |
| | | } |
| | | |
| | | /** |
| | | * 启动任务处理定时执行 |
| | | */ |
| | | private synchronized void startTaskProcessing() { |
| | | if (scheduledTask == null || scheduledTask.isCancelled()) { |
| | | scheduledTask = taskScheduler.scheduleAtFixedRate( |
| | | this::executeTaskProcessing, |
| | | 0, |
| | | 10, |
| | | TimeUnit.SECONDS |
| | | ); |
| | | System.out.println("nima"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 停止任务处理定时执行 |
| | | */ |
| | | private synchronized void stopTaskProcessing() { |
| | | if (scheduledTask != null && !scheduledTask.isCancelled()) { |
| | | scheduledTask.cancel(false); |
| | | } |
| | | } |
| | | @PreDestroy |
| | | public void destroy() { |
| | | // 关闭健康检查调度器 |
| | | healthCheckScheduler.shutdown(); |
| | | try { |
| | | if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) { |
| | | healthCheckScheduler.shutdownNow(); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | healthCheckScheduler.shutdownNow(); |
| | | } |
| | | |
| | | // 关闭任务处理调度器 |
| | | taskScheduler.shutdown(); |
| | | try { |
| | | if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { |
| | | taskScheduler.shutdownNow(); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | taskScheduler.shutdownNow(); |
| | | } |
| | | } |
| | | /** |
| | | * Task-processing tick; replaces the former @Scheduled-annotated method. |
| | | * Returns early (debug log only) while isHealthy is false. Otherwise it builds a query for |
| | | * KeywordTask rows whose status equals "pending" and, for each streamed task with a non-null |
| | | * task id, subscribes to processTaskStatus and logs either the update or the error. |
| | | * |
| | | * NOTE(review): the line that should begin the stream chain (the call that supplies the task |
| | | * list) is blank in this copy, and subscribeOn(Schedulers.boundedElastic()) appears twice in a |
| | | * row. Both look like merge or diff residue - restore the method from version control. |
| | | */ |
| | | public void executeTaskProcessing() { |
| | | if (!isHealthy) { |
| | | log.debug("系统不健康,跳过任务处理"); |
| | | return; |
| | | } |
| | | |
| | | // Query all tasks whose status is "pending" |
| | | LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | |
| | | .stream() |
| | | .filter(task -> task.getTask_id() != null) |
| | | .forEach(task -> processTaskStatus(task) |
| | | .subscribeOn(Schedulers.boundedElastic()) // subscribe on the bounded-elastic scheduler |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .subscribe( |
| | | updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | ) |
| | | ); |
| | | } |
| | | |
| | | |
/**
 * Looks up the remote status of the given task through collectController.getTaskStatus and
 * maps the answer onto the task.
 *
 * Branches visible in this copy: a first branch (its condition is only partly visible) sets the
 * task status to "false"; a "running" status delegates to updateQuestionStatus; an "ERROR"
 * status combined with the message literal compared on that line sets "false" and then
 * immediately overwrites it with "nonentity"; the final else branch ends in thenReturn(task).
 *
 * NOTE(review): the line after the getTaskStatus call and the body of the final else branch are
 * blank here, so the operator wrapping these branches, the start of the first condition and the
 * else logic cannot be seen. The back-to-back setStatus calls also look like diff residue.
 * Restore the method from version control before editing it.
 */
| | | private Mono<KeywordTask> processTaskStatus(KeywordTask task) { |
| | | return collectController.getTaskStatus(task.getTask_id()) |
| | |
| | | && !"Error".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | task.setStatus("false"); |
| | | return Mono.just(task); |
| | | }else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) { |
| | | // Update the status of every question belonging to this task |
| | | return updateQuestionStatus(task, statusResponse); // extracted into its own method |
| | | }else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){ |
| | | task.setStatus("false"); |
| | | task.setStatus("nonentity"); |
| | | return Mono.just(task); |
| | | } |
| | | else { |
| | |
| | | .thenReturn(task); |
| | | }); |
| | | } |
| | | |
| | | private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) { |
| | | // 1. 先执行同步查询,获取 List<Question> |
| | | List<Question> questions = questionService.lambdaQuery() |
| | | .eq(Question::getKeyword_id, task.getKeyword_id()) |
| | | .list(); |
| | | |
| | | // 2. 将 List 转为 Flux,再进行响应式处理 |
| | | return Flux.fromIterable(questions) |
| | | .flatMap(question -> { |
| | | // 更新逻辑... |
| | | String newStatus = statusResponse.getQuestions_status().stream() |
| | | .filter(qs -> qs.getQuestion().equals(question.getQuestion())) |
| | | .findFirst() |
| | | .map(qs -> qs.getStatus()) |
| | | .orElse(question.getStatus()); |
| | | question.setStatus(newStatus); |
| | | |
| | | return Mono.fromSupplier(() -> questionService.updateById(question)) |
| | | // 此时可以调用 doOnError 处理异常 |
| | | .doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage())) |
| | | .onErrorReturn(false); // 异常时返回 false |
| | | }) |
| | | .then(Mono.just(task)); |
| | | } |
| | | } |