package com.linghu.timeTask;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.linghu.controller.CollectController;
|
import com.linghu.model.dto.TaskResultResponse;
|
import com.linghu.model.dto.TaskStatusResponse;
|
import com.linghu.model.entity.Keyword;
|
import com.linghu.model.entity.KeywordTask;
|
import com.linghu.model.entity.Question;
|
import com.linghu.service.KeywordService;
|
import com.linghu.service.KeywordTaskService;
|
import com.linghu.service.QuestionService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
import org.springframework.web.reactive.function.client.WebClient;
|
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Mono;
|
import reactor.core.scheduler.Schedulers;
|
|
import javax.annotation.PostConstruct;
|
import java.time.Duration;
|
import java.time.format.DateTimeFormatter;
|
import java.util.List;
|
import java.util.stream.Collectors;
|
|
|
@Component
|
@Slf4j
|
public class ScheduledTasks {
|
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
|
|
@Autowired
|
private KeywordTaskService keywordTaskService;
|
@Autowired
|
private CollectController collectController;
|
@Autowired
|
private WebClient webClient; // 假设已配置WebClient
|
@Autowired
|
private QuestionService questionService;
|
@Autowired
|
private KeywordService keywordService;
|
|
private volatile boolean isHealthy = true; // 健康状态标识
|
private volatile boolean initialCheckComplete = false; // 初始检查完成标志
|
private volatile boolean taskEnabled = true; // 任务启用开关
|
@PostConstruct
|
public void init() {
|
// 启动健康检查定时任务,每10分钟执行一次
|
checkInitialHealth();
|
// 初始健康时,主动启动任务(如果需要应用启动就立即执行任务)
|
|
}
|
|
/**
|
* 执行初始健康检查,确保系统启动时任务状态正确
|
*/
|
private void checkInitialHealth() {
|
log.info("执行系统启动初始健康检查...");
|
|
try {
|
// 同步执行健康检查,最多等待30秒
|
Boolean healthCheckResult = collectController.checkThirdPartyHealth()
|
.map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
|
.block(Duration.ofSeconds(30));
|
|
isHealthy = Boolean.TRUE.equals(healthCheckResult);
|
taskEnabled = isHealthy;
|
|
if (isHealthy) {
|
log.info("系统启动时健康检查通过,任务处理将正常执行");
|
} else {
|
log.warn("系统启动时健康检查失败,任务处理将暂停");
|
}
|
} catch (Exception e) {
|
log.error("初始健康检查失败: {}", e.getMessage());
|
isHealthy = false;
|
taskEnabled = false;
|
} finally {
|
initialCheckComplete = true;
|
}
|
}
|
|
/**
|
* 健康检查定时任务,每10分钟执行一次
|
*/
|
@Scheduled(initialDelay = 600000, fixedRate = 600000) // 10分钟 = 600000毫秒
|
public void checkHealth() {
|
// 等待初始检查完成
|
if (!initialCheckComplete) {
|
log.debug("初始健康检查未完成,跳过本次健康检查");
|
return;
|
}
|
|
log.info("开始执行健康检查...");
|
try {
|
collectController.checkThirdPartyHealth()
|
.map(response -> "healthy".equalsIgnoreCase(response.getStatus()))
|
.subscribe(
|
healthStatus -> {
|
boolean previousHealthStatus = isHealthy;
|
isHealthy = healthStatus;
|
|
// 状态变化时更新任务开关
|
if (previousHealthStatus != isHealthy) {
|
taskEnabled = isHealthy;
|
if (isHealthy) {
|
log.info("健康检查通过,恢复任务处理");
|
} else {
|
log.warn("健康检查失败,暂停任务处理");
|
}
|
} else {
|
log.info("健康状态未变化,当前状态: {}", isHealthy ? "健康" : "不健康");
|
}
|
},
|
error -> {
|
log.error("健康检查请求失败: {}", error.getMessage());
|
if (isHealthy) {
|
isHealthy = false;
|
taskEnabled = false;
|
log.warn("因健康检查请求失败,暂停任务处理");
|
}
|
}
|
);
|
} catch (Exception e) {
|
log.error("健康检查执行异常: {}", e.getMessage());
|
if (isHealthy) {
|
isHealthy = false;
|
taskEnabled = false;
|
}
|
}
|
}
|
|
/**
|
* 任务处理定时任务,每10秒执行一次
|
*/
|
@Scheduled(initialDelay = 0, fixedRate = 10000) // 10秒 = 10000毫秒
|
public void executeTaskProcessing() {
|
// 检查初始检查是否完成和任务是否启用
|
if (!initialCheckComplete) {
|
log.debug("初始检查未完成,跳过本次任务处理");
|
return;
|
}
|
|
if (!taskEnabled) {
|
log.debug("任务已被禁用,跳过本次任务处理");
|
return;
|
}
|
|
if (!isHealthy) {
|
log.debug("系统不健康,跳过任务处理");
|
return;
|
}
|
|
try {
|
// 查询所有状态为pending的任务
|
LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
|
queryWrapper.eq(KeywordTask::getStatus, "pending");
|
|
List<KeywordTask> tasks = keywordTaskService.list(queryWrapper);
|
log.info("查询到 {} 个待处理任务", tasks.size());
|
// 先标记成 processing,避免下一轮又被调度发现
|
if (!tasks.isEmpty()) {
|
// 提取查询到的任务id列表
|
List<Integer> taskIds = tasks.stream()
|
.map(KeywordTask::getId) // 假设任务有唯一id字段
|
.collect(Collectors.toList());
|
|
// 批量更新:仅更新“id在查询列表中”且“状态仍为pending”的任务
|
boolean updatedCount = keywordTaskService.update(
|
new LambdaUpdateWrapper<KeywordTask>()
|
.in(KeywordTask::getId, taskIds) // 限定为查询到的任务
|
.eq(KeywordTask::getStatus, "pending") // 确保状态未被其他进程修改
|
.set(KeywordTask::getStatus, "processing")
|
);
|
log.info("成功标记 {} 个任务为processing(查询到{}个)", updatedCount, tasks.size());
|
}
|
|
for (KeywordTask task : tasks) {
|
if (task.getTask_id() != null) {
|
|
processTaskStatus(task)
|
.subscribeOn(Schedulers.boundedElastic())
|
.subscribe(
|
updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
|
error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
|
);
|
}
|
}
|
} catch (Exception e) {
|
log.error("任务处理执行异常: {}", e.getMessage());
|
}
|
}
|
/**
|
* 实际的任务处理方法,替代原@Scheduled注解方法
|
*/
|
// public void executeTaskProcessing() {
|
// if (!isHealthy) {
|
// log.debug("系统不健康,跳过任务处理");
|
// return;
|
// }
|
//
|
// // 查询所有状态为pending的任务
|
// LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
|
// queryWrapper.eq(KeywordTask::getStatus, "pending");
|
//
|
// keywordTaskService.list(queryWrapper)
|
// .stream()
|
// .filter(task -> task.getTask_id() != null)
|
// .forEach(task -> processTaskStatus(task)
|
// .subscribeOn(Schedulers.boundedElastic())
|
// .subscribe(
|
// updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
|
// error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
|
// )
|
// );
|
// }
|
|
|
private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
|
return collectController.getTaskStatus(task.getTask_id())
|
.flatMap(statusResponse -> {
|
if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
|
log.info("任务 {} 已完成,获取结果", task.getTask_id());
|
return collectController.getTaskResult(task.getTask_id())
|
.doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
|
.thenReturn(task)
|
.map(t -> {
|
t.setStatus("completed");
|
return t;
|
});
|
} else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
|
&& !"running".equalsIgnoreCase(statusResponse.getStatus())
|
&& !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
|
task.setStatus("false");
|
// 新增:处理status为false时的关键词状态更新
|
return updateKeywordStatusWhenTaskFinished(task)
|
.then(Mono.just(task));
|
}else if ( "running".equalsIgnoreCase(statusResponse.getStatus())) {
|
// 改回 pending,进行下一轮查询
|
task.setStatus("pending");
|
//更新每个提问词的状态
|
return updateQuestionStatus(task, statusResponse); // 抽取为独立方法
|
}else if("ERROR".equalsIgnoreCase(statusResponse.getStatus())&&statusResponse.getMessage().contains("Task not found")){
|
task.setStatus("nonentity");
|
// 更新关键词状态
|
return updateKeywordStatusWhenTaskFinished(task)
|
.then(Mono.just(task));
|
}
|
else {
|
// 任务仍在进行中,不更新状态
|
return Mono.empty();
|
}
|
})
|
.switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
|
.flatMap(t -> {
|
|
// 修改这里:将updateById的结果包装成Mono
|
return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
|
.thenReturn(t);
|
|
})
|
.onErrorResume(e -> {
|
log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
|
task.setStatus("error");
|
|
// 将updateById的结果包装成Mono
|
return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
|
.thenReturn(task);
|
});
|
}
|
|
private Mono<KeywordTask> updateQuestionStatus(KeywordTask task, TaskStatusResponse statusResponse) {
|
// 1. 先执行同步查询,获取 List<Question>
|
List<Question> questions = questionService.lambdaQuery()
|
.eq(Question::getKeyword_id, task.getKeyword_id())
|
.list();
|
|
// 2. 将 List 转为 Flux,再进行响应式处理
|
return Flux.fromIterable(questions)
|
.flatMap(question -> {
|
// 更新逻辑...
|
String newStatus = statusResponse.getQuestions_status().stream()
|
.filter(qs -> qs.getQuestion().equals(question.getQuestion()))
|
.findFirst()
|
.map(qs -> qs.getStatus())
|
.orElse(question.getStatus());
|
question.setStatus(newStatus);
|
|
return Mono.fromSupplier(() -> questionService.updateById(question))
|
// 此时可以调用 doOnError 处理异常
|
.doOnError(e -> log.error("更新 Question {} 失败: {}", question.getQuestion_id(), e.getMessage()))
|
.onErrorReturn(false); // 异常时返回 false
|
})
|
.then(Mono.just(task));
|
}
|
/**
|
* 当任务状态为false/nonentity时,更新关键词状态
|
*/
|
private Mono<Void> updateKeywordStatusWhenTaskFinished(KeywordTask task) {
|
return Mono.fromSupplier(() -> {
|
Keyword keyword = keywordService.getById(task.getKeyword_id());
|
LambdaQueryWrapper<KeywordTask> keywordTaskWrapper = new LambdaQueryWrapper<>();
|
keywordTaskWrapper.eq(KeywordTask::getKeyword_id, keyword.getKeyword_id());
|
return keywordTaskService.list(keywordTaskWrapper);
|
}).flatMap(keywordTasks -> {
|
// 检查所有关联任务是否都已完成(包括各种结束状态)
|
boolean allCompleted = keywordTasks.stream().allMatch(t ->
|
"completed".equals(t.getStatus()) ||
|
"false".equals(t.getStatus()) ||
|
"cancelled".equals(t.getStatus()) ||
|
"canceled".equals(t.getStatus()) ||
|
"nonentity".equals(t.getStatus())
|
);
|
|
if (allCompleted) {
|
Keyword keyword = keywordService.getById(task.getKeyword_id());
|
keyword.setStatus("completed");
|
//
|
return Mono.fromSupplier(() -> keywordService.updateById(keyword))
|
.then(); // 转换为Mono<Void>
|
}
|
return Mono.empty();
|
});
|
}
|
}
|