| | |
| | | import com.linghu.model.dto.TaskStatusResponse; |
| | | import com.linghu.model.entity.KeywordTask; |
| | | import com.linghu.service.KeywordTaskService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.web.reactive.function.client.WebClient; |
| | | import reactor.core.publisher.Flux; |
| | | import reactor.core.publisher.Mono; |
| | | import reactor.core.scheduler.Schedulers; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import javax.annotation.PreDestroy; |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.util.List; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.ScheduledFuture; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | @Component |
| | | @Slf4j |
| | | public class ScheduledTasks { |
| | | private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); |
| | | // private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class); |
| | | private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); |
| | | |
| | | @Autowired |
| | |
| | | |
| | | @Autowired |
| | | private CollectController collectController; |
| | | @Autowired |
| | | private WebClient webClient; // 假设已配置WebClient |
| | | |
| | | @Scheduled(fixedRate = 10000) // 每5秒执行一次 |
| | | public void scheduleFixedRateTask() { |
| | | private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL |
| | | private ScheduledFuture<?> scheduledTask; // 定时任务引用 |
| | | // 健康检查专用调度器(单线程足够) |
| | | private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(); |
| | | // 任务处理专用调度器(可根据任务量调整线程数) |
| | | private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程 |
| | | private volatile boolean isHealthy = true; // 健康状态标识 |
| | | @PostConstruct |
| | | public void init() { |
| | | // 启动健康检查定时任务,每10分钟执行一次 |
| | | healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES); |
| | | // 初始健康时,主动启动任务(如果需要应用启动就立即执行任务) |
| | | if (isHealthy) { |
| | | startTaskProcessing(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 健康检查方法,调用第三方服务健康检查接口 |
| | | */ |
| | | private void checkHealth() { |
| | | log.info("开始执行健康检查..."); |
| | | collectController.checkThirdPartyHealth() |
| | | .subscribe( |
| | | response -> { |
| | | boolean previousHealthStatus = isHealthy; |
| | | isHealthy = "healthy".equalsIgnoreCase(response.getStatus()); |
| | | |
| | | // 状态变化时记录日志并控制任务执行 |
| | | if (previousHealthStatus != isHealthy) { |
| | | if (isHealthy) { |
| | | log.info("健康检查通过,恢复任务处理"); |
| | | startTaskProcessing(); |
| | | } else { |
| | | log.warn("健康检查失败,暂停任务处理"); |
| | | stopTaskProcessing(); |
| | | } |
| | | } |
| | | }, |
| | | error -> { |
| | | log.error("健康检查请求失败: {}", error.getMessage()); |
| | | if (isHealthy) { // 仅在健康状态变化时记录并停止任务 |
| | | isHealthy = false; |
| | | log.warn("因健康检查请求失败,暂停任务处理"); |
| | | stopTaskProcessing(); |
| | | } |
| | | } |
| | | ); |
| | | } |
| | | |
| | | /** |
| | | * 启动任务处理定时执行 |
| | | */ |
| | | private synchronized void startTaskProcessing() { |
| | | if (scheduledTask == null || scheduledTask.isCancelled()) { |
| | | scheduledTask = taskScheduler.scheduleAtFixedRate( |
| | | this::executeTaskProcessing, |
| | | 0, |
| | | 10, |
| | | TimeUnit.SECONDS |
| | | ); |
| | | System.out.println("nima"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 停止任务处理定时执行 |
| | | */ |
| | | private synchronized void stopTaskProcessing() { |
| | | if (scheduledTask != null && !scheduledTask.isCancelled()) { |
| | | scheduledTask.cancel(false); |
| | | } |
| | | } |
| | | @PreDestroy |
| | | public void destroy() { |
| | | // 关闭健康检查调度器 |
| | | healthCheckScheduler.shutdown(); |
| | | try { |
| | | if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) { |
| | | healthCheckScheduler.shutdownNow(); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | healthCheckScheduler.shutdownNow(); |
| | | } |
| | | |
| | | // 关闭任务处理调度器 |
| | | taskScheduler.shutdown(); |
| | | try { |
| | | if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) { |
| | | taskScheduler.shutdownNow(); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | taskScheduler.shutdownNow(); |
| | | } |
| | | } |
| | | /** |
| | | * 实际的任务处理方法,替代原@Scheduled注解方法 |
| | | */ |
| | | public void executeTaskProcessing() { |
| | | if (!isHealthy) { |
| | | log.debug("系统不健康,跳过任务处理"); |
| | | return; |
| | | } |
| | | |
| | | // 查询所有状态为pending的任务 |
| | | LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(KeywordTask::getStatus, "pending"); |
| | |
| | | .stream() |
| | | .filter(task -> task.getTask_id() != null) |
| | | .forEach(task -> processTaskStatus(task) |
| | | .subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行 |
| | | .subscribeOn(Schedulers.boundedElastic()) |
| | | .subscribe( |
| | | updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()), |
| | | error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage()) |
| | | ) |
| | | ); |
| | | } |
// Legacy implementation, superseded by executeTaskProcessing(); kept for reference only.
// @Scheduled(fixedRate = 10000) // runs every 10 seconds (10000 ms)
// public void scheduleFixedRateTask() {
//     // Query all tasks whose status is "pending"
//     LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
//     queryWrapper.eq(KeywordTask::getStatus, "pending");
//
//     keywordTaskService.list(queryWrapper)
//             .stream()
//             .filter(task -> task.getTask_id() != null)
//             .forEach(task -> processTaskStatus(task)
//                     .subscribeOn(Schedulers.boundedElastic()) // run on the bounded elastic pool
//                     .subscribe(
//                             updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
//                             error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
//                     )
//             );
// }
| | | |
| | | private Mono<KeywordTask> processTaskStatus(KeywordTask task) { |
| | | return collectController.getTaskStatus(task.getTask_id()) |