| | |
| | | import reactor.core.scheduler.Schedulers; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import javax.annotation.PreDestroy; |
| | | import java.time.LocalDateTime; |
| | | import java.time.Duration; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.List; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | |
| | | @Component |
| | | @Slf4j |
| | |
| | | @Autowired |
| | | private QuestionService questionService; |
| | | |
| | | private ScheduledFuture<?> scheduledTask; // 定时任务引用 |
| | | // 健康检查专用调度器(单线程足够) |
| | | private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); |
| | | // 任务处理专用调度器(可根据任务量调整线程数) |
| | | private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程 |
| | | private volatile boolean isHealthy = true; // 健康状态标识 |
| | | private volatile boolean initialCheckComplete = false; // 初始检查完成标志 |
| | | private volatile boolean taskEnabled = true; // 任务启用开关 |
| | | @PostConstruct |
| | | public void init() { |
| | | // 启动健康检查定时任务,每10分钟执行一次 |
| | | healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES); |
| | | checkInitialHealth(); |
| | | // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) |
| | | if (isHealthy) { |
| | | startTaskProcessing(); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 执行初始健康检查,确保系统启动时任务状态正确 |
| | | */ |
| | | private void checkInitialHealth() { |
| | | log.info("执行系统启动初始健康检查..."); |
| | | |
| | | try { |
| | | // 同步执行健康检查,最多等待30秒 |
| | | Boolean healthCheckResult = collectController.checkThirdPartyHealth() |
| | | .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) |
| | | .block(Duration.ofSeconds(30)); |
| | | |
| | | isHealthy = Boolean.TRUE.equals(healthCheckResult); |
| | | taskEnabled = isHealthy; |
| | | |
| | | if (isHealthy) { |
| | | log.info("系统启动时健康检查通过,任务处理将正常执行"); |
| | | } else { |
| | | log.warn("系统启动时健康检查失败,任务处理将暂停"); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("初始健康检查失败: {}", e.getMessage()); |
| | | isHealthy = false; |
| | | taskEnabled = false; |
| | | } finally { |
| | | initialCheckComplete = true; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 健康检查方法,调用第三方服务健康检查接口 |
| | | * 健康检查定时任务,每10分钟执行一次 |
| | | */ |
| | | private void checkHealth() { |
| | | log.info("开始执行健康检查..."); |
| | | collectController.checkThirdPartyHealth() |
| | | .subscribe( |
| | | response -> { |
| | | boolean previousHealthStatus = isHealthy; |
| | | isHealthy = "healthy".equalsIgnoreCase(response.getStatus()); |
| | | @Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒 |
| | | public void checkHealth() { |
| | | // 等待初始检查完成 |
| | | if (!initialCheckComplete) { |
| | | log.debug("初始健康检查未完成,跳过本次健康检查"); |
| | | return; |
| | | } |
| | | |
| | | // 状态变化时记录日志并控制任务执行 |
| | | if (previousHealthStatus != isHealthy) { |
| | | if (isHealthy) { |
| | | log.info("健康检查通过,恢复任务处理"); |
| | | startTaskProcessing(); |
| | | log.info("开始执行健康检查..."); |
| | | try { |
| | | collectController.checkThirdPartyHealth() |
| | | .map(response -> "healthy".equalsIgnoreCase(response.getStatus())) |
| | | .subscribe( |
| | | healthStatus -> { |
| | | boolean previousHealthStatus = isHealthy; |
| | | isHealthy = healthStatus; |
| | | |
| | | // 状态变化时更新任务开关 |
| | | if (previousHealthStatus != isHealthy) { |
| | | taskEnabled = isHealthy; |
| | | if (isHealthy) { |
| | | log.info("健康检查通过,恢复任务处理"); |
| | | } else { |
| | | log.warn("健康检查失败,暂停任务处理"); |
| | | } |
| | | } else { |
| | | log.warn("健康检查失败,暂停任务处理"); |
| | | stopTaskProcessing(); |
| | | log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康"); |
| | | } |
| | | }, |
| | | error -> { |
| | | log.error("健康检查请求失败: {}", error.getMessage()); |
| | | if (isHealthy) { |
| | | isHealthy = false; |
| | | taskEnabled = false; |
| | | log.warn("因健康检查请求失败,暂停任务处理"); |
| | | } |
| | | } |
| | | }, |
| | | error -> { |
| | | log.error("健康检查请求失败: {}", error.getMessage()); |
| | | if (isHealthy) { // 仅在健康状态变化时记录并停止任务 |
| | | isHealthy = false; |
| | | log.warn("因健康检查请求失败,暂停任务处理"); |
| | | stopTaskProcessing(); |
| | | } |
| | | } |
| | | ); |
| | | } |
| | | |
| | | /** |
| | | * 启动任务处理定时执行 |
| | | */ |
| | | private synchronized void startTaskProcessing() { |
| | | if (scheduledTask == null || scheduledTask.isCancelled()) { |
| | | scheduledTask = taskScheduler.scheduleAtFixedRate( |
| | | this::executeTaskProcessing, |
| | | 0, |
| | | 10, |
| | | TimeUnit.SECONDS |
| | | ); |
| | | System.out.println("nima"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 停止任务处理定时执行 |
| | | */ |
| | | private synchronized void stopTaskProcessing() { |
| | | if (scheduledTask != null && !scheduledTask.isCancelled()) { |
| | | scheduledTask.cancel(false); |
| | | } |
| | | } |
| | | @PreDestroy |
| | | public void destroy() { |
| | | // 关闭健康检查调度器 |
| | | healthCheckScheduler.shutdown(); |
| | | try { |
| | | if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) { |
| | | healthCheckScheduler.shutdownNow(); |
| | | ); |
| | | } catch (Exception e) { |
| | | log.error("健康检查执行异常: {}", e.getMessage()); |
| | | if (isHealthy) { |
| | | isHealthy = false; |
| | | taskEnabled = false; |
| | | } |
| | | } catch (InterruptedException e) { |
| | | healthCheckScheduler.shutdownNow(); |
| | | } |
| | | |
| | | // 关闭任务处理调度器 |
| | | taskScheduler.shutdown(); |
| | | try { |
| | | if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { |
| | | taskScheduler.shutdownNow(); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | taskScheduler.shutdownNow(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 实际的任务处理方法,替代原@Scheduled注解方法 |
| | | * 任务处理定时任务,每10秒执行一次 |
| | | */ |
| | | @Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒 |
| | | public void executeTaskProcessing() { |
| | | // 检查初始检查是否完成和任务是否启用 |
| | | if (!initialCheckComplete) { |
| | | log.debug("初始检查未完成,跳过本次任务处理"); |
| | | return; |
| | | } |
| | | |
| | | if (!taskEnabled) { |
| | | log.debug("任务已被禁用,跳过本次任务处理"); |
| | | return; |
| | | } |
| | | |
| | | if (!isHealthy) { |
| | | log.debug("系统不健康,跳过任务处理"); |
| | | return; |
| | | } |
| | | |
| | | // 查询所有状态为pending的任务 |
| | | LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | try { |
| | | // 查询所有状态为pending的任务 |
| | | LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | |
| | | keywordTaskService.list(queryWrapper) |
| | | .stream() |
| | | .filter(task -> task.getTask_id() != null) |
| | | .forEach(task -> processTaskStatus(task) |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .subscribe( |
| | | updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | ) |
| | | ); |
| | | List<KeywordTask> tasks = keywordTaskService.list(queryWrapper); |
| | | log.info("查询到 {} 个待处理任务", tasks.size()); |
| | | |
| | | for (KeywordTask task : tasks) { |
| | | if (task.getTask_id() != null) { |
| | | processTaskStatus(task) |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .subscribe( |
| | | updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | ); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("任务处理执行异常: {}", e.getMessage()); |
| | | } |
| | | } |
| | | /** |
| | | * 实际的任务处理方法,替代原@Scheduled注解方法 |
| | | */ |
| | | // public void executeTaskProcessing() { |
| | | // if (!isHealthy) { |
| | | // log.debug("系统不健康,跳过任务处理"); |
| | | // return; |
| | | // } |
| | | // |
| | | // // 查询所有状态为pending的任务 |
| | | // LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | // queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | | // |
| | | // keywordTaskService.list(queryWrapper) |
| | | // .stream() |
| | | // .filter(task -> task.getTask_id() != null) |
| | | // .forEach(task -> processTaskStatus(task) |
| | | // .subscribeOn(Schedulers.boundedElastic()) |
| | | // .subscribe( |
| | | // updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | // error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | // ) |
| | | // ); |
| | | // } |
| | | |
| | | |
| | | private Mono<KeywordTask> processTaskStatus(KeywordTask task) { |