src/main/java/com/linghu/LingHuApplication.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/linghu/controller/CollectController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/linghu/model/entity/KeywordTask.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/linghu/timeTask/ScheduledTasks.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/mapper/KeywordTaskMapper.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/linghu/LingHuApplication.java
@@ -3,8 +3,10 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling @MapperScan("com.linghu.mapper") public class LingHuApplication { src/main/java/com/linghu/controller/CollectController.java
@@ -239,6 +239,7 @@ KeywordTask keywordTask = new KeywordTask(); keywordTask.setKeyword_id(keywordId); keywordTask.setTask_id(response.getTask_id()); keywordTask.setStatus("pending"); return keywordTask; }) .collect(Collectors.toList()); @@ -642,4 +643,9 @@ .onErrorResume(e -> Mono.just( new ServerResourceResponse( e.getMessage()))); } /** * 传入orderid查所有关键词id以及关键词下面的所有任务id,轮询所有任务状态,如果状态为completed,则循环调用获取结果接口,处理结果 */ } src/main/java/com/linghu/model/entity/KeywordTask.java
@@ -29,6 +29,7 @@ * */ private String task_id; private String status; @TableField(exist = false) private static final long serialVersionUID = 1L; @@ -47,7 +48,8 @@ KeywordTask other = (KeywordTask) that; return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId())) && (this.getKeyword_id() == null ? other.getKeyword_id() == null : this.getKeyword_id().equals(other.getKeyword_id())) && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id())); && (this.getTask_id() == null ? other.getTask_id() == null : this.getTask_id().equals(other.getTask_id())) && (this.getStatus() == null ? other.getStatus() == null : this.getStatus().equals(other.getStatus())); } @Override @@ -57,6 +59,7 @@ result = prime * result + ((getId() == null) ? 0 : getId().hashCode()); result = prime * result + ((getKeyword_id() == null) ? 0 : getKeyword_id().hashCode()); result = prime * result + ((getTask_id() == null) ? 0 : getTask_id().hashCode()); result = prime * result + ((getStatus() == null) ? 0 : getStatus().hashCode()); return result; } @@ -70,6 +73,7 @@ sb.append(", keyword_id=").append(keyword_id); sb.append(", task_id=").append(task_id); sb.append(", serialVersionUID=").append(serialVersionUID); sb.append(", status=").append(status); sb.append("]"); return sb.toString(); } src/main/java/com/linghu/timeTask/ScheduledTasks.java
New file @@ -0,0 +1,94 @@
package com.linghu.timeTask;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.linghu.controller.CollectController;
import com.linghu.model.dto.TaskResultResponse;
import com.linghu.model.dto.TaskStatusResponse;
import com.linghu.model.entity.KeywordTask;
import com.linghu.service.KeywordTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Periodically polls every {@link KeywordTask} whose stored status is {@code "pending"},
 * asks {@link CollectController} for the current task status and persists the new status
 * once the task has left the in-progress states.
 *
 * <p>Status values written by this class: {@code "completed"}, {@code "false"} and
 * {@code "error"}.
 */
@Component
public class ScheduledTasks {

    private static final Logger log = LoggerFactory.getLogger(ScheduledTasks.class);

    // Not referenced anywhere in this class at the moment; kept for compatibility.
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");

    /**
     * Task ids whose status check is currently running. The scheduler fires every 5 seconds
     * and the checks are subscribed asynchronously, so without this guard a check that is
     * still running when the next tick fires would be started a second time for the same task.
     */
    private final Set<String> inFlightTaskIds = ConcurrentHashMap.newKeySet();

    @Autowired
    private KeywordTaskService keywordTaskService;

    @Autowired
    private CollectController collectController;

