package com.linghu.timeTask;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.linghu.controller.CollectController;
|
import com.linghu.model.dto.TaskResultResponse;
|
import com.linghu.model.dto.TaskStatusResponse;
|
import com.linghu.model.entity.KeywordTask;
|
import com.linghu.service.KeywordTaskService;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Mono;
|
import reactor.core.scheduler.Schedulers;
|
|
import java.time.LocalDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.util.List;
|
import java.util.concurrent.TimeUnit;
|
|
@Component
|
public class ScheduledTasks {
|
private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);
|
private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
|
|
@Autowired
|
private KeywordTaskService keywordTaskService;
|
|
@Autowired
|
private CollectController collectController;
|
|
@Scheduled(fixedRate = 5000) // 每5秒执行一次
|
public void scheduleFixedRateTask() {
|
// 查询所有状态为pending的任务
|
LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
|
queryWrapper.eq(KeywordTask::getStatus, "pending");
|
|
keywordTaskService.list(queryWrapper)
|
.stream()
|
.filter(task -> task.getTask_id() != null)
|
.forEach(task -> processTaskStatus(task)
|
.subscribeOn(Schedulers.boundedElastic()) // 在弹性线程池执行
|
.subscribe(
|
updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
|
error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage())
|
)
|
);
|
}
|
|
private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
|
return collectController.getTaskStatus(task.getTask_id())
|
.flatMap(statusResponse -> {
|
if ("completed".equalsIgnoreCase(statusResponse.getStatus())) {
|
log.info("任务 {} 已完成,获取结果", task.getTask_id());
|
return collectController.getTaskResult(task.getTask_id())
|
.doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
|
.thenReturn(task)
|
.map(t -> {
|
t.setStatus("completed");
|
return t;
|
});
|
} else if (!"submitted".equalsIgnoreCase(statusResponse.getStatus())
|
&& !"running".equalsIgnoreCase(statusResponse.getStatus())
|
&& !"Error".equalsIgnoreCase(statusResponse.getStatus())) {
|
task.setStatus("false");
|
return Mono.just(task);
|
} else {
|
// 任务仍在进行中,不更新状态
|
return Mono.empty();
|
}
|
})
|
.switchIfEmpty(Mono.just(task)) // 如果状态检查返回empty,保持原有任务
|
.flatMap(t -> {
|
if (!"pending".equalsIgnoreCase(t.getStatus())) {
|
|
|
// 修改这里:将updateById的结果包装成Mono
|
return Mono.fromSupplier(() -> keywordTaskService.updateById(t))
|
.thenReturn(t);
|
}
|
return Mono.just(t);
|
})
|
.onErrorResume(e -> {
|
log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage());
|
task.setStatus("error");
|
|
// 修改这里:将updateById的结果包装成Mono
|
return Mono.fromSupplier(() -> keywordTaskService.updateById(task))
|
.thenReturn(task);
|
});
|
}
|
}
|