package com.linghu.timeTask;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.linghu.controller.CollectController;
|
import com.linghu.model.dto.TaskResultResponse;
|
import com.linghu.model.dto.TaskStatusResponse;
|
import com.linghu.model.entity.KeywordTask;
|
import com.linghu.model.entity.Question;
|
import com.linghu.service.KeywordTaskService;
|
import com.linghu.service.QuestionService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
import org.springframework.web.reactive.function.client.WebClient;
|
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Mono;
|
import reactor.core.scheduler.Schedulers;
|
|
import javax.annotation.PostConstruct;
|
import javax.annotation.PreDestroy;
|
import java.time.LocalDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.util.List;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.TimeUnit;
|
|
@Component
|
@Slf4j
|
public class ScheduledTasks {
|
// private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
|
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
|
|
@Autowired
|
private KeywordTaskService keywordTaskService;
|
|
@Autowired
|
private CollectController collectController;
|
@Autowired
|
private WebClient webClient; // 假设已配置WebClient
|
@Autowired
|
private QuestionService questionService;
|
|
private String baseUrl = "http://thirdparty-service"; // 第三方服务基础URL
|
private ScheduledFuture<?> scheduledTask; // 定时任务引用
|
// 健康检查专用调度器(单线程足够)
|
private final ScheduledExecutorService healthCheckScheduler = Executors.newSingleThreadScheduledExecutor();
|
// 任务处理专用调度器(可根据任务量调整线程数)
|
private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(2); // 2 个线程
|
private volatile boolean isHealthy = true; // 健康状态标识
|
@PostConstruct
|
public void init() {
|
// 启动健康检查定时任务,每10分钟执行一次
|
healthCheckScheduler.scheduleAtFixedRate(this::checkHealth, 0, 10, TimeUnit.MINUTES);
|
// 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
|
if (isHealthy) {
|
startTaskProcessing();
|
}
|
}
|
|
/**
|
* 健康检查方法,调用第三方服务健康检查接口
|
*/
|
private void checkHealth() {
|
log.info("开始执行健康检查...");
|
collectController.checkThirdPartyHealth()
|
.subscribe(
|
response -> {
|
boolean previousHealthStatus = isHealthy;
|
isHealthy = "healthy".equalsIgnoreCase(response.getStatus());
|
|
// 状态变化时记录日志并控制任务执行
|
if (previousHealthStatus != isHealthy) {
|
if (isHealthy) {
|
log.info("健康检查通过,恢复任务处理");
|
startTaskProcessing();
|
} else {
|
log.warn("健康检查失败,暂停任务处理");
|
stopTaskProcessing();
|
}
|
}
|
},
|
error -> {
|
log.error("健康检查请求失败: {}", error.getMessage());
|
if (isHealthy) { // 仅在健康状态变化时记录并停止任务
|
isHealthy = false;
|
log.warn("因健康检查请求失败,暂停任务处理");
|
stopTaskProcessing();
|
}
|
}
|
);
|
}
|
|
/**
|
* 启动任务处理定时执行
|
*/
|
private synchronized void startTaskProcessing() {
|
if (scheduledTask == null || scheduledTask.isCancelled()) {
|
scheduledTask = taskScheduler.scheduleAtFixedRate(
|
this::executeTaskProcessing,
|
0,
|
10,
|
TimeUnit.SECONDS
|
);
|
System.out.println("nima");
|
}
|
}
|
|
/**
|
* 停止任务处理定时执行
|
*/
|
private synchronized void stopTaskProcessing() {
|
if (scheduledTask != null && !scheduledTask.isCancelled()) {
|
scheduledTask.cancel(false);
|
}
|
}
|
@PreDestroy
|
public void destroy() {
|
// 关闭健康检查调度器
|
healthCheckScheduler.shutdown();
|
try {
|
if (!healthCheckScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
healthCheckScheduler.shutdownNow();
|
}
|
} catch (InterruptedException e) {
|
healthCheckScheduler.shutdownNow();
|
}
|
|
// 关闭任务处理调度器
|
taskScheduler.shutdown();
|
try {
|
if (!taskScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
taskScheduler.shutdownNow();
|
}
|
} catch (InterruptedException e) {
|
taskScheduler.shutdownNow();
|
}
|
}
|
/**
|
* 实际的任务处理方法,替代原@Scheduled注解方法
|
*/
|
public void executeTaskProcessing() {
|
if (!isHealthy) {
|
log.debug("系统不健康,跳过任务处理");
|
return;
|
}
|
|
// 查询所有状态为pending的任务
|
LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
|
queryWrapper.eq(KeywordTask::getStatus, "pending");
|
|
keywordTaskService.list(queryWrapper)
|
.stream()
|
.filter(task -> task.getTask_id() != null)
|
.forEach(task -> processTaskStatus(task)
|
.subscribeOn(Schedulers.boundedElastic())
|
.subscribe(
|
updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
|
error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
|
)
|
);
|
}
|
|
|
private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
|
return collectController.getTaskStatus(task.getTask_id())
|
.flatMap(statusResponse -> {
|
if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
|
log.info("任务 {} 已完成,获取结果", task.getTask_id());
|
return collectController.getTaskResult(task.getTask_id())
|
.doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
|
.thenReturn(task)
|
.map(t -> {
|
t.setStatus("completed");
|
return t;
|
});
|
} else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
|
&& !"running".equalsIgnoreCase(statusResponse.getStatus())
|
&& !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
|
task.setStatus("false");
|
return Mono.just(task);
|
}else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
|
//更新每个提问词的状态
|
return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
|
}else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&"任务不存在".equalsIgnoreCase(statusResponse.getMessage())){
|
task.setStatus("nonentity");
|
return Mono.just(task);
|
}
|
else {
|
// 任务仍在进行中,不更新状态
|
return Mono.empty();
|
}
|
})
|
.switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
|
.flatMap(t -> {
|
if (!"pending".equalsIgnoreCase(t.getStatus())) {
|
|
// 修改这里:将updateById的结果包装成Mono
|
return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
|
.thenReturn(t);
|
}
|
return Mono.just(t);
|
})
|
.onErrorResume(e -> {
|
log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
|
task.setStatus("error");
|
|
// 修改这里:将updateById的结果包装成Mono
|
return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
|
.thenReturn(task);
|
});
|
}
|
|
private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
|
// 1. 先执行同步查询,获取 List<Question>
|
List<Question> questions = questionService.lambdaQuery()
|
.eq(Question::getKeyword_id, task.getKeyword_id())
|
.list();
|
|
// 2. 将 List 转为 Flux,再进行响应式处理
|
return Flux.fromIterable(questions)
|
.flatMap(question -> {
|
// 更新逻辑...
|
String newStatus = statusResponse.getQuestions_status().stream()
|
.filter(qs -> qs.getQuestion().equals(question.getQuestion()))
|
.findFirst()
|
.map(qs -> qs.getStatus())
|
.orElse(question.getStatus());
|
question.setStatus(newStatus);
|
|
return Mono.fromSupplier(() -> questionService.updateById(question))
|
// 此时可以调用 doOnError 处理异常
|
.doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
|
.onErrorReturn(false); // 异常时返回 false
|
})
|
.then(Mono.just(task));
|
}
|
}
|