    /**
     * Loads all tasks with status {@code "pending"} and starts an asynchronous status check
     * for each one that has a task id and is not already being checked.
     */
    @Scheduled(fixedRate = 5000) // runs every 5 seconds
    public void scheduleFixedRateTask() {
        // Select every task whose status is still "pending".
        LambdaQueryWrapper<KeywordTask> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(KeywordTask::getStatus, "pending");

        keywordTaskService.list(queryWrapper)
                .stream()
                .filter(task -> task.getTask_id() != null)
                // Set#add returns false when the id is already present, i.e. a check is still running.
                .filter(task -> inFlightTaskIds.add(task.getTask_id()))
                // defer: an exception thrown while building the pipeline becomes an error signal
                // for this task only instead of aborting the loop over the remaining tasks.
                .forEach(task -> Mono.defer(() -> processTaskStatus(task))
                        .subscribeOn(Schedulers.boundedElastic()) // subscribe on the elastic pool
                        // Release the guard on completion, error or cancellation.
                        .doFinally(signal -> inFlightTaskIds.remove(task.getTask_id()))
                        .subscribe(
                                updatedTask -> log.info("任务状态已更新: {}", updatedTask.getTask_id()),
                                // Trailing throwable argument makes SLF4J log the stack trace as well.
                                error -> log.error("处理任务 {} 时发生错误: {}", task.getTask_id(), error.getMessage(), error)
                        )
                );
    }

    /**
     * Checks the status of one task and persists the outcome.
     *
     * <ul>
     *   <li>reported status {@code completed}: fetches the task result, then stores {@code "completed"};</li>
     *   <li>reported status {@code submit} or {@code running}: leaves the task untouched;</li>
     *   <li>any other reported status (including {@code null}): stores {@code "false"};</li>
     *   <li>any error in the pipeline: stores {@code "error"}.</li>
     * </ul>
     *
     * @param task the pending task to check; its {@code status} field is mutated in place
     * @return a Mono emitting the same task instance after any status change has been persisted
     */
    private Mono<KeywordTask> processTaskStatus(KeywordTask task) {
        return collectController.getTaskStatus(task.getTask_id())
                .flatMap(statusResponse -> {
                    String remoteStatus = statusResponse.getStatus();
                    if ("completed".equalsIgnoreCase(remoteStatus)) {
                        log.info("任务 {} 已完成,获取结果", task.getTask_id());
                        return collectController.getTaskResult(task.getTask_id())
                                .doOnSuccess(result -> log.info("获取任务 {} 结果成功", task.getTask_id()))
                                .thenReturn(task)
                                .map(t -> {
                                    t.setStatus("completed");
                                    return t;
                                });
                    } else if (!"submit".equalsIgnoreCase(remoteStatus)
                            && !"running".equalsIgnoreCase(remoteStatus)) {
                        // NOTE(review): "false" looks like it may have been meant as "failed";
                        // kept as-is because the value is persisted — confirm against consumers.
                        task.setStatus("false");
                        return Mono.just(task);
                    } else {
                        // Task is still in progress: do not change its status.
                        return Mono.empty();
                    }
                })
                // An empty result means "still in progress": continue with the unchanged task.
                .switchIfEmpty(Mono.just(task))
                .flatMap(t -> {
                    if (!"pending".equalsIgnoreCase(t.getStatus())) {
                        return persistStatus(t);
                    }
                    return Mono.just(t);
                })
                .onErrorResume(e -> {
                    log.error("处理任务 {} 状态时发生错误: {}", task.getTask_id(), e.getMessage(), e);
                    task.setStatus("error");
                    return persistStatus(task);
                });
    }

    /**
     * Persists the task through the blocking {@code updateById} call.
     *
     * <p>The call is shifted to the bounded-elastic scheduler so that it never runs on the
     * thread that delivered the upstream signal (presumably a non-blocking I/O thread when
     * the controller performs remote calls — not visible from this file).
     *
     * @param task the task whose current field values should be written
     * @return a Mono emitting {@code task} once the update call has returned
     */
    private Mono<KeywordTask> persistStatus(KeywordTask task) {
        return Mono.fromCallable(() -> keywordTaskService.updateById(task))
                .subscribeOn(Schedulers.boundedElastic())
                .doOnNext(updated -> {
                    // The update result was previously discarded; surface the "nothing updated" case.
                    if (Boolean.FALSE.equals(updated)) {
                        log.warn("任务 {} 状态更新未影响任何记录", task.getTask_id());
                    }
                })
                .thenReturn(task);
    }
}
src/main/resources/mapper/KeywordTaskMapper.xml
@@ -8,9 +8,10 @@ <id property="id" column="id" jdbcType="INTEGER"/> <result property="keyword_id" column="keyword_id" jdbcType="INTEGER"/> <result property="task_id" column="task_id" jdbcType="VARCHAR"/> <result property="status" column="status" jdbcType="VARCHAR"/> </resultMap> <sql id="Base_Column_List"> id,keyword_id,task_id id,keyword_id,task_id, status </sql> </mapper>